Working sockets.

This commit is contained in:
retoor 2025-07-30 06:56:55 +02:00
parent b4884196e1
commit 77386df7a6
7 changed files with 1011 additions and 986 deletions

File diff suppressed because it is too large Load Diff

View File

@ -1,49 +1,87 @@
// socket.wren (Corrected) // socket.wren
foreign class Socket { foreign class Socket {
// CORRECTED: Changed 'new_' to 'new' to match the standard convention. // Asynchronous static methods that perform raw socket operations.
construct new() {} // These are the direct bindings to the C backend.
foreign static connect_(host, port, callback)
foreign connect(host, port, callback) foreign static new_(callback)
foreign listen(host, port, backlog) foreign static bind_(sock, host, port, callback)
foreign accept(callback) foreign static listen_(sock, backlog, callback)
foreign read(bytes) foreign static accept_(sock, callback)
foreign close() foreign static read_(sock, length, callback)
foreign static write_(sock, data, callback)
foreign isOpen // Additional raw functions can be added here following the pattern.
foreign remoteAddress
foreign remotePort
// Implemented in Wren
write(data, callback) {
write_(data, callback)
}
readUntil(delimiter, callback) {
var buffer = ""
var readChunk
readChunk = Fn.new {
this.read(4096) { |err, data|
if (err) {
callback.call(err, null)
return
}
buffer = buffer + data
var index = buffer.indexOf(delimiter)
if (index != -1) {
var result = buffer.substring(0, index + delimiter.count)
callback.call(null, result)
} else {
// Delimiter not found, read more data.
readChunk.call()
}
}
}
// Start reading.
readChunk.call()
}
// Private foreign method for writing
foreign write_(data, callback)
} }
class SocketInstance {
construct new(socketFd) {
_sock = socketFd
}
// Instance methods providing a more convenient API.
accept() {
var fiber = Fiber.current
Socket.accept_(_sock) { |err, newSock|
fiber.transfer([err, newSock])
}
return Fiber.yield()
}
read(length) {
var fiber = Fiber.current
Socket.read_(_sock, length) { |err, data|
fiber.transfer([err, data])
}
return Fiber.yield()
}
write(data) {
var fiber = Fiber.current
Socket.write_(_sock, data) { |err, nothing|
fiber.transfer(err)
}
return Fiber.yield()
}
// Static methods for creating client and server sockets.
static connect(host, port) {
var fiber = Fiber.current
Socket.connect_(host, port) { |err, sock|
if (err) {
fiber.transfer([err, null])
} else {
fiber.transfer([null, SocketInstance.new(sock)])
}
}
return Fiber.yield()
}
static new() {
var fiber = Fiber.current
Socket.new_() { |err, sock|
if (err) {
fiber.transfer([err, null])
} else {
fiber.transfer([null, SocketInstance.new(sock)])
}
}
return Fiber.yield()
}
bind(host, port) {
var fiber = Fiber.current
Socket.bind_(_sock, host, port) { |err, success|
fiber.transfer([err, success])
}
return Fiber.yield()
}
listen(backlog) {
var fiber = Fiber.current
Socket.listen_(_sock, backlog) { |err, success|
fiber.transfer([err, success])
}
return Fiber.yield()
}
}

View File

@ -3,87 +3,87 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <stdbool.h> #include <stdbool.h>
#include <errno.h>
// Platform-specific includes and definitions
#ifdef _WIN32 #ifdef _WIN32
#include <winsock2.h> #include <winsock2.h>
#include <ws2tcpip.h> #include <ws2tcpip.h>
#include <windows.h>
#pragma comment(lib, "ws2_32.lib") #pragma comment(lib, "ws2_32.lib")
typedef SOCKET socket_t; typedef SOCKET socket_t;
typedef int socklen_t;
typedef HANDLE thread_t; typedef HANDLE thread_t;
typedef CRITICAL_SECTION mutex_t; typedef CRITICAL_SECTION mutex_t;
typedef CONDITION_VARIABLE cond_t; typedef CONDITION_VARIABLE cond_t;
#define IS_SOCKET_VALID(s) ((s) != INVALID_SOCKET)
#define CLOSE_SOCKET(s) closesocket(s)
#else #else
#include <pthread.h> #include <pthread.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <errno.h>
#include <netdb.h>
#include <sys/select.h> #include <sys/select.h>
#include <fcntl.h>
typedef int socket_t; typedef int socket_t;
typedef pthread_t thread_t; typedef pthread_t thread_t;
typedef pthread_mutex_t mutex_t; typedef pthread_mutex_t mutex_t;
typedef pthread_cond_t cond_t; typedef pthread_cond_t cond_t;
#define INVALID_SOCKET -1 #define closesocket(s) close(s)
#define IS_SOCKET_VALID(s) ((s) >= 0)
#define CLOSE_SOCKET(s) close(s)
#endif #endif
// --- Forward Declarations --- // --- Data Structures ---
typedef struct SocketContext SocketContext;
// --- Socket Data Structures ---
typedef enum { typedef enum {
SOCKET_OP_CONNECT, SOCKET_OP_CONNECT,
SOCKET_OP_NEW,
SOCKET_OP_BIND,
SOCKET_OP_LISTEN,
SOCKET_OP_ACCEPT, SOCKET_OP_ACCEPT,
SOCKET_OP_READ, SOCKET_OP_READ,
SOCKET_OP_READ_UNTIL,
SOCKET_OP_READ_EXACTLY,
SOCKET_OP_WRITE, SOCKET_OP_WRITE,
SOCKET_OP_IS_READABLE,
SOCKET_OP_SELECT
} SocketOp; } SocketOp;
typedef struct { typedef struct {
socket_t sock; char* data;
bool isListener; int length;
} SocketData; } Buffer;
struct SocketContext { typedef struct SocketContext {
SocketOp operation;
WrenVM* vm; WrenVM* vm;
WrenHandle* socketHandle; SocketOp operation;
WrenHandle* callback; WrenHandle* callback;
// For connect // Operation specific data
socket_t sock;
char* host; char* host;
int port; int port;
int backlog;
int length;
Buffer write_data;
char* until_bytes;
int until_len;
WrenHandle* sockets_list_handle; // For select
// For write // Result data
char* data;
size_t dataLength;
// Results
bool success; bool success;
char* resultData; char* error_message;
size_t resultDataLength; socket_t new_sock;
char* errorMessage; Buffer read_data;
socket_t newSocket; // For accept WrenHandle* readable_sockets_handle; // For select result
struct SocketContext* next;
}; struct SocketContext* next;
} SocketContext;
// --- Thread-Safe Queue for Socket Operations ---
// --- Thread-Safe Queue Implementation ---
typedef struct { typedef struct {
SocketContext *head, *tail; SocketContext *head, *tail;
mutex_t mutex; mutex_t mutex;
cond_t cond; cond_t cond;
} ThreadSafeQueueSocket; } SocketThreadSafeQueue;
void queue_init(ThreadSafeQueueSocket* q) { void socket_queue_init(SocketThreadSafeQueue* q) {
q->head = q->tail = NULL; q->head = q->tail = NULL;
#ifdef _WIN32 #ifdef _WIN32
InitializeCriticalSection(&q->mutex); InitializeCriticalSection(&q->mutex);
@ -94,7 +94,7 @@ void queue_init(ThreadSafeQueueSocket* q) {
#endif #endif
} }
void queue_destroy(ThreadSafeQueueSocket* q) { void socket_queue_destroy(SocketThreadSafeQueue* q) {
#ifdef _WIN32 #ifdef _WIN32
DeleteCriticalSection(&q->mutex); DeleteCriticalSection(&q->mutex);
#else #else
@ -103,24 +103,16 @@ void queue_destroy(ThreadSafeQueueSocket* q) {
#endif #endif
} }
void queue_push(ThreadSafeQueueSocket* q, SocketContext* context) { void socket_queue_push(SocketThreadSafeQueue* q, SocketContext* context) {
#ifdef _WIN32 #ifdef _WIN32
EnterCriticalSection(&q->mutex); EnterCriticalSection(&q->mutex);
#else #else
pthread_mutex_lock(&q->mutex); pthread_mutex_lock(&q->mutex);
#endif #endif
if(context) context->next = NULL;
if (context) { if (q->tail) q->tail->next = context;
context->next = NULL; else q->head = context;
}
if (q->tail) {
q->tail->next = context;
} else {
q->head = context;
}
q->tail = context; q->tail = context;
#ifdef _WIN32 #ifdef _WIN32
WakeConditionVariable(&q->cond); WakeConditionVariable(&q->cond);
LeaveCriticalSection(&q->mutex); LeaveCriticalSection(&q->mutex);
@ -130,7 +122,7 @@ void queue_push(ThreadSafeQueueSocket* q, SocketContext* context) {
#endif #endif
} }
SocketContext* queue_pop(ThreadSafeQueueSocket* q) { SocketContext* socket_queue_pop(SocketThreadSafeQueue* q) {
#ifdef _WIN32 #ifdef _WIN32
EnterCriticalSection(&q->mutex); EnterCriticalSection(&q->mutex);
while (q->head == NULL) { while (q->head == NULL) {
@ -142,208 +134,88 @@ SocketContext* queue_pop(ThreadSafeQueueSocket* q) {
pthread_cond_wait(&q->cond, &q->mutex); pthread_cond_wait(&q->cond, &q->mutex);
} }
#endif #endif
SocketContext* context = q->head; SocketContext* context = q->head;
if (context) {
q->head = q->head->next; q->head = q->head->next;
if (q->head == NULL) { if (q->head == NULL) q->tail = NULL;
q->tail = NULL;
}
}
#ifdef _WIN32 #ifdef _WIN32
LeaveCriticalSection(&q->mutex); LeaveCriticalSection(&q->mutex);
#else #else
pthread_mutex_unlock(&q->mutex); pthread_mutex_unlock(&q->mutex);
#endif #endif
return context; return context;
} }
bool queue_empty(ThreadSafeQueueSocket* q) { bool socket_queue_empty(SocketThreadSafeQueue* q) {
bool empty;
#ifdef _WIN32 #ifdef _WIN32
EnterCriticalSection(&q->mutex); EnterCriticalSection(&q->mutex);
bool empty = (q->head == NULL); empty = (q->head == NULL);
LeaveCriticalSection(&q->mutex); LeaveCriticalSection(&q->mutex);
#else #else
pthread_mutex_lock(&q->mutex); pthread_mutex_lock(&q->mutex);
bool empty = (q->head == NULL); empty = (q->head == NULL);
pthread_mutex_unlock(&q->mutex); pthread_mutex_unlock(&q->mutex);
#endif #endif
return empty; return empty;
} }
// --- Asynchronous Socket Manager --- // --- Async Socket Manager ---
typedef struct { typedef struct {
WrenVM* vm; WrenVM* vm;
volatile bool running; volatile bool running;
thread_t worker_threads[4]; thread_t threads[4]; // 4 worker threads
ThreadSafeQueueSocket requestQueue; SocketThreadSafeQueue requestQueue;
ThreadSafeQueueSocket completionQueue; SocketThreadSafeQueue completionQueue;
} AsyncSocketManager; } AsyncSocketManager;
static AsyncSocketManager* socketManager = NULL; static AsyncSocketManager* socketManager = NULL;
void free_socket_context_data(SocketContext* context) { void free_socket_context(SocketContext* context) {
if (!context) return; if (context == NULL) return;
free(context->host); free(context->host);
free(context->data); free(context->error_message);
free(context->resultData); free(context->write_data.data);
free(context->errorMessage); free(context->read_data.data);
free(context->until_bytes);
if (context->callback) wrenReleaseHandle(context->vm, context->callback);
if (context->sockets_list_handle) wrenReleaseHandle(context->vm, context->sockets_list_handle);
if (context->readable_sockets_handle) wrenReleaseHandle(context->vm, context->readable_sockets_handle);
free(context); free(context);
} }
#ifdef _WIN32 static void set_socket_context_error(SocketContext* context, const char* message) {
DWORD WINAPI workerThread(LPVOID arg); if (context == NULL) return;
#else
void* workerThread(void* arg);
#endif
// --- Worker Thread Implementation ---
#ifdef _WIN32
DWORD WINAPI workerThread(LPVOID arg) {
#else
void* workerThread(void* arg) {
#endif
AsyncSocketManager* manager = (AsyncSocketManager*)arg;
while (manager->running) {
SocketContext* context = queue_pop(&manager->requestQueue);
if (!context || !manager->running) {
if (context) free_socket_context_data(context);
break;
}
wrenEnsureSlots(context->vm, 1);
wrenSetSlotHandle(context->vm, 0, context->socketHandle);
SocketData* socketData = (wrenGetSlotType(context->vm, 0) == WREN_TYPE_FOREIGN)
? (SocketData*)wrenGetSlotForeign(context->vm, 0)
: NULL;
if (!socketData || !IS_SOCKET_VALID(socketData->sock)) {
context->success = false; context->success = false;
context->errorMessage = strdup("Invalid or closed socket object."); if (context->error_message) free(context->error_message);
queue_push(&manager->completionQueue, context); context->error_message = message ? strdup(message) : strdup("An unknown socket error occurred.");
continue;
}
switch (context->operation) {
case SOCKET_OP_CONNECT: {
struct addrinfo hints = {0}, *addrs;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
char port_str[6];
snprintf(port_str, 6, "%d", context->port);
if (getaddrinfo(context->host, port_str, &hints, &addrs) != 0) {
context->success = false;
context->errorMessage = strdup("Host lookup failed.");
break;
}
socket_t sock = INVALID_SOCKET;
for (struct addrinfo* addr = addrs; addr; addr = addr->ai_next) {
sock = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
if (!IS_SOCKET_VALID(sock)) continue;
if (connect(sock, addr->ai_addr, (int)addr->ai_addrlen) == 0) break;
CLOSE_SOCKET(sock);
sock = INVALID_SOCKET;
}
freeaddrinfo(addrs);
if (IS_SOCKET_VALID(sock)) {
socketData->sock = sock;
socketData->isListener = false;
context->success = true;
} else {
context->success = false;
context->errorMessage = strdup("Connection failed.");
}
break;
}
case SOCKET_OP_ACCEPT: {
if (!socketData->isListener) {
context->success = false;
context->errorMessage = strdup("Cannot accept on a non-listening socket.");
break;
}
// This is a blocking call. The worker thread will wait here.
context->newSocket = accept(socketData->sock, NULL, NULL);
context->success = IS_SOCKET_VALID(context->newSocket);
if (!context->success) {
#ifdef _WIN32
// TODO: A more descriptive error using FormatMessageA
context->errorMessage = strdup("Accept failed.");
#else
context->errorMessage = strdup(strerror(errno));
#endif
}
break;
}
case SOCKET_OP_READ: {
if (socketData->isListener) {
context->success = false;
context->errorMessage = strdup("Cannot read from a listening socket.");
break;
}
char buf[4096];
// This is a blocking call.
ssize_t len = recv(socketData->sock, buf, sizeof(buf), 0);
if (len > 0) {
context->resultData = (char*)malloc(len);
memcpy(context->resultData, buf, len);
context->resultDataLength = len;
context->success = true;
} else {
context->success = false;
if (len == 0) {
context->errorMessage = strdup("Connection closed by peer.");
} else {
#ifdef _WIN32
context->errorMessage = strdup("Read failed.");
#else
context->errorMessage = strdup(strerror(errno));
#endif
}
}
break;
}
case SOCKET_OP_WRITE: {
if (socketData->isListener) {
context->success = false;
context->errorMessage = strdup("Cannot write to a listening socket.");
break;
}
ssize_t written = send(socketData->sock, context->data, context->dataLength, 0);
context->success = (written == (ssize_t)context->dataLength);
if(!context->success) context->errorMessage = strdup("Write failed.");
break;
}
}
queue_push(&manager->completionQueue, context);
}
return 0;
} }
// --- Manager Lifecycle --- #ifdef _WIN32
DWORD WINAPI socketWorkerThread(LPVOID arg);
#else
void* socketWorkerThread(void* arg);
#endif
void socketManager_create(WrenVM* vm) { void socketManager_create(WrenVM* vm) {
if (socketManager != NULL) return; if (socketManager != NULL) return;
socketManager = (AsyncSocketManager*)malloc(sizeof(AsyncSocketManager)); socketManager = (AsyncSocketManager*)malloc(sizeof(AsyncSocketManager));
if (socketManager == NULL) return;
#ifdef _WIN32
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif
socketManager->vm = vm; socketManager->vm = vm;
socketManager->running = true; socketManager->running = true;
socket_queue_init(&socketManager->requestQueue);
queue_init(&socketManager->requestQueue); socket_queue_init(&socketManager->completionQueue);
queue_init(&socketManager->completionQueue); for (int i = 0; i < 4; ++i) {
for (int i = 0; i < 4; i++) {
#ifdef _WIN32 #ifdef _WIN32
socketManager->worker_threads[i] = CreateThread(NULL, 0, workerThread, socketManager, 0, NULL); socketManager->threads[i] = CreateThread(NULL, 0, socketWorkerThread, socketManager, 0, NULL);
#else #else
pthread_create(&socketManager->worker_threads[i], NULL, workerThread, socketManager); pthread_create(&socketManager->threads[i], NULL, socketWorkerThread, socketManager);
#endif #endif
} }
} }
@ -351,246 +223,279 @@ void socketManager_create(WrenVM* vm) {
void socketManager_destroy() { void socketManager_destroy() {
if (!socketManager) return; if (!socketManager) return;
socketManager->running = false; socketManager->running = false;
for (int i = 0; i < 4; ++i) socket_queue_push(&socketManager->requestQueue, NULL);
// Unblock all worker threads for (int i = 0; i < 4; ++i) {
for (int i = 0; i < 4; i++) {
queue_push(&socketManager->requestQueue, NULL);
}
// Wait for threads to finish
for (int i = 0; i < 4; i++) {
#ifdef _WIN32 #ifdef _WIN32
WaitForSingleObject(socketManager->worker_threads[i], INFINITE); WaitForSingleObject(socketManager->threads[i], INFINITE);
CloseHandle(socketManager->worker_threads[i]); CloseHandle(socketManager->threads[i]);
#else #else
pthread_join(socketManager->worker_threads[i], NULL); pthread_join(socketManager->threads[i], NULL);
#endif #endif
} }
while(!socket_queue_empty(&socketManager->requestQueue)) free_socket_context(socket_queue_pop(&socketManager->requestQueue));
// Clean up any remaining contexts in queues while(!socket_queue_empty(&socketManager->completionQueue)) free_socket_context(socket_queue_pop(&socketManager->completionQueue));
while (!queue_empty(&socketManager->requestQueue)) { socket_queue_destroy(&socketManager->requestQueue);
free_socket_context_data(queue_pop(&socketManager->requestQueue)); socket_queue_destroy(&socketManager->completionQueue);
}
while (!queue_empty(&socketManager->completionQueue)) {
free_socket_context_data(queue_pop(&socketManager->completionQueue));
}
queue_destroy(&socketManager->requestQueue);
queue_destroy(&socketManager->completionQueue);
free(socketManager); free(socketManager);
socketManager = NULL; socketManager = NULL;
#ifdef _WIN32
WSACleanup();
#endif
} }
void socketManager_processCompletions() { void socketManager_processCompletions() {
if (!socketManager || queue_empty(&socketManager->completionQueue)) return; if (!socketManager || !socketManager->vm || socket_queue_empty(&socketManager->completionQueue)) return;
while (!socket_queue_empty(&socketManager->completionQueue)) {
SocketContext* context = socket_queue_pop(&socketManager->completionQueue);
if (context == NULL) continue;
if (context->callback == NULL) {
free_socket_context(context);
continue;
}
WrenHandle* callHandle = wrenMakeCallHandle(socketManager->vm, "call(_,_)"); WrenHandle* callHandle = wrenMakeCallHandle(socketManager->vm, "call(_,_)");
while (!queue_empty(&socketManager->completionQueue)) {
SocketContext* context = queue_pop(&socketManager->completionQueue);
wrenEnsureSlots(socketManager->vm, 3); wrenEnsureSlots(socketManager->vm, 3);
wrenSetSlotHandle(socketManager->vm, 0, context->callback); wrenSetSlotHandle(socketManager->vm, 0, context->callback);
if (context->success) { if (context->success) {
wrenSetSlotNull(socketManager->vm, 1); // error slot wrenSetSlotNull(socketManager->vm, 1); // error is null
if (IS_SOCKET_VALID(context->newSocket)) { // Accept succeeded switch(context->operation) {
wrenGetVariable(socketManager->vm, "socket", "Socket", 2); case SOCKET_OP_CONNECT:
void* foreign = wrenSetSlotNewForeign(socketManager->vm, 2, 2, sizeof(SocketData)); case SOCKET_OP_NEW:
SocketData* clientData = (SocketData*)foreign; case SOCKET_OP_ACCEPT:
clientData->sock = context->newSocket; wrenSetSlotDouble(socketManager->vm, 2, (double)context->new_sock);
clientData->isListener = false; break;
} else if (context->resultData) { // Read succeeded case SOCKET_OP_BIND:
wrenSetSlotBytes(socketManager->vm, 2, context->resultData, context->resultDataLength); case SOCKET_OP_LISTEN:
} else { // Other successes (connect, write) case SOCKET_OP_IS_READABLE:
wrenSetSlotBool(socketManager->vm, 2, true);
break;
case SOCKET_OP_READ:
case SOCKET_OP_READ_UNTIL:
case SOCKET_OP_READ_EXACTLY:
if (context->read_data.data) {
wrenSetSlotBytes(socketManager->vm, 2, context->read_data.data, context->read_data.length);
} else {
wrenSetSlotNull(socketManager->vm, 2); wrenSetSlotNull(socketManager->vm, 2);
} }
break;
case SOCKET_OP_SELECT:
wrenSetSlotHandle(socketManager->vm, 2, context->readable_sockets_handle);
break;
case SOCKET_OP_WRITE:
default:
wrenSetSlotNull(socketManager->vm, 2);
break;
}
} else { } else {
wrenSetSlotString(socketManager->vm, 1, context->errorMessage ? context->errorMessage : "Unknown error."); wrenSetSlotString(socketManager->vm, 1, context->error_message ? context->error_message : "Unknown error.");
wrenSetSlotNull(socketManager->vm, 2); wrenSetSlotNull(socketManager->vm, 2);
} }
wrenCall(socketManager->vm, callHandle); wrenCall(socketManager->vm, callHandle);
wrenReleaseHandle(socketManager->vm, context->socketHandle);
wrenReleaseHandle(socketManager->vm, context->callback);
free_socket_context_data(context);
}
wrenReleaseHandle(socketManager->vm, callHandle); wrenReleaseHandle(socketManager->vm, callHandle);
free_socket_context(context);
}
} }
// --- Wren Foreign Methods --- // --- Worker Thread Implementation ---
void socketAllocate(WrenVM* vm) { #ifdef _WIN32
SocketData* data = (SocketData*)wrenSetSlotNewForeign(vm, 0, 0, sizeof(SocketData)); DWORD WINAPI socketWorkerThread(LPVOID arg) {
data->sock = INVALID_SOCKET; #else
data->isListener = false; void* socketWorkerThread(void* arg) {
#endif
AsyncSocketManager* manager = (AsyncSocketManager*)arg;
while (manager->running) {
SocketContext* context = socket_queue_pop(&manager->requestQueue);
if (!context || !manager->running) {
if (context) free_socket_context(context);
break;
}
switch (context->operation) {
case SOCKET_OP_NEW: {
context->new_sock = socket(AF_INET, SOCK_STREAM, 0);
if (context->new_sock == -1) {
set_socket_context_error(context, "Failed to create socket.");
} else {
context->success = true;
}
break;
}
case SOCKET_OP_CONNECT: {
struct sockaddr_in serv_addr;
context->new_sock = socket(AF_INET, SOCK_STREAM, 0);
if (context->new_sock < 0) {
set_socket_context_error(context, "Socket creation error");
break;
}
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(context->port);
if(inet_pton(AF_INET, context->host, &serv_addr.sin_addr)<=0) {
set_socket_context_error(context, "Invalid address/ Address not supported");
break;
}
if (connect(context->new_sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
set_socket_context_error(context, "Connection Failed");
} else {
context->success = true;
}
break;
}
case SOCKET_OP_BIND: {
struct sockaddr_in address;
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY; // Or inet_addr(context->host) for specific interface
address.sin_port = htons(context->port);
if (bind(context->sock, (struct sockaddr *)&address, sizeof(address)) < 0) {
set_socket_context_error(context, "Bind failed");
} else {
context->success = true;
}
break;
}
case SOCKET_OP_LISTEN: {
if (listen(context->sock, context->backlog) < 0) {
set_socket_context_error(context, "Listen failed");
} else {
context->success = true;
}
break;
}
case SOCKET_OP_ACCEPT: {
struct sockaddr_in address;
int addrlen = sizeof(address);
context->new_sock = accept(context->sock, (struct sockaddr *)&address, (socklen_t*)&addrlen);
if (context->new_sock < 0) {
set_socket_context_error(context, "Accept failed");
} else {
context->success = true;
}
break;
}
case SOCKET_OP_READ: {
char* buffer = malloc(context->length + 1);
int valread = recv(context->sock, buffer, context->length, 0);
if (valread >= 0) {
context->read_data.data = buffer;
context->read_data.length = valread;
context->success = true;
} else {
free(buffer);
set_socket_context_error(context, "Read failed");
}
break;
}
case SOCKET_OP_WRITE: {
int total_sent = 0;
while (total_sent < context->write_data.length) {
int sent = send(context->sock, context->write_data.data + total_sent, context->write_data.length - total_sent, 0);
if (sent < 0) {
set_socket_context_error(context, "Write failed");
break;
}
total_sent += sent;
}
if (total_sent == context->write_data.length) {
context->success = true;
}
break;
}
// ... other cases ...
}
socket_queue_push(&manager->completionQueue, context);
}
return 0;
} }
void socketConnect(WrenVM* vm) { // --- Wren FFI Functions ---
static void create_socket_context(WrenVM* vm, SocketOp op) {
SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext));
context->operation = SOCKET_OP_CONNECT; if (!context) {
wrenSetSlotString(vm, 0, "Out of memory.");
wrenAbortFiber(vm, 0);
return;
}
context->vm = vm; context->vm = vm;
context->socketHandle = wrenGetSlotHandle(vm, 0); context->operation = op;
switch(op) {
case SOCKET_OP_CONNECT:
context->host = strdup(wrenGetSlotString(vm, 1)); context->host = strdup(wrenGetSlotString(vm, 1));
context->port = (int)wrenGetSlotDouble(vm, 2); context->port = (int)wrenGetSlotDouble(vm, 2);
context->callback = wrenGetSlotHandle(vm, 3); context->callback = wrenGetSlotHandle(vm, 3);
queue_push(&socketManager->requestQueue, context); break;
} case SOCKET_OP_NEW:
void socketListen(WrenVM* vm) {
SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0);
const char* host = wrenGetSlotString(vm, 1);
int port = (int)wrenGetSlotDouble(vm, 2);
int backlog = (int)wrenGetSlotDouble(vm, 3);
struct addrinfo hints = {0}, *addrs;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
char port_str[6];
snprintf(port_str, 6, "%d", port);
if (getaddrinfo(host, port_str, &hints, &addrs) != 0) {
wrenSetSlotBool(vm, 0, false);
return;
}
socket_t sock = INVALID_SOCKET;
for (struct addrinfo* addr = addrs; addr; addr = addr->ai_next) {
sock = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
if (!IS_SOCKET_VALID(sock)) continue;
int yes = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof(yes));
if (bind(sock, addr->ai_addr, (int)addr->ai_addrlen) == 0) break;
CLOSE_SOCKET(sock);
sock = INVALID_SOCKET;
}
freeaddrinfo(addrs);
if (IS_SOCKET_VALID(sock) && listen(sock, backlog) == 0) {
data->sock = sock;
data->isListener = true;
wrenSetSlotBool(vm, 0, true);
} else {
if(IS_SOCKET_VALID(sock)) CLOSE_SOCKET(sock);
wrenSetSlotBool(vm, 0, false);
}
}
void socketAccept(WrenVM* vm) {
SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext));
context->operation = SOCKET_OP_ACCEPT;
context->vm = vm;
context->socketHandle = wrenGetSlotHandle(vm, 0);
context->callback = wrenGetSlotHandle(vm, 1); context->callback = wrenGetSlotHandle(vm, 1);
queue_push(&socketManager->requestQueue, context); break;
} case SOCKET_OP_BIND:
context->sock = (socket_t)wrenGetSlotDouble(vm, 1);
void socketRead(WrenVM* vm) { context->host = strdup(wrenGetSlotString(vm, 2));
SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); context->port = (int)wrenGetSlotDouble(vm, 3);
context->operation = SOCKET_OP_READ; context->callback = wrenGetSlotHandle(vm, 4);
context->vm = vm; break;
context->socketHandle = wrenGetSlotHandle(vm, 0); case SOCKET_OP_LISTEN:
context->callback = wrenGetSlotHandle(vm, 1); context->sock = (socket_t)wrenGetSlotDouble(vm, 1);
queue_push(&socketManager->requestQueue, context); context->backlog = (int)wrenGetSlotDouble(vm, 2);
} context->callback = wrenGetSlotHandle(vm, 3);
break;
void socketWrite(WrenVM* vm) { case SOCKET_OP_ACCEPT:
SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext)); context->sock = (socket_t)wrenGetSlotDouble(vm, 1);
context->operation = SOCKET_OP_WRITE;
context->vm = vm;
context->socketHandle = wrenGetSlotHandle(vm, 0);
int len;
const char* bytes = wrenGetSlotBytes(vm, 1, &len);
context->data = (char*)malloc(len);
memcpy(context->data, bytes, len);
context->dataLength = len;
context->callback = wrenGetSlotHandle(vm, 2); context->callback = wrenGetSlotHandle(vm, 2);
queue_push(&socketManager->requestQueue, context); break;
} case SOCKET_OP_READ:
context->sock = (socket_t)wrenGetSlotDouble(vm, 1);
void socketClose(WrenVM* vm) { context->length = (int)wrenGetSlotDouble(vm, 2);
SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); context->callback = wrenGetSlotHandle(vm, 3);
if (IS_SOCKET_VALID(data->sock)) { break;
CLOSE_SOCKET(data->sock); case SOCKET_OP_WRITE:
data->sock = INVALID_SOCKET; context->sock = (socket_t)wrenGetSlotDouble(vm, 1);
} int len;
} const char* data = wrenGetSlotBytes(vm, 2, &len);
context->write_data.data = malloc(len);
void socketIsOpen(WrenVM* vm) { memcpy(context->write_data.data, data, len);
SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); context->write_data.length = len;
wrenSetSlotBool(vm, 0, IS_SOCKET_VALID(data->sock)); context->callback = wrenGetSlotHandle(vm, 3);
} break;
// ... other cases ...
void socketRemoteAddress(WrenVM* vm) { default:
SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); free(context);
if (!IS_SOCKET_VALID(data->sock) || data->isListener) {
wrenSetSlotNull(vm, 0);
return; return;
} }
socket_queue_push(&socketManager->requestQueue, context);
struct sockaddr_storage addr;
socklen_t len = sizeof(addr);
char ipstr[INET6_ADDRSTRLEN];
if (getpeername(data->sock, (struct sockaddr*)&addr, &len) == 0) {
if (addr.ss_family == AF_INET) {
inet_ntop(AF_INET, &((struct sockaddr_in*)&addr)->sin_addr, ipstr, sizeof(ipstr));
} else {
inet_ntop(AF_INET6, &((struct sockaddr_in6*)&addr)->sin6_addr, ipstr, sizeof(ipstr));
}
wrenSetSlotString(vm, 0, ipstr);
} else {
wrenSetSlotNull(vm, 0);
}
} }
void socketRemotePort(WrenVM* vm) { void socketConnect(WrenVM* vm) { create_socket_context(vm, SOCKET_OP_CONNECT); }
SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0); void socketNew(WrenVM* vm) { create_socket_context(vm, SOCKET_OP_NEW); }
if (!IS_SOCKET_VALID(data->sock) || data->isListener) { void socketBind(WrenVM* vm) { create_socket_context(vm, SOCKET_OP_BIND); }
wrenSetSlotNull(vm, 0); void socketListen(WrenVM* vm) { create_socket_context(vm, SOCKET_OP_LISTEN); }
return; void socketAccept(WrenVM* vm) { create_socket_context(vm, SOCKET_OP_ACCEPT); }
} void socketRead(WrenVM* vm) { create_socket_context(vm, SOCKET_OP_READ); }
void socketWrite(WrenVM* vm) { create_socket_context(vm, SOCKET_OP_WRITE); }
struct sockaddr_storage addr; // ... other FFI functions ...
socklen_t len = sizeof(addr);
if (getpeername(data->sock, (struct sockaddr*)&addr, &len) == 0) {
int port = 0;
if (addr.ss_family == AF_INET) {
port = ntohs(((struct sockaddr_in*)&addr)->sin_port);
} else if (addr.ss_family == AF_INET6) {
port = ntohs(((struct sockaddr_in6*)&addr)->sin6_port);
}
wrenSetSlotDouble(vm, 0, (double)port);
} else {
wrenSetSlotNull(vm, 0);
}
}
WrenForeignMethodFn bindSocketForeignMethod(WrenVM* vm, const char* module, const char* className, bool isStatic, const char* signature) { WrenForeignMethodFn bindSocketForeignMethod(WrenVM* vm, const char* module, const char* className, bool isStatic, const char* signature) {
if (strcmp(module, "socket") != 0) return NULL; if (strcmp(module, "socket") != 0) return NULL;
if (strcmp(className, "Socket") == 0 && !isStatic) { if (strcmp(className, "Socket") == 0 && isStatic) {
if (strcmp(signature, "connect(_,_,_)") == 0) return socketConnect; if (strcmp(signature, "connect_(_,_,_)") == 0) return socketConnect;
if (strcmp(signature, "listen(_,_,_)") == 0) return socketListen; if (strcmp(signature, "new_(_)") == 0) return socketNew;
if (strcmp(signature, "accept(_)") == 0) return socketAccept; if (strcmp(signature, "bind_(_,_,_,_)") == 0) return socketBind;
if (strcmp(signature, "read(_)") == 0) return socketRead; if (strcmp(signature, "listen_(_,_,_)") == 0) return socketListen;
if (strcmp(signature, "write_(_,_)") == 0) return socketWrite; if (strcmp(signature, "accept_(_,_)") == 0) return socketAccept;
if (strcmp(signature, "close()") == 0) return socketClose; if (strcmp(signature, "read_(_,_,_)") == 0) return socketRead;
if (strcmp(signature, "isOpen") == 0) return socketIsOpen; if (strcmp(signature, "write_(_,_,_)") == 0) return socketWrite;
if (strcmp(signature, "remoteAddress") == 0) return socketRemoteAddress; // ... other bindings ...
if (strcmp(signature, "remotePort") == 0) return socketRemotePort;
} }
return NULL; return NULL;
} }
WrenForeignClassMethods bindSocketForeignClass(WrenVM* vm, const char* module, const char* className) { WrenForeignClassMethods bindSocketForeignClass(WrenVM* vm, const char* module, const char* className) {
WrenForeignClassMethods methods = {0, 0}; WrenForeignClassMethods methods = {0, 0};
if (strcmp(module, "socket") == 0 && strcmp(className, "Socket") == 0) {
methods.allocate = socketAllocate;
}
return methods; return methods;
} }

View File

@ -1,81 +1,126 @@
// socket_example.wren (Corrected) // socket_example.wren
import "socket" for Socket import "socket" for Socket, SocketInstance
System.print("--- Wren Socket Echo Server and Client ---") foreign class Host {
foreign static signalDone()
}
var serverFiber = Fiber.new { // Helper class for time-related functions.
var server = Socket.new() class Time {
if (server.listen("localhost", 8080, 5)) { static sleep(ms) {
System.print("Server listening on localhost:8080") var start = System.clock
while (server.isOpen) { while ((System.clock - start) * 1000 < ms) {
server.accept { |err, client| Fiber.yield()
}
}
}
var mainFiber = Fiber.new {
var serverFiber = Fiber.new {
System.print("Server: Starting up...")
var result = SocketInstance.new()
var err = result[0]
var serverSock = result[1]
if (err) { if (err) {
System.print("Accept error: %(err)") System.print("Server Error creating socket: %(err)")
return return
} }
System.print("Client connected!") result = serverSock.bind("127.0.0.1", 8080)
Fiber.new { err = result[0]
while (client.isOpen) { var success = result[1]
client.read(4096) { |readErr, data| if (err) {
if (readErr) { System.print("Server Error binding: %(err)")
System.print("Client disconnected.")
client.close()
return return
} }
System.print("Received: %(data)") System.print("Server: Bound to 127.0.0.1:8080")
// CORRECTED: Replaced '_' with 'result'
client.write("Echo: %(data)") { |writeErr, result| result = serverSock.listen(5)
if (writeErr) System.print("Write error: %(writeErr)") err = result[0]
success = result[1]
if (err) {
System.print("Server Error listening: %(err)")
return
} }
System.print("Server: Listening for connections...")
while (true) {
result = serverSock.accept()
err = result[0]
var clientSockFd = result[1]
if (err) {
System.print("Server Error accepting: %(err)")
continue
} }
var clientSock = SocketInstance.new(clientSockFd)
System.print("Server: Accepted connection.")
Fiber.new {
while(true){
var result = clientSock.read(1024)
var err = result[0]
var data = result[1]
if (err) {
System.print("Server Error reading: %(err)")
return
}
System.print("Server received: %(data)")
var response = "Hello from server!"
err = clientSock.write(response)
if (err) {
System.print("Server Error writing: %(err)")
}
System.print("Server sent response.")
} }
}.call() }.call()
} }
} }
} else {
System.print("Failed to start server.")
}
}
var clientFiber = Fiber.new { var clientFiber = Fiber.new {
var client = Socket.new() // Give the server a moment to start up.
// CORRECTED: Replaced '_' with 'result' Time.sleep(100)
client.connect("localhost", 8080) { |err, result|
System.print("Client: Connecting...")
var result = SocketInstance.connect("127.0.0.1", 8080)
var err = result[0]
var clientSock = result[1]
if (err) { if (err) {
System.print("Client connection error: %(err)") System.print("Client Error connecting: %(err)")
Host.signalDone()
return return
} }
System.print("Client: Connected.")
System.print("Client connected to server.") var message = "Hello from client!"
// CORRECTED: Replaced '_' with 'result' err = clientSock.write(message)
client.write("Hello from Wren!") { |writeErr, result| if (err) {
if (writeErr) { System.print("Client Error writing: %(err)")
System.print("Client write error: %(writeErr)") Host.signalDone()
return return
} }
System.print("Client: Sent message.")
client.read(1024) { |readErr, data| result = clientSock.read(1024)
if (readErr) { err = result[0]
System.print("Client read error: %(readErr)") var response = result[1]
} else { if (err) {
System.print("Client received: %(data)") System.print("Client Error reading: %(err)")
} Host.signalDone()
client.close() return
} }
System.print("Client received: %(response)")
// All done.
Host.signalDone()
} }
serverFiber.call()
clientFiber.call()
// Keep the main fiber alive until Host.signalDone() is called.
while(true) {
Fiber.yield()
} }
} }
// Start the server
serverFiber.call()
// Give the server a moment to start up before connecting the client
Fiber.sleep(100)
// Start the client
clientFiber.call()
// Let the operations complete
Fiber.sleep(1000)

View File

@ -34,7 +34,7 @@ var mainFiber = Fiber.new {
var insertCount = 0 var insertCount = 0
var doInsertAndRead var doInsertAndRead
doInsertAndRead = Fn.new { doInsertAndRead = Fn.new {
if (insertCount >= 1000) { if (insertCount >= 100000) {
// Finished, close db // Finished, close db
db.close() { |err| db.close() { |err|
if (err) { if (err) {

BIN
test.db-journal Normal file

Binary file not shown.

BIN
wren

Binary file not shown.