#!/usr/bin/env python3
"""
Reality iDNES scraper.

Downloads flats for sale in Prague from reality.idnes.cz and filters them
by the criteria configured below.
Output: byty_idnes.json
"""
from __future__ import annotations

import argparse
from datetime import datetime
from html import unescape
import json
import logging
import math
import re
import time
import urllib.error
import urllib.parse
import urllib.request
from html.parser import HTMLParser
from pathlib import Path

logger = logging.getLogger(__name__)

# ── Configuration ───────────────────────────────────────────────────────────

MAX_PRICE = 13_500_000
MIN_AREA = 69
MIN_FLOOR = 2
PER_PAGE = 26  # iDNES returns 26 listings per page

# Dispositions — codes for the s-qc[subtypeFlat] query parameter
DISPOSITION_CODES = "3k|31|4k|41|5k|51|6k"

# Maps the disposition found in a listing title to the output label
DISPOSITION_MAP = {
    "3+kk": "3+kk",
    "3+1": "3+1",
    "4+kk": "4+kk",
    "4+1": "4+1",
    "5+kk": "5+kk",
    "5+1": "5+1",
    "6+kk": "6+",
    "6+1": "6+",
    "6 a více": "6+",
}

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "cs,en;q=0.9",
    "Accept-Encoding": "identity",
    "Connection": "keep-alive",
}

BASE_URL = "https://reality.idnes.cz"

MAX_RETRIES = 5

# ── Precompiled patterns ────────────────────────────────────────────────────

_TAG_RE = re.compile(r"<[^>]+>")

# Product anchors; the class attribute may come before or after href.
_LINK_PATTERNS = (
    re.compile(
        r'<a\b[^>]*href="([^"]*?/detail/[^"]*?)"[^>]*class="c-products__link"[^>]*>',
        re.DOTALL,
    ),
    re.compile(
        r'<a\b[^>]*class="c-products__link"[^>]*href="([^"]*?/detail/[^"]*?)"[^>]*>',
        re.DOTALL,
    ),
)

# <h2 class="c-products__title">prodej bytu 3+kk 79 m2</h2>
_TITLE_RE = re.compile(r'class="c-products__title"[^>]*>(.*?)</h[1-6]>', re.DOTALL)
# <p class="c-products__price"><strong>12 950 000 Kč</strong></p>
_PRICE_RE = re.compile(r'c-products__price[^>]*>.*?<strong>(.*?)</strong>', re.DOTALL)
# <p class="c-products__info">Klečkova, Praha 5 - Stodůlky</p>
_INFO_RE = re.compile(r'class="c-products__info"[^>]*>(.*?)</p>', re.DOTALL)

# <dt>Label</dt><dd>Value</dd>
_DT_DD_RE = re.compile(r'<dt[^>]*>(.*?)</dt>\s*<dd[^>]*>(.*?)</dd>', re.DOTALL)

_DATALAYER_RE = re.compile(
    r'dataLayer\.push\(\s*(\{[^}]*?"listing_lat"[^}]+?\})\s*\)', re.DOTALL
)
_LAT_RE = re.compile(r'"listing_lat"\s*:\s*"?(-?\d+(?:\.\d+)?)')
_LON_RE = re.compile(r'"listing_lon"\s*:\s*"?(-?\d+(?:\.\d+)?)')


def _strip_tags(fragment: str) -> str:
    """Remove HTML tags, decode character references and trim whitespace."""
    return unescape(_TAG_RE.sub("", fragment)).strip()


def fetch_url(url: str) -> str:
    """Fetch *url* and return the response body decoded as UTF-8.

    The request is attempted up to MAX_RETRIES times, sleeping 3, 6, 9, ...
    seconds between attempts. The last failure is re-raised.

    Raises:
        OSError: (including urllib.error.URLError and connection errors)
            when every attempt failed.
    """
    for attempt in range(MAX_RETRIES):
        try:
            logger.debug(f"HTTP GET request (attempt {attempt + 1}/{MAX_RETRIES}): {url}")
            logger.debug(f"Headers: {HEADERS}")
            req = urllib.request.Request(url, headers=HEADERS)
            # The context manager closes the connection even if read() fails.
            with urllib.request.urlopen(req, timeout=30) as resp:
                data = resp.read()
                status = resp.status
            logger.debug(f"HTTP response: status={status}, size={len(data)} bytes")
            return data.decode("utf-8")
        # URLError and every ConnectionError are subclasses of OSError.
        except OSError as e:
            if attempt < MAX_RETRIES - 1:
                wait = (attempt + 1) * 3  # 3, 6, 9, 12s
                logger.warning(f"Connection error (retry {attempt + 1}/{MAX_RETRIES} after {wait}s): {e}")
                time.sleep(wait)
            else:
                logger.error(f"HTTP request failed after {MAX_RETRIES} attempts: {e}", exc_info=True)
                raise


def build_list_url(page: int = 0) -> str:
    """Build the listing URL with all filters; *page* is zero-based."""
    base = f"{BASE_URL}/s/prodej/byty/cena-do-{MAX_PRICE}/praha/"
    params = {
        "s-qc[subtypeFlat]": DISPOSITION_CODES,
        "s-qc[usableAreaMin]": str(MIN_AREA),
    }
    url = f"{base}?{urllib.parse.urlencode(params)}"
    if page > 0:
        url += f"&page={page}"
    return url


def parse_total_count(html: str) -> int:
    """Extract the total listing count from a listing page.

    Looks for text such as "720 inzerátů". Returns 0 when nothing matches.
    """
    match = re.search(r'(\d[\d\s]*)\s*inzerát', html)
    if not match:
        return 0
    # Thousands may be separated by any whitespace (space, NBSP, narrow NBSP,
    # newline), so drop all of it rather than a fixed list of characters.
    return int(re.sub(r"\s", "", match.group(1)))


def parse_listings(html: str) -> list[dict]:
    """Parse listing cards from a listing page using regular expressions.

    Every distinct ``c-products__link`` anchor pointing at a ``/detail/`` URL
    yields one dict with the keys ``id``, ``url``, ``disposition``, ``area``,
    ``price`` and ``locality``. ``area`` is None and ``price`` is 0 when they
    cannot be read from the card.

    NOTE(review): the card is taken as the markup between the anchor and the
    next product anchor with a different URL. The original end-of-card
    pattern was lost, so this boundary is a reconstruction — verify against
    live markup.
    """
    anchors = sorted(
        (m for pattern in _LINK_PATTERNS for m in pattern.finditer(html)),
        key=lambda m: m.start(),
    )

    results = []
    seen_urls = set()
    for index, anchor in enumerate(anchors):
        link_url = anchor.group(1)
        if link_url in seen_urls:
            continue
        seen_urls.add(link_url)

        block_end = next(
            (later.start() for later in anchors[index + 1:] if later.group(1) != link_url),
            len(html),
        )
        block = html[anchor.end():block_end]

        # Ensure full URL
        url = link_url if link_url.startswith("http") else BASE_URL + link_url

        # Skip ads: look for the ad marker shortly before the anchor.
        # "advertisment" (sic) is the spelling the site markup is matched on.
        preceding = html[max(0, anchor.start() - 500):anchor.start()]
        if "advertisment" in preceding or "advertisement" in preceding:
            continue

        title_match = _TITLE_RE.search(block)
        title = _strip_tags(title_match.group(1)).lower() if title_match else ""

        price_match = _PRICE_RE.search(block)
        price_text = _strip_tags(price_match.group(1)) if price_match else ""

        info_match = _INFO_RE.search(block)
        info = _strip_tags(info_match.group(1)) if info_match else ""

        # Disposition and area come from the title
        disp_match = re.search(r'(\d\+(?:kk|\d))', title)
        area_match = re.search(r'(\d+)\s*m[²2]', title)
        disposition = disp_match.group(1) if disp_match else None
        area = int(area_match.group(1)) if area_match else None
        if not disposition and ("6 a" in title or "6+" in title):
            disposition = "6+"

        # Price stays 0 for "price on request" listings
        price = 0
        if price_text and "vyžádání" not in price_text.lower():
            digits = re.sub(r'\D', '', price_text)
            if digits:
                price = int(digits)

        # Listing ID is the 24-hex-digit URL segment; fall back to the URL
        id_match = re.search(r'/([a-f0-9]{24})/?', url)
        listing_id = id_match.group(1) if id_match else url

        results.append({
            "id": listing_id,
            "url": url,
            "disposition": DISPOSITION_MAP.get(disposition, disposition or "?"),
            "area": area,
            "price": price,
            "locality": info,
        })

    return results


def parse_detail(html: str) -> dict:
    """Parse a detail page for GPS, floor, construction and ownership.

    Returns a dict that may contain ``lat``, ``lon`` (floats), ``floor``
    (int), ``construction`` (lower-cased str) and ``ownership`` (str); keys
    that cannot be found are simply absent.
    """
    detail = {}

    # 1. GPS coordinates from the dataLayer.push({...}) object
    dl_match = _DATALAYER_RE.search(html)
    if dl_match:
        js_obj = dl_match.group(1)
        lat_match = _LAT_RE.search(js_obj)
        lon_match = _LON_RE.search(js_obj)
        if lat_match:
            detail["lat"] = float(lat_match.group(1))
        if lon_match:
            detail["lon"] = float(lon_match.group(1))

    # 2. <dt>/<dd> pairs for floor, construction, ownership
    for dt, dd in _DT_DD_RE.findall(html):
        dt_clean = _strip_tags(dt).lower()
        dd_clean = _strip_tags(dd)

        if "podlaží" in dt_clean or "podlazi" in dt_clean or "patro" in dt_clean:
            # e.g. "2. patro (3. NP)" or "3. podlaží z celkem 5";
            # prefer the explicit NP number, otherwise take the first number.
            floor_match = re.search(r'(\d+)\.\s*NP', dd_clean) or re.search(r'(\d+)', dd_clean)
            if floor_match:
                detail["floor"] = int(floor_match.group(1))

        if "konstrukce" in dt_clean or "stavba" in dt_clean:
            detail["construction"] = dd_clean.lower()

        if "vlastnictví" in dt_clean or "vlastnictvi" in dt_clean:
            detail["ownership"] = dd_clean

    return detail


def format_price(price: int) -> str:
    """Format *price* with space-separated thousands and a " Kč" suffix."""
    return f"{price:,}".replace(",", " ") + " Kč"


def load_cache(json_path: str = "byty_idnes.json") -> dict[str, dict]:
    """Load previously scraped data as a cache keyed by ``hash_id``.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or not a list; list entries without a ``hash_id`` are skipped.
    """
    path = Path(json_path)
    if not path.exists():
        return {}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    except (OSError, ValueError) as e:
        logger.warning(f"Cache file {path} could not be read: {e}")
        return {}
    if not isinstance(data, list):
        return {}
    return {
        str(entry["hash_id"]): entry
        for entry in data
        if isinstance(entry, dict) and "hash_id" in entry
    }


def _fetch_listing_pages(max_pages: int | None) -> dict[str, dict]:
    """Download listing pages and return the unique listings keyed by id."""
    all_listings: dict[str, dict] = {}
    page = 0
    total = None
    total_pages = 1

    while True:
        if max_pages and page >= max_pages:
            logger.debug(f"Max pages limit reached: {max_pages}")
            break

        url = build_list_url(page)
        logger.info(f"Strana {page + 1} ...")
        html = fetch_url(url)

        if total is None:
            total = parse_total_count(html)
            total_pages = math.ceil(total / PER_PAGE) if total > 0 else 1
            logger.info(f"→ Celkem {total} inzerátů, ~{total_pages} stran")

        listings = parse_listings(html)
        logger.debug(f"Page {page}: found {len(listings)} listings")

        if not listings:
            logger.debug(f"No listings found on page {page}, stopping")
            break

        for item in listings:
            all_listings.setdefault(item["id"], item)

        page += 1
        # With an unknown total (0) keep paging until a page comes back empty.
        if total and page >= total_pages:
            break
        time.sleep(1.0)

    return all_listings


def _prefilter_listings(listings: dict[str, dict]) -> list[dict]:
    """Drop listings that fail the price, area or disposition criteria."""
    pre_filtered = []
    excluded_price = 0
    excluded_area = 0
    excluded_disp = 0

    for item in listings.values():
        item_id = item["id"]
        if item["price"] <= 0 or item["price"] > MAX_PRICE:
            excluded_price += 1
            logger.debug(f"Filter: id={item_id} - excluded (price {item['price']})")
            continue
        # An unknown area (None) is not grounds for exclusion.
        if item["area"] is not None and item["area"] < MIN_AREA:
            excluded_area += 1
            logger.debug(f"Filter: id={item_id} - excluded (area {item['area']} m²)")
            continue
        if item["disposition"] == "?":
            excluded_disp += 1
            logger.debug(f"Filter: id={item_id} - excluded (unknown disposition)")
            continue
        pre_filtered.append(item)

    logger.info(f"\nPo předfiltraci:")
    logger.info(f" Vyloučeno (cena): {excluded_price}")
    logger.info(f" Vyloučeno (plocha): {excluded_area}")
    logger.info(f" Vyloučeno (dispozice): {excluded_disp}")
    logger.info(f" Zbývá: {len(pre_filtered)}")
    return pre_filtered


def _building_type_label(construction: str) -> str:
    """Map a lower-cased construction description to its Czech label."""
    if not construction:
        return "neuvedeno"
    if "cihlo" in construction or "cihla" in construction:
        return "Cihlová"
    if "smíšen" in construction or "smisen" in construction:
        return "Smíšená"
    if "skelet" in construction:
        return "Skeletová"
    if "dřevo" in construction or "drevo" in construction:
        return "Dřevostavba"
    if "mont" in construction:
        return "Montovaná"
    return construction.capitalize()


def scrape(max_pages: int | None = None, max_properties: int | None = None) -> list[dict]:
    """Scrape Reality iDNES and return the listings matching all criteria.

    Args:
        max_pages: stop after this many listing pages (None = no limit).
        max_properties: stop once this many newly fetched listings have been
            accepted (None = no limit); cache hits do not count.

    Returns:
        A list of result dicts — freshly built ones plus entries reused from
        the previous run's byty_idnes.json when the price is unchanged.
    """
    cache = load_cache()

    logger.info("=" * 60)
    logger.info("Stahuji inzeráty z Reality iDNES")
    logger.info(f"Cena: do {format_price(MAX_PRICE)}")
    logger.info(f"Min. plocha: {MIN_AREA} m²")
    logger.info(f"Patro: od {MIN_FLOOR}. NP")
    logger.info(f"Region: Praha")
    if cache:
        logger.info(f"Cache: {len(cache)} bytů z minulého běhu")
    if max_pages:
        logger.info(f"Max. stran: {max_pages}")
    if max_properties:
        logger.info(f"Max. bytů: {max_properties}")
    logger.info("=" * 60)

    # Step 1: Fetch listing pages
    logger.info("\nFáze 1: Stahování seznamu inzerátů...")
    all_listings = _fetch_listing_pages(max_pages)
    logger.info(f"\nStaženo: {len(all_listings)} unikátních inzerátů")

    # Step 2: Pre-filter by price and area from list data
    pre_filtered = _prefilter_listings(all_listings)

    # Step 3: Fetch details for GPS, floor, construction
    logger.info(f"\nFáze 2: Stahování detailů ({len(pre_filtered)} bytů)...")
    results = []
    excluded_panel = 0
    excluded_floor = 0
    excluded_no_gps = 0
    excluded_detail = 0
    cache_hits = 0
    properties_fetched = 0

    for i, item in enumerate(pre_filtered):
        if max_properties and properties_fetched >= max_properties:
            logger.debug(f"Max properties limit reached: {max_properties}")
            break

        # Check cache — if hash_id exists and price unchanged, reuse
        cached = cache.get(str(item["id"]))
        if cached and cached.get("price") == item["price"]:
            cache_hits += 1
            logger.debug(f"Cache hit for id={item['id']}")
            results.append(cached)
            continue

        url = item["url"]
        time.sleep(0.4)

        try:
            html = fetch_url(url)
        # Deliberately broad: one broken detail page must not abort the run.
        except Exception as e:
            excluded_detail += 1
            logger.warning(f"Detail failed for id={item['id']}: {e}")
            continue

        detail = parse_detail(html)
        logger.debug(f"Detail parsed for id={item['id']}: lat={detail.get('lat')}, lon={detail.get('lon')}, floor={detail.get('floor')}")

        # Must have GPS
        if not detail.get("lat") or not detail.get("lon"):
            excluded_no_gps += 1
            logger.debug(f"Filter: id={item['id']} - excluded (no GPS)")
            continue

        # Check construction — exclude panel
        construction = detail.get("construction", "")
        if "panel" in construction:
            excluded_panel += 1
            logger.debug(f"Filter: id={item['id']} - excluded (panel construction)")
            logger.info(f"✗ Vyloučen {item['id'][:12]}...: panel ({construction})")
            continue

        # Check for "sídliště" (housing estate) in the construction text
        if "sídliště" in construction or "sidliste" in construction:
            excluded_panel += 1
            logger.debug(f"Filter: id={item['id']} - excluded (housing estate)")
            logger.info(f"✗ Vyloučen {item['id'][:12]}...: sídliště")
            continue

        # Check floor; an unknown floor is not grounds for exclusion
        floor = detail.get("floor")
        if floor is not None and floor < MIN_FLOOR:
            excluded_floor += 1
            logger.debug(f"Filter: id={item['id']} - excluded (floor {floor})")
            continue

        # The "area" key always exists but may be None.
        area_label = item["area"] if item["area"] is not None else "?"
        result = {
            "hash_id": item["id"],
            "name": f"Prodej bytu {item['disposition']} {area_label} m²",
            "price": item["price"],
            "price_formatted": format_price(item["price"]),
            "locality": item["locality"],
            "lat": detail["lat"],
            "lon": detail["lon"],
            "disposition": item["disposition"],
            "floor": floor,
            "area": item["area"],
            "building_type": _building_type_label(construction),
            "ownership": detail.get("ownership", "neuvedeno"),
            "url": item["url"],
            "source": "idnes",
            "image": "",
            "scraped_at": datetime.now().strftime("%Y-%m-%d"),
        }
        results.append(result)
        properties_fetched += 1

        if (i + 1) % 20 == 0:
            logger.info(f"Zpracováno {i + 1}/{len(pre_filtered)} ...")

    logger.info(f"\n{'=' * 60}")
    logger.info(f"Výsledky Reality iDNES:")
    logger.info(f" Předfiltrováno: {len(pre_filtered)}")
    logger.info(f" Z cache (přeskočeno): {cache_hits}")
    logger.info(f" Vyloučeno (panel/síd): {excluded_panel}")
    logger.info(f" Vyloučeno (patro): {excluded_floor}")
    logger.info(f" Vyloučeno (bez GPS): {excluded_no_gps}")
    logger.info(f" Vyloučeno (bez detailu): {excluded_detail}")
    logger.info(f" ✓ Vyhovující byty: {len(results)}")
    logger.info(f"{'=' * 60}")

    return results


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Scrape apartments from Reality iDNES")
    parser.add_argument("--max-pages", type=int, default=None,
                        help="Maximum number of listing pages to scrape")
    parser.add_argument("--max-properties", type=int, default=None,
                        help="Maximum number of properties to fetch details for")
    parser.add_argument("--log-level", type=str, default="INFO",
                        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
                        help="Logging level (default: INFO)")
    args = parser.parse_args()

    # Configure logging
    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="[%(levelname)s] %(asctime)s - %(name)s - %(message)s",
        handlers=[logging.StreamHandler()]
    )

    start = time.time()
    estates = scrape(max_pages=args.max_pages, max_properties=args.max_properties)

    # The output file is left untouched when nothing matched.
    if estates:
        json_path = Path("byty_idnes.json")
        json_path.write_text(
            json.dumps(estates, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        elapsed = time.time() - start
        logger.info(f"\n✓ Data uložena: {json_path.resolve()}")
        logger.info(f"⏱ Celkový čas: {elapsed:.0f} s")
    else:
        logger.info("\nŽádné byty z Reality iDNES neodpovídají kritériím :(")