Add validation mode, structured logging, and CLI args to all scrapers
- Replace print() with Python logging module across all 6 scrapers for configurable log levels (DEBUG/INFO/WARNING/ERROR)
- Add --max-pages, --max-properties, and --log-level CLI arguments to each scraper via argparse for limiting scrape scope
- Add validation Make targets (validation, validation-local, validation-local-debug) for quick test runs with limited data
- Update run_all.sh to parse and forward CLI args to all scrapers
- Update mapa_bytu.html with latest scrape results

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -6,7 +6,9 @@ Hledá byty na prodej v Praze podle zadaných kritérií a generuje HTML mapu.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import math
|
||||
import time
|
||||
import urllib.request
|
||||
@@ -14,6 +16,8 @@ import urllib.parse
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ── Konfigurace filtrů ──────────────────────────────────────────────────────
|
||||
|
||||
MAX_PRICE = 13_500_000 # Kč
|
||||
@@ -39,9 +43,18 @@ HEADERS = {
|
||||
|
||||
def api_get(url: str) -> dict:
    """Fetch a URL from the Sreality API and return the decoded JSON payload.

    The request is sent with the module-level ``HEADERS`` and a 30 second
    timeout. Request and response metadata are logged at DEBUG level.

    Args:
        url: Fully built API URL to request.

    Returns:
        The result of ``json.loads`` on the UTF-8 decoded response body.

    Raises:
        OSError: On a network-level failure. ``urllib.error.URLError``
            (and its subclass ``HTTPError``) and ``ConnectionError`` are all
            ``OSError`` subclasses, so one handler covers them. The failure
            is logged with its traceback and re-raised unchanged.
        ValueError: If the body is not valid UTF-8 or not valid JSON
            (``UnicodeDecodeError`` / ``json.JSONDecodeError``); not logged
            here, propagated to the caller.
    """
    logger.debug("HTTP GET request: %s", url)
    logger.debug("Headers: %s", HEADERS)
    req = urllib.request.Request(url, headers=HEADERS)
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            raw = resp.read()
            # Measure the raw payload so the "bytes" figure is really bytes,
            # not the character count of the decoded text.
            logger.debug(
                "HTTP response: status=%s, size=%d bytes", resp.status, len(raw)
            )
    except OSError as e:
        logger.error("HTTP request failed for %s: %s", url, e, exc_info=True)
        raise

    response_data = raw.decode("utf-8")
    logger.debug("Response preview: %s", response_data[:200])
    return json.loads(response_data)
|
||||
|
||||
|
||||
def build_list_url(disposition: int, page: int = 1) -> str:
|
||||
@@ -59,20 +72,26 @@ def build_list_url(disposition: int, page: int = 1) -> str:
|
||||
return f"{API_BASE}?{urllib.parse.urlencode(params)}"
|
||||
|
||||
|
||||
def fetch_estates_for_disposition(disposition: int) -> list[dict]:
|
||||
def fetch_estates_for_disposition(disposition: int, max_pages: int | None = None) -> list[dict]:
|
||||
"""Fetch all estates for a given disposition, handling pagination."""
|
||||
url = build_list_url(disposition, page=1)
|
||||
print(f" Fetching disposition {disposition}, page 1 ...")
|
||||
logger.info(f"Fetching disposition {disposition}, page 1 ...")
|
||||
data = api_get(url)
|
||||
total = data.get("result_size", 0)
|
||||
estates = data.get("_embedded", {}).get("estates", [])
|
||||
total_pages = math.ceil(total / PER_PAGE) if total > 0 else 0
|
||||
|
||||
print(f" → {total} results, {total_pages} pages")
|
||||
logger.info(f"→ {total} results, {total_pages} pages")
|
||||
|
||||
# Limit pages if max_pages is specified
|
||||
if max_pages is not None:
|
||||
original_pages = total_pages
|
||||
total_pages = min(total_pages, max_pages)
|
||||
logger.debug(f"Max pages limit reached: limiting {original_pages} pages to {total_pages}")
|
||||
|
||||
for page in range(2, total_pages + 1):
|
||||
time.sleep(0.5)
|
||||
print(f" Fetching page {page}/{total_pages} ...")
|
||||
logger.info(f"Fetching page {page}/{total_pages} ...")
|
||||
url = build_list_url(disposition, page=page)
|
||||
data = api_get(url)
|
||||
estates.extend(data.get("_embedded", {}).get("estates", []))
|
||||
@@ -84,9 +103,12 @@ def get_estate_detail(hash_id: int) -> dict | None:
|
||||
"""Fetch detail for a single estate to get floor info and building type."""
|
||||
try:
|
||||
url = DETAIL_API.format(hash_id)
|
||||
return api_get(url)
|
||||
logger.debug(f"Fetching detail for hash_id={hash_id}")
|
||||
detail = api_get(url)
|
||||
logger.debug(f"Detail fetched for hash_id={hash_id}, keys: {list(detail.keys())[:5]}")
|
||||
return detail
|
||||
except Exception as e:
|
||||
print(f" Warning: Could not fetch detail for {hash_id}: {e}")
|
||||
logger.warning(f"Could not fetch detail for hash_id={hash_id}: {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
|
||||
@@ -185,24 +207,28 @@ def load_cache(json_path: str = "byty_sreality.json") -> dict[int, dict]:
|
||||
return {}
|
||||
|
||||
|
||||
def scrape():
|
||||
def scrape(max_pages: int | None = None, max_properties: int | None = None):
|
||||
"""Main scraping function. Returns list of filtered estates."""
|
||||
all_estates_raw = []
|
||||
cache = load_cache()
|
||||
|
||||
print("=" * 60)
|
||||
print("Stahuji inzeráty ze Sreality.cz")
|
||||
print(f"Cena: do {format_price(MAX_PRICE)}")
|
||||
print(f"Dispozice: {', '.join(disposition_label(d) for d in DISPOSITIONS)}")
|
||||
print(f"Patro: od {MIN_FLOOR}. NP")
|
||||
print(f"Region: Praha")
|
||||
logger.info("=" * 60)
|
||||
logger.info("Stahuji inzeráty ze Sreality.cz")
|
||||
logger.info(f"Cena: do {format_price(MAX_PRICE)}")
|
||||
logger.info(f"Dispozice: {', '.join(disposition_label(d) for d in DISPOSITIONS)}")
|
||||
logger.info(f"Patro: od {MIN_FLOOR}. NP")
|
||||
logger.info(f"Region: Praha")
|
||||
if cache:
|
||||
print(f"Cache: {len(cache)} bytů z minulého běhu")
|
||||
print("=" * 60)
|
||||
logger.info(f"Cache: {len(cache)} bytů z minulého běhu")
|
||||
if max_pages:
|
||||
logger.info(f"Limit stran: {max_pages}")
|
||||
if max_properties:
|
||||
logger.info(f"Limit majetků: {max_properties}")
|
||||
logger.info("=" * 60)
|
||||
|
||||
for disp in DISPOSITIONS:
|
||||
print(f"\n▸ Dispozice: {disposition_label(disp)}")
|
||||
estates = fetch_estates_for_disposition(disp)
|
||||
logger.info(f"\n▸ Dispozice: {disposition_label(disp)}")
|
||||
estates = fetch_estates_for_disposition(disp, max_pages=max_pages)
|
||||
for e in estates:
|
||||
e["_disposition_cb"] = disp
|
||||
all_estates_raw.extend(estates)
|
||||
@@ -217,10 +243,10 @@ def scrape():
|
||||
seen.add(hid)
|
||||
unique_estates.append(e)
|
||||
|
||||
print(f"\n{'=' * 60}")
|
||||
print(f"Staženo celkem: {len(unique_estates)} unikátních inzerátů")
|
||||
print(f"Stahuji detaily pro filtrování panelu/sídlišť...")
|
||||
print(f"{'=' * 60}")
|
||||
logger.info(f"\n{'=' * 60}")
|
||||
logger.info(f"Staženo celkem: {len(unique_estates)} unikátních inzerátů")
|
||||
logger.info(f"Stahuji detaily pro filtrování panelu/sídlišť...")
|
||||
logger.info(f"{'=' * 60}")
|
||||
|
||||
# Fetch details and filter
|
||||
results = []
|
||||
@@ -229,19 +255,26 @@ def scrape():
|
||||
excluded_no_detail = 0
|
||||
excluded_small = 0
|
||||
cache_hits = 0
|
||||
details_fetched = 0
|
||||
|
||||
for i, estate in enumerate(unique_estates):
|
||||
# Stop if max_properties reached
|
||||
if max_properties is not None and details_fetched >= max_properties:
|
||||
logger.debug(f"Max properties limit reached: {max_properties}")
|
||||
break
|
||||
hash_id = estate.get("hash_id")
|
||||
gps = estate.get("gps", {})
|
||||
|
||||
if not gps or not gps.get("lat") or not gps.get("lon"):
|
||||
excluded_no_gps += 1
|
||||
logger.debug(f"Filter: hash_id={hash_id} - excluded (no GPS)")
|
||||
continue
|
||||
|
||||
# Check cache — if hash_id exists and price unchanged, reuse
|
||||
cached = cache.get(hash_id)
|
||||
if cached and cached.get("price") == estate.get("price", 0):
|
||||
cache_hits += 1
|
||||
logger.debug(f"Cache hit for hash_id={hash_id}")
|
||||
results.append(cached)
|
||||
continue
|
||||
|
||||
@@ -250,13 +283,15 @@ def scrape():
|
||||
detail = get_estate_detail(hash_id)
|
||||
if not detail:
|
||||
excluded_no_detail += 1
|
||||
logger.debug(f"Filter: hash_id={hash_id} - excluded (no detail)")
|
||||
continue
|
||||
|
||||
# Check panel / sídliště
|
||||
is_excluded, reason = is_panel_or_sidliste(detail)
|
||||
if is_excluded:
|
||||
excluded_panel += 1
|
||||
print(f" ✗ Vyloučen #{hash_id}: {reason}")
|
||||
logger.debug(f"Filter: hash_id={hash_id} - excluded (panel/sídliště): {reason}")
|
||||
logger.info(f"✗ Vyloučen #{hash_id}: {reason}")
|
||||
continue
|
||||
|
||||
# Parse floor
|
||||
@@ -276,7 +311,8 @@ def scrape():
|
||||
# Filter by minimum area
|
||||
if area is not None and area < MIN_AREA:
|
||||
excluded_small += 1
|
||||
print(f" ✗ Vyloučen #{hash_id}: malá plocha ({area} m²)")
|
||||
logger.debug(f"Filter: hash_id={hash_id} - excluded (area {area} m² < {MIN_AREA} m²)")
|
||||
logger.info(f"✗ Vyloučen #{hash_id}: malá plocha ({area} m²)")
|
||||
continue
|
||||
|
||||
# Get building type
|
||||
@@ -313,20 +349,21 @@ def scrape():
|
||||
"image": (estate.get("_links", {}).get("images", [{}])[0].get("href", "") if estate.get("_links", {}).get("images") else ""),
|
||||
}
|
||||
results.append(result)
|
||||
details_fetched += 1
|
||||
|
||||
if (i + 1) % 20 == 0:
|
||||
print(f" Zpracováno {i + 1}/{len(unique_estates)} ...")
|
||||
logger.info(f"Zpracováno {i + 1}/{len(unique_estates)} ...")
|
||||
|
||||
print(f"\n{'=' * 60}")
|
||||
print(f"Výsledky:")
|
||||
print(f" Celkem staženo: {len(unique_estates)}")
|
||||
print(f" Z cache (přeskočeno): {cache_hits}")
|
||||
print(f" Vyloučeno (panel/síd): {excluded_panel}")
|
||||
print(f" Vyloučeno (<{MIN_AREA} m²): {excluded_small}")
|
||||
print(f" Vyloučeno (bez GPS): {excluded_no_gps}")
|
||||
print(f" Vyloučeno (bez detailu): {excluded_no_detail}")
|
||||
print(f" ✓ Vyhovující byty: {len(results)}")
|
||||
print(f"{'=' * 60}")
|
||||
logger.info(f"\n{'=' * 60}")
|
||||
logger.info(f"Výsledky:")
|
||||
logger.info(f" Celkem staženo: {len(unique_estates)}")
|
||||
logger.info(f" Z cache (přeskočeno): {cache_hits}")
|
||||
logger.info(f" Vyloučeno (panel/síd): {excluded_panel}")
|
||||
logger.info(f" Vyloučeno (<{MIN_AREA} m²): {excluded_small}")
|
||||
logger.info(f" Vyloučeno (bez GPS): {excluded_no_gps}")
|
||||
logger.info(f" Vyloučeno (bez detailu): {excluded_no_detail}")
|
||||
logger.info(f" ✓ Vyhovující byty: {len(results)}")
|
||||
logger.info(f"{'=' * 60}")
|
||||
|
||||
return results
|
||||
|
||||
@@ -820,15 +857,29 @@ restoreRatings();
|
||||
|
||||
path = Path(output_path)
|
||||
path.write_text(html, encoding="utf-8")
|
||||
print(f"\n✓ Mapa uložena: {path.resolve()}")
|
||||
logger.info(f"\n✓ Mapa uložena: {path.resolve()}")
|
||||
return str(path.resolve())
|
||||
|
||||
|
||||
# ── Main ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Scrape apartments from Sreality.cz")
|
||||
parser.add_argument("--max-pages", type=int, help="Maximum number of pages to scrape")
|
||||
parser.add_argument("--max-properties", type=int, help="Maximum number of properties to fetch details for")
|
||||
parser.add_argument("--log-level", type=str, default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"],
|
||||
help="Logging level (default: INFO)")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=getattr(logging, args.log_level),
|
||||
format="[%(levelname)s] %(asctime)s - %(name)s - %(message)s",
|
||||
handlers=[logging.StreamHandler()]
|
||||
)
|
||||
|
||||
start = time.time()
|
||||
estates = scrape()
|
||||
estates = scrape(max_pages=args.max_pages, max_properties=args.max_properties)
|
||||
|
||||
if estates:
|
||||
# Save raw data as JSON backup
|
||||
@@ -837,12 +888,12 @@ if __name__ == "__main__":
|
||||
json.dumps(estates, ensure_ascii=False, indent=2),
|
||||
encoding="utf-8",
|
||||
)
|
||||
print(f"✓ Data uložena: {json_path.resolve()}")
|
||||
logger.info(f"✓ Data uložena: {json_path.resolve()}")
|
||||
|
||||
# Generate map
|
||||
map_path = generate_map(estates)
|
||||
elapsed = time.time() - start
|
||||
print(f"\n⏱ Celkový čas: {elapsed:.0f} s")
|
||||
print(f"\nOtevři v prohlížeči:\n file://{map_path}")
|
||||
logger.info(f"\n⏱ Celkový čas: {elapsed:.0f} s")
|
||||
logger.info(f"\nOtevři v prohlížeči:\n file://{map_path}")
|
||||
else:
|
||||
print("\nŽádné byty neodpovídají kritériím :(")
|
||||
logger.info("\nŽádné byty neodpovídají kritériím :(")
|
||||
|
||||
Reference in New Issue
Block a user