15#include <sys/sysinfo.h>
26 long int nb_procs = 0;
29 GetSystemInfo(&sysinfo);
30 nb_procs = sysinfo.dwNumberOfProcessors;
32 nb_procs = sysconf(_SC_NPROCESSORS_ONLN);
46 n_log(
LOG_ERR,
"Thread fatal error, no valid payload found, exiting thread function !");
60 pthread_mutex_lock(&node->
lock);
62 pthread_mutex_unlock(&node->
lock);
66 pthread_mutex_lock(&node->
lock);
68 pthread_mutex_unlock(&node->
lock);
70 if (node && node->
func) {
75 pthread_mutex_lock(&node->
lock);
79 int type = node->
type;
84 pthread_mutex_unlock(&node->
lock);
92 pthread_mutex_lock(&node->
lock);
94 pthread_mutex_unlock(&node->
lock);
132 pthread_mutex_init(&thread_pool->
lock, NULL);
134 if (sem_init(&thread_pool->
nb_tasks, 0, 0) == -1) {
136 n_log(
LOG_ERR,
"sem_init failed : %s on &thread_pool -> nb_tasks", strerror(error));
140 for (
size_t it = 0; it < nbmaxthr; it++) {
149 n_log(
LOG_ERR,
"sem_init failed : %s on &thread_pool -> thread_list[ %d ] -> th_start\n", strerror(error), it);
154 n_log(
LOG_ERR,
"sem_init failed : %s on &thread_pool -> thread_list[ %d] -> th_end\n", strerror(error), it);
164 n_log(
LOG_ERR,
"pthread_create failed : %s for it %d\n", strerror(errno), it);
188 if (!(mode &
NO_LOCK)) pthread_mutex_lock(&thread_pool->
lock);
190 while (it < thread_pool->max_threads) {
199 if (it < thread_pool->max_threads) {
206 n_log(
LOG_ERR,
"unknown mode %d for thread %d", mode, it);
208 if (!(mode &
NO_LOCK)) pthread_mutex_unlock(&thread_pool->
lock);
213 n_log(
LOG_DEBUG,
"proc %p(%p) added on thread %d", func_ptr, param, it);
219 if (!(mode &
NO_LOCK)) pthread_mutex_unlock(&thread_pool->
lock);
220 n_log(
LOG_DEBUG,
"thread pool active threads are all busy, cannot add DIRECT or SYNCED_PROC %p(%p) to pool %p", func_ptr, param, thread_pool);
228 proc->
func = func_ptr;
233 n_log(
LOG_ERR,
"proc %p(%p) was dropped from waitlist because waitlist of thread pool %p is full", func_ptr, param, thread_pool);
234 if (!(mode &
NO_LOCK)) pthread_mutex_unlock(&thread_pool->
lock);
239 if (!(mode &
NO_LOCK)) pthread_mutex_unlock(&thread_pool->
lock);
258 pthread_mutex_lock(&thread_pool->
lock);
259 for (
size_t it = 0; it < thread_pool->
max_threads; it++) {
269 n_log(
LOG_ERR,
"sem_post th_start error in thread_pool %p , thread_list[ %d ] : %s", thread_pool, it, strerror(error));
274 pthread_mutex_unlock(&thread_pool->
lock);
289 for (
size_t it = 0; it < thread_pool->
max_threads; it++) {
292 n_log(
LOG_ERR,
"sem_wait th_end error in thread_pool %p , thread_list[ %d ] : %s", thread_pool, it, strerror(error));
326 for (
size_t it = 0; it < thread_pool->
max_threads; it++) {
352 __n_assert((*pool)->thread_list,
return FALSE);
354 int state = 0, DONE = 0;
358 pthread_mutex_lock(&(*pool)->lock);
359 for (
size_t it = 0; it < (*pool)->max_threads; it++) {
360 pthread_mutex_lock(&(*pool)->thread_list[it]->lock);
361 state = (*pool)->thread_list[it]->state;
362 pthread_mutex_unlock(&(*pool)->thread_list[it]->lock);
366 pthread_mutex_lock(&(*pool)->thread_list[it]->lock);
368 sem_post(&(*pool)->thread_list[it]->th_start);
369 pthread_mutex_unlock(&(*pool)->thread_list[it]->lock);
375 pthread_mutex_unlock(&(*pool)->lock);
380 pthread_mutex_lock(&(*pool)->lock);
382 for (
size_t it = 0; it < (*pool)->max_threads; it++) {
383 pthread_join((*pool)->thread_list[it]->thr, NULL);
384 pthread_mutex_destroy(&(*pool)->thread_list[it]->lock);
385 sem_destroy(&(*pool)->thread_list[it]->th_start);
386 sem_destroy(&(*pool)->thread_list[it]->th_end);
387 Free((*pool)->thread_list[it]);
389 Free((*pool)->thread_list);
392 sem_destroy(&(*pool)->nb_tasks);
394 pthread_mutex_unlock(&(*pool)->lock);
395 pthread_mutex_destroy(&(*pool)->lock);
413 pthread_mutex_lock(&thread_pool->
lock);
416 while (push_status == 1) {
419 if (node && node->
ptr) {
424 next_node = node->
next;
430 n_log(
LOG_DEBUG,
"waitlist: cannot add proc %p from waiting list, all active threads are busy !", proc, thread_pool);
434 n_log(
LOG_ERR,
"waitlist: trying to add invalid NULL proc on thread pool %p !", thread_pool);
444 for (
size_t it = 0; it < thread_pool->
max_threads; it++) {
451 if (push_status == 0 && thread_pool->
nb_actives == 0) {
453 sem_getvalue(&thread_pool->
nb_tasks, &value);
459 pthread_mutex_unlock(&thread_pool->
lock);
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
#define __n_assert(__ptr, __ret)
macro to assert things
#define Free(__ptr)
Free Handler to get errors.
void * ptr
void pointer to store
LIST_NODE * start
pointer to the start of the list
size_t nb_items
number of item currently in the list
struct LIST_NODE * next
pointer to the next node
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
#define remove_list_node(__LIST_, __NODE_, __TYPE_)
Remove macro helper for void pointer casting.
int list_destroy(LIST **list)
Empty and Free a list container.
#define MAX_LIST_ITEMS
flag to pass to new_generic_list for the maximum possible number of item in a list
Structure of a generic list node.
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
#define LOG_DEBUG
debug-level messages
#define LOG_ERR
error conditions
#define LOG_INFO
informational
void u_sleep(unsigned int usec)
wrapper around usleep for API consistency
size_t nb_max_waiting
Maximum number of waiting procedures int the list, 0 or -1 for unlimited.
int state
state of the proc , RUNNING_PROC when it is busy processing func( param) , IDLE_PROC when it waits fo...
int type
SYNCED or DIRECT process start.
size_t max_threads
Maximum number of running threads in the list.
void * param
if not NULL , passed as argument
void *(* func)(void *param)
function to call in the thread
sem_t th_end
thread ending semaphore
pthread_mutex_t lock
mutex to prevent mutual access of node parameters
struct THREAD_POOL * thread_pool
pointer to assigned thread pool
sem_t th_start
thread starting semaphore
THREAD_POOL_NODE ** thread_list
Dynamically allocated but fixed size thread array.
LIST * waiting_list
Waiting list handling.
size_t nb_actives
number of threads actually doing a proc
sem_t nb_tasks
semaphore to store the number of tasks
void *(* func)(void *param)
function to call in the thread
int thread_state
state of the managing thread , RUNNING_THREAD, EXITING_THREAD, EXITED_THREAD
pthread_mutex_t lock
mutex to prevent mutual access of waiting_list parameters
void * param
if not NULL , passed as argument
int start_threaded_pool(THREAD_POOL *thread_pool)
Launch the process waiting for execution in the thread pool.
THREAD_POOL * new_thread_pool(size_t nbmaxthr, size_t nb_max_waiting)
Create a new pool of nbmaxthr threads.
#define EXITED_THREAD
indicate that the pool is off, all jobs have been consumed
#define SYNCED_PROC
processing mode for added func, synced start
int add_threaded_process(THREAD_POOL *thread_pool, void *(*func_ptr)(void *param), void *param, int mode)
add a function and params to a thread pool
int refresh_thread_pool(THREAD_POOL *thread_pool)
try to add some waiting DIRECT_PROCs on some free thread slots, else do nothing
#define IDLE_PROC
status of a thread which is waiting for some proc
int destroy_threaded_pool(THREAD_POOL **pool, unsigned int delay)
delete a thread_pool, exit the threads and free the structs
int wait_for_synced_threaded_pool(THREAD_POOL *thread_pool)
wait for all the launched process, blocking but light on the CPU as there is no polling
long int get_nb_cpu_cores()
get number of core of current system
#define DIRECT_PROC
processing mode for added func, direct start
int wait_for_threaded_pool(THREAD_POOL *thread_pool, unsigned int delay)
Wait for all the launched process in the thread pool to terminate.
#define RUNNING_PROC
status of a thread which proc is currently running
#define NO_LOCK
if passed to add_threaded_process, skip main table lock in case we are in a func which is already loc...
#define NOQUEUE
processing mode for waiting_list: do not readd the work in queue
#define EXITING_THREAD
indicate that the pool is exiting, unfinished jobs will finish and the pool will exit the threads and...
#define RUNNING_THREAD
indicate that the pool is running and ready to use
#define WAITING_PROC
status of a thread who have proc waiting to be processed
Structure of a trhead pool.
Structure of a waiting process item.
Common headers and low-level functions & define.
void * thread_pool_processing_function(void *param)
Internal thread pool processing function.