Add status dashboard, server, scraper stats, and DATA_DIR support
All checks were successful
Build and Push / build (push) Successful in 7s

Key changes:
- Replace ratings_server.py + status.html with a unified server.py that
  serves the map, scraper status dashboard, and ratings API in one process
- Add scraper_stats.py utility: each scraper writes per-run stats (fetched,
  accepted, excluded, duration) to stats_<source>.json for the status page
- generate_status.py: respect DATA_DIR env var so status.json lands in the
  configured data directory instead of always the project root
- run_all.sh: replace the {"status":"running"} overwrite of status.json with
  a dedicated scraper_running.json lock file; trap on EXIT ensures cleanup
  even on kill/error, preventing the previous run's results from being wiped
- server.py: detect running state via scraper_running.json existence instead
  of status["status"] field, eliminating the dual-use race condition
- Makefile: add serve (local dev), debug (Docker debug container) targets;
  add SERVER_PORT variable
- build/Dockerfile + entrypoint.sh: switch to server.py, set DATA_DIR,
  adjust volume mounts
- .gitignore: add *.json and *.log to keep runtime data files out of VCS
- mapa_bytu.html: price-per-m² colouring, status link, UX tweaks

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Jan Novak
2026-02-26 00:30:25 +01:00
parent 6f49533c94
commit 5fb3b984b6
17 changed files with 929 additions and 1122 deletions

View File

@@ -1,16 +1,15 @@
#!/usr/bin/env python3
"""Generate status.json from scraper JSON outputs and run log."""
"""Generate status.json from scraper JSON outputs and per-scraper stats files."""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
HERE = Path(__file__).parent
DATA_DIR = Path(os.environ.get("DATA_DIR", HERE))
SOURCE_FILES = {
"Sreality": "byty_sreality.json",
@@ -21,7 +20,17 @@ SOURCE_FILES = {
"CityHome": "byty_cityhome.json",
}
STATS_FILES = {
"Sreality": "stats_sreality.json",
"Realingo": "stats_realingo.json",
"Bezrealitky": "stats_bezrealitky.json",
"iDNES": "stats_idnes.json",
"PSN": "stats_psn.json",
"CityHome": "stats_cityhome.json",
}
MERGED_FILE = "byty_merged.json"
HISTORY_FILE = "scraper_history.json"
def count_source(path: Path) -> dict:
@@ -36,105 +45,51 @@ def count_source(path: Path) -> dict:
return {"accepted": 0, "error": str(e)}
def parse_log(log_path: str) -> dict[str, dict]:
"""Parse scraper run log and extract per-source statistics.
Scrapers log summary lines like:
✓ Vyhovující byty: 12
Vyloučeno (prodáno): 5
Staženo stránek: 3
Staženo inzerátů: 48
Celkem bytů v cache: 120
and section headers like:
[2/6] Realingo
"""
if not log_path or not os.path.exists(log_path):
def read_scraper_stats(path: Path) -> dict:
"""Load a per-scraper stats JSON. Returns {} on missing or corrupt file."""
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
with open(log_path, encoding="utf-8") as f:
content = f.read()
# Split into per-source sections by the [N/6] Step header
# Each section header looks like "[2/6] Realingo\n----..."
section_pattern = re.compile(r'\[(\d+)/\d+\]\s+(.+)\n-+', re.MULTILINE)
sections_found = list(section_pattern.finditer(content))
def append_to_history(status: dict, keep: int) -> None:
"""Append the current status entry to scraper_history.json, keeping only `keep` latest."""
history_path = HERE / HISTORY_FILE
history: list = []
if history_path.exists():
try:
history = json.loads(history_path.read_text(encoding="utf-8"))
if not isinstance(history, list):
history = []
except Exception:
history = []
if not sections_found:
return {}
history.append(status)
stats = {}
for i, match in enumerate(sections_found):
step_name = match.group(2).strip()
start = match.end()
end = sections_found[i + 1].start() if i + 1 < len(sections_found) else len(content)
section_text = content[start:end]
# Keep only the N most recent entries
if keep > 0 and len(history) > keep:
history = history[-keep:]
# Identify which sources this section covers
# "PSN + CityHome" covers both
source_names = []
for name in SOURCE_FILES:
if name.lower() in step_name.lower():
source_names.append(name)
if not source_names:
continue
# Parse numeric summary lines
def extract(pattern: str) -> Optional[int]:
m = re.search(pattern, section_text)
return int(m.group(1)) if m else None
# Lines present in all/most scrapers
accepted = extract(r'Vyhovující byty[:\s]+(\d+)')
fetched = extract(r'Staženo inzerátů[:\s]+(\d+)')
pages = extract(r'Staženo stránek[:\s]+(\d+)')
cached = extract(r'Celkem bytů v cache[:\s]+(\d+)')
cache_hits = extract(r'Cache hit[:\s]+(\d+)')
# Rejection reasons — collect all into a dict
excluded = {}
for m in re.finditer(r'Vyloučeno\s+\(([^)]+)\)[:\s]+(\d+)', section_text):
excluded[m.group(1)] = int(m.group(2))
# Also PSN-style "Vyloučeno (prodáno): N"
total_excluded = sum(excluded.values()) if excluded else extract(r'Vyloučen\w*[:\s]+(\d+)')
entry = {}
if accepted is not None:
entry["accepted"] = accepted
if fetched is not None:
entry["fetched"] = fetched
if pages is not None:
entry["pages"] = pages
if cached is not None:
entry["cached"] = cached
if cache_hits is not None:
entry["cache_hits"] = cache_hits
if excluded:
entry["excluded"] = excluded
elif total_excluded is not None:
entry["excluded_total"] = total_excluded
for name in source_names:
stats[name] = entry
return stats
history_path.write_text(json.dumps(history, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"Historie uložena: {history_path} ({len(history)} záznamů)")
def main():
start_time = None
duration_sec = None
parser = argparse.ArgumentParser(description="Generate status.json from scraper outputs.")
parser.add_argument("--start-time", dest="start_time", default=None,
help="ISO timestamp of scrape start (default: now)")
parser.add_argument("--duration", dest="duration", type=int, default=None,
help="Run duration in seconds")
parser.add_argument("--keep", dest="keep", type=int, default=5,
help="Number of history entries to keep (default: 5, 0=unlimited)")
args = parser.parse_args()
if len(sys.argv) >= 3:
start_time = sys.argv[1]
try:
duration_sec = int(sys.argv[2])
except ValueError:
pass
if not start_time:
start_time = datetime.now().isoformat(timespec="seconds")
log_path = sys.argv[3] if len(sys.argv) >= 4 else None
log_stats = parse_log(log_path)
start_time = args.start_time or datetime.now().isoformat(timespec="seconds")
duration_sec = args.duration
sources = []
for name, filename in SOURCE_FILES.items():
@@ -142,14 +97,12 @@ def main():
info = count_source(path)
info["name"] = name
# Merge log stats
ls = log_stats.get(name, {})
for k in ("fetched", "pages", "cached", "cache_hits", "excluded", "excluded_total"):
if k in ls:
info[k] = ls[k]
# Override accepted from log if available (log is authoritative for latest run)
if "accepted" in ls:
info["accepted"] = ls["accepted"]
# Merge in stats from the per-scraper stats file (authoritative for run data)
stats = read_scraper_stats(HERE / STATS_FILES[name])
for key in ("accepted", "fetched", "pages", "cache_hits", "excluded", "excluded_total",
"success", "duration_sec", "error"):
if key in stats:
info[key] = stats[key]
sources.append(info)
@@ -168,17 +121,21 @@ def main():
duplicates_removed = total_accepted - deduplicated if deduplicated else 0
# Top-level success: True if no source has an error
success = not any("error" in s for s in sources)
status = {
"status": "done",
"timestamp": start_time,
"duration_sec": duration_sec,
"success": success,
"total_accepted": total_accepted,
"deduplicated": deduplicated,
"duplicates_removed": duplicates_removed,
"sources": sources,
}
out = HERE / "status.json"
out = DATA_DIR / "status.json"
out.write_text(json.dumps(status, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"Status uložen: {out}")
print(f" Celkem bytů (před dedup): {total_accepted}")
@@ -197,6 +154,8 @@ def main():
parts.append(f"[CHYBA: {err}]")
print(" " + " ".join(parts))
append_to_history(status, args.keep)
if __name__ == "__main__":
main()