7 Commits

Author SHA1 Message Date
59ef3274b6 Add CLAUDE.md project documentation for session context
Provides automatic context loading for new Claude Code sessions,
documenting architecture, filters, sources, and conventions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 09:58:01 +01:00
27e5b05f88 Add Bazoš.cz as new apartment scraper source
New scraper for reality.bazos.cz with full HTML parsing (no API),
GPS extraction from Google Maps links, panel/sídliště filtering,
floor/area parsing from free text, and pagination fix for Bazoš's
numeric locality codes. Integrated into merge pipeline and map
with purple (#7B1FA2) markers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 09:47:37 +01:00
63663e4b6b Merge pull request 'Move Realingo scraper to run last' (#6) from fix/scraper-order into main
All checks were successful
Build and Push / build (push) Successful in 6s
Reviewed-on: #6
2026-02-27 21:19:29 +00:00
8c052840cd Move Realingo scraper to run last in pipeline
Reorder scrapers: Sreality → Bezrealitky → iDNES → PSN+CityHome → Realingo → Merge

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 21:35:54 +01:00
39e4b9ce2a Merge pull request 'Reliability improvements and cleanup' (#5) from improve/reliability-and-fixes into main
Reviewed-on: #5
2026-02-27 10:26:04 +00:00
Jan Novak
fd3991f8d6 Remove regen_map.py references from Dockerfile and README
All checks were successful
Build and Push / build (push) Successful in 6s
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-27 10:44:08 +01:00
Jan Novak
27a7834eb6 Reliability improvements: retry logic, validation, ratings sync
Some checks failed
Build and Push / build (push) Failing after 4s
- Add 3-attempt retry with exponential backoff to Sreality, Realingo,
  Bezrealitky, and PSN scrapers (CityHome and iDNES already had it)
- Add shared validate_listing() in scraper_stats.py; all 6 scrapers now
  validate GPS bounds, price, area, and required fields before output
- Wire ratings to server /api/ratings on page load (merge with
  localStorage) and save (async POST); ratings now persist across
  browsers and devices
- Namespace JS hash IDs as {source}_{id} to prevent rating collisions
  between listings from different portals with the same numeric ID
- Replace manual Czech diacritic table with unicodedata.normalize()
  in merge_and_map.py for correct deduplication of all edge cases
- Correct README schedule docs: every 4 hours, not twice daily

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-27 10:36:37 +01:00
13 changed files with 909 additions and 128 deletions

124
CLAUDE.md Normal file
View File

@@ -0,0 +1,124 @@
# Maru hledá byt
Projekt pro hledání bytů v Praze. Scrapuje inzeráty ze 7 realitních portálů, filtruje, deduplikuje a generuje interaktivní mapu.
**Jazyk komunikace:** Čeština (uživatelka Marie). Kód a komentáře v kódu jsou mix CZ/EN.
## Architektura
```
run_all.sh (orchestrátor)
├─ scrape_and_map.py → byty_sreality.json (Sreality API)
├─ scrape_bezrealitky.py → byty_bezrealitky.json (HTML Apollo cache)
├─ scrape_idnes.py → byty_idnes.json (HTML regex)
├─ scrape_psn.py } → byty_psn.json (React API + curl)
├─ scrape_cityhome.py } → byty_cityhome.json (HTML tabulky)
├─ scrape_bazos.py → byty_bazos.json (HTML regex)
└─ scrape_realingo.py → byty_realingo.json (Next.js __NEXT_DATA__)
merge_and_map.py
├─ byty_merged.json (deduplikovaná data)
└─ mapa_bytu.html (Leaflet.js mapa)
generate_status.py → status.json + scraper_history.json
server.py (port 8080) → servíruje mapu + status page + ratings API
```
## Filtry (společné všem scraperům)
| Parametr | Hodnota | Poznámka |
|----------|---------|----------|
| Max cena | 13.5M Kč (Sreality/Realingo/Bezrealitky/iDNES), 14M Kč (PSN/CityHome/Bazoš) | Rozdíl je záměrný |
| Min plocha | 69 m² | |
| Min patro | 2. NP | 2. NP se na mapě označí varováním |
| Dispozice | 3+kk, 3+1, 4+kk, 4+1, 5+kk, 5+1, 6+ | |
| Region | Praha | |
| Vyloučit | panelové domy, sídliště | regex v popisu/polích |
## Klíčové soubory
- **scrape_and_map.py** — Sreality scraper + `generate_map()` funkce (sdílená, generuje HTML mapu)
- **merge_and_map.py** — sloučí 7 JSON zdrojů, deduplikuje (klíč: ulice + cena + plocha), volá `generate_map()`
- **scraper_stats.py** — utility: `validate_listing()` (validace povinných polí + GPS bounds) a `write_stats()`
- **generate_status.py** — generuje status.json a scraper_history.json z výstupů scraperů
- **server.py** — HTTP server (port 8080), endpointy: `/mapa_bytu.html`, `/scrapers-status`, `/api/ratings`, `/api/status`
- **run_all.sh** — orchestrátor, spouští scrapery postupně (PSN+CityHome paralelně), pak merge + status
## Mapa (mapa_bytu.html)
- Leaflet.js + CARTO tiles
- Barvy markerů podle ceny/m² (modrá < 110k → červená > 165k, šedá = neuvedeno)
- PSN/CityHome = srdíčkové markery (❤️)
- Nové inzeráty (≤ 1 den) = žlutý badge "NEW"
- Zamítnuté = zprůhledněné + 🚫 SVG overlay
- Oblíbené = hvězdička (⭐)
- Filtry: patro, max cena (input, default 13.5M, max 14M), datum přidání, skrýt zamítnuté, klik na cenový pás
- Ratings uložené v localStorage + sync na server `/api/ratings`
## Barvy zdrojů na mapě
```python
source_colors = {
"sreality": "#1976D2", # modrá
"realingo": "#00897B", # teal
"bezrealitky": "#E91E63", # růžová
"idnes": "#FF6F00", # oranžová
"psn": "#D32F2F", # červená
"cityhome": "#D32F2F", # červená
"bazos": "#7B1FA2", # fialová
}
```
## Deduplikace (merge_and_map.py)
- Klíč: `normalize_street(locality) + price + area`
- Normalizace ulice: první část před čárkou, lowercase, odstranění diakritiky, jen alfanumerické znaky
- PSN a CityHome mají prioritu (načtou se první)
## Vývoj
- **Git remote:** `https://gitea.home.hrajfrisbee.cz/littlemeat/maru-hleda-byt.git`
- **Gitea API token:** uložen v `.claude/settings.local.json`
- **Python 3.9+** kompatibilita (`from __future__ import annotations`)
- **Žádné pip závislosti** — jen stdlib (urllib, json, re, logging, pathlib, subprocess)
- **Docker:** `build/Dockerfile` (python:3.13-alpine), cron každé 4 hodiny
- Generované soubory (`byty_*.json`, `mapa_bytu.html`, `*.log`) jsou v `.gitignore`
## Typické úlohy
```bash
# Rychlý test scraperu
python3 scrape_bazos.py --max-pages 1 --max-properties 5 --log-level DEBUG
# Lokální validace (všechny scrapery s limity)
make validation-local
# Vygenerovat mapu z existujících dat
python3 merge_and_map.py
# Spustit server
python3 server.py # nebo: make serve
# Plný scrape
./run_all.sh
```
## Pořadí scraperů v run_all.sh
1. Sreality
2. Bezrealitky
3. iDNES
4. PSN + CityHome (paralelně)
5. Bazoš
6. Realingo (poslední — uživatelka ho nemá ráda)
7. Merge + mapa
8. Status generování
## Konvence
- Commit messages v angličtině, PR popis v angličtině
- Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- PRy přes Gitea API (viz create_pr.sh pattern v historii)
- Nové scrapery kopírují vzor z `scrape_bezrealitky.py`
- Každý scraper má argparse s `--max-pages`, `--max-properties`, `--log-level`

View File

@@ -83,10 +83,6 @@ Merges all `byty_*.json` files into `byty_merged.json` and generates `mapa_bytu.
**Deduplication logic:** Two listings are considered duplicates if they share the same normalized street name + price + area. PSN and CityHome have priority during dedup (loaded first), so their listings are kept over duplicates from other portals. **Deduplication logic:** Two listings are considered duplicates if they share the same normalized street name + price + area. PSN and CityHome have priority during dedup (loaded first), so their listings are kept over duplicates from other portals.
### `regen_map.py`
Regenerates the map from existing `byty_sreality.json` data without re-scraping. Fetches missing area values from the Sreality API, fixes URLs, and re-applies the area filter. Useful for tweaking map output after data has already been collected.
## Interactive map (`mapa_bytu.html`) ## Interactive map (`mapa_bytu.html`)
The generated map is a standalone HTML file using Leaflet.js with CARTO basemap tiles. Features: The generated map is a standalone HTML file using Leaflet.js with CARTO basemap tiles. Features:
@@ -151,7 +147,7 @@ The project includes a Docker setup for unattended operation with a cron-based s
│ PID 1: python3 -m http.server :8080 │ │ PID 1: python3 -m http.server :8080 │
│ serves /app/data/ │ │ serves /app/data/ │
│ │ │ │
│ crond: runs run_all.sh at 06:00/18:00 │ crond: runs run_all.sh every 4 hours
│ Europe/Prague timezone │ │ Europe/Prague timezone │
│ │ │ │
│ /app/ -- scripts (.py, .sh) │ │ /app/ -- scripts (.py, .sh) │
@@ -160,7 +156,7 @@ The project includes a Docker setup for unattended operation with a cron-based s
└─────────────────────────────────────────┘ └─────────────────────────────────────────┘
``` ```
On startup, the HTTP server starts immediately. The initial scrape runs in the background. Subsequent cron runs update data in-place twice daily at 06:00 and 18:00 CET/CEST. On startup, the HTTP server starts immediately. The initial scrape runs in the background. Subsequent cron runs update data in-place every 4 hours.
### Quick start ### Quick start
@@ -201,14 +197,13 @@ Validation targets run scrapers with `--max-pages 1 --max-properties 10` for a f
├── scrape_psn.py # PSN scraper ├── scrape_psn.py # PSN scraper
├── scrape_cityhome.py # CityHome scraper ├── scrape_cityhome.py # CityHome scraper
├── merge_and_map.py # Merge all sources + generate final map ├── merge_and_map.py # Merge all sources + generate final map
├── regen_map.py # Regenerate map from cached Sreality data
├── run_all.sh # Orchestrator script (runs all scrapers + merge) ├── run_all.sh # Orchestrator script (runs all scrapers + merge)
├── mapa_bytu.html # Generated interactive map (output) ├── mapa_bytu.html # Generated interactive map (output)
├── Makefile # Docker management + validation shortcuts ├── Makefile # Docker management + validation shortcuts
├── build/ ├── build/
│ ├── Dockerfile # Container image definition (python:3.13-alpine) │ ├── Dockerfile # Container image definition (python:3.13-alpine)
│ ├── entrypoint.sh # Container entrypoint (HTTP server + cron + initial scrape) │ ├── entrypoint.sh # Container entrypoint (HTTP server + cron + initial scrape)
│ ├── crontab # Cron schedule (06:00 and 18:00 CET) │ ├── crontab # Cron schedule (every 4 hours)
│ └── CONTAINER.md # Container-specific documentation │ └── CONTAINER.md # Container-specific documentation
└── .gitignore # Ignores byty_*.json, __pycache__, .vscode └── .gitignore # Ignores byty_*.json, __pycache__, .vscode
``` ```

View File

@@ -11,7 +11,7 @@ WORKDIR /app
COPY scrape_and_map.py scrape_realingo.py scrape_bezrealitky.py \ COPY scrape_and_map.py scrape_realingo.py scrape_bezrealitky.py \
scrape_idnes.py scrape_psn.py scrape_cityhome.py \ scrape_idnes.py scrape_psn.py scrape_cityhome.py \
merge_and_map.py regen_map.py generate_status.py scraper_stats.py \ merge_and_map.py generate_status.py scraper_stats.py \
run_all.sh server.py ./ run_all.sh server.py ./
COPY build/crontab /etc/crontabs/root COPY build/crontab /etc/crontabs/root

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Sloučí data ze Sreality, Realinga, Bezrealitek, iDNES, PSN a CityHome, Sloučí data ze Sreality, Realinga, Bezrealitek, iDNES, PSN, CityHome a Bazoše,
deduplikuje a vygeneruje mapu. deduplikuje a vygeneruje mapu.
Deduplikace: stejná ulice (z locality) + stejná cena + stejná plocha = duplikát. Deduplikace: stejná ulice (z locality) + stejná cena + stejná plocha = duplikát.
PSN a CityHome mají při deduplikaci prioritu (načtou se první). PSN a CityHome mají při deduplikaci prioritu (načtou se první).
@@ -9,6 +9,7 @@ from __future__ import annotations
import json import json
import re import re
import unicodedata
from pathlib import Path from pathlib import Path
from scrape_and_map import generate_map, format_price from scrape_and_map import generate_map, format_price
@@ -19,14 +20,8 @@ def normalize_street(locality: str) -> str:
# "Studentská, Praha 6 - Dejvice" → "studentska" # "Studentská, Praha 6 - Dejvice" → "studentska"
# "Rýnská, Praha" → "rynska" # "Rýnská, Praha" → "rynska"
street = locality.split(",")[0].strip().lower() street = locality.split(",")[0].strip().lower()
# Remove diacritics (simple Czech) # Remove diacritics using Unicode decomposition (handles all Czech characters)
replacements = { street = unicodedata.normalize("NFKD", street).encode("ascii", "ignore").decode("ascii")
"á": "a", "č": "c", "ď": "d", "é": "e", "ě": "e",
"í": "i", "ň": "n", "ó": "o", "ř": "r", "š": "s",
"ť": "t", "ú": "u", "ů": "u", "ý": "y", "ž": "z",
}
for src, dst in replacements.items():
street = street.replace(src, dst)
# Remove non-alphanumeric # Remove non-alphanumeric
street = re.sub(r"[^a-z0-9]", "", street) street = re.sub(r"[^a-z0-9]", "", street)
return street return street
@@ -49,6 +44,7 @@ def main():
("Realingo", "byty_realingo.json"), ("Realingo", "byty_realingo.json"),
("Bezrealitky", "byty_bezrealitky.json"), ("Bezrealitky", "byty_bezrealitky.json"),
("iDNES", "byty_idnes.json"), ("iDNES", "byty_idnes.json"),
("Bazoš", "byty_bazos.json"),
] ]
all_estates = [] all_estates = []

View File

@@ -13,7 +13,7 @@ RED='\033[0;31m'
BOLD='\033[1m' BOLD='\033[1m'
NC='\033[0m' NC='\033[0m'
TOTAL=6 TOTAL=7
CURRENT=0 CURRENT=0
FAILED=0 FAILED=0
START_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S") START_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
@@ -84,9 +84,6 @@ exec > >(tee -a "$LOG_FILE") 2>&1
step "Sreality" step "Sreality"
python3 scrape_and_map.py $SCRAPER_ARGS || { echo -e "${RED}✗ Sreality selhalo${NC}"; FAILED=$((FAILED + 1)); } python3 scrape_and_map.py $SCRAPER_ARGS || { echo -e "${RED}✗ Sreality selhalo${NC}"; FAILED=$((FAILED + 1)); }
step "Realingo"
python3 scrape_realingo.py $SCRAPER_ARGS || { echo -e "${RED}✗ Realingo selhalo${NC}"; FAILED=$((FAILED + 1)); }
step "Bezrealitky" step "Bezrealitky"
python3 scrape_bezrealitky.py $SCRAPER_ARGS || { echo -e "${RED}✗ Bezrealitky selhalo${NC}"; FAILED=$((FAILED + 1)); } python3 scrape_bezrealitky.py $SCRAPER_ARGS || { echo -e "${RED}✗ Bezrealitky selhalo${NC}"; FAILED=$((FAILED + 1)); }
@@ -101,6 +98,12 @@ PID_CH=$!
wait $PID_PSN || { echo -e "${RED}✗ PSN selhalo${NC}"; FAILED=$((FAILED + 1)); } wait $PID_PSN || { echo -e "${RED}✗ PSN selhalo${NC}"; FAILED=$((FAILED + 1)); }
wait $PID_CH || { echo -e "${RED}✗ CityHome selhalo${NC}"; FAILED=$((FAILED + 1)); } wait $PID_CH || { echo -e "${RED}✗ CityHome selhalo${NC}"; FAILED=$((FAILED + 1)); }
step "Bazoš"
python3 scrape_bazos.py $SCRAPER_ARGS || { echo -e "${RED}✗ Bazoš selhalo${NC}"; FAILED=$((FAILED + 1)); }
step "Realingo"
python3 scrape_realingo.py $SCRAPER_ARGS || { echo -e "${RED}✗ Realingo selhalo${NC}"; FAILED=$((FAILED + 1)); }
# ── Sloučení + mapa ────────────────────────────────────────── # ── Sloučení + mapa ──────────────────────────────────────────
step "Sloučení dat a generování mapy" step "Sloučení dat a generování mapy"
@@ -117,7 +120,7 @@ python3 generate_status.py --start-time "$START_TIME" --duration "$DURATION" $KE
echo "" echo ""
echo "============================================================" echo "============================================================"
if [ $FAILED -eq 0 ]; then if [ $FAILED -eq 0 ]; then
echo -e "${GREEN}${BOLD}Hotovo! Všech 6 zdrojů úspěšně staženo.${NC}" echo -e "${GREEN}${BOLD}Hotovo! Všech 7 zdrojů úspěšně staženo.${NC}"
else else
echo -e "${RED}${BOLD}Hotovo s $FAILED chybami.${NC}" echo -e "${RED}${BOLD}Hotovo s $FAILED chybami.${NC}"
fi fi

View File

@@ -15,7 +15,7 @@ import urllib.request
import urllib.parse import urllib.parse
from datetime import datetime, timedelta from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
from scraper_stats import write_stats from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_sreality.json" STATS_FILE = "stats_sreality.json"
@@ -45,9 +45,9 @@ HEADERS = {
def api_get(url: str) -> dict: def api_get(url: str) -> dict:
"""Fetch JSON from Sreality API.""" """Fetch JSON from Sreality API with retry."""
logger.debug(f"HTTP GET request: {url}") for attempt in range(3):
logger.debug(f"Headers: {HEADERS}") logger.debug(f"HTTP GET request (attempt {attempt + 1}/3): {url}")
req = urllib.request.Request(url, headers=HEADERS) req = urllib.request.Request(url, headers=HEADERS)
try: try:
with urllib.request.urlopen(req, timeout=30) as resp: with urllib.request.urlopen(req, timeout=30) as resp:
@@ -55,8 +55,15 @@ def api_get(url: str) -> dict:
logger.debug(f"HTTP response: status={resp.status}, size={len(response_data)} bytes") logger.debug(f"HTTP response: status={resp.status}, size={len(response_data)} bytes")
logger.debug(f"Response preview: {response_data[:200]}") logger.debug(f"Response preview: {response_data[:200]}")
return json.loads(response_data) return json.loads(response_data)
except urllib.error.HTTPError:
raise
except (urllib.error.URLError, ConnectionError, OSError) as e: except (urllib.error.URLError, ConnectionError, OSError) as e:
logger.error(f"HTTP request failed for {url}: {e}", exc_info=True) if attempt < 2:
wait = (attempt + 1) * 2
logger.warning(f"Connection error (retry {attempt + 1}/3 after {wait}s): {e}")
time.sleep(wait)
else:
logger.error(f"HTTP request failed after 3 attempts: {e}", exc_info=True)
raise raise
@@ -356,6 +363,8 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"), "first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
"last_changed": datetime.now().strftime("%Y-%m-%d"), "last_changed": datetime.now().strftime("%Y-%m-%d"),
} }
if not validate_listing(result, "sreality"):
continue
results.append(result) results.append(result)
details_fetched += 1 details_fetched += 1
@@ -471,12 +480,12 @@ def generate_map(estates: list[dict], output_path: str = "mapa_bytu.html"):
floor_note = '<br><span style="color:#FF9800;font-weight:bold;">⚠ 2. NP — zvážit klidnost lokality</span>' floor_note = '<br><span style="color:#FF9800;font-weight:bold;">⚠ 2. NP — zvážit klidnost lokality</span>'
source = e.get("source", "sreality") source = e.get("source", "sreality")
source_labels = {"sreality": "Sreality", "realingo": "Realingo", "bezrealitky": "Bezrealitky", "idnes": "iDNES", "psn": "PSN", "cityhome": "CityHome"} source_labels = {"sreality": "Sreality", "realingo": "Realingo", "bezrealitky": "Bezrealitky", "idnes": "iDNES", "psn": "PSN", "cityhome": "CityHome", "bazos": "Bazoš"}
source_colors = {"sreality": "#1976D2", "realingo": "#00897B", "bezrealitky": "#E91E63", "idnes": "#FF6F00", "psn": "#D32F2F", "cityhome": "#D32F2F"} source_colors = {"sreality": "#1976D2", "realingo": "#00897B", "bezrealitky": "#E91E63", "idnes": "#FF6F00", "psn": "#D32F2F", "cityhome": "#D32F2F", "bazos": "#7B1FA2"}
source_label = source_labels.get(source, source) source_label = source_labels.get(source, source)
source_color = source_colors.get(source, "#999") source_color = source_colors.get(source, "#999")
hash_id = e.get("hash_id", "") hash_id = f"{source}_{e.get('hash_id', '')}"
first_seen = e.get("first_seen", "") first_seen = e.get("first_seen", "")
last_changed = e.get("last_changed", "") last_changed = e.get("last_changed", "")
@@ -864,6 +873,11 @@ function loadRatings() {{
function saveRatings(ratings) {{ function saveRatings(ratings) {{
localStorage.setItem(RATINGS_KEY, JSON.stringify(ratings)); localStorage.setItem(RATINGS_KEY, JSON.stringify(ratings));
fetch('/api/ratings', {{
method: 'POST',
headers: {{'Content-Type': 'application/json'}},
body: JSON.stringify(ratings)
}}).catch(function() {{}});
}} }}
function addRejectStrike(marker) {{ function addRejectStrike(marker) {{
@@ -1167,8 +1181,25 @@ function applyFilters() {{
document.getElementById('visible-count').textContent = visible; document.getElementById('visible-count').textContent = visible;
}} }}
// Initialize ratings on load // Initialize ratings: load from server, merge with localStorage, then restore
restoreRatings(); function initRatings() {{
var local = loadRatings();
fetch('/api/ratings')
.then(function(r) {{ return r.ok ? r.json() : null; }})
.then(function(server) {{
if (server && typeof server === 'object') {{
var merged = Object.assign({{}}, local, server);
localStorage.setItem(RATINGS_KEY, JSON.stringify(merged));
}}
restoreRatings();
updateRatingCounts();
}})
.catch(function() {{
restoreRatings();
updateRatingCounts();
}});
}}
initRatings();
// ── Panel toggle ────────────────────────────────────────────── // ── Panel toggle ──────────────────────────────────────────────
function togglePanel() {{ function togglePanel() {{

560
scrape_bazos.py Normal file
View File

@@ -0,0 +1,560 @@
#!/usr/bin/env python3
"""
Bazoš.cz scraper.
Stáhne byty na prodej v Praze a vyfiltruje podle kritérií.
Výstup: byty_bazos.json
"""
from __future__ import annotations
import argparse
from datetime import datetime
import json
import logging
import math
import re
import time
import urllib.request
import urllib.parse
from pathlib import Path
from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_bazos.json"
logger = logging.getLogger(__name__)
# ── Konfigurace ─────────────────────────────────────────────────────────────
MAX_PRICE = 14_000_000
MIN_AREA = 69
MIN_FLOOR = 2
PER_PAGE = 20 # Bazoš vrací 20 na stránku
WANTED_DISPOSITIONS = {"3+kk", "3+1", "4+kk", "4+1", "5+kk", "5+1", "6+kk", "6+1"}
# Regex patterns pro parsování dispozice, plochy a patra z textu
DISP_RE = re.compile(r'(\d)\s*\+\s*(kk|1)', re.IGNORECASE)
AREA_RE = re.compile(r'(\d+(?:[.,]\d+)?)\s*m[²2\s,.]', re.IGNORECASE)
FLOOR_RE = re.compile(r'(\d+)\s*[./]\s*(\d+)\s*(?:NP|patr|podlaž|floor)', re.IGNORECASE)
FLOOR_RE2 = re.compile(r'(\d+)\.\s*(?:NP|patr[eouě]|podlaž[ií])', re.IGNORECASE)
FLOOR_RE3 = re.compile(r'(?:patr[eouě]|podlaž[ií]|NP)\s*[:\s]*(\d+)', re.IGNORECASE)
PANEL_RE = re.compile(r'panel(?:ov|ák|\.)', re.IGNORECASE)
SIDLISTE_RE = re.compile(r'sídliště|sidliste|panelák', re.IGNORECASE)
HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml",
"Accept-Language": "cs,en;q=0.9",
}
BASE_URL = "https://reality.bazos.cz"
SEARCH_PARAMS = "hledat=&rubriky=reality&hlokalita=Praha&humkreis=25&cenado={max_price}&kitx=ano"
def fetch_url(url: str, retries: int = 3) -> str:
"""Fetch URL and return HTML string with retry on transient errors."""
for attempt in range(retries):
try:
logger.debug(f"HTTP GET request (attempt {attempt + 1}/{retries}): {url}")
req = urllib.request.Request(url, headers=HEADERS)
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8", errors="replace")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
return html
except urllib.error.HTTPError:
raise
except (ConnectionResetError, ConnectionError, urllib.error.URLError, OSError) as e:
if attempt < retries - 1:
wait = (attempt + 1) * 3
logger.warning(f"Connection error (retry {attempt + 1}/{retries} after {wait}s): {e}")
time.sleep(wait)
else:
logger.error(f"HTTP request failed after {retries} attempts: {e}", exc_info=True)
raise
def format_price(price: int) -> str:
s = str(price)
parts = []
while s:
parts.append(s[-3:])
s = s[:-3]
return " ".join(reversed(parts)) + ""
def parse_price(text: str) -> int:
"""Parse price from text like '5 250 000 Kč' → 5250000."""
cleaned = re.sub(r'[^\d]', '', text)
return int(cleaned) if cleaned else 0
def parse_disposition(text: str) -> str | None:
"""Parse disposition from title/description like '3+kk', '4+1'."""
m = DISP_RE.search(text)
if m:
rooms = m.group(1)
suffix = m.group(2).lower()
return f"{rooms}+{suffix}"
return None
def parse_area(text: str) -> float | None:
"""Parse area from text like '82 m²' → 82.0."""
m = AREA_RE.search(text)
if m:
return float(m.group(1).replace(',', '.'))
return None
def parse_floor(text: str) -> int | None:
"""Parse floor number from description."""
for pattern in [FLOOR_RE, FLOOR_RE2, FLOOR_RE3]:
m = pattern.search(text)
if m:
return int(m.group(1))
return None
def is_panel(text: str) -> bool:
"""Check if description mentions panel construction."""
return bool(PANEL_RE.search(text))
def is_sidliste(text: str) -> bool:
"""Check if description mentions housing estate."""
return bool(SIDLISTE_RE.search(text))
def fetch_listing_page(offset: int = 0, pagination_params: str | None = None) -> tuple[list[dict], int, str | None]:
"""
Fetch a page of listings from Bazoš.
Returns (list of basic listing dicts, total count, pagination_params for next pages).
"""
if pagination_params and offset > 0:
# Use resolved numeric params from first page's pagination links
url = f"{BASE_URL}/prodam/byt/{offset}/?{pagination_params}"
else:
params = SEARCH_PARAMS.format(max_price=MAX_PRICE)
if offset > 0:
url = f"{BASE_URL}/prodam/byt/{offset}/?{params}"
else:
url = f"{BASE_URL}/prodam/byt/?{params}"
html = fetch_url(url)
# Parse total count: "Zobrazeno 1-20 z 727"
total = 0
total_match = re.search(r'z\s+([\d\s]+)\s', html)
if total_match:
total = int(total_match.group(1).replace(' ', ''))
# Extract resolved pagination params from first page (Bazoš converts
# hlokalita=Praha → hlokalita=11000, and pagination only works with numeric form)
resolved_params = None
pag_link = re.search(r'href="/prodam/byt/\d+/\?([^"]+)"', html)
if pag_link:
resolved_params = pag_link.group(1)
# Parse listings — split by listing blocks (class="inzeraty inzeratyflex")
listings = []
all_blocks = re.split(r'<div class="inzeraty\s+inzeratyflex">', html)[1:] # skip before first
for block in all_blocks:
# Extract URL and ID from first link (/inzerat/XXXXXX/slug.php)
url_match = re.search(r'href="(/inzerat/(\d+)/[^"]*)"', block)
if not url_match:
continue
detail_path = url_match.group(1)
listing_id = int(url_match.group(2))
# Title — class=nadpis (without quotes) or class="nadpis"
title_match = re.search(r'class=.?nadpis.?[^>]*>\s*<a[^>]*>([^<]+)</a>', block)
title = title_match.group(1).strip() if title_match else ""
# Price — inside <span translate="no"> within inzeratycena
price_match = re.search(r'class="inzeratycena"[^>]*>.*?<span[^>]*>([^<]+)</span>', block, re.DOTALL)
if not price_match:
# Fallback: direct text in inzeratycena
price_match = re.search(r'class="inzeratycena"[^>]*>\s*(?:<b>)?([^<]+)', block)
price_text = price_match.group(1).strip() if price_match else ""
price = parse_price(price_text)
# Location
loc_match = re.search(r'class="inzeratylok"[^>]*>(.*?)</div>', block, re.DOTALL)
location = ""
if loc_match:
location = re.sub(r'<[^>]+>', ' ', loc_match.group(1)).strip()
location = re.sub(r'\s+', ' ', location)
# Date — [5.3. 2026]
date_match = re.search(r'\[(\d+\.\d+\.\s*\d{4})\]', block)
date_str = date_match.group(1).strip() if date_match else ""
# Description preview — class=popis (without quotes) or class="popis"
desc_match = re.search(r'class=.?popis.?[^>]*>(.*?)</div>', block, re.DOTALL)
description = ""
if desc_match:
description = re.sub(r'<[^>]+>', ' ', desc_match.group(1)).strip()
description = re.sub(r'\s+', ' ', description)
# Image — <img ... class="obrazek" ... src="...">
img_match = re.search(r'<img[^>]*src="([^"]+)"[^>]*class="obrazek"', block)
if not img_match:
img_match = re.search(r'class="obrazek"[^>]*src="([^"]+)"', block)
image = img_match.group(1) if img_match else ""
if "empty.gif" in image:
image = ""
listings.append({
"id": listing_id,
"title": title,
"price": price,
"location": location,
"date": date_str,
"description": description,
"detail_path": detail_path,
"image": image,
})
logger.debug(f"Offset {offset}: found {len(listings)} listings, total={total}")
return listings, total, resolved_params
def fetch_detail(path: str) -> dict | None:
"""Fetch listing detail page and extract GPS, full description."""
try:
url = f"{BASE_URL}{path}"
html = fetch_url(url)
result = {}
# GPS from Google Maps link
gps_match = re.search(r'google\.com/maps[^"]*place/([\d.]+),([\d.]+)', html)
if gps_match:
result["lat"] = float(gps_match.group(1))
result["lon"] = float(gps_match.group(2))
# Full description — Bazoš uses unquoted class=popisdetail
desc_match = re.search(r'class=.?popisdetail.?[^>]*>(.*?)</div>', html, re.DOTALL)
if desc_match:
desc = re.sub(r'<[^>]+>', ' ', desc_match.group(1)).strip()
desc = re.sub(r'\s+', ' ', desc)
result["description"] = desc
# Location from detail
loc_match = re.search(r'Lokalita:</td>\s*<td[^>]*>(.*?)</td>', html, re.DOTALL)
if loc_match:
loc = re.sub(r'<[^>]+>', ' ', loc_match.group(1)).strip()
loc = re.sub(r'\s+', ' ', loc)
result["detail_location"] = loc
return result
except Exception as e:
logger.warning(f"Detail fetch failed for {path}: {e}")
return None
def load_cache(json_path: str = "byty_bazos.json") -> dict[int, dict]:
"""Load previously scraped data as cache keyed by hash_id."""
path = Path(json_path)
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return {e["hash_id"]: e for e in data if "hash_id" in e}
except (json.JSONDecodeError, KeyError):
return {}
def scrape(max_pages: int | None = None, max_properties: int | None = None):
_run_start = time.time()
_run_ts = datetime.now().isoformat(timespec="seconds")
cache = load_cache()
today = datetime.now().strftime("%Y-%m-%d")
logger.info("=" * 60)
logger.info("Stahuji inzeráty z Bazoš.cz")
logger.info(f"Cena: do {format_price(MAX_PRICE)}")
logger.info(f"Min. plocha: {MIN_AREA}")
logger.info(f"Patro: od {MIN_FLOOR}. NP")
logger.info(f"Region: Praha")
if cache:
logger.info(f"Cache: {len(cache)} bytů z minulého běhu")
if max_pages:
logger.info(f"Max. stran: {max_pages}")
if max_properties:
logger.info(f"Max. bytů: {max_properties}")
logger.info("=" * 60)
# Step 1: Fetch listing pages
logger.info("\nFáze 1: Stahování seznamu inzerátů...")
all_listings = {} # id -> listing dict (dedup)
page = 1
offset = 0
total = None
pagination_params = None # resolved numeric params from first page
while True:
if max_pages and page > max_pages:
logger.debug(f"Max pages limit reached: {max_pages}")
break
logger.info(f"Strana {page} (offset {offset}) ...")
listings, total_count, resolved = fetch_listing_page(offset, pagination_params)
if resolved and not pagination_params:
pagination_params = resolved
logger.debug(f"Resolved pagination params: {pagination_params}")
if total is None and total_count > 0:
total = total_count
total_pages = math.ceil(total / PER_PAGE)
logger.info(f"→ Celkem {total} inzerátů, ~{total_pages} stran")
if not listings:
logger.debug(f"No listings found on page {page}, stopping")
break
for lst in listings:
lid = lst["id"]
if lid not in all_listings:
all_listings[lid] = lst
page += 1
offset += PER_PAGE
if total and offset >= total:
break
time.sleep(0.5)
logger.info(f"\nStaženo: {len(all_listings)} unikátních inzerátů")
# Step 2: Pre-filter by disposition, price, area from listing data
pre_filtered = []
excluded_disp = 0
excluded_price = 0
excluded_area = 0
excluded_no_disp = 0
for lst in all_listings.values():
title_and_desc = f"{lst['title']} {lst['description']}"
# Parse disposition
disp = parse_disposition(title_and_desc)
if not disp:
excluded_no_disp += 1
logger.debug(f"Filter: id={lst['id']} - excluded (no disposition found in '{lst['title']}')")
continue
if disp not in WANTED_DISPOSITIONS:
excluded_disp += 1
logger.debug(f"Filter: id={lst['id']} - excluded (disposition {disp})")
continue
# Price
price = lst["price"]
if price <= 0 or price > MAX_PRICE:
excluded_price += 1
logger.debug(f"Filter: id={lst['id']} - excluded (price {price})")
continue
# Area (if parseable from listing)
area = parse_area(title_and_desc)
if area is not None and area < MIN_AREA:
excluded_area += 1
logger.debug(f"Filter: id={lst['id']} - excluded (area {area} m²)")
continue
lst["_disposition"] = disp
lst["_area"] = area
pre_filtered.append(lst)
logger.info(f"\nPo předfiltraci:")
logger.info(f" Vyloučeno (bez dispozice): {excluded_no_disp}")
logger.info(f" Vyloučeno (dispozice): {excluded_disp}")
logger.info(f" Vyloučeno (cena): {excluded_price}")
logger.info(f" Vyloučeno (plocha): {excluded_area}")
logger.info(f" Zbývá: {len(pre_filtered)}")
# Step 3: Fetch details (for GPS + full description)
logger.info(f"\nFáze 2: Stahování detailů ({len(pre_filtered)} bytů)...")
results = []
excluded_panel = 0
excluded_floor = 0
excluded_no_gps = 0
excluded_detail = 0
excluded_area_detail = 0
cache_hits = 0
properties_fetched = 0
for i, lst in enumerate(pre_filtered):
if max_properties and properties_fetched >= max_properties:
logger.debug(f"Max properties limit reached: {max_properties}")
break
listing_id = lst["id"]
price = lst["price"]
# Check cache
cached = cache.get(listing_id)
if cached and cached.get("price") == price:
cache_hits += 1
logger.debug(f"Cache hit for id={listing_id}")
results.append(cached)
continue
time.sleep(0.4)
detail = fetch_detail(lst["detail_path"])
if not detail:
excluded_detail += 1
logger.debug(f"Filter: id={listing_id} - excluded (detail fetch failed)")
continue
# GPS required
lat = detail.get("lat")
lon = detail.get("lon")
if not lat or not lon:
excluded_no_gps += 1
logger.debug(f"Filter: id={listing_id} - excluded (no GPS)")
continue
# Full text for filtering
full_desc = detail.get("description", "")
full_text = f"{lst['title']} {lst['description']} {full_desc}"
# Panel check
if is_panel(full_text):
excluded_panel += 1
logger.info(f"✗ Vyloučen #{listing_id}: panelová stavba")
continue
# Sídliště check
if is_sidliste(full_text):
excluded_panel += 1
logger.info(f"✗ Vyloučen #{listing_id}: sídliště")
continue
# Floor
floor = parse_floor(full_text)
if floor is not None and floor < MIN_FLOOR:
excluded_floor += 1
logger.debug(f"Filter: id={listing_id} - excluded (floor {floor})")
continue
# Area — re-check from detail if not found before
area = lst.get("_area") or parse_area(full_desc)
if area is not None and area < MIN_AREA:
excluded_area_detail += 1
logger.debug(f"Filter: id={listing_id} - excluded (area {area} m² from detail)")
continue
disp = lst["_disposition"]
locality = detail.get("detail_location") or lst["location"]
result = {
"hash_id": listing_id,
"name": f"Prodej bytu {disp} {int(area) if area else '?'}",
"price": price,
"price_formatted": format_price(price),
"locality": locality,
"lat": lat,
"lon": lon,
"disposition": disp,
"floor": floor,
"area": area,
"building_type": "neuvedeno",
"ownership": "neuvedeno",
"url": f"{BASE_URL}{lst['detail_path']}",
"source": "bazos",
"image": lst.get("image", ""),
"scraped_at": today,
"first_seen": cached.get("first_seen", today) if cached else today,
"last_changed": today if not cached or cached.get("price") != price else cached.get("last_changed", today),
}
if not validate_listing(result, "bazos"):
continue
results.append(result)
properties_fetched += 1
if (i + 1) % 20 == 0:
logger.info(f"Zpracováno {i + 1}/{len(pre_filtered)} ...")
logger.info(f"\n{'=' * 60}")
logger.info(f"Výsledky Bazoš:")
logger.info(f" Předfiltrováno: {len(pre_filtered)}")
logger.info(f" Z cache (přeskočeno): {cache_hits}")
logger.info(f" Vyloučeno (panel/síd): {excluded_panel}")
logger.info(f" Vyloučeno (patro): {excluded_floor}")
logger.info(f" Vyloučeno (bez GPS): {excluded_no_gps}")
logger.info(f" Vyloučeno (bez detailu): {excluded_detail}")
logger.info(f" Vyloučeno (plocha det): {excluded_area_detail}")
logger.info(f" ✓ Vyhovující byty: {len(results)}")
logger.info(f"{'=' * 60}")
write_stats(STATS_FILE, {
"source": "Bazoš",
"timestamp": _run_ts,
"duration_sec": round(time.time() - _run_start, 1),
"success": True,
"accepted": len(results),
"fetched": len(all_listings),
"pages": page - 1,
"cache_hits": cache_hits,
"excluded": {
"bez dispozice": excluded_no_disp,
"dispozice": excluded_disp,
"cena": excluded_price,
"plocha": excluded_area + excluded_area_detail,
"bez GPS": excluded_no_gps,
"panel/síd": excluded_panel,
"patro": excluded_floor,
"bez detailu": excluded_detail,
},
})
return results
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Scrape apartments from Bazoš.cz")
parser.add_argument("--max-pages", type=int, default=None,
help="Maximum number of listing pages to scrape")
parser.add_argument("--max-properties", type=int, default=None,
help="Maximum number of properties to fetch details for")
parser.add_argument("--log-level", type=str, default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"],
help="Logging level (default: INFO)")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format="[%(levelname)s] %(asctime)s - %(name)s - %(message)s",
handlers=[logging.StreamHandler()]
)
_run_ts = datetime.now().isoformat(timespec="seconds")
start = time.time()
try:
estates = scrape(max_pages=args.max_pages, max_properties=args.max_properties)
except Exception as e:
logger.error(f"Scraper failed: {e}", exc_info=True)
write_stats(STATS_FILE, {
"source": "Bazoš",
"timestamp": _run_ts,
"duration_sec": round(time.time() - start, 1),
"success": False,
"accepted": 0,
"fetched": 0,
"error": str(e),
})
raise
if estates:
json_path = Path("byty_bazos.json")
json_path.write_text(
json.dumps(estates, ensure_ascii=False, indent=2),
encoding="utf-8",
)
elapsed = time.time() - start
logger.info(f"\n✓ Data uložena: {json_path.resolve()}")
logger.info(f"⏱ Celkový čas: {elapsed:.0f} s")
else:
logger.info("\nŽádné byty z Bazoše neodpovídají kritériím :(")

View File

@@ -15,7 +15,7 @@ import re
import time import time
import urllib.request import urllib.request
from pathlib import Path from pathlib import Path
from scraper_stats import write_stats from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_bezrealitky.json" STATS_FILE = "stats_bezrealitky.json"
@@ -71,19 +71,35 @@ HEADERS = {
BASE_URL = "https://www.bezrealitky.cz" BASE_URL = "https://www.bezrealitky.cz"
def fetch_url(url: str, retries: int = 3) -> str:
"""Fetch URL and return HTML string with retry on transient errors."""
for attempt in range(retries):
try:
logger.debug(f"HTTP GET request (attempt {attempt + 1}/{retries}): {url}")
req = urllib.request.Request(url, headers=HEADERS)
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
return html
except urllib.error.HTTPError:
raise
except (ConnectionResetError, ConnectionError, urllib.error.URLError, OSError) as e:
if attempt < retries - 1:
wait = (attempt + 1) * 2
logger.warning(f"Connection error (retry {attempt + 1}/{retries} after {wait}s): {e}")
time.sleep(wait)
else:
logger.error(f"HTTP request failed after {retries} attempts: {e}", exc_info=True)
raise
def fetch_page(page: int) -> tuple[list[dict], int]: def fetch_page(page: int) -> tuple[list[dict], int]:
""" """
Fetch a listing page from Bezrealitky. Fetch a listing page from Bezrealitky.
Returns (list of advert dicts from Apollo cache, total count). Returns (list of advert dicts from Apollo cache, total count).
""" """
url = f"{BASE_URL}/vypis/nabidka-prodej/byt/praha?page={page}" url = f"{BASE_URL}/vypis/nabidka-prodej/byt/praha?page={page}"
logger.debug(f"HTTP GET request: {url}") html = fetch_url(url)
logger.debug(f"Headers: {HEADERS}")
req = urllib.request.Request(url, headers=HEADERS)
try:
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
match = re.search( match = re.search(
r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
@@ -113,20 +129,13 @@ def fetch_page(page: int) -> tuple[list[dict], int]:
logger.debug(f"Page {page}: found {len(adverts)} adverts, total={total}") logger.debug(f"Page {page}: found {len(adverts)} adverts, total={total}")
return adverts, total return adverts, total
except (urllib.error.URLError, ConnectionError, OSError) as e:
logger.error(f"HTTP request failed for {url}: {e}", exc_info=True)
raise
def fetch_detail(uri: str) -> dict | None: def fetch_detail(uri: str) -> dict | None:
"""Fetch detail page for a listing.""" """Fetch detail page for a listing."""
try: try:
url = f"{BASE_URL}/nemovitosti-byty-domy/{uri}" url = f"{BASE_URL}/nemovitosti-byty-domy/{uri}"
logger.debug(f"HTTP GET request: {url}") html = fetch_url(url)
req = urllib.request.Request(url, headers=HEADERS)
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
match = re.search( match = re.search(
r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
@@ -365,6 +374,8 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"), "first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
"last_changed": datetime.now().strftime("%Y-%m-%d"), "last_changed": datetime.now().strftime("%Y-%m-%d"),
} }
if not validate_listing(result, "bezrealitky"):
continue
results.append(result) results.append(result)
properties_fetched += 1 properties_fetched += 1

View File

@@ -14,7 +14,7 @@ import time
import urllib.request import urllib.request
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from scraper_stats import write_stats from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_cityhome.json" STATS_FILE = "stats_cityhome.json"
@@ -375,6 +375,8 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"first_seen": _prev_cache.get(f"cityhome_{slug}_{listing['unit_name']}", {}).get("first_seen", datetime.now().strftime("%Y-%m-%d")), "first_seen": _prev_cache.get(f"cityhome_{slug}_{listing['unit_name']}", {}).get("first_seen", datetime.now().strftime("%Y-%m-%d")),
"last_changed": datetime.now().strftime("%Y-%m-%d") if _prev_cache.get(f"cityhome_{slug}_{listing['unit_name']}", {}).get("price") != price else _prev_cache[f"cityhome_{slug}_{listing['unit_name']}"].get("last_changed", datetime.now().strftime("%Y-%m-%d")), "last_changed": datetime.now().strftime("%Y-%m-%d") if _prev_cache.get(f"cityhome_{slug}_{listing['unit_name']}", {}).get("price") != price else _prev_cache[f"cityhome_{slug}_{listing['unit_name']}"].get("last_changed", datetime.now().strftime("%Y-%m-%d")),
} }
if not validate_listing(result, "cityhome"):
continue
results.append(result) results.append(result)
properties_fetched += 1 properties_fetched += 1

View File

@@ -16,7 +16,7 @@ import time
import urllib.request import urllib.request
import urllib.parse import urllib.parse
from pathlib import Path from pathlib import Path
from scraper_stats import write_stats from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_idnes.json" STATS_FILE = "stats_idnes.json"
@@ -467,6 +467,8 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"), "first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
"last_changed": datetime.now().strftime("%Y-%m-%d"), "last_changed": datetime.now().strftime("%Y-%m-%d"),
} }
if not validate_listing(result, "idnes"):
continue
results.append(result) results.append(result)
properties_fetched += 1 properties_fetched += 1

View File

@@ -15,7 +15,7 @@ import time
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from urllib.parse import urlencode from urllib.parse import urlencode
from scraper_stats import write_stats from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_psn.json" STATS_FILE = "stats_psn.json"
@@ -38,9 +38,10 @@ BASE_URL = "https://psn.cz"
UNITS_API = f"{BASE_URL}/api/units-list" UNITS_API = f"{BASE_URL}/api/units-list"
def fetch_json(url: str) -> dict: def fetch_json(url: str, retries: int = 3) -> dict:
"""Fetch JSON via curl (urllib SSL may fail on Cloudflare).""" """Fetch JSON via curl (urllib SSL may fail on Cloudflare) with retry."""
logger.debug(f"HTTP GET: {url}") for attempt in range(retries):
logger.debug(f"HTTP GET (attempt {attempt + 1}/{retries}): {url}")
result = subprocess.run( result = subprocess.run(
["curl", "-s", "-L", "--max-time", "30", ["curl", "-s", "-L", "--max-time", "30",
"-H", f"User-Agent: {UA}", "-H", f"User-Agent: {UA}",
@@ -48,9 +49,14 @@ def fetch_json(url: str) -> dict:
url], url],
capture_output=True, text=True, timeout=60 capture_output=True, text=True, timeout=60
) )
if result.returncode != 0: if result.returncode == 0:
raise RuntimeError(f"curl failed ({result.returncode}): {result.stderr[:200]}")
return json.loads(result.stdout) return json.loads(result.stdout)
if attempt < retries - 1:
wait = (attempt + 1) * 2
logger.warning(f"curl failed (retry {attempt + 1}/{retries} after {wait}s): {result.stderr[:200]}")
time.sleep(wait)
else:
raise RuntimeError(f"curl failed after {retries} attempts ({result.returncode}): {result.stderr[:200]}")
def fix_gps(lat, lng): def fix_gps(lat, lng):
@@ -255,6 +261,8 @@ def scrape(max_properties: int | None = None):
"first_seen": _prev_cache.get(str(unit_id), {}).get("first_seen", datetime.now().strftime("%Y-%m-%d")), "first_seen": _prev_cache.get(str(unit_id), {}).get("first_seen", datetime.now().strftime("%Y-%m-%d")),
"last_changed": datetime.now().strftime("%Y-%m-%d") if _prev_cache.get(str(unit_id), {}).get("price") != int(price) else _prev_cache[str(unit_id)].get("last_changed", datetime.now().strftime("%Y-%m-%d")), "last_changed": datetime.now().strftime("%Y-%m-%d") if _prev_cache.get(str(unit_id), {}).get("price") != int(price) else _prev_cache[str(unit_id)].get("last_changed", datetime.now().strftime("%Y-%m-%d")),
} }
if not validate_listing(result, "psn"):
continue
results.append(result) results.append(result)
properties_fetched += 1 properties_fetched += 1

View File

@@ -15,7 +15,7 @@ import re
import time import time
import urllib.request import urllib.request
from pathlib import Path from pathlib import Path
from scraper_stats import write_stats from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_realingo.json" STATS_FILE = "stats_realingo.json"
@@ -56,6 +56,28 @@ HEADERS = {
BASE_URL = "https://www.realingo.cz" BASE_URL = "https://www.realingo.cz"
def fetch_url(url: str, retries: int = 3) -> str:
"""Fetch URL and return HTML string with retry on transient errors."""
for attempt in range(retries):
try:
logger.debug(f"HTTP GET request (attempt {attempt + 1}/{retries}): {url}")
req = urllib.request.Request(url, headers=HEADERS)
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
return html
except urllib.error.HTTPError:
raise
except (ConnectionResetError, ConnectionError, urllib.error.URLError, OSError) as e:
if attempt < retries - 1:
wait = (attempt + 1) * 2
logger.warning(f"Connection error (retry {attempt + 1}/{retries} after {wait}s): {e}")
time.sleep(wait)
else:
logger.error(f"HTTP request failed after {retries} attempts: {e}", exc_info=True)
raise
def fetch_listing_page(page: int = 1) -> tuple[list[dict], int]: def fetch_listing_page(page: int = 1) -> tuple[list[dict], int]:
"""Fetch a page of Prague listings. Returns (items, total_count).""" """Fetch a page of Prague listings. Returns (items, total_count)."""
if page == 1: if page == 1:
@@ -63,14 +85,7 @@ def fetch_listing_page(page: int = 1) -> tuple[list[dict], int]:
else: else:
url = f"{BASE_URL}/prodej_byty/praha/{page}_strana/" url = f"{BASE_URL}/prodej_byty/praha/{page}_strana/"
logger.debug(f"HTTP GET request: {url}") html = fetch_url(url)
logger.debug(f"Headers: {HEADERS}")
req = urllib.request.Request(url, headers=HEADERS)
try:
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
match = re.search( match = re.search(
r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
html, re.DOTALL html, re.DOTALL
@@ -83,21 +98,13 @@ def fetch_listing_page(page: int = 1) -> tuple[list[dict], int]:
offer_list = data["props"]["pageProps"]["store"]["offer"]["list"] offer_list = data["props"]["pageProps"]["store"]["offer"]["list"]
logger.debug(f"Page {page}: found {len(offer_list['data'])} items, total={offer_list['total']}") logger.debug(f"Page {page}: found {len(offer_list['data'])} items, total={offer_list['total']}")
return offer_list["data"], offer_list["total"] return offer_list["data"], offer_list["total"]
except (urllib.error.URLError, ConnectionError, OSError) as e:
logger.error(f"HTTP request failed for {url}: {e}", exc_info=True)
raise
def fetch_detail(listing_url: str) -> dict | None: def fetch_detail(listing_url: str) -> dict | None:
"""Fetch detail page for a listing to get floor, building type, etc.""" """Fetch detail page for a listing to get floor, building type, etc."""
try: try:
url = f"{BASE_URL}{listing_url}" url = f"{BASE_URL}{listing_url}"
logger.debug(f"HTTP GET request: {url}") html = fetch_url(url)
req = urllib.request.Request(url, headers=HEADERS)
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
match = re.search( match = re.search(
r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
html, re.DOTALL html, re.DOTALL
@@ -324,6 +331,8 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"), "first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
"last_changed": datetime.now().strftime("%Y-%m-%d"), "last_changed": datetime.now().strftime("%Y-%m-%d"),
} }
if not validate_listing(result, "realingo"):
continue
results.append(result) results.append(result)
properties_fetched += 1 properties_fetched += 1

View File

@@ -1,13 +1,53 @@
"""Shared utility for writing per-scraper run statistics to JSON.""" """Shared utilities for scraper run statistics and listing validation."""
from __future__ import annotations from __future__ import annotations
import json import json
import logging
import os import os
from pathlib import Path from pathlib import Path
HERE = Path(__file__).parent HERE = Path(__file__).parent
DATA_DIR = Path(os.environ.get("DATA_DIR", HERE)) DATA_DIR = Path(os.environ.get("DATA_DIR", HERE))
_val_log = logging.getLogger(__name__)
_REQUIRED_FIELDS = ("hash_id", "price", "locality", "lat", "lon", "url", "source")
def validate_listing(listing: dict, context: str = "") -> bool:
"""
Validate a listing dict before it is written to the output JSON.
Returns True if valid, False if the listing should be skipped.
Logs a warning for each invalid listing.
"""
prefix = f"[{context}] " if context else ""
for field in _REQUIRED_FIELDS:
val = listing.get(field)
if val is None or val == "":
_val_log.warning(f"{prefix}Skipping listing — missing field '{field}': {listing.get('hash_id', '?')}")
return False
price = listing.get("price")
if not isinstance(price, (int, float)) or price <= 0:
_val_log.warning(f"{prefix}Skipping listing — invalid price={price!r}: {listing.get('hash_id', '?')}")
return False
lat, lon = listing.get("lat"), listing.get("lon")
if not isinstance(lat, (int, float)) or not isinstance(lon, (int, float)):
_val_log.warning(f"{prefix}Skipping listing — non-numeric GPS lat={lat!r} lon={lon!r}: {listing.get('hash_id', '?')}")
return False
if not (47.0 <= lat <= 52.0) or not (12.0 <= lon <= 19.0):
_val_log.warning(f"{prefix}Skipping listing — GPS outside Czech Republic lat={lat} lon={lon}: {listing.get('hash_id', '?')}")
return False
area = listing.get("area")
if area is not None and (not isinstance(area, (int, float)) or area <= 0):
_val_log.warning(f"{prefix}Skipping listing — invalid area={area!r}: {listing.get('hash_id', '?')}")
return False
return True
def write_stats(filename: str, stats: dict) -> None: def write_stats(filename: str, stats: dict) -> None:
"""Write scraper run stats dict to the data directory.""" """Write scraper run stats dict to the data directory."""