import sys from pathlib import Path from datetime import datetime import re import time import os import io import qrcode import logging from flask import Flask, render_template, g, send_file, request # Configure logging, allowing override via LOG_LEVEL environment variable log_level = os.environ.get("LOG_LEVEL", "INFO").upper() logging.basicConfig(level=getattr(logging, log_level, logging.INFO), format='%(asctime)s - %(name)s:%(filename)s:%(lineno)d [%(funcName)s] - %(levelname)s - %(message)s') # Add scripts directory to path to allow importing from it scripts_dir = Path(__file__).parent / "scripts" sys.path.append(str(scripts_dir)) from config import ( ATTENDANCE_SHEET_ID, PAYMENTS_SHEET_ID, JUNIOR_SHEET_GID, BANK_ACCOUNT, CREDENTIALS_PATH, ) from attendance import get_members_with_fees, get_junior_members_with_fees, ADULT_MERGED_MONTHS, JUNIOR_MERGED_MONTHS from match_payments import reconcile, fetch_sheet_data, fetch_exceptions, normalize from cache_utils import get_sheet_modified_time, read_cache, write_cache, _LAST_CHECKED def get_cached_data(cache_key, sheet_id, fetch_func, *args, serialize=None, deserialize=None, **kwargs): mod_time = get_sheet_modified_time(cache_key) if mod_time: cached = read_cache(cache_key, mod_time) if cached is not None: return deserialize(cached) if deserialize else cached data = fetch_func(*args, **kwargs) if mod_time: write_cache(cache_key, mod_time, serialize(data) if serialize else data) return data def get_month_labels(sorted_months, merged_months): labels = {} for m in sorted_months: dt = datetime.strptime(m, "%Y-%m") # Find which months were merged into m (e.g. 2026-01 is merged into 2026-02) merged_in = sorted([k for k, v in merged_months.items() if v == m]) if merged_in: all_dts = [datetime.strptime(x, "%Y-%m") for x in sorted(merged_in + [m])] years = {d.year for d in all_dts} if len(years) > 1: parts = [d.strftime("%b %Y") for d in all_dts] labels[m] = "+".join(parts) else: parts = [d.strftime("%b") for d in all_dts] labels[m] = f"{'+'.join(parts)} {dt.strftime('%Y')}" else: labels[m] = dt.strftime("%b %Y") return labels def warmup_cache(): """Pre-fetch all cached data so first request is fast.""" logger = logging.getLogger(__name__) logger.info("Warming up cache...") credentials_path = CREDENTIALS_PATH get_cached_data("attendance_regular", ATTENDANCE_SHEET_ID, get_members_with_fees) get_cached_data("attendance_juniors", ATTENDANCE_SHEET_ID, get_junior_members_with_fees) get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, credentials_path) get_cached_data("exceptions_dict", PAYMENTS_SHEET_ID, fetch_exceptions, PAYMENTS_SHEET_ID, credentials_path, serialize=lambda d: [[list(k), v] for k, v in d.items()], deserialize=lambda c: {tuple(k): v for k, v in c}, ) logger.info("Cache warmup complete.") app = Flask(__name__) warmup_cache() @app.before_request def start_timer(): g.start_time = time.perf_counter() g.steps = [] def record_step(name): g.steps.append((name, time.perf_counter())) @app.context_processor def inject_render_time(): def get_render_time(): total = time.perf_counter() - g.start_time breakdown = [] last_time = g.start_time for name, timestamp in g.steps: duration = timestamp - last_time breakdown.append(f"{name}:{duration:.3f}s") last_time = timestamp # Add remaining time as 'render' render_duration = time.perf_counter() - last_time breakdown.append(f"render:{render_duration:.3f}s") return { "total": f"{total:.3f}", "breakdown": " | ".join(breakdown) } return dict(get_render_time=get_render_time) @app.route("/") def index(): # Redirect root to /fees for convenience while there are no other apps return '' @app.route("/fees") def fees(): attendance_url = f"https://docs.google.com/spreadsheets/d/{ATTENDANCE_SHEET_ID}/edit" payments_url = f"https://docs.google.com/spreadsheets/d/{PAYMENTS_SHEET_ID}/edit" members_data = get_cached_data("attendance_regular", ATTENDANCE_SHEET_ID, get_members_with_fees) record_step("fetch_members") if not members_data: return "No data." members, sorted_months = members_data # Filter to adults only for display results = [(name, fees) for name, tier, fees in members if tier == "A"] # Format month labels month_labels = get_month_labels(sorted_months, ADULT_MERGED_MONTHS) monthly_totals = {m: 0 for m in sorted_months} # Get exceptions for formatting credentials_path = CREDENTIALS_PATH exceptions = get_cached_data( "exceptions_dict", PAYMENTS_SHEET_ID, fetch_exceptions, PAYMENTS_SHEET_ID, credentials_path, serialize=lambda d: [[list(k), v] for k, v in d.items()], deserialize=lambda c: {tuple(k): v for k, v in c}, ) record_step("fetch_exceptions") formatted_results = [] for name, month_fees in results: row = {"name": name, "months": []} norm_name = normalize(name) for m in sorted_months: fee, count = month_fees.get(m, (0, 0)) # Check for exception norm_period = normalize(m) ex_data = exceptions.get((norm_name, norm_period)) override_amount = ex_data["amount"] if ex_data else None if override_amount is not None and override_amount != fee: cell = f"{override_amount} ({fee}) CZK ({count})" if count > 0 else f"{override_amount} ({fee}) CZK" is_overridden = True else: if isinstance(fee, int): monthly_totals[m] += fee cell = f"{fee} CZK ({count})" if count > 0 else "-" is_overridden = False row["months"].append({"cell": cell, "overridden": is_overridden}) formatted_results.append(row) record_step("process_data") return render_template( "fees.html", months=[month_labels[m] for m in sorted_months], results=formatted_results, totals=[f"{monthly_totals[m]} CZK" for m in sorted_months], attendance_url=attendance_url, payments_url=payments_url ) @app.route("/fees-juniors") def fees_juniors(): attendance_url = f"https://docs.google.com/spreadsheets/d/{ATTENDANCE_SHEET_ID}/edit#gid={JUNIOR_SHEET_GID}" payments_url = f"https://docs.google.com/spreadsheets/d/{PAYMENTS_SHEET_ID}/edit" members_data = get_cached_data("attendance_juniors", ATTENDANCE_SHEET_ID, get_junior_members_with_fees) record_step("fetch_junior_members") if not members_data: return "No data." members, sorted_months = members_data # Sort members by name results = sorted([(name, fees) for name, tier, fees in members], key=lambda x: x[0]) # Format month labels month_labels = get_month_labels(sorted_months, JUNIOR_MERGED_MONTHS) monthly_totals = {m: 0 for m in sorted_months} # Get exceptions for formatting (reusing payments sheet) credentials_path = CREDENTIALS_PATH exceptions = get_cached_data( "exceptions_dict", PAYMENTS_SHEET_ID, fetch_exceptions, PAYMENTS_SHEET_ID, credentials_path, serialize=lambda d: [[list(k), v] for k, v in d.items()], deserialize=lambda c: {tuple(k): v for k, v in c}, ) record_step("fetch_exceptions") formatted_results = [] for name, month_fees in results: row = {"name": name, "months": []} norm_name = normalize(name) for m in sorted_months: fee_data = month_fees.get(m, (0, 0, 0, 0)) if len(fee_data) == 4: fee, total_count, adult_count, junior_count = fee_data else: fee, total_count = fee_data adult_count, junior_count = 0, 0 # Check for exception norm_period = normalize(m) ex_data = exceptions.get((norm_name, norm_period)) override_amount = ex_data["amount"] if ex_data else None if ex_data is None and isinstance(fee, int): monthly_totals[m] += fee # Formulate the count string display if adult_count > 0 and junior_count > 0: count_str = f"{total_count} ({adult_count}A+{junior_count}J)" elif adult_count > 0: count_str = f"{total_count} (A)" elif junior_count > 0: count_str = f"{total_count} (J)" else: count_str = f"{total_count}" if override_amount is not None and override_amount != fee: cell = f"{override_amount} ({fee}) CZK / {count_str}" if total_count > 0 else f"{override_amount} ({fee}) CZK" is_overridden = True else: if fee == "?": cell = f"? / {count_str}" if total_count > 0 else "-" else: cell = f"{fee} CZK / {count_str}" if total_count > 0 else "-" is_overridden = False row["months"].append({"cell": cell, "overridden": is_overridden}) formatted_results.append(row) record_step("process_data") return render_template( "fees-juniors.html", months=[month_labels[m] for m in sorted_months], results=formatted_results, totals=[f"{t} CZK" if isinstance(t, int) else t for t in monthly_totals.values()], attendance_url=attendance_url, payments_url=payments_url ) @app.route("/reconcile") def reconcile_view(): attendance_url = f"https://docs.google.com/spreadsheets/d/{ATTENDANCE_SHEET_ID}/edit" payments_url = f"https://docs.google.com/spreadsheets/d/{PAYMENTS_SHEET_ID}/edit" # Use hardcoded credentials path for now, consistent with other scripts credentials_path = CREDENTIALS_PATH members_data = get_cached_data("attendance_regular", ATTENDANCE_SHEET_ID, get_members_with_fees) record_step("fetch_members") if not members_data: return "No data." members, sorted_months = members_data transactions = get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, credentials_path) record_step("fetch_payments") exceptions = get_cached_data( "exceptions_dict", PAYMENTS_SHEET_ID, fetch_exceptions, PAYMENTS_SHEET_ID, credentials_path, serialize=lambda d: [[list(k), v] for k, v in d.items()], deserialize=lambda c: {tuple(k): v for k, v in c}, ) record_step("fetch_exceptions") result = reconcile(members, sorted_months, transactions, exceptions) record_step("reconcile") # Format month labels month_labels = get_month_labels(sorted_months, ADULT_MERGED_MONTHS) # Filter to adults for the main table adult_names = sorted([name for name, tier, _ in members if tier == "A"]) formatted_results = [] for name in adult_names: data = result["members"][name] row = {"name": name, "months": [], "balance": data["total_balance"], "unpaid_periods": ""} unpaid_months = [] for m in sorted_months: mdata = data["months"].get(m, {"expected": 0, "original_expected": 0, "paid": 0}) expected = mdata["expected"] paid = int(mdata["paid"]) status = "empty" cell_text = "-" amount_to_pay = 0 if expected > 0: if paid >= expected: status = "ok" cell_text = "OK" elif paid > 0: status = "partial" cell_text = f"{paid}/{expected}" amount_to_pay = expected - paid unpaid_months.append(month_labels[m]) else: status = "unpaid" cell_text = f"UNPAID {expected}" amount_to_pay = expected unpaid_months.append(month_labels[m]) elif paid > 0: status = "surplus" cell_text = f"PAID {paid}" row["months"].append({ "text": cell_text, "status": status, "amount": amount_to_pay, "month": month_labels[m] }) row["unpaid_periods"] = ", ".join(unpaid_months) if unpaid_months else ("Older debt" if data["total_balance"] < 0 else "") row["balance"] = data["total_balance"] # Updated to use total_balance formatted_results.append(row) # Format credits and debts credits = sorted([{"name": n, "amount": a["total_balance"]} for n, a in result["members"].items() if a["total_balance"] > 0 and n in adult_names], key=lambda x: x["name"]) debts = sorted([{"name": n, "amount": abs(a["total_balance"])} for n, a in result["members"].items() if a["total_balance"] < 0 and n in adult_names], key=lambda x: x["name"]) # Format unmatched unmatched = result["unmatched"] import json record_step("process_data") return render_template( "reconcile.html", months=[month_labels[m] for m in sorted_months], raw_months=sorted_months, results=formatted_results, member_data=json.dumps(result["members"]), month_labels_json=json.dumps(month_labels), credits=credits, debts=debts, unmatched=unmatched, attendance_url=attendance_url, payments_url=payments_url, bank_account=BANK_ACCOUNT ) @app.route("/reconcile-juniors") def reconcile_juniors_view(): attendance_url = f"https://docs.google.com/spreadsheets/d/{ATTENDANCE_SHEET_ID}/edit#gid={JUNIOR_SHEET_GID}" payments_url = f"https://docs.google.com/spreadsheets/d/{PAYMENTS_SHEET_ID}/edit" credentials_path = CREDENTIALS_PATH junior_members_data = get_cached_data("attendance_juniors", ATTENDANCE_SHEET_ID, get_junior_members_with_fees) record_step("fetch_junior_members") if not junior_members_data: return "No data." junior_members, sorted_months = junior_members_data transactions = get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, credentials_path) record_step("fetch_payments") exceptions = get_cached_data( "exceptions_dict", PAYMENTS_SHEET_ID, fetch_exceptions, PAYMENTS_SHEET_ID, credentials_path, serialize=lambda d: [[list(k), v] for k, v in d.items()], deserialize=lambda c: {tuple(k): v for k, v in c}, ) record_step("fetch_exceptions") # Adapt junior tuple format (name, tier, {month: (fee, total_count, adult_count, junior_count)}) # to what match_payments expects: (name, tier, {month: (expected_fee, attendance_count)}) adapted_members = [] for name, tier, fees_dict in junior_members: adapted_fees = {} for m, fee_data in fees_dict.items(): if len(fee_data) == 4: fee, total_count, _, _ = fee_data adapted_fees[m] = (fee, total_count) else: fee, count = fee_data adapted_fees[m] = (fee, count) adapted_members.append((name, tier, adapted_fees)) result = reconcile(adapted_members, sorted_months, transactions, exceptions) record_step("reconcile") # Format month labels month_labels = get_month_labels(sorted_months, JUNIOR_MERGED_MONTHS) # Filter to juniors for the main table junior_names = sorted([name for name, tier, _ in adapted_members]) formatted_results = [] for name in junior_names: data = result["members"][name] row = {"name": name, "months": [], "balance": data["total_balance"], "unpaid_periods": ""} unpaid_months = [] for m in sorted_months: mdata = data["months"].get(m, {"expected": 0, "original_expected": 0, "paid": 0}) expected = mdata["expected"] paid = int(mdata["paid"]) status = "empty" cell_text = "-" amount_to_pay = 0 if expected == "?" or (isinstance(expected, int) and expected > 0): if expected == "?": status = "empty" cell_text = "?" elif paid >= expected: status = "ok" cell_text = "OK" elif paid > 0: status = "partial" cell_text = f"{paid}/{expected}" amount_to_pay = expected - paid unpaid_months.append(month_labels[m]) else: status = "unpaid" cell_text = f"UNPAID {expected}" amount_to_pay = expected unpaid_months.append(month_labels[m]) elif paid > 0: status = "surplus" cell_text = f"PAID {paid}" row["months"].append({ "text": cell_text, "status": status, "amount": amount_to_pay, "month": month_labels[m] }) row["unpaid_periods"] = ", ".join(unpaid_months) if unpaid_months else ("Older debt" if data["total_balance"] < 0 else "") row["balance"] = data["total_balance"] formatted_results.append(row) # Format credits and debts credits = sorted([{"name": n, "amount": a["total_balance"]} for n, a in result["members"].items() if a["total_balance"] > 0], key=lambda x: x["name"]) debts = sorted([{"name": n, "amount": abs(a["total_balance"])} for n, a in result["members"].items() if a["total_balance"] < 0], key=lambda x: x["name"]) import json record_step("process_data") return render_template( "reconcile-juniors.html", months=[month_labels[m] for m in sorted_months], raw_months=sorted_months, results=formatted_results, member_data=json.dumps(result["members"]), month_labels_json=json.dumps(month_labels), credits=credits, debts=debts, unmatched=[], attendance_url=attendance_url, payments_url=payments_url, bank_account=BANK_ACCOUNT ) @app.route("/payments") def payments(): attendance_url = f"https://docs.google.com/spreadsheets/d/{ATTENDANCE_SHEET_ID}/edit" payments_url = f"https://docs.google.com/spreadsheets/d/{PAYMENTS_SHEET_ID}/edit" credentials_path = CREDENTIALS_PATH transactions = get_cached_data("payments_transactions", PAYMENTS_SHEET_ID, fetch_sheet_data, PAYMENTS_SHEET_ID, credentials_path) record_step("fetch_payments") # Group transactions by person grouped = {} for tx in transactions: person = str(tx.get("person", "")).strip() if not person: person = "Unmatched / Unknown" # Handle multiple people (comma separated) people = [p.strip() for p in person.split(",") if p.strip()] for p in people: # Strip markers clean_p = re.sub(r"\[\?\]\s*", "", p) if clean_p not in grouped: grouped[clean_p] = [] grouped[clean_p].append(tx) # Sort people and their transactions sorted_people = sorted(grouped.keys()) for p in sorted_people: # Sort by date descending grouped[p].sort(key=lambda x: str(x.get("date", "")), reverse=True) record_step("process_data") return render_template( "payments.html", grouped_payments=grouped, sorted_people=sorted_people, attendance_url=attendance_url, payments_url=payments_url ) @app.route("/qr") def qr_code(): account = request.args.get("account", BANK_ACCOUNT) amount = request.args.get("amount", "0") message = request.args.get("message", "") # Validate account: allow IBAN (letters+digits) or Czech format (digits/digits) if not re.match(r'^[A-Z]{2}\d{2,34}$|^\d{1,16}/\d{4}$', account): account = BANK_ACCOUNT # QR Platba standard: SPD*1.0*ACC:accountNumber*BC:bankCode*AM:amount*CC:CZK*MSG:message acc_parts = account.split('/') if len(acc_parts) == 2: acc_str = f"{acc_parts[0]}*BC:{acc_parts[1]}" else: acc_str = account try: amt_val = float(amount) if amt_val < 0 or amt_val > 10_000_000: amt_val = 0 amt_str = f"{amt_val:.2f}" except ValueError: amt_str = "0.00" # Message max 60 characters, strip SPD delimiters to prevent injection msg_str = message[:60].replace("*", "") qr_data = f"SPD*1.0*ACC:{acc_str}*AM:{amt_str}*CC:CZK*MSG:{msg_str}" img = qrcode.make(qr_data) buf = io.BytesIO() img.save(buf, format='PNG') buf.seek(0) return send_file(buf, mimetype='image/png') if __name__ == "__main__": app.run(debug=True, host='0.0.0.0', port=5001)