""" API + static file server for the Telegram Mini App. Serves webapp/ and REST endpoints, using the existing db.py layer. """ import csv import hashlib import hmac import io import json import logging import os import pathlib import subprocess import zlib from datetime import datetime, timezone from urllib.parse import parse_qs from aiohttp import web from db import init_db, save_workout, get_workouts, get_workout_count, get_stats_sql, delete_workout, update_workout, export_workouts, get_user_workout_number, get_all_exercise_names, log_event, get_settings, update_settings, get_last_exercise from parser import parse_workout, format_workout logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO, ) logger = logging.getLogger(__name__) # ── Version (computed once at startup) ─────────────────────────── def _resolve_ref(git_dir: pathlib.Path, ref: str) -> str | None: """Resolve a ref name to its SHA, handling loose and packed refs.""" loose = git_dir / ref if loose.is_file(): return loose.read_text().strip() packed = git_dir / "packed-refs" if packed.is_file(): for line in packed.read_text().splitlines(): if not line or line.startswith("#") or line.startswith("^"): continue parts = line.split(maxsplit=1) if len(parts) == 2 and parts[1] == ref: return parts[0] return None def _read_commit_date_utc(git_dir: pathlib.Path, sha: str) -> str | None: """Parse a loose commit object and return committer date as YYYY-MM-DD (UTC).""" obj_path = git_dir / "objects" / sha[:2] / sha[2:] if not obj_path.exists(): return None # packed — skip; recent HEAD is usually loose try: raw = zlib.decompress(obj_path.read_bytes()) content = raw[raw.index(b"\0") + 1:].decode("utf-8", errors="replace") for line in content.splitlines(): if line.startswith("committer "): # "committer Name " parts = line.rsplit(" ", 2) if len(parts) == 3 and parts[1].lstrip("-").isdigit(): ts = int(parts[1]) return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d") return None except (OSError, ValueError, zlib.error): return None def _compute_version() -> str: """Return ' ', with a pure-Python fallback for environments where the `git` binary isn't on PATH (e.g. minimal systemd service environments on NixOS). """ repo_root = pathlib.Path(__file__).parent # Preferred: git (%cs = committer date short, %h = short SHA). try: out = subprocess.run( ["git", "log", "-1", "--format=%cs %h"], cwd=repo_root, capture_output=True, text=True, timeout=2, check=True, ) if out.stdout.strip(): return out.stdout.strip() except (subprocess.SubprocessError, FileNotFoundError, OSError): pass # Fallback: resolve HEAD + read commit object for the date. try: git_dir = repo_root / ".git" if not git_dir.is_dir(): return "unknown" # worktree or unusual layout head = (git_dir / "HEAD").read_text().strip() sha = _resolve_ref(git_dir, head[5:]) if head.startswith("ref: ") else head if not sha: return "unknown" short_sha = sha[:7] date = _read_commit_date_utc(git_dir, sha) return f"{date} {short_sha}" if date else short_sha except OSError: return "unknown" _VERSION = _compute_version() # ── Token (injected by start.py via env) ───────────────────────── def _get_bot_token() -> str: return os.environ.get("BOT_TOKEN", "") # ── Telegram initData validation ───────────────────────────────── def validate_init_data(init_data: str) -> dict | None: """Validate Telegram WebApp initData. Returns user dict if valid.""" if not init_data: return None bot_token = _get_bot_token() if not bot_token: return None parsed = parse_qs(init_data, keep_blank_values=True) received_hash = parsed.get("hash", [None])[0] if not received_hash: return None data_pairs = [] for key, values in parsed.items(): if key == "hash": continue data_pairs.append(f"{key}={values[0]}") data_pairs.sort() data_check_string = "\n".join(data_pairs) secret_key = hmac.new( b"WebAppData", bot_token.encode(), hashlib.sha256 ).digest() computed_hash = hmac.new( secret_key, data_check_string.encode(), hashlib.sha256 ).hexdigest() if not hmac.compare_digest(computed_hash, received_hash): logger.warning("Invalid initData hash") return None user_json = parsed.get("user", [None])[0] if not user_json: return None try: return json.loads(user_json) except json.JSONDecodeError: return None # ── Auth middleware ─────────────────────────────────────────────── def get_user_id(request: web.Request) -> int | None: init_data = request.headers.get("X-Telegram-Init-Data", "") user = validate_init_data(init_data) if user: return user["id"] return None def require_auth(handler): async def wrapper(request: web.Request): user_id = get_user_id(request) if not user_id: return web.json_response({"error": "Unauthorized"}, status=401) request["user_id"] = user_id return await handler(request) return wrapper # ── API Routes ─────────────────────────────────────────────────── @require_auth async def api_get_workouts(request: web.Request): """Return recent workouts with exercises.""" limit = int(request.query.get("limit", "20")) offset = int(request.query.get("offset", "0")) workouts = get_workouts(request["user_id"], limit=limit, offset=offset) total = get_workout_count(request["user_id"]) return web.json_response({"workouts": workouts, "total": total}) @require_auth async def api_save_workout(request: web.Request): """Save a workout from structured JSON or raw text.""" body = await request.json() raw_text = body.get("raw_text", "") superset_groups = body.get("superset_groups") note = body.get("note") or None if superset_groups: # Structured input from the Mini App UI from datetime import datetime, timezone workout_id = save_workout( user_id=request["user_id"], timestamp=datetime.now(timezone.utc), superset_groups=superset_groups, raw_text=raw_text or None, note=note, ) elif raw_text: # Text-based input (same format as sending a message to the bot) groups, errors = parse_workout(raw_text) if not groups: error_lines = [e.line for e in errors] if errors else [] return web.json_response( {"error": "Could not parse workout text", "failed_lines": error_lines}, status=400, ) from datetime import datetime, timezone superset_dicts = [[ex.to_dict() for ex in group] for group in groups] workout_id = save_workout( user_id=request["user_id"], timestamp=datetime.now(timezone.utc), superset_groups=superset_dicts, raw_text=raw_text, note=note, ) else: return web.json_response( {"error": "Provide superset_groups or raw_text"}, status=400 ) user_number = get_user_workout_number(request["user_id"], workout_id) log_event(request["user_id"], "workout.save", { "source": "webapp", "workout_id": workout_id, "user_number": user_number, }) return web.json_response( {"workout_id": workout_id, "user_number": user_number}, status=201, ) @require_auth async def api_update_workout(request: web.Request): """Update a workout — soft-deletes old, creates new with same timestamp.""" workout_id = int(request.match_info["workout_id"]) body = await request.json() superset_groups = body.get("superset_groups") note = body.get("note") or None if not superset_groups: return web.json_response({"error": "Provide superset_groups"}, status=400) new_id = update_workout(request["user_id"], workout_id, superset_groups, note=note) if new_id is None: return web.json_response({"error": "Not found"}, status=404) user_number = get_user_workout_number(request["user_id"], new_id) log_event(request["user_id"], "workout.update", { "old_workout_id": workout_id, "workout_id": new_id, "user_number": user_number, }) return web.json_response({"workout_id": new_id, "user_number": user_number}) @require_auth async def api_delete_workout(request: web.Request): """Soft-delete a workout by ID.""" workout_id = int(request.match_info["workout_id"]) if delete_workout(request["user_id"], workout_id): log_event(request["user_id"], "workout.delete", { "source": "webapp", "workout_id": workout_id, }) return web.json_response({"deleted": True}) return web.json_response({"error": "Not found"}, status=404) @require_auth async def api_get_exercise_names(request: web.Request): """Return exercise names (for autocomplete), aggregated globally across all users and ordered by popularity. """ return web.json_response({"exercises": get_all_exercise_names()}) @require_auth async def api_get_last_exercise(request: web.Request): """Return the user's most recent logged entry for a given exercise name.""" name = request.query.get("name", "").strip() if not name: return web.json_response({"error": "Missing name"}, status=400) last = get_last_exercise(request["user_id"], name) return web.json_response({"last": last}) @require_auth async def api_get_stats(request: web.Request): """Return summary stats for the user.""" stats = get_stats_sql(request["user_id"]) return web.json_response(stats) @require_auth async def api_export_json(request: web.Request): """Export all workouts as JSON.""" data = export_workouts(request["user_id"]) return web.json_response({"records": data, "count": len(data)}) async def api_version(request: web.Request): """Return the running server version. Unauthenticated.""" return web.json_response({"version": _VERSION}) @require_auth async def api_get_settings(request: web.Request): """Return the authenticated user's settings dict.""" return web.json_response({"settings": get_settings(request["user_id"])}) @require_auth async def api_update_settings(request: web.Request): """Merge a partial settings patch; returns the full updated settings.""" try: body = await request.json() except (ValueError, json.JSONDecodeError): return web.json_response({"error": "Invalid JSON"}, status=400) if not isinstance(body, dict): return web.json_response({"error": "Body must be an object"}, status=400) updated = update_settings(request["user_id"], body) return web.json_response({"settings": updated}) @require_auth async def api_log_event(request: web.Request): """Record a client-emitted event (Mini App telemetry).""" try: body = await request.json() except (ValueError, json.JSONDecodeError): return web.json_response({"error": "Invalid JSON"}, status=400) kind = body.get("kind") if not isinstance(kind, str) or not kind: return web.json_response({"error": "Missing kind"}, status=400) data = body.get("data") if isinstance(body.get("data"), dict) else None log_event(request["user_id"], kind, data) return web.Response(status=204) @require_auth async def api_export_csv(request: web.Request): """Export all workouts as CSV.""" data = export_workouts(request["user_id"]) output = io.StringIO() if data: writer = csv.DictWriter(output, fieldnames=data[0].keys()) writer.writeheader() writer.writerows(data) return web.Response( text=output.getvalue(), content_type="text/csv", headers={"Content-Disposition": "attachment; filename=workouts.csv"}, ) # ── App setup ──────────────────────────────────────────────────── def create_app() -> web.Application: init_db() app = web.Application() app.router.add_get("/api/workouts", api_get_workouts) app.router.add_post("/api/workouts", api_save_workout) app.router.add_put("/api/workouts/{workout_id}", api_update_workout) app.router.add_delete("/api/workouts/{workout_id}", api_delete_workout) app.router.add_get("/api/exercises", api_get_exercise_names) app.router.add_get("/api/exercises/last", api_get_last_exercise) app.router.add_get("/api/stats", api_get_stats) app.router.add_get("/api/export/json", api_export_json) app.router.add_get("/api/export/csv", api_export_csv) app.router.add_get("/api/version", api_version) app.router.add_post("/api/events", api_log_event) app.router.add_get("/api/settings", api_get_settings) app.router.add_put("/api/settings", api_update_settings) # Serve the webapp/ folder webapp_dir = pathlib.Path(__file__).parent / "webapp" async def index_handler(request): return web.FileResponse(webapp_dir / "index.html") app.router.add_get("/", index_handler) app.router.add_static("/", webapp_dir) return app if __name__ == "__main__": port = int(os.environ.get("API_PORT", "8080")) host = os.environ.get("API_HOST", "0.0.0.0") app = create_app() logger.info("Server starting on %s:%s", host, port) web.run_app(app, host=host, port=port)