|
import os
|
|
import json
|
|
import asyncio
|
|
from typing import Optional
|
|
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
|
|
from fastapi.responses import HTMLResponse, FileResponse
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
import aiohttp
|
|
from dotenv import load_dotenv
|
|
|
|
import os
|
|
if not os.path.exists(".env"):
|
|
with open(".env.example", "w") as f:
|
|
f.write("GITEA_BASE_URL=https://gitea.example.com\n")
|
|
print("Created .env.example - please configure and rename to .env")
|
|
|
|
load_dotenv()
|
|
|
|
app = FastAPI()
|
|
|
|
token = os.getenv('GITEA_TOKEN')
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
class GiteaClient:
|
|
def __init__(self, base_url, token=None, otp=None, sudo=None, timeout=30):
|
|
self.base_url = base_url.rstrip("/")
|
|
self.token = token
|
|
self.otp = otp
|
|
self.sudo = sudo
|
|
self.timeout = aiohttp.ClientTimeout(total=timeout)
|
|
self.session = None
|
|
|
|
async def start(self):
|
|
headers = {"Accept": "application/json"}
|
|
if self.token:
|
|
headers["Authorization"] = f"token {self.token}"
|
|
if self.otp:
|
|
headers["X-Gitea-OTP"] = self.otp
|
|
if self.sudo:
|
|
headers["Sudo"] = self.sudo
|
|
self.session = aiohttp.ClientSession(headers=headers, timeout=self.timeout)
|
|
|
|
async def stop(self):
|
|
if self.session:
|
|
await self.session.close()
|
|
|
|
async def _request(self, method, path, params=None, json=None):
|
|
url = f"{self.base_url}/{path.lstrip('/')}"
|
|
async with self.session.request(method, url, params=params, json=json) as resp:
|
|
status = resp.status
|
|
headers = dict(resp.headers)
|
|
ct = headers.get("Content-Type", "")
|
|
if "application/json" in ct:
|
|
data = await resp.json()
|
|
else:
|
|
data = await resp.text()
|
|
return {"status": status, "headers": headers, "data": data}
|
|
|
|
async def me(self):
|
|
return await self._request("GET", "api/v1/user")
|
|
|
|
async def user(self, username):
|
|
return await self._request("GET", f"api/v1/users/{username}")
|
|
|
|
async def user_tokens(self, username, page=None, limit=None):
|
|
params = {}
|
|
if page is not None:
|
|
params["page"] = page
|
|
if limit is not None:
|
|
params["limit"] = limit
|
|
return await self._request("GET", f"api/v1/users/{username}/tokens", params=params)
|
|
|
|
async def create_token(self, username, name):
|
|
return await self._request("POST", f"api/v1/users/{username}/tokens", json={"name": name})
|
|
|
|
async def delete_token(self, username, token_id):
|
|
return await self._request("DELETE", f"api/v1/users/{username}/tokens/{token_id}")
|
|
|
|
async def user_repos(self, username, page=None, limit=None, sort=None):
|
|
params = {}
|
|
if page is not None:
|
|
params["page"] = page
|
|
if limit is not None:
|
|
params["limit"] = limit
|
|
if sort is not None:
|
|
params["sort"] = sort
|
|
return await self._request("GET", f"api/v1/users/{username}/repos", params=params)
|
|
|
|
async def repo(self, owner, repo):
|
|
return await self._request("GET", f"api/v1/repos/{owner}/{repo}")
|
|
|
|
async def create_repo(self, name, private=False, description=None, auto_init=False, default_branch=None):
|
|
payload = {"name": name, "private": private, "auto_init": auto_init}
|
|
if description is not None:
|
|
payload["description"] = description
|
|
if default_branch is not None:
|
|
payload["default_branch"] = default_branch
|
|
return await self._request("POST", "api/v1/user/repos", json=payload)
|
|
|
|
async def repo_issues(self, owner, repo, state=None, page=None, limit=None):
|
|
params = {}
|
|
if state is not None:
|
|
params["state"] = state
|
|
if page is not None:
|
|
params["page"] = page
|
|
if limit is not None:
|
|
params["limit"] = limit
|
|
return await self._request("GET", f"api/v1/repos/{owner}/{repo}/issues", params=params)
|
|
|
|
async def create_issue(self, owner, repo, title, body=None, assignees=None, labels=None, milestone=None):
|
|
payload = {"title": title}
|
|
if body is not None:
|
|
payload["body"] = body
|
|
if assignees is not None:
|
|
payload["assignees"] = assignees
|
|
if labels is not None:
|
|
payload["labels"] = labels
|
|
if milestone is not None:
|
|
payload["milestone"] = milestone
|
|
return await self._request("POST", f"api/v1/repos/{owner}/{repo}/issues", json=payload)
|
|
|
|
class OpenAIClient:
|
|
def __init__(self, api_key):
|
|
self.api_key = api_key
|
|
self.base_url = "https://api.openai.com/v1"
|
|
self.session = None
|
|
|
|
async def start(self):
|
|
headers = {
|
|
"Authorization": f"Bearer {self.api_key}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
self.session = aiohttp.ClientSession(headers=headers)
|
|
|
|
async def stop(self):
|
|
if self.session:
|
|
await self.session.close()
|
|
|
|
async def chat_completion(self, messages, model="gpt-4o-mini", tools=None, tool_choice=None):
|
|
payload = {"model": model, "messages": messages}
|
|
if tools:
|
|
payload["tools"] = tools
|
|
if tool_choice:
|
|
payload["tool_choice"] = tool_choice
|
|
async with self.session.post(f"{self.base_url}/chat/completions", json=payload) as resp:
|
|
return await resp.json()
|
|
|
|
async def authenticate_gitea(username, password):
|
|
global token
|
|
base_url = os.getenv("GITEA_BASE_URL", "https://gitea.example.com")
|
|
return token
|
|
async with aiohttp.ClientSession() as session:
|
|
auth = aiohttp.BasicAuth(username, password)
|
|
token = os.getenv('GITEA_TOKEN')
|
|
print("DE TOKEN:", token)
|
|
async with session.post(f"{base_url}/api/v1/users/{username}/tokens",
|
|
json={"name": "mrissue_temp"}, headers={"Accept": "application/json","Authorization": f"token {token}", "Content-Type": "application/json"}) as resp:
|
|
print(resp)
|
|
if resp.status == 201:
|
|
data = await resp.json()
|
|
token = data.get("sha1")
|
|
token_id = data.get("id")
|
|
async with session.delete(f"{base_url}/api/v1/users/{username}/tokens/{token_id}",
|
|
headers={"Authorization": f"token {token}"}) as del_resp:
|
|
pass
|
|
return token
|
|
return None
|
|
|
|
@app.get("/")
|
|
async def read_root():
|
|
return FileResponse("index.html")
|
|
|
|
@app.websocket("/ws")
|
|
async def websocket_endpoint(websocket: WebSocket):
|
|
await websocket.accept()
|
|
|
|
try:
|
|
auth_data = await websocket.receive_json()
|
|
username = auth_data.get("username")
|
|
password = auth_data.get("password")
|
|
|
|
await websocket.send_json({"type": "log", "message": "Authenticating with Gitea..."})
|
|
|
|
token = await authenticate_gitea(username, password)
|
|
if not token:
|
|
await websocket.send_json({"type": "error", "message": "Authentication failed"})
|
|
await websocket.close()
|
|
return
|
|
|
|
await websocket.send_json({"type": "log", "message": "Authentication successful"})
|
|
|
|
prompt_data = await websocket.receive_json()
|
|
prompt = prompt_data.get("prompt")
|
|
|
|
gitea_base = os.getenv("GITEA_BASE_URL", "https://gitea.example.com")
|
|
openai_key = os.getenv("OPENAI_API_KEY")
|
|
|
|
gitea = GiteaClient(gitea_base, token=token)
|
|
openai_client = OpenAIClient(openai_key)
|
|
|
|
await gitea.start()
|
|
await openai_client.start()
|
|
|
|
try:
|
|
await websocket.send_json({"type": "log", "message": "Fetching user repositories..."})
|
|
repos = []
|
|
page = 1
|
|
while True:
|
|
repos_resp = await gitea.user_repos(username, limit=100, page=page)
|
|
page_repos = repos_resp.get("data", [])
|
|
if not page_repos:
|
|
break
|
|
repos.extend(page_repos)
|
|
page += 1
|
|
repo_names = [r["name"] for r in repos]
|
|
await websocket.send_json({"type": "log", "message": f"Found {len(repo_names)} repositories"})
|
|
|
|
await websocket.send_json({"type": "log", "message": "Analyzing prompt with AI..."})
|
|
|
|
messages = [
|
|
{"role": "system", "content": "You are a helpful assistant that extracts project names from user prompts."},
|
|
{"role": "user", "content": f"User prompt: {prompt}\n\nAvailable repositories: {', '.join(repo_names)}\n\nWhich repository should these issues be created in? Provide the exact repository name or the closest match. respond with the literal name only."}
|
|
]
|
|
|
|
ai_resp = await openai_client.chat_completion(messages)
|
|
project_match = ai_resp["choices"][0]["message"]["content"].strip()
|
|
|
|
await websocket.send_json({"type": "log", "message": f"AI suggested project: {project_match}"})
|
|
|
|
matched_repo = None
|
|
for repo in repos:
|
|
if repo["name"].lower().strip() == project_match.lower().strip() or project_match.lower().strip() in repo["name"].lower().strip():
|
|
matched_repo = repo
|
|
break
|
|
|
|
if not matched_repo:
|
|
await websocket.send_json({"type": "error", "message": f"Project not found: {project_match}"})
|
|
await websocket.close()
|
|
return
|
|
|
|
await websocket.send_json({"type": "log", "message": f"Matched to repository: {matched_repo['name']}"})
|
|
|
|
await websocket.send_json({"type": "log", "message": "Generating issue list with AI..."})
|
|
|
|
issue_messages = [
|
|
{"role": "system", "content": "You are a helpful assistant that creates structured issue lists from user prompts. Return ONLY a valid JSON array with objects containing 'title' and 'description' fields."},
|
|
{"role": "user", "content": f"Create a list of issues based on this prompt: {prompt}\n\nReturn as valid JSON array: [{'{\"title\": \"...\", \"description\": \"...\"}'}] without markup."}
|
|
]
|
|
issues_resp = await openai_client.chat_completion(issue_messages)
|
|
issues_text = issues_resp["choices"][0]["message"]["content"].strip()
|
|
|
|
if "```json" in issues_text:
|
|
issues_text = issues_text.split("```json")[0].split("""```""")[0]
|
|
elif "```" in issues_text:
|
|
issues_text = issues_text.split("``````")[0].strip()
|
|
|
|
issues = json.loads(issues_text)
|
|
|
|
await websocket.send_json({"type": "log", "message": f"Generated {len(issues)} issues"})
|
|
|
|
for idx, issue in enumerate(issues, 1):
|
|
await websocket.send_json({"type": "log", "message": f"Creating issue {idx}/{len(issues)}: {issue['title']}"})
|
|
|
|
create_resp = await gitea.create_issue(
|
|
username,
|
|
matched_repo["name"],
|
|
issue["title"],
|
|
body=issue.get("description")
|
|
)
|
|
|
|
if create_resp["status"] == 201:
|
|
await websocket.send_json({"type": "log", "message": f"✓ Created: {issue['title']}"})
|
|
else:
|
|
await websocket.send_json({"type": "log", "message": f"✗ Failed: {issue['title']}"})
|
|
|
|
await websocket.send_json({"type": "complete", "message": "All issues created successfully"})
|
|
|
|
finally:
|
|
await gitea.stop()
|
|
await openai_client.stop()
|
|
|
|
except WebSocketDisconnect:
|
|
pass
|
|
except Exception as e:
|
|
await websocket.send_json({"type": "error", "message": str(e)})
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="127.0.0.1", port=8590)
|
|
|