Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
n_kafka.c
Go to the documentation of this file.
1
8#include "nilorea/n_kafka.h"
9#include "nilorea/n_common.h"
10#include "nilorea/n_base64.h"
11
17int32_t n_kafka_get_schema_from_char(char* string) {
18 __n_assert(string, return -1);
19
20 uint32_t raw_schema_id = 0;
21 memcpy(&raw_schema_id, string + 1, sizeof(uint32_t));
22
23 return (int32_t)ntohl(raw_schema_id);
24}
25
32 __n_assert(string, return -1);
33 __n_assert(string->data, return -1);
34 __n_assert((string->written >= sizeof(int32_t)), return -1);
35 return n_kafka_get_schema_from_char(string->data);
36}
37
44int n_kafka_put_schema_in_char(char* string, int schema_id) {
45 __n_assert(string, return FALSE);
46 uint32_t schema_id_htonl = htonl((uint32_t)schema_id); // cast to unsigned to avoid warning
47 memcpy(string + 1, &schema_id_htonl, sizeof(uint32_t));
48 return TRUE;
49}
50
57int n_kafka_put_schema_in_nstr(N_STR* string, int schema_id) {
58 __n_assert(string, return FALSE);
59 __n_assert(string->data, return FALSE);
60 __n_assert((string->written >= sizeof(int32_t)), return FALSE);
61 return n_kafka_put_schema_in_char(string->data, schema_id);
62}
63
72int n_kafka_get_status(N_KAFKA* kafka, size_t* nb_queued, size_t* nb_waiting, size_t* nb_error) {
73 __n_assert(kafka, return FALSE);
74 __n_assert(nb_waiting, return FALSE);
75 __n_assert(nb_error, return FALSE);
76 read_lock(kafka->rwlock);
77 int status = kafka->pooling_thread_status;
78 *nb_queued = kafka->nb_queued;
79 *nb_waiting = kafka->nb_waiting;
80 *nb_error = kafka->nb_error;
81 unlock(kafka->rwlock);
82 return status;
83}
84
94static void n_kafka_delivery_message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
95 (void)opaque;
96
97 __n_assert(rk, n_log(LOG_ERR, "rk=NULL is not a valid kafka handle"); return);
98 __n_assert(rkmessage, n_log(LOG_ERR, "rkmessage=NULL is not a valid kafka message"); return);
99
100 N_KAFKA_EVENT* event = (N_KAFKA_EVENT*)rkmessage->_private;
101
102 if (rkmessage->err) {
103 n_log(LOG_ERR, "message delivery failed: %s", rd_kafka_err2str(rkmessage->err));
104 if (!event) {
105 n_log(LOG_ERR, "fatal: event is NULL");
106 return;
107 }
108 if (event && event->parent_table)
109 write_lock(event->parent_table->rwlock);
110 event->status = N_KAFKA_EVENT_ERROR;
111 if (event && event->parent_table)
112 unlock(event->parent_table->rwlock);
113 } else {
114 n_log(LOG_DEBUG, "message delivered (%ld bytes, partition %d)", rkmessage->len, rkmessage->partition);
115 if (event) {
116 // lock
117 if (event && event->parent_table)
118 write_lock(event->parent_table->rwlock);
119 // delete produce event linked files
120 if (event->event_files_to_delete) {
121 char** files_to_delete = split(_nstr(event->event_files_to_delete), ";", 0);
122 if (files_to_delete) {
123 if (split_count(files_to_delete) > 0) {
124 int index = 0;
125 while (files_to_delete[index]) {
126 int ret = unlink(files_to_delete[index]);
127 int error = errno;
128 if (ret == 0) {
129 n_log(LOG_DEBUG, "deleted on produce ack: %s", files_to_delete[index]);
130 } else {
131 n_log(LOG_ERR, "couldn't delete \"%s\": %s", files_to_delete[index], strerror(error));
132 }
133 index++;
134 }
135 } else {
136 n_log(LOG_ERR, "split result is empty !");
137 }
138 }
139 free_split_result(&files_to_delete);
140 }
141 // set status
142 event->status = N_KAFKA_EVENT_OK;
143 // unlock
144 if (event && event->parent_table)
145 unlock(event->parent_table->rwlock);
146 n_log(LOG_INFO, "kafka event %p received an ack !", event);
147 } else {
148 n_log(LOG_ERR, "fatal: event is NULL");
149 }
150 }
151 return;
152 /* The rkmessage is destroyed automatically by librdkafka */
153}
154
160 __n_assert(kafka, return);
161
162 FreeNoLog(kafka->topic);
163 FreeNoLog(kafka->topics);
164 FreeNoLog(kafka->groupid);
165
167
168 if (kafka->rd_kafka_handle) {
169 if (kafka->mode == RD_KAFKA_CONSUMER) {
170 rd_kafka_consumer_close(kafka->rd_kafka_handle);
171 rd_kafka_topic_partition_list_destroy(kafka->subscription);
172 }
173 if (kafka->mode == RD_KAFKA_PRODUCER) {
174 rd_kafka_flush(kafka->rd_kafka_handle, kafka->poll_timeout);
175 }
176 rd_kafka_destroy(kafka->rd_kafka_handle);
177 n_log(LOG_DEBUG, "kafka handle destroyed");
178 }
179
180 if (kafka->rd_kafka_conf) {
181 rd_kafka_conf_destroy(kafka->rd_kafka_conf);
182 }
183
184 if (kafka->configuration)
185 cJSON_Delete(kafka->configuration);
186
187 if (kafka->errstr)
188 free_nstr(&kafka->errstr);
189
190 if (kafka->events_to_send)
192
193 if (kafka->received_events)
195
196 rw_lock_destroy(kafka->rwlock);
197
198 if (kafka->rd_kafka_topic)
199 rd_kafka_topic_destroy(kafka->rd_kafka_topic);
200
201 Free(kafka->bootstrap_servers);
202
203 Free(kafka);
204 return;
205}
206
214N_KAFKA* n_kafka_new(int32_t poll_interval, int32_t poll_timeout, size_t errstr_len) {
215 N_KAFKA* kafka = NULL;
216 Malloc(kafka, N_KAFKA, 1);
217 __n_assert(kafka, return NULL);
218
219 kafka->errstr = new_nstr(errstr_len);
220 __n_assert(kafka->errstr, Free(kafka); return NULL);
221
222 kafka->events_to_send = NULL;
223 kafka->received_events = NULL;
224 kafka->rd_kafka_conf = NULL;
225 kafka->rd_kafka_handle = NULL;
226 kafka->configuration = NULL;
227 kafka->groupid = NULL;
228 kafka->topics = NULL;
229 kafka->subscription = NULL;
230 kafka->topic = NULL;
231 kafka->mode = -1;
232 kafka->schema_id = -1;
233 kafka->poll_timeout = poll_timeout;
234 kafka->poll_interval = poll_interval;
235 kafka->nb_queued = 0;
236 kafka->nb_waiting = 0;
237 kafka->nb_error = 0;
238 kafka->pooling_thread_status = 0;
239 kafka->bootstrap_servers = NULL;
240
241 kafka->events_to_send = new_generic_list(MAX_LIST_ITEMS);
242 __n_assert(kafka->events_to_send, n_kafka_delete(kafka); return NULL);
243
244 kafka->received_events = new_generic_list(MAX_LIST_ITEMS);
245 __n_assert(kafka->events_to_send, n_kafka_delete(kafka); return NULL);
246
247 if (init_lock(kafka->rwlock) != 0) {
248 n_log(LOG_ERR, "could not init kafka rwlock in kafka structure at address %p", kafka);
249 n_kafka_delete(kafka);
250 return NULL;
251 }
252
253 return kafka;
254}
255
262N_KAFKA* n_kafka_load_config(char* config_file, int mode) {
263 __n_assert(config_file, return NULL);
264
265 N_KAFKA* kafka = NULL;
266
267 kafka = n_kafka_new(100, -1, 1024);
268 __n_assert(kafka, return NULL);
269
270 // initialize kafka object
271 kafka->rd_kafka_conf = rd_kafka_conf_new();
272
273 // load config file
274 N_STR* config_string = NULL;
275 config_string = file_to_nstr(config_file);
276 if (!config_string) {
277 n_log(LOG_ERR, "unable to read config from file %s !", config_file);
278 n_kafka_delete(kafka);
279 return NULL;
280 }
281 cJSON* json = NULL;
282 json = cJSON_Parse(_nstrp(config_string));
283 if (!json) {
284 n_log(LOG_ERR, "unable to parse json %s", config_string);
285 free_nstr(&config_string);
286 n_kafka_delete(kafka);
287 return NULL;
288 }
289
290 int jsonIndex;
291 for (jsonIndex = 0; jsonIndex < cJSON_GetArraySize(json); jsonIndex++) {
292 cJSON* entry = cJSON_GetArrayItem(json, jsonIndex);
293
294 if (!entry) continue;
295 __n_assert(entry->string, continue);
296 if (!entry->valuestring) {
297 n_log(LOG_DEBUG, "no valuestring for entry %s", _str(entry->string));
298 continue;
299 }
300
301 if (entry->string[0] != '-') {
302 // if it's not one of the optionnal parameters not managed by kafka, then we can use rd_kafka_conf_set on them
303 if (strcmp("topic", entry->string) != 0 && strcmp("topics", entry->string) != 0 && strcmp("value.schema.id", entry->string) != 0 && strcmp("value.schema.type", entry->string) != 0 && strcmp("poll.interval", entry->string) != 0 && strcmp("poll.timeout", entry->string) != 0 && strcmp("group.id.autogen", entry->string) != 0) {
304 if (!strcmp("group.id", entry->string)) {
305 // exclude group id for producer
306 if (mode == RD_KAFKA_PRODUCER)
307 continue;
308 kafka->groupid = strdup(entry->valuestring);
309 }
310
311 if (rd_kafka_conf_set(kafka->rd_kafka_conf, entry->string, entry->valuestring, _nstr(kafka->errstr), kafka->errstr->length) != RD_KAFKA_CONF_OK) {
312 n_log(LOG_ERR, "kafka config: %s", _nstr(kafka->errstr));
313 } else {
314 n_log(LOG_DEBUG, "kafka config enabled: %s => %s", entry->string, entry->valuestring);
315 }
316 }
317 } else {
318 n_log(LOG_DEBUG, "kafka disabled config: %s => %s", entry->string, entry->valuestring);
319 }
320 }
321
322 // other parameters, not directly managed by kafka API (will cause an error if used along rd_kafka_conf_set )
323 // producer topic
324 cJSON* jstr = NULL;
325 jstr = cJSON_GetObjectItem(json, "topic");
326 if (jstr && jstr->valuestring) {
327 kafka->topic = strdup(jstr->valuestring);
328 n_log(LOG_DEBUG, "kafka producer topic: %s", kafka->topic);
329 } else {
330 if (mode == RD_KAFKA_PRODUCER) {
331 n_log(LOG_ERR, "no topic configured !");
332 n_kafka_delete(kafka);
333 return NULL;
334 }
335 }
336 // consumer topics
337 jstr = cJSON_GetObjectItem(json, "topics");
338 if (jstr && jstr->valuestring) {
339 kafka->topics = split(jstr->valuestring, ",", 0);
340 n_log(LOG_DEBUG, "kafka consumer topics: %s", jstr->valuestring);
341 } else {
342 if (mode == RD_KAFKA_CONSUMER) {
343 n_log(LOG_ERR, "no topics configured !");
344 n_kafka_delete(kafka);
345 return NULL;
346 }
347 }
348 jstr = cJSON_GetObjectItem(json, "value.schema.id");
349 if (jstr && jstr->valuestring) {
350 int schem_v = atoi(jstr->valuestring);
351 if (schem_v < -1 || schem_v > 9999) {
352 n_log(LOG_ERR, "invalid schema id %d", schem_v);
353 n_kafka_delete(kafka);
354 return NULL;
355 }
356 n_log(LOG_DEBUG, "kafka schema id: %d", schem_v);
357 kafka->schema_id = schem_v;
358 }
359
360 jstr = cJSON_GetObjectItem(json, "poll.interval");
361 if (jstr && jstr->valuestring) {
362 kafka->poll_interval = atoi(jstr->valuestring);
363 n_log(LOG_DEBUG, "kafka poll interval: %d", kafka->poll_interval);
364 }
365
366 jstr = cJSON_GetObjectItem(json, "poll.timeout");
367 if (jstr && jstr->valuestring) {
368 kafka->poll_timeout = atoi(jstr->valuestring);
369 n_log(LOG_DEBUG, "kafka poll timeout: %d", kafka->poll_timeout);
370 }
371
372 // saving bootstrap servers in struct
373 jstr = cJSON_GetObjectItem(json, "bootstrap.servers");
374 if (jstr && jstr->valuestring) {
375 kafka->bootstrap_servers = strdup(jstr->valuestring);
376 n_log(LOG_DEBUG, "kafka bootstrap server: %s", kafka->bootstrap_servers);
377 }
378
379 if (mode == RD_KAFKA_PRODUCER) {
380 // set delivery callback
381 rd_kafka_conf_set_dr_msg_cb(kafka->rd_kafka_conf, n_kafka_delivery_message_callback);
382
383 kafka->rd_kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, kafka->rd_kafka_conf, _nstr(kafka->errstr), kafka->errstr->length);
384 if (!kafka->rd_kafka_handle) {
385 n_log(LOG_ERR, "failed to create new producer: %s", _nstr(kafka->errstr));
386 n_kafka_delete(kafka);
387 return NULL;
388 }
389 // conf is now owned by kafka handle
390 kafka->rd_kafka_conf = NULL;
391 // Create topic object
392 kafka->rd_kafka_topic = rd_kafka_topic_new(kafka->rd_kafka_handle, kafka->topic, NULL);
393 if (!kafka->rd_kafka_topic) {
394 n_kafka_delete(kafka);
395 return NULL;
396 }
397 kafka->mode = RD_KAFKA_PRODUCER;
398 } else if (mode == RD_KAFKA_CONSUMER) {
399 /* If there is no previously committed offset for a partition
400 * the auto.offset.reset strategy will be used to decide where
401 * in the partition to start fetching messages.
402 * By setting this to earliest the consumer will read all messages
403 * in the partition if there was no previously committed offset. */
404 /*if (rd_kafka_conf_set(kafka -> rd_kafka_conf, "auto.offset.reset", "earliest", _nstr( kafka -> errstr ) , kafka -> errstr -> length) != RD_KAFKA_CONF_OK) {
405 n_log( LOG_ERR , "kafka conf set: %s", kafka -> errstr);
406 n_kafka_delete( kafka );
407 return NULL ;
408 }*/
409
410 // if groupid is not set, generate a unique one
411 if (!kafka->groupid) {
412 char computer_name[1024] = "";
413 get_computer_name(computer_name, 1024);
414 N_STR* groupid = new_nstr(1024);
415 char* topics = join(kafka->topics, "_");
416 // generating group id
417 jstr = cJSON_GetObjectItem(json, "group.id.autogen");
418 if (jstr && jstr->valuestring) {
419 if (strcmp(jstr->valuestring, "host-topic-group") == 0) {
420 // nstrprintf( groupid , "%s_%s_%s" , computer_name , topics ,kafka -> bootstrap_servers );
421 nstrprintf(groupid, "%s_%s", computer_name, topics);
422 } else if (strcmp(jstr->valuestring, "unique-group") == 0) {
423 // nstrprintf( groupid , "%s_%s_%s_%d" , computer_name , topics , kafka -> bootstrap_servers , getpid() );
424 nstrprintf(groupid, "%s_%s_%d", computer_name, topics, getpid());
425 }
426 } else // default unique group
427 {
428 // nstrprintf( groupid , "%s_%s_%s_%d" , computer_name , topics , kafka -> bootstrap_servers , getpid() );
429 nstrprintf(groupid, "%s_%s_%d", computer_name, topics, getpid());
430 n_log(LOG_DEBUG, "group.id is not set and group.id.autogen is not set, generated unique group id: %s", _nstr(groupid));
431 }
432 free(topics);
433 kafka->groupid = groupid->data;
434 groupid->data = NULL;
435 free_nstr(&groupid);
436 }
437 if (rd_kafka_conf_set(kafka->rd_kafka_conf, "group.id", kafka->groupid, _nstr(kafka->errstr), kafka->errstr->length) != RD_KAFKA_CONF_OK) {
438 n_log(LOG_ERR, "kafka consumer group.id error: %s", _nstr(kafka->errstr));
439 n_kafka_delete(kafka);
440 return NULL;
441 } else {
442 n_log(LOG_DEBUG, "kafka consumer group.id => %s", kafka->groupid);
443 }
444
445 /* Create consumer instance.
446 * NOTE: rd_kafka_new() takes ownership of the conf object
447 * and the application must not reference it again after
448 * this call.
449 */
450 kafka->rd_kafka_handle = rd_kafka_new(RD_KAFKA_CONSUMER, kafka->rd_kafka_conf, _nstr(kafka->errstr), kafka->errstr->length);
451 if (!kafka->rd_kafka_handle) {
452 n_log(LOG_ERR, "%% Failed to create new consumer: %s", kafka->errstr);
453 n_kafka_delete(kafka);
454 return NULL;
455 }
456 // conf is now owned by kafka handle
457 kafka->rd_kafka_conf = NULL;
458
459 /* Redirect all messages from per-partition queues to
460 * the main queue so that messages can be consumed with one
461 * call from all assigned partitions.
462 *
463 * The alternative is to poll the main queue (for events)
464 * and each partition queue separately, which requires setting
465 * up a rebalance callback and keeping track of the assignment:
466 * but that is more complex and typically not recommended. */
467 rd_kafka_poll_set_consumer(kafka->rd_kafka_handle);
468
469 /* Convert the list of topics to a format suitable for librdkafka */
470 int topic_cnt = split_count(kafka->topics);
471 kafka->subscription = rd_kafka_topic_partition_list_new(topic_cnt);
472 for (int i = 0; i < topic_cnt; i++)
473 rd_kafka_topic_partition_list_add(kafka->subscription, kafka->topics[i],
474 /* the partition is ignored
475 * by subscribe() */
476 RD_KAFKA_PARTITION_UA);
477 /* Assign the topic. This method is disabled as it does noat allow dynamic partition assignement
478 int err = rd_kafka_assign(kafka -> rd_kafka_handle, kafka -> subscription);
479 if( err )
480 {
481 n_log( LOG_ERR , "kafka consumer: failed to assign %d topics: %s", kafka -> subscription->cnt, rd_kafka_err2str(err));
482 n_kafka_delete( kafka );
483 return NULL;
484 } */
485
486 /* Subscribe to the list of topics */
487 int err = rd_kafka_subscribe(kafka->rd_kafka_handle, kafka->subscription);
488 if (err) {
489 n_log(LOG_ERR, "kafka consumer: failed to subscribe to %d topics: %s", kafka->subscription->cnt, rd_kafka_err2str(err));
490 n_kafka_delete(kafka);
491 return NULL;
492 }
493
494 n_log(LOG_DEBUG, "kafka consumer created and subscribed to %d topic(s), waiting for rebalance and messages...", kafka->subscription->cnt);
495
496 kafka->mode = RD_KAFKA_CONSUMER;
497 } else {
498 n_log(LOG_ERR, "invalid mode %d", mode);
499 n_kafka_delete(kafka);
500 return NULL;
501 }
502
503 return kafka;
504} /* n_kafka_load_config */
505
512 N_KAFKA_EVENT* event = NULL;
513 Malloc(event, N_KAFKA_EVENT, 1);
514 __n_assert(event, return NULL);
515
516 event->event_string = NULL;
517 event->event_files_to_delete = NULL;
518 event->from_topic = NULL;
519 event->rd_kafka_headers = NULL;
520 event->received_headers = NULL;
521 event->schema_id = schema_id;
522 event->status = N_KAFKA_EVENT_CREATED;
523 event->parent_table = NULL;
524
525 return event;
526} /* n_kafka_new_event */
527
534int n_kafka_new_headers(N_KAFKA_EVENT* event, size_t count) {
535 __n_assert(event, return FALSE);
536
537 if (count == 0)
538 count = 1;
539
540 if (!event->rd_kafka_headers) {
541 event->rd_kafka_headers = rd_kafka_headers_new(count);
542 __n_assert(event->rd_kafka_headers, return FALSE);
543 } else {
544 n_log(LOG_ERR, "event headers already allocated for event %p", event);
545 return FALSE;
546 }
547 return TRUE;
548}
549
559int n_kafka_add_header_ex(N_KAFKA_EVENT* event, char* key, size_t key_length, char* value, size_t value_length) {
560 __n_assert(event, return FALSE);
561 __n_assert(event->rd_kafka_headers, return FALSE);
562 __n_assert(key, return FALSE);
563 __n_assert(value, return FALSE);
564
565 if (key_length < 1 || key_length > SSIZE_MAX) {
566 n_log(LOG_ERR, "Invalid key length (%zu) for header in event %p", key_length, event);
567 return FALSE;
568 }
569
570 if (value_length < 1 || value_length > SSIZE_MAX) {
571 n_log(LOG_ERR, "Invalid value length (%zu) for key '%s' in event %p", value_length, key, event);
572 return FALSE;
573 }
574
575 rd_kafka_resp_err_t err = rd_kafka_header_add(event->rd_kafka_headers, key, (ssize_t)key_length, value, (ssize_t)value_length);
576
577 if (err) {
578 n_log(LOG_ERR, "Failed to add header [%s:%zu=%s:%zu] to event %p: %s",
579 key, key_length, value, value_length, event, rd_kafka_err2str(err));
580 return FALSE;
581 }
582
583 return TRUE;
584}
585
593int n_kafka_add_header(N_KAFKA_EVENT* event, N_STR* key, N_STR* value) {
594 __n_assert(event, return FALSE);
595 __n_assert(key, return FALSE);
596 __n_assert(key->data, return FALSE);
597 __n_assert(value, return FALSE);
598 __n_assert(value->data, return FALSE);
599
600 return n_kafka_add_header_ex(event, key->data, key->written, value->data, value->written);
601}
602
610 __n_assert(kafka, return FALSE);
611 __n_assert(event, return FALSE);
612
613 event->parent_table = kafka;
614
615 write_lock(kafka->rwlock);
616 kafka->nb_queued++;
618 unlock(kafka->rwlock);
619
620 // Success to *enqueue* event
621 n_log(LOG_DEBUG, "successfully enqueued event %p in producer %p waitlist, topic: %s", event, kafka->rd_kafka_handle, kafka->topic);
622
623 return TRUE;
624} /* n_kafka_produce */
625
633 rd_kafka_resp_err_t err = 0;
634
635 char* event_string = NULL;
636 size_t event_length = 0;
637
638 event->parent_table = kafka;
639
640 event_string = event->event_string->data;
641 event_length = event->event_string->length;
642
643 if (event->rd_kafka_headers) {
644 rd_kafka_headers_t* hdrs_copy;
645 hdrs_copy = rd_kafka_headers_copy(event->rd_kafka_headers);
646
647 err = rd_kafka_producev(
648 kafka->rd_kafka_handle, RD_KAFKA_V_RKT(kafka->rd_kafka_topic),
649 RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
650 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
651 RD_KAFKA_V_VALUE(event_string, event_length),
652 RD_KAFKA_V_HEADERS(hdrs_copy),
653 RD_KAFKA_V_OPAQUE((void*)event),
654 RD_KAFKA_V_END);
655
656 if (err) {
657 rd_kafka_headers_destroy(hdrs_copy);
658 event->status = N_KAFKA_EVENT_ERROR;
659 n_log(LOG_ERR, "failed to produce event: %p with headers %p, producer: %p, topic: %s, error: %s", event, event->rd_kafka_headers, kafka->rd_kafka_handle, kafka->topic, rd_kafka_err2str(err));
660 return FALSE;
661 }
662 } else {
663 if (rd_kafka_produce(kafka->rd_kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, event_string, event_length, NULL, 0, event) == -1) {
664 int error = errno;
665 event->status = N_KAFKA_EVENT_ERROR;
666 n_log(LOG_ERR, "failed to produce event: %p, producer: %p, topic: %s, error: %s", event, kafka->rd_kafka_handle, kafka->topic, strerror(error));
667 return FALSE;
668 }
669 }
670 // Success to *enqueue* event
671 n_log(LOG_DEBUG, "successfully enqueued event %p in local producer %p : %s", event, kafka->rd_kafka_handle, kafka->topic);
672
673 event->status = N_KAFKA_EVENT_WAITING_ACK;
674
675 return TRUE;
676} /* n_kafka_produce_ex */
677
685N_KAFKA_EVENT* n_kafka_new_event_from_char(char* string, size_t written, int schema_id) {
686 __n_assert(string, return NULL);
687
688 size_t offset = 0;
689 if (schema_id != -1)
690 offset = 5;
691
692 N_KAFKA_EVENT* event = n_kafka_new_event(schema_id);
693 __n_assert(event, return NULL);
694
695 Malloc(event->event_string, N_STR, 1);
696 __n_assert(event->event_string, free(event); return NULL);
697
698 // allocate the size of the event + (the size of the schema id + magic byte) + one ending \0
699 size_t length = written + offset;
700 Malloc(event->event_string->data, char, length);
701 __n_assert(event->event_string, free(event->event_string); free(event); return NULL);
702 event->event_string->length = length;
703
704 // copy incomming body
705 memcpy(event->event_string->data + offset, string, written);
706 event->event_string->written = written + offset;
707
708 if (schema_id != -1)
709 n_kafka_put_schema_in_nstr(event->event_string, schema_id);
710
711 // set status and schema id
712 event->status = N_KAFKA_EVENT_QUEUED;
713
714 return event;
715} /* n_kafka_new_event_from_char */
716
724 __n_assert(string, return NULL);
725 __n_assert(string->data, return NULL);
726
727 N_KAFKA_EVENT* event = n_kafka_new_event_from_char(string->data, string->written, schema_id);
728
729 return event;
730} /* n_kafka_new_event_from_string */
731
738N_KAFKA_EVENT* n_kafka_new_event_from_file(char* filename, int schema_id) {
739 __n_assert(filename, return NULL);
740
741 N_STR* from = file_to_nstr(filename);
742 __n_assert(from, return NULL);
743
744 return n_kafka_new_event_from_string(from, schema_id);
745} /* n_kafka_new_event_from_file */
746
751void n_kafka_event_destroy_ptr(void* event_ptr) {
752 __n_assert(event_ptr, return);
753 N_KAFKA_EVENT* event = (N_KAFKA_EVENT*)event_ptr;
754 __n_assert(event, return);
755
756 if (event->event_string)
757 free_nstr(&event->event_string);
758
759 if (event->event_files_to_delete)
760 free_nstr(&event->event_files_to_delete);
761
762 FreeNoLog(event->from_topic);
763
764 if (event->rd_kafka_headers)
765 rd_kafka_headers_destroy(event->rd_kafka_headers);
766
767 if (event->received_headers)
768 list_destroy(&event->received_headers);
769
770 free(event);
771 return;
772} /* n_kafka_event_destroy_ptr */
773
780 __n_assert(event && (*event), return FALSE);
782 (*event) = NULL;
783 return TRUE;
784} /* n_kafka_event_destroy */
785
792 __n_assert(kafka, return FALSE);
793 int ret = TRUE;
794
795 // wait poll interval msecs for kafka response
796 if (kafka->mode == RD_KAFKA_PRODUCER) {
797 int nb_events = rd_kafka_poll(kafka->rd_kafka_handle, kafka->poll_interval);
798 (void)nb_events;
799 write_lock(kafka->rwlock);
800 // check events status in event table
801 LIST_NODE* node = kafka->events_to_send->start;
802 while (node) {
803 N_KAFKA_EVENT* event = (N_KAFKA_EVENT*)node->ptr;
804 if (event->status == N_KAFKA_EVENT_OK) {
805 kafka->nb_waiting--;
806 n_log(LOG_DEBUG, "removing event OK %p", event);
807 LIST_NODE* node_to_kill = node;
808 node = node->next;
809 N_KAFKA_EVENT* event_to_kill = remove_list_node(kafka->events_to_send, node_to_kill, N_KAFKA_EVENT);
810 n_kafka_event_destroy(&event_to_kill);
811 continue;
812 } else if (event->status == N_KAFKA_EVENT_QUEUED) {
813 if (n_kafka_produce_ex(kafka, event) == FALSE) {
814 kafka->nb_error++;
815 } else {
816 kafka->nb_waiting++;
817 kafka->nb_queued--;
818 }
819 } else if (event->status == N_KAFKA_EVENT_ERROR) {
820 kafka->nb_error++;
821 // TODO :
822 // Re try sending errored events after error_timeout is done
823 //
824 }
825 node = node->next;
826 }
827 unlock(kafka->rwlock);
828 } else if (kafka->mode == RD_KAFKA_CONSUMER) {
829 rd_kafka_message_t* rkm = NULL;
830 while ((rkm = rd_kafka_consumer_poll(kafka->rd_kafka_handle, kafka->poll_interval))) {
831 if (rkm->err) {
832 /* Consumer errors are generally to be considered
833 * informational as the consumer will automatically
834 * try to recover from all types of errors. */
835 n_log(LOG_ERR, "consumer: %s", rd_kafka_message_errstr(rkm));
836 continue;
837 }
838
839 /* Reminder of rkm contents */
840 // Proper message.
841 n_log(LOG_DEBUG, "Message on %s [%" PRId32 "] at offset %" PRId64 " (leader epoch %" PRId32 ")", rd_kafka_topic_name(rkm->rkt), rkm->partition, rkm->offset, rd_kafka_message_leader_epoch(rkm));
842
843 // Print the message key
844 if (rkm->key && rkm->key_len > 0)
845 n_log(LOG_DEBUG, "Key: %.*s", (int)rkm->key_len, (const char*)rkm->key);
846 else if (rkm->key)
847 n_log(LOG_DEBUG, "Key: (%d bytes)", (int)rkm->key_len);
848
849 if (rkm->payload && rkm->len > 0) {
850 write_lock(kafka->rwlock);
851 // make a copy of the event for further processing
852 N_KAFKA_EVENT* event = NULL;
853 event = n_kafka_new_event_from_char(rkm->payload, rkm->len, -1); // no schema id because we want a full raw copy here
854 event->parent_table = kafka;
855 // test if there are headers, save them
856 rd_kafka_headers_t* hdrs = NULL;
857 if (!rd_kafka_message_headers(rkm, &hdrs)) {
858 size_t idx = 0;
859 const char* name = NULL;
860 const void* val = NULL;
861 size_t size = 0;
862
863 event->received_headers = new_generic_list(MAX_LIST_ITEMS);
864 while (!rd_kafka_header_get_all(hdrs, idx, &name, &val, &size)) {
865 N_STR* header_entry = NULL;
866 nstrprintf(header_entry, "%s=%s", _str(name), _str((char*)val));
867 list_push(event->received_headers, header_entry, &free_nstr_ptr);
868 idx++;
869 }
870 }
871 // save originating topic (can help sorting if there are multiples one)
872 event->from_topic = strdup(rd_kafka_topic_name(rkm->rkt));
873 if (kafka->schema_id != -1)
874 event->schema_id = n_kafka_get_schema_from_nstr(event->event_string);
876 n_log(LOG_DEBUG, "Consumer received event of (%d bytes) from topic %s", (int)rkm->len, event->from_topic);
877 unlock(kafka->rwlock);
878 }
879 rd_kafka_message_destroy(rkm);
880 }
881 }
882 if (kafka->nb_error > 0) {
883 ret = FALSE;
884 }
885 // n_log( LOG_DEBUG , "kafka poll for handle %p returned %d elements" , kafka -> rd_kafka_handle , nb_events );
886 return ret;
887} /* n_kafka_poll */
888
894void* n_kafka_pooling_thread(void* ptr) {
895 N_KAFKA* kafka = (N_KAFKA*)ptr;
896
897 int status = 1;
898
899 N_TIME chrono;
900
901 if (kafka->mode == RD_KAFKA_PRODUCER)
902 n_log(LOG_DEBUG, "starting pooling thread for kafka handler %p mode PRODUCER (%d) topic %s", kafka->rd_kafka_handle, RD_KAFKA_PRODUCER, kafka->topic);
903 if (kafka->mode == RD_KAFKA_CONSUMER) {
904 char* topiclist = join(kafka->topics, ",");
905 n_log(LOG_DEBUG, "starting pooling thread for kafka handler %p mode CONSUMER (%d) topic %s", kafka->rd_kafka_handle, RD_KAFKA_CONSUMER, _str(topiclist));
906 FreeNoLog(topiclist);
907 }
908
909 start_HiTimer(&chrono);
910
911 int64_t remaining_time = kafka->poll_timeout * 1000;
912 while (status == 1) {
913 if (n_kafka_poll(kafka) == FALSE) {
914 if (kafka->mode == RD_KAFKA_PRODUCER) {
915 n_log(LOG_ERR, "failed to poll kafka producer handle %p with topic %s", kafka->rd_kafka_handle, rd_kafka_topic_name(kafka->rd_kafka_topic));
916 } else if (kafka->topics) {
917 char* topiclist = join(kafka->topics, ",");
918 n_log(LOG_ERR, "failed to poll kafka consumer handle %p with topic %s", kafka->rd_kafka_handle, _str(topiclist));
919 FreeNoLog(topiclist);
920 }
921 }
922
923 read_lock(kafka->rwlock);
924 status = kafka->pooling_thread_status;
925 unlock(kafka->rwlock);
926
927 if (status == 2)
928 break;
929
930 int64_t elapsed_time = get_usec(&chrono);
931 if (kafka->poll_timeout != -1) {
932 remaining_time -= elapsed_time;
933 if (remaining_time < 0) {
934 if (kafka->mode == RD_KAFKA_PRODUCER) {
935 n_log(LOG_DEBUG, "timeouted on kafka handle %p", kafka->rd_kafka_handle);
936 } else if (kafka->mode == RD_KAFKA_CONSUMER) {
937 n_log(LOG_DEBUG, "timeouted on kafka handle %p", kafka->rd_kafka_handle);
938 }
939 remaining_time = 0;
940 status = 2;
941 break;
942 }
943 }
944 // n_log( LOG_DEBUG , "remaining time: %d on kafka handle %p" , remaining_time , kafka -> rd_kafka_handle );
945 }
946
947 write_lock(kafka->rwlock);
948 kafka->pooling_thread_status = 0;
949 unlock(kafka->rwlock);
950
951 n_log(LOG_DEBUG, "exiting pooling thread for kafka handler %p mode %s", kafka->rd_kafka_handle, (kafka->mode == RD_KAFKA_PRODUCER) ? "PRODUCER" : "CONSUMER");
952 pthread_exit(NULL);
953 return NULL;
954} /* n_kafka_pooling_thread */
955
962 __n_assert(kafka, return FALSE);
963
964 read_lock(kafka->rwlock);
965 int status = kafka->pooling_thread_status;
966 unlock(kafka->rwlock);
967
968 if (status != 0) {
969 n_log(LOG_ERR, "kafka pooling thread already started for handle %p", kafka);
970 return FALSE;
971 }
972
973 write_lock(kafka->rwlock);
974 kafka->pooling_thread_status = 1;
975
976 if (pthread_create(&kafka->pooling_thread, NULL, n_kafka_pooling_thread, (void*)kafka) != 0) {
977 n_log(LOG_ERR, "unable to create pooling_thread for kafka handle %p", kafka);
978 unlock(kafka->rwlock);
979 return FALSE;
980 }
981 unlock(kafka->rwlock);
982
983 n_log(LOG_DEBUG, "pthread_create sucess for kafka handle %p->%p", kafka, kafka->rd_kafka_handle);
984
985 return TRUE;
986} /* n_kafka_start_pooling_thread */
987
994 __n_assert(kafka, return FALSE);
995
996 read_lock(kafka->rwlock);
997 int pooling_thread_status = kafka->pooling_thread_status;
998 unlock(kafka->rwlock);
999
1000 if (pooling_thread_status == 0) {
1001 n_log(LOG_DEBUG, "kafka pooling thread already stopped for handle %p", kafka);
1002 return FALSE;
1003 }
1004 if (pooling_thread_status == 2) {
1005 n_log(LOG_DEBUG, "kafka pooling ask for stop thread already done for handle %p", kafka);
1006 return FALSE;
1007 }
1008
1009 write_lock(kafka->rwlock);
1010 kafka->pooling_thread_status = 2;
1011 unlock(kafka->rwlock);
1012
1013 pthread_join(kafka->pooling_thread, NULL);
1014
1015 return TRUE;
1016} /* n_kafka_stop_pooling_thread */
1017
/* Dump every not-yet-acknowledged event of the handle to one file per event
 * under 'directory' (named "<topic>+<event address>"). Only allowed when the
 * pooling thread is stopped. Returns TRUE on success or nothing to dump,
 * FALSE on bad arguments or if the pooling thread is still running. */
int n_kafka_dump_unprocessed(N_KAFKA* kafka, char* directory) {
    __n_assert(kafka, return FALSE);
    __n_assert(directory, return FALSE);

    int status = 0;
    size_t nb_todump = 0;
    read_lock(kafka->rwlock);
    status = kafka->pooling_thread_status;
    // total of events not yet fully processed (queued + waiting ack + errored)
    nb_todump = kafka->nb_queued + kafka->nb_waiting + kafka->nb_error;
    if (status != 0) {
        n_log(LOG_ERR, "kafka handle %p thread pooling func is still running, aborting dump", kafka);
        unlock(kafka->rwlock);
        return FALSE;
    }
    if (nb_todump == 0) {
        n_log(LOG_DEBUG, "kafka handle %p: nothing to dump, all events processed correctly", kafka);
        unlock(kafka->rwlock);
        return TRUE;
    }

    // dumpstr is a shell N_STR: its data pointer is aliased to each event's
    // payload below, it never owns the memory it points to
    N_STR* dumpstr = new_nstr(0);
    list_foreach(node, kafka->events_to_send) {
        N_KAFKA_EVENT* event = node->ptr;
        if (event->status != N_KAFKA_EVENT_OK) {
            // skip the 5 byte (magic + schema id) prefix when one is present,
            // so the dumped file holds the original raw payload
            size_t offset = 0;
            if (event->schema_id != -1)
                offset = 5;

            N_STR* filename = NULL;
            nstrprintf(filename, "%s/%s+%p", directory, kafka->topic, event);
            n_log(LOG_DEBUG, "Dumping unprocessed events to %s", _nstr(filename));
            // dump event here
            dumpstr->data = event->event_string->data + offset;
            dumpstr->written = event->event_string->written - offset;
            dumpstr->length = event->event_string->length;
            nstr_to_file(dumpstr, _nstr(filename));
            free_nstr(&filename);
        }
    }
    unlock(kafka->rwlock);
    // detach the borrowed pointer before freeing, so free_nstr does not free
    // an event's payload buffer
    dumpstr->data = NULL;
    free_nstr(&dumpstr);
    return TRUE;
} /* n_kafka_dump_unprocessed */
1068
/* Reload previously dumped unprocessed events from 'directory' back into the
 * handle. Currently a stub: it only takes/releases the write lock and always
 * returns TRUE (FALSE on bad arguments); the actual reload is still TODO. */
int n_kafka_load_unprocessed(N_KAFKA* kafka, char* directory) {
    __n_assert(kafka, return FALSE);
    __n_assert(directory, return FALSE);

    write_lock(kafka->rwlock);
    // TODO:load events from filename
    // TODO: use base64decode( filename ) split( result , '+' ) to get brokersname+topic to check against what's saved with the event and what's inside kafka's handle conf
    unlock(kafka->rwlock);

    return TRUE;
} /* n_kafka_load_unprocessed */
1086
1093 __n_assert(kafka, return NULL);
1094
1095 N_KAFKA_EVENT* event = NULL;
1096
1097 write_lock(kafka->rwlock);
1098 if (kafka->received_events->start)
1100 unlock(kafka->rwlock);
1101
1102 return event;
1103} /* n_kafka_get_event */
#define init_lock(__rwlock_mutex)
Macro for initializing a rwlock.
Definition n_common.h:331
#define FreeNoLog(__ptr)
Free Handler without log.
Definition n_common.h:251
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
Definition n_common.h:187
#define __n_assert(__ptr, __ret)
macro to assert things
Definition n_common.h:258
#define _str(__PTR)
define true
Definition n_common.h:176
int get_computer_name(char *computer_name, size_t len)
abort program with a text
Definition n_common.c:55
#define rw_lock_destroy(__rwlock_mutex)
Macro to destroy rwlock mutex.
Definition n_common.h:390
#define unlock(__rwlock_mutex)
Macro for releasing read/write lock a rwlock mutex.
Definition n_common.h:377
#define _nstrp(__PTR)
N_STR or NULL pointer for testing purposes.
Definition n_common.h:184
#define write_lock(__rwlock_mutex)
Macro for acquiring a write lock on a rwlock mutex.
Definition n_common.h:363
#define Free(__ptr)
Free Handler to get errors.
Definition n_common.h:242
#define read_lock(__rwlock_mutex)
Macro for acquiring a read lock on a rwlock mutex.
Definition n_common.h:349
#define _nstr(__PTR)
N_STR or "NULL" string for logging purposes.
Definition n_common.h:182
void * ptr
void pointer to store
Definition n_list.h:26
LIST_NODE * start
pointer to the start of the list
Definition n_list.h:46
struct LIST_NODE * next
pointer to the next node
Definition n_list.h:32
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
Definition n_list.c:199
#define list_foreach(__ITEM_, __LIST_)
ForEach macro helper.
Definition n_list.h:65
#define remove_list_node(__LIST_, __NODE_, __TYPE_)
Remove macro helper for void pointer casting.
Definition n_list.h:76
int list_destroy(LIST **list)
Empty and Free a list container.
Definition n_list.c:518
#define MAX_LIST_ITEMS
flag to pass to new_generic_list for the maximum possible number of item in a list
Definition n_list.h:55
Structure of a generic list node.
Definition n_list.h:24
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
Definition n_log.h:69
#define LOG_DEBUG
debug-level messages
Definition n_log.h:64
#define LOG_ERR
error conditions
Definition n_log.h:56
#define LOG_INFO
informational
Definition n_log.h:62
int32_t poll_interval
poll interval in usecs
Definition n_kafka.h:101
cJSON * configuration
kafka json configuration holder
Definition n_kafka.h:97
pthread_t pooling_thread
pooling thread id
Definition n_kafka.h:105
int mode
kafka handle mode: RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER
Definition n_kafka.h:95
rd_kafka_topic_t * rd_kafka_topic
kafka topic handle
Definition n_kafka.h:87
rd_kafka_headers_t * rd_kafka_headers
kafka produce event headers structure handle
Definition n_kafka.h:57
pthread_rwlock_t rwlock
access lock
Definition n_kafka.h:93
N_STR * errstr
kafka error string holder
Definition n_kafka.h:89
int pooling_thread_status
pooling thread status, 0 => off , 1 => on , 2 => wants to stop, will be turned out to 0 by exiting po...
Definition n_kafka.h:103
rd_kafka_conf_t * rd_kafka_conf
kafka structure handle
Definition n_kafka.h:79
LIST * received_events
list of received N_KAFKA_EVENT
Definition n_kafka.h:71
LIST * events_to_send
list of N_KAFKA_EVENT to send
Definition n_kafka.h:69
int32_t poll_timeout
poll timeout in usecs
Definition n_kafka.h:99
rd_kafka_t * rd_kafka_handle
kafka handle (producer or consumer)
Definition n_kafka.h:81
char * groupid
consumer group id
Definition n_kafka.h:73
int schema_id
kafka schema id in network order
Definition n_kafka.h:91
char * bootstrap_servers
kafka bootstrap servers string
Definition n_kafka.h:85
char * topic
kafka topic string
Definition n_kafka.h:83
int n_kafka_add_header_ex(N_KAFKA_EVENT *event, char *key, size_t key_length, char *value, size_t value_length)
add a header entry to an event.
Definition n_kafka.c:559
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed/unset events
Definition n_kafka.c:1024
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get a received event from the N_KAFKA kafka handle
Definition n_kafka.c:1092
#define N_KAFKA_EVENT_OK
state of an OK event
Definition n_kafka.h:42
#define N_KAFKA_EVENT_ERROR
state of an errored event
Definition n_kafka.h:40
int n_kafka_new_headers(N_KAFKA_EVENT *event, size_t count)
allocate a headers array for the event
Definition n_kafka.c:534
int32_t n_kafka_get_schema_from_char(char *string)
get a schema from the first 4 bytes of a char *string, returning ntohl of the value
Definition n_kafka.c:17
int n_kafka_put_schema_in_nstr(N_STR *string, int schema_id)
put a htonl schema id into the first 4 bytes of a N_STR *string
Definition n_kafka.c:57
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load a kafka configuration from a file
Definition n_kafka.c:262
int n_kafka_get_status(N_KAFKA *kafka, size_t *nb_queued, size_t *nb_waiting, size_t *nb_error)
return the queues status
Definition n_kafka.c:72
#define N_KAFKA_EVENT_WAITING_ACK
state of a sent event waiting for acknowledgement
Definition n_kafka.h:38
int n_kafka_put_schema_in_char(char *string, int schema_id)
put a htonl schema id into the first 4 bytes of a char *string
Definition n_kafka.c:44
int n_kafka_poll(N_KAFKA *kafka)
Poll kafka handle in producer or consumer mode.
Definition n_kafka.c:791
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event and set it's pointer to NULL
Definition n_kafka.c:779
void n_kafka_event_destroy_ptr(void *event_ptr)
destroy a kafka event
Definition n_kafka.c:751
int n_kafka_load_unprocessed(N_KAFKA *kafka, char *directory)
load unprocessed/unset events
Definition n_kafka.c:1075
int n_kafka_add_header(N_KAFKA_EVENT *event, N_STR *key, N_STR *value)
add a header entry to an event.
Definition n_kafka.c:593
int n_kafka_stop_pooling_thread(N_KAFKA *kafka)
stop the pooling thread of a kafka handle
Definition n_kafka.c:993
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
put an event in the events_to_send list
Definition n_kafka.c:609
N_KAFKA_EVENT * n_kafka_new_event_from_char(char *string, size_t written, int schema_id)
make a new event from a char *string
Definition n_kafka.c:685
int32_t n_kafka_get_schema_from_nstr(N_STR *string)
get a schema from the first 4 bytes of a N_STR *string, returning ntohl of the value
Definition n_kafka.c:31
int n_kafka_start_pooling_thread(N_KAFKA *kafka)
start the pooling thread of a kafka handle
Definition n_kafka.c:961
N_KAFKA_EVENT * n_kafka_new_event(int schema_id)
create a new empty event
Definition n_kafka.c:511
#define N_KAFKA_EVENT_CREATED
state of a freshly created event
Definition n_kafka.h:44
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
make a new event from the content of a file
Definition n_kafka.c:738
N_KAFKA * n_kafka_new(int32_t poll_interval, int32_t poll_timeout, size_t errstr_len)
allocate a new kafka handle
Definition n_kafka.c:214
#define N_KAFKA_EVENT_QUEUED
state of a queued event
Definition n_kafka.h:36
void n_kafka_delete(N_KAFKA *kafka)
delete a N_KAFKA handle
Definition n_kafka.c:159
N_KAFKA_EVENT * n_kafka_new_event_from_string(N_STR *string, int schema_id)
make a new event from a N_STR *string
Definition n_kafka.c:723
structure of a KAFKA consumer or producer handle
Definition n_kafka.h:67
structure of a KAFKA message
Definition n_kafka.h:47
size_t written
size of the written data inside the string
Definition n_str.h:45
char * data
the string
Definition n_str.h:41
size_t length
length of string (in case we wanna keep information after the 0 end of string value)
Definition n_str.h:43
void free_nstr_ptr(void *ptr)
Free a N_STR pointer structure.
Definition n_str.c:49
#define free_nstr(__ptr)
free a N_STR structure and set the pointer to NULL
Definition n_str.h:176
int split_count(char **split_result)
Count split elements.
Definition n_str.c:954
int nstr_to_file(N_STR *str, char *filename)
Write a N_STR content into a file.
Definition n_str.c:386
N_STR * new_nstr(NSTRBYTE size)
create a new N_STR string
Definition n_str.c:180
#define nstrprintf(__nstr_var, __format,...)
Macro to quickly allocate and sprintf to N_STR.
Definition n_str.h:94
char ** split(const char *str, const char *delim, int empty)
split the strings into a an array of char *pointer , ended by a NULL one.
Definition n_str.c:874
char * join(char **splitresult, char *delim)
join the array into a string
Definition n_str.c:990
N_STR * file_to_nstr(char *filename)
Load a whole file into a N_STR.
Definition n_str.c:260
int free_split_result(char ***tab)
Free a split result allocated array.
Definition n_str.c:970
A box holding a string and its length.
Definition n_str.h:39
int start_HiTimer(N_TIME *timer)
Initialize or restart from zero any N_TIME HiTimer.
Definition n_time.c:67
time_t get_usec(N_TIME *timer)
Poll any N_TIME HiTimer, returning usec, and moving currentTime to startTime.
Definition n_time.c:89
Timing Structure.
Definition n_time.h:32
base64 encoding and decoding functions using N_STR
Common headers and low-level functions & define.
void * n_kafka_pooling_thread(void *ptr)
kafka produce or consume pooling thread function
Definition n_kafka.c:894
static void n_kafka_delivery_message_callback(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
Message delivery report callback.
Definition n_kafka.c:94
int n_kafka_produce_ex(N_KAFKA *kafka, N_KAFKA_EVENT *event)
produce an event on a N_KAFKA *kafka handle
Definition n_kafka.c:632
kafka generic produce and consume event header