This commit is contained in:
retoor 2025-07-29 14:38:46 +02:00
parent 9f27915cab
commit 141a4562fd
4 changed files with 1 additions and 12704 deletions

11573
httplib.h

File diff suppressed because it is too large Load Diff

View File

@ -17,6 +17,7 @@ var mainFiber = Fiber.new {
System.print("Request #%(i) [GET] Error: %(err)")
} else {
System.print("Request #%(i) [GET] Status: %(res.statusCode)")
System.print("Request #%(i) [GET] Status: %(res.body)")
}
// CORRECTED: Create a new fiber for each atomic operation
Fiber.new { completed = completed + 1 }.call()

View File

@ -1,384 +0,0 @@
// socket_backend.cpp (Native Sockets Implementation)
#include "wren.h"
#include <iostream>
#include <string>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <atomic>
#include <chrono>
#include <string.h>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#pragma comment(lib, "ws2_32.lib")
typedef SOCKET socket_t;
#define IS_SOCKET_VALID(s) ((s) != INVALID_SOCKET)
#define CLOSE_SOCKET(s) closesocket(s)
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <netdb.h>
#include <cerrno>
typedef int socket_t;
#define INVALID_SOCKET -1
#define IS_SOCKET_VALID(s) ((s) >= 0)
#define CLOSE_SOCKET(s) close(s)
#endif
// --- Thread-Safe Queue for Asynchronous Operations ---
template <typename T>
class ThreadSafeQueue {
public:
void push(T item) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(item);
cond_.notify_one();
}
T pop() {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return !queue_.empty(); });
T item = queue_.front();
queue_.pop();
return item;
}
bool empty() {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.empty();
}
private:
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable cond_;
};
// --- Context for Asynchronous Socket Operations ---
enum class SocketOp {
CONNECT,
ACCEPT,
READ,
WRITE
};
struct SocketData {
socket_t sock;
bool isListener;
};
struct SocketContext {
SocketOp operation;
WrenVM* vm;
WrenHandle* socketHandle;
WrenHandle* callback;
// Operation-specific data
std::string host;
int port;
std::string data;
int bytesToRead;
// Result data
bool success;
std::string resultData;
std::string errorMessage;
socket_t newSocket; // For accepted connections
};
// --- Asynchronous Socket Manager ---
class AsyncSocketManager {
public:
AsyncSocketManager(WrenVM* vm) : vm_(vm), running_(true) {
for (int i = 0; i < 4; ++i) {
threads_.emplace_back([this] { workerThread(); });
}
}
~AsyncSocketManager() {
running_ = false;
for (size_t i = 0; i < threads_.size(); ++i) {
requestQueue_.push(nullptr);
}
for (auto& thread : threads_) {
thread.join();
}
}
void submit(SocketContext* context) {
requestQueue_.push(context);
}
bool completionQueueEmpty() {
return completionQueue_.empty();
}
void processCompletions() {
while (!completionQueue_.empty()) {
SocketContext* context = completionQueue_.pop();
WrenHandle* callHandle = wrenMakeCallHandle(vm_, "call(_,_)");
wrenEnsureSlots(vm_, 3);
wrenSetSlotHandle(vm_, 0, context->callback);
if (context->success) {
wrenSetSlotNull(vm_, 1); // No error
switch (context->operation) {
case SocketOp::ACCEPT: {
wrenGetVariable(vm_, "socket", "Socket", 2);
void* foreign = wrenSetSlotNewForeign(vm_, 2, 2, sizeof(SocketData));
SocketData* clientData = (SocketData*)foreign;
clientData->sock = context->newSocket;
clientData->isListener = false;
break;
}
case SocketOp::READ:
wrenSetSlotBytes(vm_, 2, context->resultData.c_str(), context->resultData.length());
break;
default:
wrenSetSlotNull(vm_, 2);
break;
}
} else {
wrenSetSlotString(vm_, 1, context->errorMessage.c_str());
wrenSetSlotNull(vm_, 2);
}
wrenCall(vm_, callHandle);
wrenReleaseHandle(vm_, context->socketHandle);
wrenReleaseHandle(vm_, context->callback);
wrenReleaseHandle(vm_, callHandle);
delete context;
}
}
private:
void workerThread() {
while (running_) {
SocketContext* context = requestQueue_.pop();
if (!context || !running_) break;
wrenEnsureSlots(context->vm, 1);
wrenSetSlotHandle(context->vm, 0, context->socketHandle);
SocketData* socketData = (wrenGetSlotType(context->vm, 0) == WREN_TYPE_FOREIGN)
? (SocketData*)wrenGetSlotForeign(context->vm, 0)
: nullptr;
if (!socketData) {
context->success = false;
context->errorMessage = "Invalid socket object.";
completionQueue_.push(context);
continue;
}
switch (context->operation) {
case SocketOp::CONNECT: {
addrinfo hints = {}, *addrs;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
getaddrinfo(context->host.c_str(), std::to_string(context->port).c_str(), &hints, &addrs);
socket_t sock = INVALID_SOCKET;
for (addrinfo* addr = addrs; addr; addr = addr->ai_next) {
sock = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
if (!IS_SOCKET_VALID(sock)) continue;
if (connect(sock, addr->ai_addr, (int)addr->ai_addrlen) == 0) break;
CLOSE_SOCKET(sock);
sock = INVALID_SOCKET;
}
freeaddrinfo(addrs);
if (IS_SOCKET_VALID(sock)) {
socketData->sock = sock;
socketData->isListener = false;
context->success = true;
} else {
context->success = false;
context->errorMessage = "Connection failed.";
}
break;
}
case SocketOp::ACCEPT: {
if (socketData->isListener) {
context->newSocket = accept(socketData->sock, nullptr, nullptr);
context->success = IS_SOCKET_VALID(context->newSocket);
if (!context->success) context->errorMessage = "Accept failed.";
} else {
context->success = false;
context->errorMessage = "Cannot accept on a non-listening socket.";
}
break;
}
case SocketOp::READ: {
if (!socketData->isListener) {
char buf[4096];
ssize_t len = recv(socketData->sock, buf, sizeof(buf), 0);
if (len > 0) {
context->resultData.assign(buf, len);
context->success = true;
} else {
context->success = false;
context->errorMessage = "Read failed or connection closed.";
}
}
break;
}
case SocketOp::WRITE: {
if (!socketData->isListener) {
ssize_t written = send(socketData->sock, context->data.c_str(), context->data.length(), 0);
context->success = (written == (ssize_t)context->data.length());
if(!context->success) context->errorMessage = "Write failed.";
}
break;
}
}
completionQueue_.push(context);
}
}
WrenVM* vm_;
std::atomic<bool> running_;
std::vector<std::thread> threads_;
ThreadSafeQueue<SocketContext*> requestQueue_;
ThreadSafeQueue<SocketContext*> completionQueue_;
};
static AsyncSocketManager* socketManager = nullptr;
// --- Socket Foreign Class/Methods ---
void socketAllocate(WrenVM* vm) {
SocketData* data = (SocketData*)wrenSetSlotNewForeign(vm, 0, 0, sizeof(SocketData));
data->sock = INVALID_SOCKET;
data->isListener = false;
}
void socketConnect(WrenVM* vm) {
SocketContext* context = new SocketContext();
context->operation = SocketOp::CONNECT;
context->vm = vm;
context->socketHandle = wrenGetSlotHandle(vm, 0);
context->host = wrenGetSlotString(vm, 1);
context->port = (int)wrenGetSlotDouble(vm, 2);
context->callback = wrenGetSlotHandle(vm, 3);
socketManager->submit(context);
wrenSetSlotNull(vm, 0);
}
void socketListen(WrenVM* vm) {
const char* host = wrenGetSlotString(vm, 1);
int port = (int)wrenGetSlotDouble(vm, 2);
int backlog = (int)wrenGetSlotDouble(vm, 3);
addrinfo hints = {}, *addrs;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
getaddrinfo(host, std::to_string(port).c_str(), &hints, &addrs);
socket_t sock = INVALID_SOCKET;
for (addrinfo* addr = addrs; addr; addr = addr->ai_next) {
sock = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
if (!IS_SOCKET_VALID(sock)) continue;
int yes = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof(yes));
if (bind(sock, addr->ai_addr, (int)addr->ai_addrlen) == 0) break;
CLOSE_SOCKET(sock);
sock = INVALID_SOCKET;
}
freeaddrinfo(addrs);
if (IS_SOCKET_VALID(sock) && listen(sock, backlog) == 0) {
SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0);
data->sock = sock;
data->isListener = true;
wrenSetSlotBool(vm, 0, true);
} else {
if(IS_SOCKET_VALID(sock)) CLOSE_SOCKET(sock);
wrenSetSlotBool(vm, 0, false);
}
}
void socketAccept(WrenVM* vm) {
SocketContext* context = new SocketContext();
context->operation = SocketOp::ACCEPT;
context->vm = vm;
context->socketHandle = wrenGetSlotHandle(vm, 0);
context->callback = wrenGetSlotHandle(vm, 1);
socketManager->submit(context);
wrenSetSlotNull(vm, 0);
}
void socketRead(WrenVM* vm) {
SocketContext* context = new SocketContext();
context->operation = SocketOp::READ;
context->vm = vm;
context->socketHandle = wrenGetSlotHandle(vm, 0);
context->bytesToRead = (int)wrenGetSlotDouble(vm, 1);
context->callback = wrenGetSlotHandle(vm, 2);
socketManager->submit(context);
wrenSetSlotNull(vm, 0);
}
void socketWrite(WrenVM* vm) {
SocketContext* context = new SocketContext();
context->operation = SocketOp::WRITE;
context->vm = vm;
context->socketHandle = wrenGetSlotHandle(vm, 0);
int len;
const char* bytes = wrenGetSlotBytes(vm, 1, &len);
context->data.assign(bytes, len);
context->callback = wrenGetSlotHandle(vm, 2);
socketManager->submit(context);
wrenSetSlotNull(vm, 0);
}
void socketClose(WrenVM* vm) {
SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0);
if (IS_SOCKET_VALID(data->sock)) {
CLOSE_SOCKET(data->sock);
data->sock = INVALID_SOCKET;
}
}
void socketIsOpen(WrenVM* vm) {
SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0);
wrenSetSlotBool(vm, 0, IS_SOCKET_VALID(data->sock));
}
WrenForeignMethodFn bindSocketForeignMethod(WrenVM* vm, const char* module, const char* className, bool isStatic, const char* signature) {
if (strcmp(module, "socket") != 0) return NULL;
if (strcmp(className, "Socket") == 0 && !isStatic) {
if (strcmp(signature, "connect(_,_,_)") == 0) return socketConnect;
if (strcmp(signature, "listen(_,_,_)") == 0) return socketListen;
if (strcmp(signature, "accept(_)") == 0) return socketAccept;
if (strcmp(signature, "read(_,_)") == 0) return socketRead;
if (strcmp(signature, "write(_,_)") == 0) return socketWrite;
if (strcmp(signature, "close()") == 0) return socketClose;
if (strcmp(signature, "isOpen") == 0) return socketIsOpen;
}
return NULL;
}
WrenForeignClassMethods bindSocketForeignClass(WrenVM* vm, const char* module, const char* className) {
WrenForeignClassMethods methods = {0};
if (strcmp(module, "socket") == 0 && strcmp(className, "Socket") == 0) {
methods.allocate = socketAllocate;
}
return methods;
}

View File

@ -1,747 +0,0 @@
// socket_backend.c (Corrected with better handle safety and non-blocking I/O)
#include "wren.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <time.h>
// Platform-specific includes and definitions
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#include <windows.h>
#pragma comment(lib, "ws2_32.lib")
typedef SOCKET socket_t;
typedef int socklen_t;
typedef HANDLE thread_t;
typedef CRITICAL_SECTION mutex_t;
typedef CONDITION_VARIABLE cond_t;
#define IS_SOCKET_VALID(s) ((s) != INVALID_SOCKET)
#define CLOSE_SOCKET(s) closesocket(s)
#else
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <netdb.h>
#include <errno.h>
#include <sys/select.h>
typedef int socket_t;
typedef pthread_t thread_t;
typedef pthread_mutex_t mutex_t;
typedef pthread_cond_t cond_t;
#define INVALID_SOCKET -1
#define IS_SOCKET_VALID(s) ((s) >= 0)
#define CLOSE_SOCKET(s) close(s)
#endif
// --- Forward Declarations ---
typedef struct SocketContext SocketContext;
// --- Socket Data Structures ---
typedef enum {
SOCKET_OP_CONNECT,
SOCKET_OP_READ,
SOCKET_OP_WRITE,
} SocketOp;
typedef struct {
socket_t sock;
bool isListener;
} SocketData;
struct SocketContext {
SocketOp operation;
WrenVM* vm;
WrenHandle* socketHandle;
WrenHandle* callback;
char* host;
int port;
char* data;
size_t dataLength;
bool success;
char* resultData;
size_t resultDataLength;
char* errorMessage;
socket_t newSocket;
struct SocketContext* next;
};
// --- Thread-Safe Queue Implementation in C ---
typedef struct {
SocketContext *head, *tail;
mutex_t mutex;
cond_t cond;
} ThreadSafeQueueSocket;
void queue_init(ThreadSafeQueueSocket* q) {
q->head = q->tail = NULL;
#ifdef _WIN32
InitializeCriticalSection(&q->mutex);
InitializeConditionVariable(&q->cond);
#else
pthread_mutex_init(&q->mutex, NULL);
pthread_cond_init(&q->cond, NULL);
#endif
}
void queue_destroy(ThreadSafeQueueSocket* q) {
#ifdef _WIN32
DeleteCriticalSection(&q->mutex);
#else
pthread_mutex_destroy(&q->mutex);
pthread_cond_destroy(&q->cond);
#endif
}
void queue_push(ThreadSafeQueueSocket* q, SocketContext* context) {
#ifdef _WIN32
EnterCriticalSection(&q->mutex);
#else
pthread_mutex_lock(&q->mutex);
#endif
if (context) {
context->next = NULL;
}
if (q->tail) {
q->tail->next = context;
} else {
q->head = context;
}
q->tail = context;
#ifdef _WIN32
WakeConditionVariable(&q->cond);
LeaveCriticalSection(&q->mutex);
#else
pthread_cond_signal(&q->cond);
pthread_mutex_unlock(&q->mutex);
#endif
}
SocketContext* queue_pop(ThreadSafeQueueSocket* q) {
#ifdef _WIN32
EnterCriticalSection(&q->mutex);
while (q->head == NULL) {
SleepConditionVariableCS(&q->cond, &q->mutex, INFINITE);
}
#else
pthread_mutex_lock(&q->mutex);
while (q->head == NULL) {
pthread_cond_wait(&q->cond, &q->mutex);
}
#endif
SocketContext* context = q->head;
q->head = q->head->next;
if (q->head == NULL) {
q->tail = NULL;
}
#ifdef _WIN32
LeaveCriticalSection(&q->mutex);
#else
pthread_mutex_unlock(&q->mutex);
#endif
return context;
}
bool queue_empty(ThreadSafeQueueSocket* q) {
#ifdef _WIN32
EnterCriticalSection(&q->mutex);
bool empty = (q->head == NULL);
LeaveCriticalSection(&q->mutex);
#else
pthread_mutex_lock(&q->mutex);
bool empty = (q->head == NULL);
pthread_mutex_unlock(&q->mutex);
#endif
return empty;
}
// --- Asynchronous Socket Manager ---
#define MAX_LISTENERS 64
typedef struct {
WrenVM* vm;
volatile bool running;
thread_t worker_threads[4];
thread_t listener_thread;
ThreadSafeQueueSocket requestQueue;
ThreadSafeQueueSocket completionQueue;
ThreadSafeQueueSocket acceptQueue;
mutex_t listener_mutex;
socket_t listener_sockets[MAX_LISTENERS];
int listener_count;
#ifndef _WIN32
socket_t wake_pipe[2];
#endif
} AsyncSocketManager;
static AsyncSocketManager* socketManager = NULL;
void free_socket_context_data(SocketContext* context) {
if (!context) return;
free(context->host);
free(context->data);
free(context->resultData);
free(context->errorMessage);
free(context);
}
#ifdef _WIN32
DWORD WINAPI workerThread(LPVOID arg);
DWORD WINAPI listenerThread(LPVOID arg);
#else
void* workerThread(void* arg);
void* listenerThread(void* arg);
#endif
// --- Worker and Listener Thread Implementations ---
#ifdef _WIN32
DWORD WINAPI listenerThread(LPVOID arg) {
#else
void* listenerThread(void* arg) {
#endif
AsyncSocketManager* manager = (AsyncSocketManager*)arg;
while (manager->running) {
fd_set read_fds;
FD_ZERO(&read_fds);
socket_t max_fd = 0;
#ifndef _WIN32
FD_SET(manager->wake_pipe[0], &read_fds);
max_fd = manager->wake_pipe[0];
#endif
#ifdef _WIN32
EnterCriticalSection(&manager->listener_mutex);
#else
pthread_mutex_lock(&manager->listener_mutex);
#endif
for (int i = 0; i < manager->listener_count; i++) {
socket_t sock = manager->listener_sockets[i];
if (IS_SOCKET_VALID(sock)) {
FD_SET(sock, &read_fds);
if (sock > max_fd) {
max_fd = sock;
}
}
}
#ifdef _WIN32
LeaveCriticalSection(&manager->listener_mutex);
#else
pthread_mutex_unlock(&manager->listener_mutex);
#endif
struct timeval timeout;
timeout.tv_sec = 1;
timeout.tv_usec = 0;
int activity = select(max_fd + 1, &read_fds, NULL, NULL, &timeout);
if (!manager->running) break;
if (activity < 0) {
#ifndef _WIN32
if (errno != EINTR) {
perror("select error");
}
#endif
continue;
}
if (activity == 0) continue;
#ifndef _WIN32
if (FD_ISSET(manager->wake_pipe[0], &read_fds)) {
char buffer[1];
read(manager->wake_pipe[0], buffer, 1);
}
#endif
#ifdef _WIN32
EnterCriticalSection(&manager->listener_mutex);
#else
pthread_mutex_lock(&manager->listener_mutex);
#endif
for (int i = 0; i < manager->listener_count; i++) {
socket_t sock = manager->listener_sockets[i];
if (IS_SOCKET_VALID(sock) && FD_ISSET(sock, &read_fds)) {
if (!queue_empty(&manager->acceptQueue)) {
SocketContext* context = queue_pop(&manager->acceptQueue);
context->newSocket = accept(sock, NULL, NULL);
context->success = IS_SOCKET_VALID(context->newSocket);
if (!context->success) {
context->errorMessage = strdup("Accept failed.");
}
queue_push(&manager->completionQueue, context);
}
}
}
#ifdef _WIN32
LeaveCriticalSection(&manager->listener_mutex);
#else
pthread_mutex_unlock(&manager->listener_mutex);
#endif
}
return 0;
}
#ifdef _WIN32
DWORD WINAPI workerThread(LPVOID arg) {
#else
void* workerThread(void* arg) {
#endif
AsyncSocketManager* manager = (AsyncSocketManager*)arg;
while (manager->running) {
SocketContext* context = queue_pop(&manager->requestQueue);
if (!context || !manager->running) {
if (context) free_socket_context_data(context);
break;
}
wrenEnsureSlots(context->vm, 1);
wrenSetSlotHandle(context->vm, 0, context->socketHandle);
SocketData* socketData = (wrenGetSlotType(context->vm, 0) == WREN_TYPE_FOREIGN)
? (SocketData*)wrenGetSlotForeign(context->vm, 0)
: NULL;
if (!socketData) {
context->success = false;
context->errorMessage = strdup("Invalid socket object.");
queue_push(&manager->completionQueue, context);
continue;
}
switch (context->operation) {
case SOCKET_OP_CONNECT: {
struct addrinfo hints = {0}, *addrs;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
char port_str[6];
snprintf(port_str, 6, "%d", context->port);
if (getaddrinfo(context->host, port_str, &hints, &addrs) != 0) {
context->success = false;
context->errorMessage = strdup("Host lookup failed.");
break;
}
socket_t sock = INVALID_SOCKET;
for (struct addrinfo* addr = addrs; addr; addr = addr->ai_next) {
sock = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
if (!IS_SOCKET_VALID(sock)) continue;
if (connect(sock, addr->ai_addr, (int)addr->ai_addrlen) == 0) break;
CLOSE_SOCKET(sock);
sock = INVALID_SOCKET;
}
freeaddrinfo(addrs);
if (IS_SOCKET_VALID(sock)) {
socketData->sock = sock;
socketData->isListener = false;
context->success = true;
} else {
context->success = false;
context->errorMessage = strdup("Connection failed.");
}
break;
}
case SOCKET_OP_READ: {
if (socketData->isListener) {
context->success = false;
context->errorMessage = strdup("Cannot read from a listening socket.");
break;
}
fd_set read_fds;
FD_ZERO(&read_fds);
FD_SET(socketData->sock, &read_fds);
struct timeval timeout = { .tv_sec = 5, .tv_usec = 0 }; // 5-second timeout
int activity = select(socketData->sock + 1, &read_fds, NULL, NULL, &timeout);
if (activity > 0 && FD_ISSET(socketData->sock, &read_fds)) {
char buf[4096];
ssize_t len = recv(socketData->sock, buf, sizeof(buf), 0);
if (len > 0) {
context->resultData = (char*)malloc(len);
memcpy(context->resultData, buf, len);
context->resultDataLength = len;
context->success = true;
} else {
context->success = false;
context->errorMessage = strdup("Read failed or connection closed.");
}
} else {
context->success = false;
context->errorMessage = strdup("Read timeout or error.");
}
break;
}
case SOCKET_OP_WRITE: {
if (socketData->isListener) {
context->success = false;
context->errorMessage = strdup("Cannot write to a listening socket.");
break;
}
ssize_t written = send(socketData->sock, context->data, context->dataLength, 0);
context->success = (written == (ssize_t)context->dataLength);
if(!context->success) context->errorMessage = strdup("Write failed.");
break;
}
}
queue_push(&manager->completionQueue, context);
}
return 0;
}
// --- Manager Lifecycle ---
void socketManager_create(WrenVM* vm) {
socketManager = (AsyncSocketManager*)malloc(sizeof(AsyncSocketManager));
socketManager->vm = vm;
socketManager->running = true;
socketManager->listener_count = 0;
queue_init(&socketManager->requestQueue);
queue_init(&socketManager->completionQueue);
queue_init(&socketManager->acceptQueue);
#ifdef _WIN32
InitializeCriticalSection(&socketManager->listener_mutex);
#else
pthread_mutex_init(&socketManager->listener_mutex, NULL);
#endif
#ifndef _WIN32
if (pipe(socketManager->wake_pipe) == -1) {
perror("pipe");
exit(1);
}
#endif
for (int i = 0; i < 4; i++) {
#ifdef _WIN32
socketManager->worker_threads[i] = CreateThread(NULL, 0, workerThread, socketManager, 0, NULL);
#else
pthread_create(&socketManager->worker_threads[i], NULL, workerThread, socketManager);
#endif
}
#ifdef _WIN32
socketManager->listener_thread = CreateThread(NULL, 0, listenerThread, socketManager, 0, NULL);
#else
pthread_create(&socketManager->listener_thread, NULL, listenerThread, socketManager);
#endif
}
void socketManager_destroy() {
socketManager->running = false;
#ifndef _WIN32
write(socketManager->wake_pipe[1], "w", 1);
#endif
for (int i = 0; i < 4; i++) {
queue_push(&socketManager->requestQueue, NULL);
}
#ifdef _WIN32
WaitForSingleObject(socketManager->listener_thread, INFINITE);
CloseHandle(socketManager->listener_thread);
for (int i = 0; i < 4; i++) {
WaitForSingleObject(socketManager->worker_threads[i], INFINITE);
CloseHandle(socketManager->worker_threads[i]);
}
#else
pthread_join(socketManager->listener_thread, NULL);
for (int i = 0; i < 4; i++) {
pthread_join(socketManager->worker_threads[i], NULL);
}
close(socketManager->wake_pipe[0]);
close(socketManager->wake_pipe[1]);
#endif
queue_destroy(&socketManager->requestQueue);
queue_destroy(&socketManager->completionQueue);
queue_destroy(&socketManager->acceptQueue);
#ifdef _WIN32
DeleteCriticalSection(&socketManager->listener_mutex);
#else
pthread_mutex_destroy(&socketManager->listener_mutex);
#endif
free(socketManager);
}
void socketManager_processCompletions() {
WrenHandle* callHandle = wrenMakeCallHandle(socketManager->vm, "call(_,_)");
while (!queue_empty(&socketManager->completionQueue)) {
SocketContext* context = queue_pop(&socketManager->completionQueue);
wrenEnsureSlots(socketManager->vm, 3);
wrenSetSlotHandle(socketManager->vm, 0, context->callback);
if (context->success) {
wrenSetSlotNull(socketManager->vm, 1);
if (IS_SOCKET_VALID(context->newSocket)) {
wrenGetVariable(socketManager->vm, "socket", "Socket", 2);
void* foreign = wrenSetSlotNewForeign(socketManager->vm, 2, 2, sizeof(SocketData));
SocketData* clientData = (SocketData*)foreign;
clientData->sock = context->newSocket;
clientData->isListener = false;
} else if (context->resultData) {
wrenSetSlotBytes(socketManager->vm, 2, context->resultData, context->resultDataLength);
} else {
wrenSetSlotNull(socketManager->vm, 2);
}
} else {
wrenSetSlotString(socketManager->vm, 1, context->errorMessage ? context->errorMessage : "Unknown error.");
wrenSetSlotNull(socketManager->vm, 2);
}
wrenCall(socketManager->vm, callHandle);
// Safely release handles here on the main thread
wrenReleaseHandle(socketManager->vm, context->socketHandle);
wrenReleaseHandle(socketManager->vm, context->callback);
free_socket_context_data(context);
}
wrenReleaseHandle(socketManager->vm, callHandle);
}
// ... (The rest of the foreign functions from socketAllocate onwards are identical to the previous response) ...
void socketAllocate(WrenVM* vm) {
SocketData* data = (SocketData*)wrenSetSlotNewForeign(vm, 0, 0, sizeof(SocketData));
data->sock = INVALID_SOCKET;
data->isListener = false;
}
void socketConnect(WrenVM* vm) {
SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext));
context->operation = SOCKET_OP_CONNECT;
context->vm = vm;
context->socketHandle = wrenGetSlotHandle(vm, 0);
context->host = strdup(wrenGetSlotString(vm, 1));
context->port = (int)wrenGetSlotDouble(vm, 2);
context->callback = wrenGetSlotHandle(vm, 3);
queue_push(&socketManager->requestQueue, context);
}
void socketListen(WrenVM* vm) {
SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0);
const char* host = wrenGetSlotString(vm, 1);
int port = (int)wrenGetSlotDouble(vm, 2);
int backlog = (int)wrenGetSlotDouble(vm, 3);
struct addrinfo hints = {0}, *addrs;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
char port_str[6];
snprintf(port_str, 6, "%d", port);
if (getaddrinfo(host, port_str, &hints, &addrs) != 0) {
wrenSetSlotBool(vm, 0, false);
return;
}
socket_t sock = INVALID_SOCKET;
for (struct addrinfo* addr = addrs; addr; addr = addr->ai_next) {
sock = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
if (!IS_SOCKET_VALID(sock)) continue;
int yes = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof(yes));
if (bind(sock, addr->ai_addr, (int)addr->ai_addrlen) == 0) break;
CLOSE_SOCKET(sock);
sock = INVALID_SOCKET;
}
freeaddrinfo(addrs);
if (IS_SOCKET_VALID(sock) && listen(sock, backlog) == 0) {
data->sock = sock;
data->isListener = true;
#ifdef _WIN32
EnterCriticalSection(&socketManager->listener_mutex);
#else
pthread_mutex_lock(&socketManager->listener_mutex);
#endif
if (socketManager->listener_count < MAX_LISTENERS) {
socketManager->listener_sockets[socketManager->listener_count++] = sock;
}
#ifdef _WIN32
LeaveCriticalSection(&socketManager->listener_mutex);
#else
pthread_mutex_unlock(&socketManager->listener_mutex);
#endif
#ifndef _WIN32
write(socketManager->wake_pipe[1], "w", 1);
#endif
wrenSetSlotBool(vm, 0, true);
} else {
if(IS_SOCKET_VALID(sock)) CLOSE_SOCKET(sock);
wrenSetSlotBool(vm, 0, false);
}
}
void socketAccept(WrenVM* vm) {
SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext));
context->vm = vm;
context->socketHandle = wrenGetSlotHandle(vm, 0);
context->callback = wrenGetSlotHandle(vm, 1);
queue_push(&socketManager->acceptQueue, context);
}
void socketRead(WrenVM* vm) {
SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext));
context->operation = SOCKET_OP_READ;
context->vm = vm;
context->socketHandle = wrenGetSlotHandle(vm, 0);
context->callback = wrenGetSlotHandle(vm, 1);
queue_push(&socketManager->requestQueue, context);
}
void socketWrite(WrenVM* vm) {
SocketContext* context = (SocketContext*)calloc(1, sizeof(SocketContext));
context->operation = SOCKET_OP_WRITE;
context->vm = vm;
context->socketHandle = wrenGetSlotHandle(vm, 0);
int len;
const char* bytes = wrenGetSlotBytes(vm, 1, &len);
context->data = (char*)malloc(len);
memcpy(context->data, bytes, len);
context->dataLength = len;
context->callback = wrenGetSlotHandle(vm, 2);
queue_push(&socketManager->requestQueue, context);
}
void socketClose(WrenVM* vm) {
SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0);
if (IS_SOCKET_VALID(data->sock)) {
if (data->isListener) {
#ifdef _WIN32
EnterCriticalSection(&socketManager->listener_mutex);
#else
pthread_mutex_lock(&socketManager->listener_mutex);
#endif
for (int i = 0; i < socketManager->listener_count; i++) {
if (socketManager->listener_sockets[i] == data->sock) {
socketManager->listener_sockets[i] = socketManager->listener_sockets[socketManager->listener_count - 1];
socketManager->listener_count--;
break;
}
}
#ifdef _WIN32
LeaveCriticalSection(&socketManager->listener_mutex);
#else
pthread_mutex_unlock(&socketManager->listener_mutex);
#endif
}
CLOSE_SOCKET(data->sock);
data->sock = INVALID_SOCKET;
}
}
void socketIsOpen(WrenVM* vm) {
SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0);
wrenSetSlotBool(vm, 0, IS_SOCKET_VALID(data->sock));
}
void socketRemoteAddress(WrenVM* vm) {
SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0);
if (!IS_SOCKET_VALID(data->sock) || data->isListener) {
wrenSetSlotNull(vm, 0);
return;
}
struct sockaddr_storage addr;
socklen_t len = sizeof(addr);
char ipstr[INET6_ADDRSTRLEN];
if (getpeername(data->sock, (struct sockaddr*)&addr, &len) == 0) {
if (addr.ss_family == AF_INET) {
inet_ntop(AF_INET, &((struct sockaddr_in*)&addr)->sin_addr, ipstr, sizeof(ipstr));
} else {
inet_ntop(AF_INET6, &((struct sockaddr_in6*)&addr)->sin6_addr, ipstr, sizeof(ipstr));
}
wrenSetSlotString(vm, 0, ipstr);
} else {
wrenSetSlotNull(vm, 0);
}
}
void socketRemotePort(WrenVM* vm) {
SocketData* data = (SocketData*)wrenGetSlotForeign(vm, 0);
if (!IS_SOCKET_VALID(data->sock) || data->isListener) {
wrenSetSlotNull(vm, 0);
return;
}
struct sockaddr_storage addr;
socklen_t len = sizeof(addr);
if (getpeername(data->sock, (struct sockaddr*)&addr, &len) == 0) {
int port = 0;
if (addr.ss_family == AF_INET) {
port = ntohs(((struct sockaddr_in*)&addr)->sin_port);
} else if (addr.ss_family == AF_INET6) {
port = ntohs(((struct sockaddr_in6*)&addr)->sin6_port);
}
wrenSetSlotDouble(vm, 0, (double)port);
} else {
wrenSetSlotNull(vm, 0);
}
}
WrenForeignMethodFn bindSocketForeignMethod(WrenVM* vm, const char* module, const char* className, bool isStatic, const char* signature) {
if (strcmp(module, "socket") != 0) return NULL;
if (strcmp(className, "Socket") == 0 && !isStatic) {
if (strcmp(signature, "connect(_,_,_)") == 0) return socketConnect;
if (strcmp(signature, "listen(_,_,_)") == 0) return socketListen;
if (strcmp(signature, "accept(_)") == 0) return socketAccept;
// NOTE: The signature for read() in Wren takes one argument (the callback) now.
if (strcmp(signature, "read(_)") == 0) return socketRead;
if (strcmp(signature, "write_(_,_)") == 0) return socketWrite;
if (strcmp(signature, "close()") == 0) return socketClose;
if (strcmp(signature, "isOpen") == 0) return socketIsOpen;
if (strcmp(signature, "remoteAddress") == 0) return socketRemoteAddress;
if (strcmp(signature, "remotePort") == 0) return socketRemotePort;
}
return NULL;
}
WrenForeignClassMethods bindSocketForeignClass(WrenVM* vm, const char* module, const char* className) {
WrenForeignClassMethods methods = {0, 0};
if (strcmp(module, "socket") == 0 && strcmp(className, "Socket") == 0) {
methods.allocate = socketAllocate;
}
return methods;
}