15#include <sys/sysinfo.h>
32 GetSystemInfo(&sysinfo);
33 nb_procs = sysinfo.dwNumberOfProcessors;
35 nb_procs = get_nprocs();
53 n_log(
LOG_ERR,
"Thread %ld fatal error, no valid payload found, exited", node -> thr );
60 int thread_state = 0 ;
66 sem_wait( &node -> th_start );
68 pthread_mutex_lock( &node -> lock );
69 thread_state = node -> thread_state ;
70 pthread_mutex_unlock( &node -> lock );
76 pthread_mutex_lock( &node -> lock );
78 pthread_mutex_unlock( &node -> lock );
80 if( node && node -> func )
82 node -> func( node -> param ) ;
86 pthread_mutex_lock( &node -> lock );
88 node -> param = NULL ;
90 int type = node -> type ;
92 pthread_mutex_unlock( &node -> lock );
96 sem_post( &node -> th_end );
106 pthread_mutex_lock( &node -> lock );
108 pthread_mutex_unlock( &node -> lock );
112 pthread_exit( NULL );
133 thread_pool -> max_threads = nbmaxthr ;
134 thread_pool -> nb_max_waiting = nb_max_waiting ;
135 thread_pool -> nb_actives = 0 ;
138 if( !thread_pool -> thread_list )
145 if( !thread_pool -> waiting_list )
151 pthread_mutex_init( &thread_pool -> lock, NULL );
153 if( sem_init( &thread_pool -> nb_tasks , 0 , 0 ) == -1 )
156 n_log(
LOG_ERR,
"sem_init failed : %s on &thread_pool -> nb_tasks", strerror ( error ));
160 for(
int it = 0 ; it < nbmaxthr ; it ++ )
163 thread_pool -> thread_list[ it ] -> type = -1 ;
164 thread_pool -> thread_list[ it ] -> state =
IDLE_PROC ;
165 thread_pool -> thread_list[ it ] -> thread_state =
RUNNING_THREAD ;
166 thread_pool -> thread_list[ it ] -> thread_pool = thread_pool ;
168 if( sem_init( &thread_pool -> thread_list[ it ] -> th_start, 0, 0 ) == -1 )
171 n_log(
LOG_ERR,
"sem_init failed : %s on &thread_pool -> thread_list[ %d ] -> th_start\n", strerror ( error ), it );
174 if( sem_init( &thread_pool -> thread_list[ it ] -> th_end, 0, 0 ) == -1 )
177 n_log(
LOG_ERR,
"sem_init failed : %s on &thread_pool -> thread_list[ %d] -> th_end\n", strerror ( error ), it );
181 thread_pool -> thread_list[ it ] -> func = NULL ;
182 thread_pool -> thread_list[ it ] -> param = NULL ;
184 pthread_mutex_init( &thread_pool -> thread_list[ it ] -> lock, NULL );
188 n_log(
LOG_ERR,
"pthread_create failed : %s for it %d\n", strerror ( errno ), it );
211 if( !thread_pool -> thread_list )
216 if( !(mode&
NO_LOCK) ) pthread_mutex_lock( &thread_pool -> lock );
218 while( it < thread_pool -> max_threads )
220 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
221 if( thread_pool -> thread_list[ it ] -> thread_state ==
RUNNING_THREAD && thread_pool -> thread_list[ it ] -> state ==
IDLE_PROC )
225 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
229 if( it >= thread_pool -> max_threads )
234 if( !(mode&
NO_LOCK) ) pthread_mutex_unlock( &thread_pool -> lock );
237 if( thread_pool -> nb_max_waiting <= 0 || ( thread_pool -> waiting_list -> nb_items < thread_pool -> nb_max_waiting ) )
241 proc -> func = func_ptr ;
242 proc -> param = param ;
243 list_push( thread_pool -> waiting_list, proc, free );
244 n_log(
LOG_DEBUG,
"Adding %p %p to waitlist", proc -> func, proc -> param );
248 n_log(
LOG_ERR ,
"Proc %p %p was dropped from waitlist because waitlist is full", func_ptr, param );
253 thread_pool -> thread_list[ it ] -> func = func_ptr ;
254 thread_pool -> thread_list[ it ] -> param = param ;
255 thread_pool -> thread_list[ it ] -> state =
WAITING_PROC ;
256 thread_pool -> thread_list[ it ] -> type = mode ;
257 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
258 sem_post( &thread_pool -> thread_list[ it ] -> th_start );
262 thread_pool -> thread_list[ it ] -> func = func_ptr ;
263 thread_pool -> thread_list[ it ] -> param = param ;
264 thread_pool -> thread_list[ it ] -> state =
WAITING_PROC ;
265 thread_pool -> thread_list[ it ] -> type = mode ;
266 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
270 n_log(
LOG_ERR,
"Unknown mode %d for thread %d", mode, it );
271 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
272 if( !(mode&
NO_LOCK) ) pthread_mutex_unlock( &thread_pool -> lock );
275 if( !(mode&
NO_LOCK) ) pthread_mutex_unlock( &thread_pool -> lock );
291 if( !thread_pool -> thread_list )
296 pthread_mutex_lock( &thread_pool -> lock );
297 for(
int it = 0 ; it < thread_pool -> max_threads ; it ++ )
300 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
301 if( (thread_pool -> thread_list[ it ] -> type&
SYNCED_PROC) && thread_pool -> thread_list[ it ] -> state ==
WAITING_PROC )
305 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
308 if( sem_post( &thread_pool -> thread_list[ it ] -> th_start ) != 0 )
311 n_log(
LOG_ERR ,
"sem_post th_start error in thread_pool %p , thread_list[ %d ] : %s" , thread_pool , it , strerror( error ) );
315 pthread_mutex_unlock( &thread_pool -> lock );
329 __n_assert( thread_pool -> thread_list ,
return FALSE );
332 for(
int it = 0 ; it < thread_pool -> max_threads ; it ++ )
334 if( sem_wait( &thread_pool -> thread_list[ it ] -> th_end ) == -1 )
337 n_log(
LOG_ERR ,
"sem_wait th_end error in thread_pool %p , thread_list[ %d ] : %s" , thread_pool , it , strerror( error ) );
357 if( !thread_pool -> thread_list )
365 while( thread_pool -> waiting_list -> nb_items > 0 )
376 for(
int it = 0 ; it < thread_pool -> max_threads ; it ++ )
380 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
381 state = thread_pool -> thread_list[ it ] -> state ;
382 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
406 __n_assert( (*pool) -> thread_list,
return FALSE );
408 int state= 0, DONE = 0 ;
413 pthread_mutex_lock( &(*pool) -> lock );
414 for(
int it = 0 ; it < (*pool) -> max_threads ; it ++ )
416 pthread_mutex_lock( &(*pool) -> thread_list[ it ] -> lock );
417 state = (*pool) -> thread_list[ it ] -> state ;
418 pthread_mutex_unlock( &(*pool) -> thread_list[ it ] -> lock );
423 pthread_mutex_lock( &(*pool) -> thread_list[ it ] -> lock );
425 sem_post( &(*pool) -> thread_list[ it ] -> th_start );
426 pthread_mutex_unlock( &(*pool) -> thread_list[ it ] -> lock );
434 pthread_mutex_unlock( &(*pool) -> lock );
439 pthread_mutex_lock( &(*pool) -> lock );
440 for(
int it = 0 ; it < (*pool) -> max_threads ; it ++ )
442 pthread_join( (*pool) -> thread_list[ it ] -> thr, NULL );
443 pthread_mutex_destroy( &(*pool) -> thread_list[ it ] -> lock );
444 sem_destroy( &(*pool) -> thread_list[ it ] -> th_start );
445 sem_destroy( &(*pool) -> thread_list[ it ] -> th_end );
446 Free( (*pool) -> thread_list[ it ] );
448 Free( (*pool) -> thread_list );
451 sem_destroy( &(*pool) -> nb_tasks );
453 pthread_mutex_unlock( &(*pool) -> lock );
454 pthread_mutex_destroy( &(*pool) -> lock );
472 __n_assert( thread_pool -> waiting_list,
return FALSE );
475 int push_status = 0 ;
476 pthread_mutex_lock( &thread_pool -> lock );
477 if( thread_pool -> waiting_list && thread_pool -> waiting_list -> start )
479 while( push_status == 1 )
481 LIST_NODE *node = thread_pool -> waiting_list -> start ;
483 if( node && node -> ptr )
491 next_node = node -> next ;
510 thread_pool -> nb_actives = 0 ;
511 for(
int it = 0 ; it < thread_pool -> max_threads ; it ++ )
513 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
514 if( thread_pool -> thread_list[ it ] -> state ==
RUNNING_PROC )
515 thread_pool -> nb_actives ++ ;
516 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
519 if( push_status == 0 && thread_pool -> nb_actives == 0 )
522 sem_getvalue( &thread_pool -> nb_tasks , &value );
525 sem_post( &thread_pool -> nb_tasks );
529 pthread_mutex_unlock( &thread_pool -> lock );
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
#define __n_assert(__ptr, __ret)
macro to assert things
#define Free(__ptr)
Free Handler to get errors.
#define remove_list_node(__LIST_,__NODE_, __TYPE_)
Remove macro helper for void pointer casting.
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
int list_destroy(LIST **list)
Empty and Free a list container.
LIST * new_generic_list(int max_items)
Initialize a generic list container to max_items pointers.
Structure of a generic list node.
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
#define LOG_DEBUG
debug-level messages
#define LOG_ERR
error conditions
void u_sleep(unsigned int usec)
wrapper around usleep for API consistency
int start_threaded_pool(THREAD_POOL *thread_pool)
Launch the processes waiting for execution in the thread pool.
int get_nb_cpu_cores()
get the number of cores of the current system
int destroy_threaded_pool(THREAD_POOL **pool, int delay)
delete a thread_pool, exit the threads and free the structs
#define EXITED_THREAD
indicate that the pool is off, all jobs have been consumed
#define SYNCED_PROC
processing mode for added func, synced start
int add_threaded_process(THREAD_POOL *thread_pool, void *(*func_ptr)(void *param), void *param, int mode)
add a function and params to a thread pool
int refresh_thread_pool(THREAD_POOL *thread_pool)
try to add some waiting DIRECT_PROCs on some free thread slots, else do nothing
#define IDLE_PROC
status of a thread which is waiting for some proc
int wait_for_synced_threaded_pool(THREAD_POOL *thread_pool)
wait for all the launched processes, blocking but light on the CPU as there is no polling
#define DIRECT_PROC
processing mode for added func, direct start
#define RUNNING_PROC
status of a thread whose proc is currently running
int wait_for_threaded_pool(THREAD_POOL *thread_pool, int delay)
Wait for all the launched processes in the thread pool to terminate.
#define NO_LOCK
if passed to add_threaded_process, skip main table lock in case we are in a func which is already loc...
#define NOQUEUE
processing mode for waiting_list: do not re-add the work to the queue
#define EXITING_THREAD
indicate that the pool is exiting, unfinished jobs will finish and the pool will exit the threads and...
#define RUNNING_THREAD
indicate that the pool is running and ready to use
#define WAITING_PROC
status of a thread which has a proc waiting to be processed
THREAD_POOL * new_thread_pool(int nbmaxthr, int nb_max_waiting)
Create a new pool of nbmaxthr threads.
Structure of a thread pool.
Structure of a waiting process item.
Common headers and low-level ugly functions & defines.
void * thread_pool_processing_function(void *param)
Internal thread pool processing function.