import asyncio
import collections
import hashlib
import html
import json
import pathlib
import re
import subprocess
import urllib.parse
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Optional, AsyncGenerator, Union, Tuple, Set
from uuid import uuid4
import aiosqlite
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, Response, PlainTextResponse
import client
class AsyncDataSet:
_KV_TABLE = "__kv_store"
_DEFAULT_COLUMNS = {
"uid": "TEXT PRIMARY KEY",
"created_at": "TEXT",
"updated_at": "TEXT",
"deleted_at": "TEXT",
}
def __init__(self, file: str):
self._file = file
self._table_columns_cache: Dict[str, Set[str]] = {}
@staticmethod
def _utc_iso() -> str:
return (
datetime.now(timezone.utc)
.replace(microsecond=0)
.isoformat()
.replace("+00:00", "Z")
)
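    # Example of the timestamp shape produced above (illustrative value):
    #     AsyncDataSet._utc_iso()  ->  "2025-01-02T03:04:05Z"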
@staticmethod
def _py_to_sqlite_type(value: Any) -> str:
if value is None:
return "TEXT"
if isinstance(value, bool):
return "INTEGER"
if isinstance(value, int):
return "INTEGER"
if isinstance(value, float):
return "REAL"
if isinstance(value, (bytes, bytearray, memoryview)):
return "BLOB"
return "TEXT"
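    # Illustrative mapping: True -> INTEGER, 42 -> INTEGER, 3.14 -> REAL,
    # b"raw" -> BLOB, and anything else (str, dict, None, ...) -> TEXT.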
    async def _get_table_columns(self, table: str) -> Set[str]:
        if table in self._table_columns_cache:
            return self._table_columns_cache[table]
        columns: Set[str] = set()
        try:
            async with aiosqlite.connect(self._file) as db:
                async with db.execute(f"PRAGMA table_info({table})") as cursor:
                    async for row in cursor:
                        columns.add(row[1])  # index 1 of table_info is the column name
            self._table_columns_cache[table] = columns
        except aiosqlite.OperationalError:
            # Table may not exist yet; report no known columns.
            pass
        return columns
async def _invalidate_column_cache(self, table: str):
if table in self._table_columns_cache:
del self._table_columns_cache[table]
async def _ensure_column(self, table: str, name: str, value: Any) -> None:
col_type = self._py_to_sqlite_type(value)
try:
async with aiosqlite.connect(self._file) as db:
await db.execute(f"ALTER TABLE {table} ADD COLUMN `{name}` {col_type}")
await db.commit()
await self._invalidate_column_cache(table)
except aiosqlite.OperationalError as e:
if "duplicate column name" in str(e).lower():
pass
else:
raise
async def _ensure_table(self, table: str, col_sources: Dict[str, Any]) -> None:
cols = self._DEFAULT_COLUMNS.copy()
for key, val in col_sources.items():
if key not in cols:
cols[key] = self._py_to_sqlite_type(val)
columns_sql = ", ".join(f"`{k}` {t}" for k, t in cols.items())
async with aiosqlite.connect(self._file) as db:
await db.execute(f"CREATE TABLE IF NOT EXISTS {table} ({columns_sql})")
await db.commit()
await self._invalidate_column_cache(table)
async def _table_exists(self, table: str) -> bool:
async with aiosqlite.connect(self._file) as db:
async with db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table,)
) as cursor:
return await cursor.fetchone() is not None
_RE_NO_COLUMN = re.compile(r"(?:no such column:|has no column named) (\w+)")
_RE_NO_TABLE = re.compile(r"no such table: (\w+)")
@classmethod
def _missing_column_from_error(
cls, err: aiosqlite.OperationalError
) -> Optional[str]:
m = cls._RE_NO_COLUMN.search(str(err))
return m.group(1) if m else None
@classmethod
def _missing_table_from_error(
cls, err: aiosqlite.OperationalError
) -> Optional[str]:
m = cls._RE_NO_TABLE.search(str(err))
return m.group(1) if m else None
async def _safe_execute(
self,
table: str,
sql: str,
params: Iterable[Any],
col_sources: Dict[str, Any],
max_retries: int = 10
) -> aiosqlite.Cursor:
retries = 0
while retries < max_retries:
try:
async with aiosqlite.connect(self._file) as db:
cursor = await db.execute(sql, params)
await db.commit()
return cursor
            except aiosqlite.OperationalError as err:
                retries += 1
                # _RE_NO_COLUMN matches both "no such column: x" and
                # "table t has no column named x", so a single branch
                # heals every missing-column error.
                col = self._missing_column_from_error(err)
                if col:
                    await self._ensure_column(table, col, col_sources.get(col))
                    continue
                tbl = self._missing_table_from_error(err)
                if tbl:
                    await self._ensure_table(tbl, col_sources)
                    continue
                raise
        raise Exception(f"Max retries ({max_retries}) exceeded")
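    # Self-healing sketch (hypothetical table/column names): an INSERT into
    # `notes` that fails with "table notes has no column named title" causes
    # _ensure_column to add `title`, and the statement is then retried.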
async def _filter_existing_columns(self, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
if not await self._table_exists(table):
return data
existing_columns = await self._get_table_columns(table)
if not existing_columns:
return data
return {k: v for k, v in data.items() if k in existing_columns}
async def _safe_query(
self,
table: str,
sql: str,
params: Iterable[Any],
col_sources: Dict[str, Any],
) -> AsyncGenerator[Dict[str, Any], None]:
        if not await self._table_exists(table):
            return
        try:
            async with aiosqlite.connect(self._file) as db:
                db.row_factory = aiosqlite.Row
                async with db.execute(sql, params) as cursor:
                    async for row in cursor:
                        yield dict(row)
        except aiosqlite.OperationalError as err:
            # A table or column missing at query time just yields no rows;
            # any other operational error propagates.
            if self._missing_table_from_error(err):
                return
            if "no such column" in str(err).lower():
                return
            raise
@staticmethod
    def _build_where(where: Optional[Dict[str, Any]]) -> Tuple[str, List[Any]]:
if not where:
return "", []
clauses, vals = zip(*[(f"`{k}` = ?", v) for k, v in where.items()])
return " WHERE " + " AND ".join(clauses), list(vals)
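    # Example: _build_where({"repo": "abc", "status": "success"}) returns
    #     (" WHERE `repo` = ? AND `status` = ?", ["abc", "success"])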
    async def insert(self, table: str, args: Dict[str, Any], return_id: bool = False) -> Union[str, int]:
        uid = str(uuid4())
        now = self._utc_iso()
        record = {
            "uid": uid,
            "created_at": now,
            "updated_at": now,
            "deleted_at": None,
            **args,
        }
        await self._ensure_table(table, record)
        cols = "`" + "`, `".join(record.keys()) + "`"
        qs = ", ".join(["?"] * len(record))
        sql = f"INSERT INTO {table} ({cols}) VALUES ({qs})"
        cursor = await self._safe_execute(table, sql, list(record.values()), record)
        if return_id:
            # SQLite cannot ALTER TABLE ... ADD COLUMN ... PRIMARY KEY, so an
            # integer id column cannot be retrofitted here; the rowid reported
            # by the INSERT serves as the integer id instead.
            return cursor.lastrowid
        return uid
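    # Minimal usage sketch (hypothetical instance `ds = AsyncDataSet(...)` and
    # a hypothetical "notes" table, which insert() creates on first use):
    #     uid = await ds.insert("notes", {"title": "hello"})
    #     rowid = await ds.insert("notes", {"title": "again"}, return_id=True)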
async def update(
self,
table: str,
args: Dict[str, Any],
where: Optional[Dict[str, Any]] = None,
) -> int:
if not args:
return 0
if not await self._table_exists(table):
return 0
        args = {**args, "updated_at": self._utc_iso()}  # copy: don't mutate the caller's dict
all_cols = {**args, **(where or {})}
await self._ensure_table(table, all_cols)
for col, val in all_cols.items():
await self._ensure_column(table, col, val)
set_clause = ", ".join(f"`{k}` = ?" for k in args)
where_clause, where_params = self._build_where(where)
sql = f"UPDATE {table} SET {set_clause}{where_clause}"
params = list(args.values()) + where_params
cur = await self._safe_execute(table, sql, params, all_cols)
return cur.rowcount
async def delete(self, table: str, where: Optional[Dict[str, Any]] = None) -> int:
if not await self._table_exists(table):
return 0
where_clause, where_params = self._build_where(where)
sql = f"DELETE FROM {table}{where_clause}"
cur = await self._safe_execute(table, sql, where_params, where or {})
return cur.rowcount
async def upsert(
self,
table: str,
args: Dict[str, Any],
where: Optional[Dict[str, Any]] = None,
) -> str | None:
        if not args:
            raise ValueError("Nothing to update. Empty dict given.")
        # update() and insert() both stamp updated_at, so it is not set here.
affected = await self.update(table, args, where)
if affected:
rec = await self.get(table, where)
return rec.get("uid") if rec else None
merged = {**(where or {}), **args}
return await self.insert(table, merged)
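    # Upsert sketch (illustrative names): updates the row matching `where` if
    # one exists, otherwise inserts the merged where+args dict as a new row:
    #     await ds.upsert("repos", {"last_synced": now}, {"hash": repo_hash})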
async def get(
self, table: str, where: Optional[Dict[str, Any]] = None
) -> Optional[Dict[str, Any]]:
where_clause, where_params = self._build_where(where)
sql = f"SELECT * FROM {table}{where_clause} LIMIT 1"
async for row in self._safe_query(table, sql, where_params, where or {}):
return row
return None
async def find(
self,
table: str,
where: Optional[Dict[str, Any]] = None,
*,
limit: int = 0,
offset: int = 0,
order_by: Optional[str] = None,
) -> List[Dict[str, Any]]:
where_clause, where_params = self._build_where(where)
order_clause = f" ORDER BY {order_by}" if order_by else ""
extra = (f" LIMIT {limit}" if limit else "") + (
f" OFFSET {offset}" if offset else ""
)
sql = f"SELECT * FROM {table}{where_clause}{order_clause}{extra}"
return [
row async for row in self._safe_query(table, sql, where_params, where or {})
]
async def count(self, table: str, where: Optional[Dict[str, Any]] = None) -> int:
if not await self._table_exists(table):
return 0
where_clause, where_params = self._build_where(where)
sql = f"SELECT COUNT(*) FROM {table}{where_clause}"
gen = self._safe_query(table, sql, where_params, where or {})
async for row in gen:
return next(iter(row.values()), 0)
return 0
async def exists(self, table: str, where: Dict[str, Any]) -> bool:
return (await self.count(table, where)) > 0
async def kv_set(
self,
key: str,
value: Any,
*,
table: str | None = None,
) -> None:
tbl = table or self._KV_TABLE
json_val = json.dumps(value, default=str)
await self.upsert(tbl, {"value": json_val}, {"key": key})
async def kv_get(
self,
key: str,
*,
default: Any = None,
table: str | None = None,
) -> Any:
tbl = table or self._KV_TABLE
row = await self.get(tbl, {"key": key})
if not row:
return default
try:
return json.loads(row["value"])
except Exception:
return default
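    # Key-value sketch: values round-trip through JSON, so non-JSON types
    # (e.g. datetime) come back as strings via default=str:
    #     await ds.kv_set("config", {"retries": 3})
    #     cfg = await ds.kv_get("config", default={})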
    async def execute_raw(self, sql: str, params: Optional[Tuple] = None) -> Any:
        async with aiosqlite.connect(self._file) as db:
            cursor = await db.execute(sql, params or ())
            await db.commit()
            # The connection closes on exit; only cached attributes such as
            # lastrowid and rowcount remain usable on the returned cursor.
            return cursor
async def query_raw(self, sql: str, params: Optional[Tuple] = None) -> List[Dict[str, Any]]:
try:
async with aiosqlite.connect(self._file) as db:
db.row_factory = aiosqlite.Row
async with db.execute(sql, params or ()) as cursor:
return [dict(row) async for row in cursor]
except aiosqlite.OperationalError:
return []
    async def query_one(self, sql: str, params: Optional[Tuple] = None) -> Optional[Dict[str, Any]]:
        # Appending LIMIT 1 blindly breaks on trailing semicolons or an
        # existing LIMIT clause, so strip and check before adding it.
        sql = sql.rstrip().rstrip(";")
        if " limit " not in sql.lower():
            sql += " LIMIT 1"
        results = await self.query_raw(sql, params)
        return results[0] if results else None
async def create_table(self, table: str, schema: Dict[str, str], constraints: Optional[List[str]] = None):
full_schema = self._DEFAULT_COLUMNS.copy()
full_schema.update(schema)
columns = [f"`{col}` {dtype}" for col, dtype in full_schema.items()]
if constraints:
columns.extend(constraints)
columns_sql = ", ".join(columns)
async with aiosqlite.connect(self._file) as db:
await db.execute(f"CREATE TABLE IF NOT EXISTS {table} ({columns_sql})")
await db.commit()
await self._invalidate_column_cache(table)
    async def insert_unique(self, table: str, args: Dict[str, Any], unique_fields: List[str]) -> Union[str, None]:
        # unique_fields is informational only: uniqueness must be enforced by
        # a UNIQUE constraint on the table (e.g. via create_table constraints).
        try:
            return await self.insert(table, args)
        except aiosqlite.IntegrityError as e:
            if "UNIQUE" in str(e):
                return None
            raise
    def transaction(self) -> "TransactionContext":
        # Plain (non-async) method so callers can write
        # `async with db.transaction() as conn:` without an extra await.
        return TransactionContext(self._file)
async def aggregate(self, table: str, function: str, column: str = "*", where: Optional[Dict[str, Any]] = None) -> Any:
if not await self._table_exists(table):
return None
where_clause, where_params = self._build_where(where)
sql = f"SELECT {function}({column}) as result FROM {table}{where_clause}"
result = await self.query_one(sql, tuple(where_params))
return result['result'] if result else None
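    # Aggregate sketch (`function` and `column` are interpolated into the SQL,
    # so pass trusted identifiers only; names below are illustrative):
    #     total = await ds.aggregate("commits", "COUNT")
    #     latest = await ds.aggregate("commits", "MAX", "date", {"repo": h})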
class TransactionContext:
def __init__(self, db_file: str):
self.db_file = db_file
self.conn = None
async def __aenter__(self):
self.conn = await aiosqlite.connect(self.db_file)
self.conn.row_factory = aiosqlite.Row
await self.conn.execute("BEGIN")
return self.conn
async def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
await self.conn.commit()
else:
await self.conn.rollback()
await self.conn.close()
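# Transaction sketch: commits on clean exit, rolls back on exception.
#     async with db.transaction() as conn:
#         await conn.execute("DELETE FROM commits WHERE status = 'failed'")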
db = AsyncDataSet("zhurnal.db")
app = FastAPI()
repos_dir = pathlib.Path("repos")
logs_dir = pathlib.Path("logs")
logs_dir.mkdir(exist_ok=True)
log_file = logs_dir / "app.log"
def rotate_logs():
if not log_file.exists():
return
if log_file.stat().st_size < 1024 * 1024:
return
for i in range(5, 0, -1):
old = logs_dir / f"app.log.{i}"
new = logs_dir / f"app.log.{i+1}" if i < 5 else None
if old.exists():
if new:
old.rename(new)
else:
old.unlink()
log_file.rename(logs_dir / "app.log.1")
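# Rotation keeps app.log.1 .. app.log.5 (app.log.1 newest): the oldest copy
# is dropped, the rest shift up one slot, and app.log restarts past 1 MiB.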
def log(message: str, redact_url: Optional[str] = None):
rotate_logs()
    timestamp = datetime.now(timezone.utc).isoformat(timespec="seconds")
if redact_url:
parsed = urllib.parse.urlparse(redact_url)
if parsed.username or parsed.password:
redacted_netloc = f"***@{parsed.hostname}"
if parsed.port:
redacted_netloc += f":{parsed.port}"
redacted_url = parsed._replace(netloc=redacted_netloc).geturl()
message = message.replace(redact_url, redacted_url)
with log_file.open("a") as f:
f.write(f"[{timestamp}] {message}\n")
connections: Dict[str, List[WebSocket]] = collections.defaultdict(list)
tasks: Dict[str, asyncio.Task] = {}
async def broadcast(repo_hash: str, data: dict):
    # Send to a copy of the list so dead sockets can be pruned in-flight.
    for connection in list(connections[repo_hash]):
        try:
            await connection.send_json(data)
        except Exception:
            connections[repo_hash].remove(connection)
async def run_git_command(repo_path: str, command: list[str]) -> str:
proc = await asyncio.create_subprocess_exec(
'git', '-C', repo_path, *command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
raise Exception(stderr.decode().strip())
    return stdout.decode().strip()[:1024 * 1024]  # cap output at 1 MiB
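# Example (illustrative call): await run_git_command(str(repo_path),
# ["rev-parse", "HEAD"]) returns the current commit hash.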
async def get_commits_by_day(repo_path: pathlib.Path) -> Dict[str, List[str]]:
log_output = await run_git_command(str(repo_path), ["log", "--pretty=format:%H|%ad", "--date=short"])
commits_by_date = collections.defaultdict(list)
for line in log_output.splitlines():
if "|" in line:
commit_hash, commit_date = line.split("|", 1)
commits_by_date[commit_date].append(commit_hash)
return commits_by_date
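# Each parsed log line looks like "<sha>|2025-01-02"; the result groups full
# hashes per day, e.g. {"2025-01-02": ["deadbeef...", ...]} (illustrative).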
async def process_repo(url: str, repo_hash: str, is_user_request: bool = False):
try:
log(f"Processing repo {url} ({repo_hash})", redact_url=url)
repo_path = repos_dir / repo_hash
repos_dir.mkdir(exist_ok=True)
if not repo_path.exists():
await broadcast(repo_hash, {"type": "message", "content": "Cloning repository..."})
for _ in range(3):
try:
proc = await asyncio.create_subprocess_exec(
'git', 'clone', url, str(repo_path),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
if proc.returncode == 0:
break
else:
raise subprocess.CalledProcessError(proc.returncode, ['git', 'clone'], stderr.decode())
except subprocess.CalledProcessError:
await asyncio.sleep(1)
else:
raise Exception("Clone failed after 3 tries")
else:
await broadcast(repo_hash, {"type": "message", "content": "Updating repository..."})
for _ in range(3):
try:
proc = await asyncio.create_subprocess_exec(
'git', '-C', str(repo_path), 'pull',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
if proc.returncode == 0:
break
else:
raise subprocess.CalledProcessError(proc.returncode, ['git', 'pull'], stderr.decode())
except subprocess.CalledProcessError:
await asyncio.sleep(1)
else:
raise Exception("Pull failed after 3 tries")
commits_by_day = await get_commits_by_day(repo_path)
history = {c["uid"]: c for c in await db.find("commits", {"repo": repo_hash})}
new_commits = 0
for date in sorted(commits_by_day):
await broadcast(repo_hash, {"type": "message", "content": f"Diffs for {date}"})
for commit in reversed(commits_by_day[date]):
                if commit in history and history[commit].get("status") == "success":
continue
new_commits += 1
diff = await run_git_command(str(repo_path), ["show", commit, "--no-color", "--format=medium"])
                prompt = (
                    "Generate a description of what was done (or fixed) functionally for end users.\n"
                    "Allowed prefixes: feat, fix, security, change.\n"
                    "No explanations.\n\n"
                    f"GIT DIFF:\n{diff}\n"
                )
                for attempt in range(3):
                    try:
                        ai = client.APIClient()
                        result = ai.prompt(prompt, json=False, reduce_tokens=79000)
                        await broadcast(repo_hash, {"type": "message", "content": result})
                        await db.upsert("commits", {"repo": repo_hash, "date": date, "line": result, "status": "success", "timestamp": datetime.now(timezone.utc).isoformat(timespec="seconds")}, {"uid": commit})
                        break
                    except Exception as e:
                        if attempt == 2:
                            await broadcast(repo_hash, {"type": "message", "content": f"Commit {commit}: failed"})
                            await db.upsert("commits", {"repo": repo_hash, "date": date, "error": str(e), "status": "failed", "timestamp": datetime.now(timezone.utc).isoformat(timespec="seconds")}, {"uid": commit})
                        else:
                            await asyncio.sleep(1)
        # Both branches redirected identically, so a single redirect suffices.
        await broadcast(repo_hash, {"type": "redirect", "url": f"/changelog/{repo_hash}"})
        await db.update("repos", {"last_synced": datetime.now(timezone.utc).isoformat(timespec="seconds")}, {"hash": repo_hash})
except Exception as e:
log(f"Error: {str(e)}", redact_url=url)
await broadcast(repo_hash, {"type": "error", "content": str(e)})
async def scheduler():
    while True:
        await asyncio.sleep(3600)
        try:
            repos = await db.find("repos", {"hourly_enabled": True})
            for repo in repos:
                repo_hash = repo["hash"]
                url = repo["original_url"]
                if repo_hash not in tasks or tasks[repo_hash].done():
                    tasks[repo_hash] = asyncio.create_task(process_repo(url, repo_hash))
        except Exception as e:
            # Keep the hourly loop alive even if one sweep fails.
            log(f"Scheduler error: {e}")
schedule_task = None
@app.on_event("startup")
async def start_scheduler():
    # Start the hourly sync on app startup instead of lazily on the first
    # page view, so scheduled repos sync even if nobody visits "/".
    global schedule_task
    if not schedule_task:
        schedule_task = asyncio.create_task(scheduler())
@app.get("/", response_class=HTMLResponse)
async def root():
    return pathlib.Path("index.html").read_text()
@app.post("/submit")
async def submit(request: Request):
form = await request.form()
    url = form.get("repo_url")
    if not isinstance(url, str) or not url or len(url) > 1000:
return Response("Invalid URL", status_code=400)
parsed = urllib.parse.urlparse(url)
if parsed.scheme not in ("http", "https") or not parsed.netloc:
return Response("Invalid URL", status_code=400)
repo_hash = hashlib.sha256(url.encode()).hexdigest()
repo = await db.get("repos", {"hash": repo_hash})
if not repo:
await db.insert("repos", {"hash": repo_hash, "original_url": url, "last_synced": None, "hourly_enabled": True})
if repo_hash not in tasks or tasks[repo_hash].done():
tasks[repo_hash] = asyncio.create_task(process_repo(url, repo_hash, True))
return {"hash": repo_hash}
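# Submission sketch (hypothetical host/URL):
#     curl -X POST -d "repo_url=https://example.com/user/repo.git" \
#         http://localhost:8000/submit
# responds with {"hash": "<sha256 of the URL>"} and starts processing.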
@app.websocket("/ws/{repo_hash}")
async def ws(websocket: WebSocket, repo_hash: str):
await websocket.accept()
connections[repo_hash].append(websocket)
try:
while True:
await websocket.receive_text()
    except WebSocketDisconnect:
        # broadcast() may already have pruned a dead socket; guard the remove.
        if websocket in connections[repo_hash]:
            connections[repo_hash].remove(websocket)
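# Clients subscribe at ws://<host>/ws/<hash> and receive the JSON frames sent
# by broadcast(): {"type": "message"|"redirect"|"error", ...}.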
@app.get("/changelog/{repo_hash}", response_class=HTMLResponse)
async def get_changelog(repo_hash: str):
    commits = await db.find("commits", {"repo": repo_hash, "status": "success"}, order_by="date DESC")
    by_date = collections.defaultdict(list)
    for c in commits:
        by_date[c["date"]].append(c["line"])
    # Escape model output before embedding it in HTML to prevent stored XSS.
    content = "".join(
        f"<h2>{html.escape(date)}</h2><ul>"
        + "".join(f"<li>{html.escape(line)}</li>" for line in by_date[date])
        + "</ul>"
        for date in sorted(by_date, reverse=True)
    )
    page = pathlib.Path("changelog.html").read_text().replace("{{content}}", content).replace("{{hash}}", html.escape(repo_hash))
    return page
@app.get("/download/json/{repo_hash}")
async def download_json(repo_hash: str):
commits = await db.find("commits", {"repo": repo_hash, "status": "success"})
data = {c["uid"]: {"repo": ".", "date": c["date"], "line": c["line"], "status": c["status"], "timestamp": c["timestamp"]} for c in commits}
filename = f"changelog_{repo_hash}.json"
return Response(json.dumps(data), media_type="application/json", headers={"Content-Disposition": f"attachment; filename={filename}"})
@app.get("/download/md/{repo_hash}")
async def download_md(repo_hash: str):
commits = await db.find("commits", {"repo": repo_hash, "status": "success"}, order_by="date DESC")
by_date = collections.defaultdict(list)
for c in commits:
by_date[c["date"]].append(c["line"])
md = "# Changelog\n\n" + "\n".join(f"## {date}\n" + "\n".join(f"- {line}" for line in by_date[date]) + "\n" for date in sorted(by_date, reverse=True))
filename = f"changelog_{repo_hash}.md"
return Response(md, media_type="text/markdown", headers={"Content-Disposition": f"attachment; filename={filename}"})
@app.get("/system-log", response_class=PlainTextResponse)
async def get_log():
return log_file.read_text() if log_file.exists() else ""