feat: implement automated payment inference and sync to Google Sheets

This commit is contained in:
Jan Novak
2026-03-02 14:29:45 +01:00
parent 65e40d116b
commit d719383c9c
10 changed files with 1520 additions and 264 deletions

215
scripts/fio_utils.py Normal file
View File

@@ -0,0 +1,215 @@
#!/usr/bin/env python3
"""Shared Fio bank fetching utilities."""
import json
import os
import re
import urllib.request
from datetime import datetime
from html.parser import HTMLParser
# ---------------------------------------------------------------------------
# Transaction fetching
# ---------------------------------------------------------------------------
class FioTableParser(HTMLParser):
    """Extract data rows from the second ``<table class="table">`` on the
    Fio transparent-account page.

    Expected column layout of each row:
    Datum | Částka | Typ | Název protiúčtu | Zpráva pro příjemce | KS | VS | SS | Poznámka
      0       1       2         3                    4              5    6    7     8

    Header rows inside ``<thead>`` are skipped; only body rows are kept.
    """

    def __init__(self):
        super().__init__()
        self._tables_seen = 0          # how many class="table" tables started so far
        self._capturing = False        # inside the second (target) table?
        self._skip_head = False        # inside <thead> (header rows are ignored)
        self._row_open = False         # currently inside a data <tr>
        self._cell_open = False        # currently inside a <td>/<th>
        self._row: list[str] = []      # cells accumulated for the open row
        self._rows: list[list[str]] = []  # all completed data rows
        self._text = ""                # text accumulated for the open cell

    def handle_starttag(self, tag, attrs):
        if tag == "table" and "table" in dict(attrs).get("class", "").split():
            self._tables_seen += 1
            if self._tables_seen == 2:
                self._capturing = True
        if not self._capturing:
            return
        if tag == "thead":
            self._skip_head = True
        elif tag == "tr" and not self._skip_head:
            self._row_open = True
            self._row = []
        elif tag in ("td", "th") and self._row_open:
            self._cell_open = True
            self._text = ""

    def handle_endtag(self, tag):
        if tag in ("td", "th") and self._cell_open:
            self._cell_open = False
            self._row.append(self._text.strip())
        elif tag == "thead":
            self._skip_head = False
        elif tag == "tr" and self._row_open:
            self._row_open = False
            if self._row:
                self._rows.append(self._row)
        elif tag == "table" and self._capturing:
            self._capturing = False

    def handle_data(self, data):
        # Text may arrive in several chunks; accumulate per cell.
        if self._cell_open:
            self._text += data

    def get_rows(self) -> list[list[str]]:
        """Return the collected data rows (list of cell-text lists)."""
        return self._rows
# Fio transparent table column indices
# (cell position within each parsed <tr>; index 2, "Typ", is intentionally unused)
_COL_DATE = 0  # Datum
_COL_AMOUNT = 1  # Částka
_COL_SENDER = 3  # Název protiúčtu
_COL_MESSAGE = 4  # Zpráva pro příjemce
_COL_KS = 5  # KS (constant symbol)
_COL_VS = 6  # VS (variable symbol)
_COL_SS = 7  # SS (specific symbol)
_COL_NOTE = 8  # Poznámka
def parse_czech_amount(s: str) -> float | None:
"""Parse '1 500,00 CZK' to float."""
s = s.replace("\xa0", "").replace(" ", "").replace(",", ".")
s = re.sub(r"[A-Za-z]+", "", s).strip()
try:
return float(s)
except ValueError:
return None
def parse_czech_date(s: str) -> str | None:
"""Parse 'DD.MM.YYYY' to 'YYYY-MM-DD'."""
s = s.strip()
for fmt in ("%d.%m.%Y", "%d/%m/%Y"):
try:
return datetime.strptime(s, fmt).strftime("%Y-%m-%d")
except ValueError:
continue
return None
def fetch_transactions_transparent(
    date_from: str, date_to: str, account_id: str = "2800359168"
) -> list[dict]:
    """Scrape incoming transactions from the Fio transparent-account page.

    Args:
        date_from: D.M.YYYY format.
        date_to: D.M.YYYY format.
        account_id: transparent account number to query.

    Returns:
        One dict per incoming (positive-amount) transaction. ``bank_id``
        is always empty — the HTML page exposes no stable operation ID.
    """
    url = (
        f"https://ib.fio.cz/ib/transparent?a={account_id}"
        f"&f={date_from}&t={date_to}"
    )
    with urllib.request.urlopen(urllib.request.Request(url)) as resp:
        page = resp.read().decode("utf-8")
    parser = FioTableParser()
    parser.feed(page)

    results = []
    for cells in parser.get_rows():
        # Rows with fewer than 5 cells cannot carry the fields we need.
        if len(cells) < 5:
            continue

        def cell(idx):
            return cells[idx].strip() if idx < len(cells) else ""

        day = parse_czech_date(cell(_COL_DATE))
        value = parse_czech_amount(cell(_COL_AMOUNT))
        # Keep only parseable, incoming (positive) payments.
        if day is None or value is None or value <= 0:
            continue
        results.append({
            "date": day,
            "amount": value,
            "sender": cell(_COL_SENDER),
            "message": cell(_COL_MESSAGE),
            "ks": cell(_COL_KS),
            "vs": cell(_COL_VS),
            "ss": cell(_COL_SS),
            "note": cell(_COL_NOTE),
            "bank_id": "",  # HTML scraping doesn't give a stable ID
        })
    return results
def fetch_transactions_api(
    token: str, date_from: str, date_to: str
) -> list[dict]:
    """Fetch incoming transactions via the Fio REST API (JSON).

    Args:
        token: Fio API token.
        date_from: YYYY-MM-DD format.
        date_to: YYYY-MM-DD format.

    Returns:
        One dict per incoming (positive-amount) transaction.
    """
    url = (
        f"https://fioapi.fio.cz/v1/rest/periods/{token}"
        f"/{date_from}/{date_to}/transactions.json"
    )
    with urllib.request.urlopen(urllib.request.Request(url)) as resp:
        payload = json.loads(resp.read().decode("utf-8"))

    def field(tx, col_id):
        # Each field is {"value": ..., "name": ..., "id": ...} or null.
        cell = tx.get(f"column{col_id}")
        return cell["value"] if cell else ""

    results = []
    raw_list = payload.get("accountStatement", {}).get("transactionList", {})
    for tx in raw_list.get("transaction") or []:
        value = float(field(tx, 1) or 0)
        if value <= 0:
            continue  # Skip outgoing
        raw_date = field(tx, 0) or ""
        results.append({
            # API dates come as "YYYY-MM-DD+HHMM" or ISO; keep the date part
            "date": raw_date[:10] if raw_date else "",
            "amount": value,
            "sender": str(field(tx, 10) or ""),         # column10 = sender name
            "message": str(field(tx, 16) or ""),        # column16 = message for recipient
            "vs": str(field(tx, 5) or ""),              # column5 = VS
            "ks": str(field(tx, 4) or ""),              # column4 = KS
            "ss": str(field(tx, 6) or ""),              # column6 = SS
            "user_id": str(field(tx, 7) or ""),         # column7 = user identification
            "sender_account": str(field(tx, 2) or ""),  # column2 = sender account
            "bank_id": str(field(tx, 22) or ""),        # column22 = ID operace
            "currency": str(field(tx, 14) or "CZK"),    # column14 = Currency
        })
    return results
def fetch_transactions(date_from: str, date_to: str) -> list[dict]:
    """Fetch incoming transactions for the given YYYY-MM-DD window.

    Uses the Fio REST API when FIO_API_TOKEN is set in the environment;
    otherwise falls back to scraping the public transparent-account page.
    """
    token = os.environ.get("FIO_API_TOKEN", "").strip()
    if token:
        return fetch_transactions_api(token, date_from, date_to)
    # The transparent page expects DD.MM.YYYY dates.
    start = datetime.strptime(date_from, "%Y-%m-%d").strftime("%d.%m.%Y")
    end = datetime.strptime(date_to, "%Y-%m-%d").strftime("%d.%m.%Y")
    return fetch_transactions_transparent(start, end)

191
scripts/infer_payments.py Normal file
View File

@@ -0,0 +1,191 @@
#!/usr/bin/env python3
"""Infer 'Person', 'Purpose', and 'Amount' for transactions in Google Sheets."""
import argparse
import os
import sys
from datetime import datetime
# Add the current directory to sys.path to import local modules
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from googleapiclient.discovery import build
from sync_fio_to_sheets import get_sheets_service, DEFAULT_SPREADSHEET_ID
from match_payments import infer_transaction_details
from attendance import get_members_with_fees
def parse_czk_amount(val) -> float:
    """Parse a Czech-formatted currency value into a float.

    Accepts raw numbers (returned as float) and strings such as
    '1 500,00', '1.500,00 CZK' or 'Kč 250'. Empty, None, or
    unparseable input yields 0.0.
    """
    if val is None or val == "":
        return 0.0
    if isinstance(val, (int, float)):
        return float(val)
    val = str(val)
    # Strip the currency marker ("Kč" symbol or "CZK" code) and spaces.
    # (The previous replace("", "") was a no-op left by a lost "Kč" literal.)
    val = val.replace("Kč", "").replace("CZK", "").strip()
    # Remove thousand separators (often space or dot).
    # Heuristic: if there's a comma, it's the decimal separator.
    # If there's a dot, it might be a thousand separator OR decimal separator.
    if "," in val:
        # 1.500,00 -> 1500.00
        val = val.replace(".", "").replace(" ", "").replace(",", ".")
    else:
        # If there are multiple dots, they are thousand separators;
        # a single dot is treated as the decimal separator.
        if val.count(".") > 1:
            val = val.replace(".", "").replace(" ", "")
        else:
            val = val.replace(" ", "")
    try:
        return float(val)
    except ValueError:
        return 0.0
# Column names as requested by the user
# (must match the target sheet's header labels, compared case-insensitively)
COL_MANUAL = "manual fix"  # manual-override marker column
COL_PERSON = "Person"  # inferred payer name(s)
COL_PURPOSE = "Purpose"  # inferred month(s) the payment covers
COL_AMOUNT = "Inferred Amount"  # inferred amount for the allocation
def infer_payments(spreadsheet_id: str, credentials_path: str, dry_run: bool = False) -> None:
    """Fill in Person / Purpose / Inferred Amount for unprocessed sheet rows.

    Reads the whole sheet, skips rows that already carry a manual fix or
    inferred values, runs ``infer_transaction_details`` on the rest, and
    writes the results back in one batch update.

    Args:
        spreadsheet_id: target Google Sheet ID.
        credentials_path: path to the Google API credentials JSON.
        dry_run: when True, print what would change but do not write.
    """
    print("Connecting to Google Sheets...")
    service = get_sheets_service(credentials_path)
    sheet = service.spreadsheets()

    # 1. Fetch all data from the sheet.
    print("Reading sheet data...")
    result = sheet.values().get(
        spreadsheetId=spreadsheet_id,
        range="A1:Z",  # Read a broad range to find existing columns
        valueRenderOption="UNFORMATTED_VALUE"
    ).execute()
    rows = result.get("values", [])
    if not rows:
        print("Sheet is empty.")
        return
    header = rows[0]

    def get_col_index(label):
        """Return the index of *label* in the header (case-insensitive), or -1."""
        normalized_label = label.lower().strip()
        for i, h in enumerate(header):
            if h.lower().strip() == normalized_label:
                return i
        return -1

    idx_date = get_col_index("Date")
    idx_amount_raw = get_col_index("Amount")  # Raw bank amount
    idx_sender = get_col_index("Sender")
    idx_message = get_col_index("Message")
    idx_vs = get_col_index("VS")
    target_labels = [COL_MANUAL, COL_PERSON, COL_PURPOSE, COL_AMOUNT]
    idx_manual = get_col_index(COL_MANUAL)
    idx_inferred_person = get_col_index(COL_PERSON)
    idx_inferred_purpose = get_col_index(COL_PURPOSE)
    idx_inferred_amount = get_col_index(COL_AMOUNT)
    if idx_inferred_person == -1 or idx_inferred_purpose == -1 or idx_inferred_amount == -1:
        print(f"Error: Required columns {target_labels[1:]} not found in sheet.")
        print(f"Current header: {header}")
        return

    # 2. Fetch members for matching.
    print("Fetching member list for matching...")
    members_data, _ = get_members_with_fees()
    member_names = [m[0] for m in members_data]

    # 3. Process rows (row 1 is the header, so data starts at row 2).
    print("Inferring details for empty rows...")
    updates = []
    for i, row in enumerate(rows[1:], start=2):
        # Pad short rows so positional indexing is safe.
        while len(row) < len(header):
            row.append("")
        # Skip rows that already have a manual fix or inferred values.
        val_manual = str(row[idx_manual]) if idx_manual != -1 and idx_manual < len(row) else ""
        val_person = str(row[idx_inferred_person]) if idx_inferred_person < len(row) else ""
        val_purpose = str(row[idx_inferred_purpose]) if idx_inferred_purpose < len(row) else ""
        if val_manual.strip() or val_person.strip() or val_purpose.strip():
            continue
        # Build a transaction dict in the shape infer_transaction_details expects.
        tx = {
            "date": row[idx_date] if idx_date != -1 and idx_date < len(row) else "",
            "amount": parse_czk_amount(row[idx_amount_raw]) if idx_amount_raw != -1 and idx_amount_raw < len(row) and row[idx_amount_raw] else 0,
            "sender": row[idx_sender] if idx_sender != -1 and idx_sender < len(row) else "",
            "message": row[idx_message] if idx_message != -1 and idx_message < len(row) else "",
            "vs": row[idx_vs] if idx_vs != -1 and idx_vs < len(row) else "",
        }
        inference = infer_transaction_details(tx, member_names)
        # Prefix low-confidence ("review") matches with a [?] marker.
        peeps = []
        for name, conf in inference["members"]:
            prefix = "[?] " if conf == "review" else ""
            peeps.append(f"{prefix}{name}")
        matched_months = inference["months"]
        if peeps or matched_months:
            person_val = ", ".join(peeps)
            purpose_val = ", ".join(matched_months)
            amount_val = str(tx["amount"])  # For now, use the total amount
            print(f"Row {i}: Inferred {person_val} for {purpose_val} ({amount_val} CZK)")
            # Update the row in memory (for terminal output / dry run).
            row[idx_inferred_person] = person_val
            row[idx_inferred_purpose] = purpose_val
            row[idx_inferred_amount] = amount_val
            # NOTE(review): the R1C1 range assumes Person, Purpose and
            # Inferred Amount are adjacent columns in that order — confirm
            # against the actual sheet layout.
            updates.append({
                "range": f"R{i}C{idx_inferred_person+1}:R{i}C{idx_inferred_amount+1}",
                "values": [[person_val, purpose_val, amount_val]]
            })
    if not updates:
        print("No new inferences to make.")
        return
    if dry_run:
        print(f"Dry run: would update {len(updates)} rows.")
    else:
        print(f"Applying {len(updates)} updates to the sheet...")
        body = {
            "valueInputOption": "USER_ENTERED",
            "data": updates
        }
        sheet.values().batchUpdate(
            spreadsheetId=spreadsheet_id,
            body=body
        ).execute()
        print("Update completed successfully.")
def main() -> None:
    """CLI entry point: parse arguments and run the inference pass."""
    parser = argparse.ArgumentParser(description="Infer payment details in Google Sheets.")
    parser.add_argument("--sheet-id", default=DEFAULT_SPREADSHEET_ID, help="Google Sheet ID")
    parser.add_argument("--credentials", default="credentials.json", help="Path to Google API credentials JSON")
    parser.add_argument("--dry-run", action="store_true", help="Print updates without applying them")
    args = parser.parse_args()
    try:
        infer_payments(args.sheet_id, args.credentials, args.dry_run)
    except Exception as e:
        print(f"Inference failed: {e}")
        import traceback
        traceback.print_exc()
        # Exit non-zero so cron/CI can detect the failure
        # (previously errors were printed but the script exited 0).
        sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -11,205 +11,7 @@ from html.parser import HTMLParser
from attendance import get_members_with_fees
from czech_utils import normalize, parse_month_references
# ---------------------------------------------------------------------------
# Transaction fetching
# ---------------------------------------------------------------------------
class _FioTableParser(HTMLParser):
"""Parse the second <table class="table"> on the Fio transparent page.
Columns: Datum | Částka | Typ | Název protiúčtu | Zpráva pro příjemce | KS | VS | SS | Poznámka
Indices: 0 1 2 3 4 5 6 7 8
"""
def __init__(self):
super().__init__()
self._table_count = 0
self._in_target_table = False
self._in_thead = False
self._in_row = False
self._in_cell = False
self._current_row: list[str] = []
self._rows: list[list[str]] = []
self._cell_text = ""
def handle_starttag(self, tag, attrs):
cls = dict(attrs).get("class", "")
if tag == "table" and "table" in cls.split():
self._table_count += 1
if self._table_count == 2:
self._in_target_table = True
if self._in_target_table:
if tag == "thead":
self._in_thead = True
if tag == "tr" and not self._in_thead:
self._in_row = True
self._current_row = []
if self._in_row and tag in ("td", "th"):
self._in_cell = True
self._cell_text = ""
def handle_endtag(self, tag):
if self._in_cell and tag in ("td", "th"):
self._in_cell = False
self._current_row.append(self._cell_text.strip())
if tag == "thead":
self._in_thead = False
if self._in_row and tag == "tr":
self._in_row = False
if self._current_row:
self._rows.append(self._current_row)
if tag == "table" and self._in_target_table:
self._in_target_table = False
def handle_data(self, data):
if self._in_cell:
self._cell_text += data
def get_rows(self) -> list[list[str]]:
return self._rows
# Fio transparent table column indices
_COL_DATE = 0
_COL_AMOUNT = 1
_COL_SENDER = 3
_COL_MESSAGE = 4
_COL_KS = 5
_COL_VS = 6
_COL_SS = 7
_COL_NOTE = 8
def _parse_czech_amount(s: str) -> float | None:
"""Parse '1 500,00 CZK' to float."""
s = s.replace("\xa0", "").replace(" ", "").replace(",", ".")
s = re.sub(r"[A-Za-z]+", "", s).strip()
try:
return float(s)
except ValueError:
return None
def _parse_czech_date(s: str) -> str | None:
"""Parse 'DD.MM.YYYY' to 'YYYY-MM-DD'."""
s = s.strip()
for fmt in ("%d.%m.%Y", "%d/%m/%Y"):
try:
return datetime.strptime(s, fmt).strftime("%Y-%m-%d")
except ValueError:
continue
return None
def fetch_transactions_transparent(
date_from: str, date_to: str
) -> list[dict]:
"""Fetch transactions from Fio transparent account HTML page.
Args:
date_from: D.M.YYYY format
date_to: D.M.YYYY format
"""
url = (
f"https://ib.fio.cz/ib/transparent?a=2800359168"
f"&f={date_from}&t={date_to}"
)
req = urllib.request.Request(url)
with urllib.request.urlopen(req) as resp:
html = resp.read().decode("utf-8")
parser = _FioTableParser()
parser.feed(html)
rows = parser.get_rows()
transactions = []
for row in rows:
if len(row) < 5:
continue
def col(i):
return row[i].strip() if i < len(row) else ""
date_str = _parse_czech_date(col(_COL_DATE))
amount = _parse_czech_amount(col(_COL_AMOUNT))
if date_str is None or amount is None or amount <= 0:
continue
transactions.append({
"date": date_str,
"amount": amount,
"sender": col(_COL_SENDER),
"message": col(_COL_MESSAGE),
"vs": col(_COL_VS),
})
return transactions
def fetch_transactions_api(
token: str, date_from: str, date_to: str
) -> list[dict]:
"""Fetch transactions via Fio REST API (JSON).
Args:
token: Fio API token
date_from: YYYY-MM-DD format
date_to: YYYY-MM-DD format
"""
url = (
f"https://fioapi.fio.cz/v1/rest/periods/{token}"
f"/{date_from}/{date_to}/transactions.json"
)
req = urllib.request.Request(url)
with urllib.request.urlopen(req) as resp:
data = json.loads(resp.read().decode("utf-8"))
transactions = []
tx_list = data.get("accountStatement", {}).get("transactionList", {})
for tx in (tx_list.get("transaction") or []):
# Each field is {"value": ..., "name": ..., "id": ...} or null
def val(col_id):
col = tx.get(f"column{col_id}")
return col["value"] if col else ""
amount = float(val(1) or 0)
if amount <= 0:
continue # Skip outgoing
date_raw = val(0) or ""
# API returns date as "YYYY-MM-DD+HHMM" or ISO format
date_str = date_raw[:10] if date_raw else ""
transactions.append({
"date": date_str,
"amount": amount,
"sender": str(val(10) or ""), # column10 = sender name
"message": str(val(16) or ""), # column16 = message for recipient
"vs": str(val(5) or ""), # column5 = VS
"user_id": str(val(7) or ""), # column7 = user identification
"sender_account": str(val(2) or ""), # column2 = sender account
})
return transactions
def fetch_transactions(date_from: str, date_to: str) -> list[dict]:
"""Fetch transactions, using API if token available, else transparent page."""
token = os.environ.get("FIO_API_TOKEN", "").strip()
if token:
return fetch_transactions_api(token, date_from, date_to)
# Convert YYYY-MM-DD to DD.MM.YYYY for the transparent page URL
from_dt = datetime.strptime(date_from, "%Y-%m-%d")
to_dt = datetime.strptime(date_to, "%Y-%m-%d")
return fetch_transactions_transparent(
from_dt.strftime("%-d.%-m.%Y"),
to_dt.strftime("%-d.%-m.%Y"),
)
from sync_fio_to_sheets import get_sheets_service, DEFAULT_SPREADSHEET_ID
# ---------------------------------------------------------------------------
@@ -255,34 +57,57 @@ def match_members(
for name in member_names:
variants = _build_name_variants(name)
# Full name match = high confidence
full_name = variants[0] if variants else ""
parts = full_name.split()
# 1. Full name match (exact sequence) = high confidence
if full_name and full_name in normalized_text:
matches.append((name, "auto"))
continue
# Last name + first name both present = high confidence
parts = full_name.split()
# 2. Both first and last name present (any order) = high confidence
if len(parts) >= 2:
if parts[0] in normalized_text and parts[-1] in normalized_text:
matches.append((name, "auto"))
continue
# Nickname match = high confidence
if len(variants) > 1 and variants[1] in normalized_text:
matches.append((name, "auto"))
continue
# 3. Nickname + one part of the name = high confidence
nickname = ""
nickname_match = re.search(r"\(([^)]+)\)", name)
if nickname_match:
nickname = normalize(nickname_match.group(1))
if nickname and nickname in normalized_text:
# Nickname alone is often enough, but let's check if it's combined with a name part
matches.append((name, "auto"))
continue
# Last name only = lower confidence, but skip very common Czech surnames
_COMMON_SURNAMES = {"novak", "novakova", "prach"}
if (
len(parts) >= 2
and len(parts[-1]) >= 4
and parts[-1] not in _COMMON_SURNAMES
and parts[-1] in normalized_text
):
matches.append((name, "review"))
continue
# 4. Partial matches = review confidence
if len(parts) >= 2:
first_name = parts[0]
last_name = parts[-1]
_COMMON_SURNAMES = {"novak", "novakova", "prach"}
# Match last name
if len(last_name) >= 4 and last_name not in _COMMON_SURNAMES and last_name in normalized_text:
matches.append((name, "review"))
continue
# Match first name (if not too short)
if len(first_name) >= 3 and first_name in normalized_text:
matches.append((name, "review"))
continue
elif len(parts) == 1:
# Single name member
if len(parts[0]) >= 4 and parts[0] in normalized_text:
matches.append((name, "review"))
continue
# --- Filtering ---
# If we have any "auto" matches, discard all "review" matches
auto_matches = [m for m in matches if m[1] == "auto"]
if auto_matches:
# If multiple auto matches, keep them (ambiguous but high priority)
return auto_matches
return matches
@@ -291,6 +116,102 @@ def match_members(
# Reconciliation
# ---------------------------------------------------------------------------
def infer_transaction_details(tx: dict, member_names: list[str]) -> dict:
    """Infer member(s) and month(s) for a single transaction.

    Returns:
        {
            'members': [(name, confidence)],
            'months': [YYYY-MM],
            'search_text': str
        }
    """
    # Sender + message + user identification form the search haystack.
    search_text = " ".join(
        [tx.get("sender", ""), tx.get("message", ""), tx.get("user_id", "")]
    )
    members = match_members(search_text, member_names)
    months = parse_month_references(
        tx.get("message", "") + " " + tx.get("user_id", "")
    )
    if not members:
        # Retry on the sender field alone with more lenient matching.
        members = match_members(tx.get("sender", ""), member_names)
    if not months:
        # No explicit month mentioned — fall back to the payment date.
        when = tx.get("date")
        if when:
            try:
                if isinstance(when, (int, float)):
                    # Google Sheets serial date (days since 1899-12-30).
                    moment = datetime(1899, 12, 30) + timedelta(days=when)
                else:
                    moment = datetime.strptime(str(when), "%Y-%m-%d")
                # Assume the payment covers the month it was made in.
                months = [moment.strftime("%Y-%m")]
            except (ValueError, TypeError):
                pass
    return {
        "members": members,
        "months": months,
        "search_text": search_text
    }
def fetch_sheet_data(spreadsheet_id: str, credentials_path: str) -> list[dict]:
    """Fetch all rows from the Google Sheet and convert them to dicts.

    Column headers are matched case-insensitively; missing columns yield
    empty-string values in every row's dict.
    """
    service = get_sheets_service(credentials_path)
    response = service.spreadsheets().values().get(
        spreadsheetId=spreadsheet_id,
        range="A1:Z",
        valueRenderOption="UNFORMATTED_VALUE"
    ).execute()
    rows = response.get("values", [])
    if not rows:
        return []

    # First occurrence of each (normalized) header label wins.
    positions: dict[str, int] = {}
    for i, h in enumerate(rows[0]):
        key = h.lower().strip()
        if key not in positions:
            positions[key] = i

    # Output dict key -> sheet header label.
    schema = [
        ("date", "Date"),
        ("amount", "Amount"),
        ("manual_fix", "manual fix"),
        ("person", "Person"),
        ("purpose", "Purpose"),
        ("inferred_amount", "Inferred Amount"),
        ("sender", "Sender"),
        ("message", "Message"),
        ("bank_id", "Bank ID"),
    ]
    col_for = {key: positions.get(label.lower().strip(), -1) for key, label in schema}

    transactions = []
    for row in rows[1:]:
        tx = {
            key: (row[idx] if 0 <= idx < len(row) else "")
            for key, idx in col_for.items()
        }
        transactions.append(tx)
    return transactions
def reconcile(
members: list[tuple[str, str, dict[str, int]]],
sorted_months: list[str],
@@ -322,41 +243,54 @@ def reconcile(
credits: dict[str, int] = {}
for tx in transactions:
# Combine sender + message for searching
search_text = f"{tx['sender']} {tx['message']} {tx.get('user_id', '')}"
matched_members = match_members(search_text, member_names)
matched_months = parse_month_references(
tx["message"] + " " + tx.get("user_id", "")
)
# Use sheet columns if they exist, otherwise fallback to inference
person_str = str(tx.get("person", "")).strip()
purpose_str = str(tx.get("purpose", "")).strip()
# Strip markers like [?]
person_str = re.sub(r"\[\?\]\s*", "", person_str)
if person_str and purpose_str:
# We have pre-matched data (either from script or manual)
# Support multiple people/months in the comma-separated string
matched_members = [(p.strip(), "auto") for p in person_str.split(",") if p.strip()]
matched_months = [m.strip() for m in purpose_str.split(",") if m.strip()]
# Use Inferred Amount if available, otherwise bank Amount
amount = tx.get("inferred_amount")
if amount is None or amount == "":
amount = tx.get("amount", 0)
try:
amount = float(amount)
except (ValueError, TypeError):
amount = 0
else:
# Fallback to inference (for rows not yet processed by infer_payments.py)
inference = infer_transaction_details(tx, member_names)
matched_members = inference["members"]
matched_months = inference["months"]
amount = tx.get("amount", 0)
try:
amount = float(amount)
except (ValueError, TypeError):
amount = 0
if not matched_members:
# Try matching sender name alone with more lenient matching
matched_members = match_members(tx["sender"], member_names)
if not matched_members:
unmatched.append(tx)
continue
if not matched_months:
# If no month specified, try to infer from payment date
tx_date = tx["date"]
if tx_date:
try:
dt = datetime.strptime(tx_date, "%Y-%m-%d")
# Assume payment is for the current month
matched_months = [dt.strftime("%Y-%m")]
except ValueError:
pass
if not matched_months:
if not matched_members or not matched_months:
unmatched.append(tx)
continue
# Allocate payment across matched members and months
num_allocations = len(matched_members) * len(matched_months)
per_allocation = tx["amount"] / num_allocations if num_allocations > 0 else 0
per_allocation = amount / num_allocations if num_allocations > 0 else 0
for member_name, confidence in matched_members:
# If we matched via sheet 'Person' column, name might be partial or have markers
# but usually it's the exact member name from get_members_with_fees.
# Let's ensure it exists in our ledger.
if member_name not in ledger:
# Try matching by base name if it was Jan Novak (Kačerr) etc.
pass
for month_key in matched_months:
entry = {
"amount": per_allocation,
@@ -372,16 +306,26 @@ def reconcile(
# Future month — track as credit
credits[member_name] = credits.get(member_name, 0) + int(per_allocation)
# Calculate final total balances (window + off-window credits)
final_balances: dict[str, int] = {}
for name in member_names:
window_balance = sum(
int(mdata["paid"]) - mdata["expected"]
for mdata in ledger[name].values()
)
final_balances[name] = window_balance + credits.get(name, 0)
return {
"members": {
name: {
"tier": member_tiers[name],
"months": ledger[name],
"total_balance": final_balances[name]
}
for name in member_names
},
"unmatched": unmatched,
"credits": credits,
"credits": final_balances, # Redefine credits as any positive total balance
}
@@ -452,12 +396,30 @@ def print_report(result: dict, sorted_months: list[str]):
balance = total_paid - total_expected
print(f" | {f'Expected: {total_expected}, Paid: {int(total_paid)}, Balance: {balance:+d}'}")
# --- Credits ---
if result["credits"]:
print(f"\n{'CREDITS (advance payments for future months)':}")
for name, amount in sorted(result["credits"].items()):
# --- Credits (Total Surplus) ---
all_credits = {
name: data["total_balance"]
for name, data in result["members"].items()
if data["total_balance"] > 0
}
if all_credits:
print(f"\n{'TOTAL CREDITS (advance payments or surplus):'}")
for name, amount in sorted(all_credits.items()):
print(f" {name}: {amount} CZK")
# --- Debts (Missing Payments) ---
all_debts = {
name: data["total_balance"]
for name, data in result["members"].items()
if data["total_balance"] < 0
}
if all_debts:
print(f"\n{'TOTAL DEBTS (missing payments):'}")
for name, amount in sorted(all_debts.items()):
print(f" {name}: {abs(amount)} CZK")
# --- Unmatched transactions ---
if result["unmatched"]:
print(f"\n{'UNMATCHED TRANSACTIONS (need manual review)':}")
@@ -499,13 +461,14 @@ def main():
description="Match bank payments against expected attendance fees."
)
parser.add_argument(
"--from", dest="date_from", default="2025-12-01",
help="Start date YYYY-MM-DD (default: 2025-12-01)",
"--sheet-id", default=DEFAULT_SPREADSHEET_ID, help="Google Sheet ID"
)
parser.add_argument(
"--to", dest="date_to",
default=datetime.now().strftime("%Y-%m-%d"),
help="End date YYYY-MM-DD (default: today)",
"--credentials", default=".secret/fuj-management-bot-credentials.json",
help="Path to Google API credentials JSON"
)
parser.add_argument(
"--bank", action="store_true", help="Scrape bank instead of using Sheet data"
)
args = parser.parse_args()
@@ -515,9 +478,15 @@ def main():
print("No attendance data found.")
return
print(f"Fetching transactions from {args.date_from} to {args.date_to}...")
transactions = fetch_transactions(args.date_from, args.date_to)
print(f"Found {len(transactions)} incoming transactions.\n")
if args.bank:
print(f"Fetching transactions from Fio bank ({args.date_from} to {args.date_to})...")
from fio_utils import fetch_transactions
transactions = fetch_transactions(args.date_from, args.date_to)
else:
print(f"Fetching transactions from Google Sheet ({args.sheet_id})...")
transactions = fetch_sheet_data(args.sheet_id, args.credentials)
print(f"Processing {len(transactions)} transactions.\n")
result = reconcile(members, sorted_months, transactions)
print_report(result, sorted_months)

View File

@@ -0,0 +1,210 @@
#!/usr/bin/env python3
"""Sync Fio bank transactions to a Google Sheet intermediary ledger."""
import argparse
import hashlib
import os
import pickle
from datetime import datetime, timedelta
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from google.oauth2 import service_account
from googleapiclient.discovery import build
from fio_utils import fetch_transactions
# Configuration
DEFAULT_SPREADSHEET_ID = "1Om0YPoDVCH5cV8BrNz5LG5eR5MMU05ypQC7UMN1xn_Y"  # target ledger sheet
SCOPES = ["https://www.googleapis.com/auth/spreadsheets"]  # read/write Sheets scope
TOKEN_FILE = "token.pickle"  # cached OAuth token (only for non-service-account credentials)
# Header row written to the sheet; column order matters (Sync ID must stay last, index 10).
COLUMN_LABELS = ["Date", "Amount", "manual fix", "Person", "Purpose", "Inferred Amount", "Sender", "VS", "Message", "Bank ID", "Sync ID"]
def get_sheets_service(credentials_path: str):
    """Authenticate and return the Google Sheets API service.

    Supports two credential flavors:
      * a service-account JSON key (used directly), or
      * an OAuth client-secrets file, in which case a cached token from
        TOKEN_FILE is reused/refreshed, falling back to an interactive
        local-server consent flow.

    Raises:
        FileNotFoundError: if *credentials_path* does not exist.
    """
    if not os.path.exists(credentials_path):
        raise FileNotFoundError(f"Credentials file not found: {credentials_path}")
    # Inspect the JSON to decide which auth flow applies.
    import json
    with open(credentials_path, "r") as f:
        creds_data = json.load(f)
    if creds_data.get("type") == "service_account":
        creds = service_account.Credentials.from_service_account_file(
            credentials_path, scopes=SCOPES
        )
    else:
        # Fallback to OAuth2 flow
        creds = None
        if os.path.exists(TOKEN_FILE):
            # NOTE(review): pickle.load on a local cache file — safe only as
            # long as token.pickle is written exclusively by this script.
            with open(TOKEN_FILE, "rb") as token:
                creds = pickle.load(token)
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                # Silent refresh using the stored refresh token.
                creds.refresh(Request())
            else:
                # Interactive browser-based consent flow.
                flow = InstalledAppFlow.from_client_secrets_file(credentials_path, SCOPES)
                creds = flow.run_local_server(port=0)
            # Cache the (possibly new) credentials for the next run.
            with open(TOKEN_FILE, "wb") as token:
                pickle.dump(creds, token)
    return build("sheets", "v4", credentials=creds)
def generate_sync_id(tx: dict) -> str:
    """Derive a stable, unique identifier for a transaction.

    The ID is the SHA-256 hex digest of the lower-cased, pipe-joined
    fields: date|amount|currency|sender|vs|message|bank_id.
    """
    fields = ("date", "amount", "currency", "sender", "vs", "message", "bank_id")
    parts = []
    for field in fields:
        # Currency defaults to CZK so API and scraped rows hash alike.
        default = "CZK" if field == "currency" else ""
        parts.append(str(tx.get(field, default)))
    fingerprint = "|".join(parts).lower()
    return hashlib.sha256(fingerprint.encode("utf-8")).hexdigest()
def sort_sheet_by_date(service, spreadsheet_id):
    """Sort the first sheet ascending by its Date column (column A)."""
    # Resolve the gid of the spreadsheet's first sheet.
    meta = service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute()
    gid = meta['sheets'][0]['properties']['sheetId']
    sort_request = {
        "sortRange": {
            "range": {
                "sheetId": gid,
                "startRowIndex": 1,  # Skip header
                "endRowIndex": 10000
            },
            "sortSpecs": [{
                "dimensionIndex": 0,  # Column A (Date)
                "sortOrder": "ASCENDING"
            }]
        }
    }
    service.spreadsheets().batchUpdate(
        spreadsheetId=spreadsheet_id,
        body={"requests": [sort_request]}
    ).execute()
    print("Sheet sorted by date.")
def sync_to_sheets(spreadsheet_id: str, credentials_path: str, days: int | None = None, date_from_str: str | None = None, date_to_str: str | None = None, sort_by_date: bool = False) -> None:
    """Fetch Fio transactions and append any not yet present in the sheet.

    Deduplication uses the 'Sync ID' column — a content hash produced by
    generate_sync_id. Either pass explicit date_from_str/date_to_str
    (YYYY-MM-DD) or a look-back window in *days* (default 30 when neither
    is given).
    """
    print(f"Connecting to Google Sheets using {credentials_path}...")
    service = get_sheets_service(credentials_path)
    sheet = service.spreadsheets()
    # 1. Read existing rows to collect the already-synced IDs
    #    (Sync ID is the last column, K / index 10 of COLUMN_LABELS).
    print(f"Reading existing sync IDs from sheet...")
    try:
        result = sheet.values().get(
            spreadsheetId=spreadsheet_id,
            range="A1:K"  # Include header and all columns to check Sync ID
        ).execute()
        values = result.get("values", [])
        # Insert the header row if it is missing or does not match exactly.
        if not values or values[0] != COLUMN_LABELS:
            print("Inserting column labels...")
            sheet.values().update(
                spreadsheetId=spreadsheet_id,
                range="A1",
                valueInputOption="USER_ENTERED",
                body={"values": [COLUMN_LABELS]}
            ).execute()
            existing_ids = set()
        else:
            # Sync ID is the last column (index 10)
            existing_ids = {row[10] for row in values[1:] if len(row) > 10}
    except Exception as e:
        # Best-effort: treat an unreadable sheet as empty instead of aborting.
        print(f"Error reading sheet (maybe empty?): {e}")
        existing_ids = set()
    # 2. Fetch Fio transactions for the requested window.
    if date_from_str and date_to_str:
        df_str = date_from_str
        dt_str = date_to_str
    else:
        now = datetime.now()
        date_to = now
        date_from = now - timedelta(days=days or 30)
        df_str = date_from.strftime("%Y-%m-%d")
        dt_str = date_to.strftime("%Y-%m-%d")
    print(f"Fetching Fio transactions from {df_str} to {dt_str}...")
    transactions = fetch_transactions(df_str, dt_str)
    print(f"Found {len(transactions)} transactions.")
    # 3. Keep only transactions whose hash is not already in the sheet.
    new_rows = []
    for tx in transactions:
        sync_id = generate_sync_id(tx)
        if sync_id not in existing_ids:
            # Schema: Date | Amount | Manual | Person | Purpose | Inferred Amount | Sender | VS | Message | Bank ID | Sync ID
            new_rows.append([
                tx.get("date", ""),
                tx.get("amount", ""),
                "",  # Manual
                "",  # Person
                "",  # Purpose
                "",  # Inferred Amount
                tx.get("sender", ""),
                tx.get("vs", ""),
                tx.get("message", ""),
                tx.get("bank_id", ""),
                sync_id,
            ])
    if not new_rows:
        print("No new transactions to sync.")
        return
    # 4. Append to sheet
    print(f"Appending {len(new_rows)} new transactions to the sheet...")
    body = {"values": new_rows}
    sheet.values().append(
        spreadsheetId=spreadsheet_id,
        range="A2",  # Appends to the end of the sheet
        valueInputOption="USER_ENTERED",
        body=body
    ).execute()
    print("Sync completed successfully.")
    if sort_by_date:
        sort_sheet_by_date(service, spreadsheet_id)
def main() -> None:
    """CLI entry point for the Fio → Google Sheets sync."""
    parser = argparse.ArgumentParser(description="Sync Fio transactions to Google Sheets.")
    parser.add_argument("--days", type=int, default=30, help="Days to look back (default: 30)")
    parser.add_argument("--sheet-id", default=DEFAULT_SPREADSHEET_ID, help="Google Sheet ID")
    parser.add_argument("--credentials", default="credentials.json", help="Path to Google API credentials JSON")
    parser.add_argument("--from", dest="date_from", help="Start date YYYY-MM-DD")
    parser.add_argument("--to", dest="date_to", help="End date YYYY-MM-DD")
    parser.add_argument("--sort-by-date", action="store_true", help="Sort the sheet by date after sync")
    args = parser.parse_args()
    try:
        sync_to_sheets(
            spreadsheet_id=args.sheet_id,
            credentials_path=args.credentials,
            days=args.days,
            date_from_str=args.date_from,
            date_to_str=args.date_to,
            sort_by_date=args.sort_by_date
        )
    except Exception as e:
        print(f"Sync failed: {e}")
        # Exit non-zero so schedulers notice the failure
        # (previously the script swallowed errors and exited 0).
        raise SystemExit(1)
if __name__ == "__main__":
main()