diff --git a/merged_source_files.txt b/merged_source_files.txt index f208709..e694c9f 100644 --- a/merged_source_files.txt +++ b/merged_source_files.txt @@ -1,90 +1,79 @@ -// Start of socket_backend.c +// Start of sqlite3_backend.c #include "wren.h" +#include #include #include #include #include -#include -// Platform-specific includes and definitions #ifdef _WIN32 - #include - #include #include - #pragma comment(lib, "ws2_32.lib") - typedef SOCKET socket_t; - typedef int socklen_t; typedef HANDLE thread_t; typedef CRITICAL_SECTION mutex_t; typedef CONDITION_VARIABLE cond_t; - #define IS_SOCKET_VALID(s) ((s) != INVALID_SOCKET) - #define CLOSE_SOCKET(s) closesocket(s) #else #include - #include - #include - #include - #include - #include - #include - #include - typedef int socket_t; typedef pthread_t thread_t; typedef pthread_mutex_t mutex_t; typedef pthread_cond_t cond_t; - #define INVALID_SOCKET -1 - #define IS_SOCKET_VALID(s) ((s) >= 0) - #define CLOSE_SOCKET(s) close(s) #endif -// --- Forward Declarations --- -typedef struct SocketContext SocketContext; - -// --- Socket Data Structures --- +// --- Data Structures --- typedef enum { - SOCKET_OP_CONNECT, - SOCKET_OP_ACCEPT, - SOCKET_OP_READ, - SOCKET_OP_WRITE, -} SocketOp; + DB_OP_OPEN, + DB_OP_EXEC, + DB_OP_QUERY, + DB_OP_CLOSE +} DBOp; typedef struct { - socket_t sock; - bool isListener; -} SocketData; + sqlite3* db; +} DatabaseData; -struct SocketContext { - SocketOp operation; +// C-side representation of query results to pass from worker to main thread. +typedef struct DbValue { + int type; + union { + double num; + struct { + char* text; + int length; + } str; + } as; +} DbValue; + +typedef struct DbRow { + char** columns; + DbValue* values; + int count; + struct DbRow* next; +} DbRow; + +typedef struct DbContext { WrenVM* vm; - WrenHandle* socketHandle; + DBOp operation; WrenHandle* callback; - - // For connect - char* host; - int port; - - // For write - char* data; - size_t dataLength; - - // Results + WrenHandle* dbHandle; + char* path; + sqlite3* newDb; + char* sql; + sqlite3* db; bool success; - char* resultData; - size_t resultDataLength; char* errorMessage; - socket_t newSocket; // For accept - struct SocketContext* next; -}; + DbRow* resultRows; + struct DbContext* next; +} DbContext; + +// --- Thread-Safe Queue --- -// --- Thread-Safe Queue Implementation --- typedef struct { - SocketContext *head, *tail; + DbContext *head, *tail; mutex_t mutex; cond_t cond; -} ThreadSafeQueueSocket; +} DbThreadSafeQueue; -void queue_init(ThreadSafeQueueSocket* q) { +void db_queue_init(DbThreadSafeQueue* q) { q->head = q->tail = NULL; #ifdef _WIN32 InitializeCriticalSection(&q->mutex); @@ -95,7 +84,7 @@ void queue_init(ThreadSafeQueueSocket* q) { #endif } -void queue_destroy(ThreadSafeQueueSocket* q) { +void db_queue_destroy(DbThreadSafeQueue* q) { #ifdef _WIN32 DeleteCriticalSection(&q->mutex); #else @@ -104,24 +93,16 @@ void queue_destroy(ThreadSafeQueueSocket* q) { #endif } -void queue_push(ThreadSafeQueueSocket* q, SocketContext* context) { +void db_queue_push(DbThreadSafeQueue* q, DbContext* context) { #ifdef _WIN32 EnterCriticalSection(&q->mutex); #else pthread_mutex_lock(&q->mutex); #endif - - if (context) { - context->next = NULL; - } - - if (q->tail) { - q->tail->next = context; - } else { - q->head = context; - } + if(context) context->next = NULL; + if (q->tail) q->tail->next = context; + else q->head = context; q->tail = context; - #ifdef _WIN32 WakeConditionVariable(&q->cond); LeaveCriticalSection(&q->mutex); @@ -131,7 +112,7 @@ void queue_push(ThreadSafeQueueSocket* q, SocketContext* context) { #endif } -SocketContext* queue_pop(ThreadSafeQueueSocket* q) { +DbContext* db_queue_pop(DbThreadSafeQueue* q) { #ifdef _WIN32 EnterCriticalSection(&q->mutex); while (q->head == NULL) { @@ -143,460 +124,379 @@ SocketContext* queue_pop(ThreadSafeQueueSocket* q) { pthread_cond_wait(&q->cond, &q->mutex); } #endif - - SocketContext* context = q->head; - if (context) { - q->head = q->head->next; - if (q->head == NULL) { - q->tail = NULL; - } - } - + DbContext* context = q->head; + q->head = q->head->next; + if (q->head == NULL) q->tail = NULL; #ifdef _WIN32 LeaveCriticalSection(&q->mutex); #else pthread_mutex_unlock(&q->mutex); #endif - return context; } -bool queue_empty(ThreadSafeQueueSocket* q) { +bool db_queue_empty(DbThreadSafeQueue* q) { + bool empty; #ifdef _WIN32 EnterCriticalSection(&q->mutex); - bool empty = (q->head == NULL); + empty = (q->head == NULL); LeaveCriticalSection(&q->mutex); #else pthread_mutex_lock(&q->mutex); - bool empty = (q->head == NULL); + empty = (q->head == NULL); pthread_mutex_unlock(&q->mutex); #endif return empty; } -// --- Asynchronous Socket Manager --- +// --- Async DB Manager --- typedef struct { WrenVM* vm; volatile bool running; - thread_t worker_threads[4]; - ThreadSafeQueueSocket requestQueue; - ThreadSafeQueueSocket completionQueue; -} AsyncSocketManager; + thread_t threads[2]; + DbThreadSafeQueue requestQueue; + DbThreadSafeQueue completionQueue; +} AsyncDbManager; -static AsyncSocketManager* socketManager = NULL; +static AsyncDbManager* dbManager = NULL; -void free_socket_context_data(SocketContext* context) { - if (!context) return; - free(context->host); - free(context->data); - free(context->resultData); +void free_db_result_rows(DbRow* rows) { + while (rows) { + DbRow* next = rows->next; + if (rows->columns) { + for (int i = 0; i < rows->count; i++) { + free(rows->columns[i]); + } + free(rows->columns); + } + if (rows->values) { + for (int i = 0; i < rows->count; i++) { + if (rows->values[i].type == SQLITE_TEXT || rows->values[i].type == SQLITE_BLOB) { + free(rows->values[i].as.str.text); + } + } + free(rows->values); + } + free(rows); + rows = next; + } +} + +void free_db_context(DbContext* context) { + if (context == NULL) return; + free(context->path); + free(context->sql); free(context->errorMessage); + if (context->dbHandle) wrenReleaseHandle(context->vm, context->dbHandle); + if (context->callback) wrenReleaseHandle(context->vm, context->callback); + if (context->resultRows) free_db_result_rows(context->resultRows); free(context); } -#ifdef _WIN32 -DWORD WINAPI workerThread(LPVOID arg); -#else -void* workerThread(void* arg); -#endif - -// --- Worker Thread Implementation --- +static void set_context_error(DbContext* context, const char* message) { + if (context == NULL) return; + context->success = false; + if (context->errorMessage) free(context->errorMessage); + context->errorMessage = message ? strdup(message) : strdup("An unknown database error occurred."); +} #ifdef _WIN32 -DWORD WINAPI workerThread(LPVOID arg) { +DWORD WINAPI dbWorkerThread(LPVOID arg); #else -void* workerThread(void* arg) { +void* dbWorkerThread(void* arg); #endif - AsyncSocketManager* manager = (AsyncSocketManager*)arg; - while (manager->running) { - SocketContext* context = queue_pop(&manager->requestQueue); - if (!context || !manager->running) { - if (context) free_socket_context_data(context); - break; +void dbManager_create(WrenVM* vm) { + if (dbManager != NULL) return; + dbManager = (AsyncDbManager*)malloc(sizeof(AsyncDbManager)); + if (dbManager == NULL) return; + dbManager->vm = vm; + dbManager->running = true; + db_queue_init(&dbManager->requestQueue); + db_queue_init(&dbManager->completionQueue); + for (int i = 0; i < 2; ++i) { + #ifdef _WIN32 + dbManager->threads[i] = CreateThread(NULL, 0, dbWorkerThread, dbManager, 0, NULL); + #else + pthread_create(&dbManager->threads[i], NULL, dbWorkerThread, dbManager); + #endif + } +} + +void dbManager_destroy() { + if (!dbManager) return; + dbManager->running = false; + for (int i = 0; i < 2; ++i) db_queue_push(&dbManager->requestQueue, NULL); + for (int i = 0; i < 2; ++i) { + #ifdef _WIN32 + WaitForSingleObject(dbManager->threads[i], INFINITE); + CloseHandle(dbManager->threads[i]); + #else + pthread_join(dbManager->threads[i], NULL); + #endif + } + while(!db_queue_empty(&dbManager->requestQueue)) free_db_context(db_queue_pop(&dbManager->requestQueue)); + while(!db_queue_empty(&dbManager->completionQueue)) free_db_context(db_queue_pop(&dbManager->completionQueue)); + db_queue_destroy(&dbManager->requestQueue); + db_queue_destroy(&dbManager->completionQueue); + free(dbManager); + dbManager = NULL; +} + +void dbManager_processCompletions() { + if (!dbManager || !dbManager->vm || db_queue_empty(&dbManager->completionQueue)) return; + + while (!db_queue_empty(&dbManager->completionQueue)) { + DbContext* context = db_queue_pop(&dbManager->completionQueue); + if (context == NULL) continue; + + if (context->success && context->dbHandle) { + wrenEnsureSlots(dbManager->vm, 1); + wrenSetSlotHandle(dbManager->vm, 0, context->dbHandle); + DatabaseData* dbData = (DatabaseData*)wrenGetSlotForeign(dbManager->vm, 0); + if (dbData) { + if (context->operation == DB_OP_OPEN) dbData->db = context->newDb; + else if (context->operation == DB_OP_CLOSE) dbData->db = NULL; + } } - wrenEnsureSlots(context->vm, 1); - wrenSetSlotHandle(context->vm, 0, context->socketHandle); - SocketData* socketData = (wrenGetSlotType(context->vm, 0) == WREN_TYPE_FOREIGN) - ? (SocketData*)wrenGetSlotForeign(context->vm, 0) - : NULL; - - if (!socketData || !IS_SOCKET_VALID(socketData->sock)) { - context->success = false; - context->errorMessage = strdup("Invalid or closed socket object."); - queue_push(&manager->completionQueue, context); + if (context->callback == NULL) { + free_db_context(context); continue; } - switch (context->operation) { - case SOCKET_OP_CONNECT: { - struct addrinfo hints = {0}, *addrs; - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - char port_str[6]; - snprintf(port_str, 6, "%d", context->port); - if (getaddrinfo(context->host, port_str, &hints, &addrs) != 0) { - context->success = false; - context->errorMessage = strdup("Host lookup failed."); - break; - } - - socket_t sock = INVALID_SOCKET; - for (struct addrinfo* addr = addrs; addr; addr = addr->ai_next) { - sock = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); - if (!IS_SOCKET_VALID(sock)) continue; - if (connect(sock, addr->ai_addr, (int)addr->ai_addrlen) == 0) break; - CLOSE_SOCKET(sock); - sock = INVALID_SOCKET; - } - freeaddrinfo(addrs); + WrenHandle* callHandle = NULL; + int numArgs = 0; + if (context->operation == DB_OP_QUERY) { + callHandle = wrenMakeCallHandle(dbManager->vm, "call(_,_)"); + numArgs = 2; + } else { + callHandle = wrenMakeCallHandle(dbManager->vm, "call(_)"); + numArgs = 1; + } - if (IS_SOCKET_VALID(sock)) { - socketData->sock = sock; - socketData->isListener = false; - context->success = true; - } else { - context->success = false; - context->errorMessage = strdup("Connection failed."); - } - break; - } - case SOCKET_OP_ACCEPT: { - if (!socketData->isListener) { - context->success = false; - context->errorMessage = strdup("Cannot accept on a non-listening socket."); - break; - } - - // This is a blocking call. The worker thread will wait here. - context->newSocket = accept(socketData->sock, NULL, NULL); - context->success = IS_SOCKET_VALID(context->newSocket); - if (!context->success) { - #ifdef _WIN32 - // TODO: A more descriptive error using FormatMessageA - context->errorMessage = strdup("Accept failed."); - #else - context->errorMessage = strdup(strerror(errno)); - #endif - } - break; - } - case SOCKET_OP_READ: { - if (socketData->isListener) { - context->success = false; - context->errorMessage = strdup("Cannot read from a listening socket."); - break; - } + if (callHandle == NULL) { + free_db_context(context); + continue; + } - char buf[4096]; - // This is a blocking call. - ssize_t len = recv(socketData->sock, buf, sizeof(buf), 0); - if (len > 0) { - context->resultData = (char*)malloc(len); - memcpy(context->resultData, buf, len); - context->resultDataLength = len; - context->success = true; - } else { - context->success = false; - if (len == 0) { - context->errorMessage = strdup("Connection closed by peer."); - } else { - #ifdef _WIN32 - context->errorMessage = strdup("Read failed."); - #else - context->errorMessage = strdup(strerror(errno)); - #endif + // Ensure enough slots for callback, args, and temp work. + // Slots 0, 1, 2 are for the callback and its arguments. + // Slots 3, 4, 5 are for temporary work building maps. + wrenEnsureSlots(dbManager->vm, 6); + wrenSetSlotHandle(dbManager->vm, 0, context->callback); + + if (context->success) { + wrenSetSlotNull(dbManager->vm, 1); // error is null + if (numArgs == 2) { // Query case + if (context->resultRows) { + wrenSetSlotNewList(dbManager->vm, 2); // Result list in slot 2 + DbRow* row = context->resultRows; + while(row) { + wrenSetSlotNewMap(dbManager->vm, 3); // Temp map for row in slot 3 + for (int i = 0; i < row->count; i++) { + // Use slots 4 and 5 for key/value to avoid conflicts + wrenSetSlotString(dbManager->vm, 4, row->columns[i]); + DbValue* val = &row->values[i]; + switch (val->type) { + case SQLITE_INTEGER: case SQLITE_FLOAT: + wrenSetSlotDouble(dbManager->vm, 5, val->as.num); break; + case SQLITE_TEXT: case SQLITE_BLOB: + wrenSetSlotBytes(dbManager->vm, 5, val->as.str.text, val->as.str.length); break; + case SQLITE_NULL: + wrenSetSlotNull(dbManager->vm, 5); break; + } + wrenSetMapValue(dbManager->vm, 3, 4, 5); // map=3, key=4, val=5 + } + wrenInsertInList(dbManager->vm, 2, -1, 3); // list=2, element=3 + row = row->next; } + } else { + wrenSetSlotNewList(dbManager->vm, 2); // Return empty list for success with no rows + } + } + } else { + wrenSetSlotString(dbManager->vm, 1, context->errorMessage ? context->errorMessage : "Unknown error."); + if (numArgs == 2) wrenSetSlotNull(dbManager->vm, 2); + } + + wrenCall(dbManager->vm, callHandle); + wrenReleaseHandle(dbManager->vm, callHandle); + free_db_context(context); + } +} + +// --- Worker Thread --- +#ifdef _WIN32 +DWORD WINAPI dbWorkerThread(LPVOID arg) { +#else +void* dbWorkerThread(void* arg) { +#endif + AsyncDbManager* manager = (AsyncDbManager*)arg; + while (manager->running) { + DbContext* context = db_queue_pop(&manager->requestQueue); + if (!context || !manager->running) { + if (context) free_db_context(context); + break; + } + switch (context->operation) { + case DB_OP_OPEN: { + int rc = sqlite3_open_v2(context->path, &context->newDb, + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX, + NULL); + if (rc != SQLITE_OK) { + set_context_error(context, sqlite3_errmsg(context->newDb)); + sqlite3_close(context->newDb); + context->newDb = NULL; + } else { + context->success = true; } break; } - case SOCKET_OP_WRITE: { - if (socketData->isListener) { - context->success = false; - context->errorMessage = strdup("Cannot write to a listening socket."); - break; + case DB_OP_EXEC: { + if (!context->db) { set_context_error(context, "Database is not open."); break; } + char* err = NULL; + int rc = sqlite3_exec(context->db, context->sql, 0, 0, &err); + if (rc != SQLITE_OK) { + set_context_error(context, err); + sqlite3_free(err); + } else { + context->success = true; } - ssize_t written = send(socketData->sock, context->data, context->dataLength, 0); - context->success = (written == (ssize_t)context->dataLength); - if(!context->success) context->errorMessage = strdup("Write failed."); + break; + } + case DB_OP_QUERY: { + if (!context->db) { set_context_error(context, "Database is not open."); break; } + sqlite3_stmt* stmt; + int rc = sqlite3_prepare_v2(context->db, context->sql, -1, &stmt, 0); + if (rc != SQLITE_OK) { set_context_error(context, sqlite3_errmsg(context->db)); break; } + + int colCount = sqlite3_column_count(stmt); + DbRow* head = NULL, *tail = NULL; + bool oom = false; + while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { + DbRow* row = (DbRow*)calloc(1, sizeof(DbRow)); + if (!row) { oom = true; break; } + row->count = colCount; + row->columns = (char**)malloc(sizeof(char*) * colCount); + row->values = (DbValue*)malloc(sizeof(DbValue) * colCount); + if (!row->columns || !row->values) { free(row->columns); free(row->values); free(row); oom = true; break; } + for (int i = 0; i < colCount; i++) { + const char* colName = sqlite3_column_name(stmt, i); + row->columns[i] = colName ? strdup(colName) : strdup(""); + if (!row->columns[i]) { for (int j=0; jcolumns[j]); free(row->columns); free(row->values); free(row); oom = true; goto query_loop_end; } + DbValue* val = &row->values[i]; + val->type = sqlite3_column_type(stmt, i); + switch (val->type) { + case SQLITE_INTEGER: case SQLITE_FLOAT: val->as.num = sqlite3_column_double(stmt, i); break; + case SQLITE_TEXT: case SQLITE_BLOB: { + const void* blob = sqlite3_column_blob(stmt, i); + int len = sqlite3_column_bytes(stmt, i); + val->as.str.text = (char*)malloc(len); + if (!val->as.str.text) { for (int j=0; j<=i; j++) free(row->columns[j]); free(row->columns); free(row->values); free(row); oom = true; goto query_loop_end; } + memcpy(val->as.str.text, blob, len); + val->as.str.length = len; + break; + } + case SQLITE_NULL: break; + } + } + if (!head) head = tail = row; else { tail->next = row; tail = row; } + } + query_loop_end:; + if (oom) { set_context_error(context, "Out of memory during query."); free_db_result_rows(head); } + else if (rc != SQLITE_DONE) { set_context_error(context, sqlite3_errmsg(context->db)); free_db_result_rows(head); } + else { context->success = true; context->resultRows = head; } + sqlite3_finalize(stmt); + break; + } + case DB_OP_CLOSE: { + if (context->db) sqlite3_close(context->db); + context->success = true; break; } } - queue_push(&manager->completionQueue, context); + db_queue_push(&manager->completionQueue, context); } return 0; } -// --- Manager Lifecycle --- +// --- Wren FFI --- -void socketManager_create(WrenVM* vm) { - if (socketManager != NULL) return; - socketManager = (AsyncSocketManager*)malloc(sizeof(AsyncSocketManager)); - socketManager->vm = vm; - socketManager->running = true; - - queue_init(&socketManager->requestQueue); - queue_init(&socketManager->completionQueue); - - for (int i = 0; i < 4; i++) { - #ifdef _WIN32 - socketManager->worker_threads[i] = CreateThread(NULL, 0, workerThread, socketManager, 0, NULL); - #else - pthread_create(&socketManager->worker_threads[i], NULL, workerThread, socketManager); - #endif - } +void dbAllocate(WrenVM* vm) { + DatabaseData* data = (DatabaseData*)wrenSetSlotNewForeign(vm, 0, 0, sizeof(DatabaseData)); + if (data) data->db = NULL; } -void socketManager_destroy() { - if (!socketManager) return; - socketManager->running = false; - - // Unblock all worker threads - for (int i = 0; i < 4; i++) { - queue_push(&socketManager->requestQueue, NULL); - } - - // Wait for threads to finish - for (int i = 0; i < 4; i++) { - #ifdef _WIN32 - WaitForSingleObject(socketManager->worker_threads[i], INFINITE); - CloseHandle(socketManager->worker_threads[i]); - #else - pthread_join(socketManager->worker_threads[i], NULL); - #endif - } - - // Clean up any remaining contexts in queues - while (!queue_empty(&socketManager->requestQueue)) { - free_socket_context_data(queue_pop(&socketManager->requestQueue)); - } - while (!queue_empty(&socketManager->completionQueue)) { - free_socket_context_data(queue_pop(&socketManager->completionQueue)); - } - - queue_destroy(&socketManager->requestQueue); - queue_destroy(&socketManager->completionQueue); - - free(socketManager); - socketManager = NULL; +void dbFinalize(void* data) { + DatabaseData* dbData = (DatabaseData*)data; + if (dbData && dbData->db) sqlite3_close(dbData->db); } -void socketManager_processCompletions() { - if (!socketManager || queue_empty(&socketManager->completionQueue)) return; - - WrenHandle* callHandle = wrenMakeCallHandle(socketManager->vm, "call(_,_)"); - while (!queue_empty(&socketManager->completionQueue)) { - SocketContext* context = queue_pop(&socketManager->completionQueue); - - wrenEnsureSlots(socketManager->vm, 3); - wrenSetSlotHandle(socketManager->vm, 0, context->callback); - if (context->success) { - wrenSetSlotNull(socketManager->vm, 1); // error slot - if (IS_SOCKET_VALID(context->newSocket)) { // Accept succeeded - wrenGetVariable(socketManager->vm, "socket", "Socket", 2); - void* foreign = wrenSetSlotNewForeign(socketManager->vm, 2, 2, sizeof(SocketData)); - SocketData* clientData = (SocketData*)foreign; - clientData->sock = context->newSocket; - clientData->isListener = false; - } else if (context->resultData) { // Read succeeded - wrenSetSlotBytes(socketManager->vm, 2, context->resultData, context->resultDataLength); - } else { // Other successes (connect, write) - wrenSetSlotNull(socketManager->vm, 2); - } - } else { - wrenSetSlotString(socketManager->vm, 1, context->errorMessage ? context->errorMessage : "Unknown error."); - wrenSetSlotNull(socketManager->vm, 2); +static void create_db_context(WrenVM* vm, DBOp op, int sqlSlot, int cbSlot) { + DbContext* context = (DbContext*)calloc(1, sizeof(DbContext)); + if (!context) { wrenSetSlotString(vm, 0, "Out of memory."); wrenAbortFiber(vm, 0); return; } + context->vm = vm; + context->operation = op; + context->dbHandle = wrenGetSlotHandle(vm, 0); + context->callback = wrenGetSlotHandle(vm, cbSlot); + if (sqlSlot != -1) { + if (wrenGetSlotType(vm, sqlSlot) != WREN_TYPE_STRING) { + wrenSetSlotString(vm, 0, "SQL argument must be a string."); + wrenAbortFiber(vm, 0); + free_db_context(context); + return; } - - wrenCall(socketManager->vm, callHandle); - - wrenReleaseHandle(socketManager->vm, context->socketHandle); - wrenReleaseHandle(socketManager->vm, context->callback); - free_socket_context_data(context); + const char* sql_str = wrenGetSlotString(vm, sqlSlot); + if (sql_str) context->sql = strdup(sql_str); + if (!context->sql) { set_context_error(context, "Out of memory."); db_queue_push(&dbManager->requestQueue, context); return; } } - wrenReleaseHandle(socketManager->vm, callHandle); + DatabaseData* dbData = (DatabaseData*)wrenGetSlotForeign(vm, 0); + if (!dbData) { set_context_error(context, "Invalid database object."); db_queue_push(&dbManager->requestQueue, context); return; } + context->db = dbData->db; + db_queue_push(&dbManager->requestQueue, context); } -// --- Wren Foreign Methods --- - -void socketAllocate(WrenVM* vm) { - SocketData* data = (SocketData*)wrenSetSlotNewForeign(vm, 0, 0, sizeof(SocketData)); - data->sock = INVALID_SOCKET; - data->isListener = false; -} - -void socketConnect(WrenVM* vm) { - SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); - context->operation = SOCKET_OP_CONNECT; +void dbOpen(WrenVM* vm) { + DbContext* context = (DbContext*)calloc(1, sizeof(DbContext)); + if (!context) { wrenSetSlotString(vm, 0, "Out of memory."); wrenAbortFiber(vm, 0); return; } context->vm = vm; - context->socketHandle = wrenGetSlotHandle(vm, 0); - context->host = strdup(wrenGetSlotString(vm, 1)); - context->port = (int)wrenGetSlotDouble(vm, 2); - context->callback = wrenGetSlotHandle(vm, 3); - queue_push(&socketManager->requestQueue, context); -} - -void socketListen(WrenVM* vm) { - SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); - const char* host = wrenGetSlotString(vm, 1); - int port = (int)wrenGetSlotDouble(vm, 2); - int backlog = (int)wrenGetSlotDouble(vm, 3); - - struct addrinfo hints = {0}, *addrs; - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - - char port_str[6]; - snprintf(port_str, 6, "%d", port); - if (getaddrinfo(host, port_str, &hints, &addrs) != 0) { - wrenSetSlotBool(vm, 0, false); - return; - } - - socket_t sock = INVALID_SOCKET; - for (struct addrinfo* addr = addrs; addr; addr = addr->ai_next) { - sock = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); - if (!IS_SOCKET_VALID(sock)) continue; - - int yes = 1; - setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof(yes)); - - if (bind(sock, addr->ai_addr, (int)addr->ai_addrlen) == 0) break; - - CLOSE_SOCKET(sock); - sock = INVALID_SOCKET; - } - freeaddrinfo(addrs); - - if (IS_SOCKET_VALID(sock) && listen(sock, backlog) == 0) { - data->sock = sock; - data->isListener = true; - wrenSetSlotBool(vm, 0, true); - } else { - if(IS_SOCKET_VALID(sock)) CLOSE_SOCKET(sock); - wrenSetSlotBool(vm, 0, false); - } -} - -void socketAccept(WrenVM* vm) { - SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); - context->operation = SOCKET_OP_ACCEPT; - context->vm = vm; - context->socketHandle = wrenGetSlotHandle(vm, 0); - context->callback = wrenGetSlotHandle(vm, 1); - queue_push(&socketManager->requestQueue, context); -} - -void socketRead(WrenVM* vm) { - SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); - context->operation = SOCKET_OP_READ; - context->vm = vm; - context->socketHandle = wrenGetSlotHandle(vm, 0); - context->callback = wrenGetSlotHandle(vm, 1); - queue_push(&socketManager->requestQueue, context); -} - -void socketWrite(WrenVM* vm) { - SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); - context->operation = SOCKET_OP_WRITE; - context->vm = vm; - context->socketHandle = wrenGetSlotHandle(vm, 0); - int len; - const char* bytes = wrenGetSlotBytes(vm, 1, &len); - context->data = (char*)malloc(len); - memcpy(context->data, bytes, len); - context->dataLength = len; + context->operation = DB_OP_OPEN; + const char* path_str = wrenGetSlotString(vm, 1); + if (path_str) context->path = strdup(path_str); + if (!context->path) { free(context); wrenSetSlotString(vm, 0, "Out of memory."); wrenAbortFiber(vm, 0); return; } + context->dbHandle = wrenGetSlotHandle(vm, 0); context->callback = wrenGetSlotHandle(vm, 2); - queue_push(&socketManager->requestQueue, context); + db_queue_push(&dbManager->requestQueue, context); } -void socketClose(WrenVM* vm) { - SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); - if (IS_SOCKET_VALID(data->sock)) { - CLOSE_SOCKET(data->sock); - data->sock = INVALID_SOCKET; - } -} +void dbExec(WrenVM* vm) { create_db_context(vm, DB_OP_EXEC, 1, 2); } +void dbQuery(WrenVM* vm) { create_db_context(vm, DB_OP_QUERY, 1, 2); } +void dbClose(WrenVM* vm) { create_db_context(vm, DB_OP_CLOSE, -1, 1); } -void socketIsOpen(WrenVM* vm) { - SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); - wrenSetSlotBool(vm, 0, IS_SOCKET_VALID(data->sock)); -} - -void socketRemoteAddress(WrenVM* vm) { - SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); - if (!IS_SOCKET_VALID(data->sock) || data->isListener) { - wrenSetSlotNull(vm, 0); - return; - } - - struct sockaddr_storage addr; - socklen_t len = sizeof(addr); - char ipstr[INET6_ADDRSTRLEN]; - - if (getpeername(data->sock, (struct sockaddr*)&addr, &len) == 0) { - if (addr.ss_family == AF_INET) { - inet_ntop(AF_INET, &((struct sockaddr_in*)&addr)->sin_addr, ipstr, sizeof(ipstr)); - } else { - inet_ntop(AF_INET6, &((struct sockaddr_in6*)&addr)->sin6_addr, ipstr, sizeof(ipstr)); - } - wrenSetSlotString(vm, 0, ipstr); - } else { - wrenSetSlotNull(vm, 0); - } -} - -void socketRemotePort(WrenVM* vm) { - SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); - if (!IS_SOCKET_VALID(data->sock) || data->isListener) { - wrenSetSlotNull(vm, 0); - return; - } - - struct sockaddr_storage addr; - socklen_t len = sizeof(addr); - - if (getpeername(data->sock, (struct sockaddr*)&addr, &len) == 0) { - int port = 0; - if (addr.ss_family == AF_INET) { - port = ntohs(((struct sockaddr_in*)&addr)->sin_port); - } else if (addr.ss_family == AF_INET6) { - port = ntohs(((struct sockaddr_in6*)&addr)->sin6_port); - } - wrenSetSlotDouble(vm, 0, (double)port); - } else { - wrenSetSlotNull(vm, 0); - } -} - -WrenForeignMethodFn bindSocketForeignMethod(WrenVM* vm, const char* module, const char* className, bool isStatic, const char* signature) { - if (strcmp(module, "socket") != 0) return NULL; - if (strcmp(className, "Socket") == 0 && !isStatic) { - if (strcmp(signature, "connect(_,_,_)") == 0) return socketConnect; - if (strcmp(signature, "listen(_,_,_)") == 0) return socketListen; - if (strcmp(signature, "accept(_)") == 0) return socketAccept; - if (strcmp(signature, "read(_)") == 0) return socketRead; - if (strcmp(signature, "write_(_,_)") == 0) return socketWrite; - if (strcmp(signature, "close()") == 0) return socketClose; - if (strcmp(signature, "isOpen") == 0) return socketIsOpen; - if (strcmp(signature, "remoteAddress") == 0) return socketRemoteAddress; - if (strcmp(signature, "remotePort") == 0) return socketRemotePort; +WrenForeignMethodFn bindSqliteForeignMethod(WrenVM* vm, const char* module, const char* className, bool isStatic, const char* signature) { + if (strcmp(module, "sqlite") != 0) return NULL; + if (strcmp(className, "Database") == 0 && !isStatic) { + if (strcmp(signature, "open_(_,_)") == 0) return dbOpen; + if (strcmp(signature, "exec_(_,_)") == 0) return dbExec; + if (strcmp(signature, "query_(_,_)") == 0) return dbQuery; + if (strcmp(signature, "close_(_)") == 0) return dbClose; } return NULL; } -WrenForeignClassMethods bindSocketForeignClass(WrenVM* vm, const char* module, const char* className) { - WrenForeignClassMethods methods = {0, 0}; - if (strcmp(module, "socket") == 0 && strcmp(className, "Socket") == 0) { - methods.allocate = socketAllocate; +WrenForeignClassMethods bindSqliteForeignClass(WrenVM* vm, const char* module, const char* className) { + if (strcmp(module, "sqlite") == 0 && strcmp(className, "Database") == 0) { + WrenForeignClassMethods methods = {dbAllocate, dbFinalize}; + return methods; } + WrenForeignClassMethods methods = {0, 0}; return methods; } -// End of socket_backend.c + +// End of sqlite3_backend.c // Start of main.c #include @@ -612,9 +512,14 @@ WrenForeignClassMethods bindSocketForeignClass(WrenVM* vm, const char* module, c #endif #include "wren.h" + +// It's common practice to include the C source directly for small-to-medium +// modules like this to simplify the build process. #include "requests_backend.c" #include "socket_backend.c" -#include "string_backend.c" // Include the new string backend +#include "string_backend.c" +#include "sqlite3_backend.c" +#include "io_backend.c" // --- Global flag to control the main loop --- static volatile bool g_mainFiberIsDone = false; @@ -683,39 +588,41 @@ static WrenLoadModuleResult loadModule(WrenVM* vm, const char* name) { // --- Combined Foreign Function Binders --- WrenForeignMethodFn combinedBindForeignMethod(WrenVM* vm, const char* module, const char* className, bool isStatic, const char* signature) { - // Delegate to the socket backend's binder if (strcmp(module, "socket") == 0) { return bindSocketForeignMethod(vm, module, className, isStatic, signature); } - - // Delegate to the requests backend's binder if (strcmp(module, "requests") == 0) { return bindForeignMethod(vm, module, className, isStatic, signature); } - + if (strcmp(module, "sqlite") == 0) { + return bindSqliteForeignMethod(vm, module, className, isStatic, signature); + } + if (strcmp(module, "io") == 0) { + return bindIoForeignMethod(vm, module, className, isStatic, signature); + } if (strcmp(className, "String") == 0) { return bindStringForeignMethod(vm, module, className, isStatic, signature); } - - // Handle host-specific methods if (strcmp(module, "main") == 0 && strcmp(className, "Host") == 0 && isStatic) { if (strcmp(signature, "signalDone()") == 0) return hostSignalDone; } - return NULL; } WrenForeignClassMethods combinedBindForeignClass(WrenVM* vm, const char* module, const char* className) { - // Delegate to the socket backend's class binder if (strcmp(module, "socket") == 0) { return bindSocketForeignClass(vm, module, className); } - - // Delegate to the requests backend's class binder if (strcmp(module, "requests") == 0) { return bindForeignClass(vm, module, className); } - + if (strcmp(module, "sqlite") == 0) { + return bindSqliteForeignClass(vm, module, className); + } + if (strcmp(module, "io") == 0) { + WrenForeignClassMethods methods = {0, 0}; + return methods; + } WrenForeignClassMethods methods = {0, 0}; return methods; } @@ -728,7 +635,6 @@ int main(int argc, char* argv[]) { return 1; } - // Initialize libcurl for the requests module curl_global_init(CURL_GLOBAL_ALL); WrenConfiguration config; @@ -741,15 +647,17 @@ int main(int argc, char* argv[]) { WrenVM* vm = wrenNewVM(&config); - // ** Initialize BOTH managers ** + // ** Initialize ALL managers ** socketManager_create(vm); httpManager_create(vm); + dbManager_create(vm); char* mainSource = readFile(argv[1]); if (!mainSource) { fprintf(stderr, "Could not open script: %s\n", argv[1]); socketManager_destroy(); httpManager_destroy(); + dbManager_destroy(); wrenFreeVM(vm); curl_global_cleanup(); return 1; @@ -761,6 +669,7 @@ int main(int argc, char* argv[]) { if (g_mainFiberIsDone) { socketManager_destroy(); httpManager_destroy(); + dbManager_destroy(); wrenFreeVM(vm); curl_global_cleanup(); return 1; @@ -773,14 +682,16 @@ int main(int argc, char* argv[]) { // === Main Event Loop === while (!g_mainFiberIsDone) { - // ** Process completions for BOTH managers ** + // ** Process completions for ALL managers ** socketManager_processCompletions(); httpManager_processCompletions(); + dbManager_processCompletions(); // Resume the main Wren fiber wrenEnsureSlots(vm, 1); wrenSetSlotHandle(vm, 0, mainFiberHandle); WrenInterpretResult result = wrenCall(vm, callHandle); + if (result == WREN_RESULT_RUNTIME_ERROR) { g_mainFiberIsDone = true; } @@ -796,13 +707,15 @@ int main(int argc, char* argv[]) { // Process any final completions before shutting down socketManager_processCompletions(); httpManager_processCompletions(); + dbManager_processCompletions(); wrenReleaseHandle(vm, mainFiberHandle); wrenReleaseHandle(vm, callHandle); - // ** Destroy BOTH managers ** + // ** Destroy ALL managers ** socketManager_destroy(); httpManager_destroy(); + dbManager_destroy(); wrenFreeVM(vm); curl_global_cleanup(); @@ -811,6 +724,7 @@ int main(int argc, char* argv[]) { return 0; } + // End of main.c // Start of requests_backend.c @@ -1998,6 +1912,129 @@ private: // End of async_http.c +// Start of io_backend.c +#include "wren.h" +#include +#include +#include + +#ifdef _WIN32 +#include +#else +#include +#endif + +// Helper to validate that the value in a slot is a string. +static bool io_validate_string(WrenVM* vm, int slot, const char* name) { + if (wrenGetSlotType(vm, slot) == WREN_TYPE_STRING) return true; + // The error is placed in slot 0, which becomes the return value. + wrenSetSlotString(vm, 0, "Argument must be a string."); + wrenAbortFiber(vm, 0); + return false; +} + +// --- File Class Foreign Methods --- + +void fileExists(WrenVM* vm) { + if (!io_validate_string(vm, 1, "path")) return; + const char* path = wrenGetSlotString(vm, 1); + +#ifdef _WIN32 + DWORD attrib = GetFileAttributes(path); + wrenSetSlotBool(vm, 0, (attrib != INVALID_FILE_ATTRIBUTES && !(attrib & FILE_ATTRIBUTE_DIRECTORY))); +#else + struct stat buffer; + wrenSetSlotBool(vm, 0, (stat(path, &buffer) == 0)); +#endif +} + +void fileDelete(WrenVM* vm) { + if (!io_validate_string(vm, 1, "path")) return; + const char* path = wrenGetSlotString(vm, 1); + + if (remove(path) == 0) { + wrenSetSlotBool(vm, 0, true); + } else { + wrenSetSlotBool(vm, 0, false); + } +} + +void fileRead(WrenVM* vm) { + if (!io_validate_string(vm, 1, "path")) return; + const char* path = wrenGetSlotString(vm, 1); + + FILE* file = fopen(path, "rb"); + if (file == NULL) { + wrenSetSlotNull(vm, 0); + return; + } + + fseek(file, 0L, SEEK_END); + size_t fileSize = ftell(file); + rewind(file); + + char* buffer = (char*)malloc(fileSize + 1); + if (buffer == NULL) { + fclose(file); + wrenSetSlotString(vm, 0, "Could not allocate memory to read file."); + wrenAbortFiber(vm, 0); + return; + } + + size_t bytesRead = fread(buffer, sizeof(char), fileSize, file); + if (bytesRead < fileSize) { + free(buffer); + fclose(file); + wrenSetSlotString(vm, 0, "Could not read entire file."); + wrenAbortFiber(vm, 0); + return; + } + + buffer[bytesRead] = '\0'; + fclose(file); + + wrenSetSlotBytes(vm, 0, buffer, bytesRead); + free(buffer); +} + +void fileWrite(WrenVM* vm) { + if (!io_validate_string(vm, 1, "path")) return; + if (!io_validate_string(vm, 2, "contents")) return; + + const char* path = wrenGetSlotString(vm, 1); + int length; + const char* contents = wrenGetSlotBytes(vm, 2, &length); + + FILE* file = fopen(path, "wb"); + if (file == NULL) { + wrenSetSlotBool(vm, 0, false); + return; + } + + size_t bytesWritten = fwrite(contents, sizeof(char), length, file); + fclose(file); + + wrenSetSlotBool(vm, 0, bytesWritten == (size_t)length); +} + +// --- FFI Binding --- + +WrenForeignMethodFn bindIoForeignMethod(WrenVM* vm, const char* module, const char* className, bool isStatic, const char* signature) { + if (strcmp(module, "io") != 0) return NULL; + + if (strcmp(className, "File") == 0 && isStatic) { + if (strcmp(signature, "exists(_)") == 0) return fileExists; + if (strcmp(signature, "delete(_)") == 0) return fileDelete; + if (strcmp(signature, "read(_)") == 0) return fileRead; + if (strcmp(signature, "write(_,_)") == 0) return fileWrite; + } + + return NULL; +} + + +// End of io_backend.c + // Start of wren.c // MIT License // diff --git a/socket.wren b/socket.wren index 62392fd..592fe60 100644 --- a/socket.wren +++ b/socket.wren @@ -1,49 +1,87 @@ -// socket.wren (Corrected) +// socket.wren + foreign class Socket { - // CORRECTED: Changed 'new_' to 'new' to match the standard convention. - construct new() {} + // Asynchronous static methods that perform raw socket operations. + // These are the direct bindings to the C backend. + foreign static connect_(host, port, callback) + foreign static new_(callback) + foreign static bind_(sock, host, port, callback) + foreign static listen_(sock, backlog, callback) + foreign static accept_(sock, callback) + foreign static read_(sock, length, callback) + foreign static write_(sock, data, callback) + // Additional raw functions can be added here following the pattern. +} - foreign connect(host, port, callback) - foreign listen(host, port, backlog) - foreign accept(callback) - foreign read(bytes) - foreign close() - - foreign isOpen - foreign remoteAddress - foreign remotePort - - // Implemented in Wren - write(data, callback) { - write_(data, callback) +class SocketInstance { + construct new(socketFd) { + _sock = socketFd } - readUntil(delimiter, callback) { - var buffer = "" - var readChunk - readChunk = Fn.new { - this.read(4096) { |err, data| - if (err) { - callback.call(err, null) - return - } + // Instance methods providing a more convenient API. + accept() { + var fiber = Fiber.current + Socket.accept_(_sock) { |err, newSock| + fiber.transfer([err, newSock]) + } + return Fiber.yield() + } - buffer = buffer + data - var index = buffer.indexOf(delimiter) + read(length) { + var fiber = Fiber.current + Socket.read_(_sock, length) { |err, data| + fiber.transfer([err, data]) + } + return Fiber.yield() + } - if (index != -1) { - var result = buffer.substring(0, index + delimiter.count) - callback.call(null, result) - } else { - // Delimiter not found, read more data. - readChunk.call() - } + write(data) { + var fiber = Fiber.current + Socket.write_(_sock, data) { |err, nothing| + fiber.transfer(err) + } + return Fiber.yield() + } + + // Static methods for creating client and server sockets. + static connect(host, port) { + var fiber = Fiber.current + Socket.connect_(host, port) { |err, sock| + if (err) { + fiber.transfer([err, null]) + } else { + fiber.transfer([null, SocketInstance.new(sock)]) } } - // Start reading. - readChunk.call() + return Fiber.yield() } - // Private foreign method for writing - foreign write_(data, callback) + static new() { + var fiber = Fiber.current + Socket.new_() { |err, sock| + if (err) { + fiber.transfer([err, null]) + } else { + fiber.transfer([null, SocketInstance.new(sock)]) + } + } + return Fiber.yield() + } + + bind(host, port) { + var fiber = Fiber.current + Socket.bind_(_sock, host, port) { |err, success| + fiber.transfer([err, success]) + } + return Fiber.yield() + } + + listen(backlog) { + var fiber = Fiber.current + Socket.listen_(_sock, backlog) { |err, success| + fiber.transfer([err, success]) + } + return Fiber.yield() + } } + diff --git a/socket_backend.c b/socket_backend.c index 6bc8eea..6effe7c 100644 --- a/socket_backend.c +++ b/socket_backend.c @@ -3,87 +3,87 @@ #include #include #include -#include -// Platform-specific includes and definitions #ifdef _WIN32 #include #include - #include #pragma comment(lib, "ws2_32.lib") typedef SOCKET socket_t; - typedef int socklen_t; typedef HANDLE thread_t; typedef CRITICAL_SECTION mutex_t; typedef CONDITION_VARIABLE cond_t; - #define IS_SOCKET_VALID(s) ((s) != INVALID_SOCKET) - #define CLOSE_SOCKET(s) closesocket(s) #else #include #include #include #include #include - #include - #include + #include #include + #include typedef int socket_t; typedef pthread_t thread_t; typedef pthread_mutex_t mutex_t; typedef pthread_cond_t cond_t; - #define INVALID_SOCKET -1 - #define IS_SOCKET_VALID(s) ((s) >= 0) - #define CLOSE_SOCKET(s) close(s) + #define closesocket(s) close(s) #endif -// --- Forward Declarations --- -typedef struct SocketContext SocketContext; - -// --- Socket Data Structures --- +// --- Data Structures --- typedef enum { SOCKET_OP_CONNECT, + SOCKET_OP_NEW, + SOCKET_OP_BIND, + SOCKET_OP_LISTEN, SOCKET_OP_ACCEPT, SOCKET_OP_READ, + SOCKET_OP_READ_UNTIL, + SOCKET_OP_READ_EXACTLY, SOCKET_OP_WRITE, + SOCKET_OP_IS_READABLE, + SOCKET_OP_SELECT } SocketOp; typedef struct { - socket_t sock; - bool isListener; -} SocketData; + char* data; + int length; +} Buffer; -struct SocketContext { - SocketOp operation; +typedef struct SocketContext { WrenVM* vm; - WrenHandle* socketHandle; + SocketOp operation; WrenHandle* callback; - // For connect + // Operation specific data + socket_t sock; char* host; int port; + int backlog; + int length; + Buffer write_data; + char* until_bytes; + int until_len; + WrenHandle* sockets_list_handle; // For select - // For write - char* data; - size_t dataLength; - - // Results + // Result data bool success; - char* resultData; - size_t resultDataLength; - char* errorMessage; - socket_t newSocket; // For accept - struct SocketContext* next; -}; + char* error_message; + socket_t new_sock; + Buffer read_data; + WrenHandle* readable_sockets_handle; // For select result + + struct SocketContext* next; +} SocketContext; + +// --- Thread-Safe Queue for Socket Operations --- -// --- Thread-Safe Queue Implementation --- typedef struct { SocketContext *head, *tail; mutex_t mutex; cond_t cond; -} ThreadSafeQueueSocket; +} SocketThreadSafeQueue; -void queue_init(ThreadSafeQueueSocket* q) { +void socket_queue_init(SocketThreadSafeQueue* q) { q->head = q->tail = NULL; #ifdef _WIN32 InitializeCriticalSection(&q->mutex); @@ -94,7 +94,7 @@ void queue_init(ThreadSafeQueueSocket* q) { #endif } -void queue_destroy(ThreadSafeQueueSocket* q) { +void socket_queue_destroy(SocketThreadSafeQueue* q) { #ifdef _WIN32 DeleteCriticalSection(&q->mutex); #else @@ -103,24 +103,16 @@ void queue_destroy(ThreadSafeQueueSocket* q) { #endif } -void queue_push(ThreadSafeQueueSocket* q, SocketContext* context) { +void socket_queue_push(SocketThreadSafeQueue* q, SocketContext* context) { #ifdef _WIN32 EnterCriticalSection(&q->mutex); #else pthread_mutex_lock(&q->mutex); #endif - - if (context) { - context->next = NULL; - } - - if (q->tail) { - q->tail->next = context; - } else { - q->head = context; - } + if(context) context->next = NULL; + if (q->tail) q->tail->next = context; + else q->head = context; q->tail = context; - #ifdef _WIN32 WakeConditionVariable(&q->cond); LeaveCriticalSection(&q->mutex); @@ -130,7 +122,7 @@ void queue_push(ThreadSafeQueueSocket* q, SocketContext* context) { #endif } -SocketContext* queue_pop(ThreadSafeQueueSocket* q) { +SocketContext* socket_queue_pop(SocketThreadSafeQueue* q) { #ifdef _WIN32 EnterCriticalSection(&q->mutex); while (q->head == NULL) { @@ -142,208 +134,88 @@ SocketContext* queue_pop(ThreadSafeQueueSocket* q) { pthread_cond_wait(&q->cond, &q->mutex); } #endif - SocketContext* context = q->head; - if (context) { - q->head = q->head->next; - if (q->head == NULL) { - q->tail = NULL; - } - } - + q->head = q->head->next; + if (q->head == NULL) q->tail = NULL; #ifdef _WIN32 LeaveCriticalSection(&q->mutex); #else pthread_mutex_unlock(&q->mutex); #endif - return context; } -bool queue_empty(ThreadSafeQueueSocket* q) { +bool socket_queue_empty(SocketThreadSafeQueue* q) { + bool empty; #ifdef _WIN32 EnterCriticalSection(&q->mutex); - bool empty = (q->head == NULL); + empty = (q->head == NULL); LeaveCriticalSection(&q->mutex); #else pthread_mutex_lock(&q->mutex); - bool empty = (q->head == NULL); + empty = (q->head == NULL); pthread_mutex_unlock(&q->mutex); #endif return empty; } -// --- Asynchronous Socket Manager --- +// --- Async Socket Manager --- typedef struct { WrenVM* vm; volatile bool running; - thread_t worker_threads[4]; - ThreadSafeQueueSocket requestQueue; - ThreadSafeQueueSocket completionQueue; + thread_t threads[4]; // 4 worker threads + SocketThreadSafeQueue requestQueue; + SocketThreadSafeQueue completionQueue; } AsyncSocketManager; static AsyncSocketManager* socketManager = NULL; -void free_socket_context_data(SocketContext* context) { - if (!context) return; +void free_socket_context(SocketContext* context) { + if (context == NULL) return; free(context->host); - free(context->data); - free(context->resultData); - free(context->errorMessage); + free(context->error_message); + free(context->write_data.data); + free(context->read_data.data); + free(context->until_bytes); + if (context->callback) wrenReleaseHandle(context->vm, context->callback); + if (context->sockets_list_handle) wrenReleaseHandle(context->vm, context->sockets_list_handle); + if (context->readable_sockets_handle) wrenReleaseHandle(context->vm, context->readable_sockets_handle); free(context); } -#ifdef _WIN32 -DWORD WINAPI workerThread(LPVOID arg); -#else -void* workerThread(void* arg); -#endif - -// --- Worker Thread Implementation --- - -#ifdef _WIN32 -DWORD WINAPI workerThread(LPVOID arg) { -#else -void* workerThread(void* arg) { -#endif - AsyncSocketManager* manager = (AsyncSocketManager*)arg; - - while (manager->running) { - SocketContext* context = queue_pop(&manager->requestQueue); - if (!context || !manager->running) { - if (context) free_socket_context_data(context); - break; - } - - wrenEnsureSlots(context->vm, 1); - wrenSetSlotHandle(context->vm, 0, context->socketHandle); - SocketData* socketData = (wrenGetSlotType(context->vm, 0) == WREN_TYPE_FOREIGN) - ? (SocketData*)wrenGetSlotForeign(context->vm, 0) - : NULL; - - if (!socketData || !IS_SOCKET_VALID(socketData->sock)) { - context->success = false; - context->errorMessage = strdup("Invalid or closed socket object."); - queue_push(&manager->completionQueue, context); - continue; - } - - switch (context->operation) { - case SOCKET_OP_CONNECT: { - struct addrinfo hints = {0}, *addrs; - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - char port_str[6]; - snprintf(port_str, 6, "%d", context->port); - if (getaddrinfo(context->host, port_str, &hints, &addrs) != 0) { - context->success = false; - context->errorMessage = strdup("Host lookup failed."); - break; - } - - socket_t sock = INVALID_SOCKET; - for (struct addrinfo* addr = addrs; addr; addr = addr->ai_next) { - sock = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); - if (!IS_SOCKET_VALID(sock)) continue; - if (connect(sock, addr->ai_addr, (int)addr->ai_addrlen) == 0) break; - CLOSE_SOCKET(sock); - sock = INVALID_SOCKET; - } - freeaddrinfo(addrs); - - if (IS_SOCKET_VALID(sock)) { - socketData->sock = sock; - socketData->isListener = false; - context->success = true; - } else { - context->success = false; - context->errorMessage = strdup("Connection failed."); - } - break; - } - case SOCKET_OP_ACCEPT: { - if (!socketData->isListener) { - context->success = false; - context->errorMessage = strdup("Cannot accept on a non-listening socket."); - break; - } - - // This is a blocking call. The worker thread will wait here. - context->newSocket = accept(socketData->sock, NULL, NULL); - context->success = IS_SOCKET_VALID(context->newSocket); - if (!context->success) { - #ifdef _WIN32 - // TODO: A more descriptive error using FormatMessageA - context->errorMessage = strdup("Accept failed."); - #else - context->errorMessage = strdup(strerror(errno)); - #endif - } - break; - } - case SOCKET_OP_READ: { - if (socketData->isListener) { - context->success = false; - context->errorMessage = strdup("Cannot read from a listening socket."); - break; - } - - char buf[4096]; - // This is a blocking call. - ssize_t len = recv(socketData->sock, buf, sizeof(buf), 0); - if (len > 0) { - context->resultData = (char*)malloc(len); - memcpy(context->resultData, buf, len); - context->resultDataLength = len; - context->success = true; - } else { - context->success = false; - if (len == 0) { - context->errorMessage = strdup("Connection closed by peer."); - } else { - #ifdef _WIN32 - context->errorMessage = strdup("Read failed."); - #else - context->errorMessage = strdup(strerror(errno)); - #endif - } - } - break; - } - case SOCKET_OP_WRITE: { - if (socketData->isListener) { - context->success = false; - context->errorMessage = strdup("Cannot write to a listening socket."); - break; - } - ssize_t written = send(socketData->sock, context->data, context->dataLength, 0); - context->success = (written == (ssize_t)context->dataLength); - if(!context->success) context->errorMessage = strdup("Write failed."); - break; - } - } - queue_push(&manager->completionQueue, context); - } - return 0; +static void set_socket_context_error(SocketContext* context, const char* message) { + if (context == NULL) return; + context->success = false; + if (context->error_message) free(context->error_message); + context->error_message = message ? strdup(message) : strdup("An unknown socket error occurred."); } -// --- Manager Lifecycle --- +#ifdef _WIN32 +DWORD WINAPI socketWorkerThread(LPVOID arg); +#else +void* socketWorkerThread(void* arg); +#endif void socketManager_create(WrenVM* vm) { if (socketManager != NULL) return; socketManager = (AsyncSocketManager*)malloc(sizeof(AsyncSocketManager)); + if (socketManager == NULL) return; + + #ifdef _WIN32 + WSADATA wsaData; + WSAStartup(MAKEWORD(2, 2), &wsaData); + #endif + socketManager->vm = vm; socketManager->running = true; - - queue_init(&socketManager->requestQueue); - queue_init(&socketManager->completionQueue); - - for (int i = 0; i < 4; i++) { + socket_queue_init(&socketManager->requestQueue); + socket_queue_init(&socketManager->completionQueue); + for (int i = 0; i < 4; ++i) { #ifdef _WIN32 - socketManager->worker_threads[i] = CreateThread(NULL, 0, workerThread, socketManager, 0, NULL); + socketManager->threads[i] = CreateThread(NULL, 0, socketWorkerThread, socketManager, 0, NULL); #else - pthread_create(&socketManager->worker_threads[i], NULL, workerThread, socketManager); + pthread_create(&socketManager->threads[i], NULL, socketWorkerThread, socketManager); #endif } } @@ -351,246 +223,279 @@ void socketManager_create(WrenVM* vm) { void socketManager_destroy() { if (!socketManager) return; socketManager->running = false; - - // Unblock all worker threads - for (int i = 0; i < 4; i++) { - queue_push(&socketManager->requestQueue, NULL); - } - - // Wait for threads to finish - for (int i = 0; i < 4; i++) { + for (int i = 0; i < 4; ++i) socket_queue_push(&socketManager->requestQueue, NULL); + for (int i = 0; i < 4; ++i) { #ifdef _WIN32 - WaitForSingleObject(socketManager->worker_threads[i], INFINITE); - CloseHandle(socketManager->worker_threads[i]); + WaitForSingleObject(socketManager->threads[i], INFINITE); + CloseHandle(socketManager->threads[i]); #else - pthread_join(socketManager->worker_threads[i], NULL); + pthread_join(socketManager->threads[i], NULL); #endif } - - // Clean up any remaining contexts in queues - while (!queue_empty(&socketManager->requestQueue)) { - free_socket_context_data(queue_pop(&socketManager->requestQueue)); - } - while (!queue_empty(&socketManager->completionQueue)) { - free_socket_context_data(queue_pop(&socketManager->completionQueue)); - } - - queue_destroy(&socketManager->requestQueue); - queue_destroy(&socketManager->completionQueue); - + while(!socket_queue_empty(&socketManager->requestQueue)) free_socket_context(socket_queue_pop(&socketManager->requestQueue)); + while(!socket_queue_empty(&socketManager->completionQueue)) free_socket_context(socket_queue_pop(&socketManager->completionQueue)); + socket_queue_destroy(&socketManager->requestQueue); + socket_queue_destroy(&socketManager->completionQueue); free(socketManager); socketManager = NULL; + + #ifdef _WIN32 + WSACleanup(); + #endif } void socketManager_processCompletions() { - if (!socketManager || queue_empty(&socketManager->completionQueue)) return; + if (!socketManager || !socketManager->vm || socket_queue_empty(&socketManager->completionQueue)) return; - WrenHandle* callHandle = wrenMakeCallHandle(socketManager->vm, "call(_,_)"); - while (!queue_empty(&socketManager->completionQueue)) { - SocketContext* context = queue_pop(&socketManager->completionQueue); - + while (!socket_queue_empty(&socketManager->completionQueue)) { + SocketContext* context = socket_queue_pop(&socketManager->completionQueue); + if (context == NULL) continue; + + if (context->callback == NULL) { + free_socket_context(context); + continue; + } + + WrenHandle* callHandle = wrenMakeCallHandle(socketManager->vm, "call(_,_)"); wrenEnsureSlots(socketManager->vm, 3); wrenSetSlotHandle(socketManager->vm, 0, context->callback); + if (context->success) { - wrenSetSlotNull(socketManager->vm, 1); // error slot - if (IS_SOCKET_VALID(context->newSocket)) { // Accept succeeded - wrenGetVariable(socketManager->vm, "socket", "Socket", 2); - void* foreign = wrenSetSlotNewForeign(socketManager->vm, 2, 2, sizeof(SocketData)); - SocketData* clientData = (SocketData*)foreign; - clientData->sock = context->newSocket; - clientData->isListener = false; - } else if (context->resultData) { // Read succeeded - wrenSetSlotBytes(socketManager->vm, 2, context->resultData, context->resultDataLength); - } else { // Other successes (connect, write) - wrenSetSlotNull(socketManager->vm, 2); + wrenSetSlotNull(socketManager->vm, 1); // error is null + switch(context->operation) { + case SOCKET_OP_CONNECT: + case SOCKET_OP_NEW: + case SOCKET_OP_ACCEPT: + wrenSetSlotDouble(socketManager->vm, 2, (double)context->new_sock); + break; + case SOCKET_OP_BIND: + case SOCKET_OP_LISTEN: + case SOCKET_OP_IS_READABLE: + wrenSetSlotBool(socketManager->vm, 2, true); + break; + case SOCKET_OP_READ: + case SOCKET_OP_READ_UNTIL: + case SOCKET_OP_READ_EXACTLY: + if (context->read_data.data) { + wrenSetSlotBytes(socketManager->vm, 2, context->read_data.data, context->read_data.length); + } else { + wrenSetSlotNull(socketManager->vm, 2); + } + break; + case SOCKET_OP_SELECT: + wrenSetSlotHandle(socketManager->vm, 2, context->readable_sockets_handle); + break; + case SOCKET_OP_WRITE: + default: + wrenSetSlotNull(socketManager->vm, 2); + break; } } else { - wrenSetSlotString(socketManager->vm, 1, context->errorMessage ? context->errorMessage : "Unknown error."); + wrenSetSlotString(socketManager->vm, 1, context->error_message ? context->error_message : "Unknown error."); wrenSetSlotNull(socketManager->vm, 2); } wrenCall(socketManager->vm, callHandle); - - wrenReleaseHandle(socketManager->vm, context->socketHandle); - wrenReleaseHandle(socketManager->vm, context->callback); - free_socket_context_data(context); - } - wrenReleaseHandle(socketManager->vm, callHandle); -} - -// --- Wren Foreign Methods --- - -void socketAllocate(WrenVM* vm) { - SocketData* data = (SocketData*)wrenSetSlotNewForeign(vm, 0, 0, sizeof(SocketData)); - data->sock = INVALID_SOCKET; - data->isListener = false; -} - -void socketConnect(WrenVM* vm) { - SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); - context->operation = SOCKET_OP_CONNECT; - context->vm = vm; - context->socketHandle = wrenGetSlotHandle(vm, 0); - context->host = strdup(wrenGetSlotString(vm, 1)); - context->port = (int)wrenGetSlotDouble(vm, 2); - context->callback = wrenGetSlotHandle(vm, 3); - queue_push(&socketManager->requestQueue, context); -} - -void socketListen(WrenVM* vm) { - SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); - const char* host = wrenGetSlotString(vm, 1); - int port = (int)wrenGetSlotDouble(vm, 2); - int backlog = (int)wrenGetSlotDouble(vm, 3); - - struct addrinfo hints = {0}, *addrs; - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - - char port_str[6]; - snprintf(port_str, 6, "%d", port); - if (getaddrinfo(host, port_str, &hints, &addrs) != 0) { - wrenSetSlotBool(vm, 0, false); - return; - } - - socket_t sock = INVALID_SOCKET; - for (struct addrinfo* addr = addrs; addr; addr = addr->ai_next) { - sock = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); - if (!IS_SOCKET_VALID(sock)) continue; - - int yes = 1; - setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof(yes)); - - if (bind(sock, addr->ai_addr, (int)addr->ai_addrlen) == 0) break; - - CLOSE_SOCKET(sock); - sock = INVALID_SOCKET; - } - freeaddrinfo(addrs); - - if (IS_SOCKET_VALID(sock) && listen(sock, backlog) == 0) { - data->sock = sock; - data->isListener = true; - wrenSetSlotBool(vm, 0, true); - } else { - if(IS_SOCKET_VALID(sock)) CLOSE_SOCKET(sock); - wrenSetSlotBool(vm, 0, false); + wrenReleaseHandle(socketManager->vm, callHandle); + free_socket_context(context); } } -void socketAccept(WrenVM* vm) { - SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); - context->operation = SOCKET_OP_ACCEPT; - context->vm = vm; - context->socketHandle = wrenGetSlotHandle(vm, 0); - context->callback = wrenGetSlotHandle(vm, 1); - queue_push(&socketManager->requestQueue, context); -} +// --- Worker Thread Implementation --- -void socketRead(WrenVM* vm) { - SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); - context->operation = SOCKET_OP_READ; - context->vm = vm; - context->socketHandle = wrenGetSlotHandle(vm, 0); - context->callback = wrenGetSlotHandle(vm, 1); - queue_push(&socketManager->requestQueue, context); -} - -void socketWrite(WrenVM* vm) { - SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); - context->operation = SOCKET_OP_WRITE; - context->vm = vm; - context->socketHandle = wrenGetSlotHandle(vm, 0); - int len; - const char* bytes = wrenGetSlotBytes(vm, 1, &len); - context->data = (char*)malloc(len); - memcpy(context->data, bytes, len); - context->dataLength = len; - context->callback = wrenGetSlotHandle(vm, 2); - queue_push(&socketManager->requestQueue, context); -} - -void socketClose(WrenVM* vm) { - SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); - if (IS_SOCKET_VALID(data->sock)) { - CLOSE_SOCKET(data->sock); - data->sock = INVALID_SOCKET; - } -} - -void socketIsOpen(WrenVM* vm) { - SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); - wrenSetSlotBool(vm, 0, IS_SOCKET_VALID(data->sock)); -} - -void socketRemoteAddress(WrenVM* vm) { - SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); - if (!IS_SOCKET_VALID(data->sock) || data->isListener) { - wrenSetSlotNull(vm, 0); - return; - } - - struct sockaddr_storage addr; - socklen_t len = sizeof(addr); - char ipstr[INET6_ADDRSTRLEN]; - - if (getpeername(data->sock, (struct sockaddr*)&addr, &len) == 0) { - if (addr.ss_family == AF_INET) { - inet_ntop(AF_INET, &((struct sockaddr_in*)&addr)->sin_addr, ipstr, sizeof(ipstr)); - } else { - inet_ntop(AF_INET6, &((struct sockaddr_in6*)&addr)->sin6_addr, ipstr, sizeof(ipstr)); +#ifdef _WIN32 +DWORD WINAPI socketWorkerThread(LPVOID arg) { +#else +void* socketWorkerThread(void* arg) { +#endif + AsyncSocketManager* manager = (AsyncSocketManager*)arg; + while (manager->running) { + SocketContext* context = socket_queue_pop(&manager->requestQueue); + if (!context || !manager->running) { + if (context) free_socket_context(context); + break; } - wrenSetSlotString(vm, 0, ipstr); - } else { - wrenSetSlotNull(vm, 0); + + switch (context->operation) { + case SOCKET_OP_NEW: { + context->new_sock = socket(AF_INET, SOCK_STREAM, 0); + if (context->new_sock == -1) { + set_socket_context_error(context, "Failed to create socket."); + } else { + context->success = true; + } + break; + } + case SOCKET_OP_CONNECT: { + struct sockaddr_in serv_addr; + context->new_sock = socket(AF_INET, SOCK_STREAM, 0); + if (context->new_sock < 0) { + set_socket_context_error(context, "Socket creation error"); + break; + } + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(context->port); + if(inet_pton(AF_INET, context->host, &serv_addr.sin_addr)<=0) { + set_socket_context_error(context, "Invalid address/ Address not supported"); + break; + } + if (connect(context->new_sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) { + set_socket_context_error(context, "Connection Failed"); + } else { + context->success = true; + } + break; + } + case SOCKET_OP_BIND: { + struct sockaddr_in address; + address.sin_family = AF_INET; + address.sin_addr.s_addr = INADDR_ANY; // Or inet_addr(context->host) for specific interface + address.sin_port = htons(context->port); + if (bind(context->sock, (struct sockaddr *)&address, sizeof(address)) < 0) { + set_socket_context_error(context, "Bind failed"); + } else { + context->success = true; + } + break; + } + case SOCKET_OP_LISTEN: { + if (listen(context->sock, context->backlog) < 0) { + set_socket_context_error(context, "Listen failed"); + } else { + context->success = true; + } + break; + } + case SOCKET_OP_ACCEPT: { + struct sockaddr_in address; + int addrlen = sizeof(address); + context->new_sock = accept(context->sock, (struct sockaddr *)&address, (socklen_t*)&addrlen); + if (context->new_sock < 0) { + set_socket_context_error(context, "Accept failed"); + } else { + context->success = true; + } + break; + } + case SOCKET_OP_READ: { + char* buffer = malloc(context->length + 1); + int valread = recv(context->sock, buffer, context->length, 0); + if (valread >= 0) { + context->read_data.data = buffer; + context->read_data.length = valread; + context->success = true; + } else { + free(buffer); + set_socket_context_error(context, "Read failed"); + } + break; + } + case SOCKET_OP_WRITE: { + int total_sent = 0; + while (total_sent < context->write_data.length) { + int sent = send(context->sock, context->write_data.data + total_sent, context->write_data.length - total_sent, 0); + if (sent < 0) { + set_socket_context_error(context, "Write failed"); + break; + } + total_sent += sent; + } + if (total_sent == context->write_data.length) { + context->success = true; + } + break; + } + // ... other cases ... + } + socket_queue_push(&manager->completionQueue, context); } + return 0; } -void socketRemotePort(WrenVM* vm) { - SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); - if (!IS_SOCKET_VALID(data->sock) || data->isListener) { - wrenSetSlotNull(vm, 0); +// --- Wren FFI Functions --- + +static void create_socket_context(WrenVM* vm, SocketOp op) { + SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); + if (!context) { + wrenSetSlotString(vm, 0, "Out of memory."); + wrenAbortFiber(vm, 0); return; } + context->vm = vm; + context->operation = op; - struct sockaddr_storage addr; - socklen_t len = sizeof(addr); - - if (getpeername(data->sock, (struct sockaddr*)&addr, &len) == 0) { - int port = 0; - if (addr.ss_family == AF_INET) { - port = ntohs(((struct sockaddr_in*)&addr)->sin_port); - } else if (addr.ss_family == AF_INET6) { - port = ntohs(((struct sockaddr_in6*)&addr)->sin6_port); - } - wrenSetSlotDouble(vm, 0, (double)port); - } else { - wrenSetSlotNull(vm, 0); + switch(op) { + case SOCKET_OP_CONNECT: + context->host = strdup(wrenGetSlotString(vm, 1)); + context->port = (int)wrenGetSlotDouble(vm, 2); + context->callback = wrenGetSlotHandle(vm, 3); + break; + case SOCKET_OP_NEW: + context->callback = wrenGetSlotHandle(vm, 1); + break; + case SOCKET_OP_BIND: + context->sock = (socket_t)wrenGetSlotDouble(vm, 1); + context->host = strdup(wrenGetSlotString(vm, 2)); + context->port = (int)wrenGetSlotDouble(vm, 3); + context->callback = wrenGetSlotHandle(vm, 4); + break; + case SOCKET_OP_LISTEN: + context->sock = (socket_t)wrenGetSlotDouble(vm, 1); + context->backlog = (int)wrenGetSlotDouble(vm, 2); + context->callback = wrenGetSlotHandle(vm, 3); + break; + case SOCKET_OP_ACCEPT: + context->sock = (socket_t)wrenGetSlotDouble(vm, 1); + context->callback = wrenGetSlotHandle(vm, 2); + break; + case SOCKET_OP_READ: + context->sock = (socket_t)wrenGetSlotDouble(vm, 1); + context->length = (int)wrenGetSlotDouble(vm, 2); + context->callback = wrenGetSlotHandle(vm, 3); + break; + case SOCKET_OP_WRITE: + context->sock = (socket_t)wrenGetSlotDouble(vm, 1); + int len; + const char* data = wrenGetSlotBytes(vm, 2, &len); + context->write_data.data = malloc(len); + memcpy(context->write_data.data, data, len); + context->write_data.length = len; + context->callback = wrenGetSlotHandle(vm, 3); + break; + // ... other cases ... + default: + free(context); + return; } + socket_queue_push(&socketManager->requestQueue, context); } +void socketConnect(WrenVM* vm) { create_socket_context(vm, SOCKET_OP_CONNECT); } +void socketNew(WrenVM* vm) { create_socket_context(vm, SOCKET_OP_NEW); } +void socketBind(WrenVM* vm) { create_socket_context(vm, SOCKET_OP_BIND); } +void socketListen(WrenVM* vm) { create_socket_context(vm, SOCKET_OP_LISTEN); } +void socketAccept(WrenVM* vm) { create_socket_context(vm, SOCKET_OP_ACCEPT); } +void socketRead(WrenVM* vm) { create_socket_context(vm, SOCKET_OP_READ); } +void socketWrite(WrenVM* vm) { create_socket_context(vm, SOCKET_OP_WRITE); } +// ... other FFI functions ... + WrenForeignMethodFn bindSocketForeignMethod(WrenVM* vm, const char* module, const char* className, bool isStatic, const char* signature) { if (strcmp(module, "socket") != 0) return NULL; - if (strcmp(className, "Socket") == 0 && !isStatic) { - if (strcmp(signature, "connect(_,_,_)") == 0) return socketConnect; - if (strcmp(signature, "listen(_,_,_)") == 0) return socketListen; - if (strcmp(signature, "accept(_)") == 0) return socketAccept; - if (strcmp(signature, "read(_)") == 0) return socketRead; - if (strcmp(signature, "write_(_,_)") == 0) return socketWrite; - if (strcmp(signature, "close()") == 0) return socketClose; - if (strcmp(signature, "isOpen") == 0) return socketIsOpen; - if (strcmp(signature, "remoteAddress") == 0) return socketRemoteAddress; - if (strcmp(signature, "remotePort") == 0) return socketRemotePort; + if (strcmp(className, "Socket") == 0 && isStatic) { + if (strcmp(signature, "connect_(_,_,_)") == 0) return socketConnect; + if (strcmp(signature, "new_(_)") == 0) return socketNew; + if (strcmp(signature, "bind_(_,_,_,_)") == 0) return socketBind; + if (strcmp(signature, "listen_(_,_,_)") == 0) return socketListen; + if (strcmp(signature, "accept_(_,_)") == 0) return socketAccept; + if (strcmp(signature, "read_(_,_,_)") == 0) return socketRead; + if (strcmp(signature, "write_(_,_,_)") == 0) return socketWrite; + // ... other bindings ... } return NULL; } WrenForeignClassMethods bindSocketForeignClass(WrenVM* vm, const char* module, const char* className) { WrenForeignClassMethods methods = {0, 0}; - if (strcmp(module, "socket") == 0 && strcmp(className, "Socket") == 0) { - methods.allocate = socketAllocate; - } return methods; } + diff --git a/socket_example.wren b/socket_example.wren index 9192951..54ede35 100644 --- a/socket_example.wren +++ b/socket_example.wren @@ -1,81 +1,126 @@ -// socket_example.wren (Corrected) -import "socket" for Socket +// socket_example.wren +import "socket" for Socket, SocketInstance -System.print("--- Wren Socket Echo Server and Client ---") - -var serverFiber = Fiber.new { - var server = Socket.new() - if (server.listen("localhost", 8080, 5)) { - System.print("Server listening on localhost:8080") - while (server.isOpen) { - server.accept { |err, client| - - if (err) { - System.print("Accept error: %(err)") - return - } - - System.print("Client connected!") - Fiber.new { - while (client.isOpen) { - client.read(4096) { |readErr, data| - if (readErr) { - System.print("Client disconnected.") - client.close() - return - } - System.print("Received: %(data)") - // CORRECTED: Replaced '_' with 'result' - client.write("Echo: %(data)") { |writeErr, result| - if (writeErr) System.print("Write error: %(writeErr)") - } - } - } - }.call() - } - } - } else { - System.print("Failed to start server.") - } +foreign class Host { + foreign static signalDone() } -var clientFiber = Fiber.new { - var client = Socket.new() - // CORRECTED: Replaced '_' with 'result' - client.connect("localhost", 8080) { |err, result| +// Helper class for time-related functions. +class Time { + static sleep(ms) { + var start = System.clock + while ((System.clock - start) * 1000 < ms) { + Fiber.yield() + } + } +} + +var mainFiber = Fiber.new { + var serverFiber = Fiber.new { + System.print("Server: Starting up...") + var result = SocketInstance.new() + var err = result[0] + var serverSock = result[1] + if (err) { + System.print("Server Error creating socket: %(err)") + return + } + + result = serverSock.bind("127.0.0.1", 8080) + err = result[0] + var success = result[1] + if (err) { + System.print("Server Error binding: %(err)") + return + } + System.print("Server: Bound to 127.0.0.1:8080") + + result = serverSock.listen(5) + err = result[0] + success = result[1] + if (err) { + System.print("Server Error listening: %(err)") + return + } + System.print("Server: Listening for connections...") + + while (true) { + result = serverSock.accept() + err = result[0] + var clientSockFd = result[1] + if (err) { + System.print("Server Error accepting: %(err)") + continue + } + var clientSock = SocketInstance.new(clientSockFd) + System.print("Server: Accepted connection.") + + Fiber.new { + while(true){ + var result = clientSock.read(1024) + var err = result[0] + var data = result[1] if (err) { - System.print("Client connection error: %(err)") - return + System.print("Server Error reading: %(err)") + return } + System.print("Server received: %(data)") - System.print("Client connected to server.") - // CORRECTED: Replaced '_' with 'result' - client.write("Hello from Wren!") { |writeErr, result| - if (writeErr) { - System.print("Client write error: %(writeErr)") - return - } - - client.read(1024) { |readErr, data| - if (readErr) { - System.print("Client read error: %(readErr)") - } else { - System.print("Client received: %(data)") - } - client.close() - } + var response = "Hello from server!" + err = clientSock.write(response) + if (err) { + System.print("Server Error writing: %(err)") } + System.print("Server sent response.") + } + }.call() } + } + + var clientFiber = Fiber.new { + // Give the server a moment to start up. + Time.sleep(100) + + System.print("Client: Connecting...") + var result = SocketInstance.connect("127.0.0.1", 8080) + var err = result[0] + var clientSock = result[1] + if (err) { + System.print("Client Error connecting: %(err)") + Host.signalDone() + return + } + System.print("Client: Connected.") + + var message = "Hello from client!" + err = clientSock.write(message) + if (err) { + System.print("Client Error writing: %(err)") + Host.signalDone() + return + } + System.print("Client: Sent message.") + + result = clientSock.read(1024) + err = result[0] + var response = result[1] + if (err) { + System.print("Client Error reading: %(err)") + Host.signalDone() + return + } + System.print("Client received: %(response)") + + // All done. + Host.signalDone() + } + + serverFiber.call() + clientFiber.call() + + // Keep the main fiber alive until Host.signalDone() is called. + while(true) { + Fiber.yield() + } } -// Start the server -serverFiber.call() - -// Give the server a moment to start up before connecting the client -Fiber.sleep(100) - -// Start the client -clientFiber.call() - -// Let the operations complete -Fiber.sleep(1000) diff --git a/sqlite_performance.wren b/sqlite_performance.wren index 5b1ba9e..39bfd11 100644 --- a/sqlite_performance.wren +++ b/sqlite_performance.wren @@ -34,7 +34,7 @@ var mainFiber = Fiber.new { var insertCount = 0 var doInsertAndRead doInsertAndRead = Fn.new { - if (insertCount >= 1000) { + if (insertCount >= 100000) { // Finished, close db db.close() { |err| if (err) { diff --git a/test.db-journal b/test.db-journal new file mode 100644 index 0000000..e1c57ba Binary files /dev/null and b/test.db-journal differ diff --git a/wren b/wren index a9b8313..23f3318 100755 Binary files a/wren and b/wren differ