From cff3dc708719a6d7f0f5403d9e9514463aeeb4ca Mon Sep 17 00:00:00 2001
From: retoor
Date: Mon, 4 Aug 2025 19:22:16 +0200
Subject: [PATCH] Initial commit.

---
 .gitignore     |   4 +
 changelog.html |  25 +++
 client.py      |  17 ++
 env.py         |   1 +
 index.html     |  54 +++++
 main.py        | 587 +++++++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 688 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 changelog.html
 create mode 100755 client.py
 create mode 100755 env.py
 create mode 100644 index.html
 create mode 100644 main.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..66c5db3
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+__pycache__
+logs
+zhurnal.db
+repos
diff --git a/changelog.html b/changelog.html
new file mode 100644
index 0000000..9632d0d
--- /dev/null
+++ b/changelog.html
@@ -0,0 +1,25 @@
+<!DOCTYPE html>
+<html>
+<head>
+    <meta charset="utf-8">
+    <title>Zhurnal Changelog</title>
+</head>
+<body>
+    <h1>Changelog</h1>
+    <div>
+        {{content}}
+    </div>
+    <div>
+        <a href="/download/json/{{hash}}">Download JSON</a> |
+        <a href="/download/md/{{hash}}">Download Markdown</a>
+    </div>
+</body>
+</html>
diff --git a/client.py b/client.py
new file mode 100755
index 0000000..6d5d49f
--- /dev/null
+++ b/client.py
@@ -0,0 +1,17 @@
+import requests
+import env
+BASE_URL = env.base_url
+
+class APIClient:
+    def __init__(self, base_url=BASE_URL):
+        self.base_url = base_url
+        self.session = requests.Session()
+
+    def prompt(self, prompt, json=True, model='google/gemma-3-12b-it', use_cache=True, reduce_tokens=None):
+        """Send a prompt to the AI endpoint and return its JSON response."""
+        url = f"{self.base_url}/ai/prompt"
+        response = self.session.post(url, json={"prompt": prompt, "model": model, "json": json, "use_cache": use_cache, "reduce_tokens": reduce_tokens})
+        response.raise_for_status()
+        return response.json()
+
+
diff --git a/env.py b/env.py
new file mode 100755
index 0000000..99c835a
--- /dev/null
+++ b/env.py
@@ -0,0 +1 @@
+base_url = "https://x:x@ipa.molodetz.nl/api"
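
A minimal usage sketch for the client above; the call signature and the
env.base_url default come from client.py, while the prompt text and the
printing are illustrative:

    import client

    ai = client.APIClient()  # defaults to env.base_url
    # json=False requests a plain-text completion rather than a JSON object
    one_liner = ai.prompt("Write a git one-liner for this diff: ...", json=False)
    print(one_liner)
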
diff --git a/index.html b/index.html
new file mode 100644
index 0000000..d3b78a2
--- /dev/null
+++ b/index.html
@@ -0,0 +1,54 @@
+<!DOCTYPE html>
+<html>
+<head>
+    <meta charset="utf-8">
+    <title>Zhurnal</title>
+</head>
+<body>
+    <div>
+        <h1>Zhurnal</h1>
+        <p>
+            Enter repository URL (supports https://, http://, Gitea, GitLab,
+            auth like https://user:pass@domain/repo)
+        </p>
+        <form id="repo-form">
+            <input type="text" name="repo_url" placeholder="Repository URL">
+            <button type="submit">Generate changelog</button>
+        </form>
+        <div id="messages"></div>
+    </div>
+    <script>
+        const form = document.getElementById("repo-form");
+        const messages = document.getElementById("messages");
+        form.addEventListener("submit", async (event) => {
+            event.preventDefault();
+            const response = await fetch("/submit", {method: "POST", body: new FormData(form)});
+            const {hash} = await response.json();
+            const proto = location.protocol === "https:" ? "wss" : "ws";
+            const ws = new WebSocket(`${proto}://${location.host}/ws/${hash}`);
+            ws.onmessage = (msg) => {
+                const data = JSON.parse(msg.data);
+                if (data.type === "redirect") {
+                    window.location = data.url;
+                } else {
+                    const line = document.createElement("div");
+                    line.textContent = data.content;
+                    messages.appendChild(line);
+                }
+            };
+        });
+    </script>
+</body>
+</html>
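
For reference, the same submit-and-follow flow the page's script performs,
sketched in Python; this assumes the app runs on localhost:8000 and that the
third-party `requests` and `websockets` packages are installed:

    import asyncio
    import json
    import requests
    import websockets

    async def follow(repo_url: str):
        # POST the form field the /submit endpoint expects
        resp = requests.post("http://localhost:8000/submit", data={"repo_url": repo_url})
        repo_hash = resp.json()["hash"]
        # Stream progress messages until the server sends the redirect
        async with websockets.connect(f"ws://localhost:8000/ws/{repo_hash}") as ws:
            while True:
                data = json.loads(await ws.recv())
                if data["type"] == "redirect":
                    print("changelog ready at", data["url"])
                    break
                print(data.get("content"))

    asyncio.run(follow("https://github.com/example/repo"))
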
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..1224d9c
--- /dev/null
+++ b/main.py
@@ -0,0 +1,587 @@
+import asyncio
+import collections
+import hashlib
+import json
+import pathlib
+import re
+import subprocess
+import urllib.parse
+from datetime import datetime, timezone
+from typing import Any, Dict, Iterable, List, Optional, AsyncGenerator, Union, Tuple, Set
+from uuid import uuid4
+import aiosqlite
+from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
+from fastapi.responses import HTMLResponse, Response, PlainTextResponse
+import client
+
+class AsyncDataSet:
+    _KV_TABLE = "__kv_store"
+    _DEFAULT_COLUMNS = {
+        "uid": "TEXT PRIMARY KEY",
+        "created_at": "TEXT",
+        "updated_at": "TEXT",
+        "deleted_at": "TEXT",
+    }
+    def __init__(self, file: str):
+        self._file = file
+        self._table_columns_cache: Dict[str, Set[str]] = {}
+    @staticmethod
+    def _utc_iso() -> str:
+        return (
+            datetime.now(timezone.utc)
+            .replace(microsecond=0)
+            .isoformat()
+            .replace("+00:00", "Z")
+        )
+    @staticmethod
+    def _py_to_sqlite_type(value: Any) -> str:
+        if value is None:
+            return "TEXT"
+        if isinstance(value, bool):
+            return "INTEGER"
+        if isinstance(value, int):
+            return "INTEGER"
+        if isinstance(value, float):
+            return "REAL"
+        if isinstance(value, (bytes, bytearray, memoryview)):
+            return "BLOB"
+        return "TEXT"
+    async def _get_table_columns(self, table: str) -> Set[str]:
+        if table in self._table_columns_cache:
+            return self._table_columns_cache[table]
+        columns = set()
+        try:
+            async with aiosqlite.connect(self._file) as db:
+                async with db.execute(f"PRAGMA table_info({table})") as cursor:
+                    async for row in cursor:
+                        columns.add(row[1])
+            self._table_columns_cache[table] = columns
+        except aiosqlite.OperationalError:
+            pass
+        return columns
+    async def _invalidate_column_cache(self, table: str):
+        if table in self._table_columns_cache:
+            del self._table_columns_cache[table]
+    async def _ensure_column(self, table: str, name: str, value: Any) -> None:
+        col_type = self._py_to_sqlite_type(value)
+        try:
+            async with aiosqlite.connect(self._file) as db:
+                await db.execute(f"ALTER TABLE {table} ADD COLUMN `{name}` {col_type}")
+                await db.commit()
+            await self._invalidate_column_cache(table)
+        except aiosqlite.OperationalError as e:
+            if "duplicate column name" in str(e).lower():
+                pass
+            else:
+                raise
+    async def _ensure_table(self, table: str, col_sources: Dict[str, Any]) -> None:
+        cols = self._DEFAULT_COLUMNS.copy()
+        for key, val in col_sources.items():
+            if key not in cols:
+                cols[key] = self._py_to_sqlite_type(val)
+        columns_sql = ", ".join(f"`{k}` {t}" for k, t in cols.items())
+        async with aiosqlite.connect(self._file) as db:
+            await db.execute(f"CREATE TABLE IF NOT EXISTS {table} ({columns_sql})")
+            await db.commit()
+        await self._invalidate_column_cache(table)
+    async def _table_exists(self, table: str) -> bool:
+        async with aiosqlite.connect(self._file) as db:
+            async with db.execute(
+                "SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table,)
+            ) as cursor:
+                return await cursor.fetchone() is not None
+    _RE_NO_COLUMN = re.compile(r"(?:no such column:|has no column named) (\w+)")
+    _RE_NO_TABLE = re.compile(r"no such table: (\w+)")
+    @classmethod
+    def _missing_column_from_error(
+        cls, err: aiosqlite.OperationalError
+    ) -> Optional[str]:
+        m = cls._RE_NO_COLUMN.search(str(err))
+        return m.group(1) if m else None
+    @classmethod
+    def _missing_table_from_error(
+        cls, err: aiosqlite.OperationalError
+    ) -> Optional[str]:
+        m = cls._RE_NO_TABLE.search(str(err))
+        return m.group(1) if m else None
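+    # _safe_execute implements the schema-on-write strategy: run the
+    # statement, and when SQLite reports a missing table or column, create
+    # it (typed from the sample values in col_sources) and retry, up to
+    # max_retries times.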
+    async def _safe_execute(
+        self,
+        table: str,
+        sql: str,
+        params: Iterable[Any],
+        col_sources: Dict[str, Any],
+        max_retries: int = 10
+    ) -> aiosqlite.Cursor:
+        retries = 0
+        while retries < max_retries:
+            try:
+                async with aiosqlite.connect(self._file) as db:
+                    cursor = await db.execute(sql, params)
+                    await db.commit()
+                    return cursor
+            except aiosqlite.OperationalError as err:
+                retries += 1
+                err_str = str(err).lower()
+                col = self._missing_column_from_error(err)
+                if col:
+                    if col in col_sources:
+                        await self._ensure_column(table, col, col_sources[col])
+                    else:
+                        await self._ensure_column(table, col, None)
+                    continue
+                tbl = self._missing_table_from_error(err)
+                if tbl:
+                    await self._ensure_table(tbl, col_sources)
+                    continue
+                if "has no column named" in err_str:
+                    match = re.search(r"table \w+ has no column named (\w+)", err_str)
+                    if match:
+                        col_name = match.group(1)
+                        if col_name in col_sources:
+                            await self._ensure_column(table, col_name, col_sources[col_name])
+                        else:
+                            await self._ensure_column(table, col_name, None)
+                        continue
+                raise
+        raise Exception(f"Max retries ({max_retries}) exceeded")
+    async def _filter_existing_columns(self, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
+        if not await self._table_exists(table):
+            return data
+        existing_columns = await self._get_table_columns(table)
+        if not existing_columns:
+            return data
+        return {k: v for k, v in data.items() if k in existing_columns}
+    async def _safe_query(
+        self,
+        table: str,
+        sql: str,
+        params: Iterable[Any],
+        col_sources: Dict[str, Any],
+    ) -> AsyncGenerator[Dict[str, Any], None]:
+        if not await self._table_exists(table):
+            return
+        max_retries = 10
+        retries = 0
+        while retries < max_retries:
+            try:
+                async with aiosqlite.connect(self._file) as db:
+                    db.row_factory = aiosqlite.Row
+                    async with db.execute(sql, params) as cursor:
+                        async for row in cursor:
+                            yield dict(row)
+                return
+            except aiosqlite.OperationalError as err:
+                retries += 1
+                err_str = str(err).lower()
+                tbl = self._missing_table_from_error(err)
+                if tbl:
+                    return
+                if "no such column" in err_str:
+                    return
+                raise
+    @staticmethod
+    def _build_where(where: Optional[Dict[str, Any]]) -> tuple[str, List[Any]]:
+        if not where:
+            return "", []
+        clauses, vals = zip(*[(f"`{k}` = ?", v) for k, v in where.items()])
+        return " WHERE " + " AND ".join(clauses), list(vals)
+    async def insert(self, table: str, args: Dict[str, Any], return_id: bool = False) -> Union[str, int]:
+        uid = str(uuid4())
+        now = self._utc_iso()
+        record = {
+            "uid": uid,
+            "created_at": now,
+            "updated_at": now,
+            "deleted_at": None,
+            **args,
+        }
+        await self._ensure_table(table, record)
+        if return_id and 'id' not in args:
+            async with aiosqlite.connect(self._file) as db:
+                try:
+                    await db.execute(f"ALTER TABLE {table} ADD COLUMN id INTEGER PRIMARY KEY AUTOINCREMENT")
+                    await db.commit()
+                except aiosqlite.OperationalError as e:
+                    if "duplicate column name" not in str(e).lower():
+                        try:
+                            await db.execute(f"ALTER TABLE {table} ADD COLUMN id INTEGER")
+                            await db.commit()
+                        except aiosqlite.OperationalError:
+                            pass
+            await self._invalidate_column_cache(table)
+            cols = "`" + "`, `".join(record.keys()) + "`"
+            qs = ", ".join(["?"] * len(record))
+            sql = f"INSERT INTO {table} ({cols}) VALUES ({qs})"
+            cursor = await self._safe_execute(table, sql, list(record.values()), record)
+            return cursor.lastrowid
+        cols = "`" + "`, `".join(record.keys()) + "`"
+        qs = ", ".join(["?"] * len(record))
+        sql = f"INSERT INTO {table} ({cols}) VALUES ({qs})"
+        await self._safe_execute(table, sql, list(record.values()), record)
+        return uid
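+    # Hypothetical example: insert with return_id=True adds an INTEGER id
+    # column and returns cursor.lastrowid instead of the generated uid:
+    #     row_id = await db.insert("events", {"kind": "sync"}, return_id=True)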
+    async def update(
+        self,
+        table: str,
+        args: Dict[str, Any],
+        where: Optional[Dict[str, Any]] = None,
+    ) -> int:
+        if not args:
+            return 0
+        if not await self._table_exists(table):
+            return 0
+        args["updated_at"] = self._utc_iso()
+        all_cols = {**args, **(where or {})}
+        await self._ensure_table(table, all_cols)
+        for col, val in all_cols.items():
+            await self._ensure_column(table, col, val)
+        set_clause = ", ".join(f"`{k}` = ?" for k in args)
+        where_clause, where_params = self._build_where(where)
+        sql = f"UPDATE {table} SET {set_clause}{where_clause}"
+        params = list(args.values()) + where_params
+        cur = await self._safe_execute(table, sql, params, all_cols)
+        return cur.rowcount
+    async def delete(self, table: str, where: Optional[Dict[str, Any]] = None) -> int:
+        if not await self._table_exists(table):
+            return 0
+        where_clause, where_params = self._build_where(where)
+        sql = f"DELETE FROM {table}{where_clause}"
+        cur = await self._safe_execute(table, sql, where_params, where or {})
+        return cur.rowcount
+    async def upsert(
+        self,
+        table: str,
+        args: Dict[str, Any],
+        where: Optional[Dict[str, Any]] = None,
+    ) -> str | None:
+        if not args:
+            raise ValueError("Nothing to update. Empty dict given.")
+        args['updated_at'] = self._utc_iso()
+        affected = await self.update(table, args, where)
+        if affected:
+            rec = await self.get(table, where)
+            return rec.get("uid") if rec else None
+        merged = {**(where or {}), **args}
+        return await self.insert(table, merged)
+    async def get(
+        self, table: str, where: Optional[Dict[str, Any]] = None
+    ) -> Optional[Dict[str, Any]]:
+        where_clause, where_params = self._build_where(where)
+        sql = f"SELECT * FROM {table}{where_clause} LIMIT 1"
+        async for row in self._safe_query(table, sql, where_params, where or {}):
+            return row
+        return None
+    async def find(
+        self,
+        table: str,
+        where: Optional[Dict[str, Any]] = None,
+        *,
+        limit: int = 0,
+        offset: int = 0,
+        order_by: Optional[str] = None,
+    ) -> List[Dict[str, Any]]:
+        where_clause, where_params = self._build_where(where)
+        order_clause = f" ORDER BY {order_by}" if order_by else ""
+        extra = (f" LIMIT {limit}" if limit else "") + (
+            f" OFFSET {offset}" if offset else ""
+        )
+        sql = f"SELECT * FROM {table}{where_clause}{order_clause}{extra}"
+        return [
+            row async for row in self._safe_query(table, sql, where_params, where or {})
+        ]
+    async def count(self, table: str, where: Optional[Dict[str, Any]] = None) -> int:
+        if not await self._table_exists(table):
+            return 0
+        where_clause, where_params = self._build_where(where)
+        sql = f"SELECT COUNT(*) FROM {table}{where_clause}"
+        gen = self._safe_query(table, sql, where_params, where or {})
+        async for row in gen:
+            return next(iter(row.values()), 0)
+        return 0
+    async def exists(self, table: str, where: Dict[str, Any]) -> bool:
+        return (await self.count(table, where)) > 0
+    async def kv_set(
+        self,
+        key: str,
+        value: Any,
+        *,
+        table: str | None = None,
+    ) -> None:
+        tbl = table or self._KV_TABLE
+        json_val = json.dumps(value, default=str)
+        await self.upsert(tbl, {"value": json_val}, {"key": key})
+    async def kv_get(
+        self,
+        key: str,
+        *,
+        default: Any = None,
+        table: str | None = None,
+    ) -> Any:
+        tbl = table or self._KV_TABLE
+        row = await self.get(tbl, {"key": key})
+        if not row:
+            return default
+        try:
+            return json.loads(row["value"])
+        except Exception:
+            return default
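+    # Hypothetical usage of the KV helpers; values are JSON-serialized, so
+    # any JSON-compatible structure round-trips:
+    #     await db.kv_set("last_run", {"repo": "abc", "ok": True})
+    #     state = await db.kv_get("last_run", default={})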
+    async def execute_raw(self, sql: str, params: Optional[Tuple] = None) -> Any:
+        async with aiosqlite.connect(self._file) as db:
+            cursor = await db.execute(sql, params or ())
+            await db.commit()
+            return cursor
+    async def query_raw(self, sql: str, params: Optional[Tuple] = None) -> List[Dict[str, Any]]:
+        try:
+            async with aiosqlite.connect(self._file) as db:
+                db.row_factory = aiosqlite.Row
+                async with db.execute(sql, params or ()) as cursor:
+                    return [dict(row) async for row in cursor]
+        except aiosqlite.OperationalError:
+            return []
+    async def query_one(self, sql: str, params: Optional[Tuple] = None) -> Optional[Dict[str, Any]]:
+        results = await self.query_raw(sql + " LIMIT 1", params)
+        return results[0] if results else None
+    async def create_table(self, table: str, schema: Dict[str, str], constraints: Optional[List[str]] = None):
+        full_schema = self._DEFAULT_COLUMNS.copy()
+        full_schema.update(schema)
+        columns = [f"`{col}` {dtype}" for col, dtype in full_schema.items()]
+        if constraints:
+            columns.extend(constraints)
+        columns_sql = ", ".join(columns)
+        async with aiosqlite.connect(self._file) as db:
+            await db.execute(f"CREATE TABLE IF NOT EXISTS {table} ({columns_sql})")
+            await db.commit()
+        await self._invalidate_column_cache(table)
+    async def insert_unique(self, table: str, args: Dict[str, Any], unique_fields: List[str]) -> Union[str, None]:
+        try:
+            return await self.insert(table, args)
+        except aiosqlite.IntegrityError as e:
+            if "UNIQUE" in str(e):
+                return None
+            raise
+    async def transaction(self):
+        return TransactionContext(self._file)
+    async def aggregate(self, table: str, function: str, column: str = "*", where: Optional[Dict[str, Any]] = None) -> Any:
+        if not await self._table_exists(table):
+            return None
+        where_clause, where_params = self._build_where(where)
+        sql = f"SELECT {function}({column}) as result FROM {table}{where_clause}"
+        result = await self.query_one(sql, tuple(where_params))
+        return result['result'] if result else None
+
+class TransactionContext:
+    def __init__(self, db_file: str):
+        self.db_file = db_file
+        self.conn = None
+    async def __aenter__(self):
+        self.conn = await aiosqlite.connect(self.db_file)
+        self.conn.row_factory = aiosqlite.Row
+        await self.conn.execute("BEGIN")
+        return self.conn
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        if exc_type is None:
+            await self.conn.commit()
+        else:
+            await self.conn.rollback()
+        await self.conn.close()
+
+db = AsyncDataSet("zhurnal.db")
+app = FastAPI()
+repos_dir = pathlib.Path("repos")
+logs_dir = pathlib.Path("logs")
+logs_dir.mkdir(exist_ok=True)
+log_file = logs_dir / "app.log"
+salt = "x1337x"
+
+def rotate_logs():
+    if not log_file.exists():
+        return
+    if log_file.stat().st_size < 1024 * 1024:
+        return
+    for i in range(5, 0, -1):
+        old = logs_dir / f"app.log.{i}"
+        new = logs_dir / f"app.log.{i+1}" if i < 5 else None
+        if old.exists():
+            if new:
+                old.rename(new)
+            else:
+                old.unlink()
+    log_file.rename(logs_dir / "app.log.1")
+
+def log(message: str, redact_url: Optional[str] = None):
+    rotate_logs()
+    timestamp = datetime.now().isoformat()
+    if redact_url:
+        parsed = urllib.parse.urlparse(redact_url)
+        if parsed.username or parsed.password:
+            redacted_netloc = f"***@{parsed.hostname}"
+            if parsed.port:
+                redacted_netloc += f":{parsed.port}"
+            redacted_url = parsed._replace(netloc=redacted_netloc).geturl()
+            message = message.replace(redact_url, redacted_url)
+    with log_file.open("a") as f:
+        f.write(f"[{timestamp}] {message}\n")
+
+connections: Dict[str, List[WebSocket]] = collections.defaultdict(list)
+tasks: Dict[str, asyncio.Task] = {}
+
+async def broadcast(repo_hash: str, data: dict):
+    for connection in connections[repo_hash]:
+        await connection.send_json(data)
+
+def run_git_command(repo_path: str, command: list[str]) -> str:
+    result = subprocess.run(["git", "-C", repo_path] + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+    if result.returncode != 0:
+        raise Exception(result.stderr.strip())
+    return result.stdout.strip()[:1024 * 1024]
+
+def get_commits_by_day(repo_path: pathlib.Path) -> Dict[str, List[str]]:
+    log_output = run_git_command(str(repo_path), ["log", "--pretty=format:%H|%ad", "--date=short"])
+    commits_by_date = collections.defaultdict(list)
+    for line in log_output.splitlines():
+        if "|" in line:
+            commit_hash, commit_date = line.split("|", 1)
+            commits_by_date[commit_date].append(commit_hash)
+    return commits_by_date
+
+async def process_repo(url: str, repo_hash: str, is_user_request: bool = False):
+    try:
+        log(f"Processing repo {url} ({repo_hash})", redact_url=url)
+        repo_path = repos_dir / repo_hash
+        repos_dir.mkdir(exist_ok=True)
+        if not repo_path.exists():
+            await broadcast(repo_hash, {"type": "message", "content": "Cloning repository..."})
+            for _ in range(3):
+                try:
+                    subprocess.run(["git", "clone", url, str(repo_path)], check=True)
+                    break
+                except subprocess.CalledProcessError:
+                    await asyncio.sleep(1)
+            else:
+                raise Exception("Clone failed after 3 tries")
+        else:
+            await broadcast(repo_hash, {"type": "message", "content": "Updating repository..."})
+            for _ in range(3):
+                try:
+                    subprocess.run(["git", "-C", str(repo_path), "pull"], check=True)
+                    break
+                except subprocess.CalledProcessError:
+                    await asyncio.sleep(1)
+            else:
+                raise Exception("Pull failed after 3 tries")
+        commits_by_day = get_commits_by_day(repo_path)
+        history = {c["uid"]: c for c in await db.find("commits", {"repo": repo_hash})}
+        for date in sorted(commits_by_day):
+            await broadcast(repo_hash, {"type": "message", "content": f"Diffs for {date}"})
+            for commit in reversed(commits_by_day[date]):
+                if commit in history and history[commit]["status"] == "success":
+                    continue
+                diff = run_git_command(str(repo_path), ["show", commit, "--no-color", "--format=medium"])
+                messages = f"### Commit: `{commit}`\n{diff}"
+                prompt = f"""1. Generate a git one-liner based on the git diff.
+2. Example of a good message: `feat: Enhanced message display with emojis and code highlighting`
+3. Example of a good message: `fix: Fixed a bug that caused the app to crash`
+4. Do not include explanations in your response. It must look human-written.
+5. The diff to respond to with a git one-liner: {messages}"""
+                for attempt in range(3):
+                    try:
+                        ai = client.APIClient()
+                        result = ai.prompt(prompt, json=False, reduce_tokens=79000)
+                        await broadcast(repo_hash, {"type": "message", "content": result})
+                        await db.upsert("commits", {"repo": repo_hash, "date": date, "line": result, "status": "success", "timestamp": datetime.utcnow().isoformat(timespec="seconds")}, {"uid": commit})
+                        break
+                    except Exception as e:
+                        if attempt == 2:
+                            await broadcast(repo_hash, {"type": "message", "content": f"Commit {commit}: failed"})
+                            await db.upsert("commits", {"repo": repo_hash, "date": date, "error": str(e), "status": "failed", "timestamp": datetime.utcnow().isoformat(timespec="seconds")}, {"uid": commit})
+                        await asyncio.sleep(1)
+        await broadcast(repo_hash, {"type": "redirect", "url": f"/changelog/{repo_hash}"})
+        await db.update("repos", {"last_synced": datetime.utcnow().isoformat()}, {"hash": repo_hash})
+    except Exception as e:
+        log(f"Error: {str(e)}", redact_url=url)
+        await broadcast(repo_hash, {"type": "error", "content": str(e)})
+
+async def scheduler():
+    while True:
+        await asyncio.sleep(3600)
+        repos = await db.find("repos", {"hourly_enabled": True})
+        for repo in repos:
+            repo_hash = repo["hash"]
+            url = repo["original_url"]
+            if repo_hash not in tasks or tasks[repo_hash].done():
+                tasks[repo_hash] = asyncio.create_task(process_repo(url, repo_hash))
+
+
+schedule_task = None
+
+@app.get("/", response_class=HTMLResponse)
+async def root():
+    global schedule_task
+    if not schedule_task:
+        schedule_task = asyncio.create_task(scheduler())
+
+    return open("index.html").read()
+
+@app.post("/submit")
+async def submit(request: Request):
+    form = await request.form()
+    url = form.get("repo_url")
+    if not url or len(url) > 1000:
+        return Response("Invalid URL", status_code=400)
+    parsed = urllib.parse.urlparse(url)
+    if parsed.scheme not in ("http", "https") or not parsed.netloc:
+        return Response("Invalid URL", status_code=400)
+    repo_hash = hashlib.sha256(url.encode()).hexdigest()
+    repo = await db.get("repos", {"hash": repo_hash})
+    if not repo:
+        await db.insert("repos", {"hash": repo_hash, "original_url": url, "last_synced": None, "hourly_enabled": True})
+    if repo_hash not in tasks or tasks[repo_hash].done():
+        tasks[repo_hash] = asyncio.create_task(process_repo(url, repo_hash, True))
+    return {"hash": repo_hash}
+
+@app.websocket("/ws/{repo_hash}")
+async def ws(websocket: WebSocket, repo_hash: str):
+    await websocket.accept()
+    connections[repo_hash].append(websocket)
+    try:
+        while True:
+            await websocket.receive_text()
+    except WebSocketDisconnect:
+        connections[repo_hash].remove(websocket)
+
+@app.get("/changelog/{repo_hash}", response_class=HTMLResponse)
+async def get_changelog(repo_hash: str):
+    commits = await db.find("commits", {"repo": repo_hash, "status": "success"}, order_by="date DESC")
+    by_date = collections.defaultdict(list)
+    for c in commits:
+        by_date[c["date"]].append(c["line"])
+    content = "".join(
+        f"<h2>{date}</h2><ul>" + "".join(f"<li>{line}</li>" for line in by_date[date]) + "</ul>"
+        for date in sorted(by_date, reverse=True)
+    )
+    html = open("changelog.html").read().replace("{{content}}", content).replace("{{hash}}", repo_hash)
+    return html
+
+@app.get("/download/json/{repo_hash}")
+async def download_json(repo_hash: str):
+    commits = await db.find("commits", {"repo": repo_hash, "status": "success"})
+    data = {c["uid"]: {"repo": ".", "date": c["date"], "line": c["line"], "status": c["status"], "timestamp": c["timestamp"]} for c in commits}
+    filename = f"changelog_{repo_hash}.json"
+    return Response(json.dumps(data), media_type="application/json", headers={"Content-Disposition": f"attachment; filename={filename}"})
+
+@app.get("/download/md/{repo_hash}")
+async def download_md(repo_hash: str):
+    commits = await db.find("commits", {"repo": repo_hash, "status": "success"}, order_by="date DESC")
+    by_date = collections.defaultdict(list)
+    for c in commits:
+        by_date[c["date"]].append(c["line"])
+    md = "# Changelog\n\n" + "\n".join(f"## {date}\n" + "\n".join(f"- {line}" for line in by_date[date]) + "\n" for date in sorted(by_date, reverse=True))
+    filename = f"changelog_{repo_hash}.md"
+    return Response(md, media_type="text/markdown", headers={"Content-Disposition": f"attachment; filename={filename}"})
+
+@app.get("/system-log", response_class=PlainTextResponse)
+async def get_log():
+    return log_file.read_text() if log_file.exists() else ""