#include "wren.h"

// Asynchronous TCP socket module for the Wren VM.
//
// Blocking socket operations (connect, accept, read, write) are queued by the
// foreign methods below, executed on a small pool of worker threads, and their
// results are delivered back to Wren callbacks when the host calls
// socketManager_processCompletions() on the VM thread.
//
// Threading rule: worker threads never touch the Wren VM or a SocketData
// object. Everything a worker needs is copied into the SocketContext on the VM
// thread, and every result is applied on the VM thread.

// NOTE(review): the header names following the bare `#include` directives
// were missing from the reviewed text. The lists below are reconstructed from
// the identifiers this file uses -- confirm against the original file.
#include <errno.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

// Platform-specific includes and definitions
#ifdef _WIN32
    #include <winsock2.h>
    #include <ws2tcpip.h>
    #include <windows.h>
    #pragma comment(lib, "ws2_32.lib")
    typedef SOCKET socket_t;
    typedef int socklen_t;
    typedef HANDLE thread_t;
    typedef CRITICAL_SECTION mutex_t;
    typedef CONDITION_VARIABLE cond_t;
    typedef int io_size_t;  // return type of recv()/send()
    #define IS_SOCKET_VALID(s) ((s) != INVALID_SOCKET)
    #define CLOSE_SOCKET(s) closesocket(s)
#else
    #include <arpa/inet.h>
    #include <netdb.h>
    #include <netinet/in.h>
    #include <pthread.h>
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <unistd.h>
    typedef int socket_t;
    typedef pthread_t thread_t;
    typedef pthread_mutex_t mutex_t;
    typedef pthread_cond_t cond_t;
    typedef ssize_t io_size_t;  // return type of recv()/send()
    #define INVALID_SOCKET -1
    #define IS_SOCKET_VALID(s) ((s) >= 0)
    #define CLOSE_SOCKET(s) close(s)
#endif

// Where available, ask send() to report a broken connection as an error
// instead of raising SIGPIPE.
#ifdef MSG_NOSIGNAL
    #define SOCKET_SEND_FLAGS MSG_NOSIGNAL
#else
    #define SOCKET_SEND_FLAGS 0
#endif

enum {
    WORKER_COUNT = 4,       // size of the worker thread pool
    READ_CHUNK_SIZE = 4096, // maximum bytes delivered by one read
    MAX_PORT = 65535
};

// --- Forward Declarations ---

typedef struct SocketContext SocketContext;

// --- Socket Data Structures ---

typedef enum {
    SOCKET_OP_CONNECT,
    SOCKET_OP_ACCEPT,
    SOCKET_OP_READ,
    SOCKET_OP_WRITE,
} SocketOp;

// Payload of the foreign `Socket` object.
typedef struct {
    socket_t sock;
    bool isListener;
} SocketData;

// One queued request together with its eventual result.
struct SocketContext {
    SocketOp operation;
    WrenVM* vm;
    WrenHandle* socketHandle;  // keeps the Socket object alive while in flight
    WrenHandle* callback;

    // Snapshot of the Socket's state taken on the VM thread when the request
    // was submitted; the worker uses this instead of reading SocketData.
    socket_t sock;
    bool isListener;

    // For connect
    char* host;
    int port;

    // For write
    char* data;
    size_t dataLength;

    // Results
    bool success;
    char* resultData;
    size_t resultDataLength;
    char* errorMessage;
    socket_t newSocket;  // accept: client socket; connect: connected socket

    struct SocketContext* next;
};

// --- Thread-Safe Queue Implementation ---

typedef struct {
    SocketContext *head, *tail;
    mutex_t mutex;
    cond_t cond;
    bool closed;  // once set, queue_pop() stops blocking and returns NULL
} ThreadSafeQueueSocket;

static void queue_lock(ThreadSafeQueueSocket* q) {
#ifdef _WIN32
    EnterCriticalSection(&q->mutex);
#else
    pthread_mutex_lock(&q->mutex);
#endif
}

static void queue_unlock(ThreadSafeQueueSocket* q) {
#ifdef _WIN32
    LeaveCriticalSection(&q->mutex);
#else
    pthread_mutex_unlock(&q->mutex);
#endif
}

// Detaches and returns the first element, or NULL if the queue is empty.
// The caller must hold the queue lock.
static SocketContext* queue_unlink_head(ThreadSafeQueueSocket* q) {
    SocketContext* first = q->head;
    if (first) {
        q->head = first->next;
        if (q->head == NULL) {
            q->tail = NULL;
        }
        first->next = NULL;
    }
    return first;
}

// Initializes an empty, open queue.
void queue_init(ThreadSafeQueueSocket* q) {
    q->head = NULL;
    q->tail = NULL;
    q->closed = false;
#ifdef _WIN32
    InitializeCriticalSection(&q->mutex);
    InitializeConditionVariable(&q->cond);
#else
    pthread_mutex_init(&q->mutex, NULL);
    pthread_cond_init(&q->cond, NULL);
#endif
}

// Releases the synchronization primitives. Does not free queued contexts.
void queue_destroy(ThreadSafeQueueSocket* q) {
#ifdef _WIN32
    DeleteCriticalSection(&q->mutex);
#else
    pthread_mutex_destroy(&q->mutex);
    pthread_cond_destroy(&q->cond);
#endif
}

// Appends `context` to the queue and wakes one waiting consumer.
// A NULL context is ignored: linking NULL into the list would reset the tail
// pointer of a non-empty queue and cannot wake a consumer that waits for a
// non-NULL head. Use queue_close() to release blocked consumers.
void queue_push(ThreadSafeQueueSocket* q, SocketContext* context) {
    if (!context) return;

    context->next = NULL;
    queue_lock(q);
    if (q->tail) {
        q->tail->next = context;
    } else {
        q->head = context;
    }
    q->tail = context;
#ifdef _WIN32
    WakeConditionVariable(&q->cond);
#else
    pthread_cond_signal(&q->cond);
#endif
    queue_unlock(q);
}

// Marks the queue closed and wakes every consumer blocked in queue_pop().
// Elements still queued stay in place and can be removed with queue_try_pop().
static void queue_close(ThreadSafeQueueSocket* q) {
    queue_lock(q);
    q->closed = true;
#ifdef _WIN32
    WakeAllConditionVariable(&q->cond);
#else
    pthread_cond_broadcast(&q->cond);
#endif
    queue_unlock(q);
}

// Blocks until an element is available and returns it.
// Returns NULL once the queue has been closed.
SocketContext* queue_pop(ThreadSafeQueueSocket* q) {
    queue_lock(q);
    while (q->head == NULL && !q->closed) {
#ifdef _WIN32
        SleepConditionVariableCS(&q->cond, &q->mutex, INFINITE);
#else
        pthread_cond_wait(&q->cond, &q->mutex);
#endif
    }
    SocketContext* context = q->closed ? NULL : queue_unlink_head(q);
    queue_unlock(q);
    return context;
}

// Non-blocking pop: returns the first element, or NULL if the queue is empty.
// Works on closed queues too, so it can be used to drain them.
static SocketContext* queue_try_pop(ThreadSafeQueueSocket* q) {
    queue_lock(q);
    SocketContext* context = queue_unlink_head(q);
    queue_unlock(q);
    return context;
}

// Returns true if the queue currently holds no elements.
bool queue_empty(ThreadSafeQueueSocket* q) {
    queue_lock(q);
    bool empty = (q->head == NULL);
    queue_unlock(q);
    return empty;
}

// --- Asynchronous Socket Manager ---

typedef struct {
    WrenVM* vm;
    bool running;
    thread_t worker_threads[WORKER_COUNT];
    int worker_count;  // number of threads that were actually started
    ThreadSafeQueueSocket requestQueue;
    ThreadSafeQueueSocket completionQueue;
} AsyncSocketManager;

static AsyncSocketManager* socketManager = NULL;

// Frees the memory owned by `context` and the context itself.
// Does not release Wren handles or close sockets; see release_context().
void free_socket_context_data(SocketContext* context) {
    if (!context) return;
    free(context->host);
    free(context->data);
    free(context->resultData);
    free(context->errorMessage);
    free(context);
}

// Fully disposes of a context: closes a result socket that was never handed to
// a Socket object, releases both Wren handles, and frees the memory.
// Must be called on the VM thread because it uses the Wren API.
static void release_context(WrenVM* vm, SocketContext* context) {
    if (!context) return;
    if (IS_SOCKET_VALID(context->newSocket)) {
        CLOSE_SOCKET(context->newSocket);
        context->newSocket = INVALID_SOCKET;
    }
    if (context->socketHandle) wrenReleaseHandle(vm, context->socketHandle);
    if (context->callback) wrenReleaseHandle(vm, context->callback);
    free_socket_context_data(context);
}

// Records a failed result. If the message cannot be duplicated the error
// string stays NULL and the completion handler reports "Unknown error.".
static void context_fail(SocketContext* context, const char* message) {
    context->success = false;
    context->errorMessage = strdup(message);
}

#ifdef _WIN32
DWORD WINAPI workerThread(LPVOID arg);
#else
void* workerThread(void* arg);
#endif

// --- Worker Thread Implementation ---

// Resolves context->host:context->port and connects to the first address that
// accepts. On success the connected descriptor is left in context->newSocket;
// it is installed into the Socket object later, on the VM thread.
static void perform_connect(SocketContext* context) {
    if (context->port < 0 || context->port > MAX_PORT) {
        context_fail(context, "Invalid port.");
        return;
    }

    char port_str[8];
    snprintf(port_str, sizeof port_str, "%d", context->port);

    struct addrinfo hints;
    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;

    struct addrinfo* addrs = NULL;
    if (getaddrinfo(context->host, port_str, &hints, &addrs) != 0) {
        context_fail(context, "Host lookup failed.");
        return;
    }

    socket_t sock = INVALID_SOCKET;
    for (struct addrinfo* addr = addrs; addr; addr = addr->ai_next) {
        sock = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
        if (!IS_SOCKET_VALID(sock)) continue;
        if (connect(sock, addr->ai_addr, (socklen_t)addr->ai_addrlen) == 0) break;
        CLOSE_SOCKET(sock);
        sock = INVALID_SOCKET;
    }
    freeaddrinfo(addrs);

    if (!IS_SOCKET_VALID(sock)) {
        context_fail(context, "Connection failed.");
        return;
    }
    context->newSocket = sock;
    context->success = true;
}

// Waits for one incoming connection on a listening socket.
static void perform_accept(SocketContext* context) {
    if (!context->isListener) {
        context_fail(context, "Cannot accept on a non-listening socket.");
        return;
    }

    // This is a blocking call. The worker thread will wait here.
    context->newSocket = accept(context->sock, NULL, NULL);
    if (IS_SOCKET_VALID(context->newSocket)) {
        context->success = true;
        return;
    }

    context->newSocket = INVALID_SOCKET;
#ifdef _WIN32
    // TODO: A more descriptive error using FormatMessageA
    context_fail(context, "Accept failed.");
#else
    int saved_errno = errno;
    context_fail(context, strerror(saved_errno));
#endif
}

// Receives up to READ_CHUNK_SIZE bytes into context->resultData.
static void perform_read(SocketContext* context) {
    if (context->isListener) {
        context_fail(context, "Cannot read from a listening socket.");
        return;
    }

    char buf[READ_CHUNK_SIZE];
    // This is a blocking call.
    io_size_t len = recv(context->sock, buf, sizeof buf, 0);
#ifndef _WIN32
    int saved_errno = errno;
#endif

    if (len > 0) {
        context->resultData = malloc((size_t)len);
        if (!context->resultData) {
            context_fail(context, "Out of memory.");
            return;
        }
        memcpy(context->resultData, buf, (size_t)len);
        context->resultDataLength = (size_t)len;
        context->success = true;
    } else if (len == 0) {
        context_fail(context, "Connection closed by peer.");
    } else {
#ifdef _WIN32
        context_fail(context, "Read failed.");
#else
        context_fail(context, strerror(saved_errno));
#endif
    }
}

// Sends all of context->data. send() may accept only part of the buffer, so
// keep sending the remainder until everything is written or an error occurs.
static void perform_write(SocketContext* context) {
    if (context->isListener) {
        context_fail(context, "Cannot write to a listening socket.");
        return;
    }

    size_t sent = 0;
    while (sent < context->dataLength) {
        size_t remaining = context->dataLength - sent;
#ifdef _WIN32
        io_size_t n = send(context->sock, context->data + sent, (int)remaining,
                           SOCKET_SEND_FLAGS);
#else
        io_size_t n = send(context->sock, context->data + sent, remaining,
                           SOCKET_SEND_FLAGS);
        if (n < 0 && errno == EINTR) continue;  // interrupted: retry
#endif
        if (n <= 0) {
            context_fail(context, "Write failed.");
            return;
        }
        sent += (size_t)n;
    }
    context->success = true;
}

// Body shared by the platform-specific thread entry points: executes requests
// until the request queue is closed. Every processed context is handed to the
// completion queue; the worker never frees a context because releasing its
// Wren handles is only allowed on the VM thread.
static void worker_loop(AsyncSocketManager* manager) {
    SocketContext* context;
    while ((context = queue_pop(&manager->requestQueue)) != NULL) {
        // connect creates its own descriptor, so only the other operations
        // require the Socket to be open already.
        if (context->operation != SOCKET_OP_CONNECT &&
            !IS_SOCKET_VALID(context->sock)) {
            context_fail(context, "Invalid or closed socket object.");
        } else {
            switch (context->operation) {
                case SOCKET_OP_CONNECT: perform_connect(context); break;
                case SOCKET_OP_ACCEPT:  perform_accept(context);  break;
                case SOCKET_OP_READ:    perform_read(context);    break;
                case SOCKET_OP_WRITE:   perform_write(context);   break;
                default:
                    context_fail(context, "Unknown socket operation.");
                    break;
            }
        }
        queue_push(&manager->completionQueue, context);
    }
}

#ifdef _WIN32
DWORD WINAPI workerThread(LPVOID arg) {
    worker_loop((AsyncSocketManager*)arg);
    return 0;
}
#else
void* workerThread(void* arg) {
    worker_loop((AsyncSocketManager*)arg);
    return NULL;
}
#endif

// --- Manager Lifecycle ---

// Creates the global socket manager and starts its worker threads.
// Does nothing if a manager already exists. If memory or every thread cannot
// be obtained, no manager is installed and later requests abort their fiber.
//
// NOTE(review): on Windows, Winsock must be initialized with WSAStartup before
// any socket call. This file never calls it -- presumably the host does;
// confirm.
void socketManager_create(WrenVM* vm) {
    if (socketManager != NULL) return;

    AsyncSocketManager* manager = malloc(sizeof *manager);
    if (!manager) return;

    manager->vm = vm;
    manager->running = true;
    manager->worker_count = 0;
    queue_init(&manager->requestQueue);
    queue_init(&manager->completionQueue);

    for (int i = 0; i < WORKER_COUNT; i++) {
        thread_t* slot = &manager->worker_threads[manager->worker_count];
#ifdef _WIN32
        *slot = CreateThread(NULL, 0, workerThread, manager, 0, NULL);
        if (*slot != NULL) manager->worker_count++;
#else
        if (pthread_create(slot, NULL, workerThread, manager) == 0) {
            manager->worker_count++;
        }
#endif
    }

    if (manager->worker_count == 0) {
        // Without workers no request could ever complete.
        queue_destroy(&manager->requestQueue);
        queue_destroy(&manager->completionQueue);
        free(manager);
        return;
    }

    socketManager = manager;
}

// Removes every context left in `q` and disposes of it on the VM thread.
static void drain_queue(WrenVM* vm, ThreadSafeQueueSocket* q) {
    SocketContext* context;
    while ((context = queue_try_pop(q)) != NULL) {
        release_context(vm, context);
    }
}

// Stops the worker threads and frees the manager. Requests that were never
// executed and results that were never delivered are discarded; their Wren
// handles are released, so this must run while the VM is still alive.
//
// NOTE(review): a worker blocked inside accept()/recv()/connect() is not
// interrupted; the join below waits until that call returns.
void socketManager_destroy(void) {
    if (!socketManager) return;

    AsyncSocketManager* manager = socketManager;
    manager->running = false;

    // Unblock all worker threads waiting for work.
    queue_close(&manager->requestQueue);

    // Wait for threads to finish
    for (int i = 0; i < manager->worker_count; i++) {
#ifdef _WIN32
        WaitForSingleObject(manager->worker_threads[i], INFINITE);
        CloseHandle(manager->worker_threads[i]);
#else
        pthread_join(manager->worker_threads[i], NULL);
#endif
    }

    // Clean up any remaining contexts in queues
    drain_queue(manager->vm, &manager->requestQueue);
    drain_queue(manager->vm, &manager->completionQueue);

    queue_destroy(&manager->requestQueue);
    queue_destroy(&manager->completionQueue);
    free(manager);
    socketManager = NULL;
}

// Delivers every finished request to its Wren callback as `call(error, value)`.
// Must be called on the VM thread.
//   failure          -> (message, null)
//   accept success   -> (null, new Socket wrapping the client connection)
//   read success     -> (null, received bytes)
//   connect / write  -> (null, null)
void socketManager_processCompletions(void) {
    if (!socketManager || queue_empty(&socketManager->completionQueue)) return;

    WrenVM* vm = socketManager->vm;
    WrenHandle* callHandle = wrenMakeCallHandle(vm, "call(_,_)");

    SocketContext* context;
    while ((context = queue_try_pop(&socketManager->completionQueue)) != NULL) {
        wrenEnsureSlots(vm, 3);

        if (context->success && context->operation == SOCKET_OP_CONNECT) {
            // Install the connected descriptor into the Socket object here,
            // on the VM thread, before the callback can observe it.
            wrenSetSlotHandle(vm, 0, context->socketHandle);
            if (wrenGetSlotType(vm, 0) == WREN_TYPE_FOREIGN) {
                SocketData* target = (SocketData*)wrenGetSlotForeign(vm, 0);
                if (IS_SOCKET_VALID(target->sock)) {
                    CLOSE_SOCKET(target->sock);  // do not leak a previous one
                }
                target->sock = context->newSocket;
                target->isListener = false;
                context->newSocket = INVALID_SOCKET;  // ownership transferred
            }
        }

        wrenSetSlotHandle(vm, 0, context->callback);

        if (!context->success) {
            wrenSetSlotString(vm, 1, context->errorMessage
                                         ? context->errorMessage
                                         : "Unknown error.");
            wrenSetSlotNull(vm, 2);
        } else {
            wrenSetSlotNull(vm, 1);  // error slot
            switch (context->operation) {
                case SOCKET_OP_ACCEPT: {
                    wrenGetVariable(vm, "socket", "Socket", 2);
                    SocketData* clientData = (SocketData*)wrenSetSlotNewForeign(
                        vm, 2, 2, sizeof(SocketData));
                    clientData->sock = context->newSocket;
                    clientData->isListener = false;
                    context->newSocket = INVALID_SOCKET;  // ownership transferred
                    break;
                }
                case SOCKET_OP_READ:
                    wrenSetSlotBytes(vm, 2, context->resultData,
                                     context->resultDataLength);
                    break;
                default:  // connect, write
                    wrenSetSlotNull(vm, 2);
                    break;
            }
        }

        wrenCall(vm, callHandle);
        release_context(vm, context);
    }

    wrenReleaseHandle(vm, callHandle);
}

// --- Wren Foreign Methods ---

// Aborts the current fiber with `message` as the error value.
static void abort_fiber(WrenVM* vm, const char* message) {
    wrenSetSlotString(vm, 0, message);
    wrenAbortFiber(vm, 0);
}

// Builds the context shared by all asynchronous foreign methods: snapshots the
// receiver's socket state and takes handles to the receiver (slot 0) and the
// callback (slot `callbackSlot`). On failure the fiber is aborted and NULL is
// returned.
static SocketContext* begin_request(WrenVM* vm, SocketOp operation,
                                    int callbackSlot) {
    if (!socketManager) {
        abort_fiber(vm, "Socket manager is not initialized.");
        return NULL;
    }

    SocketContext* context = calloc(1, sizeof *context);
    if (!context) {
        abort_fiber(vm, "Out of memory.");
        return NULL;
    }

    const SocketData* target = (const SocketData*)wrenGetSlotForeign(vm, 0);
    context->operation = operation;
    context->vm = vm;
    context->sock = target->sock;
    context->isListener = target->isListener;
    // calloc leaves 0 here, which is a valid descriptor on POSIX.
    context->newSocket = INVALID_SOCKET;
    context->socketHandle = wrenGetSlotHandle(vm, 0);
    context->callback = wrenGetSlotHandle(vm, callbackSlot);
    return context;
}

// Allocator for the foreign `Socket` class: starts out closed.
void socketAllocate(WrenVM* vm) {
    SocketData* data =
        (SocketData*)wrenSetSlotNewForeign(vm, 0, 0, sizeof(SocketData));
    data->sock = INVALID_SOCKET;
    data->isListener = false;
}

// Finalizer for the foreign `Socket` class: closes a descriptor the script
// never closed. Finalizers must not use the VM, and this one does not.
static void socketFinalize(void* data) {
    SocketData* socketData = (SocketData*)data;
    if (IS_SOCKET_VALID(socketData->sock)) {
        CLOSE_SOCKET(socketData->sock);
        socketData->sock = INVALID_SOCKET;
    }
}

// connect(host, port, callback): queues an asynchronous connect.
void socketConnect(WrenVM* vm) {
    SocketContext* context = begin_request(vm, SOCKET_OP_CONNECT, 3);
    if (!context) return;

    context->host = strdup(wrenGetSlotString(vm, 1));
    context->port = (int)wrenGetSlotDouble(vm, 2);
    if (!context->host) {
        release_context(vm, context);
        abort_fiber(vm, "Out of memory.");
        return;
    }
    queue_push(&socketManager->requestQueue, context);
}

// listen(host, port, backlog): synchronously binds and listens.
// Returns true on success and false on any failure.
void socketListen(WrenVM* vm) {
    SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0);
    const char* host = wrenGetSlotString(vm, 1);
    int port = (int)wrenGetSlotDouble(vm, 2);
    int backlog = (int)wrenGetSlotDouble(vm, 3);

    if (port < 0 || port > MAX_PORT) {
        wrenSetSlotBool(vm, 0, false);
        return;
    }

    char port_str[8];
    snprintf(port_str, sizeof port_str, "%d", port);

    struct addrinfo hints;
    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;

    struct addrinfo* addrs = NULL;
    if (getaddrinfo(host, port_str, &hints, &addrs) != 0) {
        wrenSetSlotBool(vm, 0, false);
        return;
    }

    socket_t sock = INVALID_SOCKET;
    for (struct addrinfo* addr = addrs; addr; addr = addr->ai_next) {
        sock = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
        if (!IS_SOCKET_VALID(sock)) continue;
        int yes = 1;
        setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof(yes));
        if (bind(sock, addr->ai_addr, (socklen_t)addr->ai_addrlen) == 0) break;
        CLOSE_SOCKET(sock);
        sock = INVALID_SOCKET;
    }
    freeaddrinfo(addrs);

    if (!IS_SOCKET_VALID(sock)) {
        wrenSetSlotBool(vm, 0, false);
        return;
    }
    if (listen(sock, backlog) != 0) {
        CLOSE_SOCKET(sock);
        wrenSetSlotBool(vm, 0, false);
        return;
    }

    if (IS_SOCKET_VALID(data->sock)) {
        CLOSE_SOCKET(data->sock);  // do not leak a previously open descriptor
    }
    data->sock = sock;
    data->isListener = true;
    wrenSetSlotBool(vm, 0, true);
}

// accept(callback): queues an asynchronous accept.
void socketAccept(WrenVM* vm) {
    SocketContext* context = begin_request(vm, SOCKET_OP_ACCEPT, 1);
    if (!context) return;
    queue_push(&socketManager->requestQueue, context);
}

// read(callback): queues an asynchronous read.
void socketRead(WrenVM* vm) {
    SocketContext* context = begin_request(vm, SOCKET_OP_READ, 1);
    if (!context) return;
    queue_push(&socketManager->requestQueue, context);
}

// write_(bytes, callback): queues an asynchronous write of a private copy of
// `bytes`.
void socketWrite(WrenVM* vm) {
    SocketContext* context = begin_request(vm, SOCKET_OP_WRITE, 2);
    if (!context) return;

    int len = 0;
    const char* bytes = wrenGetSlotBytes(vm, 1, &len);
    size_t length = len > 0 ? (size_t)len : 0;

    // Allocate at least one byte so an empty payload still yields a non-NULL
    // buffer.
    context->data = malloc(length > 0 ? length : 1);
    if (!context->data) {
        release_context(vm, context);
        abort_fiber(vm, "Out of memory.");
        return;
    }
    if (length > 0) {
        memcpy(context->data, bytes, length);
    }
    context->dataLength = length;

    queue_push(&socketManager->requestQueue, context);
}

// close(): closes the descriptor if the socket is open.
// Requests already queued keep the descriptor value they captured.
void socketClose(WrenVM* vm) {
    SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0);
    if (!IS_SOCKET_VALID(data->sock)) return;
    CLOSE_SOCKET(data->sock);
    data->sock = INVALID_SOCKET;
}

// isOpen: true while the object holds a valid descriptor.
void socketIsOpen(WrenVM* vm) {
    const SocketData* data = (const SocketData*)wrenGetSlotForeign(vm, 0);
    wrenSetSlotBool(vm, 0, IS_SOCKET_VALID(data->sock));
}

// remoteAddress: textual peer address, or null if the socket is closed, is a
// listener, has no peer, or uses an address family other than IPv4/IPv6.
void socketRemoteAddress(WrenVM* vm) {
    SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0);
    if (!IS_SOCKET_VALID(data->sock) || data->isListener) {
        wrenSetSlotNull(vm, 0);
        return;
    }

    struct sockaddr_storage addr;
    socklen_t len = sizeof(addr);
    if (getpeername(data->sock, (struct sockaddr*)&addr, &len) != 0) {
        wrenSetSlotNull(vm, 0);
        return;
    }

    char ipstr[INET6_ADDRSTRLEN];
    const char* text = NULL;
    if (addr.ss_family == AF_INET) {
        text = inet_ntop(AF_INET, &((struct sockaddr_in*)&addr)->sin_addr,
                         ipstr, sizeof(ipstr));
    } else if (addr.ss_family == AF_INET6) {
        text = inet_ntop(AF_INET6, &((struct sockaddr_in6*)&addr)->sin6_addr,
                         ipstr, sizeof(ipstr));
    }

    if (text) {
        wrenSetSlotString(vm, 0, text);
    } else {
        wrenSetSlotNull(vm, 0);
    }
}

// remotePort: peer port in host byte order (0 for an address family other than
// IPv4/IPv6), or null if the socket is closed, is a listener, or has no peer.
void socketRemotePort(WrenVM* vm) {
    SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0);
    if (!IS_SOCKET_VALID(data->sock) || data->isListener) {
        wrenSetSlotNull(vm, 0);
        return;
    }

    struct sockaddr_storage addr;
    socklen_t len = sizeof(addr);
    if (getpeername(data->sock, (struct sockaddr*)&addr, &len) != 0) {
        wrenSetSlotNull(vm, 0);
        return;
    }

    int port = 0;
    if (addr.ss_family == AF_INET) {
        port = ntohs(((struct sockaddr_in*)&addr)->sin_port);
    } else if (addr.ss_family == AF_INET6) {
        port = ntohs(((struct sockaddr_in6*)&addr)->sin6_port);
    }
    wrenSetSlotDouble(vm, 0, (double)port);
}

// Resolves the foreign methods of `Socket` in module "socket".
// Returns NULL for anything this module does not implement.
WrenForeignMethodFn bindSocketForeignMethod(WrenVM* vm, const char* module,
                                            const char* className,
                                            bool isStatic,
                                            const char* signature) {
    (void)vm;

    static const struct {
        const char* signature;
        WrenForeignMethodFn fn;
    } methods[] = {
        { "connect(_,_,_)", socketConnect },
        { "listen(_,_,_)",  socketListen },
        { "accept(_)",      socketAccept },
        { "read(_)",        socketRead },
        { "write_(_,_)",    socketWrite },
        { "close()",        socketClose },
        { "isOpen",         socketIsOpen },
        { "remoteAddress",  socketRemoteAddress },
        { "remotePort",     socketRemotePort },
    };

    if (strcmp(module, "socket") != 0) return NULL;
    if (strcmp(className, "Socket") != 0 || isStatic) return NULL;

    for (size_t i = 0; i < sizeof methods / sizeof methods[0]; i++) {
        if (strcmp(signature, methods[i].signature) == 0) {
            return methods[i].fn;
        }
    }
    return NULL;
}

// Supplies the allocator and finalizer for `Socket` in module "socket".
// Both members are NULL for any other class.
WrenForeignClassMethods bindSocketForeignClass(WrenVM* vm, const char* module,
                                               const char* className) {
    (void)vm;

    WrenForeignClassMethods methods = {0, 0};
    if (strcmp(module, "socket") == 0 && strcmp(className, "Socket") == 0) {
        methods.allocate = socketAllocate;
        methods.finalize = socketFinalize;
    }
    return methods;
}