"""WebSocket service that turns a free-form prompt into Gitea issues.

Flow for one ``/ws`` connection:

1. The client sends ``{"username": ..., "password": ...}``.
2. The client sends ``{"prompt": ...}``.
3. The server lists the user's repositories, asks the OpenAI chat API which
   repository the prompt refers to, asks it again for a JSON list of issues,
   and creates each issue through the Gitea REST API, streaming progress
   messages (``log`` / ``error`` / ``complete``) back over the socket.

Configuration is read from the environment (optionally via ``.env``):
``GITEA_BASE_URL``, ``GITEA_TOKEN`` and ``OPENAI_API_KEY``.
"""
import asyncio
import json
import logging
import os
import re
from contextlib import suppress
from typing import Any, Optional

import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse

logger = logging.getLogger(__name__)

# Bootstrap a configuration template for first-time users.  The template is
# only written when it does not exist yet so that local edits to
# ``.env.example`` are never clobbered on restart.
if not os.path.exists(".env") and not os.path.exists(".env.example"):
    with open(".env.example", "w", encoding="utf-8") as f:
        f.write("GITEA_BASE_URL=https://gitea.example.com\n")
    print("Created .env.example - please configure and rename to .env")

load_dotenv()

app = FastAPI()

# Service-wide Gitea access token, read once at import time.
token = os.getenv('GITEA_TOKEN')

# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# web page talk to this service with the user's credentials.  Left unchanged
# to preserve behaviour; restrict ``allow_origins`` before exposing publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Matches a Markdown code fence (optionally tagged ``json``) and captures
# the text between the opening and closing fence.
_FENCE_RE = re.compile(r"```(?:json)?\s*(.*?)\s*```", re.DOTALL | re.IGNORECASE)


def _drop_none(**values: Any) -> dict:
    """Return ``values`` without the entries whose value is ``None``."""
    return {key: value for key, value in values.items() if value is not None}


class GiteaClient:
    """Thin async wrapper around a subset of the Gitea REST API (``api/v1``).

    Call :meth:`start` before issuing requests and :meth:`stop` when done.
    Every API method returns a dict with the keys ``status`` (HTTP status
    code), ``headers`` (response headers as a plain dict) and ``data``
    (decoded JSON when the response is JSON, otherwise the body text).
    HTTP error statuses are *not* raised; callers must inspect ``status``.
    """

    def __init__(self, base_url, token=None, otp=None, sudo=None, timeout=30):
        """Store connection settings; no network activity happens here.

        Args:
            base_url: Root URL of the Gitea instance (trailing ``/`` ignored).
            token: Access token sent as ``Authorization: token <token>``.
            otp: Optional one-time password sent as ``X-Gitea-OTP``.
            sudo: Optional user name sent in the ``Sudo`` header.
            timeout: Total per-request timeout in seconds.
        """
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.otp = otp
        self.sudo = sudo
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.session = None

    async def start(self):
        """Open the HTTP session with the configured default headers."""
        headers = {"Accept": "application/json"}
        if self.token:
            headers["Authorization"] = f"token {self.token}"
        if self.otp:
            headers["X-Gitea-OTP"] = self.otp
        if self.sudo:
            headers["Sudo"] = self.sudo
        self.session = aiohttp.ClientSession(headers=headers, timeout=self.timeout)

    async def stop(self):
        """Close the HTTP session if one is open; safe to call repeatedly."""
        if self.session:
            await self.session.close()
            self.session = None

    async def _request(self, method, path, params=None, json=None):
        """Send one request and return ``{"status", "headers", "data"}``.

        Raises:
            RuntimeError: If :meth:`start` has not been awaited yet.
        """
        if self.session is None:
            raise RuntimeError("GiteaClient.start() must be awaited before making requests")
        url = f"{self.base_url}/{path.lstrip('/')}"
        async with self.session.request(method, url, params=params, json=json) as resp:
            # Look the header up on the response's case-insensitive mapping:
            # a plain dict copy would miss e.g. a lowercase "content-type".
            content_type = resp.headers.get("Content-Type", "").lower()
            if "application/json" in content_type:
                data = await resp.json()
            else:
                data = await resp.text()
            return {"status": resp.status, "headers": dict(resp.headers), "data": data}

    async def me(self):
        """Fetch the authenticated user."""
        return await self._request("GET", "api/v1/user")

    async def user(self, username):
        """Fetch the public profile of ``username``."""
        return await self._request("GET", f"api/v1/users/{username}")

    async def user_tokens(self, username, page=None, limit=None):
        """List the access tokens of ``username`` (optionally paginated)."""
        params = _drop_none(page=page, limit=limit)
        return await self._request("GET", f"api/v1/users/{username}/tokens", params=params)

    async def create_token(self, username, name):
        """Create an access token called ``name`` for ``username``."""
        return await self._request("POST", f"api/v1/users/{username}/tokens", json={"name": name})

    async def delete_token(self, username, token_id):
        """Delete the access token ``token_id`` of ``username``."""
        return await self._request("DELETE", f"api/v1/users/{username}/tokens/{token_id}")

    async def user_repos(self, username, page=None, limit=None, sort=None):
        """List the repositories of ``username`` (optionally paginated/sorted)."""
        params = _drop_none(page=page, limit=limit, sort=sort)
        return await self._request("GET", f"api/v1/users/{username}/repos", params=params)

    async def repo(self, owner, repo):
        """Fetch the repository ``owner/repo``."""
        return await self._request("GET", f"api/v1/repos/{owner}/{repo}")

    async def create_repo(self, name, private=False, description=None, auto_init=False, default_branch=None):
        """Create a repository for the authenticated user."""
        payload = {"name": name, "private": private, "auto_init": auto_init}
        payload.update(_drop_none(description=description, default_branch=default_branch))
        return await self._request("POST", "api/v1/user/repos", json=payload)

    async def repo_issues(self, owner, repo, state=None, page=None, limit=None):
        """List the issues of ``owner/repo`` (optionally filtered/paginated)."""
        params = _drop_none(state=state, page=page, limit=limit)
        return await self._request("GET", f"api/v1/repos/{owner}/{repo}/issues", params=params)

    async def create_issue(self, owner, repo, title, body=None, assignees=None, labels=None, milestone=None):
        """Create an issue in ``owner/repo``; optional fields are sent only when given."""
        payload = {"title": title}
        payload.update(_drop_none(body=body, assignees=assignees, labels=labels, milestone=milestone))
        return await self._request("POST", f"api/v1/repos/{owner}/{repo}/issues", json=payload)


class OpenAIClient:
    """Minimal async client for the OpenAI chat-completions endpoint."""

    def __init__(self, api_key):
        """Store the API key; call :meth:`start` before sending requests."""
        self.api_key = api_key
        self.base_url = "https://api.openai.com/v1"
        self.session = None

    async def start(self):
        """Open the HTTP session with the bearer-token headers."""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        self.session = aiohttp.ClientSession(headers=headers)

    async def stop(self):
        """Close the HTTP session if one is open; safe to call repeatedly."""
        if self.session:
            await self.session.close()
            self.session = None

    async def chat_completion(self, messages, model="gpt-4o-mini", tools=None, tool_choice=None):
        """POST ``messages`` to ``/chat/completions`` and return the decoded JSON.

        The HTTP status is not checked here; an API error is returned as the
        decoded error payload and must be handled by the caller.

        Raises:
            RuntimeError: If :meth:`start` has not been awaited yet.
        """
        if self.session is None:
            raise RuntimeError("OpenAIClient.start() must be awaited before making requests")
        payload = {"model": model, "messages": messages}
        if tools:
            payload["tools"] = tools
        if tool_choice:
            payload["tool_choice"] = tool_choice
        async with self.session.post(f"{self.base_url}/chat/completions", json=payload) as resp:
            return await resp.json()


async def authenticate_gitea(username, password):
    """Return the Gitea access token to use for this connection, or ``None``.

    The service-wide ``GITEA_TOKEN`` (module-level ``token``) is returned
    as-is.  This matches the previous effective behaviour: the old body
    returned the configured token before an unreachable token-creation
    sequence, which has been removed (it also printed the secret token and
    deleted the token it had just created).

    NOTE(review): ``username`` and ``password`` are accepted for interface
    compatibility but are NOT verified against Gitea.
    """
    return token


@app.get("/")
async def read_root():
    """Serve the single-page front end."""
    return FileResponse("index.html")


async def _send_log(websocket: WebSocket, message: str) -> None:
    """Send a progress message of type ``log`` to the client."""
    await websocket.send_json({"type": "log", "message": message})


async def _fetch_all_repos(gitea: GiteaClient, username) -> list:
    """Collect every repository of ``username`` by walking all result pages.

    Raises:
        RuntimeError: If Gitea answers with a non-200 status or a non-list
            body.  Without this check an error payload (a dict) would be
            treated as a page of results and the loop would never end.
    """
    repos = []
    page = 1
    while True:
        resp = await gitea.user_repos(username, limit=100, page=page)
        page_repos = resp.get("data", [])
        if resp.get("status") != 200 or not isinstance(page_repos, list):
            raise RuntimeError(
                f"Failed to list repositories for {username}: HTTP {resp.get('status')}"
            )
        # Stop on the first empty page.  A short page is not a reliable end
        # marker because the server may cap ``limit`` below the requested 100.
        if not page_repos:
            return repos
        repos.extend(page_repos)
        page += 1


def _message_content(response: Any) -> str:
    """Extract the stripped text of the first choice of a chat completion.

    Raises:
        RuntimeError: If the payload has no usable first message, carrying
            the API's own error message when one is present.
    """
    try:
        return response["choices"][0]["message"]["content"].strip()
    except (KeyError, IndexError, TypeError, AttributeError):
        detail = None
        if isinstance(response, dict):
            error = response.get("error")
            detail = error.get("message") if isinstance(error, dict) else error
        raise RuntimeError(
            f"OpenAI request failed: {detail or 'unexpected response format'}"
        ) from None


def _match_repo(repos: list, suggestion: str) -> Optional[dict]:
    """Return the repository best matching ``suggestion``, or ``None``.

    An exact (case-insensitive) name match always wins; only when there is
    none is the first repository whose name contains the suggestion used.
    An empty suggestion matches nothing.
    """
    wanted = suggestion.lower().strip()
    if not wanted:
        return None
    for repo in repos:
        if repo["name"].lower().strip() == wanted:
            return repo
    for repo in repos:
        if wanted in repo["name"].lower().strip():
            return repo
    return None


def _parse_issue_list(text: str) -> list:
    """Parse the model's answer into a list of issue dicts.

    Accepts raw JSON, JSON wrapped in a Markdown code fence, or JSON
    surrounded by stray prose.

    Raises:
        ValueError: If the text is not valid JSON (``json.JSONDecodeError``)
            or is not an array of objects that each carry a ``title``.
    """
    payload = text.strip()
    fenced = _FENCE_RE.search(payload)
    if fenced:
        payload = fenced.group(1).strip()
    if not payload.startswith("["):
        # Tolerate leading/trailing chatter around the array.
        start, end = payload.find("["), payload.rfind("]")
        if 0 <= start < end:
            payload = payload[start:end + 1]
    issues = json.loads(payload)
    if not isinstance(issues, list) or not all(
        isinstance(issue, dict) and issue.get("title") for issue in issues
    ):
        raise ValueError("AI response is not a JSON array of issues with a 'title'")
    return issues


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Drive one prompt-to-issues session over a WebSocket.

    Expects two JSON messages from the client (credentials, then prompt) and
    streams ``log`` messages while working.  Ends with a ``complete`` message
    on success or an ``error`` message on failure.
    """
    await websocket.accept()
    try:
        auth_data = await websocket.receive_json()
        username = auth_data.get("username")
        password = auth_data.get("password")

        await _send_log(websocket, "Authenticating with Gitea...")
        gitea_token = await authenticate_gitea(username, password)
        if not gitea_token:
            await websocket.send_json({"type": "error", "message": "Authentication failed"})
            await websocket.close()
            return
        await _send_log(websocket, "Authentication successful")

        prompt_data = await websocket.receive_json()
        prompt = prompt_data.get("prompt")
        if not prompt:
            await websocket.send_json({"type": "error", "message": "No prompt provided"})
            await websocket.close()
            return

        gitea_base = os.getenv("GITEA_BASE_URL", "https://gitea.example.com")
        openai_key = os.getenv("OPENAI_API_KEY")
        gitea = GiteaClient(gitea_base, token=gitea_token)
        openai_client = OpenAIClient(openai_key)

        try:
            # Start inside the try so a failure of the second start() still
            # closes the first session in the finally block.
            await gitea.start()
            await openai_client.start()

            await _send_log(websocket, "Fetching user repositories...")
            repos = await _fetch_all_repos(gitea, username)
            repo_names = [r["name"] for r in repos]
            await _send_log(websocket, f"Found {len(repo_names)} repositories")

            await _send_log(websocket, "Analyzing prompt with AI...")
            messages = [
                {
                    "role": "system",
                    "content": "You are a helpful assistant that extracts project names from user prompts.",
                },
                {
                    "role": "user",
                    "content": (
                        f"User prompt: {prompt}\n\n"
                        f"Available repositories: {', '.join(repo_names)}\n\n"
                        "Which repository should these issues be created in? "
                        "Provide the exact repository name or the closest match. "
                        "respond with the literal name only."
                    ),
                },
            ]
            ai_resp = await openai_client.chat_completion(messages)
            project_match = _message_content(ai_resp)
            await _send_log(websocket, f"AI suggested project: {project_match}")

            matched_repo = _match_repo(repos, project_match)
            if not matched_repo:
                await websocket.send_json(
                    {"type": "error", "message": f"Project not found: {project_match}"}
                )
                await websocket.close()
                return
            await _send_log(websocket, f"Matched to repository: {matched_repo['name']}")

            await _send_log(websocket, "Generating issue list with AI...")
            issue_messages = [
                {
                    "role": "system",
                    "content": (
                        "You are a helpful assistant that creates structured issue lists "
                        "from user prompts. Return ONLY a valid JSON array with objects "
                        "containing 'title' and 'description' fields."
                    ),
                },
                {
                    "role": "user",
                    # The JSON example is a plain (non-f) string: the previous
                    # f-string used backslashes inside a replacement field,
                    # which is a SyntaxError before Python 3.12.
                    "content": (
                        f"Create a list of issues based on this prompt: {prompt}\n\n"
                        "Return as valid JSON array: "
                        '[{"title": "...", "description": "..."}] without markup.'
                    ),
                },
            ]
            issues_resp = await openai_client.chat_completion(issue_messages)
            issues = _parse_issue_list(_message_content(issues_resp))
            await _send_log(websocket, f"Generated {len(issues)} issues")

            failed = 0
            for idx, issue in enumerate(issues, 1):
                await _send_log(
                    websocket, f"Creating issue {idx}/{len(issues)}: {issue['title']}"
                )
                create_resp = await gitea.create_issue(
                    username,
                    matched_repo["name"],
                    issue["title"],
                    body=issue.get("description"),
                )
                if create_resp["status"] == 201:
                    await _send_log(websocket, f"✓ Created: {issue['title']}")
                else:
                    failed += 1
                    await _send_log(websocket, f"✗ Failed: {issue['title']}")

            if failed:
                summary = f"Created {len(issues) - failed} of {len(issues)} issues ({failed} failed)"
            else:
                summary = "All issues created successfully"
            await websocket.send_json({"type": "complete", "message": summary})
        finally:
            await gitea.stop()
            await openai_client.stop()
    except WebSocketDisconnect:
        # The client went away; there is nobody left to report to.
        pass
    except Exception as e:
        logger.exception("Issue creation session failed")
        # The socket may already be closed; do not let the error report
        # itself raise out of the handler.
        with suppress(RuntimeError, WebSocketDisconnect):
            await websocket.send_json({"type": "error", "message": str(e)})


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8590)