Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
n_thread_pool.c
Go to the documentation of this file.
1
8#include <unistd.h>
9#include "nilorea/n_common.h"
10#include "nilorea/n_log.h"
12#include "nilorea/n_time.h"
13
14#ifdef __linux__
15#include <sys/sysinfo.h>
16#endif
17#include <pthread.h>
18#include <string.h>
19#include <errno.h>
20
26{
27 int nb_procs = -1 ;
28#ifdef __windows__
29 SYSTEM_INFO sysinfo;
30 GetSystemInfo(&sysinfo);
31 nb_procs = sysinfo.dwNumberOfProcessors;
32#else
33 nb_procs = sysconf(_SC_NPROCESSORS_ONLN);
34#endif
35 return nb_procs;
36}
37
38
39
46{
47 THREAD_POOL_NODE *node = (THREAD_POOL_NODE *)param ;
48
49 if( !node )
50 {
51 n_log( LOG_ERR, "Thread %ld fatal error, no valid payload found, exited", node -> thr );
52 pthread_exit( NULL );
53 return NULL ;
54 }
55
56 n_log( LOG_DEBUG, "Thread %ld started", node -> thr );
57
58 int thread_state = 0 ;
59 do
60 {
61 n_log( LOG_DEBUG, "Thread pool processing func waiting" );
62
63 // note: direct procs will automatically post th_start
64 sem_wait( &node -> th_start );
65
66 pthread_mutex_lock( &node -> lock );
67 thread_state = node -> thread_state ;
68 pthread_mutex_unlock( &node -> lock );
69
70 if( thread_state == RUNNING_THREAD )
71 {
72 n_log( LOG_INFO, "Thread pool running proc %p", node -> func );
73 pthread_mutex_lock( &node -> lock );
74 node -> state = RUNNING_PROC ;
75 pthread_mutex_unlock( &node -> lock );
76
77 if( node && node -> func )
78 {
79 node -> func( node -> param ) ;
80 }
81 n_log( LOG_INFO, "Thread pool end proc %p", node -> func );
82
83 pthread_mutex_lock( &node -> lock );
84 node -> func = NULL ;
85 node -> param = NULL ;
86 node -> state = IDLE_PROC ;
87 int type = node -> type ;
88 node -> type = -1 ;
89 // DIRECT_PROC do not need to post th_end
90 if( type&SYNCED_PROC )
91 sem_post( &node -> th_end );
92 pthread_mutex_unlock( &node -> lock );
93
94 refresh_thread_pool( node -> thread_pool );
95 }
96 }
97 while( thread_state != EXITING_THREAD );
98
99 n_log( LOG_DEBUG, "Thread %ld exiting...", node -> thr );
100
101 pthread_mutex_lock( &node -> lock );
102 node -> thread_state = EXITED_THREAD ;
103 pthread_mutex_unlock( &node -> lock );
104
105 n_log( LOG_DEBUG, "Thread %ld exited", node -> thr );
106
107 pthread_exit( NULL );
108
109 return NULL ;
110} /* thread_pool_processing_function */
111
112
113
120THREAD_POOL *new_thread_pool( int nbmaxthr, int nb_max_waiting )
121{
122 THREAD_POOL *thread_pool = NULL ;
123
124 Malloc( thread_pool, THREAD_POOL, 1 );
125 if( !thread_pool )
126 return NULL ;
127
128 thread_pool -> max_threads = nbmaxthr ;
129 thread_pool -> nb_max_waiting = nb_max_waiting ;
130 thread_pool -> nb_actives = 0 ;
131
132 thread_pool -> thread_list = (THREAD_POOL_NODE **)malloc( nbmaxthr * sizeof( THREAD_POOL_NODE * ) );
133 if( !thread_pool -> thread_list )
134 {
135 Free( thread_pool );
136 return NULL ;
137 }
138
139 thread_pool -> waiting_list = new_generic_list( 0 );
140 if( !thread_pool -> waiting_list )
141 {
142 n_log( LOG_ERR, "Unable to initialize wait list" );
143 return NULL ;
144 }
145
146 pthread_mutex_init( &thread_pool -> lock, NULL );
147
148 if( sem_init( &thread_pool -> nb_tasks , 0 , 0 ) == -1 )
149 {
150 int error = errno ;
151 n_log( LOG_ERR, "sem_init failed : %s on &thread_pool -> nb_tasks", strerror ( error ));
152 return NULL;
153 }
154
155 for( int it = 0 ; it < nbmaxthr ; it ++ )
156 {
157 Malloc( thread_pool -> thread_list[ it ], THREAD_POOL_NODE, 1 );
158 thread_pool -> thread_list[ it ] -> type = -1 ;
159 thread_pool -> thread_list[ it ] -> state = IDLE_PROC ;
160 thread_pool -> thread_list[ it ] -> thread_state = RUNNING_THREAD ;
161 thread_pool -> thread_list[ it ] -> thread_pool = thread_pool ;
162
163 if( sem_init( &thread_pool -> thread_list[ it ] -> th_start, 0, 0 ) == -1 )
164 {
165 int error = errno ;
166 n_log( LOG_ERR, "sem_init failed : %s on &thread_pool -> thread_list[ %d ] -> th_start\n", strerror ( error ), it );
167 return NULL;
168 }
169 if( sem_init( &thread_pool -> thread_list[ it ] -> th_end, 0, 0 ) == -1 )
170 {
171 int error = errno ;
172 n_log( LOG_ERR, "sem_init failed : %s on &thread_pool -> thread_list[ %d] -> th_end\n", strerror ( error ), it );
173 return NULL;
174 }
175
176 thread_pool -> thread_list[ it ] -> func = NULL ;
177 thread_pool -> thread_list[ it ] -> param = NULL ;
178
179 pthread_mutex_init( &thread_pool -> thread_list[ it ] -> lock, NULL );
180
181 if( pthread_create( &thread_pool -> thread_list[ it ] -> thr, NULL, thread_pool_processing_function, (void *)thread_pool -> thread_list[ it ] ) != 0 )
182 {
183 n_log( LOG_ERR, "pthread_create failed : %s for it %d\n", strerror ( errno ), it );
184 return NULL;
185 }
186 }
187 return thread_pool ;
188} /* new_thread_pool */
189
190
191
200int add_threaded_process( THREAD_POOL *thread_pool, void *(*func_ptr)(void *param), void *param, int mode )
201{
202
203 if( !thread_pool )
204 return FALSE ;
205
206 if( !thread_pool -> thread_list )
207 return FALSE ;
208
209 int it = 0 ;
210
211 if( !(mode&NO_LOCK) ) pthread_mutex_lock( &thread_pool -> lock );
212
213 while( it < thread_pool -> max_threads )
214 {
215 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
216 if( thread_pool -> thread_list[ it ] -> thread_state == RUNNING_THREAD && thread_pool -> thread_list[ it ] -> state == IDLE_PROC )
217 {
218 break ;
219 }
220 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
221 it ++ ;
222 }
223 // we have a free thread slot, and the lock on it
224 if( it < thread_pool -> max_threads )
225 {
226 if( mode&DIRECT_PROC || mode&SYNCED_PROC)
227 {
228 thread_pool -> thread_list[ it ] -> func = func_ptr ;
229 thread_pool -> thread_list[ it ] -> param = param ;
230 thread_pool -> thread_list[ it ] -> state = WAITING_PROC ;
231 thread_pool -> thread_list[ it ] -> type = mode ;
232 }
233 else
234 {
235 n_log( LOG_ERR, "unknown mode %d for thread %d", mode, it );
236 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
237 if( !(mode&NO_LOCK) ) pthread_mutex_unlock( &thread_pool -> lock );
238 return FALSE ;
239 }
240 if( mode&DIRECT_PROC ) sem_post( &thread_pool -> thread_list[ it ] -> th_start );
241 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
242 n_log( LOG_DEBUG , "proc %p(%p) added on thread %d" , func_ptr , param , it );
243 }
244 else
245 {
246 // all thread are occupied -> test waiting lists. not holding thread_list[ it ] lock because it was obligatory unlocked before
247
248 // if already coming from queue, or if it should be part of a synced start, do not re-add && return FALSE
249 if( mode&NOQUEUE || mode&SYNCED_PROC )
250 {
251 if( !(mode&NO_LOCK) ) pthread_mutex_unlock( &thread_pool -> lock );
252 n_log( LOG_DEBUG, "thread pool active threads are all busy, cannot add DIRECT or SYNCED_PROC %p(%p) to pool %p", func_ptr , param , thread_pool );
253 return FALSE ;
254 }
255
256 // try adding to wait list
257 if( thread_pool -> nb_max_waiting <= 0 || ( thread_pool -> waiting_list -> nb_items < thread_pool -> nb_max_waiting ) )
258 {
259 THREAD_WAITING_PROC *proc = NULL ;
260 Malloc( proc, THREAD_WAITING_PROC, 1 );
261 proc -> func = func_ptr ;
262 proc -> param = param ;
263 list_push( thread_pool -> waiting_list, proc, free );
264 n_log( LOG_DEBUG, "Adding %p %p to waitlist", proc -> func, proc -> param );
265 }
266 else
267 {
268 n_log( LOG_ERR , "proc %p(%p) was dropped from waitlist because waitlist of thread pool %p is full", func_ptr, param , thread_pool );
269 if( !(mode&NO_LOCK) ) pthread_mutex_unlock( &thread_pool -> lock );
270 return FALSE ;
271 }
272 }
273
274 if( !(mode&NO_LOCK) ) pthread_mutex_unlock( &thread_pool -> lock );
275
276 return TRUE ;
277} /* add_threaded_process */
278
279
280
287{
288 if( !thread_pool )
289 return FALSE ;
290
291 if( !thread_pool -> thread_list )
292 return FALSE ;
293
294 int retval = TRUE ;
295
296 pthread_mutex_lock( &thread_pool -> lock );
297 for( int it = 0 ; it < thread_pool -> max_threads ; it ++ )
298 {
299 int to_run = 0 ;
300 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
301 if( (thread_pool -> thread_list[ it ] -> type&SYNCED_PROC) && thread_pool -> thread_list[ it ] -> state == WAITING_PROC )
302 {
303 to_run = 1 ;
304 }
305 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
306 if( to_run == 1 )
307 {
308 if( sem_post( &thread_pool -> thread_list[ it ] -> th_start ) != 0 )
309 {
310 int error = errno ;
311 n_log( LOG_ERR , "sem_post th_start error in thread_pool %p , thread_list[ %d ] : %s" , thread_pool , it , strerror( error ) );
312 } retval = FALSE ;
313 }
314 }
315 pthread_mutex_unlock( &thread_pool -> lock );
316
317 return retval ;
318} /* start_threaded_pool */
319
320
327{
328 __n_assert( thread_pool , return FALSE );
329 __n_assert( thread_pool -> thread_list , return FALSE );
330
331 int retval = TRUE ;
332 for( int it = 0 ; it < thread_pool -> max_threads ; it ++ )
333 {
334 if( sem_wait( &thread_pool -> thread_list[ it ] -> th_end ) == -1 )
335 {
336 int error = errno ;
337 n_log( LOG_ERR , "sem_wait th_end error in thread_pool %p , thread_list[ %d ] : %s" , thread_pool , it , strerror( error ) );
338 retval = FALSE ;
339 }
340 }
341 return retval ;
342} /* wait_for_synced_threaded_pool */
343
344
351int wait_for_threaded_pool( THREAD_POOL *thread_pool, int delay )
352{
353 if( !thread_pool )
354 return FALSE ;
355
356 if( !thread_pool -> thread_list )
357 return FALSE ;
358
359 int DONE = 0 ;
360
361 //n_log( LOG_DEBUG, "Waiting for the waitlist of %p to be consumed", thread_pool );
362
363 /* waiting to consume all the waiting list */
364 while( thread_pool -> waiting_list -> nb_items > 0 )
365 {
366 refresh_thread_pool( thread_pool );
367 u_sleep( delay );
368 }
369
370 //n_log( LOG_DEBUG, "Waiting for active process of %p to be terminated", thread_pool );
371 /* waiting for all active procs to have terminated */
372 while( !DONE )
373 {
374 DONE = 1 ;
375 for( int it = 0 ; it < thread_pool -> max_threads ; it ++ )
376 {
377 int state = 0 ;
378
379 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
380 state = thread_pool -> thread_list[ it ] -> state ;
381 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
382 if( state != IDLE_PROC )
383 {
384 DONE = 0 ;
385 //n_log( LOG_DEBUG, "Thread id %d status is not IDLE: %d", it, state );
386 }
387 }
388 u_sleep( delay );
389 refresh_thread_pool( thread_pool );
390 }
391
392 return TRUE ;
393}
394
395
402int destroy_threaded_pool( THREAD_POOL **pool, int delay )
403{
404 __n_assert( pool&&(*pool), return FALSE );
405 __n_assert( (*pool) -> thread_list, return FALSE );
406
407 int state= 0, DONE = 0 ;
408
409 while( !DONE )
410 {
411 DONE = 0 ;
412 pthread_mutex_lock( &(*pool) -> lock );
413 for( int it = 0 ; it < (*pool) -> max_threads ; it ++ )
414 {
415 pthread_mutex_lock( &(*pool) -> thread_list[ it ] -> lock );
416 state = (*pool) -> thread_list[ it ] -> state ;
417 pthread_mutex_unlock( &(*pool) -> thread_list[ it ] -> lock );
418
419 if( state == IDLE_PROC )
420 {
421 //n_log( LOG_DEBUG, "Posting EXITING to thread %ld", (*pool) -> thread_list[ it ] -> thr );
422 pthread_mutex_lock( &(*pool) -> thread_list[ it ] -> lock );
423 (*pool) -> thread_list[ it ] -> thread_state = EXITING_THREAD ;
424 sem_post( &(*pool) -> thread_list[ it ] -> th_start );
425 pthread_mutex_unlock( &(*pool) -> thread_list[ it ] -> lock );
426 DONE = 1 ;
427 }
428 else
429 {
430 //n_log( LOG_DEBUG, "thr %ld proc state %d thr state %d", (*pool) -> thread_list[ it ] -> thr, (*pool) -> thread_list[ it ] -> state,(*pool) -> thread_list[ it ] -> thread_state );
431 }
432 }
433 pthread_mutex_unlock( &(*pool) -> lock );
434
435 u_sleep( delay );
436 }
437
438 pthread_mutex_lock( &(*pool) -> lock );
439
440 for( int it = 0 ; it < (*pool) -> max_threads ; it ++ )
441 {
442 pthread_join( (*pool) -> thread_list[ it ] -> thr, NULL );
443 pthread_mutex_destroy( &(*pool) -> thread_list[ it ] -> lock );
444 sem_destroy( &(*pool) -> thread_list[ it ] -> th_start );
445 sem_destroy( &(*pool) -> thread_list[ it ] -> th_end );
446 Free( (*pool) -> thread_list[ it ] );
447 }
448 Free( (*pool) -> thread_list );
449 list_destroy( &(*pool) -> waiting_list );
450
451 sem_destroy( &(*pool) -> nb_tasks );
452
453 pthread_mutex_unlock( &(*pool) -> lock );
454 pthread_mutex_destroy( &(*pool) -> lock );
455
456 Free( (*pool) );
457
458 return TRUE ;
459} /* destroy_threaded_pool */
460
461
462
463
470{
471 __n_assert( thread_pool, return FALSE );
472 __n_assert( thread_pool -> waiting_list, return FALSE );
473
474 /* Trying to empty the wait list */
475 int push_status = 0 ;
476 pthread_mutex_lock( &thread_pool -> lock );
477 if( thread_pool -> waiting_list && thread_pool -> waiting_list -> start )
478 push_status = 1 ;
479 while( push_status == 1 )
480 {
481 LIST_NODE *node = thread_pool -> waiting_list -> start ;
482 LIST_NODE *next_node = NULL ;
483 if( node && node -> ptr )
484 {
485 THREAD_WAITING_PROC *proc = (THREAD_WAITING_PROC *)node -> ptr ;
486 if( proc )
487 {
488 if( add_threaded_process( thread_pool, proc -> func, proc -> param, DIRECT_PROC|NOQUEUE|NO_LOCK ) == TRUE )
489 {
490 THREAD_WAITING_PROC *procptr = NULL ;
491 next_node = node -> next ;
492 procptr = remove_list_node( thread_pool -> waiting_list, node, THREAD_WAITING_PROC );
493 n_log( LOG_DEBUG, "waitlist: adding %p,%p to %p", procptr -> func, procptr -> param, thread_pool );
494 Free( procptr );
495 node = next_node ;
496 }
497 else
498 {
499 n_log( LOG_DEBUG , "waitlist: cannot add proc %p from waiting list, all active threads are busy !" , proc , thread_pool );
500 push_status = 0 ;
501 }
502 }
503 else
504 {
505 n_log( LOG_ERR , "waitlist: trying to add invalid NULL proc on thread pool %p !" , thread_pool );
506 push_status = 0 ;
507 }
508 }
509 else
510 {
511 push_status = 0 ;
512 }
513 } // while( push_status == 1 )
514
515 // update statictics
516 thread_pool -> nb_actives = 0 ;
517 for( int it = 0 ; it < thread_pool -> max_threads ; it ++ )
518 {
519 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
520 if( thread_pool -> thread_list[ it ] -> state == RUNNING_PROC )
521 thread_pool -> nb_actives ++ ;
522 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
523 }
524
525 if( push_status == 0 && thread_pool -> nb_actives == 0 )
526 {
527 int value = 0 ;
528 sem_getvalue( &thread_pool -> nb_tasks , &value );
529 if( value == 0 )
530 {
531 sem_post( &thread_pool -> nb_tasks );
532 }
533 }
534
535 pthread_mutex_unlock( &thread_pool -> lock );
536
537 return TRUE ;
538} // refresh_thread_pool()
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
Definition n_common.h:191
#define __n_assert(__ptr, __ret)
macro to assert things
Definition n_common.h:284
#define Free(__ptr)
Free Handler to get errors.
Definition n_common.h:264
#define remove_list_node(__LIST_,__NODE_, __TYPE_)
Remove macro helper for void pointer casting.
Definition n_list.h:83
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
Definition n_list.c:244
int list_destroy(LIST **list)
Empty and Free a list container.
Definition n_list.c:605
LIST * new_generic_list(int max_items)
Initialiaze a generic list container to max_items pointers.
Definition n_list.c:20
Structure of a generic list node.
Definition n_list.h:27
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
Definition n_log.h:74
#define LOG_DEBUG
debug-level messages
Definition n_log.h:66
#define LOG_ERR
error conditions
Definition n_log.h:58
#define LOG_INFO
informational
Definition n_log.h:64
void u_sleep(unsigned int usec)
wrapper around usleep for API consistency
Definition n_time.c:38
int start_threaded_pool(THREAD_POOL *thread_pool)
Launch the process waiting for exectution in the thread pool.
int get_nb_cpu_cores()
get number of core of current system
int destroy_threaded_pool(THREAD_POOL **pool, int delay)
delete a thread_pool, exit the threads and free the structs
#define EXITED_THREAD
indicate that the pool is off, all jobs have been consumed
#define SYNCED_PROC
processing mode for added func, synced start
int add_threaded_process(THREAD_POOL *thread_pool, void *(*func_ptr)(void *param), void *param, int mode)
add a function and params to a thread pool
int refresh_thread_pool(THREAD_POOL *thread_pool)
try to add some waiting DIRECT_PROCs on some free thread slots, else do nothing
#define IDLE_PROC
status of a thread which is waiting for some proc
int wait_for_synced_threaded_pool(THREAD_POOL *thread_pool)
wait for all the launched process, blocking but light on the CPU as there is no polling
#define DIRECT_PROC
processing mode for added func, direct start
#define RUNNING_PROC
status of a thread which proc is currently running
int wait_for_threaded_pool(THREAD_POOL *thread_pool, int delay)
Wait for all the launched process in the thread pool to terminate.
#define NO_LOCK
if passed to add_threaded_process, skip main table lock in case we are in a func which is already loc...
#define NOQUEUE
processing mode for waiting_list: do not readd the work in queue
#define EXITING_THREAD
indicate that the pool is exiting, unfinished jobs will finish and the pool will exit the threads and...
#define RUNNING_THREAD
indicate that the pool is running and ready to use
#define WAITING_PROC
status of a thread who have proc waiting to be processed
THREAD_POOL * new_thread_pool(int nbmaxthr, int nb_max_waiting)
Create a new pool of nbmaxthr threads.
Structure of a trhead pool.
A thread pool node.
Structure of a waiting process item.
Common headers and low-level hugly functions & define.
Generic log system.
void * thread_pool_processing_function(void *param)
Internal thread pool processing function.
Thread pool declaration.
Timing utilities.