1 Commits

Author SHA1 Message Date
7d3021efbf Remove tracked generated/data files and fix map link on status page
- Remove byty_*.json, mapa_bytu.html, .DS_Store and settings.local.json from git tracking
  (already in .gitignore, files kept locally)
- Fix "Otevřít mapu" link on scraper status page: / → /mapa_bytu.html

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 18:50:16 +01:00
14 changed files with 277 additions and 1085 deletions

124
CLAUDE.md
View File

@@ -1,124 +0,0 @@
# Maru hledá byt
Projekt pro hledání bytů v Praze. Scrapuje inzeráty ze 7 realitních portálů, filtruje, deduplikuje a generuje interaktivní mapu.
**Jazyk komunikace:** Čeština (uživatelka Marie). Kód a komentáře v kódu jsou mix CZ/EN.
## Architektura
```
run_all.sh (orchestrátor)
├─ scrape_and_map.py → byty_sreality.json (Sreality API)
├─ scrape_bezrealitky.py → byty_bezrealitky.json (HTML Apollo cache)
├─ scrape_idnes.py → byty_idnes.json (HTML regex)
├─ scrape_psn.py } → byty_psn.json (React API + curl)
├─ scrape_cityhome.py } → byty_cityhome.json (HTML tabulky)
├─ scrape_bazos.py → byty_bazos.json (HTML regex)
└─ scrape_realingo.py → byty_realingo.json (Next.js __NEXT_DATA__)
merge_and_map.py
├─ byty_merged.json (deduplikovaná data)
└─ mapa_bytu.html (Leaflet.js mapa)
generate_status.py → status.json + scraper_history.json
server.py (port 8080) → servíruje mapu + status page + ratings API
```
## Filtry (společné všem scraperům)
| Parametr | Hodnota | Poznámka |
|----------|---------|----------|
| Max cena | 13.5M Kč (Sreality/Realingo/Bezrealitky/iDNES), 14M Kč (PSN/CityHome/Bazoš) | Rozdíl je záměrný |
| Min plocha | 69 m² | |
| Min patro | 2. NP | 2. NP se na mapě označí varováním |
| Dispozice | 3+kk, 3+1, 4+kk, 4+1, 5+kk, 5+1, 6+ | |
| Region | Praha | |
| Vyloučit | panelové domy, sídliště | regex v popisu/polích |
## Klíčové soubory
- **scrape_and_map.py** — Sreality scraper + `generate_map()` funkce (sdílená, generuje HTML mapu)
- **merge_and_map.py** — sloučí 7 JSON zdrojů, deduplikuje (klíč: ulice + cena + plocha), volá `generate_map()`
- **scraper_stats.py** — utility: `validate_listing()` (validace povinných polí + GPS bounds) a `write_stats()`
- **generate_status.py** — generuje status.json a scraper_history.json z výstupů scraperů
- **server.py** — HTTP server (port 8080), endpointy: `/mapa_bytu.html`, `/scrapers-status`, `/api/ratings`, `/api/status`
- **run_all.sh** — orchestrátor, spouští scrapery postupně (PSN+CityHome paralelně), pak merge + status
## Mapa (mapa_bytu.html)
- Leaflet.js + CARTO tiles
- Barvy markerů podle ceny/m² (modrá < 110k → červená > 165k, šedá = neuvedeno)
- PSN/CityHome = srdíčkové markery (❤️)
- Nové inzeráty (≤ 1 den) = žlutý badge "NEW"
- Zamítnuté = zprůhledněné + 🚫 SVG overlay
- Oblíbené = hvězdička (⭐)
- Filtry: patro, max cena (input, default 13.5M, max 14M), datum přidání, skrýt zamítnuté, klik na cenový pás
- Ratings uložené v localStorage + sync na server `/api/ratings`
## Barvy zdrojů na mapě
```python
source_colors = {
"sreality": "#1976D2", # modrá
"realingo": "#00897B", # teal
"bezrealitky": "#E91E63", # růžová
"idnes": "#FF6F00", # oranžová
"psn": "#D32F2F", # červená
"cityhome": "#D32F2F", # červená
"bazos": "#7B1FA2", # fialová
}
```
## Deduplikace (merge_and_map.py)
- Klíč: `normalize_street(locality) + price + area`
- Normalizace ulice: první část před čárkou, lowercase, odstranění diakritiky, jen alfanumerické znaky
- PSN a CityHome mají prioritu (načtou se první)
## Vývoj
- **Git remote:** `https://gitea.home.hrajfrisbee.cz/littlemeat/maru-hleda-byt.git`
- **Gitea API token:** uložen v `.claude/settings.local.json`
- **Python 3.9+** kompatibilita (`from __future__ import annotations`)
- **Žádné pip závislosti** — jen stdlib (urllib, json, re, logging, pathlib, subprocess)
- **Docker:** `build/Dockerfile` (python:3.13-alpine), cron každé 4 hodiny
- Generované soubory (`byty_*.json`, `mapa_bytu.html`, `*.log`) jsou v `.gitignore`
## Typické úlohy
```bash
# Rychlý test scraperu
python3 scrape_bazos.py --max-pages 1 --max-properties 5 --log-level DEBUG
# Lokální validace (všechny scrapery s limity)
make validation-local
# Vygenerovat mapu z existujících dat
python3 merge_and_map.py
# Spustit server
python3 server.py # nebo: make serve
# Plný scrape
./run_all.sh
```
## Pořadí scraperů v run_all.sh
1. Sreality
2. Bezrealitky
3. iDNES
4. PSN + CityHome (paralelně)
5. Bazoš
6. Realingo (poslední — uživatelka ho nemá ráda)
7. Merge + mapa
8. Status generování
## Konvence
- Commit messages v angličtině, PR popis v angličtině
- Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- PRy přes Gitea API (viz create_pr.sh pattern v historii)
- Nové scrapery kopírují vzor z `scrape_bezrealitky.py`
- Každý scraper má argparse s `--max-pages`, `--max-properties`, `--log-level`

View File

@@ -83,6 +83,10 @@ Merges all `byty_*.json` files into `byty_merged.json` and generates `mapa_bytu.
**Deduplication logic:** Two listings are considered duplicates if they share the same normalized street name + price + area. PSN and CityHome have priority during dedup (loaded first), so their listings are kept over duplicates from other portals. **Deduplication logic:** Two listings are considered duplicates if they share the same normalized street name + price + area. PSN and CityHome have priority during dedup (loaded first), so their listings are kept over duplicates from other portals.
### `regen_map.py`
Regenerates the map from existing `byty_sreality.json` data without re-scraping. Fetches missing area values from the Sreality API, fixes URLs, and re-applies the area filter. Useful for tweaking map output after data has already been collected.
## Interactive map (`mapa_bytu.html`) ## Interactive map (`mapa_bytu.html`)
The generated map is a standalone HTML file using Leaflet.js with CARTO basemap tiles. Features: The generated map is a standalone HTML file using Leaflet.js with CARTO basemap tiles. Features:
@@ -147,7 +151,7 @@ The project includes a Docker setup for unattended operation with a cron-based s
│ PID 1: python3 -m http.server :8080 │ │ PID 1: python3 -m http.server :8080 │
│ serves /app/data/ │ │ serves /app/data/ │
│ │ │ │
│ crond: runs run_all.sh every 4 hours │ crond: runs run_all.sh at 06:00/18:00
│ Europe/Prague timezone │ │ Europe/Prague timezone │
│ │ │ │
│ /app/ -- scripts (.py, .sh) │ │ /app/ -- scripts (.py, .sh) │
@@ -156,7 +160,7 @@ The project includes a Docker setup for unattended operation with a cron-based s
└─────────────────────────────────────────┘ └─────────────────────────────────────────┘
``` ```
On startup, the HTTP server starts immediately. The initial scrape runs in the background. Subsequent cron runs update data in-place every 4 hours. On startup, the HTTP server starts immediately. The initial scrape runs in the background. Subsequent cron runs update data in-place twice daily at 06:00 and 18:00 CET/CEST.
### Quick start ### Quick start
@@ -197,13 +201,14 @@ Validation targets run scrapers with `--max-pages 1 --max-properties 10` for a f
├── scrape_psn.py # PSN scraper ├── scrape_psn.py # PSN scraper
├── scrape_cityhome.py # CityHome scraper ├── scrape_cityhome.py # CityHome scraper
├── merge_and_map.py # Merge all sources + generate final map ├── merge_and_map.py # Merge all sources + generate final map
├── regen_map.py # Regenerate map from cached Sreality data
├── run_all.sh # Orchestrator script (runs all scrapers + merge) ├── run_all.sh # Orchestrator script (runs all scrapers + merge)
├── mapa_bytu.html # Generated interactive map (output) ├── mapa_bytu.html # Generated interactive map (output)
├── Makefile # Docker management + validation shortcuts ├── Makefile # Docker management + validation shortcuts
├── build/ ├── build/
│ ├── Dockerfile # Container image definition (python:3.13-alpine) │ ├── Dockerfile # Container image definition (python:3.13-alpine)
│ ├── entrypoint.sh # Container entrypoint (HTTP server + cron + initial scrape) │ ├── entrypoint.sh # Container entrypoint (HTTP server + cron + initial scrape)
│ ├── crontab # Cron schedule (every 4 hours) │ ├── crontab # Cron schedule (06:00 and 18:00 CET)
│ └── CONTAINER.md # Container-specific documentation │ └── CONTAINER.md # Container-specific documentation
└── .gitignore # Ignores byty_*.json, __pycache__, .vscode └── .gitignore # Ignores byty_*.json, __pycache__, .vscode
``` ```

View File

@@ -11,7 +11,7 @@ WORKDIR /app
COPY scrape_and_map.py scrape_realingo.py scrape_bezrealitky.py \ COPY scrape_and_map.py scrape_realingo.py scrape_bezrealitky.py \
scrape_idnes.py scrape_psn.py scrape_cityhome.py \ scrape_idnes.py scrape_psn.py scrape_cityhome.py \
merge_and_map.py generate_status.py scraper_stats.py \ merge_and_map.py regen_map.py generate_status.py scraper_stats.py \
run_all.sh server.py ./ run_all.sh server.py ./
COPY build/crontab /etc/crontabs/root COPY build/crontab /etc/crontabs/root

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Sloučí data ze Sreality, Realinga, Bezrealitek, iDNES, PSN, CityHome a Bazoše, Sloučí data ze Sreality, Realinga, Bezrealitek, iDNES, PSN a CityHome,
deduplikuje a vygeneruje mapu. deduplikuje a vygeneruje mapu.
Deduplikace: stejná ulice (z locality) + stejná cena + stejná plocha = duplikát. Deduplikace: stejná ulice (z locality) + stejná cena + stejná plocha = duplikát.
PSN a CityHome mají při deduplikaci prioritu (načtou se první). PSN a CityHome mají při deduplikaci prioritu (načtou se první).
@@ -9,7 +9,6 @@ from __future__ import annotations
import json import json
import re import re
import unicodedata
from pathlib import Path from pathlib import Path
from scrape_and_map import generate_map, format_price from scrape_and_map import generate_map, format_price
@@ -20,8 +19,14 @@ def normalize_street(locality: str) -> str:
# "Studentská, Praha 6 - Dejvice" → "studentska" # "Studentská, Praha 6 - Dejvice" → "studentska"
# "Rýnská, Praha" → "rynska" # "Rýnská, Praha" → "rynska"
street = locality.split(",")[0].strip().lower() street = locality.split(",")[0].strip().lower()
# Remove diacritics using Unicode decomposition (handles all Czech characters) # Remove diacritics (simple Czech)
street = unicodedata.normalize("NFKD", street).encode("ascii", "ignore").decode("ascii") replacements = {
"á": "a", "č": "c", "ď": "d", "é": "e", "ě": "e",
"í": "i", "ň": "n", "ó": "o", "ř": "r", "š": "s",
"ť": "t", "ú": "u", "ů": "u", "ý": "y", "ž": "z",
}
for src, dst in replacements.items():
street = street.replace(src, dst)
# Remove non-alphanumeric # Remove non-alphanumeric
street = re.sub(r"[^a-z0-9]", "", street) street = re.sub(r"[^a-z0-9]", "", street)
return street return street
@@ -44,7 +49,6 @@ def main():
("Realingo", "byty_realingo.json"), ("Realingo", "byty_realingo.json"),
("Bezrealitky", "byty_bezrealitky.json"), ("Bezrealitky", "byty_bezrealitky.json"),
("iDNES", "byty_idnes.json"), ("iDNES", "byty_idnes.json"),
("Bazoš", "byty_bazos.json"),
] ]
all_estates = [] all_estates = []
@@ -75,10 +79,6 @@ def main():
if key in seen_keys: if key in seen_keys:
dupes += 1 dupes += 1
existing = seen_keys[key] existing = seen_keys[key]
# Preserve earliest first_seen across sources
dup_fs = e.get("first_seen", "")
if dup_fs and (not existing.get("first_seen") or dup_fs < existing["first_seen"]):
existing["first_seen"] = dup_fs
# Log it # Log it
print(f" Duplikát: {e['locality']} | {format_price(e['price'])} | {e.get('area', '?')}" print(f" Duplikát: {e['locality']} | {format_price(e['price'])} | {e.get('area', '?')}"
f"({e.get('source', '?')} vs {existing.get('source', '?')})") f"({e.get('source', '?')} vs {existing.get('source', '?')})")

114
regen_map.py Normal file
View File

@@ -0,0 +1,114 @@
#!/usr/bin/env python3
"""
Přegeneruje mapu z již stažených dat (byty_sreality.json).
Doplní chybějící plochy ze Sreality API, opraví URL, aplikuje filtry.
"""
from __future__ import annotations
import json
import time
import urllib.request
from pathlib import Path
from scrape_and_map import (
generate_map, format_price, MIN_AREA, HEADERS, DETAIL_API
)
def api_get(url: str) -> dict:
req = urllib.request.Request(url, headers=HEADERS)
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode("utf-8"))
def fix_sreality_url(estate: dict) -> str:
"""Fix the Sreality URL to include disposition segment (only if missing)."""
disp = estate.get("disposition", "")
slug_map = {
"1+kk": "1+kk", "1+1": "1+1", "2+kk": "2+kk", "2+1": "2+1",
"3+kk": "3+kk", "3+1": "3+1", "4+kk": "4+kk", "4+1": "4+1",
"5+kk": "5+kk", "5+1": "5+1", "6+": "6-a-vice", "Atypický": "atypicky",
}
slug = slug_map.get(disp, "byt")
old_url = estate.get("url", "")
parts = old_url.split("/")
try:
byt_idx = parts.index("byt")
# Only insert if disposition slug is not already there
if byt_idx + 1 < len(parts) and parts[byt_idx + 1] == slug:
return old_url # already correct
parts.insert(byt_idx + 1, slug)
return "/".join(parts)
except ValueError:
return old_url
def fetch_area(hash_id: int) -> int | None:
"""Fetch area from detail API."""
try:
url = DETAIL_API.format(hash_id)
detail = api_get(url)
for item in detail.get("items", []):
name = item.get("name", "")
if "žitná ploch" in name or "zitna ploch" in name.lower():
return int(item["value"])
except Exception:
pass
return None
def main():
json_path = Path("byty_sreality.json")
if not json_path.exists():
print("Soubor byty_sreality.json nenalezen. Nejprve spusť scrape_and_map.py")
return
estates = json.loads(json_path.read_text(encoding="utf-8"))
print(f"Načteno {len(estates)} bytů z byty_sreality.json")
# Step 1: Fetch missing areas
missing_area = [e for e in estates if e.get("area") is None]
print(f"Doplňuji plochu u {len(missing_area)} bytů...")
for i, e in enumerate(missing_area):
time.sleep(0.3)
area = fetch_area(e["hash_id"])
if area is not None:
e["area"] = area
if (i + 1) % 50 == 0:
print(f" {i + 1}/{len(missing_area)} ...")
# Count results
with_area = sum(1 for e in estates if e.get("area") is not None)
print(f"Plocha doplněna: {with_area}/{len(estates)}")
# Step 2: Fix URLs
for e in estates:
e["url"] = fix_sreality_url(e)
# Step 3: Filter by min area
filtered = []
excluded = 0
for e in estates:
area = e.get("area")
if area is not None and area < MIN_AREA:
excluded += 1
continue
filtered.append(e)
print(f"Vyloučeno (< {MIN_AREA} m²): {excluded}")
print(f"Zbývá: {len(filtered)} bytů")
# Save updated data
filtered_path = Path("byty_sreality.json")
filtered_path.write_text(
json.dumps(filtered, ensure_ascii=False, indent=2),
encoding="utf-8",
)
# Generate map
generate_map(filtered)
if __name__ == "__main__":
main()

View File

@@ -13,7 +13,7 @@ RED='\033[0;31m'
BOLD='\033[1m' BOLD='\033[1m'
NC='\033[0m' NC='\033[0m'
TOTAL=7 TOTAL=6
CURRENT=0 CURRENT=0
FAILED=0 FAILED=0
START_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S") START_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
@@ -84,6 +84,9 @@ exec > >(tee -a "$LOG_FILE") 2>&1
step "Sreality" step "Sreality"
python3 scrape_and_map.py $SCRAPER_ARGS || { echo -e "${RED}✗ Sreality selhalo${NC}"; FAILED=$((FAILED + 1)); } python3 scrape_and_map.py $SCRAPER_ARGS || { echo -e "${RED}✗ Sreality selhalo${NC}"; FAILED=$((FAILED + 1)); }
step "Realingo"
python3 scrape_realingo.py $SCRAPER_ARGS || { echo -e "${RED}✗ Realingo selhalo${NC}"; FAILED=$((FAILED + 1)); }
step "Bezrealitky" step "Bezrealitky"
python3 scrape_bezrealitky.py $SCRAPER_ARGS || { echo -e "${RED}✗ Bezrealitky selhalo${NC}"; FAILED=$((FAILED + 1)); } python3 scrape_bezrealitky.py $SCRAPER_ARGS || { echo -e "${RED}✗ Bezrealitky selhalo${NC}"; FAILED=$((FAILED + 1)); }
@@ -98,12 +101,6 @@ PID_CH=$!
wait $PID_PSN || { echo -e "${RED}✗ PSN selhalo${NC}"; FAILED=$((FAILED + 1)); } wait $PID_PSN || { echo -e "${RED}✗ PSN selhalo${NC}"; FAILED=$((FAILED + 1)); }
wait $PID_CH || { echo -e "${RED}✗ CityHome selhalo${NC}"; FAILED=$((FAILED + 1)); } wait $PID_CH || { echo -e "${RED}✗ CityHome selhalo${NC}"; FAILED=$((FAILED + 1)); }
step "Bazoš"
python3 scrape_bazos.py $SCRAPER_ARGS || { echo -e "${RED}✗ Bazoš selhalo${NC}"; FAILED=$((FAILED + 1)); }
step "Realingo"
python3 scrape_realingo.py $SCRAPER_ARGS || { echo -e "${RED}✗ Realingo selhalo${NC}"; FAILED=$((FAILED + 1)); }
# ── Sloučení + mapa ────────────────────────────────────────── # ── Sloučení + mapa ──────────────────────────────────────────
step "Sloučení dat a generování mapy" step "Sloučení dat a generování mapy"
@@ -120,7 +117,7 @@ python3 generate_status.py --start-time "$START_TIME" --duration "$DURATION" $KE
echo "" echo ""
echo "============================================================" echo "============================================================"
if [ $FAILED -eq 0 ]; then if [ $FAILED -eq 0 ]; then
echo -e "${GREEN}${BOLD}Hotovo! Všech 7 zdrojů úspěšně staženo.${NC}" echo -e "${GREEN}${BOLD}Hotovo! Všech 6 zdrojů úspěšně staženo.${NC}"
else else
echo -e "${RED}${BOLD}Hotovo s $FAILED chybami.${NC}" echo -e "${RED}${BOLD}Hotovo s $FAILED chybami.${NC}"
fi fi

View File

@@ -13,9 +13,9 @@ import math
import time import time
import urllib.request import urllib.request
import urllib.parse import urllib.parse
from datetime import datetime, timedelta from datetime import datetime
from pathlib import Path from pathlib import Path
from scraper_stats import write_stats, validate_listing from scraper_stats import write_stats
STATS_FILE = "stats_sreality.json" STATS_FILE = "stats_sreality.json"
@@ -45,26 +45,19 @@ HEADERS = {
def api_get(url: str) -> dict: def api_get(url: str) -> dict:
"""Fetch JSON from Sreality API with retry.""" """Fetch JSON from Sreality API."""
for attempt in range(3): logger.debug(f"HTTP GET request: {url}")
logger.debug(f"HTTP GET request (attempt {attempt + 1}/3): {url}") logger.debug(f"Headers: {HEADERS}")
req = urllib.request.Request(url, headers=HEADERS) req = urllib.request.Request(url, headers=HEADERS)
try: try:
with urllib.request.urlopen(req, timeout=30) as resp: with urllib.request.urlopen(req, timeout=30) as resp:
response_data = resp.read().decode("utf-8") response_data = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(response_data)} bytes") logger.debug(f"HTTP response: status={resp.status}, size={len(response_data)} bytes")
logger.debug(f"Response preview: {response_data[:200]}") logger.debug(f"Response preview: {response_data[:200]}")
return json.loads(response_data) return json.loads(response_data)
except urllib.error.HTTPError: except (urllib.error.URLError, ConnectionError, OSError) as e:
raise logger.error(f"HTTP request failed for {url}: {e}", exc_info=True)
except (urllib.error.URLError, ConnectionError, OSError) as e: raise
if attempt < 2:
wait = (attempt + 1) * 2
logger.warning(f"Connection error (retry {attempt + 1}/3 after {wait}s): {e}")
time.sleep(wait)
else:
logger.error(f"HTTP request failed after 3 attempts: {e}", exc_info=True)
raise
def build_list_url(disposition: int, page: int = 1) -> str: def build_list_url(disposition: int, page: int = 1) -> str:
@@ -360,11 +353,7 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"url": sreality_url(hash_id, seo), "url": sreality_url(hash_id, seo),
"image": (estate.get("_links", {}).get("images", [{}])[0].get("href", "") if estate.get("_links", {}).get("images") else ""), "image": (estate.get("_links", {}).get("images", [{}])[0].get("href", "") if estate.get("_links", {}).get("images") else ""),
"scraped_at": datetime.now().strftime("%Y-%m-%d"), "scraped_at": datetime.now().strftime("%Y-%m-%d"),
"first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
"last_changed": datetime.now().strftime("%Y-%m-%d"),
} }
if not validate_listing(result, "sreality"):
continue
results.append(result) results.append(result)
details_fetched += 1 details_fetched += 1
@@ -440,30 +429,18 @@ def generate_map(estates: list[dict], output_path: str = "mapa_bytu.html"):
] ]
for bcolor, blabel in bands: for bcolor, blabel in bands:
price_legend_items += ( price_legend_items += (
f'<div class="price-band" data-color="{bcolor}" onclick="toggleColorFilter(\'{bcolor}\')" ' f'<div style="display:flex;align-items:center;gap:6px;margin:2px 0;">'
f'style="display:flex;align-items:center;gap:6px;margin:2px 0;padding:2px 4px;'
f'border-radius:4px;border:2px solid transparent;">'
f'<span style="width:14px;height:14px;border-radius:50%;background:{bcolor};' f'<span style="width:14px;height:14px;border-radius:50%;background:{bcolor};'
f'display:inline-block;border:2px solid white;box-shadow:0 1px 3px rgba(0,0,0,0.3);flex-shrink:0;"></span>' f'display:inline-block;border:2px solid white;box-shadow:0 1px 3px rgba(0,0,0,0.3);flex-shrink:0;"></span>'
f'<span>{blabel}</span></div>' f'<span>{blabel}</span></div>'
) )
price_legend_items += (
'<div id="price-filter-reset" style="display:none;margin:3px 0 0 4px;">'
'<a href="#" onclick="resetColorFilter();return false;" '
'style="font-size:11px;color:#1976D2;text-decoration:none;">✕ Zobrazit všechny ceny</a>'
'</div>'
)
# New marker indicator — bigger dot, no extra border # New marker indicator — bigger dot, no extra border
price_legend_items += ( price_legend_items += (
'<div style="display:flex;align-items:center;gap:6px;margin:6px 0 0 0;' '<div style="display:flex;align-items:center;gap:6px;margin:6px 0 0 0;'
'padding-top:6px;border-top:1px solid #eee;">' 'padding-top:6px;border-top:1px solid #eee;">'
'<span style="display:inline-flex;align-items:center;gap:3px;flex-shrink:0;">' '<span style="width:18px;height:18px;border-radius:50%;background:#66BB6A;'
'<span style="width:14px;height:14px;border-radius:50%;background:#66BB6A;' 'display:inline-block;box-shadow:0 1px 4px rgba(0,0,0,0.35);flex-shrink:0;"></span>'
'display:inline-block;box-shadow:0 1px 3px rgba(0,0,0,0.3);"></span>' '<span>Nové (z dnešního scrapu) — větší</span></div>'
'<span style="font-size:8px;font-weight:700;background:#FFD600;color:#333;'
'padding:1px 3px;border-radius:2px;">NEW</span>'
'</span>'
'<span>Nové (≤ 1 den)</span></div>'
) )
markers_js = "" markers_js = ""
@@ -480,37 +457,23 @@ def generate_map(estates: list[dict], output_path: str = "mapa_bytu.html"):
floor_note = '<br><span style="color:#FF9800;font-weight:bold;">⚠ 2. NP — zvážit klidnost lokality</span>' floor_note = '<br><span style="color:#FF9800;font-weight:bold;">⚠ 2. NP — zvážit klidnost lokality</span>'
source = e.get("source", "sreality") source = e.get("source", "sreality")
source_labels = {"sreality": "Sreality", "realingo": "Realingo", "bezrealitky": "Bezrealitky", "idnes": "iDNES", "psn": "PSN", "cityhome": "CityHome", "bazos": "Bazoš"} source_labels = {"sreality": "Sreality", "realingo": "Realingo", "bezrealitky": "Bezrealitky", "idnes": "iDNES", "psn": "PSN", "cityhome": "CityHome"}
source_colors = {"sreality": "#1976D2", "realingo": "#00897B", "bezrealitky": "#E91E63", "idnes": "#FF6F00", "psn": "#D32F2F", "cityhome": "#D32F2F", "bazos": "#7B1FA2"} source_colors = {"sreality": "#1976D2", "realingo": "#00897B", "bezrealitky": "#E91E63", "idnes": "#FF6F00", "psn": "#D32F2F", "cityhome": "#D32F2F"}
source_label = source_labels.get(source, source) source_label = source_labels.get(source, source)
source_color = source_colors.get(source, "#999") source_color = source_colors.get(source, "#999")
hash_id = f"{source}_{e.get('hash_id', '')}" hash_id = e.get("hash_id", "")
first_seen = e.get("first_seen", "") scraped_at = e.get("scraped_at", "")
last_changed = e.get("last_changed", "") is_new = scraped_at == datetime.now().strftime("%Y-%m-%d")
today = datetime.now().strftime("%Y-%m-%d")
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
is_new = first_seen in (today, yesterday)
new_badge = ( new_badge = (
'<span style="margin-left:6px;font-size:11px;background:#FFD600;color:#333;' '<span style="margin-left:6px;font-size:11px;background:#FFD600;color:#333;'
'padding:1px 6px;border-radius:3px;font-weight:bold;">NOVÉ</span>' 'padding:1px 6px;border-radius:3px;font-weight:bold;">NOVÉ</span>'
if is_new else "" if is_new else ""
) )
date_parts = []
if first_seen:
date_parts.append(f'Přidáno: {first_seen}')
if last_changed and last_changed != first_seen:
date_parts.append(f'Změněno: {last_changed}')
date_row = (
f'<span style="font-size:11px;color:#888;">{"&nbsp;·&nbsp;".join(date_parts)}</span><br>'
if date_parts else ""
)
popup = ( popup = (
f'<div style="min-width:280px;font-family:system-ui,sans-serif;" data-hashid="{hash_id}" data-first-seen="{first_seen}" data-last-changed="{last_changed}">' f'<div style="min-width:280px;font-family:system-ui,sans-serif;" data-hashid="{hash_id}">'
f'<b style="font-size:14px;">{format_price(e["price"])}</b>' f'<b style="font-size:14px;">{format_price(e["price"])}</b>'
f'<span style="margin-left:8px;font-size:11px;background:{source_color};color:white;' f'<span style="margin-left:8px;font-size:11px;background:{source_color};color:white;'
f'padding:1px 6px;border-radius:3px;">{source_label}</span>{new_badge}<br>' f'padding:1px 6px;border-radius:3px;">{source_label}</span>{new_badge}<br>'
@@ -518,9 +481,7 @@ def generate_map(estates: list[dict], output_path: str = "mapa_bytu.html"):
f'{floor_note}<br><br>' f'{floor_note}<br><br>'
f'<b>{e["locality"]}</b><br>' f'<b>{e["locality"]}</b><br>'
f'Stavba: {building_text}<br>' f'Stavba: {building_text}<br>'
f'Vlastnictví: {ownership_text}<br>' f'Vlastnictví: {ownership_text}<br><br>'
f'{date_row}'
f'<br>'
f'<a href="{e["url"]}" target="_blank" ' f'<a href="{e["url"]}" target="_blank" '
f'style="color:{source_color};text-decoration:none;font-weight:bold;">' f'style="color:{source_color};text-decoration:none;font-weight:bold;">'
f'→ Otevřít na {source_label}</a>' f'→ Otevřít na {source_label}</a>'
@@ -552,7 +513,7 @@ def generate_map(estates: list[dict], output_path: str = "mapa_bytu.html"):
else: else:
marker_fn = "addMarker" marker_fn = "addMarker"
markers_js += ( markers_js += (
f" {marker_fn}({e['lat']}, {e['lon']}, '{color}', '{popup}', '{hash_id}', '{first_seen}', '{last_changed}');\n" f" {marker_fn}({e['lat']}, {e['lon']}, '{color}', '{popup}', '{hash_id}');\n"
) )
# Build legend — price per m² bands + disposition counts # Build legend — price per m² bands + disposition counts
@@ -618,12 +579,12 @@ def generate_map(estates: list[dict], output_path: str = "mapa_bytu.html"):
.heart-icon-fav svg path {{ stroke: gold !important; stroke-width: 2.5 !important; filter: drop-shadow(0 0 4px rgba(255,193,7,0.7)); }} .heart-icon-fav svg path {{ stroke: gold !important; stroke-width: 2.5 !important; filter: drop-shadow(0 0 4px rgba(255,193,7,0.7)); }}
.heart-icon-rej {{ opacity: 0.4 !important; filter: grayscale(1); }} .heart-icon-rej {{ opacity: 0.4 !important; filter: grayscale(1); }}
.reject-overlay {{ background: none !important; border: none !important; pointer-events: none !important; }} .reject-overlay {{ background: none !important; border: none !important; pointer-events: none !important; }}
.new-badge-icon {{ background: none !important; border: none !important; pointer-events: none !important; }} @keyframes pulse-new {{
.new-badge {{ 0% {{ stroke-opacity: 1; stroke-width: 3px; r: 11; }}
font-size: 9px; font-weight: 700; color: #333; background: #FFD600; 50% {{ stroke-opacity: 0.4; stroke-width: 6px; r: 12; }}
padding: 1px 4px; border-radius: 3px; white-space: nowrap; 100% {{ stroke-opacity: 1; stroke-width: 3px; r: 11; }}
box-shadow: 0 1px 3px rgba(0,0,0,0.3); letter-spacing: 0.5px;
}} }}
.marker-new {{ animation: pulse-new 2s ease-in-out infinite; }}
.info-panel {{ .info-panel {{
position: absolute; top: 10px; right: 10px; z-index: 1000; position: absolute; top: 10px; right: 10px; z-index: 1000;
background: white; padding: 16px; border-radius: 10px; background: white; padding: 16px; border-radius: 10px;
@@ -656,10 +617,6 @@ def generate_map(estates: list[dict], output_path: str = "mapa_bytu.html"):
.info-panel .stats {{ color: #666; margin-bottom: 10px; padding-bottom: 10px; border-bottom: 1px solid #eee; }} .info-panel .stats {{ color: #666; margin-bottom: 10px; padding-bottom: 10px; border-bottom: 1px solid #eee; }}
.filter-section {{ margin-top: 10px; padding-top: 10px; border-top: 1px solid #eee; }} .filter-section {{ margin-top: 10px; padding-top: 10px; border-top: 1px solid #eee; }}
.filter-section label {{ display: flex; align-items: center; gap: 6px; margin: 3px 0; cursor: pointer; }} .filter-section label {{ display: flex; align-items: center; gap: 6px; margin: 3px 0; cursor: pointer; }}
.price-band {{ cursor: pointer; transition: background 0.12s; }}
.price-band:hover {{ background: #f0f0f0; }}
.price-band.active {{ border-color: #333 !important; background: #e8f0fe; }}
.price-band.dimmed {{ opacity: 0.35; }}
.filter-section input[type="checkbox"] {{ accent-color: #1976D2; }} .filter-section input[type="checkbox"] {{ accent-color: #1976D2; }}
#floor-filter {{ margin-top: 8px; }} #floor-filter {{ margin-top: 8px; }}
#floor-filter select {{ width: 100%; padding: 4px; border-radius: 4px; border: 1px solid #ccc; }} #floor-filter select {{ width: 100%; padding: 4px; border-radius: 4px; border: 1px solid #ccc; }}
@@ -698,23 +655,11 @@ def generate_map(estates: list[dict], output_path: str = "mapa_bytu.html"):
</div> </div>
<div style="margin-top:6px;"> <div style="margin-top:6px;">
<label>Max cena: <label>Max cena:
<input type="number" id="max-price" value="13500000" max="14000000" step="500000" <select id="max-price" onchange="applyFilters()">
style="width:130px;padding:2px 4px;border:1px solid #ccc;border-radius:3px;" <option value="13500000">13 500 000 Kč</option>
onchange="applyFilters()" onkeyup="applyFilters()"> Kč <option value="12000000">12 000 000 Kč</option>
</label> <option value="10000000">10 000 000 Kč</option>
</div> <option value="8000000">8 000 000 Kč</option>
<div style="margin-top:6px;">
<label>Přidáno / změněno:
<select id="days-filter" onchange="applyFilters()" style="width:100%;padding:4px;border-radius:4px;border:1px solid #ccc;">
<option value="0">Vše</option>
<option value="1">za 1 den</option>
<option value="2">za 2 dny</option>
<option value="3">za 3 dny</option>
<option value="4">za 4 dny</option>
<option value="5">za 5 dní</option>
<option value="7">za 7 dní</option>
<option value="14">za 14 dní</option>
<option value="30">za 30 dní</option>
</select> </select>
</label> </label>
</div> </div>
@@ -748,39 +693,9 @@ L.tileLayer('https://{{s}}.basemaps.cartocdn.com/light_only_labels/{{z}}/{{x}}/{
pane: 'shadowPane', pane: 'shadowPane',
}}).addTo(map); }}).addTo(map);
var selectedColors = [];
function toggleColorFilter(color) {{
var idx = selectedColors.indexOf(color);
if (idx >= 0) selectedColors.splice(idx, 1);
else selectedColors.push(color);
document.querySelectorAll('.price-band').forEach(function(el) {{
var c = el.getAttribute('data-color');
if (selectedColors.length === 0) {{
el.classList.remove('active', 'dimmed');
}} else if (selectedColors.indexOf(c) >= 0) {{
el.classList.add('active'); el.classList.remove('dimmed');
}} else {{
el.classList.add('dimmed'); el.classList.remove('active');
}}
}});
document.getElementById('price-filter-reset').style.display =
selectedColors.length > 0 ? 'block' : 'none';
applyFilters();
}}
function resetColorFilter() {{
selectedColors = [];
document.querySelectorAll('.price-band').forEach(function(el) {{
el.classList.remove('active', 'dimmed');
}});
document.getElementById('price-filter-reset').style.display = 'none';
applyFilters();
}}
var allMarkers = []; var allMarkers = [];
function addMarker(lat, lon, color, popup, hashId, firstSeen, lastChanged) {{ function addMarker(lat, lon, color, popup, hashId) {{
var marker = L.circleMarker([lat, lon], {{ var marker = L.circleMarker([lat, lon], {{
radius: 8, radius: 8,
fillColor: color, fillColor: color,
@@ -789,35 +704,26 @@ function addMarker(lat, lon, color, popup, hashId, firstSeen, lastChanged) {{
opacity: 1, opacity: 1,
fillOpacity: 0.85, fillOpacity: 0.85,
}}).bindPopup(popup); }}).bindPopup(popup);
marker._data = {{ lat: lat, lon: lon, color: color, hashId: hashId, firstSeen: firstSeen || '', lastChanged: lastChanged || '' }}; marker._data = {{ lat: lat, lon: lon, color: color, hashId: hashId }};
allMarkers.push(marker); allMarkers.push(marker);
marker.addTo(map); marker.addTo(map);
}} }}
function addNewMarker(lat, lon, color, popup, hashId, firstSeen, lastChanged) {{ function addNewMarker(lat, lon, color, popup, hashId) {{
var marker = L.circleMarker([lat, lon], {{ var marker = L.circleMarker([lat, lon], {{
radius: 8, radius: 12,
fillColor: color, fillColor: color,
color: '#fff', color: color,
weight: 2, weight: 4,
opacity: 1, opacity: 0.35,
fillOpacity: 0.85, fillOpacity: 0.95,
}}).bindPopup(popup); }}).bindPopup(popup);
marker._data = {{ lat: lat, lon: lon, color: color, hashId: hashId, isNew: true, firstSeen: firstSeen || '', lastChanged: lastChanged || '' }}; marker._data = {{ lat: lat, lon: lon, color: color, hashId: hashId, isNew: true }};
allMarkers.push(marker); allMarkers.push(marker);
marker.addTo(map); marker.addTo(map);
var badge = L.marker([lat, lon], {{ marker.on('add', function() {{
icon: L.divIcon({{ if (marker._path) marker._path.classList.add('marker-new');
className: 'new-badge-icon',
html: '<span class="new-badge">NEW</span>',
iconSize: [32, 14],
iconAnchor: [-6, 7],
}}),
interactive: false,
pane: 'markerPane',
}}); }});
badge.addTo(map);
marker._newBadge = badge;
}} }}
function heartIcon(color) {{ function heartIcon(color) {{
@@ -850,11 +756,11 @@ function starIcon() {{
}}); }});
}} }}
function addHeartMarker(lat, lon, color, popup, hashId, firstSeen, lastChanged) {{ function addHeartMarker(lat, lon, color, popup, hashId) {{
var marker = L.marker([lat, lon], {{ var marker = L.marker([lat, lon], {{
icon: heartIcon(color), icon: heartIcon(color),
}}).bindPopup(popup); }}).bindPopup(popup);
marker._data = {{ lat: lat, lon: lon, color: color, hashId: hashId, isHeart: true, firstSeen: firstSeen || '', lastChanged: lastChanged || '' }}; marker._data = {{ lat: lat, lon: lon, color: color, hashId: hashId, isHeart: true }};
allMarkers.push(marker); allMarkers.push(marker);
marker.addTo(map); marker.addTo(map);
}} }}
@@ -873,11 +779,6 @@ function loadRatings() {{
function saveRatings(ratings) {{ function saveRatings(ratings) {{
localStorage.setItem(RATINGS_KEY, JSON.stringify(ratings)); localStorage.setItem(RATINGS_KEY, JSON.stringify(ratings));
fetch('/api/ratings', {{
method: 'POST',
headers: {{'Content-Type': 'application/json'}},
body: JSON.stringify(ratings)
}}).catch(function() {{}});
}} }}
function addRejectStrike(marker) {{ function addRejectStrike(marker) {{
@@ -925,7 +826,6 @@ function applyMarkerStyle(marker, status) {{
}} else {{ }} else {{
if (status === 'fav') {{ if (status === 'fav') {{
removeRejectStrike(marker); removeRejectStrike(marker);
if (marker._newBadge && map.hasLayer(marker._newBadge)) map.removeLayer(marker._newBadge);
if (!marker._data._origCircle) marker._data._origCircle = true; if (!marker._data._origCircle) marker._data._origCircle = true;
var popup = marker.getPopup(); var popup = marker.getPopup();
var popupContent = popup ? popup.getContent() : ''; var popupContent = popup ? popup.getContent() : '';
@@ -949,7 +849,6 @@ function applyMarkerStyle(marker, status) {{
}} }}
// Add strikethrough line over the marker // Add strikethrough line over the marker
addRejectStrike(marker); addRejectStrike(marker);
if (marker._newBadge && map.hasLayer(marker._newBadge)) map.removeLayer(marker._newBadge);
}} else {{ }} else {{
if (marker._data._origCircle && !(marker instanceof L.CircleMarker)) {{ if (marker._data._origCircle && !(marker instanceof L.CircleMarker)) {{
revertToCircle(marker, {{ radius: 8, fillColor: marker._data.color, color: '#fff', weight: 2, fillOpacity: 0.85 }}); revertToCircle(marker, {{ radius: 8, fillColor: marker._data.color, color: '#fff', weight: 2, fillOpacity: 0.85 }});
@@ -962,7 +861,6 @@ function applyMarkerStyle(marker, status) {{
}} }}
if (marker._path) marker._path.classList.remove('marker-rejected'); if (marker._path) marker._path.classList.remove('marker-rejected');
removeRejectStrike(marker); removeRejectStrike(marker);
if (marker._newBadge && !map.hasLayer(marker._newBadge)) marker._newBadge.addTo(map);
}} }}
}} }}
}} }}
@@ -1118,21 +1016,11 @@ map.on('popupopen', function(e) {{
// ── Filters ──────────────────────────────────────────────────── // ── Filters ────────────────────────────────────────────────────
function applyFilters() {{ function applyFilters() {{
var minFloor = parseInt(document.getElementById('min-floor').value); var minFloor = parseInt(document.getElementById('min-floor').value);
var maxPriceEl = document.getElementById('max-price'); var maxPrice = parseInt(document.getElementById('max-price').value);
var maxPrice = parseInt(maxPriceEl.value) || 14000000;
if (maxPrice > 14000000) {{ maxPrice = 14000000; maxPriceEl.value = 14000000; }}
var hideRejected = document.getElementById('hide-rejected').checked; var hideRejected = document.getElementById('hide-rejected').checked;
var daysFilter = parseInt(document.getElementById('days-filter').value) || 0;
var ratings = loadRatings(); var ratings = loadRatings();
var visible = 0; var visible = 0;
var cutoff = null;
if (daysFilter > 0) {{
cutoff = new Date();
cutoff.setDate(cutoff.getDate() - daysFilter);
cutoff.setHours(0, 0, 0, 0);
}}
allMarkers.forEach(function(m) {{ allMarkers.forEach(function(m) {{
var popup = m.getPopup().getContent(); var popup = m.getPopup().getContent();
var floorMatch = popup.match(/(\\d+)\\. NP/); var floorMatch = popup.match(/(\\d+)\\. NP/);
@@ -1145,14 +1033,6 @@ function applyFilters() {{
if (floor !== null && floor < minFloor) show = false; if (floor !== null && floor < minFloor) show = false;
if (price > maxPrice) show = false; if (price > maxPrice) show = false;
if (cutoff) {{
var fs = m._data.firstSeen ? new Date(m._data.firstSeen) : null;
var lc = m._data.lastChanged ? new Date(m._data.lastChanged) : null;
if (!((fs && fs >= cutoff) || (lc && lc >= cutoff))) show = false;
}}
if (selectedColors.length > 0 && selectedColors.indexOf(m._data.color) < 0) show = false;
var r = ratings[m._data.hashId]; var r = ratings[m._data.hashId];
if (hideRejected && r && r.status === 'reject') show = false; if (hideRejected && r && r.status === 'reject') show = false;
@@ -1161,12 +1041,10 @@ function applyFilters() {{
visible++; visible++;
// Show strike line if rejected and visible // Show strike line if rejected and visible
if (m._rejectStrike && !map.hasLayer(m._rejectStrike)) m._rejectStrike.addTo(map); if (m._rejectStrike && !map.hasLayer(m._rejectStrike)) m._rejectStrike.addTo(map);
if (m._newBadge && !map.hasLayer(m._newBadge)) m._newBadge.addTo(map);
}} else {{ }} else {{
if (map.hasLayer(m)) map.removeLayer(m); if (map.hasLayer(m)) map.removeLayer(m);
// Hide strike line when marker hidden // Hide strike line when marker hidden
if (m._rejectStrike && map.hasLayer(m._rejectStrike)) map.removeLayer(m._rejectStrike); if (m._rejectStrike && map.hasLayer(m._rejectStrike)) map.removeLayer(m._rejectStrike);
if (m._newBadge && map.hasLayer(m._newBadge)) map.removeLayer(m._newBadge);
}} }}
}}); }});
@@ -1181,25 +1059,8 @@ function applyFilters() {{
document.getElementById('visible-count').textContent = visible; document.getElementById('visible-count').textContent = visible;
}} }}
// Initialize ratings: load from server, merge with localStorage, then restore // Initialize ratings on load
function initRatings() {{ restoreRatings();
var local = loadRatings();
fetch('/api/ratings')
.then(function(r) {{ return r.ok ? r.json() : null; }})
.then(function(server) {{
if (server && typeof server === 'object') {{
var merged = Object.assign({{}}, local, server);
localStorage.setItem(RATINGS_KEY, JSON.stringify(merged));
}}
restoreRatings();
updateRatingCounts();
}})
.catch(function() {{
restoreRatings();
updateRatingCounts();
}});
}}
initRatings();
// ── Panel toggle ────────────────────────────────────────────── // ── Panel toggle ──────────────────────────────────────────────
function togglePanel() {{ function togglePanel() {{

View File

@@ -1,560 +0,0 @@
#!/usr/bin/env python3
"""
Bazoš.cz scraper.
Stáhne byty na prodej v Praze a vyfiltruje podle kritérií.
Výstup: byty_bazos.json
"""
from __future__ import annotations
import argparse
from datetime import datetime
import json
import logging
import math
import re
import time
import urllib.request
import urllib.parse
from pathlib import Path
from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_bazos.json"
logger = logging.getLogger(__name__)
# ── Konfigurace ─────────────────────────────────────────────────────────────
MAX_PRICE = 14_000_000
MIN_AREA = 69
MIN_FLOOR = 2
PER_PAGE = 20 # Bazoš vrací 20 na stránku
WANTED_DISPOSITIONS = {"3+kk", "3+1", "4+kk", "4+1", "5+kk", "5+1", "6+kk", "6+1"}
# Regex patterns pro parsování dispozice, plochy a patra z textu
DISP_RE = re.compile(r'(\d)\s*\+\s*(kk|1)', re.IGNORECASE)
AREA_RE = re.compile(r'(\d+(?:[.,]\d+)?)\s*m[²2\s,.]', re.IGNORECASE)
FLOOR_RE = re.compile(r'(\d+)\s*[./]\s*(\d+)\s*(?:NP|patr|podlaž|floor)', re.IGNORECASE)
FLOOR_RE2 = re.compile(r'(\d+)\.\s*(?:NP|patr[eouě]|podlaž[ií])', re.IGNORECASE)
FLOOR_RE3 = re.compile(r'(?:patr[eouě]|podlaž[ií]|NP)\s*[:\s]*(\d+)', re.IGNORECASE)
PANEL_RE = re.compile(r'panel(?:ov|ák|\.)', re.IGNORECASE)
SIDLISTE_RE = re.compile(r'sídliště|sidliste|panelák', re.IGNORECASE)
HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml",
"Accept-Language": "cs,en;q=0.9",
}
BASE_URL = "https://reality.bazos.cz"
SEARCH_PARAMS = "hledat=&rubriky=reality&hlokalita=Praha&humkreis=25&cenado={max_price}&kitx=ano"
def fetch_url(url: str, retries: int = 3) -> str:
"""Fetch URL and return HTML string with retry on transient errors."""
for attempt in range(retries):
try:
logger.debug(f"HTTP GET request (attempt {attempt + 1}/{retries}): {url}")
req = urllib.request.Request(url, headers=HEADERS)
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8", errors="replace")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
return html
except urllib.error.HTTPError:
raise
except (ConnectionResetError, ConnectionError, urllib.error.URLError, OSError) as e:
if attempt < retries - 1:
wait = (attempt + 1) * 3
logger.warning(f"Connection error (retry {attempt + 1}/{retries} after {wait}s): {e}")
time.sleep(wait)
else:
logger.error(f"HTTP request failed after {retries} attempts: {e}", exc_info=True)
raise
def format_price(price: int) -> str:
s = str(price)
parts = []
while s:
parts.append(s[-3:])
s = s[:-3]
return " ".join(reversed(parts)) + ""
def parse_price(text: str) -> int:
"""Parse price from text like '5 250 000 Kč' → 5250000."""
cleaned = re.sub(r'[^\d]', '', text)
return int(cleaned) if cleaned else 0
def parse_disposition(text: str) -> str | None:
"""Parse disposition from title/description like '3+kk', '4+1'."""
m = DISP_RE.search(text)
if m:
rooms = m.group(1)
suffix = m.group(2).lower()
return f"{rooms}+{suffix}"
return None
def parse_area(text: str) -> float | None:
"""Parse area from text like '82 m²' → 82.0."""
m = AREA_RE.search(text)
if m:
return float(m.group(1).replace(',', '.'))
return None
def parse_floor(text: str) -> int | None:
"""Parse floor number from description."""
for pattern in [FLOOR_RE, FLOOR_RE2, FLOOR_RE3]:
m = pattern.search(text)
if m:
return int(m.group(1))
return None
def is_panel(text: str) -> bool:
"""Check if description mentions panel construction."""
return bool(PANEL_RE.search(text))
def is_sidliste(text: str) -> bool:
"""Check if description mentions housing estate."""
return bool(SIDLISTE_RE.search(text))
def fetch_listing_page(offset: int = 0, pagination_params: str | None = None) -> tuple[list[dict], int, str | None]:
"""
Fetch a page of listings from Bazoš.
Returns (list of basic listing dicts, total count, pagination_params for next pages).
"""
if pagination_params and offset > 0:
# Use resolved numeric params from first page's pagination links
url = f"{BASE_URL}/prodam/byt/{offset}/?{pagination_params}"
else:
params = SEARCH_PARAMS.format(max_price=MAX_PRICE)
if offset > 0:
url = f"{BASE_URL}/prodam/byt/{offset}/?{params}"
else:
url = f"{BASE_URL}/prodam/byt/?{params}"
html = fetch_url(url)
# Parse total count: "Zobrazeno 1-20 z 727"
total = 0
total_match = re.search(r'z\s+([\d\s]+)\s', html)
if total_match:
total = int(total_match.group(1).replace(' ', ''))
# Extract resolved pagination params from first page (Bazoš converts
# hlokalita=Praha → hlokalita=11000, and pagination only works with numeric form)
resolved_params = None
pag_link = re.search(r'href="/prodam/byt/\d+/\?([^"]+)"', html)
if pag_link:
resolved_params = pag_link.group(1)
# Parse listings — split by listing blocks (class="inzeraty inzeratyflex")
listings = []
all_blocks = re.split(r'<div class="inzeraty\s+inzeratyflex">', html)[1:] # skip before first
for block in all_blocks:
# Extract URL and ID from first link (/inzerat/XXXXXX/slug.php)
url_match = re.search(r'href="(/inzerat/(\d+)/[^"]*)"', block)
if not url_match:
continue
detail_path = url_match.group(1)
listing_id = int(url_match.group(2))
# Title — class=nadpis (without quotes) or class="nadpis"
title_match = re.search(r'class=.?nadpis.?[^>]*>\s*<a[^>]*>([^<]+)</a>', block)
title = title_match.group(1).strip() if title_match else ""
# Price — inside <span translate="no"> within inzeratycena
price_match = re.search(r'class="inzeratycena"[^>]*>.*?<span[^>]*>([^<]+)</span>', block, re.DOTALL)
if not price_match:
# Fallback: direct text in inzeratycena
price_match = re.search(r'class="inzeratycena"[^>]*>\s*(?:<b>)?([^<]+)', block)
price_text = price_match.group(1).strip() if price_match else ""
price = parse_price(price_text)
# Location
loc_match = re.search(r'class="inzeratylok"[^>]*>(.*?)</div>', block, re.DOTALL)
location = ""
if loc_match:
location = re.sub(r'<[^>]+>', ' ', loc_match.group(1)).strip()
location = re.sub(r'\s+', ' ', location)
# Date — [5.3. 2026]
date_match = re.search(r'\[(\d+\.\d+\.\s*\d{4})\]', block)
date_str = date_match.group(1).strip() if date_match else ""
# Description preview — class=popis (without quotes) or class="popis"
desc_match = re.search(r'class=.?popis.?[^>]*>(.*?)</div>', block, re.DOTALL)
description = ""
if desc_match:
description = re.sub(r'<[^>]+>', ' ', desc_match.group(1)).strip()
description = re.sub(r'\s+', ' ', description)
# Image — <img ... class="obrazek" ... src="...">
img_match = re.search(r'<img[^>]*src="([^"]+)"[^>]*class="obrazek"', block)
if not img_match:
img_match = re.search(r'class="obrazek"[^>]*src="([^"]+)"', block)
image = img_match.group(1) if img_match else ""
if "empty.gif" in image:
image = ""
listings.append({
"id": listing_id,
"title": title,
"price": price,
"location": location,
"date": date_str,
"description": description,
"detail_path": detail_path,
"image": image,
})
logger.debug(f"Offset {offset}: found {len(listings)} listings, total={total}")
return listings, total, resolved_params
def fetch_detail(path: str) -> dict | None:
"""Fetch listing detail page and extract GPS, full description."""
try:
url = f"{BASE_URL}{path}"
html = fetch_url(url)
result = {}
# GPS from Google Maps link
gps_match = re.search(r'google\.com/maps[^"]*place/([\d.]+),([\d.]+)', html)
if gps_match:
result["lat"] = float(gps_match.group(1))
result["lon"] = float(gps_match.group(2))
# Full description — Bazoš uses unquoted class=popisdetail
desc_match = re.search(r'class=.?popisdetail.?[^>]*>(.*?)</div>', html, re.DOTALL)
if desc_match:
desc = re.sub(r'<[^>]+>', ' ', desc_match.group(1)).strip()
desc = re.sub(r'\s+', ' ', desc)
result["description"] = desc
# Location from detail
loc_match = re.search(r'Lokalita:</td>\s*<td[^>]*>(.*?)</td>', html, re.DOTALL)
if loc_match:
loc = re.sub(r'<[^>]+>', ' ', loc_match.group(1)).strip()
loc = re.sub(r'\s+', ' ', loc)
result["detail_location"] = loc
return result
except Exception as e:
logger.warning(f"Detail fetch failed for {path}: {e}")
return None
def load_cache(json_path: str = "byty_bazos.json") -> dict[int, dict]:
"""Load previously scraped data as cache keyed by hash_id."""
path = Path(json_path)
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return {e["hash_id"]: e for e in data if "hash_id" in e}
except (json.JSONDecodeError, KeyError):
return {}
def scrape(max_pages: int | None = None, max_properties: int | None = None):
_run_start = time.time()
_run_ts = datetime.now().isoformat(timespec="seconds")
cache = load_cache()
today = datetime.now().strftime("%Y-%m-%d")
logger.info("=" * 60)
logger.info("Stahuji inzeráty z Bazoš.cz")
logger.info(f"Cena: do {format_price(MAX_PRICE)}")
logger.info(f"Min. plocha: {MIN_AREA}")
logger.info(f"Patro: od {MIN_FLOOR}. NP")
logger.info(f"Region: Praha")
if cache:
logger.info(f"Cache: {len(cache)} bytů z minulého běhu")
if max_pages:
logger.info(f"Max. stran: {max_pages}")
if max_properties:
logger.info(f"Max. bytů: {max_properties}")
logger.info("=" * 60)
# Step 1: Fetch listing pages
logger.info("\nFáze 1: Stahování seznamu inzerátů...")
all_listings = {} # id -> listing dict (dedup)
page = 1
offset = 0
total = None
pagination_params = None # resolved numeric params from first page
while True:
if max_pages and page > max_pages:
logger.debug(f"Max pages limit reached: {max_pages}")
break
logger.info(f"Strana {page} (offset {offset}) ...")
listings, total_count, resolved = fetch_listing_page(offset, pagination_params)
if resolved and not pagination_params:
pagination_params = resolved
logger.debug(f"Resolved pagination params: {pagination_params}")
if total is None and total_count > 0:
total = total_count
total_pages = math.ceil(total / PER_PAGE)
logger.info(f"→ Celkem {total} inzerátů, ~{total_pages} stran")
if not listings:
logger.debug(f"No listings found on page {page}, stopping")
break
for lst in listings:
lid = lst["id"]
if lid not in all_listings:
all_listings[lid] = lst
page += 1
offset += PER_PAGE
if total and offset >= total:
break
time.sleep(0.5)
logger.info(f"\nStaženo: {len(all_listings)} unikátních inzerátů")
# Step 2: Pre-filter by disposition, price, area from listing data
pre_filtered = []
excluded_disp = 0
excluded_price = 0
excluded_area = 0
excluded_no_disp = 0
for lst in all_listings.values():
title_and_desc = f"{lst['title']} {lst['description']}"
# Parse disposition
disp = parse_disposition(title_and_desc)
if not disp:
excluded_no_disp += 1
logger.debug(f"Filter: id={lst['id']} - excluded (no disposition found in '{lst['title']}')")
continue
if disp not in WANTED_DISPOSITIONS:
excluded_disp += 1
logger.debug(f"Filter: id={lst['id']} - excluded (disposition {disp})")
continue
# Price
price = lst["price"]
if price <= 0 or price > MAX_PRICE:
excluded_price += 1
logger.debug(f"Filter: id={lst['id']} - excluded (price {price})")
continue
# Area (if parseable from listing)
area = parse_area(title_and_desc)
if area is not None and area < MIN_AREA:
excluded_area += 1
logger.debug(f"Filter: id={lst['id']} - excluded (area {area} m²)")
continue
lst["_disposition"] = disp
lst["_area"] = area
pre_filtered.append(lst)
logger.info(f"\nPo předfiltraci:")
logger.info(f" Vyloučeno (bez dispozice): {excluded_no_disp}")
logger.info(f" Vyloučeno (dispozice): {excluded_disp}")
logger.info(f" Vyloučeno (cena): {excluded_price}")
logger.info(f" Vyloučeno (plocha): {excluded_area}")
logger.info(f" Zbývá: {len(pre_filtered)}")
# Step 3: Fetch details (for GPS + full description)
logger.info(f"\nFáze 2: Stahování detailů ({len(pre_filtered)} bytů)...")
results = []
excluded_panel = 0
excluded_floor = 0
excluded_no_gps = 0
excluded_detail = 0
excluded_area_detail = 0
cache_hits = 0
properties_fetched = 0
for i, lst in enumerate(pre_filtered):
if max_properties and properties_fetched >= max_properties:
logger.debug(f"Max properties limit reached: {max_properties}")
break
listing_id = lst["id"]
price = lst["price"]
# Check cache
cached = cache.get(listing_id)
if cached and cached.get("price") == price:
cache_hits += 1
logger.debug(f"Cache hit for id={listing_id}")
results.append(cached)
continue
time.sleep(0.4)
detail = fetch_detail(lst["detail_path"])
if not detail:
excluded_detail += 1
logger.debug(f"Filter: id={listing_id} - excluded (detail fetch failed)")
continue
# GPS required
lat = detail.get("lat")
lon = detail.get("lon")
if not lat or not lon:
excluded_no_gps += 1
logger.debug(f"Filter: id={listing_id} - excluded (no GPS)")
continue
# Full text for filtering
full_desc = detail.get("description", "")
full_text = f"{lst['title']} {lst['description']} {full_desc}"
# Panel check
if is_panel(full_text):
excluded_panel += 1
logger.info(f"✗ Vyloučen #{listing_id}: panelová stavba")
continue
# Sídliště check
if is_sidliste(full_text):
excluded_panel += 1
logger.info(f"✗ Vyloučen #{listing_id}: sídliště")
continue
# Floor
floor = parse_floor(full_text)
if floor is not None and floor < MIN_FLOOR:
excluded_floor += 1
logger.debug(f"Filter: id={listing_id} - excluded (floor {floor})")
continue
# Area — re-check from detail if not found before
area = lst.get("_area") or parse_area(full_desc)
if area is not None and area < MIN_AREA:
excluded_area_detail += 1
logger.debug(f"Filter: id={listing_id} - excluded (area {area} m² from detail)")
continue
disp = lst["_disposition"]
locality = detail.get("detail_location") or lst["location"]
result = {
"hash_id": listing_id,
"name": f"Prodej bytu {disp} {int(area) if area else '?'}",
"price": price,
"price_formatted": format_price(price),
"locality": locality,
"lat": lat,
"lon": lon,
"disposition": disp,
"floor": floor,
"area": area,
"building_type": "neuvedeno",
"ownership": "neuvedeno",
"url": f"{BASE_URL}{lst['detail_path']}",
"source": "bazos",
"image": lst.get("image", ""),
"scraped_at": today,
"first_seen": cached.get("first_seen", today) if cached else today,
"last_changed": today if not cached or cached.get("price") != price else cached.get("last_changed", today),
}
if not validate_listing(result, "bazos"):
continue
results.append(result)
properties_fetched += 1
if (i + 1) % 20 == 0:
logger.info(f"Zpracováno {i + 1}/{len(pre_filtered)} ...")
logger.info(f"\n{'=' * 60}")
logger.info(f"Výsledky Bazoš:")
logger.info(f" Předfiltrováno: {len(pre_filtered)}")
logger.info(f" Z cache (přeskočeno): {cache_hits}")
logger.info(f" Vyloučeno (panel/síd): {excluded_panel}")
logger.info(f" Vyloučeno (patro): {excluded_floor}")
logger.info(f" Vyloučeno (bez GPS): {excluded_no_gps}")
logger.info(f" Vyloučeno (bez detailu): {excluded_detail}")
logger.info(f" Vyloučeno (plocha det): {excluded_area_detail}")
logger.info(f" ✓ Vyhovující byty: {len(results)}")
logger.info(f"{'=' * 60}")
write_stats(STATS_FILE, {
"source": "Bazoš",
"timestamp": _run_ts,
"duration_sec": round(time.time() - _run_start, 1),
"success": True,
"accepted": len(results),
"fetched": len(all_listings),
"pages": page - 1,
"cache_hits": cache_hits,
"excluded": {
"bez dispozice": excluded_no_disp,
"dispozice": excluded_disp,
"cena": excluded_price,
"plocha": excluded_area + excluded_area_detail,
"bez GPS": excluded_no_gps,
"panel/síd": excluded_panel,
"patro": excluded_floor,
"bez detailu": excluded_detail,
},
})
return results
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Scrape apartments from Bazoš.cz")
parser.add_argument("--max-pages", type=int, default=None,
help="Maximum number of listing pages to scrape")
parser.add_argument("--max-properties", type=int, default=None,
help="Maximum number of properties to fetch details for")
parser.add_argument("--log-level", type=str, default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"],
help="Logging level (default: INFO)")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="[%(levelname)s] %(asctime)s - %(name)s - %(message)s",
handlers=[logging.StreamHandler()]
)
_run_ts = datetime.now().isoformat(timespec="seconds")
start = time.time()
try:
estates = scrape(max_pages=args.max_pages, max_properties=args.max_properties)
except Exception as e:
logger.error(f"Scraper failed: {e}", exc_info=True)
write_stats(STATS_FILE, {
"source": "Bazoš",
"timestamp": _run_ts,
"duration_sec": round(time.time() - start, 1),
"success": False,
"accepted": 0,
"fetched": 0,
"error": str(e),
})
raise
if estates:
json_path = Path("byty_bazos.json")
json_path.write_text(
json.dumps(estates, ensure_ascii=False, indent=2),
encoding="utf-8",
)
elapsed = time.time() - start
logger.info(f"\n✓ Data uložena: {json_path.resolve()}")
logger.info(f"⏱ Celkový čas: {elapsed:.0f} s")
else:
logger.info("\nŽádné byty z Bazoše neodpovídají kritériím :(")

View File

@@ -15,7 +15,7 @@ import re
import time import time
import urllib.request import urllib.request
from pathlib import Path from pathlib import Path
from scraper_stats import write_stats, validate_listing from scraper_stats import write_stats
STATS_FILE = "stats_bezrealitky.json" STATS_FILE = "stats_bezrealitky.json"
@@ -71,71 +71,62 @@ HEADERS = {
BASE_URL = "https://www.bezrealitky.cz" BASE_URL = "https://www.bezrealitky.cz"
def fetch_url(url: str, retries: int = 3) -> str:
"""Fetch URL and return HTML string with retry on transient errors."""
for attempt in range(retries):
try:
logger.debug(f"HTTP GET request (attempt {attempt + 1}/{retries}): {url}")
req = urllib.request.Request(url, headers=HEADERS)
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
return html
except urllib.error.HTTPError:
raise
except (ConnectionResetError, ConnectionError, urllib.error.URLError, OSError) as e:
if attempt < retries - 1:
wait = (attempt + 1) * 2
logger.warning(f"Connection error (retry {attempt + 1}/{retries} after {wait}s): {e}")
time.sleep(wait)
else:
logger.error(f"HTTP request failed after {retries} attempts: {e}", exc_info=True)
raise
def fetch_page(page: int) -> tuple[list[dict], int]: def fetch_page(page: int) -> tuple[list[dict], int]:
""" """
Fetch a listing page from Bezrealitky. Fetch a listing page from Bezrealitky.
Returns (list of advert dicts from Apollo cache, total count). Returns (list of advert dicts from Apollo cache, total count).
""" """
url = f"{BASE_URL}/vypis/nabidka-prodej/byt/praha?page={page}" url = f"{BASE_URL}/vypis/nabidka-prodej/byt/praha?page={page}"
html = fetch_url(url) logger.debug(f"HTTP GET request: {url}")
logger.debug(f"Headers: {HEADERS}")
req = urllib.request.Request(url, headers=HEADERS)
try:
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
match = re.search( match = re.search(
r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
html, re.DOTALL html, re.DOTALL
) )
if not match: if not match:
logger.debug("No __NEXT_DATA__ script found in HTML") logger.debug("No __NEXT_DATA__ script found in HTML")
return [], 0 return [], 0
data = json.loads(match.group(1)) data = json.loads(match.group(1))
cache = data["props"]["pageProps"]["apolloCache"] cache = data["props"]["pageProps"]["apolloCache"]
# Extract adverts from cache # Extract adverts from cache
adverts = [] adverts = []
for key, val in cache.items(): for key, val in cache.items():
if key.startswith("Advert:") and isinstance(val, dict) and val.get("__typename") == "Advert": if key.startswith("Advert:") and isinstance(val, dict) and val.get("__typename") == "Advert":
adverts.append(val) adverts.append(val)
# Get total count from ROOT_QUERY # Get total count from ROOT_QUERY
total = 0 total = 0
root = cache.get("ROOT_QUERY", {}) root = cache.get("ROOT_QUERY", {})
for key, val in root.items(): for key, val in root.items():
if "listAdverts" in key and isinstance(val, dict): if "listAdverts" in key and isinstance(val, dict):
tc = val.get("totalCount") tc = val.get("totalCount")
if tc and tc > total: if tc and tc > total:
total = tc total = tc
logger.debug(f"Page {page}: found {len(adverts)} adverts, total={total}") logger.debug(f"Page {page}: found {len(adverts)} adverts, total={total}")
return adverts, total return adverts, total
except (urllib.error.URLError, ConnectionError, OSError) as e:
logger.error(f"HTTP request failed for {url}: {e}", exc_info=True)
raise
def fetch_detail(uri: str) -> dict | None: def fetch_detail(uri: str) -> dict | None:
"""Fetch detail page for a listing.""" """Fetch detail page for a listing."""
try: try:
url = f"{BASE_URL}/nemovitosti-byty-domy/{uri}" url = f"{BASE_URL}/nemovitosti-byty-domy/{uri}"
html = fetch_url(url) logger.debug(f"HTTP GET request: {url}")
req = urllib.request.Request(url, headers=HEADERS)
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
match = re.search( match = re.search(
r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
@@ -371,11 +362,7 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"source": "bezrealitky", "source": "bezrealitky",
"image": "", "image": "",
"scraped_at": datetime.now().strftime("%Y-%m-%d"), "scraped_at": datetime.now().strftime("%Y-%m-%d"),
"first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
"last_changed": datetime.now().strftime("%Y-%m-%d"),
} }
if not validate_listing(result, "bezrealitky"):
continue
results.append(result) results.append(result)
properties_fetched += 1 properties_fetched += 1

View File

@@ -14,7 +14,7 @@ import time
import urllib.request import urllib.request
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from scraper_stats import write_stats, validate_listing from scraper_stats import write_stats
STATS_FILE = "stats_cityhome.json" STATS_FILE = "stats_cityhome.json"
@@ -255,16 +255,6 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
else: else:
logger.info(f"{slug}: GPS nenalezeno") logger.info(f"{slug}: GPS nenalezeno")
# Load previous output for first_seen/last_changed tracking
_prev_cache: dict[str, dict] = {}
_prev_path = Path("byty_cityhome.json")
if _prev_path.exists():
try:
for _item in json.loads(_prev_path.read_text(encoding="utf-8")):
_prev_cache[str(_item["hash_id"])] = _item
except Exception:
pass
# Step 3: Filter listings # Step 3: Filter listings
logger.info(f"\nFáze 3: Filtrování...") logger.info(f"\nFáze 3: Filtrování...")
results = [] results = []
@@ -372,11 +362,7 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"source": "cityhome", "source": "cityhome",
"image": "", "image": "",
"scraped_at": datetime.now().strftime("%Y-%m-%d"), "scraped_at": datetime.now().strftime("%Y-%m-%d"),
"first_seen": _prev_cache.get(f"cityhome_{slug}_{listing['unit_name']}", {}).get("first_seen", datetime.now().strftime("%Y-%m-%d")),
"last_changed": datetime.now().strftime("%Y-%m-%d") if _prev_cache.get(f"cityhome_{slug}_{listing['unit_name']}", {}).get("price") != price else _prev_cache[f"cityhome_{slug}_{listing['unit_name']}"].get("last_changed", datetime.now().strftime("%Y-%m-%d")),
} }
if not validate_listing(result, "cityhome"):
continue
results.append(result) results.append(result)
properties_fetched += 1 properties_fetched += 1

View File

@@ -15,8 +15,9 @@ import re
import time import time
import urllib.request import urllib.request
import urllib.parse import urllib.parse
from html.parser import HTMLParser
from pathlib import Path from pathlib import Path
from scraper_stats import write_stats, validate_listing from scraper_stats import write_stats
STATS_FILE = "stats_idnes.json" STATS_FILE = "stats_idnes.json"
@@ -464,11 +465,7 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"source": "idnes", "source": "idnes",
"image": "", "image": "",
"scraped_at": datetime.now().strftime("%Y-%m-%d"), "scraped_at": datetime.now().strftime("%Y-%m-%d"),
"first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
"last_changed": datetime.now().strftime("%Y-%m-%d"),
} }
if not validate_listing(result, "idnes"):
continue
results.append(result) results.append(result)
properties_fetched += 1 properties_fetched += 1

View File

@@ -15,7 +15,7 @@ import time
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from urllib.parse import urlencode from urllib.parse import urlencode
from scraper_stats import write_stats, validate_listing from scraper_stats import write_stats
STATS_FILE = "stats_psn.json" STATS_FILE = "stats_psn.json"
@@ -38,25 +38,19 @@ BASE_URL = "https://psn.cz"
UNITS_API = f"{BASE_URL}/api/units-list" UNITS_API = f"{BASE_URL}/api/units-list"
def fetch_json(url: str, retries: int = 3) -> dict: def fetch_json(url: str) -> dict:
"""Fetch JSON via curl (urllib SSL may fail on Cloudflare) with retry.""" """Fetch JSON via curl (urllib SSL may fail on Cloudflare)."""
for attempt in range(retries): logger.debug(f"HTTP GET: {url}")
logger.debug(f"HTTP GET (attempt {attempt + 1}/{retries}): {url}") result = subprocess.run(
result = subprocess.run( ["curl", "-s", "-L", "--max-time", "30",
["curl", "-s", "-L", "--max-time", "30", "-H", f"User-Agent: {UA}",
"-H", f"User-Agent: {UA}", "-H", "Accept: application/json",
"-H", "Accept: application/json", url],
url], capture_output=True, text=True, timeout=60
capture_output=True, text=True, timeout=60 )
) if result.returncode != 0:
if result.returncode == 0: raise RuntimeError(f"curl failed ({result.returncode}): {result.stderr[:200]}")
return json.loads(result.stdout) return json.loads(result.stdout)
if attempt < retries - 1:
wait = (attempt + 1) * 2
logger.warning(f"curl failed (retry {attempt + 1}/{retries} after {wait}s): {result.stderr[:200]}")
time.sleep(wait)
else:
raise RuntimeError(f"curl failed after {retries} attempts ({result.returncode}): {result.stderr[:200]}")
def fix_gps(lat, lng): def fix_gps(lat, lng):
@@ -118,16 +112,6 @@ def scrape(max_properties: int | None = None):
all_units = data.get("units", {}).get("data", []) all_units = data.get("units", {}).get("data", [])
logger.info(f"Staženo jednotek celkem: {len(all_units)}") logger.info(f"Staženo jednotek celkem: {len(all_units)}")
# Load previous output for first_seen/last_changed tracking
_prev_cache: dict[str, dict] = {}
_prev_path = Path("byty_psn.json")
if _prev_path.exists():
try:
for _item in json.loads(_prev_path.read_text(encoding="utf-8")):
_prev_cache[str(_item["hash_id"])] = _item
except Exception:
pass
# Filtrování # Filtrování
results = [] results = []
excluded = { excluded = {
@@ -258,11 +242,7 @@ def scrape(max_properties: int | None = None):
"source": "psn", "source": "psn",
"image": "", "image": "",
"scraped_at": datetime.now().strftime("%Y-%m-%d"), "scraped_at": datetime.now().strftime("%Y-%m-%d"),
"first_seen": _prev_cache.get(str(unit_id), {}).get("first_seen", datetime.now().strftime("%Y-%m-%d")),
"last_changed": datetime.now().strftime("%Y-%m-%d") if _prev_cache.get(str(unit_id), {}).get("price") != int(price) else _prev_cache[str(unit_id)].get("last_changed", datetime.now().strftime("%Y-%m-%d")),
} }
if not validate_listing(result, "psn"):
continue
results.append(result) results.append(result)
properties_fetched += 1 properties_fetched += 1

View File

@@ -15,7 +15,7 @@ import re
import time import time
import urllib.request import urllib.request
from pathlib import Path from pathlib import Path
from scraper_stats import write_stats, validate_listing from scraper_stats import write_stats
STATS_FILE = "stats_realingo.json" STATS_FILE = "stats_realingo.json"
@@ -56,28 +56,6 @@ HEADERS = {
BASE_URL = "https://www.realingo.cz" BASE_URL = "https://www.realingo.cz"
def fetch_url(url: str, retries: int = 3) -> str:
"""Fetch URL and return HTML string with retry on transient errors."""
for attempt in range(retries):
try:
logger.debug(f"HTTP GET request (attempt {attempt + 1}/{retries}): {url}")
req = urllib.request.Request(url, headers=HEADERS)
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
return html
except urllib.error.HTTPError:
raise
except (ConnectionResetError, ConnectionError, urllib.error.URLError, OSError) as e:
if attempt < retries - 1:
wait = (attempt + 1) * 2
logger.warning(f"Connection error (retry {attempt + 1}/{retries} after {wait}s): {e}")
time.sleep(wait)
else:
logger.error(f"HTTP request failed after {retries} attempts: {e}", exc_info=True)
raise
def fetch_listing_page(page: int = 1) -> tuple[list[dict], int]: def fetch_listing_page(page: int = 1) -> tuple[list[dict], int]:
"""Fetch a page of Prague listings. Returns (items, total_count).""" """Fetch a page of Prague listings. Returns (items, total_count)."""
if page == 1: if page == 1:
@@ -85,26 +63,41 @@ def fetch_listing_page(page: int = 1) -> tuple[list[dict], int]:
else: else:
url = f"{BASE_URL}/prodej_byty/praha/{page}_strana/" url = f"{BASE_URL}/prodej_byty/praha/{page}_strana/"
html = fetch_url(url) logger.debug(f"HTTP GET request: {url}")
match = re.search( logger.debug(f"Headers: {HEADERS}")
r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', req = urllib.request.Request(url, headers=HEADERS)
html, re.DOTALL try:
) resp = urllib.request.urlopen(req, timeout=30)
if not match: html = resp.read().decode("utf-8")
logger.debug("No __NEXT_DATA__ script found in HTML") logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
return [], 0
data = json.loads(match.group(1)) match = re.search(
offer_list = data["props"]["pageProps"]["store"]["offer"]["list"] r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
logger.debug(f"Page {page}: found {len(offer_list['data'])} items, total={offer_list['total']}") html, re.DOTALL
return offer_list["data"], offer_list["total"] )
if not match:
logger.debug("No __NEXT_DATA__ script found in HTML")
return [], 0
data = json.loads(match.group(1))
offer_list = data["props"]["pageProps"]["store"]["offer"]["list"]
logger.debug(f"Page {page}: found {len(offer_list['data'])} items, total={offer_list['total']}")
return offer_list["data"], offer_list["total"]
except (urllib.error.URLError, ConnectionError, OSError) as e:
logger.error(f"HTTP request failed for {url}: {e}", exc_info=True)
raise
def fetch_detail(listing_url: str) -> dict | None: def fetch_detail(listing_url: str) -> dict | None:
"""Fetch detail page for a listing to get floor, building type, etc.""" """Fetch detail page for a listing to get floor, building type, etc."""
try: try:
url = f"{BASE_URL}{listing_url}" url = f"{BASE_URL}{listing_url}"
html = fetch_url(url) logger.debug(f"HTTP GET request: {url}")
req = urllib.request.Request(url, headers=HEADERS)
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
match = re.search( match = re.search(
r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
html, re.DOTALL html, re.DOTALL
@@ -328,11 +321,7 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"source": "realingo", "source": "realingo",
"image": "", "image": "",
"scraped_at": datetime.now().strftime("%Y-%m-%d"), "scraped_at": datetime.now().strftime("%Y-%m-%d"),
"first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
"last_changed": datetime.now().strftime("%Y-%m-%d"),
} }
if not validate_listing(result, "realingo"):
continue
results.append(result) results.append(result)
properties_fetched += 1 properties_fetched += 1

View File

@@ -1,53 +1,13 @@
"""Shared utilities for scraper run statistics and listing validation.""" """Shared utility for writing per-scraper run statistics to JSON."""
from __future__ import annotations from __future__ import annotations
import json import json
import logging
import os import os
from pathlib import Path from pathlib import Path
HERE = Path(__file__).parent HERE = Path(__file__).parent
DATA_DIR = Path(os.environ.get("DATA_DIR", HERE)) DATA_DIR = Path(os.environ.get("DATA_DIR", HERE))
_val_log = logging.getLogger(__name__)
_REQUIRED_FIELDS = ("hash_id", "price", "locality", "lat", "lon", "url", "source")
def validate_listing(listing: dict, context: str = "") -> bool:
"""
Validate a listing dict before it is written to the output JSON.
Returns True if valid, False if the listing should be skipped.
Logs a warning for each invalid listing.
"""
prefix = f"[{context}] " if context else ""
for field in _REQUIRED_FIELDS:
val = listing.get(field)
if val is None or val == "":
_val_log.warning(f"{prefix}Skipping listing — missing field '{field}': {listing.get('hash_id', '?')}")
return False
price = listing.get("price")
if not isinstance(price, (int, float)) or price <= 0:
_val_log.warning(f"{prefix}Skipping listing — invalid price={price!r}: {listing.get('hash_id', '?')}")
return False
lat, lon = listing.get("lat"), listing.get("lon")
if not isinstance(lat, (int, float)) or not isinstance(lon, (int, float)):
_val_log.warning(f"{prefix}Skipping listing — non-numeric GPS lat={lat!r} lon={lon!r}: {listing.get('hash_id', '?')}")
return False
if not (47.0 <= lat <= 52.0) or not (12.0 <= lon <= 19.0):
_val_log.warning(f"{prefix}Skipping listing — GPS outside Czech Republic lat={lat} lon={lon}: {listing.get('hash_id', '?')}")
return False
area = listing.get("area")
if area is not None and (not isinstance(area, (int, float)) or area <= 0):
_val_log.warning(f"{prefix}Skipping listing — invalid area={area!r}: {listing.get('hash_id', '?')}")
return False
return True
def write_stats(filename: str, stats: dict) -> None: def write_stats(filename: str, stats: dict) -> None:
"""Write scraper run stats dict to the data directory.""" """Write scraper run stats dict to the data directory."""