15#include <sys/sysinfo.h>
30 GetSystemInfo(&sysinfo);
31 nb_procs = sysinfo.dwNumberOfProcessors;
33 nb_procs = sysconf(_SC_NPROCESSORS_ONLN);
51 n_log(
LOG_ERR,
"Thread %ld fatal error, no valid payload found, exited", node -> thr );
58 int thread_state = 0 ;
64 sem_wait( &node -> th_start );
66 pthread_mutex_lock( &node -> lock );
67 thread_state = node -> thread_state ;
68 pthread_mutex_unlock( &node -> lock );
72 n_log(
LOG_INFO,
"Thread pool running proc %p", node -> func );
73 pthread_mutex_lock( &node -> lock );
75 pthread_mutex_unlock( &node -> lock );
77 if( node && node -> func )
79 node -> func( node -> param ) ;
83 pthread_mutex_lock( &node -> lock );
85 node -> param = NULL ;
87 int type = node -> type ;
91 sem_post( &node -> th_end );
92 pthread_mutex_unlock( &node -> lock );
101 pthread_mutex_lock( &node -> lock );
103 pthread_mutex_unlock( &node -> lock );
107 pthread_exit( NULL );
128 thread_pool -> max_threads = nbmaxthr ;
129 thread_pool -> nb_max_waiting = nb_max_waiting ;
130 thread_pool -> nb_actives = 0 ;
133 if( !thread_pool -> thread_list )
140 if( !thread_pool -> waiting_list )
146 pthread_mutex_init( &thread_pool -> lock, NULL );
148 if( sem_init( &thread_pool -> nb_tasks , 0 , 0 ) == -1 )
151 n_log(
LOG_ERR,
"sem_init failed : %s on &thread_pool -> nb_tasks", strerror ( error ));
155 for(
int it = 0 ; it < nbmaxthr ; it ++ )
158 thread_pool -> thread_list[ it ] -> type = -1 ;
159 thread_pool -> thread_list[ it ] -> state =
IDLE_PROC ;
160 thread_pool -> thread_list[ it ] -> thread_state =
RUNNING_THREAD ;
161 thread_pool -> thread_list[ it ] -> thread_pool = thread_pool ;
163 if( sem_init( &thread_pool -> thread_list[ it ] -> th_start, 0, 0 ) == -1 )
166 n_log(
LOG_ERR,
"sem_init failed : %s on &thread_pool -> thread_list[ %d ] -> th_start\n", strerror ( error ), it );
169 if( sem_init( &thread_pool -> thread_list[ it ] -> th_end, 0, 0 ) == -1 )
172 n_log(
LOG_ERR,
"sem_init failed : %s on &thread_pool -> thread_list[ %d] -> th_end\n", strerror ( error ), it );
176 thread_pool -> thread_list[ it ] -> func = NULL ;
177 thread_pool -> thread_list[ it ] -> param = NULL ;
179 pthread_mutex_init( &thread_pool -> thread_list[ it ] -> lock, NULL );
183 n_log(
LOG_ERR,
"pthread_create failed : %s for it %d\n", strerror ( errno ), it );
206 if( !thread_pool -> thread_list )
211 if( !(mode&
NO_LOCK) ) pthread_mutex_lock( &thread_pool -> lock );
213 while( it < thread_pool -> max_threads )
215 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
216 if( thread_pool -> thread_list[ it ] -> thread_state ==
RUNNING_THREAD && thread_pool -> thread_list[ it ] -> state ==
IDLE_PROC )
220 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
224 if( it < thread_pool -> max_threads )
228 thread_pool -> thread_list[ it ] -> func = func_ptr ;
229 thread_pool -> thread_list[ it ] -> param = param ;
230 thread_pool -> thread_list[ it ] -> state =
WAITING_PROC ;
231 thread_pool -> thread_list[ it ] -> type = mode ;
235 n_log(
LOG_ERR,
"unknown mode %d for thread %d", mode, it );
236 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
237 if( !(mode&
NO_LOCK) ) pthread_mutex_unlock( &thread_pool -> lock );
240 if( mode&
DIRECT_PROC ) sem_post( &thread_pool -> thread_list[ it ] -> th_start );
241 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
242 n_log(
LOG_DEBUG ,
"proc %p(%p) added on thread %d" , func_ptr , param , it );
251 if( !(mode&
NO_LOCK) ) pthread_mutex_unlock( &thread_pool -> lock );
252 n_log(
LOG_DEBUG,
"thread pool active threads are all busy, cannot add DIRECT or SYNCED_PROC %p(%p) to pool %p", func_ptr , param , thread_pool );
257 if( thread_pool -> nb_max_waiting <= 0 || ( thread_pool -> waiting_list -> nb_items < thread_pool -> nb_max_waiting ) )
261 proc -> func = func_ptr ;
262 proc -> param = param ;
263 list_push( thread_pool -> waiting_list, proc, free );
264 n_log(
LOG_DEBUG,
"Adding %p %p to waitlist", proc -> func, proc -> param );
268 n_log(
LOG_ERR ,
"proc %p(%p) was dropped from waitlist because waitlist of thread pool %p is full", func_ptr, param , thread_pool );
269 if( !(mode&
NO_LOCK) ) pthread_mutex_unlock( &thread_pool -> lock );
274 if( !(mode&
NO_LOCK) ) pthread_mutex_unlock( &thread_pool -> lock );
291 if( !thread_pool -> thread_list )
296 pthread_mutex_lock( &thread_pool -> lock );
297 for(
int it = 0 ; it < thread_pool -> max_threads ; it ++ )
300 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
301 if( (thread_pool -> thread_list[ it ] -> type&
SYNCED_PROC) && thread_pool -> thread_list[ it ] -> state ==
WAITING_PROC )
305 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
308 if( sem_post( &thread_pool -> thread_list[ it ] -> th_start ) != 0 )
311 n_log(
LOG_ERR ,
"sem_post th_start error in thread_pool %p , thread_list[ %d ] : %s" , thread_pool , it , strerror( error ) );
315 pthread_mutex_unlock( &thread_pool -> lock );
329 __n_assert( thread_pool -> thread_list ,
return FALSE );
332 for(
int it = 0 ; it < thread_pool -> max_threads ; it ++ )
334 if( sem_wait( &thread_pool -> thread_list[ it ] -> th_end ) == -1 )
337 n_log(
LOG_ERR ,
"sem_wait th_end error in thread_pool %p , thread_list[ %d ] : %s" , thread_pool , it , strerror( error ) );
356 if( !thread_pool -> thread_list )
364 while( thread_pool -> waiting_list -> nb_items > 0 )
375 for(
int it = 0 ; it < thread_pool -> max_threads ; it ++ )
379 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
380 state = thread_pool -> thread_list[ it ] -> state ;
381 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
405 __n_assert( (*pool) -> thread_list,
return FALSE );
407 int state= 0, DONE = 0 ;
412 pthread_mutex_lock( &(*pool) -> lock );
413 for(
int it = 0 ; it < (*pool) -> max_threads ; it ++ )
415 pthread_mutex_lock( &(*pool) -> thread_list[ it ] -> lock );
416 state = (*pool) -> thread_list[ it ] -> state ;
417 pthread_mutex_unlock( &(*pool) -> thread_list[ it ] -> lock );
422 pthread_mutex_lock( &(*pool) -> thread_list[ it ] -> lock );
424 sem_post( &(*pool) -> thread_list[ it ] -> th_start );
425 pthread_mutex_unlock( &(*pool) -> thread_list[ it ] -> lock );
433 pthread_mutex_unlock( &(*pool) -> lock );
438 pthread_mutex_lock( &(*pool) -> lock );
440 for(
int it = 0 ; it < (*pool) -> max_threads ; it ++ )
442 pthread_join( (*pool) -> thread_list[ it ] -> thr, NULL );
443 pthread_mutex_destroy( &(*pool) -> thread_list[ it ] -> lock );
444 sem_destroy( &(*pool) -> thread_list[ it ] -> th_start );
445 sem_destroy( &(*pool) -> thread_list[ it ] -> th_end );
446 Free( (*pool) -> thread_list[ it ] );
448 Free( (*pool) -> thread_list );
451 sem_destroy( &(*pool) -> nb_tasks );
453 pthread_mutex_unlock( &(*pool) -> lock );
454 pthread_mutex_destroy( &(*pool) -> lock );
472 __n_assert( thread_pool -> waiting_list,
return FALSE );
475 int push_status = 0 ;
476 pthread_mutex_lock( &thread_pool -> lock );
477 if( thread_pool -> waiting_list && thread_pool -> waiting_list -> start )
479 while( push_status == 1 )
481 LIST_NODE *node = thread_pool -> waiting_list -> start ;
483 if( node && node -> ptr )
491 next_node = node -> next ;
493 n_log(
LOG_DEBUG,
"waitlist: adding %p,%p to %p", procptr -> func, procptr -> param, thread_pool );
499 n_log(
LOG_DEBUG ,
"waitlist: cannot add proc %p from waiting list, all active threads are busy !" , proc , thread_pool );
505 n_log(
LOG_ERR ,
"waitlist: trying to add invalid NULL proc on thread pool %p !" , thread_pool );
516 thread_pool -> nb_actives = 0 ;
517 for(
int it = 0 ; it < thread_pool -> max_threads ; it ++ )
519 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
520 if( thread_pool -> thread_list[ it ] -> state ==
RUNNING_PROC )
521 thread_pool -> nb_actives ++ ;
522 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
525 if( push_status == 0 && thread_pool -> nb_actives == 0 )
528 sem_getvalue( &thread_pool -> nb_tasks , &value );
531 sem_post( &thread_pool -> nb_tasks );
535 pthread_mutex_unlock( &thread_pool -> lock );
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
#define __n_assert(__ptr, __ret)
macro to assert things
#define Free(__ptr)
Free Handler to get errors.
#define remove_list_node(__LIST_,__NODE_, __TYPE_)
Remove macro helper for void pointer casting.
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
int list_destroy(LIST **list)
Empty and Free a list container.
LIST * new_generic_list(int max_items)
Initialize a generic list container to max_items pointers.
Structure of a generic list node.
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
#define LOG_DEBUG
debug-level messages
#define LOG_ERR
error conditions
#define LOG_INFO
informational
void u_sleep(unsigned int usec)
wrapper around usleep for API consistency
int start_threaded_pool(THREAD_POOL *thread_pool)
Launch the processes waiting for execution in the thread pool.
int get_nb_cpu_cores()
get the number of cores of the current system
int destroy_threaded_pool(THREAD_POOL **pool, int delay)
delete a thread_pool, exit the threads and free the structs
#define EXITED_THREAD
indicate that the pool is off, all jobs have been consumed
#define SYNCED_PROC
processing mode for added func, synced start
int add_threaded_process(THREAD_POOL *thread_pool, void *(*func_ptr)(void *param), void *param, int mode)
add a function and params to a thread pool
int refresh_thread_pool(THREAD_POOL *thread_pool)
try to add some waiting DIRECT_PROCs on some free thread slots, else do nothing
#define IDLE_PROC
status of a thread which is waiting for some proc
int wait_for_synced_threaded_pool(THREAD_POOL *thread_pool)
wait for all the launched processes, blocking but light on the CPU as there is no polling
#define DIRECT_PROC
processing mode for added func, direct start
#define RUNNING_PROC
status of a thread whose proc is currently running
int wait_for_threaded_pool(THREAD_POOL *thread_pool, int delay)
Wait for all the launched processes in the thread pool to terminate.
#define NO_LOCK
if passed to add_threaded_process, skip main table lock in case we are in a func which is already loc...
#define NOQUEUE
processing mode for waiting_list: do not re-add the work to the queue
#define EXITING_THREAD
indicate that the pool is exiting, unfinished jobs will finish and the pool will exit the threads and...
#define RUNNING_THREAD
indicate that the pool is running and ready to use
#define WAITING_PROC
status of a thread which has a proc waiting to be processed
THREAD_POOL * new_thread_pool(int nbmaxthr, int nb_max_waiting)
Create a new pool of nbmaxthr threads.
Structure of a thread pool.
Structure of a waiting process item.
Common headers and low-level ugly functions & defines.
void * thread_pool_processing_function(void *param)
Internal thread pool processing function.