#!/usr/bin/env python3
"""Generate status.json from scraper JSON outputs and per-scraper stats files."""
from __future__ import annotations

import argparse
import json
import os
from datetime import datetime
from pathlib import Path

HERE = Path(__file__).parent
# Directory where status.json is written; overridable via the DATA_DIR env var.
DATA_DIR = Path(os.environ.get("DATA_DIR", HERE))

# Per-source scraper output files (lists of accepted listings).
SOURCE_FILES = {
    "Sreality": "byty_sreality.json",
    "Realingo": "byty_realingo.json",
    "Bezrealitky": "byty_bezrealitky.json",
    "iDNES": "byty_idnes.json",
    "PSN": "byty_psn.json",
    "CityHome": "byty_cityhome.json",
}

# Per-source run-statistics files (authoritative run metadata).
STATS_FILES = {
    "Sreality": "stats_sreality.json",
    "Realingo": "stats_realingo.json",
    "Bezrealitky": "stats_bezrealitky.json",
    "iDNES": "stats_idnes.json",
    "PSN": "stats_psn.json",
    "CityHome": "stats_cityhome.json",
}

MERGED_FILE = "byty_merged.json"
HISTORY_FILE = "scraper_history.json"


def count_source(path: Path) -> dict:
    """Read a scraper JSON and return accepted count + file mtime.

    Returns ``{"accepted": 0, "error": ...}`` when the file is missing
    or cannot be parsed.
    """
    if not path.exists():
        return {"accepted": 0, "error": "soubor nenalezen"}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
        mtime = datetime.fromtimestamp(path.stat().st_mtime).isoformat(timespec="seconds")
        return {"accepted": len(data), "updated_at": mtime}
    except Exception as e:
        # Corrupt JSON (or a value without len()): report the error, count 0.
        return {"accepted": 0, "error": str(e)}


def read_scraper_stats(path: Path) -> dict:
    """Load a per-scraper stats JSON.

    Returns {} on missing or corrupt file, or when the file does not
    contain a JSON object.
    """
    if not path.exists():
        return {}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
        return data if isinstance(data, dict) else {}
    except Exception:
        return {}


def append_to_history(status: dict, keep: int) -> None:
    """Append the current status entry to scraper_history.json, keeping only `keep` latest.

    ``keep <= 0`` means unlimited history.
    """
    # NOTE(review): history lives next to the script (HERE) while status.json
    # goes to DATA_DIR — confirm this asymmetry is intentional.
    history_path = HERE / HISTORY_FILE
    history: list = []
    if history_path.exists():
        try:
            history = json.loads(history_path.read_text(encoding="utf-8"))
            if not isinstance(history, list):
                history = []
        except Exception:
            # Best-effort: a corrupt history file is silently restarted.
            history = []
    history.append(status)
    # Keep only the N most recent entries
    if keep > 0 and len(history) > keep:
        history = history[-keep:]
    history_path.write_text(json.dumps(history, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"Historie uložena: {history_path} ({len(history)} záznamů)")


def main():
    """Aggregate per-source outputs into status.json and append to history."""
    parser = argparse.ArgumentParser(description="Generate status.json from scraper outputs.")
    parser.add_argument("--start-time", dest="start_time", default=None,
                        help="ISO timestamp of scrape start (default: now)")
    parser.add_argument("--duration", dest="duration", type=int, default=None,
                        help="Run duration in seconds")
    parser.add_argument("--keep", dest="keep", type=int, default=5,
                        help="Number of history entries to keep (default: 5, 0=unlimited)")
    args = parser.parse_args()

    start_time = args.start_time or datetime.now().isoformat(timespec="seconds")
    duration_sec = args.duration

    sources = []
    for name, filename in SOURCE_FILES.items():
        path = HERE / filename
        info = count_source(path)
        info["name"] = name
        # Merge in stats from the per-scraper stats file (authoritative for run data)
        stats = read_scraper_stats(HERE / STATS_FILES[name])
        for key in ("accepted", "fetched", "pages", "cache_hits", "excluded",
                    "excluded_total", "success", "duration_sec", "error"):
            if key in stats:
                info[key] = stats[key]
        sources.append(info)

    # Total accepted before dedup
    total_accepted = sum(s.get("accepted", 0) for s in sources)

    # Merged / deduplicated count
    merged_path = HERE / MERGED_FILE
    deduplicated = 0
    if merged_path.exists():
        try:
            merged = json.loads(merged_path.read_text(encoding="utf-8"))
            deduplicated = len(merged)
        except Exception:
            # Best-effort: an unreadable merged file simply leaves dedup at 0.
            pass

    # Clamp to 0: a stale merged file can hold more items than the current
    # per-source sum, which would otherwise report a negative count.
    duplicates_removed = max(total_accepted - deduplicated, 0) if deduplicated else 0

    # Top-level success: True if no source has an error
    success = not any("error" in s for s in sources)

    status = {
        "status": "done",
        "timestamp": start_time,
        "duration_sec": duration_sec,
        "success": success,
        "total_accepted": total_accepted,
        "deduplicated": deduplicated,
        "duplicates_removed": duplicates_removed,
        "sources": sources,
    }

    out = DATA_DIR / "status.json"
    out.write_text(json.dumps(status, ensure_ascii=False, indent=2), encoding="utf-8")

    print(f"Status uložen: {out}")
    print(f"  Celkem bytů (před dedup): {total_accepted}")
    print(f"  Po deduplikaci: {deduplicated}")
    if duplicates_removed:
        print(f"  Odstraněno duplikátů: {duplicates_removed}")
    for s in sources:
        acc = s.get("accepted", 0)
        err = s.get("error", "")
        exc = s.get("excluded", {})
        # Guard: some stats files may store "excluded" as a plain number
        # rather than a per-reason dict; fall back to excluded_total then.
        exc_total = sum(exc.values()) if isinstance(exc, dict) and exc else s.get("excluded_total", 0)
        parts = [f"{s['name']:12s}: {acc} bytů"]
        if exc_total:
            parts.append(f"({exc_total} vyloučeno)")
        if err:
            parts.append(f"[CHYBA: {err}]")
        print("  " + " ".join(parts))

    append_to_history(status, args.keep)


if __name__ == "__main__":
    main()