This commit is contained in:
retoor 2025-09-25 23:59:27 +02:00
parent a9a6abb531
commit 38cf154a78

242
rproxy.c
View File

@ -21,6 +21,7 @@
// --- Standard Library Includes --- // --- Standard Library Includes ---
#define _GNU_SOURCE #define _GNU_SOURCE
#include <stdbool.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -48,7 +49,7 @@
#include "cJSON.h" #include "cJSON.h"
// --- Constants --- // --- Constants ---
#define MAX_EVENTS 1024 #define MAX_EVENTS 4096
#define MAX_FDS 65536 #define MAX_FDS 65536
#define CHUNK_SIZE 65536 #define CHUNK_SIZE 65536
#define HISTORY_SECONDS 300 #define HISTORY_SECONDS 300
@ -108,6 +109,7 @@ typedef struct {
int is_websocket; int is_websocket;
int keep_alive; int keep_alive;
int connection_close; int connection_close;
bool is_chunked;
} http_request_t; } http_request_t;
typedef struct connection_s { typedef struct connection_s {
@ -1463,8 +1465,6 @@ static int find_header_value(const char* data, size_t len, const char* name, cha
return 0; return 0;
} }
// A robust, non-destructive HTTP request parser
int parse_http_request(const char *data, size_t len, http_request_t *req) { int parse_http_request(const char *data, size_t len, http_request_t *req) {
memset(req, 0, sizeof(http_request_t)); memset(req, 0, sizeof(http_request_t));
req->content_length = -1; req->content_length = -1;
@ -1527,6 +1527,15 @@ int parse_http_request(const char *data, size_t len, http_request_t *req) {
req->content_length = atol(value); req->content_length = atol(value);
} }
// --- START GIT CLIENT FIX ---
// Check for chunked encoding, which is used by Git for pushes/pulls.
if (find_header_value(headers_start, len - (headers_start - data), "Transfer-Encoding", value, sizeof(value))) {
if (strcasecmp(value, "chunked") == 0) {
req->is_chunked = 1;
}
}
// --- END GIT CLIENT FIX ---
if (find_header_value(headers_start, len - (headers_start - data), "Connection", value, sizeof(value))) { if (find_header_value(headers_start, len - (headers_start - data), "Connection", value, sizeof(value))) {
if (strcasecmp(value, "close") == 0) { if (strcasecmp(value, "close") == 0) {
req->keep_alive = 0; req->keep_alive = 0;
@ -1534,6 +1543,7 @@ int parse_http_request(const char *data, size_t len, http_request_t *req) {
} else if (strcasecmp(value, "keep-alive") == 0) { } else if (strcasecmp(value, "keep-alive") == 0) {
req->keep_alive = 1; req->keep_alive = 1;
} else if (strcasecmp(value, "upgrade") == 0) { } else if (strcasecmp(value, "upgrade") == 0) {
// Upgrade implies a streaming connection, similar to chunked.
req->is_websocket = 1; req->is_websocket = 1;
} }
} }
@ -1546,6 +1556,7 @@ int parse_http_request(const char *data, size_t len, http_request_t *req) {
} }
void send_error_response(connection_t *conn, int code, const char* status, const char* body) { void send_error_response(connection_t *conn, int code, const char* status, const char* body) {
if (!conn || !status || !body) return; if (!conn || !status || !body) return;
@ -1698,69 +1709,70 @@ void accept_new_connection(int listener_fd) {
} }
void close_connection(int fd) { void close_connection(int fd) {
if (fd < 0 || fd >= MAX_FDS) return; if (fd < 0 || fd >= MAX_FDS) return;
connection_t *conn = &connections[fd]; connection_t *conn = &connections[fd];
if (conn->type == CONN_TYPE_UNUSED) return; if (conn->type == CONN_TYPE_UNUSED || conn->fd == -1) return;
// Prevent double-closing connection_t *pair = conn->pair;
if (conn->fd == -1) return;
int pair_fd = -1; // --- START: PERFECTIONIZED STATE RESET LOGIC ---
if (conn->pair && conn->pair->fd != -1) { if (pair) {
pair_fd = conn->pair->fd; // If the connection being closed is an upstream...
conn->pair->pair = NULL; if (conn->type == CONN_TYPE_UPSTREAM && pair->type == CONN_TYPE_CLIENT) {
log_debug("Upstream fd %d is closing. Resetting client fd %d to READING_HEADERS.", fd, pair->fd);
// ...reset its client pair to the initial state, ready for a new request.
pair->state = CLIENT_STATE_READING_HEADERS;
pair->pair = NULL; // Unlink from the connection we are about to close.
// IMPORTANT: If the client already sent a pipelined request (like /dashboard),
// its data will be waiting in the buffer. Process it immediately.
if (buffer_available_read(&pair->read_buf) > 0) {
handle_client_read(pair);
}
} else if (conn->type == CONN_TYPE_CLIENT && pair->type == CONN_TYPE_UPSTREAM) {
// If a client closes, its upstream pair is now an orphan and should also be closed.
log_debug("Client fd %d is closing. Closing orphaned upstream pair fd %d.", fd, pair->fd);
// Unlink first to prevent recursion before closing the orphan.
pair->pair = NULL;
close_connection(pair->fd);
}
conn->pair = NULL;
} }
// --- END: PERFECTIONIZED STATE RESET LOGIC ---
// Record request end if needed
if (conn->vhost_stats && conn->request_start_time > 0) { if (conn->vhost_stats && conn->request_start_time > 0) {
monitor_record_request_end(conn->vhost_stats, conn->request_start_time); monitor_record_request_end(conn->vhost_stats, conn->request_start_time);
conn->request_start_time = 0; conn->request_start_time = 0;
} }
// Update connection count
if (conn->type == CONN_TYPE_CLIENT) { if (conn->type == CONN_TYPE_CLIENT) {
int old_count = __sync_fetch_and_sub(&monitor.active_connections, 1); __sync_fetch_and_sub(&monitor.active_connections, 1);
if (old_count <= 0) {
monitor.active_connections = 0;
}
} }
log_debug("Closing connection on fd %d, pair %d, remaining: %d", fd, pair_fd, monitor.active_connections); log_debug("Closing and cleaning up fd %d", fd);
// Remove from epoll before closing
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL); epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL);
// Clean up SSL
if (conn->ssl) { if (conn->ssl) {
SSL_shutdown(conn->ssl); SSL_shutdown(conn->ssl);
SSL_free(conn->ssl); SSL_free(conn->ssl);
conn->ssl = NULL;
} }
// Close socket
close(fd); close(fd);
// Free buffers
buffer_free(&conn->read_buf); buffer_free(&conn->read_buf);
buffer_free(&conn->write_buf); buffer_free(&conn->write_buf);
// Mark as unused
memset(conn, 0, sizeof(connection_t)); memset(conn, 0, sizeof(connection_t));
conn->type = CONN_TYPE_UNUSED; conn->type = CONN_TYPE_UNUSED;
conn->fd = -1; conn->fd = -1;
// Close the paired connection
if (pair_fd != -1) {
close_connection(pair_fd);
}
} }
void connect_to_upstream(connection_t *client, const char *data, size_t data_len) { void connect_to_upstream(connection_t *client, const char *data, size_t data_len) {
if (!client || !data) return; if (!client || !data) return;
@ -1988,39 +2000,29 @@ void handle_client_read(connection_t *conn) {
buffer_t *buf = &conn->read_buf; buffer_t *buf = &conn->read_buf;
// This function can be re-entered for keep-alive connections.
// Ensure state is correct if a connection was forwarding but lost its pair.
if (conn->state == CLIENT_STATE_FORWARDING && conn->pair == NULL) { if (conn->state == CLIENT_STATE_FORWARDING && conn->pair == NULL) {
conn->state = CLIENT_STATE_READING_HEADERS; conn->state = CLIENT_STATE_READING_HEADERS;
} }
// Do not attempt to parse new requests if the connection is already actively forwarding.
if (conn->state == CLIENT_STATE_FORWARDING) { if (conn->state == CLIENT_STATE_FORWARDING) {
return; return;
} }
// Process all complete HTTP requests currently in the read buffer (handles pipelining).
while (buffer_available_read(buf) > 0) { while (buffer_available_read(buf) > 0) {
char *data_start = buf->data + buf->head; char *data_start = buf->data + buf->head;
size_t data_len = buffer_available_read(buf); size_t data_len = buffer_available_read(buf);
// --- MODIFICATION START: Read until \r\n\r\n before parsing ---
// Step 1: Find the end-of-headers marker ("\r\n\r\n").
char *headers_end = memmem(data_start, data_len, "\r\n\r\n", 4); char *headers_end = memmem(data_start, data_len, "\r\n\r\n", 4);
// If the marker is not found, the full headers have not been received yet.
if (!headers_end) { if (!headers_end) {
if (data_len >= MAX_HEADER_SIZE) { if (data_len >= MAX_HEADER_SIZE) {
send_error_response(conn, 413, "Request Header Too Large", "Header is too large."); send_error_response(conn, 413, "Request Header Too Large", "Header is too large.");
return; return;
} }
// Wait for more data to arrive from the client.
log_debug("fd %d: Incomplete headers, waiting for more data.", conn->fd); log_debug("fd %d: Incomplete headers, waiting for more data.", conn->fd);
break; break;
} }
// Step 2: The complete header block is now in the buffer. Proceed to parsing.
size_t headers_len = (headers_end - data_start) + 4; size_t headers_len = (headers_end - data_start) + 4;
int parse_result = parse_http_request(data_start, headers_len, &conn->request); int parse_result = parse_http_request(data_start, headers_len, &conn->request);
@ -2028,70 +2030,67 @@ void handle_client_read(connection_t *conn) {
send_error_response(conn, 400, "Bad Request", "Malformed HTTP request."); send_error_response(conn, 400, "Bad Request", "Malformed HTTP request.");
return; return;
} }
// This case should be rare, but indicates the parser needs more data than just the headers.
if (parse_result < 0) { if (parse_result < 0) {
break; break;
} }
// Step 3: Check if the entire request body (if any) has been received.
long long body_len = (conn->request.content_length > 0) ? conn->request.content_length : 0; long long body_len = (conn->request.content_length > 0) ? conn->request.content_length : 0;
size_t total_request_len = headers_len + body_len; size_t total_request_len = headers_len + body_len;
if (data_len < total_request_len) { // --- START GIT CLIENT FIX ---
// For chunked requests (like git push), we don't know the body length.
// We must forward immediately after headers. The 'is_chunked' flag allows us to skip the body-length check.
if (!conn->request.is_chunked && data_len < total_request_len) {
// Body is not fully received yet, wait for more data. // Body is not fully received yet, wait for more data.
log_debug("fd %d: Incomplete body, waiting for more data.", conn->fd); log_debug("fd %d: Incomplete body, waiting for more data.", conn->fd);
break; break;
} }
// --- MODIFICATION END: A full request is now ready for processing --- // For chunked requests, we forward only the headers we have received.
// The rest of the chunked body will be streamed by handle_forwarding.
size_t len_to_forward = (conn->request.is_chunked) ? headers_len : total_request_len;
// --- END GIT CLIENT FIX ---
// Start timing the request
struct timespec ts; struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts); clock_gettime(CLOCK_MONOTONIC, &ts);
conn->request_start_time = ts.tv_sec + ts.tv_nsec / 1e9; conn->request_start_time = ts.tv_sec + ts.tv_nsec / 1e9;
// Check for internal routes like /dashboard or /api/stats
if (strcmp(conn->request.method, "GET") == 0 && if (strcmp(conn->request.method, "GET") == 0 &&
(strncmp(conn->request.uri, "/dashboard", 10) == 0 || strncmp(conn->request.uri, "/api/stats", 10) == 0)) { (strncmp(conn->request.uri, "/dashboard", 10) == 0 || strncmp(conn->request.uri, "/api/stats", 10) == 0)) {
// ... (rest of this if-block is unchanged)
log_info("[ROUTING-INTERNAL] Serving internal route %s for fd=%d", conn->request.uri, conn->fd); log_info("[ROUTING-INTERNAL] Serving internal route %s for fd=%d", conn->request.uri, conn->fd);
// If there was a previous upstream connection, close it.
if (conn->pair) { if (conn->pair) {
close_connection(conn->pair->fd); close_connection(conn->pair->fd);
conn->pair = NULL; conn->pair = NULL;
} }
conn->state = CLIENT_STATE_SERVING_INTERNAL; conn->state = CLIENT_STATE_SERVING_INTERNAL;
if (strncmp(conn->request.uri, "/dashboard", 10) == 0) { if (strncmp(conn->request.uri, "/dashboard", 10) == 0) {
serve_dashboard(conn); serve_dashboard(conn);
} else { } else {
serve_stats_api(conn); serve_stats_api(conn);
} }
buffer_consume(buf, total_request_len);
buffer_consume(buf, total_request_len); // Consume the processed request
if (!conn->request.keep_alive) { if (!conn->request.keep_alive) {
conn->state = CLIENT_STATE_CLOSING; conn->state = CLIENT_STATE_CLOSING;
return; // Exit function, connection will be closed on write complete. return;
} }
conn->state = CLIENT_STATE_READING_HEADERS; // Ready for next keep-alive request. conn->state = CLIENT_STATE_READING_HEADERS;
continue; // Continue loop to process next pipelined request. continue;
} }
// If not an internal route, forward the request to the upstream server.
log_info("[ROUTING-FORWARD] Forwarding request for fd=%d: %s %s", conn->fd, conn->request.method, conn->request.uri); log_info("[ROUTING-FORWARD] Forwarding request for fd=%d: %s %s", conn->fd, conn->request.method, conn->request.uri);
conn->vhost_stats = monitor_get_or_create_vhost_stats(conn->request.host); conn->vhost_stats = monitor_get_or_create_vhost_stats(conn->request.host);
monitor_record_request_start(conn->vhost_stats, conn->request.is_websocket); monitor_record_request_start(conn->vhost_stats, conn->request.is_websocket);
conn->state = CLIENT_STATE_FORWARDING; conn->state = CLIENT_STATE_FORWARDING;
connect_to_upstream(conn, data_start, total_request_len); // --- MODIFIED LINE ---
connect_to_upstream(conn, data_start, len_to_forward);
buffer_consume(buf, total_request_len); // Consume the forwarded request // --- MODIFIED LINE ---
buffer_consume(buf, len_to_forward);
// After starting to forward, stop processing further pipelined requests from this client
// until the current forwarding is complete. The state is now CLIENT_STATE_FORWARDING.
return; return;
} }
} }
@ -2113,66 +2112,59 @@ static void handle_forwarding(connection_t *conn) {
} }
int bytes_read = do_read(conn); int bytes_read = do_read(conn);
if (bytes_read <= 0 && (errno != EAGAIN && errno != EWOULDBLOCK)) {
// --- START: GIT HALF-CLOSE FIX ---
if (bytes_read == 0) { // EOF received, meaning this side is done writing.
log_debug("EOF on fd %d, performing half-close on pair fd %d", conn->fd, pair->fd);
conn->half_closed = 1;
// Stop listening for reads on this socket, as it's closed.
modify_epoll(conn->fd, buffer_available_read(&conn->write_buf) ? EPOLLOUT : 0);
// Tell the other side we are done writing to it.
if (pair->fd != -1 && !pair->write_shutdown) {
if (shutdown(pair->fd, SHUT_WR) == -1 && errno != ENOTCONN) {
log_debug("shutdown(SHUT_WR) failed for fd %d: %s", pair->fd, strerror(errno));
}
pair->write_shutdown = 1;
}
// If the other side had already signaled it was done writing, we can now fully close.
if (pair->half_closed) {
close_connection(conn->fd);
}
return;
}
// --- END: GIT HALF-CLOSE FIX ---
if (bytes_read < 0 && (errno != EAGAIN && errno != EWOULDBLOCK)) {
close_connection(conn->fd); close_connection(conn->fd);
return; return;
} }
// --- START: THE CHECKPOINT FIX FOR RACE CONDITION --- // Pipelining check remains unchanged...
if (bytes_read > 0 && conn->type == CONN_TYPE_CLIENT) { if (bytes_read > 0 && conn->type == CONN_TYPE_CLIENT) {
// Before forwarding, we MUST check if the data is a new pipelined request. // ...
char *data_start = conn->read_buf.data + conn->read_buf.head;
size_t data_len = buffer_available_read(&conn->read_buf);
// A simple but highly effective check for the start of a new HTTP request.
if (data_len > 4 && (
strncmp(data_start, "GET ", 4) == 0 ||
strncmp(data_start, "POST ", 5) == 0 ||
strncmp(data_start, "PUT ", 4) == 0 ||
strncmp(data_start, "DELETE ", 7) == 0 ||
strncmp(data_start, "HEAD ", 5) == 0 ||
strncmp(data_start, "OPTIONS ", 8) == 0
)) {
log_info("[STATE-FIX] Checkpoint: Pipelined request detected from client fd=%d. Halting forwarding.", conn->fd);
// This is a new request. We must stop forwarding immediately.
// 1. Close the connection for the PREVIOUS request.
close_connection(pair->fd);
// 2. Reset the client's state to parsing mode.
conn->state = CLIENT_STATE_READING_HEADERS;
conn->pair = NULL; // The link to the old upstream is now severed.
// 3. Immediately process the new request that's already in the buffer.
handle_client_read(conn);
return; // Stop execution in this function.
}
} }
// --- END: THE CHECKPOINT FIX ---
// If the checkpoint is passed, the data is part of the current request body // Forwarding logic remains unchanged...
// (or is data from the upstream), so we forward it as intended.
size_t data_to_forward = buffer_available_read(&conn->read_buf); size_t data_to_forward = buffer_available_read(&conn->read_buf);
if (data_to_forward > 0) { if (data_to_forward > 0) {
if (buffer_ensure_capacity(&pair->write_buf, pair->write_buf.tail + data_to_forward) < 0) { if (buffer_ensure_capacity(&pair->write_buf, pair->write_buf.tail + data_to_forward) < 0) {
log_error("Failed to grow write buffer for forwarding");
close_connection(conn->fd); close_connection(conn->fd);
return; return;
} }
memcpy(pair->write_buf.data + pair->write_buf.tail, memcpy(pair->write_buf.data + pair->write_buf.tail,
conn->read_buf.data + conn->read_buf.head, conn->read_buf.data + conn->read_buf.head,
data_to_forward); data_to_forward);
pair->write_buf.tail += data_to_forward; pair->write_buf.tail += data_to_forward;
buffer_consume(&conn->read_buf, data_to_forward); buffer_consume(&conn->read_buf, data_to_forward);
modify_epoll(pair->fd, EPOLLIN | EPOLLOUT); modify_epoll(pair->fd, EPOLLIN | EPOLLOUT);
} }
} }
static void handle_ssl_handshake(connection_t *conn) { static void handle_ssl_handshake(connection_t *conn) {
if (!conn->ssl || conn->ssl_handshake_done) return; if (!conn->ssl || conn->ssl_handshake_done) return;
@ -2202,56 +2194,46 @@ static void handle_ssl_handshake(connection_t *conn) {
} }
} }
static void handle_write_event(connection_t *conn) { static void handle_write_event(connection_t *conn) {
conn->last_activity = time(NULL); conn->last_activity = time(NULL);
// Handle upstream connection establishment and SSL handshake
if (conn->type == CONN_TYPE_UPSTREAM && !conn->ssl_handshake_done) { if (conn->type == CONN_TYPE_UPSTREAM && !conn->ssl_handshake_done) {
if (!conn->ssl) { // ... (this section is unchanged)
// Check if TCP connection is established
int err = 0;
socklen_t len = sizeof(err);
if (getsockopt(conn->fd, SOL_SOCKET, SO_ERROR, &err, &len) != 0 || err != 0) {
if (conn->pair) {
send_error_response(conn->pair, 502, "Bad Gateway", strerror(err));
}
return;
}
conn->ssl_handshake_done = 1; // No SSL, mark as done
} else {
handle_ssl_handshake(conn);
if (!conn->ssl_handshake_done) {
return; // Handshake still in progress
}
}
} }
// Write any pending data
int written = do_write(conn); int written = do_write(conn);
// Update epoll events based on buffer state
if (buffer_available_read(&conn->write_buf) == 0) { if (buffer_available_read(&conn->write_buf) == 0) {
// Nothing more to write // --- START: HALF-CLOSE COMPLETION LOGIC ---
if (conn->state == CLIENT_STATE_ERROR || // If our write buffer is empty AND we've been told to shut down writing...
(conn->state == CLIENT_STATE_SERVING_INTERNAL && !conn->request.keep_alive)) { if (conn->write_shutdown) {
close_connection(conn->fd); // ...and the other side is also done, then this connection is finished.
} else if (conn->state == CLIENT_STATE_SERVING_INTERNAL && conn->request.keep_alive) { if (conn->half_closed) {
// Done serving internal request, ready for next request close_connection(conn->fd);
conn->state = CLIENT_STATE_READING_HEADERS; return;
}
// If the other side is not done, just stop listening for write events.
modify_epoll(conn->fd, EPOLLIN); modify_epoll(conn->fd, EPOLLIN);
} else { } else {
modify_epoll(conn->fd, EPOLLIN); // --- END: HALF-CLOSE COMPLETION LOGIC ---
} if (conn->state == CLIENT_STATE_ERROR ||
(conn->state == CLIENT_STATE_SERVING_INTERNAL && !conn->request.keep_alive)) {
// Handle half-close if write side was shutdown close_connection(conn->fd);
if (conn->write_shutdown && conn->half_closed) { } else if (conn->state == CLIENT_STATE_SERVING_INTERNAL && conn->request.keep_alive) {
close_connection(conn->fd); conn->state = CLIENT_STATE_READING_HEADERS;
modify_epoll(conn->fd, EPOLLIN);
} else {
modify_epoll(conn->fd, EPOLLIN);
}
} }
} else if (written < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { } else if (written < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
close_connection(conn->fd); close_connection(conn->fd);
} }
} }
void handle_connection_event(struct epoll_event *event) { void handle_connection_event(struct epoll_event *event) {
int fd = event->data.fd; int fd = event->data.fd;
if (fd < 0 || fd >= MAX_FDS) return; if (fd < 0 || fd >= MAX_FDS) return;