diff --git a/src/zhurnal/app.py b/src/zhurnal/app.py index ce60390..8e4f65b 100644 --- a/src/zhurnal/app.py +++ b/src/zhurnal/app.py @@ -2,6 +2,8 @@ import asyncio import json import shlex import time +import signal +import sys from datetime import datetime from typing import List from app.app import Application as BaseApplication @@ -204,13 +206,60 @@ class Zhurnal(BaseApplication): def __init__(self, commands: List[str], *args, **kwargs): self.commands = commands or [] self.processes = {} + self.process_tasks = {} + self.shutdown_event = asyncio.Event() super().__init__(*args, **kwargs) + + # Register signal handlers for graceful shutdown + loop = asyncio.get_event_loop() + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler(sig, self.handle_exit) + self.on_startup.append(self.start_processes) + self.on_cleanup.append(self.cleanup_processes) + self.clients = [] self.router.add_get("/ws", self.websocket_handler) self.router.add_get("/", self.index_handler) log.info("Application created") + def handle_exit(self): + """Handle application exit signals.""" + log.info("Received exit signal. Initiating graceful shutdown...") + self.shutdown_event.set() + + async def cleanup_processes(self, app): + """Cleanup all running subprocesses.""" + log.info("Cleaning up processes...") + + # Terminate all running processes + for command, process_info in list(self.processes.items()): + try: + # Get the subprocess + process = process_info.get('process') + if process and process.returncode is None: + log.info(f"Terminating process: {command}") + try: + # First try to terminate gracefully + process.terminate() + # Wait a short time for process to exit + await asyncio.wait_for(process.wait(), timeout=5.0) + except asyncio.TimeoutError: + # If process doesn't exit, force kill + log.warning(f"Force killing process: {command}") + process.kill() + except Exception as e: + log.error(f"Error cleaning up process {command}: {e}") + + # Cancel any running tasks + for task in list(self.process_tasks.values()): + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + async def index_handler(self, request): return web.Response(text=index_html, content_type="text/html") @@ -240,68 +289,102 @@ class Zhurnal(BaseApplication): return ws async def run_process(self, process_name, command): - process = await asyncio.create_subprocess_exec( - *shlex.split(command), - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - log.info(f"Running process {process_name}: {command}") + """Run a single process with enhanced monitoring and error handling.""" + try: + # Create subprocess + process = await asyncio.create_subprocess_exec( + *shlex.split(command), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + # Store process information + self.processes[command] = { + 'process': process, + 'name': process_name + } + + log.info(f"Running process {process_name}: {command}") - async def read_output(app, name, process, f): - time_previous = 0 - async for line in f: - time_current = time.time() - time_elapsed = round( - time_previous and time_current - time_previous or 0, 4 - ) - decoded_line = line.decode("utf-8", "ignore").strip() - print(decoded_line) - decoded_line = "".join(c for c in decoded_line if c.isprintable()) - for client in app.clients: - await client.send_str( - json.dumps( - { - "elapsed": time_elapsed, - "timestamp": datetime.now().strftime( - "%Y-%m-%d %H:%M:%S" - ), - "line": decoded_line, - "name": name, - "command": command, - } + async def read_output(app, name, process, f, is_stderr=False): + time_previous = 0 + try: + async for line in f: + # Check if shutdown is requested + if self.shutdown_event.is_set(): + break + + time_current = time.time() + time_elapsed = round( + time_previous and time_current - time_previous or 0, 4 ) - ) - time_previous = time.time() + decoded_line = line.decode("utf-8", "ignore").strip() + print(decoded_line) + decoded_line = "".join(c for c in decoded_line if c.isprintable()) + + # Broadcast to all WebSocket clients + for client in app.clients: + await client.send_str( + json.dumps( + { + "elapsed": time_elapsed, + "timestamp": datetime.now().strftime( + "%Y-%m-%d %H:%M:%S" + ), + "line": decoded_line, + "name": name, + "command": command, + } + ) + ) + time_previous = time.time() + except Exception as e: + log.error(f"Error reading {name} output: {e}") + finally: + log.info(f"Finished reading {name} output") - await asyncio.gather( - read_output(self, f"{process_name}:stdout", command, process.stdout), - read_output(self, f"{process_name}:stderr", process, process.stderr), - ) - if process.returncode == 0: - log.info( - f"Process {process_name}:{command} exited with {process.returncode}." - ) - else: - log.error( - f"Process {process_name}:{command} exited with {process.returncode}." + # Read stdout and stderr concurrently + await asyncio.gather( + read_output(self, f"{process_name}:stdout", process, process.stdout), + read_output(self, f"{process_name}:stderr", process, process.stderr, is_stderr=True) ) + # Wait for process to complete + await process.wait() + + # Log process exit status + if process.returncode == 0: + log.info( + f"Process {process_name}:{command} exited successfully with {process.returncode}." + ) + else: + log.error( + f"Process {process_name}:{command} exited with non-zero status {process.returncode}." + ) + + except Exception as e: + log.error(f"Error running process {process_name}: {e}") + finally: + # Remove process from tracking + if command in self.processes: + del self.processes[command] + async def start_processes(self, app): + """Start all configured processes.""" for x, command in enumerate(self.commands): - self.processes[command] = asyncio.create_task( - self.run_process(f"process-{x}", command) - ) - # asyncio.create_task(asyncio.gather(*self.processes.values())) + # Create a task for each process + task = asyncio.create_task(self.run_process(f"process-{x}", command)) + self.process_tasks[command] = task def parse_args(): import argparse parser = argparse.ArgumentParser( - description="Executle proccesses and monitor trough web interface." + description="Execute processes and monitor through web interface." ) parser.add_argument( - "commands", nargs="+", help="List of files to commands to execute and monitor." + "commands", nargs="+", help="List of commands to execute and monitor." ) parser.add_argument( "--host", @@ -313,7 +396,7 @@ def parse_args(): parser.add_argument( "--port", type=int, - default=0, + default=8080, required=False, help="Port number (default: 8080).", ) @@ -338,3 +421,6 @@ def cli(): log.info(f"Host: {args.host} Port: {args.port}") app.run(host=args.host, port=args.port) + +if __name__ == "__main__": + cli()