8 Commits

Author SHA1 Message Date
59ef3274b6 Add CLAUDE.md project documentation for session context
Provides automatic context loading for new Claude Code sessions,
documenting architecture, filters, sources, and conventions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 09:58:01 +01:00
27e5b05f88 Add Bazoš.cz as new apartment scraper source
New scraper for reality.bazos.cz with full HTML parsing (no API),
GPS extraction from Google Maps links, panel/sídliště filtering,
floor/area parsing from free text, and pagination fix for Bazoš's
numeric locality codes. Integrated into merge pipeline and map
with purple (#7B1FA2) markers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 09:47:37 +01:00
63663e4b6b Merge pull request 'Move Realingo scraper to run last' (#6) from fix/scraper-order into main
All checks were successful
Build and Push / build (push) Successful in 6s
Reviewed-on: #6
2026-02-27 21:19:29 +00:00
8c052840cd Move Realingo scraper to run last in pipeline
Reorder scrapers: Sreality → Bezrealitky → iDNES → PSN+CityHome → Realingo → Merge

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 21:35:54 +01:00
39e4b9ce2a Merge pull request 'Reliability improvements and cleanup' (#5) from improve/reliability-and-fixes into main
Reviewed-on: #5
2026-02-27 10:26:04 +00:00
Jan Novak
fd3991f8d6 Remove regen_map.py references from Dockerfile and README
All checks were successful
Build and Push / build (push) Successful in 6s
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-27 10:44:08 +01:00
Jan Novak
27a7834eb6 Reliability improvements: retry logic, validation, ratings sync
Some checks failed
Build and Push / build (push) Failing after 4s
- Add 3-attempt retry with exponential backoff to Sreality, Realingo,
  Bezrealitky, and PSN scrapers (CityHome and iDNES already had it)
- Add shared validate_listing() in scraper_stats.py; all 6 scrapers now
  validate GPS bounds, price, area, and required fields before output
- Wire ratings to server /api/ratings on page load (merge with
  localStorage) and save (async POST); ratings now persist across
  browsers and devices
- Namespace JS hash IDs as {source}_{id} to prevent rating collisions
  between listings from different portals with the same numeric ID
- Replace manual Czech diacritic table with unicodedata.normalize()
  in merge_and_map.py for correct deduplication of all edge cases
- Correct README schedule docs: every 4 hours, not twice daily

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-27 10:36:37 +01:00
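The retry pattern described in the first bullet of the commit above can be sketched as follows. This is a minimal generic form; the real scrapers inline it per HTTP request, re-raise `HTTPError` immediately, and wait 2 s then 4 s between attempts (per the diffs later in this page). The helper name and signature are illustrative, not from the repository.

```python
import time

def with_retry(fn, attempts: int = 3, wait_step: float = 2.0):
    """Call fn(), retrying transient errors with linearly increasing waits.

    Sketch of the scrapers' retry pattern; waits are 2 s, 4 s by default.
    """
    for attempt in range(attempts):
        try:
            return fn()
        except (ConnectionError, OSError):
            if attempt == attempts - 1:
                raise  # out of attempts: propagate the last error
            time.sleep(wait_step * (attempt + 1))
```

A scraper would wrap its fetch as `with_retry(lambda: fetch(url))`, keeping HTTP 4xx/5xx responses fatal while absorbing dropped connections.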
57a9f6f21a Add NEW badge for recent listings, text input for price filter, cleanup
- New listings (≤ 1 day) show a yellow NEW badge instead of an oversized marker
- Price filter changed from a dropdown to a text input (max 14M)
- Cap the price filter at 14M in JS
- Remove unused regen_map.py
- Remove unused HTMLParser import in scrape_idnes.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 21:14:48 +01:00
14 changed files with 952 additions and 267 deletions

CLAUDE.md (new file, 124 lines)

@@ -0,0 +1,124 @@
# Maru hledá byt
A project for finding apartments in Prague. It scrapes listings from 7 real-estate portals, filters and deduplicates them, and generates an interactive map.
**Communication language:** Czech (the user is Marie). Code and code comments are a CZ/EN mix.
## Architecture
```
run_all.sh (orchestrator)
├─ scrape_and_map.py → byty_sreality.json (Sreality API)
├─ scrape_bezrealitky.py → byty_bezrealitky.json (HTML Apollo cache)
├─ scrape_idnes.py → byty_idnes.json (HTML regex)
├─ scrape_psn.py } → byty_psn.json (React API + curl)
├─ scrape_cityhome.py } → byty_cityhome.json (HTML tables)
├─ scrape_bazos.py → byty_bazos.json (HTML regex)
└─ scrape_realingo.py → byty_realingo.json (Next.js __NEXT_DATA__)
merge_and_map.py
├─ byty_merged.json (deduplicated data)
└─ mapa_bytu.html (Leaflet.js map)
generate_status.py → status.json + scraper_history.json
server.py (port 8080) → serves the map + status page + ratings API
```
## Filters (shared by all scrapers)
| Parameter | Value | Note |
|-----------|-------|------|
| Max price | 13.5M CZK (Sreality/Realingo/Bezrealitky/iDNES), 14M CZK (PSN/CityHome/Bazoš) | The difference is intentional |
| Min area | 69 m² | |
| Min floor | 2nd floor | 2nd-floor listings are flagged with a warning on the map |
| Dispositions | 3+kk, 3+1, 4+kk, 4+1, 5+kk, 5+1, 6+ | |
| Region | Prague | |
| Exclude | panel buildings, housing estates | regex on description/fields |
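The filter table above can be sketched as a single predicate. This is a minimal sketch: the field names (`price`, `area`, `floor`, `disposition`, `description`) are assumptions about the listing schema, and the regexes are taken from the Bazoš scraper shown later on this page.

```python
import re

# Exclusion regexes from scrape_bazos.py (panel construction / housing estate).
PANEL_RE = re.compile(r"panel(?:ov|ák|\.)", re.IGNORECASE)
SIDLISTE_RE = re.compile(r"sídliště|sidliste|panelák", re.IGNORECASE)

WANTED = {"3+kk", "3+1", "4+kk", "4+1", "5+kk", "5+1", "6+kk", "6+1"}

def passes_filters(listing: dict, max_price: int = 13_500_000) -> bool:
    """Apply the shared filters; field names are assumed, not the exact schema."""
    if listing.get("price", 0) > max_price:
        return False
    area = listing.get("area")
    if area is not None and area < 69:          # min area 69 m²
        return False
    floor = listing.get("floor")
    if floor is not None and floor < 2:         # min 2nd floor
        return False
    if listing.get("disposition") not in WANTED:
        return False
    text = listing.get("description", "")
    if PANEL_RE.search(text) or SIDLISTE_RE.search(text):
        return False
    return True
```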
## Key files
- **scrape_and_map.py** — Sreality scraper + the shared `generate_map()` function (generates the HTML map)
- **merge_and_map.py** — merges the 7 JSON sources, deduplicates (key: street + price + area), calls `generate_map()`
- **scraper_stats.py** — utilities: `validate_listing()` (validates required fields + GPS bounds) and `write_stats()`
- **generate_status.py** — generates status.json and scraper_history.json from the scraper outputs
- **server.py** — HTTP server (port 8080), endpoints: `/mapa_bytu.html`, `/scrapers-status`, `/api/ratings`, `/api/status`
- **run_all.sh** — orchestrator; runs the scrapers sequentially (PSN+CityHome in parallel), then merge + status
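The shared `validate_listing()` is described but not shown on this page. A sketch of what it plausibly checks, per the commit notes (GPS bounds, price, area, required fields) — the bounding box, thresholds, and field names here are assumptions, not the repository's actual values:

```python
def validate_listing(listing: dict, source: str) -> bool:
    """Sketch of scraper_stats.validate_listing(); all limits are assumed."""
    # Required fields (assumed set).
    for field in ("title", "price", "locality", "url"):
        if not listing.get(field):
            return False
    # GPS must fall inside a rough Prague bounding box (approximate).
    lat, lon = listing.get("lat"), listing.get("lon")
    if lat is not None and lon is not None:
        if not (49.9 <= lat <= 50.2 and 14.2 <= lon <= 14.8):
            return False
    # Sanity bounds on price and area (assumed thresholds).
    if not (100_000 <= listing.get("price", 0) <= 100_000_000):
        return False
    area = listing.get("area")
    if area is not None and not (10 <= area <= 1_000):
        return False
    return True
```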
## Map (mapa_bytu.html)
- Leaflet.js + CARTO tiles
- Marker colors by price/m² (blue < 110k → red > 165k, gray = not stated)
- PSN/CityHome = heart markers (❤️)
- New listings (≤ 1 day) = yellow "NEW" badge
- Rejected = faded + 🚫 SVG overlay
- Favorites = star (⭐)
- Filters: floor, max price (input, default 13.5M, max 14M), date added, hide rejected, click on a price band
- Ratings stored in localStorage + synced to the server via `/api/ratings`
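The price/m² color scale from the list above, as a sketch. Only the blue, red, and gray endpoints come from this document; the mid-range handling (a single assumed color here, rather than a gradient) is hypothetical:

```python
from __future__ import annotations

def price_band_color(price: int | None, area: float | None) -> str:
    """Map price per m² to a marker color; mid-range color is an assumption."""
    if not price or not area:
        return "#9E9E9E"   # gray: price/m² unknown
    per_m2 = price / area
    if per_m2 < 110_000:
        return "#1976D2"   # blue: below 110k CZK/m²
    if per_m2 > 165_000:
        return "#D32F2F"   # red: above 165k CZK/m²
    return "#FB8C00"       # mid-range (assumed; the real map may interpolate)
```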
## Source colors on the map
```python
source_colors = {
    "sreality": "#1976D2",     # blue
    "realingo": "#00897B",     # teal
    "bezrealitky": "#E91E63",  # pink
    "idnes": "#FF6F00",        # orange
    "psn": "#D32F2F",          # red
    "cityhome": "#D32F2F",     # red
    "bazos": "#7B1FA2",        # purple
}
```
## Deduplication (merge_and_map.py)
- Key: `normalize_street(locality) + price + area`
- Street normalization: first part before the comma, lowercased, diacritics removed, alphanumeric characters only
- PSN and CityHome have priority (loaded first)
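The dedup key can be sketched as follows. `normalize_street()` mirrors the version in merge_and_map.py (shown in the diff later on this page); the `dedup()` wrapper and its field names are illustrative:

```python
import re
import unicodedata

def normalize_street(locality: str) -> str:
    """First comma-separated part, lowercased, diacritics stripped, alphanumerics only."""
    street = locality.split(",")[0].strip().lower()
    street = unicodedata.normalize("NFKD", street).encode("ascii", "ignore").decode("ascii")
    return re.sub(r"[^a-z0-9]", "", street)

def dedup(estates: list[dict]) -> list[dict]:
    """First occurrence wins, so sources loaded earlier (PSN, CityHome) keep priority."""
    seen, out = set(), []
    for e in estates:
        key = (normalize_street(e.get("locality", "")), e.get("price"), e.get("area"))
        if key in seen:
            continue
        seen.add(key)
        out.append(e)
    return out
```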
## Development
- **Git remote:** `https://gitea.home.hrajfrisbee.cz/littlemeat/maru-hleda-byt.git`
- **Gitea API token:** stored in `.claude/settings.local.json`
- **Python 3.9+** compatibility (`from __future__ import annotations`)
- **No pip dependencies** — stdlib only (urllib, json, re, logging, pathlib, subprocess)
- **Docker:** `build/Dockerfile` (python:3.13-alpine), cron every 4 hours
- Generated files (`byty_*.json`, `mapa_bytu.html`, `*.log`) are in `.gitignore`
## Common tasks
```bash
# Quick scraper test
python3 scrape_bazos.py --max-pages 1 --max-properties 5 --log-level DEBUG
# Local validation (all scrapers with limits)
make validation-local
# Regenerate the map from existing data
python3 merge_and_map.py
# Start the server
python3 server.py  # or: make serve
# Full scrape
./run_all.sh
```
## Scraper order in run_all.sh
1. Sreality
2. Bezrealitky
3. iDNES
4. PSN + CityHome (in parallel)
5. Bazoš
6. Realingo (last — the user dislikes it)
7. Merge + map
8. Status generation
## Conventions
- Commit messages in English, PR descriptions in English
- Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- PRs via the Gitea API (see the create_pr.sh pattern in history)
- New scrapers copy the pattern from `scrape_bezrealitky.py`
- Every scraper has argparse with `--max-pages`, `--max-properties`, `--log-level`
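The shared argparse convention from the last bullet can be sketched like this. The defaults and choices here are assumptions; only the three flag names come from the document:

```python
import argparse

def build_arg_parser() -> argparse.ArgumentParser:
    """Common CLI shared by all scrapers; defaults are assumed, flags are not."""
    p = argparse.ArgumentParser(description="Apartment scraper")
    p.add_argument("--max-pages", type=int, default=None,
                   help="limit the number of listing pages fetched")
    p.add_argument("--max-properties", type=int, default=None,
                   help="limit the number of listings processed")
    p.add_argument("--log-level", default="INFO",
                   choices=["DEBUG", "INFO", "WARNING", "ERROR"])
    return p
```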

README.md

@@ -83,10 +83,6 @@ Merges all `byty_*.json` files into `byty_merged.json` and generates `mapa_bytu.
**Deduplication logic:** Two listings are considered duplicates if they share the same normalized street name + price + area. PSN and CityHome have priority during dedup (loaded first), so their listings are kept over duplicates from other portals.
### `regen_map.py`
Regenerates the map from existing `byty_sreality.json` data without re-scraping. Fetches missing area values from the Sreality API, fixes URLs, and re-applies the area filter. Useful for tweaking map output after data has already been collected.
## Interactive map (`mapa_bytu.html`)
The generated map is a standalone HTML file using Leaflet.js with CARTO basemap tiles. Features:
@@ -151,7 +147,7 @@ The project includes a Docker setup for unattended operation with a cron-based s
│ PID 1: python3 -m http.server :8080 │
│ serves /app/data/ │
│ │
│ crond: runs run_all.sh at 06:00/18:00
│ crond: runs run_all.sh every 4 hours
│ Europe/Prague timezone │
│ │
│ /app/ -- scripts (.py, .sh) │
@@ -160,7 +156,7 @@ The project includes a Docker setup for unattended operation with a cron-based s
└─────────────────────────────────────────┘
```
On startup, the HTTP server starts immediately. The initial scrape runs in the background. Subsequent cron runs update data in-place twice daily at 06:00 and 18:00 CET/CEST.
On startup, the HTTP server starts immediately. The initial scrape runs in the background. Subsequent cron runs update data in-place every 4 hours.
### Quick start
@@ -201,14 +197,13 @@ Validation targets run scrapers with `--max-pages 1 --max-properties 10` for a f
├── scrape_psn.py # PSN scraper
├── scrape_cityhome.py # CityHome scraper
├── merge_and_map.py # Merge all sources + generate final map
├── regen_map.py # Regenerate map from cached Sreality data
├── run_all.sh # Orchestrator script (runs all scrapers + merge)
├── mapa_bytu.html # Generated interactive map (output)
├── Makefile # Docker management + validation shortcuts
├── build/
│ ├── Dockerfile # Container image definition (python:3.13-alpine)
│ ├── entrypoint.sh # Container entrypoint (HTTP server + cron + initial scrape)
│ ├── crontab # Cron schedule (06:00 and 18:00 CET)
│ ├── crontab # Cron schedule (every 4 hours)
│ └── CONTAINER.md # Container-specific documentation
└── .gitignore # Ignores byty_*.json, __pycache__, .vscode
```

build/Dockerfile

@@ -11,7 +11,7 @@ WORKDIR /app
COPY scrape_and_map.py scrape_realingo.py scrape_bezrealitky.py \
scrape_idnes.py scrape_psn.py scrape_cityhome.py \
merge_and_map.py regen_map.py generate_status.py scraper_stats.py \
merge_and_map.py generate_status.py scraper_stats.py \
run_all.sh server.py ./
COPY build/crontab /etc/crontabs/root

merge_and_map.py

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""
Merges data from Sreality, Realingo, Bezrealitky, iDNES, PSN, and CityHome,
Merges data from Sreality, Realingo, Bezrealitky, iDNES, PSN, CityHome, and Bazoš,
deduplicates the listings, and generates the map.
Deduplication: same street (from locality) + same price + same area = a duplicate.
PSN and CityHome have priority during deduplication (loaded first).
@@ -9,6 +9,7 @@ from __future__ import annotations
import json
import re
import unicodedata
from pathlib import Path
from scrape_and_map import generate_map, format_price
@@ -19,14 +20,8 @@ def normalize_street(locality: str) -> str:
# "Studentská, Praha 6 - Dejvice" → "studentska"
# "Rýnská, Praha" → "rynska"
street = locality.split(",")[0].strip().lower()
# Remove diacritics (simple Czech)
replacements = {
"á": "a", "č": "c", "ď": "d", "é": "e", "ě": "e",
"í": "i", "ň": "n", "ó": "o", "ř": "r", "š": "s",
"ť": "t", "ú": "u", "ů": "u", "ý": "y", "ž": "z",
}
for src, dst in replacements.items():
street = street.replace(src, dst)
# Remove diacritics using Unicode decomposition (handles all Czech characters)
street = unicodedata.normalize("NFKD", street).encode("ascii", "ignore").decode("ascii")
# Remove non-alphanumeric
street = re.sub(r"[^a-z0-9]", "", street)
return street
@@ -49,6 +44,7 @@ def main():
("Realingo", "byty_realingo.json"),
("Bezrealitky", "byty_bezrealitky.json"),
("iDNES", "byty_idnes.json"),
("Bazoš", "byty_bazos.json"),
]
all_estates = []
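The Unicode-decomposition approach introduced in this diff handles every Czech accented character, including ones like "ď" and "ť" that a hand-written replacement table can miss:

```python
import unicodedata

def strip_diacritics(s: str) -> str:
    # NFKD splits each accented letter into a base letter plus a combining
    # mark; encoding to ASCII with errors="ignore" then drops the marks.
    return unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")

print(strip_diacritics("Šťastného"))  # → Stastneho
```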

regen_map.py (deleted)

@@ -1,114 +0,0 @@
#!/usr/bin/env python3
"""
Regenerates the map from already-downloaded data (byty_sreality.json).
Fills in missing areas from the Sreality API, fixes URLs, and applies the filters.
"""
from __future__ import annotations
import json
import time
import urllib.request
from pathlib import Path
from scrape_and_map import (
generate_map, format_price, MIN_AREA, HEADERS, DETAIL_API
)
def api_get(url: str) -> dict:
req = urllib.request.Request(url, headers=HEADERS)
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode("utf-8"))
def fix_sreality_url(estate: dict) -> str:
"""Fix the Sreality URL to include disposition segment (only if missing)."""
disp = estate.get("disposition", "")
slug_map = {
"1+kk": "1+kk", "1+1": "1+1", "2+kk": "2+kk", "2+1": "2+1",
"3+kk": "3+kk", "3+1": "3+1", "4+kk": "4+kk", "4+1": "4+1",
"5+kk": "5+kk", "5+1": "5+1", "6+": "6-a-vice", "Atypický": "atypicky",
}
slug = slug_map.get(disp, "byt")
old_url = estate.get("url", "")
parts = old_url.split("/")
try:
byt_idx = parts.index("byt")
# Only insert if disposition slug is not already there
if byt_idx + 1 < len(parts) and parts[byt_idx + 1] == slug:
return old_url # already correct
parts.insert(byt_idx + 1, slug)
return "/".join(parts)
except ValueError:
return old_url
def fetch_area(hash_id: int) -> int | None:
"""Fetch area from detail API."""
try:
url = DETAIL_API.format(hash_id)
detail = api_get(url)
for item in detail.get("items", []):
name = item.get("name", "")
if "žitná ploch" in name or "zitna ploch" in name.lower():
return int(item["value"])
except Exception:
pass
return None
def main():
json_path = Path("byty_sreality.json")
if not json_path.exists():
print("Soubor byty_sreality.json nenalezen. Nejprve spusť scrape_and_map.py")
return
estates = json.loads(json_path.read_text(encoding="utf-8"))
print(f"Načteno {len(estates)} bytů z byty_sreality.json")
# Step 1: Fetch missing areas
missing_area = [e for e in estates if e.get("area") is None]
print(f"Doplňuji plochu u {len(missing_area)} bytů...")
for i, e in enumerate(missing_area):
time.sleep(0.3)
area = fetch_area(e["hash_id"])
if area is not None:
e["area"] = area
if (i + 1) % 50 == 0:
print(f" {i + 1}/{len(missing_area)} ...")
# Count results
with_area = sum(1 for e in estates if e.get("area") is not None)
print(f"Plocha doplněna: {with_area}/{len(estates)}")
# Step 2: Fix URLs
for e in estates:
e["url"] = fix_sreality_url(e)
# Step 3: Filter by min area
filtered = []
excluded = 0
for e in estates:
area = e.get("area")
if area is not None and area < MIN_AREA:
excluded += 1
continue
filtered.append(e)
print(f"Vyloučeno (< {MIN_AREA} m²): {excluded}")
print(f"Zbývá: {len(filtered)} bytů")
# Save updated data
filtered_path = Path("byty_sreality.json")
filtered_path.write_text(
json.dumps(filtered, ensure_ascii=False, indent=2),
encoding="utf-8",
)
# Generate map
generate_map(filtered)
if __name__ == "__main__":
main()

run_all.sh

@@ -13,7 +13,7 @@ RED='\033[0;31m'
BOLD='\033[1m'
NC='\033[0m'
TOTAL=6
TOTAL=7
CURRENT=0
FAILED=0
START_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
@@ -84,9 +84,6 @@ exec > >(tee -a "$LOG_FILE") 2>&1
step "Sreality"
python3 scrape_and_map.py $SCRAPER_ARGS || { echo -e "${RED}✗ Sreality selhalo${NC}"; FAILED=$((FAILED + 1)); }
step "Realingo"
python3 scrape_realingo.py $SCRAPER_ARGS || { echo -e "${RED}✗ Realingo selhalo${NC}"; FAILED=$((FAILED + 1)); }
step "Bezrealitky"
python3 scrape_bezrealitky.py $SCRAPER_ARGS || { echo -e "${RED}✗ Bezrealitky selhalo${NC}"; FAILED=$((FAILED + 1)); }
@@ -101,6 +98,12 @@ PID_CH=$!
wait $PID_PSN || { echo -e "${RED}✗ PSN selhalo${NC}"; FAILED=$((FAILED + 1)); }
wait $PID_CH || { echo -e "${RED}✗ CityHome selhalo${NC}"; FAILED=$((FAILED + 1)); }
step "Bazoš"
python3 scrape_bazos.py $SCRAPER_ARGS || { echo -e "${RED}✗ Bazoš selhalo${NC}"; FAILED=$((FAILED + 1)); }
step "Realingo"
python3 scrape_realingo.py $SCRAPER_ARGS || { echo -e "${RED}✗ Realingo selhalo${NC}"; FAILED=$((FAILED + 1)); }
# ── Sloučení + mapa ──────────────────────────────────────────
step "Sloučení dat a generování mapy"
@@ -117,7 +120,7 @@ python3 generate_status.py --start-time "$START_TIME" --duration "$DURATION" $KE
echo ""
echo "============================================================"
if [ $FAILED -eq 0 ]; then
echo -e "${GREEN}${BOLD}Hotovo! Všech 6 zdrojů úspěšně staženo.${NC}"
echo -e "${GREEN}${BOLD}Hotovo! Všech 7 zdrojů úspěšně staženo.${NC}"
else
echo -e "${RED}${BOLD}Hotovo s $FAILED chybami.${NC}"
fi

scrape_and_map.py

@@ -13,9 +13,9 @@ import math
import time
import urllib.request
import urllib.parse
from datetime import datetime
from datetime import datetime, timedelta
from pathlib import Path
from scraper_stats import write_stats
from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_sreality.json"
@@ -45,9 +45,9 @@ HEADERS = {
def api_get(url: str) -> dict:
"""Fetch JSON from Sreality API."""
logger.debug(f"HTTP GET request: {url}")
logger.debug(f"Headers: {HEADERS}")
"""Fetch JSON from Sreality API with retry."""
for attempt in range(3):
logger.debug(f"HTTP GET request (attempt {attempt + 1}/3): {url}")
req = urllib.request.Request(url, headers=HEADERS)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
@@ -55,8 +55,15 @@ def api_get(url: str) -> dict:
logger.debug(f"HTTP response: status={resp.status}, size={len(response_data)} bytes")
logger.debug(f"Response preview: {response_data[:200]}")
return json.loads(response_data)
except urllib.error.HTTPError:
raise
except (urllib.error.URLError, ConnectionError, OSError) as e:
logger.error(f"HTTP request failed for {url}: {e}", exc_info=True)
if attempt < 2:
wait = (attempt + 1) * 2
logger.warning(f"Connection error (retry {attempt + 1}/3 after {wait}s): {e}")
time.sleep(wait)
else:
logger.error(f"HTTP request failed after 3 attempts: {e}", exc_info=True)
raise
@@ -356,6 +363,8 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
"last_changed": datetime.now().strftime("%Y-%m-%d"),
}
if not validate_listing(result, "sreality"):
continue
results.append(result)
details_fetched += 1
@@ -448,9 +457,13 @@ def generate_map(estates: list[dict], output_path: str = "mapa_bytu.html"):
price_legend_items += (
'<div style="display:flex;align-items:center;gap:6px;margin:6px 0 0 0;'
'padding-top:6px;border-top:1px solid #eee;">'
'<span style="width:18px;height:18px;border-radius:50%;background:#66BB6A;'
'display:inline-block;box-shadow:0 1px 4px rgba(0,0,0,0.35);flex-shrink:0;"></span>'
'<span>Nové (z dnešního scrapu) — větší</span></div>'
'<span style="display:inline-flex;align-items:center;gap:3px;flex-shrink:0;">'
'<span style="width:14px;height:14px;border-radius:50%;background:#66BB6A;'
'display:inline-block;box-shadow:0 1px 3px rgba(0,0,0,0.3);"></span>'
'<span style="font-size:8px;font-weight:700;background:#FFD600;color:#333;'
'padding:1px 3px;border-radius:2px;">NEW</span>'
'</span>'
'<span>Nové (≤ 1 den)</span></div>'
)
markers_js = ""
@@ -467,16 +480,18 @@ def generate_map(estates: list[dict], output_path: str = "mapa_bytu.html"):
floor_note = '<br><span style="color:#FF9800;font-weight:bold;">⚠ 2. NP — zvážit klidnost lokality</span>'
source = e.get("source", "sreality")
source_labels = {"sreality": "Sreality", "realingo": "Realingo", "bezrealitky": "Bezrealitky", "idnes": "iDNES", "psn": "PSN", "cityhome": "CityHome"}
source_colors = {"sreality": "#1976D2", "realingo": "#00897B", "bezrealitky": "#E91E63", "idnes": "#FF6F00", "psn": "#D32F2F", "cityhome": "#D32F2F"}
source_labels = {"sreality": "Sreality", "realingo": "Realingo", "bezrealitky": "Bezrealitky", "idnes": "iDNES", "psn": "PSN", "cityhome": "CityHome", "bazos": "Bazoš"}
source_colors = {"sreality": "#1976D2", "realingo": "#00897B", "bezrealitky": "#E91E63", "idnes": "#FF6F00", "psn": "#D32F2F", "cityhome": "#D32F2F", "bazos": "#7B1FA2"}
source_label = source_labels.get(source, source)
source_color = source_colors.get(source, "#999")
hash_id = e.get("hash_id", "")
hash_id = f"{source}_{e.get('hash_id', '')}"
first_seen = e.get("first_seen", "")
last_changed = e.get("last_changed", "")
is_new = first_seen == datetime.now().strftime("%Y-%m-%d")
today = datetime.now().strftime("%Y-%m-%d")
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
is_new = first_seen in (today, yesterday)
new_badge = (
'<span style="margin-left:6px;font-size:11px;background:#FFD600;color:#333;'
@@ -603,12 +618,12 @@ def generate_map(estates: list[dict], output_path: str = "mapa_bytu.html"):
.heart-icon-fav svg path {{ stroke: gold !important; stroke-width: 2.5 !important; filter: drop-shadow(0 0 4px rgba(255,193,7,0.7)); }}
.heart-icon-rej {{ opacity: 0.4 !important; filter: grayscale(1); }}
.reject-overlay {{ background: none !important; border: none !important; pointer-events: none !important; }}
@keyframes pulse-new {{
0% {{ stroke-opacity: 1; stroke-width: 3px; r: 11; }}
50% {{ stroke-opacity: 0.4; stroke-width: 6px; r: 12; }}
100% {{ stroke-opacity: 1; stroke-width: 3px; r: 11; }}
.new-badge-icon {{ background: none !important; border: none !important; pointer-events: none !important; }}
.new-badge {{
font-size: 9px; font-weight: 700; color: #333; background: #FFD600;
padding: 1px 4px; border-radius: 3px; white-space: nowrap;
box-shadow: 0 1px 3px rgba(0,0,0,0.3); letter-spacing: 0.5px;
}}
.marker-new {{ animation: pulse-new 2s ease-in-out infinite; }}
.info-panel {{
position: absolute; top: 10px; right: 10px; z-index: 1000;
background: white; padding: 16px; border-radius: 10px;
@@ -683,12 +698,9 @@ def generate_map(estates: list[dict], output_path: str = "mapa_bytu.html"):
</div>
<div style="margin-top:6px;">
<label>Max cena:
<select id="max-price" onchange="applyFilters()">
<option value="13500000">13 500 000 Kč</option>
<option value="12000000">12 000 000 Kč</option>
<option value="10000000">10 000 000 Kč</option>
<option value="8000000">8 000 000 Kč</option>
</select>
<input type="number" id="max-price" value="13500000" max="14000000" step="500000"
style="width:130px;padding:2px 4px;border:1px solid #ccc;border-radius:3px;"
onchange="applyFilters()" onkeyup="applyFilters()"> Kč
</label>
</div>
<div style="margin-top:6px;">
@@ -784,19 +796,28 @@ function addMarker(lat, lon, color, popup, hashId, firstSeen, lastChanged) {{
function addNewMarker(lat, lon, color, popup, hashId, firstSeen, lastChanged) {{
var marker = L.circleMarker([lat, lon], {{
radius: 12,
radius: 8,
fillColor: color,
color: color,
weight: 4,
opacity: 0.35,
fillOpacity: 0.95,
color: '#fff',
weight: 2,
opacity: 1,
fillOpacity: 0.85,
}}).bindPopup(popup);
marker._data = {{ lat: lat, lon: lon, color: color, hashId: hashId, isNew: true, firstSeen: firstSeen || '', lastChanged: lastChanged || '' }};
allMarkers.push(marker);
marker.addTo(map);
marker.on('add', function() {{
if (marker._path) marker._path.classList.add('marker-new');
var badge = L.marker([lat, lon], {{
icon: L.divIcon({{
className: 'new-badge-icon',
html: '<span class="new-badge">NEW</span>',
iconSize: [32, 14],
iconAnchor: [-6, 7],
}}),
interactive: false,
pane: 'markerPane',
}});
badge.addTo(map);
marker._newBadge = badge;
}}
function heartIcon(color) {{
@@ -852,6 +873,11 @@ function loadRatings() {{
function saveRatings(ratings) {{
localStorage.setItem(RATINGS_KEY, JSON.stringify(ratings));
fetch('/api/ratings', {{
method: 'POST',
headers: {{'Content-Type': 'application/json'}},
body: JSON.stringify(ratings)
}}).catch(function() {{}});
}}
function addRejectStrike(marker) {{
@@ -899,6 +925,7 @@ function applyMarkerStyle(marker, status) {{
}} else {{
if (status === 'fav') {{
removeRejectStrike(marker);
if (marker._newBadge && map.hasLayer(marker._newBadge)) map.removeLayer(marker._newBadge);
if (!marker._data._origCircle) marker._data._origCircle = true;
var popup = marker.getPopup();
var popupContent = popup ? popup.getContent() : '';
@@ -922,6 +949,7 @@ function applyMarkerStyle(marker, status) {{
}}
// Add strikethrough line over the marker
addRejectStrike(marker);
if (marker._newBadge && map.hasLayer(marker._newBadge)) map.removeLayer(marker._newBadge);
}} else {{
if (marker._data._origCircle && !(marker instanceof L.CircleMarker)) {{
revertToCircle(marker, {{ radius: 8, fillColor: marker._data.color, color: '#fff', weight: 2, fillOpacity: 0.85 }});
@@ -934,6 +962,7 @@ function applyMarkerStyle(marker, status) {{
}}
if (marker._path) marker._path.classList.remove('marker-rejected');
removeRejectStrike(marker);
if (marker._newBadge && !map.hasLayer(marker._newBadge)) marker._newBadge.addTo(map);
}}
}}
}}
@@ -1089,7 +1118,9 @@ map.on('popupopen', function(e) {{
// ── Filters ────────────────────────────────────────────────────
function applyFilters() {{
var minFloor = parseInt(document.getElementById('min-floor').value);
var maxPrice = parseInt(document.getElementById('max-price').value);
var maxPriceEl = document.getElementById('max-price');
var maxPrice = parseInt(maxPriceEl.value) || 14000000;
if (maxPrice > 14000000) {{ maxPrice = 14000000; maxPriceEl.value = 14000000; }}
var hideRejected = document.getElementById('hide-rejected').checked;
var daysFilter = parseInt(document.getElementById('days-filter').value) || 0;
var ratings = loadRatings();
@@ -1130,10 +1161,12 @@ function applyFilters() {{
visible++;
// Show strike line if rejected and visible
if (m._rejectStrike && !map.hasLayer(m._rejectStrike)) m._rejectStrike.addTo(map);
if (m._newBadge && !map.hasLayer(m._newBadge)) m._newBadge.addTo(map);
}} else {{
if (map.hasLayer(m)) map.removeLayer(m);
// Hide strike line when marker hidden
if (m._rejectStrike && map.hasLayer(m._rejectStrike)) map.removeLayer(m._rejectStrike);
if (m._newBadge && map.hasLayer(m._newBadge)) map.removeLayer(m._newBadge);
}}
}});
@@ -1148,8 +1181,25 @@ function applyFilters() {{
document.getElementById('visible-count').textContent = visible;
}}
// Initialize ratings on load
// Initialize ratings: load from server, merge with localStorage, then restore
function initRatings() {{
var local = loadRatings();
fetch('/api/ratings')
.then(function(r) {{ return r.ok ? r.json() : null; }})
.then(function(server) {{
if (server && typeof server === 'object') {{
var merged = Object.assign({{}}, local, server);
localStorage.setItem(RATINGS_KEY, JSON.stringify(merged));
}}
restoreRatings();
updateRatingCounts();
}})
.catch(function() {{
restoreRatings();
updateRatingCounts();
}});
}}
initRatings();
// ── Panel toggle ──────────────────────────────────────────────
function togglePanel() {{

scrape_bazos.py (new file, 560 lines)

@@ -0,0 +1,560 @@
#!/usr/bin/env python3
"""
Bazoš.cz scraper.
Downloads apartments for sale in Prague and filters them by the project criteria.
Output: byty_bazos.json
"""
from __future__ import annotations
import argparse
from datetime import datetime
import json
import logging
import math
import re
import time
import urllib.request
import urllib.parse
from pathlib import Path
from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_bazos.json"
logger = logging.getLogger(__name__)
# ── Configuration ───────────────────────────────────────────────────────────
MAX_PRICE = 14_000_000
MIN_AREA = 69
MIN_FLOOR = 2
PER_PAGE = 20 # Bazoš returns 20 listings per page
WANTED_DISPOSITIONS = {"3+kk", "3+1", "4+kk", "4+1", "5+kk", "5+1", "6+kk", "6+1"}
# Regex patterns for parsing the disposition, area, and floor from free text
DISP_RE = re.compile(r'(\d)\s*\+\s*(kk|1)', re.IGNORECASE)
AREA_RE = re.compile(r'(\d+(?:[.,]\d+)?)\s*m[²2\s,.]', re.IGNORECASE)
FLOOR_RE = re.compile(r'(\d+)\s*[./]\s*(\d+)\s*(?:NP|patr|podlaž|floor)', re.IGNORECASE)
FLOOR_RE2 = re.compile(r'(\d+)\.\s*(?:NP|patr[eouě]|podlaž[ií])', re.IGNORECASE)
FLOOR_RE3 = re.compile(r'(?:patr[eouě]|podlaž[ií]|NP)\s*[:\s]*(\d+)', re.IGNORECASE)
PANEL_RE = re.compile(r'panel(?:ov|ák|\.)', re.IGNORECASE)
SIDLISTE_RE = re.compile(r'sídliště|sidliste|panelák', re.IGNORECASE)
HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml",
"Accept-Language": "cs,en;q=0.9",
}
BASE_URL = "https://reality.bazos.cz"
SEARCH_PARAMS = "hledat=&rubriky=reality&hlokalita=Praha&humkreis=25&cenado={max_price}&kitx=ano"
def fetch_url(url: str, retries: int = 3) -> str:
"""Fetch URL and return HTML string with retry on transient errors."""
for attempt in range(retries):
try:
logger.debug(f"HTTP GET request (attempt {attempt + 1}/{retries}): {url}")
req = urllib.request.Request(url, headers=HEADERS)
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8", errors="replace")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
return html
except urllib.error.HTTPError:
raise
except (ConnectionResetError, ConnectionError, urllib.error.URLError, OSError) as e:
if attempt < retries - 1:
wait = (attempt + 1) * 3
logger.warning(f"Connection error (retry {attempt + 1}/{retries} after {wait}s): {e}")
time.sleep(wait)
else:
logger.error(f"HTTP request failed after {retries} attempts: {e}", exc_info=True)
raise
def format_price(price: int) -> str:
s = str(price)
parts = []
while s:
parts.append(s[-3:])
s = s[:-3]
return " ".join(reversed(parts)) + ""
def parse_price(text: str) -> int:
"""Parse price from text like '5 250 000 Kč' → 5250000."""
cleaned = re.sub(r'[^\d]', '', text)
return int(cleaned) if cleaned else 0
def parse_disposition(text: str) -> str | None:
"""Parse disposition from title/description like '3+kk', '4+1'."""
m = DISP_RE.search(text)
if m:
rooms = m.group(1)
suffix = m.group(2).lower()
return f"{rooms}+{suffix}"
return None
def parse_area(text: str) -> float | None:
"""Parse area from text like '82 m²' → 82.0."""
m = AREA_RE.search(text)
if m:
return float(m.group(1).replace(',', '.'))
return None
def parse_floor(text: str) -> int | None:
"""Parse floor number from description."""
for pattern in [FLOOR_RE, FLOOR_RE2, FLOOR_RE3]:
m = pattern.search(text)
if m:
return int(m.group(1))
return None
def is_panel(text: str) -> bool:
"""Check if description mentions panel construction."""
return bool(PANEL_RE.search(text))
def is_sidliste(text: str) -> bool:
"""Check if description mentions housing estate."""
return bool(SIDLISTE_RE.search(text))
def fetch_listing_page(offset: int = 0, pagination_params: str | None = None) -> tuple[list[dict], int, str | None]:
"""
Fetch a page of listings from Bazoš.
Returns (list of basic listing dicts, total count, pagination_params for next pages).
"""
if pagination_params and offset > 0:
# Use resolved numeric params from first page's pagination links
url = f"{BASE_URL}/prodam/byt/{offset}/?{pagination_params}"
else:
params = SEARCH_PARAMS.format(max_price=MAX_PRICE)
if offset > 0:
url = f"{BASE_URL}/prodam/byt/{offset}/?{params}"
else:
url = f"{BASE_URL}/prodam/byt/?{params}"
html = fetch_url(url)
# Parse total count: "Zobrazeno 1-20 z 727"
total = 0
total_match = re.search(r'z\s+([\d\s]+)\s', html)
if total_match:
total = int(total_match.group(1).replace(' ', ''))
# Extract resolved pagination params from first page (Bazoš converts
# hlokalita=Praha → hlokalita=11000, and pagination only works with numeric form)
resolved_params = None
pag_link = re.search(r'href="/prodam/byt/\d+/\?([^"]+)"', html)
if pag_link:
resolved_params = pag_link.group(1)
# Parse listings — split by listing blocks (class="inzeraty inzeratyflex")
listings = []
all_blocks = re.split(r'<div class="inzeraty\s+inzeratyflex">', html)[1:] # skip before first
for block in all_blocks:
# Extract URL and ID from first link (/inzerat/XXXXXX/slug.php)
url_match = re.search(r'href="(/inzerat/(\d+)/[^"]*)"', block)
if not url_match:
continue
detail_path = url_match.group(1)
listing_id = int(url_match.group(2))
# Title — class=nadpis (without quotes) or class="nadpis"
title_match = re.search(r'class=.?nadpis.?[^>]*>\s*<a[^>]*>([^<]+)</a>', block)
title = title_match.group(1).strip() if title_match else ""
# Price — inside <span translate="no"> within inzeratycena
price_match = re.search(r'class="inzeratycena"[^>]*>.*?<span[^>]*>([^<]+)</span>', block, re.DOTALL)
if not price_match:
# Fallback: direct text in inzeratycena
price_match = re.search(r'class="inzeratycena"[^>]*>\s*(?:<b>)?([^<]+)', block)
price_text = price_match.group(1).strip() if price_match else ""
price = parse_price(price_text)
# Location
loc_match = re.search(r'class="inzeratylok"[^>]*>(.*?)</div>', block, re.DOTALL)
location = ""
if loc_match:
location = re.sub(r'<[^>]+>', ' ', loc_match.group(1)).strip()
location = re.sub(r'\s+', ' ', location)
# Date — [5.3. 2026]
date_match = re.search(r'\[(\d+\.\d+\.\s*\d{4})\]', block)
date_str = date_match.group(1).strip() if date_match else ""
# Description preview — class=popis (without quotes) or class="popis"
desc_match = re.search(r'class=.?popis.?[^>]*>(.*?)</div>', block, re.DOTALL)
description = ""
if desc_match:
description = re.sub(r'<[^>]+>', ' ', desc_match.group(1)).strip()
description = re.sub(r'\s+', ' ', description)
# Image — <img ... class="obrazek" ... src="...">
img_match = re.search(r'<img[^>]*src="([^"]+)"[^>]*class="obrazek"', block)
if not img_match:
img_match = re.search(r'class="obrazek"[^>]*src="([^"]+)"', block)
image = img_match.group(1) if img_match else ""
if "empty.gif" in image:
image = ""
listings.append({
"id": listing_id,
"title": title,
"price": price,
"location": location,
"date": date_str,
"description": description,
"detail_path": detail_path,
"image": image,
})
logger.debug(f"Offset {offset}: found {len(listings)} listings, total={total}")
return listings, total, resolved_params
def fetch_detail(path: str) -> dict | None:
"""Fetch listing detail page and extract GPS, full description."""
try:
url = f"{BASE_URL}{path}"
html = fetch_url(url)
result = {}
# GPS from Google Maps link
gps_match = re.search(r'google\.com/maps[^"]*place/([\d.]+),([\d.]+)', html)
if gps_match:
result["lat"] = float(gps_match.group(1))
result["lon"] = float(gps_match.group(2))
# Full description — Bazoš uses unquoted class=popisdetail
desc_match = re.search(r'class=.?popisdetail.?[^>]*>(.*?)</div>', html, re.DOTALL)
if desc_match:
desc = re.sub(r'<[^>]+>', ' ', desc_match.group(1)).strip()
desc = re.sub(r'\s+', ' ', desc)
result["description"] = desc
# Location from detail
loc_match = re.search(r'Lokalita:</td>\s*<td[^>]*>(.*?)</td>', html, re.DOTALL)
if loc_match:
loc = re.sub(r'<[^>]+>', ' ', loc_match.group(1)).strip()
loc = re.sub(r'\s+', ' ', loc)
result["detail_location"] = loc
return result
except Exception as e:
logger.warning(f"Detail fetch failed for {path}: {e}")
return None
def load_cache(json_path: str = "byty_bazos.json") -> dict[int, dict]:
"""Load previously scraped data as cache keyed by hash_id."""
path = Path(json_path)
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return {e["hash_id"]: e for e in data if "hash_id" in e}
except (json.JSONDecodeError, KeyError):
return {}
def scrape(max_pages: int | None = None, max_properties: int | None = None):
_run_start = time.time()
_run_ts = datetime.now().isoformat(timespec="seconds")
cache = load_cache()
today = datetime.now().strftime("%Y-%m-%d")
logger.info("=" * 60)
logger.info("Stahuji inzeráty z Bazoš.cz")
logger.info(f"Cena: do {format_price(MAX_PRICE)}")
logger.info(f"Min. plocha: {MIN_AREA}")
logger.info(f"Patro: od {MIN_FLOOR}. NP")
logger.info(f"Region: Praha")
if cache:
logger.info(f"Cache: {len(cache)} bytů z minulého běhu")
if max_pages:
logger.info(f"Max. stran: {max_pages}")
if max_properties:
logger.info(f"Max. bytů: {max_properties}")
logger.info("=" * 60)
# Step 1: Fetch listing pages
logger.info("\nFáze 1: Stahování seznamu inzerátů...")
all_listings = {} # id -> listing dict (dedup)
page = 1
offset = 0
total = None
pagination_params = None # resolved numeric params from first page
while True:
if max_pages and page > max_pages:
logger.debug(f"Max pages limit reached: {max_pages}")
break
logger.info(f"Strana {page} (offset {offset}) ...")
listings, total_count, resolved = fetch_listing_page(offset, pagination_params)
if resolved and not pagination_params:
pagination_params = resolved
logger.debug(f"Resolved pagination params: {pagination_params}")
if total is None and total_count > 0:
total = total_count
total_pages = math.ceil(total / PER_PAGE)
logger.info(f"→ Celkem {total} inzerátů, ~{total_pages} stran")
if not listings:
logger.debug(f"No listings found on page {page}, stopping")
break
for lst in listings:
lid = lst["id"]
if lid not in all_listings:
all_listings[lid] = lst
page += 1
offset += PER_PAGE
if total and offset >= total:
break
time.sleep(0.5)
logger.info(f"\nStaženo: {len(all_listings)} unikátních inzerátů")
# Step 2: Pre-filter by disposition, price, area from listing data
pre_filtered = []
excluded_disp = 0
excluded_price = 0
excluded_area = 0
excluded_no_disp = 0
for lst in all_listings.values():
title_and_desc = f"{lst['title']} {lst['description']}"
# Parse disposition
disp = parse_disposition(title_and_desc)
if not disp:
excluded_no_disp += 1
logger.debug(f"Filter: id={lst['id']} - excluded (no disposition found in '{lst['title']}')")
continue
if disp not in WANTED_DISPOSITIONS:
excluded_disp += 1
logger.debug(f"Filter: id={lst['id']} - excluded (disposition {disp})")
continue
# Price
price = lst["price"]
if price <= 0 or price > MAX_PRICE:
excluded_price += 1
logger.debug(f"Filter: id={lst['id']} - excluded (price {price})")
continue
# Area (if parseable from listing)
area = parse_area(title_and_desc)
if area is not None and area < MIN_AREA:
excluded_area += 1
logger.debug(f"Filter: id={lst['id']} - excluded (area {area} m²)")
continue
lst["_disposition"] = disp
lst["_area"] = area
pre_filtered.append(lst)
logger.info(f"\nPo předfiltraci:")
logger.info(f" Vyloučeno (bez dispozice): {excluded_no_disp}")
logger.info(f" Vyloučeno (dispozice): {excluded_disp}")
logger.info(f" Vyloučeno (cena): {excluded_price}")
logger.info(f" Vyloučeno (plocha): {excluded_area}")
logger.info(f" Zbývá: {len(pre_filtered)}")
# Step 3: Fetch details (for GPS + full description)
logger.info(f"\nFáze 2: Stahování detailů ({len(pre_filtered)} bytů)...")
results = []
excluded_panel = 0
excluded_floor = 0
excluded_no_gps = 0
excluded_detail = 0
excluded_area_detail = 0
cache_hits = 0
properties_fetched = 0
for i, lst in enumerate(pre_filtered):
if max_properties and properties_fetched >= max_properties:
logger.debug(f"Max properties limit reached: {max_properties}")
break
listing_id = lst["id"]
price = lst["price"]
# Check cache
cached = cache.get(listing_id)
if cached and cached.get("price") == price:
cache_hits += 1
logger.debug(f"Cache hit for id={listing_id}")
results.append(cached)
continue
time.sleep(0.4)
detail = fetch_detail(lst["detail_path"])
if not detail:
excluded_detail += 1
logger.debug(f"Filter: id={listing_id} - excluded (detail fetch failed)")
continue
# GPS required
lat = detail.get("lat")
lon = detail.get("lon")
if not lat or not lon:
excluded_no_gps += 1
logger.debug(f"Filter: id={listing_id} - excluded (no GPS)")
continue
# Full text for filtering
full_desc = detail.get("description", "")
full_text = f"{lst['title']} {lst['description']} {full_desc}"
# Panel check
if is_panel(full_text):
excluded_panel += 1
logger.info(f"✗ Vyloučen #{listing_id}: panelová stavba")
continue
# Sídliště check
if is_sidliste(full_text):
excluded_panel += 1
logger.info(f"✗ Vyloučen #{listing_id}: sídliště")
continue
# Floor
floor = parse_floor(full_text)
if floor is not None and floor < MIN_FLOOR:
excluded_floor += 1
logger.debug(f"Filter: id={listing_id} - excluded (floor {floor})")
continue
# Area — re-check from detail if not found before
area = lst.get("_area") or parse_area(full_desc)
if area is not None and area < MIN_AREA:
excluded_area_detail += 1
logger.debug(f"Filter: id={listing_id} - excluded (area {area} m² from detail)")
continue
disp = lst["_disposition"]
locality = detail.get("detail_location") or lst["location"]
result = {
"hash_id": listing_id,
"name": f"Prodej bytu {disp} {int(area) if area else '?'}",
"price": price,
"price_formatted": format_price(price),
"locality": locality,
"lat": lat,
"lon": lon,
"disposition": disp,
"floor": floor,
"area": area,
"building_type": "neuvedeno",
"ownership": "neuvedeno",
"url": f"{BASE_URL}{lst['detail_path']}",
"source": "bazos",
"image": lst.get("image", ""),
"scraped_at": today,
"first_seen": cached.get("first_seen", today) if cached else today,
"last_changed": today if not cached or cached.get("price") != price else cached.get("last_changed", today),
}
if not validate_listing(result, "bazos"):
continue
results.append(result)
properties_fetched += 1
if (i + 1) % 20 == 0:
logger.info(f"Zpracováno {i + 1}/{len(pre_filtered)} ...")
logger.info(f"\n{'=' * 60}")
logger.info(f"Výsledky Bazoš:")
logger.info(f" Předfiltrováno: {len(pre_filtered)}")
logger.info(f" Z cache (přeskočeno): {cache_hits}")
logger.info(f" Vyloučeno (panel/síd): {excluded_panel}")
logger.info(f" Vyloučeno (patro): {excluded_floor}")
logger.info(f" Vyloučeno (bez GPS): {excluded_no_gps}")
logger.info(f" Vyloučeno (bez detailu): {excluded_detail}")
logger.info(f" Vyloučeno (plocha det): {excluded_area_detail}")
logger.info(f" ✓ Vyhovující byty: {len(results)}")
logger.info(f"{'=' * 60}")
write_stats(STATS_FILE, {
"source": "Bazoš",
"timestamp": _run_ts,
"duration_sec": round(time.time() - _run_start, 1),
"success": True,
"accepted": len(results),
"fetched": len(all_listings),
"pages": page - 1,
"cache_hits": cache_hits,
"excluded": {
"bez dispozice": excluded_no_disp,
"dispozice": excluded_disp,
"cena": excluded_price,
"plocha": excluded_area + excluded_area_detail,
"bez GPS": excluded_no_gps,
"panel/síd": excluded_panel,
"patro": excluded_floor,
"bez detailu": excluded_detail,
},
})
return results
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Scrape apartments from Bazoš.cz")
parser.add_argument("--max-pages", type=int, default=None,
help="Maximum number of listing pages to scrape")
parser.add_argument("--max-properties", type=int, default=None,
help="Maximum number of properties to fetch details for")
parser.add_argument("--log-level", type=str, default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"],
help="Logging level (default: INFO)")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="[%(levelname)s] %(asctime)s - %(name)s - %(message)s",
handlers=[logging.StreamHandler()]
)
_run_ts = datetime.now().isoformat(timespec="seconds")
start = time.time()
try:
estates = scrape(max_pages=args.max_pages, max_properties=args.max_properties)
except Exception as e:
logger.error(f"Scraper failed: {e}", exc_info=True)
write_stats(STATS_FILE, {
"source": "Bazoš",
"timestamp": _run_ts,
"duration_sec": round(time.time() - start, 1),
"success": False,
"accepted": 0,
"fetched": 0,
"error": str(e),
})
raise
if estates:
json_path = Path("byty_bazos.json")
json_path.write_text(
json.dumps(estates, ensure_ascii=False, indent=2),
encoding="utf-8",
)
elapsed = time.time() - start
logger.info(f"\n✓ Data uložena: {json_path.resolve()}")
logger.info(f"⏱ Celkový čas: {elapsed:.0f} s")
else:
logger.info("\nŽádné byty z Bazoše neodpovídají kritériím :(")
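As a sanity check, the block-splitting and field regexes used in `fetch_listing_page` can be exercised against a hand-made snippet. The markup below is illustrative only — it is shaped like Bazoš's list items but was not captured from the live site:

```python
import re

# Hand-made snippet in the shape the listing regexes expect (not real Bazoš markup).
SAMPLE = (
    '<div class="inzeraty inzeratyflex">'
    '<h2 class=nadpis><a href="/inzerat/123456/prodej-bytu-2kk.php">Prodej bytu 2+kk, Praha 6</a></h2>'
    '<div class="inzeratycena"><b><span translate="no">5 990 000 Kč</span></b></div>'
    '<div class="inzeratylok">Praha 6<br>160 00</div>'
    '</div>'
)

# Same split + field extraction as fetch_listing_page
blocks = re.split(r'<div class="inzeraty\s+inzeratyflex">', SAMPLE)[1:]
block = blocks[0]

url_match = re.search(r'href="(/inzerat/(\d+)/[^"]*)"', block)
title_match = re.search(r'class=.?nadpis.?[^>]*>\s*<a[^>]*>([^<]+)</a>', block)
price_match = re.search(r'class="inzeratycena"[^>]*>.*?<span[^>]*>([^<]+)</span>', block, re.DOTALL)

listing_id = int(url_match.group(2))
title = title_match.group(1).strip()
price_text = price_match.group(1).strip()
print(listing_id, title, price_text)
```

Note that the `class=.?nadpis.?` pattern relies on regex backtracking to tolerate both quoted and unquoted class attributes, which is why it also matches the `class=nadpis` form above.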


@@ -15,7 +15,7 @@ import re
 import time
 import urllib.request
 from pathlib import Path
-from scraper_stats import write_stats
+from scraper_stats import write_stats, validate_listing

 STATS_FILE = "stats_bezrealitky.json"
@@ -71,19 +71,35 @@ HEADERS = {

 BASE_URL = "https://www.bezrealitky.cz"

+def fetch_url(url: str, retries: int = 3) -> str:
+    """Fetch URL and return HTML string with retry on transient errors."""
+    for attempt in range(retries):
+        try:
+            logger.debug(f"HTTP GET request (attempt {attempt + 1}/{retries}): {url}")
+            req = urllib.request.Request(url, headers=HEADERS)
+            resp = urllib.request.urlopen(req, timeout=30)
+            html = resp.read().decode("utf-8")
+            logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
+            return html
+        except urllib.error.HTTPError:
+            raise
+        except (ConnectionResetError, ConnectionError, urllib.error.URLError, OSError) as e:
+            if attempt < retries - 1:
+                wait = (attempt + 1) * 2
+                logger.warning(f"Connection error (retry {attempt + 1}/{retries} after {wait}s): {e}")
+                time.sleep(wait)
+            else:
+                logger.error(f"HTTP request failed after {retries} attempts: {e}", exc_info=True)
+                raise
+
 def fetch_page(page: int) -> tuple[list[dict], int]:
     """
     Fetch a listing page from Bezrealitky.
     Returns (list of advert dicts from Apollo cache, total count).
     """
     url = f"{BASE_URL}/vypis/nabidka-prodej/byt/praha?page={page}"
-    logger.debug(f"HTTP GET request: {url}")
-    logger.debug(f"Headers: {HEADERS}")
-    req = urllib.request.Request(url, headers=HEADERS)
-    try:
-        resp = urllib.request.urlopen(req, timeout=30)
-        html = resp.read().decode("utf-8")
-        logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
+    html = fetch_url(url)

     match = re.search(
         r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
@@ -113,20 +129,13 @@ def fetch_page(page: int) -> tuple[list[dict], int]:
     logger.debug(f"Page {page}: found {len(adverts)} adverts, total={total}")
     return adverts, total
-    except (urllib.error.URLError, ConnectionError, OSError) as e:
-        logger.error(f"HTTP request failed for {url}: {e}", exc_info=True)
-        raise

 def fetch_detail(uri: str) -> dict | None:
     """Fetch detail page for a listing."""
     try:
         url = f"{BASE_URL}/nemovitosti-byty-domy/{uri}"
-        logger.debug(f"HTTP GET request: {url}")
-        req = urllib.request.Request(url, headers=HEADERS)
-        resp = urllib.request.urlopen(req, timeout=30)
-        html = resp.read().decode("utf-8")
-        logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
+        html = fetch_url(url)

         match = re.search(
             r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
@@ -365,6 +374,8 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
             "first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
             "last_changed": datetime.now().strftime("%Y-%m-%d"),
         }
+        if not validate_listing(result, "bezrealitky"):
+            continue
         results.append(result)
         properties_fetched += 1
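The retry helper added in this diff waits `(attempt + 1) * 2` seconds between attempts — a linear 2 s, 4 s schedule for the default three attempts, not a true exponential backoff. A standalone sketch of just that schedule:

```python
def backoff_schedule(retries: int = 3) -> list[int]:
    # Mirrors the waits in fetch_url: wait = (attempt + 1) * 2 for every
    # failed attempt except the last, which raises instead of sleeping.
    return [(attempt + 1) * 2 for attempt in range(retries - 1)]

print(backoff_schedule())   # default three attempts sleep 2 s, then 4 s
```

HTTPError is deliberately re-raised without retrying, since a 404 or 403 will not go away on a second attempt.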


@@ -14,7 +14,7 @@ import time
 import urllib.request
 from datetime import datetime
 from pathlib import Path
-from scraper_stats import write_stats
+from scraper_stats import write_stats, validate_listing

 STATS_FILE = "stats_cityhome.json"
@@ -375,6 +375,8 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
             "first_seen": _prev_cache.get(f"cityhome_{slug}_{listing['unit_name']}", {}).get("first_seen", datetime.now().strftime("%Y-%m-%d")),
             "last_changed": datetime.now().strftime("%Y-%m-%d") if _prev_cache.get(f"cityhome_{slug}_{listing['unit_name']}", {}).get("price") != price else _prev_cache[f"cityhome_{slug}_{listing['unit_name']}"].get("last_changed", datetime.now().strftime("%Y-%m-%d")),
         }
+        if not validate_listing(result, "cityhome"):
+            continue
         results.append(result)
         properties_fetched += 1


@@ -15,9 +15,8 @@ import re
 import time
 import urllib.request
 import urllib.parse
-from html.parser import HTMLParser
 from pathlib import Path
-from scraper_stats import write_stats
+from scraper_stats import write_stats, validate_listing

 STATS_FILE = "stats_idnes.json"
@@ -468,6 +467,8 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
             "first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
             "last_changed": datetime.now().strftime("%Y-%m-%d"),
         }
+        if not validate_listing(result, "idnes"):
+            continue
         results.append(result)
         properties_fetched += 1


@@ -15,7 +15,7 @@ import time
 from datetime import datetime
 from pathlib import Path
 from urllib.parse import urlencode
-from scraper_stats import write_stats
+from scraper_stats import write_stats, validate_listing

 STATS_FILE = "stats_psn.json"
@@ -38,9 +38,10 @@ BASE_URL = "https://psn.cz"
 UNITS_API = f"{BASE_URL}/api/units-list"

-def fetch_json(url: str) -> dict:
-    """Fetch JSON via curl (urllib SSL may fail on Cloudflare)."""
-    logger.debug(f"HTTP GET: {url}")
+def fetch_json(url: str, retries: int = 3) -> dict:
+    """Fetch JSON via curl (urllib SSL may fail on Cloudflare) with retry."""
+    for attempt in range(retries):
+        logger.debug(f"HTTP GET (attempt {attempt + 1}/{retries}): {url}")
         result = subprocess.run(
             ["curl", "-s", "-L", "--max-time", "30",
              "-H", f"User-Agent: {UA}",
@@ -48,9 +49,14 @@ def fetch_json(url: str) -> dict:
              url],
             capture_output=True, text=True, timeout=60
         )
-    if result.returncode != 0:
-        raise RuntimeError(f"curl failed ({result.returncode}): {result.stderr[:200]}")
+        if result.returncode == 0:
+            return json.loads(result.stdout)
+        if attempt < retries - 1:
+            wait = (attempt + 1) * 2
+            logger.warning(f"curl failed (retry {attempt + 1}/{retries} after {wait}s): {result.stderr[:200]}")
+            time.sleep(wait)
+        else:
+            raise RuntimeError(f"curl failed after {retries} attempts ({result.returncode}): {result.stderr[:200]}")

 def fix_gps(lat, lng):
@@ -255,6 +261,8 @@ def scrape(max_properties: int | None = None):
             "first_seen": _prev_cache.get(str(unit_id), {}).get("first_seen", datetime.now().strftime("%Y-%m-%d")),
             "last_changed": datetime.now().strftime("%Y-%m-%d") if _prev_cache.get(str(unit_id), {}).get("price") != int(price) else _prev_cache[str(unit_id)].get("last_changed", datetime.now().strftime("%Y-%m-%d")),
         }
+        if not validate_listing(result, "psn"):
+            continue
         results.append(result)
         properties_fetched += 1


@@ -15,7 +15,7 @@ import re
 import time
 import urllib.request
 from pathlib import Path
-from scraper_stats import write_stats
+from scraper_stats import write_stats, validate_listing

 STATS_FILE = "stats_realingo.json"
@@ -56,6 +56,28 @@ HEADERS = {

 BASE_URL = "https://www.realingo.cz"

+def fetch_url(url: str, retries: int = 3) -> str:
+    """Fetch URL and return HTML string with retry on transient errors."""
+    for attempt in range(retries):
+        try:
+            logger.debug(f"HTTP GET request (attempt {attempt + 1}/{retries}): {url}")
+            req = urllib.request.Request(url, headers=HEADERS)
+            resp = urllib.request.urlopen(req, timeout=30)
+            html = resp.read().decode("utf-8")
+            logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
+            return html
+        except urllib.error.HTTPError:
+            raise
+        except (ConnectionResetError, ConnectionError, urllib.error.URLError, OSError) as e:
+            if attempt < retries - 1:
+                wait = (attempt + 1) * 2
+                logger.warning(f"Connection error (retry {attempt + 1}/{retries} after {wait}s): {e}")
+                time.sleep(wait)
+            else:
+                logger.error(f"HTTP request failed after {retries} attempts: {e}", exc_info=True)
+                raise
+
 def fetch_listing_page(page: int = 1) -> tuple[list[dict], int]:
     """Fetch a page of Prague listings. Returns (items, total_count)."""
     if page == 1:
@@ -63,14 +85,7 @@ def fetch_listing_page(page: int = 1) -> tuple[list[dict], int]:
     else:
         url = f"{BASE_URL}/prodej_byty/praha/{page}_strana/"
-    logger.debug(f"HTTP GET request: {url}")
-    logger.debug(f"Headers: {HEADERS}")
-    req = urllib.request.Request(url, headers=HEADERS)
-    try:
-        resp = urllib.request.urlopen(req, timeout=30)
-        html = resp.read().decode("utf-8")
-        logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
+    html = fetch_url(url)

     match = re.search(
         r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
         html, re.DOTALL
@@ -83,21 +98,13 @@ def fetch_listing_page(page: int = 1) -> tuple[list[dict], int]:
     offer_list = data["props"]["pageProps"]["store"]["offer"]["list"]
     logger.debug(f"Page {page}: found {len(offer_list['data'])} items, total={offer_list['total']}")
     return offer_list["data"], offer_list["total"]
-    except (urllib.error.URLError, ConnectionError, OSError) as e:
-        logger.error(f"HTTP request failed for {url}: {e}", exc_info=True)
-        raise

 def fetch_detail(listing_url: str) -> dict | None:
     """Fetch detail page for a listing to get floor, building type, etc."""
     try:
         url = f"{BASE_URL}{listing_url}"
-        logger.debug(f"HTTP GET request: {url}")
-        req = urllib.request.Request(url, headers=HEADERS)
-        resp = urllib.request.urlopen(req, timeout=30)
-        html = resp.read().decode("utf-8")
-        logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
+        html = fetch_url(url)

         match = re.search(
             r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
             html, re.DOTALL
@@ -324,6 +331,8 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
             "first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
             "last_changed": datetime.now().strftime("%Y-%m-%d"),
         }
+        if not validate_listing(result, "realingo"):
+            continue
         results.append(result)
         properties_fetched += 1


@@ -1,13 +1,53 @@
-"""Shared utility for writing per-scraper run statistics to JSON."""
+"""Shared utilities for scraper run statistics and listing validation."""

 from __future__ import annotations

 import json
+import logging
 import os
 from pathlib import Path

 HERE = Path(__file__).parent
 DATA_DIR = Path(os.environ.get("DATA_DIR", HERE))

+_val_log = logging.getLogger(__name__)
+
+_REQUIRED_FIELDS = ("hash_id", "price", "locality", "lat", "lon", "url", "source")
+
+def validate_listing(listing: dict, context: str = "") -> bool:
+    """
+    Validate a listing dict before it is written to the output JSON.
+    Returns True if valid, False if the listing should be skipped.
+    Logs a warning for each invalid listing.
+    """
+    prefix = f"[{context}] " if context else ""
+    for field in _REQUIRED_FIELDS:
+        val = listing.get(field)
+        if val is None or val == "":
+            _val_log.warning(f"{prefix}Skipping listing — missing field '{field}': {listing.get('hash_id', '?')}")
+            return False
+    price = listing.get("price")
+    if not isinstance(price, (int, float)) or price <= 0:
+        _val_log.warning(f"{prefix}Skipping listing — invalid price={price!r}: {listing.get('hash_id', '?')}")
+        return False
+    lat, lon = listing.get("lat"), listing.get("lon")
+    if not isinstance(lat, (int, float)) or not isinstance(lon, (int, float)):
+        _val_log.warning(f"{prefix}Skipping listing — non-numeric GPS lat={lat!r} lon={lon!r}: {listing.get('hash_id', '?')}")
+        return False
+    if not (47.0 <= lat <= 52.0) or not (12.0 <= lon <= 19.0):
+        _val_log.warning(f"{prefix}Skipping listing — GPS outside Czech Republic lat={lat} lon={lon}: {listing.get('hash_id', '?')}")
+        return False
+    area = listing.get("area")
+    if area is not None and (not isinstance(area, (int, float)) or area <= 0):
+        _val_log.warning(f"{prefix}Skipping listing — invalid area={area!r}: {listing.get('hash_id', '?')}")
+        return False
+    return True
+
 def write_stats(filename: str, stats: dict) -> None:
     """Write scraper run stats dict to the data directory."""