Nilorea Library
C utilities for networking, threading, graphics
n_thread_pool.c
Go to the documentation of this file.
1
8#include "nilorea/n_common.h"
9#include "nilorea/n_log.h"
11#include "nilorea/n_time.h"
12
13#include <unistd.h>
14#ifdef __linux__
15#include <sys/sysinfo.h>
16#endif
17#include <pthread.h>
18#include <string.h>
19#include <errno.h>
20
21
22
/*!\fn int get_nb_cpu_cores()
 *\brief Get the number of processors of the current system
 *
 * Windows builds query GetSystemInfo, Linux builds use get_nprocs(), and any
 * other platform exposing _SC_NPROCESSORS_ONLN falls back on sysconf().
 *\return The number of processors, or -1 if it could not be determined
 */
int get_nb_cpu_cores()
{
    int nb_procs = -1 ;
#ifdef __windows__
    SYSTEM_INFO sysinfo;
    GetSystemInfo( &sysinfo );
    nb_procs = (int)sysinfo.dwNumberOfProcessors ;
#elif defined( __linux__ )
    /* <sys/sysinfo.h> is only included by this file when __linux__ is defined */
    nb_procs = get_nprocs();
#elif defined( _SC_NPROCESSORS_ONLN )
    /* other POSIX-like systems: get_nprocs() is not declared there */
    long nb_online = sysconf( _SC_NPROCESSORS_ONLN );
    if( nb_online > 0 )
        nb_procs = (int)nb_online ;
#endif
    return nb_procs ;
}
39
40
41
48{
49 THREAD_POOL_NODE *node = (THREAD_POOL_NODE *)param ;
50
51 if( !node )
52 {
53 n_log( LOG_ERR, "Thread %ld fatal error, no valid payload found, exited", node -> thr );
54 pthread_exit( NULL );
55 return NULL ;
56 }
57
58 n_log( LOG_DEBUG, "Thread %ld started", node -> thr );
59
60 int thread_state = 0 ;
61 do
62 {
63 //n_log( LOG_DEBUG, "Thread pool processing func waiting" );
64
65 // Direct procs automatically post th_start
66 sem_wait( &node -> th_start );
67
68 pthread_mutex_lock( &node -> lock );
69 thread_state = node -> thread_state ;
70 pthread_mutex_unlock( &node -> lock );
71
72 if( thread_state == RUNNING_THREAD )
73 {
74 //n_log( LOG_DEBUG, "Thread pool running proc %p", node -> func );
75
76 pthread_mutex_lock( &node -> lock );
77 node -> state = RUNNING_PROC ;
78 pthread_mutex_unlock( &node -> lock );
79
80 if( node && node -> func )
81 {
82 node -> func( node -> param ) ;
83 }
84 //n_log( LOG_DEBUG, "Thread pool end proc %p", node -> func );
85
86 pthread_mutex_lock( &node -> lock );
87 node -> func = NULL ;
88 node -> param = NULL ;
89 node -> state = IDLE_PROC ;
90 int type = node -> type ;
91 node -> type = -1 ;
92 pthread_mutex_unlock( &node -> lock );
93
94 // DIRECT_PROC do not need to post th_end
95 if( type&SYNCED_PROC )
96 sem_post( &node -> th_end );
97
98 refresh_thread_pool( node -> thread_pool );
99 }
100
101 }
102 while( thread_state != EXITING_THREAD );
103
104 n_log( LOG_DEBUG, "Thread %ld exiting...", node -> thr );
105
106 pthread_mutex_lock( &node -> lock );
107 node -> thread_state = EXITED_THREAD ;
108 pthread_mutex_unlock( &node -> lock );
109
110 n_log( LOG_DEBUG, "Thread %ld exited", node -> thr );
111
112 pthread_exit( NULL );
113
114 return NULL ;
115} /* thread_pool_processing_function */
116
117
118
125THREAD_POOL *new_thread_pool( int nbmaxthr, int nb_max_waiting )
126{
127 THREAD_POOL *thread_pool = NULL ;
128
129 Malloc( thread_pool, THREAD_POOL, 1 );
130 if( !thread_pool )
131 return NULL ;
132
133 thread_pool -> max_threads = nbmaxthr ;
134 thread_pool -> nb_max_waiting = nb_max_waiting ;
135 thread_pool -> nb_actives = 0 ;
136
137 thread_pool -> thread_list = (THREAD_POOL_NODE **)malloc( nbmaxthr * sizeof( THREAD_POOL_NODE * ) );
138 if( !thread_pool -> thread_list )
139 {
140 Free( thread_pool );
141 return NULL ;
142 }
143
144 thread_pool -> waiting_list = new_generic_list( 0 );
145 if( !thread_pool -> waiting_list )
146 {
147 n_log( LOG_ERR, "Unable to initialize wait list" );
148 return NULL ;
149 }
150
151 pthread_mutex_init( &thread_pool -> lock, NULL );
152
153 if( sem_init( &thread_pool -> nb_tasks , 0 , 0 ) == -1 )
154 {
155 int error = errno ;
156 n_log( LOG_ERR, "sem_init failed : %s on &thread_pool -> nb_tasks", strerror ( error ));
157 return NULL;
158 }
159
160 for( int it = 0 ; it < nbmaxthr ; it ++ )
161 {
162 Malloc( thread_pool -> thread_list[ it ], THREAD_POOL_NODE, 1 );
163 thread_pool -> thread_list[ it ] -> type = -1 ;
164 thread_pool -> thread_list[ it ] -> state = IDLE_PROC ;
165 thread_pool -> thread_list[ it ] -> thread_state = RUNNING_THREAD ;
166 thread_pool -> thread_list[ it ] -> thread_pool = thread_pool ;
167
168 if( sem_init( &thread_pool -> thread_list[ it ] -> th_start, 0, 0 ) == -1 )
169 {
170 int error = errno ;
171 n_log( LOG_ERR, "sem_init failed : %s on &thread_pool -> thread_list[ %d ] -> th_start\n", strerror ( error ), it );
172 return NULL;
173 }
174 if( sem_init( &thread_pool -> thread_list[ it ] -> th_end, 0, 0 ) == -1 )
175 {
176 int error = errno ;
177 n_log( LOG_ERR, "sem_init failed : %s on &thread_pool -> thread_list[ %d] -> th_end\n", strerror ( error ), it );
178 return NULL;
179 }
180
181 thread_pool -> thread_list[ it ] -> func = NULL ;
182 thread_pool -> thread_list[ it ] -> param = NULL ;
183
184 pthread_mutex_init( &thread_pool -> thread_list[ it ] -> lock, NULL );
185
186 if( pthread_create( &thread_pool -> thread_list[ it ] -> thr, NULL, thread_pool_processing_function, (void *)thread_pool -> thread_list[ it ] ) != 0 )
187 {
188 n_log( LOG_ERR, "pthread_create failed : %s for it %d\n", strerror ( errno ), it );
189 return NULL;
190 }
191 }
192 return thread_pool ;
193} /* new_thread_pool */
194
195
196
205int add_threaded_process( THREAD_POOL *thread_pool, void *(*func_ptr)(void *param), void *param, int mode )
206{
207
208 if( !thread_pool )
209 return FALSE ;
210
211 if( !thread_pool -> thread_list )
212 return FALSE ;
213
214 int it = 0 ;
215
216 if( !(mode&NO_LOCK) ) pthread_mutex_lock( &thread_pool -> lock );
217
218 while( it < thread_pool -> max_threads )
219 {
220 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
221 if( thread_pool -> thread_list[ it ] -> thread_state == RUNNING_THREAD && thread_pool -> thread_list[ it ] -> state == IDLE_PROC )
222 {
223 break ;
224 }
225 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
226 it ++ ;
227 }
228 /* all thread are occupied -> test waiting lists. not holding node lock because it > max_threads !!! */
229 if( it >= thread_pool -> max_threads )
230 {
231 /* if already coming from queue, or if it should be part of a synced start, do not re-add && return FALSE */
232 if( mode&NOQUEUE || mode&SYNCED_PROC )
233 {
234 if( !(mode&NO_LOCK) ) pthread_mutex_unlock( &thread_pool -> lock );
235 return FALSE ;
236 }
237 if( thread_pool -> nb_max_waiting <= 0 || ( thread_pool -> waiting_list -> nb_items < thread_pool -> nb_max_waiting ) )
238 {
239 THREAD_WAITING_PROC *proc = NULL ;
240 Malloc( proc, THREAD_WAITING_PROC, 1 );
241 proc -> func = func_ptr ;
242 proc -> param = param ;
243 list_push( thread_pool -> waiting_list, proc, free );
244 n_log( LOG_DEBUG, "Adding %p %p to waitlist", proc -> func, proc -> param );
245 }
246 else
247 {
248 n_log( LOG_ERR , "Proc %p %p was dropped from waitlist because waitlist is full", func_ptr, param );
249 }
250 }
251 else if( mode&DIRECT_PROC )
252 {
253 thread_pool -> thread_list[ it ] -> func = func_ptr ;
254 thread_pool -> thread_list[ it ] -> param = param ;
255 thread_pool -> thread_list[ it ] -> state = WAITING_PROC ;
256 thread_pool -> thread_list[ it ] -> type = mode ;
257 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
258 sem_post( &thread_pool -> thread_list[ it ] -> th_start );
259 }
260 else if( mode&SYNCED_PROC )
261 {
262 thread_pool -> thread_list[ it ] -> func = func_ptr ;
263 thread_pool -> thread_list[ it ] -> param = param ;
264 thread_pool -> thread_list[ it ] -> state = WAITING_PROC ;
265 thread_pool -> thread_list[ it ] -> type = mode ;
266 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
267 }
268 else
269 {
270 n_log( LOG_ERR, "Unknown mode %d for thread %d", mode, it );
271 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
272 if( !(mode&NO_LOCK) ) pthread_mutex_unlock( &thread_pool -> lock );
273 return FALSE ;
274 }
275 if( !(mode&NO_LOCK) ) pthread_mutex_unlock( &thread_pool -> lock );
276 return TRUE ;
277} /* add_threaded_process */
278
279
280
287{
288 if( !thread_pool )
289 return FALSE ;
290
291 if( !thread_pool -> thread_list )
292 return FALSE ;
293
294 int retval = TRUE ;
295
296 pthread_mutex_lock( &thread_pool -> lock );
297 for( int it = 0 ; it < thread_pool -> max_threads ; it ++ )
298 {
299 int to_run = 0 ;
300 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
301 if( (thread_pool -> thread_list[ it ] -> type&SYNCED_PROC) && thread_pool -> thread_list[ it ] -> state == WAITING_PROC )
302 {
303 to_run = 1 ;
304 }
305 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
306 if( to_run == 1 )
307 {
308 if( sem_post( &thread_pool -> thread_list[ it ] -> th_start ) != 0 )
309 {
310 int error = errno ;
311 n_log( LOG_ERR , "sem_post th_start error in thread_pool %p , thread_list[ %d ] : %s" , thread_pool , it , strerror( error ) );
312 } retval = FALSE ;
313 }
314 }
315 pthread_mutex_unlock( &thread_pool -> lock );
316
317 return retval ;
318} /* start_threaded_pool */
319
320
327{
328 __n_assert( thread_pool , return FALSE );
329 __n_assert( thread_pool -> thread_list , return FALSE );
330
331 int retval = TRUE ;
332 for( int it = 0 ; it < thread_pool -> max_threads ; it ++ )
333 {
334 if( sem_wait( &thread_pool -> thread_list[ it ] -> th_end ) == -1 )
335 {
336 int error = errno ;
337 n_log( LOG_ERR , "sem_wait th_end error in thread_pool %p , thread_list[ %d ] : %s" , thread_pool , it , strerror( error ) );
338 retval = FALSE ;
339 }
340 }
341
342 return retval ;
343} /* wait_for_synced_threaded_pool */
344
345
352int wait_for_threaded_pool( THREAD_POOL *thread_pool, int delay )
353{
354 if( !thread_pool )
355 return FALSE ;
356
357 if( !thread_pool -> thread_list )
358 return FALSE ;
359
360 int DONE = 0 ;
361
362 //n_log( LOG_DEBUG, "Waiting for the waitlist of %p to be consumed", thread_pool );
363
364 /* waiting to consume all the waiting list */
365 while( thread_pool -> waiting_list -> nb_items > 0 )
366 {
367 refresh_thread_pool( thread_pool );
368 u_sleep( delay );
369 }
370
371 //n_log( LOG_DEBUG, "Waiting for active process of %p to be terminated", thread_pool );
372 /* waiting for all active procs to have terminated */
373 while( !DONE )
374 {
375 DONE = 1 ;
376 for( int it = 0 ; it < thread_pool -> max_threads ; it ++ )
377 {
378 int state = 0 ;
379
380 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
381 state = thread_pool -> thread_list[ it ] -> state ;
382 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
383 if( state != IDLE_PROC )
384 {
385 DONE = 0 ;
386 //n_log( LOG_DEBUG, "Thread id %d status is not IDLE: %d", it, state );
387 }
388 }
389 u_sleep( delay );
390 refresh_thread_pool( thread_pool );
391 }
392
393 return TRUE ;
394}
395
396
403int destroy_threaded_pool( THREAD_POOL **pool, int delay )
404{
405 __n_assert( pool&&(*pool), return FALSE );
406 __n_assert( (*pool) -> thread_list, return FALSE );
407
408 int state= 0, DONE = 0 ;
409
410 while( !DONE )
411 {
412 DONE = 0 ;
413 pthread_mutex_lock( &(*pool) -> lock );
414 for( int it = 0 ; it < (*pool) -> max_threads ; it ++ )
415 {
416 pthread_mutex_lock( &(*pool) -> thread_list[ it ] -> lock );
417 state = (*pool) -> thread_list[ it ] -> state ;
418 pthread_mutex_unlock( &(*pool) -> thread_list[ it ] -> lock );
419
420 if( state == IDLE_PROC )
421 {
422 //n_log( LOG_DEBUG, "Posting EXITING to thread %ld", (*pool) -> thread_list[ it ] -> thr );
423 pthread_mutex_lock( &(*pool) -> thread_list[ it ] -> lock );
424 (*pool) -> thread_list[ it ] -> thread_state = EXITING_THREAD ;
425 sem_post( &(*pool) -> thread_list[ it ] -> th_start );
426 pthread_mutex_unlock( &(*pool) -> thread_list[ it ] -> lock );
427 DONE = 1 ;
428 }
429 else
430 {
431 //n_log( LOG_DEBUG, "thr %ld proc state %d thr state %d", (*pool) -> thread_list[ it ] -> thr, (*pool) -> thread_list[ it ] -> state,(*pool) -> thread_list[ it ] -> thread_state );
432 }
433 }
434 pthread_mutex_unlock( &(*pool) -> lock );
435
436 u_sleep( delay );
437 }
438
439 pthread_mutex_lock( &(*pool) -> lock );
440 for( int it = 0 ; it < (*pool) -> max_threads ; it ++ )
441 {
442 pthread_join( (*pool) -> thread_list[ it ] -> thr, NULL );
443 pthread_mutex_destroy( &(*pool) -> thread_list[ it ] -> lock );
444 sem_destroy( &(*pool) -> thread_list[ it ] -> th_start );
445 sem_destroy( &(*pool) -> thread_list[ it ] -> th_end );
446 Free( (*pool) -> thread_list[ it ] );
447 }
448 Free( (*pool) -> thread_list );
449 list_destroy( &(*pool) -> waiting_list );
450
451 sem_destroy( &(*pool) -> nb_tasks );
452
453 pthread_mutex_unlock( &(*pool) -> lock );
454 pthread_mutex_destroy( &(*pool) -> lock );
455
456 Free( (*pool) );
457
458 return TRUE ;
459} /* destroy_threaded_pool */
460
461
462
463
470{
471 __n_assert( thread_pool, return FALSE );
472 __n_assert( thread_pool -> waiting_list, return FALSE );
473
474 /* Trying to empty the wait list */
475 int push_status = 0 ;
476 pthread_mutex_lock( &thread_pool -> lock );
477 if( thread_pool -> waiting_list && thread_pool -> waiting_list -> start )
478 push_status = 1 ;
479 while( push_status == 1 )
480 {
481 LIST_NODE *node = thread_pool -> waiting_list -> start ;
482 LIST_NODE *next_node = NULL ;
483 if( node && node -> ptr )
484 {
485 THREAD_WAITING_PROC *proc = (THREAD_WAITING_PROC *)node -> ptr ;
486 if( proc )
487 {
488 if( add_threaded_process( thread_pool, proc -> func, proc -> param, DIRECT_PROC|NOQUEUE|NO_LOCK ) == TRUE )
489 {
490 THREAD_WAITING_PROC *procptr = NULL ;
491 next_node = node -> next ;
492 procptr = remove_list_node( thread_pool -> waiting_list, node, THREAD_WAITING_PROC );
493 //n_log( LOG_DEBUG, "waitlist: adding %p,%p to %p", procptr -> func, procptr -> param, thread_pool );
494 Free( procptr );
495 node = next_node ;
496 }
497 else
498 {
499 push_status = 0 ;
500 }
501 }
502 }
503 else
504 {
505 push_status = 0 ;
506 }
507 } // while( push_status == 1 )
508
509 // update statictics
510 thread_pool -> nb_actives = 0 ;
511 for( int it = 0 ; it < thread_pool -> max_threads ; it ++ )
512 {
513 pthread_mutex_lock( &thread_pool -> thread_list[ it ] -> lock );
514 if( thread_pool -> thread_list[ it ] -> state == RUNNING_PROC )
515 thread_pool -> nb_actives ++ ;
516 pthread_mutex_unlock( &thread_pool -> thread_list[ it ] -> lock );
517 }
518
519 if( push_status == 0 && thread_pool -> nb_actives == 0 )
520 {
521 int value = 0 ;
522 sem_getvalue( &thread_pool -> nb_tasks , &value );
523 if( value == 0 )
524 {
525 sem_post( &thread_pool -> nb_tasks );
526 }
527 }
528
529 pthread_mutex_unlock( &thread_pool -> lock );
530
531 return TRUE ;
532} // refresh_thread_pool()
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
Definition: n_common.h:183
#define __n_assert(__ptr, __ret)
macro to assert things
Definition: n_common.h:276
#define Free(__ptr)
Free Handler to get errors.
Definition: n_common.h:256
#define remove_list_node(__LIST_,__NODE_, __TYPE_)
Remove macro helper for void pointer casting.
Definition: n_list.h:83
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
Definition: n_list.c:244
int list_destroy(LIST **list)
Empty and Free a list container.
Definition: n_list.c:603
LIST * new_generic_list(int max_items)
Initialize a generic list container to max_items pointers.
Definition: n_list.c:20
Structure of a generic list node.
Definition: n_list.h:27
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
Definition: n_log.h:74
#define LOG_DEBUG
debug-level messages
Definition: n_log.h:66
#define LOG_ERR
error conditions
Definition: n_log.h:58
void u_sleep(unsigned int usec)
wrapper around usleep for API consistency
Definition: n_time.c:38
int start_threaded_pool(THREAD_POOL *thread_pool)
Launch the processes waiting for execution in the thread pool.
int get_nb_cpu_cores()
get number of core of current system
Definition: n_thread_pool.c:27
int destroy_threaded_pool(THREAD_POOL **pool, int delay)
delete a thread_pool, exit the threads and free the structs
#define EXITED_THREAD
indicate that the pool is off, all jobs have been consumed
Definition: n_thread_pool.h:43
#define SYNCED_PROC
processing mode for added func, synced start
Definition: n_thread_pool.h:27
int add_threaded_process(THREAD_POOL *thread_pool, void *(*func_ptr)(void *param), void *param, int mode)
add a function and params to a thread pool
int refresh_thread_pool(THREAD_POOL *thread_pool)
try to add some waiting DIRECT_PROCs on some free thread slots, else do nothing
#define IDLE_PROC
status of a thread which is waiting for some proc
Definition: n_thread_pool.h:33
int wait_for_synced_threaded_pool(THREAD_POOL *thread_pool)
wait for all the launched process, blocking but light on the CPU as there is no polling
#define DIRECT_PROC
processing mode for added func, direct start
Definition: n_thread_pool.h:29
#define RUNNING_PROC
status of a thread which proc is currently running
Definition: n_thread_pool.h:37
int wait_for_threaded_pool(THREAD_POOL *thread_pool, int delay)
Wait for all the launched process in the thread pool to terminate.
#define NO_LOCK
if passed to add_threaded_process, skip main table lock in case we are in a func which is already loc...
Definition: n_thread_pool.h:45
#define NOQUEUE
processing mode for waiting_list: do not readd the work in queue
Definition: n_thread_pool.h:31
#define EXITING_THREAD
indicate that the pool is exiting, unfinished jobs will finish and the pool will exit the threads and...
Definition: n_thread_pool.h:41
#define RUNNING_THREAD
indicate that the pool is running and ready to use
Definition: n_thread_pool.h:39
#define WAITING_PROC
status of a thread who have proc waiting to be processed
Definition: n_thread_pool.h:35
THREAD_POOL * new_thread_pool(int nbmaxthr, int nb_max_waiting)
Create a new pool of nbmaxthr threads.
Structure of a thread pool.
Definition: n_thread_pool.h:81
A thread pool node.
Definition: n_thread_pool.h:49
Structure of a waiting process item.
Common headers and low-level ugly functions & defines.
Generic log system.
void * thread_pool_processing_function(void *param)
Internal thread pool processing function.
Definition: n_thread_pool.c:47
Thread pool declaration.
Timing utilities.