/* * ===================================================================================== * * Filename: proxy.c * * Description: High-performance Reverse Proxy with Real-time Monitoring Dashboard. * Single-file C implementation using epoll - PRODUCTION READY VERSION * * Version: 3.0 (Enhanced Reliability & Complete Stats) * Created: 2024 * Compiler: gcc * * Author: Production Ready Version * * To Compile: * gcc -Wall -Wextra -O2 -g -pthread cJSON.c proxy.c -o rproxy -lssl -lcrypto -lsqlite3 -lm * * ===================================================================================== */ // --- Standard Library Includes --- #define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include // --- Local Includes --- #include "cJSON.h" // --- Constants --- #define MAX_EVENTS 1024 #define MAX_FDS 65536 #define CHUNK_SIZE 65536 #define HISTORY_SECONDS 300 #define MAX_HEADER_SIZE 8192 #define MAX_REQUEST_LINE_SIZE 4096 #define MAX_URI_SIZE 2048 #define CONNECTION_TIMEOUT 300 // --- Enums and Structs --- typedef enum { CONN_TYPE_UNUSED, CONN_TYPE_LISTENER, CONN_TYPE_CLIENT, CONN_TYPE_UPSTREAM } conn_type_t; typedef enum { CLIENT_STATE_READING_HEADERS, // Reading the initial HTTP request CLIENT_STATE_FORWARDING, // Actively forwarding data in both directions CLIENT_STATE_SERVING_INTERNAL, // Serving dashboard or stats CLIENT_STATE_ERROR, // An error occurred, connection will be closed CLIENT_STATE_CLOSING // Connection is being closed } client_state_t; struct connection_s; struct vhost_stats_s; typedef struct { char hostname[256]; char upstream_host[256]; int upstream_port; int use_ssl; int rewrite_host; } route_config_t; typedef struct { int port; route_config_t *routes; int route_count; } app_config_t; // High-performance buffer using head/tail pointers to avoid memmove typedef struct { char *data; size_t capacity; size_t head; // Read position size_t tail; // Write position } buffer_t; typedef struct { char method[16]; char uri[MAX_URI_SIZE]; char version[16]; char host[256]; long content_length; int is_websocket; int keep_alive; int connection_close; } http_request_t; typedef struct connection_s { conn_type_t type; client_state_t state; int fd; struct connection_s *pair; struct vhost_stats_s *vhost_stats; buffer_t read_buf; buffer_t write_buf; SSL *ssl; int ssl_handshake_done; http_request_t request; double request_start_time; time_t last_activity; int half_closed; // For handling connection shutdown int write_shutdown; // Write side shutdown } connection_t; typedef struct { double time; double value; } history_point_t; typedef struct { history_point_t *points; int capacity; int head; int count; } history_deque_t; typedef struct { double time; double rx_kbps; double tx_kbps; } network_history_point_t; typedef struct { network_history_point_t *points; int capacity; int head; int count; } network_history_deque_t; typedef struct { double time; double read_mbps; double write_mbps; } disk_history_point_t; typedef struct { disk_history_point_t *points; int capacity; int head; int count; } disk_history_deque_t; typedef struct request_time_s { double *times; int capacity; int head; int count; } request_time_deque_t; typedef struct vhost_stats_s { char vhost_name[256]; long long http_requests; long long websocket_requests; long long total_requests; long long bytes_sent; long long bytes_recv; double avg_request_time_ms; long long last_bytes_sent; long long last_bytes_recv; double last_update; history_deque_t throughput_history; request_time_deque_t request_times; struct vhost_stats_s *next; } vhost_stats_t; typedef struct { time_t start_time; int active_connections; history_deque_t cpu_history; history_deque_t memory_history; network_history_deque_t network_history; disk_history_deque_t disk_history; history_deque_t throughput_history; history_deque_t load1_history; history_deque_t load5_history; history_deque_t load15_history; long long last_net_sent; long long last_net_recv; long long last_disk_read; long long last_disk_write; double last_net_update_time; double last_disk_update_time; vhost_stats_t *vhost_stats_head; sqlite3 *db; } system_monitor_t; // --- Global State --- connection_t connections[MAX_FDS]; app_config_t config; system_monitor_t monitor; int epoll_fd; SSL_CTX *ssl_ctx; static int g_debug_mode = 0; static volatile int g_shutdown = 0; // --- Function Prototypes --- void log_error(const char *msg); void log_info(const char *format, ...); void log_debug(const char *format, ...); void setup_listener_socket(int port); void accept_new_connection(int listener_fd); void close_connection(int fd); void handle_connection_event(struct epoll_event *event); void connect_to_upstream(connection_t *client_conn, const char *data, size_t data_len); void handle_client_read(connection_t *conn); int parse_http_request(const char *data, size_t len, http_request_t *req); void monitor_init(const char *db_file); void free_config(); void monitor_update(); void monitor_cleanup(); void monitor_record_request_start(vhost_stats_t *stats, int is_websocket); void monitor_record_request_end(vhost_stats_t *stats, double start_time); vhost_stats_t* monitor_get_or_create_vhost_stats(const char *vhost_name); void serve_dashboard(connection_t *conn); void serve_stats_api(connection_t *conn); void send_error_response(connection_t *conn, int code, const char* status, const char* body); static void signal_handler(int sig); // --- Buffer Helpers --- static inline int buffer_init(buffer_t *buf, size_t capacity) { buf->data = malloc(capacity); if (!buf->data) { log_error("Failed to allocate buffer"); return -1; } buf->capacity = capacity; buf->head = 0; buf->tail = 0; return 0; } static inline void buffer_free(buffer_t *buf) { if (buf->data) { free(buf->data); buf->data = NULL; } buf->capacity = 0; buf->head = 0; buf->tail = 0; } static inline size_t buffer_available_read(buffer_t *buf) { return buf->tail - buf->head; } static inline size_t buffer_available_write(buffer_t *buf) { return buf->capacity - buf->tail; } static inline int buffer_ensure_capacity(buffer_t *buf, size_t required) { if (buf->capacity >= required) return 0; size_t new_capacity = buf->capacity; while (new_capacity < required) { new_capacity *= 2; if (new_capacity > 1024*1024*10) { // 10MB max log_error("Buffer size limit exceeded"); return -1; } } char *new_data = realloc(buf->data, new_capacity); if (!new_data) { log_error("Failed to reallocate buffer"); return -1; } buf->data = new_data; buf->capacity = new_capacity; return 0; } // Compacts the buffer by moving unread data to the beginning static inline void buffer_compact(buffer_t *buf) { if (buf->head == 0) return; size_t len = buf->tail - buf->head; if (len > 0) { memmove(buf->data, buf->data + buf->head, len); } buf->head = 0; buf->tail = len; } static inline void buffer_consume(buffer_t *buf, size_t bytes) { buf->head += bytes; if (buf->head >= buf->tail) { buf->head = 0; buf->tail = 0; } } // --- String Utilities --- static int is_valid_http_method(const char *method) { const char *valid_methods[] = {"GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS", "PATCH", "TRACE", "CONNECT", NULL}; for (int i = 0; valid_methods[i]; i++) { if (strcmp(method, valid_methods[i]) == 0) return 1; } return 0; } // ================================================================================================= // // Logging Implementation // // ================================================================================================= void log_error(const char *msg) { perror(msg); } void log_message(const char *level, const char *format, va_list args) { time_t now; time(&now); char buf[32]; strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", localtime(&now)); printf("%s - %-5s - ", buf, level); vprintf(format, args); printf("\n"); fflush(stdout); } void log_info(const char *format, ...) { va_list args; va_start(args, format); log_message("INFO", format, args); va_end(args); } void log_debug(const char *format, ...) { if (!g_debug_mode) return; va_list args; va_start(args, format); log_message("DEBUG", format, args); va_end(args); } // ================================================================================================= // // Configuration Implementation // // ================================================================================================= static char* read_file_to_string(const char *filename) { FILE *f = fopen(filename, "rb"); if (!f) return NULL; fseek(f, 0, SEEK_END); long length = ftell(f); if (length < 0 || length > 1024*1024) { // Max 1MB config fclose(f); return NULL; } fseek(f, 0, SEEK_SET); char *buffer = malloc(length + 1); if (buffer) { size_t read_len = fread(buffer, 1, length, f); buffer[read_len] = '\0'; } fclose(f); return buffer; } int load_config(const char *filename) { log_info("Loading configuration from %s", filename); char *json_string = read_file_to_string(filename); if (!json_string) { log_error("Could not read config file"); return 0; } cJSON *root = cJSON_Parse(json_string); free(json_string); if (!root) { fprintf(stderr, "JSON parse error: %s\n", cJSON_GetErrorPtr()); return 0; } cJSON *port_item = cJSON_GetObjectItem(root, "port"); config.port = cJSON_IsNumber(port_item) ? port_item->valueint : 8080; if (config.port < 1 || config.port > 65535) { fprintf(stderr, "Invalid port number: %d\n", config.port); cJSON_Delete(root); return 0; } cJSON *proxy_array = cJSON_GetObjectItem(root, "reverse_proxy"); if (cJSON_IsArray(proxy_array)) { config.route_count = cJSON_GetArraySize(proxy_array); if (config.route_count <= 0) { cJSON_Delete(root); return 0; } config.routes = calloc(config.route_count, sizeof(route_config_t)); if (!config.routes) { log_error("Failed to allocate memory for routes"); cJSON_Delete(root); return 0; } int i = 0; cJSON *route_item; cJSON_ArrayForEach(route_item, proxy_array) { route_config_t *route = &config.routes[i]; cJSON *hostname = cJSON_GetObjectItem(route_item, "hostname"); cJSON *upstream_host = cJSON_GetObjectItem(route_item, "upstream_host"); cJSON *upstream_port = cJSON_GetObjectItem(route_item, "upstream_port"); if (!cJSON_IsString(hostname) || !cJSON_IsString(upstream_host) || !cJSON_IsNumber(upstream_port)) { fprintf(stderr, "Invalid route configuration at index %d\n", i); continue; } strncpy(route->hostname, hostname->valuestring, sizeof(route->hostname) - 1); strncpy(route->upstream_host, upstream_host->valuestring, sizeof(route->upstream_host) - 1); route->upstream_port = upstream_port->valueint; if (route->upstream_port < 1 || route->upstream_port > 65535) { fprintf(stderr, "Invalid upstream port for %s: %d\n", route->hostname, route->upstream_port); continue; } route->use_ssl = cJSON_IsTrue(cJSON_GetObjectItem(route_item, "use_ssl")); route->rewrite_host = cJSON_IsTrue(cJSON_GetObjectItem(route_item, "rewrite_host")); log_info("Route configured: %s -> %s:%d (SSL: %s, Rewrite Host: %s)", route->hostname, route->upstream_host, route->upstream_port, route->use_ssl ? "yes" : "no", route->rewrite_host ? "yes" : "no"); i++; } } cJSON_Delete(root); log_info("Loaded %d routes from %s", config.route_count, filename); return 1; } route_config_t *find_route(const char *hostname) { if (!hostname) return NULL; for (int i = 0; i < config.route_count; i++) { if (strcasecmp(hostname, config.routes[i].hostname) == 0) { return &config.routes[i]; } } return NULL; } // ================================================================================================= // // Monitor Implementation // // ================================================================================================= static void history_deque_init(history_deque_t *dq, int capacity) { dq->points = calloc(capacity, sizeof(history_point_t)); dq->capacity = capacity; dq->head = 0; dq->count = 0; } static void history_deque_push(history_deque_t *dq, double time, double value) { if (!dq || !dq->points) return; dq->points[dq->head] = (history_point_t){ .time = time, .value = value }; dq->head = (dq->head + 1) % dq->capacity; if (dq->count < dq->capacity) dq->count++; } static void network_history_deque_init(network_history_deque_t *dq, int capacity) { dq->points = calloc(capacity, sizeof(network_history_point_t)); dq->capacity = capacity; dq->head = 0; dq->count = 0; } static void network_history_deque_push(network_history_deque_t *dq, double time, double rx, double tx) { if (!dq || !dq->points) return; dq->points[dq->head] = (network_history_point_t){ .time = time, .rx_kbps = rx, .tx_kbps = tx }; dq->head = (dq->head + 1) % dq->capacity; if (dq->count < dq->capacity) dq->count++; } static void disk_history_deque_init(disk_history_deque_t *dq, int capacity) { dq->points = calloc(capacity, sizeof(disk_history_point_t)); dq->capacity = capacity; dq->head = 0; dq->count = 0; } static void disk_history_deque_push(disk_history_deque_t *dq, double time, double read_mbps, double write_mbps) { if (!dq || !dq->points) return; dq->points[dq->head] = (disk_history_point_t){ .time = time, .read_mbps = read_mbps, .write_mbps = write_mbps }; dq->head = (dq->head + 1) % dq->capacity; if (dq->count < dq->capacity) dq->count++; } static void request_time_deque_init(request_time_deque_t *dq, int capacity) { dq->times = calloc(capacity, sizeof(double)); dq->capacity = capacity; dq->head = 0; dq->count = 0; } static void request_time_deque_push(request_time_deque_t *dq, double time_ms) { if (!dq || !dq->times) return; dq->times[dq->head] = time_ms; dq->head = (dq->head + 1) % dq->capacity; if (dq->count < dq->capacity) dq->count++; } static void init_db() { if (!monitor.db) return; char *err_msg = 0; const char *sql_create_table = "CREATE TABLE IF NOT EXISTS vhost_stats (" " id INTEGER PRIMARY KEY AUTOINCREMENT," " vhost TEXT NOT NULL," " timestamp REAL NOT NULL," " http_requests INTEGER DEFAULT 0," " websocket_requests INTEGER DEFAULT 0," " total_requests INTEGER DEFAULT 0," " bytes_sent INTEGER DEFAULT 0," " bytes_recv INTEGER DEFAULT 0," " avg_request_time_ms REAL DEFAULT 0," " UNIQUE(vhost, timestamp)" ");"; const char *sql_create_index = "CREATE INDEX IF NOT EXISTS idx_vhost_timestamp ON vhost_stats(vhost, timestamp);"; if (sqlite3_exec(monitor.db, sql_create_table, 0, 0, &err_msg) != SQLITE_OK || sqlite3_exec(monitor.db, sql_create_index, 0, 0, &err_msg) != SQLITE_OK) { fprintf(stderr, "SQL error: %s\n", err_msg); sqlite3_free(err_msg); } } static void load_stats_from_db() { if (!monitor.db) return; sqlite3_stmt *res; const char *sql = "SELECT vhost, http_requests, websocket_requests, total_requests, " "bytes_sent, bytes_recv, avg_request_time_ms " "FROM vhost_stats v1 WHERE timestamp = (" " SELECT MAX(timestamp) FROM vhost_stats v2 WHERE v2.vhost = v1.vhost" ")"; if (sqlite3_prepare_v2(monitor.db, sql, -1, &res, 0) != SQLITE_OK) { fprintf(stderr, "Failed to execute statement: %s\n", sqlite3_errmsg(monitor.db)); return; } int vhost_count = 0; while (sqlite3_step(res) == SQLITE_ROW) { vhost_stats_t *stats = monitor_get_or_create_vhost_stats((const char*)sqlite3_column_text(res, 0)); if (stats) { stats->http_requests = sqlite3_column_int64(res, 1); stats->websocket_requests = sqlite3_column_int64(res, 2); stats->total_requests = sqlite3_column_int64(res, 3); stats->bytes_sent = sqlite3_column_int64(res, 4); stats->bytes_recv = sqlite3_column_int64(res, 5); stats->avg_request_time_ms = sqlite3_column_double(res, 6); vhost_count++; } } sqlite3_finalize(res); log_info("Loaded statistics for %d vhosts from database", vhost_count); } void monitor_init(const char *db_file) { memset(&monitor, 0, sizeof(system_monitor_t)); monitor.start_time = time(NULL); history_deque_init(&monitor.cpu_history, HISTORY_SECONDS); history_deque_init(&monitor.memory_history, HISTORY_SECONDS); network_history_deque_init(&monitor.network_history, HISTORY_SECONDS); disk_history_deque_init(&monitor.disk_history, HISTORY_SECONDS); history_deque_init(&monitor.throughput_history, HISTORY_SECONDS); history_deque_init(&monitor.load1_history, HISTORY_SECONDS); history_deque_init(&monitor.load5_history, HISTORY_SECONDS); history_deque_init(&monitor.load15_history, HISTORY_SECONDS); if (sqlite3_open(db_file, &monitor.db) != SQLITE_OK) { fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(monitor.db)); if (monitor.db) { sqlite3_close(monitor.db); monitor.db = NULL; } } else { init_db(); load_stats_from_db(); } monitor_update(); } void monitor_cleanup() { if (monitor.db) { sqlite3_close(monitor.db); monitor.db = NULL; } vhost_stats_t *current = monitor.vhost_stats_head; while (current) { vhost_stats_t *next = current->next; if (current->throughput_history.points) free(current->throughput_history.points); if (current->request_times.times) free(current->request_times.times); free(current); current = next; } monitor.vhost_stats_head = NULL; if (monitor.cpu_history.points) free(monitor.cpu_history.points); if (monitor.memory_history.points) free(monitor.memory_history.points); if (monitor.network_history.points) free(monitor.network_history.points); if (monitor.disk_history.points) free(monitor.disk_history.points); if (monitor.throughput_history.points) free(monitor.throughput_history.points); if (monitor.load1_history.points) free(monitor.load1_history.points); if (monitor.load5_history.points) free(monitor.load5_history.points); if (monitor.load15_history.points) free(monitor.load15_history.points); } static void save_stats_to_db() { if (!monitor.db) return; sqlite3_stmt *stmt; const char *sql = "INSERT OR REPLACE INTO vhost_stats " "(vhost, timestamp, http_requests, websocket_requests, total_requests, " "bytes_sent, bytes_recv, avg_request_time_ms) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?);"; if (sqlite3_prepare_v2(monitor.db, sql, -1, &stmt, NULL) != SQLITE_OK) return; double current_time = (double)time(NULL); for (vhost_stats_t *s = monitor.vhost_stats_head; s != NULL; s = s->next) { if (s->request_times.count > 0) { double total_time = 0; for(int i = 0; i < s->request_times.count; i++) { total_time += s->request_times.times[i]; } s->avg_request_time_ms = total_time / s->request_times.count; } sqlite3_bind_text(stmt, 1, s->vhost_name, -1, SQLITE_STATIC); sqlite3_bind_double(stmt, 2, current_time); sqlite3_bind_int64(stmt, 3, s->http_requests); sqlite3_bind_int64(stmt, 4, s->websocket_requests); sqlite3_bind_int64(stmt, 5, s->total_requests); sqlite3_bind_int64(stmt, 6, s->bytes_sent); sqlite3_bind_int64(stmt, 7, s->bytes_recv); sqlite3_bind_double(stmt, 8, s->avg_request_time_ms); sqlite3_step(stmt); sqlite3_reset(stmt); } sqlite3_finalize(stmt); } static double get_cpu_usage() { static long long prev_user = 0, prev_nice = 0, prev_system = 0, prev_idle = 0; long long user, nice, system, idle, iowait, irq, softirq; FILE *f = fopen("/proc/stat", "r"); if (!f) return 0.0; if (fscanf(f, "cpu %lld %lld %lld %lld %lld %lld %lld", &user, &nice, &system, &idle, &iowait, &irq, &softirq) != 7) { fclose(f); return 0.0; } fclose(f); long long prev_total = prev_user + prev_nice + prev_system + prev_idle; long long total = user + nice + system + idle; long long totald = total - prev_total; long long idled = idle - prev_idle; prev_user = user; prev_nice = nice; prev_system = system; prev_idle = idle; return totald == 0 ? 0.0 : (double)(totald - idled) * 100.0 / totald; } static void get_memory_usage(double *used_gb) { struct sysinfo info; if (sysinfo(&info) != 0) { *used_gb = 0; return; } *used_gb = (double)(info.totalram - info.freeram - info.bufferram) * info.mem_unit / (1024.0 * 1024.0 * 1024.0); } static void get_network_stats(long long *bytes_sent, long long *bytes_recv) { FILE *f = fopen("/proc/net/dev", "r"); if (!f) { *bytes_sent = 0; *bytes_recv = 0; return; } char line[256]; if (!fgets(line, sizeof(line), f) || !fgets(line, sizeof(line), f)) { // Skip headers fclose(f); *bytes_sent = 0; *bytes_recv = 0; return; } long long total_recv = 0, total_sent = 0; while (fgets(line, sizeof(line), f)) { char iface[32]; long long r, t; if (sscanf(line, "%31[^:]: %lld %*d %*d %*d %*d %*d %*d %*d %lld", iface, &r, &t) == 3) { char *trimmed = iface; while (*trimmed == ' ') trimmed++; if (strcmp(trimmed, "lo") != 0) { total_recv += r; total_sent += t; } } } fclose(f); *bytes_sent = total_sent; *bytes_recv = total_recv; } static void get_disk_stats(long long *sectors_read, long long *sectors_written) { FILE *f = fopen("/proc/diskstats", "r"); if (!f) { *sectors_read = 0; *sectors_written = 0; return; } char line[2048]; long long total_read = 0, total_written = 0; while (fgets(line, sizeof(line), f)) { char device[64]; long long sectors_r = 0, sectors_w = 0; int nfields = 0; // Format: major minor device reads_completed reads_merged sectors_read ... writes_completed writes_merged sectors_written // Read all fields as strings, then convert needed ones char major[16], minor[16], dev[64]; char rc[32], rm[32], sr[32], rtm[32], rtm2[32], wc[32], wm[32], sw[32]; nfields = sscanf(line, "%15s %15s %63s %31s %31s %31s %31s %31s %31s %31s %31s %31s", major, minor, dev, rc, rm, sr, rtm, rtm2, wc, wm, sw, sw); if (nfields >= 11) { strncpy(device, dev, sizeof(device)-1); device[sizeof(device)-1] = '\0'; sectors_r = atoll(sr); sectors_w = atoll(sw); // Only count physical devices (sda, nvme0n1, etc), not partitions if (strncmp(device, "loop", 4) != 0 && strncmp(device, "ram", 3) != 0) { // Check if it's a main device (no numbers at the end for sd* devices) int len = strlen(device); if ((strncmp(device, "sd", 2) == 0 && len == 3) || (strncmp(device, "nvme", 4) == 0 && strstr(device, "n1p") == NULL) || (strncmp(device, "vd", 2) == 0 && len == 3) || (strncmp(device, "hd", 2) == 0 && len == 3)) { total_read += sectors_r; total_written += sectors_w; } } } } fclose(f); *sectors_read = total_read; *sectors_written = total_written; } static void get_load_averages(double *load1, double *load5, double *load15) { FILE *f = fopen("/proc/loadavg", "r"); if (!f) { *load1 = *load5 = *load15 = 0.0; return; } if (fscanf(f, "%lf %lf %lf", load1, load5, load15) != 3) { *load1 = *load5 = *load15 = 0.0; } fclose(f); } void monitor_update() { double current_time = time(NULL); // CPU usage history_deque_push(&monitor.cpu_history, current_time, get_cpu_usage()); // Memory usage double mem_used_gb; get_memory_usage(&mem_used_gb); history_deque_push(&monitor.memory_history, current_time, mem_used_gb); // Network stats long long net_sent, net_recv; get_network_stats(&net_sent, &net_recv); double time_delta = current_time - monitor.last_net_update_time; if (time_delta > 0 && monitor.last_net_update_time > 0) { double rx = (net_recv - monitor.last_net_recv) / time_delta / 1024.0; double tx = (net_sent - monitor.last_net_sent) / time_delta / 1024.0; network_history_deque_push(&monitor.network_history, current_time, fmax(0, rx), fmax(0, tx)); history_deque_push(&monitor.throughput_history, current_time, fmax(0, rx + tx)); } monitor.last_net_sent = net_sent; monitor.last_net_recv = net_recv; monitor.last_net_update_time = current_time; // Disk I/O stats long long disk_read, disk_write; get_disk_stats(&disk_read, &disk_write); double disk_time_delta = current_time - monitor.last_disk_update_time; if (disk_time_delta > 0 && monitor.last_disk_update_time > 0) { // Convert sectors to MB/s (sector = 512 bytes typically) double read_mbps = (disk_read - monitor.last_disk_read) * 512.0 / disk_time_delta / (1024.0 * 1024.0); double write_mbps = (disk_write - monitor.last_disk_write) * 512.0 / disk_time_delta / (1024.0 * 1024.0); disk_history_deque_push(&monitor.disk_history, current_time, fmax(0, read_mbps), fmax(0, write_mbps)); } monitor.last_disk_read = disk_read; monitor.last_disk_write = disk_write; monitor.last_disk_update_time = current_time; // Load averages double load1, load5, load15; get_load_averages(&load1, &load5, &load15); history_deque_push(&monitor.load1_history, current_time, load1); history_deque_push(&monitor.load5_history, current_time, load5); history_deque_push(&monitor.load15_history, current_time, load15); // VHost stats for (vhost_stats_t *s = monitor.vhost_stats_head; s != NULL; s = s->next) { double vhost_delta = current_time - s->last_update; if (vhost_delta >= 1.0) { double kbps = 0; if (s->last_update > 0) { long long bytes_diff = (s->bytes_sent - s->last_bytes_sent) + (s->bytes_recv - s->last_bytes_recv); kbps = bytes_diff / vhost_delta / 1024.0; } history_deque_push(&s->throughput_history, current_time, fmax(0, kbps)); s->last_bytes_sent = s->bytes_sent; s->last_bytes_recv = s->bytes_recv; s->last_update = current_time; } } static time_t last_db_save = 0; if (current_time - last_db_save >= 10) { save_stats_to_db(); last_db_save = current_time; } } vhost_stats_t* monitor_get_or_create_vhost_stats(const char *vhost_name) { if (!vhost_name || strlen(vhost_name) == 0) return NULL; for (vhost_stats_t *curr = monitor.vhost_stats_head; curr; curr = curr->next) { if (strcmp(curr->vhost_name, vhost_name) == 0) return curr; } vhost_stats_t *new_stats = calloc(1, sizeof(vhost_stats_t)); if (!new_stats) return NULL; strncpy(new_stats->vhost_name, vhost_name, sizeof(new_stats->vhost_name) - 1); new_stats->last_update = time(NULL); history_deque_init(&new_stats->throughput_history, 60); request_time_deque_init(&new_stats->request_times, 100); new_stats->next = monitor.vhost_stats_head; monitor.vhost_stats_head = new_stats; return new_stats; } void monitor_record_request_start(vhost_stats_t *stats, int is_websocket) { if (!stats) return; if (is_websocket) { __sync_fetch_and_add(&stats->websocket_requests, 1); } else { __sync_fetch_and_add(&stats->http_requests, 1); } __sync_fetch_and_add(&stats->total_requests, 1); } void monitor_record_request_end(vhost_stats_t *stats, double start_time) { if (!stats || start_time <= 0) return; struct timespec end_time; clock_gettime(CLOCK_MONOTONIC, &end_time); double duration_ms = ((end_time.tv_sec + end_time.tv_nsec / 1e9) - start_time) * 1000.0; if (duration_ms >= 0 && duration_ms < 60000) { // Sanity check: < 60 seconds request_time_deque_push(&stats->request_times, duration_ms); } } void monitor_record_bytes(vhost_stats_t *stats, long long sent, long long recv) { if (!stats) return; __sync_fetch_and_add(&stats->bytes_sent, sent); __sync_fetch_and_add(&stats->bytes_recv, recv); } // ================================================================================================= // // Dashboard Implementation // // ================================================================================================= const char *DASHBOARD_HTML = "\n" "\n" "\n" " Reverse Proxy Monitor\n" " \n" " \n" "\n" "\n" "
\n" "
\n" "
0
\n" "
Connections
\n" "
\n" "
\n" "
0
\n" "
Memory
\n" "
\n" "
\n" "
0
\n" "
CPU %
\n" "
\n" "
\n" "
0.00
\n" "
Load 1m
\n" "
\n" "
\n" "
0.00
\n" "
Load 5m
\n" "
\n" "
\n" "
0.00
\n" "
Load 15m
\n" "
\n" "
\n" "\n" "
\n" "
CPU Usage
\n" " \n" "
\n" "\n" "
\n" "
Memory Usage
\n" " \n" "
\n" "\n" "
\n" "
Network I/O
\n" "
\n" "
RX
\n" "
TX
\n" "
\n" " \n" "
\n" "\n" "
\n" "
Disk I/O
\n" "
\n" "
Read
\n" "
Write
\n" "
\n" " \n" "
\n" "\n" "
\n" "
Load Average
\n" "
\n" "
1 min
\n" "
5 min
\n" "
15 min
\n" "
\n" " \n" "
\n" "\n" "
\n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" "
Virtual HostHTTP ReqWS ReqTotal ReqAvg Resp (ms)SentReceived
\n" "
\n" "\n" " \n" "\n" "\n" ; void serve_dashboard(connection_t *conn) { if (!conn) return; size_t content_len = strlen(DASHBOARD_HTML); char header[512]; int len = snprintf(header, sizeof(header), "HTTP/1.1 200 OK\r\n" "Content-Type: text/html; charset=utf-8\r\n" "Content-Length: %zu\r\n" "Connection: %s\r\n" "Cache-Control: no-cache\r\n" "\r\n", content_len, conn->request.keep_alive ? "keep-alive" : "close"); if (buffer_ensure_capacity(&conn->write_buf, conn->write_buf.tail + len + content_len) < 0) { send_error_response(conn, 500, "Internal Server Error", "Memory allocation failed"); return; } memcpy(conn->write_buf.data + conn->write_buf.tail, header, len); conn->write_buf.tail += len; memcpy(conn->write_buf.data + conn->write_buf.tail, DASHBOARD_HTML, content_len); conn->write_buf.tail += content_len; struct epoll_event event = { .data.fd = conn->fd, .events = EPOLLIN | EPOLLOUT }; epoll_ctl(epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); } static cJSON* format_history(history_deque_t *dq, int window_seconds) { cJSON *arr = cJSON_CreateArray(); if (!arr || !dq || !dq->points || dq->count == 0) return arr; double current_time = time(NULL); int start_index = (dq->head - dq->count + dq->capacity) % dq->capacity; for (int i = 0; i < dq->count; ++i) { int current_index = (start_index + i) % dq->capacity; history_point_t *p = &dq->points[current_index]; if ((current_time - p->time) <= window_seconds) { cJSON *pt = cJSON_CreateObject(); if (pt) { cJSON_AddNumberToObject(pt, "x", (long)((p->time - current_time) * 1000)); cJSON_AddNumberToObject(pt, "y", p->value); cJSON_AddItemToArray(arr, pt); } } } return arr; } static cJSON* format_network_history(network_history_deque_t *dq, int window_seconds, const char *key) { cJSON *arr = cJSON_CreateArray(); if (!arr || !dq || !dq->points || !key || dq->count == 0) return arr; double current_time = time(NULL); int start_index = (dq->head - dq->count + dq->capacity) % dq->capacity; for (int i = 0; i < dq->count; ++i) { int current_index = (start_index + i) % dq->capacity; network_history_point_t *p = &dq->points[current_index]; if ((current_time - p->time) <= window_seconds) { cJSON *pt = cJSON_CreateObject(); if (pt) { cJSON_AddNumberToObject(pt, "x", (long)((p->time - current_time) * 1000)); cJSON_AddNumberToObject(pt, "y", strcmp(key, "rx_kbps") == 0 ? p->rx_kbps : p->tx_kbps); cJSON_AddItemToArray(arr, pt); } } } return arr; } static cJSON* format_disk_history(disk_history_deque_t *dq, int window_seconds, const char *key) { cJSON *arr = cJSON_CreateArray(); if (!arr || !dq || !dq->points || !key || dq->count == 0) return arr; double current_time = time(NULL); int start_index = (dq->head - dq->count + dq->capacity) % dq->capacity; for (int i = 0; i < dq->count; ++i) { int current_index = (start_index + i) % dq->capacity; disk_history_point_t *p = &dq->points[current_index]; if ((current_time - p->time) <= window_seconds) { cJSON *pt = cJSON_CreateObject(); if (pt) { cJSON_AddNumberToObject(pt, "x", (long)((p->time - current_time) * 1000)); cJSON_AddNumberToObject(pt, "y", strcmp(key, "read_mbps") == 0 ? p->read_mbps : p->write_mbps); cJSON_AddItemToArray(arr, pt); } } } return arr; } void serve_stats_api(connection_t *conn) { if (!conn) return; cJSON *root = cJSON_CreateObject(); if (!root) { send_error_response(conn, 500, "Internal Server Error", "JSON creation failed"); return; } cJSON *current = cJSON_CreateObject(); if (!current) { cJSON_Delete(root); send_error_response(conn, 500, "Internal Server Error", "JSON creation failed"); return; } cJSON_AddItemToObject(root, "current", current); char buffer[64]; double last_cpu = 0, last_mem = 0; double load1 = 0, load5 = 0, load15 = 0; if (monitor.cpu_history.count > 0) { int last_idx = (monitor.cpu_history.head - 1 + monitor.cpu_history.capacity) % monitor.cpu_history.capacity; last_cpu = monitor.cpu_history.points[last_idx].value; } if (monitor.memory_history.count > 0) { int last_idx = (monitor.memory_history.head - 1 + monitor.memory_history.capacity) % monitor.memory_history.capacity; last_mem = monitor.memory_history.points[last_idx].value; } if (monitor.load1_history.count > 0) { int idx = (monitor.load1_history.head - 1 + monitor.load1_history.capacity) % monitor.load1_history.capacity; load1 = monitor.load1_history.points[idx].value; } if (monitor.load5_history.count > 0) { int idx = (monitor.load5_history.head - 1 + monitor.load5_history.capacity) % monitor.load5_history.capacity; load5 = monitor.load5_history.points[idx].value; } if (monitor.load15_history.count > 0) { int idx = (monitor.load15_history.head - 1 + monitor.load15_history.capacity) % monitor.load15_history.capacity; load15 = monitor.load15_history.points[idx].value; } snprintf(buffer, sizeof(buffer), "%.2f", last_cpu); cJSON_AddStringToObject(current, "cpu_percent", buffer); snprintf(buffer, sizeof(buffer), "%.2f", last_mem); cJSON_AddStringToObject(current, "memory_gb", buffer); cJSON_AddNumberToObject(current, "active_connections", monitor.active_connections); cJSON_AddNumberToObject(current, "load_1m", load1); cJSON_AddNumberToObject(current, "load_5m", load5); cJSON_AddNumberToObject(current, "load_15m", load15); cJSON_AddItemToObject(root, "cpu_history", format_history(&monitor.cpu_history, HISTORY_SECONDS)); cJSON_AddItemToObject(root, "memory_history", format_history(&monitor.memory_history, HISTORY_SECONDS)); cJSON_AddItemToObject(root, "network_rx_history", format_network_history(&monitor.network_history, HISTORY_SECONDS, "rx_kbps")); cJSON_AddItemToObject(root, "network_tx_history", format_network_history(&monitor.network_history, HISTORY_SECONDS, "tx_kbps")); cJSON_AddItemToObject(root, "disk_read_history", format_disk_history(&monitor.disk_history, HISTORY_SECONDS, "read_mbps")); cJSON_AddItemToObject(root, "disk_write_history", format_disk_history(&monitor.disk_history, HISTORY_SECONDS, "write_mbps")); cJSON_AddItemToObject(root, "throughput_history", format_history(&monitor.throughput_history, HISTORY_SECONDS)); cJSON_AddItemToObject(root, "load1_history", format_history(&monitor.load1_history, HISTORY_SECONDS)); cJSON_AddItemToObject(root, "load5_history", format_history(&monitor.load5_history, HISTORY_SECONDS)); cJSON_AddItemToObject(root, "load15_history", format_history(&monitor.load15_history, HISTORY_SECONDS)); cJSON *processes = cJSON_CreateArray(); if (processes) { cJSON_AddItemToObject(root, "processes", processes); for (vhost_stats_t *s = monitor.vhost_stats_head; s; s = s->next) { cJSON *p = cJSON_CreateObject(); if (p) { cJSON_AddStringToObject(p, "name", s->vhost_name); cJSON_AddNumberToObject(p, "http_requests", s->http_requests); cJSON_AddNumberToObject(p, "websocket_requests", s->websocket_requests); cJSON_AddNumberToObject(p, "total_requests", s->total_requests); cJSON_AddNumberToObject(p, "avg_request_time_ms", s->avg_request_time_ms); cJSON_AddNumberToObject(p, "bytes_sent", s->bytes_sent); cJSON_AddNumberToObject(p, "bytes_recv", s->bytes_recv); cJSON_AddItemToObject(p, "throughput_history", format_history(&s->throughput_history, 60)); cJSON_AddItemToArray(processes, p); } } } char *json_string = cJSON_PrintUnformatted(root); if (!json_string) { cJSON_Delete(root); send_error_response(conn, 500, "Internal Server Error", "JSON serialization failed"); return; } char header[512]; int hlen = snprintf(header, sizeof(header), "HTTP/1.1 200 OK\r\n" "Content-Type: application/json; charset=utf-8\r\n" "Content-Length: %zu\r\n" "Connection: %s\r\n" "Cache-Control: no-cache\r\n" "\r\n", strlen(json_string), conn->request.keep_alive ? "keep-alive" : "close"); if (buffer_ensure_capacity(&conn->write_buf, conn->write_buf.tail + hlen + strlen(json_string)) < 0) { free(json_string); cJSON_Delete(root); send_error_response(conn, 500, "Internal Server Error", "Memory allocation failed"); return; } memcpy(conn->write_buf.data + conn->write_buf.tail, header, hlen); conn->write_buf.tail += hlen; memcpy(conn->write_buf.data + conn->write_buf.tail, json_string, strlen(json_string)); conn->write_buf.tail += strlen(json_string); struct epoll_event event = { .data.fd = conn->fd, .events = EPOLLIN | EPOLLOUT }; epoll_ctl(epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); cJSON_Delete(root); free(json_string); } // ================================================================================================= // // HTTP Implementation // // ================================================================================================= // Helper to find a header value without modifying the source string static int find_header_value(const char* data, size_t len, const char* name, char* value, size_t value_size) { size_t name_len = strlen(name); const char* end = data + len; for (const char* p = data; p < end; ) { const char* line_end = memchr(p, '\n', end - p); if (!line_end) break; if (line_end > p && *(line_end - 1) == '\r') { line_end--; } if ((size_t)(line_end - p) > name_len && strncasecmp(p, name, name_len) == 0 && p[name_len] == ':') { const char* v_start = p + name_len + 1; while (v_start < line_end && (*v_start == ' ' || *v_start == '\t')) v_start++; const char* v_end = line_end; while (v_end > v_start && (*(v_end - 1) == ' ' || *(v_end - 1) == '\t')) v_end--; size_t copy_len = v_end - v_start; if (copy_len >= value_size) copy_len = value_size - 1; memcpy(value, v_start, copy_len); value[copy_len] = '\0'; return 1; } p = line_end + ( (line_end < end && *(line_end) == '\r') ? 2 : 1 ); } return 0; } // A robust, non-destructive HTTP request parser int parse_http_request(const char *data, size_t len, http_request_t *req) { memset(req, 0, sizeof(http_request_t)); req->content_length = -1; req->keep_alive = 1; // HTTP/1.1 default // Find the end of the request line const char *line_end = memchr(data, '\n', len); if (!line_end) return -1; // Incomplete size_t line_len = line_end - data; if (line_len > 0 && data[line_len - 1] == '\r') { line_len--; } // Parse Method const char *method_end = memchr(data, ' ', line_len); if (!method_end) return 0; // Malformed size_t method_len = method_end - data; if (method_len >= sizeof(req->method)) return 0; memcpy(req->method, data, method_len); req->method[method_len] = '\0'; if (!is_valid_http_method(req->method)) return 0; // Parse URI const char *uri_start = method_end + 1; while (uri_start < data + line_len && *uri_start == ' ') uri_start++; const char *uri_end = data + line_len; // Look for version part from the right const char *version_start = memrchr(uri_start, ' ', uri_end - uri_start); if (!version_start || version_start == uri_start) return 0; // Set URI size_t uri_len = version_start - uri_start; if (uri_len >= sizeof(req->uri)) return 0; memcpy(req->uri, uri_start, uri_len); req->uri[uri_len] = '\0'; // Set Version version_start++; while (version_start < uri_end && *version_start == ' ') version_start++; size_t version_len = uri_end - version_start; if (version_len >= sizeof(req->version)) return 0; memcpy(req->version, version_start, version_len); req->version[version_len] = '\0'; // HTTP/1.0 defaults to close if (strncmp(req->version, "HTTP/1.0", 8) == 0) { req->keep_alive = 0; } // Parse headers const char *headers_start = line_end + 1; char value[1024]; if (find_header_value(headers_start, len - (headers_start - data), "Host", req->host, sizeof(req->host))) { char *port_colon = strchr(req->host, ':'); if (port_colon) *port_colon = '\0'; } if (find_header_value(headers_start, len - (headers_start - data), "Content-Length", value, sizeof(value))) { req->content_length = atol(value); } if (find_header_value(headers_start, len - (headers_start - data), "Connection", value, sizeof(value))) { if (strcasecmp(value, "close") == 0) { req->keep_alive = 0; req->connection_close = 1; } else if (strcasecmp(value, "keep-alive") == 0) { req->keep_alive = 1; } else if (strcasecmp(value, "upgrade") == 0) { req->is_websocket = 1; } } if (find_header_value(headers_start, len - (headers_start - data), "Upgrade", value, sizeof(value))) { if (strcasecmp(value, "websocket") == 0) req->is_websocket = 1; } return 1; } void send_error_response(connection_t *conn, int code, const char* status, const char* body) { if (!conn || !status || !body) return; char response[2048]; int len = snprintf(response, sizeof(response), "HTTP/1.1 %d %s\r\n" "Content-Type: text/plain; charset=utf-8\r\n" "Content-Length: %zu\r\n" "Connection: close\r\n" "Server: ReverseProxy/3.0\r\n" "\r\n" "%s", code, status, strlen(body), body); if (len > 0 && (size_t)len < sizeof(response)) { if (buffer_ensure_capacity(&conn->write_buf, conn->write_buf.tail + len) == 0) { memcpy(conn->write_buf.data + conn->write_buf.tail, response, len); conn->write_buf.tail += len; struct epoll_event event = { .data.fd = conn->fd, .events = EPOLLIN | EPOLLOUT }; epoll_ctl(epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); } } conn->state = CLIENT_STATE_ERROR; conn->request.keep_alive = 0; // Force close after error } // ================================================================================================= // // Proxy Implementation // // ================================================================================================= static void set_non_blocking(int fd) { int flags = fcntl(fd, F_GETFL, 0); if (flags >= 0) { fcntl(fd, F_SETFL, flags | O_NONBLOCK); } } static void set_tcp_keepalive(int fd) { int yes = 1; setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)); int idle = 60; setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(idle)); int interval = 10; setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(interval)); int maxpkt = 6; setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &maxpkt, sizeof(maxpkt)); } static void add_to_epoll(int fd, uint32_t events) { struct epoll_event event = { .data.fd = fd, .events = events }; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) { log_error("epoll_ctl_add failed"); close(fd); } } static void modify_epoll(int fd, uint32_t events) { struct epoll_event event = { .data.fd = fd, .events = events }; if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) == -1) { // This can happen if fd was closed if(errno != EBADF && errno != ENOENT) log_debug("epoll_ctl_mod failed for fd %d: %s", fd, strerror(errno)); } } void setup_listener_socket(int port) { int fd = socket(AF_INET, SOCK_STREAM, 0); if (fd < 0) { log_error("socket failed"); exit(EXIT_FAILURE); } int reuse = 1; if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) { log_error("setsockopt SO_REUSEADDR failed"); close(fd); exit(EXIT_FAILURE); } struct sockaddr_in addr = { .sin_family = AF_INET, .sin_port = htons(port), .sin_addr.s_addr = htonl(INADDR_ANY) }; if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) { log_error("bind failed"); close(fd); exit(EXIT_FAILURE); } if (listen(fd, SOMAXCONN) == -1) { log_error("listen failed"); close(fd); exit(EXIT_FAILURE); } set_non_blocking(fd); add_to_epoll(fd, EPOLLIN); connections[fd].type = CONN_TYPE_LISTENER; connections[fd].fd = fd; log_info("Listening on port %d (fd=%d)", port, fd); } void accept_new_connection(int listener_fd) { while (1) { struct sockaddr_in client_addr; socklen_t client_len = sizeof(client_addr); int client_fd = accept(listener_fd, (struct sockaddr*)&client_addr, &client_len); if (client_fd == -1) { if (errno != EAGAIN && errno != EWOULDBLOCK) { log_error("accept failed"); } break; } if (client_fd >= MAX_FDS) { log_error("Connection fd too high, closing"); close(client_fd); continue; } set_non_blocking(client_fd); set_tcp_keepalive(client_fd); add_to_epoll(client_fd, EPOLLIN); connection_t *conn = &connections[client_fd]; memset(conn, 0, sizeof(connection_t)); conn->type = CONN_TYPE_CLIENT; conn->state = CLIENT_STATE_READING_HEADERS; conn->fd = client_fd; conn->last_activity = time(NULL); if (buffer_init(&conn->read_buf, CHUNK_SIZE) < 0 || buffer_init(&conn->write_buf, CHUNK_SIZE) < 0) { close_connection(client_fd); continue; } __sync_fetch_and_add(&monitor.active_connections, 1); log_debug("New connection on fd %d from %s, total: %d", client_fd, inet_ntoa(client_addr.sin_addr), monitor.active_connections); } } void close_connection(int fd) { if (fd < 0 || fd >= MAX_FDS) return; connection_t *conn = &connections[fd]; if (conn->type == CONN_TYPE_UNUSED) return; // Prevent double-closing if (conn->fd == -1) return; int pair_fd = -1; if (conn->pair && conn->pair->fd != -1) { pair_fd = conn->pair->fd; conn->pair->pair = NULL; } // Record request end if needed if (conn->vhost_stats && conn->request_start_time > 0) { monitor_record_request_end(conn->vhost_stats, conn->request_start_time); conn->request_start_time = 0; } // Update connection count if (conn->type == CONN_TYPE_CLIENT) { int old_count = __sync_fetch_and_sub(&monitor.active_connections, 1); if (old_count <= 0) { monitor.active_connections = 0; } } log_debug("Closing connection on fd %d, pair %d, remaining: %d", fd, pair_fd, monitor.active_connections); // Remove from epoll before closing epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL); // Clean up SSL if (conn->ssl) { SSL_shutdown(conn->ssl); SSL_free(conn->ssl); conn->ssl = NULL; } // Close socket close(fd); // Free buffers buffer_free(&conn->read_buf); buffer_free(&conn->write_buf); // Mark as unused memset(conn, 0, sizeof(connection_t)); conn->type = CONN_TYPE_UNUSED; conn->fd = -1; // Close the paired connection if (pair_fd != -1) { close_connection(pair_fd); } } void connect_to_upstream(connection_t *client, const char *data, size_t data_len) { if (!client || !data) return; route_config_t *route = find_route(client->request.host); if (!route) { send_error_response(client, 502, "Bad Gateway", "No route configured for this host"); return; } int up_fd = socket(AF_INET, SOCK_STREAM, 0); if (up_fd < 0) { send_error_response(client, 502, "Bad Gateway", "Failed to create upstream socket"); return; } if (up_fd >= MAX_FDS) { close(up_fd); send_error_response(client, 502, "Bad Gateway", "Connection limit exceeded"); return; } struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(route->upstream_port); // Resolve hostname if (inet_pton(AF_INET, route->upstream_host, &addr.sin_addr) <= 0) { struct hostent *he = gethostbyname(route->upstream_host); if (!he) { close(up_fd); send_error_response(client, 502, "Bad Gateway", "Cannot resolve upstream hostname"); return; } memcpy(&addr.sin_addr, he->h_addr_list[0], he->h_length); } set_non_blocking(up_fd); set_tcp_keepalive(up_fd); int connect_result = connect(up_fd, (struct sockaddr*)&addr, sizeof(addr)); if (connect_result < 0 && errno != EINPROGRESS) { close(up_fd); send_error_response(client, 502, "Bad Gateway", "Failed to connect to upstream"); return; } add_to_epoll(up_fd, EPOLLIN | EPOLLOUT); connection_t *up = &connections[up_fd]; memset(up, 0, sizeof(connection_t)); up->type = CONN_TYPE_UPSTREAM; up->fd = up_fd; up->last_activity = time(NULL); client->pair = up; up->pair = client; up->vhost_stats = client->vhost_stats; if (buffer_init(&up->read_buf, CHUNK_SIZE) < 0 || buffer_init(&up->write_buf, CHUNK_SIZE) < 0) { close_connection(client->fd); return; } // Prepare data to send char *data_to_send = (char*)data; size_t len_to_send = data_len; char *modified_request = NULL; if (route->rewrite_host) { char new_host_header[512]; const char *old_host_header_start = NULL; const char *old_host_header_end = NULL; // Find the old host header const char *current = data; const char *end = data + data_len; while(current < end) { const char* line_end = memchr(current, '\n', end - current); if (!line_end) break; if (strncasecmp(current, "Host:", 5) == 0) { old_host_header_start = current; old_host_header_end = line_end + 1; break; } current = line_end + 1; } if (old_host_header_start) { if (route->upstream_port == 80 || route->upstream_port == 443) { snprintf(new_host_header, sizeof(new_host_header), "Host: %s\r\n", route->upstream_host); } else { snprintf(new_host_header, sizeof(new_host_header), "Host: %s:%d\r\n", route->upstream_host, route->upstream_port); } size_t new_host_len = strlen(new_host_header); size_t old_host_len = old_host_header_end - old_host_header_start; len_to_send = data_len - old_host_len + new_host_len; modified_request = malloc(len_to_send + 1); if (modified_request) { char* p = modified_request; // Copy part before Host header size_t prefix_len = old_host_header_start - data; memcpy(p, data, prefix_len); p += prefix_len; // Copy new Host header memcpy(p, new_host_header, new_host_len); p += new_host_len; // Copy part after Host header size_t suffix_len = data_len - (old_host_header_end - data); memcpy(p, old_host_header_end, suffix_len); data_to_send = modified_request; } } } if (buffer_ensure_capacity(&up->write_buf, len_to_send) == 0) { memcpy(up->write_buf.data, data_to_send, len_to_send); up->write_buf.tail = len_to_send; } if (modified_request) { free(modified_request); } if (route->use_ssl) { up->ssl = SSL_new(ssl_ctx); if (!up->ssl) { close_connection(client->fd); return; } SSL_set_tlsext_host_name(up->ssl, route->upstream_host); // SNI SSL_set_fd(up->ssl, up_fd); SSL_set_connect_state(up->ssl); } up->state = CLIENT_STATE_FORWARDING; log_debug("Connecting to upstream %s:%d on fd %d (SSL: %s)", route->upstream_host, route->upstream_port, up_fd, route->use_ssl ? "yes" : "no"); } static int do_read(connection_t *conn) { if (!conn) return -1; buffer_t *buf = &conn->read_buf; buffer_compact(buf); size_t available = buffer_available_write(buf); if (available == 0) { // Try to grow the buffer if (buffer_ensure_capacity(buf, buf->capacity * 2) < 0) { return -1; } available = buffer_available_write(buf); } int bytes_read; if (conn->ssl && conn->ssl_handshake_done) { bytes_read = SSL_read(conn->ssl, buf->data + buf->tail, available); if (bytes_read <= 0) { int ssl_error = SSL_get_error(conn->ssl, bytes_read); if (ssl_error == SSL_ERROR_WANT_READ || ssl_error == SSL_ERROR_WANT_WRITE) { errno = EAGAIN; return 0; } return bytes_read; } } else { bytes_read = read(conn->fd, buf->data + buf->tail, available); } if (bytes_read > 0) { buf->tail += bytes_read; conn->last_activity = time(NULL); if (conn->vhost_stats) { monitor_record_bytes(conn->vhost_stats, 0, bytes_read); } } return bytes_read; } static int do_write(connection_t *conn) { if (!conn) return -1; buffer_t *buf = &conn->write_buf; size_t available = buffer_available_read(buf); if (available == 0) return 0; int written; if (conn->ssl && conn->ssl_handshake_done) { written = SSL_write(conn->ssl, buf->data + buf->head, available); if (written <= 0) { int ssl_error = SSL_get_error(conn->ssl, written); if (ssl_error == SSL_ERROR_WANT_READ || ssl_error == SSL_ERROR_WANT_WRITE) { errno = EAGAIN; return 0; } return written; } } else { written = write(conn->fd, buf->data + buf->head, available); } if (written > 0) { buffer_consume(buf, written); conn->last_activity = time(NULL); if (conn->vhost_stats) { monitor_record_bytes(conn->vhost_stats, written, 0); } } return written; } void handle_client_read(connection_t *conn) { int bytes_read = do_read(conn); if (bytes_read < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { log_debug("[ROUTING] Closing connection fd=%d due to read error", conn->fd); close_connection(conn->fd); return; } buffer_t *buf = &conn->read_buf; // This function can be re-entered for keep-alive connections. // Ensure state is correct if a connection was forwarding but lost its pair. if (conn->state == CLIENT_STATE_FORWARDING && conn->pair == NULL) { conn->state = CLIENT_STATE_READING_HEADERS; } // Do not attempt to parse new requests if the connection is already actively forwarding. if (conn->state == CLIENT_STATE_FORWARDING) { return; } // Process all complete HTTP requests currently in the read buffer (handles pipelining). while (buffer_available_read(buf) > 0) { char *data_start = buf->data + buf->head; size_t data_len = buffer_available_read(buf); // --- MODIFICATION START: Read until \r\n\r\n before parsing --- // Step 1: Find the end-of-headers marker ("\r\n\r\n"). char *headers_end = memmem(data_start, data_len, "\r\n\r\n", 4); // If the marker is not found, the full headers have not been received yet. if (!headers_end) { if (data_len >= MAX_HEADER_SIZE) { send_error_response(conn, 413, "Request Header Too Large", "Header is too large."); return; } // Wait for more data to arrive from the client. log_debug("fd %d: Incomplete headers, waiting for more data.", conn->fd); break; } // Step 2: The complete header block is now in the buffer. Proceed to parsing. size_t headers_len = (headers_end - data_start) + 4; int parse_result = parse_http_request(data_start, headers_len, &conn->request); if (parse_result == 0) { send_error_response(conn, 400, "Bad Request", "Malformed HTTP request."); return; } // This case should be rare, but indicates the parser needs more data than just the headers. if (parse_result < 0) { break; } // Step 3: Check if the entire request body (if any) has been received. long long body_len = (conn->request.content_length > 0) ? conn->request.content_length : 0; size_t total_request_len = headers_len + body_len; if (data_len < total_request_len) { // Body is not fully received yet, wait for more data. log_debug("fd %d: Incomplete body, waiting for more data.", conn->fd); break; } // --- MODIFICATION END: A full request is now ready for processing --- // Start timing the request struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); conn->request_start_time = ts.tv_sec + ts.tv_nsec / 1e9; // Check for internal routes like /dashboard or /api/stats if (strcmp(conn->request.method, "GET") == 0 && (strncmp(conn->request.uri, "/dashboard", 10) == 0 || strncmp(conn->request.uri, "/api/stats", 10) == 0)) { log_info("[ROUTING-INTERNAL] Serving internal route %s for fd=%d", conn->request.uri, conn->fd); // If there was a previous upstream connection, close it. if (conn->pair) { close_connection(conn->pair->fd); conn->pair = NULL; } conn->state = CLIENT_STATE_SERVING_INTERNAL; if (strncmp(conn->request.uri, "/dashboard", 10) == 0) { serve_dashboard(conn); } else { serve_stats_api(conn); } buffer_consume(buf, total_request_len); // Consume the processed request if (!conn->request.keep_alive) { conn->state = CLIENT_STATE_CLOSING; return; // Exit function, connection will be closed on write complete. } conn->state = CLIENT_STATE_READING_HEADERS; // Ready for next keep-alive request. continue; // Continue loop to process next pipelined request. } // If not an internal route, forward the request to the upstream server. log_info("[ROUTING-FORWARD] Forwarding request for fd=%d: %s %s", conn->fd, conn->request.method, conn->request.uri); conn->vhost_stats = monitor_get_or_create_vhost_stats(conn->request.host); monitor_record_request_start(conn->vhost_stats, conn->request.is_websocket); conn->state = CLIENT_STATE_FORWARDING; connect_to_upstream(conn, data_start, total_request_len); buffer_consume(buf, total_request_len); // Consume the forwarded request // After starting to forward, stop processing further pipelined requests from this client // until the current forwarding is complete. The state is now CLIENT_STATE_FORWARDING. return; } } static void handle_forwarding(connection_t *conn) { connection_t *pair = conn->pair; if (!pair || pair->fd == -1) { if (conn->type == CONN_TYPE_CLIENT) { log_info("[ROUTING-ORPHAN] Client fd=%d lost upstream, resetting to READING_HEADERS", conn->fd); conn->state = CLIENT_STATE_READING_HEADERS; conn->pair = NULL; if (buffer_available_read(&conn->read_buf) > 0) { handle_client_read(conn); } } else { close_connection(conn->fd); } return; } int bytes_read = do_read(conn); if (bytes_read <= 0 && (errno != EAGAIN && errno != EWOULDBLOCK)) { close_connection(conn->fd); return; } // --- START: THE CHECKPOINT FIX FOR RACE CONDITION --- if (bytes_read > 0 && conn->type == CONN_TYPE_CLIENT) { // Before forwarding, we MUST check if the data is a new pipelined request. char *data_start = conn->read_buf.data + conn->read_buf.head; size_t data_len = buffer_available_read(&conn->read_buf); // A simple but highly effective check for the start of a new HTTP request. if (data_len > 4 && ( strncmp(data_start, "GET ", 4) == 0 || strncmp(data_start, "POST ", 5) == 0 || strncmp(data_start, "PUT ", 4) == 0 || strncmp(data_start, "DELETE ", 7) == 0 || strncmp(data_start, "HEAD ", 5) == 0 || strncmp(data_start, "OPTIONS ", 8) == 0 )) { log_info("[STATE-FIX] Checkpoint: Pipelined request detected from client fd=%d. Halting forwarding.", conn->fd); // This is a new request. We must stop forwarding immediately. // 1. Close the connection for the PREVIOUS request. close_connection(pair->fd); // 2. Reset the client's state to parsing mode. conn->state = CLIENT_STATE_READING_HEADERS; conn->pair = NULL; // The link to the old upstream is now severed. // 3. Immediately process the new request that's already in the buffer. handle_client_read(conn); return; // Stop execution in this function. } } // --- END: THE CHECKPOINT FIX --- // If the checkpoint is passed, the data is part of the current request body // (or is data from the upstream), so we forward it as intended. size_t data_to_forward = buffer_available_read(&conn->read_buf); if (data_to_forward > 0) { if (buffer_ensure_capacity(&pair->write_buf, pair->write_buf.tail + data_to_forward) < 0) { log_error("Failed to grow write buffer for forwarding"); close_connection(conn->fd); return; } memcpy(pair->write_buf.data + pair->write_buf.tail, conn->read_buf.data + conn->read_buf.head, data_to_forward); pair->write_buf.tail += data_to_forward; buffer_consume(&conn->read_buf, data_to_forward); modify_epoll(pair->fd, EPOLLIN | EPOLLOUT); } } static void handle_ssl_handshake(connection_t *conn) { if (!conn->ssl || conn->ssl_handshake_done) return; int ret = SSL_do_handshake(conn->ssl); if (ret == 1) { conn->ssl_handshake_done = 1; log_debug("SSL handshake completed for fd %d", conn->fd); // Process any pending data if (buffer_available_read(&conn->write_buf) > 0) { modify_epoll(conn->fd, EPOLLIN | EPOLLOUT); } } else { int ssl_error = SSL_get_error(conn->ssl, ret); if (ssl_error == SSL_ERROR_WANT_READ) { modify_epoll(conn->fd, EPOLLIN); } else if (ssl_error == SSL_ERROR_WANT_WRITE) { modify_epoll(conn->fd, EPOLLOUT); } else { log_debug("SSL handshake failed for fd %d: %d", conn->fd, ssl_error); if (conn->pair) { send_error_response(conn->pair, 502, "Bad Gateway", "SSL handshake failed"); } else { close_connection(conn->fd); } } } } static void handle_write_event(connection_t *conn) { conn->last_activity = time(NULL); // Handle upstream connection establishment and SSL handshake if (conn->type == CONN_TYPE_UPSTREAM && !conn->ssl_handshake_done) { if (!conn->ssl) { // Check if TCP connection is established int err = 0; socklen_t len = sizeof(err); if (getsockopt(conn->fd, SOL_SOCKET, SO_ERROR, &err, &len) != 0 || err != 0) { if (conn->pair) { send_error_response(conn->pair, 502, "Bad Gateway", strerror(err)); } return; } conn->ssl_handshake_done = 1; // No SSL, mark as done } else { handle_ssl_handshake(conn); if (!conn->ssl_handshake_done) { return; // Handshake still in progress } } } // Write any pending data int written = do_write(conn); // Update epoll events based on buffer state if (buffer_available_read(&conn->write_buf) == 0) { // Nothing more to write if (conn->state == CLIENT_STATE_ERROR || (conn->state == CLIENT_STATE_SERVING_INTERNAL && !conn->request.keep_alive)) { close_connection(conn->fd); } else if (conn->state == CLIENT_STATE_SERVING_INTERNAL && conn->request.keep_alive) { // Done serving internal request, ready for next request conn->state = CLIENT_STATE_READING_HEADERS; modify_epoll(conn->fd, EPOLLIN); } else { modify_epoll(conn->fd, EPOLLIN); } // Handle half-close if write side was shutdown if (conn->write_shutdown && conn->half_closed) { close_connection(conn->fd); } } else if (written < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { close_connection(conn->fd); } } void handle_connection_event(struct epoll_event *event) { int fd = event->data.fd; if (fd < 0 || fd >= MAX_FDS) return; connection_t *conn = &connections[fd]; if (conn->type == CONN_TYPE_UNUSED || conn->fd == -1) return; // Handle errors if (event->events & (EPOLLERR | EPOLLHUP)) { if (event->events & EPOLLERR) { log_debug("EPOLLERR on fd %d", fd); } close_connection(fd); return; } if (conn->type == CONN_TYPE_LISTENER) { if (event->events & EPOLLIN) { accept_new_connection(fd); } } else { // Handle SSL handshake for upstream connections if (conn->type == CONN_TYPE_UPSTREAM && conn->ssl && !conn->ssl_handshake_done) { handle_ssl_handshake(conn); if (!conn->ssl_handshake_done) { return; // Still waiting for handshake } } // Process events if (event->events & EPOLLOUT) { handle_write_event(conn); } // Check if connection still exists after write if (connections[fd].type != CONN_TYPE_UNUSED && (event->events & EPOLLIN)) { if (conn->type == CONN_TYPE_CLIENT && conn->state == CLIENT_STATE_READING_HEADERS) { handle_client_read(conn); } else if (conn->state == CLIENT_STATE_FORWARDING) { handle_forwarding(conn); } } } } // ================================================================================================= // // Main Implementation // // ================================================================================================= static void signal_handler(int sig) { if (sig == SIGINT || sig == SIGTERM) { log_info("Received signal %d, shutting down...", sig); g_shutdown = 1; } } void create_default_config_if_not_exists(const char* filename) { FILE *f = fopen(filename, "r"); if (f) { fclose(f); return; } f = fopen(filename, "w"); if (!f) { log_error("Cannot create default config file"); return; } fprintf(f, "{\n" " \"port\": 8080,\n" " \"reverse_proxy\": [\n" " {\n" " \"hostname\": \"localhost\",\n" " \"upstream_host\": \"127.0.0.1\",\n" " \"upstream_port\": 3000,\n" " \"use_ssl\": false,\n" " \"rewrite_host\": true\n" " },\n" " {\n" " \"hostname\": \"example.com\",\n" " \"upstream_host\": \"127.0.0.1\",\n" " \"upstream_port\": 5000,\n" " \"use_ssl\": false,\n" " \"rewrite_host\": false\n" " }\n" " ]\n" "}\n"); fclose(f); log_info("Created default config file: %s", filename); } void init_ssl() { SSL_load_error_strings(); OpenSSL_add_ssl_algorithms(); ssl_ctx = SSL_CTX_new(TLS_client_method()); if (!ssl_ctx) { log_error("Failed to create SSL context"); exit(EXIT_FAILURE); } SSL_CTX_set_verify(ssl_ctx, SSL_VERIFY_NONE, NULL); SSL_CTX_set_options(ssl_ctx, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3); SSL_CTX_set_mode(ssl_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); } void cleanup() { log_info("Cleaning up resources..."); for (int i = 0; i < MAX_FDS; i++) { if (connections[i].type != CONN_TYPE_UNUSED && connections[i].fd != -1) { close_connection(i); } } free_config(); monitor_cleanup(); if (epoll_fd >= 0) { close(epoll_fd); epoll_fd = -1; } if (ssl_ctx) { SSL_CTX_free(ssl_ctx); ssl_ctx = NULL; } EVP_cleanup(); log_info("Cleanup complete"); } void cleanup_idle_connections() { time_t current_time = time(NULL); for (int i = 0; i < MAX_FDS; i++) { connection_t *conn = &connections[i]; if (conn->type != CONN_TYPE_UNUSED && conn->type != CONN_TYPE_LISTENER && conn->fd != -1) { if (current_time - conn->last_activity > CONNECTION_TIMEOUT) { log_debug("Closing idle connection fd=%d", i); close_connection(i); } } } } void free_config() { if (config.routes) { free(config.routes); config.routes = NULL; } config.route_count = 0; } int main(int argc, char *argv[]) { // Setup signal handlers signal(SIGPIPE, SIG_IGN); signal(SIGINT, signal_handler); signal(SIGTERM, signal_handler); if (getenv("DEBUG")) { g_debug_mode = 1; log_info("Debug mode enabled"); } const char *config_file = (argc > 1) ? argv[1] : "proxy_config.json"; create_default_config_if_not_exists(config_file); if (!load_config(config_file)) { fprintf(stderr, "Failed to load configuration\n"); return 1; } init_ssl(); monitor_init("proxy_stats.db"); epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (epoll_fd == -1) { log_error("epoll_create1 failed"); return 1; } // Initialize all connections as unused for (int i = 0; i < MAX_FDS; i++) { connections[i].type = CONN_TYPE_UNUSED; connections[i].fd = -1; } setup_listener_socket(config.port); log_info("========================================="); log_info("Reverse proxy v3.0 running on port %d", config.port); log_info("Dashboard: http://localhost:%d/dashboard", config.port); log_info("API Stats: http://localhost:%d/api/stats", config.port); log_info("========================================="); atexit(cleanup); struct epoll_event events[MAX_EVENTS]; time_t last_monitor_update = 0; time_t last_cleanup = 0; while (!g_shutdown) { int n = epoll_wait(epoll_fd, events, MAX_EVENTS, 1000); if (n == -1) { if (errno == EINTR) continue; log_error("epoll_wait failed"); break; } for (int i = 0; i < n; i++) { handle_connection_event(&events[i]); } time_t current_time = time(NULL); // Update monitoring stats every second if (current_time > last_monitor_update) { monitor_update(); last_monitor_update = current_time; } // Cleanup idle connections every 60 seconds if (current_time - last_cleanup >= 60) { cleanup_idle_connections(); last_cleanup = current_time; } } log_info("Shutdown complete"); return 0; }