Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
n_thread_pool.c
Go to the documentation of this file.
1
8#include <unistd.h>
9#include "nilorea/n_common.h"
10#include "nilorea/n_log.h"
12#include "nilorea/n_time.h"
13
14#ifdef __linux__
15#include <sys/sysinfo.h>
16#endif
17#include <pthread.h>
18#include <string.h>
19#include <errno.h>
20
/**
 * @brief Get the number of processor cores of the current system.
 *
 * When `__windows__` is defined the count is read with GetSystemInfo();
 * otherwise it is read with sysconf(_SC_NPROCESSORS_ONLN).
 *
 * @return the number of cores, never less than 1. sysconf() reports failure
 *         with -1; that value (or any other non-positive count) is replaced
 *         by 1 so the result is always usable as a thread count.
 */
long int get_nb_cpu_cores(void) {
    long int nb_procs = 0;
#ifdef __windows__
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    nb_procs = (long int)sysinfo.dwNumberOfProcessors;
#else
    nb_procs = sysconf(_SC_NPROCESSORS_ONLN);
#endif
    /* a failed or meaningless query must not leak out as a negative count */
    if (nb_procs < 1) {
        nb_procs = 1;
    }
    return nb_procs;
}
36
/*
 * Body of the pool worker routine. Its signature line is absent from this
 * listing; the symbol index further down declares it as
 * void *thread_pool_processing_function(void *param).
 * NOTE(review): listing numbers 86 and 93 are also absent, so a statement may be
 * missing after the per-job cleanup and inside the final locked section -
 * confirm against the real file before editing this body.
 *
 * param is cast to the THREAD_POOL_NODE this worker serves.
 * The routine returns NULL and also ends the thread through pthread_exit(NULL).
 */
43 THREAD_POOL_NODE* node = (THREAD_POOL_NODE*)param;
44
// without a node there is no semaphore to wait on: log and end the thread
45 if (!node) {
46 n_log(LOG_ERR, "Thread fatal error, no valid payload found, exiting thread function !");
47 pthread_exit(NULL);
48 return NULL;
49 }
50
51 n_log(LOG_DEBUG, "Thread %ld started", node->thr);
52
53 int thread_state = 0;
// one iteration per wake-up on th_start; the loop ends once thread_state reads EXITING_THREAD
54 do {
55 n_log(LOG_DEBUG, "Thread pool processing func waiting");
56
57 // note: direct procs will automatically post th_start
58 sem_wait(&node->th_start);
59
// take a snapshot of thread_state under the node lock
60 pthread_mutex_lock(&node->lock);
61 thread_state = node->thread_state;
62 pthread_mutex_unlock(&node->lock);
63
64 if (thread_state == RUNNING_THREAD) {
65 n_log(LOG_INFO, "Thread pool running proc %p", node->func);
66 pthread_mutex_lock(&node->lock);
67 node->state = RUNNING_PROC;
68 pthread_mutex_unlock(&node->lock);
69
// run the job if one was set; it is called without holding the node lock
70 if (node && node->func) {
71 node->func(node->param);
72 }
73 n_log(LOG_INFO, "Thread pool end proc %p", node->func);
74
// clear the job slot and mark the node idle again, all under the node lock
75 pthread_mutex_lock(&node->lock);
76 node->func = NULL;
77 node->param = NULL;
78 node->state = IDLE_PROC;
79 int type = node->type;
80 node->type = -1;
81 // DIRECT_PROC do not need to post th_end
82 if (type & SYNCED_PROC)
83 sem_post(&node->th_end);
84 pthread_mutex_unlock(&node->lock);
85
87 }
88 } while (thread_state != EXITING_THREAD);
89
90 n_log(LOG_DEBUG, "Thread %ld exiting...", node->thr);
91
// NOTE(review): nothing is visible between this lock and unlock (listing line 93 is absent)
92 pthread_mutex_lock(&node->lock);
94 pthread_mutex_unlock(&node->lock);
95
96 n_log(LOG_DEBUG, "Thread %ld exited", node->thr);
97
98 pthread_exit(NULL);
99
100 return NULL;
101} /* thread_pool_processing_function */
102
109THREAD_POOL* new_thread_pool(size_t nbmaxthr, size_t nb_max_waiting) {
110 THREAD_POOL* thread_pool = NULL;
111
112 Malloc(thread_pool, THREAD_POOL, 1);
113 if (!thread_pool)
114 return NULL;
115
116 thread_pool->max_threads = nbmaxthr;
117 thread_pool->nb_max_waiting = nb_max_waiting;
118 thread_pool->nb_actives = 0;
119
120 thread_pool->thread_list = (THREAD_POOL_NODE**)malloc(nbmaxthr * sizeof(THREAD_POOL_NODE*));
121 if (!thread_pool->thread_list) {
122 Free(thread_pool);
123 return NULL;
124 }
125
126 thread_pool->waiting_list = new_generic_list(MAX_LIST_ITEMS);
127 if (!thread_pool->waiting_list) {
128 n_log(LOG_ERR, "Unable to initialize wait list");
129 return NULL;
130 }
131
132 pthread_mutex_init(&thread_pool->lock, NULL);
133
134 if (sem_init(&thread_pool->nb_tasks, 0, 0) == -1) {
135 int error = errno;
136 n_log(LOG_ERR, "sem_init failed : %s on &thread_pool -> nb_tasks", strerror(error));
137 return NULL;
138 }
139
140 for (size_t it = 0; it < nbmaxthr; it++) {
141 Malloc(thread_pool->thread_list[it], THREAD_POOL_NODE, 1);
142 thread_pool->thread_list[it]->type = -1;
143 thread_pool->thread_list[it]->state = IDLE_PROC;
144 thread_pool->thread_list[it]->thread_state = RUNNING_THREAD;
145 thread_pool->thread_list[it]->thread_pool = thread_pool;
146
147 if (sem_init(&thread_pool->thread_list[it]->th_start, 0, 0) == -1) {
148 int error = errno;
149 n_log(LOG_ERR, "sem_init failed : %s on &thread_pool -> thread_list[ %d ] -> th_start\n", strerror(error), it);
150 return NULL;
151 }
152 if (sem_init(&thread_pool->thread_list[it]->th_end, 0, 0) == -1) {
153 int error = errno;
154 n_log(LOG_ERR, "sem_init failed : %s on &thread_pool -> thread_list[ %d] -> th_end\n", strerror(error), it);
155 return NULL;
156 }
157
158 thread_pool->thread_list[it]->func = NULL;
159 thread_pool->thread_list[it]->param = NULL;
160
161 pthread_mutex_init(&thread_pool->thread_list[it]->lock, NULL);
162
163 if (pthread_create(&thread_pool->thread_list[it]->thr, NULL, thread_pool_processing_function, (void*)thread_pool->thread_list[it]) != 0) {
164 n_log(LOG_ERR, "pthread_create failed : %s for it %d\n", strerror(errno), it);
165 return NULL;
166 }
167 }
168 return thread_pool;
169} /* new_thread_pool */
170
179int add_threaded_process(THREAD_POOL* thread_pool, void* (*func_ptr)(void* param), void* param, int mode) {
180 if (!thread_pool)
181 return FALSE;
182
183 if (!thread_pool->thread_list)
184 return FALSE;
185
186 size_t it = 0;
187
188 if (!(mode & NO_LOCK)) pthread_mutex_lock(&thread_pool->lock);
189
190 while (it < thread_pool->max_threads) {
191 pthread_mutex_lock(&thread_pool->thread_list[it]->lock);
192 if (thread_pool->thread_list[it]->thread_state == RUNNING_THREAD && thread_pool->thread_list[it]->state == IDLE_PROC) {
193 break;
194 }
195 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
196 it++;
197 }
198 // we have a free thread slot, and the lock on it
199 if (it < thread_pool->max_threads) {
200 if (mode & DIRECT_PROC || mode & SYNCED_PROC) {
201 thread_pool->thread_list[it]->func = func_ptr;
202 thread_pool->thread_list[it]->param = param;
203 thread_pool->thread_list[it]->state = WAITING_PROC;
204 thread_pool->thread_list[it]->type = mode;
205 } else {
206 n_log(LOG_ERR, "unknown mode %d for thread %d", mode, it);
207 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
208 if (!(mode & NO_LOCK)) pthread_mutex_unlock(&thread_pool->lock);
209 return FALSE;
210 }
211 if (mode & DIRECT_PROC) sem_post(&thread_pool->thread_list[it]->th_start);
212 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
213 n_log(LOG_DEBUG, "proc %p(%p) added on thread %d", func_ptr, param, it);
214 } else {
215 // all thread are occupied -> test waiting lists. not holding thread_list[ it ] lock because it was obligatory unlocked before
216
217 // if already coming from queue, or if it should be part of a synced start, do not re-add && return FALSE
218 if (mode & NOQUEUE || mode & SYNCED_PROC) {
219 if (!(mode & NO_LOCK)) pthread_mutex_unlock(&thread_pool->lock);
220 n_log(LOG_DEBUG, "thread pool active threads are all busy, cannot add DIRECT or SYNCED_PROC %p(%p) to pool %p", func_ptr, param, thread_pool);
221 return FALSE;
222 }
223
224 // try adding to wait list
225 if (thread_pool->nb_max_waiting <= 0 || (thread_pool->waiting_list->nb_items < thread_pool->nb_max_waiting)) {
226 THREAD_WAITING_PROC* proc = NULL;
227 Malloc(proc, THREAD_WAITING_PROC, 1);
228 proc->func = func_ptr;
229 proc->param = param;
230 list_push(thread_pool->waiting_list, proc, free);
231 n_log(LOG_DEBUG, "Adding %p %p to waitlist", proc->func, proc->param);
232 } else {
233 n_log(LOG_ERR, "proc %p(%p) was dropped from waitlist because waitlist of thread pool %p is full", func_ptr, param, thread_pool);
234 if (!(mode & NO_LOCK)) pthread_mutex_unlock(&thread_pool->lock);
235 return FALSE;
236 }
237 }
238
239 if (!(mode & NO_LOCK)) pthread_mutex_unlock(&thread_pool->lock);
240
241 return TRUE;
242} /* add_threaded_process */
243
250 if (!thread_pool)
251 return FALSE;
252
253 if (!thread_pool->thread_list)
254 return FALSE;
255
256 int retval = TRUE;
257
258 pthread_mutex_lock(&thread_pool->lock);
259 for (size_t it = 0; it < thread_pool->max_threads; it++) {
260 int to_run = 0;
261 pthread_mutex_lock(&thread_pool->thread_list[it]->lock);
262 if ((thread_pool->thread_list[it]->type & SYNCED_PROC) && thread_pool->thread_list[it]->state == WAITING_PROC) {
263 to_run = 1;
264 }
265 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
266 if (to_run == 1) {
267 if (sem_post(&thread_pool->thread_list[it]->th_start) != 0) {
268 int error = errno;
269 n_log(LOG_ERR, "sem_post th_start error in thread_pool %p , thread_list[ %d ] : %s", thread_pool, it, strerror(error));
270 retval = FALSE;
271 }
272 }
273 }
274 pthread_mutex_unlock(&thread_pool->lock);
275
276 return retval;
277} /* start_threaded_pool */
278
285 __n_assert(thread_pool, return FALSE);
286 __n_assert(thread_pool->thread_list, return FALSE);
287
288 int retval = TRUE;
289 for (size_t it = 0; it < thread_pool->max_threads; it++) {
290 if (sem_wait(&thread_pool->thread_list[it]->th_end) == -1) {
291 int error = errno;
292 n_log(LOG_ERR, "sem_wait th_end error in thread_pool %p , thread_list[ %d ] : %s", thread_pool, it, strerror(error));
293 retval = FALSE;
294 }
295 }
296 return retval;
297} /* wait_for_synced_threaded_pool */
298
305int wait_for_threaded_pool(THREAD_POOL* thread_pool, unsigned int delay) {
306 if (!thread_pool)
307 return FALSE;
308
309 if (!thread_pool->thread_list)
310 return FALSE;
311
312 int DONE = 0;
313
314 // n_log( LOG_DEBUG, "Waiting for the waitlist of %p to be consumed", thread_pool );
315
316 /* waiting to consume all the waiting list */
317 while (thread_pool->waiting_list->nb_items > 0) {
318 refresh_thread_pool(thread_pool);
319 u_sleep(delay);
320 }
321
322 // n_log( LOG_DEBUG, "Waiting for active process of %p to be terminated", thread_pool );
323 /* waiting for all active procs to have terminated */
324 while (!DONE) {
325 DONE = 1;
326 for (size_t it = 0; it < thread_pool->max_threads; it++) {
327 int state = 0;
328
329 pthread_mutex_lock(&thread_pool->thread_list[it]->lock);
330 state = thread_pool->thread_list[it]->state;
331 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
332 if (state != IDLE_PROC) {
333 DONE = 0;
334 // n_log( LOG_DEBUG, "Thread id %d status is not IDLE: %d", it, state );
335 }
336 }
337 u_sleep(delay);
338 refresh_thread_pool(thread_pool);
339 }
340
341 return TRUE;
342}
343
350int destroy_threaded_pool(THREAD_POOL** pool, unsigned int delay) {
351 __n_assert(pool && (*pool), return FALSE);
352 __n_assert((*pool)->thread_list, return FALSE);
353
354 int state = 0, DONE = 0;
355
356 while (!DONE) {
357 DONE = 0;
358 pthread_mutex_lock(&(*pool)->lock);
359 for (size_t it = 0; it < (*pool)->max_threads; it++) {
360 pthread_mutex_lock(&(*pool)->thread_list[it]->lock);
361 state = (*pool)->thread_list[it]->state;
362 pthread_mutex_unlock(&(*pool)->thread_list[it]->lock);
363
364 if (state == IDLE_PROC) {
365 // n_log( LOG_DEBUG, "Posting EXITING to thread %ld", (*pool) -> thread_list[ it ] -> thr );
366 pthread_mutex_lock(&(*pool)->thread_list[it]->lock);
367 (*pool)->thread_list[it]->thread_state = EXITING_THREAD;
368 sem_post(&(*pool)->thread_list[it]->th_start);
369 pthread_mutex_unlock(&(*pool)->thread_list[it]->lock);
370 DONE = 1;
371 } else {
372 // n_log( LOG_DEBUG, "thr %ld proc state %d thr state %d", (*pool) -> thread_list[ it ] -> thr, (*pool) -> thread_list[ it ] -> state,(*pool) -> thread_list[ it ] -> thread_state );
373 }
374 }
375 pthread_mutex_unlock(&(*pool)->lock);
376
377 u_sleep(delay);
378 }
379
380 pthread_mutex_lock(&(*pool)->lock);
381
382 for (size_t it = 0; it < (*pool)->max_threads; it++) {
383 pthread_join((*pool)->thread_list[it]->thr, NULL);
384 pthread_mutex_destroy(&(*pool)->thread_list[it]->lock);
385 sem_destroy(&(*pool)->thread_list[it]->th_start);
386 sem_destroy(&(*pool)->thread_list[it]->th_end);
387 Free((*pool)->thread_list[it]);
388 }
389 Free((*pool)->thread_list);
390 list_destroy(&(*pool)->waiting_list);
391
392 sem_destroy(&(*pool)->nb_tasks);
393
394 pthread_mutex_unlock(&(*pool)->lock);
395 pthread_mutex_destroy(&(*pool)->lock);
396
397 Free((*pool));
398
399 return TRUE;
400} /* destroy_threaded_pool */
401
/*
 * Body of refresh_thread_pool. Its signature line is absent from this listing;
 * the symbol index further down declares it as
 * int refresh_thread_pool(THREAD_POOL *thread_pool).
 * NOTE(review): listing number 420 is also absent; `proc` is used below without a
 * visible declaration - presumably node->ptr viewed as THREAD_WAITING_PROC*,
 * confirm against the real file.
 *
 * With the pool lock held for the whole call:
 *  1. moves procs from the head of the waiting list onto free workers until one
 *     cannot be placed or the list is empty;
 *  2. recounts nb_actives as the number of workers in RUNNING_PROC;
 *  3. when no worker is active, posts nb_tasks if its current value is 0.
 * Returns TRUE (FALSE only through the two asserts).
 */
408 __n_assert(thread_pool, return FALSE);
409 __n_assert(thread_pool->waiting_list, return FALSE);
410
411 /* Trying to empty the wait list */
412 int push_status = 0;
413 pthread_mutex_lock(&thread_pool->lock);
414 if (thread_pool->waiting_list && thread_pool->waiting_list->start)
415 push_status = 1;
// each pass re-reads the list head, so next_node / node = next_node below do not drive the iteration
416 while (push_status == 1) {
417 LIST_NODE* node = thread_pool->waiting_list->start;
418 LIST_NODE* next_node = NULL;
419 if (node && node->ptr) {
421 if (proc) {
// NO_LOCK: the pool lock is already held here; NOQUEUE: on failure the proc must stay where it is
422 if (add_threaded_process(thread_pool, proc->func, proc->param, DIRECT_PROC | NOQUEUE | NO_LOCK) == TRUE) {
423 THREAD_WAITING_PROC* procptr = NULL;
424 next_node = node->next;
425 procptr = remove_list_node(thread_pool->waiting_list, node, THREAD_WAITING_PROC);
426 n_log(LOG_DEBUG, "waitlist: adding %p,%p to %p", procptr->func, procptr->param, thread_pool);
427 Free(procptr);
428 node = next_node;
429 } else {
// NOTE(review): the format string has one %p but two arguments (proc, thread_pool) are passed
430 n_log(LOG_DEBUG, "waitlist: cannot add proc %p from waiting list, all active threads are busy !", proc, thread_pool);
431 push_status = 0;
432 }
433 } else {
434 n_log(LOG_ERR, "waitlist: trying to add invalid NULL proc on thread pool %p !", thread_pool);
435 push_status = 0;
436 }
437 } else {
438 push_status = 0;
439 }
440 } // while( push_status == 1 )
441
442 // update statistics
443 thread_pool->nb_actives = 0;
444 for (size_t it = 0; it < thread_pool->max_threads; it++) {
445 pthread_mutex_lock(&thread_pool->thread_list[it]->lock);
446 if (thread_pool->thread_list[it]->state == RUNNING_PROC)
447 thread_pool->nb_actives++;
448 pthread_mutex_unlock(&thread_pool->thread_list[it]->lock);
449 }
450
// push_status is always 0 once the loop above has ended, so only nb_actives decides here
451 if (push_status == 0 && thread_pool->nb_actives == 0) {
452 int value = 0;
453 sem_getvalue(&thread_pool->nb_tasks, &value);
// post only when the semaphore is at 0, so it is not raised above 1 by this path
454 if (value == 0) {
455 sem_post(&thread_pool->nb_tasks);
456 }
457 }
458
459 pthread_mutex_unlock(&thread_pool->lock);
460
461 return TRUE;
462} // refresh_thread_pool()
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
Definition n_common.h:187
#define __n_assert(__ptr, __ret)
macro to assert things
Definition n_common.h:258
#define Free(__ptr)
Free Handler to get errors.
Definition n_common.h:242
void * ptr
void pointer to store
Definition n_list.h:26
LIST_NODE * start
pointer to the start of the list
Definition n_list.h:46
size_t nb_items
number of item currently in the list
Definition n_list.h:41
struct LIST_NODE * next
pointer to the next node
Definition n_list.h:32
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
Definition n_list.c:199
#define remove_list_node(__LIST_, __NODE_, __TYPE_)
Remove macro helper for void pointer casting.
Definition n_list.h:76
int list_destroy(LIST **list)
Empty and Free a list container.
Definition n_list.c:518
#define MAX_LIST_ITEMS
flag to pass to new_generic_list for the maximum possible number of item in a list
Definition n_list.h:55
Structure of a generic list node.
Definition n_list.h:24
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
Definition n_log.h:69
#define LOG_DEBUG
debug-level messages
Definition n_log.h:64
#define LOG_ERR
error conditions
Definition n_log.h:56
#define LOG_INFO
informational
Definition n_log.h:62
void u_sleep(unsigned int usec)
wrapper around usleep for API consistency
Definition n_time.c:35
size_t nb_max_waiting
Maximum number of waiting procedures in the list, 0 or -1 for unlimited.
int state
state of the proc , RUNNING_PROC when it is busy processing func( param) , IDLE_PROC when it waits fo...
int type
SYNCED or DIRECT process start.
size_t max_threads
Maximum number of running threads in the list.
void * param
if not NULL , passed as argument
void *(* func)(void *param)
function to call in the thread
sem_t th_end
thread ending semaphore
pthread_mutex_t lock
mutex to prevent mutual access of node parameters
struct THREAD_POOL * thread_pool
pointer to assigned thread pool
sem_t th_start
thread starting semaphore
pthread_t thr
thread id
THREAD_POOL_NODE ** thread_list
Dynamically allocated but fixed size thread array.
LIST * waiting_list
Waiting list handling.
size_t nb_actives
number of threads actually doing a proc
sem_t nb_tasks
semaphore to store the number of tasks
void *(* func)(void *param)
function to call in the thread
int thread_state
state of the managing thread , RUNNING_THREAD, EXITING_THREAD, EXITED_THREAD
pthread_mutex_t lock
mutex to prevent mutual access of waiting_list parameters
void * param
if not NULL , passed as argument
int start_threaded_pool(THREAD_POOL *thread_pool)
Launch the process waiting for execution in the thread pool.
THREAD_POOL * new_thread_pool(size_t nbmaxthr, size_t nb_max_waiting)
Create a new pool of nbmaxthr threads.
#define EXITED_THREAD
indicate that the pool is off, all jobs have been consumed
#define SYNCED_PROC
processing mode for added func, synced start
int add_threaded_process(THREAD_POOL *thread_pool, void *(*func_ptr)(void *param), void *param, int mode)
add a function and params to a thread pool
int refresh_thread_pool(THREAD_POOL *thread_pool)
try to add some waiting DIRECT_PROCs on some free thread slots, else do nothing
#define IDLE_PROC
status of a thread which is waiting for some proc
int destroy_threaded_pool(THREAD_POOL **pool, unsigned int delay)
delete a thread_pool, exit the threads and free the structs
int wait_for_synced_threaded_pool(THREAD_POOL *thread_pool)
wait for all the launched process, blocking but light on the CPU as there is no polling
long int get_nb_cpu_cores()
get number of core of current system
#define DIRECT_PROC
processing mode for added func, direct start
int wait_for_threaded_pool(THREAD_POOL *thread_pool, unsigned int delay)
Wait for all the launched process in the thread pool to terminate.
#define RUNNING_PROC
status of a thread which proc is currently running
#define NO_LOCK
if passed to add_threaded_process, skip main table lock in case we are in a func which is already loc...
#define NOQUEUE
processing mode for waiting_list: do not readd the work in queue
#define EXITING_THREAD
indicate that the pool is exiting, unfinished jobs will finish and the pool will exit the threads and...
#define RUNNING_THREAD
indicate that the pool is running and ready to use
#define WAITING_PROC
status of a thread which has a proc waiting to be processed
Structure of a thread pool.
A thread pool node.
Structure of a waiting process item.
Common headers and low-level functions & define.
Generic log system.
void * thread_pool_processing_function(void *param)
Internal thread pool processing function.
Thread pool declaration.
Timing utilities.