Files
fuj-management/scripts/match_payments.py
Jan Novak 65694ad378
All checks were successful
Deploy to K8s / deploy (push) Successful in 7s
feat(py): M5.4 fix #2 — add vs and sync_id to payments tx projection
Python's fetch_sheet_data read 9 sheet columns but skipped VS and
Sync ID, causing make parity to report extra fields on every raw
payment row emitted by the Go backend. Both columns are already on
the sheet; add idx_vs / idx_sync_id lookups and the matching keys
to the tx dict so the Python /api/* wire shape matches Go's
RawTransaction.

Update /api/* test fixtures to include vs/sync_id keys for realism.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-07 23:50:33 +02:00

693 lines
26 KiB
Python

#!/usr/bin/env python3
"""Match Fio bank payments against expected attendance fees."""
import argparse
import json
import logging
import os
import re
import urllib.request
from datetime import datetime, timedelta
from html.parser import HTMLParser
logger = logging.getLogger(__name__)
from attendance import get_members_with_fees
from czech_utils import normalize, parse_month_references
from sync_fio_to_sheets import get_sheets_service, DEFAULT_SPREADSHEET_ID
def canonical_member_key(name: str) -> str:
    """Return the lookup key used to compare member names.

    The name is run through ``normalize`` and every run of whitespace is
    collapsed to a single space, with leading/trailing whitespace removed.
    The key is used to resolve `Person`-column values from the payments sheet
    to canonical attendance-sheet names, tolerating cells like "Maria Maco"
    vs "Mária Maco".
    """
    collapsed = re.sub(r"\s+", " ", normalize(name))
    return collapsed.strip()
# ---------------------------------------------------------------------------
# Name matching
# ---------------------------------------------------------------------------
def _build_name_variants(name: str) -> list[str]:
    """Build the searchable variants of a member name.

    Variants are emitted in this order: normalized base name (nickname in
    parentheses removed), normalized nickname (if any), last word of the base
    name, first word of the base name. Variants shorter than 3 characters are
    dropped, so the first element is not guaranteed to be the base name.

    E.g. 'František Vrbík (Štrúdl)' →
    ['frantisek vrbik', 'strudl', 'vrbik', 'frantisek']
    (assuming ``normalize`` lowercases and strips diacritics).
    """
    # Nickname is whatever sits inside the first pair of parentheses.
    paren = re.search(r"\(([^)]+)\)", name)
    raw_nick = "" if paren is None else paren.group(1)

    # Base name: the input with every parenthesised group cut out.
    base_norm = normalize(re.sub(r"\s*\([^)]*\)\s*", " ", name).strip())
    nick_norm = normalize(raw_nick)

    candidates = [base_norm]
    if nick_norm:
        candidates.append(nick_norm)

    # Last and first word on their own, for matching inside free-text messages.
    words = base_norm.split()
    if len(words) > 1:
        candidates.extend((words[-1], words[0]))

    return list(filter(lambda variant: len(variant) >= 3, candidates))
def _word_in(needle: str, haystack: str) -> bool:
"""Return True if needle appears as a whole word in haystack."""
return bool(re.search(rf"\b{re.escape(needle)}\b", haystack))
def match_members(
    text: str, member_names: list[str]
) -> list[tuple[str, str]]:
    """Find members mentioned in text.

    Args:
        text: Free text to search (it is passed through ``normalize`` first).
        member_names: Member names, optionally with a nickname in parentheses.

    Returns:
        List of (member_name, confidence) where confidence is 'auto' or
        'review'. If any 'auto' match exists, 'review' matches are discarded.
    """
    normalized_text = normalize(text)

    # Resolve each member's primary variant once; both passes below use it.
    # NOTE: this is the first element returned by _build_name_variants, which
    # is the normalized base name unless that was shorter than 3 characters.
    primary_names = []
    for name in member_names:
        variants = _build_name_variants(name)
        primary_names.append((name, variants[0] if variants else ""))

    # Short-circuit: if any member's full canonical name appears verbatim (whole words),
    # return only those matches and skip all fuzzy/nickname checks. This prevents a
    # nickname that is a substring of another member's surname from producing false hits.
    exact_matches = [
        (name, "auto")
        for name, full_name in primary_names
        if full_name and _word_in(full_name, normalized_text)
    ]
    if exact_matches:
        return exact_matches

    # Surnames too common to justify even a 'review' match on their own.
    common_surnames = {"novak", "novakova", "prach"}

    matches = []
    for name, full_name in primary_names:
        parts = full_name.split()

        # 1. Full name present as a plain substring = high confidence
        if full_name and full_name in normalized_text:
            matches.append((name, "auto"))
            continue

        # 2. Both first and last name present (any order) = high confidence
        if (
            len(parts) >= 2
            and _word_in(parts[0], normalized_text)
            and _word_in(parts[-1], normalized_text)
        ):
            matches.append((name, "auto"))
            continue

        # 3. Nickname present = high confidence (no minimum length here)
        nickname_match = re.search(r"\(([^)]+)\)", name)
        nickname = normalize(nickname_match.group(1)) if nickname_match else ""
        if nickname and _word_in(nickname, normalized_text):
            matches.append((name, "auto"))
            continue

        # 4. Partial matches = review confidence
        if len(parts) >= 2:
            first_name = parts[0]
            last_name = parts[-1]
            if (
                len(last_name) >= 4
                and last_name not in common_surnames
                and _word_in(last_name, normalized_text)
            ):
                matches.append((name, "review"))
                continue
            if len(first_name) >= 3 and _word_in(first_name, normalized_text):
                matches.append((name, "review"))
                continue
        elif len(parts) == 1:
            if len(parts[0]) >= 4 and _word_in(parts[0], normalized_text):
                matches.append((name, "review"))
                continue

    # --- Filtering ---
    # If we have any "auto" matches, discard all "review" matches
    auto_matches = [m for m in matches if m[1] == "auto"]
    if auto_matches:
        return auto_matches
    return matches
# ---------------------------------------------------------------------------
# Reconciliation
# ---------------------------------------------------------------------------
def infer_transaction_details(tx: dict, member_names: list[str]) -> dict:
    """Infer member(s) and month(s) for a single transaction.

    Args:
        tx: Transaction dict; the keys 'sender', 'message', 'user_id' and
            'date' are read, all optional. Values may be non-strings (the
            sheet is read unformatted), so they are coerced to text.
        member_names: Names to look for in the transaction text.

    Returns:
        {
            'members': [(name, confidence)],
            'months': [YYYY-MM],
            'search_text': str
        }
    """
    def _text(key: str) -> str:
        # Numeric or missing cells must not break string concatenation below.
        value = tx.get(key, "")
        return "" if value is None else str(value)

    sender = _text("sender")
    message = _text("message")
    user_id = _text("user_id")

    # Combine sender + message for searching
    search_text = f"{sender} {message} {user_id}"
    matched_members = match_members(search_text, member_names)
    matched_months = parse_month_references(message + " " + user_id)

    if not matched_members:
        # Retry on the sender name alone (same matcher, narrower text).
        matched_members = match_members(sender, member_names)

    if not matched_months:
        # If no month specified, try to infer from payment date
        tx_date = tx.get("date")
        if tx_date:
            try:
                if isinstance(tx_date, (int, float)):
                    # Handle Google Sheets serial date
                    dt = datetime(1899, 12, 30) + timedelta(days=tx_date)
                else:
                    dt = datetime.strptime(str(tx_date).strip(), "%Y-%m-%d")
                # Assume payment is for the current month
                matched_months = [dt.strftime("%Y-%m")]
            except (ValueError, TypeError, OverflowError):
                # Unparseable or out-of-range date: leave months empty.
                pass

    return {
        "members": matched_members,
        "months": matched_months,
        "search_text": search_text,
    }
def format_date(val) -> str:
    """Normalize a date cell from the Google Sheet.

    Args:
        val: Cell value. Numbers are treated as Google Sheets serial dates
            (days since 1899-12-30); anything else is treated as text.

    Returns:
        'YYYY-MM-DD' for a valid serial number; the stripped text for string
        input (returned unchanged, whatever its format); '' for None or empty
        input. A number that cannot be represented as a date (out of range,
        NaN) is returned as its string form instead of raising, so one bad
        cell does not abort the whole import.
    """
    if val is None or val == "":
        return ""

    # Handle Google Sheets serial dates (number of days since 1899-12-30)
    if isinstance(val, (int, float)):
        try:
            return (datetime(1899, 12, 30) + timedelta(days=val)).strftime("%Y-%m-%d")
        except (OverflowError, ValueError):
            return str(val)

    return str(val).strip()
def fetch_sheet_data(spreadsheet_id: str, credentials_path: str) -> list[dict]:
    """Fetch all rows from the Google Sheet and convert to a list of dicts.

    The first row is the header. Columns are located by case-insensitive,
    whitespace-trimmed header text; when a header appears more than once the
    first occurrence wins. Cells are read unformatted, so values may be
    numbers as well as strings. Only 'date' is post-processed (format_date).

    Args:
        spreadsheet_id: ID of the spreadsheet to read.
        credentials_path: Path handed to get_sheets_service.

    Returns:
        One dict per data row with the keys date, amount, manual_fix, person,
        purpose, inferred_amount, sender, vs, message, bank_id, sync_id.
        Missing optional columns and short rows yield "" for that key.
        An empty sheet yields [].

    Raises:
        ValueError: If any of the Date, Amount, Person or Purpose columns is
            missing from the header.
    """
    service = get_sheets_service(credentials_path)
    result = service.spreadsheets().values().get(
        spreadsheetId=spreadsheet_id,
        range="A1:Z",
        valueRenderOption="UNFORMATTED_VALUE",
    ).execute()
    rows = result.get("values", [])
    if not rows:
        return []

    header = rows[0]

    # Header lookup built once. str() guards against non-text header cells,
    # which unformatted reads can return.
    header_index: dict[str, int] = {}
    for i, cell in enumerate(header):
        header_index.setdefault(str(cell).lower().strip(), i)

    def get_col_index(label: str) -> int:
        return header_index.get(label.lower().strip(), -1)

    # Output key -> column position (-1 when the column is absent).
    # Insertion order defines the key order of every emitted dict.
    indices = {
        "date": get_col_index("Date"),
        "amount": get_col_index("Amount"),
        "manual_fix": get_col_index("manual fix"),
        "person": get_col_index("Person"),
        "purpose": get_col_index("Purpose"),
        "inferred_amount": get_col_index("Inferred Amount"),
        "sender": get_col_index("Sender"),
        "vs": get_col_index("VS"),
        "message": get_col_index("Message"),
        "bank_id": get_col_index("Bank ID"),
        "sync_id": get_col_index("Sync ID"),
    }

    required = {
        "Date": indices["date"],
        "Amount": indices["amount"],
        "Person": indices["person"],
        "Purpose": indices["purpose"],
    }
    missing = [name for name, idx in required.items() if idx == -1]
    if missing:
        raise ValueError(f"Required columns missing from payments sheet: {', '.join(missing)}. Found headers: {header}")

    transactions = []
    for row in rows[1:]:
        # Rows can be shorter than the header, so bounds-check every column.
        tx = {
            key: (row[idx] if 0 <= idx < len(row) else "")
            for key, idx in indices.items()
        }
        tx["date"] = format_date(tx["date"])
        transactions.append(tx)

    return transactions
def fetch_exceptions(spreadsheet_id: str, credentials_path: str) -> dict[tuple[str, str], dict]:
    """Load manual fee overrides from the 'exceptions' sheet.

    Reads columns A-D starting at row 2: name, period, amount, optional note.
    Name and period are stripped and passed through ``normalize`` to form the
    key. Rows with fewer than three cells, rows whose first cell starts with
    "name", and rows whose amount cannot be converted to int are skipped.

    Returns:
        Dict mapping (normalized_name, normalized_period) to
        {'amount': int, 'note': str}. If the sheet cannot be fetched, a
        warning is printed and {} is returned.
    """
    service = get_sheets_service(credentials_path)
    try:
        response = service.spreadsheets().values().get(
            spreadsheetId=spreadsheet_id,
            range="'exceptions'!A2:D",
            valueRenderOption="UNFORMATTED_VALUE",
        ).execute()
        rows = response.get("values", [])
    except Exception as e:
        # Best effort: overrides are optional, so a fetch failure is not fatal.
        print(f"Warning: Could not fetch exceptions: {e}")
        return {}

    overrides = {}
    for row in rows:
        if len(row) < 3:
            continue
        if str(row[0]).lower().startswith("name"):
            continue

        # Robust normalization using czech_utils.normalize
        key = (normalize(str(row[0]).strip()), normalize(str(row[1]).strip()))

        try:
            amount = int(row[2])
            note = str(row[3]).strip() if len(row) > 3 else ""
        except (ValueError, TypeError):
            continue
        overrides[key] = {"amount": amount, "note": note}

    return overrides
def reconcile(
    members: list[tuple[str, str, dict[str, int]]],
    sorted_months: list[str],
    transactions: list[dict],
    exceptions: dict[tuple[str, str], dict] = None,
) -> dict:
    """Match transactions to members and months.

    Args:
        members: (name, tier, fees) tuples; fees maps YYYY-MM to either an
            expected amount or an (expected, attendance_count) pair.
        sorted_months: Months (YYYY-MM) that make up the reporting window.
        transactions: Transaction dicts. Rows with both 'person' and 'purpose'
            filled are taken as already matched; others go through
            infer_transaction_details.
        exceptions: Optional fee overrides keyed by
            (normalize(name), normalize(month)), each {'amount', 'note'}.

    Returns:
        A dict with:
        - 'members': {name: {'tier': str,
                             'months': {YYYY-MM: {'expected', 'original_expected',
                                                  'attendance_count', 'exception',
                                                  'paid', 'transactions'}},
                             'other_transactions': list,
                             'total_balance': int}}
        - 'unmatched': list of transactions that couldn't be matched
        - 'credits': {name: int} — the same per-member total balance as
          'total_balance' (in-window paid minus expected, plus credit from
          overpayment and out-of-window months); it can be negative.
    """
    member_names = [name for name, _, _ in members]
    member_tiers = {name: tier for name, tier, _ in members}
    member_fees = {name: fees for name, _, fees in members}

    # Map canonical key → first attendance-sheet name with that key, so a
    # `Person` cell that drifts in diacritics/case/whitespace still resolves.
    canonical_by_key: dict[str, str] = {}
    for name in member_names:
        canonical_by_key.setdefault(canonical_member_key(name), name)

    # Initialize ledger
    ledger: dict[str, dict[str, dict]] = {}
    other_ledger: dict[str, list] = {}
    exceptions = exceptions or {}
    for name in member_names:
        ledger[name] = {}
        other_ledger[name] = []
        # Robust normalization for the exception lookup
        norm_name = normalize(name)
        for m in sorted_months:
            norm_period = normalize(m)
            fee_data = member_fees[name].get(m, (0, 0))
            if isinstance(fee_data, (tuple, list)):
                original_expected = fee_data[0]
                attendance_count = fee_data[1]
            else:
                original_expected = fee_data
                attendance_count = 0

            # A manual exception replaces the attendance-derived fee.
            ex_data = exceptions.get((norm_name, norm_period))
            expected = ex_data["amount"] if ex_data is not None else original_expected

            ledger[name][m] = {
                "expected": expected,
                "original_expected": original_expected,
                "attendance_count": attendance_count,
                "exception": ex_data,
                "paid": 0,
                "transactions": [],
            }

    unmatched = []
    credits: dict[str, int] = {}

    for tx in transactions:
        # Use sheet columns if they exist, otherwise fallback to inference
        person_str = str(tx.get("person", "")).strip()
        purpose_str = str(tx.get("purpose", "")).strip()

        # Strip markers like [?]
        person_str = re.sub(r"\[\?\]\s*", "", person_str)
        is_other = purpose_str.lower().startswith("other:")

        if person_str and purpose_str:
            # We have pre-matched data (either from script or manual)
            # Support multiple people/months in the comma-separated string
            matched_members = [(p.strip(), "auto") for p in person_str.split(",") if p.strip()]
            matched_months = [purpose_str] if is_other else [m.strip() for m in purpose_str.split(",") if m.strip()]

            # Use Inferred Amount if available, otherwise bank Amount
            amount = tx.get("inferred_amount")
            if amount is None or amount == "":
                amount = tx.get("amount", 0)
        else:
            # Fallback to inference (for rows not yet processed by infer_payments.py)
            inference = infer_transaction_details(tx, member_names)
            matched_members = inference["members"]
            matched_months = inference["months"]
            amount = tx.get("amount", 0)

        try:
            amount = float(amount)
        except (ValueError, TypeError):
            amount = 0

        if not matched_members or not matched_months:
            unmatched.append(tx)
            continue

        # Allocate payment across matched members and months
        if is_other:
            num_allocations = len(matched_members)
            per_allocation = amount / num_allocations if num_allocations > 0 else 0
            for raw_member_name, confidence in matched_members:
                member_name = canonical_by_key.get(canonical_member_key(raw_member_name))
                if member_name is None:
                    # The share is not recorded anywhere; make that visible.
                    logger.warning(
                        "'Other' payment matched to unknown member %r (tx: %s, %s) — not recorded",
                        raw_member_name, tx.get("date", "?"), tx.get("message", "?"),
                    )
                    continue
                other_ledger[member_name].append({
                    "amount": per_allocation,
                    "date": tx["date"],
                    "sender": tx["sender"],
                    "message": tx["message"],
                    "purpose": purpose_str,
                    "confidence": confidence,
                })
            continue

        member_share = amount / len(matched_members) if matched_members else 0

        for raw_member_name, confidence in matched_members:
            member_name = canonical_by_key.get(canonical_member_key(raw_member_name))
            if member_name is None:
                logger.warning(
                    "Payment matched to unknown member %r (tx: %s, %s) — adding to unmatched",
                    raw_member_name, tx.get("date", "?"), tx.get("message", "?"),
                )
                unmatched.append(tx)
                continue
            if member_name != raw_member_name:
                logger.info(
                    "Person cell %r resolved to canonical member %r — consider fixing the sheet",
                    raw_member_name, member_name,
                )

            member_months = ledger[member_name]
            in_window = [(m, member_months[m]["expected"]) for m in matched_months if m in member_months]
            out_of_window = [m for m in matched_months if m not in member_months]

            # Out-of-window months (outside display range): even split → credit.
            n_total = len(matched_months)
            if out_of_window and n_total > 0:
                out_credit = member_share / n_total * len(out_of_window)
                credits[member_name] = credits.get(member_name, 0) + int(out_credit)
            else:
                out_credit = 0.0

            in_window_share = member_share - out_credit
            if not in_window:
                continue

            allocations, overflow = _split_across_months(in_window, in_window_share)
            if overflow is not None:
                # Payment covered every in-window fee; the rest becomes credit.
                credits[member_name] = credits.get(member_name, 0) + int(overflow)
            for m, alloc in allocations:
                member_months[m]["paid"] += alloc
                member_months[m]["transactions"].append(_payment_entry(tx, alloc, confidence))

    # Calculate final total balances (window + off-window credits)
    final_balances: dict[str, int] = {}
    for name in member_names:
        window_balance = sum(
            int(mdata["paid"]) - (mdata["expected"] if isinstance(mdata["expected"], int) else 0)
            for mdata in ledger[name].values()
        )
        final_balances[name] = window_balance + credits.get(name, 0)

    return {
        "members": {
            name: {
                "tier": member_tiers[name],
                "months": ledger[name],
                "other_transactions": other_ledger[name],
                "total_balance": final_balances[name],
            }
            for name in member_names
        },
        "unmatched": unmatched,
        # NOTE: despite the key name this is the full balance map, including
        # members whose balance is zero or negative.
        "credits": final_balances,
    }


def _payment_entry(tx: dict, amount: float, confidence: str) -> dict:
    """Build the ledger record for one allocation of a transaction."""
    return {
        "amount": amount,
        "date": tx["date"],
        "sender": tx["sender"],
        "message": tx["message"],
        "confidence": confidence,
    }


def _split_across_months(in_window: list, share: float) -> tuple:
    """Split a member's in-window share across (month, expected) pairs.

    Returns (allocations, overflow): allocations is a list of (month, amount);
    overflow is the amount left after covering every fee, or None when the
    share did not cover all expected fees.
    """
    total_expected = sum(exp for _, exp in in_window)

    if total_expected > 0 and share >= total_expected:
        # Greedy phase: every month gets exactly its expected fee.
        return [(m, float(exp)) for m, exp in in_window], share - total_expected

    if total_expected > 0:
        # Proportional phase: distribute by each month's expected fee.
        # Last month absorbs any float remainder so the sum equals share exactly.
        allocations = []
        remaining = share
        last = len(in_window) - 1
        for i, (m, exp) in enumerate(in_window):
            alloc = remaining if i == last else share * exp / total_expected
            remaining -= alloc
            allocations.append((m, alloc))
        return allocations, None

    # Fallback: no expected fees (prepayment before attendance recorded); even split.
    per_month = share / len(in_window)
    return [(m, per_month) for m, _ in in_window], None
# ---------------------------------------------------------------------------
# Report output
# ---------------------------------------------------------------------------
def print_report(result: dict, sorted_months: list[str]):
    """Print the reconciliation report to stdout.

    Sections, in order: a per-member/per-month table for tier "A" members,
    a totals line, members with a positive total balance, members with a
    negative total balance, unmatched transactions, and the transactions
    behind every matched month.

    Args:
        result: Dict with 'members' ({name: {'tier', 'months',
            'total_balance'}}) and 'unmatched' (list of transaction dicts).
        sorted_months: Months (YYYY-MM) to show as table columns. A month
            absent from a member's 'months' is shown as having no fee and no
            payment.
    """
    month_labels = {
        m: datetime.strptime(m, "%Y-%m").strftime("%b %Y") for m in sorted_months
    }

    # --- Per-member breakdown (adults only) ---
    print("=" * 80)
    print("PAYMENT RECONCILIATION REPORT")
    print("=" * 80)

    adults = {
        name: data
        for name, data in result["members"].items()
        if data["tier"] == "A"
    }

    total_expected = 0
    total_paid = 0

    # Summary table
    name_width = max((len(n) for n in adults), default=20)
    rule = "-" * (name_width + (len(sorted_months) + 1) * 13)

    print(f"\n{'Member':<{name_width}}", end="")
    for m in sorted_months:
        print(f" | {month_labels[m]:>10}", end="")
    print(f" | {'Balance':>10}")
    print(rule)

    for name in sorted(adults):
        data = adults[name]
        line = f"{name:<{name_width}}"
        member_balance = 0
        for m in sorted_months:
            mdata = data["months"].get(m, {"expected": 0, "paid": 0})
            expected = mdata["expected"]
            paid = int(mdata["paid"])
            total_expected += expected
            total_paid += paid

            if expected == 0 and paid == 0:
                cell = "-"
            elif paid >= expected and expected > 0:
                cell = "OK"
            elif paid > 0:
                cell = f"{paid}/{expected}"
            else:
                cell = f"UNPAID {expected}"

            member_balance += paid - expected
            line += f" | {cell:>10}"

        balance_str = f"{member_balance:+d}" if member_balance != 0 else "0"
        line += f" | {balance_str:>10}"
        print(line)

    print(rule)
    print(f"{'TOTAL':<{name_width}}", end="")
    for _ in sorted_months:
        print(f" | {'':>10}", end="")
    balance = total_paid - total_expected
    print(f" | Expected: {total_expected}, Paid: {int(total_paid)}, Balance: {balance:+d}")

    # --- Credits (Total Surplus) ---
    all_credits = {
        name: data["total_balance"]
        for name, data in result["members"].items()
        if data["total_balance"] > 0
    }
    if all_credits:
        print("\nTOTAL CREDITS (advance payments or surplus):")
        for name, amount in sorted(all_credits.items()):
            print(f" {name}: {amount} CZK")

    # --- Debts (Missing Payments) ---
    all_debts = {
        name: data["total_balance"]
        for name, data in result["members"].items()
        if data["total_balance"] < 0
    }
    if all_debts:
        print("\nTOTAL DEBTS (missing payments):")
        for name, amount in sorted(all_debts.items()):
            print(f" {name}: {abs(amount)} CZK")

    # --- Unmatched transactions ---
    if result["unmatched"]:
        print("\nUNMATCHED TRANSACTIONS (need manual review)")
        print(f" {'Date':<12} {'Amount':>10} {'Sender':<30} {'Message'}")
        print(f" {'-'*12} {'-'*10} {'-'*30} {'-'*30}")
        for tx in result["unmatched"]:
            # Unmatched rows are the ones most likely to hold blank or
            # non-numeric cells, so format defensively instead of crashing.
            raw_amount = tx.get("amount", "")
            try:
                amount_cell = f"{float(raw_amount):>10.0f}"
            except (TypeError, ValueError):
                amount_cell = f"{str(raw_amount):>10}"
            print(
                f" {str(tx.get('date', '')):<12} {amount_cell} "
                f"{str(tx.get('sender', '')):<30} {tx.get('message', '')}"
            )

    # --- Detailed matched transactions ---
    print("\nMATCHED TRANSACTION DETAILS")
    for name in sorted(adults):
        data = adults[name]
        has_payments = any(
            data["months"].get(m, {}).get("transactions")
            for m in sorted_months
        )
        if not has_payments:
            continue
        print(f"\n {name}:")
        for m in sorted_months:
            mdata = data["months"].get(m, {})
            for tx in mdata.get("transactions", []):
                conf = " [REVIEW]" if tx["confidence"] == "review" else ""
                print(
                    f" {month_labels[m]}: {tx['amount']:.0f} CZK "
                    f"from {tx['sender']} \"{tx['message']}\"{conf}"
                )
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Command-line entry point: load data, reconcile, print the report.

    Transactions come from the payments sheet by default, or from the bank
    when --bank is given. --bank requires --from and --to, which are passed
    unchanged to fio_utils.fetch_transactions.
    """
    parser = argparse.ArgumentParser(
        description="Match bank payments against expected attendance fees."
    )
    parser.add_argument(
        "--sheet-id", default=DEFAULT_SPREADSHEET_ID, help="Google Sheet ID"
    )
    parser.add_argument(
        "--credentials", default=".secret/fuj-management-bot-credentials.json",
        help="Path to Google API credentials JSON"
    )
    parser.add_argument(
        "--bank", action="store_true", help="Scrape bank instead of using Sheet data"
    )
    # NOTE(review): the expected date format is whatever
    # fio_utils.fetch_transactions accepts — confirm and document it in the help.
    parser.add_argument(
        "--from", dest="date_from", default=None, metavar="DATE",
        help="Start date of the bank query (required with --bank)"
    )
    parser.add_argument(
        "--to", dest="date_to", default=None, metavar="DATE",
        help="End date of the bank query (required with --bank)"
    )
    args = parser.parse_args()

    # Fail fast, before any network call, when the bank range is incomplete.
    if args.bank and not (args.date_from and args.date_to):
        parser.error("--bank requires both --from and --to")

    print("Fetching attendance data...")
    members, sorted_months = get_members_with_fees()
    if not members:
        print("No attendance data found.")
        return

    if args.bank:
        print(f"Fetching transactions from Fio bank ({args.date_from} to {args.date_to})...")
        from fio_utils import fetch_transactions
        transactions = fetch_transactions(args.date_from, args.date_to)
    else:
        print(f"Fetching transactions from Google Sheet ({args.sheet_id})...")
        transactions = fetch_sheet_data(args.sheet_id, args.credentials)
    print(f"Processing {len(transactions)} transactions.\n")

    exceptions = fetch_exceptions(args.sheet_id, args.credentials)
    if exceptions:
        print(f"Loaded {len(exceptions)} fee exceptions.")

    result = reconcile(members, sorted_months, transactions, exceptions)
    print_report(result, sorted_months)
if __name__ == "__main__":
main()