Files
fuj-management/scripts/match_payments.py
Jan Novak dfdf2aacb8
All checks were successful
Build and Push / build (push) Successful in 33s
Deploy to K8s / deploy (push) Successful in 12s
fix: Distribute multi-month payments by per-month expected fee
reconcile() previously split a multi-month payment evenly across months,
which falsely flagged months as underpaid when their expected fees
differed (e.g. 1250 CZK for 02+03+04 2026 with rates 750/350/150 was
shown as 416/month with two months red).

The allocation now runs per matched member: greedy when the share covers
the total expected (each month gets its expected fee, surplus -> credit),
proportional by expected fee otherwise. Out-of-window months keep the
previous even-split-to-credit behavior. 6 new test cases.

Also adds CHANGELOG.md and a changelog convention in CLAUDE.md.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-03 19:38:10 +02:00

654 lines
24 KiB
Python

#!/usr/bin/env python3
"""Match Fio bank payments against expected attendance fees."""
import argparse
import json
import logging
import os
import re
import urllib.request
from datetime import datetime, timedelta
from html.parser import HTMLParser
logger = logging.getLogger(__name__)
from attendance import get_members_with_fees
from czech_utils import normalize, parse_month_references
from sync_fio_to_sheets import get_sheets_service, DEFAULT_SPREADSHEET_ID
# ---------------------------------------------------------------------------
# Name matching
# ---------------------------------------------------------------------------
def _build_name_variants(name: str) -> list[str]:
"""Build searchable name variants from a member name.
E.g. 'František Vrbík (Štrúdl)' → ['frantisek vrbik', 'strudl', 'vrbik']
"""
# Extract nickname from parentheses
nickname_match = re.search(r"\(([^)]+)\)", name)
nickname = nickname_match.group(1) if nickname_match else ""
# Base name without nickname
base = re.sub(r"\s*\([^)]*\)\s*", " ", name).strip()
normalized_base = normalize(base)
normalized_nick = normalize(nickname)
variants = [normalized_base]
if normalized_nick:
variants.append(normalized_nick)
# Also add last name alone (for matching in messages)
parts = normalized_base.split()
if len(parts) >= 2:
variants.append(parts[-1]) # last name
variants.append(parts[0]) # first name
return [v for v in variants if len(v) >= 3]
def match_members(
text: str, member_names: list[str]
) -> list[tuple[str, str]]:
"""Find members mentioned in text.
Returns list of (member_name, confidence) where confidence is 'auto' or 'review'.
"""
normalized_text = normalize(text)
matches = []
for name in member_names:
variants = _build_name_variants(name)
full_name = variants[0] if variants else ""
parts = full_name.split()
# 1. Full name match (exact sequence) = high confidence
if full_name and full_name in normalized_text:
matches.append((name, "auto"))
continue
# 2. Both first and last name present (any order) = high confidence
if len(parts) >= 2:
if parts[0] in normalized_text and parts[-1] in normalized_text:
matches.append((name, "auto"))
continue
# 3. Nickname + one part of the name = high confidence
nickname = ""
nickname_match = re.search(r"\(([^)]+)\)", name)
if nickname_match:
nickname = normalize(nickname_match.group(1))
if nickname and nickname in normalized_text:
# Nickname alone is often enough, but let's check if it's combined with a name part
matches.append((name, "auto"))
continue
# 4. Partial matches = review confidence
if len(parts) >= 2:
first_name = parts[0]
last_name = parts[-1]
_COMMON_SURNAMES = {"novak", "novakova", "prach"}
# Match last name
if len(last_name) >= 4 and last_name not in _COMMON_SURNAMES and last_name in normalized_text:
matches.append((name, "review"))
continue
# Match first name (if not too short)
if len(first_name) >= 3 and first_name in normalized_text:
matches.append((name, "review"))
continue
elif len(parts) == 1:
# Single name member
if len(parts[0]) >= 4 and parts[0] in normalized_text:
matches.append((name, "review"))
continue
# --- Filtering ---
# If we have any "auto" matches, discard all "review" matches
auto_matches = [m for m in matches if m[1] == "auto"]
if auto_matches:
# If multiple auto matches, keep them (ambiguous but high priority)
return auto_matches
return matches
# ---------------------------------------------------------------------------
# Reconciliation
# ---------------------------------------------------------------------------
def infer_transaction_details(tx: dict, member_names: list[str]) -> dict:
"""Infer member(s) and month(s) for a single transaction.
Returns:
{
'members': [(name, confidence)],
'months': [YYYY-MM],
'matched_text': str
}
"""
# Combine sender + message for searching
search_text = f"{tx.get('sender', '')} {tx.get('message', '')} {tx.get('user_id', '')}"
matched_members = match_members(search_text, member_names)
matched_months = parse_month_references(
tx.get("message", "") + " " + tx.get("user_id", "")
)
if not matched_members:
# Try matching sender name alone with more lenient matching
matched_members = match_members(tx.get("sender", ""), member_names)
if not matched_months:
# If no month specified, try to infer from payment date
tx_date = tx.get("date")
if tx_date:
try:
if isinstance(tx_date, (int, float)):
# Handle Google Sheets serial date
dt = datetime(1899, 12, 30) + timedelta(days=tx_date)
else:
dt = datetime.strptime(str(tx_date), "%Y-%m-%d")
# Assume payment is for the current month
matched_months = [dt.strftime("%Y-%m")]
except (ValueError, TypeError):
pass
return {
"members": matched_members,
"months": matched_months,
"search_text": search_text
}
def format_date(val) -> str:
"""Normalize date from Google Sheet (handles serial numbers and strings)."""
if val is None or val == "":
return ""
# Handle Google Sheets serial dates (number of days since 1899-12-30)
if isinstance(val, (int, float)):
base_date = datetime(1899, 12, 30)
dt = base_date + timedelta(days=val)
return dt.strftime("%Y-%m-%d")
val_str = str(val).strip()
if not val_str:
return ""
# If already YYYY-MM-DD, return as is
if len(val_str) == 10 and val_str[4] == "-" and val_str[7] == "-":
return val_str
return val_str
def fetch_sheet_data(spreadsheet_id: str, credentials_path: str) -> list[dict]:
"""Fetch all rows from the Google Sheet and convert to a list of dicts."""
service = get_sheets_service(credentials_path)
sheet = service.spreadsheets()
result = sheet.values().get(
spreadsheetId=spreadsheet_id,
range="A1:Z",
valueRenderOption="UNFORMATTED_VALUE"
).execute()
rows = result.get("values", [])
if not rows:
return []
header = rows[0]
def get_col_index(label):
normalized_label = label.lower().strip()
for i, h in enumerate(header):
if h.lower().strip() == normalized_label:
return i
return -1
idx_date = get_col_index("Date")
idx_amount = get_col_index("Amount")
idx_manual = get_col_index("manual fix")
idx_person = get_col_index("Person")
idx_purpose = get_col_index("Purpose")
idx_inferred_amount = get_col_index("Inferred Amount")
idx_sender = get_col_index("Sender")
idx_message = get_col_index("Message")
idx_bank_id = get_col_index("Bank ID")
required = {"Date": idx_date, "Amount": idx_amount, "Person": idx_person, "Purpose": idx_purpose}
missing = [name for name, idx in required.items() if idx == -1]
if missing:
raise ValueError(f"Required columns missing from payments sheet: {', '.join(missing)}. Found headers: {header}")
transactions = []
for row in rows[1:]:
def get_val(idx):
return row[idx] if idx != -1 and idx < len(row) else ""
tx = {
"date": format_date(get_val(idx_date)),
"amount": get_val(idx_amount),
"manual_fix": get_val(idx_manual),
"person": get_val(idx_person),
"purpose": get_val(idx_purpose),
"inferred_amount": get_val(idx_inferred_amount),
"sender": get_val(idx_sender),
"message": get_val(idx_message),
"bank_id": get_val(idx_bank_id),
}
transactions.append(tx)
return transactions
def fetch_exceptions(spreadsheet_id: str, credentials_path: str) -> dict[tuple[str, str], dict]:
"""Fetch manual fee overrides from the 'exceptions' sheet.
Returns a dict mapping (member_name, period_YYYYMM) to {'amount': int, 'note': str}.
"""
service = get_sheets_service(credentials_path)
try:
result = service.spreadsheets().values().get(
spreadsheetId=spreadsheet_id,
range="'exceptions'!A2:D",
valueRenderOption="UNFORMATTED_VALUE"
).execute()
rows = result.get("values", [])
except Exception as e:
print(f"Warning: Could not fetch exceptions: {e}")
return {}
exceptions = {}
for row in rows:
if len(row) < 3 or str(row[0]).lower().startswith("name"):
continue
name = str(row[0]).strip()
period = str(row[1]).strip()
# Robust normalization using czech_utils.normalize
norm_name = normalize(name)
norm_period = normalize(period)
try:
amount = int(row[2])
note = str(row[3]).strip() if len(row) > 3 else ""
exceptions[(norm_name, norm_period)] = {"amount": amount, "note": note}
except (ValueError, TypeError):
continue
return exceptions
def reconcile(
members: list[tuple[str, str, dict[str, int]]],
sorted_months: list[str],
transactions: list[dict],
exceptions: dict[tuple[str, str], dict] = None,
) -> dict:
"""Match transactions to members and months.
Returns a dict with:
- 'members': {name: {'tier': str, 'months': {YYYY-MM: {'expected': int, 'paid': int, 'transactions': list}}}}
- 'unmatched': list of transactions that couldn't be matched
- 'credits': {name: int} — excess payments tracked as credit
"""
member_names = [name for name, _, _ in members]
member_tiers = {name: tier for name, tier, _ in members}
member_fees = {name: fees for name, _, fees in members}
# Initialize ledger
ledger: dict[str, dict[str, dict]] = {}
other_ledger: dict[str, list] = {}
exceptions = exceptions or {}
for name in member_names:
ledger[name] = {}
other_ledger[name] = []
for m in sorted_months:
# Robust normalization for lookup
norm_name = normalize(name)
norm_period = normalize(m)
fee_data = member_fees[name].get(m, (0, 0))
original_expected = fee_data[0] if isinstance(fee_data, (tuple, list)) else fee_data
attendance_count = fee_data[1] if isinstance(fee_data, (tuple, list)) else 0
ex_data = exceptions.get((norm_name, norm_period))
if ex_data is not None:
expected = ex_data["amount"]
exception_info = ex_data
else:
expected = original_expected
exception_info = None
ledger[name][m] = {
"expected": expected,
"original_expected": original_expected,
"attendance_count": attendance_count,
"exception": exception_info,
"paid": 0,
"transactions": [],
}
unmatched = []
credits: dict[str, int] = {}
for tx in transactions:
# Use sheet columns if they exist, otherwise fallback to inference
person_str = str(tx.get("person", "")).strip()
purpose_str = str(tx.get("purpose", "")).strip()
# Strip markers like [?]
person_str = re.sub(r"\[\?\]\s*", "", person_str)
is_other = purpose_str.lower().startswith("other:")
if person_str and purpose_str:
# We have pre-matched data (either from script or manual)
# Support multiple people/months in the comma-separated string
matched_members = [(p.strip(), "auto") for p in person_str.split(",") if p.strip()]
matched_months = [purpose_str] if is_other else [m.strip() for m in purpose_str.split(",") if m.strip()]
# Use Inferred Amount if available, otherwise bank Amount
amount = tx.get("inferred_amount")
if amount is None or amount == "":
amount = tx.get("amount", 0)
try:
amount = float(amount)
except (ValueError, TypeError):
amount = 0
else:
# Fallback to inference (for rows not yet processed by infer_payments.py)
inference = infer_transaction_details(tx, member_names)
matched_members = inference["members"]
matched_months = inference["months"]
amount = tx.get("amount", 0)
try:
amount = float(amount)
except (ValueError, TypeError):
amount = 0
if not matched_members or not matched_months:
unmatched.append(tx)
continue
# Allocate payment across matched members and months
if is_other:
num_allocations = len(matched_members)
per_allocation = amount / num_allocations if num_allocations > 0 else 0
for member_name, confidence in matched_members:
if member_name in other_ledger:
other_ledger[member_name].append({
"amount": per_allocation,
"date": tx["date"],
"sender": tx["sender"],
"message": tx["message"],
"purpose": purpose_str,
"confidence": confidence,
})
continue
member_share = amount / len(matched_members) if matched_members else 0
for member_name, confidence in matched_members:
if member_name not in ledger:
logger.warning(
"Payment matched to unknown member %r (tx: %s, %s) — adding to unmatched",
member_name, tx.get("date", "?"), tx.get("message", "?"),
)
unmatched.append(tx)
continue
in_window = [(m, ledger[member_name][m]["expected"]) for m in matched_months if m in ledger[member_name]]
out_of_window = [m for m in matched_months if m not in ledger[member_name]]
# Out-of-window months (outside display range): even split → credit, same as before.
n_total = len(matched_months)
if out_of_window and n_total > 0:
out_credit = member_share / n_total * len(out_of_window)
credits[member_name] = credits.get(member_name, 0) + int(out_credit)
else:
out_credit = 0.0
in_window_share = member_share - out_credit
if not in_window:
continue
total_expected = sum(e for _, e in in_window)
if total_expected > 0 and in_window_share >= total_expected:
# Greedy phase: payment covers all in-window fees; overflow → credit.
credits[member_name] = credits.get(member_name, 0) + int(in_window_share - total_expected)
for m, exp in in_window:
alloc = float(exp)
ledger[member_name][m]["paid"] += alloc
ledger[member_name][m]["transactions"].append({
"amount": alloc,
"date": tx["date"],
"sender": tx["sender"],
"message": tx["message"],
"confidence": confidence,
})
elif total_expected > 0:
# Proportional phase: distribute in_window_share by each month's expected fee.
# Last month absorbs any float remainder so the sum equals in_window_share exactly.
remaining = in_window_share
for i, (m, exp) in enumerate(in_window):
alloc = remaining if i == len(in_window) - 1 else in_window_share * exp / total_expected
remaining -= alloc
ledger[member_name][m]["paid"] += alloc
ledger[member_name][m]["transactions"].append({
"amount": alloc,
"date": tx["date"],
"sender": tx["sender"],
"message": tx["message"],
"confidence": confidence,
})
else:
# Fallback: no expected fees (prepayment before attendance recorded); even split.
per_month = in_window_share / len(in_window)
for m, _ in in_window:
ledger[member_name][m]["paid"] += per_month
ledger[member_name][m]["transactions"].append({
"amount": per_month,
"date": tx["date"],
"sender": tx["sender"],
"message": tx["message"],
"confidence": confidence,
})
# Calculate final total balances (window + off-window credits)
final_balances: dict[str, int] = {}
for name in member_names:
window_balance = sum(
int(mdata["paid"]) - (mdata["expected"] if isinstance(mdata["expected"], int) else 0)
for mdata in ledger[name].values()
)
final_balances[name] = window_balance + credits.get(name, 0)
return {
"members": {
name: {
"tier": member_tiers[name],
"months": ledger[name],
"other_transactions": other_ledger[name],
"total_balance": final_balances[name]
}
for name in member_names
},
"unmatched": unmatched,
"credits": final_balances, # Redefine credits as any positive total balance
}
# ---------------------------------------------------------------------------
# Report output
# ---------------------------------------------------------------------------
def print_report(result: dict, sorted_months: list[str]):
month_labels = {
m: datetime.strptime(m, "%Y-%m").strftime("%b %Y") for m in sorted_months
}
# --- Per-member breakdown (adults only) ---
print("=" * 80)
print("PAYMENT RECONCILIATION REPORT")
print("=" * 80)
adults = {
name: data
for name, data in result["members"].items()
if data["tier"] == "A"
}
total_expected = 0
total_paid = 0
# Summary table
name_width = max((len(n) for n in adults), default=20)
header = f"{'Member':<{name_width}}"
for m in sorted_months:
header += f" | {month_labels[m]:>10}"
header += " | {'Balance':>10}"
print(f"\n{'Member':<{name_width}}", end="")
for m in sorted_months:
print(f" | {month_labels[m]:>10}", end="")
print(f" | {'Balance':>10}")
print("-" * (name_width + (len(sorted_months) + 1) * 13))
for name in sorted(adults.keys()):
data = adults[name]
line = f"{name:<{name_width}}"
member_balance = 0
for m in sorted_months:
mdata = data["months"].get(m, {"expected": 0, "paid": 0})
expected = mdata["expected"]
original = mdata["original_expected"]
paid = int(mdata["paid"])
total_expected += expected
total_paid += paid
cell_status = ""
if expected == 0 and paid == 0:
cell = "-"
elif paid >= expected and expected > 0:
cell = "OK"
elif paid > 0:
cell = f"{paid}/{expected}"
else:
cell = f"UNPAID {expected}"
member_balance += paid - expected
line += f" | {cell:>10}"
balance_str = f"{member_balance:+d}" if member_balance != 0 else "0"
line += f" | {balance_str:>10}"
print(line)
print("-" * (name_width + (len(sorted_months) + 1) * 13))
print(f"{'TOTAL':<{name_width}}", end="")
for _ in sorted_months:
print(f" | {'':>10}", end="")
balance = total_paid - total_expected
print(f" | {f'Expected: {total_expected}, Paid: {int(total_paid)}, Balance: {balance:+d}'}")
# --- Credits (Total Surplus) ---
all_credits = {
name: data["total_balance"]
for name, data in result["members"].items()
if data["total_balance"] > 0
}
if all_credits:
print(f"\n{'TOTAL CREDITS (advance payments or surplus):'}")
for name, amount in sorted(all_credits.items()):
print(f" {name}: {amount} CZK")
# --- Debts (Missing Payments) ---
all_debts = {
name: data["total_balance"]
for name, data in result["members"].items()
if data["total_balance"] < 0
}
if all_debts:
print(f"\n{'TOTAL DEBTS (missing payments):'}")
for name, amount in sorted(all_debts.items()):
print(f" {name}: {abs(amount)} CZK")
# --- Unmatched transactions ---
if result["unmatched"]:
print(f"\n{'UNMATCHED TRANSACTIONS (need manual review)':}")
print(f" {'Date':<12} {'Amount':>10} {'Sender':<30} {'Message'}")
print(f" {'-'*12} {'-'*10} {'-'*30} {'-'*30}")
for tx in result["unmatched"]:
print(
f" {tx['date']:<12} {tx['amount']:>10.0f} "
f"{tx['sender']:<30} {tx['message']}"
)
# --- Detailed matched transactions ---
print(f"\n{'MATCHED TRANSACTION DETAILS':}")
for name in sorted(adults.keys()):
data = adults[name]
has_payments = any(
data["months"].get(m, {}).get("transactions")
for m in sorted_months
)
if not has_payments:
continue
print(f"\n {name}:")
for m in sorted_months:
mdata = data["months"].get(m, {})
for tx in mdata.get("transactions", []):
conf = " [REVIEW]" if tx["confidence"] == "review" else ""
print(
f" {month_labels[m]}: {tx['amount']:.0f} CZK "
f"from {tx['sender']}\"{tx['message']}\"{conf}"
)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Match bank payments against expected attendance fees."
)
parser.add_argument(
"--sheet-id", default=DEFAULT_SPREADSHEET_ID, help="Google Sheet ID"
)
parser.add_argument(
"--credentials", default=".secret/fuj-management-bot-credentials.json",
help="Path to Google API credentials JSON"
)
parser.add_argument(
"--bank", action="store_true", help="Scrape bank instead of using Sheet data"
)
args = parser.parse_args()
print(f"Fetching attendance data...")
members, sorted_months = get_members_with_fees()
if not members:
print("No attendance data found.")
return
if args.bank:
print(f"Fetching transactions from Fio bank ({args.date_from} to {args.date_to})...")
from fio_utils import fetch_transactions
transactions = fetch_transactions(args.date_from, args.date_to)
else:
print(f"Fetching transactions from Google Sheet ({args.sheet_id})...")
transactions = fetch_sheet_data(args.sheet_id, args.credentials)
print(f"Processing {len(transactions)} transactions.\n")
exceptions = fetch_exceptions(args.sheet_id, args.credentials)
if exceptions:
print(f"Loaded {len(exceptions)} fee exceptions.")
result = reconcile(members, sorted_months, transactions, exceptions)
print_report(result, sorted_months)
if __name__ == "__main__":
main()