Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
n_kafka.c
Go to the documentation of this file.
1
9#include "nilorea/n_kafka.h"
10#include "nilorea/n_common.h"
11#include "nilorea/n_base64.h"
12
18int32_t n_kafka_get_schema_from_char(char* string) {
19 __n_assert(string, return -1);
20
21 uint32_t raw_schema_id = 0;
22 memcpy(&raw_schema_id, string + 1, sizeof(uint32_t));
23
24 return (int32_t)ntohl(raw_schema_id);
25}
26
33 __n_assert(string, return -1);
34 __n_assert(string->data, return -1);
35 __n_assert((string->written >= sizeof(int32_t)), return -1);
36 return n_kafka_get_schema_from_char(string->data);
37}
38
45int n_kafka_put_schema_in_char(char* string, int schema_id) {
46 __n_assert(string, return FALSE);
47 uint32_t schema_id_htonl = htonl((uint32_t)schema_id); // cast to unsigned to avoid warning
48 memcpy(string + 1, &schema_id_htonl, sizeof(uint32_t));
49 return TRUE;
50}
51
58int n_kafka_put_schema_in_nstr(N_STR* string, int schema_id) {
59 __n_assert(string, return FALSE);
60 __n_assert(string->data, return FALSE);
61 __n_assert((string->written >= sizeof(int32_t)), return FALSE);
62 return n_kafka_put_schema_in_char(string->data, schema_id);
63}
64
73int n_kafka_get_status(N_KAFKA* kafka, size_t* nb_queued, size_t* nb_waiting, size_t* nb_error) {
74 __n_assert(kafka, return FALSE);
75 __n_assert(nb_waiting, return FALSE);
76 __n_assert(nb_error, return FALSE);
77 read_lock(kafka->rwlock);
78 int status = kafka->pooling_thread_status;
79 *nb_queued = kafka->nb_queued;
80 *nb_waiting = kafka->nb_waiting;
81 *nb_error = kafka->nb_error;
82 unlock(kafka->rwlock);
83 return status;
84}
85
95static void n_kafka_delivery_message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
96 (void)opaque;
97
98 __n_assert(rk, n_log(LOG_ERR, "rk=NULL is not a valid kafka handle"); return);
99 __n_assert(rkmessage, n_log(LOG_ERR, "rkmessage=NULL is not a valid kafka message"); return);
100
101 N_KAFKA_EVENT* event = (N_KAFKA_EVENT*)rkmessage->_private;
102
103 if (rkmessage->err) {
104 n_log(LOG_ERR, "message delivery failed: %s", rd_kafka_err2str(rkmessage->err));
105 if (!event) {
106 n_log(LOG_ERR, "fatal: event is NULL");
107 return;
108 }
109 if (event && event->parent_table)
110 write_lock(event->parent_table->rwlock);
111 event->status = N_KAFKA_EVENT_ERROR;
112 if (event && event->parent_table)
113 unlock(event->parent_table->rwlock);
114 } else {
115 n_log(LOG_DEBUG, "message delivered (%ld bytes, partition %d)", rkmessage->len, rkmessage->partition);
116 if (event) {
117 // lock
118 if (event && event->parent_table)
119 write_lock(event->parent_table->rwlock);
120 // delete produce event linked files
121 if (event->event_files_to_delete) {
122 char** files_to_delete = split(_nstr(event->event_files_to_delete), ";", 0);
123 if (files_to_delete) {
124 if (split_count(files_to_delete) > 0) {
125 int index = 0;
126 while (files_to_delete[index]) {
127 int ret = unlink(files_to_delete[index]);
128 int error = errno;
129 if (ret == 0) {
130 n_log(LOG_DEBUG, "deleted on produce ack: %s", files_to_delete[index]);
131 } else {
132 n_log(LOG_ERR, "couldn't delete \"%s\": %s", files_to_delete[index], strerror(error));
133 }
134 index++;
135 }
136 } else {
137 n_log(LOG_ERR, "split result is empty !");
138 }
139 }
140 free_split_result(&files_to_delete);
141 }
142 // set status
143 event->status = N_KAFKA_EVENT_OK;
144 // unlock
145 if (event && event->parent_table)
146 unlock(event->parent_table->rwlock);
147 n_log(LOG_INFO, "kafka event %p received an ack !", event);
148 } else {
149 n_log(LOG_ERR, "fatal: event is NULL");
150 }
151 }
152 return;
153 /* The rkmessage is destroyed automatically by librdkafka */
154}
155
    // Tear down a N_KAFKA handle: frees strings, closes/destroys the librdkafka
    // handle (flushing pending messages for a producer), releases the cJSON
    // configuration, the event lists, the rwlock and the struct itself.
    // NOTE(review): the signature line is not visible in this extract (doxygen
    // numbering starts at 161); presumably void n_kafka_delete(N_KAFKA* kafka) — confirm.
    __n_assert(kafka, return);

    FreeNoLog(kafka->topic);
    FreeNoLog(kafka->topics);
    FreeNoLog(kafka->groupid);

    if (kafka->rd_kafka_handle) {
        if (kafka->mode == RD_KAFKA_CONSUMER) {
            // consumer: leave the group cleanly, then drop the subscription list
            rd_kafka_consumer_close(kafka->rd_kafka_handle);
            rd_kafka_topic_partition_list_destroy(kafka->subscription);
        }
        if (kafka->mode == RD_KAFKA_PRODUCER) {
            // producer: give in-flight messages up to poll_timeout ms to be delivered
            rd_kafka_flush(kafka->rd_kafka_handle, kafka->poll_timeout);
        }
        rd_kafka_destroy(kafka->rd_kafka_handle);
        n_log(LOG_DEBUG, "kafka handle destroyed");
    }

    // only non-NULL if ownership was never transferred to rd_kafka_new()
    if (kafka->rd_kafka_conf) {
        rd_kafka_conf_destroy(kafka->rd_kafka_conf);
    }

    if (kafka->configuration)
        cJSON_Delete(kafka->configuration);

    if (kafka->errstr)
        free_nstr(&kafka->errstr);

    // NOTE(review): doxygen numbering jumps over lines 192 and 195 here — the
    // statements that destroy the two lists (presumably list_destroy(&...)) are
    // missing from this extract; as shown, each 'if' would wrongly bind to the
    // next statement. Confirm against the original file.
    if (kafka->events_to_send)

    if (kafka->received_events)

    rw_lock_destroy(kafka->rwlock);

    if (kafka->rd_kafka_topic)
        rd_kafka_topic_destroy(kafka->rd_kafka_topic);

    Free(kafka->bootstrap_servers);

    Free(kafka);
    return;
}
207
/**
 * Allocate and zero-initialize a N_KAFKA handle.
 * \param poll_interval delay in msecs between two background polls
 * \param poll_timeout global timeout in msecs for the pooling thread (-1 = infinite)
 * \param errstr_len size of the librdkafka error string buffer
 * \return a new handle, or NULL on allocation/lock failure
 */
N_KAFKA* n_kafka_new(int32_t poll_interval, int32_t poll_timeout, size_t errstr_len) {
    N_KAFKA* kafka = NULL;
    Malloc(kafka, N_KAFKA, 1);
    __n_assert(kafka, return NULL);

    kafka->errstr = new_nstr(errstr_len);
    __n_assert(kafka->errstr, Free(kafka); return NULL);

    // every pointer member starts NULL so n_kafka_delete() is safe on partial init
    kafka->events_to_send = NULL;
    kafka->received_events = NULL;
    kafka->rd_kafka_conf = NULL;
    kafka->rd_kafka_handle = NULL;
    kafka->configuration = NULL;
    kafka->groupid = NULL;
    kafka->topics = NULL;
    kafka->subscription = NULL;
    kafka->topic = NULL;
    kafka->mode = -1;       // neither producer nor consumer yet
    kafka->schema_id = -1;  // -1 = no schema framing
    kafka->poll_timeout = poll_timeout;
    kafka->poll_interval = poll_interval;
    kafka->nb_queued = 0;
    kafka->nb_waiting = 0;
    kafka->nb_error = 0;
    kafka->pooling_thread_status = 0;  // 0 = not running
    kafka->bootstrap_servers = NULL;

    // NOTE(review): doxygen numbering jumps over lines 242 and 245 — the two
    // list-allocation statements (events_to_send and received_events) are missing
    // from this extract. Also, the second assert below re-checks events_to_send
    // where received_events was probably intended — confirm against the original file.
    __n_assert(kafka->events_to_send, n_kafka_delete(kafka); return NULL);

    __n_assert(kafka->events_to_send, n_kafka_delete(kafka); return NULL);

    if (init_lock(kafka->rwlock) != 0) {
        n_log(LOG_ERR, "could not init kafka rwlock in kafka structure at address %p", kafka);
        n_kafka_delete(kafka);
        return NULL;
    }

    return kafka;
}
256
    // Load a JSON configuration file and build a fully initialized producer or
    // consumer handle from it.
    // NOTE(review): the signature line is not visible in this extract (doxygen
    // numbering starts at 264); presumably
    // N_KAFKA* n_kafka_load_config(char* config_file, int mode) — confirm.
    __n_assert(config_file, return NULL);

    N_KAFKA* kafka = NULL;

    // defaults: 100 ms poll interval, infinite (-1) poll timeout, 1024-byte error buffer
    kafka = n_kafka_new(100, -1, 1024);
    __n_assert(kafka, return NULL);

    // initialize kafka object
    kafka->rd_kafka_conf = rd_kafka_conf_new();

    // load config file
    N_STR* config_string = NULL;
    config_string = file_to_nstr(config_file);
    if (!config_string) {
        n_log(LOG_ERR, "unable to read config from file %s !", config_file);
        n_kafka_delete(kafka);
        return NULL;
    }
    cJSON* json = NULL;
    json = cJSON_Parse(_nstrp(config_string));
    if (!json) {
        // NOTE(review): passes the N_STR* itself to a %s format — probably meant
        // _nstr(config_string); confirm.
        n_log(LOG_ERR, "unable to parse json %s", config_string);
        free_nstr(&config_string);
        n_kafka_delete(kafka);
        return NULL;
    }
    // NOTE(review): past this point config_string is never freed and json is
    // neither stored in kafka->configuration nor cJSON_Delete()d on success —
    // looks like two leaks; confirm against the full file.

    int jsonIndex;
    for (jsonIndex = 0; jsonIndex < cJSON_GetArraySize(json); jsonIndex++) {
        cJSON* entry = cJSON_GetArrayItem(json, jsonIndex);

        if (!entry) continue;
        __n_assert(entry->string, continue);
        if (!entry->valuestring) {
            n_log(LOG_DEBUG, "no valuestring for entry %s", _str(entry->string));
            continue;
        }

        // keys starting with '-' are treated as commented-out entries
        if (entry->string[0] != '-') {
            // if it's not one of the optional parameters not managed by kafka, then we can use rd_kafka_conf_set on them
            if (strcmp("topic", entry->string) != 0 && strcmp("topics", entry->string) != 0 && strcmp("value.schema.id", entry->string) != 0 && strcmp("value.schema.type", entry->string) != 0 && strcmp("poll.interval", entry->string) != 0 && strcmp("poll.timeout", entry->string) != 0 && strcmp("group.id.autogen", entry->string) != 0) {
                if (!strcmp("group.id", entry->string)) {
                    // exclude group id for producer
                    if (mode == RD_KAFKA_PRODUCER)
                        continue;
                    // consumer: keep our own copy; it is also passed to rd_kafka_conf_set below
                    kafka->groupid = strdup(entry->valuestring);
                }

                if (rd_kafka_conf_set(kafka->rd_kafka_conf, entry->string, entry->valuestring, _nstr(kafka->errstr), kafka->errstr->length) != RD_KAFKA_CONF_OK) {
                    n_log(LOG_ERR, "kafka config: %s", _nstr(kafka->errstr));
                } else {
                    n_log(LOG_DEBUG, "kafka config enabled: %s => %s", entry->string, entry->valuestring);
                }
            }
        } else {
            n_log(LOG_DEBUG, "kafka disabled config: %s => %s", entry->string, entry->valuestring);
        }
    }

    // other parameters, not directly managed by kafka API (will cause an error if used along rd_kafka_conf_set )
    // producer topic
    cJSON* jstr = NULL;
    jstr = cJSON_GetObjectItem(json, "topic");
    if (jstr && jstr->valuestring) {
        kafka->topic = strdup(jstr->valuestring);
        n_log(LOG_DEBUG, "kafka producer topic: %s", kafka->topic);
    } else {
        // a producer cannot work without a topic; a consumer uses "topics" instead
        if (mode == RD_KAFKA_PRODUCER) {
            n_log(LOG_ERR, "no topic configured !");
            n_kafka_delete(kafka);
            return NULL;
        }
    }
    // consumer topics (comma-separated list)
    jstr = cJSON_GetObjectItem(json, "topics");
    if (jstr && jstr->valuestring) {
        kafka->topics = split(jstr->valuestring, ",", 0);
        n_log(LOG_DEBUG, "kafka consumer topics: %s", jstr->valuestring);
    } else {
        if (mode == RD_KAFKA_CONSUMER) {
            n_log(LOG_ERR, "no topics configured !");
            n_kafka_delete(kafka);
            return NULL;
        }
    }
    // optional schema id, range-checked ([-1 .. 9999], -1 meaning disabled)
    jstr = cJSON_GetObjectItem(json, "value.schema.id");
    if (jstr && jstr->valuestring) {
        int schem_v = atoi(jstr->valuestring);
        if (schem_v < -1 || schem_v > 9999) {
            n_log(LOG_ERR, "invalid schema id %d", schem_v);
            n_kafka_delete(kafka);
            return NULL;
        }
        n_log(LOG_DEBUG, "kafka schema id: %d", schem_v);
        kafka->schema_id = schem_v;
    }

    jstr = cJSON_GetObjectItem(json, "poll.interval");
    if (jstr && jstr->valuestring) {
        kafka->poll_interval = atoi(jstr->valuestring);
        n_log(LOG_DEBUG, "kafka poll interval: %d", kafka->poll_interval);
    }

    jstr = cJSON_GetObjectItem(json, "poll.timeout");
    if (jstr && jstr->valuestring) {
        kafka->poll_timeout = atoi(jstr->valuestring);
        n_log(LOG_DEBUG, "kafka poll timeout: %d", kafka->poll_timeout);
    }

    // saving bootstrap servers in struct
    jstr = cJSON_GetObjectItem(json, "bootstrap.servers");
    if (jstr && jstr->valuestring) {
        kafka->bootstrap_servers = strdup(jstr->valuestring);
        n_log(LOG_DEBUG, "kafka bootstrap server: %s", kafka->bootstrap_servers);
    }

    if (mode == RD_KAFKA_PRODUCER) {
        // set delivery callback
        rd_kafka_conf_set_dr_msg_cb(kafka->rd_kafka_conf, n_kafka_delivery_message_callback);

        kafka->rd_kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, kafka->rd_kafka_conf, _nstr(kafka->errstr), kafka->errstr->length);
        if (!kafka->rd_kafka_handle) {
            n_log(LOG_ERR, "failed to create new producer: %s", _nstr(kafka->errstr));
            n_kafka_delete(kafka);
            return NULL;
        }
        // conf is now owned by kafka handle
        kafka->rd_kafka_conf = NULL;
        // Create topic object
        kafka->rd_kafka_topic = rd_kafka_topic_new(kafka->rd_kafka_handle, kafka->topic, NULL);
        if (!kafka->rd_kafka_topic) {
            n_kafka_delete(kafka);
            return NULL;
        }
        kafka->mode = RD_KAFKA_PRODUCER;
    } else if (mode == RD_KAFKA_CONSUMER) {
        /* If there is no previously committed offset for a partition
         * the auto.offset.reset strategy will be used to decide where
         * in the partition to start fetching messages.
         * By setting this to earliest the consumer will read all messages
         * in the partition if there was no previously committed offset. */
        /*if (rd_kafka_conf_set(kafka -> rd_kafka_conf, "auto.offset.reset", "earliest", _nstr( kafka -> errstr ) , kafka -> errstr -> length) != RD_KAFKA_CONF_OK) {
            n_log( LOG_ERR , "kafka conf set: %s", kafka -> errstr);
            n_kafka_delete( kafka );
            return NULL ;
        }*/

        // if groupid is not set, generate a unique one
        if (!kafka->groupid) {
            char computer_name[1024] = "";
            get_computer_name(computer_name, 1024);
            N_STR* groupid = new_nstr(1024);
            char* topics = join(kafka->topics, "_");
            // generating group id
            jstr = cJSON_GetObjectItem(json, "group.id.autogen");
            if (jstr && jstr->valuestring) {
                if (strcmp(jstr->valuestring, "host-topic-group") == 0) {
                    // stable id: same host + topics always maps to the same group
                    nstrprintf(groupid, "%s_%s", computer_name, topics);
                } else if (strcmp(jstr->valuestring, "unique-group") == 0) {
                    // per-process id: pid suffix makes each run its own group
                    nstrprintf(groupid, "%s_%s_%d", computer_name, topics, getpid());
                }
            } else // default unique group
            {
                nstrprintf(groupid, "%s_%s_%d", computer_name, topics, getpid());
                n_log(LOG_DEBUG, "group.id is not set and group.id.autogen is not set, generated unique group id: %s", _nstr(groupid));
            }
            free(topics);
            // steal the N_STR's buffer instead of copying it
            kafka->groupid = groupid->data;
            groupid->data = NULL;
            free_nstr(&groupid);
        }
        if (rd_kafka_conf_set(kafka->rd_kafka_conf, "group.id", kafka->groupid, _nstr(kafka->errstr), kafka->errstr->length) != RD_KAFKA_CONF_OK) {
            n_log(LOG_ERR, "kafka consumer group.id error: %s", _nstr(kafka->errstr));
            n_kafka_delete(kafka);
            return NULL;
        } else {
            n_log(LOG_DEBUG, "kafka consumer group.id => %s", kafka->groupid);
        }

        /* Create consumer instance.
         * NOTE: rd_kafka_new() takes ownership of the conf object
         * and the application must not reference it again after
         * this call.
         */
        kafka->rd_kafka_handle = rd_kafka_new(RD_KAFKA_CONSUMER, kafka->rd_kafka_conf, _nstr(kafka->errstr), kafka->errstr->length);
        if (!kafka->rd_kafka_handle) {
            // NOTE(review): passes the N_STR* itself to %s — probably meant _nstr(kafka->errstr); confirm
            n_log(LOG_ERR, "%% Failed to create new consumer: %s", kafka->errstr);
            n_kafka_delete(kafka);
            return NULL;
        }
        // conf is now owned by kafka handle
        kafka->rd_kafka_conf = NULL;

        /* Redirect all messages from per-partition queues to
         * the main queue so that messages can be consumed with one
         * call from all assigned partitions.
         *
         * The alternative is to poll the main queue (for events)
         * and each partition queue separately, which requires setting
         * up a rebalance callback and keeping track of the assignment:
         * but that is more complex and typically not recommended. */
        rd_kafka_poll_set_consumer(kafka->rd_kafka_handle);

        /* Convert the list of topics to a format suitable for librdkafka */
        int topic_cnt = split_count(kafka->topics);
        kafka->subscription = rd_kafka_topic_partition_list_new(topic_cnt);
        for (int i = 0; i < topic_cnt; i++)
            rd_kafka_topic_partition_list_add(kafka->subscription, kafka->topics[i],
                                              /* the partition is ignored
                                               * by subscribe() */
                                              RD_KAFKA_PARTITION_UA);
        /* Assign the topic. This method is disabled as it does noat allow dynamic partition assignement
        int err = rd_kafka_assign(kafka -> rd_kafka_handle, kafka -> subscription);
        if( err )
        {
            n_log( LOG_ERR , "kafka consumer: failed to assign %d topics: %s", kafka -> subscription->cnt, rd_kafka_err2str(err));
            n_kafka_delete( kafka );
            return NULL;
        } */

        /* Subscribe to the list of topics */
        int err = rd_kafka_subscribe(kafka->rd_kafka_handle, kafka->subscription);
        if (err) {
            n_log(LOG_ERR, "kafka consumer: failed to subscribe to %d topics: %s", kafka->subscription->cnt, rd_kafka_err2str(err));
            n_kafka_delete(kafka);
            return NULL;
        }

        n_log(LOG_DEBUG, "kafka consumer created and subscribed to %d topic(s), waiting for rebalance and messages...", kafka->subscription->cnt);

        kafka->mode = RD_KAFKA_CONSUMER;
    } else {
        n_log(LOG_ERR, "invalid mode %d", mode);
        n_kafka_delete(kafka);
        return NULL;
    }

    return kafka;
} /* n_kafka_load_config */
506
    // Allocate a fresh N_KAFKA_EVENT in CREATED state with the given schema id.
    // NOTE(review): the signature line is not visible in this extract; presumably
    // N_KAFKA_EVENT* n_kafka_new_event(int schema_id) — confirm.
    N_KAFKA_EVENT* event = NULL;
    Malloc(event, N_KAFKA_EVENT, 1);
    __n_assert(event, return NULL);

    // all pointer members start NULL so n_kafka_event_destroy_ptr() is always safe
    event->event_string = NULL;
    event->event_files_to_delete = NULL;
    event->from_topic = NULL;
    event->rd_kafka_headers = NULL;
    event->received_headers = NULL;
    event->schema_id = schema_id;
    event->status = N_KAFKA_EVENT_CREATED;
    event->parent_table = NULL;

    return event;
} /* n_kafka_new_event */
528
535int n_kafka_new_headers(N_KAFKA_EVENT* event, size_t count) {
536 __n_assert(event, return FALSE);
537
538 if (count == 0)
539 count = 1;
540
541 if (!event->rd_kafka_headers) {
542 event->rd_kafka_headers = rd_kafka_headers_new(count);
543 __n_assert(event->rd_kafka_headers, return FALSE);
544 } else {
545 n_log(LOG_ERR, "event headers already allocated for event %p", event);
546 return FALSE;
547 }
548 return TRUE;
549}
550
560int n_kafka_add_header_ex(N_KAFKA_EVENT* event, char* key, size_t key_length, char* value, size_t value_length) {
561 __n_assert(event, return FALSE);
562 __n_assert(event->rd_kafka_headers, return FALSE);
563 __n_assert(key, return FALSE);
564 __n_assert(value, return FALSE);
565
566 if (key_length < 1 || key_length > SSIZE_MAX) {
567 n_log(LOG_ERR, "Invalid key length (%zu) for header in event %p", key_length, event);
568 return FALSE;
569 }
570
571 if (value_length < 1 || value_length > SSIZE_MAX) {
572 n_log(LOG_ERR, "Invalid value length (%zu) for key '%s' in event %p", value_length, key, event);
573 return FALSE;
574 }
575
576 rd_kafka_resp_err_t err = rd_kafka_header_add(event->rd_kafka_headers, key, (ssize_t)key_length, value, (ssize_t)value_length);
577
578 if (err) {
579 n_log(LOG_ERR, "Failed to add header [%s:%zu=%s:%zu] to event %p: %s",
580 key, key_length, value, value_length, event, rd_kafka_err2str(err));
581 return FALSE;
582 }
583
584 return TRUE;
585}
586
    // N_STR convenience wrapper over n_kafka_add_header_ex(), using each
    // string's 'written' size as its byte length.
    // NOTE(review): the signature line is not visible in this extract; presumably
    // int n_kafka_add_header(N_KAFKA_EVENT* event, N_STR* key, N_STR* value) — confirm.
    __n_assert(event, return FALSE);
    __n_assert(key, return FALSE);
    __n_assert(key->data, return FALSE);
    __n_assert(value, return FALSE);
    __n_assert(value->data, return FALSE);

    return n_kafka_add_header_ex(event, key->data, key->written, value->data, value->written);
}
603
    // Enqueue an event on the producer's local waitlist; the actual handoff to
    // librdkafka happens later in n_kafka_poll() via n_kafka_produce_ex().
    // NOTE(review): the signature line is not visible in this extract; presumably
    // int n_kafka_produce(N_KAFKA* kafka, N_KAFKA_EVENT* event) — confirm.
    __n_assert(kafka, return FALSE);
    __n_assert(event, return FALSE);

    // link the event to its owning handle so callbacks can reach the rwlock
    event->parent_table = kafka;

    write_lock(kafka->rwlock);
    kafka->nb_queued++;
    // NOTE(review): doxygen numbering jumps from 617 to 619 here — the statement
    // that actually pushes the event onto kafka->events_to_send is missing from
    // this extract; confirm against the original file.
    unlock(kafka->rwlock);

    // Success to *enqueue* event
    n_log(LOG_DEBUG, "successfully enqueued event %p in producer %p waitlist, topic: %s", event, kafka->rd_kafka_handle, kafka->topic);

    return TRUE;
} /* n_kafka_produce */
626
    // Hand an event over to librdkafka (with or without headers) and flip its
    // status to WAITING_ACK on success, ERROR on failure.
    // NOTE(review): the signature line is not visible in this extract; presumably
    // int n_kafka_produce_ex(N_KAFKA* kafka, N_KAFKA_EVENT* event) — confirm.
    rd_kafka_resp_err_t err = 0;

    char* event_string = NULL;
    size_t event_length = 0;

    event->parent_table = kafka;

    event_string = event->event_string->data;
    // NOTE(review): sends 'length' (allocated size) rather than 'written' — both
    // are equal for events built by n_kafka_new_event_from_char(); confirm for
    // events built any other way.
    event_length = event->event_string->length;

    if (event->rd_kafka_headers) {
        // producev() takes ownership of the copy on success; the event keeps the original
        rd_kafka_headers_t* hdrs_copy;
        hdrs_copy = rd_kafka_headers_copy(event->rd_kafka_headers);

        err = rd_kafka_producev(
            kafka->rd_kafka_handle, RD_KAFKA_V_RKT(kafka->rd_kafka_topic),
            RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
            RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
            RD_KAFKA_V_VALUE(event_string, event_length),
            RD_KAFKA_V_HEADERS(hdrs_copy),
            RD_KAFKA_V_OPAQUE((void*)event),
            RD_KAFKA_V_END);

        if (err) {
            // on failure the header copy is still ours to destroy
            rd_kafka_headers_destroy(hdrs_copy);
            event->status = N_KAFKA_EVENT_ERROR;
            n_log(LOG_ERR, "failed to produce event: %p with headers %p, producer: %p, topic: %s, error: %s", event, event->rd_kafka_headers, kafka->rd_kafka_handle, kafka->topic, rd_kafka_err2str(err));
            return FALSE;
        }
    } else {
        // no headers: the classic produce API is enough; the event is the opaque msg ptr
        if (rd_kafka_produce(kafka->rd_kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, event_string, event_length, NULL, 0, event) == -1) {
            int error = errno;
            event->status = N_KAFKA_EVENT_ERROR;
            n_log(LOG_ERR, "failed to produce event: %p, producer: %p, topic: %s, error: %s", event, kafka->rd_kafka_handle, kafka->topic, strerror(error));
            return FALSE;
        }
    }
    // Success to *enqueue* event
    n_log(LOG_DEBUG, "successfully enqueued event %p in local producer %p : %s", event, kafka->rd_kafka_handle, kafka->topic);

    event->status = N_KAFKA_EVENT_WAITING_ACK;

    return TRUE;
} /* n_kafka_produce_ex */
678
686N_KAFKA_EVENT* n_kafka_new_event_from_char(char* string, size_t written, int schema_id) {
687 __n_assert(string, return NULL);
688
689 size_t offset = 0;
690 if (schema_id != -1)
691 offset = 5;
692
693 N_KAFKA_EVENT* event = n_kafka_new_event(schema_id);
694 __n_assert(event, return NULL);
695
696 Malloc(event->event_string, N_STR, 1);
697 __n_assert(event->event_string, free(event); return NULL);
698
699 // allocate the size of the event + (the size of the schema id + magic byte) + one ending \0
700 size_t length = written + offset;
701 Malloc(event->event_string->data, char, length);
702 __n_assert(event->event_string, free(event->event_string); free(event); return NULL);
703 event->event_string->length = length;
704
705 // copy incomming body
706 memcpy(event->event_string->data + offset, string, written);
707 event->event_string->written = written + offset;
708
709 if (schema_id != -1)
710 n_kafka_put_schema_in_nstr(event->event_string, schema_id);
711
712 // set status and schema id
713 event->status = N_KAFKA_EVENT_QUEUED;
714
715 return event;
716} /* n_kafka_new_event_from_char */
717
    // Build a produce event from an N_STR, delegating to the char* variant with
    // the string's 'written' size.
    // NOTE(review): the signature line is not visible in this extract; presumably
    // N_KAFKA_EVENT* n_kafka_new_event_from_string(N_STR* string, int schema_id) — confirm.
    __n_assert(string, return NULL);
    __n_assert(string->data, return NULL);

    N_KAFKA_EVENT* event = n_kafka_new_event_from_char(string->data, string->written, schema_id);

    return event;
} /* n_kafka_new_event_from_string */
732
739N_KAFKA_EVENT* n_kafka_new_event_from_file(char* filename, int schema_id) {
740 __n_assert(filename, return NULL);
741
742 N_STR* from = file_to_nstr(filename);
743 __n_assert(from, return NULL);
744
745 return n_kafka_new_event_from_string(from, schema_id);
746} /* n_kafka_new_event_from_file */
747
752void n_kafka_event_destroy_ptr(void* event_ptr) {
753 __n_assert(event_ptr, return);
754 N_KAFKA_EVENT* event = (N_KAFKA_EVENT*)event_ptr;
755 __n_assert(event, return);
756
757 if (event->event_string)
758 free_nstr(&event->event_string);
759
760 if (event->event_files_to_delete)
761 free_nstr(&event->event_files_to_delete);
762
763 FreeNoLog(event->from_topic);
764
765 if (event->rd_kafka_headers)
766 rd_kafka_headers_destroy(event->rd_kafka_headers);
767
768 if (event->received_headers)
769 list_destroy(&event->received_headers);
770
771 free(event);
772 return;
773} /* n_kafka_event_destroy_ptr */
774
    // Destroy an event through a double pointer and NULL the caller's pointer.
    // NOTE(review): the signature line is not visible in this extract (presumably
    // int n_kafka_event_destroy(N_KAFKA_EVENT** event)), and doxygen numbering
    // jumps from 781 to 783 — the call that actually frees the event (presumably
    // n_kafka_event_destroy_ptr(*event)) is missing; confirm against the original file.
    __n_assert(event && (*event), return FALSE);
    (*event) = NULL;
    return TRUE;
} /* n_kafka_event_destroy */
786
    // One polling pass over a kafka handle. Producer: serve delivery callbacks,
    // then walk the waitlist to hand QUEUED events to librdkafka and reap
    // acked/errored ones. Consumer: drain available messages into events.
    // NOTE(review): the signature line is not visible in this extract; presumably
    // int n_kafka_poll(N_KAFKA* kafka) — confirm.
    __n_assert(kafka, return FALSE);
    int ret = TRUE;

    // wait poll interval msecs for kafka response
    if (kafka->mode == RD_KAFKA_PRODUCER) {
        // triggers n_kafka_delivery_message_callback for completed deliveries
        int nb_events = rd_kafka_poll(kafka->rd_kafka_handle, kafka->poll_interval);
        (void)nb_events;
        write_lock(kafka->rwlock);
        // check events status in event table
        LIST_NODE* node = kafka->events_to_send->start;
        while (node) {
            N_KAFKA_EVENT* event = (N_KAFKA_EVENT*)node->ptr;
            if (event->status == N_KAFKA_EVENT_OK) {
                // acked: drop the event from the waitlist and free it
                kafka->nb_waiting--;
                n_log(LOG_DEBUG, "removing event OK %p", event);
                LIST_NODE* node_to_kill = node;
                node = node->next;
                N_KAFKA_EVENT* event_to_kill = remove_list_node(kafka->events_to_send, node_to_kill, N_KAFKA_EVENT);
                n_kafka_event_destroy(&event_to_kill);
                continue;
            } else if (event->status == N_KAFKA_EVENT_QUEUED) {
                // not yet handed to librdkafka: try now
                if (n_kafka_produce_ex(kafka, event) == FALSE) {
                    kafka->nb_error++;
                } else {
                    kafka->nb_waiting++;
                    kafka->nb_queued--;
                }
            } else if (event->status == N_KAFKA_EVENT_ERROR) {
                kafka->nb_error++;
                // TODO :
                // Re try sending errored events after error_timeout is done
                //
            }
            node = node->next;
        }
        unlock(kafka->rwlock);
    } else if (kafka->mode == RD_KAFKA_CONSUMER) {
        rd_kafka_message_t* rkm = NULL;
        while ((rkm = rd_kafka_consumer_poll(kafka->rd_kafka_handle, kafka->poll_interval))) {
            if (rkm->err) {
                /* Consumer errors are generally to be considered
                 * informational as the consumer will automatically
                 * try to recover from all types of errors. */
                n_log(LOG_ERR, "consumer: %s", rd_kafka_message_errstr(rkm));
                // NOTE(review): this continue skips rd_kafka_message_destroy(rkm)
                // below — looks like a message leak on consumer errors; confirm.
                continue;
            }

            /* Reminder of rkm contents */
            // Proper message.
            n_log(LOG_DEBUG, "Message on %s [%" PRId32 "] at offset %" PRId64 " (leader epoch %" PRId32 ")", rd_kafka_topic_name(rkm->rkt), rkm->partition, rkm->offset, rd_kafka_message_leader_epoch(rkm));

            // Print the message key
            if (rkm->key && rkm->key_len > 0)
                n_log(LOG_DEBUG, "Key: %.*s", (int)rkm->key_len, (const char*)rkm->key);
            else if (rkm->key)
                n_log(LOG_DEBUG, "Key: (%d bytes)", (int)rkm->key_len);

            if (rkm->payload && rkm->len > 0) {
                write_lock(kafka->rwlock);
                // make a copy of the event for further processing
                N_KAFKA_EVENT* event = NULL;
                event = n_kafka_new_event_from_char(rkm->payload, rkm->len, -1); // no schema id because we want a full raw copy here
                event->parent_table = kafka;
                // test if there are headers, save them
                rd_kafka_headers_t* hdrs = NULL;
                if (!rd_kafka_message_headers(rkm, &hdrs)) {
                    size_t idx = 0;
                    const char* name = NULL;
                    const void* val = NULL;
                    size_t size = 0;

                    // flatten each header to a "name=value" N_STR
                    event->received_headers = new_generic_list(MAX_LIST_ITEMS);
                    while (!rd_kafka_header_get_all(hdrs, idx, &name, &val, &size)) {
                        N_STR* header_entry = NULL;
                        nstrprintf(header_entry, "%s=%s", _str(name), _str((char*)val));
                        list_push(event->received_headers, header_entry, &free_nstr_ptr);
                        idx++;
                    }
                }
                // save originating topic (can help sorting if there are multiples one)
                event->from_topic = strdup(rd_kafka_topic_name(rkm->rkt));
                if (kafka->schema_id != -1)
                    event->schema_id = n_kafka_get_schema_from_nstr(event->event_string);
                // NOTE(review): doxygen numbering jumps from 875 to 877 here — the
                // statement pushing the event onto kafka->received_events is missing
                // from this extract; confirm against the original file.
                n_log(LOG_DEBUG, "Consumer received event of (%d bytes) from topic %s", (int)rkm->len, event->from_topic);
                unlock(kafka->rwlock);
            }
            rd_kafka_message_destroy(rkm);
        }
    }
    // NOTE(review): nb_error is read here without holding the rwlock — benign on
    // most targets but racy in principle; confirm intent.
    if (kafka->nb_error > 0) {
        ret = FALSE;
    }
    // n_log( LOG_DEBUG , "kafka poll for handle %p returned %d elements" , kafka -> rd_kafka_handle , nb_events );
    return ret;
} /* n_kafka_poll */
889
895void* n_kafka_pooling_thread(void* ptr) {
896 N_KAFKA* kafka = (N_KAFKA*)ptr;
897
898 int status = 1;
899
900 N_TIME chrono;
901
902 if (kafka->mode == RD_KAFKA_PRODUCER)
903 n_log(LOG_DEBUG, "starting pooling thread for kafka handler %p mode PRODUCER (%d) topic %s", kafka->rd_kafka_handle, RD_KAFKA_PRODUCER, kafka->topic);
904 if (kafka->mode == RD_KAFKA_CONSUMER) {
905 char* topiclist = join(kafka->topics, ",");
906 n_log(LOG_DEBUG, "starting pooling thread for kafka handler %p mode CONSUMER (%d) topic %s", kafka->rd_kafka_handle, RD_KAFKA_CONSUMER, _str(topiclist));
907 FreeNoLog(topiclist);
908 }
909
910 start_HiTimer(&chrono);
911
912 int64_t remaining_time = kafka->poll_timeout * 1000;
913 while (status == 1) {
914 if (n_kafka_poll(kafka) == FALSE) {
915 if (kafka->mode == RD_KAFKA_PRODUCER) {
916 n_log(LOG_ERR, "failed to poll kafka producer handle %p with topic %s", kafka->rd_kafka_handle, rd_kafka_topic_name(kafka->rd_kafka_topic));
917 } else if (kafka->topics) {
918 char* topiclist = join(kafka->topics, ",");
919 n_log(LOG_ERR, "failed to poll kafka consumer handle %p with topic %s", kafka->rd_kafka_handle, _str(topiclist));
920 FreeNoLog(topiclist);
921 }
922 }
923
924 read_lock(kafka->rwlock);
925 status = kafka->pooling_thread_status;
926 unlock(kafka->rwlock);
927
928 if (status == 2)
929 break;
930
931 int64_t elapsed_time = get_usec(&chrono);
932 if (kafka->poll_timeout != -1) {
933 remaining_time -= elapsed_time;
934 if (remaining_time < 0) {
935 if (kafka->mode == RD_KAFKA_PRODUCER) {
936 n_log(LOG_DEBUG, "timeouted on kafka handle %p", kafka->rd_kafka_handle);
937 } else if (kafka->mode == RD_KAFKA_CONSUMER) {
938 n_log(LOG_DEBUG, "timeouted on kafka handle %p", kafka->rd_kafka_handle);
939 }
940 remaining_time = 0;
941 status = 2;
942 break;
943 }
944 }
945 // n_log( LOG_DEBUG , "remaining time: %d on kafka handle %p" , remaining_time , kafka -> rd_kafka_handle );
946 }
947
948 write_lock(kafka->rwlock);
949 kafka->pooling_thread_status = 0;
950 unlock(kafka->rwlock);
951
952 n_log(LOG_DEBUG, "exiting pooling thread for kafka handler %p mode %s", kafka->rd_kafka_handle, (kafka->mode == RD_KAFKA_PRODUCER) ? "PRODUCER" : "CONSUMER");
953 pthread_exit(NULL);
954 return NULL;
955} /* n_kafka_pooling_thread */
956
    // Start the background polling thread for a handle, refusing if one is
    // already running.
    // NOTE(review): the signature line is not visible in this extract; presumably
    // int n_kafka_start_pooling_thread(N_KAFKA* kafka) — confirm.
    __n_assert(kafka, return FALSE);

    read_lock(kafka->rwlock);
    int status = kafka->pooling_thread_status;
    unlock(kafka->rwlock);

    if (status != 0) {
        n_log(LOG_ERR, "kafka pooling thread already started for handle %p", kafka);
        return FALSE;
    }

    // NOTE(review): status may change between the check above and this write
    // lock (check-then-act); confirm that callers serialize start/stop.
    write_lock(kafka->rwlock);
    kafka->pooling_thread_status = 1;

    if (pthread_create(&kafka->pooling_thread, NULL, n_kafka_pooling_thread, (void*)kafka) != 0) {
        n_log(LOG_ERR, "unable to create pooling_thread for kafka handle %p", kafka);
        unlock(kafka->rwlock);
        return FALSE;
    }
    unlock(kafka->rwlock);

    n_log(LOG_DEBUG, "pthread_create sucess for kafka handle %p->%p", kafka, kafka->rd_kafka_handle);

    return TRUE;
} /* n_kafka_start_pooling_thread */
988
    // Ask the background polling thread to stop (status 2) and join it.
    // NOTE(review): the signature line is not visible in this extract; presumably
    // int n_kafka_stop_pooling_thread(N_KAFKA* kafka) — confirm.
    __n_assert(kafka, return FALSE);

    read_lock(kafka->rwlock);
    int pooling_thread_status = kafka->pooling_thread_status;
    unlock(kafka->rwlock);

    if (pooling_thread_status == 0) {
        n_log(LOG_DEBUG, "kafka pooling thread already stopped for handle %p", kafka);
        return FALSE;
    }
    if (pooling_thread_status == 2) {
        n_log(LOG_DEBUG, "kafka pooling ask for stop thread already done for handle %p", kafka);
        return FALSE;
    }

    // request the stop, then wait for the thread to reset the status and exit
    write_lock(kafka->rwlock);
    kafka->pooling_thread_status = 2;
    unlock(kafka->rwlock);

    pthread_join(kafka->pooling_thread, NULL);

    return TRUE;
} /* n_kafka_stop_pooling_thread */
1018
/**
 * Dump every not-yet-acked event of a stopped producer handle to one file per
 * event in 'directory' (named "topic+eventptr"), skipping the 5-byte schema
 * header when present. Refuses to run while the pooling thread is active.
 * \param kafka kafka handle (pooling thread must be stopped)
 * \param directory destination directory for the dump files
 * \return TRUE on success or nothing to dump, FALSE on error
 */
int n_kafka_dump_unprocessed(N_KAFKA* kafka, char* directory) {
    __n_assert(kafka, return FALSE);
    __n_assert(directory, return FALSE);

    int status = 0;
    size_t nb_todump = 0;
    read_lock(kafka->rwlock);
    status = kafka->pooling_thread_status;
    nb_todump = kafka->nb_queued + kafka->nb_waiting + kafka->nb_error;
    if (status != 0) {
        n_log(LOG_ERR, "kafka handle %p thread pooling func is still running, aborting dump", kafka);
        unlock(kafka->rwlock);
        return FALSE;
    }
    if (nb_todump == 0) {
        n_log(LOG_DEBUG, "kafka handle %p: nothing to dump, all events processed correctly", kafka);
        unlock(kafka->rwlock);
        return TRUE;
    }

    // dumpstr is a shell N_STR whose data pointer *borrows* each event's buffer
    // below — it must never free that pointer (reset to NULL before free_nstr).
    // NOTE(review): if new_nstr(0) allocates an initial buffer, overwriting
    // dumpstr->data leaks it — confirm new_nstr(0) semantics.
    N_STR* dumpstr = new_nstr(0);
    list_foreach(node, kafka->events_to_send) {
        N_KAFKA_EVENT* event = node->ptr;
        if (event->status != N_KAFKA_EVENT_OK) {
            // skip the magic byte + schema id framing when present
            size_t offset = 0;
            if (event->schema_id != -1)
                offset = 5;

            N_STR* filename = NULL;
            nstrprintf(filename, "%s/%s+%p", directory, kafka->topic, event);
            n_log(LOG_DEBUG, "Dumping unprocessed events to %s", _nstr(filename));
            // dump event here
            dumpstr->data = event->event_string->data + offset;
            dumpstr->written = event->event_string->written - offset;
            dumpstr->length = event->event_string->length;
            nstr_to_file(dumpstr, _nstr(filename));
            free_nstr(&filename);
        }
    }
    unlock(kafka->rwlock);
    // detach the borrowed pointer before freeing the shell
    dumpstr->data = NULL;
    free_nstr(&dumpstr);
    return TRUE;
} /* n_kafka_dump_unprocessed */
1069
1076int n_kafka_load_unprocessed(N_KAFKA* kafka, char* directory) {
1077 __n_assert(kafka, return FALSE);
1078 __n_assert(directory, return FALSE);
1079
1080 write_lock(kafka->rwlock);
1081 // TODO:load events from filename
1082 // TODO: use base64decode( filename ) split( result , '+' ) to get brokersname+topic to check against what's saved with the event and what's inside kafka's handle conf
1083 unlock(kafka->rwlock);
1084
1085 return TRUE;
1086} /* n_kafka_load_unprocessed */
1087
    // Pop the oldest received event from a consumer handle, or NULL if none.
    // NOTE(review): the signature line is not visible in this extract; presumably
    // N_KAFKA_EVENT* n_kafka_get_event(N_KAFKA* kafka) — confirm.
    __n_assert(kafka, return NULL);

    N_KAFKA_EVENT* event = NULL;

    write_lock(kafka->rwlock);
    // NOTE(review): doxygen numbering jumps from 1099 to 1101 — the statement
    // that pops the first node of received_events into 'event' is missing from
    // this extract; confirm against the original file.
    if (kafka->received_events->start)
    unlock(kafka->rwlock);

    return event;
} /* n_kafka_get_event */
char * event_string
Definition ex_kafka.c:28
char * config_file
Definition ex_kafka.c:27
int mode
Network for managing connections.
Definition ex_network.c:22
char * key
#define init_lock(__rwlock_mutex)
Macro for initializing a rwlock.
Definition n_common.h:329
#define FreeNoLog(__ptr)
Free Handler without log.
Definition n_common.h:249
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
Definition n_common.h:185
#define __n_assert(__ptr, __ret)
macro to assert things
Definition n_common.h:256
#define _str(__PTR)
define true
Definition n_common.h:174
int get_computer_name(char *computer_name, size_t len)
abort program with a text
Definition n_common.c:56
#define rw_lock_destroy(__rwlock_mutex)
Macro to destroy rwlock mutex.
Definition n_common.h:388
#define unlock(__rwlock_mutex)
Macro for releasing a read/write lock on a rwlock mutex.
Definition n_common.h:375
#define _nstrp(__PTR)
N_STR or NULL pointer for testing purposes.
Definition n_common.h:182
#define write_lock(__rwlock_mutex)
Macro for acquiring a write lock on a rwlock mutex.
Definition n_common.h:361
#define Free(__ptr)
Free Handler to get errors.
Definition n_common.h:240
#define read_lock(__rwlock_mutex)
Macro for acquiring a read lock on a rwlock mutex.
Definition n_common.h:347
#define _nstr(__PTR)
N_STR or "NULL" string for logging purposes.
Definition n_common.h:180
void * ptr
void pointer to store
Definition n_list.h:27
LIST_NODE * start
pointer to the start of the list
Definition n_list.h:47
struct LIST_NODE * next
pointer to the next node
Definition n_list.h:33
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
Definition n_list.c:200
#define list_foreach(__ITEM_, __LIST_)
ForEach macro helper.
Definition n_list.h:66
#define remove_list_node(__LIST_, __NODE_, __TYPE_)
Remove macro helper for void pointer casting.
Definition n_list.h:77
int list_destroy(LIST **list)
Empty and Free a list container.
Definition n_list.c:519
LIST * new_generic_list(size_t max_items)
Initialize a generic list container to max_items pointers.
Definition n_list.c:19
#define MAX_LIST_ITEMS
flag to pass to new_generic_list for the maximum possible number of item in a list
Definition n_list.h:56
Structure of a generic list node.
Definition n_list.h:25
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
Definition n_log.h:70
#define LOG_DEBUG
debug-level messages
Definition n_log.h:65
#define LOG_ERR
error conditions
Definition n_log.h:57
#define LOG_INFO
informational
Definition n_log.h:63
int32_t poll_interval
poll interval in usecs
Definition n_kafka.h:102
rd_kafka_topic_partition_list_t * subscription
Definition n_kafka.h:78
cJSON * configuration
kafka json configuration holder
Definition n_kafka.h:98
pthread_t pooling_thread
pooling thread id
Definition n_kafka.h:106
size_t nb_waiting
Definition n_kafka.h:110
int mode
kafka handle mode: RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER
Definition n_kafka.h:96
char ** topics
Definition n_kafka.h:76
rd_kafka_topic_t * rd_kafka_topic
kafka topic handle
Definition n_kafka.h:88
rd_kafka_headers_t * rd_kafka_headers
kafka produce event headers structure handle
Definition n_kafka.h:58
pthread_rwlock_t rwlock
access lock
Definition n_kafka.h:94
N_STR * errstr
kafka error string holder
Definition n_kafka.h:90
int pooling_thread_status
pooling thread status, 0 => off , 1 => on , 2 => wants to stop, will be turned out to 0 by exiting po...
Definition n_kafka.h:104
rd_kafka_conf_t * rd_kafka_conf
kafka structure handle
Definition n_kafka.h:80
LIST * received_events
list of received N_KAFKA_EVENT
Definition n_kafka.h:72
LIST * events_to_send
list of N_KAFKA_EVENT to send
Definition n_kafka.h:70
int32_t poll_timeout
poll timeout in usecs
Definition n_kafka.h:100
rd_kafka_t * rd_kafka_handle
kafka handle (producer or consumer)
Definition n_kafka.h:82
char * groupid
consumer group id
Definition n_kafka.h:74
int schema_id
kafka schema id in network order
Definition n_kafka.h:92
size_t nb_queued
Definition n_kafka.h:108
size_t nb_error
Definition n_kafka.h:112
char * bootstrap_servers
kafka bootstrap servers string
Definition n_kafka.h:86
char * topic
kafka topic string
Definition n_kafka.h:84
int n_kafka_add_header_ex(N_KAFKA_EVENT *event, char *key, size_t key_length, char *value, size_t value_length)
add a header entry to an event.
Definition n_kafka.c:560
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed/unset events
Definition n_kafka.c:1025
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get a received event from the N_KAFKA kafka handle
Definition n_kafka.c:1093
#define N_KAFKA_EVENT_OK
state of an OK event
Definition n_kafka.h:43
#define N_KAFKA_EVENT_ERROR
state of an errored event
Definition n_kafka.h:41
int n_kafka_new_headers(N_KAFKA_EVENT *event, size_t count)
allocate a headers array for the event
Definition n_kafka.c:535
int32_t n_kafka_get_schema_from_char(char *string)
get a schema from the first 4 bytes of a char *string, returning ntohl of the value
Definition n_kafka.c:18
int n_kafka_put_schema_in_nstr(N_STR *string, int schema_id)
put a htonl schema id into the first 4 bytes of a N_STR *string
Definition n_kafka.c:58
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load a kafka configuration from a file
Definition n_kafka.c:263
int n_kafka_get_status(N_KAFKA *kafka, size_t *nb_queued, size_t *nb_waiting, size_t *nb_error)
return the queues status
Definition n_kafka.c:73
#define N_KAFKA_EVENT_WAITING_ACK
state of a sent event waiting for acknowledgement
Definition n_kafka.h:39
int n_kafka_put_schema_in_char(char *string, int schema_id)
put a htonl schema id into the first 4 bytes of a char *string
Definition n_kafka.c:45
int n_kafka_poll(N_KAFKA *kafka)
Poll kafka handle in producer or consumer mode.
Definition n_kafka.c:792
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event and set it's pointer to NULL
Definition n_kafka.c:780
void n_kafka_event_destroy_ptr(void *event_ptr)
destroy a kafka event
Definition n_kafka.c:752
int n_kafka_load_unprocessed(N_KAFKA *kafka, char *directory)
load unprocessed/unset events
Definition n_kafka.c:1076
int n_kafka_add_header(N_KAFKA_EVENT *event, N_STR *key, N_STR *value)
add a header entry to an event.
Definition n_kafka.c:594
int n_kafka_stop_pooling_thread(N_KAFKA *kafka)
stop the pooling thread of a kafka handle
Definition n_kafka.c:994
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
put an event in the events_to_send list
Definition n_kafka.c:610
N_KAFKA_EVENT * n_kafka_new_event_from_char(char *string, size_t written, int schema_id)
make a new event from a char *string
Definition n_kafka.c:686
int32_t n_kafka_get_schema_from_nstr(N_STR *string)
get a schema from the first 4 bytes of a N_STR *string, returning ntohl of the value
Definition n_kafka.c:32
int n_kafka_start_pooling_thread(N_KAFKA *kafka)
start the pooling thread of a kafka handle
Definition n_kafka.c:962
N_KAFKA_EVENT * n_kafka_new_event(int schema_id)
create a new empty event
Definition n_kafka.c:512
#define N_KAFKA_EVENT_CREATED
state of a freshly created event
Definition n_kafka.h:45
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
make a new event from a file
Definition n_kafka.c:739
N_KAFKA * n_kafka_new(int32_t poll_interval, int32_t poll_timeout, size_t errstr_len)
allocate a new kafka handle
Definition n_kafka.c:215
#define N_KAFKA_EVENT_QUEUED
state of a queued event
Definition n_kafka.h:37
void n_kafka_delete(N_KAFKA *kafka)
delete a N_KAFKA handle
Definition n_kafka.c:160
N_KAFKA_EVENT * n_kafka_new_event_from_string(N_STR *string, int schema_id)
make a new event from a N_STR *string
Definition n_kafka.c:724
structure of a KAFKA consumer or producer handle
Definition n_kafka.h:68
structure of a KAFKA message
Definition n_kafka.h:48
size_t written
size of the written data inside the string
Definition n_str.h:45
char * data
the string
Definition n_str.h:41
size_t length
length of string (in case we wanna keep information after the 0 end of string value)
Definition n_str.h:43
void free_nstr_ptr(void *ptr)
Free a N_STR pointer structure.
Definition n_str.c:49
#define free_nstr(__ptr)
free a N_STR structure and set the pointer to NULL
Definition n_str.h:176
int split_count(char **split_result)
Count split elements.
Definition n_str.c:954
int nstr_to_file(N_STR *str, char *filename)
Write a N_STR content into a file.
Definition n_str.c:386
N_STR * new_nstr(NSTRBYTE size)
create a new N_STR string
Definition n_str.c:180
#define nstrprintf(__nstr_var, __format,...)
Macro to quickly allocate and sprintf to N_STR.
Definition n_str.h:94
char ** split(const char *str, const char *delim, int empty)
split the strings into a an array of char *pointer , ended by a NULL one.
Definition n_str.c:874
char * join(char **splitresult, char *delim)
join the array into a string
Definition n_str.c:990
N_STR * file_to_nstr(char *filename)
Load a whole file into a N_STR.
Definition n_str.c:260
int free_split_result(char ***tab)
Free a split result allocated array.
Definition n_str.c:970
A box including a string and its length.
Definition n_str.h:39
int start_HiTimer(N_TIME *timer)
Initialize or restart from zero any N_TIME HiTimer.
Definition n_time.c:67
time_t get_usec(N_TIME *timer)
Poll any N_TIME HiTimer, returning usec, and moving currentTime to startTime.
Definition n_time.c:89
Timing Structure.
Definition n_time.h:32
Base64 encoding and decoding functions using N_STR.
Common headers and low-level functions & define.
void * n_kafka_pooling_thread(void *ptr)
kafka produce or consume pooling thread function
Definition n_kafka.c:895
static void n_kafka_delivery_message_callback(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
Message delivery report callback.
Definition n_kafka.c:95
int n_kafka_produce_ex(N_KAFKA *kafka, N_KAFKA_EVENT *event)
produce an event on a N_KAFKA *kafka handle
Definition n_kafka.c:633
Kafka generic produce and consume event header.