#!/usr/bin/env python3
"""
Sreality scraper + interactive map generator.

Finds apartments for sale in Prague that match the configured criteria and
generates an HTML map.
"""
from __future__ import annotations

import argparse
import html
import json
import logging
import math
import time
import urllib.error
import urllib.parse
import urllib.request
from collections import Counter
from datetime import datetime
from pathlib import Path

logger = logging.getLogger(__name__)

# ── Filter configuration ────────────────────────────────────────────────────

MAX_PRICE = 13_500_000  # CZK
MIN_AREA = 69  # m² — exclude apartments smaller than this
DISPOSITIONS = [6, 7, 8, 9, 10, 11, 12]  # 3+kk, 3+1, 4+kk, 4+1, 5+kk, 5+1, 6+
MIN_FLOOR = 2  # download from the 2nd floor up; the 2nd floor is flagged on the map
REGION_ID = 10  # Prague
PER_PAGE = 60

# Sreality API base
API_BASE = "https://www.sreality.cz/api/cs/v2/estates"
DETAIL_API = "https://www.sreality.cz/api/cs/v2/estates/{}"

# Keywords used to exclude panel buildings / housing estates
PANEL_KEYWORDS = {"panel", "panelový", "panelový dům", "panelák"}
SIDLISTE_KEYWORDS = {"sídliště", "sidliste"}

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
    "Accept": "application/json",
}

# Lookup tables keyed by the Sreality `category_sub_cb` code.
_DISPOSITION_LABELS = {
    2: "1+kk", 3: "1+1", 4: "2+kk", 5: "2+1",
    6: "3+kk", 7: "3+1", 8: "4+kk", 9: "4+1",
    10: "5+kk", 11: "5+1", 12: "6+", 16: "Atypický",
}
_DISPOSITION_SLUGS = {
    2: "1+kk", 3: "1+1", 4: "2+kk", 5: "2+1",
    6: "3+kk", 7: "3+1", 8: "4+kk", 9: "4+1",
    10: "5+kk", 11: "5+1", 12: "6-a-vice", 16: "atypicky",
}


def api_get(url: str) -> dict:
    """Fetch *url* with the module-level HEADERS and return the parsed JSON.

    Raises:
        OSError: on network failures (``urllib.error.URLError`` and
            ``ConnectionError`` are subclasses); logged, then re-raised.
        json.JSONDecodeError: if the response body is not valid JSON.
    """
    logger.debug("HTTP GET request: %s", url)
    logger.debug("Headers: %s", HEADERS)
    req = urllib.request.Request(url, headers=HEADERS)
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            response_data = resp.read().decode("utf-8")
            logger.debug(
                "HTTP response: status=%s, size=%s bytes",
                resp.status, len(response_data),
            )
            logger.debug("Response preview: %s", response_data[:200])
            return json.loads(response_data)
    except (urllib.error.URLError, ConnectionError, OSError) as e:
        logger.error("HTTP request failed for %s: %s", url, e, exc_info=True)
        raise


def build_list_url(disposition: int, page: int = 1) -> str:
    """Build the Sreality listing URL for one disposition code and page."""
    params = {
        "category_main_cb": 1,  # apartments
        "category_type_cb": 1,  # sale
        "category_sub_cb": disposition,
        "locality_region_id": REGION_ID,
        "czk_price_summary_order2": f"0|{MAX_PRICE}",
        "floor_number": f"{MIN_FLOOR}|99",
        "per_page": PER_PAGE,
        "page": page,
    }
    return f"{API_BASE}?{urllib.parse.urlencode(params)}"


def _embedded_estates(data: dict) -> list[dict]:
    """Return the estate list of a listing response, tolerating null fields."""
    return (data.get("_embedded") or {}).get("estates") or []


def fetch_estates_for_disposition(disposition: int, max_pages: int | None = None) -> list[dict]:
    """Fetch all estates for a disposition, following pagination.

    Page 1 is always requested (it carries ``result_size``); *max_pages*, when
    given, caps how many pages are downloaded in total.
    """
    logger.info("Fetching disposition %s, page 1 ...", disposition)
    data = api_get(build_list_url(disposition, page=1))

    total = data.get("result_size", 0)
    estates = list(_embedded_estates(data))
    total_pages = math.ceil(total / PER_PAGE) if total > 0 else 0
    logger.info("→ %s results, %s pages", total, total_pages)

    # Only report the limit when it actually truncates the download.
    if max_pages is not None and total_pages > max_pages:
        logger.debug(
            "Max pages limit reached: limiting %s pages to %s",
            total_pages, max_pages,
        )
        total_pages = max_pages

    for page in range(2, total_pages + 1):
        time.sleep(0.5)  # be polite to the API
        logger.info("Fetching page %s/%s ...", page, total_pages)
        data = api_get(build_list_url(disposition, page=page))
        estates.extend(_embedded_estates(data))

    return estates


def get_estate_detail(hash_id: int) -> dict | None:
    """Fetch the detail document of one estate.

    Best-effort: any failure is logged as a warning and ``None`` is returned,
    so a single bad listing does not abort the whole run.
    """
    try:
        url = DETAIL_API.format(hash_id)
        logger.debug("Fetching detail for hash_id=%s", hash_id)
        detail = api_get(url)
        logger.debug(
            "Detail fetched for hash_id=%s, keys: %s",
            hash_id, list(detail.keys())[:5],
        )
        return detail
    except Exception as e:
        logger.warning(
            "Could not fetch detail for hash_id=%s: %s", hash_id, e, exc_info=True
        )
        return None


def _detail_items(detail: dict) -> list[dict]:
    """Return the ``items`` list of a detail document (empty when missing)."""
    return detail.get("items") or []


def _item_value(detail: dict, names: tuple[str, ...]):
    """Return the value of the first detail item whose name is in *names*."""
    for item in _detail_items(detail):
        if item.get("name") in names:
            return item.get("value")
    return None


def parse_floor_from_detail(detail: dict) -> int | None:
    """Extract the floor number from the "Podlaží" detail item.

    The value looks like "3. podlaží z celkem 5 ..."; the text before the
    first dot is parsed as an integer. Non-string values are stringified
    first. Returns ``None`` when no item yields a number.
    """
    for item in _detail_items(detail):
        if item.get("name") not in ("Podlaží", "Podlazi"):
            continue
        raw = item.get("value")
        text = "" if raw is None else str(raw)
        try:
            return int(text.partition(".")[0].strip())
        except ValueError:
            continue  # e.g. "přízemí" — try any further matching item
    return None


def is_panel_or_sidliste(detail: dict) -> tuple[bool, str]:
    """
    Check whether the estate is a panel construction or on a housing estate.

    Only the structured detail items are inspected ("Stavba"/"Konstrukce" and
    "Umístění objektu"); free-text description is not searched.

    Returns (should_exclude, reason).
    """
    reasons = []
    for item in _detail_items(detail):
        name = (item.get("name") or "").lower()
        value = str(item.get("value") or "").lower()

        if name in ("stavba", "konstrukce"):
            if any(keyword in value for keyword in PANEL_KEYWORDS):
                reasons.append(f"stavba: {value}")

        if name in ("umístění objektu", "umisteni objektu"):
            if any(keyword in value for keyword in SIDLISTE_KEYWORDS):
                reasons.append(f"umístění: {value}")

    return (len(reasons) > 0, "; ".join(reasons))


def disposition_label(sub_cb: int) -> str:
    """Human-readable disposition label ("?" for unknown codes)."""
    return _DISPOSITION_LABELS.get(sub_cb, "?")


def disposition_url_slug(sub_cb: int) -> str:
    """URL slug for a disposition in Sreality URLs ("byt" for unknown codes)."""
    return _DISPOSITION_SLUGS.get(sub_cb, "byt")


def sreality_url(hash_id: int, seo: dict) -> str:
    """Build the human-readable Sreality detail URL from an estate's seo data."""
    cat_type = {1: "prodej", 2: "pronajem"}.get(seo.get("category_type_cb", 1), "prodej")
    cat_main = {1: "byt", 2: "dum", 3: "pozemek", 4: "komercni"}.get(
        seo.get("category_main_cb", 1), "byt"
    )
    cat_sub = disposition_url_slug(seo.get("category_sub_cb", 0))
    locality = seo.get("locality", "praha")
    return f"https://www.sreality.cz/detail/{cat_type}/{cat_main}/{cat_sub}/{locality}/{hash_id}"


def format_price(price: int) -> str:
    """Format a price in CZK with space-separated thousands, e.g. "13 500 000 Kč".

    Handles negative values correctly and treats ``None`` as 0.
    """
    value = 0 if price is None else int(price)
    return f"{value:,}".replace(",", " ") + " Kč"


# ── Main scraping ───────────────────────────────────────────────────────────

def load_cache(json_path: str = "byty_sreality.json") -> dict[int, dict]:
    """Load previously scraped data as a cache keyed by ``hash_id``.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or not a list; list entries that are not dicts with a ``hash_id``
    are skipped.
    """
    path = Path(json_path)
    if not path.exists():
        return {}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:  # JSON/Unicode decode errors are ValueErrors
        logger.warning("Could not read cache %s: %s", path, e)
        return {}
    if not isinstance(data, list):
        return {}
    return {
        entry["hash_id"]: entry
        for entry in data
        if isinstance(entry, dict) and "hash_id" in entry
    }


def _parse_area(detail: dict) -> int | None:
    """Return the usable area in m² from the first matching detail item."""
    for item in _detail_items(detail):
        name = item.get("name") or ""
        # The field name can be truncated ("Užitná ploch" or "Užitná plocha").
        if "žitná ploch" in name or "zitna ploch" in name.lower():
            try:
                return int(float(item["value"]))
            except (KeyError, TypeError, ValueError, OverflowError):
                return None
    return None


def _first_image(estate: dict) -> str:
    """Return the href of the estate's first listing image, or ""."""
    images = (estate.get("_links") or {}).get("images") or []
    return images[0].get("href", "") if images else ""


def _build_result(estate: dict, detail: dict, floor: int | None, area: int | None) -> dict:
    """Assemble the flat record stored in the JSON backup and shown on the map."""
    hash_id = estate.get("hash_id")
    seo = estate.get("seo", {})
    gps = estate["gps"]
    disp_cb = estate.get("_disposition_cb") or seo.get("category_sub_cb")
    price = estate.get("price", 0)
    return {
        "hash_id": hash_id,
        "name": estate.get("name", ""),
        "price": price,
        "price_formatted": format_price(price),
        "locality": estate.get("locality", ""),
        "lat": gps["lat"],
        "lon": gps["lon"],
        "disposition": disposition_label(disp_cb),
        "floor": floor,
        "area": area,
        "building_type": _item_value(detail, ("Stavba", "Konstrukce")),
        "ownership": _item_value(detail, ("Vlastnictví", "Vlastnictvi")),
        "url": sreality_url(hash_id, seo),
        "image": _first_image(estate),
    }


def scrape(max_pages: int | None = None, max_properties: int | None = None):
    """Main scraping function. Returns the list of filtered estates.

    Args:
        max_pages: cap on listing pages downloaded per disposition.
        max_properties: cap on the number of detail requests made in this run
            (cache hits do not count).
    """
    rule = "=" * 60
    all_estates_raw = []
    cache = load_cache()

    logger.info(rule)
    logger.info("Stahuji inzeráty ze Sreality.cz")
    logger.info("Cena: do %s", format_price(MAX_PRICE))
    logger.info("Dispozice: %s", ", ".join(disposition_label(d) for d in DISPOSITIONS))
    logger.info("Patro: od %s. NP", MIN_FLOOR)
    logger.info("Region: Praha")
    if cache:
        logger.info("Cache: %s bytů z minulého běhu", len(cache))
    if max_pages:
        logger.info("Limit stran: %s", max_pages)
    if max_properties:
        logger.info("Limit majetků: %s", max_properties)
    logger.info(rule)

    for disp in DISPOSITIONS:
        logger.info("\n▸ Dispozice: %s", disposition_label(disp))
        estates = fetch_estates_for_disposition(disp, max_pages=max_pages)
        for e in estates:
            e["_disposition_cb"] = disp
        all_estates_raw.extend(estates)
        time.sleep(0.5)

    # Deduplicate by hash_id, keeping the first occurrence.
    seen = set()
    unique_estates = []
    for e in all_estates_raw:
        hid = e.get("hash_id")
        if hid and hid not in seen:
            seen.add(hid)
            unique_estates.append(e)

    logger.info("\n%s", rule)
    logger.info("Staženo celkem: %s unikátních inzerátů", len(unique_estates))
    logger.info("Stahuji detaily pro filtrování panelu/sídlišť...")
    logger.info(rule)

    # Fetch details and filter
    results = []
    excluded_panel = 0
    excluded_no_gps = 0
    excluded_no_detail = 0
    excluded_small = 0
    cache_hits = 0
    details_fetched = 0
    total = len(unique_estates)

    for index, estate in enumerate(unique_estates):
        # Progress is reported per processed estate, including skipped ones.
        if index and index % 20 == 0:
            logger.info("Zpracováno %s/%s ...", index, total)

        # Stop once the detail-request budget is used up.
        if max_properties is not None and details_fetched >= max_properties:
            logger.debug("Max properties limit reached: %s", max_properties)
            break

        hash_id = estate.get("hash_id")
        gps = estate.get("gps") or {}

        if not gps.get("lat") or not gps.get("lon"):
            excluded_no_gps += 1
            logger.debug("Filter: hash_id=%s - excluded (no GPS)", hash_id)
            continue

        # Reuse the cached record when the listing price is unchanged.
        cached = cache.get(hash_id)
        if cached and cached.get("price") == estate.get("price", 0):
            cache_hits += 1
            logger.debug("Cache hit for hash_id=%s", hash_id)
            results.append(cached)
            continue

        time.sleep(0.3)
        detail = get_estate_detail(hash_id)
        details_fetched += 1  # count every request, not only accepted results
        if not detail:
            excluded_no_detail += 1
            logger.debug("Filter: hash_id=%s - excluded (no detail)", hash_id)
            continue

        is_excluded, reason = is_panel_or_sidliste(detail)
        if is_excluded:
            excluded_panel += 1
            logger.debug(
                "Filter: hash_id=%s - excluded (panel/sídliště): %s", hash_id, reason
            )
            logger.info("✗ Vyloučen #%s: %s", hash_id, reason)
            continue

        floor = parse_floor_from_detail(detail)
        area = _parse_area(detail)

        # Listings without a known area are kept; only known-small ones drop out.
        if area is not None and area < MIN_AREA:
            excluded_small += 1
            logger.debug(
                "Filter: hash_id=%s - excluded (area %s m² < %s m²)",
                hash_id, area, MIN_AREA,
            )
            logger.info("✗ Vyloučen #%s: malá plocha (%s m²)", hash_id, area)
            continue

        results.append(_build_result(estate, detail, floor, area))

    logger.info("\n%s", rule)
    logger.info("Výsledky:")
    logger.info(" Celkem staženo: %s", total)
    logger.info(" Z cache (přeskočeno): %s", cache_hits)
    logger.info(" Vyloučeno (panel/síd): %s", excluded_panel)
    logger.info(" Vyloučeno (<%s m²): %s", MIN_AREA, excluded_small)
    logger.info(" Vyloučeno (bez GPS): %s", excluded_no_gps)
    logger.info(" Vyloučeno (bez detailu): %s", excluded_no_detail)
    logger.info(" ✓ Vyhovující byty: %s", len(results))
    logger.info(rule)

    return results


# ── HTML map generation ─────────────────────────────────────────────────────
#
# NOTE(review): the HTML/CSS/JS markup of the popup, legend and page template
# below is a minimal reconstruction around the user-visible texts; confirm it
# against the intended design (styling, rating buttons, filter controls).

# Marker colour per disposition label.
_DISPOSITION_COLORS = {
    "3+kk": "#2196F3",  # blue
    "3+1": "#4CAF50",   # green
    "4+kk": "#FF9800",  # orange
    "4+1": "#F44336",   # red
    "5+kk": "#9C27B0",  # purple
    "5+1": "#795548",   # brown
    "6+": "#607D8B",    # grey-blue
}

_SOURCE_LABELS = {
    "sreality": "Sreality", "realingo": "Realingo", "bezrealitky": "Bezrealitky",
    "idnes": "iDNES", "psn": "PSN", "cityhome": "CityHome",
}
_SOURCE_COLORS = {
    "sreality": "#1976D2", "realingo": "#00897B", "bezrealitky": "#E91E63",
    "idnes": "#FF6F00", "psn": "#D32F2F", "cityhome": "#D32F2F",
}

# Sources whose listings are drawn with a heart marker.
_FAVORITE_SOURCES = ("psn", "cityhome")

_HEART_PATH = (
    "M12 21.35l-1.45-1.32C5.4 15.36 2 12.28 2 8.5 2 5.42 4.42 3 7.5 3"
    "c1.74 0 3.41.81 4.5 2.09C13.09 3.81 14.76 3 16.5 3 19.58 3 22 5.42 22 8.5"
    "c0 3.78-3.4 6.86-8.55 11.54L12 21.35z"
)

# Plain (non-f) strings so that CSS/JS braces need no escaping.
_MAP_STYLE = """
html, body { height: 100%; margin: 0; font-family: sans-serif; }
#map { position: absolute; top: 0; right: 0; bottom: 0; left: 0; }
.info-panel { position: absolute; top: 10px; right: 10px; z-index: 1000;
  background: #fff; padding: 10px 14px; border-radius: 8px;
  box-shadow: 0 2px 8px rgba(0, 0, 0, 0.3); font-size: 13px; max-width: 260px; }
.info-panel h2 { margin: 0 0 6px; font-size: 16px; }
.info-panel h3 { margin: 8px 0 4px; font-size: 13px; }
.legend-item { display: flex; align-items: center; margin: 2px 0; }
.legend-dot { display: inline-block; width: 12px; height: 12px;
  border-radius: 50%; margin-right: 6px; }
.popup .price { font-size: 15px; }
.popup .source { color: #fff; border-radius: 3px; padding: 1px 5px; font-size: 11px; }
.popup .floor-note { color: #E65100; }
.popup .rating { margin-top: 6px; }
.heart-marker { background: none; border: none; }
"""

_MAP_SCRIPT = "var HEART_PATH = " + json.dumps(_HEART_PATH) + ";\n" + """
var map = L.map('map').setView([50.075, 14.437], 12);
L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {
  maxZoom: 19,
  attribution: '&copy; OpenStreetMap contributors'
}).addTo(map);

var RATINGS_KEY = 'byty_ratings';
var ratings = {};
try {
  ratings = JSON.parse(localStorage.getItem(RATINGS_KEY) || '{}') || {};
} catch (err) {
  ratings = {};
}

function updateRatingCounts() {
  var fav = 0, rejected = 0;
  Object.keys(ratings).forEach(function (id) {
    if (ratings[id] === 'fav') { fav += 1; }
    if (ratings[id] === 'reject') { rejected += 1; }
  });
  document.getElementById('rating-counts').textContent =
    '⭐ ' + fav + ' oblíbených, 🚫 ' + rejected + ' zamítnutých';
}

function setRating(hashId, value) {
  if (value) { ratings[hashId] = value; } else { delete ratings[hashId]; }
  try {
    localStorage.setItem(RATINGS_KEY, JSON.stringify(ratings));
  } catch (err) { /* storage unavailable — keep in-memory state only */ }
  updateRatingCounts();
}

function addMarker(lat, lon, color, popup, hashId) {
  L.circleMarker([lat, lon], {
    radius: 8, color: '#fff', weight: 2, fillColor: color, fillOpacity: 0.9
  }).addTo(map).bindPopup(popup, {maxWidth: 320});
}

function addHeartMarker(lat, lon, color, popup, hashId) {
  var icon = L.divIcon({
    className: 'heart-marker',
    html: '<svg viewBox="0 0 24 24" width="26" height="26"><path fill="' + color +
          '" stroke="#fff" stroke-width="1.5" d="' + HEART_PATH + '"/></svg>',
    iconSize: [26, 26],
    iconAnchor: [13, 24]
  });
  L.marker([lat, lon], {icon: icon}).addTo(map).bindPopup(popup, {maxWidth: 320});
}
"""


def _js_string(text: str) -> str:
    """Return *text* as a JS string literal that is safe inside <script>."""
    # "</" is broken up so that data can never terminate the script element.
    return json.dumps(text).replace("</", "<\\/")


def _popup_html(estate: dict) -> str:
    """Render the popup markup for one estate with all data HTML-escaped."""
    esc = html.escape

    floor = estate.get("floor")
    area = estate.get("area")
    floor_text = f"{floor}. NP" if floor is not None else "neuvedeno"
    area_text = f"{area} m²" if area else "neuvedeno"
    building_text = estate.get("building_type") or "neuvedeno"
    ownership_text = estate.get("ownership") or "neuvedeno"

    # Floor warning for the 2nd floor
    floor_note = ""
    if floor == 2:
        floor_note = '<br><span class="floor-note">⚠ 2. NP — zvážit klidnost lokality</span>'

    source = estate.get("source", "sreality")
    source_label = esc(str(_SOURCE_LABELS.get(source, source)))
    source_color = esc(str(_SOURCE_COLORS.get(source, "#999")))

    # Only plain web links are emitted; anything else degrades to "#".
    url = str(estate.get("url") or "")
    if not url.startswith(("http://", "https://")):
        url = "#"

    price_text = esc(format_price(estate.get("price") or 0))
    disposition = esc(str(estate.get("disposition", "?")))
    locality = esc(str(estate.get("locality", "")))
    rating_id = esc(json.dumps(str(estate.get("hash_id", ""))))

    return (
        '<div class="popup">'
        f'<b class="price">{price_text}</b> '
        f'<span class="source" style="background:{source_color}">{source_label}</span><br>'
        f'{disposition} | {esc(area_text)} | {esc(floor_text)}'
        f'{floor_note}<br><br>'
        f'<b>{locality}</b><br>'
        f'Stavba: {esc(str(building_text))}<br>'
        f'Vlastnictví: {esc(str(ownership_text))}<br><br>'
        f'<a href="{esc(url)}" target="_blank" rel="noopener">'
        f'→ Otevřít na {source_label}</a>'
        '<div class="rating">'
        f'<button type="button" onclick="setRating({rating_id}, &quot;fav&quot;)">⭐</button> '
        f'<button type="button" onclick="setRating({rating_id}, &quot;reject&quot;)">🚫</button> '
        f'<button type="button" onclick="setRating({rating_id}, null)">✕</button>'
        '</div>'
        '</div>'
    )


def _legend_html(estates: list[dict]) -> str:
    """Render legend rows: one per disposition present, plus the heart row."""
    counts = Counter(e.get("disposition") for e in estates)
    rows = [
        '<div class="legend-item">'
        f'<span class="legend-dot" style="background:{color}"></span>'
        f'{disp} ({counts[disp]})</div>'
        for disp, color in _DISPOSITION_COLORS.items()
        if counts[disp] > 0
    ]

    # Heart marker legend for PSN/CityHome
    fav_count = sum(1 for e in estates if e.get("source") in _FAVORITE_SOURCES)
    if fav_count > 0:
        rows.append(
            '<div class="legend-item">'
            '<svg viewBox="0 0 24 24" width="14" height="14" style="margin-right:6px">'
            f'<path fill="#D32F2F" d="{_HEART_PATH}"/></svg>'
            f'PSN / CityHome ({fav_count})</div>'
        )
    return "\n".join(rows)


def generate_map(estates: list[dict], output_path: str = "mapa_bytu.html"):
    """Generate an interactive Leaflet.js HTML map.

    Each estate becomes a marker coloured by disposition (a heart for PSN /
    CityHome sources) with a popup; a side panel shows totals, price stats
    and a legend. Scraped text is HTML-escaped and passed to JavaScript as
    JSON string literals.

    Returns the absolute path of the written file as a string.
    """
    marker_lines = []
    for e in estates:
        color = _DISPOSITION_COLORS.get(e.get("disposition"), "#999999")
        marker_fn = (
            "addHeartMarker"
            if e.get("source", "sreality") in _FAVORITE_SOURCES
            else "addMarker"
        )
        lat = float(e["lat"])
        lon = float(e["lon"])
        marker_lines.append(
            f"{marker_fn}({lat!r}, {lon!r}, {_js_string(color)}, "
            f"{_js_string(_popup_html(e))}, {_js_string(str(e.get('hash_id', '')))});"
        )
    markers_js = "\n".join(marker_lines)

    legend_items = _legend_html(estates)

    # Price stats (listings without a positive price are ignored)
    prices = [e["price"] for e in estates if (e.get("price") or 0) > 0]
    min_price = format_price(min(prices)) if prices else "N/A"
    max_price = format_price(max(prices)) if prices else "N/A"
    avg_price = format_price(int(sum(prices) / len(prices))) if prices else "N/A"

    page = f"""<!DOCTYPE html>
<html lang="cs">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Byty v Praze — mapa ({len(estates)} bytů)</title>
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css">
<style>{_MAP_STYLE}</style>
</head>
<body>
<div id="map"></div>
<div class="info-panel">
<h2>Byty v Praze</h2>
<div>Celkem: {len(estates)} bytů</div>
<div>Cena: {min_price} — {max_price}</div>
<div>Průměr: {avg_price}</div>
<h3>Dispozice:</h3>
{legend_items}
<h3>Filtry:</h3>
<div id="rating-counts">⭐ 0 oblíbených, 🚫 0 zamítnutých</div>
</div>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
<script>
{_MAP_SCRIPT}
{markers_js}
updateRatingCounts();
</script>
</body>
</html>
"""

    path = Path(output_path)
    path.write_text(page, encoding="utf-8")
    logger.info("\n✓ Mapa uložena: %s", path.resolve())
    return str(path.resolve())


# ── Main ─────────────────────────────────────────────────────────────────────

def main() -> None:
    """Command-line entry point: scrape, save the JSON backup, build the map."""
    parser = argparse.ArgumentParser(description="Scrape apartments from Sreality.cz")
    parser.add_argument("--max-pages", type=int, help="Maximum number of pages to scrape")
    parser.add_argument("--max-properties", type=int,
                        help="Maximum number of properties to fetch details for")
    parser.add_argument("--log-level", type=str, default="INFO",
                        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
                        help="Logging level (default: INFO)")
    args = parser.parse_args()

    # Configure logging
    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="[%(levelname)s] %(asctime)s - %(name)s - %(message)s",
        handlers=[logging.StreamHandler()],
    )

    start = time.time()
    estates = scrape(max_pages=args.max_pages, max_properties=args.max_properties)

    if not estates:
        logger.info("\nŽádné byty neodpovídají kritériím :(")
        return

    # Save raw data as JSON backup (also read back by load_cache on the next run)
    json_path = Path("byty_sreality.json")
    json_path.write_text(
        json.dumps(estates, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    logger.info("✓ Data uložena: %s", json_path.resolve())

    # Generate map
    map_path = generate_map(estates)
    elapsed = time.time() - start
    logger.info("\n⏱ Celkový čas: %.0f s", elapsed)
    logger.info("\nOtevři v prohlížeči:\n file://%s", map_path)


if __name__ == "__main__":
    main()