#!/usr/bin/env python3
"""
WebDAV Server CLI Management Tool
Command-line interface for managing users, permissions, and server configuration
"""
import argparse
import asyncio
import hashlib
import secrets
import shutil
import sqlite3
import sys
from contextlib import closing
from datetime import datetime
from getpass import getpass
from pathlib import Path

# Import from main application
from main import Database, Config
class WebDAVCLI:
    """Command-line interface for WebDAV server management.

    Every public method is a coroutine that reports its outcome on stdout.
    Commands that change state return ``True`` on success and ``False`` on
    failure; the read-only reporting commands (``list_users``, ``list_locks``,
    ``show_stats``) return ``None``.
    """

    def __init__(self):
        self.db = Database(Config.DB_PATH)

    # ========================================================================
    # Internal helpers
    # ========================================================================
    @staticmethod
    def _resolve_password(password, prompt, confirm_prompt):
        """Return a usable password, prompting on the terminal if none was given.

        Args:
            password: Password supplied by the caller, or a falsy value to
                prompt for one interactively.
            prompt: Text shown for the first password prompt.
            confirm_prompt: Text shown for the confirmation prompt.

        Returns:
            The password, or ``None`` (after printing the reason) when the
            confirmation does not match or the password is shorter than
            ``Config.PASSWORD_MIN_LENGTH``.
        """
        if not password:
            password = getpass(prompt)
            if password != getpass(confirm_prompt):
                print("❌ Passwords do not match!")
                return None
        if len(password) < Config.PASSWORD_MIN_LENGTH:
            print(f"❌ Password must be at least {Config.PASSWORD_MIN_LENGTH} characters!")
            return None
        return password

    def _execute_write(self, sql, params):
        """Run one data-modifying statement, commit it, and report its effect.

        The connection is closed even when the statement or the commit raises.

        Returns:
            ``True`` when at least one row was affected, ``False`` otherwise.
        """
        with closing(self.db.get_connection()) as conn:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            affected = cursor.rowcount > 0
            conn.commit()
        return affected

    # ========================================================================
    # User Management
    # ========================================================================
    async def create_user(self, username: str, password: str = None,
                          email: str = None, role: str = 'user'):
        """Create a new user.

        Args:
            username: Name of the account to create.
            password: Password to set; prompted for (with confirmation) when
                omitted or empty.
            email: Optional e-mail address.
            role: Role passed through to ``Database.create_user``.

        Returns:
            ``True`` if the user was created, ``False`` otherwise.
        """
        password = self._resolve_password(
            password, f"Enter password for {username}: ", "Confirm password: ")
        if password is None:
            return False
        try:
            user_id = await self.db.create_user(username, password, email, role)
        except Exception as e:
            print(f"❌ Error creating user: {e}")
            return False
        print("✅ User created successfully!")
        print(f" ID: {user_id}")
        print(f" Username: {username}")
        print(f" Email: {email or 'N/A'}")
        print(f" Role: {role}")
        return True

    async def list_users(self):
        """Print a table of all users."""
        with closing(self.db.get_connection()) as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT id, username, email, role, is_active, created_at, last_login FROM users')
            users = cursor.fetchall()
        if not users:
            print("No users found.")
            return
        print(f"\n{'ID':<5} {'Username':<20} {'Email':<30} {'Role':<10} {'Active':<8} {'Created':<20}")
        print("=" * 100)
        for user in users:
            # Both branches used to be the empty string, which left the
            # "Active" column blank for every user.
            active = "Yes" if user['is_active'] else "No"
            created = user['created_at'][:19] if user['created_at'] else 'N/A'
            email = user['email'] or 'N/A'
            print(f"{user['id']:<5} {user['username']:<20} {email:<30} {user['role']:<10} {active:<8} {created:<20}")

    async def delete_user(self, username: str, force: bool = False):
        """Delete a user row and the user's directory under the WebDAV root.

        Args:
            username: Account to delete.
            force: Skip the interactive confirmation when true.

        Returns:
            ``True`` if the user was deleted, ``False`` if the deletion was
            cancelled, the user does not exist, or an error occurred.
        """
        if not force:
            confirm = input(f"Are you sure you want to delete user '{username}'? (yes/no): ")
            if confirm.lower() != 'yes':
                print("Deletion cancelled.")
                return False
        try:
            deleted = self._execute_write('DELETE FROM users WHERE username = ?', (username,))
            if not deleted:
                print(f"❌ User '{username}' not found!")
                return False
            # Remove user directory
            user_dir = Path(Config.WEBDAV_ROOT) / 'users' / username
            if user_dir.exists():
                shutil.rmtree(user_dir)
            print(f"✅ User '{username}' deleted successfully!")
            return True
        except Exception as e:
            print(f"❌ Error deleting user: {e}")
            return False

    async def change_password(self, username: str, new_password: str = None):
        """Change a user's password.

        Args:
            username: Account whose password is replaced.
            new_password: New password; prompted for (with confirmation) when
                omitted or empty.

        Returns:
            ``True`` if the password was updated, ``False`` otherwise.
        """
        new_password = self._resolve_password(
            new_password, f"Enter new password for {username}: ", "Confirm new password: ")
        if new_password is None:
            return False
        try:
            # NOTE(review): this hashing scheme is duplicated here rather than
            # delegated to Database; confirm it matches what the server uses
            # to verify logins.
            salt = secrets.token_hex(16)
            password_hash = hashlib.pbkdf2_hmac(
                'sha256', new_password.encode(), salt.encode(), 100000).hex()
            updated = self._execute_write(
                'UPDATE users SET password_hash = ?, salt = ? WHERE username = ?',
                (password_hash, salt, username))
        except Exception as e:
            print(f"❌ Error changing password: {e}")
            return False
        if updated:
            print(f"✅ Password changed successfully for user '{username}'!")
            return True
        print(f"❌ User '{username}' not found!")
        return False

    async def activate_user(self, username: str, active: bool = True):
        """Activate or deactivate a user.

        Args:
            username: Account to update.
            active: ``True`` to activate, ``False`` to deactivate.

        Returns:
            ``True`` if the user was updated, ``False`` if the user does not
            exist or an error occurred.
        """
        try:
            updated = self._execute_write(
                'UPDATE users SET is_active = ? WHERE username = ?',
                (1 if active else 0, username))
        except Exception as e:
            print(f"❌ Error updating user: {e}")
            return False
        if updated:
            status = "activated" if active else "deactivated"
            print(f"✅ User '{username}' {status} successfully!")
            return True
        print(f"❌ User '{username}' not found!")
        return False

    # ========================================================================
    # Lock Management
    # ========================================================================
    async def list_locks(self):
        """Print a table of all locks whose timeout has not yet elapsed."""
        with closing(self.db.get_connection()) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT l.lock_token, l.resource_path, l.lock_type, l.lock_scope,
                       l.created_at, l.timeout_seconds, u.username
                FROM locks l
                LEFT JOIN users u ON l.user_id = u.id
                WHERE datetime(l.created_at, '+' || l.timeout_seconds || ' seconds') > datetime('now')
            ''')
            locks = cursor.fetchall()
        if not locks:
            print("No active locks found.")
            return
        print(f"\n{'Resource':<40} {'Type':<10} {'Scope':<12} {'Owner':<15} {'Created':<20}")
        print("=" * 100)
        for lock in locks:
            created = lock['created_at'][:19] if lock['created_at'] else 'N/A'
            # The LEFT JOIN yields a NULL username when the owning user is gone.
            owner = lock['username'] or 'Unknown'
            print(f"{lock['resource_path']:<40} {lock['lock_type']:<10} {lock['lock_scope']:<12} {owner:<15} {created:<20}")

    async def remove_lock(self, resource_path: str):
        """Remove every lock recorded for a resource path.

        Returns:
            ``True`` if at least one lock was removed, ``False`` if none
            existed or an error occurred.
        """
        try:
            deleted = self._execute_write(
                'DELETE FROM locks WHERE resource_path = ?', (resource_path,))
        except Exception as e:
            print(f"❌ Error removing lock: {e}")
            return False
        if deleted:
            print(f"✅ Lock removed from '{resource_path}'!")
            return True
        print(f"❌ No lock found on '{resource_path}'!")
        return False

    # ========================================================================
    # Statistics and Info
    # ========================================================================
    async def show_stats(self):
        """Print user, lock, session, property and storage statistics."""
        with closing(self.db.get_connection()) as conn:
            cursor = conn.cursor()
            # User statistics
            cursor.execute('SELECT COUNT(*) as total, SUM(is_active) as active FROM users')
            user_stats = cursor.fetchone()
            # Lock statistics
            cursor.execute('''
                SELECT COUNT(*) as total FROM locks
                WHERE datetime(created_at, '+' || timeout_seconds || ' seconds') > datetime('now')
            ''')
            lock_stats = cursor.fetchone()
            # Session statistics
            cursor.execute('''
                SELECT COUNT(*) as total FROM sessions
                WHERE datetime(expires_at) > datetime('now')
            ''')
            session_stats = cursor.fetchone()
            # Properties statistics
            cursor.execute('SELECT COUNT(*) as total FROM properties')
            prop_stats = cursor.fetchone()

        total_users = user_stats['total']
        # SUM() over zero rows is NULL; treat that as "no active users" so the
        # subtraction below cannot fail on an empty users table.
        active_users = user_stats['active'] or 0

        # Storage statistics, gathered in a single walk so that the file count
        # and the byte total describe the same set of files.
        total_size = 0
        total_files = 0
        webdav_root = Path(Config.WEBDAV_ROOT)
        if webdav_root.exists():
            for entry in webdav_root.rglob('*'):
                if entry.is_file():
                    total_files += 1
                    total_size += entry.stat().st_size

        print("\n" + "=" * 60)
        print("WebDAV Server Statistics")
        print("=" * 60)
        print("\n📊 Users:")
        print(f" Total: {total_users}")
        print(f" Active: {active_users}")
        print(f" Inactive: {total_users - active_users}")
        print("\n🔒 Locks:")
        print(f" Active: {lock_stats['total']}")
        print("\n🔑 Sessions:")
        print(f" Active: {session_stats['total']}")
        print("\n📝 Properties:")
        print(f" Total: {prop_stats['total']}")
        print("\n💾 Storage:")
        print(f" Total files: {total_files:,}")
        print(f" Total size: {total_size:,} bytes ({total_size / 1024 / 1024:.2f} MB)")
        print("\n" + "=" * 60)

    # ========================================================================
    # Database Management
    # ========================================================================
    async def backup_database(self, output_path: str = None):
        """Back up the database to ``output_path``.

        The copy is made with SQLite's online backup API rather than a raw
        file copy, so the snapshot is consistent even if the database is being
        written to while the backup runs.

        Args:
            output_path: Destination file; defaults to a timestamped file
                under ``./backups``.

        Returns:
            ``True`` on success, ``False`` if an error occurred.
        """
        if not output_path:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_path = f"./backups/webdav_backup_{timestamp}.db"
        try:
            source_path = Path(Config.DB_PATH)
            if not source_path.is_file():
                # sqlite3.connect() would otherwise create an empty database
                # at the source path and "back up" nothing.
                raise FileNotFoundError(f"Database file not found: {source_path}")
            Path(output_path).parent.mkdir(parents=True, exist_ok=True)
            with closing(sqlite3.connect(str(source_path))) as source, \
                    closing(sqlite3.connect(str(output_path))) as target:
                source.backup(target)
            print(f"✅ Database backed up to: {output_path}")
            return True
        except Exception as e:
            print(f"❌ Error backing up database: {e}")
            return False

    async def restore_database(self, backup_path: str):
        """Overwrite the current database file with a backup, after confirmation.

        Returns:
            ``True`` on success, ``False`` if the backup file is missing, the
            user declined, or an error occurred.
        """
        if not Path(backup_path).exists():
            print(f"❌ Backup file not found: {backup_path}")
            return False
        confirm = input("⚠️ This will overwrite the current database. Continue? (yes/no): ")
        if confirm.lower() != 'yes':
            print("Restore cancelled.")
            return False
        try:
            # NOTE(review): this replaces the file in place; presumably the
            # server should be stopped first - confirm with the deployment docs.
            shutil.copy2(backup_path, Config.DB_PATH)
            print(f"✅ Database restored from: {backup_path}")
            return True
        except Exception as e:
            print(f"❌ Error restoring database: {e}")
            return False
# ============================================================================
# CLI Command Handlers
# ============================================================================
def _build_parser():
    """Build the argument parser tree for the CLI.

    Returns:
        A ``(parser, user_parser, lock_parser)`` tuple. The two sub-parsers
        are returned so the caller can print their help when ``user`` or
        ``lock`` is given without a sub-command.
    """
    parser = argparse.ArgumentParser(
        description='WebDAV Server Management CLI',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s user create john --email john@example.com
%(prog)s user list
%(prog)s user delete john
%(prog)s user password john
%(prog)s lock list
%(prog)s stats
%(prog)s backup
"""
    )
    subparsers = parser.add_subparsers(dest='command', help='Available commands')

    # User commands
    user_parser = subparsers.add_parser('user', help='User management')
    user_subparsers = user_parser.add_subparsers(dest='user_command')
    # Create user
    create_parser = user_subparsers.add_parser('create', help='Create a new user')
    create_parser.add_argument('username', help='Username')
    create_parser.add_argument('--password', help='Password (will prompt if not provided)')
    create_parser.add_argument('--email', help='Email address')
    create_parser.add_argument('--role', default='user', choices=['user', 'admin'], help='User role')
    # List users
    user_subparsers.add_parser('list', help='List all users')
    # Delete user
    delete_parser = user_subparsers.add_parser('delete', help='Delete a user')
    delete_parser.add_argument('username', help='Username to delete')
    delete_parser.add_argument('--force', action='store_true', help='Skip confirmation')
    # Change password
    password_parser = user_subparsers.add_parser('password', help='Change user password')
    password_parser.add_argument('username', help='Username')
    password_parser.add_argument('--new-password', help='New password (will prompt if not provided)')
    # Activate/deactivate user
    activate_parser = user_subparsers.add_parser('activate', help='Activate a user')
    activate_parser.add_argument('username', help='Username')
    deactivate_parser = user_subparsers.add_parser('deactivate', help='Deactivate a user')
    deactivate_parser.add_argument('username', help='Username')

    # Lock commands
    lock_parser = subparsers.add_parser('lock', help='Lock management')
    lock_subparsers = lock_parser.add_subparsers(dest='lock_command')
    lock_subparsers.add_parser('list', help='List all active locks')
    remove_parser = lock_subparsers.add_parser('remove', help='Remove a lock')
    remove_parser.add_argument('resource', help='Resource path')

    # Statistics
    subparsers.add_parser('stats', help='Show server statistics')
    # Backup
    backup_parser = subparsers.add_parser('backup', help='Backup database')
    backup_parser.add_argument('--output', help='Output path for backup')
    # Restore
    restore_parser = subparsers.add_parser('restore', help='Restore database from backup')
    restore_parser.add_argument('backup', help='Path to backup file')

    return parser, user_parser, lock_parser


async def main():
    """Main CLI entry point.

    Parses ``sys.argv``, dispatches to the matching ``WebDAVCLI`` method, and
    exits with status 1 when the command fails, is interrupted, or raises.
    With no command at all, prints the top-level help and returns.
    """
    parser, user_parser, lock_parser = _build_parser()
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return

    cli = WebDAVCLI()
    # Handlers return False on failure; reporting commands return None.
    result = None
    # Execute command
    try:
        if args.command == 'user':
            if args.user_command == 'create':
                result = await cli.create_user(args.username, args.password, args.email, args.role)
            elif args.user_command == 'list':
                result = await cli.list_users()
            elif args.user_command == 'delete':
                result = await cli.delete_user(args.username, args.force)
            elif args.user_command == 'password':
                result = await cli.change_password(args.username, args.new_password)
            elif args.user_command == 'activate':
                result = await cli.activate_user(args.username, True)
            elif args.user_command == 'deactivate':
                result = await cli.activate_user(args.username, False)
            else:
                user_parser.print_help()
        elif args.command == 'lock':
            if args.lock_command == 'list':
                result = await cli.list_locks()
            elif args.lock_command == 'remove':
                result = await cli.remove_lock(args.resource)
            else:
                lock_parser.print_help()
        elif args.command == 'stats':
            result = await cli.show_stats()
        elif args.command == 'backup':
            result = await cli.backup_database(args.output)
        elif args.command == 'restore':
            result = await cli.restore_database(args.backup)
    except KeyboardInterrupt:
        print("\n\nOperation cancelled.")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Error: {e}")
        sys.exit(1)

    # Previously the handlers' return values were discarded, so a failed
    # command (user not found, password mismatch, ...) still exited with
    # status 0. Surface the failure to shells and scripts.
    if result is False:
        sys.exit(1)
# Script entry point: run the async CLI to completion on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())