Files
fuj-management/scripts/match_payments.py
Jan Novak 3bfea4e0a4 feat: initial dashboard implementation and robust attendance parsing
- Added a Makefile to easily run project scripts (fees, match, web, image)
- Modified attendance.py to dynamically handle a variable number of header rows from the Google Sheet
- Updated both attendance calculations and calculate_fees terminal output to show actual attendance counts (e.g., '750 CZK (3)')
- Created a Flask web dashboard (app.py and templates/fees.html) to view member fees in an attractive, condensed, terminal-like UI
- Bound the Flask server to port 5000 and added a routing alias from '/' to '/fees'
- Configured Python virtual environment (.venv) creation directly into the Makefile to resolve global pip install errors on macOS

Co-authored-by: Antigravity <antigravity@deepmind.com>
2026-02-27 13:20:42 +01:00

528 lines
17 KiB
Python

#!/usr/bin/env python3
"""Match Fio bank payments against expected attendance fees."""
import argparse
import json
import os
import re
import urllib.request
from datetime import datetime, timedelta
from html.parser import HTMLParser
from attendance import get_members_with_fees
from czech_utils import normalize, parse_month_references
# ---------------------------------------------------------------------------
# Transaction fetching
# ---------------------------------------------------------------------------
class _FioTableParser(HTMLParser):
    """Collect the body rows of the second <table class="table"> on the Fio transparent page.

    Rows inside <thead> are skipped. Every other <tr> of the target table
    becomes a list of stripped cell texts (both <td> and <th> cells count).

    Columns: Datum | Částka | Typ | Název protiúčtu | Zpráva pro příjemce | KS | VS | SS | Poznámka
    Indices: 0 1 2 3 4 5 6 7 8
    """

    def __init__(self):
        super().__init__()
        # Number of tables carrying the CSS class "table" seen so far.
        self._table_count = 0
        self._in_target_table = False
        self._in_thead = False
        self._in_row = False
        self._in_cell = False
        self._current_row: list[str] = []
        self._rows: list[list[str]] = []
        self._cell_text = ""

    def handle_starttag(self, tag, attrs):
        """Track entry into the target table, its rows and its cells."""
        # HTMLParser reports a valueless attribute (``<table class>``) with
        # the value None, so fall back to "" before splitting into names.
        css_classes = (dict(attrs).get("class") or "").split()
        if tag == "table" and "table" in css_classes:
            self._table_count += 1
            if self._table_count == 2:
                self._in_target_table = True
        if self._in_target_table:
            if tag == "thead":
                self._in_thead = True
            if tag == "tr" and not self._in_thead:
                self._in_row = True
                self._current_row = []
            if self._in_row and tag in ("td", "th"):
                self._in_cell = True
                self._cell_text = ""

    def handle_endtag(self, tag):
        """Close the current cell, row or table and store finished rows."""
        if self._in_cell and tag in ("td", "th"):
            self._in_cell = False
            self._current_row.append(self._cell_text.strip())
        if tag == "thead":
            self._in_thead = False
        if self._in_row and tag == "tr":
            self._in_row = False
            # Rows without any cell are dropped.
            if self._current_row:
                self._rows.append(self._current_row)
        if tag == "table" and self._in_target_table:
            self._in_target_table = False

    def handle_data(self, data):
        """Accumulate text while inside a cell (text may arrive in pieces)."""
        if self._in_cell:
            self._cell_text += data

    def get_rows(self) -> list[list[str]]:
        """Return the rows collected so far, one list of cell texts per row."""
        return self._rows
# Zero-based cell positions within one parsed row of the Fio transparent
# account table. The header order is: Datum | Částka | Typ | Název protiúčtu |
# Zpráva pro příjemce | KS | VS | SS | Poznámka. No constant is defined here
# for index 2 ("Typ").
_COL_DATE = 0  # "Datum" - transaction date
_COL_AMOUNT = 1  # "Částka" - amount
_COL_SENDER = 3  # "Název protiúčtu" - counter-account name
_COL_MESSAGE = 4  # "Zpráva pro příjemce" - message for the recipient
_COL_KS = 5  # "KS" column
_COL_VS = 6  # "VS" column
_COL_SS = 7  # "SS" column
_COL_NOTE = 8  # "Poznámka" - note
def _parse_czech_amount(s: str) -> float | None:
"""Parse '1 500,00 CZK' to float."""
s = s.replace("\xa0", "").replace(" ", "").replace(",", ".")
s = re.sub(r"[A-Za-z]+", "", s).strip()
try:
return float(s)
except ValueError:
return None
def _parse_czech_date(s: str) -> str | None:
"""Parse 'DD.MM.YYYY' to 'YYYY-MM-DD'."""
s = s.strip()
for fmt in ("%d.%m.%Y", "%d/%m/%Y"):
try:
return datetime.strptime(s, fmt).strftime("%Y-%m-%d")
except ValueError:
continue
return None
def fetch_transactions_transparent(
    date_from: str, date_to: str, *, timeout: float = 30.0
) -> list[dict]:
    """Fetch incoming transactions from the Fio transparent account HTML page.

    Args:
        date_from: D.M.YYYY format
        date_to: D.M.YYYY format
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        One dict per incoming (amount > 0) transaction with the keys
        'date' (YYYY-MM-DD), 'amount' (float), 'sender', 'message' and 'vs'.
        Rows with fewer than five cells, or with an unparseable date or
        amount, are skipped.

    Raises:
        urllib.error.URLError: on network failure or timeout.
    """
    from urllib.parse import urlencode

    # urlencode escapes the values, so unusual input cannot break the URL.
    query = urlencode({"a": "2800359168", "f": date_from, "t": date_to})
    url = f"https://ib.fio.cz/ib/transparent?{query}"
    # Without a timeout a stalled connection would block the script forever.
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        html = resp.read().decode("utf-8", errors="replace")

    parser = _FioTableParser()
    parser.feed(html)

    def cell(row: list[str], index: int) -> str:
        """Return the stripped cell at *index*, or '' for a short row."""
        return row[index].strip() if index < len(row) else ""

    transactions = []
    for row in parser.get_rows():
        if len(row) < 5:
            continue
        date_str = _parse_czech_date(cell(row, _COL_DATE))
        amount = _parse_czech_amount(cell(row, _COL_AMOUNT))
        # Only incoming payments with a usable date are of interest.
        if date_str is None or amount is None or amount <= 0:
            continue
        transactions.append({
            "date": date_str,
            "amount": amount,
            "sender": cell(row, _COL_SENDER),
            "message": cell(row, _COL_MESSAGE),
            "vs": cell(row, _COL_VS),
        })
    return transactions
def fetch_transactions_api(
    token: str, date_from: str, date_to: str, *, timeout: float = 30.0
) -> list[dict]:
    """Fetch incoming transactions via the Fio REST API (JSON).

    Args:
        token: Fio API token
        date_from: YYYY-MM-DD format
        date_to: YYYY-MM-DD format
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        One dict per incoming (amount > 0) transaction with the keys 'date',
        'amount', 'sender', 'message', 'vs', 'user_id' and 'sender_account'.

    Raises:
        urllib.error.URLError: on network failure or timeout.
    """
    url = (
        f"https://fioapi.fio.cz/v1/rest/periods/{token}"
        f"/{date_from}/{date_to}/transactions.json"
    )
    # Without a timeout a stalled connection would block the script forever.
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        data = json.loads(resp.read().decode("utf-8"))

    def field(tx: dict, col_id: int):
        """Return the value of ``column<col_id>``, or '' when it is null/missing."""
        # Each field is {"value": ..., "name": ..., "id": ...} or null
        col = tx.get(f"column{col_id}")
        return col["value"] if col else ""

    # Any level of the statement may be JSON null, hence `or {}` / `or []`
    # rather than .get() defaults (which only cover missing keys).
    statement = (data or {}).get("accountStatement") or {}
    tx_list = statement.get("transactionList") or {}

    transactions = []
    for tx in tx_list.get("transaction") or []:
        amount = float(field(tx, 1) or 0)
        if amount <= 0:
            continue  # Skip outgoing
        # API returns date as "YYYY-MM-DD+HHMM" or ISO format; keep the date part.
        date_str = str(field(tx, 0) or "")[:10]
        transactions.append({
            "date": date_str,
            "amount": amount,
            "sender": str(field(tx, 10) or ""),  # column10 = sender name
            "message": str(field(tx, 16) or ""),  # column16 = message for recipient
            "vs": str(field(tx, 5) or ""),  # column5 = VS
            "user_id": str(field(tx, 7) or ""),  # column7 = user identification
            "sender_account": str(field(tx, 2) or ""),  # column2 = sender account
        })
    return transactions
def fetch_transactions(date_from: str, date_to: str) -> list[dict]:
"""Fetch transactions, using API if token available, else transparent page."""
token = os.environ.get("FIO_API_TOKEN", "").strip()
if token:
return fetch_transactions_api(token, date_from, date_to)
# Convert YYYY-MM-DD to DD.MM.YYYY for the transparent page URL
from_dt = datetime.strptime(date_from, "%Y-%m-%d")
to_dt = datetime.strptime(date_to, "%Y-%m-%d")
return fetch_transactions_transparent(
from_dt.strftime("%-d.%-m.%Y"),
to_dt.strftime("%-d.%-m.%Y"),
)
# ---------------------------------------------------------------------------
# Name matching
# ---------------------------------------------------------------------------
def _build_name_variants(name: str) -> list[str]:
    """Return the normalized search variants of a member name.

    The result is ordered: full name (without the parenthesised nickname),
    then the nickname if there is one, then - for names of two or more
    words - the last word followed by the first word. Variants shorter than
    three characters are dropped, which can shift the later positions.

    E.g. 'František Vrbík (Štrúdl)' →
    ['frantisek vrbik', 'strudl', 'vrbik', 'frantisek']
    (assuming ``normalize`` lower-cases and strips diacritics).
    """
    # Nickname is whatever sits inside the first pair of parentheses.
    paren = re.search(r"\(([^)]+)\)", name)
    raw_nickname = paren.group(1) if paren else ""

    # Base name = the name with every parenthesised part removed.
    bare_name = re.sub(r"\s*\([^)]*\)\s*", " ", name).strip()

    base_norm = normalize(bare_name)
    nick_norm = normalize(raw_nickname)

    candidates = [base_norm]
    if nick_norm:
        candidates.append(nick_norm)

    # Also add last name and first name alone (for matching in messages).
    words = base_norm.split()
    if len(words) >= 2:
        candidates.extend((words[-1], words[0]))

    return list(filter(lambda variant: len(variant) >= 3, candidates))
def match_members(
    text: str, member_names: list[str]
) -> list[tuple[str, str]]:
    """Find members mentioned in text.

    Matching is by substring on the normalized text, tried in this order
    for each member (the first hit wins):

    1. full name                              -> 'auto'
    2. first word and last word both present  -> 'auto'
    3. nickname (text in parentheses)         -> 'auto'
    4. last name only, at least 4 characters and not a very common
       surname                                -> 'review'

    Args:
        text: Free text to search (sender name, message, ...).
        member_names: Member names, optionally with a '(nickname)' part.

    Returns:
        List of (member_name, confidence) where confidence is 'auto' or
        'review', in the order of ``member_names``.
    """
    # Very common surnames are too ambiguous to match on their own.
    common_surnames = {"novak", "novakova", "prach"}

    normalized_text = normalize(text)
    matches = []
    for name in member_names:
        variants = _build_name_variants(name)

        # Full name match = high confidence
        full_name = variants[0] if variants else ""
        if full_name and full_name in normalized_text:
            matches.append((name, "auto"))
            continue

        # Last name + first name both present = high confidence
        parts = full_name.split()
        has_two_parts = len(parts) >= 2
        if (
            has_two_parts
            and parts[0] in normalized_text
            and parts[-1] in normalized_text
        ):
            matches.append((name, "auto"))
            continue

        # Nickname match = high confidence. The nickname is extracted
        # explicitly: the second variant is the surname for members without
        # a nickname, so relying on its position would promote surname-only
        # hits to 'auto' and bypass the common-surname filter below.
        nickname_match = re.search(r"\(([^)]+)\)", name)
        nickname = normalize(nickname_match.group(1)) if nickname_match else ""
        if len(nickname) >= 3 and nickname in normalized_text:
            matches.append((name, "auto"))
            continue

        # Last name only = lower confidence, but skip very common Czech surnames
        if (
            has_two_parts
            and len(parts[-1]) >= 4
            and parts[-1] not in common_surnames
            and parts[-1] in normalized_text
        ):
            matches.append((name, "review"))
    return matches
# ---------------------------------------------------------------------------
# Reconciliation
# ---------------------------------------------------------------------------
def reconcile(
    members: list[tuple[str, str, dict[str, tuple[int, object]]]],
    sorted_months: list[str],
    transactions: list[dict],
) -> dict:
    """Match transactions to members and months.

    Args:
        members: (name, tier, fees) triples, where ``fees`` maps a 'YYYY-MM'
            month to a 2-tuple whose first item is the expected fee. The
            second item is not used here (presumably the attendance count -
            confirm against ``get_members_with_fees``).
        sorted_months: The 'YYYY-MM' months tracked in the ledger.
        transactions: Dicts with at least 'date', 'amount', 'sender' and
            'message'; 'user_id' is optional.

    Returns a dict with:
    - 'members': {name: {'tier': str, 'months': {YYYY-MM: {'expected': int, 'paid': number, 'transactions': list}}}}
    - 'unmatched': list of transactions that couldn't be matched
    - 'credits': {name: int} — payments for months outside ``sorted_months``
    """
    member_names = [name for name, _, _ in members]
    member_tiers = {name: tier for name, tier, _ in members}
    member_fees = {
        name: {m: fee for m, (fee, _) in fees.items()}
        for name, _, fees in members
    }

    # Initialize ledger: one entry per member and tracked month.
    ledger: dict[str, dict[str, dict]] = {
        name: {
            m: {
                "expected": member_fees[name].get(m, 0),
                "paid": 0,
                "transactions": [],
            }
            for m in sorted_months
        }
        for name in member_names
    }

    unmatched = []
    # Accumulated as exact floats and rounded once at the end; truncating
    # every share separately would lose money on split payments
    # (e.g. 1000 / 3 -> 333 + 333 + 333 = 999).
    credit_totals: dict[str, float] = {}

    for tx in transactions:
        user_id = tx.get("user_id", "")

        # Combine sender + message for searching
        search_text = f"{tx['sender']} {tx['message']} {user_id}"
        matched_members = match_members(search_text, member_names)
        matched_months = parse_month_references(tx["message"] + " " + user_id)

        if not matched_members:
            # Second attempt on the sender name alone (same matcher).
            matched_members = match_members(tx["sender"], member_names)
            if not matched_members:
                unmatched.append(tx)
                continue

        if not matched_months:
            # If no month specified, assume the payment is for the month
            # in which it was made.
            tx_date = tx["date"]
            if tx_date:
                try:
                    paid_on = datetime.strptime(tx_date, "%Y-%m-%d")
                except ValueError:
                    pass  # Unparseable date: falls through to 'unmatched'.
                else:
                    matched_months = [paid_on.strftime("%Y-%m")]
            if not matched_months:
                unmatched.append(tx)
                continue

        # Allocate payment evenly across matched members and months.
        # Both lists are non-empty here, so the division is safe.
        per_allocation = tx["amount"] / (len(matched_members) * len(matched_months))

        for member_name, confidence in matched_members:
            member_ledger = ledger[member_name]
            for month_key in matched_months:
                if month_key in member_ledger:
                    slot = member_ledger[month_key]
                    slot["paid"] += per_allocation
                    slot["transactions"].append({
                        "amount": per_allocation,
                        "date": tx["date"],
                        "sender": tx["sender"],
                        "message": tx["message"],
                        "confidence": confidence,
                    })
                else:
                    # Month is not tracked in the ledger (e.g. a future
                    # month) — track as credit.
                    credit_totals[member_name] = (
                        credit_totals.get(member_name, 0.0) + per_allocation
                    )

    credits: dict[str, int] = {
        name: int(round(total)) for name, total in credit_totals.items()
    }

    return {
        "members": {
            name: {
                "tier": member_tiers[name],
                "months": ledger[name],
            }
            for name in member_names
        },
        "unmatched": unmatched,
        "credits": credits,
    }
# ---------------------------------------------------------------------------
# Report output
# ---------------------------------------------------------------------------
def print_report(result: dict, sorted_months: list[str]) -> None:
    """Print the reconciliation report to stdout.

    The report has a per-member summary table (tier 'A' members only), then
    the credits and unmatched transactions when there are any, and finally
    the matched transactions per member.

    Args:
        result: The dict with 'members', 'unmatched' and 'credits' keys.
        sorted_months: 'YYYY-MM' months, in the order of the table columns.
    """
    month_labels = {
        m: datetime.strptime(m, "%Y-%m").strftime("%b %Y") for m in sorted_months
    }

    # --- Per-member breakdown (adults only) ---
    print("=" * 80)
    print("PAYMENT RECONCILIATION REPORT")
    print("=" * 80)

    adults = {
        name: data
        for name, data in result["members"].items()
        if data["tier"] == "A"
    }

    total_expected = 0
    total_paid = 0

    # Summary table. The first column is never narrower than its heading,
    # otherwise short names would misalign the rows under the header.
    name_width = max((len(n) for n in adults), default=20)
    name_width = max(name_width, len("Member"))
    # Every column after the first is " | " plus a 10-character cell.
    rule = "-" * (name_width + (len(sorted_months) + 1) * 13)

    header = f"{'Member':<{name_width}}"
    header += "".join(f" | {month_labels[m]:>10}" for m in sorted_months)
    header += f" | {'Balance':>10}"
    print(f"\n{header}")
    print(rule)

    for name in sorted(adults):
        data = adults[name]
        line = f"{name:<{name_width}}"
        member_balance = 0
        for m in sorted_months:
            mdata = data["months"].get(m, {"expected": 0, "paid": 0})
            expected = mdata["expected"]
            paid = int(mdata["paid"])
            total_expected += expected
            total_paid += paid

            if expected == 0 and paid == 0:
                cell = "-"
            elif paid >= expected and expected > 0:
                cell = "OK"
            elif paid > 0:
                cell = f"{paid}/{expected}"
            else:
                cell = f"UNPAID {expected}"

            member_balance += paid - expected
            line += f" | {cell:>10}"
        balance_str = f"{member_balance:+d}" if member_balance != 0 else "0"
        line += f" | {balance_str:>10}"
        print(line)

    print(rule)
    balance = total_paid - total_expected
    blank_cells = "".join(f" | {'':>10}" for _ in sorted_months)
    print(
        f"{'TOTAL':<{name_width}}{blank_cells}"
        f" | Expected: {total_expected}, Paid: {int(total_paid)}, Balance: {balance:+d}"
    )

    # --- Credits ---
    if result["credits"]:
        print("\nCREDITS (advance payments for future months)")
        for name, amount in sorted(result["credits"].items()):
            print(f" {name}: {amount} CZK")

    # --- Unmatched transactions ---
    if result["unmatched"]:
        print("\nUNMATCHED TRANSACTIONS (need manual review)")
        print(f" {'Date':<12} {'Amount':>10} {'Sender':<30} {'Message'}")
        print(f" {'-'*12} {'-'*10} {'-'*30} {'-'*30}")
        for tx in result["unmatched"]:
            print(
                f" {tx['date']:<12} {tx['amount']:>10.0f} "
                f"{tx['sender']:<30} {tx['message']}"
            )

    # --- Detailed matched transactions ---
    print("\nMATCHED TRANSACTION DETAILS")
    for name in sorted(adults):
        data = adults[name]
        has_payments = any(
            data["months"].get(m, {}).get("transactions")
            for m in sorted_months
        )
        if not has_payments:
            continue
        print(f"\n {name}:")
        for m in sorted_months:
            mdata = data["months"].get(m, {})
            for tx in mdata.get("transactions", []):
                conf = " [REVIEW]" if tx["confidence"] == "review" else ""
                # A space separates the sender from the quoted message so
                # the two do not run together.
                print(
                    f" {month_labels[m]}: {tx['amount']:.0f} CZK "
                    f"from {tx['sender']} \"{tx['message']}\"{conf}"
                )
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Match bank payments against expected attendance fees."
)
parser.add_argument(
"--from", dest="date_from", default="2025-12-01",
help="Start date YYYY-MM-DD (default: 2025-12-01)",
)
parser.add_argument(
"--to", dest="date_to",
default=datetime.now().strftime("%Y-%m-%d"),
help="End date YYYY-MM-DD (default: today)",
)
args = parser.parse_args()
print(f"Fetching attendance data...")
members, sorted_months = get_members_with_fees()
if not members:
print("No attendance data found.")
return
print(f"Fetching transactions from {args.date_from} to {args.date_to}...")
transactions = fetch_transactions(args.date_from, args.date_to)
print(f"Found {len(transactions)} incoming transactions.\n")
result = reconcile(members, sorted_months, transactions)
print_report(result, sorted_months)
if __name__ == "__main__":
main()