1 Commits

Author SHA1 Message Date
Jan Novak
27a7834eb6 Reliability improvements: retry logic, validation, ratings sync
Some checks failed
Build and Push / build (push) Failing after 4s
- Add 3-attempt retry with exponential backoff to Sreality, Realingo,
  Bezrealitky, and PSN scrapers (CityHome and iDNES already had it)
- Add shared validate_listing() in scraper_stats.py; all 6 scrapers now
  validate GPS bounds, price, area, and required fields before output
- Wire ratings to server /api/ratings on page load (merge with
  localStorage) and save (async POST); ratings now persist across
  browsers and devices
- Namespace JS hash IDs as {source}_{id} to prevent rating collisions
  between listings from different portals with the same numeric ID
- Replace manual Czech diacritic table with unicodedata.normalize()
  in merge_and_map.py for correct deduplication of all edge cases
- Correct README schedule docs: every 4 hours, not twice daily

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-27 10:36:37 +01:00
9 changed files with 212 additions and 114 deletions

View File

@@ -151,7 +151,7 @@ The project includes a Docker setup for unattended operation with a cron-based s
│ PID 1: python3 -m http.server :8080 │ │ PID 1: python3 -m http.server :8080 │
│ serves /app/data/ │ │ serves /app/data/ │
│ │ │ │
│ crond: runs run_all.sh at 06:00/18:00 │ crond: runs run_all.sh every 4 hours
│ Europe/Prague timezone │ │ Europe/Prague timezone │
│ │ │ │
│ /app/ -- scripts (.py, .sh) │ │ /app/ -- scripts (.py, .sh) │
@@ -160,7 +160,7 @@ The project includes a Docker setup for unattended operation with a cron-based s
└─────────────────────────────────────────┘ └─────────────────────────────────────────┘
``` ```
On startup, the HTTP server starts immediately. The initial scrape runs in the background. Subsequent cron runs update data in-place twice daily at 06:00 and 18:00 CET/CEST. On startup, the HTTP server starts immediately. The initial scrape runs in the background. Subsequent cron runs update data in-place every 4 hours.
### Quick start ### Quick start
@@ -208,7 +208,7 @@ Validation targets run scrapers with `--max-pages 1 --max-properties 10` for a f
├── build/ ├── build/
│ ├── Dockerfile # Container image definition (python:3.13-alpine) │ ├── Dockerfile # Container image definition (python:3.13-alpine)
│ ├── entrypoint.sh # Container entrypoint (HTTP server + cron + initial scrape) │ ├── entrypoint.sh # Container entrypoint (HTTP server + cron + initial scrape)
│ ├── crontab # Cron schedule (06:00 and 18:00 CET) │ ├── crontab # Cron schedule (every 4 hours)
│ └── CONTAINER.md # Container-specific documentation │ └── CONTAINER.md # Container-specific documentation
└── .gitignore # Ignores byty_*.json, __pycache__, .vscode └── .gitignore # Ignores byty_*.json, __pycache__, .vscode
``` ```

View File

@@ -9,6 +9,7 @@ from __future__ import annotations
import json import json
import re import re
import unicodedata
from pathlib import Path from pathlib import Path
from scrape_and_map import generate_map, format_price from scrape_and_map import generate_map, format_price
@@ -19,14 +20,8 @@ def normalize_street(locality: str) -> str:
# "Studentská, Praha 6 - Dejvice" → "studentska" # "Studentská, Praha 6 - Dejvice" → "studentska"
# "Rýnská, Praha" → "rynska" # "Rýnská, Praha" → "rynska"
street = locality.split(",")[0].strip().lower() street = locality.split(",")[0].strip().lower()
# Remove diacritics (simple Czech) # Remove diacritics using Unicode decomposition (handles all Czech characters)
replacements = { street = unicodedata.normalize("NFKD", street).encode("ascii", "ignore").decode("ascii")
"á": "a", "č": "c", "ď": "d", "é": "e", "ě": "e",
"í": "i", "ň": "n", "ó": "o", "ř": "r", "š": "s",
"ť": "t", "ú": "u", "ů": "u", "ý": "y", "ž": "z",
}
for src, dst in replacements.items():
street = street.replace(src, dst)
# Remove non-alphanumeric # Remove non-alphanumeric
street = re.sub(r"[^a-z0-9]", "", street) street = re.sub(r"[^a-z0-9]", "", street)
return street return street

View File

@@ -15,7 +15,7 @@ import urllib.request
import urllib.parse import urllib.parse
from datetime import datetime, timedelta from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
from scraper_stats import write_stats from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_sreality.json" STATS_FILE = "stats_sreality.json"
@@ -45,19 +45,26 @@ HEADERS = {
def api_get(url: str) -> dict: def api_get(url: str) -> dict:
"""Fetch JSON from Sreality API.""" """Fetch JSON from Sreality API with retry."""
logger.debug(f"HTTP GET request: {url}") for attempt in range(3):
logger.debug(f"Headers: {HEADERS}") logger.debug(f"HTTP GET request (attempt {attempt + 1}/3): {url}")
req = urllib.request.Request(url, headers=HEADERS) req = urllib.request.Request(url, headers=HEADERS)
try: try:
with urllib.request.urlopen(req, timeout=30) as resp: with urllib.request.urlopen(req, timeout=30) as resp:
response_data = resp.read().decode("utf-8") response_data = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(response_data)} bytes") logger.debug(f"HTTP response: status={resp.status}, size={len(response_data)} bytes")
logger.debug(f"Response preview: {response_data[:200]}") logger.debug(f"Response preview: {response_data[:200]}")
return json.loads(response_data) return json.loads(response_data)
except (urllib.error.URLError, ConnectionError, OSError) as e: except urllib.error.HTTPError:
logger.error(f"HTTP request failed for {url}: {e}", exc_info=True) raise
raise except (urllib.error.URLError, ConnectionError, OSError) as e:
if attempt < 2:
wait = (attempt + 1) * 2
logger.warning(f"Connection error (retry {attempt + 1}/3 after {wait}s): {e}")
time.sleep(wait)
else:
logger.error(f"HTTP request failed after 3 attempts: {e}", exc_info=True)
raise
def build_list_url(disposition: int, page: int = 1) -> str: def build_list_url(disposition: int, page: int = 1) -> str:
@@ -356,6 +363,8 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"), "first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
"last_changed": datetime.now().strftime("%Y-%m-%d"), "last_changed": datetime.now().strftime("%Y-%m-%d"),
} }
if not validate_listing(result, "sreality"):
continue
results.append(result) results.append(result)
details_fetched += 1 details_fetched += 1
@@ -476,7 +485,7 @@ def generate_map(estates: list[dict], output_path: str = "mapa_bytu.html"):
source_label = source_labels.get(source, source) source_label = source_labels.get(source, source)
source_color = source_colors.get(source, "#999") source_color = source_colors.get(source, "#999")
hash_id = e.get("hash_id", "") hash_id = f"{source}_{e.get('hash_id', '')}"
first_seen = e.get("first_seen", "") first_seen = e.get("first_seen", "")
last_changed = e.get("last_changed", "") last_changed = e.get("last_changed", "")
@@ -864,6 +873,11 @@ function loadRatings() {{
function saveRatings(ratings) {{ function saveRatings(ratings) {{
localStorage.setItem(RATINGS_KEY, JSON.stringify(ratings)); localStorage.setItem(RATINGS_KEY, JSON.stringify(ratings));
fetch('/api/ratings', {{
method: 'POST',
headers: {{'Content-Type': 'application/json'}},
body: JSON.stringify(ratings)
}}).catch(function() {{}});
}} }}
function addRejectStrike(marker) {{ function addRejectStrike(marker) {{
@@ -1167,8 +1181,25 @@ function applyFilters() {{
document.getElementById('visible-count').textContent = visible; document.getElementById('visible-count').textContent = visible;
}} }}
// Initialize ratings on load // Initialize ratings: load from server, merge with localStorage, then restore
restoreRatings(); function initRatings() {{
var local = loadRatings();
fetch('/api/ratings')
.then(function(r) {{ return r.ok ? r.json() : null; }})
.then(function(server) {{
if (server && typeof server === 'object') {{
var merged = Object.assign({{}}, local, server);
localStorage.setItem(RATINGS_KEY, JSON.stringify(merged));
}}
restoreRatings();
updateRatingCounts();
}})
.catch(function() {{
restoreRatings();
updateRatingCounts();
}});
}}
initRatings();
// ── Panel toggle ────────────────────────────────────────────── // ── Panel toggle ──────────────────────────────────────────────
function togglePanel() {{ function togglePanel() {{

View File

@@ -15,7 +15,7 @@ import re
import time import time
import urllib.request import urllib.request
from pathlib import Path from pathlib import Path
from scraper_stats import write_stats from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_bezrealitky.json" STATS_FILE = "stats_bezrealitky.json"
@@ -71,62 +71,71 @@ HEADERS = {
BASE_URL = "https://www.bezrealitky.cz" BASE_URL = "https://www.bezrealitky.cz"
def fetch_url(url: str, retries: int = 3) -> str:
    """Fetch *url* and return the response body decoded as UTF-8.

    Connection-level failures are retried up to *retries* attempts in total,
    sleeping ``(attempt + 1) * 2`` seconds between attempts. HTTP status
    errors are re-raised immediately without a retry.

    Args:
        url: Absolute URL to request with the module-level ``HEADERS``.
        retries: Maximum number of attempts.

    Returns:
        The decoded response body.

    Raises:
        urllib.error.HTTPError: On an HTTP error response (never retried).
        urllib.error.URLError, OSError: When the final attempt still fails.
    """
    for attempt in range(retries):
        try:
            logger.debug(f"HTTP GET request (attempt {attempt + 1}/{retries}): {url}")
            req = urllib.request.Request(url, headers=HEADERS)
            # Context manager guarantees the response is closed even when
            # read() or decode() raises, so retries do not leak sockets.
            with urllib.request.urlopen(req, timeout=30) as resp:
                html = resp.read().decode("utf-8")
                logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
            return html
        except urllib.error.HTTPError:
            # HTTPError is a URLError subclass; catch it first so it bypasses
            # the retry branch below.
            raise
        except (ConnectionResetError, ConnectionError, urllib.error.URLError, OSError) as e:
            if attempt < retries - 1:
                wait = (attempt + 1) * 2
                logger.warning(f"Connection error (retry {attempt + 1}/{retries} after {wait}s): {e}")
                time.sleep(wait)
            else:
                logger.error(f"HTTP request failed after {retries} attempts: {e}", exc_info=True)
                raise
def fetch_page(page: int) -> tuple[list[dict], int]: def fetch_page(page: int) -> tuple[list[dict], int]:
""" """
Fetch a listing page from Bezrealitky. Fetch a listing page from Bezrealitky.
Returns (list of advert dicts from Apollo cache, total count). Returns (list of advert dicts from Apollo cache, total count).
""" """
url = f"{BASE_URL}/vypis/nabidka-prodej/byt/praha?page={page}" url = f"{BASE_URL}/vypis/nabidka-prodej/byt/praha?page={page}"
logger.debug(f"HTTP GET request: {url}") html = fetch_url(url)
logger.debug(f"Headers: {HEADERS}")
req = urllib.request.Request(url, headers=HEADERS)
try:
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
match = re.search( match = re.search(
r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
html, re.DOTALL html, re.DOTALL
) )
if not match: if not match:
logger.debug("No __NEXT_DATA__ script found in HTML") logger.debug("No __NEXT_DATA__ script found in HTML")
return [], 0 return [], 0
data = json.loads(match.group(1)) data = json.loads(match.group(1))
cache = data["props"]["pageProps"]["apolloCache"] cache = data["props"]["pageProps"]["apolloCache"]
# Extract adverts from cache # Extract adverts from cache
adverts = [] adverts = []
for key, val in cache.items(): for key, val in cache.items():
if key.startswith("Advert:") and isinstance(val, dict) and val.get("__typename") == "Advert": if key.startswith("Advert:") and isinstance(val, dict) and val.get("__typename") == "Advert":
adverts.append(val) adverts.append(val)
# Get total count from ROOT_QUERY # Get total count from ROOT_QUERY
total = 0 total = 0
root = cache.get("ROOT_QUERY", {}) root = cache.get("ROOT_QUERY", {})
for key, val in root.items(): for key, val in root.items():
if "listAdverts" in key and isinstance(val, dict): if "listAdverts" in key and isinstance(val, dict):
tc = val.get("totalCount") tc = val.get("totalCount")
if tc and tc > total: if tc and tc > total:
total = tc total = tc
logger.debug(f"Page {page}: found {len(adverts)} adverts, total={total}") logger.debug(f"Page {page}: found {len(adverts)} adverts, total={total}")
return adverts, total return adverts, total
except (urllib.error.URLError, ConnectionError, OSError) as e:
logger.error(f"HTTP request failed for {url}: {e}", exc_info=True)
raise
def fetch_detail(uri: str) -> dict | None: def fetch_detail(uri: str) -> dict | None:
"""Fetch detail page for a listing.""" """Fetch detail page for a listing."""
try: try:
url = f"{BASE_URL}/nemovitosti-byty-domy/{uri}" url = f"{BASE_URL}/nemovitosti-byty-domy/{uri}"
logger.debug(f"HTTP GET request: {url}") html = fetch_url(url)
req = urllib.request.Request(url, headers=HEADERS)
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
match = re.search( match = re.search(
r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
@@ -365,6 +374,8 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"), "first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
"last_changed": datetime.now().strftime("%Y-%m-%d"), "last_changed": datetime.now().strftime("%Y-%m-%d"),
} }
if not validate_listing(result, "bezrealitky"):
continue
results.append(result) results.append(result)
properties_fetched += 1 properties_fetched += 1

View File

@@ -14,7 +14,7 @@ import time
import urllib.request import urllib.request
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from scraper_stats import write_stats from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_cityhome.json" STATS_FILE = "stats_cityhome.json"
@@ -375,6 +375,8 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"first_seen": _prev_cache.get(f"cityhome_{slug}_{listing['unit_name']}", {}).get("first_seen", datetime.now().strftime("%Y-%m-%d")), "first_seen": _prev_cache.get(f"cityhome_{slug}_{listing['unit_name']}", {}).get("first_seen", datetime.now().strftime("%Y-%m-%d")),
"last_changed": datetime.now().strftime("%Y-%m-%d") if _prev_cache.get(f"cityhome_{slug}_{listing['unit_name']}", {}).get("price") != price else _prev_cache[f"cityhome_{slug}_{listing['unit_name']}"].get("last_changed", datetime.now().strftime("%Y-%m-%d")), "last_changed": datetime.now().strftime("%Y-%m-%d") if _prev_cache.get(f"cityhome_{slug}_{listing['unit_name']}", {}).get("price") != price else _prev_cache[f"cityhome_{slug}_{listing['unit_name']}"].get("last_changed", datetime.now().strftime("%Y-%m-%d")),
} }
if not validate_listing(result, "cityhome"):
continue
results.append(result) results.append(result)
properties_fetched += 1 properties_fetched += 1

View File

@@ -16,7 +16,7 @@ import time
import urllib.request import urllib.request
import urllib.parse import urllib.parse
from pathlib import Path from pathlib import Path
from scraper_stats import write_stats from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_idnes.json" STATS_FILE = "stats_idnes.json"
@@ -467,6 +467,8 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"), "first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
"last_changed": datetime.now().strftime("%Y-%m-%d"), "last_changed": datetime.now().strftime("%Y-%m-%d"),
} }
if not validate_listing(result, "idnes"):
continue
results.append(result) results.append(result)
properties_fetched += 1 properties_fetched += 1

View File

@@ -15,7 +15,7 @@ import time
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from urllib.parse import urlencode from urllib.parse import urlencode
from scraper_stats import write_stats from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_psn.json" STATS_FILE = "stats_psn.json"
@@ -38,19 +38,25 @@ BASE_URL = "https://psn.cz"
UNITS_API = f"{BASE_URL}/api/units-list" UNITS_API = f"{BASE_URL}/api/units-list"
def fetch_json(url: str) -> dict: def fetch_json(url: str, retries: int = 3) -> dict:
"""Fetch JSON via curl (urllib SSL may fail on Cloudflare).""" """Fetch JSON via curl (urllib SSL may fail on Cloudflare) with retry."""
logger.debug(f"HTTP GET: {url}") for attempt in range(retries):
result = subprocess.run( logger.debug(f"HTTP GET (attempt {attempt + 1}/{retries}): {url}")
["curl", "-s", "-L", "--max-time", "30", result = subprocess.run(
"-H", f"User-Agent: {UA}", ["curl", "-s", "-L", "--max-time", "30",
"-H", "Accept: application/json", "-H", f"User-Agent: {UA}",
url], "-H", "Accept: application/json",
capture_output=True, text=True, timeout=60 url],
) capture_output=True, text=True, timeout=60
if result.returncode != 0: )
raise RuntimeError(f"curl failed ({result.returncode}): {result.stderr[:200]}") if result.returncode == 0:
return json.loads(result.stdout) return json.loads(result.stdout)
if attempt < retries - 1:
wait = (attempt + 1) * 2
logger.warning(f"curl failed (retry {attempt + 1}/{retries} after {wait}s): {result.stderr[:200]}")
time.sleep(wait)
else:
raise RuntimeError(f"curl failed after {retries} attempts ({result.returncode}): {result.stderr[:200]}")
def fix_gps(lat, lng): def fix_gps(lat, lng):
@@ -255,6 +261,8 @@ def scrape(max_properties: int | None = None):
"first_seen": _prev_cache.get(str(unit_id), {}).get("first_seen", datetime.now().strftime("%Y-%m-%d")), "first_seen": _prev_cache.get(str(unit_id), {}).get("first_seen", datetime.now().strftime("%Y-%m-%d")),
"last_changed": datetime.now().strftime("%Y-%m-%d") if _prev_cache.get(str(unit_id), {}).get("price") != int(price) else _prev_cache[str(unit_id)].get("last_changed", datetime.now().strftime("%Y-%m-%d")), "last_changed": datetime.now().strftime("%Y-%m-%d") if _prev_cache.get(str(unit_id), {}).get("price") != int(price) else _prev_cache[str(unit_id)].get("last_changed", datetime.now().strftime("%Y-%m-%d")),
} }
if not validate_listing(result, "psn"):
continue
results.append(result) results.append(result)
properties_fetched += 1 properties_fetched += 1

View File

@@ -15,7 +15,7 @@ import re
import time import time
import urllib.request import urllib.request
from pathlib import Path from pathlib import Path
from scraper_stats import write_stats from scraper_stats import write_stats, validate_listing
STATS_FILE = "stats_realingo.json" STATS_FILE = "stats_realingo.json"
@@ -56,6 +56,28 @@ HEADERS = {
BASE_URL = "https://www.realingo.cz" BASE_URL = "https://www.realingo.cz"
def fetch_url(url: str, retries: int = 3) -> str:
    """Fetch *url* and return the response body decoded as UTF-8.

    Connection-level failures are retried up to *retries* attempts in total,
    sleeping ``(attempt + 1) * 2`` seconds between attempts. HTTP status
    errors are re-raised immediately without a retry.

    Args:
        url: Absolute URL to request with the module-level ``HEADERS``.
        retries: Maximum number of attempts.

    Returns:
        The decoded response body.

    Raises:
        urllib.error.HTTPError: On an HTTP error response (never retried).
        urllib.error.URLError, OSError: When the final attempt still fails.
    """
    for attempt in range(retries):
        try:
            logger.debug(f"HTTP GET request (attempt {attempt + 1}/{retries}): {url}")
            req = urllib.request.Request(url, headers=HEADERS)
            # Context manager guarantees the response is closed even when
            # read() or decode() raises, so retries do not leak sockets.
            with urllib.request.urlopen(req, timeout=30) as resp:
                html = resp.read().decode("utf-8")
                logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
            return html
        except urllib.error.HTTPError:
            # HTTPError is a URLError subclass; catch it first so it bypasses
            # the retry branch below.
            raise
        except (ConnectionResetError, ConnectionError, urllib.error.URLError, OSError) as e:
            if attempt < retries - 1:
                wait = (attempt + 1) * 2
                logger.warning(f"Connection error (retry {attempt + 1}/{retries} after {wait}s): {e}")
                time.sleep(wait)
            else:
                logger.error(f"HTTP request failed after {retries} attempts: {e}", exc_info=True)
                raise
def fetch_listing_page(page: int = 1) -> tuple[list[dict], int]: def fetch_listing_page(page: int = 1) -> tuple[list[dict], int]:
"""Fetch a page of Prague listings. Returns (items, total_count).""" """Fetch a page of Prague listings. Returns (items, total_count)."""
if page == 1: if page == 1:
@@ -63,41 +85,26 @@ def fetch_listing_page(page: int = 1) -> tuple[list[dict], int]:
else: else:
url = f"{BASE_URL}/prodej_byty/praha/{page}_strana/" url = f"{BASE_URL}/prodej_byty/praha/{page}_strana/"
logger.debug(f"HTTP GET request: {url}") html = fetch_url(url)
logger.debug(f"Headers: {HEADERS}") match = re.search(
req = urllib.request.Request(url, headers=HEADERS) r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
try: html, re.DOTALL
resp = urllib.request.urlopen(req, timeout=30) )
html = resp.read().decode("utf-8") if not match:
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes") logger.debug("No __NEXT_DATA__ script found in HTML")
return [], 0
match = re.search( data = json.loads(match.group(1))
r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', offer_list = data["props"]["pageProps"]["store"]["offer"]["list"]
html, re.DOTALL logger.debug(f"Page {page}: found {len(offer_list['data'])} items, total={offer_list['total']}")
) return offer_list["data"], offer_list["total"]
if not match:
logger.debug("No __NEXT_DATA__ script found in HTML")
return [], 0
data = json.loads(match.group(1))
offer_list = data["props"]["pageProps"]["store"]["offer"]["list"]
logger.debug(f"Page {page}: found {len(offer_list['data'])} items, total={offer_list['total']}")
return offer_list["data"], offer_list["total"]
except (urllib.error.URLError, ConnectionError, OSError) as e:
logger.error(f"HTTP request failed for {url}: {e}", exc_info=True)
raise
def fetch_detail(listing_url: str) -> dict | None: def fetch_detail(listing_url: str) -> dict | None:
"""Fetch detail page for a listing to get floor, building type, etc.""" """Fetch detail page for a listing to get floor, building type, etc."""
try: try:
url = f"{BASE_URL}{listing_url}" url = f"{BASE_URL}{listing_url}"
logger.debug(f"HTTP GET request: {url}") html = fetch_url(url)
req = urllib.request.Request(url, headers=HEADERS)
resp = urllib.request.urlopen(req, timeout=30)
html = resp.read().decode("utf-8")
logger.debug(f"HTTP response: status={resp.status}, size={len(html)} bytes")
match = re.search( match = re.search(
r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
html, re.DOTALL html, re.DOTALL
@@ -324,6 +331,8 @@ def scrape(max_pages: int | None = None, max_properties: int | None = None):
"first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"), "first_seen": cached.get("first_seen", datetime.now().strftime("%Y-%m-%d")) if cached else datetime.now().strftime("%Y-%m-%d"),
"last_changed": datetime.now().strftime("%Y-%m-%d"), "last_changed": datetime.now().strftime("%Y-%m-%d"),
} }
if not validate_listing(result, "realingo"):
continue
results.append(result) results.append(result)
properties_fetched += 1 properties_fetched += 1

View File

@@ -1,13 +1,53 @@
"""Shared utility for writing per-scraper run statistics to JSON.""" """Shared utilities for scraper run statistics and listing validation."""
from __future__ import annotations from __future__ import annotations
import json import json
import logging
import os import os
from pathlib import Path from pathlib import Path
HERE = Path(__file__).parent HERE = Path(__file__).parent
DATA_DIR = Path(os.environ.get("DATA_DIR", HERE)) DATA_DIR = Path(os.environ.get("DATA_DIR", HERE))
_val_log = logging.getLogger(__name__)

# Keys every listing must carry with a non-empty, non-None value.
_REQUIRED_FIELDS = ("hash_id", "price", "locality", "lat", "lon", "url", "source")


def _is_real_number(value) -> bool:
    """Return True for int/float values, excluding bool (a subclass of int)."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _is_positive_finite(value) -> bool:
    """Return True for a real number strictly greater than 0 and below +inf.

    NaN fails both comparisons, so it is rejected as well.
    """
    return _is_real_number(value) and 0 < value < float("inf")


def validate_listing(listing: dict, context: str = "") -> bool:
    """Validate a listing dict before it is written to the output JSON.

    Checks, in order: presence of every required field, a positive finite
    numeric price, numeric GPS coordinates inside the 47.0-52.0 latitude /
    12.0-19.0 longitude box, and (only when present) a positive finite
    numeric area. Each rejection is logged as a warning.

    Args:
        listing: The scraped listing to check.
        context: Optional label (e.g. the scraper name) prefixed to warnings.

    Returns:
        True if the listing is valid, False if it should be skipped.
    """
    prefix = f"[{context}] " if context else ""
    listing_id = listing.get("hash_id", "?")

    def reject(reason: str) -> bool:
        _val_log.warning(f"{prefix}Skipping listing — {reason}: {listing_id}")
        return False

    for field in _REQUIRED_FIELDS:
        val = listing.get(field)
        if val is None or val == "":
            return reject(f"missing field '{field}'")

    price = listing.get("price")
    # `price <= 0` alone lets NaN/inf through (nan <= 0 is False); require a
    # finite positive number and refuse bools masquerading as ints.
    if not _is_positive_finite(price):
        return reject(f"invalid price={price!r}")

    lat, lon = listing.get("lat"), listing.get("lon")
    if not _is_real_number(lat) or not _is_real_number(lon):
        return reject(f"non-numeric GPS lat={lat!r} lon={lon!r}")
    # The range comparison also rejects NaN and infinities.
    if not (47.0 <= lat <= 52.0) or not (12.0 <= lon <= 19.0):
        return reject(f"GPS outside Czech Republic lat={lat} lon={lon}")

    area = listing.get("area")
    # Area is optional; validate it only when the scraper supplied one.
    if area is not None and not _is_positive_finite(area):
        return reject(f"invalid area={area!r}")

    return True
def write_stats(filename: str, stats: dict) -> None: def write_stats(filename: str, stats: dict) -> None:
"""Write scraper run stats dict to the data directory.""" """Write scraper run stats dict to the data directory."""