Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
n_thread_pool.c
Go to the documentation of this file.
1
9#include <unistd.h>
10#include "nilorea/n_common.h"
11#include "nilorea/n_log.h"
13#include "nilorea/n_time.h"
14
15#ifdef __linux__
16#include <sys/sysinfo.h>
17#endif
18#include <pthread.h>
19#include <string.h>
20#include <errno.h>
21
/**
 * @brief Get the number of processor cores of the current system.
 *
 * When __windows__ is defined the count is read from GetSystemInfo();
 * on every other target it is the value of sysconf(_SC_NPROCESSORS_ONLN).
 *
 * @return the number of processors, or the sysconf() result unchanged
 *         (sysconf() reports failure with -1)
 */
long int get_nb_cpu_cores(void) {
#ifdef __windows__
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    return (long int)sysinfo.dwNumberOfProcessors;
#else
    return sysconf(_SC_NPROCESSORS_ONLN);
#endif
}
37
/*
 * Body of the worker thread started for every pool slot. This page's index gives its
 * signature as: void *thread_pool_processing_function(void *param).
 * NOTE(review): the signature line and listing lines 87 and 94 are missing from this extract.
 *
 * param is cast to the THREAD_POOL_NODE used by this worker. Each loop pass blocks on
 * node->th_start; if thread_state then reads RUNNING_THREAD, node->func(node->param) is
 * run and the slot is reset to IDLE_PROC. The loop ends once thread_state reads
 * EXITING_THREAD. NULL is returned on every path.
 */
44 THREAD_POOL_NODE* node = (THREAD_POOL_NODE*)param;
45
// without a node there is nothing to wait on: log and leave the thread
46 if (!node) {
47 n_log(LOG_ERR, "Thread fatal error, no valid payload found, exiting thread function !");
48 pthread_exit(NULL);
49 return NULL;
50 }
51
// NOTE(review): thr is a pthread_t formatted with %ld - confirm this is valid on all targets
52 n_log(LOG_DEBUG, "Thread %ld started", node->thr);
53
54 int thread_state = 0;
55 do {
56 n_log(LOG_DEBUG, "Thread pool processing func waiting");
57
58 // note: direct procs will automatically post th_start
59 sem_wait(&node->th_start);
60
// take a snapshot of thread_state under the node lock
61 pthread_mutex_lock(&node->lock);
62 thread_state = node->thread_state;
63 pthread_mutex_unlock(&node->lock);
64
65 if (thread_state == RUNNING_THREAD) {
66 n_log(LOG_INFO, "Thread pool running proc %p", node->func);
67 pthread_mutex_lock(&node->lock);
68 node->state = RUNNING_PROC;
69 pthread_mutex_unlock(&node->lock);
70
// the proc itself runs without the node lock held
71 if (node && node->func) {
72 node->func(node->param);
73 }
74 n_log(LOG_INFO, "Thread pool end proc %p", node->func);
75
// reset the slot; type is copied before being cleared because it decides the th_end post
76 pthread_mutex_lock(&node->lock);
77 node->func = NULL;
78 node->param = NULL;
79 node->state = IDLE_PROC;
80 int type = node->type;
81 node->type = -1;
82 // NORMAL_PROC or DIRECT_PROC do not need to post th_end
83 if (type & SYNCED_PROC)
84 sem_post(&node->th_end);
85 pthread_mutex_unlock(&node->lock);
86
// NOTE(review): listing line 87 is missing from this extract
88 }
89 } while (thread_state != EXITING_THREAD);
90
91 n_log(LOG_DEBUG, "Thread %ld exiting...", node->thr);
92
// NOTE(review): listing line 94 (the statement executed under this lock) is missing from this extract
93 pthread_mutex_lock(&node->lock);
95 pthread_mutex_unlock(&node->lock);
96
97 n_log(LOG_DEBUG, "Thread %ld exited", node->thr);
98
99 pthread_exit(NULL);
100
101 return NULL;
102} /* thread_pool_processing_function */
103
/*
 * Create a new pool of nbmaxthr threads (description taken from this page's index).
 * NOTE(review): listing lines 113, 119, 123, 127-128, 142 and 144-146 are missing from this
 * extract, so the allocation of the pool, of the wait list and of each node is not visible.
 *
 * nbmaxthr        number of worker slots; stored in max_threads and used to size thread_list
 * nb_max_waiting  stored in the pool's nb_max_waiting field
 * Returns the new pool, or NULL as soon as one of the visible initialisation steps fails.
 */
110THREAD_POOL* new_thread_pool(size_t nbmaxthr, size_t nb_max_waiting) {
111 THREAD_POOL* thread_pool = NULL;
112
// NOTE(review): listing line 113 (presumably the pool allocation) is missing from this extract
114 if (!thread_pool)
115 return NULL;
116
117 thread_pool->max_threads = nbmaxthr;
118 thread_pool->nb_max_waiting = nb_max_waiting;
120
// NOTE(review): no overflow check on nbmaxthr * sizeof(...) is visible before this malloc
121 thread_pool->thread_list = (THREAD_POOL_NODE**)malloc(nbmaxthr * sizeof(THREAD_POOL_NODE*));
122 if (!thread_pool->thread_list) {
124 return NULL;
125 }
126
// NOTE(review): listing lines 127-128 (wait list creation and its test) are missing from this extract
129 n_log(LOG_ERR, "Unable to initialize wait list");
130 return NULL;
131 }
132
133 pthread_mutex_init(&thread_pool->lock, NULL);
134
// NOTE(review): this error return, like the ones inside the loop below, does not release thread_list
135 if (sem_init(&thread_pool->nb_tasks, 0, 0) == -1) {
136 int error = errno;
137 n_log(LOG_ERR, "sem_init failed : %s on &thread_pool -> nb_tasks", strerror(error));
138 return NULL;
139 }
140
// per-slot setup: semaphores, empty func/param, node lock, then the worker thread itself
141 for (size_t it = 0; it < nbmaxthr; it++) {
143 thread_pool->thread_list[it]->type = -1;
147
// NOTE(review): `it` is a size_t but is printed with %d in the three log calls of this loop
148 if (sem_init(&thread_pool->thread_list[it]->th_start, 0, 0) == -1) {
149 int error = errno;
150 n_log(LOG_ERR, "sem_init failed : %s on &thread_pool -> thread_list[ %d ] -> th_start", strerror(error), it);
151 return NULL;
152 }
153 if (sem_init(&thread_pool->thread_list[it]->th_end, 0, 0) == -1) {
154 int error = errno;
155 n_log(LOG_ERR, "sem_init failed : %s on &thread_pool -> thread_list[ %d] -> th_end", strerror(error), it);
156 return NULL;
157 }
158
159 thread_pool->thread_list[it]->func = NULL;
160 thread_pool->thread_list[it]->param = NULL;
161
162 pthread_mutex_init(&thread_pool->thread_list[it]->lock, NULL);
163
// the node is handed to the worker as its start argument
// NOTE(review): pthread_create reports failure through its return value; errno may not describe it
164 if (pthread_create(&thread_pool->thread_list[it]->thr, NULL, thread_pool_processing_function, (void*)thread_pool->thread_list[it]) != 0) {
165 n_log(LOG_ERR, "pthread_create failed : %s for it %d", strerror(errno), it);
166 return NULL;
167 }
168 }
169 return thread_pool;
170} /* new_thread_pool */
171
/*
 * Add a function and its parameter to a thread pool (description taken from this page's index).
 * NOTE(review): listing lines 196, 204, 207, 208 and 245 are missing from this extract; the
 * conditions that pick a free slot, guard the "unknown mode" branch and guard the
 * "waitlist is full" branch are therefore not visible here.
 *
 * thread_pool  target pool; FALSE is returned if it or its thread_list is NULL
 * func_ptr     function stored in the chosen slot, or in the wait list entry
 * param        argument stored alongside func_ptr
 * mode         bit flags tested below: NO_LOCK, NORMAL_PROC, DIRECT_PROC, SYNCED_PROC, NO_QUEUE
 * Returns TRUE when the proc was placed on a thread slot or pushed on the wait list,
 * FALSE on every other path.
 */
180int add_threaded_process(THREAD_POOL* thread_pool, void* (*func_ptr)(void* param), void* param, int mode) {
181 if (!thread_pool) {
182 n_log( LOG_ERR , "thread_pool is not allocated, can't add processes to it !");
183 return FALSE;
184 }
185
186 if (!thread_pool->thread_list) {
187 n_log( LOG_ERR , "thread_pool thread_list is not allocated, can't add processes to it !");
188 return FALSE;
189 }
190
// the pool lock is taken unless the caller passed NO_LOCK
191 if (!(mode & NO_LOCK)) pthread_mutex_lock(&thread_pool->lock);
192
// scan the slots; the loop breaks with the slot lock still held, otherwise the lock is released
// NOTE(review): the condition guarding the break (listing line 196) is missing from this extract
193 size_t it = 0;
194 while (it < thread_pool->max_threads) {
195 pthread_mutex_lock(&thread_pool->thread_list[it]->lock);
197 break;
198 }
199 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
200 it++;
201 }
202 // we have a free thread slot, and the lock on it
203 if (it < thread_pool->max_threads) {
// NOTE(review): listing lines 204, 207 and 208 are missing from this extract
205 thread_pool->thread_list[it]->func = func_ptr;
206 thread_pool->thread_list[it]->param = param;
209 } else {
// NOTE(review): `it` is a size_t but is printed with %d here and in the "added on thread" log
210 n_log(LOG_ERR, "unknown mode %d for thread %d", mode, it);
211 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
212 if (!(mode & NO_LOCK))
213 pthread_mutex_unlock(&thread_pool->lock);
214 return FALSE;
215 }
// NORMAL_PROC and DIRECT_PROC wake the worker immediately; other modes do not post th_start here
216 if (mode & NORMAL_PROC || mode & DIRECT_PROC)
217 sem_post(&thread_pool->thread_list[it]->th_start);
218 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
219 n_log(LOG_DEBUG, "proc %p(%p) added on thread %d", func_ptr, param, it);
220 } else {
221 // all thread are occupied -> test waiting lists. not holding thread_list[ it ] lock because it was obligatory unlocked before
222
223 // if already coming from queue, or if it should be part of a synced start, do not re-add && return FALSE
224 // it's only an error if SYNCED_PROC mode
225 int cancel_and_return = FALSE ;
226 if( mode & NO_QUEUE ) {
227 n_log(LOG_DEBUG, "Thread pool active threads are all busy and mode is NO_QUEUE, cannot add %p(%p) to pool %p", func_ptr, param, thread_pool);
228 cancel_and_return = TRUE ;
229 } else if( mode & SYNCED_PROC) {
230 n_log(LOG_ERR, "Thread pool active threads are all busy, cannot add SYNCED_PROC %p(%p) to pool %p", func_ptr, param, thread_pool);
231 cancel_and_return = TRUE ;
232 } else if( mode & DIRECT_PROC) {
233 n_log(LOG_ERR, "Thread pool active threads are all busy, cannot add DIRECT_PROC %p(%p) to pool %p", func_ptr, param, thread_pool);
234 cancel_and_return = TRUE ;
235 }
236
237 if( cancel_and_return )
238 {
239 if (!(mode & NO_LOCK))
240 pthread_mutex_unlock(&thread_pool->lock);
241 return FALSE;
242 }
243
244 // try adding to wait list
// NOTE(review): listing line 245 (the test paired with the "waitlist ... is full" else) is missing
// NOTE(review): proc is dereferenced without a visible check that Malloc succeeded - confirm
246 THREAD_WAITING_PROC* proc = NULL;
247 Malloc(proc, THREAD_WAITING_PROC, 1);
248 proc->func = func_ptr;
249 proc->param = param;
// free is passed as the destructor of the list entry; the list_push result is not checked
250 list_push(thread_pool->waiting_list, proc, free);
251 n_log(LOG_DEBUG, "Adding %p %p to waitlist", proc->func, proc->param);
252 } else {
253 n_log(LOG_ERR, "proc %p(%p) was dropped from waitlist because waitlist of thread pool %p is full", func_ptr, param, thread_pool);
254 if (!(mode & NO_LOCK)) pthread_mutex_unlock(&thread_pool->lock);
255 return FALSE;
256 }
257 }
258
259 if (!(mode & NO_LOCK)) pthread_mutex_unlock(&thread_pool->lock);
260
261 return TRUE;
262} /* add_threaded_process */
263
/*
 * Launch the procs waiting for execution in the thread pool (description taken from this
 * page's index, which gives the signature as: int start_threaded_pool(THREAD_POOL *thread_pool)).
 * NOTE(review): the signature line and listing lines 273 and 282 are missing from this extract.
 *
 * Under the pool lock, every slot for which to_run gets set receives one sem_post on th_start.
 * Returns FALSE when thread_pool is NULL, when the check on missing line 273 fails, or when
 * any sem_post fails; TRUE otherwise.
 */
270 if (!thread_pool)
271 return FALSE;
272
// NOTE(review): the condition for this return (listing line 273) is missing from this extract
274 return FALSE;
275
276 int retval = TRUE;
277
278 pthread_mutex_lock(&thread_pool->lock);
279 for (size_t it = 0; it < thread_pool->max_threads; it++) {
280 int to_run = 0;
// the slot is inspected under its own lock; the post itself happens after the unlock
// NOTE(review): the condition that sets to_run (listing line 282) is missing from this extract
281 pthread_mutex_lock(&thread_pool->thread_list[it]->lock);
283 to_run = 1;
284 }
285 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
286 if (to_run == 1) {
// a failed post is logged and remembered, but the remaining slots are still processed
// NOTE(review): `it` is a size_t but is printed with %d
287 if (sem_post(&thread_pool->thread_list[it]->th_start) != 0) {
288 int error = errno;
289 n_log(LOG_ERR, "sem_post th_start error in thread_pool %p , thread_list[ %d ] : %s", thread_pool, it, strerror(error));
290 retval = FALSE;
291 }
292 }
293 }
294 pthread_mutex_unlock(&thread_pool->lock);
295
296 return retval;
297} /* start_threaded_pool */
298
/*
 * Wait for the launched procs by blocking on each slot's th_end semaphore, without polling.
 * This page's index gives the signature as:
 * int wait_for_synced_threaded_pool(THREAD_POOL *thread_pool).
 * NOTE(review): the signature line is missing from this extract.
 *
 * Returns FALSE when thread_pool or its thread_list is NULL or when any sem_wait fails,
 * TRUE otherwise.
 * NOTE(review): sem_wait is called once for every slot, while the worker loop only posts
 * th_end for procs whose type has SYNCED_PROC set - confirm that callers always fill all slots
 * with synced procs, otherwise this call would block.
 */
305 __n_assert(thread_pool, return FALSE);
306 __n_assert(thread_pool->thread_list, return FALSE);
307
308 int retval = TRUE;
309 for (size_t it = 0; it < thread_pool->max_threads; it++) {
// a failed wait is logged and remembered, but the remaining slots are still waited on
// NOTE(review): `it` is a size_t but is printed with %d
310 if (sem_wait(&thread_pool->thread_list[it]->th_end) == -1) {
311 int error = errno;
312 n_log(LOG_ERR, "sem_wait th_end error in thread_pool %p , thread_list[ %d ] : %s", thread_pool, it, strerror(error));
313 retval = FALSE;
314 }
315 }
316 return retval;
317} /* wait_for_synced_threaded_pool */
318
/*
 * Wait for all the launched procs in the thread pool to terminate (description taken from
 * this page's index, which gives the signature as:
 * int wait_for_threaded_pool(THREAD_POOL *thread_pool, unsigned int delay)).
 * NOTE(review): the signature line and listing lines 329, 338 and 358 are missing from this extract.
 *
 * Two polling phases, each sleeping u_sleep(delay) per pass: first until the wait list is
 * empty, then until every slot reports IDLE_PROC.
 * Returns FALSE when thread_pool is NULL or when the check on missing line 329 fails,
 * TRUE once both phases are over.
 */
326 if (!thread_pool)
327 return FALSE;
328
// NOTE(review): the condition for this return (listing line 329) is missing from this extract
330 return FALSE;
331
332 int DONE = 0;
333
334 // n_log( LOG_DEBUG, "Waiting for the waitlist of %p to be consumed", thread_pool );
335
336 /* waiting to consume all the waiting list */
// NOTE(review): nb_items is read here without the pool lock; listing line 338 is missing from this extract
337 while (thread_pool->waiting_list->nb_items > 0) {
339 u_sleep(delay);
340 }
341
342 // n_log( LOG_DEBUG, "Waiting for active process of %p to be terminated", thread_pool );
343 /* waiting for all active procs to have terminated */
344 while (!DONE) {
// assume all slots are idle; any slot in another state clears the flag for this pass
345 DONE = 1;
346 for (size_t it = 0; it < thread_pool->max_threads; it++) {
347 int state = 0;
348
349 pthread_mutex_lock(&thread_pool->thread_list[it]->lock);
350 state = thread_pool->thread_list[it]->state;
351 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
352 if (state != IDLE_PROC) {
353 DONE = 0;
354 // n_log( LOG_DEBUG, "Thread id %d status is not IDLE: %d", it, state );
355 }
356 }
// the sleep also runs after the final, successful pass
// NOTE(review): listing line 358 is missing from this extract
357 u_sleep(delay);
359 }
360
361 return TRUE;
362}
363
370int destroy_threaded_pool(THREAD_POOL** pool, unsigned int delay) {
371 __n_assert(pool && (*pool), return FALSE);
372 __n_assert((*pool)->thread_list, return FALSE);
373
374 int state = 0, DONE = 0;
375
376 while (!DONE) {
377 DONE = 0;
378 pthread_mutex_lock(&(*pool)->lock);
379 for (size_t it = 0; it < (*pool)->max_threads; it++) {
380 pthread_mutex_lock(&(*pool)->thread_list[it]->lock);
381 state = (*pool)->thread_list[it]->state;
382 pthread_mutex_unlock(&(*pool)->thread_list[it]->lock);
383
384 if (state == IDLE_PROC) {
385 // n_log( LOG_DEBUG, "Posting EXITING to thread %ld", (*pool) -> thread_list[ it ] -> thr );
386 pthread_mutex_lock(&(*pool)->thread_list[it]->lock);
387 (*pool)->thread_list[it]->thread_state = EXITING_THREAD;
388 sem_post(&(*pool)->thread_list[it]->th_start);
389 pthread_mutex_unlock(&(*pool)->thread_list[it]->lock);
390 DONE = 1;
391 } else {
392 // n_log( LOG_DEBUG, "thr %ld proc state %d thr state %d", (*pool) -> thread_list[ it ] -> thr, (*pool) -> thread_list[ it ] -> state,(*pool) -> thread_list[ it ] -> thread_state );
393 }
394 }
395 pthread_mutex_unlock(&(*pool)->lock);
396
397 u_sleep(delay);
398 }
399
400 pthread_mutex_lock(&(*pool)->lock);
401
402 for (size_t it = 0; it < (*pool)->max_threads; it++) {
403 pthread_join((*pool)->thread_list[it]->thr, NULL);
404 pthread_mutex_destroy(&(*pool)->thread_list[it]->lock);
405 sem_destroy(&(*pool)->thread_list[it]->th_start);
406 sem_destroy(&(*pool)->thread_list[it]->th_end);
407 Free((*pool)->thread_list[it]);
408 }
409 Free((*pool)->thread_list);
410 list_destroy(&(*pool)->waiting_list);
411
412 sem_destroy(&(*pool)->nb_tasks);
413
414 pthread_mutex_unlock(&(*pool)->lock);
415 pthread_mutex_destroy(&(*pool)->lock);
416
417 Free((*pool));
418
419 return TRUE;
420} /* destroy_threaded_pool */
421
/*
 * Try to move waiting procs from the wait list onto free thread slots, else do nothing.
 * This page's index gives the signature as: int refresh_thread_pool(THREAD_POOL *thread_pool).
 * NOTE(review): the signature line and listing lines 434, 437, 440, 445, 463, 466 and 467
 * are missing from this extract, so the declarations of node and proc, the removal of the
 * list node and the statistics update are not visible here.
 *
 * Returns FALSE when thread_pool or its waiting_list is NULL, TRUE otherwise.
 */
428 __n_assert(thread_pool, return FALSE);
429 __n_assert(thread_pool->waiting_list, return FALSE);
430
431 /* Trying to empty the wait list */
432 int push_status = 0;
433 pthread_mutex_lock(&thread_pool->lock);
435 push_status = 1;
// keep pushing until the list is exhausted or a push is refused
436 while (push_status == 1) {
438 LIST_NODE* next_node = NULL;
439 if (node && node->ptr) {
441 if (proc) {
// NO_QUEUE: a refused proc must not be queued again; NO_LOCK: the pool lock is already held here
442 if (add_threaded_process(thread_pool, proc->func, proc->param, NORMAL_PROC | NO_QUEUE | NO_LOCK) == TRUE) {
443 THREAD_WAITING_PROC* procptr = NULL;
444 next_node = node->next;
// NOTE(review): listing line 445 (presumably the assignment of procptr) is missing from this extract
446 n_log(LOG_DEBUG, "waitlist: adding %p,%p to %p", procptr->func, procptr->param, thread_pool);
447 Free(procptr);
448 node = next_node;
449 } else {
// NOTE(review): the format string has one %p but two arguments (proc, thread_pool) are passed
450 n_log(LOG_DEBUG, "waitlist: cannot add proc %p from waiting list, all active threads are busy !", proc, thread_pool);
451 push_status = 0;
452 }
453 } else {
454 n_log(LOG_ERR, "waitlist: trying to add invalid NULL proc on thread pool %p !", thread_pool);
455 push_status = 0;
456 }
457 } else {
458 push_status = 0;
459 }
460 } // while( push_status == 1 )
461
462 // update statistics
// NOTE(review): the statements run under each slot lock (listing lines 466-467) are missing from this extract
464 for (size_t it = 0; it < thread_pool->max_threads; it++) {
465 pthread_mutex_lock(&thread_pool->thread_list[it]->lock);
468 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
469 }
470
// with nothing left to push and no active thread, nb_tasks is raised to 1 if it was 0
471 if (push_status == 0 && thread_pool->nb_actives == 0) {
472 int value = 0;
473 sem_getvalue(&thread_pool->nb_tasks, &value);
474 if (value == 0) {
475 sem_post(&thread_pool->nb_tasks);
476 }
477 }
478
479 pthread_mutex_unlock(&thread_pool->lock);
480
481 return TRUE;
482} // refresh_thread_pool()
THREAD_POOL * thread_pool
Definition ex_fluid.c:59
int DONE
Definition ex_fluid.c:41
int mode
Network for managing connections.
Definition ex_network.c:22
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
Definition n_common.h:185
#define __n_assert(__ptr, __ret)
macro to assert things
Definition n_common.h:256
#define Free(__ptr)
Free Handler to get errors.
Definition n_common.h:240
void * ptr
void pointer to store
Definition n_list.h:27
LIST_NODE * start
pointer to the start of the list
Definition n_list.h:47
size_t nb_items
number of item currently in the list
Definition n_list.h:42
struct LIST_NODE * next
pointer to the next node
Definition n_list.h:33
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
Definition n_list.c:200
#define remove_list_node(__LIST_, __NODE_, __TYPE_)
Remove macro helper for void pointer casting.
Definition n_list.h:77
int list_destroy(LIST **list)
Empty and Free a list container.
Definition n_list.c:519
LIST * new_generic_list(size_t max_items)
Initialize a generic list container to max_items pointers.
Definition n_list.c:19
#define MAX_LIST_ITEMS
flag to pass to new_generic_list for the maximum possible number of items in a list
Definition n_list.h:56
Structure of a generic list node.
Definition n_list.h:25
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
Definition n_log.h:70
#define LOG_DEBUG
debug-level messages
Definition n_log.h:65
#define LOG_ERR
error conditions
Definition n_log.h:57
#define LOG_INFO
informational
Definition n_log.h:63
void u_sleep(unsigned int usec)
wrapper around usleep for API consistency
Definition n_time.c:35
size_t nb_max_waiting
Maximum number of waiting procedures in the list, 0 or -1 for unlimited.
int state
state of the proc , RUNNING_PROC when it is busy processing func( param) , IDLE_PROC when it waits fo...
int type
SYNCED or DIRECT process start.
size_t max_threads
Maximum number of running threads in the list.
void * param
if not NULL , passed as argument
void *(* func)(void *param)
function to call in the thread
sem_t th_end
thread ending semaphore
pthread_mutex_t lock
mutex to prevent mutual access of node parameters
struct THREAD_POOL * thread_pool
pointer to assigned thread pool
sem_t th_start
thread starting semaphore
pthread_t thr
thread id
THREAD_POOL_NODE ** thread_list
Dynamically allocated but fixed size thread array.
LIST * waiting_list
Waiting list handling.
size_t nb_actives
number of threads actually doing a proc
sem_t nb_tasks
semaphore to store the number of tasks
void *(* func)(void *param)
function to call in the thread
int thread_state
state of the managing thread , RUNNING_THREAD, EXITING_THREAD, EXITED_THREAD
pthread_mutex_t lock
mutex to prevent mutual access of waiting_list parameters
void * param
if not NULL , passed as argument
#define NORMAL_PROC
processing mode for added func, synced start, can be queued
int start_threaded_pool(THREAD_POOL *thread_pool)
Launch the process waiting for execution in the thread pool.
THREAD_POOL * new_thread_pool(size_t nbmaxthr, size_t nb_max_waiting)
Create a new pool of nbmaxthr threads.
#define EXITED_THREAD
indicate that the pool is off, all jobs have been consumed
#define SYNCED_PROC
processing mode for added func, synced start, not queued
int add_threaded_process(THREAD_POOL *thread_pool, void *(*func_ptr)(void *param), void *param, int mode)
add a function and params to a thread pool
#define NO_QUEUE
special processing mode for waiting_list: do not add the work in queue since it's coming from the queu...
int refresh_thread_pool(THREAD_POOL *thread_pool)
try to add some waiting DIRECT_PROCs on some free thread slots, else do nothing
#define IDLE_PROC
status of a thread which is waiting for some proc
int destroy_threaded_pool(THREAD_POOL **pool, unsigned int delay)
delete a thread_pool, exit the threads and free the structs
int wait_for_synced_threaded_pool(THREAD_POOL *thread_pool)
wait for all the launched process, blocking but light on the CPU as there is no polling
long int get_nb_cpu_cores()
get the number of cores of the current system
#define DIRECT_PROC
processing mode for added func, direct start, not queued
int wait_for_threaded_pool(THREAD_POOL *thread_pool, unsigned int delay)
Wait for all the launched process in the thread pool to terminate.
#define RUNNING_PROC
status of a thread which proc is currently running
#define NO_LOCK
if passed to add_threaded_process, skip main table lock in case we are in a func which is already loc...
#define EXITING_THREAD
indicate that the pool is exiting, unfinished jobs will finish and the pool will exit the threads and...
#define RUNNING_THREAD
indicate that the pool is running and ready to use
#define WAITING_PROC
status of a thread which has a proc waiting to be processed
Structure of a thread pool.
A thread pool node.
Structure of a waiting process item.
Common headers and low-level functions & define.
Generic log system.
void * thread_pool_processing_function(void *param)
Internal thread pool processing function.
Thread pool declaration.
Timing utilities.