"""
MIT License
Author: retoor <retoor@molodetz.nl>
This module provides a `GiteaRepoManager` class for managing Gitea repositories.
It supports authentication via username/password, token generation, listing repositories,
checking for commits, and deleting repositories with safety and logging.
Usage:
Instantiate the class with your Gitea API URL, username, and password.
Use methods like `list_repositories()`, `manage_repositories()`, etc., to perform operations.
Dependencies:
- requests
"""
import requests
import logging
from typing import Optional, List, Dict, Any
# Configure logger for the module
logger = logging.getLogger("GiteaRepoManager")
logger.setLevel(logging.INFO) # Default level; override as needed externally
handler = logging.StreamHandler()
formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(name)s: %(message)s')
handler.setFormatter(formatter)
if not logger.hasHandlers():
logger.addHandler(handler)
class GiteaRepoManager:
"""
Manage Gitea repositories using username/password authentication.
Generates a personal access token internally for API calls.
"""
def __init__(self, api_url: str, username: str, password: str, token: str):
"""
Initialize the GiteaRepoManager with API URL and credentials.
Args:
api_url (str): Base URL of the Gitea API (e.g., 'https://gitea.example.com').
username (str): Gitea username.
password (str): Gitea password.
token (Optional[str]): Optional pre-existing token. If None, a new token is generated.
otp (Optional[str]): Optional one-time password for 2FA.
Raises:
ValueError: If token generation fails.
"""
self.api_url = api_url.rstrip('/')
self.username = username
self.password = password
self.token = token
if self.token:
self.headers = {
"Authorization": f"token {self.token}",
"Content-Type": "application/json"
}
else:
raise ValueError("Token is required.")
def list_repositories(self,page=1) -> List[Dict[str, Any]]:
"""
List repositories accessible with the current token.
Returns:
List[Dict[str, Any]]: List of repository data dictionaries.
"""
url = f"{self.api_url}/repos/search?limit=500&page={page}"
try:
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
result = response.json().get('data', [])
logger.info(f"Successfully listed {len(result)} repositories from page {page}.")
if result:
return result + self.list_repositories(page+1)
if page == 1:
logger.warning(f"Error listing repositories: {response.status_code} - {response.text}")
except requests.RequestException as e:
logger.error(f"Exception during listing repositories: {e}", exc_info=True)
return []
def delete_repository(self, owner: str, repo_name: str, dry_run: bool = False) -> None:
"""
Delete a repository by owner and name.
Args:
owner (str): Username of the repository owner.
repo_name (str): Name of the repository.
dry_run (bool): If True, only logs the intended deletion without executing.
Returns:
None
"""
url = f"{self.api_url}/repos/{owner}/{repo_name}"
if dry_run:
logger.info(f"Would delete repository: {owner}/{repo_name}")
return
try:
response = requests.delete(url, headers=self.headers)
if response.status_code == 204:
logger.info(f"Deleted repository: {owner}/{repo_name}")
else:
logger.warning(f"Error deleting repository {owner}/{repo_name}: {response.status_code} - {response.text}")
except requests.RequestException as e:
logger.error(f"Exception during deleting repository {owner}/{repo_name}: {e}", exc_info=True)
def has_commits(self, owner: str, repo_name: str) -> bool:
"""
Check if a repository has commits.
Args:
owner (str): Username of the repository owner.
repo_name (str): Repository name.
Returns:
bool: True if commits exist, False otherwise.
"""
url = f"{self.api_url}/repos/{owner}/{repo_name}/commits"
try:
response = requests.get(url, headers=self.headers)
if response.status_code == 409:
# 409 indicates no commits (empty repository)
logger.debug(f"No commits found for {owner}/{repo_name}.")
return False
elif response.status_code == 200:
logger.debug(f"Commits found for {owner}/{repo_name}.")
return True
else:
logger.warning(f"Unexpected status code checking commits for {owner}/{repo_name}: {response.status_code}")
except requests.RequestException as e:
logger.error(f"Exception during checking commits for {owner}/{repo_name}: {e}", exc_info=True)
return False
def should_delete(self, repo: Dict[str, Any]) -> bool:
"""
Determine if a repository should be deleted based on criteria.
Args:
repo (Dict[str, Any]): Repository data dictionary.
Returns:
bool: True if the repository should be deleted, False otherwise.
"""
try:
owner = repo['owner']['username']
repo_name = repo['name']
# Placeholder logic: delete if no commits
return not self.has_commits(owner, repo_name)
except Exception as e:
logger.error(f"Exception in should_delete for repo {repo}: {e}", exc_info=True)
return False
def list_users(self, page: int = 1, limit: int = 50) -> List[Dict[str, Any]]:
"""
List all users in the Gitea instance.
Args:
page (int): Page number for pagination.
limit (int): Number of users per page.
Returns:
List[Dict[str, Any]]: List of user data dictionaries.
"""
url = f"{self.api_url}/admin/users?page={page}&limit={limit}"
all_users = []
try:
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
users = response.json()
if users:
all_users.extend(users)
logger.info(f"Successfully listed {len(users)} users from page {page}.")
# If we got a full page, there might be more users
if len(users) == limit:
all_users.extend(self.list_users(page + 1, limit))
else:
logger.info(f"No more users found on page {page}.")
elif response.status_code == 403:
logger.error("Access denied. Admin privileges required to list users.")
else:
logger.warning(f"Error listing users: {response.status_code} - {response.text}")
except requests.RequestException as e:
logger.error(f"Exception during listing users: {e}", exc_info=True)
return all_users
def delete_user(self, username: str, purge: bool = True, dry_run: bool = True) -> bool:
"""
Delete a user by username.
Args:
username (str): Username to delete.
purge (bool): If True, completely purge user data.
dry_run (bool): If True, only logs the intended deletion without executing.
Returns:
bool: True if deletion was successful, False otherwise.
"""
purge_param = "?purge=true" if purge else ""
url = f"{self.api_url}/admin/users/{username}{purge_param}"
if dry_run:
logger.info(f"Would delete user: {username} (purge: {purge})")
return True
try:
response = requests.delete(url, headers=self.headers)
if response.status_code == 204:
logger.info(f"Successfully deleted user: {username}")
return True
elif response.status_code == 403:
logger.error(f"Access denied when deleting user {username}. Admin privileges required.")
elif response.status_code == 404:
logger.warning(f"User {username} not found.")
else:
logger.warning(f"Error deleting user {username}: {response.status_code} - {response.text}")
except requests.RequestException as e:
logger.error(f"Exception during deleting user {username}: {e}", exc_info=True)
return False
def filter_users_by_id(self, users: List[Dict[str, Any]], min_id: int) -> List[Dict[str, Any]]:
"""
Filter users with ID above the specified threshold.
Args:
users (List[Dict[str, Any]]): List of user dictionaries.
min_id (int): Minimum ID threshold (exclusive).
Returns:
List[Dict[str, Any]]: Filtered list of users with ID > min_id.
"""
filtered_users = [user for user in users if user.get('id', 0) > min_id]
logger.info(f"Filtered {len(filtered_users)} users with ID > {min_id} from {len(users)} total users.")
return filtered_users
def delete_users_above_id(self, min_id: int = 18, purge: bool = False, dry_run: bool = False) -> Dict[str, Any]:
"""
Delete all users with ID above the specified threshold.
Args:
min_id (int): Minimum ID threshold (exclusive). Default is 18.
purge (bool): If True, completely purge user data.
dry_run (bool): If True, only logs actions without executing deletions.
Returns:
Dict[str, Any]: Summary of the operation including counts and failed deletions.
"""
logger.info(f"Starting user deletion process for users with ID > {min_id} (dry_run: {dry_run})")
# Get all users
all_users = self.list_users()
if not all_users:
logger.warning("No users found or error occurred while listing users.")
return {"total_users": 0, "filtered_users": 0, "deleted_count": 0, "failed_deletions": []}
# Filter users by ID
users_to_delete = self.filter_users_by_id(all_users, min_id)
if not users_to_delete:
logger.info(f"No users found with ID > {min_id}")
return {"total_users": len(all_users), "filtered_users": 0, "deleted_count": 0, "failed_deletions": []}
# Delete filtered users
deleted_count = 0
failed_deletions = []
for user in users_to_delete:
username = user.get('login', user.get('username', 'unknown'))
user_id = user.get('id', 'unknown')
logger.info(f"Processing user: {username} (ID: {user_id})")
if self.delete_user(username, purge=purge, dry_run=dry_run):
deleted_count += 1
else:
failed_deletions.append({"username": username, "id": user_id})
result = {
"total_users": len(all_users),
"filtered_users": len(users_to_delete),
"deleted_count": deleted_count,
"failed_deletions": failed_deletions
}
logger.info(f"User deletion process completed. Deleted: {deleted_count}, Failed: {len(failed_deletions)}")
return result
def manage_repositories(self, dry_run: bool = False) -> List[Dict[str, Any]]:
"""
List repositories and delete those that meet deletion criteria.
Args:
dry_run (bool): If True, only logs actions without executing deletions.
Returns:
List[Dict[str, Any]]: List of repositories with updated 'pending_delete' flags.
"""
repos = self.list_repositories()
if not repos:
logger.warning("No repositories found or error occurred.")
return []
for repo in repos:
try:
repo['pending_delete'] = self.should_delete(repo)
if repo['pending_delete']:
self.delete_repository(repo['owner']['username'], repo['name'], dry_run=dry_run)
self.delete_user(repo['owner']['username'], purge=True, dry_run=dry_run)
except Exception as e:
logger.error(f"Exception managing repository {repo.get('name', 'unknown')}: {e}", exc_info=True)
return repos