|
"""
MIT License

Author: retoor <retoor@molodetz.nl>

This module provides a `GiteaRepoManager` class for managing Gitea repositories.
It supports authentication via username/password, token generation, listing repositories,
checking for commits, and deleting repositories with safety and logging.

Usage:
Instantiate the class with your Gitea API URL, username, and password.
Use methods like `list_repositories()`, `manage_repositories()`, etc., to perform operations.

Dependencies:
- requests

"""
|
|
|
|
import requests
|
|
import logging
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
# Module-level logging: records are rendered with a timestamped layout and
# written through a stream handler.
formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(name)s: %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)

logger = logging.getLogger("GiteaRepoManager")
logger.setLevel(logging.INFO)  # Default threshold; callers may override it externally.

# Attach the handler only when no handler is already configured for this
# logger, so importing the module does not duplicate output.
if not logger.hasHandlers():
    logger.addHandler(handler)
|
|
|
|
class GiteaRepoManager:
    """
    Manage Gitea repositories using username/password authentication.

    Unless a token is supplied, a personal access token named
    ``script_token`` is generated during construction. The token is then sent
    in the ``Authorization`` header of every subsequent API call.

    Every endpoint is built from a single API root, so ``api_url`` may be
    given either as the server base URL (``https://gitea.example.com``) or
    with the ``/api/v1`` suffix already appended.
    """

    # Path prefix shared by every REST endpoint this class talks to.
    API_PREFIX = "/api/v1"

    # Seconds to wait for the server on each HTTP call. ``requests`` applies
    # no timeout by default, so without this a stalled server would block
    # the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, api_url: str, username: str, password: str, token: Optional[str] = None, otp: Optional[str] = None):
        """
        Initialize the GiteaRepoManager with API URL and credentials.

        Args:
            api_url (str): Base URL of the Gitea server (e.g.,
                'https://gitea.example.com'); a trailing '/api/v1' is accepted too.
            username (str): Gitea username.
            password (str): Gitea password.
            token (Optional[str]): Optional pre-existing token. If None (or empty),
                a new token is generated.
            otp (Optional[str]): Optional one-time password for 2FA.

        Raises:
            ValueError: If token generation fails.
        """
        self.api_url = api_url.rstrip('/')
        # Normalise once so that all endpoints agree on where the API lives,
        # regardless of whether the caller included the '/api/v1' suffix.
        if self.api_url.endswith(self.API_PREFIX):
            self._api_root = self.api_url
        else:
            self._api_root = self.api_url + self.API_PREFIX
        self.username = username
        self.password = password

        if token:
            self.token = token
        else:
            self.token = self.generate_token(otp)
            if not self.token:
                logger.error("Failed to generate token during initialization.")
                raise ValueError("Failed to generate token")
            # Only announce the token when this instance actually created it.
            logger.info("Created API token 'script_token'. Delete from Gitea settings when done.")

        self.headers = {
            "Authorization": f"token {self.token}",
            "Content-Type": "application/json"
        }

    def _endpoint(self, *parts: str) -> str:
        """Return the full URL of the API endpoint made of the given path segments."""
        return "/".join([self._api_root, *parts])

    def generate_token(self, otp: Optional[str] = None) -> Optional[str]:
        """
        Generate a personal access token for API authentication.

        Args:
            otp (Optional[str]): One-time password for 2FA, if enabled.

        Returns:
            Optional[str]: The generated token SHA1 string if successful, None otherwise.
        """
        url = self._endpoint("users", self.username, "tokens")
        headers = {"X-Gitea-OTP": otp} if otp else {}
        data = {"name": "script_token"}

        # The OTP header and the response body are deliberately not logged:
        # both carry secrets (the one-time password and the new token).
        logger.debug("Token generation request URL: %s", url)
        logger.debug("Request data: %s", data)
        try:
            response = requests.post(
                url,
                auth=(self.username, self.password),
                headers=headers,
                json=data,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            logger.error("Exception during token generation: %s", e, exc_info=True)
            return None

        logger.debug("Response status code: %s", response.status_code)
        if response.status_code != 201:
            logger.error("Error generating token: %s - %s", response.status_code, response.text)
            return None

        try:
            token = response.json().get("sha1")
        except (ValueError, AttributeError):
            # Body was not JSON, or not a JSON object.
            token = None
        if not token:
            logger.error("Token generation succeeded but the response contained no 'sha1' field.")
            return None
        return token

    def list_repositories(self) -> List[Dict[str, Any]]:
        """
        List repositories accessible with the current token.

        Returns:
            List[Dict[str, Any]]: List of repository data dictionaries; empty on
            any error.
        """
        url = self._endpoint("repos", "search")
        try:
            response = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as e:
            logger.error("Exception during listing repositories: %s", e, exc_info=True)
            return []

        if response.status_code != 200:
            logger.warning("Error listing repositories: %s - %s", response.status_code, response.text)
            return []

        try:
            repos = response.json().get('data', [])
        except (ValueError, AttributeError) as e:
            # Body was not JSON, or not a JSON object.
            logger.error("Malformed response while listing repositories: %s", e)
            return []
        logger.debug("Successfully listed repositories.")
        return repos or []

    def delete_repository(self, owner: str, repo_name: str, dry_run: bool = False) -> None:
        """
        Delete a repository by owner and name.

        Args:
            owner (str): Username of the repository owner.
            repo_name (str): Name of the repository.
            dry_run (bool): If True, only logs the intended deletion without executing.

        Returns:
            None
        """
        if dry_run:
            logger.info("Would delete repository: %s/%s", owner, repo_name)
            return

        url = self._endpoint("repos", owner, repo_name)
        try:
            response = requests.delete(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as e:
            logger.error("Exception during deleting repository %s/%s: %s", owner, repo_name, e, exc_info=True)
            return

        if response.status_code == 204:
            logger.info("Deleted repository: %s/%s", owner, repo_name)
        else:
            logger.warning(
                "Error deleting repository %s/%s: %s - %s",
                owner, repo_name, response.status_code, response.text,
            )

    def _commit_state(self, owner: str, repo_name: str) -> Optional[bool]:
        """
        Query the commits endpoint and classify the answer.

        Returns:
            Optional[bool]: True when the server answered 200 (commits exist),
            False when it answered 409 (empty repository), and None when the
            state could not be determined (request failure or any other status).
        """
        url = self._endpoint("repos", owner, repo_name, "commits")
        try:
            response = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as e:
            logger.error("Exception during checking commits for %s/%s: %s", owner, repo_name, e, exc_info=True)
            return None

        if response.status_code == 200:
            logger.debug("Commits found for %s/%s.", owner, repo_name)
            return True
        if response.status_code == 409:
            # 409 indicates no commits (empty repository)
            logger.debug("No commits found for %s/%s.", owner, repo_name)
            return False
        logger.warning(
            "Unexpected status code checking commits for %s/%s: %s",
            owner, repo_name, response.status_code,
        )
        return None

    def has_commits(self, owner: str, repo_name: str) -> bool:
        """
        Check if a repository has commits.

        Args:
            owner (str): Username of the repository owner.
            repo_name (str): Repository name.

        Returns:
            bool: True if commits exist, False otherwise. False is also returned
            when the check itself failed, so it must not be read as proof that
            the repository is empty.
        """
        return self._commit_state(owner, repo_name) is True

    def should_delete(self, repo: Dict[str, Any]) -> bool:
        """
        Determine if a repository should be deleted based on criteria.

        A repository qualifies only when the server positively reported it as
        empty. If the commit check failed or gave an unexpected answer, the
        repository is kept.

        Args:
            repo (Dict[str, Any]): Repository data dictionary; must provide
                ``repo['owner']['username']`` and ``repo['name']``.

        Returns:
            bool: True if the repository should be deleted, False otherwise.
        """
        try:
            owner = repo['owner']['username']
            repo_name = repo['name']
        except (KeyError, TypeError) as e:
            logger.error("Exception in should_delete for repo %s: %s", repo, e, exc_info=True)
            return False
        # Placeholder logic: delete if no commits. An undetermined state (None)
        # never counts as "no commits".
        return self._commit_state(owner, repo_name) is False

    def manage_repositories(self, dry_run: bool = False) -> List[Dict[str, Any]]:
        """
        List repositories and delete those that meet deletion criteria.

        Args:
            dry_run (bool): If True, only logs actions without executing deletions.

        Returns:
            List[Dict[str, Any]]: List of repositories with updated 'pending_delete' flags.
        """
        repos = self.list_repositories()
        if not repos:
            logger.warning("No repositories found or error occurred.")
            return []

        for repo in repos:
            # Per-repository boundary: a failure on one entry must not stop
            # the remaining entries from being processed.
            try:
                repo['pending_delete'] = self.should_delete(repo)
                if repo['pending_delete']:
                    self.delete_repository(repo['owner']['username'], repo['name'], dry_run=dry_run)
            except Exception as e:
                logger.error("Exception managing repository %s: %s", repo.get('name', 'unknown'), e, exc_info=True)
        return repos
|