feat(go): fixture capture + characterization framework (M3)
All checks were successful
Deploy to K8s / deploy (push) Successful in 7s

Closes M3.1–M3.6.  Parity safety net proving Go output matches Python
for every ported pure-domain function (M2.1–M2.9) and reconcile (M2.10).

Capture pipeline:
- scripts/capture_fixtures.py: calls each Python function with seeded
  inputs, emits JSON fixtures to stdout (never writes files directly).
- scripts/scrub_fixtures.py: deterministic PII scrubber — SHA-256
  pseudonyms for member names, digit-preserving hashes for VS/account/
  bank_id, name-sweep in message text.  Idempotent; no salt.
- scripts/_fixture_seeds.py: handcrafted seeds for all 11 functions;
  synthetic names throughout (no real roster members).
- scripts/capture_all_fixtures.sh: convenience wrapper for full corpus
  regeneration outside of make.

Fixture corpus (98 files, all PII-free):
- go/tests/fixtures/pure/<func>/<case>.json — 10 function directories.
- go/tests/fixtures/reconcile/<NN>_<case>.json — 10 branch-coverage
  cases: greedy, overpayment credit, proportional remainder, even-split,
  out-of-window, exception override, other: purpose, junior ?, multi-
  person+month fan-out, unmatched.

Go parity tests (//go:build parity):
- go/tests/parity/parityio.go: generic LoadDir/RunAll helpers + typed
  In/Out struct pairs for all 10 pure functions; Envelope decoder for
  int/float/none disambiguation.
- 10 pure-function test packages + bespoke reconcile test with per-cell
  float tolerance (math.Abs <= 0.01 for `paid` values).

Makefile: go-parity, go-test-all, capture-fixtures targets.
go/tests/fixtures/README.md: refresh workflow + PII audit guide.

Gate: make go-test green, make go-parity green (11/11 packages),
      make go-lint clean (parity tag), make go-build clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-06 23:26:24 +02:00
parent 28f0e468f7
commit 67d2f11d7c
119 changed files with 4931 additions and 10 deletions

330
scripts/scrub_fixtures.py Normal file
View File

@@ -0,0 +1,330 @@
#!/usr/bin/env python3
"""Scrub PII from fixture JSON.
Reads one JSON fixture from stdin (as produced by capture_fixtures.py),
replaces PII fields with deterministic pseudonyms, writes scrubbed JSON
to stdout.
Run in the two-step pipeline:
python capture_fixtures.py ... | python scrub_fixtures.py > fixture.json
Or process multiple lines (--multi for newline-delimited input):
python capture_fixtures.py --func foo --all | python scrub_fixtures.py --multi \\
| while read line; do ...
PII handling:
- Member names: replaced with Member_<8hex> (sha256(name)[:8]), deterministic.
- Senders / account numbers / VS / bank_id / user_id: stable digit-preserving hash.
- Notes (exception text): replaced with "<scrubbed>".
- Messages: name-substring sweep applied; rest preserved.
- All other fields (dates, amounts, months, fees): preserved verbatim.
Function-specific exceptions:
- match_members / infer_transaction_details: these functions are tested with
synthetic member names only. Only real-roster message sweeping is applied;
field-key scrubbing is skipped so Go can perform genuine name matching.
- generate_sync_id: after normal field-key scrubbing the output sync_id is
recomputed from the now-scrubbed inputs so the hash remains consistent.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import re
import sys
from typing import Any
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
_REPO = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# ---------------------------------------------------------------------------
# Bijection helpers
# ---------------------------------------------------------------------------
def _sha256_hex(s: str) -> str:
return hashlib.sha256(s.encode("utf-8")).hexdigest()
def scrub_name(name: str) -> str:
"""Deterministic pseudonym for a member name."""
if not name:
return name
return f"Member_{_sha256_hex(name)[:8]}"
def scrub_id_digits(s: str) -> str:
"""Length-preserving digit hash for VS, bank_id, user_id, etc."""
s = str(s)
if not s:
return s
if re.match(r"^\d+$", s):
n = len(s)
hashed = int(_sha256_hex(s), 16) % (10 ** n)
return f"{hashed:0{n}d}"
return f"id_{_sha256_hex(s)[:8]}"
def scrub_account(s: str) -> str:
"""Preserve Czech bank account format PREFIX/BANKCODE."""
s = str(s)
if not s:
return s
m = re.match(r"^(\d+)/(\d{4})$", s)
if m:
prefix, bank = m.group(1), m.group(2)
n = len(prefix)
new_prefix = int(_sha256_hex(prefix), 16) % (10 ** n)
new_bank = int(_sha256_hex(bank), 16) % 10000
return f"{new_prefix:0{n}d}/{new_bank:04d}"
return scrub_id_digits(s)
# ---------------------------------------------------------------------------
# Name roster for message sweeps
# ---------------------------------------------------------------------------
def _load_member_names() -> list[str]:
"""Load canonical names from the attendance cache (may not exist)."""
path = os.path.join(_REPO, "tmp", "attendance_regular_cache.json")
if not os.path.exists(path):
return []
try:
with open(path, encoding="utf-8") as f:
cache = json.load(f)
rows = cache.get("data", [])
if rows and isinstance(rows[0], list):
rows = rows[0]
names = []
for row in rows:
if isinstance(row, (list, tuple)) and len(row) >= 1:
names.append(str(row[0]))
return names
except Exception:
return []
def _build_name_map(names: list[str]) -> dict[str, str]:
"""Map each real name (and its normalized form) to its pseudonym."""
mapping: dict[str, str] = {}
for name in names:
pseudo = scrub_name(name)
mapping[name] = pseudo
# Also add first+last without parenthetical nicknames
base = re.sub(r"\s*\([^)]*\)\s*", " ", name).strip()
if base != name:
mapping[base] = pseudo
return mapping
def _sweep_names_in_text(text: str, name_map: dict[str, str]) -> str:
"""Replace real-name substrings in free text, longest match first."""
# Sort descending by length so longer names replace before their substrings
for real in sorted(name_map, key=len, reverse=True):
if real and real in text:
text = text.replace(real, name_map[real])
return text
# ---------------------------------------------------------------------------
# Scramble whitelist — only these keys are scrambled; everything else is kept
# ---------------------------------------------------------------------------
_SCRAMBLE_KEYS = {
"name",
"member_names",
"person",
"sender",
"sender_account",
"account",
"vs",
"bank_id",
"user_id",
"note",
}
# Dict keys whose *child keys* (not values) are member names and need scrubbing.
# e.g. the reconcile output: {"members": {"Alice Dvořák": {...}}, "credits": {"Alice Dvořák": 0}}
_MEMBER_KEY_DICTS = {"members", "credits"}
_MESSAGE_KEYS = {"message", "text", "search_text"}
def _scrub_value(key: str, value: Any, name_map: dict[str, str]) -> Any:
"""Scrub a single value based on its field key."""
if isinstance(value, list):
if key == "member_names":
return [scrub_name(str(v)) for v in value]
# Don't propagate parent key into list elements — each element is an
# independent document. Propagating would incorrectly flag nested dicts
# (e.g. the fees dict inside a member tuple) as member-name-keyed dicts.
return [_scrub_doc(v, name_map) for v in value]
if isinstance(value, dict):
# Pass the current key as parent context so dicts like
# {"members": {"Real Name": ...}} get their keys scrubbed too.
return _scrub_doc(value, name_map, _parent_key=key)
if key not in _SCRAMBLE_KEYS and key not in _MESSAGE_KEYS:
return value
if not isinstance(value, str):
value = str(value)
if key in _MESSAGE_KEYS:
return _sweep_names_in_text(value, name_map)
if key == "name":
return scrub_name(value)
if key in ("sender_account", "account"):
return scrub_account(value)
if key == "note":
return "<scrubbed>"
if key == "person":
# "person" may contain comma-separated member names (e.g. "Alice, Bob").
# Sweep with name_map so each name gets its own consistent pseudonym,
# matching what the output.members keys will look like.
return _sweep_names_in_text(value, name_map) if value else value
# vs, bank_id, user_id, sender
return scrub_id_digits(value) if re.match(r"^\d+$", value) else scrub_name(value) if value else value
def _scrub_doc(doc: Any, name_map: dict[str, str], _parent_key: str = "") -> Any:
"""Recursively scrub a JSON document."""
if isinstance(doc, dict):
if _parent_key in _MEMBER_KEY_DICTS:
# Keys of this dict are member names — scrub the keys and recurse.
return {
scrub_name(k): _scrub_doc(v, name_map)
for k, v in doc.items()
}
return {k: _scrub_value(k, v, name_map) for k, v in doc.items()}
if isinstance(doc, list):
return [_scrub_doc(item, name_map) for item in doc]
return doc
# Functions where field-key scrubbing would break parity (name matching tests).
# Only real-roster message sweep is applied for these.
_NO_FIELD_SCRUB_FUNCS = {
"scripts.match_payments.match_members",
"scripts.match_payments.infer_transaction_details",
}
def _scrub_messages_only(doc: Any, name_map: dict[str, str]) -> Any:
"""Sweep only message/text/search_text fields; leave all other values unchanged."""
if isinstance(doc, dict):
return {
k: (_sweep_names_in_text(v, name_map) if k in _MESSAGE_KEYS and isinstance(v, str)
else _scrub_messages_only(v, name_map))
for k, v in doc.items()
}
if isinstance(doc, list):
return [_scrub_messages_only(item, name_map) for item in doc]
return doc
def _recompute_sync_id(tx_scrubbed: dict) -> str:
"""Recompute generate_sync_id hash from already-scrubbed tx fields.
After the scrubber changes sender/vs/bank_id the original hash is invalid.
Replicates the Python generate_sync_id formula (pipe-separated, lowercased)
and always treats amount as float64 to match Go's formatAmount behaviour.
"""
envelope = tx_scrubbed.get("amount", {})
if isinstance(envelope, dict):
t = envelope.get("type", "")
v = envelope.get("value")
if t in ("int", "float"):
amount = float(v) # always float — matches Go's formatAmount
else:
amount = ""
else:
amount = float(envelope) if envelope not in (None, "") else ""
currency = tx_scrubbed.get("currency", "") or "CZK"
components = [
str(tx_scrubbed.get("date", "")),
str(amount),
currency,
str(tx_scrubbed.get("sender", "")),
str(tx_scrubbed.get("vs", "")),
str(tx_scrubbed.get("message", "")),
str(tx_scrubbed.get("bank_id", "")),
]
raw_str = "|".join(components).lower()
return hashlib.sha256(raw_str.encode("utf-8")).hexdigest()
def _extract_inline_names(doc: Any) -> list[str]:
"""Extract names from member_names and 'name' fields in the fixture itself."""
names: list[str] = []
if isinstance(doc, dict):
for k, v in doc.items():
if k == "member_names" and isinstance(v, list):
names.extend(str(n) for n in v)
elif k == "name" and isinstance(v, str):
names.append(v)
else:
names.extend(_extract_inline_names(v))
elif isinstance(doc, list):
for item in doc:
names.extend(_extract_inline_names(item))
return names
def scrub_fixture(doc: dict) -> dict:
"""Scrub a single fixture document in-place (returns new dict)."""
roster_names = _load_member_names()
inline_names = _extract_inline_names(doc)
all_names = list(dict.fromkeys(roster_names + inline_names))
name_map = _build_name_map(all_names)
func = doc.get("func", "")
# match_members / infer_transaction_details: tested with synthetic names only.
# Field-key scrubbing would make member_names pseudonyms inconsistent with
# the text, breaking Go's name-matching assertions. Only sweep messages.
if func in _NO_FIELD_SCRUB_FUNCS:
# Synthetic member names only — no field scrubbing, no message sweep.
# Any sweep would create inconsistency between scrubbed output fields
# (search_text) and un-scrubbed input fields (sender, member_names).
return _scrub_messages_only(doc, {})
result = _scrub_doc(doc, name_map)
# generate_sync_id: recompute hash from the now-scrubbed inputs so the
# fixture is self-consistent (scrubbed fields → Go hashes scrubbed values).
if func.endswith("generate_sync_id"):
result["output"]["sync_id"] = _recompute_sync_id(result["input"].get("tx", {}))
return result
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(description="Scrub PII from fixture JSON.")
parser.add_argument(
"--multi", action="store_true",
help="Process newline-delimited JSON (one object per line) from stdin.",
)
args = parser.parse_args()
if args.multi:
for line in sys.stdin:
line = line.strip()
if not line:
continue
doc = json.loads(line)
print(json.dumps(scrub_fixture(doc), ensure_ascii=False))
else:
doc = json.load(sys.stdin)
out = scrub_fixture(doc)
print(json.dumps(out, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()