|
"""
|
|
MIT License
|
|
|
|
Author: retoor <retoor@molodetz.nl>
|
|
|
|
This module provides a `GiteaRepoManager` class for managing Gitea repositories.
|
|
It supports authentication via username/password, token generation, listing repositories,
|
|
checking for commits, and deleting repositories with safety and logging.
|
|
|
|
Usage:
|
|
Instantiate the class with your Gitea API URL, username, and password.
|
|
Use methods like `list_repositories()`, `manage_repositories()`, etc., to perform operations.
|
|
|
|
Dependencies:
|
|
- requests
|
|
|
|
"""
|
|
|
|
import requests
|
|
import logging
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
# Module-wide logging setup. The handler and formatter are built first, then
# attached only if nothing upstream is already handling this logger's records.
formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(name)s: %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)

logger = logging.getLogger("GiteaRepoManager")
logger.setLevel(logging.INFO)  # Default level; callers may override it externally

# hasHandlers() also consults ancestor loggers, so an application that has
# already configured logging does not get a second stream handler from us.
if not logger.hasHandlers():
    logger.addHandler(handler)
|
|
|
|
class GiteaRepoManager:
    """
    Manage repositories and users of a Gitea instance through its REST API.

    Every request is authenticated with a personal access token sent in the
    ``Authorization`` header. The username and password are stored on the
    instance but are not used by any method of this class.

    Attributes:
        REQUEST_TIMEOUT (float): Seconds to wait for each HTTP request.
            Override on an instance or in a subclass if needed.
        MAX_PAGES (int): Upper bound on the number of pages fetched by the
            listing methods, so a misbehaving server cannot cause an
            endless loop.
    """

    # Without an explicit timeout `requests` may block forever on a stalled server.
    REQUEST_TIMEOUT = 30.0
    MAX_PAGES = 1000

    def __init__(self, api_url: str, username: str, password: str, token: str):
        """
        Initialize the GiteaRepoManager with API URL and credentials.

        Args:
            api_url (str): Base URL of the Gitea API (e.g., 'https://gitea.example.com').
                A trailing slash is removed.
            username (str): Gitea username (stored, not used for requests).
            password (str): Gitea password (stored, not used for requests).
            token (str): Personal access token used to authenticate every request.

        Raises:
            ValueError: If ``token`` is empty or None.
        """
        if not token:
            raise ValueError("Token is required.")
        self.api_url = api_url.rstrip('/')
        self.username = username
        self.password = password
        self.token = token
        self.headers = {
            "Authorization": f"token {self.token}",
            "Content-Type": "application/json",
        }

    def list_repositories(self, page=1) -> List[Dict[str, Any]]:
        """
        List repositories accessible with the current token.

        Pages are requested one after another, starting at ``page``, until the
        server returns an empty page, an error occurs, or ``MAX_PAGES`` pages
        have been fetched. Repositories collected before an error are still
        returned.

        Args:
            page (int): First page to fetch (1-based).

        Returns:
            List[Dict[str, Any]]: List of repository data dictionaries.
        """
        url = f"{self.api_url}/repos/search"
        collected: List[Dict[str, Any]] = []
        try:
            for current_page in range(page, page + self.MAX_PAGES):
                response = requests.get(
                    url,
                    headers=self.headers,
                    params={"limit": 500, "page": current_page},
                    timeout=self.REQUEST_TIMEOUT,
                )
                if response.status_code != 200:
                    logger.warning(f"Error listing repositories: {response.status_code} - {response.text}")
                    break
                batch = response.json().get('data') or []
                logger.info(f"Successfully listed {len(batch)} repositories from page {current_page}.")
                if not batch:
                    # An empty page marks the end of the result set.
                    break
                collected.extend(batch)
            else:
                logger.warning(f"Stopped listing repositories after {self.MAX_PAGES} pages.")
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            logger.error(f"Exception during listing repositories: {e}", exc_info=True)
        return collected

    def delete_repository(self, owner: str, repo_name: str, dry_run: bool = False) -> None:
        """
        Delete a repository by owner and name.

        Failures are logged, not raised.

        Args:
            owner (str): Username of the repository owner.
            repo_name (str): Name of the repository.
            dry_run (bool): If True, only logs the intended deletion without executing.

        Returns:
            None
        """
        if dry_run:
            logger.info(f"Would delete repository: {owner}/{repo_name}")
            return
        url = f"{self.api_url}/repos/{owner}/{repo_name}"
        try:
            response = requests.delete(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as e:
            logger.error(f"Exception during deleting repository {owner}/{repo_name}: {e}", exc_info=True)
            return
        if response.status_code == 204:
            logger.info(f"Deleted repository: {owner}/{repo_name}")
        else:
            logger.warning(f"Error deleting repository {owner}/{repo_name}: {response.status_code} - {response.text}")

    def _probe_commits(self, owner: str, repo_name: str) -> Optional[bool]:
        """
        Ask the API whether a repository has commits.

        Returns:
            Optional[bool]: True if commits exist (HTTP 200), False if the
            repository is empty (HTTP 409), None if the state could not be
            determined (any other status code or a request failure).
        """
        url = f"{self.api_url}/repos/{owner}/{repo_name}/commits"
        try:
            # A single commit is enough to answer the question.
            response = requests.get(
                url,
                headers=self.headers,
                params={"limit": 1},
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            logger.error(f"Exception during checking commits for {owner}/{repo_name}: {e}", exc_info=True)
            return None
        if response.status_code == 409:
            # 409 indicates no commits (empty repository)
            logger.debug(f"No commits found for {owner}/{repo_name}.")
            return False
        if response.status_code == 200:
            logger.debug(f"Commits found for {owner}/{repo_name}.")
            return True
        logger.warning(f"Unexpected status code checking commits for {owner}/{repo_name}: {response.status_code}")
        return None

    def has_commits(self, owner: str, repo_name: str) -> bool:
        """
        Check if a repository has commits.

        Args:
            owner (str): Username of the repository owner.
            repo_name (str): Repository name.

        Returns:
            bool: True if commits exist. False if the repository is empty or
            if the check failed (the failure is logged).
        """
        return self._probe_commits(owner, repo_name) is True

    def should_delete(self, repo: Dict[str, Any]) -> bool:
        """
        Determine if a repository should be deleted based on criteria.

        Current criterion: the repository is confirmed to have no commits.
        If the commit check fails (network error, unexpected status code) the
        repository is NOT selected, so a transient failure never leads to a
        deletion.

        Args:
            repo (Dict[str, Any]): Repository data dictionary; must provide
                ``repo['owner']['username']`` and ``repo['name']``.

        Returns:
            bool: True if the repository should be deleted, False otherwise.
        """
        try:
            owner = repo['owner']['username']
            repo_name = repo['name']
        except (KeyError, TypeError) as e:
            logger.error(f"Exception in should_delete for repo {repo}: {e}", exc_info=True)
            return False
        # Only an explicit "empty repository" answer qualifies for deletion.
        return self._probe_commits(owner, repo_name) is False

    def list_users(self, page: int = 1, limit: int = 50) -> List[Dict[str, Any]]:
        """
        List all users in the Gitea instance.

        Pages are fetched starting at ``page`` until an empty page is
        returned. The loop does not rely on pages being exactly ``limit``
        long, because the server may cap the page size below the requested
        value. Users collected before an error are still returned.

        Args:
            page (int): First page to fetch (1-based).
            limit (int): Number of users requested per page.

        Returns:
            List[Dict[str, Any]]: List of user data dictionaries.
        """
        url = f"{self.api_url}/admin/users"
        all_users: List[Dict[str, Any]] = []
        try:
            for current_page in range(page, page + self.MAX_PAGES):
                response = requests.get(
                    url,
                    headers=self.headers,
                    params={"page": current_page, "limit": limit},
                    timeout=self.REQUEST_TIMEOUT,
                )
                if response.status_code == 403:
                    logger.error("Access denied. Admin privileges required to list users.")
                    break
                if response.status_code != 200:
                    logger.warning(f"Error listing users: {response.status_code} - {response.text}")
                    break
                users = response.json()
                if not users:
                    logger.info(f"No more users found on page {current_page}.")
                    break
                all_users.extend(users)
                logger.info(f"Successfully listed {len(users)} users from page {current_page}.")
            else:
                logger.warning(f"Stopped listing users after {self.MAX_PAGES} pages.")
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            logger.error(f"Exception during listing users: {e}", exc_info=True)
        return all_users

    def delete_user(self, username: str, purge: bool = True, dry_run: bool = True) -> bool:
        """
        Delete a user by username.

        Note that, unlike the other methods of this class, ``dry_run``
        defaults to True here.

        Args:
            username (str): Username to delete.
            purge (bool): If True, the request is sent with ``purge=true``.
            dry_run (bool): If True, only logs the intended deletion without executing.

        Returns:
            bool: True if deletion was successful (or in dry-run mode), False otherwise.
        """
        if not username:
            logger.error("Cannot delete user: empty username.")
            return False

        if dry_run:
            logger.info(f"Would delete user: {username} (purge: {purge})")
            return True

        url = f"{self.api_url}/admin/users/{username}"
        try:
            response = requests.delete(
                url,
                headers=self.headers,
                params={"purge": "true"} if purge else None,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            logger.error(f"Exception during deleting user {username}: {e}", exc_info=True)
            return False

        if response.status_code == 204:
            logger.info(f"Successfully deleted user: {username}")
            return True
        if response.status_code == 403:
            logger.error(f"Access denied when deleting user {username}. Admin privileges required.")
        elif response.status_code == 404:
            logger.warning(f"User {username} not found.")
        else:
            logger.warning(f"Error deleting user {username}: {response.status_code} - {response.text}")
        return False

    def filter_users_by_id(self, users: List[Dict[str, Any]], min_id: int) -> List[Dict[str, Any]]:
        """
        Filter users with ID above the specified threshold.

        Users without an ``id`` key are treated as having ID 0.

        Args:
            users (List[Dict[str, Any]]): List of user dictionaries.
            min_id (int): Minimum ID threshold (exclusive).

        Returns:
            List[Dict[str, Any]]: Filtered list of users with ID > min_id.
        """
        filtered_users = [user for user in users if user.get('id', 0) > min_id]
        logger.info(f"Filtered {len(filtered_users)} users with ID > {min_id} from {len(users)} total users.")
        return filtered_users

    def delete_users_above_id(self, min_id: int = 18, purge: bool = False, dry_run: bool = False) -> Dict[str, Any]:
        """
        Delete all users with ID above the specified threshold.

        Users whose record carries neither ``login`` nor ``username`` are
        skipped and reported as failed; no placeholder name is ever sent to
        the API.

        Args:
            min_id (int): Minimum ID threshold (exclusive). Default is 18.
            purge (bool): If True, completely purge user data.
            dry_run (bool): If True, only logs actions without executing deletions.

        Returns:
            Dict[str, Any]: Summary with keys ``total_users``, ``filtered_users``,
            ``deleted_count`` and ``failed_deletions`` (list of
            ``{"username", "id"}`` dictionaries).
        """
        logger.info(f"Starting user deletion process for users with ID > {min_id} (dry_run: {dry_run})")

        all_users = self.list_users()
        if not all_users:
            logger.warning("No users found or error occurred while listing users.")
            return {"total_users": 0, "filtered_users": 0, "deleted_count": 0, "failed_deletions": []}

        users_to_delete = self.filter_users_by_id(all_users, min_id)
        if not users_to_delete:
            logger.info(f"No users found with ID > {min_id}")
            return {"total_users": len(all_users), "filtered_users": 0, "deleted_count": 0, "failed_deletions": []}

        deleted_count = 0
        failed_deletions = []

        for user in users_to_delete:
            username = user.get('login') or user.get('username')
            user_id = user.get('id', 'unknown')

            if not username:
                # Never fall back to a made-up name: it could match a real account.
                logger.warning(f"Skipping user with ID {user_id}: no username in API response.")
                failed_deletions.append({"username": "unknown", "id": user_id})
                continue

            logger.info(f"Processing user: {username} (ID: {user_id})")

            if self.delete_user(username, purge=purge, dry_run=dry_run):
                deleted_count += 1
            else:
                failed_deletions.append({"username": username, "id": user_id})

        result = {
            "total_users": len(all_users),
            "filtered_users": len(users_to_delete),
            "deleted_count": deleted_count,
            "failed_deletions": failed_deletions,
        }

        logger.info(f"User deletion process completed. Deleted: {deleted_count}, Failed: {len(failed_deletions)}")
        return result

    def manage_repositories(self, dry_run: bool = False, delete_owners: bool = True) -> List[Dict[str, Any]]:
        """
        List repositories and delete those that meet deletion criteria.

        WARNING: with ``delete_owners=True`` (the default, kept for backward
        compatibility) the OWNER of every repository selected for deletion is
        deleted as well, with ``purge=True``. Pass ``delete_owners=False`` to
        remove only the repositories.

        Args:
            dry_run (bool): If True, only logs actions without executing deletions.
            delete_owners (bool): If True, also delete (purge) the owner of
                each repository that is deleted.

        Returns:
            List[Dict[str, Any]]: List of repositories with updated 'pending_delete' flags.
        """
        repos = self.list_repositories()
        if not repos:
            logger.warning("No repositories found or error occurred.")
            return []

        # Owners whose deletion already succeeded; avoids repeating the call
        # for every further repository of the same owner.
        removed_owners = set()

        for repo in repos:
            try:
                repo['pending_delete'] = self.should_delete(repo)
                if not repo['pending_delete']:
                    continue
                owner = repo['owner']['username']
                self.delete_repository(owner, repo['name'], dry_run=dry_run)
                if delete_owners and owner not in removed_owners:
                    if self.delete_user(owner, purge=True, dry_run=dry_run):
                        removed_owners.add(owner)
            except Exception as e:
                # Best effort: one bad repository must not stop the whole run.
                logger.error(f"Exception managing repository {repo.get('name', 'unknown')}: {e}", exc_info=True)
        return repos
|