Files
fuj-management/scripts/match_payments.py

497 lines
18 KiB
Python

#!/usr/bin/env python3
"""Match Fio bank payments against expected attendance fees."""
import argparse
import json
import os
import re
import urllib.request
from datetime import datetime, timedelta
from html.parser import HTMLParser
from attendance import get_members_with_fees
from czech_utils import normalize, parse_month_references
from sync_fio_to_sheets import get_sheets_service, DEFAULT_SPREADSHEET_ID
# ---------------------------------------------------------------------------
# Name matching
# ---------------------------------------------------------------------------
def _build_name_variants(name: str) -> list[str]:
"""Build searchable name variants from a member name.
E.g. 'František Vrbík (Štrúdl)' → ['frantisek vrbik', 'strudl', 'vrbik']
"""
# Extract nickname from parentheses
nickname_match = re.search(r"\(([^)]+)\)", name)
nickname = nickname_match.group(1) if nickname_match else ""
# Base name without nickname
base = re.sub(r"\s*\([^)]*\)\s*", " ", name).strip()
normalized_base = normalize(base)
normalized_nick = normalize(nickname)
variants = [normalized_base]
if normalized_nick:
variants.append(normalized_nick)
# Also add last name alone (for matching in messages)
parts = normalized_base.split()
if len(parts) >= 2:
variants.append(parts[-1]) # last name
variants.append(parts[0]) # first name
return [v for v in variants if len(v) >= 3]
def match_members(
text: str, member_names: list[str]
) -> list[tuple[str, str]]:
"""Find members mentioned in text.
Returns list of (member_name, confidence) where confidence is 'auto' or 'review'.
"""
normalized_text = normalize(text)
matches = []
for name in member_names:
variants = _build_name_variants(name)
full_name = variants[0] if variants else ""
parts = full_name.split()
# 1. Full name match (exact sequence) = high confidence
if full_name and full_name in normalized_text:
matches.append((name, "auto"))
continue
# 2. Both first and last name present (any order) = high confidence
if len(parts) >= 2:
if parts[0] in normalized_text and parts[-1] in normalized_text:
matches.append((name, "auto"))
continue
# 3. Nickname + one part of the name = high confidence
nickname = ""
nickname_match = re.search(r"\(([^)]+)\)", name)
if nickname_match:
nickname = normalize(nickname_match.group(1))
if nickname and nickname in normalized_text:
# Nickname alone is often enough, but let's check if it's combined with a name part
matches.append((name, "auto"))
continue
# 4. Partial matches = review confidence
if len(parts) >= 2:
first_name = parts[0]
last_name = parts[-1]
_COMMON_SURNAMES = {"novak", "novakova", "prach"}
# Match last name
if len(last_name) >= 4 and last_name not in _COMMON_SURNAMES and last_name in normalized_text:
matches.append((name, "review"))
continue
# Match first name (if not too short)
if len(first_name) >= 3 and first_name in normalized_text:
matches.append((name, "review"))
continue
elif len(parts) == 1:
# Single name member
if len(parts[0]) >= 4 and parts[0] in normalized_text:
matches.append((name, "review"))
continue
# --- Filtering ---
# If we have any "auto" matches, discard all "review" matches
auto_matches = [m for m in matches if m[1] == "auto"]
if auto_matches:
# If multiple auto matches, keep them (ambiguous but high priority)
return auto_matches
return matches
# ---------------------------------------------------------------------------
# Reconciliation
# ---------------------------------------------------------------------------
def infer_transaction_details(tx: dict, member_names: list[str]) -> dict:
"""Infer member(s) and month(s) for a single transaction.
Returns:
{
'members': [(name, confidence)],
'months': [YYYY-MM],
'matched_text': str
}
"""
# Combine sender + message for searching
search_text = f"{tx.get('sender', '')} {tx.get('message', '')} {tx.get('user_id', '')}"
matched_members = match_members(search_text, member_names)
matched_months = parse_month_references(
tx.get("message", "") + " " + tx.get("user_id", "")
)
if not matched_members:
# Try matching sender name alone with more lenient matching
matched_members = match_members(tx.get("sender", ""), member_names)
if not matched_months:
# If no month specified, try to infer from payment date
tx_date = tx.get("date")
if tx_date:
try:
if isinstance(tx_date, (int, float)):
# Handle Google Sheets serial date
dt = datetime(1899, 12, 30) + timedelta(days=tx_date)
else:
dt = datetime.strptime(str(tx_date), "%Y-%m-%d")
# Assume payment is for the current month
matched_months = [dt.strftime("%Y-%m")]
except (ValueError, TypeError):
pass
return {
"members": matched_members,
"months": matched_months,
"search_text": search_text
}
def fetch_sheet_data(spreadsheet_id: str, credentials_path: str) -> list[dict]:
"""Fetch all rows from the Google Sheet and convert to a list of dicts."""
service = get_sheets_service(credentials_path)
sheet = service.spreadsheets()
result = sheet.values().get(
spreadsheetId=spreadsheet_id,
range="A1:Z",
valueRenderOption="UNFORMATTED_VALUE"
).execute()
rows = result.get("values", [])
if not rows:
return []
header = rows[0]
def get_col_index(label):
normalized_label = label.lower().strip()
for i, h in enumerate(header):
if h.lower().strip() == normalized_label:
return i
return -1
idx_date = get_col_index("Date")
idx_amount = get_col_index("Amount")
idx_manual = get_col_index("manual fix")
idx_person = get_col_index("Person")
idx_purpose = get_col_index("Purpose")
idx_inferred_amount = get_col_index("Inferred Amount")
idx_sender = get_col_index("Sender")
idx_message = get_col_index("Message")
idx_bank_id = get_col_index("Bank ID")
transactions = []
for row in rows[1:]:
def get_val(idx):
return row[idx] if idx != -1 and idx < len(row) else ""
tx = {
"date": get_val(idx_date),
"amount": get_val(idx_amount),
"manual_fix": get_val(idx_manual),
"person": get_val(idx_person),
"purpose": get_val(idx_purpose),
"inferred_amount": get_val(idx_inferred_amount),
"sender": get_val(idx_sender),
"message": get_val(idx_message),
"bank_id": get_val(idx_bank_id),
}
transactions.append(tx)
return transactions
def reconcile(
members: list[tuple[str, str, dict[str, int]]],
sorted_months: list[str],
transactions: list[dict],
) -> dict:
"""Match transactions to members and months.
Returns a dict with:
- 'members': {name: {'tier': str, 'months': {YYYY-MM: {'expected': int, 'paid': int, 'transactions': list}}}}
- 'unmatched': list of transactions that couldn't be matched
- 'credits': {name: int} — excess payments tracked as credit
"""
member_names = [name for name, _, _ in members]
member_tiers = {name: tier for name, tier, _ in members}
member_fees = {name: {m: fee for m, (fee, _) in fees.items()} for name, _, fees in members}
# Initialize ledger
ledger: dict[str, dict[str, dict]] = {}
for name in member_names:
ledger[name] = {}
for m in sorted_months:
ledger[name][m] = {
"expected": member_fees[name].get(m, 0),
"paid": 0,
"transactions": [],
}
unmatched = []
credits: dict[str, int] = {}
for tx in transactions:
# Use sheet columns if they exist, otherwise fallback to inference
person_str = str(tx.get("person", "")).strip()
purpose_str = str(tx.get("purpose", "")).strip()
# Strip markers like [?]
person_str = re.sub(r"\[\?\]\s*", "", person_str)
if person_str and purpose_str:
# We have pre-matched data (either from script or manual)
# Support multiple people/months in the comma-separated string
matched_members = [(p.strip(), "auto") for p in person_str.split(",") if p.strip()]
matched_months = [m.strip() for m in purpose_str.split(",") if m.strip()]
# Use Inferred Amount if available, otherwise bank Amount
amount = tx.get("inferred_amount")
if amount is None or amount == "":
amount = tx.get("amount", 0)
try:
amount = float(amount)
except (ValueError, TypeError):
amount = 0
else:
# Fallback to inference (for rows not yet processed by infer_payments.py)
inference = infer_transaction_details(tx, member_names)
matched_members = inference["members"]
matched_months = inference["months"]
amount = tx.get("amount", 0)
try:
amount = float(amount)
except (ValueError, TypeError):
amount = 0
if not matched_members or not matched_months:
unmatched.append(tx)
continue
# Allocate payment across matched members and months
num_allocations = len(matched_members) * len(matched_months)
per_allocation = amount / num_allocations if num_allocations > 0 else 0
for member_name, confidence in matched_members:
# If we matched via sheet 'Person' column, name might be partial or have markers
# but usually it's the exact member name from get_members_with_fees.
# Let's ensure it exists in our ledger.
if member_name not in ledger:
# Try matching by base name if it was Jan Novak (Kačerr) etc.
pass
for month_key in matched_months:
entry = {
"amount": per_allocation,
"date": tx["date"],
"sender": tx["sender"],
"message": tx["message"],
"confidence": confidence,
}
if month_key in ledger.get(member_name, {}):
ledger[member_name][month_key]["paid"] += per_allocation
ledger[member_name][month_key]["transactions"].append(entry)
else:
# Future month — track as credit
credits[member_name] = credits.get(member_name, 0) + int(per_allocation)
# Calculate final total balances (window + off-window credits)
final_balances: dict[str, int] = {}
for name in member_names:
window_balance = sum(
int(mdata["paid"]) - mdata["expected"]
for mdata in ledger[name].values()
)
final_balances[name] = window_balance + credits.get(name, 0)
return {
"members": {
name: {
"tier": member_tiers[name],
"months": ledger[name],
"total_balance": final_balances[name]
}
for name in member_names
},
"unmatched": unmatched,
"credits": final_balances, # Redefine credits as any positive total balance
}
# ---------------------------------------------------------------------------
# Report output
# ---------------------------------------------------------------------------
def print_report(result: dict, sorted_months: list[str]):
month_labels = {
m: datetime.strptime(m, "%Y-%m").strftime("%b %Y") for m in sorted_months
}
# --- Per-member breakdown (adults only) ---
print("=" * 80)
print("PAYMENT RECONCILIATION REPORT")
print("=" * 80)
adults = {
name: data
for name, data in result["members"].items()
if data["tier"] == "A"
}
total_expected = 0
total_paid = 0
# Summary table
name_width = max((len(n) for n in adults), default=20)
header = f"{'Member':<{name_width}}"
for m in sorted_months:
header += f" | {month_labels[m]:>10}"
header += " | {'Balance':>10}"
print(f"\n{'Member':<{name_width}}", end="")
for m in sorted_months:
print(f" | {month_labels[m]:>10}", end="")
print(f" | {'Balance':>10}")
print("-" * (name_width + (len(sorted_months) + 1) * 13))
for name in sorted(adults.keys()):
data = adults[name]
line = f"{name:<{name_width}}"
member_balance = 0
for m in sorted_months:
mdata = data["months"].get(m, {"expected": 0, "paid": 0})
expected = mdata["expected"]
paid = int(mdata["paid"])
total_expected += expected
total_paid += paid
if expected == 0 and paid == 0:
cell = "-"
elif paid >= expected and expected > 0:
cell = "OK"
elif paid > 0:
cell = f"{paid}/{expected}"
else:
cell = f"UNPAID {expected}"
member_balance += paid - expected
line += f" | {cell:>10}"
balance_str = f"{member_balance:+d}" if member_balance != 0 else "0"
line += f" | {balance_str:>10}"
print(line)
print("-" * (name_width + (len(sorted_months) + 1) * 13))
print(f"{'TOTAL':<{name_width}}", end="")
for _ in sorted_months:
print(f" | {'':>10}", end="")
balance = total_paid - total_expected
print(f" | {f'Expected: {total_expected}, Paid: {int(total_paid)}, Balance: {balance:+d}'}")
# --- Credits (Total Surplus) ---
all_credits = {
name: data["total_balance"]
for name, data in result["members"].items()
if data["total_balance"] > 0
}
if all_credits:
print(f"\n{'TOTAL CREDITS (advance payments or surplus):'}")
for name, amount in sorted(all_credits.items()):
print(f" {name}: {amount} CZK")
# --- Debts (Missing Payments) ---
all_debts = {
name: data["total_balance"]
for name, data in result["members"].items()
if data["total_balance"] < 0
}
if all_debts:
print(f"\n{'TOTAL DEBTS (missing payments):'}")
for name, amount in sorted(all_debts.items()):
print(f" {name}: {abs(amount)} CZK")
# --- Unmatched transactions ---
if result["unmatched"]:
print(f"\n{'UNMATCHED TRANSACTIONS (need manual review)':}")
print(f" {'Date':<12} {'Amount':>10} {'Sender':<30} {'Message'}")
print(f" {'-'*12} {'-'*10} {'-'*30} {'-'*30}")
for tx in result["unmatched"]:
print(
f" {tx['date']:<12} {tx['amount']:>10.0f} "
f"{tx['sender']:<30} {tx['message']}"
)
# --- Detailed matched transactions ---
print(f"\n{'MATCHED TRANSACTION DETAILS':}")
for name in sorted(adults.keys()):
data = adults[name]
has_payments = any(
data["months"].get(m, {}).get("transactions")
for m in sorted_months
)
if not has_payments:
continue
print(f"\n {name}:")
for m in sorted_months:
mdata = data["months"].get(m, {})
for tx in mdata.get("transactions", []):
conf = " [REVIEW]" if tx["confidence"] == "review" else ""
print(
f" {month_labels[m]}: {tx['amount']:.0f} CZK "
f"from {tx['sender']}\"{tx['message']}\"{conf}"
)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Match bank payments against expected attendance fees."
)
parser.add_argument(
"--sheet-id", default=DEFAULT_SPREADSHEET_ID, help="Google Sheet ID"
)
parser.add_argument(
"--credentials", default=".secret/fuj-management-bot-credentials.json",
help="Path to Google API credentials JSON"
)
parser.add_argument(
"--bank", action="store_true", help="Scrape bank instead of using Sheet data"
)
args = parser.parse_args()
print(f"Fetching attendance data...")
members, sorted_months = get_members_with_fees()
if not members:
print("No attendance data found.")
return
if args.bank:
print(f"Fetching transactions from Fio bank ({args.date_from} to {args.date_to})...")
from fio_utils import fetch_transactions
transactions = fetch_transactions(args.date_from, args.date_to)
else:
print(f"Fetching transactions from Google Sheet ({args.sheet_id})...")
transactions = fetch_sheet_data(args.sheet_id, args.credentials)
print(f"Processing {len(transactions)} transactions.\n")
result = reconcile(members, sorted_months, transactions)
print_report(result, sorted_months)
if __name__ == "__main__":
main()