Files
maru-hleda-byt/scrape_idnes.py
Marie Michalova b8d4d44164 Rewrite PSN + CityHome scrapers, add price/m² map coloring, ratings system, and status dashboard
- Rewrite PSN scraper to use /api/units-list endpoint (single API call, no HTML parsing)
- Fix CityHome scraper: GPS from multiple URL patterns, address from table cells, no 404 retries
- Color map markers by price/m² instead of disposition (blue→green→orange→red scale)
- Add persistent rating system (favorite/reject) with Flask ratings server and localStorage fallback
- Rejected markers show original color at reduced opacity with 🚫 SVG overlay
- Favorite markers shown as star icons with gold pulse animation
- Add "new today" marker logic (scraped_at == today) with larger pulsing green outline
- Add filter panel with floor, price, hide-rejected controls and ☰/✕ toggle buttons
- Add generate_status.py for scraper run statistics and status.html dashboard
- Add scraped_at field to all scrapers for freshness tracking
- Update run_all.sh with log capture and status generation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 15:15:25 +01:00

515 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Reality iDNES scraper.

Downloads apartments for sale in Prague and filters them by the criteria
configured below. Output: byty_idnes.json
"""
from __future__ import annotations
import argparse
from datetime import datetime
import json
import logging
import math
import re
import time
import urllib.request
import urllib.parse
from html.parser import HTMLParser  # NOTE(review): appears unused in this file — confirm before removing
from pathlib import Path
logger = logging.getLogger(__name__)
# ── Configuration ───────────────────────────────────────────────────────────
MAX_PRICE = 13_500_000  # maximum price in CZK
MIN_AREA = 69  # minimum usable area in m²
MIN_FLOOR = 2  # minimum floor (NP = above-ground floor number)
PER_PAGE = 26 # iDNES returns 26 listings per page
# Disposition codes for the s-qc[subtypeFlat] query parameter
DISPOSITION_CODES = "3k|31|4k|41|5k|51|6k"
# Maps the disposition found in the listing title to a display label
DISPOSITION_MAP = {
"3+kk": "3+kk", "3+1": "3+1",
"4+kk": "4+kk", "4+1": "4+1",
"5+kk": "5+kk", "5+1": "5+1",
"6+kk": "6+", "6+1": "6+",
"6 a více": "6+",
}
# Browser-like headers; Accept-Encoding=identity avoids gzip decoding
HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "cs,en;q=0.9",
"Accept-Encoding": "identity",
"Connection": "keep-alive",
}
BASE_URL = "https://reality.idnes.cz"
MAX_RETRIES = 5
def fetch_url(url: str) -> str:
    """Fetch *url* and return the response body decoded as UTF-8.

    Retries up to MAX_RETRIES times on connection-level failures with a
    linearly growing back-off (3, 6, 9, 12 s).

    Raises:
        The last connection error when all attempts fail.
    """
    for attempt in range(MAX_RETRIES):
        try:
            logger.debug(f"HTTP GET request (attempt {attempt + 1}/{MAX_RETRIES}): {url}")
            logger.debug(f"Headers: {HEADERS}")
            req = urllib.request.Request(url, headers=HEADERS)
            # Use a context manager so the socket is closed even if
            # read()/decode() raises (the original leaked the response).
            with urllib.request.urlopen(req, timeout=30) as resp:
                data = resp.read()
                logger.debug(f"HTTP response: status={resp.status}, size={len(data)} bytes")
            return data.decode("utf-8")
        except (ConnectionResetError, ConnectionError, urllib.error.URLError,
                OSError) as e:
            if attempt < MAX_RETRIES - 1:
                wait = (attempt + 1) * 3  # 3, 6, 9, 12s
                logger.warning(f"Connection error (retry {attempt + 1}/{MAX_RETRIES} after {wait}s): {e}")
                time.sleep(wait)
            else:
                logger.error(f"HTTP request failed after {MAX_RETRIES} attempts: {e}", exc_info=True)
                raise
def build_list_url(page: int = 0) -> str:
    """Return the search URL for the given zero-based listing *page*.

    All filter criteria (price cap, dispositions, minimum area) are baked
    into the path and query string from the module-level configuration.
    """
    query = urllib.parse.urlencode({
        "s-qc[subtypeFlat]": DISPOSITION_CODES,
        "s-qc[usableAreaMin]": str(MIN_AREA),
    })
    url = f"{BASE_URL}/s/prodej/byty/cena-do-{MAX_PRICE}/praha/?{query}"
    # Page 0 is the implicit first page; only later pages carry the param.
    return f"{url}&page={page}" if page > 0 else url
def parse_total_count(html: str) -> int:
    """Return the advertised total listing count, or 0 when not found.

    The count appears as e.g. "720 inzerátů"; thousands may be separated
    by regular or non-breaking spaces.
    """
    found = re.search(r'(\d[\d\s]*)\s*inzerát', html)
    if not found:
        return 0
    digits = found.group(1).replace(" ", "").replace("\xa0", "")
    return int(digits)
def parse_listings(html: str) -> list[dict]:
    """Parse listing cards from a search-results page using regexes.

    Returns a list of dicts with keys: id, url, disposition, area, price,
    locality. Price is 0 when hidden ("cena na vyžádání"); area may be
    None and disposition "?" when they cannot be read from the title.
    """
    # NOTE: the original also ran a c-products__item findall whose result
    # was never used — that dead regex pass has been removed.
    results = []
    # Anchor attribute order varies, so match href-then-class and
    # class-then-href separately.
    link_pattern = re.compile(
        r'<a[^>]*href="([^"]*?/detail/[^"]*?)"[^>]*class="c-products__link"[^>]*>',
        re.DOTALL
    )
    link_pattern2 = re.compile(
        r'<a[^>]*class="c-products__link"[^>]*href="([^"]*?/detail/[^"]*?)"[^>]*>',
        re.DOTALL
    )
    all_links = link_pattern.findall(html) + link_pattern2.findall(html)
    seen_urls = set()
    for link_url in all_links:
        if link_url in seen_urls:
            continue
        seen_urls.add(link_url)
        # Use the markup following the link as the card's parsing context.
        escaped_url = re.escape(link_url)
        context_match = re.search(
            escaped_url + r'(.*?)</div>\s*</div>',
            html, re.DOTALL
        )
        if not context_match:
            continue
        block = context_match.group(1)
        # Ensure an absolute URL
        url = link_url
        if not url.startswith("http"):
            url = BASE_URL + url
        # Skip sponsored cards: the ad marker appears shortly before the
        # link. Both the site's misspelling and the correct spelling are
        # checked deliberately.
        ad_check_start = max(0, context_match.start() - 500)
        ad_block = html[ad_check_start:context_match.start()]
        if "advertisment" in ad_block or "advertisement" in ad_block:
            continue
        # Title: <h2 class="c-products__title">prodej bytu 3+kk 79 m2</h2>
        title_match = re.search(r'class="c-products__title"[^>]*>(.*?)</h2>', block, re.DOTALL)
        title = re.sub(r'<[^>]+>', '', title_match.group(1)).strip().lower() if title_match else ""
        # Price: <p class="c-products__price"><strong>12 950 000 Kč</strong></p>
        price_match = re.search(r'c-products__price[^>]*>.*?<strong>(.*?)</strong>', block, re.DOTALL)
        price_text = re.sub(r'<[^>]+>', '', price_match.group(1)).strip() if price_match else ""
        # Address: <p class="c-products__info">Klečkova, Praha 5 - Stodůlky</p>
        info_match = re.search(r'class="c-products__info"[^>]*>(.*?)</p>', block, re.DOTALL)
        info = re.sub(r'<[^>]+>', '', info_match.group(1)).strip() if info_match else ""
        # Disposition ("3+kk", "4+1", ...) and area from the title text
        disp_match = re.search(r'(\d\+(?:kk|\d))', title)
        area_match = re.search(r'(\d+)\s*m[²2]', title)
        disposition = disp_match.group(1) if disp_match else None
        area = int(area_match.group(1)) if area_match else None
        if not disposition and ("6 a" in title or "6+" in title):
            disposition = "6+"
        # Price digits; "na vyžádání" (on request) stays 0
        price = 0
        if price_text and "vyžádání" not in price_text.lower():
            price_clean = re.sub(r'[^\d]', '', price_text)
            if price_clean:
                price = int(price_clean)
        # Listing ID is the 24-hex-char segment of the detail URL;
        # fall back to the whole URL when absent.
        id_match = re.search(r'/([a-f0-9]{24})/?', url)
        listing_id = id_match.group(1) if id_match else url
        results.append({
            "id": listing_id,
            "url": url,
            "disposition": DISPOSITION_MAP.get(disposition, disposition or "?"),
            "area": area,
            "price": price,
            "locality": info,
        })
    return results
def parse_detail(html: str) -> dict:
    """Extract lat/lon, floor, construction and ownership from a detail page.

    Returns a dict with any of the keys "lat", "lon", "floor",
    "construction", "ownership" that could be parsed.
    """
    detail: dict = {}
    # GPS lives in a dataLayer.push({...}) object containing listing_lat.
    dl = re.search(
        r'dataLayer\.push\(\s*(\{[^}]+?"listing_lat"[^}]+?\})\s*\)',
        html, re.DOTALL
    )
    if dl:
        payload = dl.group(1)
        try:
            # Pull the two coordinates straight out of the JS object text.
            for js_key, out_key in (("listing_lat", "lat"), ("listing_lon", "lon")):
                coord = re.search(rf'"{js_key}"\s*:\s*([\d.]+)', payload)
                if coord:
                    detail[out_key] = float(coord.group(1))
        except (ValueError, AttributeError):
            pass
    # Definition-list rows: <dt>Label</dt><dd>Value</dd>
    for raw_dt, raw_dd in re.findall(
            r'<dt[^>]*>(.*?)</dt>\s*<dd[^>]*>(.*?)</dd>', html, re.DOTALL):
        label = re.sub(r'<[^>]+>', '', raw_dt).strip().lower()
        value = re.sub(r'<[^>]+>', '', raw_dd).strip()
        if "podlaží" in label or "podlazi" in label or "patro" in label:
            # Prefer an explicit "X. NP" (above-ground floor) number,
            # otherwise take the first number in the cell.
            np_num = re.search(r'(\d+)\.\s*NP', value)
            if np_num:
                detail["floor"] = int(np_num.group(1))
            else:
                any_num = re.search(r'(\d+)', value)
                if any_num:
                    detail["floor"] = int(any_num.group(1))
        if "konstrukce" in label or "stavba" in label:
            detail["construction"] = value.lower()
        if "vlastnictví" in label or "vlastnictvi" in label:
            detail["ownership"] = value
    return detail
def format_price(price: int) -> str:
    """Format a non-negative integer price with space-separated thousands.

    >>> format_price(13500000)
    '13 500 000'

    Replaces the hand-rolled 3-digit slicing loop (which also carried a
    dead `+ ""` at the end) with the locale-independent format-spec
    thousands separator.
    """
    return f"{price:,}".replace(",", " ")
def load_cache(json_path: str = "byty_idnes.json") -> dict[str, dict]:
    """Load the previous run's output as a cache keyed by hash_id.

    Returns an empty dict when the file is missing or cannot be parsed,
    so a first run (or a corrupted file) simply means no cache hits.
    """
    cache_file = Path(json_path)
    if not cache_file.exists():
        return {}
    try:
        entries = json.loads(cache_file.read_text(encoding="utf-8"))
        return {str(entry["hash_id"]): entry for entry in entries if "hash_id" in entry}
    except (json.JSONDecodeError, KeyError):
        return {}
def scrape(max_pages: int | None = None, max_properties: int | None = None):
    """Run the full scrape pipeline and return matching apartments.

    Pipeline: (1) fetch listing pages, (2) pre-filter by price/area/
    disposition using list-page data only, (3) fetch detail pages for
    GPS, floor and construction, and apply the remaining filters.

    Args:
        max_pages: optional cap on listing pages to fetch (None = all).
        max_properties: optional cap on detail pages to fetch (None = all).

    Returns:
        List of result dicts, one per apartment passing every filter.
        Unchanged cached entries are reused without a detail fetch.
    """
    cache = load_cache()
    logger.info("=" * 60)
    logger.info("Stahuji inzeráty z Reality iDNES")
    logger.info(f"Cena: do {format_price(MAX_PRICE)}")
    logger.info(f"Min. plocha: {MIN_AREA}")
    logger.info(f"Patro: od {MIN_FLOOR}. NP")
    logger.info(f"Region: Praha")
    if cache:
        logger.info(f"Cache: {len(cache)} bytů z minulého běhu")
    if max_pages:
        logger.info(f"Max. stran: {max_pages}")
    if max_properties:
        logger.info(f"Max. bytů: {max_properties}")
    logger.info("=" * 60)
    # Step 1: Fetch listing pages until exhausted or capped
    logger.info("\nFáze 1: Stahování seznamu inzerátů...")
    all_listings = {}  # id -> listing dict (dedupes across pages)
    page = 0
    total = None
    while True:
        if max_pages and page >= max_pages:
            logger.debug(f"Max pages limit reached: {max_pages}")
            break
        url = build_list_url(page)
        logger.info(f"Strana {page + 1} ...")
        html = fetch_url(url)
        if total is None:
            # First page reveals the total count -> expected page count
            total = parse_total_count(html)
            total_pages = math.ceil(total / PER_PAGE) if total > 0 else 1
            logger.info(f"→ Celkem {total} inzerátů, ~{total_pages} stran")
        listings = parse_listings(html)
        logger.debug(f"Page {page}: found {len(listings)} listings")
        if not listings:
            logger.debug(f"No listings found on page {page}, stopping")
            break
        for item in listings:
            lid = item["id"]
            if lid not in all_listings:
                all_listings[lid] = item
        page += 1
        if total and page >= math.ceil(total / PER_PAGE):
            break
        time.sleep(1.0)  # be polite between page fetches
    logger.info(f"\nStaženo: {len(all_listings)} unikátních inzerátů")
    # Step 2: Pre-filter by price and area from list data
    # (avoids detail fetches for obviously unsuitable listings)
    pre_filtered = []
    excluded_price = 0
    excluded_area = 0
    excluded_disp = 0
    for item in all_listings.values():
        item_id = item["id"]
        # price == 0 means "price on request" — excluded as well
        if item["price"] <= 0 or item["price"] > MAX_PRICE:
            excluded_price += 1
            logger.debug(f"Filter: id={item_id} - excluded (price {item['price']})")
            continue
        if item["area"] is not None and item["area"] < MIN_AREA:
            excluded_area += 1
            logger.debug(f"Filter: id={item_id} - excluded (area {item['area']} m²)")
            continue
        if item["disposition"] == "?":
            excluded_disp += 1
            logger.debug(f"Filter: id={item_id} - excluded (unknown disposition)")
            continue
        pre_filtered.append(item)
    logger.info(f"\nPo předfiltraci:")
    logger.info(f" Vyloučeno (cena): {excluded_price}")
    logger.info(f" Vyloučeno (plocha): {excluded_area}")
    logger.info(f" Vyloučeno (dispozice): {excluded_disp}")
    logger.info(f" Zbývá: {len(pre_filtered)}")
    # Step 3: Fetch details for GPS, floor, construction
    logger.info(f"\nFáze 2: Stahování detailů ({len(pre_filtered)} bytů)...")
    results = []
    excluded_panel = 0
    excluded_floor = 0
    excluded_no_gps = 0
    excluded_detail = 0
    cache_hits = 0
    properties_fetched = 0
    for i, item in enumerate(pre_filtered):
        if max_properties and properties_fetched >= max_properties:
            logger.debug(f"Max properties limit reached: {max_properties}")
            break
        # Check cache — if hash_id exists and price unchanged, reuse
        cached = cache.get(str(item["id"]))
        if cached and cached.get("price") == item["price"]:
            cache_hits += 1
            logger.debug(f"Cache hit for id={item['id']}")
            results.append(cached)
            continue
        url = item["url"]
        time.sleep(0.4)  # throttle detail-page requests
        try:
            html = fetch_url(url)
        except Exception as e:
            # Best-effort: a failed detail page only drops that listing
            excluded_detail += 1
            logger.warning(f"Detail failed for id={item['id']}: {e}")
            continue
        detail = parse_detail(html)
        logger.debug(f"Detail parsed for id={item['id']}: lat={detail.get('lat')}, lon={detail.get('lon')}, floor={detail.get('floor')}")
        # Must have GPS (the map view needs coordinates)
        if not detail.get("lat") or not detail.get("lon"):
            excluded_no_gps += 1
            logger.debug(f"Filter: id={item['id']} - excluded (no GPS)")
            continue
        # Check construction — exclude panel buildings
        construction = detail.get("construction", "")
        if "panel" in construction:
            excluded_panel += 1
            logger.debug(f"Filter: id={item['id']} - excluded (panel construction)")
            logger.info(f"✗ Vyloučen {item['id'][:12]}...: panel ({construction})")
            continue
        # Check for "sídliště" (housing estate) in the construction text
        if "sídliště" in construction or "sidliste" in construction:
            excluded_panel += 1
            logger.debug(f"Filter: id={item['id']} - excluded (housing estate)")
            logger.info(f"✗ Vyloučen {item['id'][:12]}...: sídliště")
            continue
        # Check floor (only when the detail page stated one)
        floor = detail.get("floor")
        if floor is not None and floor < MIN_FLOOR:
            excluded_floor += 1
            logger.debug(f"Filter: id={item['id']} - excluded (floor {floor})")
            continue
        # Map raw construction text to a Czech display label
        building_type = "neuvedeno"
        if construction:
            if "cihlo" in construction or "cihla" in construction:
                building_type = "Cihlová"
            elif "smíšen" in construction or "smisen" in construction:
                building_type = "Smíšená"
            elif "skelet" in construction:
                building_type = "Skeletová"
            elif "dřevo" in construction or "drevo" in construction:
                building_type = "Dřevostavba"
            elif "mont" in construction:
                building_type = "Montovaná"
            else:
                building_type = construction.capitalize()
        result = {
            "hash_id": item["id"],
            "name": f"Prodej bytu {item['disposition']} {item.get('area', '?')}",
            "price": item["price"],
            "price_formatted": format_price(item["price"]),
            "locality": item["locality"],
            "lat": detail["lat"],
            "lon": detail["lon"],
            "disposition": item["disposition"],
            "floor": floor,
            "area": item["area"],
            "building_type": building_type,
            "ownership": detail.get("ownership", "neuvedeno"),
            "url": item["url"],
            "source": "idnes",
            "image": "",
            "scraped_at": datetime.now().strftime("%Y-%m-%d"),  # freshness tag
        }
        results.append(result)
        properties_fetched += 1
        if (i + 1) % 20 == 0:
            logger.info(f"Zpracováno {i + 1}/{len(pre_filtered)} ...")
    logger.info(f"\n{'=' * 60}")
    logger.info(f"Výsledky Reality iDNES:")
    logger.info(f" Předfiltrováno: {len(pre_filtered)}")
    logger.info(f" Z cache (přeskočeno): {cache_hits}")
    logger.info(f" Vyloučeno (panel/síd): {excluded_panel}")
    logger.info(f" Vyloučeno (patro): {excluded_floor}")
    logger.info(f" Vyloučeno (bez GPS): {excluded_no_gps}")
    logger.info(f" Vyloučeno (bez detailu): {excluded_detail}")
    logger.info(f" ✓ Vyhovující byty: {len(results)}")
    logger.info(f"{'=' * 60}")
    return results
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Scrape apartments from Reality iDNES")
parser.add_argument("--max-pages", type=int, default=None,
help="Maximum number of listing pages to scrape")
parser.add_argument("--max-properties", type=int, default=None,
help="Maximum number of properties to fetch details for")
parser.add_argument("--log-level", type=str, default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"],
help="Logging level (default: INFO)")
args = parser.parse_args()
# Configure logging
logging.basicConfig(
level=getattr(logging, args.log_level),
format="[%(levelname)s] %(asctime)s - %(name)s - %(message)s",
handlers=[logging.StreamHandler()]
)
start = time.time()
estates = scrape(max_pages=args.max_pages, max_properties=args.max_properties)
if estates:
json_path = Path("byty_idnes.json")
json_path.write_text(
json.dumps(estates, ensure_ascii=False, indent=2),
encoding="utf-8",
)
elapsed = time.time() - start
logger.info(f"\n✓ Data uložena: {json_path.resolve()}")
logger.info(f"⏱ Celkový čas: {elapsed:.0f} s")
else:
logger.info("\nŽádné byty z Reality iDNES neodpovídají kritériím :(")