"""Changelog generator: a FastAPI app that clones git repositories, summarizes
each commit diff with an LLM, and serves the result as HTML, JSON, or Markdown.

Storage is AsyncDataSet, a schema-flexible SQLite wrapper that creates tables
and columns on demand.
"""

import asyncio
import collections
import hashlib
import json
import pathlib
import re
import subprocess
import urllib.parse
from datetime import datetime, timezone
from typing import Any, AsyncGenerator, Dict, Iterable, List, Optional, Set, Tuple, Union
from uuid import uuid4

import aiosqlite
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, PlainTextResponse, Response

import client


class AsyncDataSet:
    """Schema-flexible async wrapper around a SQLite file.

    Tables and columns are created lazily on first write, so callers can
    insert plain dicts without declaring a schema up front.
    """

    _KV_TABLE = "__kv_store"

    _DEFAULT_COLUMNS = {
        "uid": "TEXT PRIMARY KEY",
        "created_at": "TEXT",
        "updated_at": "TEXT",
        "deleted_at": "TEXT",
    }

    def __init__(self, file: str):
        self._file = file
        self._table_columns_cache: Dict[str, Set[str]] = {}

    @staticmethod
    def _utc_iso() -> str:
        """Current UTC time as an ISO-8601 string with a trailing 'Z'."""
        return (
            datetime.now(timezone.utc)
            .replace(microsecond=0)
            .isoformat()
            .replace("+00:00", "Z")
        )

    @staticmethod
    def _py_to_sqlite_type(value: Any) -> str:
        """Map a Python value to a SQLite column affinity."""
        if value is None:
            return "TEXT"
        # bool must be checked before int: bool is a subclass of int.
        if isinstance(value, bool):
            return "INTEGER"
        if isinstance(value, int):
            return "INTEGER"
        if isinstance(value, float):
            return "REAL"
        if isinstance(value, (bytes, bytearray, memoryview)):
            return "BLOB"
        return "TEXT"

    async def _get_table_columns(self, table: str) -> Set[str]:
        """Return the column names of a table, caching the result."""
        if table in self._table_columns_cache:
            return self._table_columns_cache[table]
        columns: Set[str] = set()
        try:
            async with aiosqlite.connect(self._file) as db:
                async with db.execute(f"PRAGMA table_info({table})") as cursor:
                    async for row in cursor:
                        columns.add(row[1])  # row[1] is the column name
            self._table_columns_cache[table] = columns
        except aiosqlite.Error:
            # Missing table or unreadable file: report no columns, cache nothing.
            pass
        return columns

    async def _invalidate_column_cache(self, table: str) -> None:
        self._table_columns_cache.pop(table, None)

    async def _ensure_column(self, table: str, name: str, value: Any) -> None:
        """Add a column typed from `value` if it does not already exist."""
        col_type = self._py_to_sqlite_type(value)
        try:
            async with aiosqlite.connect(self._file) as db:
                await db.execute(f"ALTER TABLE {table} ADD COLUMN `{name}` {col_type}")
                await db.commit()
            await self._invalidate_column_cache(table)
        except aiosqlite.OperationalError as e:
            # A concurrent writer may have added the column first; that is fine.
            if "duplicate column name" not in str(e).lower():
                raise

    async def _ensure_table(self, table: str, col_sources: Dict[str, Any]) -> None:
        """Create the table with the default columns plus one per key in col_sources."""
        cols = self._DEFAULT_COLUMNS.copy()
        for key, val in col_sources.items():
            if key not in cols:
                cols[key] = self._py_to_sqlite_type(val)
        columns_sql = ", ".join(f"`{k}` {t}" for k, t in cols.items())
        async with aiosqlite.connect(self._file) as db:
            await db.execute(f"CREATE TABLE IF NOT EXISTS {table} ({columns_sql})")
            await db.commit()
        await self._invalidate_column_cache(table)

    async def _table_exists(self, table: str) -> bool:
        async with aiosqlite.connect(self._file) as db:
            async with db.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table,)
            ) as cursor:
                return await cursor.fetchone() is not None

    # Matches both "no such column: x" and "table t has no column named x".
    _RE_NO_COLUMN = re.compile(r"(?:no such column:|has no column named) (\w+)")
    _RE_NO_TABLE = re.compile(r"no such table: (\w+)")

    @classmethod
    def _missing_column_from_error(cls, err: aiosqlite.OperationalError) -> Optional[str]:
        m = cls._RE_NO_COLUMN.search(str(err))
        return m.group(1) if m else None

    @classmethod
    def _missing_table_from_error(cls, err: aiosqlite.OperationalError) -> Optional[str]:
        m = cls._RE_NO_TABLE.search(str(err))
        return m.group(1) if m else None

    async def _safe_execute(
        self,
        table: str,
        sql: str,
        params: Iterable[Any],
        col_sources: Dict[str, Any],
        max_retries: int = 10,
    ) -> aiosqlite.Cursor:
        """Execute a write statement, creating missing tables/columns and retrying."""
        retries = 0
        while retries < max_retries:
            try:
                async with aiosqlite.connect(self._file) as db:
                    cursor = await db.execute(sql, params)
                    await db.commit()
                    return cursor
            except aiosqlite.OperationalError as err:
                retries += 1
                # _RE_NO_COLUMN covers both "no such column" and "has no
                # column named", so one branch handles both error shapes.
                col = self._missing_column_from_error(err)
                if col:
                    await self._ensure_column(table, col, col_sources.get(col))
                    continue
                tbl = self._missing_table_from_error(err)
                if tbl:
                    await self._ensure_table(tbl, col_sources)
                    continue
                raise
        raise Exception(f"Max retries ({max_retries}) exceeded")

    async def _filter_existing_columns(self, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
        if not await self._table_exists(table):
            return data
        existing_columns = await self._get_table_columns(table)
        if not existing_columns:
            return data
        return {k: v for k, v in data.items() if k in existing_columns}

    async def _safe_query(
        self,
        table: str,
        sql: str,
        params: Iterable[Any],
        col_sources: Dict[str, Any],
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Run a read query, yielding rows as dicts; missing tables or columns yield nothing."""
        if not await self._table_exists(table):
            return
        try:
            async with aiosqlite.connect(self._file) as db:
                db.row_factory = aiosqlite.Row
                async with db.execute(sql, params) as cursor:
                    async for row in cursor:
                        yield dict(row)
        except aiosqlite.OperationalError as err:
            if self._missing_table_from_error(err) or "no such column" in str(err).lower():
                return
            raise

    @staticmethod
    def _build_where(where: Optional[Dict[str, Any]]) -> Tuple[str, List[Any]]:
        """Build an equality-only WHERE clause and its parameter list."""
        if not where:
            return "", []
        clauses, vals = zip(*[(f"`{k}` = ?", v) for k, v in where.items()])
        return " WHERE " + " AND ".join(clauses), list(vals)

    async def insert(self, table: str, args: Dict[str, Any], return_id: bool = False) -> Union[str, int]:
        """Insert a record; return the generated uid, or lastrowid when return_id is set."""
        uid = str(uuid4())
        now = self._utc_iso()
        record = {
            "uid": uid,
            "created_at": now,
            "updated_at": now,
            "deleted_at": None,
            **args,
        }
        await self._ensure_table(table, record)
        cols = "`" + "`, `".join(record.keys()) + "`"
        qs = ", ".join(["?"] * len(record))
        sql = f"INSERT INTO {table} ({cols}) VALUES ({qs})"
        cursor = await self._safe_execute(table, sql, list(record.values()), record)
        if return_id:
            # SQLite cannot add a PRIMARY KEY AUTOINCREMENT column via ALTER
            # TABLE, so the implicit rowid (exposed as lastrowid) serves as
            # the integer id instead.
            return cursor.lastrowid
        return uid
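
    # Minimal usage sketch (hypothetical table and fields): the table, its
    # default columns (uid/created_at/updated_at/deleted_at), and the `text`
    # column are all created on first call.
    #
    #   ds = AsyncDataSet("demo.db")
    #   uid = await ds.insert("notes", {"text": "hello"})
    #   row = await ds.get("notes", {"uid": uid})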

    async def update(
        self,
        table: str,
        args: Dict[str, Any],
        where: Optional[Dict[str, Any]] = None,
    ) -> int:
        """Update matching rows; returns the number of rows affected."""
        if not args:
            return 0
        if not await self._table_exists(table):
            return 0
        # Copy so the caller's dict is not mutated by the timestamp.
        args = {**args, "updated_at": self._utc_iso()}
        all_cols = {**args, **(where or {})}
        await self._ensure_table(table, all_cols)
        for col, val in all_cols.items():
            await self._ensure_column(table, col, val)
        set_clause = ", ".join(f"`{k}` = ?" for k in args)
        where_clause, where_params = self._build_where(where)
        sql = f"UPDATE {table} SET {set_clause}{where_clause}"
        params = list(args.values()) + where_params
        cur = await self._safe_execute(table, sql, params, all_cols)
        return cur.rowcount

    async def delete(self, table: str, where: Optional[Dict[str, Any]] = None) -> int:
        if not await self._table_exists(table):
            return 0
        where_clause, where_params = self._build_where(where)
        sql = f"DELETE FROM {table}{where_clause}"
        cur = await self._safe_execute(table, sql, where_params, where or {})
        return cur.rowcount

    async def upsert(
        self,
        table: str,
        args: Dict[str, Any],
        where: Optional[Dict[str, Any]] = None,
    ) -> Optional[str]:
        """Update the first matching record or insert a new one; returns its uid."""
        if not args:
            raise ValueError("Nothing to update. Empty dict given.")
        affected = await self.update(table, args, where)
        if affected:
            rec = await self.get(table, where)
            return rec.get("uid") if rec else None
        merged = {**(where or {}), **args}
        return await self.insert(table, merged)
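
    # Upsert semantics, sketched with hypothetical data: the first call
    # inserts because nothing matches the filter; the second updates that row.
    #
    #   await ds.upsert("settings", {"value": "dark"}, {"key": "theme"})
    #   await ds.upsert("settings", {"value": "light"}, {"key": "theme"})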

    async def get(
        self, table: str, where: Optional[Dict[str, Any]] = None
    ) -> Optional[Dict[str, Any]]:
        """Return the first matching row as a dict, or None."""
        where_clause, where_params = self._build_where(where)
        sql = f"SELECT * FROM {table}{where_clause} LIMIT 1"
        async for row in self._safe_query(table, sql, where_params, where or {}):
            return row
        return None

    async def find(
        self,
        table: str,
        where: Optional[Dict[str, Any]] = None,
        *,
        limit: int = 0,
        offset: int = 0,
        order_by: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Return all matching rows.

        order_by is interpolated verbatim, so pass only trusted values
        (e.g. "date DESC"), never user input.
        """
        where_clause, where_params = self._build_where(where)
        order_clause = f" ORDER BY {order_by}" if order_by else ""
        extra = (f" LIMIT {limit}" if limit else "") + (
            f" OFFSET {offset}" if offset else ""
        )
        sql = f"SELECT * FROM {table}{where_clause}{order_clause}{extra}"
        return [
            row async for row in self._safe_query(table, sql, where_params, where or {})
        ]
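
    # Example query shapes (hypothetical data): equality filters plus paging.
    #
    #   recent = await ds.find("commits", {"repo": "abc"},
    #                          order_by="date DESC", limit=20)
    #   page2 = await ds.find("commits", {"repo": "abc"}, limit=20, offset=20)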

    async def count(self, table: str, where: Optional[Dict[str, Any]] = None) -> int:
        if not await self._table_exists(table):
            return 0
        where_clause, where_params = self._build_where(where)
        sql = f"SELECT COUNT(*) FROM {table}{where_clause}"
        async for row in self._safe_query(table, sql, where_params, where or {}):
            return next(iter(row.values()), 0)
        return 0

    async def exists(self, table: str, where: Dict[str, Any]) -> bool:
        return (await self.count(table, where)) > 0

    async def kv_set(
        self,
        key: str,
        value: Any,
        *,
        table: str | None = None,
    ) -> None:
        """Store any JSON-serializable value under a string key."""
        tbl = table or self._KV_TABLE
        json_val = json.dumps(value, default=str)
        await self.upsert(tbl, {"value": json_val}, {"key": key})

    async def kv_get(
        self,
        key: str,
        *,
        default: Any = None,
        table: str | None = None,
    ) -> Any:
        """Fetch a value stored by kv_set, or `default` if missing or unparseable."""
        tbl = table or self._KV_TABLE
        row = await self.get(tbl, {"key": key})
        if not row:
            return default
        try:
            return json.loads(row["value"])
        except Exception:
            return default
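
    # The KV layer round-trips anything json.dumps can serialize (with
    # default=str as a fallback). A sketch with hypothetical keys:
    #
    #   await ds.kv_set("feature_flags", {"beta": True})
    #   flags = await ds.kv_get("feature_flags", default={})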

    async def execute_raw(self, sql: str, params: Optional[Tuple] = None) -> Any:
        async with aiosqlite.connect(self._file) as db:
            cursor = await db.execute(sql, params or ())
            await db.commit()
            return cursor

    async def query_raw(self, sql: str, params: Optional[Tuple] = None) -> List[Dict[str, Any]]:
        try:
            async with aiosqlite.connect(self._file) as db:
                db.row_factory = aiosqlite.Row
                async with db.execute(sql, params or ()) as cursor:
                    return [dict(row) async for row in cursor]
        except aiosqlite.OperationalError:
            return []

    async def query_one(self, sql: str, params: Optional[Tuple] = None) -> Optional[Dict[str, Any]]:
        # Note: LIMIT 1 is appended verbatim, so `sql` must not already end
        # with a semicolon or carry its own LIMIT clause.
        results = await self.query_raw(sql + " LIMIT 1", params)
        return results[0] if results else None

    async def create_table(self, table: str, schema: Dict[str, str], constraints: Optional[List[str]] = None):
        full_schema = self._DEFAULT_COLUMNS.copy()
        full_schema.update(schema)
        columns = [f"`{col}` {dtype}" for col, dtype in full_schema.items()]
        if constraints:
            columns.extend(constraints)
        columns_sql = ", ".join(columns)
        async with aiosqlite.connect(self._file) as db:
            await db.execute(f"CREATE TABLE IF NOT EXISTS {table} ({columns_sql})")
            await db.commit()
        await self._invalidate_column_cache(table)
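
    # Explicit creation is only needed for constraints the lazy path cannot
    # add, e.g. uniqueness (hypothetical schema), which insert_unique below
    # depends on:
    #
    #   await ds.create_table("users", {"email": "TEXT"},
    #                         constraints=["UNIQUE(email)"])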

    async def insert_unique(self, table: str, args: Dict[str, Any], unique_fields: List[str]) -> Union[str, None]:
        """Insert, returning None instead of raising on a UNIQUE violation.

        Enforcement relies on a UNIQUE constraint already existing on the
        table; `unique_fields` documents the caller's intent but is not used
        to create one.
        """
        try:
            return await self.insert(table, args)
        except aiosqlite.IntegrityError as e:
            if "UNIQUE" in str(e):
                return None
            raise

    async def transaction(self):
        """Return a transaction context; use as `async with await ds.transaction():`."""
        return TransactionContext(self._file)

    async def aggregate(self, table: str, function: str, column: str = "*", where: Optional[Dict[str, Any]] = None) -> Any:
        """Run a single aggregate (COUNT, SUM, AVG, ...) over matching rows.

        `function` and `column` are interpolated into the SQL, so pass only
        trusted values.
        """
        if not await self._table_exists(table):
            return None
        where_clause, where_params = self._build_where(where)
        sql = f"SELECT {function}({column}) as result FROM {table}{where_clause}"
        result = await self.query_one(sql, tuple(where_params))
        return result["result"] if result else None
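
    # Aggregate sketch (the `duration` column is hypothetical, not part of
    # this app's schema):
    #
    #   avg = await ds.aggregate("commits", "AVG", "duration",
    #                            {"status": "success"})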


class TransactionContext:
    """Async context manager: commit on clean exit, roll back on exception."""

    def __init__(self, db_file: str):
        self.db_file = db_file
        self.conn: Optional[aiosqlite.Connection] = None

    async def __aenter__(self) -> aiosqlite.Connection:
        self.conn = await aiosqlite.connect(self.db_file)
        self.conn.row_factory = aiosqlite.Row
        await self.conn.execute("BEGIN")
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            await self.conn.commit()
        else:
            await self.conn.rollback()
        await self.conn.close()
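
# Transaction usage sketch: AsyncDataSet.transaction is itself a coroutine,
# so the context manager must be awaited before entering it.
#
#   async with await db.transaction() as conn:
#       await conn.execute("UPDATE repos SET hourly_enabled = 0")
#       # committed on clean exit, rolled back if the block raises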


db = AsyncDataSet("zhurnal.db")
app = FastAPI()

repos_dir = pathlib.Path("repos")
logs_dir = pathlib.Path("logs")
logs_dir.mkdir(exist_ok=True)
log_file = logs_dir / "app.log"
salt = "x1337x"


def rotate_logs():
    """Rotate app.log once it exceeds 1 MiB, keeping at most five old files."""
    if not log_file.exists():
        return
    if log_file.stat().st_size < 1024 * 1024:
        return
    for i in range(5, 0, -1):
        old = logs_dir / f"app.log.{i}"
        new = logs_dir / f"app.log.{i+1}" if i < 5 else None
        if old.exists():
            if new:
                old.rename(new)
            else:
                old.unlink()  # the oldest file falls off the end
    log_file.rename(logs_dir / "app.log.1")


def log(message: str, redact_url: Optional[str] = None):
    """Append a timestamped line to app.log, stripping credentials from URLs."""
    rotate_logs()
    timestamp = datetime.now().isoformat()
    if redact_url:
        parsed = urllib.parse.urlparse(redact_url)
        if parsed.username or parsed.password:
            redacted_netloc = f"***@{parsed.hostname}"
            if parsed.port:
                redacted_netloc += f":{parsed.port}"
            redacted_url = parsed._replace(netloc=redacted_netloc).geturl()
            message = message.replace(redact_url, redacted_url)
    with log_file.open("a") as f:
        f.write(f"[{timestamp}] {message}\n")

connections: Dict[str, List[WebSocket]] = collections.defaultdict(list)
tasks: Dict[str, asyncio.Task] = {}


async def broadcast(repo_hash: str, data: dict):
    """Send a JSON payload to every socket watching this repo, dropping dead ones."""
    for connection in list(connections[repo_hash]):
        try:
            await connection.send_json(data)
        except Exception:
            # A closed socket must not abort the broadcast for the others.
            connections[repo_hash].remove(connection)


async def run_git_command(repo_path: str, command: list[str]) -> str:
    proc = await asyncio.create_subprocess_exec(
        'git', '-C', repo_path, *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise Exception(stderr.decode().strip())
    # Cap output at 1 MiB so a huge diff cannot blow up downstream prompts.
    return stdout.decode().strip()[:1024 * 1024]


async def get_commits_by_day(repo_path: pathlib.Path) -> Dict[str, List[str]]:
    """Group commit hashes by commit date, newest first within each day."""
    log_output = await run_git_command(str(repo_path), ["log", "--pretty=format:%H|%ad", "--date=short"])
    commits_by_date = collections.defaultdict(list)
    for line in log_output.splitlines():
        if "|" in line:
            commit_hash, commit_date = line.split("|", 1)
            commits_by_date[commit_date].append(commit_hash)
    return commits_by_date
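
# The git log format above emits one "<hash>|<YYYY-MM-DD>" line per commit,
# newest first, e.g. (made-up hashes):
#
#   d4c3b2a1|2024-05-02
#   a1b2c3d4|2024-05-01
#
# which parses into {"2024-05-02": ["d4c3b2a1"], "2024-05-01": ["a1b2c3d4"]}.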


async def process_repo(url: str, repo_hash: str, is_user_request: bool = False):
    try:
        log(f"Processing repo {url} ({repo_hash})", redact_url=url)
        repo_path = repos_dir / repo_hash
        repos_dir.mkdir(exist_ok=True)
        if not repo_path.exists():
            await broadcast(repo_hash, {"type": "message", "content": "Cloning repository..."})
            for _ in range(3):
                try:
                    proc = await asyncio.create_subprocess_exec(
                        'git', 'clone', url, str(repo_path),
                        stdout=asyncio.subprocess.PIPE,
                        stderr=asyncio.subprocess.PIPE,
                    )
                    stdout, stderr = await proc.communicate()
                    if proc.returncode == 0:
                        break
                    raise subprocess.CalledProcessError(proc.returncode, ['git', 'clone'], stderr.decode())
                except subprocess.CalledProcessError:
                    await asyncio.sleep(1)
            else:
                # for/else: all three attempts failed without a break.
                raise Exception("Clone failed after 3 tries")
        else:
            await broadcast(repo_hash, {"type": "message", "content": "Updating repository..."})
            for _ in range(3):
                try:
                    proc = await asyncio.create_subprocess_exec(
                        'git', '-C', str(repo_path), 'pull',
                        stdout=asyncio.subprocess.PIPE,
                        stderr=asyncio.subprocess.PIPE,
                    )
                    stdout, stderr = await proc.communicate()
                    if proc.returncode == 0:
                        break
                    raise subprocess.CalledProcessError(proc.returncode, ['git', 'pull'], stderr.decode())
                except subprocess.CalledProcessError:
                    await asyncio.sleep(1)
            else:
                raise Exception("Pull failed after 3 tries")

        commits_by_day = await get_commits_by_day(repo_path)
        history = {c["uid"]: c for c in await db.find("commits", {"repo": repo_hash})}
        new_commits = 0
        for date in sorted(commits_by_day):
            await broadcast(repo_hash, {"type": "message", "content": f"Diffs for {date}"})
            # Process oldest-first within each day (git log emits newest first).
            for commit in reversed(commits_by_day[date]):
                if commit in history and history[commit]["status"] == "success":
                    continue
                new_commits += 1
                diff = await run_git_command(str(repo_path), ["show", commit, "--no-color", "--format=medium"])
                prompt = (
                    "Generate a description of what was done (or fixed) functionally for end users.\n"
                    "Allowed prefixes: feat, fix, security, change.\n"
                    "No explanations.\n\n"
                    f"GIT DIFF:\n{diff}\n"
                )
                for attempt in range(3):
                    try:
                        ai = client.APIClient()
                        result = ai.prompt(prompt, json=False, reduce_tokens=79000)
                        await broadcast(repo_hash, {"type": "message", "content": result})
                        await db.upsert(
                            "commits",
                            {
                                "repo": repo_hash,
                                "date": date,
                                "line": result,
                                "status": "success",
                                "timestamp": datetime.now(timezone.utc).isoformat(timespec="seconds"),
                            },
                            {"uid": commit},
                        )
                        break
                    except Exception as e:
                        if attempt == 2:
                            await broadcast(repo_hash, {"type": "message", "content": f"Commit {commit}: failed"})
                            await db.upsert(
                                "commits",
                                {
                                    "repo": repo_hash,
                                    "date": date,
                                    "error": str(e),
                                    "status": "failed",
                                    "timestamp": datetime.now(timezone.utc).isoformat(timespec="seconds"),
                                },
                                {"uid": commit},
                            )
                        await asyncio.sleep(1)

        # Whether or not anything was new, send viewers to the changelog.
        await broadcast(repo_hash, {"type": "redirect", "url": f"/changelog/{repo_hash}"})
        await db.update("repos", {"last_synced": datetime.now(timezone.utc).isoformat()}, {"hash": repo_hash})
    except Exception as e:
        log(f"Error: {str(e)}", redact_url=url)
        await broadcast(repo_hash, {"type": "error", "content": str(e)})


async def scheduler():
    """Re-process every hourly-enabled repo once an hour."""
    while True:
        await asyncio.sleep(3600)
        repos = await db.find("repos", {"hourly_enabled": True})
        for repo in repos:
            repo_hash = repo["hash"]
            url = repo["original_url"]
            if repo_hash not in tasks or tasks[repo_hash].done():
                tasks[repo_hash] = asyncio.create_task(process_repo(url, repo_hash))


schedule_task = None


@app.get("/", response_class=HTMLResponse)
async def root():
    # The scheduler is started lazily on the first page view rather than in a
    # startup hook, so it only runs once someone has actually used the app.
    global schedule_task
    if not schedule_task:
        schedule_task = asyncio.create_task(scheduler())
    return open("index.html").read()


@app.post("/submit")
async def submit(request: Request):
    form = await request.form()
    url = form.get("repo_url")
    if not url or len(url) > 1000:
        return Response("Invalid URL", status_code=400)
    parsed = urllib.parse.urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        return Response("Invalid URL", status_code=400)
    repo_hash = hashlib.sha256(url.encode()).hexdigest()
    repo = await db.get("repos", {"hash": repo_hash})
    if not repo:
        await db.insert("repos", {"hash": repo_hash, "original_url": url, "last_synced": None, "hourly_enabled": True})
    if repo_hash not in tasks or tasks[repo_hash].done():
        tasks[repo_hash] = asyncio.create_task(process_repo(url, repo_hash, True))
    return {"hash": repo_hash}


@app.websocket("/ws/{repo_hash}")
async def ws(websocket: WebSocket, repo_hash: str):
    await websocket.accept()
    connections[repo_hash].append(websocket)
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        connections[repo_hash].remove(websocket)


@app.get("/changelog/{repo_hash}", response_class=HTMLResponse)
async def get_changelog(repo_hash: str):
    commits = await db.find("commits", {"repo": repo_hash, "status": "success"}, order_by="date DESC")
    by_date = collections.defaultdict(list)
    for c in commits:
        by_date[c["date"]].append(c["line"])
    # Note: generated lines are inserted into the page unescaped; they come
    # from the summarizer, not directly from user input.
    content = "".join(
        f"<h2>{date}</h2><ul>" + "".join(f"<li>{line}</li>" for line in by_date[date]) + "</ul>"
        for date in sorted(by_date, reverse=True)
    )
    html = open("changelog.html").read().replace("{{content}}", content).replace("{{hash}}", repo_hash)
    return html


@app.get("/download/json/{repo_hash}")
async def download_json(repo_hash: str):
    commits = await db.find("commits", {"repo": repo_hash, "status": "success"})
    data = {
        c["uid"]: {"repo": ".", "date": c["date"], "line": c["line"], "status": c["status"], "timestamp": c["timestamp"]}
        for c in commits
    }
    filename = f"changelog_{repo_hash}.json"
    return Response(json.dumps(data), media_type="application/json", headers={"Content-Disposition": f"attachment; filename={filename}"})


@app.get("/download/md/{repo_hash}")
async def download_md(repo_hash: str):
    commits = await db.find("commits", {"repo": repo_hash, "status": "success"}, order_by="date DESC")
    by_date = collections.defaultdict(list)
    for c in commits:
        by_date[c["date"]].append(c["line"])
    md = "# Changelog\n\n" + "\n".join(
        f"## {date}\n" + "\n".join(f"- {line}" for line in by_date[date]) + "\n"
        for date in sorted(by_date, reverse=True)
    )
    filename = f"changelog_{repo_hash}.md"
    return Response(md, media_type="text/markdown", headers={"Content-Disposition": f"attachment; filename={filename}"})


@app.get("/system-log", response_class=PlainTextResponse)
async def get_log():
    return log_file.read_text() if log_file.exists() else ""