"""
MIT License
Author: retoor <retoor@molodetz.nl>
This module provides a `GiteaRepoManager` class for managing Gitea repositories.
It supports authentication via username/password, token generation, listing repositories,
checking for commits, and deleting repositories with safety and logging.
Usage:
Instantiate the class with your Gitea API URL, username, and password.
Use methods like `list_repositories()`, `manage_repositories()`, etc., to perform operations.
Dependencies:
- requests
"""
import requests
import logging
from typing import Optional, List, Dict, Any
# Configure logger for the module
logger = logging.getLogger("GiteaRepoManager")
logger.setLevel(logging.INFO) # Default level; override as needed externally
handler = logging.StreamHandler()
formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(name)s: %(message)s')
handler.setFormatter(formatter)
if not logger.hasHandlers():
logger.addHandler(handler)
class GiteaRepoManager:
"""
Manage Gitea repositories using username/password authentication.
Generates a personal access token internally for API calls.
"""
def __init__(self, api_url: str, username: str, password: str, token: Optional[str] = None, otp: Optional[str] = None):
"""
Initialize the GiteaRepoManager with API URL and credentials.
Args:
api_url (str): Base URL of the Gitea API (e.g., 'https://gitea.example.com').
username (str): Gitea username.
password (str): Gitea password.
token (Optional[str]): Optional pre-existing token. If None, a new token is generated.
otp (Optional[str]): Optional one-time password for 2FA.
Raises:
ValueError: If token generation fails.
"""
self.api_url = api_url.rstrip('/')
self.username = username
self.password = password
self.token = token or self.generate_token(otp)
if self.token:
self.headers = {
"Authorization": f"token {self.token}",
"Content-Type": "application/json"
}
logger.info("Created API token 'script_token'. Delete from Gitea settings when done.")
else:
logger.error("Failed to generate token during initialization.")
raise ValueError("Failed to generate token")
def generate_token(self, otp: Optional[str] = None) -> Optional[str]:
"""
Generate a personal access token for API authentication.
Args:
otp (Optional[str]): One-time password for 2FA, if enabled.
Returns:
Optional[str]: The generated token SHA1 string if successful, None otherwise.
"""
url = f"{self.api_url}/api/v1/users/{self.username}/tokens"
auth = (self.username, self.password)
headers = {"X-Gitea-OTP": otp} if otp else {}
data = {"name": "script_token"}
try:
response = requests.post(url, auth=auth, headers=headers, json=data)
logger.debug(f"Token generation request URL: {url}")
logger.debug(f"Request headers: {headers}")
logger.debug(f"Request data: {data}")
logger.debug(f"Response status code: {response.status_code}")
logger.debug(f"Response text: {response.text}")
if response.status_code == 201:
return response.json()["sha1"]
logger.error(f"Error generating token: {response.status_code} - {response.text}")
except requests.RequestException as e:
logger.error(f"Exception during token generation: {e}", exc_info=True)
return None
def list_repositories(self) -> List[Dict[str, Any]]:
"""
List repositories accessible with the current token.
Returns:
List[Dict[str, Any]]: List of repository data dictionaries.
"""
url = f"{self.api_url}/repos/search?limit=500"
try:
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
logger.debug("Successfully listed repositories.")
return response.json().get('data', [])
logger.warning(f"Error listing repositories: {response.status_code} - {response.text}")
except requests.RequestException as e:
logger.error(f"Exception during listing repositories: {e}", exc_info=True)
return []
def delete_repository(self, owner: str, repo_name: str, dry_run: bool = False) -> None:
"""
Delete a repository by owner and name.
Args:
owner (str): Username of the repository owner.
repo_name (str): Name of the repository.
dry_run (bool): If True, only logs the intended deletion without executing.
Returns:
None
"""
url = f"{self.api_url}/repos/{owner}/{repo_name}"
if dry_run:
logger.info(f"Would delete repository: {owner}/{repo_name}")
return
try:
response = requests.delete(url, headers=self.headers)
if response.status_code == 204:
logger.info(f"Deleted repository: {owner}/{repo_name}")
else:
logger.warning(f"Error deleting repository {owner}/{repo_name}: {response.status_code} - {response.text}")
except requests.RequestException as e:
logger.error(f"Exception during deleting repository {owner}/{repo_name}: {e}", exc_info=True)
def has_commits(self, owner: str, repo_name: str) -> bool:
"""
Check if a repository has commits.
Args:
owner (str): Username of the repository owner.
repo_name (str): Repository name.
Returns:
bool: True if commits exist, False otherwise.
"""
url = f"{self.api_url}/repos/{owner}/{repo_name}/commits"
try:
response = requests.get(url, headers=self.headers)
if response.status_code == 409:
# 409 indicates no commits (empty repository)
logger.debug(f"No commits found for {owner}/{repo_name}.")
return False
elif response.status_code == 200:
logger.debug(f"Commits found for {owner}/{repo_name}.")
return True
else:
logger.warning(f"Unexpected status code checking commits for {owner}/{repo_name}: {response.status_code}")
except requests.RequestException as e:
logger.error(f"Exception during checking commits for {owner}/{repo_name}: {e}", exc_info=True)
return False
def should_delete(self, repo: Dict[str, Any]) -> bool:
"""
Determine if a repository should be deleted based on criteria.
Args:
repo (Dict[str, Any]): Repository data dictionary.
Returns:
bool: True if the repository should be deleted, False otherwise.
"""
try:
owner = repo['owner']['username']
repo_name = repo['name']
# Placeholder logic: delete if no commits
return not self.has_commits(owner, repo_name)
except Exception as e:
logger.error(f"Exception in should_delete for repo {repo}: {e}", exc_info=True)
return False
def manage_repositories(self, dry_run: bool = False) -> List[Dict[str, Any]]:
"""
List repositories and delete those that meet deletion criteria.
Args:
dry_run (bool): If True, only logs actions without executing deletions.
Returns:
List[Dict[str, Any]]: List of repositories with updated 'pending_delete' flags.
"""
repos = self.list_repositories()
if not repos:
logger.warning("No repositories found or error occurred.")
return []
for repo in repos:
try:
repo['pending_delete'] = self.should_delete(repo)
if repo['pending_delete']:
self.delete_repository(repo['owner']['username'], repo['name'], dry_run=dry_run)
except Exception as e:
logger.error(f"Exception managing repository {repo.get('name', 'unknown')}: {e}", exc_info=True)
return repos