import os
import json
import asyncio
from typing import Optional
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
import aiohttp
from dotenv import load_dotenv
import os
if not os.path.exists(".env"):
with open(".env.example", "w") as f:
f.write("GITEA_BASE_URL=https://gitea.example.com\n")
print("Created .env.example - please configure and rename to .env")
load_dotenv()
app = FastAPI()
token = os.getenv('GITEA_TOKEN')
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class GiteaClient:
def __init__(self, base_url, token=None, otp=None, sudo=None, timeout=30):
self.base_url = base_url.rstrip("/")
self.token = token
self.otp = otp
self.sudo = sudo
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.session = None
async def start(self):
headers = {"Accept": "application/json"}
if self.token:
headers["Authorization"] = f"token {self.token}"
if self.otp:
headers["X-Gitea-OTP"] = self.otp
if self.sudo:
headers["Sudo"] = self.sudo
self.session = aiohttp.ClientSession(headers=headers, timeout=self.timeout)
async def stop(self):
if self.session:
await self.session.close()
async def _request(self, method, path, params=None, json=None):
url = f"{self.base_url}/{path.lstrip('/')}"
async with self.session.request(method, url, params=params, json=json) as resp:
status = resp.status
headers = dict(resp.headers)
ct = headers.get("Content-Type", "")
if "application/json" in ct:
data = await resp.json()
else:
data = await resp.text()
return {"status": status, "headers": headers, "data": data}
async def me(self):
return await self._request("GET", "api/v1/user")
async def user(self, username):
return await self._request("GET", f"api/v1/users/{username}")
async def user_tokens(self, username, page=None, limit=None):
params = {}
if page is not None:
params["page"] = page
if limit is not None:
params["limit"] = limit
return await self._request("GET", f"api/v1/users/{username}/tokens", params=params)
async def create_token(self, username, name):
return await self._request("POST", f"api/v1/users/{username}/tokens", json={"name": name})
async def delete_token(self, username, token_id):
return await self._request("DELETE", f"api/v1/users/{username}/tokens/{token_id}")
async def user_repos(self, username, page=None, limit=None, sort=None):
params = {}
if page is not None:
params["page"] = page
if limit is not None:
params["limit"] = limit
if sort is not None:
params["sort"] = sort
return await self._request("GET", f"api/v1/users/{username}/repos", params=params)
async def repo(self, owner, repo):
return await self._request("GET", f"api/v1/repos/{owner}/{repo}")
async def create_repo(self, name, private=False, description=None, auto_init=False, default_branch=None):
payload = {"name": name, "private": private, "auto_init": auto_init}
if description is not None:
payload["description"] = description
if default_branch is not None:
payload["default_branch"] = default_branch
return await self._request("POST", "api/v1/user/repos", json=payload)
async def repo_issues(self, owner, repo, state=None, page=None, limit=None):
params = {}
if state is not None:
params["state"] = state
if page is not None:
params["page"] = page
if limit is not None:
params["limit"] = limit
return await self._request("GET", f"api/v1/repos/{owner}/{repo}/issues", params=params)
async def create_issue(self, owner, repo, title, body=None, assignees=None, labels=None, milestone=None):
payload = {"title": title}
if body is not None:
payload["body"] = body
if assignees is not None:
payload["assignees"] = assignees
if labels is not None:
payload["labels"] = labels
if milestone is not None:
payload["milestone"] = milestone
return await self._request("POST", f"api/v1/repos/{owner}/{repo}/issues", json=payload)
class OpenAIClient:
def __init__(self, api_key):
self.api_key = api_key
self.base_url = "https://api.openai.com/v1"
self.session = None
async def start(self):
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
self.session = aiohttp.ClientSession(headers=headers)
async def stop(self):
if self.session:
await self.session.close()
async def chat_completion(self, messages, model="gpt-4o-mini", tools=None, tool_choice=None):
payload = {"model": model, "messages": messages}
if tools:
payload["tools"] = tools
if tool_choice:
payload["tool_choice"] = tool_choice
async with self.session.post(f"{self.base_url}/chat/completions", json=payload) as resp:
return await resp.json()
async def authenticate_gitea(username, password):
global token
base_url = os.getenv("GITEA_BASE_URL", "https://gitea.example.com")
return token
async with aiohttp.ClientSession() as session:
auth = aiohttp.BasicAuth(username, password)
token = os.getenv('GITEA_TOKEN')
print("DE TOKEN:", token)
async with session.post(f"{base_url}/api/v1/users/{username}/tokens",
json={"name": "mrissue_temp"}, headers={"Accept": "application/json","Authorization": f"token {token}", "Content-Type": "application/json"}) as resp:
print(resp)
if resp.status == 201:
data = await resp.json()
token = data.get("sha1")
token_id = data.get("id")
async with session.delete(f"{base_url}/api/v1/users/{username}/tokens/{token_id}",
headers={"Authorization": f"token {token}"}) as del_resp:
pass
return token
return None
@app.get("/")
async def read_root():
return FileResponse("index.html")
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
auth_data = await websocket.receive_json()
username = auth_data.get("username")
password = auth_data.get("password")
await websocket.send_json({"type": "log", "message": "Authenticating with Gitea..."})
token = await authenticate_gitea(username, password)
if not token:
await websocket.send_json({"type": "error", "message": "Authentication failed"})
await websocket.close()
return
await websocket.send_json({"type": "log", "message": "Authentication successful"})
prompt_data = await websocket.receive_json()
prompt = prompt_data.get("prompt")
gitea_base = os.getenv("GITEA_BASE_URL", "https://gitea.example.com")
openai_key = os.getenv("OPENAI_API_KEY")
gitea = GiteaClient(gitea_base, token=token)
openai_client = OpenAIClient(openai_key)
await gitea.start()
await openai_client.start()
try:
await websocket.send_json({"type": "log", "message": "Fetching user repositories..."})
repos = []
page = 1
while True:
repos_resp = await gitea.user_repos(username, limit=100, page=page)
page_repos = repos_resp.get("data", [])
if not page_repos:
break
repos.extend(page_repos)
page += 1
repo_names = [r["name"] for r in repos]
await websocket.send_json({"type": "log", "message": f"Found {len(repo_names)} repositories"})
await websocket.send_json({"type": "log", "message": "Analyzing prompt with AI..."})
messages = [
{"role": "system", "content": "You are a helpful assistant that extracts project names from user prompts."},
{"role": "user", "content": f"User prompt: {prompt}\n\nAvailable repositories: {', '.join(repo_names)}\n\nWhich repository should these issues be created in? Provide the exact repository name or the closest match. respond with the literal name only."}
]
ai_resp = await openai_client.chat_completion(messages)
project_match = ai_resp["choices"][0]["message"]["content"].strip()
await websocket.send_json({"type": "log", "message": f"AI suggested project: {project_match}"})
matched_repo = None
for repo in repos:
if repo["name"].lower().strip() == project_match.lower().strip() or project_match.lower().strip() in repo["name"].lower().strip():
matched_repo = repo
break
if not matched_repo:
await websocket.send_json({"type": "error", "message": f"Project not found: {project_match}"})
await websocket.close()
return
await websocket.send_json({"type": "log", "message": f"Matched to repository: {matched_repo['name']}"})
await websocket.send_json({"type": "log", "message": "Generating issue list with AI..."})
issue_messages = [
{"role": "system", "content": "You are a helpful assistant that creates structured issue lists from user prompts. Return ONLY a valid JSON array with objects containing 'title' and 'description' fields."},
{"role": "user", "content": f"Create a list of issues based on this prompt: {prompt}\n\nReturn as valid JSON array: [{'{\"title\": \"...\", \"description\": \"...\"}'}] without markup."}
]
issues_resp = await openai_client.chat_completion(issue_messages)
issues_text = issues_resp["choices"][0]["message"]["content"].strip()
if "```json" in issues_text:
issues_text = issues_text.split("```json")[0].split("""```""")[0]
elif "```" in issues_text:
issues_text = issues_text.split("``````")[0].strip()
issues = json.loads(issues_text)
await websocket.send_json({"type": "log", "message": f"Generated {len(issues)} issues"})
for idx, issue in enumerate(issues, 1):
await websocket.send_json({"type": "log", "message": f"Creating issue {idx}/{len(issues)}: {issue['title']}"})
create_resp = await gitea.create_issue(
username,
matched_repo["name"],
issue["title"],
body=issue.get("description")
)
if create_resp["status"] == 201:
await websocket.send_json({"type": "log", "message": f"✓ Created: {issue['title']}"})
else:
await websocket.send_json({"type": "log", "message": f"✗ Failed: {issue['title']}"})
await websocket.send_json({"type": "complete", "message": "All issues created successfully"})
finally:
await gitea.stop()
await openai_client.stop()
except WebSocketDisconnect:
pass
except Exception as e:
await websocket.send_json({"type": "error", "message": str(e)})
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8590)