import asyncio
import collections
import hashlib
import json
import os
import pathlib
import re
import subprocess
import urllib.parse
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Optional, AsyncGenerator, Union, Tuple, Set
from uuid import uuid4

import aiosqlite
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, Response, PlainTextResponse

import client


class AsyncDataSet:
    _KV_TABLE = "__kv_store"
    _DEFAULT_COLUMNS = {
        "uid": "TEXT PRIMARY KEY",
        "created_at": "TEXT",
        "updated_at": "TEXT",
        "deleted_at": "TEXT",
    }

    def __init__(self, file: str):
        self._file = file
        self._table_columns_cache: Dict[str, Set[str]] = {}

    @staticmethod
    def _utc_iso() -> str:
        return (
            datetime.now(timezone.utc)
            .replace(microsecond=0)
            .isoformat()
            .replace("+00:00", "Z")
        )

    @staticmethod
    def _py_to_sqlite_type(value: Any) -> str:
        if value is None:
            return "TEXT"
        if isinstance(value, bool):
            return "INTEGER"
        if isinstance(value, int):
            return "INTEGER"
        if isinstance(value, float):
            return "REAL"
        if isinstance(value, (bytes, bytearray, memoryview)):
            return "BLOB"
        return "TEXT"

    async def _get_table_columns(self, table: str) -> Set[str]:
        if table in self._table_columns_cache:
            return self._table_columns_cache[table]
        columns = set()
        try:
            async with aiosqlite.connect(self._file) as db:
                async with db.execute(f"PRAGMA table_info({table})") as cursor:
                    async for row in cursor:
                        columns.add(row[1])
            self._table_columns_cache[table] = columns
        except Exception:
            pass
        return columns

    async def _invalidate_column_cache(self, table: str):
        if table in self._table_columns_cache:
            del self._table_columns_cache[table]

    async def _ensure_column(self, table: str, name: str, value: Any) -> None:
        col_type = self._py_to_sqlite_type(value)
        try:
            async with aiosqlite.connect(self._file) as db:
                await db.execute(f"ALTER TABLE {table} ADD COLUMN `{name}` {col_type}")
                await db.commit()
            await self._invalidate_column_cache(table)
        except aiosqlite.OperationalError as e:
            if "duplicate column name" in str(e).lower():
                pass
            else:
                raise

    async def _ensure_table(self, table: str, col_sources: Dict[str, Any]) -> None:
        cols = self._DEFAULT_COLUMNS.copy()
        for key, val in col_sources.items():
            if key not in cols:
                cols[key] = self._py_to_sqlite_type(val)
        columns_sql = ", ".join(f"`{k}` {t}" for k, t in cols.items())
        async with aiosqlite.connect(self._file) as db:
            await db.execute(f"CREATE TABLE IF NOT EXISTS {table} ({columns_sql})")
            await db.commit()
        await self._invalidate_column_cache(table)

    async def _table_exists(self, table: str) -> bool:
        async with aiosqlite.connect(self._file) as db:
            async with db.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
                (table,),
            ) as cursor:
                return await cursor.fetchone() is not None

    _RE_NO_COLUMN = re.compile(r"(?:no such column:|has no column named) (\w+)")
    _RE_NO_TABLE = re.compile(r"no such table: (\w+)")

    @classmethod
    def _missing_column_from_error(
        cls, err: aiosqlite.OperationalError
    ) -> Optional[str]:
        m = cls._RE_NO_COLUMN.search(str(err))
        return m.group(1) if m else None

    @classmethod
    def _missing_table_from_error(
        cls, err: aiosqlite.OperationalError
    ) -> Optional[str]:
        m = cls._RE_NO_TABLE.search(str(err))
        return m.group(1) if m else None
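    # How the error-driven repair above is used (illustrative only, nothing
    # here executes): _safe_execute below catches OperationalError, feeds the
    # message through these regexes, and creates the missing column or table
    # before retrying, e.g.
    #
    #   AsyncDataSet._missing_column_from_error(
    #       aiosqlite.OperationalError("no such column: priority"))  # -> "priority"
    #   AsyncDataSet._missing_table_from_error(
    #       aiosqlite.OperationalError("no such table: jobs"))       # -> "jobs"
    #
    # The names "priority" and "jobs" are hypothetical examples.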
    async def _safe_execute(
        self,
        table: str,
        sql: str,
        params: Iterable[Any],
        col_sources: Dict[str, Any],
        max_retries: int = 10,
    ) -> aiosqlite.Cursor:
        retries = 0
        while retries < max_retries:
            try:
                async with aiosqlite.connect(self._file) as db:
                    cursor = await db.execute(sql, params)
                    await db.commit()
                    return cursor
            except aiosqlite.OperationalError as err:
                retries += 1
                err_str = str(err).lower()
                col = self._missing_column_from_error(err)
                if col:
                    if col in col_sources:
                        await self._ensure_column(table, col, col_sources[col])
                    else:
                        await self._ensure_column(table, col, None)
                    continue
                tbl = self._missing_table_from_error(err)
                if tbl:
                    await self._ensure_table(tbl, col_sources)
                    continue
                if "has no column named" in err_str:
                    match = re.search(r"table \w+ has no column named (\w+)", err_str)
                    if match:
                        col_name = match.group(1)
                        if col_name in col_sources:
                            await self._ensure_column(table, col_name, col_sources[col_name])
                        else:
                            await self._ensure_column(table, col_name, None)
                        continue
                raise
        raise Exception(f"Max retries ({max_retries}) exceeded")

    async def _filter_existing_columns(
        self, table: str, data: Dict[str, Any]
    ) -> Dict[str, Any]:
        if not await self._table_exists(table):
            return data
        existing_columns = await self._get_table_columns(table)
        if not existing_columns:
            return data
        return {k: v for k, v in data.items() if k in existing_columns}

    async def _safe_query(
        self,
        table: str,
        sql: str,
        params: Iterable[Any],
        col_sources: Dict[str, Any],
    ) -> AsyncGenerator[Dict[str, Any], None]:
        if not await self._table_exists(table):
            return
        max_retries = 10
        retries = 0
        while retries < max_retries:
            try:
                async with aiosqlite.connect(self._file) as db:
                    db.row_factory = aiosqlite.Row
                    async with db.execute(sql, params) as cursor:
                        async for row in cursor:
                            yield dict(row)
                return
            except aiosqlite.OperationalError as err:
                retries += 1
                err_str = str(err).lower()
                tbl = self._missing_table_from_error(err)
                if tbl:
                    return
                if "no such column" in err_str:
                    return
                raise

    @staticmethod
    def _build_where(where: Optional[Dict[str, Any]]) -> Tuple[str, List[Any]]:
        if not where:
            return "", []
        clauses, vals = zip(*[(f"`{k}` = ?", v) for k, v in where.items()])
        return " WHERE " + " AND ".join(clauses), list(vals)

    async def insert(
        self, table: str, args: Dict[str, Any], return_id: bool = False
    ) -> Union[str, int]:
        uid = str(uuid4())
        now = self._utc_iso()
        record = {
            "uid": uid,
            "created_at": now,
            "updated_at": now,
            "deleted_at": None,
            **args,
        }
        await self._ensure_table(table, record)
        if return_id and "id" not in args:
            async with aiosqlite.connect(self._file) as db:
                try:
                    # SQLite cannot add a PRIMARY KEY column via ALTER TABLE,
                    # so this usually fails and the plain INTEGER fallback runs.
                    await db.execute(
                        f"ALTER TABLE {table} ADD COLUMN id INTEGER PRIMARY KEY AUTOINCREMENT"
                    )
                    await db.commit()
                except aiosqlite.OperationalError as e:
                    if "duplicate column name" not in str(e).lower():
                        try:
                            await db.execute(f"ALTER TABLE {table} ADD COLUMN id INTEGER")
                            await db.commit()
                        except Exception:
                            pass
            await self._invalidate_column_cache(table)
            cols = "`" + "`, `".join(record.keys()) + "`"
            qs = ", ".join(["?"] * len(record))
            sql = f"INSERT INTO {table} ({cols}) VALUES ({qs})"
            cursor = await self._safe_execute(table, sql, list(record.values()), record)
            return cursor.lastrowid
        cols = "`" + "`, `".join(record.keys()) + "`"
        qs = ", ".join(["?"] * len(record))
        sql = f"INSERT INTO {table} ({cols}) VALUES ({qs})"
        await self._safe_execute(table, sql, list(record.values()), record)
        return uid

    async def update(
        self,
        table: str,
        args: Dict[str, Any],
        where: Optional[Dict[str, Any]] = None,
    ) -> int:
        if not args:
            return 0
        if not await self._table_exists(table):
            return 0
        args["updated_at"] = self._utc_iso()
        all_cols = {**args, **(where or {})}
        await self._ensure_table(table, all_cols)
        for col, val in all_cols.items():
            await self._ensure_column(table, col, val)
        set_clause = ", ".join(f"`{k}` = ?" for k in args)
        where_clause, where_params = self._build_where(where)
        sql = f"UPDATE {table} SET {set_clause}{where_clause}"
        params = list(args.values()) + where_params
        cur = await self._safe_execute(table, sql, params, all_cols)
        return cur.rowcount
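    # Filters are ANDed equality tests only; illustrative (hypothetical
    # column names, nothing executes here):
    #
    #   AsyncDataSet._build_where({"repo": "abc", "status": "success"})
    #   # -> (" WHERE `repo` = ? AND `status` = ?", ["abc", "success"])
    #
    # Values always travel as bound parameters; only identifiers are
    # interpolated into the SQL text.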
    async def delete(self, table: str, where: Optional[Dict[str, Any]] = None) -> int:
        if not await self._table_exists(table):
            return 0
        where_clause, where_params = self._build_where(where)
        sql = f"DELETE FROM {table}{where_clause}"
        cur = await self._safe_execute(table, sql, where_params, where or {})
        return cur.rowcount

    async def upsert(
        self,
        table: str,
        args: Dict[str, Any],
        where: Optional[Dict[str, Any]] = None,
    ) -> str | None:
        if not args:
            raise ValueError("Nothing to update. Empty dict given.")
        args["updated_at"] = self._utc_iso()
        affected = await self.update(table, args, where)
        if affected:
            rec = await self.get(table, where)
            return rec.get("uid") if rec else None
        merged = {**(where or {}), **args}
        return await self.insert(table, merged)

    async def get(
        self, table: str, where: Optional[Dict[str, Any]] = None
    ) -> Optional[Dict[str, Any]]:
        where_clause, where_params = self._build_where(where)
        sql = f"SELECT * FROM {table}{where_clause} LIMIT 1"
        async for row in self._safe_query(table, sql, where_params, where or {}):
            return row
        return None

    async def find(
        self,
        table: str,
        where: Optional[Dict[str, Any]] = None,
        *,
        limit: int = 0,
        offset: int = 0,
        order_by: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        where_clause, where_params = self._build_where(where)
        order_clause = f" ORDER BY {order_by}" if order_by else ""
        extra = (f" LIMIT {limit}" if limit else "") + (
            f" OFFSET {offset}" if offset else ""
        )
        sql = f"SELECT * FROM {table}{where_clause}{order_clause}{extra}"
        return [
            row
            async for row in self._safe_query(table, sql, where_params, where or {})
        ]

    async def count(self, table: str, where: Optional[Dict[str, Any]] = None) -> int:
        if not await self._table_exists(table):
            return 0
        where_clause, where_params = self._build_where(where)
        sql = f"SELECT COUNT(*) FROM {table}{where_clause}"
        gen = self._safe_query(table, sql, where_params, where or {})
        async for row in gen:
            return next(iter(row.values()), 0)
        return 0

    async def exists(self, table: str, where: Dict[str, Any]) -> bool:
        return (await self.count(table, where)) > 0

    async def kv_set(
        self,
        key: str,
        value: Any,
        *,
        table: str | None = None,
    ) -> None:
        tbl = table or self._KV_TABLE
        json_val = json.dumps(value, default=str)
        await self.upsert(tbl, {"value": json_val}, {"key": key})

    async def kv_get(
        self,
        key: str,
        *,
        default: Any = None,
        table: str | None = None,
    ) -> Any:
        tbl = table or self._KV_TABLE
        row = await self.get(tbl, {"key": key})
        if not row:
            return default
        try:
            return json.loads(row["value"])
        except Exception:
            return default

    async def execute_raw(self, sql: str, params: Optional[Tuple] = None) -> Any:
        async with aiosqlite.connect(self._file) as db:
            cursor = await db.execute(sql, params or ())
            await db.commit()
            return cursor

    async def query_raw(
        self, sql: str, params: Optional[Tuple] = None
    ) -> List[Dict[str, Any]]:
        try:
            async with aiosqlite.connect(self._file) as db:
                db.row_factory = aiosqlite.Row
                async with db.execute(sql, params or ()) as cursor:
                    return [dict(row) async for row in cursor]
        except aiosqlite.OperationalError:
            return []

    async def query_one(
        self, sql: str, params: Optional[Tuple] = None
    ) -> Optional[Dict[str, Any]]:
        results = await self.query_raw(sql + " LIMIT 1", params)
        return results[0] if results else None
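    # Illustrative use of the JSON key/value layer (hypothetical key and
    # value; not executed here):
    #
    #   await ds.kv_set("sync_cursor", {"page": 3})
    #   await ds.kv_get("sync_cursor", default={})   # -> {"page": 3}
    #
    # Values round-trip through json.dumps/json.loads, so anything not
    # JSON-serializable is stringified via default=str on the way in.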
    async def create_table(
        self,
        table: str,
        schema: Dict[str, str],
        constraints: Optional[List[str]] = None,
    ):
        full_schema = self._DEFAULT_COLUMNS.copy()
        full_schema.update(schema)
        columns = [f"`{col}` {dtype}" for col, dtype in full_schema.items()]
        if constraints:
            columns.extend(constraints)
        columns_sql = ", ".join(columns)
        async with aiosqlite.connect(self._file) as db:
            await db.execute(f"CREATE TABLE IF NOT EXISTS {table} ({columns_sql})")
            await db.commit()
        await self._invalidate_column_cache(table)

    async def insert_unique(
        self, table: str, args: Dict[str, Any], unique_fields: List[str]
    ) -> Union[str, None]:
        try:
            return await self.insert(table, args)
        except aiosqlite.IntegrityError as e:
            if "UNIQUE" in str(e):
                return None
            raise

    async def transaction(self):
        return TransactionContext(self._file)

    async def aggregate(
        self,
        table: str,
        function: str,
        column: str = "*",
        where: Optional[Dict[str, Any]] = None,
    ) -> Any:
        if not await self._table_exists(table):
            return None
        where_clause, where_params = self._build_where(where)
        sql = f"SELECT {function}({column}) as result FROM {table}{where_clause}"
        result = await self.query_one(sql, tuple(where_params))
        return result["result"] if result else None


class TransactionContext:
    def __init__(self, db_file: str):
        self.db_file = db_file
        self.conn = None

    async def __aenter__(self):
        self.conn = await aiosqlite.connect(self.db_file)
        self.conn.row_factory = aiosqlite.Row
        await self.conn.execute("BEGIN")
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            await self.conn.commit()
        else:
            await self.conn.rollback()
        await self.conn.close()


db = AsyncDataSet("zhurnal.db")
app = FastAPI()
repos_dir = pathlib.Path("repos")
logs_dir = pathlib.Path("logs")
logs_dir.mkdir(exist_ok=True)
log_file = logs_dir / "app.log"
salt = "x1337x"


def rotate_logs():
    if not log_file.exists():
        return
    if log_file.stat().st_size < 1024 * 1024:
        return
    for i in range(5, 0, -1):
        old = logs_dir / f"app.log.{i}"
        new = logs_dir / f"app.log.{i + 1}" if i < 5 else None
        if old.exists():
            if new:
                old.rename(new)
            else:
                old.unlink()
    log_file.rename(logs_dir / "app.log.1")


def log(message: str, redact_url: Optional[str] = None):
    rotate_logs()
    timestamp = datetime.now().isoformat()
    if redact_url:
        parsed = urllib.parse.urlparse(redact_url)
        if parsed.username or parsed.password:
            redacted_netloc = f"***@{parsed.hostname}"
            if parsed.port:
                redacted_netloc += f":{parsed.port}"
            redacted_url = parsed._replace(netloc=redacted_netloc).geturl()
            message = message.replace(redact_url, redacted_url)
    with log_file.open("a") as f:
        f.write(f"[{timestamp}] {message}\n")


connections: Dict[str, List[WebSocket]] = collections.defaultdict(list)
tasks: Dict[str, asyncio.Task] = {}


async def broadcast(repo_hash: str, data: dict):
    # Iterate over a copy so a dead socket can be dropped mid-loop without
    # aborting delivery to the remaining listeners.
    for connection in list(connections[repo_hash]):
        try:
            await connection.send_json(data)
        except Exception:
            connections[repo_hash].remove(connection)


def run_git_command(repo_path: str, command: list[str]) -> str:
    result = subprocess.run(
        ["git", "-C", repo_path] + command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if result.returncode != 0:
        raise Exception(result.stderr.strip())
    return result.stdout.strip()[: 1024 * 1024]


def get_commits_by_day(repo_path: pathlib.Path) -> Dict[str, List[str]]:
    log_output = run_git_command(
        str(repo_path), ["log", "--pretty=format:%H|%ad", "--date=short"]
    )
    commits_by_date = collections.defaultdict(list)
    for line in log_output.splitlines():
        if "|" in line:
            commit_hash, commit_date = line.split("|", 1)
            commits_by_date[commit_date].append(commit_hash)
    return commits_by_date
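# Illustrative use of the dataset and transaction helpers (hypothetical
# table/field names; kept in comments so nothing runs on import):
#
#   async def _demo():
#       uid = await db.insert("notes", {"title": "hello", "pinned": False})
#       await db.update("notes", {"pinned": True}, {"uid": uid})
#       total = await db.count("notes")
#       async with await db.transaction() as conn:  # raw aiosqlite connection
#           await conn.execute("DELETE FROM notes WHERE pinned = 0")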
async def process_repo(url: str, repo_hash: str, is_user_request: bool = False):
    try:
        log(f"Processing repo {url} ({repo_hash})", redact_url=url)
        repo_path = repos_dir / repo_hash
        repos_dir.mkdir(exist_ok=True)
        if not repo_path.exists():
            await broadcast(repo_hash, {"type": "message", "content": "Cloning repository..."})
            for _ in range(3):
                try:
                    subprocess.run(["git", "clone", url, str(repo_path)], check=True)
                    break
                except subprocess.CalledProcessError:
                    await asyncio.sleep(1)
            else:
                raise Exception("Clone failed after 3 tries")
        else:
            await broadcast(repo_hash, {"type": "message", "content": "Updating repository..."})
            for _ in range(3):
                try:
                    subprocess.run(["git", "-C", str(repo_path), "pull"], check=True)
                    break
                except subprocess.CalledProcessError:
                    await asyncio.sleep(1)
            else:
                raise Exception("Pull failed after 3 tries")
        commits_by_day = get_commits_by_day(repo_path)
        history = {c["uid"]: c for c in await db.find("commits", {"repo": repo_hash})}
        new_commits = 0
        for date in sorted(commits_by_day):
            await broadcast(repo_hash, {"type": "message", "content": f"Diffs for {date}"})
            for commit in reversed(commits_by_day[date]):
                if commit in history and history[commit]["status"] == "success":
                    continue
                new_commits += 1
                diff = run_git_command(
                    str(repo_path), ["show", commit, "--no-color", "--format=medium"]
                )
                messages = f"### Commit: `{commit}`\n{diff}"
                prompt = f"""1. Generate a git one liner based on git diff
2. Example of a good message is: `feat: Enhanced message display with emojis and code highlighting`
3. Example of a good message is: `fix: Fixed a bug that caused the app to crash`
4. Do not include explanations in your response. It must look like human written.
5. The diff to respond to with a git one liner:
{messages}"""
                for attempt in range(3):
                    try:
                        ai = client.APIClient()
                        result = ai.prompt(prompt, json=False, reduce_tokens=79000)
                        await broadcast(repo_hash, {"type": "message", "content": result})
                        await db.upsert(
                            "commits",
                            {
                                "repo": repo_hash,
                                "date": date,
                                "line": result,
                                "status": "success",
                                "timestamp": datetime.utcnow().isoformat(timespec="seconds"),
                            },
                            {"uid": commit},
                        )
                        break
                    except Exception as e:
                        if attempt == 2:
                            await broadcast(
                                repo_hash,
                                {"type": "message", "content": f"Commit {commit}: failed"},
                            )
                            await db.upsert(
                                "commits",
                                {
                                    "repo": repo_hash,
                                    "date": date,
                                    "error": str(e),
                                    "status": "failed",
                                    "timestamp": datetime.utcnow().isoformat(timespec="seconds"),
                                },
                                {"uid": commit},
                            )
                        await asyncio.sleep(1)
        # Both the "nothing new" and "work done" paths end on the changelog page.
        await broadcast(repo_hash, {"type": "redirect", "url": f"/changelog/{repo_hash}"})
        await db.update("repos", {"last_synced": datetime.utcnow().isoformat()}, {"hash": repo_hash})
    except Exception as e:
        log(f"Error: {str(e)}", redact_url=url)
        await broadcast(repo_hash, {"type": "error", "content": str(e)})


async def scheduler():
    while True:
        await asyncio.sleep(3600)
        repos = await db.find("repos", {"hourly_enabled": True})
        for repo in repos:
            repo_hash = repo["hash"]
            url = repo["original_url"]
            if repo_hash not in tasks or tasks[repo_hash].done():
                tasks[repo_hash] = asyncio.create_task(process_repo(url, repo_hash))


schedule_task = None


@app.get("/", response_class=HTMLResponse)
async def root():
    global schedule_task
    if not schedule_task:
        schedule_task = asyncio.create_task(scheduler())
    return pathlib.Path("index.html").read_text()
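# The /submit endpoint below keys everything on a digest of the submitted
# URL, so resubmitting the same repository reuses the same row, WebSocket
# channel and background task. Illustrative (hypothetical URL):
#
#   hashlib.sha256(b"https://example.com/owner/repo.git").hexdigest()
#   # -> 64-character hex digest used as repos.hash and the /ws/{repo_hash} channel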
in ("http", "https") or not parsed.netloc: return Response("Invalid URL", status_code=400) repo_hash = hashlib.sha256(url.encode()).hexdigest() repo = await db.get("repos", {"hash": repo_hash}) if not repo: await db.insert("repos", {"hash": repo_hash, "original_url": url, "last_synced": None, "hourly_enabled": True}) if repo_hash not in tasks or tasks[repo_hash].done(): tasks[repo_hash] = asyncio.create_task(process_repo(url, repo_hash, True)) return {"hash": repo_hash} @app.websocket("/ws/{repo_hash}") async def ws(websocket: WebSocket, repo_hash: str): await websocket.accept() connections[repo_hash].append(websocket) try: while True: await websocket.receive_text() except WebSocketDisconnect: connections[repo_hash].remove(websocket) @app.get("/changelog/{repo_hash}", response_class=HTMLResponse) async def get_changelog(repo_hash: str): commits = await db.find("commits", {"repo": repo_hash, "status": "success"}, order_by="date DESC") by_date = collections.defaultdict(list) for c in commits: by_date[c["date"]].append(c["line"]) content = "".join(f"