Files
fuj-management/app.py
Jan Novak 394da2e6b8
Some checks failed
Deploy to K8s / deploy (push) Successful in 11s
Build and Push / build (push) Successful in 6s
Build and Push / build-go (push) Failing after 6s
fix: Tolerate diacritic/case/whitespace mismatches in Person column matching
- Add canonical_member_key() in match_payments.py to normalize names via
  NFKD + lowercase + whitespace-collapse before ledger lookup; resolves
  payments attributed to e.g. "Maria Maco" to canonical "Mária Maco".
  Emits logger.info when a non-canonical cell is rescued so sheet typos
  are visible in logs without losing the payment allocation.
- Extend group_payments_by_person() in app.py to accept member_names and
  re-key raw-payment groups under the canonical attendance-sheet name so
  the modal's Raw Payments debug section also finds the row correctly.
- Add raw payments collapsible section to member detail modal in adults.html
  and juniors.html for debugging payment attribution issues.
- Remove 4 obsolete tests targeting routes /fees, /fees-juniors, /reconcile,
  /reconcile-juniors that no longer exist; add test_match_payments.py
  covering canonical key equivalence and reconcile() tolerance end-to-end.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-05 17:22:54 +02:00

627 lines
24 KiB
Python

import sys
from pathlib import Path
from datetime import datetime
import re
import time
import os
import io
import qrcode
import logging
from flask import Flask, render_template, g, send_file, request
# Logging verbosity comes from the LOG_LEVEL environment variable (default INFO).
# A value that is not an attribute of the logging module falls back to INFO.
log_level = os.environ.get("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    format='%(asctime)s - %(name)s:%(filename)s:%(lineno)d [%(funcName)s] - %(levelname)s - %(message)s',
    level=getattr(logging, log_level, logging.INFO),
)
# Put the sibling "scripts" directory on sys.path so its modules import by bare name.
scripts_dir = Path(__file__).parent.joinpath("scripts")
sys.path.append(str(scripts_dir))
from config import (
ATTENDANCE_SHEET_ID, PAYMENTS_SHEET_ID, JUNIOR_SHEET_GID,
BANK_ACCOUNT, CREDENTIALS_PATH,
)
from attendance import get_members_with_fees, get_junior_members_with_fees, ADULT_MERGED_MONTHS, JUNIOR_MERGED_MONTHS
from match_payments import reconcile, fetch_sheet_data, fetch_exceptions, normalize, canonical_member_key
from cache_utils import get_sheet_modified_time, read_cache, write_cache, _LAST_CHECKED, flush_cache
from sync_fio_to_sheets import sync_to_sheets
from infer_payments import infer_payments
def get_cached_data(cache_key, sheet_id, fetch_func, *args, serialize=None, deserialize=None, **kwargs):
    """Return data for ``cache_key``, from the cache when possible.

    The cache entry is looked up with the modification time reported by
    ``get_sheet_modified_time(cache_key)``. When no modification time is
    available the cache is bypassed for both reading and writing.

    Args:
        cache_key: Name of the cache entry.
        sheet_id: Accepted for the callers' benefit; not used in this function.
        fetch_func: Callable invoked as ``fetch_func(*args, **kwargs)`` on a miss.
        serialize: Optional callable applied to fresh data before it is cached.
        deserialize: Optional callable applied to cached data before it is returned.

    Returns:
        The (optionally deserialized) cached value, or the freshly fetched data.
    """
    mod_time = get_sheet_modified_time(cache_key)
    if not mod_time:
        # Without a modification time there is nothing to validate a cache entry against.
        return fetch_func(*args, **kwargs)

    hit = read_cache(cache_key, mod_time)
    if hit is not None:
        if deserialize:
            return deserialize(hit)
        return hit

    fresh = fetch_func(*args, **kwargs)
    payload = serialize(fresh) if serialize else fresh
    write_cache(cache_key, mod_time, payload)
    return fresh
def get_month_labels(sorted_months, merged_months):
    """Build a display label for every month key in ``sorted_months``.

    Args:
        sorted_months: Iterable of ``"YYYY-MM"`` strings.
        merged_months: Mapping ``source month -> target month``; every source
            mapped onto a target is shown together with that target
            (e.g. 2026-01 merged into 2026-02 yields "Jan+Feb 2026").

    Returns:
        Dict mapping each month key to its label: "Mon YYYY" for a plain
        month, "Mon+Mon YYYY" for a merged group within one year, and
        "Mon YYYY+Mon YYYY" when the group spans several years.
    """
    month_format = "%Y-%m"
    result = {}
    for month in sorted_months:
        parsed = datetime.strptime(month, month_format)
        # Sources that were folded into this month.
        absorbed = sorted(src for src, dest in merged_months.items() if dest == month)
        if not absorbed:
            result[month] = parsed.strftime("%b %Y")
            continue

        span = [datetime.strptime(key, month_format) for key in sorted([*absorbed, month])]
        if len({moment.year for moment in span}) > 1:
            # Different years: every part carries its own year.
            result[month] = "+".join(moment.strftime("%b %Y") for moment in span)
        else:
            short_names = "+".join(moment.strftime("%b") for moment in span)
            result[month] = f"{short_names} {parsed.strftime('%Y')}"
    return result
def group_payments_by_person(transactions, member_names=None):
    """Group transactions by the people named in their ``person`` field.

    The ``person`` value is split on commas, ``[?]`` markers are removed and
    blank fragments are skipped. When ``member_names`` is given, a fragment
    whose ``canonical_member_key`` equals that of a member name is filed
    under the member name; otherwise the cleaned fragment itself is the key.

    Args:
        transactions: Iterable of dict-like rows offering ``.get``.
        member_names: Optional iterable of preferred (canonical) names.

    Returns:
        Dict mapping name -> list of transactions, each list ordered by the
        string form of ``date``, newest first.
    """
    canonical_names = {}
    if member_names:
        for member in member_names:
            canonical_names[canonical_member_key(member)] = member

    by_person = {}
    for tx in transactions:
        raw_person = str(tx.get("person", "")).strip()
        if not raw_person:
            continue
        for fragment in raw_person.split(","):
            cleaned = re.sub(r"\[\?\]\s*", "", fragment).strip()
            if not cleaned:
                continue
            bucket_name = canonical_names.get(canonical_member_key(cleaned), cleaned)
            by_person.setdefault(bucket_name, []).append(tx)

    for bucket in by_person.values():
        bucket.sort(key=lambda entry: str(entry.get("date", "")), reverse=True)
    return by_person
def warmup_cache():
    """Pre-fetch all cached data so first request is fast."""
    log = logging.getLogger(__name__)
    log.info("Warming up cache...")

    def pack_exceptions(mapping):
        # Tuple keys become lists so the mapping can be stored as a list of pairs.
        return [[list(key), value] for key, value in mapping.items()]

    def unpack_exceptions(pairs):
        return {tuple(key): value for key, value in pairs}

    creds = CREDENTIALS_PATH
    get_cached_data("attendance_regular", ATTENDANCE_SHEET_ID, get_members_with_fees)
    get_cached_data("attendance_juniors", ATTENDANCE_SHEET_ID, get_junior_members_with_fees)
    get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, creds)
    get_cached_data(
        "exceptions_dict",
        PAYMENTS_SHEET_ID,
        fetch_exceptions,
        PAYMENTS_SHEET_ID,
        creds,
        serialize=pack_exceptions,
        deserialize=unpack_exceptions,
    )
    log.info("Cache warmup complete.")
app = Flask(__name__)

import json as _json

# Build metadata is read from build_meta.json next to this file when it exists;
# otherwise placeholder values are used.
_meta_path = Path(__file__).parent / "build_meta.json"
if _meta_path.exists():
    BUILD_META = _json.loads(_meta_path.read_text())
else:
    BUILD_META = {"tag": "dev", "commit": "local", "build_date": ""}

# Runs at import time, before any request is served.
warmup_cache()
@app.before_request
def start_timer():
    """Reset the per-request step list and remember when the request started."""
    g.steps = []
    g.start_time = time.perf_counter()
def record_step(name):
    """Append ``(name, current perf_counter reading)`` to the request's step list."""
    reached_at = time.perf_counter()
    g.steps.append((name, reached_at))
@app.context_processor
def inject_render_time():
    """Provide ``get_render_time`` and ``build_meta`` to the template context."""

    def get_render_time():
        """Return the elapsed request time and a per-step breakdown as strings."""
        total_elapsed = time.perf_counter() - g.start_time
        segments = []
        previous = g.start_time
        for label, reached_at in g.steps:
            segments.append(f"{label}:{reached_at - previous:.3f}s")
            previous = reached_at
        # Whatever elapsed since the last recorded step is reported as 'render'.
        tail = time.perf_counter() - previous
        segments.append(f"render:{tail:.3f}s")
        return {"total": f"{total_elapsed:.3f}", "breakdown": " | ".join(segments)}

    return {"get_render_time": get_render_time, "build_meta": BUILD_META}
@app.route("/")
def index():
    """Send visitors of the root URL to /adults via an HTML meta refresh."""
    # Root has no page of its own while there are no other apps.
    landing_redirect = '<meta http-equiv="refresh" content="0; url=/adults" />'
    return landing_redirect
@app.route("/flush-cache", methods=["GET", "POST"])
def flush_cache_endpoint():
    """Show the flush-cache page on GET; flush the cache and report the result on POST."""
    if request.method != "GET":
        removed = flush_cache()
        return render_template("flush-cache.html", flushed=True, deleted=removed)
    return render_template("flush-cache.html")
@app.route("/sync-bank")
def sync_bank():
    """Run the bank sync, payment inference and cache flush, then show the captured log.

    stdout and stderr written while the steps run are captured into one
    buffer. Any exception is appended to that buffer with its traceback
    and the page is rendered with ``success=False``.
    """
    import contextlib
    import traceback

    log_buffer = io.StringIO()
    try:
        with contextlib.redirect_stdout(log_buffer), contextlib.redirect_stderr(log_buffer):
            # sync_to_sheets: equivalent of make sync-2026
            log_buffer.write("=== Syncing Fio transactions (2026) ===\n")
            sync_to_sheets(
                spreadsheet_id=PAYMENTS_SHEET_ID,
                credentials_path=CREDENTIALS_PATH,
                date_from_str="2026-01-01",
                date_to_str="2026-12-31",
                sort_by_date=True,
            )

            log_buffer.write("\n=== Inferring payment details ===\n")
            infer_payments(PAYMENTS_SHEET_ID, CREDENTIALS_PATH)

            log_buffer.write("\n=== Flushing cache ===\n")
            removed = flush_cache()
            log_buffer.write(f"Deleted {removed} cache files.\n")

            log_buffer.write("\n=== Done ===\n")
    except Exception as exc:
        # Top-level boundary: the failure is shown to the user instead of raising.
        log_buffer.write(f"\n!!! Error: {exc}\n")
        log_buffer.write(traceback.format_exc())
        succeeded = False
    else:
        succeeded = True
    return render_template("sync.html", output=log_buffer.getvalue(), success=succeeded)
@app.route("/version")
def version():
    """Return the build metadata loaded at import time."""
    metadata = BUILD_META
    return metadata
@app.route("/adults")
def adults_view():
    """Render the adults fee/payment overview (``adults.html``).

    Loads attendance-derived fees, payment transactions and fee exceptions
    through ``get_cached_data``, reconciles them with ``reconcile`` and
    builds, for every tier "A" member, one row of per-month cells plus the
    per-month totals and the credit/debt lists passed to the template.

    Returns:
        The rendered template, or the plain string "No data." when the
        attendance data is empty.
    """
    import json

    attendance_url = f"https://docs.google.com/spreadsheets/d/{ATTENDANCE_SHEET_ID}/edit"
    payments_url = f"https://docs.google.com/spreadsheets/d/{PAYMENTS_SHEET_ID}/edit"
    credentials_path = CREDENTIALS_PATH

    members_data = get_cached_data("attendance_regular", ATTENDANCE_SHEET_ID, get_members_with_fees)
    record_step("fetch_members")
    if not members_data:
        return "No data."
    members, sorted_months = members_data

    transactions = get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, credentials_path)
    record_step("fetch_payments")
    exceptions = get_cached_data(
        "exceptions_dict", PAYMENTS_SHEET_ID, fetch_exceptions,
        PAYMENTS_SHEET_ID, credentials_path,
        serialize=lambda d: [[list(k), v] for k, v in d.items()],
        deserialize=lambda c: {tuple(k): v for k, v in c},
    )
    record_step("fetch_exceptions")

    result = reconcile(members, sorted_months, transactions, exceptions)
    record_step("reconcile")

    month_labels = get_month_labels(sorted_months, ADULT_MERGED_MONTHS)
    adult_names = sorted([name for name, tier, _ in members if tier == "A"])
    current_month = datetime.now().strftime("%Y-%m")

    def past_months_balance(member):
        """Sum (paid - expected) over months strictly before ``current_month``."""
        total = 0
        for month, month_data in member["months"].items():
            if month >= current_month:
                continue
            exp = month_data.get("expected", 0)
            # Non-integer expected values are left out of the balance.
            if isinstance(exp, int):
                total += int(month_data.get("paid", 0)) - exp
        return total

    monthly_totals = {m: {"expected": 0, "paid": 0} for m in sorted_months}
    # Settled balance per member, computed once and reused for the credit/debt lists.
    balances = {}
    formatted_results = []
    for name in adult_names:
        data = result["members"][name]
        row = {"name": name, "months": [], "balance": data["total_balance"], "unpaid_periods": "", "raw_unpaid_periods": ""}
        unpaid_months = []
        raw_unpaid_months = []
        for m in sorted_months:
            mdata = data["months"].get(m, {"expected": 0, "original_expected": 0, "attendance_count": 0, "paid": 0, "exception": None})
            expected = mdata.get("expected", 0)
            original_expected = mdata.get("original_expected", 0)
            count = mdata.get("attendance_count", 0)
            paid = int(mdata.get("paid", 0))
            exception_info = mdata.get("exception", None)

            monthly_totals[m]["expected"] += expected
            monthly_totals[m]["paid"] += paid

            # An exception only counts as an override when it changes the fee.
            override_amount = exception_info["amount"] if exception_info else None
            if override_amount is not None and override_amount != original_expected:
                is_overridden = True
                fee_display = f"{override_amount} ({original_expected}) CZK ({count})" if count > 0 else f"{override_amount} ({original_expected}) CZK"
            else:
                is_overridden = False
                fee_display = f"{expected} CZK ({count})" if count > 0 else f"{expected} CZK"

            status = "empty"
            cell_text = "-"
            amount_to_pay = 0
            if expected > 0:
                amount_to_pay = max(0, expected - paid)
                if paid >= expected:
                    status = "ok"
                    cell_text = f"{paid}/{fee_display}"
                elif paid > 0:
                    status = "partial"
                    cell_text = f"{paid}/{fee_display}"
                    # Only months before the current one are listed as unpaid.
                    if m < current_month:
                        unpaid_months.append(month_labels[m])
                        raw_unpaid_months.append(datetime.strptime(m, "%Y-%m").strftime("%m/%Y"))
                else:
                    status = "unpaid"
                    cell_text = f"0/{fee_display}"
                    if m < current_month:
                        unpaid_months.append(month_labels[m])
                        raw_unpaid_months.append(datetime.strptime(m, "%Y-%m").strftime("%m/%Y"))
            elif paid > 0:
                status = "surplus"
                cell_text = f"PAID {paid}"

            if expected > 0 or paid > 0:
                tooltip = f"Received: {paid}, Expected: {expected}"
            else:
                tooltip = ""

            row["months"].append({
                "text": cell_text,
                "overridden": is_overridden,
                "status": status,
                "amount": amount_to_pay,
                "month": month_labels[m],
                "raw_month": m,
                "tooltip": tooltip
            })

        # Balance = sum of (paid - expected) for past months only; current/future months ignored.
        settled = past_months_balance(data)
        balances[name] = settled
        row["unpaid_periods"] = ", ".join(unpaid_months)
        row["raw_unpaid_periods"] = "+".join(raw_unpaid_months)
        row["balance"] = settled
        row["payable_amount"] = max(0, -settled)
        formatted_results.append(row)

    formatted_totals = []
    for m in sorted_months:
        t = monthly_totals[m]
        status = "empty"
        if t["expected"] > 0 or t["paid"] > 0:
            if t["paid"] == t["expected"]:
                status = "ok"
            elif t["paid"] < t["expected"]:
                status = "unpaid"
            else:
                status = "surplus"
        formatted_totals.append({
            "text": f"{t['paid']} / {t['expected']} CZK",
            "status": status
        })

    credits = sorted([{"name": n, "amount": balances[n]} for n in adult_names if balances[n] > 0], key=lambda x: x["name"])
    debts = sorted([{"name": n, "amount": abs(balances[n])} for n in adult_names if balances[n] < 0], key=lambda x: x["name"])
    unmatched = result["unmatched"]
    raw_payments_by_person = group_payments_by_person(transactions, [name for name, _, _ in members])
    record_step("process_data")

    return render_template(
        "adults.html",
        months=[month_labels[m] for m in sorted_months],
        raw_months=sorted_months,
        results=formatted_results,
        totals=formatted_totals,
        member_data=json.dumps(result["members"]),
        month_labels_json=json.dumps(month_labels),
        raw_payments_json=json.dumps(raw_payments_by_person),
        credits=credits,
        debts=debts,
        unmatched=unmatched,
        attendance_url=attendance_url,
        payments_url=payments_url,
        bank_account=BANK_ACCOUNT,
        current_month=current_month
    )
@app.route("/juniors")
def juniors_view():
    """Render the juniors fee/payment overview (``juniors.html``).

    Loads junior attendance fees, payment transactions and fee exceptions
    through ``get_cached_data``, adapts the junior fee tuples to the
    two-element form passed to ``reconcile``, and builds one row of
    per-month cells per member plus per-month totals and credit/debt lists.
    An expected fee of "?" is displayed as unknown and excluded from totals
    and balances.

    Returns:
        The rendered template, or the plain string "No data." when the
        attendance data is empty.
    """
    import json

    attendance_url = f"https://docs.google.com/spreadsheets/d/{ATTENDANCE_SHEET_ID}/edit#gid={JUNIOR_SHEET_GID}"
    payments_url = f"https://docs.google.com/spreadsheets/d/{PAYMENTS_SHEET_ID}/edit"
    credentials_path = CREDENTIALS_PATH

    junior_members_data = get_cached_data("attendance_juniors", ATTENDANCE_SHEET_ID, get_junior_members_with_fees)
    record_step("fetch_junior_members")
    if not junior_members_data:
        return "No data."
    junior_members, sorted_months = junior_members_data

    transactions = get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, credentials_path)
    record_step("fetch_payments")
    exceptions = get_cached_data(
        "exceptions_dict", PAYMENTS_SHEET_ID, fetch_exceptions,
        PAYMENTS_SHEET_ID, credentials_path,
        serialize=lambda d: [[list(k), v] for k, v in d.items()],
        deserialize=lambda c: {tuple(k): v for k, v in c},
    )
    record_step("fetch_exceptions")

    # Adapt junior tuple format (name, tier, {month: (fee, total_count, adult_count, junior_count)})
    # to what match_payments expects: (name, tier, {month: (expected_fee, attendance_count)})
    adapted_members = []
    for name, tier, fees_dict in junior_members:
        adapted_fees = {}
        for m, fee_data in fees_dict.items():
            if len(fee_data) == 4:
                fee, total_count, _, _ = fee_data
                adapted_fees[m] = (fee, total_count)
            else:
                fee, count = fee_data
                adapted_fees[m] = (fee, count)
        adapted_members.append((name, tier, adapted_fees))

    result = reconcile(adapted_members, sorted_months, transactions, exceptions)
    record_step("reconcile")

    # Format month labels
    month_labels = get_month_labels(sorted_months, JUNIOR_MERGED_MONTHS)
    junior_names = sorted([name for name, tier, _ in adapted_members])
    junior_members_dict = {name: fees_dict for name, _, fees_dict in junior_members}
    current_month = datetime.now().strftime("%Y-%m")

    def past_months_balance(member):
        """Sum (paid - expected) over months strictly before ``current_month``."""
        total = 0
        for month, month_data in member["months"].items():
            if month >= current_month:
                continue
            exp = month_data.get("expected", 0)
            # Non-integer expected values (e.g. "?") are left out of the balance.
            if isinstance(exp, int):
                total += int(month_data.get("paid", 0)) - exp
        return total

    monthly_totals = {m: {"expected": 0, "paid": 0} for m in sorted_months}
    # Settled balance per member, computed once and reused for the credit/debt lists.
    balances = {}
    formatted_results = []
    for name in junior_names:
        data = result["members"][name]
        row = {"name": name, "months": [], "balance": data["total_balance"], "unpaid_periods": "", "raw_unpaid_periods": ""}
        unpaid_months = []
        raw_unpaid_months = []
        for m in sorted_months:
            mdata = data["months"].get(m, {"expected": 0, "original_expected": 0, "attendance_count": 0, "paid": 0, "exception": None})
            expected = mdata.get("expected", 0)
            original_expected = mdata.get("original_expected", 0)
            count = mdata.get("attendance_count", 0)
            paid = int(mdata.get("paid", 0))
            exception_info = mdata.get("exception", None)

            if expected != "?" and isinstance(expected, int):
                monthly_totals[m]["expected"] += expected
            monthly_totals[m]["paid"] += paid

            # The original four-element tuple carries the adult/junior attendance split.
            orig_fee_data = junior_members_dict.get(name, {}).get(m)
            adult_count = 0
            junior_count = 0
            if orig_fee_data and len(orig_fee_data) == 4:
                _, _, adult_count, junior_count = orig_fee_data

            breakdown = ""
            if adult_count > 0 and junior_count > 0:
                breakdown = f":{junior_count}J,{adult_count}A"
            elif junior_count > 0:
                breakdown = f":{junior_count}J"
            elif adult_count > 0:
                breakdown = f":{adult_count}A"
            count_str = f" ({count}{breakdown})" if count > 0 else ""

            # An exception only counts as an override when it changes the fee.
            override_amount = exception_info["amount"] if exception_info else None
            if override_amount is not None and override_amount != original_expected:
                is_overridden = True
                fee_display = f"{override_amount} ({original_expected}) CZK{count_str}"
            else:
                is_overridden = False
                fee_display = f"{expected} CZK{count_str}"

            status = "empty"
            cell_text = "-"
            amount_to_pay = 0
            if expected == "?" or (isinstance(expected, int) and expected > 0):
                if expected == "?":
                    status = "empty"
                    cell_text = f"?{count_str}"
                elif paid >= expected:
                    status = "ok"
                    cell_text = f"{paid}/{fee_display}"
                elif paid > 0:
                    status = "partial"
                    cell_text = f"{paid}/{fee_display}"
                    amount_to_pay = expected - paid
                    # Only months before the current one are listed as unpaid.
                    if m < current_month:
                        unpaid_months.append(month_labels[m])
                        raw_unpaid_months.append(datetime.strptime(m, "%Y-%m").strftime("%m/%Y"))
                else:
                    status = "unpaid"
                    cell_text = f"0/{fee_display}"
                    amount_to_pay = expected
                    if m < current_month:
                        unpaid_months.append(month_labels[m])
                        raw_unpaid_months.append(datetime.strptime(m, "%Y-%m").strftime("%m/%Y"))
            elif paid > 0:
                status = "surplus"
                cell_text = f"PAID {paid}"

            if (isinstance(expected, int) and expected > 0) or paid > 0:
                tooltip = f"Received: {paid}, Expected: {expected}"
            else:
                tooltip = ""

            row["months"].append({
                "text": cell_text,
                "overridden": is_overridden,
                "status": status,
                "amount": amount_to_pay,
                "month": month_labels[m],
                "raw_month": m,
                "tooltip": tooltip
            })

        # Balance = sum of (paid - expected) for past months only; current/future months ignored.
        settled = past_months_balance(data)
        balances[name] = settled
        row["unpaid_periods"] = ", ".join(unpaid_months)
        row["raw_unpaid_periods"] = "+".join(raw_unpaid_months)
        row["balance"] = settled
        row["payable_amount"] = max(0, -settled)
        formatted_results.append(row)

    formatted_totals = []
    for m in sorted_months:
        t = monthly_totals[m]
        status = "empty"
        if t["expected"] > 0 or t["paid"] > 0:
            if t["paid"] == t["expected"]:
                status = "ok"
            elif t["paid"] < t["expected"]:
                status = "unpaid"
            else:
                status = "surplus"
        formatted_totals.append({
            "text": f"{t['paid']} / {t['expected']} CZK",
            "status": status
        })

    # Format credits and debts
    junior_all_names = [name for name, _, _ in adapted_members]
    credits = sorted([{"name": n, "amount": balances[n]} for n in junior_all_names if balances[n] > 0], key=lambda x: x["name"])
    debts = sorted([{"name": n, "amount": abs(balances[n])} for n in junior_all_names if balances[n] < 0], key=lambda x: x["name"])
    unmatched = result["unmatched"]
    raw_payments_by_person = group_payments_by_person(transactions, [name for name, _, _ in adapted_members])
    record_step("process_data")

    return render_template(
        "juniors.html",
        months=[month_labels[m] for m in sorted_months],
        raw_months=sorted_months,
        results=formatted_results,
        totals=formatted_totals,
        member_data=json.dumps(result["members"]),
        month_labels_json=json.dumps(month_labels),
        raw_payments_json=json.dumps(raw_payments_by_person),
        credits=credits,
        debts=debts,
        unmatched=unmatched,
        attendance_url=attendance_url,
        payments_url=payments_url,
        bank_account=BANK_ACCOUNT,
        current_month=current_month
    )
@app.route("/payments")
def payments():
    """Render ``payments.html`` with all transactions grouped by person.

    Transactions are grouped under adult and junior member names where they
    match; rows with a blank ``person`` field are collected under
    "Unmatched / Unknown".
    """
    attendance_url = f"https://docs.google.com/spreadsheets/d/{ATTENDANCE_SHEET_ID}/edit"
    payments_url = f"https://docs.google.com/spreadsheets/d/{PAYMENTS_SHEET_ID}/edit"
    credentials_path = CREDENTIALS_PATH

    transactions = get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, credentials_path)
    record_step("fetch_payments")

    adults_data = get_cached_data("attendance_regular", ATTENDANCE_SHEET_ID, get_members_with_fees)
    juniors_data = get_cached_data("attendance_juniors", ATTENDANCE_SHEET_ID, get_junior_members_with_fees)
    member_names = []
    for dataset in (adults_data, juniors_data):
        if dataset:
            member_names.extend(name for name, _, _ in dataset[0])

    grouped = group_payments_by_person(transactions, member_names)

    # payments page also groups unmatched rows under a fallback key
    without_person = [tx for tx in transactions if not str(tx.get("person", "")).strip()]
    if without_person:
        grouped.setdefault("Unmatched / Unknown", []).extend(without_person)

    # Newest first within every group, including the fallback group added above.
    for bucket in grouped.values():
        bucket.sort(key=lambda entry: str(entry.get("date", "")), reverse=True)
    sorted_people = sorted(grouped.keys())
    record_step("process_data")

    return render_template(
        "payments.html",
        grouped_payments=grouped,
        sorted_people=sorted_people,
        attendance_url=attendance_url,
        payments_url=payments_url
    )
@app.route("/qr")
def qr_code():
    """Return a PNG QR code carrying an SPD payment string.

    Query parameters:
        account: IBAN-like (two capital letters followed by digits) or Czech
            ``number/bankcode`` form; anything else falls back to BANK_ACCOUNT.
        amount: Parsed as a float; unparsable input, NaN, negative values and
            values above 10,000,000 are emitted as 0.00.
        message: Truncated to 60 characters, with ``*`` removed.

    Returns:
        A ``send_file`` response with mimetype ``image/png``.
    """
    account = request.args.get("account", BANK_ACCOUNT)
    amount = request.args.get("amount", "0")
    message = request.args.get("message", "")

    # Validate account: allow IBAN (letters+digits) or Czech format (digits/digits).
    # fullmatch is used because `$` under re.match also matches just before a
    # trailing newline, which would let "123/0100\n" through into the payload.
    if not re.fullmatch(r'[A-Z]{2}\d{2,34}|\d{1,16}/\d{4}', account):
        account = BANK_ACCOUNT

    # QR Platba standard: SPD*1.0*ACC:accountNumber*BC:bankCode*AM:amount*CC:CZK*MSG:message
    acc_parts = account.split('/')
    if len(acc_parts) == 2:
        acc_str = f"{acc_parts[0]}*BC:{acc_parts[1]}"
    else:
        acc_str = account

    try:
        amt_val = float(amount)
    except ValueError:
        amt_str = "0.00"
    else:
        # Written as a positive range test so that NaN (every comparison is
        # False) and negative zero are reset as well, not only out-of-range values.
        if not (0 < amt_val <= 10_000_000):
            amt_val = 0
        amt_str = f"{amt_val:.2f}"

    # Message max 60 characters, strip SPD delimiters to prevent injection
    msg_str = message[:60].replace("*", "")

    qr_data = f"SPD*1.0*ACC:{acc_str}*AM:{amt_str}*CC:CZK*MSG:{msg_str}"

    img = qrcode.make(qr_data)
    buf = io.BytesIO()
    img.save(buf, format='PNG')
    buf.seek(0)
    return send_file(buf, mimetype='image/png')
if __name__ == "__main__":
    # NOTE(review): debug=True together with host 0.0.0.0 exposes the interactive
    # debugger on every interface; confirm this entry point is used for local
    # development only.
    app.run(
        host='0.0.0.0',
        port=5001,
        debug=True,
    )