This commit is contained in:
parent
c23e289e0c
commit
0cc2a7022c
@ -2,6 +2,8 @@ import asyncio
|
||||
import json
|
||||
import shlex
|
||||
import time
|
||||
import signal
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from typing import List
|
||||
from app.app import Application as BaseApplication
|
||||
@ -204,13 +206,60 @@ class Zhurnal(BaseApplication):
|
||||
def __init__(self, commands: List[str], *args, **kwargs):
|
||||
self.commands = commands or []
|
||||
self.processes = {}
|
||||
self.process_tasks = {}
|
||||
self.shutdown_event = asyncio.Event()
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
# Register signal handlers for graceful shutdown
|
||||
loop = asyncio.get_event_loop()
|
||||
for sig in (signal.SIGINT, signal.SIGTERM):
|
||||
loop.add_signal_handler(sig, self.handle_exit)
|
||||
|
||||
self.on_startup.append(self.start_processes)
|
||||
self.on_cleanup.append(self.cleanup_processes)
|
||||
|
||||
self.clients = []
|
||||
self.router.add_get("/ws", self.websocket_handler)
|
||||
self.router.add_get("/", self.index_handler)
|
||||
log.info("Application created")
|
||||
|
||||
def handle_exit(self):
|
||||
"""Handle application exit signals."""
|
||||
log.info("Received exit signal. Initiating graceful shutdown...")
|
||||
self.shutdown_event.set()
|
||||
|
||||
async def cleanup_processes(self, app):
|
||||
"""Cleanup all running subprocesses."""
|
||||
log.info("Cleaning up processes...")
|
||||
|
||||
# Terminate all running processes
|
||||
for command, process_info in list(self.processes.items()):
|
||||
try:
|
||||
# Get the subprocess
|
||||
process = process_info.get('process')
|
||||
if process and process.returncode is None:
|
||||
log.info(f"Terminating process: {command}")
|
||||
try:
|
||||
# First try to terminate gracefully
|
||||
process.terminate()
|
||||
# Wait a short time for process to exit
|
||||
await asyncio.wait_for(process.wait(), timeout=5.0)
|
||||
except asyncio.TimeoutError:
|
||||
# If process doesn't exit, force kill
|
||||
log.warning(f"Force killing process: {command}")
|
||||
process.kill()
|
||||
except Exception as e:
|
||||
log.error(f"Error cleaning up process {command}: {e}")
|
||||
|
||||
# Cancel any running tasks
|
||||
for task in list(self.process_tasks.values()):
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
async def index_handler(self, request):
|
||||
return web.Response(text=index_html, content_type="text/html")
|
||||
|
||||
@ -240,68 +289,102 @@ class Zhurnal(BaseApplication):
|
||||
return ws
|
||||
|
||||
async def run_process(self, process_name, command):
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*shlex.split(command),
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
log.info(f"Running process {process_name}: {command}")
|
||||
"""Run a single process with enhanced monitoring and error handling."""
|
||||
try:
|
||||
# Create subprocess
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*shlex.split(command),
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
|
||||
async def read_output(app, name, process, f):
|
||||
time_previous = 0
|
||||
async for line in f:
|
||||
time_current = time.time()
|
||||
time_elapsed = round(
|
||||
time_previous and time_current - time_previous or 0, 4
|
||||
)
|
||||
decoded_line = line.decode("utf-8", "ignore").strip()
|
||||
print(decoded_line)
|
||||
decoded_line = "".join(c for c in decoded_line if c.isprintable())
|
||||
for client in app.clients:
|
||||
await client.send_str(
|
||||
json.dumps(
|
||||
{
|
||||
"elapsed": time_elapsed,
|
||||
"timestamp": datetime.now().strftime(
|
||||
"%Y-%m-%d %H:%M:%S"
|
||||
),
|
||||
"line": decoded_line,
|
||||
"name": name,
|
||||
"command": command,
|
||||
}
|
||||
# Store process information
|
||||
self.processes[command] = {
|
||||
'process': process,
|
||||
'name': process_name
|
||||
}
|
||||
|
||||
log.info(f"Running process {process_name}: {command}")
|
||||
|
||||
async def read_output(app, name, process, f, is_stderr=False):
|
||||
time_previous = 0
|
||||
try:
|
||||
async for line in f:
|
||||
# Check if shutdown is requested
|
||||
if self.shutdown_event.is_set():
|
||||
break
|
||||
|
||||
time_current = time.time()
|
||||
time_elapsed = round(
|
||||
time_previous and time_current - time_previous or 0, 4
|
||||
)
|
||||
)
|
||||
time_previous = time.time()
|
||||
decoded_line = line.decode("utf-8", "ignore").strip()
|
||||
print(decoded_line)
|
||||
decoded_line = "".join(c for c in decoded_line if c.isprintable())
|
||||
|
||||
await asyncio.gather(
|
||||
read_output(self, f"{process_name}:stdout", command, process.stdout),
|
||||
read_output(self, f"{process_name}:stderr", process, process.stderr),
|
||||
)
|
||||
if process.returncode == 0:
|
||||
log.info(
|
||||
f"Process {process_name}:{command} exited with {process.returncode}."
|
||||
)
|
||||
else:
|
||||
log.error(
|
||||
f"Process {process_name}:{command} exited with {process.returncode}."
|
||||
# Broadcast to all WebSocket clients
|
||||
for client in app.clients:
|
||||
await client.send_str(
|
||||
json.dumps(
|
||||
{
|
||||
"elapsed": time_elapsed,
|
||||
"timestamp": datetime.now().strftime(
|
||||
"%Y-%m-%d %H:%M:%S"
|
||||
),
|
||||
"line": decoded_line,
|
||||
"name": name,
|
||||
"command": command,
|
||||
}
|
||||
)
|
||||
)
|
||||
time_previous = time.time()
|
||||
except Exception as e:
|
||||
log.error(f"Error reading {name} output: {e}")
|
||||
finally:
|
||||
log.info(f"Finished reading {name} output")
|
||||
|
||||
# Read stdout and stderr concurrently
|
||||
await asyncio.gather(
|
||||
read_output(self, f"{process_name}:stdout", process, process.stdout),
|
||||
read_output(self, f"{process_name}:stderr", process, process.stderr, is_stderr=True)
|
||||
)
|
||||
|
||||
# Wait for process to complete
|
||||
await process.wait()
|
||||
|
||||
# Log process exit status
|
||||
if process.returncode == 0:
|
||||
log.info(
|
||||
f"Process {process_name}:{command} exited successfully with {process.returncode}."
|
||||
)
|
||||
else:
|
||||
log.error(
|
||||
f"Process {process_name}:{command} exited with non-zero status {process.returncode}."
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
log.error(f"Error running process {process_name}: {e}")
|
||||
finally:
|
||||
# Remove process from tracking
|
||||
if command in self.processes:
|
||||
del self.processes[command]
|
||||
|
||||
async def start_processes(self, app):
|
||||
"""Start all configured processes."""
|
||||
for x, command in enumerate(self.commands):
|
||||
self.processes[command] = asyncio.create_task(
|
||||
self.run_process(f"process-{x}", command)
|
||||
)
|
||||
# asyncio.create_task(asyncio.gather(*self.processes.values()))
|
||||
# Create a task for each process
|
||||
task = asyncio.create_task(self.run_process(f"process-{x}", command))
|
||||
self.process_tasks[command] = task
|
||||
|
||||
|
||||
def parse_args():
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Executle proccesses and monitor trough web interface."
|
||||
description="Execute processes and monitor through web interface."
|
||||
)
|
||||
parser.add_argument(
|
||||
"commands", nargs="+", help="List of files to commands to execute and monitor."
|
||||
"commands", nargs="+", help="List of commands to execute and monitor."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--host",
|
||||
@ -313,7 +396,7 @@ def parse_args():
|
||||
parser.add_argument(
|
||||
"--port",
|
||||
type=int,
|
||||
default=0,
|
||||
default=8080,
|
||||
required=False,
|
||||
help="Port number (default: 8080).",
|
||||
)
|
||||
@ -338,3 +421,6 @@ def cli():
|
||||
log.info(f"Host: {args.host} Port: {args.port}")
|
||||
|
||||
app.run(host=args.host, port=args.port)
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
||||
|
Loading…
Reference in New Issue
Block a user