/*
* DWN - Desktop Window Manager
* retoor <retoor@molodetz.nl>
* Extensive Abstract Threading Framework
*
* This module provides a comprehensive, abstract threading system with:
* - Lock-free data structures
* - Thread pools with work stealing
* - Async/await pattern support
* - Thread-safe event bus
* - Memory barriers and atomic operations
*/
#ifndef DWN_THREADING_H
#define DWN_THREADING_H
#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>
#include <pthread.h>
#include <stdatomic.h>
/* ============================================================================
* Platform Abstraction Layer
* ============================================================================ */
/* Assumed cache line size in bytes (commonly 64 on x86_64). This is a fixed
 * compile-time constant; nothing here detects the real value at run time. */
#define CACHE_LINE_SIZE 64
/* Make a declaration start on a CACHE_LINE_SIZE boundary (used to reduce
 * false sharing). Relies on the GCC/Clang __attribute__ syntax and is written
 * after the declarator, e.g. `_Atomic uint64_t x CACHE_ALIGN;`. */
#define CACHE_ALIGN __attribute__((aligned(CACHE_LINE_SIZE)))
/* Short aliases for the C11 <stdatomic.h> memory_order constants */
#define ATOMIC_RELAXED memory_order_relaxed
#define ATOMIC_ACQUIRE memory_order_acquire
#define ATOMIC_RELEASE memory_order_release
#define ATOMIC_ACQ_REL memory_order_acq_rel
#define ATOMIC_SEQ_CST memory_order_seq_cst
/* ============================================================================
* Core Types and Status Codes
* ============================================================================ */
/*
 * Result code used by the threading API.
 * THREAD_OK is zero; every error code is a distinct negative value.
 * The per-code notes below follow the enumerator names; the code that
 * produces them is not part of this header.
 */
typedef enum {
    THREAD_OK            =  0,  /* success */
    THREAD_ERROR         = -1,  /* unspecified failure */
    THREAD_ERROR_NOMEM   = -2,  /* out of memory */
    THREAD_ERROR_BUSY    = -3,  /* resource busy */
    THREAD_ERROR_CLOSED  = -4,  /* target (e.g. a channel) is closed */
    THREAD_ERROR_TIMEOUT = -5,  /* timed out */
    THREAD_ERROR_INVALID = -6   /* invalid argument or state */
} ThreadStatus;
/*
 * Task priority levels, numbered from 0 (most urgent) upwards.
 * TASK_PRIORITY_COUNT is not a priority: it is the number of levels (5),
 * usable for sizing per-priority arrays.
 */
typedef enum {
    TASK_PRIORITY_CRITICAL, /* 0: UI-critical, must execute immediately */
    TASK_PRIORITY_HIGH,     /* 1: user-initiated actions */
    TASK_PRIORITY_NORMAL,   /* 2: default background work */
    TASK_PRIORITY_LOW,      /* 3: maintenance tasks */
    TASK_PRIORITY_IDLE,     /* 4: only when the system is idle */
    TASK_PRIORITY_COUNT     /* 5: number of priority levels */
} TaskPriority;
/*
 * Lifecycle state of a task as reported by task_get_state().
 * Values are consecutive starting at 0.
 */
typedef enum {
    TASK_STATE_PENDING = 0, /* 0 */
    TASK_STATE_RUNNING,     /* 1 */
    TASK_STATE_COMPLETED,   /* 2 */
    TASK_STATE_CANCELLED,   /* 3 */
    TASK_STATE_ERROR        /* 4 */
} TaskState;
/* Forward declarations.
 * Of these, only AsyncContext is given a full definition in this header; the
 * others stay incomplete here and are used through pointers only.
 * Worker is declared but not referenced by any other declaration in this
 * header. */
typedef struct ThreadPool ThreadPool;
typedef struct Task Task;
typedef struct Channel Channel;
typedef struct Future Future;
typedef struct EventBus EventBus;
typedef struct AsyncContext AsyncContext;
typedef struct Worker Worker;
/* ============================================================================
* Task System - Abstract Unit of Work
* ============================================================================ */
/* Task function signature - receives user data and cancellation flag.
 * The flag is passed as a pointer to an atomic_int; presumably non-zero means
 * cancellation was requested - confirm against the implementation. */
typedef void (*TaskFunc)(void *user_data, atomic_int *cancelled);
/* Callback for task completion (called in worker thread).
 * Receives the task, the callback's own user data and a ThreadStatus. */
typedef void (*TaskCallback)(Task *task, void *user_data, ThreadStatus status);
/* Task handle - opaque pointer (Task is not defined in this header) */
typedef Task* TaskHandle;
/**
 * Create a new task
 * @param func The function to execute
 * @param user_data Data passed to the function
 * @param priority Task priority level
 * @return Task handle or NULL on error
 */
TaskHandle task_create(TaskFunc func, void *user_data, TaskPriority priority);
/**
 * Create a task with completion callback
 * @param func The function to execute
 * @param user_data Data passed to the function
 * @param priority Task priority level
 * @param on_complete Completion callback (see TaskCallback)
 * @param callback_data Data passed to on_complete as its user_data
 *        (presumably - confirm against the implementation)
 * @return Task handle; presumably NULL on error like task_create()
 */
TaskHandle task_create_with_callback(TaskFunc func, void *user_data,
TaskPriority priority,
TaskCallback on_complete,
void *callback_data);
/**
 * Destroy a task (only if not submitted)
 * NOTE(review): who releases a task after it has been submitted is not
 * stated in this header - confirm before relying on it.
 */
void task_destroy(TaskHandle task);
/**
 * Cancel a task (best effort - may already be running)
 * @return bool; the meaning of true/false is not documented here
 */
bool task_cancel(TaskHandle task);
/**
 * Get current task state
 * @return One of the TaskState values
 */
TaskState task_get_state(TaskHandle task);
/**
 * Wait for task completion (blocking)
 * @param timeout_ms Timeout in milliseconds (behaviour for 0 is not
 *        documented here)
 * @return ThreadStatus; presumably THREAD_ERROR_TIMEOUT on expiry - confirm
 */
ThreadStatus task_wait(TaskHandle task, uint64_t timeout_ms);
/**
 * Check if task was cancelled
 */
bool task_is_cancelled(TaskHandle task);
/* ============================================================================
* Thread Pool - Manage Worker Threads
* ============================================================================ */
/*
 * Tunables passed to thread_pool_create() / thread_pool_init_default().
 */
typedef struct {
    uint32_t min_threads;          /* lower bound: threads kept alive */
    uint32_t max_threads;          /* upper bound: threads allowed */
    uint32_t queue_capacity;       /* task queue capacity, per priority */
    uint32_t steal_attempts;       /* work-stealing tries before blocking */
    uint64_t idle_timeout_ms;      /* idle time before a thread terminates */
    bool     enable_work_stealing; /* allow stealing work between queues */
} ThreadPoolConfig;

/*
 * Default settings, expressed as a ThreadPoolConfig compound literal:
 * 2..8 threads, 256 queue slots, 4 steal attempts, 60 s idle timeout,
 * work stealing enabled.
 */
#define THREAD_POOL_DEFAULT_CONFIG ((ThreadPoolConfig){  \
    .min_threads          = 2,                           \
    .max_threads          = 8,                           \
    .queue_capacity       = 256,                         \
    .steal_attempts       = 4,                           \
    .idle_timeout_ms      = 60000,                       \
    .enable_work_stealing = true                         \
})
/**
 * Create a thread pool
 * @param config Pool settings (see ThreadPoolConfig). Whether NULL selects
 *        defaults is not stated here - confirm against the implementation.
 * @return New pool; presumably NULL on failure
 */
ThreadPool* thread_pool_create(const ThreadPoolConfig *config);
/**
 * Destroy thread pool, cancelling pending tasks
 * (contrast with thread_pool_shutdown(), which waits for tasks)
 */
void thread_pool_destroy(ThreadPool *pool);
/**
 * Submit a task to the pool
 * @return ThreadStatus describing the outcome
 */
ThreadStatus thread_pool_submit(ThreadPool *pool, TaskHandle task);
/**
 * Submit a function as a task (convenience)
 * Takes the same func / user_data / priority arguments as task_create()
 * but returns no task handle to the caller.
 */
ThreadStatus thread_pool_submit_func(ThreadPool *pool, TaskFunc func,
void *user_data, TaskPriority priority);
/**
 * Get number of active threads
 */
uint32_t thread_pool_active_count(ThreadPool *pool);
/**
 * Get number of pending tasks
 */
uint32_t thread_pool_pending_count(ThreadPool *pool);
/**
 * Shutdown pool gracefully, waiting for tasks to complete
 * @param timeout_ms Maximum time to wait, in milliseconds
 * @return ThreadStatus; presumably THREAD_ERROR_TIMEOUT if the wait
 *         expires - confirm
 */
ThreadStatus thread_pool_shutdown(ThreadPool *pool, uint64_t timeout_ms);
/**
 * Get the default/global thread pool
 * (used by the ASYNC / ASYNC_PRIORITY macros at the end of this header)
 */
ThreadPool* thread_pool_default(void);
/**
 * Initialize the default thread pool
 * @param config Settings for the default pool
 */
ThreadStatus thread_pool_init_default(const ThreadPoolConfig *config);
/**
 * Shutdown the default thread pool
 */
void thread_pool_shutdown_default(void);
/* ============================================================================
* Lock-Free Queue - Single Producer Single Consumer
* ============================================================================ */
/* Number of pointer slots in SpscQueue.buffer (fixed at compile time) */
#define SPSC_QUEUE_SIZE 1024
/*
 * Single-producer / single-consumer queue of void* items, stored inline
 * (no heap allocation is implied by this struct).
 * head and tail each start on their own cache-line boundary (CACHE_ALIGN).
 * NOTE(review): buffer has no alignment attribute, so it is laid out right
 * after tail and its first slots likely share tail's cache line - confirm
 * whether that matters for the intended false-sharing protection.
 */
typedef struct {
_Atomic uint64_t head CACHE_ALIGN; /* Write index - producer only */
_Atomic uint64_t tail CACHE_ALIGN; /* Read index - consumer only */
void *buffer[SPSC_QUEUE_SIZE];
} SpscQueue;
/**
 * Initialize SPSC queue
 * @param q Caller-provided queue storage
 */
void spsc_queue_init(SpscQueue *q);
/**
 * Push item (producer only)
 * @return false if queue full
 */
bool spsc_queue_push(SpscQueue *q, void *item);
/**
 * Pop item (consumer only)
 * @param item Out-parameter for the popped pointer
 * @return false if queue empty
 */
bool spsc_queue_pop(SpscQueue *q, void **item);
/**
 * Check if queue is empty (consumer only)
 */
bool spsc_queue_empty(SpscQueue *q);
/**
 * Get approximate size (not synchronized)
 */
uint64_t spsc_queue_size_approx(SpscQueue *q);
/* ============================================================================
* Lock-Free Queue - Multi Producer Single Consumer
* ============================================================================ */
/* Opaque multi-producer / single-consumer queue; unlike SpscQueue its layout
 * is not exposed and instances come from mpsc_queue_create(). */
typedef struct MpscQueue MpscQueue;
/**
 * Create MPSC queue
 * @param capacity Requested capacity (unit presumably items - confirm)
 * @return New queue; presumably NULL on failure
 */
MpscQueue* mpsc_queue_create(uint32_t capacity);
/**
 * Destroy MPSC queue
 */
void mpsc_queue_destroy(MpscQueue *q);
/**
 * Push item (thread-safe, any producer)
 * @return false if queue full
 */
bool mpsc_queue_push(MpscQueue *q, void *item);
/**
 * Pop item (consumer only - single thread)
 * @param item Out-parameter for the popped pointer
 * @return false if queue empty
 */
bool mpsc_queue_pop(MpscQueue *q, void **item);
/**
 * Check if queue is empty
 */
bool mpsc_queue_empty(MpscQueue *q);
/* ============================================================================
* Channel - Thread-Safe Communication
* ============================================================================ */
/*
 * Channel flavours. No function declared in this header takes or returns
 * this type; channel_create() selects the flavour through its capacity.
 */
typedef enum {
    CHANNEL_UNBUFFERED, /* 0: synchronous - sender blocks until received */
    CHANNEL_BUFFERED    /* 1: asynchronous - uses internal buffer */
} ChannelType;
/**
 * Create a channel
 * @param capacity Buffer size (0 for unbuffered synchronous channel)
 * @return New channel; presumably NULL on failure
 */
Channel* channel_create(uint32_t capacity);
/**
 * Destroy channel
 */
void channel_destroy(Channel *ch);
/**
 * Send data through channel (blocking)
 * @param data Pointer handed to the receiver; the channel transports the
 *        pointer value only (signature takes void*)
 * @return THREAD_OK on success, THREAD_ERROR_CLOSED if closed
 */
ThreadStatus channel_send(Channel *ch, void *data);
/**
 * Send with timeout
 * @param timeout_ms Maximum time to block, in milliseconds
 */
ThreadStatus channel_send_timeout(Channel *ch, void *data, uint64_t timeout_ms);
/**
 * Try send (non-blocking)
 */
ThreadStatus channel_try_send(Channel *ch, void *data);
/**
 * Receive from channel (blocking)
 * @param data Out-parameter for the received pointer
 */
ThreadStatus channel_recv(Channel *ch, void **data);
/**
 * Receive with timeout
 * @param timeout_ms Maximum time to block, in milliseconds
 */
ThreadStatus channel_recv_timeout(Channel *ch, void **data, uint64_t timeout_ms);
/**
 * Try receive (non-blocking)
 */
ThreadStatus channel_try_recv(Channel *ch, void **data);
/**
 * Close channel (no more sends allowed)
 */
void channel_close(Channel *ch);
/**
 * Check if channel is closed
 */
bool channel_is_closed(Channel *ch);
/**
 * Select on multiple channels (like Go's select)
 * Returns index of ready channel or -1 on timeout
 * @param channels Array of `count` channels to watch
 * @param out_data Presumably receives the item taken from the ready
 *        channel - confirm against the implementation
 */
int channel_select(Channel **channels, uint32_t count, uint64_t timeout_ms,
void **out_data);
/* ============================================================================
* Future/Promise - Async Result Handling
* ============================================================================ */
/* Result carried by a future: an untyped pointer */
typedef void* FutureResult;
/* Callback registered with future_then(); receives the future, its result
 * and the user_data given at registration */
typedef void (*FutureCallback)(Future *f, FutureResult result, void *user_data);
/**
 * Create a future
 */
Future* future_create(void);
/**
 * Destroy future
 */
void future_destroy(Future *f);
/**
 * Set the result (called by producer)
 */
void future_set_result(Future *f, FutureResult result);
/**
 * Set error result
 * @param error_code Plain int; how it relates to the ThreadStatus reported
 *        by future_get() is not stated here - confirm
 */
void future_set_error(Future *f, int error_code);
/**
 * Get result (blocking)
 * @param status Out-parameter for the outcome (whether NULL is accepted is
 *        not stated here)
 */
FutureResult future_get(Future *f, ThreadStatus *status);
/**
 * Get result with timeout
 * @param timeout_ms Maximum time to block, in milliseconds
 */
FutureResult future_get_timeout(Future *f, uint64_t timeout_ms, ThreadStatus *status);
/**
 * Check if future is ready
 */
bool future_is_ready(Future *f);
/**
 * Attach callback to be called when ready (thread-safe)
 * NOTE(review): which thread runs the callback is not stated here.
 */
void future_then(Future *f, FutureCallback callback, void *user_data);
/**
 * Create future that completes when all given futures complete
 * @param futures Array of `count` futures
 */
Future* future_all(Future **futures, uint32_t count);
/**
 * Create future that completes when any given future completes
 * @param futures Array of `count` futures
 */
Future* future_any(Future **futures, uint32_t count);
/* ============================================================================
* Event Bus - Thread-Safe Pub/Sub
* ============================================================================ */
/* Event type identifier (plain 32-bit value; values are defined by users
 * of the bus, not by this header) */
typedef uint32_t EventType;
/* Handle returned by the subscribe calls; 0 signals an error */
typedef uint32_t SubscriptionId;
/* Event handler signature */
typedef void (*EventHandler)(EventType type, void *event_data, void *user_data);
/* Event filter - return true to allow event */
typedef bool (*EventFilter)(EventType type, void *event_data, void *user_data);
/**
 * Create event bus
 */
EventBus* event_bus_create(void);
/**
 * Destroy event bus
 */
void event_bus_destroy(EventBus *bus);
/**
 * Subscribe to event type
 * @param handler Called for matching events with the given user_data
 * @return Subscription ID or 0 on error
 */
SubscriptionId event_bus_subscribe(EventBus *bus, EventType type,
EventHandler handler, void *user_data);
/**
 * Subscribe with filter
 * @param filter Predicate consulted per event; true lets the event through
 * @param filter_data Presumably passed to filter as its user_data - confirm
 * @return Subscription ID; presumably 0 on error like event_bus_subscribe()
 */
SubscriptionId event_bus_subscribe_filtered(EventBus *bus, EventType type,
EventHandler handler, void *user_data,
EventFilter filter, void *filter_data);
/**
 * Unsubscribe
 * @param id Value previously returned by a subscribe call
 */
bool event_bus_unsubscribe(EventBus *bus, SubscriptionId id);
/**
 * Publish event (thread-safe)
 */
void event_bus_publish(EventBus *bus, EventType type, void *event_data);
/**
 * Publish event with custom free function
 * @param free_fn Presumably invoked on event_data once the event has been
 *        handled - confirm timing and thread against the implementation
 */
void event_bus_publish_owned(EventBus *bus, EventType type, void *event_data,
void (*free_fn)(void*));
/**
 * Process pending events (call from main thread)
 * @return number of events processed
 */
uint32_t event_bus_process(EventBus *bus);
/**
 * Set processing limit per call
 * @param batch_size Limit applied to each event_bus_process() call
 */
void event_bus_set_batch_size(EventBus *bus, uint32_t batch_size);
/**
 * Get the global/default event bus
 */
EventBus* event_bus_default(void);
/**
 * Initialize default event bus
 */
bool event_bus_init_default(void);
/**
 * Shutdown default event bus
 */
void event_bus_shutdown_default(void);
/* ============================================================================
* Async Context - Per-Module Async State
* ============================================================================ */
/* Context for managing async operations within a module.
 * Unlike the other handle types in this header, the struct layout is public.
 * Field roles below are read off the names and types; the code that
 * maintains them is not part of this header. */
struct AsyncContext {
ThreadPool *pool; /* pool that work is submitted to */
EventBus *event_bus; /* associated event bus */
Channel *completion_channel; /* presumably carries completions for async_poll() */
atomic_int operation_count; /* presumably in-flight operation counter */
atomic_int shutdown_requested; /* presumably non-zero once teardown started */
pthread_mutex_t mutex; /* what it guards is not stated here */
};
/**
 * Create async context
 * @param name Context name (how it is used is not stated here)
 */
AsyncContext* async_context_create(const char *name);
/**
 * Destroy async context (cancels all pending operations)
 */
void async_context_destroy(AsyncContext *ctx);
/**
 * Submit work to context's thread pool
 * @return Handle of the submitted task; presumably NULL on failure
 */
TaskHandle async_submit(AsyncContext *ctx, TaskFunc func, void *user_data,
TaskPriority priority);
/**
 * Submit work and get future
 * NOTE(review): TaskFunc returns void, so how the future's result is
 * produced is not visible here - confirm against the implementation.
 */
Future* async_submit_future(AsyncContext *ctx, TaskFunc func, void *user_data,
TaskPriority priority);
/**
 * Check for completed operations (call from main thread)
 * @return uint32_t count (presumably completions handled - confirm)
 */
uint32_t async_poll(AsyncContext *ctx);
/**
 * Get file descriptor for select() integration
 * Returns -1 if not available
 */
int async_get_poll_fd(AsyncContext *ctx);
/* ============================================================================
* Timer/Scheduler - Delayed Execution
* ============================================================================ */
/* Timer handle (64-bit). Which value, if any, marks failure is not stated
 * in this header. */
typedef uint64_t TimerId;
/* Timer callback; receives the firing timer's id and the user_data given
 * when it was scheduled */
typedef void (*TimerCallback)(TimerId id, void *user_data);
/**
 * Schedule one-shot timer
 * @param delay_ms Delay in milliseconds
 */
TimerId timer_schedule(uint64_t delay_ms, TimerCallback callback, void *user_data);
/**
 * Schedule repeating timer
 * @param interval_ms Repeat interval in milliseconds
 */
TimerId timer_schedule_repeating(uint64_t interval_ms, TimerCallback callback,
void *user_data);
/**
 * Cancel timer
 */
bool timer_cancel(TimerId id);
/**
 * Check if timer exists
 */
bool timer_exists(TimerId id);
/**
 * Process timer events (call from main thread)
 * @return Number of timers fired
 */
uint32_t timer_process(void);
/**
 * Get timer file descriptor for select()
 */
int timer_get_poll_fd(void);
/**
 * Initialize timer subsystem
 */
bool timer_init(void);
/**
 * Shutdown timer subsystem
 */
void timer_shutdown(void);
/* ============================================================================
* Thread-Local Storage Abstraction
* ============================================================================ */
/* Opaque thread-local-storage key */
typedef struct TlsKey TlsKey;
/**
 * Create TLS key
 * @param destructor Cleanup function for stored values; presumably run per
 *        thread at thread exit (as with pthread_key_create) - confirm
 */
TlsKey* tls_create(void (*destructor)(void*));
/**
 * Destroy TLS key
 */
void tls_destroy(TlsKey *key);
/**
 * Set TLS value
 */
void tls_set(TlsKey *key, void *value);
/**
 * Get TLS value
 */
void* tls_get(TlsKey *key);
/* ============================================================================
* Read-Write Lock Wrapper
* ============================================================================ */
/* Opaque read-write lock, created and destroyed through the functions below */
typedef struct RwLock RwLock;
RwLock* rwlock_create(void);
void rwlock_destroy(RwLock *lock);
/* Reader side. The trylock variant returns bool - presumably true when the
 * lock was acquired; confirm against the implementation. */
void rwlock_read_lock(RwLock *lock);
bool rwlock_read_trylock(RwLock *lock);
void rwlock_read_unlock(RwLock *lock);
/* Writer side, same shape as the reader functions */
void rwlock_write_lock(RwLock *lock);
bool rwlock_write_trylock(RwLock *lock);
void rwlock_write_unlock(RwLock *lock);
/* ============================================================================
* Barrier and Synchronization
* ============================================================================ */
/* Opaque barrier */
typedef struct Barrier Barrier;
/* count: presumably the number of threads that must arrive before the
 * barrier releases - confirm against the implementation */
Barrier* barrier_create(uint32_t count);
void barrier_destroy(Barrier *b);
/* Wait at the barrier for up to timeout_ms milliseconds. The meaning of the
 * bool result (e.g. false on timeout) is not stated in this header. */
bool barrier_wait(Barrier *b, uint64_t timeout_ms);
/* ============================================================================
* Initialization and Cleanup
* ============================================================================ */
/**
 * Initialize entire threading subsystem
 * NOTE(review): which sub-initializers (default pool, default event bus,
 * timers) this covers is not stated in this header.
 */
bool threading_init(void);
/**
 * Shutdown entire threading subsystem
 */
void threading_shutdown(void);
/**
 * Get number of hardware threads
 */
uint32_t threading_hw_concurrency(void);
/**
 * Current thread ID
 * @return 64-bit identifier (how it is derived is not stated here)
 */
uint64_t threading_current_thread_id(void);
/**
 * Set current thread name (for debugging)
 */
void threading_set_name(const char *name);
/**
 * Yield current thread
 */
void threading_yield(void);
/* ============================================================================
* Utility Macros
* ============================================================================ */
/* Run function in background: submits (func, data) to the default thread
 * pool at TASK_PRIORITY_NORMAL. Expands to a thread_pool_submit_func() call,
 * so the expression has type ThreadStatus. */
#define ASYNC(func, data) \
thread_pool_submit_func(thread_pool_default(), (func), (data), TASK_PRIORITY_NORMAL)
/* Run function with priority: as ASYNC, with an explicit TaskPriority */
#define ASYNC_PRIORITY(func, data, prio) \
thread_pool_submit_func(thread_pool_default(), (func), (data), (prio))
/* Create future and submit: expands to async_submit_future() on the given
 * AsyncContext at TASK_PRIORITY_NORMAL, so the expression has type Future* */
#define ASYNC_FUTURE(ctx, func, data) \
async_submit_future((ctx), (func), (data), TASK_PRIORITY_NORMAL)
/*
 * Scoped-lock helpers.
 *
 * Each macro acquires the lock, runs the given statement(s), then releases
 * the lock. The statements are taken as a variadic argument, so code with
 * top-level commas (e.g. `int a = 1, b = 2;` or a brace initializer) can be
 * passed. The lock expression is evaluated exactly once, so the lock that
 * was acquired is always the one released, even if the expression has side
 * effects or the body changes the variables it refers to.
 *
 * The body must not leave the block early: `return`, `goto`, or a `break` /
 * `continue` that is not inside a loop of the body's own skips the unlock
 * and leaves the lock held.
 */
/* Synchronized block using mutex; `mutex` is a pthread_mutex_t lvalue */
#define WITH_MUTEX(mutex, ...) do { \
    pthread_mutex_t *dwn_with_mutex_ = &(mutex); \
    pthread_mutex_lock(dwn_with_mutex_); \
    __VA_ARGS__; \
    pthread_mutex_unlock(dwn_with_mutex_); \
} while(0)
/* Read lock block; `rwlock` is a RwLock pointer */
#define WITH_READ_LOCK(rwlock, ...) do { \
    RwLock *dwn_with_rwlock_ = (rwlock); \
    rwlock_read_lock(dwn_with_rwlock_); \
    __VA_ARGS__; \
    rwlock_read_unlock(dwn_with_rwlock_); \
} while(0)
/* Write lock block; `rwlock` is a RwLock pointer */
#define WITH_WRITE_LOCK(rwlock, ...) do { \
    RwLock *dwn_with_rwlock_ = (rwlock); \
    rwlock_write_lock(dwn_with_rwlock_); \
    __VA_ARGS__; \
    rwlock_write_unlock(dwn_with_rwlock_); \
} while(0)
#endif /* DWN_THREADING_H */