Files
maru-hleda-byt/generate_status.py
Marie Michalova b8d4d44164 Rewrite PSN + CityHome scrapers, add price/m² map coloring, ratings system, and status dashboard
- Rewrite PSN scraper to use /api/units-list endpoint (single API call, no HTML parsing)
- Fix CityHome scraper: GPS from multiple URL patterns, address from table cells, no 404 retries
- Color map markers by price/m² instead of disposition (blue→green→orange→red scale)
- Add persistent rating system (favorite/reject) with Flask ratings server and localStorage fallback
- Rejected markers show original color at reduced opacity with 🚫 SVG overlay
- Favorite markers shown as star icons with gold pulse animation
- Add "new today" marker logic (scraped_at == today) with larger pulsing green outline
- Add filter panel with floor, price, hide-rejected controls and ☰/✕ toggle buttons
- Add generate_status.py for scraper run statistics and status.html dashboard
- Add scraped_at field to all scrapers for freshness tracking
- Update run_all.sh with log capture and status generation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 15:15:25 +01:00

203 lines
6.4 KiB
Python

#!/usr/bin/env python3
"""Generate status.json from scraper JSON outputs and run log."""
from __future__ import annotations
import json
import os
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
# Directory containing this script; every input and output file used below
# is resolved relative to it.
HERE = Path(__file__).parent
# Display name of each source -> file name of that source's JSON output.
# The keys are also matched case-insensitively against the step headers of
# the run log in parse_log(), and the insertion order is the order in which
# main() lists the sources.
SOURCE_FILES = {
    "Sreality": "byty_sreality.json",
    "Realingo": "byty_realingo.json",
    "Bezrealitky": "byty_bezrealitky.json",
    "iDNES": "byty_idnes.json",
    "PSN": "byty_psn.json",
    "CityHome": "byty_cityhome.json",
}
# File whose entry count main() reports as the deduplicated total.
MERGED_FILE = "byty_merged.json"
def count_source(path: Path) -> dict:
    """Summarize one scraper output file.

    Args:
        path: Location of the scraper's JSON output.

    Returns:
        ``{"accepted": <entry count>, "updated_at": <ISO timestamp>}`` on
        success, where the timestamp is the file's modification time as a
        naive local datetime with second precision.
        ``{"accepted": 0, "error": <message>}`` when the file is missing,
        cannot be read, is not valid JSON, or its top-level value is not a
        JSON array or object.

    Ordinary exceptions are never propagated; they are reported through the
    ``"error"`` key so that one broken source cannot abort the whole report.
    """
    try:
        # EAFP: read directly instead of checking exists() first, so a file
        # removed between the check and the read is still reported cleanly.
        data = json.loads(path.read_text(encoding="utf-8"))
        mtime = datetime.fromtimestamp(path.stat().st_mtime).isoformat(
            timespec="seconds"
        )
    except FileNotFoundError:
        return {"accepted": 0, "error": "soubor nenalezen"}
    except Exception as e:
        # Deliberately broad: this is the reporting boundary, and the error
        # is surfaced in the status output rather than swallowed.
        return {"accepted": 0, "error": str(e)}

    # len() of a JSON string would be its character count, not a number of
    # listings, so only containers are counted.
    # NOTE(review): scraper outputs are presumably arrays; objects are still
    # accepted to stay compatible with the previous behavior - confirm.
    if not isinstance(data, (list, dict)):
        return {
            "accepted": 0,
            "error": f"neočekávaný formát JSON: {type(data).__name__}",
        }
    return {"accepted": len(data), "updated_at": mtime}
# Step header in the run log: "[2/6] Realingo" followed by a dashed rule.
_SECTION_HEADER_RE = re.compile(r'\[(\d+)/\d+\]\s+(.+)\n-+', re.MULTILINE)
# Output key -> pattern of the single-number summary line that feeds it.
# Insertion order is the key order of the resulting per-source entry.
_SUMMARY_PATTERNS = {
    "accepted": re.compile(r'Vyhovující byty[:\s]+(\d+)'),
    "fetched": re.compile(r'Staženo inzerátů[:\s]+(\d+)'),
    "pages": re.compile(r'Staženo stránek[:\s]+(\d+)'),
    "cached": re.compile(r'Celkem bytů v cache[:\s]+(\d+)'),
    "cache_hits": re.compile(r'Cache hit[:\s]+(\d+)'),
}
# "Vyloučeno (<reason>): N" - one line per rejection reason.
_EXCLUDED_REASON_RE = re.compile(r'Vyloučeno\s+\(([^)]+)\)[:\s]+(\d+)')
# Fallback for a bare exclusion total without a parenthesized reason.
_EXCLUDED_TOTAL_RE = re.compile(r'Vyloučen\w*[:\s]+(\d+)')


def _first_int(pattern: re.Pattern, text: str) -> Optional[int]:
    """Return the first captured number for *pattern* in *text*, or None."""
    m = pattern.search(text)
    return int(m.group(1)) if m else None


def _section_stats(section_text: str) -> dict:
    """Extract the numeric summary values found in one log section."""
    entry = {}
    for key, pattern in _SUMMARY_PATTERNS.items():
        value = _first_int(pattern, section_text)
        if value is not None:
            entry[key] = value

    # Per-reason counts; a reason repeated within the section keeps its last
    # value.
    excluded = {
        m.group(1): int(m.group(2))
        for m in _EXCLUDED_REASON_RE.finditer(section_text)
    }
    if excluded:
        entry["excluded"] = excluded
    else:
        total = _first_int(_EXCLUDED_TOTAL_RE, section_text)
        if total is not None:
            entry["excluded_total"] = total
    return entry


def parse_log(log_path: str) -> dict[str, dict]:
    """Parse the scraper run log and extract per-source statistics.

    Scrapers log summary lines like:
        ✓ Vyhovující byty: 12
        Vyloučeno (prodáno): 5
        Staženo stránek: 3
        Staženo inzerátů: 48
        Celkem bytů v cache: 120
    and the log contains section headers like:
        [2/6] Realingo
        ----------------

    Args:
        log_path: Path of the run log. ``None`` or an empty string means
            "no log available".

    Returns:
        Mapping of source name (a key of ``SOURCE_FILES``) to a dict with
        any of the keys ``accepted``, ``fetched``, ``pages``, ``cached``,
        ``cache_hits`` and either ``excluded`` (reason -> count) or
        ``excluded_total``. Only values actually found in the log are
        present. An empty dict is returned when there is no log, it cannot
        be read, or it contains no section headers.

    Log statistics are optional, so an unreadable log never raises: I/O
    errors yield ``{}`` and undecodable bytes are replaced, not fatal.
    """
    if not log_path:
        return {}
    try:
        # errors="replace": captured scraper output may contain bytes that
        # are not valid UTF-8; that must not abort status generation.
        with open(log_path, encoding="utf-8", errors="replace") as f:
            content = f.read()
    except OSError:
        # Missing file, directory, permission problem: treat as "no log".
        return {}

    headers = list(_SECTION_HEADER_RE.finditer(content))
    stats = {}
    for i, header in enumerate(headers):
        step_name = header.group(2).strip().lower()
        # A header may name several sources, e.g. "PSN + CityHome".
        source_names = [
            name for name in SOURCE_FILES if name.lower() in step_name
        ]
        if not source_names:
            continue

        # A section runs from the end of its header to the next header.
        end = headers[i + 1].start() if i + 1 < len(headers) else len(content)
        entry = _section_stats(content[header.end():end])

        # NOTE(review): for a combined step every named source receives the
        # same numbers (the first match of each pattern in the section), so
        # the later sources may be reported with the first scraper's values
        # - confirm against the real log layout.
        for name in source_names:
            # Give each source its own dicts so that changing one source's
            # stats can never leak into another's.
            own = dict(entry)
            if "excluded" in own:
                own["excluded"] = dict(own["excluded"])
            stats[name] = own
    return stats
def main():
    """Build status.json next to this script and print a short summary.

    Command line (all arguments optional, positional):
        1. start time of the run, stored verbatim as ``timestamp``
           (defaults to the current local time in ISO format),
        2. duration of the run in whole seconds (``duration_sec``; stays
           ``None`` when missing or not an integer),
        3. path of the run log handed to ``parse_log``.

    For every entry of ``SOURCE_FILES`` the count taken from the JSON file
    is combined with the statistics parsed from the log; the entry count of
    ``MERGED_FILE`` is reported as the deduplicated total.
    """
    args = sys.argv[1:]

    # Each positional argument is read independently, so a start time given
    # without a duration is still used.
    start_time = args[0] if args else None
    duration_sec = None
    if len(args) >= 2:
        try:
            duration_sec = int(args[1])
        except ValueError:
            # Best effort: the status is still written, with a null duration.
            print(
                f"Varování: neplatná doba běhu {args[1]!r}, ignoruji",
                file=sys.stderr,
            )
    if not start_time:
        start_time = datetime.now().isoformat(timespec="seconds")
    log_path = args[2] if len(args) >= 3 else None

    log_stats = parse_log(log_path)

    sources = []
    for name, filename in SOURCE_FILES.items():
        info = count_source(HERE / filename)
        info["name"] = name
        # Merge log stats
        ls = log_stats.get(name, {})
        for k in ("fetched", "pages", "cached", "cache_hits", "excluded", "excluded_total"):
            if k in ls:
                info[k] = ls[k]
        # Override accepted from log if available (log is authoritative for
        # the latest run)
        if "accepted" in ls:
            info["accepted"] = ls["accepted"]
        sources.append(info)

    # Total accepted before dedup
    total_accepted = sum(s.get("accepted", 0) for s in sources)

    # Merged / deduplicated count
    merged_path = HERE / MERGED_FILE
    deduplicated = 0
    try:
        deduplicated = len(json.loads(merged_path.read_text(encoding="utf-8")))
    except FileNotFoundError:
        # No merged output yet: report 0, as for any other missing input.
        pass
    except Exception as e:
        # Still report 0, but make the cause visible instead of hiding a
        # corrupt or unreadable merged file.
        print(f"Varování: {merged_path} nelze načíst: {e}", file=sys.stderr)

    # Without a merged count there is nothing to compare against.
    duplicates_removed = total_accepted - deduplicated if deduplicated else 0

    status = {
        "status": "done",
        "timestamp": start_time,
        "duration_sec": duration_sec,
        "total_accepted": total_accepted,
        "deduplicated": deduplicated,
        "duplicates_removed": duplicates_removed,
        "sources": sources,
    }
    out = HERE / "status.json"
    out.write_text(json.dumps(status, ensure_ascii=False, indent=2), encoding="utf-8")

    print(f"Status uložen: {out}")
    print(f" Celkem bytů (před dedup): {total_accepted}")
    print(f" Po deduplikaci: {deduplicated}")
    if duplicates_removed:
        print(f" Odstraněno duplikátů: {duplicates_removed}")
    for s in sources:
        acc = s.get("accepted", 0)
        err = s.get("error", "")
        exc = s.get("excluded", {})
        # Prefer the per-reason breakdown; fall back to the bare total.
        exc_total = sum(exc.values()) if exc else s.get("excluded_total", 0)
        parts = [f"{s['name']:12s}: {acc} bytů"]
        if exc_total:
            parts.append(f"({exc_total} vyloučeno)")
        if err:
            parts.append(f"[CHYBA: {err}]")
        print(" " + " ".join(parts))
# Generate the status report only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()