Files
fuj-management/app.py
Jan Novak 1ac5df7be5 chore: Remove archived pages (fees, reconcile) from web UI
Deleted /fees, /fees-juniors, /reconcile, /reconcile-juniors routes and
their templates. Payment Ledger (/payments) is retained. Nav updated
across all remaining templates.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-09 12:48:15 +02:00

556 lines
22 KiB
Python

import sys
from pathlib import Path
from datetime import datetime
import re
import time
import os
import io
import qrcode
import logging
from flask import Flask, render_template, g, send_file, request
# Configure logging, allowing override via LOG_LEVEL environment variable
log_level = os.environ.get("LOG_LEVEL", "INFO").upper()
logging.basicConfig(level=getattr(logging, log_level, logging.INFO), format='%(asctime)s - %(name)s:%(filename)s:%(lineno)d [%(funcName)s] - %(levelname)s - %(message)s')
# Add scripts directory to path to allow importing from it
scripts_dir = Path(__file__).parent / "scripts"
sys.path.append(str(scripts_dir))
from config import (
ATTENDANCE_SHEET_ID, PAYMENTS_SHEET_ID, JUNIOR_SHEET_GID,
BANK_ACCOUNT, CREDENTIALS_PATH,
)
from attendance import get_members_with_fees, get_junior_members_with_fees, ADULT_MERGED_MONTHS, JUNIOR_MERGED_MONTHS
from match_payments import reconcile, fetch_sheet_data, fetch_exceptions, normalize
from cache_utils import get_sheet_modified_time, read_cache, write_cache, _LAST_CHECKED, flush_cache
from sync_fio_to_sheets import sync_to_sheets
from infer_payments import infer_payments
def get_cached_data(cache_key, sheet_id, fetch_func, *args, serialize=None, deserialize=None, **kwargs):
mod_time = get_sheet_modified_time(cache_key)
if mod_time:
cached = read_cache(cache_key, mod_time)
if cached is not None:
return deserialize(cached) if deserialize else cached
data = fetch_func(*args, **kwargs)
if mod_time:
write_cache(cache_key, mod_time, serialize(data) if serialize else data)
return data
def get_month_labels(sorted_months, merged_months):
labels = {}
for m in sorted_months:
dt = datetime.strptime(m, "%Y-%m")
# Find which months were merged into m (e.g. 2026-01 is merged into 2026-02)
merged_in = sorted([k for k, v in merged_months.items() if v == m])
if merged_in:
all_dts = [datetime.strptime(x, "%Y-%m") for x in sorted(merged_in + [m])]
years = {d.year for d in all_dts}
if len(years) > 1:
parts = [d.strftime("%b %Y") for d in all_dts]
labels[m] = "+".join(parts)
else:
parts = [d.strftime("%b") for d in all_dts]
labels[m] = f"{'+'.join(parts)} {dt.strftime('%Y')}"
else:
labels[m] = dt.strftime("%b %Y")
return labels
def warmup_cache():
"""Pre-fetch all cached data so first request is fast."""
logger = logging.getLogger(__name__)
logger.info("Warming up cache...")
credentials_path = CREDENTIALS_PATH
get_cached_data("attendance_regular", ATTENDANCE_SHEET_ID, get_members_with_fees)
get_cached_data("attendance_juniors", ATTENDANCE_SHEET_ID, get_junior_members_with_fees)
get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, credentials_path)
get_cached_data("exceptions_dict", PAYMENTS_SHEET_ID, fetch_exceptions,
PAYMENTS_SHEET_ID, credentials_path,
serialize=lambda d: [[list(k), v] for k, v in d.items()],
deserialize=lambda c: {tuple(k): v for k, v in c},
)
logger.info("Cache warmup complete.")
app = Flask(__name__)
import json as _json
_meta_path = Path(__file__).parent / "build_meta.json"
BUILD_META = _json.loads(_meta_path.read_text()) if _meta_path.exists() else {
"tag": "dev", "commit": "local", "build_date": ""
}
warmup_cache()
@app.before_request
def start_timer():
g.start_time = time.perf_counter()
g.steps = []
def record_step(name):
g.steps.append((name, time.perf_counter()))
@app.context_processor
def inject_render_time():
def get_render_time():
total = time.perf_counter() - g.start_time
breakdown = []
last_time = g.start_time
for name, timestamp in g.steps:
duration = timestamp - last_time
breakdown.append(f"{name}:{duration:.3f}s")
last_time = timestamp
# Add remaining time as 'render'
render_duration = time.perf_counter() - last_time
breakdown.append(f"render:{render_duration:.3f}s")
return {
"total": f"{total:.3f}",
"breakdown": " | ".join(breakdown)
}
return dict(get_render_time=get_render_time, build_meta=BUILD_META)
@app.route("/")
def index():
# Redirect root to /adults for convenience while there are no other apps
return '<meta http-equiv="refresh" content="0; url=/adults" />'
@app.route("/flush-cache", methods=["GET", "POST"])
def flush_cache_endpoint():
if request.method == "GET":
return render_template("flush-cache.html")
deleted = flush_cache()
return render_template("flush-cache.html", flushed=True, deleted=deleted)
@app.route("/sync-bank")
def sync_bank():
import contextlib
output = io.StringIO()
success = True
try:
with contextlib.redirect_stdout(output), contextlib.redirect_stderr(output):
# sync_to_sheets: equivalent of make sync-2026
output.write("=== Syncing Fio transactions (2026) ===\n")
sync_to_sheets(
spreadsheet_id=PAYMENTS_SHEET_ID,
credentials_path=CREDENTIALS_PATH,
date_from_str="2026-01-01",
date_to_str="2026-12-31",
sort_by_date=True,
)
output.write("\n=== Inferring payment details ===\n")
infer_payments(PAYMENTS_SHEET_ID, CREDENTIALS_PATH)
output.write("\n=== Flushing cache ===\n")
deleted = flush_cache()
output.write(f"Deleted {deleted} cache files.\n")
output.write("\n=== Done ===\n")
except Exception as e:
import traceback
output.write(f"\n!!! Error: {e}\n")
output.write(traceback.format_exc())
success = False
return render_template("sync.html", output=output.getvalue(), success=success)
@app.route("/version")
def version():
return BUILD_META
@app.route("/adults")
def adults_view():
attendance_url = f"https://docs.google.com/spreadsheets/d/{ATTENDANCE_SHEET_ID}/edit"
payments_url = f"https://docs.google.com/spreadsheets/d/{PAYMENTS_SHEET_ID}/edit"
credentials_path = CREDENTIALS_PATH
members_data = get_cached_data("attendance_regular", ATTENDANCE_SHEET_ID, get_members_with_fees)
record_step("fetch_members")
if not members_data:
return "No data."
members, sorted_months = members_data
transactions = get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, credentials_path)
record_step("fetch_payments")
exceptions = get_cached_data(
"exceptions_dict", PAYMENTS_SHEET_ID, fetch_exceptions,
PAYMENTS_SHEET_ID, credentials_path,
serialize=lambda d: [[list(k), v] for k, v in d.items()],
deserialize=lambda c: {tuple(k): v for k, v in c},
)
record_step("fetch_exceptions")
result = reconcile(members, sorted_months, transactions, exceptions)
record_step("reconcile")
month_labels = get_month_labels(sorted_months, ADULT_MERGED_MONTHS)
adult_names = sorted([name for name, tier, _ in members if tier == "A"])
monthly_totals = {m: {"expected": 0, "paid": 0} for m in sorted_months}
formatted_results = []
for name in adult_names:
data = result["members"][name]
row = {"name": name, "months": [], "balance": data["total_balance"], "unpaid_periods": "", "raw_unpaid_periods": ""}
unpaid_months = []
raw_unpaid_months = []
for m in sorted_months:
mdata = data["months"].get(m, {"expected": 0, "original_expected": 0, "attendance_count": 0, "paid": 0, "exception": None})
expected = mdata.get("expected", 0)
original_expected = mdata.get("original_expected", 0)
count = mdata.get("attendance_count", 0)
paid = int(mdata.get("paid", 0))
exception_info = mdata.get("exception", None)
monthly_totals[m]["expected"] += expected
monthly_totals[m]["paid"] += paid
override_amount = exception_info["amount"] if exception_info else None
if override_amount is not None and override_amount != original_expected:
is_overridden = True
fee_display = f"{override_amount} ({original_expected}) CZK ({count})" if count > 0 else f"{override_amount} ({original_expected}) CZK"
else:
is_overridden = False
fee_display = f"{expected} CZK ({count})" if count > 0 else f"{expected} CZK"
status = "empty"
cell_text = "-"
amount_to_pay = 0
if expected > 0:
amount_to_pay = max(0, expected - paid)
if paid >= expected:
status = "ok"
cell_text = f"{paid}/{fee_display}"
elif paid > 0:
status = "partial"
cell_text = f"{paid}/{fee_display}"
unpaid_months.append(month_labels[m])
raw_unpaid_months.append(datetime.strptime(m, "%Y-%m").strftime("%m/%Y"))
else:
status = "unpaid"
cell_text = f"0/{fee_display}"
unpaid_months.append(month_labels[m])
raw_unpaid_months.append(datetime.strptime(m, "%Y-%m").strftime("%m/%Y"))
elif paid > 0:
status = "surplus"
cell_text = f"PAID {paid}"
else:
cell_text = "-"
amount_to_pay = 0
if expected > 0 or paid > 0:
tooltip = f"Received: {paid}, Expected: {expected}"
else:
tooltip = ""
row["months"].append({
"text": cell_text,
"overridden": is_overridden,
"status": status,
"amount": amount_to_pay,
"month": month_labels[m],
"raw_month": m,
"tooltip": tooltip
})
row["unpaid_periods"] = ", ".join(unpaid_months) if unpaid_months else ("Older debt" if data["total_balance"] < 0 else "")
row["raw_unpaid_periods"] = "+".join(raw_unpaid_months)
row["balance"] = data["total_balance"]
formatted_results.append(row)
formatted_totals = []
for m in sorted_months:
t = monthly_totals[m]
status = "empty"
if t["expected"] > 0 or t["paid"] > 0:
if t["paid"] == t["expected"]:
status = "ok"
elif t["paid"] < t["expected"]:
status = "unpaid"
else:
status = "surplus"
formatted_totals.append({
"text": f"{t['paid']} / {t['expected']} CZK",
"status": status
})
credits = sorted([{"name": n, "amount": a["total_balance"]} for n, a in result["members"].items() if a["total_balance"] > 0 and n in adult_names], key=lambda x: x["name"])
debts = sorted([{"name": n, "amount": abs(a["total_balance"])} for n, a in result["members"].items() if a["total_balance"] < 0 and n in adult_names], key=lambda x: x["name"])
unmatched = result["unmatched"]
import json
record_step("process_data")
return render_template(
"adults.html",
months=[month_labels[m] for m in sorted_months],
raw_months=sorted_months,
results=formatted_results,
totals=formatted_totals,
member_data=json.dumps(result["members"]),
month_labels_json=json.dumps(month_labels),
credits=credits,
debts=debts,
unmatched=unmatched,
attendance_url=attendance_url,
payments_url=payments_url,
bank_account=BANK_ACCOUNT
)
@app.route("/juniors")
def juniors_view():
attendance_url = f"https://docs.google.com/spreadsheets/d/{ATTENDANCE_SHEET_ID}/edit#gid={JUNIOR_SHEET_GID}"
payments_url = f"https://docs.google.com/spreadsheets/d/{PAYMENTS_SHEET_ID}/edit"
credentials_path = CREDENTIALS_PATH
junior_members_data = get_cached_data("attendance_juniors", ATTENDANCE_SHEET_ID, get_junior_members_with_fees)
record_step("fetch_junior_members")
if not junior_members_data:
return "No data."
junior_members, sorted_months = junior_members_data
transactions = get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, credentials_path)
record_step("fetch_payments")
exceptions = get_cached_data(
"exceptions_dict", PAYMENTS_SHEET_ID, fetch_exceptions,
PAYMENTS_SHEET_ID, credentials_path,
serialize=lambda d: [[list(k), v] for k, v in d.items()],
deserialize=lambda c: {tuple(k): v for k, v in c},
)
record_step("fetch_exceptions")
# Adapt junior tuple format (name, tier, {month: (fee, total_count, adult_count, junior_count)})
# to what match_payments expects: (name, tier, {month: (expected_fee, attendance_count)})
adapted_members = []
for name, tier, fees_dict in junior_members:
adapted_fees = {}
for m, fee_data in fees_dict.items():
if len(fee_data) == 4:
fee, total_count, _, _ = fee_data
adapted_fees[m] = (fee, total_count)
else:
fee, count = fee_data
adapted_fees[m] = (fee, count)
adapted_members.append((name, tier, adapted_fees))
result = reconcile(adapted_members, sorted_months, transactions, exceptions)
record_step("reconcile")
# Format month labels
month_labels = get_month_labels(sorted_months, JUNIOR_MERGED_MONTHS)
junior_names = sorted([name for name, tier, _ in adapted_members])
junior_members_dict = {name: fees_dict for name, _, fees_dict in junior_members}
monthly_totals = {m: {"expected": 0, "paid": 0} for m in sorted_months}
formatted_results = []
for name in junior_names:
data = result["members"][name]
row = {"name": name, "months": [], "balance": data["total_balance"], "unpaid_periods": "", "raw_unpaid_periods": ""}
unpaid_months = []
raw_unpaid_months = []
for m in sorted_months:
mdata = data["months"].get(m, {"expected": 0, "original_expected": 0, "attendance_count": 0, "paid": 0, "exception": None})
expected = mdata.get("expected", 0)
original_expected = mdata.get("original_expected", 0)
count = mdata.get("attendance_count", 0)
paid = int(mdata.get("paid", 0))
exception_info = mdata.get("exception", None)
if expected != "?" and isinstance(expected, int):
monthly_totals[m]["expected"] += expected
monthly_totals[m]["paid"] += paid
orig_fee_data = junior_members_dict.get(name, {}).get(m)
adult_count = 0
junior_count = 0
if orig_fee_data and len(orig_fee_data) == 4:
_, _, adult_count, junior_count = orig_fee_data
breakdown = ""
if adult_count > 0 and junior_count > 0:
breakdown = f":{junior_count}J,{adult_count}A"
elif junior_count > 0:
breakdown = f":{junior_count}J"
elif adult_count > 0:
breakdown = f":{adult_count}A"
count_str = f" ({count}{breakdown})" if count > 0 else ""
override_amount = exception_info["amount"] if exception_info else None
if override_amount is not None and override_amount != original_expected:
is_overridden = True
fee_display = f"{override_amount} ({original_expected}) CZK{count_str}"
else:
is_overridden = False
fee_display = f"{expected} CZK{count_str}"
status = "empty"
cell_text = "-"
amount_to_pay = 0
if expected == "?" or (isinstance(expected, int) and expected > 0):
if expected == "?":
status = "empty"
cell_text = f"?{count_str}"
elif paid >= expected:
status = "ok"
cell_text = f"{paid}/{fee_display}"
elif paid > 0:
status = "partial"
cell_text = f"{paid}/{fee_display}"
amount_to_pay = expected - paid
unpaid_months.append(month_labels[m])
raw_unpaid_months.append(datetime.strptime(m, "%Y-%m").strftime("%m/%Y"))
else:
status = "unpaid"
cell_text = f"0/{fee_display}"
amount_to_pay = expected
unpaid_months.append(month_labels[m])
raw_unpaid_months.append(datetime.strptime(m, "%Y-%m").strftime("%m/%Y"))
elif paid > 0:
status = "surplus"
cell_text = f"PAID {paid}"
if (isinstance(expected, int) and expected > 0) or paid > 0:
tooltip = f"Received: {paid}, Expected: {expected}"
else:
tooltip = ""
row["months"].append({
"text": cell_text,
"overridden": is_overridden,
"status": status,
"amount": amount_to_pay,
"month": month_labels[m],
"raw_month": m,
"tooltip": tooltip
})
row["unpaid_periods"] = ", ".join(unpaid_months) if unpaid_months else ("Older debt" if data["total_balance"] < 0 else "")
row["raw_unpaid_periods"] = "+".join(raw_unpaid_months)
row["balance"] = data["total_balance"]
formatted_results.append(row)
formatted_totals = []
for m in sorted_months:
t = monthly_totals[m]
status = "empty"
if t["expected"] > 0 or t["paid"] > 0:
if t["paid"] == t["expected"]:
status = "ok"
elif t["paid"] < t["expected"]:
status = "unpaid"
else:
status = "surplus"
formatted_totals.append({
"text": f"{t['paid']} / {t['expected']} CZK",
"status": status
})
# Format credits and debts
credits = sorted([{"name": n, "amount": a["total_balance"]} for n, a in result["members"].items() if a["total_balance"] > 0], key=lambda x: x["name"])
debts = sorted([{"name": n, "amount": abs(a["total_balance"])} for n, a in result["members"].items() if a["total_balance"] < 0], key=lambda x: x["name"])
unmatched = result["unmatched"]
import json
record_step("process_data")
return render_template(
"juniors.html",
months=[month_labels[m] for m in sorted_months],
raw_months=sorted_months,
results=formatted_results,
totals=formatted_totals,
member_data=json.dumps(result["members"]),
month_labels_json=json.dumps(month_labels),
credits=credits,
debts=debts,
unmatched=unmatched,
attendance_url=attendance_url,
payments_url=payments_url,
bank_account=BANK_ACCOUNT
)
@app.route("/payments")
def payments():
attendance_url = f"https://docs.google.com/spreadsheets/d/{ATTENDANCE_SHEET_ID}/edit"
payments_url = f"https://docs.google.com/spreadsheets/d/{PAYMENTS_SHEET_ID}/edit"
credentials_path = CREDENTIALS_PATH
transactions = get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, credentials_path)
record_step("fetch_payments")
# Group transactions by person
grouped = {}
for tx in transactions:
person = str(tx.get("person", "")).strip()
if not person:
person = "Unmatched / Unknown"
# Handle multiple people (comma separated)
people = [p.strip() for p in person.split(",") if p.strip()]
for p in people:
# Strip markers
clean_p = re.sub(r"\[\?\]\s*", "", p)
if clean_p not in grouped:
grouped[clean_p] = []
grouped[clean_p].append(tx)
# Sort people and their transactions
sorted_people = sorted(grouped.keys())
for p in sorted_people:
# Sort by date descending
grouped[p].sort(key=lambda x: str(x.get("date", "")), reverse=True)
record_step("process_data")
return render_template(
"payments.html",
grouped_payments=grouped,
sorted_people=sorted_people,
attendance_url=attendance_url,
payments_url=payments_url
)
@app.route("/qr")
def qr_code():
account = request.args.get("account", BANK_ACCOUNT)
amount = request.args.get("amount", "0")
message = request.args.get("message", "")
# Validate account: allow IBAN (letters+digits) or Czech format (digits/digits)
if not re.match(r'^[A-Z]{2}\d{2,34}$|^\d{1,16}/\d{4}$', account):
account = BANK_ACCOUNT
# QR Platba standard: SPD*1.0*ACC:accountNumber*BC:bankCode*AM:amount*CC:CZK*MSG:message
acc_parts = account.split('/')
if len(acc_parts) == 2:
acc_str = f"{acc_parts[0]}*BC:{acc_parts[1]}"
else:
acc_str = account
try:
amt_val = float(amount)
if amt_val < 0 or amt_val > 10_000_000:
amt_val = 0
amt_str = f"{amt_val:.2f}"
except ValueError:
amt_str = "0.00"
# Message max 60 characters, strip SPD delimiters to prevent injection
msg_str = message[:60].replace("*", "")
qr_data = f"SPD*1.0*ACC:{acc_str}*AM:{amt_str}*CC:CZK*MSG:{msg_str}"
img = qrcode.make(qr_data)
buf = io.BytesIO()
img.save(buf, format='PNG')
buf.seek(0)
return send_file(buf, mimetype='image/png')
if __name__ == "__main__":
app.run(debug=True, host='0.0.0.0', port=5001)