/*
 * DWN - Desktop Window Manager
 * retoor <retoor@molodetz.nl>
 * Extensive Abstract Threading Framework
 *
 * This module provides a comprehensive, abstract threading system with:
 * - Lock-free data structures
 * - Thread pools with work stealing
 * - Async/await pattern support
 * - Thread-safe event bus
 * - Memory barriers and atomic operations
 */

/* Include guard: closed by the matching #endif at the end of this header. */
#ifndef DWN_THREADING_H
#define DWN_THREADING_H

#include <stdbool.h>   /* bool */
#include <stdint.h>    /* uint32_t, uint64_t */
#include <stddef.h>
#include <pthread.h>   /* pthread_mutex_t */
#include <stdatomic.h> /* atomic_int, _Atomic, memory_order_* */

/* ============================================================================
 * Platform Abstraction Layer
 * ============================================================================ */

/* Cache line size in bytes (commonly 64 bytes on x86_64). */
#define CACHE_LINE_SIZE 64

/*
 * Align a declaration to a cache line to prevent false sharing.
 * Expands to the GCC/Clang `aligned` attribute, so it is written after the
 * declarator, e.g. `_Atomic uint64_t head CACHE_ALIGN;`.
 */
#define CACHE_ALIGN __attribute__((aligned(CACHE_LINE_SIZE)))

/* Memory ordering shortcuts for the <stdatomic.h> memory_order constants. */
#define ATOMIC_RELAXED memory_order_relaxed
#define ATOMIC_ACQUIRE memory_order_acquire
#define ATOMIC_RELEASE memory_order_release
#define ATOMIC_ACQ_REL memory_order_acq_rel
#define ATOMIC_SEQ_CST memory_order_seq_cst

/* ============================================================================
 * Core Types and Status Codes
 * ============================================================================ */

/*
 * Status code returned by the threading APIs.
 * THREAD_OK is zero; every error code is a distinct negative value.
 */
typedef enum {
    THREAD_OK            =  0,
    THREAD_ERROR         = -1,
    THREAD_ERROR_NOMEM   = -2,
    THREAD_ERROR_BUSY    = -3,
    THREAD_ERROR_CLOSED  = -4,
    THREAD_ERROR_TIMEOUT = -5,
    THREAD_ERROR_INVALID = -6
} ThreadStatus;

/*
 * Task priority levels; a lower numeric value means a more urgent task.
 * TASK_PRIORITY_COUNT is not a priority: it is the number of levels.
 */
typedef enum {
    TASK_PRIORITY_CRITICAL = 0, /* UI-critical, must execute immediately */
    TASK_PRIORITY_HIGH     = 1, /* User-initiated actions */
    TASK_PRIORITY_NORMAL   = 2, /* Default background work */
    TASK_PRIORITY_LOW      = 3, /* Maintenance tasks */
    TASK_PRIORITY_IDLE     = 4, /* Only when system idle */
    TASK_PRIORITY_COUNT
} TaskPriority;

/* Lifecycle state of a task, as reported by task_get_state(). */
typedef enum {
    TASK_STATE_PENDING   = 0,
    TASK_STATE_RUNNING   = 1,
    TASK_STATE_COMPLETED = 2,
    TASK_STATE_CANCELLED = 3,
    TASK_STATE_ERROR     = 4
} TaskState;

/*
 * Forward declarations.
 * These names can be used through pointers before (or without) seeing the
 * corresponding struct definition.
 */
typedef struct ThreadPool ThreadPool;
typedef struct Task Task;
typedef struct Channel Channel;
typedef struct Future Future;
typedef struct EventBus EventBus;
typedef struct AsyncContext AsyncContext;
typedef struct Worker Worker;

/* ============================================================================
|
|
* Task System - Abstract Unit of Work
|
|
* ============================================================================ */
|
|
|
|
/* Task function signature - receives user data and cancellation flag */
|
|
typedef void (*TaskFunc)(void *user_data, atomic_int *cancelled);
|
|
|
|
/* Callback for task completion (called in worker thread) */
|
|
typedef void (*TaskCallback)(Task *task, void *user_data, ThreadStatus status);
|
|
|
|
/* Task handle - opaque pointer */
|
|
typedef Task* TaskHandle;
|
|
|
|
/**
|
|
* Create a new task
|
|
* @param func The function to execute
|
|
* @param user_data Data passed to the function
|
|
* @param priority Task priority level
|
|
* @return Task handle or NULL on error
|
|
*/
|
|
TaskHandle task_create(TaskFunc func, void *user_data, TaskPriority priority);
|
|
|
|
/**
|
|
* Create a task with completion callback
|
|
*/
|
|
TaskHandle task_create_with_callback(TaskFunc func, void *user_data,
|
|
TaskPriority priority,
|
|
TaskCallback on_complete,
|
|
void *callback_data);
|
|
|
|
/**
|
|
* Destroy a task (only if not submitted)
|
|
*/
|
|
void task_destroy(TaskHandle task);
|
|
|
|
/**
|
|
* Cancel a task (best effort - may already be running)
|
|
*/
|
|
bool task_cancel(TaskHandle task);
|
|
|
|
/**
|
|
* Get current task state
|
|
*/
|
|
TaskState task_get_state(TaskHandle task);
|
|
|
|
/**
|
|
* Wait for task completion (blocking)
|
|
*/
|
|
ThreadStatus task_wait(TaskHandle task, uint64_t timeout_ms);
|
|
|
|
/**
|
|
* Check if task was cancelled
|
|
*/
|
|
bool task_is_cancelled(TaskHandle task);
|
|
|
|
/* ============================================================================
 * Thread Pool - Manage Worker Threads
 * ============================================================================ */

/* Thread pool configuration */
typedef struct {
    uint32_t min_threads;          /* Minimum threads to keep alive */
    uint32_t max_threads;          /* Maximum threads allowed */
    uint32_t queue_capacity;       /* Task queue capacity per priority */
    uint32_t steal_attempts;       /* Work stealing attempts before blocking */
    uint64_t idle_timeout_ms;      /* Time before idle thread terminates */
    bool     enable_work_stealing; /* Enable work stealing between queues */
} ThreadPoolConfig;

/*
 * Default configuration.
 * Expands to a ThreadPoolConfig compound literal, so it can be used as an
 * initializer or have its address taken at the point of use.
 */
#define THREAD_POOL_DEFAULT_CONFIG ((ThreadPoolConfig){ \
    .min_threads = 2, \
    .max_threads = 8, \
    .queue_capacity = 256, \
    .steal_attempts = 4, \
    .idle_timeout_ms = 60000, \
    .enable_work_stealing = true \
})

/**
|
|
* Create a thread pool
|
|
*/
|
|
ThreadPool* thread_pool_create(const ThreadPoolConfig *config);
|
|
|
|
/**
|
|
* Destroy thread pool, cancelling pending tasks
|
|
*/
|
|
void thread_pool_destroy(ThreadPool *pool);
|
|
|
|
/**
|
|
* Submit a task to the pool
|
|
*/
|
|
ThreadStatus thread_pool_submit(ThreadPool *pool, TaskHandle task);
|
|
|
|
/**
|
|
* Submit a function as a task (convenience)
|
|
*/
|
|
ThreadStatus thread_pool_submit_func(ThreadPool *pool, TaskFunc func,
|
|
void *user_data, TaskPriority priority);
|
|
|
|
/**
|
|
* Get number of active threads
|
|
*/
|
|
uint32_t thread_pool_active_count(ThreadPool *pool);
|
|
|
|
/**
|
|
* Get number of pending tasks
|
|
*/
|
|
uint32_t thread_pool_pending_count(ThreadPool *pool);
|
|
|
|
/**
|
|
* Shutdown pool gracefully, waiting for tasks to complete
|
|
*/
|
|
ThreadStatus thread_pool_shutdown(ThreadPool *pool, uint64_t timeout_ms);
|
|
|
|
/**
|
|
* Get the default/global thread pool
|
|
*/
|
|
ThreadPool* thread_pool_default(void);
|
|
|
|
/**
|
|
* Initialize the default thread pool
|
|
*/
|
|
ThreadStatus thread_pool_init_default(const ThreadPoolConfig *config);
|
|
|
|
/**
|
|
* Shutdown the default thread pool
|
|
*/
|
|
void thread_pool_shutdown_default(void);
|
|
|
|
/* ============================================================================
|
|
* Lock-Free Queue - Single Producer Single Consumer
|
|
* ============================================================================ */
|
|
|
|
#define SPSC_QUEUE_SIZE 1024
|
|
|
|
typedef struct {
|
|
_Atomic uint64_t head CACHE_ALIGN; /* Write index - producer only */
|
|
_Atomic uint64_t tail CACHE_ALIGN; /* Read index - consumer only */
|
|
void *buffer[SPSC_QUEUE_SIZE];
|
|
} SpscQueue;
|
|
|
|
/**
|
|
* Initialize SPSC queue
|
|
*/
|
|
void spsc_queue_init(SpscQueue *q);
|
|
|
|
/**
|
|
* Push item (producer only)
|
|
* @return false if queue full
|
|
*/
|
|
bool spsc_queue_push(SpscQueue *q, void *item);
|
|
|
|
/**
|
|
* Pop item (consumer only)
|
|
* @return false if queue empty
|
|
*/
|
|
bool spsc_queue_pop(SpscQueue *q, void **item);
|
|
|
|
/**
|
|
* Check if queue is empty (consumer only)
|
|
*/
|
|
bool spsc_queue_empty(SpscQueue *q);
|
|
|
|
/**
|
|
* Get approximate size (not synchronized)
|
|
*/
|
|
uint64_t spsc_queue_size_approx(SpscQueue *q);
|
|
|
|
/* ============================================================================
 * Lock-Free Queue - Multi Producer Single Consumer
 * ============================================================================ */

/* Opaque MPSC queue type; only used through pointers in this header. */
typedef struct MpscQueue MpscQueue;

/**
 * Create MPSC queue
 * @param capacity Requested queue capacity
 */
MpscQueue* mpsc_queue_create(uint32_t capacity);

/**
 * Destroy MPSC queue
 */
void mpsc_queue_destroy(MpscQueue *q);

/**
 * Push item (thread-safe, any producer)
 * @return false if queue full
 */
bool mpsc_queue_push(MpscQueue *q, void *item);

/**
 * Pop item (consumer only - single thread)
 * @param item Output location for the popped pointer
 * @return false if queue empty
 */
bool mpsc_queue_pop(MpscQueue *q, void **item);

/**
 * Check if queue is empty
 */
bool mpsc_queue_empty(MpscQueue *q);

/* ============================================================================
 * Channel - Thread-Safe Communication
 * ============================================================================ */

/* Kind of channel. */
typedef enum {
    CHANNEL_UNBUFFERED = 0, /* Synchronous - sender blocks until received */
    CHANNEL_BUFFERED   = 1  /* Asynchronous - uses internal buffer */
} ChannelType;

/**
|
|
* Create a channel
|
|
* @param capacity Buffer size (0 for unbuffered synchronous channel)
|
|
*/
|
|
Channel* channel_create(uint32_t capacity);
|
|
|
|
/**
|
|
* Destroy channel
|
|
*/
|
|
void channel_destroy(Channel *ch);
|
|
|
|
/**
|
|
* Send data through channel (blocking)
|
|
* @return THREAD_OK on success, THREAD_ERROR_CLOSED if closed
|
|
*/
|
|
ThreadStatus channel_send(Channel *ch, void *data);
|
|
|
|
/**
|
|
* Send with timeout
|
|
*/
|
|
ThreadStatus channel_send_timeout(Channel *ch, void *data, uint64_t timeout_ms);
|
|
|
|
/**
|
|
* Try send (non-blocking)
|
|
*/
|
|
ThreadStatus channel_try_send(Channel *ch, void *data);
|
|
|
|
/**
|
|
* Receive from channel (blocking)
|
|
*/
|
|
ThreadStatus channel_recv(Channel *ch, void **data);
|
|
|
|
/**
|
|
* Receive with timeout
|
|
*/
|
|
ThreadStatus channel_recv_timeout(Channel *ch, void **data, uint64_t timeout_ms);
|
|
|
|
/**
|
|
* Try receive (non-blocking)
|
|
*/
|
|
ThreadStatus channel_try_recv(Channel *ch, void **data);
|
|
|
|
/**
|
|
* Close channel (no more sends allowed)
|
|
*/
|
|
void channel_close(Channel *ch);
|
|
|
|
/**
|
|
* Check if channel is closed
|
|
*/
|
|
bool channel_is_closed(Channel *ch);
|
|
|
|
/**
|
|
* Select on multiple channels (like Go's select)
|
|
* Returns index of ready channel or -1 on timeout
|
|
*/
|
|
int channel_select(Channel **channels, uint32_t count, uint64_t timeout_ms,
|
|
void **out_data);
|
|
|
|
/* ============================================================================
|
|
* Future/Promise - Async Result Handling
|
|
* ============================================================================ */
|
|
|
|
typedef void* FutureResult;
|
|
typedef void (*FutureCallback)(Future *f, FutureResult result, void *user_data);
|
|
|
|
/**
|
|
* Create a future
|
|
*/
|
|
Future* future_create(void);
|
|
|
|
/**
|
|
* Destroy future
|
|
*/
|
|
void future_destroy(Future *f);
|
|
|
|
/**
|
|
* Set the result (called by producer)
|
|
*/
|
|
void future_set_result(Future *f, FutureResult result);
|
|
|
|
/**
|
|
* Set error result
|
|
*/
|
|
void future_set_error(Future *f, int error_code);
|
|
|
|
/**
|
|
* Get result (blocking)
|
|
*/
|
|
FutureResult future_get(Future *f, ThreadStatus *status);
|
|
|
|
/**
|
|
* Get result with timeout
|
|
*/
|
|
FutureResult future_get_timeout(Future *f, uint64_t timeout_ms, ThreadStatus *status);
|
|
|
|
/**
|
|
* Check if future is ready
|
|
*/
|
|
bool future_is_ready(Future *f);
|
|
|
|
/**
|
|
* Attach callback to be called when ready (thread-safe)
|
|
*/
|
|
void future_then(Future *f, FutureCallback callback, void *user_data);
|
|
|
|
/**
|
|
* Create future that completes when all given futures complete
|
|
*/
|
|
Future* future_all(Future **futures, uint32_t count);
|
|
|
|
/**
|
|
* Create future that completes when any given future completes
|
|
*/
|
|
Future* future_any(Future **futures, uint32_t count);
|
|
|
|
/* ============================================================================
|
|
* Event Bus - Thread-Safe Pub/Sub
|
|
* ============================================================================ */
|
|
|
|
typedef uint32_t EventType;
|
|
typedef uint32_t SubscriptionId;
|
|
|
|
/* Event handler signature */
|
|
typedef void (*EventHandler)(EventType type, void *event_data, void *user_data);
|
|
|
|
/* Event filter - return true to allow event */
|
|
typedef bool (*EventFilter)(EventType type, void *event_data, void *user_data);
|
|
|
|
/**
|
|
* Create event bus
|
|
*/
|
|
EventBus* event_bus_create(void);
|
|
|
|
/**
|
|
* Destroy event bus
|
|
*/
|
|
void event_bus_destroy(EventBus *bus);
|
|
|
|
/**
|
|
* Subscribe to event type
|
|
* @return Subscription ID or 0 on error
|
|
*/
|
|
SubscriptionId event_bus_subscribe(EventBus *bus, EventType type,
|
|
EventHandler handler, void *user_data);
|
|
|
|
/**
|
|
* Subscribe with filter
|
|
*/
|
|
SubscriptionId event_bus_subscribe_filtered(EventBus *bus, EventType type,
|
|
EventHandler handler, void *user_data,
|
|
EventFilter filter, void *filter_data);
|
|
|
|
/**
|
|
* Unsubscribe
|
|
*/
|
|
bool event_bus_unsubscribe(EventBus *bus, SubscriptionId id);
|
|
|
|
/**
|
|
* Publish event (thread-safe)
|
|
*/
|
|
void event_bus_publish(EventBus *bus, EventType type, void *event_data);
|
|
|
|
/**
|
|
* Publish event with custom free function
|
|
*/
|
|
void event_bus_publish_owned(EventBus *bus, EventType type, void *event_data,
|
|
void (*free_fn)(void*));
|
|
|
|
/**
|
|
* Process pending events (call from main thread)
|
|
* @return number of events processed
|
|
*/
|
|
uint32_t event_bus_process(EventBus *bus);
|
|
|
|
/**
|
|
* Set processing limit per call
|
|
*/
|
|
void event_bus_set_batch_size(EventBus *bus, uint32_t batch_size);
|
|
|
|
/**
|
|
* Get the global/default event bus
|
|
*/
|
|
EventBus* event_bus_default(void);
|
|
|
|
/**
|
|
* Initialize default event bus
|
|
*/
|
|
bool event_bus_init_default(void);
|
|
|
|
/**
|
|
* Shutdown default event bus
|
|
*/
|
|
void event_bus_shutdown_default(void);
|
|
|
|
/* ============================================================================
|
|
* Async Context - Per-Module Async State
|
|
* ============================================================================ */
|
|
|
|
/* Context for managing async operations within a module */
|
|
struct AsyncContext {
|
|
ThreadPool *pool;
|
|
EventBus *event_bus;
|
|
Channel *completion_channel;
|
|
atomic_int operation_count;
|
|
atomic_int shutdown_requested;
|
|
pthread_mutex_t mutex;
|
|
};
|
|
|
|
/**
|
|
* Create async context
|
|
*/
|
|
AsyncContext* async_context_create(const char *name);
|
|
|
|
/**
|
|
* Destroy async context (cancels all pending operations)
|
|
*/
|
|
void async_context_destroy(AsyncContext *ctx);
|
|
|
|
/**
|
|
* Submit work to context's thread pool
|
|
*/
|
|
TaskHandle async_submit(AsyncContext *ctx, TaskFunc func, void *user_data,
|
|
TaskPriority priority);
|
|
|
|
/**
|
|
* Submit work and get future
|
|
*/
|
|
Future* async_submit_future(AsyncContext *ctx, TaskFunc func, void *user_data,
|
|
TaskPriority priority);
|
|
|
|
/**
|
|
* Check for completed operations (call from main thread)
|
|
*/
|
|
uint32_t async_poll(AsyncContext *ctx);
|
|
|
|
/**
|
|
* Get file descriptor for select() integration
|
|
* Returns -1 if not available
|
|
*/
|
|
int async_get_poll_fd(AsyncContext *ctx);
|
|
|
|
/* ============================================================================
 * Timer/Scheduler - Delayed Execution
 * ============================================================================ */

/* Identifier of a scheduled timer. */
typedef uint64_t TimerId;

/**
 * Timer callback signature.
 * @param id The timer that fired
 * @param user_data Data supplied when the timer was scheduled
 */
typedef void (*TimerCallback)(TimerId id, void *user_data);

/**
 * Schedule one-shot timer
 * @param delay_ms Delay in milliseconds
 */
TimerId timer_schedule(uint64_t delay_ms, TimerCallback callback, void *user_data);

/**
 * Schedule repeating timer
 * @param interval_ms Interval in milliseconds
 */
TimerId timer_schedule_repeating(uint64_t interval_ms, TimerCallback callback,
                                 void *user_data);

/**
 * Cancel timer
 */
bool timer_cancel(TimerId id);

/**
 * Check if timer exists
 */
bool timer_exists(TimerId id);

/**
 * Process timer events (call from main thread)
 * @return Number of timers fired
 */
uint32_t timer_process(void);

/**
 * Get timer file descriptor for select()
 */
int timer_get_poll_fd(void);

/**
 * Initialize timer subsystem
 */
bool timer_init(void);

/**
 * Shutdown timer subsystem
 */
void timer_shutdown(void);

/* ============================================================================
 * Thread-Local Storage Abstraction
 * ============================================================================ */

/* Opaque TLS key type; only used through pointers in this header. */
typedef struct TlsKey TlsKey;

/**
 * Create TLS key
 * @param destructor Destructor for stored values (when it runs is not
 *                   specified here; confirm in the implementation)
 */
TlsKey* tls_create(void (*destructor)(void*));

/**
 * Destroy TLS key
 */
void tls_destroy(TlsKey *key);

/**
 * Set TLS value
 */
void tls_set(TlsKey *key, void *value);

/**
 * Get TLS value
 */
void* tls_get(TlsKey *key);

/* ============================================================================
 * Read-Write Lock Wrapper
 * ============================================================================ */

/* Opaque read-write lock type; only used through pointers in this header. */
typedef struct RwLock RwLock;

/* Create / destroy a read-write lock. */
RwLock* rwlock_create(void);
void rwlock_destroy(RwLock *lock);

/* Reader side: blocking lock, non-blocking attempt, unlock. */
void rwlock_read_lock(RwLock *lock);
bool rwlock_read_trylock(RwLock *lock);
void rwlock_read_unlock(RwLock *lock);

/* Writer side: blocking lock, non-blocking attempt, unlock. */
void rwlock_write_lock(RwLock *lock);
bool rwlock_write_trylock(RwLock *lock);
void rwlock_write_unlock(RwLock *lock);

/* ============================================================================
 * Barrier and Synchronization
 * ============================================================================ */

/* Opaque barrier type; only used through pointers in this header. */
typedef struct Barrier Barrier;

/* Create a barrier for `count` participants (presumably threads - confirm). */
Barrier* barrier_create(uint32_t count);

/* Destroy a barrier. */
void barrier_destroy(Barrier *b);

/*
 * Wait on the barrier with a limit of timeout_ms milliseconds.
 * NOTE(review): the meaning of the bool result is not stated in this header;
 * confirm in the implementation.
 */
bool barrier_wait(Barrier *b, uint64_t timeout_ms);

/* ============================================================================
 * Initialization and Cleanup
 * ============================================================================ */

/**
 * Initialize entire threading subsystem
 */
bool threading_init(void);

/**
 * Shutdown entire threading subsystem
 */
void threading_shutdown(void);

/**
 * Get number of hardware threads
 */
uint32_t threading_hw_concurrency(void);

/**
 * Current thread ID
 */
uint64_t threading_current_thread_id(void);

/**
 * Set current thread name (for debugging)
 */
void threading_set_name(const char *name);

/**
 * Yield current thread
 */
void threading_yield(void);

/* ============================================================================
 * Utility Macros
 * ============================================================================ */

/*
 * Run function in background: submits `func` with `data` to the default
 * thread pool at TASK_PRIORITY_NORMAL. Evaluates to the ThreadStatus returned
 * by thread_pool_submit_func().
 */
#define ASYNC(func, data) \
    thread_pool_submit_func(thread_pool_default(), (func), (data), TASK_PRIORITY_NORMAL)

/*
 * Run function with priority: like ASYNC, but with an explicit TaskPriority.
 */
#define ASYNC_PRIORITY(func, data, prio) \
    thread_pool_submit_func(thread_pool_default(), (func), (data), (prio))

/*
 * Create future and submit: submits `func` with `data` through the given
 * AsyncContext at TASK_PRIORITY_NORMAL. Evaluates to the Future pointer
 * returned by async_submit_future().
 */
#define ASYNC_FUTURE(ctx, func, data) \
    async_submit_future((ctx), (func), (data), TASK_PRIORITY_NORMAL)

/*
 * Synchronized block using mutex.
 *
 * Locks `mutex` (a pthread_mutex_t lvalue, not a pointer), runs the given
 * code, then unlocks. The code is taken as variadic arguments so that it may
 * contain top-level commas, e.g. WITH_MUTEX(m, int a = 1, b = 2; use(a, b)).
 *
 * Caveats:
 * - `mutex` is evaluated twice; pass an expression without side effects.
 * - The code runs inside a do { } while (0) loop, so `break`, `continue`,
 *   `return` or `goto` out of it skips the unlock and leaves the mutex held.
 * - The results of pthread_mutex_lock/unlock are not checked.
 */
#define WITH_MUTEX(mutex, ...) do { \
    pthread_mutex_lock(&(mutex)); \
    __VA_ARGS__; \
    pthread_mutex_unlock(&(mutex)); \
} while (0)

/*
 * Read lock block.
 *
 * Takes the read side of `rwlock` (an RwLock pointer), runs the given code,
 * then releases it. The code is taken as variadic arguments so that it may
 * contain top-level commas.
 *
 * Caveats:
 * - `rwlock` is evaluated twice; pass an expression without side effects.
 * - The code runs inside a do { } while (0) loop, so `break`, `continue`,
 *   `return` or `goto` out of it skips the unlock and leaves the lock held.
 */
#define WITH_READ_LOCK(rwlock, ...) do { \
    rwlock_read_lock((rwlock)); \
    __VA_ARGS__; \
    rwlock_read_unlock((rwlock)); \
} while (0)

/*
 * Write lock block.
 *
 * Takes the write side of `rwlock` (an RwLock pointer), runs the given code,
 * then releases it. The code is taken as variadic arguments so that it may
 * contain top-level commas.
 *
 * Caveats:
 * - `rwlock` is evaluated twice; pass an expression without side effects.
 * - The code runs inside a do { } while (0) loop, so `break`, `continue`,
 *   `return` or `goto` out of it skips the unlock and leaves the lock held.
 */
#define WITH_WRITE_LOCK(rwlock, ...) do { \
    rwlock_write_lock((rwlock)); \
    __VA_ARGS__; \
    rwlock_write_unlock((rwlock)); \
} while (0)

#endif /* DWN_THREADING_H */