602 lines
21 KiB
Python
602 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
"""Match Fio bank payments against expected attendance fees."""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import urllib.request
|
|
from datetime import datetime, timedelta
|
|
from html.parser import HTMLParser
|
|
|
|
from attendance import get_members_with_fees
|
|
from czech_utils import normalize, parse_month_references
|
|
from sync_fio_to_sheets import get_sheets_service, DEFAULT_SPREADSHEET_ID
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Name matching
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _build_name_variants(name: str) -> list[str]:
|
|
"""Build searchable name variants from a member name.
|
|
|
|
E.g. 'František Vrbík (Štrúdl)' → ['frantisek vrbik', 'strudl', 'vrbik']
|
|
"""
|
|
# Extract nickname from parentheses
|
|
nickname_match = re.search(r"\(([^)]+)\)", name)
|
|
nickname = nickname_match.group(1) if nickname_match else ""
|
|
|
|
# Base name without nickname
|
|
base = re.sub(r"\s*\([^)]*\)\s*", " ", name).strip()
|
|
normalized_base = normalize(base)
|
|
normalized_nick = normalize(nickname)
|
|
|
|
variants = [normalized_base]
|
|
if normalized_nick:
|
|
variants.append(normalized_nick)
|
|
|
|
# Also add last name alone (for matching in messages)
|
|
parts = normalized_base.split()
|
|
if len(parts) >= 2:
|
|
variants.append(parts[-1]) # last name
|
|
variants.append(parts[0]) # first name
|
|
|
|
return [v for v in variants if len(v) >= 3]
|
|
|
|
|
|
def match_members(
|
|
text: str, member_names: list[str]
|
|
) -> list[tuple[str, str]]:
|
|
"""Find members mentioned in text.
|
|
|
|
Returns list of (member_name, confidence) where confidence is 'auto' or 'review'.
|
|
"""
|
|
normalized_text = normalize(text)
|
|
matches = []
|
|
|
|
for name in member_names:
|
|
variants = _build_name_variants(name)
|
|
full_name = variants[0] if variants else ""
|
|
parts = full_name.split()
|
|
|
|
# 1. Full name match (exact sequence) = high confidence
|
|
if full_name and full_name in normalized_text:
|
|
matches.append((name, "auto"))
|
|
continue
|
|
|
|
# 2. Both first and last name present (any order) = high confidence
|
|
if len(parts) >= 2:
|
|
if parts[0] in normalized_text and parts[-1] in normalized_text:
|
|
matches.append((name, "auto"))
|
|
continue
|
|
|
|
# 3. Nickname + one part of the name = high confidence
|
|
nickname = ""
|
|
nickname_match = re.search(r"\(([^)]+)\)", name)
|
|
if nickname_match:
|
|
nickname = normalize(nickname_match.group(1))
|
|
if nickname and nickname in normalized_text:
|
|
# Nickname alone is often enough, but let's check if it's combined with a name part
|
|
matches.append((name, "auto"))
|
|
continue
|
|
|
|
# 4. Partial matches = review confidence
|
|
if len(parts) >= 2:
|
|
first_name = parts[0]
|
|
last_name = parts[-1]
|
|
_COMMON_SURNAMES = {"novak", "novakova", "prach"}
|
|
|
|
# Match last name
|
|
if len(last_name) >= 4 and last_name not in _COMMON_SURNAMES and last_name in normalized_text:
|
|
matches.append((name, "review"))
|
|
continue
|
|
|
|
# Match first name (if not too short)
|
|
if len(first_name) >= 3 and first_name in normalized_text:
|
|
matches.append((name, "review"))
|
|
continue
|
|
elif len(parts) == 1:
|
|
# Single name member
|
|
if len(parts[0]) >= 4 and parts[0] in normalized_text:
|
|
matches.append((name, "review"))
|
|
continue
|
|
|
|
# --- Filtering ---
|
|
# If we have any "auto" matches, discard all "review" matches
|
|
auto_matches = [m for m in matches if m[1] == "auto"]
|
|
if auto_matches:
|
|
# If multiple auto matches, keep them (ambiguous but high priority)
|
|
return auto_matches
|
|
|
|
return matches
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Reconciliation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def infer_transaction_details(tx: dict, member_names: list[str]) -> dict:
|
|
"""Infer member(s) and month(s) for a single transaction.
|
|
|
|
Returns:
|
|
{
|
|
'members': [(name, confidence)],
|
|
'months': [YYYY-MM],
|
|
'matched_text': str
|
|
}
|
|
"""
|
|
# Combine sender + message for searching
|
|
search_text = f"{tx.get('sender', '')} {tx.get('message', '')} {tx.get('user_id', '')}"
|
|
matched_members = match_members(search_text, member_names)
|
|
matched_months = parse_month_references(
|
|
tx.get("message", "") + " " + tx.get("user_id", "")
|
|
)
|
|
|
|
if not matched_members:
|
|
# Try matching sender name alone with more lenient matching
|
|
matched_members = match_members(tx.get("sender", ""), member_names)
|
|
|
|
if not matched_months:
|
|
# If no month specified, try to infer from payment date
|
|
tx_date = tx.get("date")
|
|
if tx_date:
|
|
try:
|
|
if isinstance(tx_date, (int, float)):
|
|
# Handle Google Sheets serial date
|
|
dt = datetime(1899, 12, 30) + timedelta(days=tx_date)
|
|
else:
|
|
dt = datetime.strptime(str(tx_date), "%Y-%m-%d")
|
|
# Assume payment is for the current month
|
|
matched_months = [dt.strftime("%Y-%m")]
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
return {
|
|
"members": matched_members,
|
|
"months": matched_months,
|
|
"search_text": search_text
|
|
}
|
|
|
|
|
|
def format_date(val) -> str:
|
|
"""Normalize date from Google Sheet (handles serial numbers and strings)."""
|
|
if val is None or val == "":
|
|
return ""
|
|
|
|
# Handle Google Sheets serial dates (number of days since 1899-12-30)
|
|
if isinstance(val, (int, float)):
|
|
base_date = datetime(1899, 12, 30)
|
|
dt = base_date + timedelta(days=val)
|
|
return dt.strftime("%Y-%m-%d")
|
|
|
|
val_str = str(val).strip()
|
|
if not val_str:
|
|
return ""
|
|
|
|
# If already YYYY-MM-DD, return as is
|
|
if len(val_str) == 10 and val_str[4] == "-" and val_str[7] == "-":
|
|
return val_str
|
|
|
|
return val_str
|
|
|
|
def fetch_sheet_data(spreadsheet_id: str, credentials_path: str) -> list[dict]:
|
|
"""Fetch all rows from the Google Sheet and convert to a list of dicts."""
|
|
service = get_sheets_service(credentials_path)
|
|
sheet = service.spreadsheets()
|
|
|
|
result = sheet.values().get(
|
|
spreadsheetId=spreadsheet_id,
|
|
range="A1:Z",
|
|
valueRenderOption="UNFORMATTED_VALUE"
|
|
).execute()
|
|
rows = result.get("values", [])
|
|
if not rows:
|
|
return []
|
|
|
|
header = rows[0]
|
|
def get_col_index(label):
|
|
normalized_label = label.lower().strip()
|
|
for i, h in enumerate(header):
|
|
if h.lower().strip() == normalized_label:
|
|
return i
|
|
return -1
|
|
|
|
idx_date = get_col_index("Date")
|
|
idx_amount = get_col_index("Amount")
|
|
idx_manual = get_col_index("manual fix")
|
|
idx_person = get_col_index("Person")
|
|
idx_purpose = get_col_index("Purpose")
|
|
idx_inferred_amount = get_col_index("Inferred Amount")
|
|
idx_sender = get_col_index("Sender")
|
|
idx_message = get_col_index("Message")
|
|
idx_bank_id = get_col_index("Bank ID")
|
|
|
|
transactions = []
|
|
for row in rows[1:]:
|
|
def get_val(idx):
|
|
return row[idx] if idx != -1 and idx < len(row) else ""
|
|
|
|
tx = {
|
|
"date": format_date(get_val(idx_date)),
|
|
"amount": get_val(idx_amount),
|
|
"manual_fix": get_val(idx_manual),
|
|
"person": get_val(idx_person),
|
|
"purpose": get_val(idx_purpose),
|
|
"inferred_amount": get_val(idx_inferred_amount),
|
|
"sender": get_val(idx_sender),
|
|
"message": get_val(idx_message),
|
|
"bank_id": get_val(idx_bank_id),
|
|
}
|
|
transactions.append(tx)
|
|
|
|
return transactions
|
|
|
|
|
|
def fetch_exceptions(spreadsheet_id: str, credentials_path: str) -> dict[tuple[str, str], dict]:
|
|
"""Fetch manual fee overrides from the 'exceptions' sheet.
|
|
|
|
Returns a dict mapping (member_name, period_YYYYMM) to {'amount': int, 'note': str}.
|
|
"""
|
|
service = get_sheets_service(credentials_path)
|
|
try:
|
|
result = service.spreadsheets().values().get(
|
|
spreadsheetId=spreadsheet_id,
|
|
range="'exceptions'!A2:D",
|
|
valueRenderOption="UNFORMATTED_VALUE"
|
|
).execute()
|
|
rows = result.get("values", [])
|
|
except Exception as e:
|
|
print(f"Warning: Could not fetch exceptions: {e}")
|
|
return {}
|
|
|
|
exceptions = {}
|
|
for row in rows:
|
|
if len(row) < 3 or str(row[0]).lower().startswith("name"):
|
|
continue
|
|
|
|
name = str(row[0]).strip()
|
|
period = str(row[1]).strip()
|
|
# Robust normalization using czech_utils.normalize
|
|
norm_name = normalize(name)
|
|
norm_period = normalize(period)
|
|
|
|
try:
|
|
amount = int(row[2])
|
|
note = str(row[3]).strip() if len(row) > 3 else ""
|
|
exceptions[(norm_name, norm_period)] = {"amount": amount, "note": note}
|
|
except (ValueError, TypeError):
|
|
continue
|
|
|
|
return exceptions
|
|
|
|
|
|
def reconcile(
|
|
members: list[tuple[str, str, dict[str, int]]],
|
|
sorted_months: list[str],
|
|
transactions: list[dict],
|
|
exceptions: dict[tuple[str, str], dict] = None,
|
|
) -> dict:
|
|
"""Match transactions to members and months.
|
|
|
|
Returns a dict with:
|
|
- 'members': {name: {'tier': str, 'months': {YYYY-MM: {'expected': int, 'paid': int, 'transactions': list}}}}
|
|
- 'unmatched': list of transactions that couldn't be matched
|
|
- 'credits': {name: int} — excess payments tracked as credit
|
|
"""
|
|
member_names = [name for name, _, _ in members]
|
|
member_tiers = {name: tier for name, tier, _ in members}
|
|
member_fees = {name: fees for name, _, fees in members}
|
|
|
|
# Initialize ledger
|
|
ledger: dict[str, dict[str, dict]] = {}
|
|
other_ledger: dict[str, list] = {}
|
|
exceptions = exceptions or {}
|
|
for name in member_names:
|
|
ledger[name] = {}
|
|
other_ledger[name] = []
|
|
for m in sorted_months:
|
|
# Robust normalization for lookup
|
|
norm_name = normalize(name)
|
|
norm_period = normalize(m)
|
|
fee_data = member_fees[name].get(m, (0, 0))
|
|
original_expected = fee_data[0] if isinstance(fee_data, tuple) else fee_data
|
|
attendance_count = fee_data[1] if isinstance(fee_data, tuple) else 0
|
|
|
|
ex_data = exceptions.get((norm_name, norm_period))
|
|
if ex_data is not None:
|
|
expected = ex_data["amount"]
|
|
exception_info = ex_data
|
|
else:
|
|
expected = original_expected
|
|
exception_info = None
|
|
|
|
ledger[name][m] = {
|
|
"expected": expected,
|
|
"original_expected": original_expected,
|
|
"attendance_count": attendance_count,
|
|
"exception": exception_info,
|
|
"paid": 0,
|
|
"transactions": [],
|
|
}
|
|
|
|
unmatched = []
|
|
credits: dict[str, int] = {}
|
|
|
|
for tx in transactions:
|
|
# Use sheet columns if they exist, otherwise fallback to inference
|
|
person_str = str(tx.get("person", "")).strip()
|
|
purpose_str = str(tx.get("purpose", "")).strip()
|
|
|
|
# Strip markers like [?]
|
|
person_str = re.sub(r"\[\?\]\s*", "", person_str)
|
|
is_other = purpose_str.lower().startswith("other:")
|
|
|
|
if person_str and purpose_str:
|
|
# We have pre-matched data (either from script or manual)
|
|
# Support multiple people/months in the comma-separated string
|
|
matched_members = [(p.strip(), "auto") for p in person_str.split(",") if p.strip()]
|
|
matched_months = [purpose_str] if is_other else [m.strip() for m in purpose_str.split(",") if m.strip()]
|
|
|
|
# Use Inferred Amount if available, otherwise bank Amount
|
|
amount = tx.get("inferred_amount")
|
|
if amount is None or amount == "":
|
|
amount = tx.get("amount", 0)
|
|
try:
|
|
amount = float(amount)
|
|
except (ValueError, TypeError):
|
|
amount = 0
|
|
else:
|
|
# Fallback to inference (for rows not yet processed by infer_payments.py)
|
|
inference = infer_transaction_details(tx, member_names)
|
|
matched_members = inference["members"]
|
|
matched_months = inference["months"]
|
|
amount = tx.get("amount", 0)
|
|
try:
|
|
amount = float(amount)
|
|
except (ValueError, TypeError):
|
|
amount = 0
|
|
|
|
if not matched_members or not matched_months:
|
|
unmatched.append(tx)
|
|
continue
|
|
|
|
# Allocate payment across matched members and months
|
|
if is_other:
|
|
num_allocations = len(matched_members)
|
|
per_allocation = amount / num_allocations if num_allocations > 0 else 0
|
|
for member_name, confidence in matched_members:
|
|
if member_name in other_ledger:
|
|
other_ledger[member_name].append({
|
|
"amount": per_allocation,
|
|
"date": tx["date"],
|
|
"sender": tx["sender"],
|
|
"message": tx["message"],
|
|
"purpose": purpose_str,
|
|
"confidence": confidence,
|
|
})
|
|
continue
|
|
|
|
num_allocations = len(matched_members) * len(matched_months)
|
|
per_allocation = amount / num_allocations if num_allocations > 0 else 0
|
|
|
|
for member_name, confidence in matched_members:
|
|
# If we matched via sheet 'Person' column, name might be partial or have markers
|
|
# but usually it's the exact member name from get_members_with_fees.
|
|
# Let's ensure it exists in our ledger.
|
|
if member_name not in ledger:
|
|
# Try matching by base name if it was Jan Novak (Kačerr) etc.
|
|
pass
|
|
|
|
for month_key in matched_months:
|
|
entry = {
|
|
"amount": per_allocation,
|
|
"date": tx["date"],
|
|
"sender": tx["sender"],
|
|
"message": tx["message"],
|
|
"confidence": confidence,
|
|
}
|
|
if month_key in ledger.get(member_name, {}):
|
|
ledger[member_name][month_key]["paid"] += per_allocation
|
|
ledger[member_name][month_key]["transactions"].append(entry)
|
|
else:
|
|
# Future month — track as credit
|
|
credits[member_name] = credits.get(member_name, 0) + int(per_allocation)
|
|
|
|
# Calculate final total balances (window + off-window credits)
|
|
final_balances: dict[str, int] = {}
|
|
for name in member_names:
|
|
window_balance = sum(
|
|
int(mdata["paid"]) - (mdata["expected"] if isinstance(mdata["expected"], int) else 0)
|
|
for mdata in ledger[name].values()
|
|
)
|
|
final_balances[name] = window_balance + credits.get(name, 0)
|
|
|
|
return {
|
|
"members": {
|
|
name: {
|
|
"tier": member_tiers[name],
|
|
"months": ledger[name],
|
|
"other_transactions": other_ledger[name],
|
|
"total_balance": final_balances[name]
|
|
}
|
|
for name in member_names
|
|
},
|
|
"unmatched": unmatched,
|
|
"credits": final_balances, # Redefine credits as any positive total balance
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Report output
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def print_report(result: dict, sorted_months: list[str]):
|
|
month_labels = {
|
|
m: datetime.strptime(m, "%Y-%m").strftime("%b %Y") for m in sorted_months
|
|
}
|
|
|
|
# --- Per-member breakdown (adults only) ---
|
|
print("=" * 80)
|
|
print("PAYMENT RECONCILIATION REPORT")
|
|
print("=" * 80)
|
|
|
|
adults = {
|
|
name: data
|
|
for name, data in result["members"].items()
|
|
if data["tier"] == "A"
|
|
}
|
|
|
|
total_expected = 0
|
|
total_paid = 0
|
|
|
|
# Summary table
|
|
name_width = max((len(n) for n in adults), default=20)
|
|
header = f"{'Member':<{name_width}}"
|
|
for m in sorted_months:
|
|
header += f" | {month_labels[m]:>10}"
|
|
header += " | {'Balance':>10}"
|
|
print(f"\n{'Member':<{name_width}}", end="")
|
|
for m in sorted_months:
|
|
print(f" | {month_labels[m]:>10}", end="")
|
|
print(f" | {'Balance':>10}")
|
|
print("-" * (name_width + (len(sorted_months) + 1) * 13))
|
|
|
|
for name in sorted(adults.keys()):
|
|
data = adults[name]
|
|
line = f"{name:<{name_width}}"
|
|
member_balance = 0
|
|
for m in sorted_months:
|
|
mdata = data["months"].get(m, {"expected": 0, "paid": 0})
|
|
expected = mdata["expected"]
|
|
original = mdata["original_expected"]
|
|
paid = int(mdata["paid"])
|
|
total_expected += expected
|
|
total_paid += paid
|
|
|
|
cell_status = ""
|
|
if expected == 0 and paid == 0:
|
|
cell = "-"
|
|
elif paid >= expected and expected > 0:
|
|
cell = "OK"
|
|
elif paid > 0:
|
|
cell = f"{paid}/{expected}"
|
|
else:
|
|
cell = f"UNPAID {expected}"
|
|
|
|
member_balance += paid - expected
|
|
line += f" | {cell:>10}"
|
|
balance_str = f"{member_balance:+d}" if member_balance != 0 else "0"
|
|
line += f" | {balance_str:>10}"
|
|
print(line)
|
|
|
|
print("-" * (name_width + (len(sorted_months) + 1) * 13))
|
|
print(f"{'TOTAL':<{name_width}}", end="")
|
|
for _ in sorted_months:
|
|
print(f" | {'':>10}", end="")
|
|
balance = total_paid - total_expected
|
|
print(f" | {f'Expected: {total_expected}, Paid: {int(total_paid)}, Balance: {balance:+d}'}")
|
|
|
|
# --- Credits (Total Surplus) ---
|
|
all_credits = {
|
|
name: data["total_balance"]
|
|
for name, data in result["members"].items()
|
|
if data["total_balance"] > 0
|
|
}
|
|
|
|
if all_credits:
|
|
print(f"\n{'TOTAL CREDITS (advance payments or surplus):'}")
|
|
for name, amount in sorted(all_credits.items()):
|
|
print(f" {name}: {amount} CZK")
|
|
|
|
# --- Debts (Missing Payments) ---
|
|
all_debts = {
|
|
name: data["total_balance"]
|
|
for name, data in result["members"].items()
|
|
if data["total_balance"] < 0
|
|
}
|
|
|
|
if all_debts:
|
|
print(f"\n{'TOTAL DEBTS (missing payments):'}")
|
|
for name, amount in sorted(all_debts.items()):
|
|
print(f" {name}: {abs(amount)} CZK")
|
|
|
|
# --- Unmatched transactions ---
|
|
if result["unmatched"]:
|
|
print(f"\n{'UNMATCHED TRANSACTIONS (need manual review)':}")
|
|
print(f" {'Date':<12} {'Amount':>10} {'Sender':<30} {'Message'}")
|
|
print(f" {'-'*12} {'-'*10} {'-'*30} {'-'*30}")
|
|
for tx in result["unmatched"]:
|
|
print(
|
|
f" {tx['date']:<12} {tx['amount']:>10.0f} "
|
|
f"{tx['sender']:<30} {tx['message']}"
|
|
)
|
|
|
|
# --- Detailed matched transactions ---
|
|
print(f"\n{'MATCHED TRANSACTION DETAILS':}")
|
|
for name in sorted(adults.keys()):
|
|
data = adults[name]
|
|
has_payments = any(
|
|
data["months"].get(m, {}).get("transactions")
|
|
for m in sorted_months
|
|
)
|
|
if not has_payments:
|
|
continue
|
|
print(f"\n {name}:")
|
|
for m in sorted_months:
|
|
mdata = data["months"].get(m, {})
|
|
for tx in mdata.get("transactions", []):
|
|
conf = " [REVIEW]" if tx["confidence"] == "review" else ""
|
|
print(
|
|
f" {month_labels[m]}: {tx['amount']:.0f} CZK "
|
|
f"from {tx['sender']} — \"{tx['message']}\"{conf}"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Match bank payments against expected attendance fees."
|
|
)
|
|
parser.add_argument(
|
|
"--sheet-id", default=DEFAULT_SPREADSHEET_ID, help="Google Sheet ID"
|
|
)
|
|
parser.add_argument(
|
|
"--credentials", default=".secret/fuj-management-bot-credentials.json",
|
|
help="Path to Google API credentials JSON"
|
|
)
|
|
parser.add_argument(
|
|
"--bank", action="store_true", help="Scrape bank instead of using Sheet data"
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
print(f"Fetching attendance data...")
|
|
members, sorted_months = get_members_with_fees()
|
|
if not members:
|
|
print("No attendance data found.")
|
|
return
|
|
|
|
if args.bank:
|
|
print(f"Fetching transactions from Fio bank ({args.date_from} to {args.date_to})...")
|
|
from fio_utils import fetch_transactions
|
|
transactions = fetch_transactions(args.date_from, args.date_to)
|
|
else:
|
|
print(f"Fetching transactions from Google Sheet ({args.sheet_id})...")
|
|
transactions = fetch_sheet_data(args.sheet_id, args.credentials)
|
|
|
|
print(f"Processing {len(transactions)} transactions.\n")
|
|
|
|
exceptions = fetch_exceptions(args.sheet_id, args.credentials)
|
|
if exceptions:
|
|
print(f"Loaded {len(exceptions)} fee exceptions.")
|
|
|
|
result = reconcile(members, sorted_months, transactions, exceptions)
|
|
print_report(result, sorted_months)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|