From 29d8272fc58395c991c01a372d6342a5c6c3ff2f Mon Sep 17 00:00:00 2001 From: retoor Date: Thu, 25 Sep 2025 05:21:40 +0200 Subject: [PATCH] Working, extra stats. --- rproxy.c | 861 ++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 655 insertions(+), 206 deletions(-) diff --git a/rproxy.c b/rproxy.c index dde70bb..5bfbe66 100644 --- a/rproxy.c +++ b/rproxy.c @@ -6,11 +6,11 @@ * Description: High-performance Reverse Proxy with Real-time Monitoring Dashboard. * Single-file C implementation using epoll - PRODUCTION READY VERSION * - * Version: 2.1 (Forwarding and State Machine Corrected) + * Version: 3.0 (Enhanced Reliability & Complete Stats) * Created: 2024 * Compiler: gcc * - * Author: Corrected Version + * Author: Production Ready Version * * To Compile: * gcc -Wall -Wextra -O2 -g -pthread cJSON.c proxy.c -o rproxy -lssl -lcrypto -lsqlite3 -lm @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +42,7 @@ #include #include #include +#include // --- Local Includes --- #include "cJSON.h" @@ -53,6 +55,7 @@ #define MAX_HEADER_SIZE 8192 #define MAX_REQUEST_LINE_SIZE 4096 #define MAX_URI_SIZE 2048 +#define CONNECTION_TIMEOUT 300 // --- Enums and Structs --- @@ -66,7 +69,9 @@ typedef enum { typedef enum { CLIENT_STATE_READING_HEADERS, // Reading the initial HTTP request CLIENT_STATE_FORWARDING, // Actively forwarding data in both directions - CLIENT_STATE_ERROR // An error occurred, connection will be closed + CLIENT_STATE_SERVING_INTERNAL, // Serving dashboard or stats + CLIENT_STATE_ERROR, // An error occurred, connection will be closed + CLIENT_STATE_CLOSING // Connection is being closed } client_state_t; struct connection_s; @@ -102,6 +107,7 @@ typedef struct { long content_length; int is_websocket; int keep_alive; + int connection_close; } http_request_t; typedef struct connection_s { @@ -117,6 +123,8 @@ typedef struct connection_s { http_request_t request; double request_start_time; time_t last_activity; + int half_closed; // For handling connection shutdown + int write_shutdown; // Write side shutdown } connection_t; typedef struct { @@ -144,6 +152,19 @@ typedef struct { int count; } network_history_deque_t; +typedef struct { + double time; + double read_mbps; + double write_mbps; +} disk_history_point_t; + +typedef struct { + disk_history_point_t *points; + int capacity; + int head; + int count; +} disk_history_deque_t; + typedef struct request_time_s { double *times; int capacity; @@ -173,10 +194,17 @@ typedef struct { history_deque_t cpu_history; history_deque_t memory_history; network_history_deque_t network_history; + disk_history_deque_t disk_history; history_deque_t throughput_history; + history_deque_t load1_history; + history_deque_t load5_history; + history_deque_t load15_history; long long last_net_sent; long long last_net_recv; + long long last_disk_read; + long long last_disk_write; double last_net_update_time; + double last_disk_update_time; vhost_stats_t *vhost_stats_head; sqlite3 *db; } system_monitor_t; @@ -188,6 +216,7 @@ system_monitor_t monitor; int epoll_fd; SSL_CTX *ssl_ctx; static int g_debug_mode = 0; +static volatile int g_shutdown = 0; // --- Function Prototypes --- @@ -211,6 +240,7 @@ vhost_stats_t* monitor_get_or_create_vhost_stats(const char *vhost_name); void serve_dashboard(connection_t *conn); void serve_stats_api(connection_t *conn); void send_error_response(connection_t *conn, int code, const char* status, const char* body); +static void signal_handler(int sig); // --- Buffer Helpers --- @@ -236,12 +266,24 @@ static inline void buffer_free(buffer_t *buf) { buf->tail = 0; } +static inline size_t buffer_available_read(buffer_t *buf) { + return buf->tail - buf->head; +} + +static inline size_t buffer_available_write(buffer_t *buf) { + return buf->capacity - buf->tail; +} + static inline int buffer_ensure_capacity(buffer_t *buf, size_t required) { if (buf->capacity >= required) return 0; size_t new_capacity = buf->capacity; while (new_capacity < required) { new_capacity *= 2; + if (new_capacity > 1024*1024*10) { // 10MB max + log_error("Buffer size limit exceeded"); + return -1; + } } char *new_data = realloc(buf->data, new_capacity); @@ -265,6 +307,14 @@ static inline void buffer_compact(buffer_t *buf) { buf->tail = len; } +static inline void buffer_consume(buffer_t *buf, size_t bytes) { + buf->head += bytes; + if (buf->head >= buf->tail) { + buf->head = 0; + buf->tail = 0; + } +} + // --- String Utilities --- static int is_valid_http_method(const char *method) { const char *valid_methods[] = {"GET", "POST", "PUT", "DELETE", "HEAD", @@ -371,7 +421,7 @@ int load_config(const char *filename) { return 0; } - config.routes = malloc(config.route_count * sizeof(route_config_t)); + config.routes = calloc(config.route_count, sizeof(route_config_t)); if (!config.routes) { log_error("Failed to allocate memory for routes"); cJSON_Delete(root); @@ -382,7 +432,6 @@ int load_config(const char *filename) { cJSON *route_item; cJSON_ArrayForEach(route_item, proxy_array) { route_config_t *route = &config.routes[i]; - memset(route, 0, sizeof(route_config_t)); cJSON *hostname = cJSON_GetObjectItem(route_item, "hostname"); cJSON *upstream_host = cJSON_GetObjectItem(route_item, "upstream_host"); @@ -432,39 +481,56 @@ route_config_t *find_route(const char *hostname) { // // ================================================================================================= static void history_deque_init(history_deque_t *dq, int capacity) { - dq->points = malloc(sizeof(history_point_t) * capacity); + dq->points = calloc(capacity, sizeof(history_point_t)); dq->capacity = capacity; dq->head = 0; dq->count = 0; } static void history_deque_push(history_deque_t *dq, double time, double value) { + if (!dq || !dq->points) return; dq->points[dq->head] = (history_point_t){ .time = time, .value = value }; dq->head = (dq->head + 1) % dq->capacity; if (dq->count < dq->capacity) dq->count++; } static void network_history_deque_init(network_history_deque_t *dq, int capacity) { - dq->points = malloc(sizeof(network_history_point_t) * capacity); + dq->points = calloc(capacity, sizeof(network_history_point_t)); dq->capacity = capacity; dq->head = 0; dq->count = 0; } static void network_history_deque_push(network_history_deque_t *dq, double time, double rx, double tx) { + if (!dq || !dq->points) return; dq->points[dq->head] = (network_history_point_t){ .time = time, .rx_kbps = rx, .tx_kbps = tx }; dq->head = (dq->head + 1) % dq->capacity; if (dq->count < dq->capacity) dq->count++; } +static void disk_history_deque_init(disk_history_deque_t *dq, int capacity) { + dq->points = calloc(capacity, sizeof(disk_history_point_t)); + dq->capacity = capacity; + dq->head = 0; + dq->count = 0; +} + +static void disk_history_deque_push(disk_history_deque_t *dq, double time, double read_mbps, double write_mbps) { + if (!dq || !dq->points) return; + dq->points[dq->head] = (disk_history_point_t){ .time = time, .read_mbps = read_mbps, .write_mbps = write_mbps }; + dq->head = (dq->head + 1) % dq->capacity; + if (dq->count < dq->capacity) dq->count++; +} + static void request_time_deque_init(request_time_deque_t *dq, int capacity) { - dq->times = malloc(sizeof(double) * capacity); + dq->times = calloc(capacity, sizeof(double)); dq->capacity = capacity; dq->head = 0; dq->count = 0; } static void request_time_deque_push(request_time_deque_t *dq, double time_ms) { + if (!dq || !dq->times) return; dq->times[dq->head] = time_ms; dq->head = (dq->head + 1) % dq->capacity; if (dq->count < dq->capacity) dq->count++; @@ -537,7 +603,11 @@ void monitor_init(const char *db_file) { history_deque_init(&monitor.cpu_history, HISTORY_SECONDS); history_deque_init(&monitor.memory_history, HISTORY_SECONDS); network_history_deque_init(&monitor.network_history, HISTORY_SECONDS); + disk_history_deque_init(&monitor.disk_history, HISTORY_SECONDS); history_deque_init(&monitor.throughput_history, HISTORY_SECONDS); + history_deque_init(&monitor.load1_history, HISTORY_SECONDS); + history_deque_init(&monitor.load5_history, HISTORY_SECONDS); + history_deque_init(&monitor.load15_history, HISTORY_SECONDS); if (sqlite3_open(db_file, &monitor.db) != SQLITE_OK) { fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(monitor.db)); @@ -571,7 +641,11 @@ void monitor_cleanup() { if (monitor.cpu_history.points) free(monitor.cpu_history.points); if (monitor.memory_history.points) free(monitor.memory_history.points); if (monitor.network_history.points) free(monitor.network_history.points); + if (monitor.disk_history.points) free(monitor.disk_history.points); if (monitor.throughput_history.points) free(monitor.throughput_history.points); + if (monitor.load1_history.points) free(monitor.load1_history.points); + if (monitor.load5_history.points) free(monitor.load5_history.points); + if (monitor.load15_history.points) free(monitor.load15_history.points); } static void save_stats_to_db() { @@ -639,7 +713,7 @@ static void get_memory_usage(double *used_gb) { *used_gb = 0; return; } - *used_gb = (double)(info.totalram - info.freeram) * info.mem_unit / (1024.0 * 1024.0 * 1024.0); + *used_gb = (double)(info.totalram - info.freeram - info.bufferram) * info.mem_unit / (1024.0 * 1024.0 * 1024.0); } static void get_network_stats(long long *bytes_sent, long long *bytes_recv) { @@ -663,7 +737,9 @@ static void get_network_stats(long long *bytes_sent, long long *bytes_recv) { char iface[32]; long long r, t; if (sscanf(line, "%31[^:]: %lld %*d %*d %*d %*d %*d %*d %*d %lld", iface, &r, &t) == 3) { - if (strncmp(iface, " lo", 3) != 0) { + char *trimmed = iface; + while (*trimmed == ' ') trimmed++; + if (strcmp(trimmed, "lo") != 0) { total_recv += r; total_sent += t; } @@ -674,15 +750,75 @@ static void get_network_stats(long long *bytes_sent, long long *bytes_recv) { *bytes_recv = total_recv; } +static void get_disk_stats(long long *sectors_read, long long *sectors_written) { + FILE *f = fopen("/proc/diskstats", "r"); + if (!f) { + *sectors_read = 0; + *sectors_written = 0; + return; + } + +char line[2048]; + long long total_read = 0, total_written = 0; + while (fgets(line, sizeof(line), f)) { + char device[64]; + long long sectors_r = 0, sectors_w = 0; + int nfields = 0; + // Format: major minor device reads_completed reads_merged sectors_read ... writes_completed writes_merged sectors_written + // Read all fields as strings, then convert needed ones + char major[16], minor[16], dev[64]; + char rc[32], rm[32], sr[32], rtm[32], rtm2[32], wc[32], wm[32], sw[32]; + nfields = sscanf(line, "%15s %15s %63s %31s %31s %31s %31s %31s %31s %31s %31s %31s", + major, minor, dev, rc, rm, sr, rtm, rtm2, wc, wm, sw, sw); + if (nfields >= 11) { + strncpy(device, dev, sizeof(device)-1); + device[sizeof(device)-1] = '\0'; + sectors_r = atoll(sr); + sectors_w = atoll(sw); + // Only count physical devices (sda, nvme0n1, etc), not partitions + if (strncmp(device, "loop", 4) != 0 && strncmp(device, "ram", 3) != 0) { + // Check if it's a main device (no numbers at the end for sd* devices) + int len = strlen(device); + if ((strncmp(device, "sd", 2) == 0 && len == 3) || + (strncmp(device, "nvme", 4) == 0 && strstr(device, "n1p") == NULL) || + (strncmp(device, "vd", 2) == 0 && len == 3) || + (strncmp(device, "hd", 2) == 0 && len == 3)) { + total_read += sectors_r; + total_written += sectors_w; + } + } + } + } + fclose(f); + *sectors_read = total_read; + *sectors_written = total_written; +} + +static void get_load_averages(double *load1, double *load5, double *load15) { + FILE *f = fopen("/proc/loadavg", "r"); + if (!f) { + *load1 = *load5 = *load15 = 0.0; + return; + } + + if (fscanf(f, "%lf %lf %lf", load1, load5, load15) != 3) { + *load1 = *load5 = *load15 = 0.0; + } + fclose(f); +} + void monitor_update() { double current_time = time(NULL); + // CPU usage history_deque_push(&monitor.cpu_history, current_time, get_cpu_usage()); + // Memory usage double mem_used_gb; get_memory_usage(&mem_used_gb); history_deque_push(&monitor.memory_history, current_time, mem_used_gb); + // Network stats long long net_sent, net_recv; get_network_stats(&net_sent, &net_recv); double time_delta = current_time - monitor.last_net_update_time; @@ -696,6 +832,28 @@ void monitor_update() { monitor.last_net_recv = net_recv; monitor.last_net_update_time = current_time; + // Disk I/O stats + long long disk_read, disk_write; + get_disk_stats(&disk_read, &disk_write); + double disk_time_delta = current_time - monitor.last_disk_update_time; + if (disk_time_delta > 0 && monitor.last_disk_update_time > 0) { + // Convert sectors to MB/s (sector = 512 bytes typically) + double read_mbps = (disk_read - monitor.last_disk_read) * 512.0 / disk_time_delta / (1024.0 * 1024.0); + double write_mbps = (disk_write - monitor.last_disk_write) * 512.0 / disk_time_delta / (1024.0 * 1024.0); + disk_history_deque_push(&monitor.disk_history, current_time, fmax(0, read_mbps), fmax(0, write_mbps)); + } + monitor.last_disk_read = disk_read; + monitor.last_disk_write = disk_write; + monitor.last_disk_update_time = current_time; + + // Load averages + double load1, load5, load15; + get_load_averages(&load1, &load5, &load15); + history_deque_push(&monitor.load1_history, current_time, load1); + history_deque_push(&monitor.load5_history, current_time, load5); + history_deque_push(&monitor.load15_history, current_time, load15); + + // VHost stats for (vhost_stats_t *s = monitor.vhost_stats_head; s != NULL; s = s->next) { double vhost_delta = current_time - s->last_update; if (vhost_delta >= 1.0) { @@ -740,23 +898,29 @@ vhost_stats_t* monitor_get_or_create_vhost_stats(const char *vhost_name) { void monitor_record_request_start(vhost_stats_t *stats, int is_websocket) { if (!stats) return; if (is_websocket) { - stats->websocket_requests++; + __sync_fetch_and_add(&stats->websocket_requests, 1); } else { - stats->http_requests++; + __sync_fetch_and_add(&stats->http_requests, 1); } - stats->total_requests++; + __sync_fetch_and_add(&stats->total_requests, 1); } void monitor_record_request_end(vhost_stats_t *stats, double start_time) { - if (!stats) return; + if (!stats || start_time <= 0) return; struct timespec end_time; clock_gettime(CLOCK_MONOTONIC, &end_time); double duration_ms = ((end_time.tv_sec + end_time.tv_nsec / 1e9) - start_time) * 1000.0; - if (duration_ms >= 0) { + if (duration_ms >= 0 && duration_ms < 60000) { // Sanity check: < 60 seconds request_time_deque_push(&stats->request_times, duration_ms); } } +void monitor_record_bytes(vhost_stats_t *stats, long long sent, long long recv) { + if (!stats) return; + __sync_fetch_and_add(&stats->bytes_sent, sent); + __sync_fetch_and_add(&stats->bytes_recv, recv); +} + // ================================================================================================= // // Dashboard Implementation @@ -770,21 +934,22 @@ const char *DASHBOARD_HTML = " \n" " \n" "\n" @@ -802,6 +967,18 @@ const char *DASHBOARD_HTML = "
0
\n" "
CPU %
\n" " \n" +"
\n" +"
0.00
\n" +"
Load 1m
\n" +"
\n" +"
\n" +"
0.00
\n" +"
Load 5m
\n" +"
\n" +"
\n" +"
0.00
\n" +"
Load 15m
\n" +"
\n" " \n" "\n" "
\n" @@ -823,6 +1000,25 @@ const char *DASHBOARD_HTML = " \n" "
\n" "\n" +"
\n" +"
Disk I/O
\n" +"
\n" +"
Read
\n" +"
Write
\n" +"
\n" +" \n" +"
\n" +"\n" +"
\n" +"
Load Average
\n" +"
\n" +"
1 min
\n" +"
5 min
\n" +"
15 min
\n" +"
\n" +" \n" +"
\n" +"\n" "
\n" " \n" " \n" @@ -855,6 +1051,11 @@ const char *DASHBOARD_HTML = " return `${value.toFixed(0)} KB/s`;\n" " };\n" "\n" +" const formatDiskTick = (value) => {\n" +" if (value >= 1024) return `${(value / 1024).toFixed(1)} GB/s`;\n" +" return `${value.toFixed(1)} MB/s`;\n" +" };\n" +"\n" " const formatBytes = (bytes) => {\n" " if (bytes === 0) return '0 B';\n" " const k = 1024;\n" @@ -863,7 +1064,7 @@ const char *DASHBOARD_HTML = " return parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + ' ' + sizes[i];\n" " };\n" "\n" -" const createBaseChartOptions = (historySeconds) => ({\n" +" const createBaseChartOptions = (historySeconds, yTickCallback) => ({\n" " responsive: true,\n" " maintainAspectRatio: false,\n" " animation: false,\n" @@ -871,7 +1072,7 @@ const char *DASHBOARD_HTML = " interaction: { mode: 'nearest', axis: 'x', intersect: false },\n" " scales: {\n" " x: { type: 'linear', display: true, grid: { color: '#2a2e3e' }, ticks: { color: '#666', maxTicksLimit: 7, callback: formatTimeTick }, min: -historySeconds * 1000, max: 0 },\n" -" y: { display: true, grid: { color: '#2a2e3e' }, ticks: { color: '#666', beginAtZero: true } }\n" +" y: { display: true, grid: { color: '#2a2e3e' }, ticks: { color: '#666', beginAtZero: true, callback: yTickCallback } }\n" " },\n" " plugins: { legend: { display: false }, tooltip: { displayColors: false } },\n" " elements: { point: { radius: 0 }, line: { borderWidth: 2, tension: 0.4, fill: true } }\n" @@ -880,13 +1081,13 @@ const char *DASHBOARD_HTML = " const cpuChart = new Chart(document.getElementById('cpuChart'), {\n" " type: 'line',\n" " data: { datasets: [{ label: 'CPU %', data: [], borderColor: '#f39c12', backgroundColor: 'rgba(243, 156, 18, 0.1)' }] },\n" -" options: { ...createBaseChartOptions(300), scales: { ...createBaseChartOptions(300).scales, y: { ...createBaseChartOptions(300).scales.y, max: 100, ticks: { ...createBaseChartOptions(300).scales.y.ticks, callback: v => `${v}%` } } } }\n" +" options: { ...createBaseChartOptions(300, v => `${v}%`), scales: { ...createBaseChartOptions(300).scales, y: { ...createBaseChartOptions(300).scales.y, max: 100, ticks: { ...createBaseChartOptions(300).scales.y.ticks, callback: v => `${v}%` } } } }\n" " });\n" "\n" " const memChart = new Chart(document.getElementById('memChart'), {\n" " type: 'line',\n" " data: { datasets: [{ label: 'Memory GB', data: [], borderColor: '#e74c3c', backgroundColor: 'rgba(231, 76, 60, 0.1)' }] },\n" -" options: { ...createBaseChartOptions(300), scales: { ...createBaseChartOptions(300).scales, y: { ...createBaseChartOptions(300).scales.y, ticks: { ...createBaseChartOptions(300).scales.y.ticks, callback: v => `${v.toFixed(2)} GiB` } } } }\n" +" options: createBaseChartOptions(300, v => `${v.toFixed(2)} GiB`)\n" " });\n" "\n" " const netChart = new Chart(document.getElementById('netChart'), {\n" @@ -897,7 +1098,30 @@ const char *DASHBOARD_HTML = " { label: 'TX KB/s', data: [], borderColor: '#2ecc71', backgroundColor: 'rgba(46, 204, 113, 0.1)' }\n" " ]\n" " },\n" -" options: { ...createBaseChartOptions(300), scales: { ...createBaseChartOptions(300).scales, y: { ...createBaseChartOptions(300).scales.y, ticks: { ...createBaseChartOptions(300).scales.y.ticks, callback: formatSizeTick } } } }\n" +" options: createBaseChartOptions(300, formatSizeTick)\n" +" });\n" +"\n" +" const diskChart = new Chart(document.getElementById('diskChart'), {\n" +" type: 'line',\n" +" data: {\n" +" datasets: [\n" +" { label: 'Read MB/s', data: [], borderColor: '#9b59b6', backgroundColor: 'rgba(155, 89, 182, 0.1)' },\n" +" { label: 'Write MB/s', data: [], borderColor: '#e67e22', backgroundColor: 'rgba(230, 126, 34, 0.1)' }\n" +" ]\n" +" },\n" +" options: createBaseChartOptions(300, formatDiskTick)\n" +" });\n" +"\n" +" const loadChart = new Chart(document.getElementById('loadChart'), {\n" +" type: 'line',\n" +" data: {\n" +" datasets: [\n" +" { label: 'Load 1m', data: [], borderColor: '#e74c3c', backgroundColor: 'rgba(231, 76, 60, 0.1)' },\n" +" { label: 'Load 5m', data: [], borderColor: '#f39c12', backgroundColor: 'rgba(243, 156, 18, 0.1)' },\n" +" { label: 'Load 15m', data: [], borderColor: '#3498db', backgroundColor: 'rgba(52, 152, 219, 0.1)' }\n" +" ]\n" +" },\n" +" options: createBaseChartOptions(300, v => v.toFixed(2))\n" " });\n" "\n" " window.vhostCharts = {};\n" @@ -911,14 +1135,25 @@ const char *DASHBOARD_HTML = " document.getElementById('connections').textContent = data.current.active_connections;\n" " document.getElementById('memory').textContent = data.current.memory_gb + ' GiB';\n" " document.getElementById('cpu').textContent = data.current.cpu_percent + '%';\n" +" document.getElementById('load1').textContent = data.current.load_1m.toFixed(2);\n" +" document.getElementById('load5').textContent = data.current.load_5m.toFixed(2);\n" +" document.getElementById('load15').textContent = data.current.load_15m.toFixed(2);\n" "\n" " cpuChart.data.datasets[0].data = data.cpu_history;\n" " memChart.data.datasets[0].data = data.memory_history;\n" " netChart.data.datasets[0].data = data.network_rx_history;\n" " netChart.data.datasets[1].data = data.network_tx_history;\n" +" diskChart.data.datasets[0].data = data.disk_read_history;\n" +" diskChart.data.datasets[1].data = data.disk_write_history;\n" +" loadChart.data.datasets[0].data = data.load1_history;\n" +" loadChart.data.datasets[1].data = data.load5_history;\n" +" loadChart.data.datasets[2].data = data.load15_history;\n" +"\n" " cpuChart.update('none');\n" " memChart.update('none');\n" " netChart.update('none');\n" +" diskChart.update('none');\n" +" loadChart.update('none');\n" "\n" " const tbody = document.getElementById('processTable');\n" " const processNames = data.processes.map(p => p.name).join(',');\n" @@ -951,11 +1186,10 @@ const char *DASHBOARD_HTML = " { border: '#9b59b6', bg: 'rgba(155, 89, 182, 0.1)' }, { border: '#1abc9c', bg: 'rgba(26, 188, 156, 0.1)' }\n" " ];\n" " const color = colors[index % colors.length];\n" -" const options = { ...createBaseChartOptions(60), scales: { ...createBaseChartOptions(60).scales, y: { ...createBaseChartOptions(60).scales.y, ticks: { ...createBaseChartOptions(60).scales.y.ticks, callback: formatSizeTick } } } };\n" " window.vhostCharts[chartId] = new Chart(canvas.getContext('2d'), {\n" " type: 'line',\n" " data: { datasets: [{ label: 'Throughput KB/s', data: p.throughput_history || [], borderColor: color.border, backgroundColor: color.bg }] },\n" -" options: options\n" +" options: createBaseChartOptions(60, formatSizeTick)\n" " });\n" " } else {\n" " window.vhostCharts[chartId].data.datasets[0].data = p.throughput_history || [];\n" @@ -973,27 +1207,31 @@ const char *DASHBOARD_HTML = "\n" "\n" ; + void serve_dashboard(connection_t *conn) { if (!conn) return; + size_t content_len = strlen(DASHBOARD_HTML); char header[512]; int len = snprintf(header, sizeof(header), "HTTP/1.1 200 OK\r\n" "Content-Type: text/html; charset=utf-8\r\n" "Content-Length: %zu\r\n" - "Connection: keep-alive\r\n" + "Connection: %s\r\n" "Cache-Control: no-cache\r\n" - "\r\n", strlen(DASHBOARD_HTML)); + "\r\n", + content_len, + conn->request.keep_alive ? "keep-alive" : "close"); - if (buffer_ensure_capacity(&conn->write_buf, conn->write_buf.tail + len + strlen(DASHBOARD_HTML)) < 0) { + if (buffer_ensure_capacity(&conn->write_buf, conn->write_buf.tail + len + content_len) < 0) { send_error_response(conn, 500, "Internal Server Error", "Memory allocation failed"); return; } memcpy(conn->write_buf.data + conn->write_buf.tail, header, len); conn->write_buf.tail += len; - memcpy(conn->write_buf.data + conn->write_buf.tail, DASHBOARD_HTML, strlen(DASHBOARD_HTML)); - conn->write_buf.tail += strlen(DASHBOARD_HTML); + memcpy(conn->write_buf.data + conn->write_buf.tail, DASHBOARD_HTML, content_len); + conn->write_buf.tail += content_len; struct epoll_event event = { .data.fd = conn->fd, .events = EPOLLIN | EPOLLOUT }; epoll_ctl(epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); @@ -1043,6 +1281,28 @@ static cJSON* format_network_history(network_history_deque_t *dq, int window_sec return arr; } +static cJSON* format_disk_history(disk_history_deque_t *dq, int window_seconds, const char *key) { + cJSON *arr = cJSON_CreateArray(); + if (!arr || !dq || !dq->points || !key || dq->count == 0) return arr; + + double current_time = time(NULL); + int start_index = (dq->head - dq->count + dq->capacity) % dq->capacity; + + for (int i = 0; i < dq->count; ++i) { + int current_index = (start_index + i) % dq->capacity; + disk_history_point_t *p = &dq->points[current_index]; + if ((current_time - p->time) <= window_seconds) { + cJSON *pt = cJSON_CreateObject(); + if (pt) { + cJSON_AddNumberToObject(pt, "x", (long)((p->time - current_time) * 1000)); + cJSON_AddNumberToObject(pt, "y", strcmp(key, "read_mbps") == 0 ? p->read_mbps : p->write_mbps); + cJSON_AddItemToArray(arr, pt); + } + } + } + return arr; +} + void serve_stats_api(connection_t *conn) { if (!conn) return; @@ -1062,6 +1322,7 @@ void serve_stats_api(connection_t *conn) { char buffer[64]; double last_cpu = 0, last_mem = 0; + double load1 = 0, load5 = 0, load15 = 0; if (monitor.cpu_history.count > 0) { int last_idx = (monitor.cpu_history.head - 1 + monitor.cpu_history.capacity) % monitor.cpu_history.capacity; @@ -1073,17 +1334,38 @@ void serve_stats_api(connection_t *conn) { last_mem = monitor.memory_history.points[last_idx].value; } + if (monitor.load1_history.count > 0) { + int idx = (monitor.load1_history.head - 1 + monitor.load1_history.capacity) % monitor.load1_history.capacity; + load1 = monitor.load1_history.points[idx].value; + } + if (monitor.load5_history.count > 0) { + int idx = (monitor.load5_history.head - 1 + monitor.load5_history.capacity) % monitor.load5_history.capacity; + load5 = monitor.load5_history.points[idx].value; + } + if (monitor.load15_history.count > 0) { + int idx = (monitor.load15_history.head - 1 + monitor.load15_history.capacity) % monitor.load15_history.capacity; + load15 = monitor.load15_history.points[idx].value; + } + snprintf(buffer, sizeof(buffer), "%.2f", last_cpu); cJSON_AddStringToObject(current, "cpu_percent", buffer); snprintf(buffer, sizeof(buffer), "%.2f", last_mem); cJSON_AddStringToObject(current, "memory_gb", buffer); cJSON_AddNumberToObject(current, "active_connections", monitor.active_connections); + cJSON_AddNumberToObject(current, "load_1m", load1); + cJSON_AddNumberToObject(current, "load_5m", load5); + cJSON_AddNumberToObject(current, "load_15m", load15); cJSON_AddItemToObject(root, "cpu_history", format_history(&monitor.cpu_history, HISTORY_SECONDS)); cJSON_AddItemToObject(root, "memory_history", format_history(&monitor.memory_history, HISTORY_SECONDS)); cJSON_AddItemToObject(root, "network_rx_history", format_network_history(&monitor.network_history, HISTORY_SECONDS, "rx_kbps")); cJSON_AddItemToObject(root, "network_tx_history", format_network_history(&monitor.network_history, HISTORY_SECONDS, "tx_kbps")); + cJSON_AddItemToObject(root, "disk_read_history", format_disk_history(&monitor.disk_history, HISTORY_SECONDS, "read_mbps")); + cJSON_AddItemToObject(root, "disk_write_history", format_disk_history(&monitor.disk_history, HISTORY_SECONDS, "write_mbps")); cJSON_AddItemToObject(root, "throughput_history", format_history(&monitor.throughput_history, HISTORY_SECONDS)); + cJSON_AddItemToObject(root, "load1_history", format_history(&monitor.load1_history, HISTORY_SECONDS)); + cJSON_AddItemToObject(root, "load5_history", format_history(&monitor.load5_history, HISTORY_SECONDS)); + cJSON_AddItemToObject(root, "load15_history", format_history(&monitor.load15_history, HISTORY_SECONDS)); cJSON *processes = cJSON_CreateArray(); if (processes) { @@ -1116,9 +1398,11 @@ void serve_stats_api(connection_t *conn) { "HTTP/1.1 200 OK\r\n" "Content-Type: application/json; charset=utf-8\r\n" "Content-Length: %zu\r\n" - "Connection: keep-alive\r\n" + "Connection: %s\r\n" "Cache-Control: no-cache\r\n" - "\r\n", strlen(json_string)); + "\r\n", + strlen(json_string), + conn->request.keep_alive ? "keep-alive" : "close"); if (buffer_ensure_capacity(&conn->write_buf, conn->write_buf.tail + hlen + strlen(json_string)) < 0) { free(json_string); @@ -1184,6 +1468,7 @@ static int find_header_value(const char* data, size_t len, const char* name, cha int parse_http_request(const char *data, size_t len, http_request_t *req) { memset(req, 0, sizeof(http_request_t)); req->content_length = -1; + req->keep_alive = 1; // HTTP/1.1 default // Find the end of the request line const char *line_end = memchr(data, '\n', len); @@ -1224,6 +1509,11 @@ int parse_http_request(const char *data, size_t len, http_request_t *req) { memcpy(req->version, version_start, version_len); req->version[version_len] = '\0'; + // HTTP/1.0 defaults to close + if (strncmp(req->version, "HTTP/1.0", 8) == 0) { + req->keep_alive = 0; + } + // Parse headers const char *headers_start = line_end + 1; char value[1024]; @@ -1238,8 +1528,14 @@ int parse_http_request(const char *data, size_t len, http_request_t *req) { } if (find_header_value(headers_start, len - (headers_start - data), "Connection", value, sizeof(value))) { - if (strcasecmp(value, "keep-alive") == 0) req->keep_alive = 1; - if (strcasecmp(value, "upgrade") == 0) req->is_websocket = 1; + if (strcasecmp(value, "close") == 0) { + req->keep_alive = 0; + req->connection_close = 1; + } else if (strcasecmp(value, "keep-alive") == 0) { + req->keep_alive = 1; + } else if (strcasecmp(value, "upgrade") == 0) { + req->is_websocket = 1; + } } if (find_header_value(headers_start, len - (headers_start - data), "Upgrade", value, sizeof(value))) { @@ -1259,7 +1555,7 @@ void send_error_response(connection_t *conn, int code, const char* status, const "Content-Type: text/plain; charset=utf-8\r\n" "Content-Length: %zu\r\n" "Connection: close\r\n" - "Server: ReverseProxy/2.1\r\n" + "Server: ReverseProxy/3.0\r\n" "\r\n" "%s", code, status, strlen(body), body); @@ -1275,6 +1571,7 @@ void send_error_response(connection_t *conn, int code, const char* status, const } conn->state = CLIENT_STATE_ERROR; + conn->request.keep_alive = 0; // Force close after error } // ================================================================================================= @@ -1290,6 +1587,17 @@ static void set_non_blocking(int fd) { } } +static void set_tcp_keepalive(int fd) { + int yes = 1; + setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)); + int idle = 60; + setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(idle)); + int interval = 10; + setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(interval)); + int maxpkt = 6; + setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &maxpkt, sizeof(maxpkt)); +} + static void add_to_epoll(int fd, uint32_t events) { struct epoll_event event = { .data.fd = fd, .events = events }; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) { @@ -1301,9 +1609,9 @@ static void add_to_epoll(int fd, uint32_t events) { static void modify_epoll(int fd, uint32_t events) { struct epoll_event event = { .data.fd = fd, .events = events }; if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) == -1) { - // This can happen if fd was closed, log it but don't close again + // This can happen if fd was closed if(errno != EBADF && errno != ENOENT) { - log_debug("epoll_ctl_mod failed for fd %d", fd); + log_debug("epoll_ctl_mod failed for fd %d: %s", fd, strerror(errno)); } } } @@ -1350,54 +1658,58 @@ void setup_listener_socket(int port) { } void accept_new_connection(int listener_fd) { - struct sockaddr_in client_addr; - socklen_t client_len = sizeof(client_addr); + while (1) { + struct sockaddr_in client_addr; + socklen_t client_len = sizeof(client_addr); - int client_fd = accept(listener_fd, (struct sockaddr*)&client_addr, &client_len); - if (client_fd == -1) { - if (errno != EAGAIN && errno != EWOULDBLOCK) { - log_error("accept failed"); + int client_fd = accept(listener_fd, (struct sockaddr*)&client_addr, &client_len); + if (client_fd == -1) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + log_error("accept failed"); + } + break; } - return; + + if (client_fd >= MAX_FDS) { + log_error("Connection fd too high, closing"); + close(client_fd); + continue; + } + + set_non_blocking(client_fd); + set_tcp_keepalive(client_fd); + add_to_epoll(client_fd, EPOLLIN); + + connection_t *conn = &connections[client_fd]; + memset(conn, 0, sizeof(connection_t)); + conn->type = CONN_TYPE_CLIENT; + conn->state = CLIENT_STATE_READING_HEADERS; + conn->fd = client_fd; + conn->last_activity = time(NULL); + + if (buffer_init(&conn->read_buf, CHUNK_SIZE) < 0 || + buffer_init(&conn->write_buf, CHUNK_SIZE) < 0) { + close_connection(client_fd); + continue; + } + + __sync_fetch_and_add(&monitor.active_connections, 1); + log_debug("New connection on fd %d from %s, total: %d", + client_fd, inet_ntoa(client_addr.sin_addr), monitor.active_connections); } - - if (client_fd >= MAX_FDS) { - log_error("Connection fd too high, closing"); - close(client_fd); - return; - } - - set_non_blocking(client_fd); - add_to_epoll(client_fd, EPOLLIN); - - connection_t *conn = &connections[client_fd]; - memset(conn, 0, sizeof(connection_t)); - conn->type = CONN_TYPE_CLIENT; - conn->state = CLIENT_STATE_READING_HEADERS; - conn->fd = client_fd; - conn->last_activity = time(NULL); - - if (buffer_init(&conn->read_buf, CHUNK_SIZE) < 0 || - buffer_init(&conn->write_buf, CHUNK_SIZE) < 0) { - close_connection(client_fd); - return; - } - - monitor.active_connections++; - log_debug("New connection on fd %d from %s, total: %d", - client_fd, inet_ntoa(client_addr.sin_addr), monitor.active_connections); } void close_connection(int fd) { - if (fd < 0 || fd >= MAX_FDS || connections[fd].type == CONN_TYPE_UNUSED) return; - + if (fd < 0 || fd >= MAX_FDS) return; + connection_t *conn = &connections[fd]; + if (conn->type == CONN_TYPE_UNUSED) return; // Prevent double-closing if (conn->fd == -1) return; int pair_fd = -1; - if (conn->pair) { + if (conn->pair && conn->pair->fd != -1) { pair_fd = conn->pair->fd; conn->pair->pair = NULL; } @@ -1405,12 +1717,15 @@ void close_connection(int fd) { // Record request end if needed if (conn->vhost_stats && conn->request_start_time > 0) { monitor_record_request_end(conn->vhost_stats, conn->request_start_time); - conn->request_start_time = 0; // Prevent double recording + conn->request_start_time = 0; } - // Update connection count before we invalidate the pointer + // Update connection count if (conn->type == CONN_TYPE_CLIENT) { - if(monitor.active_connections > 0) monitor.active_connections--; + int old_count = __sync_fetch_and_sub(&monitor.active_connections, 1); + if (old_count <= 0) { + monitor.active_connections = 0; + } } log_debug("Closing connection on fd %d, pair %d, remaining: %d", fd, pair_fd, monitor.active_connections); @@ -1422,6 +1737,7 @@ void close_connection(int fd) { if (conn->ssl) { SSL_shutdown(conn->ssl); SSL_free(conn->ssl); + conn->ssl = NULL; } // Close socket @@ -1432,8 +1748,9 @@ void close_connection(int fd) { buffer_free(&conn->write_buf); // Mark as unused + memset(conn, 0, sizeof(connection_t)); conn->type = CONN_TYPE_UNUSED; - conn->fd = -1; // Mark as closed + conn->fd = -1; // Close the paired connection if (pair_fd != -1) { @@ -1446,7 +1763,7 @@ void connect_to_upstream(connection_t *client, const char *data, size_t data_len route_config_t *route = find_route(client->request.host); if (!route) { - send_error_response(client, 502, "Bad Gateway", "Route not found after parsing"); + send_error_response(client, 502, "Bad Gateway", "No route configured for this host"); return; } @@ -1467,7 +1784,7 @@ void connect_to_upstream(connection_t *client, const char *data, size_t data_len addr.sin_family = AF_INET; addr.sin_port = htons(route->upstream_port); - // Try to resolve hostname + // Resolve hostname if (inet_pton(AF_INET, route->upstream_host, &addr.sin_addr) <= 0) { struct hostent *he = gethostbyname(route->upstream_host); if (!he) { @@ -1479,6 +1796,7 @@ void connect_to_upstream(connection_t *client, const char *data, size_t data_len } set_non_blocking(up_fd); + set_tcp_keepalive(up_fd); int connect_result = connect(up_fd, (struct sockaddr*)&addr, sizeof(addr)); if (connect_result < 0 && errno != EINPROGRESS) { @@ -1501,10 +1819,11 @@ void connect_to_upstream(connection_t *client, const char *data, size_t data_len if (buffer_init(&up->read_buf, CHUNK_SIZE) < 0 || buffer_init(&up->write_buf, CHUNK_SIZE) < 0) { - close_connection(client->fd); // Close the whole pair + close_connection(client->fd); return; } + // Prepare data to send char *data_to_send = (char*)data; size_t len_to_send = data_len; char *modified_request = NULL; @@ -1514,7 +1833,7 @@ void connect_to_upstream(connection_t *client, const char *data, size_t data_len const char *old_host_header_start = NULL; const char *old_host_header_end = NULL; - // Find the old host header to replace it accurately + // Find the old host header const char *current = data; const char *end = data + data_len; while(current < end) { @@ -1529,22 +1848,28 @@ void connect_to_upstream(connection_t *client, const char *data, size_t data_len } if (old_host_header_start) { - snprintf(new_host_header, sizeof(new_host_header), "Host: %s:%d\r\n", route->upstream_host, route->upstream_port); + if (route->upstream_port == 80 || route->upstream_port == 443) { + snprintf(new_host_header, sizeof(new_host_header), "Host: %s\r\n", route->upstream_host); + } else { + snprintf(new_host_header, sizeof(new_host_header), "Host: %s:%d\r\n", route->upstream_host, route->upstream_port); + } size_t new_host_len = strlen(new_host_header); size_t old_host_len = old_host_header_end - old_host_header_start; len_to_send = data_len - old_host_len + new_host_len; - modified_request = malloc(len_to_send); + modified_request = malloc(len_to_send + 1); if (modified_request) { char* p = modified_request; // Copy part before Host header - memcpy(p, data, old_host_header_start - data); - p += old_host_header_start - data; + size_t prefix_len = old_host_header_start - data; + memcpy(p, data, prefix_len); + p += prefix_len; // Copy new Host header memcpy(p, new_host_header, new_host_len); p += new_host_len; // Copy part after Host header - memcpy(p, old_host_header_end, data_len - (old_host_header_end - data)); + size_t suffix_len = data_len - (old_host_header_end - data); + memcpy(p, old_host_header_end, suffix_len); data_to_send = modified_request; } } @@ -1563,7 +1888,6 @@ void connect_to_upstream(connection_t *client, const char *data, size_t data_len up->ssl = SSL_new(ssl_ctx); if (!up->ssl) { close_connection(client->fd); - send_error_response(client, 502, "Bad Gateway", "SSL initialization failed"); return; } @@ -1572,8 +1896,8 @@ void connect_to_upstream(connection_t *client, const char *data, size_t data_len SSL_set_connect_state(up->ssl); } - log_debug("Connecting to upstream %s:%d on fd %d", - route->upstream_host, route->upstream_port, up_fd); + log_debug("Connecting to upstream %s:%d on fd %d (SSL: %s)", + route->upstream_host, route->upstream_port, up_fd, route->use_ssl ? "yes" : "no"); } @@ -1581,24 +1905,39 @@ static int do_read(connection_t *conn) { if (!conn) return -1; buffer_t *buf = &conn->read_buf; - if (buf->tail == buf->capacity) buffer_compact(buf); - size_t len = buf->capacity - buf->tail; - if (len == 0) return 0; // Buffer full + buffer_compact(buf); + + size_t available = buffer_available_write(buf); + if (available == 0) { + // Try to grow the buffer + if (buffer_ensure_capacity(buf, buf->capacity * 2) < 0) { + return -1; + } + available = buffer_available_write(buf); + } int bytes_read; if (conn->ssl && conn->ssl_handshake_done) { - bytes_read = SSL_read(conn->ssl, buf->data + buf->tail, len); + bytes_read = SSL_read(conn->ssl, buf->data + buf->tail, available); if (bytes_read <= 0) { int ssl_error = SSL_get_error(conn->ssl, bytes_read); if (ssl_error == SSL_ERROR_WANT_READ || ssl_error == SSL_ERROR_WANT_WRITE) { errno = EAGAIN; + return 0; } + return bytes_read; } } else { - bytes_read = read(conn->fd, buf->data + buf->tail, len); + bytes_read = read(conn->fd, buf->data + buf->tail, available); } - if (bytes_read > 0) buf->tail += bytes_read; + if (bytes_read > 0) { + buf->tail += bytes_read; + conn->last_activity = time(NULL); + if (conn->vhost_stats) { + monitor_record_bytes(conn->vhost_stats, 0, bytes_read); + } + } return bytes_read; } @@ -1606,60 +1945,70 @@ static int do_write(connection_t *conn) { if (!conn) return -1; buffer_t *buf = &conn->write_buf; - size_t len = buf->tail - buf->head; - if (len == 0) return 0; + size_t available = buffer_available_read(buf); + if (available == 0) return 0; int written; if (conn->ssl && conn->ssl_handshake_done) { - written = SSL_write(conn->ssl, buf->data + buf->head, len); + written = SSL_write(conn->ssl, buf->data + buf->head, available); if (written <= 0) { int ssl_error = SSL_get_error(conn->ssl, written); if (ssl_error == SSL_ERROR_WANT_READ || ssl_error == SSL_ERROR_WANT_WRITE) { errno = EAGAIN; return 0; } + return written; } } else { - written = write(conn->fd, buf->data + buf->head, len); + written = write(conn->fd, buf->data + buf->head, available); } if (written > 0) { - buf->head += written; - if (buf->head == buf->tail) { - buf->head = 0; - buf->tail = 0; + buffer_consume(buf, written); + conn->last_activity = time(NULL); + if (conn->vhost_stats) { + monitor_record_bytes(conn->vhost_stats, written, 0); } } return written; } void handle_client_read(connection_t *conn) { - if (do_read(conn) <= 0 && (errno != EAGAIN && errno != EWOULDBLOCK)) { + int bytes_read = do_read(conn); + if (bytes_read < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { close_connection(conn->fd); return; } buffer_t *buf = &conn->read_buf; - // Loop to handle pipelined requests for internal routes - while (buf->tail > buf->head) { + // Process all complete requests in the buffer (for pipelining) + while (buffer_available_read(buf) > 0) { char *data_start = buf->data + buf->head; - size_t data_len = buf->tail - buf->head; + size_t data_len = buffer_available_read(buf); - char *headers_end_marker = memmem(data_start, data_len, "\r\n\r\n", 4); - if (!headers_end_marker) { - if (data_len >= MAX_HEADER_SIZE) { + // Look for end of headers + char *headers_end = memmem(data_start, data_len, "\r\n\r\n", 4); + if (!headers_end) { + if (data_len >= MAX_HEADER_SIZE) { send_error_response(conn, 413, "Request Header Too Large", "Header too large"); + return; } - break; // Incomplete request, wait for more data + break; // Incomplete headers, wait for more data } - size_t headers_len = (headers_end_marker - data_start) + 4; + + size_t headers_len = (headers_end - data_start) + 4; - if (parse_http_request(data_start, headers_len, &conn->request) != 1) { + // Parse the request + int parse_result = parse_http_request(data_start, headers_len, &conn->request); + if (parse_result < 0) { + break; // Incomplete, need more data + } else if (parse_result == 0) { send_error_response(conn, 400, "Bad Request", "Malformed HTTP request"); return; } + // Calculate total request size including body long long body_len = (conn->request.content_length > 0) ? conn->request.content_length : 0; size_t total_request_len = headers_len + body_len; @@ -1667,6 +2016,7 @@ void handle_client_read(connection_t *conn) { break; // Incomplete body, wait for more data } + // Start timing the request struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); conn->request_start_time = ts.tv_sec + ts.tv_nsec / 1e9; @@ -1674,145 +2024,235 @@ void handle_client_read(connection_t *conn) { // Handle internal routes if (strcmp(conn->request.method, "GET") == 0) { if (strncmp(conn->request.uri, "/dashboard", 10) == 0) { + conn->state = CLIENT_STATE_SERVING_INTERNAL; serve_dashboard(conn); - buf->head += total_request_len; // Consume request - monitor_record_request_end(conn->vhost_stats, conn->request_start_time); - conn->request_start_time = 0; - continue; // Check for another pipelined request + buffer_consume(buf, total_request_len); + if (!conn->request.keep_alive) { + conn->state = CLIENT_STATE_CLOSING; + return; + } + conn->state = CLIENT_STATE_READING_HEADERS; + continue; } if (strncmp(conn->request.uri, "/api/stats", 10) == 0) { + conn->state = CLIENT_STATE_SERVING_INTERNAL; serve_stats_api(conn); - buf->head += total_request_len; // Consume request - monitor_record_request_end(conn->vhost_stats, conn->request_start_time); - conn->request_start_time = 0; - continue; + buffer_consume(buf, total_request_len); + if (!conn->request.keep_alive) { + conn->state = CLIENT_STATE_CLOSING; + return; + } + conn->state = CLIENT_STATE_READING_HEADERS; + continue; } } - // This is a request to be forwarded + // This is a request to be forwarded to upstream conn->vhost_stats = monitor_get_or_create_vhost_stats(conn->request.host); monitor_record_request_start(conn->vhost_stats, conn->request.is_websocket); - - conn->state = CLIENT_STATE_FORWARDING; // <-- CRITICAL: Change state now + conn->state = CLIENT_STATE_FORWARDING; + + // Create upstream connection and forward the complete request connect_to_upstream(conn, data_start, total_request_len); - // The data has been handed off; clear the read buffer and stop parsing. - buf->head = 0; - buf->tail = 0; + // Consume the forwarded request from buffer + buffer_consume(buf, total_request_len); + + // After forwarding, exit the loop - the connection is now in forwarding mode return; } - - buffer_compact(buf); } static void handle_forwarding(connection_t *conn) { - conn->last_activity = time(NULL); connection_t *pair = conn->pair; - if (!pair) { close_connection(conn->fd); return; } + if (!pair || pair->fd == -1) { + close_connection(conn->fd); + return; + } + // Read from this connection int bytes_read = do_read(conn); - size_t data_to_forward = conn->read_buf.tail - conn->read_buf.head; + // Forward any data we have + size_t data_to_forward = buffer_available_read(&conn->read_buf); if (data_to_forward > 0) { - if (buffer_ensure_capacity(&pair->write_buf, pair->write_buf.tail + data_to_forward) == 0) { - memcpy(pair->write_buf.data + pair->write_buf.tail, conn->read_buf.data + conn->read_buf.head, data_to_forward); - pair->write_buf.tail += data_to_forward; - - conn->read_buf.head = 0; - conn->read_buf.tail = 0; - modify_epoll(pair->fd, EPOLLIN | EPOLLOUT); - } else { - log_error("Failed to ensure write buffer capacity during forward."); - close_connection(conn->fd); - return; + size_t available_write = buffer_available_write(&pair->write_buf); + if (available_write < data_to_forward) { + if (buffer_ensure_capacity(&pair->write_buf, pair->write_buf.tail + data_to_forward) < 0) { + log_error("Failed to grow write buffer"); + close_connection(conn->fd); + return; + } } + + memcpy(pair->write_buf.data + pair->write_buf.tail, + conn->read_buf.data + conn->read_buf.head, + data_to_forward); + pair->write_buf.tail += data_to_forward; + buffer_consume(&conn->read_buf, data_to_forward); + + // Enable write events on the pair + modify_epoll(pair->fd, EPOLLIN | EPOLLOUT); } + // Handle connection closure if (bytes_read == 0) { - shutdown(pair->fd, SHUT_WR); + // EOF received - perform half-close + if (!conn->half_closed) { + conn->half_closed = 1; + shutdown(conn->fd, SHUT_RD); + if (pair && !pair->write_shutdown) { + shutdown(pair->fd, SHUT_WR); + pair->write_shutdown = 1; + } + // If both sides are closed, close the connection + if (pair && pair->half_closed) { + close_connection(conn->fd); + } + } } else if (bytes_read < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { close_connection(conn->fd); } } +static void handle_ssl_handshake(connection_t *conn) { + if (!conn->ssl || conn->ssl_handshake_done) return; + + int ret = SSL_do_handshake(conn->ssl); + if (ret == 1) { + conn->ssl_handshake_done = 1; + log_debug("SSL handshake completed for fd %d", conn->fd); + + // Process any pending data + if (buffer_available_read(&conn->write_buf) > 0) { + modify_epoll(conn->fd, EPOLLIN | EPOLLOUT); + } + } else { + int ssl_error = SSL_get_error(conn->ssl, ret); + if (ssl_error == SSL_ERROR_WANT_READ) { + modify_epoll(conn->fd, EPOLLIN); + } else if (ssl_error == SSL_ERROR_WANT_WRITE) { + modify_epoll(conn->fd, EPOLLOUT); + } else { + log_debug("SSL handshake failed for fd %d: %d", conn->fd, ssl_error); + if (conn->pair) { + send_error_response(conn->pair, 502, "Bad Gateway", "SSL handshake failed"); + } else { + close_connection(conn->fd); + } + } + } +} static void handle_write_event(connection_t *conn) { - if (!conn) return; conn->last_activity = time(NULL); + // Handle upstream connection establishment and SSL handshake if (conn->type == CONN_TYPE_UPSTREAM && !conn->ssl_handshake_done) { - int err = 0; - socklen_t len = sizeof(err); - if (getsockopt(conn->fd, SOL_SOCKET, SO_ERROR, &err, &len) != 0 || err != 0) { - send_error_response(conn->pair, 502, "Bad Gateway", strerror(err)); - return; - } - - if (conn->ssl) { - int ret = SSL_do_handshake(conn->ssl); - if (ret == 1) { - conn->ssl_handshake_done = 1; - } else { - int ssl_error = SSL_get_error(conn->ssl, ret); - if (ssl_error != SSL_ERROR_WANT_READ && ssl_error != SSL_ERROR_WANT_WRITE) { - send_error_response(conn->pair, 502, "Bad Gateway", "SSL handshake failed"); - return; + if (!conn->ssl) { + // Check if TCP connection is established + int err = 0; + socklen_t len = sizeof(err); + if (getsockopt(conn->fd, SOL_SOCKET, SO_ERROR, &err, &len) != 0 || err != 0) { + if (conn->pair) { + send_error_response(conn->pair, 502, "Bad Gateway", strerror(err)); } - return; // Handshake in progress + return; } + conn->ssl_handshake_done = 1; // No SSL, mark as done } else { - conn->ssl_handshake_done = 1; // No SSL + handle_ssl_handshake(conn); + if (!conn->ssl_handshake_done) { + return; // Handshake still in progress + } } } - do_write(conn); - - if (conn->write_buf.tail - conn->write_buf.head == 0) { - if (conn->type == CONN_TYPE_CLIENT && conn->state == CLIENT_STATE_ERROR) { + // Write any pending data + int written = do_write(conn); + + // Update epoll events based on buffer state + if (buffer_available_read(&conn->write_buf) == 0) { + // Nothing more to write + if (conn->state == CLIENT_STATE_ERROR || + (conn->state == CLIENT_STATE_SERVING_INTERNAL && !conn->request.keep_alive)) { close_connection(conn->fd); + } else if (conn->state == CLIENT_STATE_SERVING_INTERNAL && conn->request.keep_alive) { + // Done serving internal request, ready for next request + conn->state = CLIENT_STATE_READING_HEADERS; + modify_epoll(conn->fd, EPOLLIN); } else { modify_epoll(conn->fd, EPOLLIN); } + + // Handle half-close if write side was shutdown + if (conn->write_shutdown && conn->half_closed) { + close_connection(conn->fd); + } + } else if (written < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + close_connection(conn->fd); } } void handle_connection_event(struct epoll_event *event) { int fd = event->data.fd; - - if (event->events & (EPOLLERR | EPOLLHUP)) { - close_connection(fd); - return; - } - if (fd < 0 || fd >= MAX_FDS) return; connection_t *conn = &connections[fd]; - if (conn->type == CONN_TYPE_UNUSED) return; + if (conn->type == CONN_TYPE_UNUSED || conn->fd == -1) return; + + // Handle errors + if (event->events & (EPOLLERR | EPOLLHUP)) { + if (event->events & EPOLLERR) { + log_debug("EPOLLERR on fd %d", fd); + } + close_connection(fd); + return; + } if (conn->type == CONN_TYPE_LISTENER) { if (event->events & EPOLLIN) { accept_new_connection(fd); } - } else { // Client or Upstream + } else { + // Handle SSL handshake for upstream connections + if (conn->type == CONN_TYPE_UPSTREAM && conn->ssl && !conn->ssl_handshake_done) { + handle_ssl_handshake(conn); + if (!conn->ssl_handshake_done) { + return; // Still waiting for handshake + } + } + + // Process events if (event->events & EPOLLOUT) { handle_write_event(conn); } + + // Check if connection still exists after write if (connections[fd].type != CONN_TYPE_UNUSED && (event->events & EPOLLIN)) { if (conn->type == CONN_TYPE_CLIENT && conn->state == CLIENT_STATE_READING_HEADERS) { handle_client_read(conn); - } else { + } else if (conn->state == CLIENT_STATE_FORWARDING) { handle_forwarding(conn); } } } } + // ================================================================================================= // // Main Implementation // // ================================================================================================= +static void signal_handler(int sig) { + if (sig == SIGINT || sig == SIGTERM) { + log_info("Received signal %d, shutting down...", sig); + g_shutdown = 1; + } +} + void create_default_config_if_not_exists(const char* filename) { FILE *f = fopen(filename, "r"); if (f) { @@ -1860,13 +2300,14 @@ void init_ssl() { SSL_CTX_set_verify(ssl_ctx, SSL_VERIFY_NONE, NULL); SSL_CTX_set_options(ssl_ctx, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3); + SSL_CTX_set_mode(ssl_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); } void cleanup() { - log_info("Shutting down proxy..."); + log_info("Cleaning up resources..."); for (int i = 0; i < MAX_FDS; i++) { - if (connections[i].type != CONN_TYPE_UNUSED) { + if (connections[i].type != CONN_TYPE_UNUSED && connections[i].fd != -1) { close_connection(i); } } @@ -1876,30 +2317,27 @@ void cleanup() { if (epoll_fd >= 0) { close(epoll_fd); + epoll_fd = -1; } if (ssl_ctx) { SSL_CTX_free(ssl_ctx); + ssl_ctx = NULL; } EVP_cleanup(); + log_info("Cleanup complete"); } void cleanup_idle_connections() { - static time_t last_cleanup = 0; time_t current_time = time(NULL); - if (current_time - last_cleanup < 60) { - return; - } - - last_cleanup = current_time; - const time_t timeout = 300; // 5 minutes timeout - for (int i = 0; i < MAX_FDS; i++) { connection_t *conn = &connections[i]; - if (conn->type != CONN_TYPE_UNUSED && conn->type != CONN_TYPE_LISTENER) { - if (current_time - conn->last_activity > timeout) { + if (conn->type != CONN_TYPE_UNUSED && + conn->type != CONN_TYPE_LISTENER && + conn->fd != -1) { + if (current_time - conn->last_activity > CONNECTION_TIMEOUT) { log_debug("Closing idle connection fd=%d", i); close_connection(i); } @@ -1907,7 +2345,6 @@ void cleanup_idle_connections() { } } - void free_config() { if (config.routes) { free(config.routes); @@ -1916,10 +2353,11 @@ void free_config() { config.route_count = 0; } - - int main(int argc, char *argv[]) { + // Setup signal handlers signal(SIGPIPE, SIG_IGN); + signal(SIGINT, signal_handler); + signal(SIGTERM, signal_handler); if (getenv("DEBUG")) { g_debug_mode = 1; @@ -1943,26 +2381,30 @@ int main(int argc, char *argv[]) { return 1; } + // Initialize all connections as unused for (int i = 0; i < MAX_FDS; i++) { connections[i].type = CONN_TYPE_UNUSED; + connections[i].fd = -1; } setup_listener_socket(config.port); - log_info("Reverse proxy running on 0.0.0.0:%d", config.port); - log_info("Dashboard available at http://localhost:%d/dashboard", config.port); + log_info("========================================="); + log_info("Reverse proxy v3.0 running on port %d", config.port); + log_info("Dashboard: http://localhost:%d/dashboard", config.port); + log_info("API Stats: http://localhost:%d/api/stats", config.port); + log_info("========================================="); atexit(cleanup); struct epoll_event events[MAX_EVENTS]; - struct timespec last_monitor_update = {0, 0}; + time_t last_monitor_update = 0; + time_t last_cleanup = 0; - while (1) { + while (!g_shutdown) { int n = epoll_wait(epoll_fd, events, MAX_EVENTS, 1000); if (n == -1) { - if (errno == EINTR) { - continue; - } + if (errno == EINTR) continue; log_error("epoll_wait failed"); break; } @@ -1971,14 +2413,21 @@ int main(int argc, char *argv[]) { handle_connection_event(&events[i]); } - struct timespec current_time; - clock_gettime(CLOCK_MONOTONIC, ¤t_time); - if (current_time.tv_sec > last_monitor_update.tv_sec) { + time_t current_time = time(NULL); + + // Update monitoring stats every second + if (current_time > last_monitor_update) { monitor_update(); - cleanup_idle_connections(); last_monitor_update = current_time; } + + // Cleanup idle connections every 60 seconds + if (current_time - last_cleanup >= 60) { + cleanup_idle_connections(); + last_cleanup = current_time; + } } + log_info("Shutdown complete"); return 0; }