Files
fuj-management/app.py
Jan Novak ced238385e
All checks were successful
Deploy to K8s / deploy (push) Successful in 10s
Build and Push / build (push) Successful in 32s
feat: Exclude current month from Pay buttons and balance
Hide Pay/Pay All buttons for months still in progress, exclude
current month debt from balance column, and show in-progress
month debt in a muted red color.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-09 13:51:37 +02:00

610 lines
24 KiB
Python

import sys
from pathlib import Path
from datetime import datetime
import re
import time
import os
import io
import qrcode
import logging
from flask import Flask, render_template, g, send_file, request
# Configure logging, allowing override via LOG_LEVEL environment variable
log_level = os.environ.get("LOG_LEVEL", "INFO").upper()
logging.basicConfig(level=getattr(logging, log_level, logging.INFO), format='%(asctime)s - %(name)s:%(filename)s:%(lineno)d [%(funcName)s] - %(levelname)s - %(message)s')
# Add scripts directory to path to allow importing from it
scripts_dir = Path(__file__).parent / "scripts"
sys.path.append(str(scripts_dir))
from config import (
ATTENDANCE_SHEET_ID, PAYMENTS_SHEET_ID, JUNIOR_SHEET_GID,
BANK_ACCOUNT, CREDENTIALS_PATH,
)
from attendance import get_members_with_fees, get_junior_members_with_fees, ADULT_MERGED_MONTHS, JUNIOR_MERGED_MONTHS
from match_payments import reconcile, fetch_sheet_data, fetch_exceptions, normalize
from cache_utils import get_sheet_modified_time, read_cache, write_cache, _LAST_CHECKED, flush_cache
from sync_fio_to_sheets import sync_to_sheets
from infer_payments import infer_payments
def get_cached_data(cache_key, sheet_id, fetch_func, *args, serialize=None, deserialize=None, **kwargs):
mod_time = get_sheet_modified_time(cache_key)
if mod_time:
cached = read_cache(cache_key, mod_time)
if cached is not None:
return deserialize(cached) if deserialize else cached
data = fetch_func(*args, **kwargs)
if mod_time:
write_cache(cache_key, mod_time, serialize(data) if serialize else data)
return data
def get_month_labels(sorted_months, merged_months):
labels = {}
for m in sorted_months:
dt = datetime.strptime(m, "%Y-%m")
# Find which months were merged into m (e.g. 2026-01 is merged into 2026-02)
merged_in = sorted([k for k, v in merged_months.items() if v == m])
if merged_in:
all_dts = [datetime.strptime(x, "%Y-%m") for x in sorted(merged_in + [m])]
years = {d.year for d in all_dts}
if len(years) > 1:
parts = [d.strftime("%b %Y") for d in all_dts]
labels[m] = "+".join(parts)
else:
parts = [d.strftime("%b") for d in all_dts]
labels[m] = f"{'+'.join(parts)} {dt.strftime('%Y')}"
else:
labels[m] = dt.strftime("%b %Y")
return labels
def warmup_cache():
"""Pre-fetch all cached data so first request is fast."""
logger = logging.getLogger(__name__)
logger.info("Warming up cache...")
credentials_path = CREDENTIALS_PATH
get_cached_data("attendance_regular", ATTENDANCE_SHEET_ID, get_members_with_fees)
get_cached_data("attendance_juniors", ATTENDANCE_SHEET_ID, get_junior_members_with_fees)
get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, credentials_path)
get_cached_data("exceptions_dict", PAYMENTS_SHEET_ID, fetch_exceptions,
PAYMENTS_SHEET_ID, credentials_path,
serialize=lambda d: [[list(k), v] for k, v in d.items()],
deserialize=lambda c: {tuple(k): v for k, v in c},
)
logger.info("Cache warmup complete.")
app = Flask(__name__)
import json as _json
_meta_path = Path(__file__).parent / "build_meta.json"
BUILD_META = _json.loads(_meta_path.read_text()) if _meta_path.exists() else {
"tag": "dev", "commit": "local", "build_date": ""
}
warmup_cache()
@app.before_request
def start_timer():
g.start_time = time.perf_counter()
g.steps = []
def record_step(name):
g.steps.append((name, time.perf_counter()))
@app.context_processor
def inject_render_time():
def get_render_time():
total = time.perf_counter() - g.start_time
breakdown = []
last_time = g.start_time
for name, timestamp in g.steps:
duration = timestamp - last_time
breakdown.append(f"{name}:{duration:.3f}s")
last_time = timestamp
# Add remaining time as 'render'
render_duration = time.perf_counter() - last_time
breakdown.append(f"render:{render_duration:.3f}s")
return {
"total": f"{total:.3f}",
"breakdown": " | ".join(breakdown)
}
return dict(get_render_time=get_render_time, build_meta=BUILD_META)
@app.route("/")
def index():
# Redirect root to /adults for convenience while there are no other apps
return '<meta http-equiv="refresh" content="0; url=/adults" />'
@app.route("/flush-cache", methods=["GET", "POST"])
def flush_cache_endpoint():
if request.method == "GET":
return render_template("flush-cache.html")
deleted = flush_cache()
return render_template("flush-cache.html", flushed=True, deleted=deleted)
@app.route("/sync-bank")
def sync_bank():
import contextlib
output = io.StringIO()
success = True
try:
with contextlib.redirect_stdout(output), contextlib.redirect_stderr(output):
# sync_to_sheets: equivalent of make sync-2026
output.write("=== Syncing Fio transactions (2026) ===\n")
sync_to_sheets(
spreadsheet_id=PAYMENTS_SHEET_ID,
credentials_path=CREDENTIALS_PATH,
date_from_str="2026-01-01",
date_to_str="2026-12-31",
sort_by_date=True,
)
output.write("\n=== Inferring payment details ===\n")
infer_payments(PAYMENTS_SHEET_ID, CREDENTIALS_PATH)
output.write("\n=== Flushing cache ===\n")
deleted = flush_cache()
output.write(f"Deleted {deleted} cache files.\n")
output.write("\n=== Done ===\n")
except Exception as e:
import traceback
output.write(f"\n!!! Error: {e}\n")
output.write(traceback.format_exc())
success = False
return render_template("sync.html", output=output.getvalue(), success=success)
@app.route("/version")
def version():
return BUILD_META
@app.route("/adults")
def adults_view():
attendance_url = f"https://docs.google.com/spreadsheets/d/{ATTENDANCE_SHEET_ID}/edit"
payments_url = f"https://docs.google.com/spreadsheets/d/{PAYMENTS_SHEET_ID}/edit"
credentials_path = CREDENTIALS_PATH
members_data = get_cached_data("attendance_regular", ATTENDANCE_SHEET_ID, get_members_with_fees)
record_step("fetch_members")
if not members_data:
return "No data."
members, sorted_months = members_data
transactions = get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, credentials_path)
record_step("fetch_payments")
exceptions = get_cached_data(
"exceptions_dict", PAYMENTS_SHEET_ID, fetch_exceptions,
PAYMENTS_SHEET_ID, credentials_path,
serialize=lambda d: [[list(k), v] for k, v in d.items()],
deserialize=lambda c: {tuple(k): v for k, v in c},
)
record_step("fetch_exceptions")
result = reconcile(members, sorted_months, transactions, exceptions)
record_step("reconcile")
month_labels = get_month_labels(sorted_months, ADULT_MERGED_MONTHS)
adult_names = sorted([name for name, tier, _ in members if tier == "A"])
current_month = datetime.now().strftime("%Y-%m")
monthly_totals = {m: {"expected": 0, "paid": 0} for m in sorted_months}
formatted_results = []
for name in adult_names:
data = result["members"][name]
row = {"name": name, "months": [], "balance": data["total_balance"], "unpaid_periods": "", "raw_unpaid_periods": ""}
unpaid_months = []
raw_unpaid_months = []
payable_amount = 0
for m in sorted_months:
mdata = data["months"].get(m, {"expected": 0, "original_expected": 0, "attendance_count": 0, "paid": 0, "exception": None})
expected = mdata.get("expected", 0)
original_expected = mdata.get("original_expected", 0)
count = mdata.get("attendance_count", 0)
paid = int(mdata.get("paid", 0))
exception_info = mdata.get("exception", None)
monthly_totals[m]["expected"] += expected
monthly_totals[m]["paid"] += paid
override_amount = exception_info["amount"] if exception_info else None
if override_amount is not None and override_amount != original_expected:
is_overridden = True
fee_display = f"{override_amount} ({original_expected}) CZK ({count})" if count > 0 else f"{override_amount} ({original_expected}) CZK"
else:
is_overridden = False
fee_display = f"{expected} CZK ({count})" if count > 0 else f"{expected} CZK"
status = "empty"
cell_text = "-"
amount_to_pay = 0
if expected > 0:
amount_to_pay = max(0, expected - paid)
if paid >= expected:
status = "ok"
cell_text = f"{paid}/{fee_display}"
elif paid > 0:
status = "partial"
cell_text = f"{paid}/{fee_display}"
if m < current_month:
unpaid_months.append(month_labels[m])
raw_unpaid_months.append(datetime.strptime(m, "%Y-%m").strftime("%m/%Y"))
payable_amount += amount_to_pay
else:
status = "unpaid"
cell_text = f"0/{fee_display}"
if m < current_month:
unpaid_months.append(month_labels[m])
raw_unpaid_months.append(datetime.strptime(m, "%Y-%m").strftime("%m/%Y"))
payable_amount += amount_to_pay
elif paid > 0:
status = "surplus"
cell_text = f"PAID {paid}"
else:
cell_text = "-"
amount_to_pay = 0
if expected > 0 or paid > 0:
tooltip = f"Received: {paid}, Expected: {expected}"
else:
tooltip = ""
row["months"].append({
"text": cell_text,
"overridden": is_overridden,
"status": status,
"amount": amount_to_pay,
"month": month_labels[m],
"raw_month": m,
"tooltip": tooltip
})
# Compute balance excluding current/future months
current_month_debt = 0
for m in sorted_months:
if m >= current_month:
mdata = data["months"].get(m, {"expected": 0, "paid": 0})
exp = mdata.get("expected", 0)
pd = int(mdata.get("paid", 0))
current_month_debt += max(0, exp - pd)
settled_balance = data["total_balance"] + current_month_debt
row["unpaid_periods"] = ", ".join(unpaid_months) if unpaid_months else ("Older debt" if settled_balance < 0 and payable_amount == 0 else "")
row["raw_unpaid_periods"] = "+".join(raw_unpaid_months)
row["balance"] = settled_balance
row["payable_amount"] = payable_amount
formatted_results.append(row)
formatted_totals = []
for m in sorted_months:
t = monthly_totals[m]
status = "empty"
if t["expected"] > 0 or t["paid"] > 0:
if t["paid"] == t["expected"]:
status = "ok"
elif t["paid"] < t["expected"]:
status = "unpaid"
else:
status = "surplus"
formatted_totals.append({
"text": f"{t['paid']} / {t['expected']} CZK",
"status": status
})
def settled_balance(name):
data = result["members"][name]
debt = sum(max(0, data["months"].get(m, {"expected": 0, "paid": 0}).get("expected", 0) - int(data["months"].get(m, {"expected": 0, "paid": 0}).get("paid", 0))) for m in sorted_months if m >= current_month)
return data["total_balance"] + debt
credits = sorted([{"name": n, "amount": settled_balance(n)} for n in adult_names if settled_balance(n) > 0], key=lambda x: x["name"])
debts = sorted([{"name": n, "amount": abs(settled_balance(n))} for n in adult_names if settled_balance(n) < 0], key=lambda x: x["name"])
unmatched = result["unmatched"]
import json
record_step("process_data")
return render_template(
"adults.html",
months=[month_labels[m] for m in sorted_months],
raw_months=sorted_months,
results=formatted_results,
totals=formatted_totals,
member_data=json.dumps(result["members"]),
month_labels_json=json.dumps(month_labels),
credits=credits,
debts=debts,
unmatched=unmatched,
attendance_url=attendance_url,
payments_url=payments_url,
bank_account=BANK_ACCOUNT,
current_month=current_month
)
@app.route("/juniors")
def juniors_view():
attendance_url = f"https://docs.google.com/spreadsheets/d/{ATTENDANCE_SHEET_ID}/edit#gid={JUNIOR_SHEET_GID}"
payments_url = f"https://docs.google.com/spreadsheets/d/{PAYMENTS_SHEET_ID}/edit"
credentials_path = CREDENTIALS_PATH
junior_members_data = get_cached_data("attendance_juniors", ATTENDANCE_SHEET_ID, get_junior_members_with_fees)
record_step("fetch_junior_members")
if not junior_members_data:
return "No data."
junior_members, sorted_months = junior_members_data
transactions = get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, credentials_path)
record_step("fetch_payments")
exceptions = get_cached_data(
"exceptions_dict", PAYMENTS_SHEET_ID, fetch_exceptions,
PAYMENTS_SHEET_ID, credentials_path,
serialize=lambda d: [[list(k), v] for k, v in d.items()],
deserialize=lambda c: {tuple(k): v for k, v in c},
)
record_step("fetch_exceptions")
# Adapt junior tuple format (name, tier, {month: (fee, total_count, adult_count, junior_count)})
# to what match_payments expects: (name, tier, {month: (expected_fee, attendance_count)})
adapted_members = []
for name, tier, fees_dict in junior_members:
adapted_fees = {}
for m, fee_data in fees_dict.items():
if len(fee_data) == 4:
fee, total_count, _, _ = fee_data
adapted_fees[m] = (fee, total_count)
else:
fee, count = fee_data
adapted_fees[m] = (fee, count)
adapted_members.append((name, tier, adapted_fees))
result = reconcile(adapted_members, sorted_months, transactions, exceptions)
record_step("reconcile")
# Format month labels
month_labels = get_month_labels(sorted_months, JUNIOR_MERGED_MONTHS)
junior_names = sorted([name for name, tier, _ in adapted_members])
junior_members_dict = {name: fees_dict for name, _, fees_dict in junior_members}
current_month = datetime.now().strftime("%Y-%m")
monthly_totals = {m: {"expected": 0, "paid": 0} for m in sorted_months}
formatted_results = []
for name in junior_names:
data = result["members"][name]
row = {"name": name, "months": [], "balance": data["total_balance"], "unpaid_periods": "", "raw_unpaid_periods": ""}
unpaid_months = []
raw_unpaid_months = []
payable_amount = 0
for m in sorted_months:
mdata = data["months"].get(m, {"expected": 0, "original_expected": 0, "attendance_count": 0, "paid": 0, "exception": None})
expected = mdata.get("expected", 0)
original_expected = mdata.get("original_expected", 0)
count = mdata.get("attendance_count", 0)
paid = int(mdata.get("paid", 0))
exception_info = mdata.get("exception", None)
if expected != "?" and isinstance(expected, int):
monthly_totals[m]["expected"] += expected
monthly_totals[m]["paid"] += paid
orig_fee_data = junior_members_dict.get(name, {}).get(m)
adult_count = 0
junior_count = 0
if orig_fee_data and len(orig_fee_data) == 4:
_, _, adult_count, junior_count = orig_fee_data
breakdown = ""
if adult_count > 0 and junior_count > 0:
breakdown = f":{junior_count}J,{adult_count}A"
elif junior_count > 0:
breakdown = f":{junior_count}J"
elif adult_count > 0:
breakdown = f":{adult_count}A"
count_str = f" ({count}{breakdown})" if count > 0 else ""
override_amount = exception_info["amount"] if exception_info else None
if override_amount is not None and override_amount != original_expected:
is_overridden = True
fee_display = f"{override_amount} ({original_expected}) CZK{count_str}"
else:
is_overridden = False
fee_display = f"{expected} CZK{count_str}"
status = "empty"
cell_text = "-"
amount_to_pay = 0
if expected == "?" or (isinstance(expected, int) and expected > 0):
if expected == "?":
status = "empty"
cell_text = f"?{count_str}"
elif paid >= expected:
status = "ok"
cell_text = f"{paid}/{fee_display}"
elif paid > 0:
status = "partial"
cell_text = f"{paid}/{fee_display}"
amount_to_pay = expected - paid
if m < current_month:
unpaid_months.append(month_labels[m])
raw_unpaid_months.append(datetime.strptime(m, "%Y-%m").strftime("%m/%Y"))
payable_amount += amount_to_pay
else:
status = "unpaid"
cell_text = f"0/{fee_display}"
amount_to_pay = expected
if m < current_month:
unpaid_months.append(month_labels[m])
raw_unpaid_months.append(datetime.strptime(m, "%Y-%m").strftime("%m/%Y"))
payable_amount += amount_to_pay
elif paid > 0:
status = "surplus"
cell_text = f"PAID {paid}"
if (isinstance(expected, int) and expected > 0) or paid > 0:
tooltip = f"Received: {paid}, Expected: {expected}"
else:
tooltip = ""
row["months"].append({
"text": cell_text,
"overridden": is_overridden,
"status": status,
"amount": amount_to_pay,
"month": month_labels[m],
"raw_month": m,
"tooltip": tooltip
})
# Compute balance excluding current/future months
current_month_debt = 0
for m in sorted_months:
if m >= current_month:
mdata = data["months"].get(m, {"expected": 0, "paid": 0})
exp = mdata.get("expected", 0)
if isinstance(exp, int):
pd = int(mdata.get("paid", 0))
current_month_debt += max(0, exp - pd)
settled_balance = data["total_balance"] + current_month_debt
row["unpaid_periods"] = ", ".join(unpaid_months) if unpaid_months else ("Older debt" if settled_balance < 0 and payable_amount == 0 else "")
row["raw_unpaid_periods"] = "+".join(raw_unpaid_months)
row["balance"] = settled_balance
row["payable_amount"] = payable_amount
formatted_results.append(row)
formatted_totals = []
for m in sorted_months:
t = monthly_totals[m]
status = "empty"
if t["expected"] > 0 or t["paid"] > 0:
if t["paid"] == t["expected"]:
status = "ok"
elif t["paid"] < t["expected"]:
status = "unpaid"
else:
status = "surplus"
formatted_totals.append({
"text": f"{t['paid']} / {t['expected']} CZK",
"status": status
})
# Format credits and debts
def junior_settled_balance(name):
data = result["members"][name]
debt = 0
for m in sorted_months:
if m >= current_month:
mdata = data["months"].get(m, {"expected": 0, "paid": 0})
exp = mdata.get("expected", 0)
if isinstance(exp, int):
debt += max(0, exp - int(mdata.get("paid", 0)))
return data["total_balance"] + debt
junior_all_names = [name for name, _, _ in adapted_members]
credits = sorted([{"name": n, "amount": junior_settled_balance(n)} for n in junior_all_names if junior_settled_balance(n) > 0], key=lambda x: x["name"])
debts = sorted([{"name": n, "amount": abs(junior_settled_balance(n))} for n in junior_all_names if junior_settled_balance(n) < 0], key=lambda x: x["name"])
unmatched = result["unmatched"]
import json
record_step("process_data")
return render_template(
"juniors.html",
months=[month_labels[m] for m in sorted_months],
raw_months=sorted_months,
results=formatted_results,
totals=formatted_totals,
member_data=json.dumps(result["members"]),
month_labels_json=json.dumps(month_labels),
credits=credits,
debts=debts,
unmatched=unmatched,
attendance_url=attendance_url,
payments_url=payments_url,
bank_account=BANK_ACCOUNT,
current_month=current_month
)
@app.route("/payments")
def payments():
attendance_url = f"https://docs.google.com/spreadsheets/d/{ATTENDANCE_SHEET_ID}/edit"
payments_url = f"https://docs.google.com/spreadsheets/d/{PAYMENTS_SHEET_ID}/edit"
credentials_path = CREDENTIALS_PATH
transactions = get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, credentials_path)
record_step("fetch_payments")
# Group transactions by person
grouped = {}
for tx in transactions:
person = str(tx.get("person", "")).strip()
if not person:
person = "Unmatched / Unknown"
# Handle multiple people (comma separated)
people = [p.strip() for p in person.split(",") if p.strip()]
for p in people:
# Strip markers
clean_p = re.sub(r"\[\?\]\s*", "", p)
if clean_p not in grouped:
grouped[clean_p] = []
grouped[clean_p].append(tx)
# Sort people and their transactions
sorted_people = sorted(grouped.keys())
for p in sorted_people:
# Sort by date descending
grouped[p].sort(key=lambda x: str(x.get("date", "")), reverse=True)
record_step("process_data")
return render_template(
"payments.html",
grouped_payments=grouped,
sorted_people=sorted_people,
attendance_url=attendance_url,
payments_url=payments_url
)
@app.route("/qr")
def qr_code():
account = request.args.get("account", BANK_ACCOUNT)
amount = request.args.get("amount", "0")
message = request.args.get("message", "")
# Validate account: allow IBAN (letters+digits) or Czech format (digits/digits)
if not re.match(r'^[A-Z]{2}\d{2,34}$|^\d{1,16}/\d{4}$', account):
account = BANK_ACCOUNT
# QR Platba standard: SPD*1.0*ACC:accountNumber*BC:bankCode*AM:amount*CC:CZK*MSG:message
acc_parts = account.split('/')
if len(acc_parts) == 2:
acc_str = f"{acc_parts[0]}*BC:{acc_parts[1]}"
else:
acc_str = account
try:
amt_val = float(amount)
if amt_val < 0 or amt_val > 10_000_000:
amt_val = 0
amt_str = f"{amt_val:.2f}"
except ValueError:
amt_str = "0.00"
# Message max 60 characters, strip SPD delimiters to prevent injection
msg_str = message[:60].replace("*", "")
qr_data = f"SPD*1.0*ACC:{acc_str}*AM:{amt_str}*CC:CZK*MSG:{msg_str}"
img = qrcode.make(qr_data)
buf = io.BytesIO()
img.save(buf, format='PNG')
buf.seek(0)
return send_file(buf, mimetype='image/png')
if __name__ == "__main__":
app.run(debug=True, host='0.0.0.0', port=5001)