"""
MIT License
Author: retoor <retoor@molodetz.nl>
This module provides a `GiteaRepoManager` class for managing Gitea repositories.
It supports authentication via username/password, token generation, listing repositories,
checking for commits, and deleting repositories with safety and logging.
Usage:
Instantiate the class with your Gitea API URL, username, and password.
Use methods like `list_repositories()`, `manage_repositories()`, etc., to perform operations.
Dependencies:
- requests
"""
import requests
import logging
from typing import Optional, List, Dict, Any
# Configure logger for the module
logger = logging.getLogger("GiteaRepoManager")
logger.setLevel(logging.INFO) # Default level; override as needed externally
handler = logging.StreamHandler()
formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(name)s: %(message)s')
handler.setFormatter(formatter)
if not logger.hasHandlers():
logger.addHandler(handler)
class GiteaRepoManager:
"""
Manage Gitea repositories using username/password authentication.
Generates a personal access token internally for API calls.
"""
def __init__(self, api_url: str, username: str, password: str, token: str):
"""
Initialize the GiteaRepoManager with API URL and credentials.
Args:
api_url (str): Base URL of the Gitea API (e.g., 'https://gitea.example.com').
username (str): Gitea username.
password (str): Gitea password.
token (Optional[str]): Optional pre-existing token. If None, a new token is generated.
otp (Optional[str]): Optional one-time password for 2FA.
Raises:
ValueError: If token generation fails.
"""
self.api_url = api_url.rstrip('/')
self.username = username
self.password = password
self.token = token
if self.token:
self.headers = {
"Authorization": f"token {self.token}",
"Content-Type": "application/json"
}
else:
raise ValueError("Token is required.")
def list_repositories(self,page=1) -> List[Dict[str, Any]]:
"""
List repositories accessible with the current token.
Returns:
List[Dict[str, Any]]: List of repository data dictionaries.
"""
url = f"{self.api_url}/repos/search?limit=500&page={page}"
try:
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
result = response.json().get('data', [])
logger.info(f"Successfully listed {len(result)} repositories from page {page}.")
if result:
return result + self.list_repositories(page+1)
if page == 1:
logger.warning(f"Error listing repositories: {response.status_code} - {response.text}")
except requests.RequestException as e:
logger.error(f"Exception during listing repositories: {e}", exc_info=True)
return []
def delete_repository(self, owner: str, repo_name: str, dry_run: bool = False) -> None:
"""
Delete a repository by owner and name.
Args:
owner (str): Username of the repository owner.
repo_name (str): Name of the repository.
dry_run (bool): If True, only logs the intended deletion without executing.
Returns:
None
"""
url = f"{self.api_url}/repos/{owner}/{repo_name}"
if dry_run:
logger.info(f"Would delete repository: {owner}/{repo_name}")
return
try:
response = requests.delete(url, headers=self.headers)
if response.status_code == 204:
logger.info(f"Deleted repository: {owner}/{repo_name}")
else:
logger.warning(f"Error deleting repository {owner}/{repo_name}: {response.status_code} - {response.text}")
except requests.RequestException as e:
logger.error(f"Exception during deleting repository {owner}/{repo_name}: {e}", exc_info=True)
def has_commits(self, owner: str, repo_name: str) -> bool:
"""
Check if a repository has commits.
Args:
owner (str): Username of the repository owner.
repo_name (str): Repository name.
Returns:
bool: True if commits exist, False otherwise.
"""
url = f"{self.api_url}/repos/{owner}/{repo_name}/commits"
try:
response = requests.get(url, headers=self.headers)
if response.status_code == 409:
# 409 indicates no commits (empty repository)
logger.debug(f"No commits found for {owner}/{repo_name}.")
return False
elif response.status_code == 200:
logger.debug(f"Commits found for {owner}/{repo_name}.")
return True
else:
logger.warning(f"Unexpected status code checking commits for {owner}/{repo_name}: {response.status_code}")
except requests.RequestException as e:
logger.error(f"Exception during checking commits for {owner}/{repo_name}: {e}", exc_info=True)
return False
def should_delete(self, repo: Dict[str, Any]) -> bool:
"""
Determine if a repository should be deleted based on criteria.
Args:
repo (Dict[str, Any]): Repository data dictionary.
Returns:
bool: True if the repository should be deleted, False otherwise.
"""
try:
owner = repo['owner']['username']
repo_name = repo['name']
# Placeholder logic: delete if no commits
return not self.has_commits(owner, repo_name)
except Exception as e:
logger.error(f"Exception in should_delete for repo {repo}: {e}", exc_info=True)
return False
def manage_repositories(self, dry_run: bool = False) -> List[Dict[str, Any]]:
"""
List repositories and delete those that meet deletion criteria.
Args:
dry_run (bool): If True, only logs actions without executing deletions.
Returns:
List[Dict[str, Any]]: List of repositories with updated 'pending_delete' flags.
"""
repos = self.list_repositories()
if not repos:
logger.warning("No repositories found or error occurred.")
return []
for repo in repos:
try:
repo['pending_delete'] = self.should_delete(repo)
if repo['pending_delete']:
self.delete_repository(repo['owner']['username'], repo['name'], dry_run=dry_run)
except Exception as e:
logger.error(f"Exception managing repository {repo.get('name', 'unknown')}: {e}", exc_info=True)
return repos