16#include <sys/sysinfo.h>
27 long int nb_procs = 0;
30 GetSystemInfo(&sysinfo);
31 nb_procs = sysinfo.dwNumberOfProcessors;
33 nb_procs = sysconf(_SC_NPROCESSORS_ONLN);
47 n_log(
LOG_ERR,
"Thread fatal error, no valid payload found, exiting thread function !");
61 pthread_mutex_lock(&node->
lock);
63 pthread_mutex_unlock(&node->
lock);
67 pthread_mutex_lock(&node->
lock);
69 pthread_mutex_unlock(&node->
lock);
71 if (node && node->
func) {
76 pthread_mutex_lock(&node->
lock);
80 int type = node->
type;
85 pthread_mutex_unlock(&node->
lock);
93 pthread_mutex_lock(&node->
lock);
95 pthread_mutex_unlock(&node->
lock);
137 n_log(
LOG_ERR,
"sem_init failed : %s on &thread_pool -> nb_tasks", strerror(error));
141 for (
size_t it = 0; it < nbmaxthr; it++) {
150 n_log(
LOG_ERR,
"sem_init failed : %s on &thread_pool -> thread_list[ %d ] -> th_start", strerror(error), it);
155 n_log(
LOG_ERR,
"sem_init failed : %s on &thread_pool -> thread_list[ %d] -> th_end", strerror(error), it);
165 n_log(
LOG_ERR,
"pthread_create failed : %s for it %d", strerror(errno), it);
182 n_log(
LOG_ERR ,
"thread_pool is not allocated, can't add processes to it !");
187 n_log(
LOG_ERR ,
"thread_pool thread_list is not allocated, can't add processes to it !");
194 while (it < thread_pool->max_threads) {
203 if (it < thread_pool->max_threads) {
219 n_log(
LOG_DEBUG,
"proc %p(%p) added on thread %d", func_ptr, param, it);
225 int cancel_and_return = FALSE ;
227 n_log(
LOG_DEBUG,
"Thread pool active threads are all busy and mode is NO_QUEUE, cannot add %p(%p) to pool %p", func_ptr, param,
thread_pool);
228 cancel_and_return = TRUE ;
230 n_log(
LOG_ERR,
"Thread pool active threads are all busy, cannot add SYNCED_PROC %p(%p) to pool %p", func_ptr, param,
thread_pool);
231 cancel_and_return = TRUE ;
233 n_log(
LOG_ERR,
"Thread pool active threads are all busy, cannot add DIRECT_PROC %p(%p) to pool %p", func_ptr, param,
thread_pool);
234 cancel_and_return = TRUE ;
237 if( cancel_and_return )
248 proc->
func = func_ptr;
253 n_log(
LOG_ERR,
"proc %p(%p) was dropped from waitlist because waitlist of thread pool %p is full", func_ptr, param,
thread_pool);
289 n_log(
LOG_ERR,
"sem_post th_start error in thread_pool %p , thread_list[ %d ] : %s",
thread_pool, it, strerror(error));
312 n_log(
LOG_ERR,
"sem_wait th_end error in thread_pool %p , thread_list[ %d ] : %s",
thread_pool, it, strerror(error));
372 __n_assert((*pool)->thread_list,
return FALSE);
374 int state = 0,
DONE = 0;
378 pthread_mutex_lock(&(*pool)->lock);
379 for (
size_t it = 0; it < (*pool)->max_threads; it++) {
380 pthread_mutex_lock(&(*pool)->thread_list[it]->lock);
381 state = (*pool)->thread_list[it]->state;
382 pthread_mutex_unlock(&(*pool)->thread_list[it]->lock);
386 pthread_mutex_lock(&(*pool)->thread_list[it]->lock);
388 sem_post(&(*pool)->thread_list[it]->th_start);
389 pthread_mutex_unlock(&(*pool)->thread_list[it]->lock);
395 pthread_mutex_unlock(&(*pool)->lock);
400 pthread_mutex_lock(&(*pool)->lock);
402 for (
size_t it = 0; it < (*pool)->max_threads; it++) {
403 pthread_join((*pool)->thread_list[it]->thr, NULL);
404 pthread_mutex_destroy(&(*pool)->thread_list[it]->lock);
405 sem_destroy(&(*pool)->thread_list[it]->th_start);
406 sem_destroy(&(*pool)->thread_list[it]->th_end);
407 Free((*pool)->thread_list[it]);
409 Free((*pool)->thread_list);
412 sem_destroy(&(*pool)->nb_tasks);
414 pthread_mutex_unlock(&(*pool)->lock);
415 pthread_mutex_destroy(&(*pool)->lock);
436 while (push_status == 1) {
439 if (node && node->
ptr) {
444 next_node = node->
next;
THREAD_POOL * thread_pool
int mode
Network for managing conenctions.
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
#define __n_assert(__ptr, __ret)
macro to assert things
#define Free(__ptr)
Free Handler to get errors.
void * ptr
void pointer to store
LIST_NODE * start
pointer to the start of the list
size_t nb_items
number of item currently in the list
struct LIST_NODE * next
pointer to the next node
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
#define remove_list_node(__LIST_, __NODE_, __TYPE_)
Remove macro helper for void pointer casting.
int list_destroy(LIST **list)
Empty and Free a list container.
LIST * new_generic_list(size_t max_items)
Initialiaze a generic list container to max_items pointers.
#define MAX_LIST_ITEMS
flag to pass to new_generic_list for the maximum possible number of item in a list
Structure of a generic list node.
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
#define LOG_DEBUG
debug-level messages
#define LOG_ERR
error conditions
#define LOG_INFO
informational
void u_sleep(unsigned int usec)
wrapper around usleep for API consistency
size_t nb_max_waiting
Maximum number of waiting procedures int the list, 0 or -1 for unlimited.
int state
state of the proc , RUNNING_PROC when it is busy processing func( param) , IDLE_PROC when it waits fo...
int type
SYNCED or DIRECT process start.
size_t max_threads
Maximum number of running threads in the list.
void * param
if not NULL , passed as argument
void *(* func)(void *param)
function to call in the thread
sem_t th_end
thread ending semaphore
pthread_mutex_t lock
mutex to prevent mutual access of node parameters
struct THREAD_POOL * thread_pool
pointer to assigned thread pool
sem_t th_start
thread starting semaphore
THREAD_POOL_NODE ** thread_list
Dynamically allocated but fixed size thread array.
LIST * waiting_list
Waiting list handling.
size_t nb_actives
number of threads actually doing a proc
sem_t nb_tasks
semaphore to store the number of tasks
void *(* func)(void *param)
function to call in the thread
int thread_state
state of the managing thread , RUNNING_THREAD, EXITING_THREAD, EXITED_THREAD
pthread_mutex_t lock
mutex to prevent mutual access of waiting_list parameters
void * param
if not NULL , passed as argument
#define NORMAL_PROC
processing mode for added func, synced start, can be queued
int start_threaded_pool(THREAD_POOL *thread_pool)
Launch the process waiting for execution in the thread pool.
THREAD_POOL * new_thread_pool(size_t nbmaxthr, size_t nb_max_waiting)
Create a new pool of nbmaxthr threads.
#define EXITED_THREAD
indicate that the pool is off, all jobs have been consumed
#define SYNCED_PROC
processing mode for added func, synced start, not queued
int add_threaded_process(THREAD_POOL *thread_pool, void *(*func_ptr)(void *param), void *param, int mode)
add a function and params to a thread pool
#define NO_QUEUE
special processing mode for waiting_list: do not add the work in queue since it' coming from the queu...
int refresh_thread_pool(THREAD_POOL *thread_pool)
try to add some waiting DIRECT_PROCs on some free thread slots, else do nothing
#define IDLE_PROC
status of a thread which is waiting for some proc
int destroy_threaded_pool(THREAD_POOL **pool, unsigned int delay)
delete a thread_pool, exit the threads and free the structs
int wait_for_synced_threaded_pool(THREAD_POOL *thread_pool)
wait for all the launched process, blocking but light on the CPU as there is no polling
long int get_nb_cpu_cores()
get number of core of current system
#define DIRECT_PROC
processing mode for added func, direct start, not queued
int wait_for_threaded_pool(THREAD_POOL *thread_pool, unsigned int delay)
Wait for all the launched process in the thread pool to terminate.
#define RUNNING_PROC
status of a thread which proc is currently running
#define NO_LOCK
if passed to add_threaded_process, skip main table lock in case we are in a func which is already loc...
#define EXITING_THREAD
indicate that the pool is exiting, unfinished jobs will finish and the pool will exit the threads and...
#define RUNNING_THREAD
indicate that the pool is running and ready to use
#define WAITING_PROC
status of a thread who have proc waiting to be processed
Structure of a trhead pool.
Structure of a waiting process item.
Common headers and low-level functions & define.
void * thread_pool_processing_function(void *param)
Internal thread pool processing function.