// socket_backend.c (Corrected with better handle safety and non-blocking I/O) #include "wren.h" #include #include #include #include #include // Platform-specific includes and definitions #ifdef _WIN32 #include #include #include #pragma comment(lib, "ws2_32.lib") typedef SOCKET socket_t; typedef int socklen_t; typedef HANDLE thread_t; typedef CRITICAL_SECTION mutex_t; typedef CONDITION_VARIABLE cond_t; #define IS_SOCKET_VALID(s) ((s) != INVALID_SOCKET) #define CLOSE_SOCKET(s) closesocket(s) #else #include #include #include #include #include #include #include #include #include typedef int socket_t; typedef pthread_t thread_t; typedef pthread_mutex_t mutex_t; typedef pthread_cond_t cond_t; #define INVALID_SOCKET -1 #define IS_SOCKET_VALID(s) ((s) >= 0) #define CLOSE_SOCKET(s) close(s) #endif // --- Forward Declarations --- typedef struct SocketContext SocketContext; // --- Socket Data Structures --- typedef enum { SOCKET_OP_CONNECT, SOCKET_OP_READ, SOCKET_OP_WRITE, } SocketOp; typedef struct { socket_t sock; bool isListener; } SocketData; struct SocketContext { SocketOp operation; WrenVM* vm; WrenHandle* socketHandle; WrenHandle* callback; char* host; int port; char* data; size_t dataLength; bool success; char* resultData; size_t resultDataLength; char* errorMessage; socket_t newSocket; struct SocketContext* next; }; // --- Thread-Safe Queue Implementation in C --- typedef struct { SocketContext *head, *tail; mutex_t mutex; cond_t cond; } ThreadSafeQueueSocket; void queue_init(ThreadSafeQueueSocket* q) { q->head = q->tail = NULL; #ifdef _WIN32 InitializeCriticalSection(&q->mutex); InitializeConditionVariable(&q->cond); #else pthread_mutex_init(&q->mutex, NULL); pthread_cond_init(&q->cond, NULL); #endif } void queue_destroy(ThreadSafeQueueSocket* q) { #ifdef _WIN32 DeleteCriticalSection(&q->mutex); #else pthread_mutex_destroy(&q->mutex); pthread_cond_destroy(&q->cond); #endif } void queue_push(ThreadSafeQueueSocket* q, SocketContext* context) { #ifdef _WIN32 EnterCriticalSection(&q->mutex); #else pthread_mutex_lock(&q->mutex); #endif if (context) { context->next = NULL; } if (q->tail) { q->tail->next = context; } else { q->head = context; } q->tail = context; #ifdef _WIN32 WakeConditionVariable(&q->cond); LeaveCriticalSection(&q->mutex); #else pthread_cond_signal(&q->cond); pthread_mutex_unlock(&q->mutex); #endif } SocketContext* queue_pop(ThreadSafeQueueSocket* q) { #ifdef _WIN32 EnterCriticalSection(&q->mutex); while (q->head == NULL) { SleepConditionVariableCS(&q->cond, &q->mutex, INFINITE); } #else pthread_mutex_lock(&q->mutex); while (q->head == NULL) { pthread_cond_wait(&q->cond, &q->mutex); } #endif SocketContext* context = q->head; q->head = q->head->next; if (q->head == NULL) { q->tail = NULL; } #ifdef _WIN32 LeaveCriticalSection(&q->mutex); #else pthread_mutex_unlock(&q->mutex); #endif return context; } bool queue_empty(ThreadSafeQueueSocket* q) { #ifdef _WIN32 EnterCriticalSection(&q->mutex); bool empty = (q->head == NULL); LeaveCriticalSection(&q->mutex); #else pthread_mutex_lock(&q->mutex); bool empty = (q->head == NULL); pthread_mutex_unlock(&q->mutex); #endif return empty; } // --- Asynchronous Socket Manager --- #define MAX_LISTENERS 64 typedef struct { WrenVM* vm; volatile bool running; thread_t worker_threads[4]; thread_t listener_thread; ThreadSafeQueueSocket requestQueue; ThreadSafeQueueSocket completionQueue; ThreadSafeQueueSocket acceptQueue; mutex_t listener_mutex; socket_t listener_sockets[MAX_LISTENERS]; int listener_count; #ifndef _WIN32 socket_t wake_pipe[2]; #endif } AsyncSocketManager; static AsyncSocketManager* socketManager = NULL; void free_socket_context_data(SocketContext* context) { if (!context) return; free(context->host); free(context->data); free(context->resultData); free(context->errorMessage); free(context); } #ifdef _WIN32 DWORD WINAPI workerThread(LPVOID arg); DWORD WINAPI listenerThread(LPVOID arg); #else void* workerThread(void* arg); void* listenerThread(void* arg); #endif // --- Worker and Listener Thread Implementations --- #ifdef _WIN32 DWORD WINAPI listenerThread(LPVOID arg) { #else void* listenerThread(void* arg) { #endif AsyncSocketManager* manager = (AsyncSocketManager*)arg; while (manager->running) { fd_set read_fds; FD_ZERO(&read_fds); socket_t max_fd = 0; #ifndef _WIN32 FD_SET(manager->wake_pipe[0], &read_fds); max_fd = manager->wake_pipe[0]; #endif #ifdef _WIN32 EnterCriticalSection(&manager->listener_mutex); #else pthread_mutex_lock(&manager->listener_mutex); #endif for (int i = 0; i < manager->listener_count; i++) { socket_t sock = manager->listener_sockets[i]; if (IS_SOCKET_VALID(sock)) { FD_SET(sock, &read_fds); if (sock > max_fd) { max_fd = sock; } } } #ifdef _WIN32 LeaveCriticalSection(&manager->listener_mutex); #else pthread_mutex_unlock(&manager->listener_mutex); #endif struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0; int activity = select(max_fd + 1, &read_fds, NULL, NULL, &timeout); if (!manager->running) break; if (activity < 0) { #ifndef _WIN32 if (errno != EINTR) { perror("select error"); } #endif continue; } if (activity == 0) continue; #ifndef _WIN32 if (FD_ISSET(manager->wake_pipe[0], &read_fds)) { char buffer[1]; read(manager->wake_pipe[0], buffer, 1); } #endif #ifdef _WIN32 EnterCriticalSection(&manager->listener_mutex); #else pthread_mutex_lock(&manager->listener_mutex); #endif for (int i = 0; i < manager->listener_count; i++) { socket_t sock = manager->listener_sockets[i]; if (IS_SOCKET_VALID(sock) && FD_ISSET(sock, &read_fds)) { if (!queue_empty(&manager->acceptQueue)) { SocketContext* context = queue_pop(&manager->acceptQueue); context->newSocket = accept(sock, NULL, NULL); context->success = IS_SOCKET_VALID(context->newSocket); if (!context->success) { context->errorMessage = strdup("Accept failed."); } queue_push(&manager->completionQueue, context); } } } #ifdef _WIN32 LeaveCriticalSection(&manager->listener_mutex); #else pthread_mutex_unlock(&manager->listener_mutex); #endif } return 0; } #ifdef _WIN32 DWORD WINAPI workerThread(LPVOID arg) { #else void* workerThread(void* arg) { #endif AsyncSocketManager* manager = (AsyncSocketManager*)arg; while (manager->running) { SocketContext* context = queue_pop(&manager->requestQueue); if (!context || !manager->running) { if (context) free_socket_context_data(context); break; } wrenEnsureSlots(context->vm, 1); wrenSetSlotHandle(context->vm, 0, context->socketHandle); SocketData* socketData = (wrenGetSlotType(context->vm, 0) == WREN_TYPE_FOREIGN) ? (SocketData*)wrenGetSlotForeign(context->vm, 0) : NULL; if (!socketData) { context->success = false; context->errorMessage = strdup("Invalid socket object."); queue_push(&manager->completionQueue, context); continue; } switch (context->operation) { case SOCKET_OP_CONNECT: { struct addrinfo hints = {0}, *addrs; hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; char port_str[6]; snprintf(port_str, 6, "%d", context->port); if (getaddrinfo(context->host, port_str, &hints, &addrs) != 0) { context->success = false; context->errorMessage = strdup("Host lookup failed."); break; } socket_t sock = INVALID_SOCKET; for (struct addrinfo* addr = addrs; addr; addr = addr->ai_next) { sock = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); if (!IS_SOCKET_VALID(sock)) continue; if (connect(sock, addr->ai_addr, (int)addr->ai_addrlen) == 0) break; CLOSE_SOCKET(sock); sock = INVALID_SOCKET; } freeaddrinfo(addrs); if (IS_SOCKET_VALID(sock)) { socketData->sock = sock; socketData->isListener = false; context->success = true; } else { context->success = false; context->errorMessage = strdup("Connection failed."); } break; } case SOCKET_OP_READ: { if (socketData->isListener) { context->success = false; context->errorMessage = strdup("Cannot read from a listening socket."); break; } fd_set read_fds; FD_ZERO(&read_fds); FD_SET(socketData->sock, &read_fds); struct timeval timeout = { .tv_sec = 5, .tv_usec = 0 }; // 5-second timeout int activity = select(socketData->sock + 1, &read_fds, NULL, NULL, &timeout); if (activity > 0 && FD_ISSET(socketData->sock, &read_fds)) { char buf[4096]; ssize_t len = recv(socketData->sock, buf, sizeof(buf), 0); if (len > 0) { context->resultData = (char*)malloc(len); memcpy(context->resultData, buf, len); context->resultDataLength = len; context->success = true; } else { context->success = false; context->errorMessage = strdup("Read failed or connection closed."); } } else { context->success = false; context->errorMessage = strdup("Read timeout or error."); } break; } case SOCKET_OP_WRITE: { if (socketData->isListener) { context->success = false; context->errorMessage = strdup("Cannot write to a listening socket."); break; } ssize_t written = send(socketData->sock, context->data, context->dataLength, 0); context->success = (written == (ssize_t)context->dataLength); if(!context->success) context->errorMessage = strdup("Write failed."); break; } } queue_push(&manager->completionQueue, context); } return 0; } // --- Manager Lifecycle --- void socketManager_create(WrenVM* vm) { socketManager = (AsyncSocketManager*)malloc(sizeof(AsyncSocketManager)); socketManager->vm = vm; socketManager->running = true; socketManager->listener_count = 0; queue_init(&socketManager->requestQueue); queue_init(&socketManager->completionQueue); queue_init(&socketManager->acceptQueue); #ifdef _WIN32 InitializeCriticalSection(&socketManager->listener_mutex); #else pthread_mutex_init(&socketManager->listener_mutex, NULL); #endif #ifndef _WIN32 if (pipe(socketManager->wake_pipe) == -1) { perror("pipe"); exit(1); } #endif for (int i = 0; i < 4; i++) { #ifdef _WIN32 socketManager->worker_threads[i] = CreateThread(NULL, 0, workerThread, socketManager, 0, NULL); #else pthread_create(&socketManager->worker_threads[i], NULL, workerThread, socketManager); #endif } #ifdef _WIN32 socketManager->listener_thread = CreateThread(NULL, 0, listenerThread, socketManager, 0, NULL); #else pthread_create(&socketManager->listener_thread, NULL, listenerThread, socketManager); #endif } void socketManager_destroy() { socketManager->running = false; #ifndef _WIN32 write(socketManager->wake_pipe[1], "w", 1); #endif for (int i = 0; i < 4; i++) { queue_push(&socketManager->requestQueue, NULL); } #ifdef _WIN32 WaitForSingleObject(socketManager->listener_thread, INFINITE); CloseHandle(socketManager->listener_thread); for (int i = 0; i < 4; i++) { WaitForSingleObject(socketManager->worker_threads[i], INFINITE); CloseHandle(socketManager->worker_threads[i]); } #else pthread_join(socketManager->listener_thread, NULL); for (int i = 0; i < 4; i++) { pthread_join(socketManager->worker_threads[i], NULL); } close(socketManager->wake_pipe[0]); close(socketManager->wake_pipe[1]); #endif queue_destroy(&socketManager->requestQueue); queue_destroy(&socketManager->completionQueue); queue_destroy(&socketManager->acceptQueue); #ifdef _WIN32 DeleteCriticalSection(&socketManager->listener_mutex); #else pthread_mutex_destroy(&socketManager->listener_mutex); #endif free(socketManager); } void socketManager_processCompletions() { WrenHandle* callHandle = wrenMakeCallHandle(socketManager->vm, "call(_,_)"); while (!queue_empty(&socketManager->completionQueue)) { SocketContext* context = queue_pop(&socketManager->completionQueue); wrenEnsureSlots(socketManager->vm, 3); wrenSetSlotHandle(socketManager->vm, 0, context->callback); if (context->success) { wrenSetSlotNull(socketManager->vm, 1); if (IS_SOCKET_VALID(context->newSocket)) { wrenGetVariable(socketManager->vm, "socket", "Socket", 2); void* foreign = wrenSetSlotNewForeign(socketManager->vm, 2, 2, sizeof(SocketData)); SocketData* clientData = (SocketData*)foreign; clientData->sock = context->newSocket; clientData->isListener = false; } else if (context->resultData) { wrenSetSlotBytes(socketManager->vm, 2, context->resultData, context->resultDataLength); } else { wrenSetSlotNull(socketManager->vm, 2); } } else { wrenSetSlotString(socketManager->vm, 1, context->errorMessage ? context->errorMessage : "Unknown error."); wrenSetSlotNull(socketManager->vm, 2); } wrenCall(socketManager->vm, callHandle); // Safely release handles here on the main thread wrenReleaseHandle(socketManager->vm, context->socketHandle); wrenReleaseHandle(socketManager->vm, context->callback); free_socket_context_data(context); } wrenReleaseHandle(socketManager->vm, callHandle); } // ... (The rest of the foreign functions from socketAllocate onwards are identical to the previous response) ... void socketAllocate(WrenVM* vm) { SocketData* data = (SocketData*)wrenSetSlotNewForeign(vm, 0, 0, sizeof(SocketData)); data->sock = INVALID_SOCKET; data->isListener = false; } void socketConnect(WrenVM* vm) { SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); context->operation = SOCKET_OP_CONNECT; context->vm = vm; context->socketHandle = wrenGetSlotHandle(vm, 0); context->host = strdup(wrenGetSlotString(vm, 1)); context->port = (int)wrenGetSlotDouble(vm, 2); context->callback = wrenGetSlotHandle(vm, 3); queue_push(&socketManager->requestQueue, context); } void socketListen(WrenVM* vm) { SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); const char* host = wrenGetSlotString(vm, 1); int port = (int)wrenGetSlotDouble(vm, 2); int backlog = (int)wrenGetSlotDouble(vm, 3); struct addrinfo hints = {0}, *addrs; hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; char port_str[6]; snprintf(port_str, 6, "%d", port); if (getaddrinfo(host, port_str, &hints, &addrs) != 0) { wrenSetSlotBool(vm, 0, false); return; } socket_t sock = INVALID_SOCKET; for (struct addrinfo* addr = addrs; addr; addr = addr->ai_next) { sock = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); if (!IS_SOCKET_VALID(sock)) continue; int yes = 1; setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof(yes)); if (bind(sock, addr->ai_addr, (int)addr->ai_addrlen) == 0) break; CLOSE_SOCKET(sock); sock = INVALID_SOCKET; } freeaddrinfo(addrs); if (IS_SOCKET_VALID(sock) && listen(sock, backlog) == 0) { data->sock = sock; data->isListener = true; #ifdef _WIN32 EnterCriticalSection(&socketManager->listener_mutex); #else pthread_mutex_lock(&socketManager->listener_mutex); #endif if (socketManager->listener_count < MAX_LISTENERS) { socketManager->listener_sockets[socketManager->listener_count++] = sock; } #ifdef _WIN32 LeaveCriticalSection(&socketManager->listener_mutex); #else pthread_mutex_unlock(&socketManager->listener_mutex); #endif #ifndef _WIN32 write(socketManager->wake_pipe[1], "w", 1); #endif wrenSetSlotBool(vm, 0, true); } else { if(IS_SOCKET_VALID(sock)) CLOSE_SOCKET(sock); wrenSetSlotBool(vm, 0, false); } } void socketAccept(WrenVM* vm) { SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); context->vm = vm; context->socketHandle = wrenGetSlotHandle(vm, 0); context->callback = wrenGetSlotHandle(vm, 1); queue_push(&socketManager->acceptQueue, context); } void socketRead(WrenVM* vm) { SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); context->operation = SOCKET_OP_READ; context->vm = vm; context->socketHandle = wrenGetSlotHandle(vm, 0); context->callback = wrenGetSlotHandle(vm, 1); queue_push(&socketManager->requestQueue, context); } void socketWrite(WrenVM* vm) { SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); context->operation = SOCKET_OP_WRITE; context->vm = vm; context->socketHandle = wrenGetSlotHandle(vm, 0); int len; const char* bytes = wrenGetSlotBytes(vm, 1, &len); context->data = (char*)malloc(len); memcpy(context->data, bytes, len); context->dataLength = len; context->callback = wrenGetSlotHandle(vm, 2); queue_push(&socketManager->requestQueue, context); } void socketClose(WrenVM* vm) { SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); if (IS_SOCKET_VALID(data->sock)) { if (data->isListener) { #ifdef _WIN32 EnterCriticalSection(&socketManager->listener_mutex); #else pthread_mutex_lock(&socketManager->listener_mutex); #endif for (int i = 0; i < socketManager->listener_count; i++) { if (socketManager->listener_sockets[i] == data->sock) { socketManager->listener_sockets[i] = socketManager->listener_sockets[socketManager->listener_count - 1]; socketManager->listener_count--; break; } } #ifdef _WIN32 LeaveCriticalSection(&socketManager->listener_mutex); #else pthread_mutex_unlock(&socketManager->listener_mutex); #endif } CLOSE_SOCKET(data->sock); data->sock = INVALID_SOCKET; } } void socketIsOpen(WrenVM* vm) { SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); wrenSetSlotBool(vm, 0, IS_SOCKET_VALID(data->sock)); } void socketRemoteAddress(WrenVM* vm) { SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); if (!IS_SOCKET_VALID(data->sock) || data->isListener) { wrenSetSlotNull(vm, 0); return; } struct sockaddr_storage addr; socklen_t len = sizeof(addr); char ipstr[INET6_ADDRSTRLEN]; if (getpeername(data->sock, (struct sockaddr*)&addr, &len) == 0) { if (addr.ss_family == AF_INET) { inet_ntop(AF_INET, &((struct sockaddr_in*)&addr)->sin_addr, ipstr, sizeof(ipstr)); } else { inet_ntop(AF_INET6, &((struct sockaddr_in6*)&addr)->sin6_addr, ipstr, sizeof(ipstr)); } wrenSetSlotString(vm, 0, ipstr); } else { wrenSetSlotNull(vm, 0); } } void socketRemotePort(WrenVM* vm) { SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); if (!IS_SOCKET_VALID(data->sock) || data->isListener) { wrenSetSlotNull(vm, 0); return; } struct sockaddr_storage addr; socklen_t len = sizeof(addr); if (getpeername(data->sock, (struct sockaddr*)&addr, &len) == 0) { int port = 0; if (addr.ss_family == AF_INET) { port = ntohs(((struct sockaddr_in*)&addr)->sin_port); } else if (addr.ss_family == AF_INET6) { port = ntohs(((struct sockaddr_in6*)&addr)->sin6_port); } wrenSetSlotDouble(vm, 0, (double)port); } else { wrenSetSlotNull(vm, 0); } } WrenForeignMethodFn bindSocketForeignMethod(WrenVM* vm, const char* module, const char* className, bool isStatic, const char* signature) { if (strcmp(module, "socket") != 0) return NULL; if (strcmp(className, "Socket") == 0 && !isStatic) { if (strcmp(signature, "connect(_,_,_)") == 0) return socketConnect; if (strcmp(signature, "listen(_,_,_)") == 0) return socketListen; if (strcmp(signature, "accept(_)") == 0) return socketAccept; // NOTE: The signature for read() in Wren takes one argument (the callback) now. if (strcmp(signature, "read(_)") == 0) return socketRead; if (strcmp(signature, "write_(_,_)") == 0) return socketWrite; if (strcmp(signature, "close()") == 0) return socketClose; if (strcmp(signature, "isOpen") == 0) return socketIsOpen; if (strcmp(signature, "remoteAddress") == 0) return socketRemoteAddress; if (strcmp(signature, "remotePort") == 0) return socketRemotePort; } return NULL; } WrenForeignClassMethods bindSocketForeignClass(WrenVM* vm, const char* module, const char* className) { WrenForeignClassMethods methods = {0, 0}; if (strcmp(module, "socket") == 0 && strcmp(className, "Socket") == 0) { methods.allocate = socketAllocate; } return methods; }