"""Read-through JSON cache for Google Sheets, invalidated via the Drive API.

Freshness is decided in three tiers, cheapest first:

1. an in-process per-sheet TTL (``CACHE_API_CHECK_TTL_SECONDS``) that skips
   the Drive API entirely,
2. a file-mtime TTL on the cache file itself (``CACHE_TTL_SECONDS``),
3. the sheet's ``modifiedTime`` fetched from the Drive API, with a
   5-minute time-bucket fallback string when the API is unavailable.
"""

import json
import logging
import os
import socket  # NOTE(review): currently unused here; kept in case other code relies on it — confirm before removing
import time
from datetime import datetime
from pathlib import Path

from google.oauth2 import service_account
from googleapiclient.discovery import build

logger = logging.getLogger(__name__)

# Constants
CACHE_DIR = Path(__file__).parent.parent / "tmp"
CREDS_PATH = Path(__file__).parent.parent / ".secret" / "fuj-management-bot-credentials.json"
DRIVE_TIMEOUT = 10  # seconds, applied to the Drive API HTTP connection only
# Max cache-file age before the Drive API is consulted again.
# 300 s = 5 min default (the previous comment claimed "30 min", which did
# not match the 300-second default value).
CACHE_TTL_SECONDS = int(os.environ.get("CACHE_TTL_SECONDS", 300))
# Minimum interval between Drive API checks per sheet (5 min default).
CACHE_API_CHECK_TTL_SECONDS = int(os.environ.get("CACHE_API_CHECK_TTL_SECONDS", 300))
# Bucket width (seconds) for the virtual TTL string returned when the
# Drive API cannot be reached; keep in sync with the "ttl-5m-" prefix.
FALLBACK_TTL_SECONDS = 300

# Known mappings mapping "cache name" to Google Sheet ID
CACHE_SHEET_MAP = {
    "attendance_regular": "1E2e_gT_K5AwSRCDLDTa2UetZTkHmBOcz0kFbBUNUNBA",
    "attendance_juniors": "1E2e_gT_K5AwSRCDLDTa2UetZTkHmBOcz0kFbBUNUNBA",
    "exceptions_dict": "1Om0YPoDVCH5cV8BrNz5LG5eR5MMU05ypQC7UMN1xn_Y",
    "payments_transactions": "1Om0YPoDVCH5cV8BrNz5LG5eR5MMU05ypQC7UMN1xn_Y",
}

# Global state: last successful Drive API check time (epoch seconds) keyed by
# sheet ID, and the lazily built, memoized Drive service client.
_LAST_CHECKED: dict[str, float] = {}
_DRIVE_SERVICE = None


def _get_drive_service():
    """Build (once) and return the Drive v3 service client, or ``None``.

    Returns ``None`` when the service-account credentials file is missing
    or the client cannot be built; callers then fall back to a
    time-bucket TTL instead of a real ``modifiedTime``.
    """
    global _DRIVE_SERVICE
    if _DRIVE_SERVICE is not None:
        return _DRIVE_SERVICE
    if not CREDS_PATH.exists():
        logger.warning(f"Credentials not found at {CREDS_PATH}. Cannot check Google Drive API.")
        return None
    try:
        creds = service_account.Credentials.from_service_account_file(
            str(CREDS_PATH),
            scopes=["https://www.googleapis.com/auth/drive.readonly"],
        )
        # Apply the timeout to this httplib2 connection only, rather than
        # mutating the process-wide socket default.
        import httplib2
        import google_auth_httplib2

        http = httplib2.Http(timeout=DRIVE_TIMEOUT)
        http = google_auth_httplib2.AuthorizedHttp(creds, http=http)
        _DRIVE_SERVICE = build("drive", "v3", http=http, cache_discovery=False)
        return _DRIVE_SERVICE
    except Exception as e:
        logger.error(f"Failed to build Drive API service: {e}")
        return None


def get_sheet_modified_time(cache_key: str) -> str | None:
    """Gets the modifiedTime from Google Drive API for a given cache_key.

    Returns the ISO timestamp string if successful. If the Drive API fails
    (e.g., lack of permissions for public sheets), it generates a virtual
    time bucket string to provide a 5-minute TTL cache.
    """
    # Unknown keys are treated as raw sheet IDs so ad-hoc sheets still work.
    sheet_id = CACHE_SHEET_MAP.get(cache_key, cache_key)
    cache_file = CACHE_DIR / f"{cache_key}_cache.json"

    # 1. Check if we should skip the Drive API check entirely (global memory TTL)
    now = time.time()
    last_check = _LAST_CHECKED.get(sheet_id, 0)
    if CACHE_API_CHECK_TTL_SECONDS > 0 and (now - last_check) < CACHE_API_CHECK_TTL_SECONDS:
        # We checked recently. Return cached modifiedTime if cache file exists.
        if cache_file.exists():
            try:
                with open(cache_file, "r", encoding="utf-8") as f:
                    cache_data = json.load(f)
                cached_time = cache_data.get("modifiedTime")
                if cached_time:
                    logger.info(
                        f"Skipping Drive API check for {sheet_id} due to "
                        f"{CACHE_API_CHECK_TTL_SECONDS}s API check TTL"
                    )
                    return cached_time
            except Exception as e:
                logger.warning(f"Error reading existing cache during API skip for {sheet_id}: {e}")

    # 2. Check if the cache file is simply too new (legacy check)
    if CACHE_TTL_SECONDS > 0 and cache_file.exists():
        try:
            file_mtime = os.path.getmtime(cache_file)
            if time.time() - file_mtime < CACHE_TTL_SECONDS:
                with open(cache_file, "r", encoding="utf-8") as f:
                    cache_data = json.load(f)
                cached_time = cache_data.get("modifiedTime")
                if cached_time:
                    logger.info(
                        f"Skipping Drive API check for {sheet_id} due to "
                        f"{CACHE_TTL_SECONDS}s max CACHE_TTL"
                    )
                    # We consider this a valid check, update the global state
                    _LAST_CHECKED[sheet_id] = now
                    return cached_time
        except Exception as e:
            logger.warning(f"Error checking cache TTL for {sheet_id}: {e}")

    def _fallback_ttl() -> str:
        # Virtual "modified time": changes once per FALLBACK_TTL_SECONDS
        # bucket, so downstream cache comparisons expire on that cadence.
        bucket = int(time.time() // FALLBACK_TTL_SECONDS)
        return f"ttl-5m-{bucket}"

    # 3. Ask the Drive API for the real modifiedTime.
    logger.info(f"Checking Drive API for {sheet_id}")
    drive_service = _get_drive_service()
    if not drive_service:
        return _fallback_ttl()
    try:
        file_meta = drive_service.files().get(
            fileId=sheet_id, fields="modifiedTime", supportsAllDrives=True
        ).execute()
        # Successfully checked API, update the global state
        _LAST_CHECKED[sheet_id] = time.time()
        return file_meta.get("modifiedTime")
    except Exception as e:
        logger.warning(
            f"Could not get modifiedTime for sheet {sheet_id}: {e}. Falling back to 5-minute TTL."
        )
        return _fallback_ttl()


def read_cache(sheet_id: str, current_modified_time: str) -> list | dict | None:
    """Reads the JSON cache for the given sheet_id.

    Returns the cached data if it exists AND the cached modifiedTime matches
    current_modified_time. Otherwise, returns None.
    """
    if not current_modified_time:
        return None
    cache_file = CACHE_DIR / f"{sheet_id}_cache.json"
    if not cache_file.exists():
        return None
    try:
        with open(cache_file, "r", encoding="utf-8") as f:
            cache_data = json.load(f)
        cached_time = cache_data.get("modifiedTime")
        if cached_time == current_modified_time:
            logger.info(f"Cache hit for {sheet_id} ({current_modified_time})")
            return cache_data.get("data")
        else:
            logger.info(
                f"Cache miss for {sheet_id}. Cached: {cached_time}, Current: {current_modified_time}"
            )
            return None
    except Exception as e:
        logger.error(f"Failed to read cache {cache_file}: {e}")
        return None


def write_cache(sheet_id: str, modified_time: str, data: list | dict) -> None:
    """Writes the data to a JSON cache file with the given modified_time."""
    # A falsy modified_time would make the cache permanently unmatchable,
    # so skip writing entirely.
    if not modified_time:
        return
    try:
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        cache_file = CACHE_DIR / f"{sheet_id}_cache.json"
        cache_data = {
            "modifiedTime": modified_time,
            "data": data,
            "cachedAt": datetime.now().isoformat(),
        }
        with open(cache_file, "w", encoding="utf-8") as f:
            json.dump(cache_data, f, ensure_ascii=False)
        logger.info(f"Wrote cache for {sheet_id}")
    except Exception as e:
        logger.error(f"Failed to write cache {sheet_id}: {e}")