#!/usr/bin/env python3 """Scrub PII from fixture JSON. Reads one JSON fixture from stdin (as produced by capture_fixtures.py), replaces PII fields with deterministic pseudonyms, writes scrubbed JSON to stdout. Run in the two-step pipeline: python capture_fixtures.py ... | python scrub_fixtures.py > fixture.json Or process multiple lines (--multi for newline-delimited input): python capture_fixtures.py --func foo --all | python scrub_fixtures.py --multi \\ | while read line; do ... PII handling: - Member names: replaced with Member_<8hex> (sha256(name)[:8]), deterministic. - Senders / account numbers / VS / bank_id / user_id: stable digit-preserving hash. - Notes (exception text): replaced with "". - Messages: name-substring sweep applied; rest preserved. - All other fields (dates, amounts, months, fees): preserved verbatim. Function-specific exceptions: - match_members / infer_transaction_details: these functions are tested with synthetic member names only. Only real-roster message sweeping is applied; field-key scrubbing is skipped so Go can perform genuine name matching. - generate_sync_id: after normal field-key scrubbing the output sync_id is recomputed from the now-scrubbed inputs so the hash remains consistent. 
""" from __future__ import annotations import argparse import hashlib import json import os import re import sys from typing import Any sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) _REPO = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # --------------------------------------------------------------------------- # Bijection helpers # --------------------------------------------------------------------------- def _sha256_hex(s: str) -> str: return hashlib.sha256(s.encode("utf-8")).hexdigest() def scrub_name(name: str) -> str: """Deterministic pseudonym for a member name.""" if not name: return name return f"Member_{_sha256_hex(name)[:8]}" def scrub_id_digits(s: str) -> str: """Length-preserving digit hash for VS, bank_id, user_id, etc.""" s = str(s) if not s: return s if re.match(r"^\d+$", s): n = len(s) hashed = int(_sha256_hex(s), 16) % (10 ** n) return f"{hashed:0{n}d}" return f"id_{_sha256_hex(s)[:8]}" def scrub_account(s: str) -> str: """Preserve Czech bank account format PREFIX/BANKCODE.""" s = str(s) if not s: return s m = re.match(r"^(\d+)/(\d{4})$", s) if m: prefix, bank = m.group(1), m.group(2) n = len(prefix) new_prefix = int(_sha256_hex(prefix), 16) % (10 ** n) new_bank = int(_sha256_hex(bank), 16) % 10000 return f"{new_prefix:0{n}d}/{new_bank:04d}" return scrub_id_digits(s) # --------------------------------------------------------------------------- # Name roster for message sweeps # --------------------------------------------------------------------------- def _load_member_names() -> list[str]: """Load canonical names from the attendance cache (may not exist).""" path = os.path.join(_REPO, "tmp", "attendance_regular_cache.json") if not os.path.exists(path): return [] try: with open(path, encoding="utf-8") as f: cache = json.load(f) rows = cache.get("data", []) if rows and isinstance(rows[0], list): rows = rows[0] names = [] for row in rows: if isinstance(row, (list, tuple)) and len(row) >= 1: names.append(str(row[0])) 
return names except Exception: return [] def _build_name_map(names: list[str]) -> dict[str, str]: """Map each real name (and its normalized form) to its pseudonym.""" mapping: dict[str, str] = {} for name in names: pseudo = scrub_name(name) mapping[name] = pseudo # Also add first+last without parenthetical nicknames base = re.sub(r"\s*\([^)]*\)\s*", " ", name).strip() if base != name: mapping[base] = pseudo return mapping def _sweep_names_in_text(text: str, name_map: dict[str, str]) -> str: """Replace real-name substrings in free text, longest match first.""" # Sort descending by length so longer names replace before their substrings for real in sorted(name_map, key=len, reverse=True): if real and real in text: text = text.replace(real, name_map[real]) return text # --------------------------------------------------------------------------- # Scramble whitelist — only these keys are scrambled; everything else is kept # --------------------------------------------------------------------------- _SCRAMBLE_KEYS = { "name", "member_names", "person", "sender", "sender_account", "account", "vs", "bank_id", "user_id", "note", } # Dict keys whose *child keys* (not values) are member names and need scrubbing. # e.g. the reconcile output: {"members": {"Alice Dvořák": {...}}, "credits": {"Alice Dvořák": 0}} _MEMBER_KEY_DICTS = {"members", "credits"} _MESSAGE_KEYS = {"message", "text", "search_text"} def _scrub_value(key: str, value: Any, name_map: dict[str, str]) -> Any: """Scrub a single value based on its field key.""" if isinstance(value, list): if key == "member_names": return [scrub_name(str(v)) for v in value] # Don't propagate parent key into list elements — each element is an # independent document. Propagating would incorrectly flag nested dicts # (e.g. the fees dict inside a member tuple) as member-name-keyed dicts. 
return [_scrub_doc(v, name_map) for v in value] if isinstance(value, dict): # Pass the current key as parent context so dicts like # {"members": {"Real Name": ...}} get their keys scrubbed too. return _scrub_doc(value, name_map, _parent_key=key) if key not in _SCRAMBLE_KEYS and key not in _MESSAGE_KEYS: return value if not isinstance(value, str): value = str(value) if key in _MESSAGE_KEYS: return _sweep_names_in_text(value, name_map) if key == "name": return scrub_name(value) if key in ("sender_account", "account"): return scrub_account(value) if key == "note": return "" if key == "person": # "person" may contain comma-separated member names (e.g. "Alice, Bob"). # Sweep with name_map so each name gets its own consistent pseudonym, # matching what the output.members keys will look like. return _sweep_names_in_text(value, name_map) if value else value # vs, bank_id, user_id, sender return scrub_id_digits(value) if re.match(r"^\d+$", value) else scrub_name(value) if value else value def _scrub_doc(doc: Any, name_map: dict[str, str], _parent_key: str = "") -> Any: """Recursively scrub a JSON document.""" if isinstance(doc, dict): if _parent_key in _MEMBER_KEY_DICTS: # Keys of this dict are member names — scrub the keys and recurse. return { scrub_name(k): _scrub_doc(v, name_map) for k, v in doc.items() } return {k: _scrub_value(k, v, name_map) for k, v in doc.items()} if isinstance(doc, list): return [_scrub_doc(item, name_map) for item in doc] return doc # Functions where field-key scrubbing would break parity (name matching tests). # Only real-roster message sweep is applied for these. 
_NO_FIELD_SCRUB_FUNCS = {
    "scripts.match_payments.match_members",
    "scripts.match_payments.infer_transaction_details",
}


def _scrub_messages_only(doc: Any, name_map: dict[str, str]) -> Any:
    """Sweep only message/text/search_text fields; leave all other values unchanged.

    Dicts and lists are rebuilt, so the result never aliases the containers
    of *doc*. With an empty *name_map* this amounts to a structural copy.
    """
    if isinstance(doc, dict):
        return {
            k: (
                _sweep_names_in_text(v, name_map)
                if k in _MESSAGE_KEYS and isinstance(v, str)
                else _scrub_messages_only(v, name_map)
            )
            for k, v in doc.items()
        }
    if isinstance(doc, list):
        return [_scrub_messages_only(item, name_map) for item in doc]
    return doc


def _recompute_sync_id(tx_scrubbed: dict) -> str:
    """Recompute generate_sync_id hash from already-scrubbed tx fields.

    After the scrubber changes sender/vs/bank_id the original hash is invalid.
    Replicates the Python generate_sync_id formula (pipe-separated, lowercased)
    and always treats amount as float64 to match Go's formatAmount behaviour.

    Args:
        tx_scrubbed: The scrubbed transaction dict. ``amount`` may be either a
            ``{"type": ..., "value": ...}`` envelope or a plain number.

    Returns:
        Hex SHA-256 of ``date|amount|currency|sender|vs|message|bank_id``
        (lowercased); currency falls back to ``CZK`` when missing or empty.
    """
    envelope = tx_scrubbed.get("amount", {})
    if isinstance(envelope, dict):
        t = envelope.get("type", "")
        v = envelope.get("value")
        if t in ("int", "float"):
            amount = float(v)  # always float — matches Go's formatAmount
        else:
            amount = ""
    else:
        amount = float(envelope) if envelope not in (None, "") else ""

    currency = tx_scrubbed.get("currency", "") or "CZK"
    components = [
        str(tx_scrubbed.get("date", "")),
        str(amount),
        currency,
        str(tx_scrubbed.get("sender", "")),
        str(tx_scrubbed.get("vs", "")),
        str(tx_scrubbed.get("message", "")),
        str(tx_scrubbed.get("bank_id", "")),
    ]
    raw_str = "|".join(components).lower()
    return hashlib.sha256(raw_str.encode("utf-8")).hexdigest()


def _extract_inline_names(doc: Any) -> list[str]:
    """Extract member names that appear in the fixture itself.

    Collected, in document order (duplicates are kept):
      - every element of a ``member_names`` list,
      - every string stored under a ``name`` key,
      - every key of a dict stored under one of ``_MEMBER_KEY_DICTS``. The
        field scrubber pseudonymises those keys as names, so they must be in
        the name map too; otherwise the same name could survive inside a
        ``person`` or message field.
    """
    names: list[str] = []
    if isinstance(doc, dict):
        for k, v in doc.items():
            if k == "member_names" and isinstance(v, list):
                names.extend(str(n) for n in v)
            elif k == "name" and isinstance(v, str):
                names.append(v)
            else:
                if k in _MEMBER_KEY_DICTS and isinstance(v, dict):
                    names.extend(str(child_key) for child_key in v)
                names.extend(_extract_inline_names(v))
    elif isinstance(doc, list):
        for item in doc:
            names.extend(_extract_inline_names(item))
    return names


def scrub_fixture(doc: dict) -> dict:
    """Scrub a single fixture document and return the result as a new dict.

    The input document is not modified.

    Args:
        doc: One parsed fixture. ``doc["func"]`` selects function-specific
            handling; generate_sync_id fixtures must carry ``output`` and
            ``input`` dicts.

    Returns:
        The scrubbed fixture.
    """
    func = doc.get("func") or ""

    # match_members / infer_transaction_details are tested with synthetic
    # member names only, so neither field scrubbing nor a message sweep is
    # applied: the name map is empty on purpose. Any sweep would create
    # inconsistency between scrubbed output fields (search_text) and
    # un-scrubbed input fields (sender, member_names).
    # Decided before touching the roster so these fixtures cost no file I/O.
    if func in _NO_FIELD_SCRUB_FUNCS:
        return _scrub_messages_only(doc, {})

    roster_names = _load_member_names()
    inline_names = _extract_inline_names(doc)
    # dict.fromkeys de-duplicates while keeping first-seen order.
    all_names = list(dict.fromkeys(roster_names + inline_names))
    name_map = _build_name_map(all_names)

    result = _scrub_doc(doc, name_map)

    # generate_sync_id: recompute hash from the now-scrubbed inputs so the
    # fixture is self-consistent (scrubbed fields → Go hashes scrubbed values).
    if func.endswith("generate_sync_id"):
        result["output"]["sync_id"] = _recompute_sync_id(result["input"].get("tx", {}))

    return result


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

def main() -> None:
    """Read fixture JSON from stdin and write the scrubbed JSON to stdout.

    Without flags a single (possibly multi-line) JSON document is read and
    pretty-printed. With --multi every non-blank input line is parsed as its
    own document and emitted as one compact line.
    """
    parser = argparse.ArgumentParser(description="Scrub PII from fixture JSON.")
    parser.add_argument(
        "--multi",
        action="store_true",
        help="Process newline-delimited JSON (one object per line) from stdin.",
    )
    args = parser.parse_args()

    if args.multi:
        for line in sys.stdin:
            line = line.strip()
            if not line:
                continue
            doc = json.loads(line)
            print(json.dumps(scrub_fixture(doc), ensure_ascii=False))
    else:
        doc = json.load(sys.stdin)
        out = scrub_fixture(doc)
        print(json.dumps(out, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()