/* * ===================================================================================== * * Filename: proxy.c * * Description: High-performance Reverse Proxy with Real-time Monitoring Dashboard. * Single-file C implementation using epoll - PRODUCTION READY VERSION * * Version: 1.5 (Compiler Warnings Fixed) * Created: 2024 * Compiler: gcc * * Author: Fixed Version * * To Compile: * gcc -Wall -Wextra -O2 -g -pthread cJSON.c proxy.c -o rproxy -lssl -lcrypto -lsqlite3 -lm * * ===================================================================================== */ // --- Standard Library Includes --- #define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include // --- Local Includes --- #include "cJSON.h" // --- Constants --- #define MAX_EVENTS 1024 #define MAX_FDS 65536 #define CHUNK_SIZE 65536 #define HISTORY_SECONDS 300 #define MAX_HEADER_SIZE 8192 #define MAX_REQUEST_LINE_SIZE 4096 #define MAX_URI_SIZE 2048 // --- Enums and Structs --- typedef enum { CONN_TYPE_UNUSED, CONN_TYPE_LISTENER, CONN_TYPE_CLIENT, CONN_TYPE_UPSTREAM } conn_type_t; typedef enum { CLIENT_STATE_READING_REQUEST_LINE, // Initial state, reading headers CLIENT_STATE_READING_HEADERS, // Legacy, handled by above CLIENT_STATE_READING_BODY, // New state for handling request bodies CLIENT_STATE_CONNECTING_UPSTREAM, CLIENT_STATE_TUNNELING, // Full-duplex forwarding state CLIENT_STATE_DONE, CLIENT_STATE_ERROR } client_state_t; struct connection_s; struct vhost_stats_s; typedef struct { char hostname[256]; char upstream_host[256]; int upstream_port; int use_ssl; int rewrite_host; } route_config_t; typedef struct { int port; route_config_t *routes; int route_count; } app_config_t; typedef struct { char *data; size_t size; size_t capacity; } buffer_t; typedef struct { char method[16]; char uri[MAX_URI_SIZE]; char version[16]; char host[256]; int content_length; int is_websocket; int keep_alive; int connection_upgrade; } http_request_t; typedef struct connection_s { conn_type_t type; client_state_t state; int fd; struct connection_s *pair; struct vhost_stats_s *vhost_stats; buffer_t read_buf; buffer_t write_buf; SSL *ssl; int ssl_handshake_done; http_request_t request; char *http_body_start; long http_content_length; long http_body_read; int is_websocket; int keep_alive; double request_start_time; time_t last_activity; } connection_t; typedef struct { double time; double value; } history_point_t; typedef struct { history_point_t *points; int capacity; int head; int count; } history_deque_t; typedef struct { double time; double rx_kbps; double tx_kbps; } network_history_point_t; typedef struct { network_history_point_t *points; int capacity; int head; int count; } network_history_deque_t; typedef struct request_time_s { double *times; int capacity; int head; int count; } request_time_deque_t; typedef struct vhost_stats_s { char vhost_name[256]; long long http_requests; long long websocket_requests; long long total_requests; long long bytes_sent; long long bytes_recv; double avg_request_time_ms; long long last_bytes_sent; long long last_bytes_recv; double last_update; history_deque_t throughput_history; request_time_deque_t request_times; struct vhost_stats_s *next; } vhost_stats_t; typedef struct { time_t start_time; int active_connections; history_deque_t cpu_history; history_deque_t memory_history; network_history_deque_t network_history; history_deque_t throughput_history; long long last_net_sent; long long last_net_recv; double last_net_update_time; vhost_stats_t *vhost_stats_head; sqlite3 *db; } system_monitor_t; // --- Global State --- connection_t connections[MAX_FDS]; app_config_t config; system_monitor_t monitor; int epoll_fd; SSL_CTX *ssl_ctx; static int g_debug_mode = 0; // --- Function Prototypes --- void log_error(const char *msg); void log_info(const char *format, ...); void log_debug(const char *format, ...); int load_config(const char *filename); void free_config(); route_config_t *find_route(const char *hostname); void setup_listener_socket(int port); void accept_new_connection(int listener_fd); void close_connection(int fd); void handle_connection_event(struct epoll_event *event); void connect_to_upstream(connection_t *client_conn, route_config_t *route); void handle_client_data(connection_t *conn); void handle_client_body(connection_t *conn); int parse_http_request_line(const char *line, http_request_t *request); int parse_http_headers(const char *headers, http_request_t *request); int find_header_value(const char *headers, const char *header_name, char *value, size_t value_size); void monitor_init(const char *db_file); void monitor_update(); void monitor_cleanup(); void monitor_record_request_start(vhost_stats_t *stats, int is_websocket); void monitor_record_request_end(vhost_stats_t *stats, double start_time); vhost_stats_t* monitor_get_or_create_vhost_stats(const char *vhost_name); void serve_dashboard(connection_t *conn); void serve_stats_api(connection_t *conn); void send_error_response(connection_t *conn, int code, const char* status, const char* body); // --- Buffer Helpers --- static inline int buffer_init(buffer_t *buf, size_t capacity) { buf->data = malloc(capacity); if (!buf->data) { log_error("Failed to allocate buffer"); return -1; } buf->capacity = capacity; buf->size = 0; return 0; } static inline void buffer_free(buffer_t *buf) { if (buf->data) { free(buf->data); buf->data = NULL; } buf->capacity = 0; buf->size = 0; } static inline int buffer_ensure_capacity(buffer_t *buf, size_t required) { if (buf->capacity >= required) return 0; size_t new_capacity = buf->capacity; while (new_capacity < required) { new_capacity *= 2; } char *new_data = realloc(buf->data, new_capacity); if (!new_data) { log_error("Failed to reallocate buffer"); return -1; } buf->data = new_data; buf->capacity = new_capacity; return 0; } // --- String Utilities --- static char* safe_strdup(const char *str, size_t max_len) { if (!str) return NULL; size_t len = strnlen(str, max_len); char *dup = malloc(len + 1); if (!dup) return NULL; memcpy(dup, str, len); dup[len] = '\0'; return dup; } static void trim_whitespace(char *str) { if (!str) return; // Trim leading whitespace char *start = str; while (isspace((unsigned char)*start)) start++; if (start != str) { memmove(str, start, strlen(start) + 1); } // Trim trailing whitespace char *end = str + strlen(str) - 1; while (end >= str && isspace((unsigned char)*end)) { *end = '\0'; end--; } } static int is_valid_http_method(const char *method) { const char *valid_methods[] = {"GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS", "PATCH", "TRACE", "CONNECT", NULL}; for (int i = 0; valid_methods[i]; i++) { if (strcmp(method, valid_methods[i]) == 0) return 1; } return 0; } static int is_valid_http_version(const char *version) { return (strcmp(version, "HTTP/1.0") == 0 || strcmp(version, "HTTP/1.1") == 0); } // ================================================================================================= // // Logging Implementation // // ================================================================================================= void log_error(const char *msg) { perror(msg); } void log_message(const char *level, const char *format, va_list args) { time_t now; time(&now); char buf[32]; strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", localtime(&now)); printf("%s - %-5s - ", buf, level); vprintf(format, args); printf("\n"); fflush(stdout); } void log_info(const char *format, ...) { va_list args; va_start(args, format); log_message("INFO", format, args); va_end(args); } void log_debug(const char *format, ...) { if (!g_debug_mode) return; va_list args; va_start(args, format); log_message("DEBUG", format, args); va_end(args); } // ================================================================================================= // // Configuration Implementation // // ================================================================================================= static char* read_file_to_string(const char *filename) { FILE *f = fopen(filename, "rb"); if (!f) return NULL; fseek(f, 0, SEEK_END); long length = ftell(f); if (length < 0 || length > 1024*1024) { // Max 1MB config fclose(f); return NULL; } fseek(f, 0, SEEK_SET); char *buffer = malloc(length + 1); if (buffer) { size_t read_len = fread(buffer, 1, length, f); buffer[read_len] = '\0'; } fclose(f); return buffer; } int load_config(const char *filename) { log_info("Loading configuration from %s", filename); char *json_string = read_file_to_string(filename); if (!json_string) { log_error("Could not read config file"); return 0; } cJSON *root = cJSON_Parse(json_string); free(json_string); if (!root) { fprintf(stderr, "JSON parse error: %s\n", cJSON_GetErrorPtr()); return 0; } cJSON *port_item = cJSON_GetObjectItem(root, "port"); config.port = cJSON_IsNumber(port_item) ? port_item->valueint : 8080; if (config.port < 1 || config.port > 65535) { fprintf(stderr, "Invalid port number: %d\n", config.port); cJSON_Delete(root); return 0; } cJSON *proxy_array = cJSON_GetObjectItem(root, "reverse_proxy"); if (cJSON_IsArray(proxy_array)) { config.route_count = cJSON_GetArraySize(proxy_array); if (config.route_count <= 0) { cJSON_Delete(root); return 0; } config.routes = malloc(config.route_count * sizeof(route_config_t)); if (!config.routes) { log_error("Failed to allocate memory for routes"); cJSON_Delete(root); return 0; } int i = 0; cJSON *route_item; cJSON_ArrayForEach(route_item, proxy_array) { route_config_t *route = &config.routes[i]; memset(route, 0, sizeof(route_config_t)); cJSON *hostname = cJSON_GetObjectItem(route_item, "hostname"); cJSON *upstream_host = cJSON_GetObjectItem(route_item, "upstream_host"); cJSON *upstream_port = cJSON_GetObjectItem(route_item, "upstream_port"); if (!cJSON_IsString(hostname) || !cJSON_IsString(upstream_host) || !cJSON_IsNumber(upstream_port)) { fprintf(stderr, "Invalid route configuration at index %d\n", i); continue; } strncpy(route->hostname, hostname->valuestring, sizeof(route->hostname) - 1); strncpy(route->upstream_host, upstream_host->valuestring, sizeof(route->upstream_host) - 1); route->upstream_port = upstream_port->valueint; if (route->upstream_port < 1 || route->upstream_port > 65535) { fprintf(stderr, "Invalid upstream port for %s: %d\n", route->hostname, route->upstream_port); continue; } route->use_ssl = cJSON_IsTrue(cJSON_GetObjectItem(route_item, "use_ssl")); route->rewrite_host = cJSON_IsTrue(cJSON_GetObjectItem(route_item, "rewrite_host")); log_info("Route configured: %s -> %s:%d (SSL: %s, Rewrite Host: %s)", route->hostname, route->upstream_host, route->upstream_port, route->use_ssl ? "yes" : "no", route->rewrite_host ? "yes" : "no"); i++; } } cJSON_Delete(root); log_info("Loaded %d routes from %s", config.route_count, filename); return 1; } void free_config() { if (config.routes) { free(config.routes); config.routes = NULL; } config.route_count = 0; } route_config_t *find_route(const char *hostname) { if (!hostname) return NULL; for (int i = 0; i < config.route_count; i++) { if (strcasecmp(hostname, config.routes[i].hostname) == 0) { return &config.routes[i]; } } return NULL; } // ================================================================================================= // // Monitor Implementation // // ================================================================================================= static void history_deque_init(history_deque_t *dq, int capacity) { dq->points = malloc(sizeof(history_point_t) * capacity); dq->capacity = capacity; dq->head = 0; dq->count = 0; } static void history_deque_push(history_deque_t *dq, double time, double value) { dq->points[dq->head] = (history_point_t){ .time = time, .value = value }; dq->head = (dq->head + 1) % dq->capacity; if (dq->count < dq->capacity) dq->count++; } static void network_history_deque_init(network_history_deque_t *dq, int capacity) { dq->points = malloc(sizeof(network_history_point_t) * capacity); dq->capacity = capacity; dq->head = 0; dq->count = 0; } static void network_history_deque_push(network_history_deque_t *dq, double time, double rx, double tx) { dq->points[dq->head] = (network_history_point_t){ .time = time, .rx_kbps = rx, .tx_kbps = tx }; dq->head = (dq->head + 1) % dq->capacity; if (dq->count < dq->capacity) dq->count++; } static void request_time_deque_init(request_time_deque_t *dq, int capacity) { dq->times = malloc(sizeof(double) * capacity); dq->capacity = capacity; dq->head = 0; dq->count = 0; } static void request_time_deque_push(request_time_deque_t *dq, double time_ms) { dq->times[dq->head] = time_ms; dq->head = (dq->head + 1) % dq->capacity; if (dq->count < dq->capacity) dq->count++; } static void init_db() { if (!monitor.db) return; char *err_msg = 0; const char *sql_create_table = "CREATE TABLE IF NOT EXISTS vhost_stats (" " id INTEGER PRIMARY KEY AUTOINCREMENT," " vhost TEXT NOT NULL," " timestamp REAL NOT NULL," " http_requests INTEGER DEFAULT 0," " websocket_requests INTEGER DEFAULT 0," " total_requests INTEGER DEFAULT 0," " bytes_sent INTEGER DEFAULT 0," " bytes_recv INTEGER DEFAULT 0," " avg_request_time_ms REAL DEFAULT 0," " UNIQUE(vhost, timestamp)" ");"; const char *sql_create_index = "CREATE INDEX IF NOT EXISTS idx_vhost_timestamp ON vhost_stats(vhost, timestamp);"; if (sqlite3_exec(monitor.db, sql_create_table, 0, 0, &err_msg) != SQLITE_OK || sqlite3_exec(monitor.db, sql_create_index, 0, 0, &err_msg) != SQLITE_OK) { fprintf(stderr, "SQL error: %s\n", err_msg); sqlite3_free(err_msg); } } static void load_stats_from_db() { if (!monitor.db) return; sqlite3_stmt *res; const char *sql = "SELECT vhost, http_requests, websocket_requests, total_requests, " "bytes_sent, bytes_recv, avg_request_time_ms " "FROM vhost_stats v1 WHERE timestamp = (" " SELECT MAX(timestamp) FROM vhost_stats v2 WHERE v2.vhost = v1.vhost" ")"; if (sqlite3_prepare_v2(monitor.db, sql, -1, &res, 0) != SQLITE_OK) { fprintf(stderr, "Failed to execute statement: %s\n", sqlite3_errmsg(monitor.db)); return; } int vhost_count = 0; while (sqlite3_step(res) == SQLITE_ROW) { vhost_stats_t *stats = monitor_get_or_create_vhost_stats((const char*)sqlite3_column_text(res, 0)); if (stats) { stats->http_requests = sqlite3_column_int64(res, 1); stats->websocket_requests = sqlite3_column_int64(res, 2); stats->total_requests = sqlite3_column_int64(res, 3); stats->bytes_sent = sqlite3_column_int64(res, 4); stats->bytes_recv = sqlite3_column_int64(res, 5); stats->avg_request_time_ms = sqlite3_column_double(res, 6); vhost_count++; } } sqlite3_finalize(res); log_info("Loaded statistics for %d vhosts from database", vhost_count); } void monitor_init(const char *db_file) { memset(&monitor, 0, sizeof(system_monitor_t)); monitor.start_time = time(NULL); history_deque_init(&monitor.cpu_history, HISTORY_SECONDS); history_deque_init(&monitor.memory_history, HISTORY_SECONDS); network_history_deque_init(&monitor.network_history, HISTORY_SECONDS); history_deque_init(&monitor.throughput_history, HISTORY_SECONDS); if (sqlite3_open(db_file, &monitor.db) != SQLITE_OK) { fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(monitor.db)); if (monitor.db) { sqlite3_close(monitor.db); monitor.db = NULL; } } else { init_db(); load_stats_from_db(); } monitor_update(); } void monitor_cleanup() { if (monitor.db) { sqlite3_close(monitor.db); monitor.db = NULL; } vhost_stats_t *current = monitor.vhost_stats_head; while (current) { vhost_stats_t *next = current->next; if (current->throughput_history.points) free(current->throughput_history.points); if (current->request_times.times) free(current->request_times.times); free(current); current = next; } monitor.vhost_stats_head = NULL; if (monitor.cpu_history.points) free(monitor.cpu_history.points); if (monitor.memory_history.points) free(monitor.memory_history.points); if (monitor.network_history.points) free(monitor.network_history.points); if (monitor.throughput_history.points) free(monitor.throughput_history.points); } static void save_stats_to_db() { if (!monitor.db) return; sqlite3_stmt *stmt; const char *sql = "INSERT OR REPLACE INTO vhost_stats " "(vhost, timestamp, http_requests, websocket_requests, total_requests, " "bytes_sent, bytes_recv, avg_request_time_ms) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?);"; if (sqlite3_prepare_v2(monitor.db, sql, -1, &stmt, NULL) != SQLITE_OK) return; double current_time = (double)time(NULL); for (vhost_stats_t *s = monitor.vhost_stats_head; s != NULL; s = s->next) { if (s->request_times.count > 0) { double total_time = 0; for(int i = 0; i < s->request_times.count; i++) { total_time += s->request_times.times[i]; } s->avg_request_time_ms = total_time / s->request_times.count; } sqlite3_bind_text(stmt, 1, s->vhost_name, -1, SQLITE_STATIC); sqlite3_bind_double(stmt, 2, current_time); sqlite3_bind_int64(stmt, 3, s->http_requests); sqlite3_bind_int64(stmt, 4, s->websocket_requests); sqlite3_bind_int64(stmt, 5, s->total_requests); sqlite3_bind_int64(stmt, 6, s->bytes_sent); sqlite3_bind_int64(stmt, 7, s->bytes_recv); sqlite3_bind_double(stmt, 8, s->avg_request_time_ms); sqlite3_step(stmt); sqlite3_reset(stmt); } sqlite3_finalize(stmt); } static double get_cpu_usage() { static long long prev_user = 0, prev_nice = 0, prev_system = 0, prev_idle = 0; long long user, nice, system, idle, iowait, irq, softirq; FILE *f = fopen("/proc/stat", "r"); if (!f) return 0.0; if (fscanf(f, "cpu %lld %lld %lld %lld %lld %lld %lld", &user, &nice, &system, &idle, &iowait, &irq, &softirq) != 7) { fclose(f); return 0.0; } fclose(f); long long prev_total = prev_user + prev_nice + prev_system + prev_idle; long long total = user + nice + system + idle; long long totald = total - prev_total; long long idled = idle - prev_idle; prev_user = user; prev_nice = nice; prev_system = system; prev_idle = idle; return totald == 0 ? 0.0 : (double)(totald - idled) * 100.0 / totald; } static void get_memory_usage(double *used_gb) { struct sysinfo info; if (sysinfo(&info) != 0) { *used_gb = 0; return; } *used_gb = (double)(info.totalram - info.freeram) * info.mem_unit / (1024.0 * 1024.0 * 1024.0); } static void get_network_stats(long long *bytes_sent, long long *bytes_recv) { FILE *f = fopen("/proc/net/dev", "r"); if (!f) { *bytes_sent = 0; *bytes_recv = 0; return; } char line[256]; if (!fgets(line, sizeof(line), f) || !fgets(line, sizeof(line), f)) { // Skip headers fclose(f); *bytes_sent = 0; *bytes_recv = 0; return; } long long total_recv = 0, total_sent = 0; while (fgets(line, sizeof(line), f)) { char iface[32]; long long r, t; if (sscanf(line, "%31[^:]: %lld %*d %*d %*d %*d %*d %*d %*d %lld", iface, &r, &t) == 3) { if (strncmp(iface, " lo", 3) != 0) { total_recv += r; total_sent += t; } } } fclose(f); *bytes_sent = total_sent; *bytes_recv = total_recv; } void monitor_update() { double current_time = time(NULL); history_deque_push(&monitor.cpu_history, current_time, get_cpu_usage()); double mem_used_gb; get_memory_usage(&mem_used_gb); history_deque_push(&monitor.memory_history, current_time, mem_used_gb); long long net_sent, net_recv; get_network_stats(&net_sent, &net_recv); double time_delta = current_time - monitor.last_net_update_time; if (time_delta > 0 && monitor.last_net_update_time > 0) { double rx = (net_recv - monitor.last_net_recv) / time_delta / 1024.0; double tx = (net_sent - monitor.last_net_sent) / time_delta / 1024.0; network_history_deque_push(&monitor.network_history, current_time, fmax(0, rx), fmax(0, tx)); history_deque_push(&monitor.throughput_history, current_time, fmax(0, rx + tx)); } monitor.last_net_sent = net_sent; monitor.last_net_recv = net_recv; monitor.last_net_update_time = current_time; for (vhost_stats_t *s = monitor.vhost_stats_head; s != NULL; s = s->next) { double vhost_delta = current_time - s->last_update; if (vhost_delta >= 1.0) { double kbps = 0; if (s->last_update > 0) { long long bytes_diff = (s->bytes_sent - s->last_bytes_sent) + (s->bytes_recv - s->last_bytes_recv); kbps = bytes_diff / vhost_delta / 1024.0; } history_deque_push(&s->throughput_history, current_time, fmax(0, kbps)); s->last_bytes_sent = s->bytes_sent; s->last_bytes_recv = s->bytes_recv; s->last_update = current_time; } } static time_t last_db_save = 0; if (current_time - last_db_save >= 10) { save_stats_to_db(); last_db_save = current_time; } } vhost_stats_t* monitor_get_or_create_vhost_stats(const char *vhost_name) { if (!vhost_name || strlen(vhost_name) == 0) return NULL; for (vhost_stats_t *curr = monitor.vhost_stats_head; curr; curr = curr->next) { if (strcmp(curr->vhost_name, vhost_name) == 0) return curr; } vhost_stats_t *new_stats = calloc(1, sizeof(vhost_stats_t)); if (!new_stats) return NULL; strncpy(new_stats->vhost_name, vhost_name, sizeof(new_stats->vhost_name) - 1); new_stats->last_update = time(NULL); history_deque_init(&new_stats->throughput_history, 60); request_time_deque_init(&new_stats->request_times, 100); new_stats->next = monitor.vhost_stats_head; monitor.vhost_stats_head = new_stats; return new_stats; } void monitor_record_request_start(vhost_stats_t *stats, int is_websocket) { if (!stats) return; if (is_websocket) { stats->websocket_requests++; } else { stats->http_requests++; } stats->total_requests++; } void monitor_record_request_end(vhost_stats_t *stats, double start_time) { if (!stats) return; struct timespec end_time; clock_gettime(CLOCK_MONOTONIC, &end_time); double duration_ms = ((end_time.tv_sec + end_time.tv_nsec / 1e9) - start_time) * 1000.0; if (duration_ms >= 0) { request_time_deque_push(&stats->request_times, duration_ms); } } // ================================================================================================= // // Dashboard Implementation // // ================================================================================================= const char *DASHBOARD_HTML = "\n" "\n" "\n" " Reverse Proxy Monitor\n" " \n" " \n" "\n" "\n" "
\n" "
\n" "
0
\n" "
Connections
\n" "
\n" "
\n" "
0
\n" "
Memory
\n" "
\n" "
\n" "
0
\n" "
CPU %
\n" "
\n" "
\n" "\n" "
\n" "
CPU Usage
\n" " \n" "
\n" "\n" "
\n" "
Memory Usage
\n" " \n" "
\n" "\n" "
\n" "
Network I/O
\n" "
\n" "
RX
\n" "
TX
\n" "
\n" " \n" "
\n" "\n" "
\n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" "
Virtual HostHTTP ReqWS ReqTotal ReqAvg Resp (ms)SentReceived
\n" "
\n" "\n" " \n" "\n" "\n" ; void serve_dashboard(connection_t *conn) { if (!conn) return; char header[512]; int len = snprintf(header, sizeof(header), "HTTP/1.1 200 OK\r\n" "Content-Type: text/html; charset=utf-8\r\n" "Content-Length: %zu\r\n" "Connection: keep-alive\r\n" "Cache-Control: no-cache\r\n" "\r\n", strlen(DASHBOARD_HTML)); if (buffer_ensure_capacity(&conn->write_buf, conn->write_buf.size + len + strlen(DASHBOARD_HTML)) < 0) { send_error_response(conn, 500, "Internal Server Error", "Memory allocation failed"); return; } memcpy(conn->write_buf.data + conn->write_buf.size, header, len); conn->write_buf.size += len; memcpy(conn->write_buf.data + conn->write_buf.size, DASHBOARD_HTML, strlen(DASHBOARD_HTML)); conn->write_buf.size += strlen(DASHBOARD_HTML); struct epoll_event event = { .data.fd = conn->fd, .events = EPOLLIN | EPOLLOUT }; epoll_ctl(epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); conn->read_buf.size = 0; conn->keep_alive = 1; } static cJSON* format_history(history_deque_t *dq, int window_seconds) { cJSON *arr = cJSON_CreateArray(); if (!arr || !dq || !dq->points || dq->count == 0) return arr; double current_time = time(NULL); int start_index = (dq->head - dq->count + dq->capacity) % dq->capacity; for (int i = 0; i < dq->count; ++i) { int current_index = (start_index + i) % dq->capacity; history_point_t *p = &dq->points[current_index]; if ((current_time - p->time) <= window_seconds) { cJSON *pt = cJSON_CreateObject(); if (pt) { cJSON_AddNumberToObject(pt, "x", (long)((p->time - current_time) * 1000)); cJSON_AddNumberToObject(pt, "y", p->value); cJSON_AddItemToArray(arr, pt); } } } return arr; } static cJSON* format_network_history(network_history_deque_t *dq, int window_seconds, const char *key) { cJSON *arr = cJSON_CreateArray(); if (!arr || !dq || !dq->points || !key || dq->count == 0) return arr; double current_time = time(NULL); int start_index = (dq->head - dq->count + dq->capacity) % dq->capacity; for (int i = 0; i < dq->count; ++i) { int current_index = (start_index + i) % dq->capacity; network_history_point_t *p = &dq->points[current_index]; if ((current_time - p->time) <= window_seconds) { cJSON *pt = cJSON_CreateObject(); if (pt) { cJSON_AddNumberToObject(pt, "x", (long)((p->time - current_time) * 1000)); cJSON_AddNumberToObject(pt, "y", strcmp(key, "rx_kbps") == 0 ? p->rx_kbps : p->tx_kbps); cJSON_AddItemToArray(arr, pt); } } } return arr; } void serve_stats_api(connection_t *conn) { if (!conn) return; cJSON *root = cJSON_CreateObject(); if (!root) { send_error_response(conn, 500, "Internal Server Error", "JSON creation failed"); return; } cJSON *current = cJSON_CreateObject(); if (!current) { cJSON_Delete(root); send_error_response(conn, 500, "Internal Server Error", "JSON creation failed"); return; } cJSON_AddItemToObject(root, "current", current); char buffer[64]; double last_cpu = 0, last_mem = 0; if (monitor.cpu_history.count > 0) { int last_idx = (monitor.cpu_history.head - 1 + monitor.cpu_history.capacity) % monitor.cpu_history.capacity; last_cpu = monitor.cpu_history.points[last_idx].value; } if (monitor.memory_history.count > 0) { int last_idx = (monitor.memory_history.head - 1 + monitor.memory_history.capacity) % monitor.memory_history.capacity; last_mem = monitor.memory_history.points[last_idx].value; } snprintf(buffer, sizeof(buffer), "%.2f", last_cpu); cJSON_AddStringToObject(current, "cpu_percent", buffer); snprintf(buffer, sizeof(buffer), "%.2f", last_mem); cJSON_AddStringToObject(current, "memory_gb", buffer); cJSON_AddNumberToObject(current, "active_connections", monitor.active_connections); cJSON_AddItemToObject(root, "cpu_history", format_history(&monitor.cpu_history, HISTORY_SECONDS)); cJSON_AddItemToObject(root, "memory_history", format_history(&monitor.memory_history, HISTORY_SECONDS)); cJSON_AddItemToObject(root, "network_rx_history", format_network_history(&monitor.network_history, HISTORY_SECONDS, "rx_kbps")); cJSON_AddItemToObject(root, "network_tx_history", format_network_history(&monitor.network_history, HISTORY_SECONDS, "tx_kbps")); cJSON_AddItemToObject(root, "throughput_history", format_history(&monitor.throughput_history, HISTORY_SECONDS)); cJSON *processes = cJSON_CreateArray(); if (processes) { cJSON_AddItemToObject(root, "processes", processes); for (vhost_stats_t *s = monitor.vhost_stats_head; s; s = s->next) { cJSON *p = cJSON_CreateObject(); if (p) { cJSON_AddStringToObject(p, "name", s->vhost_name); cJSON_AddNumberToObject(p, "http_requests", s->http_requests); cJSON_AddNumberToObject(p, "websocket_requests", s->websocket_requests); cJSON_AddNumberToObject(p, "total_requests", s->total_requests); cJSON_AddNumberToObject(p, "avg_request_time_ms", s->avg_request_time_ms); cJSON_AddNumberToObject(p, "bytes_sent", s->bytes_sent); cJSON_AddNumberToObject(p, "bytes_recv", s->bytes_recv); cJSON_AddItemToObject(p, "throughput_history", format_history(&s->throughput_history, 60)); cJSON_AddItemToArray(processes, p); } } } char *json_string = cJSON_PrintUnformatted(root); if (!json_string) { cJSON_Delete(root); send_error_response(conn, 500, "Internal Server Error", "JSON serialization failed"); return; } char header[512]; int hlen = snprintf(header, sizeof(header), "HTTP/1.1 200 OK\r\n" "Content-Type: application/json; charset=utf-8\r\n" "Content-Length: %zu\r\n" "Connection: keep-alive\r\n" "Cache-Control: no-cache\r\n" "\r\n", strlen(json_string)); if (buffer_ensure_capacity(&conn->write_buf, conn->write_buf.size + hlen + strlen(json_string)) < 0) { free(json_string); cJSON_Delete(root); send_error_response(conn, 500, "Internal Server Error", "Memory allocation failed"); return; } memcpy(conn->write_buf.data + conn->write_buf.size, header, hlen); conn->write_buf.size += hlen; memcpy(conn->write_buf.data + conn->write_buf.size, json_string, strlen(json_string)); conn->write_buf.size += strlen(json_string); struct epoll_event event = { .data.fd = conn->fd, .events = EPOLLIN | EPOLLOUT }; epoll_ctl(epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); cJSON_Delete(root); free(json_string); conn->read_buf.size = 0; conn->keep_alive = 1; } // ================================================================================================= // // HTTP Implementation // // ================================================================================================= int find_header_value(const char *headers, const char *header_name, char *value, size_t value_size) { if (!headers || !header_name || !value || value_size == 0) return 0; size_t header_name_len = strlen(header_name); const char *line_start = headers; while (*line_start) { const char *line_end = strstr(line_start, "\r\n"); if (!line_end) { line_end = line_start + strlen(line_start); } size_t line_len = line_end - line_start; if (line_len > header_name_len + 1) { if (strncasecmp(line_start, header_name, header_name_len) == 0 && line_start[header_name_len] == ':') { const char *value_start = line_start + header_name_len + 1; while (value_start < line_end && isspace((unsigned char)*value_start)) { value_start++; } size_t value_len = line_end - value_start; if (value_len >= value_size) { value_len = value_size - 1; } memcpy(value, value_start, value_len); value[value_len] = '\0'; trim_whitespace(value); return 1; } } if (*line_end == '\r') { line_start = line_end + 2; } else if (*line_end == '\0') { break; } else { line_start = line_end + 1; } } return 0; } int parse_http_request_line(const char *line, http_request_t *request) { if (!line || !request) return 0; memset(request, 0, sizeof(http_request_t)); char *line_copy = safe_strdup(line, MAX_REQUEST_LINE_SIZE); if (!line_copy) return 0; trim_whitespace(line_copy); char *method = strtok(line_copy, " "); char *uri = strtok(NULL, " "); char *version = strtok(NULL, " "); if (!method || !uri || !version) { free(line_copy); return 0; } if (!is_valid_http_method(method) || !is_valid_http_version(version)) { free(line_copy); return 0; } if (strlen(uri) >= sizeof(request->uri)) { free(line_copy); return 0; } strncpy(request->method, method, sizeof(request->method) - 1); request->method[sizeof(request->method) - 1] = '\0'; // Fix: Ensure null termination strncpy(request->uri, uri, sizeof(request->uri) - 1); request->uri[sizeof(request->uri) - 1] = '\0'; // Fix: Ensure null termination strncpy(request->version, version, sizeof(request->version) - 1); request->version[sizeof(request->version) - 1] = '\0'; // Fix: Ensure null termination free(line_copy); return 1; } int parse_http_headers(const char *headers, http_request_t *request) { if (!headers || !request) return 0; char value[256]; // Parse Host header if (find_header_value(headers, "Host", value, sizeof(value))) { char *colon = strchr(value, ':'); if (colon) { *colon = '\0'; // Remove port from hostname } strncpy(request->host, value, sizeof(request->host) - 1); request->host[sizeof(request->host) - 1] = '\0'; // Fix: [-Wstringop-truncation] Ensure null termination } // Parse Content-Length request->content_length = -1; // Default to unknown if (find_header_value(headers, "Content-Length", value, sizeof(value))) { request->content_length = atoi(value); if (request->content_length < 0) { request->content_length = 0; } } // Parse Connection header if (find_header_value(headers, "Connection", value, sizeof(value))) { if (strcasecmp(value, "keep-alive") == 0) { request->keep_alive = 1; } else if (strcasecmp(value, "upgrade") == 0) { request->connection_upgrade = 1; } } // Parse Upgrade header for WebSocket if (find_header_value(headers, "Upgrade", value, sizeof(value))) { if (strcasecmp(value, "websocket") == 0) { request->is_websocket = 1; } } return 1; } void send_error_response(connection_t *conn, int code, const char* status, const char* body) { if (!conn || !status || !body) return; char response[2048]; int len = snprintf(response, sizeof(response), "HTTP/1.1 %d %s\r\n" "Content-Type: text/plain; charset=utf-8\r\n" "Content-Length: %zu\r\n" "Connection: close\r\n" "Server: ReverseProxy/1.1\r\n" "\r\n" "%s", code, status, strlen(body), body); if (len > 0 && len < (int)sizeof(response)) { // Fix: [-Wsign-compare] Cast sizeof to int if (buffer_ensure_capacity(&conn->write_buf, conn->write_buf.size + len) == 0) { memcpy(conn->write_buf.data + conn->write_buf.size, response, len); conn->write_buf.size += len; struct epoll_event event = { .data.fd = conn->fd, .events = EPOLLIN | EPOLLOUT }; epoll_ctl(epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); } } conn->keep_alive = 0; conn->state = CLIENT_STATE_ERROR; } void handle_client_data(connection_t *conn) { if (!conn) return; // Update last activity time conn->last_activity = time(NULL); int bytes_read = read(conn->fd, conn->read_buf.data + conn->read_buf.size, conn->read_buf.capacity - conn->read_buf.size - 1); if (bytes_read <= 0) { if (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { return; } close_connection(conn->fd); return; } conn->read_buf.size += bytes_read; conn->read_buf.data[conn->read_buf.size] = '\0'; // Check for complete headers char *headers_end = strstr(conn->read_buf.data, "\r\n\r\n"); if (!headers_end) { // Check if we've exceeded max header size if (conn->read_buf.size >= MAX_HEADER_SIZE) { send_error_response(conn, 413, "Request Header Too Large", "Request headers too large"); } return; } // Mark request start time struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); conn->request_start_time = ts.tv_sec + ts.tv_nsec / 1e9; // Extract request line char *first_line_end = strstr(conn->read_buf.data, "\r\n"); if (!first_line_end) { send_error_response(conn, 400, "Bad Request", "Invalid request line"); return; } *first_line_end = '\0'; // Parse request line if (!parse_http_request_line(conn->read_buf.data, &conn->request)) { send_error_response(conn, 400, "Bad Request", "Invalid request line format"); *first_line_end = '\r'; // restore return; } *first_line_end = '\r'; // restore // Parse headers char *headers_start = first_line_end + 2; *headers_end = '\0'; if (!parse_http_headers(headers_start, &conn->request)) { send_error_response(conn, 400, "Bad Request", "Invalid headers"); *headers_end = '\r'; // restore return; } *headers_end = '\r'; // restore // Handle special dashboard/API routes if (strcmp(conn->request.method, "GET") == 0) { if (strncmp(conn->request.uri, "/dashboard", 10) == 0) { serve_dashboard(conn); return; } if (strncmp(conn->request.uri, "/api/stats", 10) == 0) { serve_stats_api(conn); return; } } // Validate Host header if (strlen(conn->request.host) == 0) { send_error_response(conn, 400, "Bad Request", "Host header is required"); return; } // Find route configuration route_config_t *route = find_route(conn->request.host); if (!route) { char body[512]; snprintf(body, sizeof(body), "Host not configured: %s", conn->request.host); send_error_response(conn, 404, "Not Found", body); return; } // Get or create vhost stats conn->vhost_stats = monitor_get_or_create_vhost_stats(route->hostname); conn->is_websocket = conn->request.is_websocket; conn->keep_alive = conn->request.keep_alive; // **FIX**: Store content length and how much body we've already read conn->http_content_length = conn->request.content_length; conn->http_body_read = conn->read_buf.size - ((headers_end + 4) - conn->read_buf.data); if (conn->http_content_length < 0) conn->http_content_length = 0; // Treat unknown length as 0 for this stage // Record request start monitor_record_request_start(conn->vhost_stats, conn->is_websocket); // Rewrite Host header if needed if (route->rewrite_host) { char new_host_header[320]; snprintf(new_host_header, sizeof(new_host_header), "Host: %s:%d\r\n", route->upstream_host, route->upstream_port); char host_search[] = "Host:"; char *host_start = strcasestr(conn->read_buf.data, host_search); if (host_start) { char *host_line_end = strstr(host_start, "\r\n"); if (host_line_end) { size_t old_len = (host_line_end + 2) - host_start; size_t new_len = strlen(new_host_header); size_t tail_len = conn->read_buf.size - (host_start - conn->read_buf.data) - old_len; if (new_len != old_len) { if (buffer_ensure_capacity(&conn->read_buf, conn->read_buf.size - old_len + new_len + 1) < 0) { send_error_response(conn, 500, "Internal Server Error", "Memory allocation failed"); return; } // Recalculate pointer after potential realloc host_start = strcasestr(conn->read_buf.data, host_search); memmove(host_start + new_len, host_start + old_len, tail_len); conn->read_buf.size = conn->read_buf.size - old_len + new_len; } memcpy(host_start, new_host_header, new_len); conn->read_buf.data[conn->read_buf.size] = '\0'; } } } // Transition to connecting upstream conn->state = CLIENT_STATE_CONNECTING_UPSTREAM; connect_to_upstream(conn, route); } // ================================================================================================= // // Proxy Implementation // // ================================================================================================= static void set_non_blocking(int fd) { int flags = fcntl(fd, F_GETFL, 0); if (flags >= 0) { fcntl(fd, F_SETFL, flags | O_NONBLOCK); } } static void add_to_epoll(int fd, uint32_t events) { struct epoll_event event = { .data.fd = fd, .events = events }; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) { log_error("epoll_ctl_add failed"); close(fd); } } static void modify_epoll(int fd, uint32_t events) { struct epoll_event event = { .data.fd = fd, .events = events }; if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) == -1) { close_connection(fd); } } void setup_listener_socket(int port) { int fd = socket(AF_INET, SOCK_STREAM, 0); if (fd < 0) { log_error("socket failed"); exit(EXIT_FAILURE); } int reuse = 1; if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) { log_error("setsockopt SO_REUSEADDR failed"); close(fd); exit(EXIT_FAILURE); } struct sockaddr_in addr = { .sin_family = AF_INET, .sin_port = htons(port), .sin_addr.s_addr = htonl(INADDR_ANY) }; if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) { log_error("bind failed"); close(fd); exit(EXIT_FAILURE); } if (listen(fd, SOMAXCONN) == -1) { log_error("listen failed"); close(fd); exit(EXIT_FAILURE); } set_non_blocking(fd); add_to_epoll(fd, EPOLLIN); connections[fd].type = CONN_TYPE_LISTENER; connections[fd].fd = fd; log_info("Listening on port %d (fd=%d)", port, fd); } void accept_new_connection(int listener_fd) { struct sockaddr_in client_addr; socklen_t client_len = sizeof(client_addr); int client_fd = accept(listener_fd, (struct sockaddr*)&client_addr, &client_len); if (client_fd == -1) { if (errno != EAGAIN && errno != EWOULDBLOCK) { log_error("accept failed"); } return; } if (client_fd >= MAX_FDS) { log_error("Connection fd too high, closing"); close(client_fd); return; } set_non_blocking(client_fd); add_to_epoll(client_fd, EPOLLIN); connection_t *conn = &connections[client_fd]; memset(conn, 0, sizeof(connection_t)); conn->type = CONN_TYPE_CLIENT; conn->state = CLIENT_STATE_READING_REQUEST_LINE; conn->fd = client_fd; conn->last_activity = time(NULL); if (buffer_init(&conn->read_buf, CHUNK_SIZE) < 0 || buffer_init(&conn->write_buf, CHUNK_SIZE) < 0) { close_connection(client_fd); return; } monitor.active_connections++; log_debug("New connection on fd %d from %s, total: %d", client_fd, inet_ntoa(client_addr.sin_addr), monitor.active_connections); } void close_connection(int fd) { if (fd < 0 || fd >= MAX_FDS || connections[fd].type == CONN_TYPE_UNUSED) return; connection_t *conn = &connections[fd]; // Close paired connection if exists if (conn->pair) { conn->pair->pair = NULL; close_connection(conn->pair->fd); } // Record request end if needed if (conn->vhost_stats && conn->request_start_time > 0) { monitor_record_request_end(conn->vhost_stats, conn->request_start_time); } // Remove from epoll epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL); // Clean up SSL if (conn->ssl) { SSL_shutdown(conn->ssl); SSL_free(conn->ssl); conn->ssl = NULL; } // Close socket close(fd); // Free buffers buffer_free(&conn->read_buf); buffer_free(&conn->write_buf); // Update connection count if (conn->type == CONN_TYPE_CLIENT) { monitor.active_connections--; } log_debug("Closed connection on fd %d, remaining: %d", fd, monitor.active_connections); // Reset connection structure memset(conn, 0, sizeof(connection_t)); conn->type = CONN_TYPE_UNUSED; } void connect_to_upstream(connection_t *client, route_config_t *route) { if (!client || !route) return; int up_fd = socket(AF_INET, SOCK_STREAM, 0); if (up_fd < 0) { send_error_response(client, 502, "Bad Gateway", "Failed to create upstream socket"); return; } if (up_fd >= MAX_FDS) { close(up_fd); send_error_response(client, 502, "Bad Gateway", "Connection limit exceeded"); return; } struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(route->upstream_port); // Try to resolve hostname if (inet_pton(AF_INET, route->upstream_host, &addr.sin_addr) <= 0) { struct hostent *he = gethostbyname(route->upstream_host); if (!he) { close(up_fd); send_error_response(client, 502, "Bad Gateway", "Cannot resolve upstream hostname"); return; } memcpy(&addr.sin_addr, he->h_addr_list[0], he->h_length); } set_non_blocking(up_fd); int connect_result = connect(up_fd, (struct sockaddr*)&addr, sizeof(addr)); if (connect_result < 0 && errno != EINPROGRESS) { close(up_fd); send_error_response(client, 502, "Bad Gateway", "Failed to connect to upstream"); return; } add_to_epoll(up_fd, EPOLLIN | EPOLLOUT); connection_t *up = &connections[up_fd]; memset(up, 0, sizeof(connection_t)); up->type = CONN_TYPE_UPSTREAM; up->fd = up_fd; up->last_activity = time(NULL); client->pair = up; up->pair = client; up->vhost_stats = client->vhost_stats; if (buffer_init(&up->read_buf, CHUNK_SIZE) < 0 || buffer_init(&up->write_buf, CHUNK_SIZE) < 0) { close_connection(up_fd); send_error_response(client, 502, "Bad Gateway", "Memory allocation failed"); return; } if (route->use_ssl) { up->ssl = SSL_new(ssl_ctx); if (!up->ssl) { close_connection(up_fd); send_error_response(client, 502, "Bad Gateway", "SSL initialization failed"); return; } SSL_set_fd(up->ssl, up_fd); SSL_set_connect_state(up->ssl); } log_debug("Connecting to upstream %s:%d on fd %d", route->upstream_host, route->upstream_port, up_fd); } static int do_read(connection_t *conn) { if (!conn) return -1; if (buffer_ensure_capacity(&conn->read_buf, conn->read_buf.size + CHUNK_SIZE) < 0) { return -1; } int bytes_read; if (conn->ssl && conn->ssl_handshake_done) { bytes_read = SSL_read(conn->ssl, conn->read_buf.data + conn->read_buf.size, CHUNK_SIZE); if (bytes_read <= 0) { int ssl_error = SSL_get_error(conn->ssl, bytes_read); if (ssl_error == SSL_ERROR_WANT_READ || ssl_error == SSL_ERROR_WANT_WRITE) { errno = EAGAIN; } } } else { bytes_read = read(conn->fd, conn->read_buf.data + conn->read_buf.size, CHUNK_SIZE); } return bytes_read; } static int do_write(connection_t *conn) { if (!conn || conn->write_buf.size == 0) return 0; int written; if (conn->ssl && conn->ssl_handshake_done) { written = SSL_write(conn->ssl, conn->write_buf.data, conn->write_buf.size); if (written <= 0) { int ssl_error = SSL_get_error(conn->ssl, written); if (ssl_error == SSL_ERROR_WANT_READ || ssl_error == SSL_ERROR_WANT_WRITE) { errno = EAGAIN; return 0; } } } else { written = write(conn->fd, conn->write_buf.data, conn->write_buf.size); } if (written > 0) { memmove(conn->write_buf.data, conn->write_buf.data + written, conn->write_buf.size - written); conn->write_buf.size -= written; } return written; } void handle_client_body(connection_t *conn) { if (!conn || !conn->pair) { close_connection(conn->fd); return; } connection_t *upstream = conn->pair; conn->last_activity = time(NULL); long remaining_to_read_total = conn->http_content_length - conn->http_body_read; if (remaining_to_read_total <= 0) { conn->state = CLIENT_STATE_TUNNELING; return; } size_t can_read_now = conn->read_buf.capacity - conn->read_buf.size; if (can_read_now > (size_t)remaining_to_read_total) { // Fix: [-Wsign-compare] Cast to size_t can_read_now = remaining_to_read_total; } if(can_read_now == 0) return; int bytes = read(conn->fd, conn->read_buf.data, can_read_now); if (bytes > 0) { conn->http_body_read += bytes; if (buffer_ensure_capacity(&upstream->write_buf, upstream->write_buf.size + bytes) < 0) { close_connection(conn->fd); return; } memcpy(upstream->write_buf.data + upstream->write_buf.size, conn->read_buf.data, bytes); upstream->write_buf.size += bytes; modify_epoll(upstream->fd, EPOLLIN | EPOLLOUT); if (conn->http_body_read >= conn->http_content_length) { log_debug("Finished reading body for fd %d, transitioning to TUNNELING", conn->fd); conn->state = CLIENT_STATE_TUNNELING; } } else if (bytes == 0) { close_connection(conn->fd); } else if (errno != EAGAIN && errno != EWOULDBLOCK) { close_connection(conn->fd); } } static void handle_tunneling(connection_t *conn) { if (!conn || !conn->pair) { close_connection(conn->fd); return; } conn->last_activity = time(NULL); int bytes = do_read(conn); if (bytes > 0) { conn->read_buf.size += bytes; connection_t *pair = conn->pair; if (buffer_ensure_capacity(&pair->write_buf, pair->write_buf.size + conn->read_buf.size) < 0) { close_connection(conn->fd); return; } memcpy(pair->write_buf.data + pair->write_buf.size, conn->read_buf.data, conn->read_buf.size); pair->write_buf.size += conn->read_buf.size; // Update stats if (conn->vhost_stats) { if (conn->type == CONN_TYPE_CLIENT) { conn->vhost_stats->bytes_recv += bytes; } else { conn->vhost_stats->bytes_sent += bytes; } } conn->read_buf.size = 0; modify_epoll(pair->fd, EPOLLIN | EPOLLOUT); } else if (bytes == 0) { close_connection(conn->fd); } else if (errno != EAGAIN && errno != EWOULDBLOCK) { close_connection(conn->fd); } } static void handle_write_event(connection_t *conn) { if (!conn) return; conn->last_activity = time(NULL); if (conn->type == CONN_TYPE_UPSTREAM && !conn->ssl_handshake_done) { // Check if connection is established int err = 0; socklen_t len = sizeof(err); if (getsockopt(conn->fd, SOL_SOCKET, SO_ERROR, &err, &len) != 0 || err != 0) { close_connection(conn->fd); return; } // Handle SSL handshake if needed if (conn->ssl) { int ret = SSL_do_handshake(conn->ssl); if (ret == 1) { conn->ssl_handshake_done = 1; } else { int ssl_error = SSL_get_error(conn->ssl, ret); if (ssl_error != SSL_ERROR_WANT_READ && ssl_error != SSL_ERROR_WANT_WRITE) { close_connection(conn->fd); return; } // Continue handshake later return; } } else { conn->ssl_handshake_done = 1; // No SSL, consider "handshake" done } // Connection established, forward client request if (conn->ssl_handshake_done && conn->pair) { connection_t *client = conn->pair; if (buffer_ensure_capacity(&conn->write_buf, conn->write_buf.size + client->read_buf.size) < 0) { close_connection(conn->fd); return; } memcpy(conn->write_buf.data + conn->write_buf.size, client->read_buf.data, client->read_buf.size); conn->write_buf.size += client->read_buf.size; client->read_buf.size = 0; // **FIX**: Transition to the correct state based on Content-Length if (client->http_body_read >= client->http_content_length) { client->state = CLIENT_STATE_TUNNELING; log_debug("Request has no body or was already read, tunneling fd %d", client->fd); } else { client->state = CLIENT_STATE_READING_BODY; log_debug("Request has body, reading body for fd %d", client->fd); } // Record WebSocket request end immediately after connection if (client->is_websocket && client->vhost_stats) { monitor_record_request_end(client->vhost_stats, client->request_start_time); } } } // Write data if available int written = do_write(conn); if (written > 0) { // Update stats if(conn->vhost_stats) { if(conn->type == CONN_TYPE_CLIENT) { conn->vhost_stats->bytes_sent += written; } else { conn->vhost_stats->bytes_recv += written; // This seems reversed, but from perspective of vhost, upstream sends to it. } } } if (written < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { close_connection(conn->fd); return; } // Adjust epoll events if (conn->write_buf.size == 0) { modify_epoll(conn->fd, EPOLLIN); } } void handle_connection_event(struct epoll_event *event) { int fd = event->data.fd; if (event->events & (EPOLLERR | EPOLLHUP)) { close_connection(fd); return; } if (fd < 0 || fd >= MAX_FDS) { return; } connection_t *conn = &connections[fd]; if (conn->type == CONN_TYPE_LISTENER) { if (event->events & EPOLLIN) { accept_new_connection(fd); } } else { if (event->events & EPOLLOUT) { handle_write_event(conn); } if ((event->events & EPOLLIN) && conn->type != CONN_TYPE_UNUSED) { if (conn->type == CONN_TYPE_CLIENT) { // **FIX**: Dispatch based on the more detailed state machine switch (conn->state) { case CLIENT_STATE_READING_REQUEST_LINE: handle_client_data(conn); break; case CLIENT_STATE_READING_BODY: handle_client_body(conn); break; case CLIENT_STATE_TUNNELING: handle_tunneling(conn); break; default: // Do nothing for other states like CONNECTING, DONE, ERROR break; } } else { // It's an upstream connection handle_tunneling(conn); } } } } // ================================================================================================= // // Main Implementation // // ================================================================================================= void create_default_config_if_not_exists(const char* filename) { FILE *f = fopen(filename, "r"); if (f) { fclose(f); return; } f = fopen(filename, "w"); if (!f) { log_error("Cannot create default config file"); return; } fprintf(f, "{\n" " \"port\": 8080,\n" " \"reverse_proxy\": [\n" " {\n" " \"hostname\": \"localhost\",\n" " \"upstream_host\": \"127.0.0.1\",\n" " \"upstream_port\": 3000,\n" " \"use_ssl\": false,\n" " \"rewrite_host\": true\n" " },\n" " {\n" " \"hostname\": \"example.com\",\n" " \"upstream_host\": \"127.0.0.1\",\n" " \"upstream_port\": 5000,\n" " \"use_ssl\": false,\n" " \"rewrite_host\": false\n" " }\n" " ]\n" "}\n"); fclose(f); log_info("Created default config file: %s", filename); } void init_ssl() { SSL_load_error_strings(); OpenSSL_add_ssl_algorithms(); ssl_ctx = SSL_CTX_new(TLS_client_method()); if (!ssl_ctx) { log_error("Failed to create SSL context"); exit(EXIT_FAILURE); } // Set verification options SSL_CTX_set_verify(ssl_ctx, SSL_VERIFY_NONE, NULL); SSL_CTX_set_options(ssl_ctx, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3); } void cleanup() { log_info("Shutting down proxy..."); // Close all connections for (int i = 0; i < MAX_FDS; i++) { if (connections[i].type != CONN_TYPE_UNUSED) { close_connection(i); } } free_config(); monitor_cleanup(); if (epoll_fd >= 0) { close(epoll_fd); } if (ssl_ctx) { SSL_CTX_free(ssl_ctx); } EVP_cleanup(); } void cleanup_idle_connections() { static time_t last_cleanup = 0; time_t current_time = time(NULL); // Run cleanup every 60 seconds if (current_time - last_cleanup < 60) { return; } last_cleanup = current_time; const time_t timeout = 300; // 5 minutes timeout for (int i = 0; i < MAX_FDS; i++) { connection_t *conn = &connections[i]; if (conn->type != CONN_TYPE_UNUSED && conn->type != CONN_TYPE_LISTENER) { if (current_time - conn->last_activity > timeout) { log_debug("Closing idle connection fd=%d", i); close_connection(i); } } } } int main(int argc, char *argv[]) { signal(SIGPIPE, SIG_IGN); if (getenv("DEBUG")) { g_debug_mode = 1; log_info("Debug mode enabled"); } const char *config_file = (argc > 1) ? argv[1] : "proxy_config.json"; create_default_config_if_not_exists(config_file); if (!load_config(config_file)) { fprintf(stderr, "Failed to load configuration\n"); return 1; } init_ssl(); monitor_init("proxy_stats.db"); epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (epoll_fd == -1) { log_error("epoll_create1 failed"); return 1; } // Initialize connections array for (int i = 0; i < MAX_FDS; i++) { connections[i].type = CONN_TYPE_UNUSED; } setup_listener_socket(config.port); log_info("Reverse proxy running on 0.0.0.0:%d", config.port); log_info("Dashboard available at http://localhost:%d/dashboard", config.port); atexit(cleanup); struct epoll_event events[MAX_EVENTS]; struct timespec last_monitor_update = {0, 0}; while (1) { int n = epoll_wait(epoll_fd, events, MAX_EVENTS, 1000); if (n == -1) { if (errno == EINTR) { continue; } log_error("epoll_wait failed"); break; } for (int i = 0; i < n; i++) { handle_connection_event(&events[i]); } struct timespec current_time; clock_gettime(CLOCK_MONOTONIC, ¤t_time); if (current_time.tv_sec > last_monitor_update.tv_sec) { monitor_update(); cleanup_idle_connections(); last_monitor_update = current_time; } } return 0; }