/*
 * Connection layer of the reverse proxy: listener setup, accepting clients,
 * connecting to upstreams, buffered (optionally TLS) I/O and the epoll event
 * dispatcher. Connections live in a table indexed by file descriptor.
 */

/* memmem() is a GNU extension; the macro must precede every system header. */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include "connection.h"
#include "buffer.h"
#include "logging.h"
#include "config.h"
#include "monitor.h"
#include "http.h"
#include "ssl_handler.h"
#include "dashboard.h"

#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <time.h>
#include <unistd.h>

/* Connection table, indexed directly by file descriptor. */
connection_t connections[MAX_FDS];

/* epoll instance used by every function in this file; -1 until assigned. */
int epoll_fd = -1;

/* Registers fd with the epoll instance. Returns epoll_ctl()'s result (0 or -1). */
static int epoll_register(int fd, uint32_t events)
{
    struct epoll_event event = { .data.fd = fd, .events = events };
    return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event);
}

/* Marks every slot of the connection table as unused. */
void connection_init_all(void)
{
    for (int i = 0; i < MAX_FDS; i++) {
        connections[i].type = CONN_TYPE_UNUSED;
        connections[i].fd = -1;
    }
}

/* Adds O_NONBLOCK to the descriptor's flags; silently does nothing if F_GETFL fails. */
void connection_set_non_blocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags >= 0) {
        fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    }
}

/*
 * Enables TCP keepalive on fd: 60 s idle, 10 s probe interval, 6 probes.
 * Best effort: setsockopt() failures are ignored.
 */
void connection_set_tcp_keepalive(int fd)
{
    int yes = 1;
    setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes));
    int idle = 60;
    setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(idle));
    int interval = 10;
    setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(interval));
    int maxpkt = 6;
    setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &maxpkt, sizeof(maxpkt));
}

/*
 * Registers fd with epoll for the given events.
 * On failure the error is logged and fd is CLOSED, so the caller must not
 * keep using it. Code in this file that needs to react to the failure uses
 * epoll_register() instead.
 */
void connection_add_to_epoll(int fd, uint32_t events)
{
    if (epoll_register(fd, events) == -1) {
        log_error("epoll_ctl_add failed");
        close(fd);
    }
}

/*
 * Changes the event mask of an already registered fd.
 * EBADF/ENOENT are ignored silently; other failures are logged at debug level.
 */
void connection_modify_epoll(int fd, uint32_t events)
{
    struct epoll_event event = { .data.fd = fd, .events = events };
    if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) == -1) {
        if (errno != EBADF && errno != ENOENT) {
            log_debug("epoll_ctl_mod failed for fd %d: %s", fd, strerror(errno));
        }
    }
}

/*
 * Creates a non-blocking IPv4 listening socket on `port` (all interfaces),
 * registers it with epoll and records it in the connection table.
 * Any failure is fatal: the process exits with EXIT_FAILURE.
 */
void connection_setup_listener(int port)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        log_error("socket failed");
        exit(EXIT_FAILURE);
    }
    /* The table is indexed by fd, so an out-of-range fd cannot be tracked. */
    if (fd >= MAX_FDS) {
        log_error("Listener fd too high");
        close(fd);
        exit(EXIT_FAILURE);
    }

    int reuse = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {
        log_error("setsockopt SO_REUSEADDR failed");
        close(fd);
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons((uint16_t)port),
        .sin_addr.s_addr = htonl(INADDR_ANY)
    };
    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        log_error("bind failed");
        close(fd);
        exit(EXIT_FAILURE);
    }
    if (listen(fd, SOMAXCONN) == -1) {
        log_error("listen failed");
        close(fd);
        exit(EXIT_FAILURE);
    }

    connection_set_non_blocking(fd);
    /* A listener that is not in the epoll set can never accept anything. */
    if (epoll_register(fd, EPOLLIN) == -1) {
        log_error("epoll_ctl_add failed");
        close(fd);
        exit(EXIT_FAILURE);
    }

    connections[fd].type = CONN_TYPE_LISTENER;
    connections[fd].fd = fd;
    log_info("Listening on port %d (fd=%d)", port, fd);
}

/*
 * Accepts every pending connection on listener_fd (until accept() fails,
 * normally with EAGAIN) and initialises a client slot for each one.
 */
void connection_accept(int listener_fd)
{
    for (;;) {
        struct sockaddr_in client_addr;
        socklen_t client_len = sizeof(client_addr);
        int client_fd = accept(listener_fd, (struct sockaddr *)&client_addr, &client_len);
        if (client_fd == -1) {
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                log_error("accept failed");
            }
            break;
        }
        if (client_fd >= MAX_FDS) {
            log_error("Connection fd too high, closing");
            close(client_fd);
            continue;
        }

        connection_set_non_blocking(client_fd);
        connection_set_tcp_keepalive(client_fd);
        /* Do not track a descriptor that could not be registered. */
        if (epoll_register(client_fd, EPOLLIN) == -1) {
            log_error("epoll_ctl_add failed");
            close(client_fd);
            continue;
        }

        connection_t *conn = &connections[client_fd];
        memset(conn, 0, sizeof(connection_t));
        conn->type = CONN_TYPE_CLIENT;
        conn->state = CLIENT_STATE_READING_HEADERS;
        conn->fd = client_fd;
        conn->last_activity = time(NULL);

        /*
         * Count the client before anything can fail: connection_close()
         * decrements the counter for every CLIENT slot it tears down.
         */
        __sync_fetch_and_add(&monitor.active_connections, 1);

        if (buffer_init(&conn->read_buf, CHUNK_SIZE) < 0 ||
            buffer_init(&conn->write_buf, CHUNK_SIZE) < 0) {
            connection_close(client_fd);
            continue;
        }

        log_debug("New connection on fd %d from %s, total: %d",
                  client_fd, inet_ntoa(client_addr.sin_addr), monitor.active_connections);
    }
}

/*
 * Tears down the connection stored in slot `fd` and marks the slot unused.
 * No-op for out-of-range or unused slots.
 *
 * Pair handling:
 *  - upstream closing with a client pair: the client is detached and reset
 *    to CLIENT_STATE_READING_HEADERS;
 *  - client closing with an upstream pair: the upstream is closed as well.
 */
void connection_close(int fd)
{
    if (fd < 0 || fd >= MAX_FDS) {
        return;
    }
    connection_t *conn = &connections[fd];
    if (conn->type == CONN_TYPE_UNUSED || conn->fd == -1) {
        return;
    }

    connection_t *pair = conn->pair;
    if (pair) {
        if (conn->type == CONN_TYPE_UPSTREAM && pair->type == CONN_TYPE_CLIENT) {
            log_debug("Upstream fd %d is closing. Resetting client fd %d to READING_HEADERS.", fd, pair->fd);
            pair->state = CLIENT_STATE_READING_HEADERS;
            pair->pair = NULL;
        } else if (conn->type == CONN_TYPE_CLIENT && pair->type == CONN_TYPE_UPSTREAM) {
            log_debug("Client fd %d is closing. Closing orphaned upstream pair fd %d.", fd, pair->fd);
            /* Break the back-link first so the recursive close does not touch us. */
            pair->pair = NULL;
            connection_close(pair->fd);
        }
        conn->pair = NULL;
    }

    if (conn->vhost_stats && conn->request_start_time > 0) {
        monitor_record_request_end(conn->vhost_stats, conn->request_start_time);
        conn->request_start_time = 0;
    }
    if (conn->type == CONN_TYPE_CLIENT) {
        __sync_fetch_and_sub(&monitor.active_connections, 1);
    }

    log_debug("Closing and cleaning up fd %d", fd);
    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL);
    if (conn->ssl) {
        SSL_shutdown(conn->ssl);
        SSL_free(conn->ssl);
    }
    close(fd);

    buffer_free(&conn->read_buf);
    buffer_free(&conn->write_buf);
    memset(conn, 0, sizeof(connection_t));
    conn->type = CONN_TYPE_UNUSED;
    conn->fd = -1;
}

/*
 * Reads once from the connection (TLS if the handshake is done, plain
 * read() otherwise) into conn->read_buf.
 *
 * Returns the number of bytes read (> 0), 0 on end-of-stream, or -1 with
 * errno set: EAGAIN when the operation would block, ENOMEM when the buffer
 * cannot grow, EIO on a fatal TLS error, or read()'s own errno.
 */
int connection_do_read(connection_t *conn)
{
    if (!conn) {
        return -1;
    }

    buffer_t *buf = &conn->read_buf;
    buffer_compact(buf);

    size_t available = buffer_available_write(buf);
    if (available == 0) {
        if (buffer_ensure_capacity(buf, buf->capacity ? buf->capacity * 2 : CHUNK_SIZE) < 0) {
            errno = ENOMEM; /* never let a stale EAGAIN hide the failure */
            return -1;
        }
        available = buffer_available_write(buf);
        if (available == 0) {
            /* A zero-length read() would return 0 and look like EOF. */
            errno = ENOMEM;
            return -1;
        }
    }
    if (available > (size_t)INT_MAX) {
        available = (size_t)INT_MAX; /* SSL_read() and our return type are int */
    }

    int bytes_read;
    if (conn->ssl && conn->ssl_handshake_done) {
        bytes_read = SSL_read(conn->ssl, buf->data + buf->tail, (int)available);
        if (bytes_read <= 0) {
            int ssl_error = SSL_get_error(conn->ssl, bytes_read);
            if (ssl_error == SSL_ERROR_WANT_READ || ssl_error == SSL_ERROR_WANT_WRITE) {
                errno = EAGAIN;
                return -1;
            }
            if (bytes_read == 0) {
                return 0;
            }
            errno = EIO; /* fatal TLS error: callers decide based on errno */
            return -1;
        }
    } else {
        bytes_read = (int)read(conn->fd, buf->data + buf->tail, available);
    }

    if (bytes_read > 0) {
        buf->tail += bytes_read;
        conn->last_activity = time(NULL);
        if (conn->vhost_stats) {
            monitor_record_bytes(conn->vhost_stats, 0, bytes_read);
        }
    }
    return bytes_read;
}

/*
 * Writes as much of conn->write_buf as one write()/SSL_write() call accepts.
 *
 * Returns the number of bytes written, 0 when there is nothing to send, the
 * TLS layer would block, or a TLS handshake is still pending, and -1 with
 * errno set on failure (EIO for fatal TLS errors).
 */
int connection_do_write(connection_t *conn)
{
    if (!conn) {
        return -1;
    }

    buffer_t *buf = &conn->write_buf;
    size_t available = buffer_available_read(buf);
    if (available == 0) {
        return 0;
    }
    /* Never put plaintext on a socket whose TLS handshake is unfinished. */
    if (conn->ssl && !conn->ssl_handshake_done) {
        return 0;
    }
    if (available > (size_t)INT_MAX) {
        available = (size_t)INT_MAX;
    }

    int written;
    if (conn->ssl) {
        /*
         * NOTE(review): the buffer address may differ between retries of a
         * partial SSL_write(); this presumably relies on
         * SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER being set on ssl_ctx - confirm.
         */
        written = SSL_write(conn->ssl, buf->data + buf->head, (int)available);
        if (written <= 0) {
            int ssl_error = SSL_get_error(conn->ssl, written);
            if (ssl_error == SSL_ERROR_WANT_READ || ssl_error == SSL_ERROR_WANT_WRITE) {
                errno = EAGAIN;
                return 0;
            }
            errno = EIO; /* fatal: make sure callers see a real error */
            return -1;
        }
    } else {
        written = (int)write(conn->fd, buf->data + buf->head, available);
    }

    if (written > 0) {
        buffer_consume(buf, written);
        conn->last_activity = time(NULL);
        if (conn->vhost_stats) {
            monitor_record_bytes(conn->vhost_stats, written, 0);
        }
    }
    return written;
}

/*
 * Queues a plain-text HTTP error response on `conn`, switches the connection
 * to CLIENT_STATE_ERROR and disables keep-alive. handle_write_event() closes
 * ERROR connections once their write buffer is empty.
 */
void connection_send_error_response(connection_t *conn, int code, const char* status, const char* body)
{
    if (!conn || !status || !body) {
        return;
    }

    char response[2048];
    int len = snprintf(response, sizeof(response),
                       "HTTP/1.1 %d %s\r\n"
                       "Content-Type: text/plain; charset=utf-8\r\n"
                       "Content-Length: %zu\r\n"
                       "Connection: close\r\n"
                       "Server: ReverseProxy/4.0\r\n"
                       "\r\n"
                       "%s",
                       code, status, strlen(body), body);

    if (len > 0 && (size_t)len < sizeof(response)) {
        if (buffer_ensure_capacity(&conn->write_buf, conn->write_buf.tail + len) == 0) {
            memcpy(conn->write_buf.data + conn->write_buf.tail, response, (size_t)len);
            conn->write_buf.tail += len;
        }
    }

    conn->state = CLIENT_STATE_ERROR;
    conn->request.keep_alive = 0;

    /*
     * Arm EPOLLOUT even when nothing could be queued: the write handler then
     * closes the ERROR connection instead of leaving it to the idle timeout.
     */
    connection_modify_epoll(conn->fd, EPOLLIN | EPOLLOUT);
}

/*
 * Fills `addr` with the IPv4 address and port of the route's upstream.
 * Returns 0 on success, -1 if the host is neither an IPv4 literal nor
 * resolvable to an IPv4 address.
 */
static int resolve_upstream_addr(const route_config_t *route, struct sockaddr_in *addr)
{
    memset(addr, 0, sizeof(*addr));
    addr->sin_family = AF_INET;
    addr->sin_port = htons((uint16_t)route->upstream_port);

    if (inet_pton(AF_INET, route->upstream_host, &addr->sin_addr) > 0) {
        return 0;
    }

    /* NOTE: gethostbyname() is a blocking call made from the event loop. */
    struct hostent *he = gethostbyname(route->upstream_host);
    if (!he || he->h_addrtype != AF_INET || !he->h_addr_list[0] ||
        (size_t)he->h_length != sizeof(addr->sin_addr)) {
        return -1;
    }
    memcpy(&addr->sin_addr, he->h_addr_list[0], sizeof(addr->sin_addr));
    return 0;
}

/*
 * Returns a malloc'd copy of the request in which the first "Host:" header
 * line is replaced by one naming the route's upstream (port appended unless
 * it is the scheme default). *out_len receives the new length.
 * Returns NULL - meaning "forward the request unchanged" - when no Host
 * header is found in the header section, the new header does not fit, or
 * allocation fails. The caller frees the result.
 */
static char *rewrite_host_header(const route_config_t *route, const char *data,
                                 size_t data_len, size_t *out_len)
{
    const char *end = data + data_len;
    const char *host_start = NULL;
    const char *host_end = NULL;

    for (const char *line = data; line < end; ) {
        const char *nl = memchr(line, '\n', (size_t)(end - line));
        if (!nl) {
            break;
        }
        size_t line_len = (size_t)(nl - line);
        if (line_len == 0 || (line_len == 1 && line[0] == '\r')) {
            break; /* blank line: end of headers, never rewrite the body */
        }
        if (line_len >= 5 && strncasecmp(line, "Host:", 5) == 0) {
            host_start = line;
            host_end = nl + 1;
            break;
        }
        line = nl + 1;
    }
    if (!host_start) {
        return NULL;
    }

    char new_host_header[512];
    int is_default_port = (route->use_ssl && route->upstream_port == 443) ||
                          (!route->use_ssl && route->upstream_port == 80);
    int n;
    if (is_default_port) {
        n = snprintf(new_host_header, sizeof(new_host_header), "Host: %s\r\n",
                     route->upstream_host);
    } else {
        n = snprintf(new_host_header, sizeof(new_host_header), "Host: %s:%d\r\n",
                     route->upstream_host, route->upstream_port);
    }
    if (n < 0 || (size_t)n >= sizeof(new_host_header)) {
        return NULL; /* a truncated header would lose its CRLF */
    }

    size_t new_host_len = (size_t)n;
    size_t prefix_len = (size_t)(host_start - data);
    size_t suffix_len = (size_t)(end - host_end);
    size_t total = prefix_len + new_host_len + suffix_len;

    char *out = malloc(total);
    if (!out) {
        return NULL;
    }
    memcpy(out, data, prefix_len);
    memcpy(out + prefix_len, new_host_header, new_host_len);
    memcpy(out + prefix_len + new_host_len, host_end, suffix_len);
    *out_len = total;
    return out;
}

/*
 * Opens a non-blocking connection to the upstream configured for
 * client->request.host, pairs it with `client` and stages `data`
 * (data_len bytes, optionally with a rewritten Host header) in the
 * upstream's write buffer.
 *
 * Failure modes:
 *  - routing/socket/resolve/connect/registration/buffering problems queue a
 *    502 response on the client;
 *  - failure to initialise the upstream buffers or the SSL object CLOSES THE
 *    CLIENT: callers must check client->fd == -1 before touching it again.
 */
void connection_connect_to_upstream(connection_t *client, const char *data, size_t data_len)
{
    if (!client || !data) {
        return;
    }

    route_config_t *route = config_find_route(client->request.host);
    if (!route) {
        connection_send_error_response(client, 502, "Bad Gateway", "No route configured for this host");
        return;
    }

    int up_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (up_fd < 0) {
        connection_send_error_response(client, 502, "Bad Gateway", "Failed to create upstream socket");
        return;
    }
    if (up_fd >= MAX_FDS) {
        close(up_fd);
        connection_send_error_response(client, 502, "Bad Gateway", "Connection limit exceeded");
        return;
    }

    struct sockaddr_in addr;
    if (resolve_upstream_addr(route, &addr) < 0) {
        close(up_fd);
        connection_send_error_response(client, 502, "Bad Gateway", "Cannot resolve upstream hostname");
        return;
    }

    connection_set_non_blocking(up_fd);
    connection_set_tcp_keepalive(up_fd);

    int connect_result = connect(up_fd, (struct sockaddr *)&addr, sizeof(addr));
    if (connect_result < 0 && errno != EINPROGRESS) {
        close(up_fd);
        connection_send_error_response(client, 502, "Bad Gateway", "Failed to connect to upstream");
        return;
    }

    /* An fd that is not in the epoll set must not enter the table. */
    if (epoll_register(up_fd, EPOLLIN | EPOLLOUT) == -1) {
        log_error("epoll_ctl_add failed");
        close(up_fd);
        connection_send_error_response(client, 502, "Bad Gateway", "Failed to connect to upstream");
        return;
    }

    connection_t *up = &connections[up_fd];
    memset(up, 0, sizeof(connection_t));
    up->type = CONN_TYPE_UPSTREAM;
    up->fd = up_fd;
    up->last_activity = time(NULL);
    client->pair = up;
    up->pair = client;
    up->vhost_stats = client->vhost_stats;

    if (buffer_init(&up->read_buf, CHUNK_SIZE) < 0 ||
        buffer_init(&up->write_buf, CHUNK_SIZE) < 0) {
        connection_close(client->fd); /* also closes the paired upstream */
        return;
    }

    const char *payload = data;
    size_t payload_len = data_len;
    char *rewritten = NULL;
    if (route->rewrite_host) {
        size_t rewritten_len = 0;
        rewritten = rewrite_host_header(route, data, data_len, &rewritten_len);
        if (rewritten) {
            /* Only switch length and pointer together, once the copy exists. */
            payload = rewritten;
            payload_len = rewritten_len;
        }
    }

    if (buffer_ensure_capacity(&up->write_buf, payload_len) != 0) {
        free(rewritten);
        /* Closing the upstream detaches it from the client first. */
        connection_close(up_fd);
        connection_send_error_response(client, 502, "Bad Gateway", "Failed to buffer upstream request");
        return;
    }
    if (payload_len > 0) {
        memcpy(up->write_buf.data, payload, payload_len);
    }
    up->write_buf.tail = payload_len;
    free(rewritten);

    if (route->use_ssl) {
        up->ssl = SSL_new(ssl_ctx);
        if (!up->ssl) {
            connection_close(client->fd);
            return;
        }
        /*
         * NOTE(review): with rewrite_host set the Host header names the
         * upstream while SNI names the client's host (and vice versa) -
         * this looks inverted; confirm the intended pairing.
         */
        const char *sni_hostname = route->rewrite_host ? client->request.host
                                                       : route->upstream_host;
        SSL_set_tlsext_hostname(up->ssl, sni_hostname);
        SSL_set_fd(up->ssl, up_fd);
        SSL_set_connect_state(up->ssl);
        up->ssl_handshake_done = 0;
        log_debug("Setting SNI to: %s for upstream %s:%d",
                  sni_hostname, route->upstream_host, route->upstream_port);
    }

    up->state = CLIENT_STATE_FORWARDING;
    log_debug("Connecting to upstream %s:%d on fd %d (SSL: %s)",
              route->upstream_host, route->upstream_port, up_fd,
              route->use_ssl ? "yes" : "no");
}

/*
 * Reacts to end-of-stream from a client that is not being forwarded:
 * closes at once if nothing is queued for it, otherwise stops watching for
 * input and lets handle_write_event() close after the queue is flushed.
 */
static void handle_client_eof(connection_t *conn)
{
    if (buffer_available_read(&conn->write_buf) == 0) {
        log_debug("Client fd %d closed the connection", conn->fd);
        connection_close(conn->fd);
        return;
    }
    if (conn->state != CLIENT_STATE_ERROR) {
        conn->state = CLIENT_STATE_CLOSING;
    }
    connection_modify_epoll(conn->fd, EPOLLOUT);
}

#define DASHBOARD_PATH "/rproxy/dashboard"
#define STATS_PATH "/rproxy/api/stats"

/*
 * Reads from a client and, once a full request is buffered, either serves it
 * from the built-in dashboard/stats routes or forwards it to an upstream.
 * At most one request is dispatched per call.
 */
static void handle_client_read(connection_t *conn)
{
    int bytes_read = connection_do_read(conn);
    if (bytes_read < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
        log_debug("[ROUTING] Closing connection fd=%d due to read error", conn->fd);
        connection_close(conn->fd);
        return;
    }
    /*
     * Remember EOF: epoll is used level-triggered here, so an ignored EOF
     * would be reported again and again.
     */
    const int peer_eof = (bytes_read == 0);

    buffer_t *buf = &conn->read_buf;

    if (conn->state == CLIENT_STATE_FORWARDING && conn->pair == NULL) {
        conn->state = CLIENT_STATE_READING_HEADERS;
    }

    if (conn->state == CLIENT_STATE_FORWARDING && conn->pair != NULL) {
        char *data_start = buf->data + buf->head;
        size_t data_len = buffer_available_read(buf);
        if (data_len < 1) {
            return;
        }
        if (!http_is_request_start(data_start, data_len)) {
            return;
        }
        log_debug("Pipelined request detected on fd %d, closing upstream fd %d",
                  conn->fd, conn->pair->fd);
        connection_close(conn->pair->fd);
        conn->pair = NULL;
        conn->state = CLIENT_STATE_READING_HEADERS;
    }

    while (buffer_available_read(buf) > 0 && conn->state == CLIENT_STATE_READING_HEADERS) {
        char *data_start = buf->data + buf->head;
        size_t data_len = buffer_available_read(buf);

        char *headers_end = memmem(data_start, data_len, "\r\n\r\n", 4);
        if (!headers_end) {
            if (data_len >= MAX_HEADER_SIZE) {
                connection_send_error_response(conn, 413, "Request Header Too Large", "Header is too large.");
                return;
            }
            log_debug("fd %d: Incomplete headers, waiting for more data.", conn->fd);
            break;
        }

        size_t headers_len = (size_t)(headers_end - data_start) + 4;
        int parse_result = http_parse_request(data_start, headers_len, &conn->request);
        if (parse_result == 0) {
            connection_send_error_response(conn, 400, "Bad Request", "Malformed HTTP request.");
            return;
        }
        if (parse_result < 0) {
            break;
        }

        long long body_len = (conn->request.content_length > 0) ? conn->request.content_length : 0;
        size_t total_request_len = headers_len + body_len;
        if (!conn->request.is_chunked && data_len < total_request_len) {
            log_debug("fd %d: Incomplete body, waiting for more data.", conn->fd);
            break;
        }
        /* Chunked bodies are streamed later; only the headers go out now. */
        size_t len_to_forward = (conn->request.is_chunked) ? headers_len : total_request_len;

        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC, &ts);
        conn->request_start_time = ts.tv_sec + ts.tv_nsec / 1e9;

        int is_dashboard = strncmp(conn->request.uri, DASHBOARD_PATH, sizeof(DASHBOARD_PATH) - 1) == 0;
        int is_stats = strncmp(conn->request.uri, STATS_PATH, sizeof(STATS_PATH) - 1) == 0;
        if (is_dashboard || is_stats) {
            log_info("[ROUTING-INTERNAL] Serving internal route %s for fd=%d",
                     conn->request.uri, conn->fd);
            if (conn->pair) {
                connection_close(conn->pair->fd);
                conn->pair = NULL;
            }
            conn->state = CLIENT_STATE_SERVING_INTERNAL;
            if (is_dashboard) {
                dashboard_serve(conn);
            } else {
                dashboard_serve_stats_api(conn);
            }
            buffer_consume(buf, total_request_len);
            if (!conn->request.keep_alive) {
                conn->state = CLIENT_STATE_CLOSING;
            }
            return;
        }

        log_info("[ROUTING-FORWARD] Forwarding request for fd=%d: %s %s",
                 conn->fd, conn->request.method, conn->request.uri);
        conn->vhost_stats = monitor_get_or_create_vhost_stats(conn->request.host);
        monitor_record_request_start(conn->vhost_stats, conn->request.is_websocket);
        conn->state = CLIENT_STATE_FORWARDING;
        connection_connect_to_upstream(conn, data_start, len_to_forward);
        /* The call above may have closed this client; its buffers are gone then. */
        if (conn->fd == -1 || conn->type == CONN_TYPE_UNUSED) {
            return;
        }
        buffer_consume(buf, len_to_forward);
        return;
    }

    /* No complete request could be dispatched and the peer sent EOF. */
    if (peer_eof) {
        handle_client_eof(conn);
    }
}

#undef DASHBOARD_PATH
#undef STATS_PATH

/*
 * Moves data read on `conn` into its pair's write buffer and tries to send
 * it. EOF triggers a half-close towards the pair; the connection is closed
 * once both directions have seen EOF. A client that lost its upstream goes
 * back to header parsing; an upstream without a client is closed.
 */
static void handle_forwarding(connection_t *conn)
{
    connection_t *pair = conn->pair;

    if (!pair || pair->fd == -1) {
        if (conn->type != CONN_TYPE_CLIENT) {
            connection_close(conn->fd);
            return;
        }
        log_info("ROUTING-ORPHAN: Client fd=%d lost upstream, resetting to READING_HEADERS", conn->fd);
        conn->state = CLIENT_STATE_READING_HEADERS;
        conn->pair = NULL;
        if (buffer_available_read(&conn->read_buf) > 0) {
            handle_client_read(conn);
        }
        return;
    }

    int bytes_read = connection_do_read(conn);

    if (bytes_read == 0) {
        log_debug("EOF on fd %d, performing half-close on pair fd %d", conn->fd, pair->fd);
        conn->half_closed = 1;
        connection_modify_epoll(conn->fd,
                                buffer_available_read(&conn->write_buf) ? EPOLLOUT : 0);

        if (pair->fd != -1 && !pair->write_shutdown) {
            /*
             * Best-effort flush before shutting the write side down.
             * NOTE(review): data still queued after this attempt can no
             * longer be sent once SHUT_WR is issued - confirm whether the
             * shutdown should be deferred until the buffer drains.
             */
            if (buffer_available_read(&pair->write_buf) > 0) {
                connection_do_write(pair);
            }
            if (shutdown(pair->fd, SHUT_WR) == -1 && errno != ENOTCONN) {
                log_debug("shutdown(SHUT_WR) failed for fd %d: %s", pair->fd, strerror(errno));
            }
            pair->write_shutdown = 1;
        }
        if (pair->half_closed) {
            connection_close(conn->fd);
        }
        return;
    }

    if (bytes_read < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
        connection_close(conn->fd);
        return;
    }

    size_t data_to_forward = buffer_available_read(&conn->read_buf);
    if (data_to_forward == 0) {
        return;
    }

    /* Drain a well-filled pair buffer first to limit its growth. */
    if (buffer_available_read(&pair->write_buf) > CHUNK_SIZE / 2) {
        connection_do_write(pair);
    }

    size_t space_needed = pair->write_buf.tail + data_to_forward;
    if (buffer_ensure_capacity(&pair->write_buf, space_needed) < 0) {
        log_debug("Failed to buffer data for fd %d, closing connection", conn->fd);
        connection_close(conn->fd);
        return;
    }

    memcpy(pair->write_buf.data + pair->write_buf.tail,
           conn->read_buf.data + conn->read_buf.head,
           data_to_forward);
    pair->write_buf.tail += data_to_forward;
    buffer_consume(&conn->read_buf, data_to_forward);

    connection_do_write(pair);
    connection_modify_epoll(pair->fd, EPOLLIN | EPOLLOUT);
}

/*
 * Advances the TLS handshake on `conn`. On completion, request data still
 * buffered on the paired client is appended to the upstream write buffer.
 * On a fatal error the connection is closed and, if it was paired with a
 * live client, that client receives a 502 response.
 */
static void handle_ssl_handshake(connection_t *conn)
{
    if (!conn->ssl || conn->ssl_handshake_done) {
        return;
    }

    int ret = SSL_do_handshake(conn->ssl);
    if (ret != 1) {
        int ssl_error = SSL_get_error(conn->ssl, ret);
        if (ssl_error == SSL_ERROR_WANT_READ) {
            connection_modify_epoll(conn->fd, EPOLLIN);
            return;
        }
        if (ssl_error == SSL_ERROR_WANT_WRITE) {
            connection_modify_epoll(conn->fd, EPOLLOUT);
            return;
        }

        log_debug("SSL handshake failed for fd %d: %d", conn->fd, ssl_error);
        /*
         * Close the failed connection so the handshake is not retried on
         * every later event; closing an upstream detaches its client first.
         */
        connection_t *peer = conn->pair;
        connection_close(conn->fd);
        if (peer && peer->fd != -1 && peer->type == CONN_TYPE_CLIENT) {
            connection_send_error_response(peer, 502, "Bad Gateway", "SSL handshake failed");
        }
        return;
    }

    conn->ssl_handshake_done = 1;
    log_debug("SSL handshake completed for fd %d", conn->fd);

    if (conn->type == CONN_TYPE_UPSTREAM && conn->pair) {
        connection_t *client = conn->pair;
        size_t data_len = buffer_available_read(&client->read_buf);
        if (data_len > 0 &&
            buffer_ensure_capacity(&conn->write_buf, conn->write_buf.tail + data_len) == 0) {
            memcpy(conn->write_buf.data + conn->write_buf.tail,
                   client->read_buf.data + client->read_buf.head,
                   data_len);
            conn->write_buf.tail += data_len;
            buffer_consume(&client->read_buf, data_len);
            log_debug("Forwarding %zu bytes of buffered request data after SSL handshake", data_len);
        }
    }

    if (buffer_available_read(&conn->write_buf) > 0) {
        connection_modify_epoll(conn->fd, EPOLLIN | EPOLLOUT);
    } else {
        connection_modify_epoll(conn->fd, EPOLLIN);
    }
}

/*
 * Handles EPOLLOUT: finishes a pending upstream TLS handshake, flushes the
 * write buffer and, once it is empty, applies the state-dependent follow-up
 * (close, return to header parsing, or just stop watching EPOLLOUT).
 */
static void handle_write_event(connection_t *conn)
{
    conn->last_activity = time(NULL);

    if (conn->type == CONN_TYPE_UPSTREAM && conn->ssl && !conn->ssl_handshake_done) {
        handle_ssl_handshake(conn);
        if (!conn->ssl_handshake_done) {
            return; /* still in progress, or the connection was closed */
        }
    }

    int written = connection_do_write(conn);

    if (buffer_available_read(&conn->write_buf) > 0) {
        if (written < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
            connection_close(conn->fd);
        }
        return;
    }

    /* Everything queued has been sent. */
    if (conn->write_shutdown) {
        if (conn->half_closed) {
            connection_close(conn->fd);
            return;
        }
        connection_modify_epoll(conn->fd, EPOLLIN);
        return;
    }

    /*
     * CLIENT_STATE_CLOSING is what handle_client_read() leaves behind for
     * non-keep-alive internal responses, so it must end the connection too.
     */
    if (conn->state == CLIENT_STATE_ERROR ||
        conn->state == CLIENT_STATE_CLOSING ||
        (conn->state == CLIENT_STATE_SERVING_INTERNAL && !conn->request.keep_alive)) {
        connection_close(conn->fd);
    } else if (conn->state == CLIENT_STATE_SERVING_INTERNAL && conn->request.keep_alive) {
        memset(&conn->request, 0, sizeof(http_request_t));
        conn->request.keep_alive = 1;
        conn->state = CLIENT_STATE_READING_HEADERS;
        connection_modify_epoll(conn->fd, EPOLLIN);
    } else {
        connection_modify_epoll(conn->fd, EPOLLIN);
    }
}

/*
 * Dispatches one epoll event to the handler matching the connection's type
 * and state. Events for unused or out-of-range slots are ignored;
 * EPOLLERR/EPOLLHUP close the connection.
 */
void connection_handle_event(struct epoll_event *event)
{
    int fd = event->data.fd;
    if (fd < 0 || fd >= MAX_FDS) {
        return;
    }
    connection_t *conn = &connections[fd];
    if (conn->type == CONN_TYPE_UNUSED || conn->fd == -1) {
        return;
    }

    if (event->events & (EPOLLERR | EPOLLHUP)) {
        if (event->events & EPOLLERR) {
            log_debug("EPOLLERR on fd %d", fd);
        }
        connection_close(fd);
        return;
    }

    if (conn->type == CONN_TYPE_LISTENER) {
        if (event->events & EPOLLIN) {
            connection_accept(fd);
        }
        return;
    }

    int handshake_just_completed = 0;
    if (conn->type == CONN_TYPE_UPSTREAM && conn->ssl && !conn->ssl_handshake_done) {
        handle_ssl_handshake(conn);
        if (!conn->ssl_handshake_done) {
            return; /* still in progress, or the connection was closed */
        }
        handshake_just_completed = 1;
    }
    if (handshake_just_completed && buffer_available_read(&conn->write_buf) > 0) {
        connection_do_write(conn);
    }

    if (event->events & EPOLLOUT) {
        handle_write_event(conn);
    }

    /* The write handler may have closed the connection; re-check the slot. */
    if (connections[fd].type != CONN_TYPE_UNUSED && (event->events & EPOLLIN)) {
        if (conn->type == CONN_TYPE_CLIENT &&
            (conn->state == CLIENT_STATE_READING_HEADERS ||
             conn->state == CLIENT_STATE_SERVING_INTERNAL)) {
            handle_client_read(conn);
        } else if (conn->state == CLIENT_STATE_FORWARDING) {
            handle_forwarding(conn);
        }
    }
}

/*
 * Closes every non-listener connection whose last activity is more than
 * CONNECTION_TIMEOUT seconds in the past.
 */
void connection_cleanup_idle(void)
{
    time_t current_time = time(NULL);
    for (int i = 0; i < MAX_FDS; i++) {
        connection_t *conn = &connections[i];
        if (conn->type == CONN_TYPE_UNUSED || conn->type == CONN_TYPE_LISTENER || conn->fd == -1) {
            continue;
        }
        if (current_time - conn->last_activity > CONNECTION_TIMEOUT) {
            log_debug("Closing idle connection fd=%d", i);
            connection_close(i);
        }
    }
}