20 uint32_t raw_schema_id = 0;
21 memcpy(&raw_schema_id,
string + 1,
sizeof(uint32_t));
23 return (int32_t)ntohl(raw_schema_id);
46 uint32_t schema_id_htonl = htonl((uint32_t)schema_id);
47 memcpy(
string + 1, &schema_id_htonl,
sizeof(uint32_t));
78 *nb_queued = kafka->nb_queued;
79 *nb_waiting = kafka->nb_waiting;
80 *nb_error = kafka->nb_error;
102 if (rkmessage->err) {
103 n_log(
LOG_ERR,
"message delivery failed: %s", rd_kafka_err2str(rkmessage->err));
108 if (event && event->parent_table)
111 if (event && event->parent_table)
112 unlock(event->parent_table->rwlock);
114 n_log(
LOG_DEBUG,
"message delivered (%ld bytes, partition %d)", rkmessage->len, rkmessage->partition);
117 if (event && event->parent_table)
120 if (event->event_files_to_delete) {
121 char** files_to_delete =
split(
_nstr(event->event_files_to_delete),
";", 0);
122 if (files_to_delete) {
125 while (files_to_delete[index]) {
126 int ret = unlink(files_to_delete[index]);
129 n_log(
LOG_DEBUG,
"deleted on produce ack: %s", files_to_delete[index]);
131 n_log(
LOG_ERR,
"couldn't delete \"%s\": %s", files_to_delete[index], strerror(error));
144 if (event && event->parent_table)
145 unlock(event->parent_table->rwlock);
169 if (kafka->
mode == RD_KAFKA_CONSUMER) {
171 rd_kafka_topic_partition_list_destroy(kafka->subscription);
173 if (kafka->
mode == RD_KAFKA_PRODUCER) {
228 kafka->topics = NULL;
229 kafka->subscription = NULL;
235 kafka->nb_queued = 0;
236 kafka->nb_waiting = 0;
248 n_log(
LOG_ERR,
"could not init kafka rwlock in kafka structure at address %p", kafka);
274 N_STR* config_string = NULL;
276 if (!config_string) {
277 n_log(
LOG_ERR,
"unable to read config from file %s !", config_file);
282 json = cJSON_Parse(
_nstrp(config_string));
284 n_log(
LOG_ERR,
"unable to parse json %s", config_string);
291 for (jsonIndex = 0; jsonIndex < cJSON_GetArraySize(json); jsonIndex++) {
292 cJSON* entry = cJSON_GetArrayItem(json, jsonIndex);
294 if (!entry)
continue;
296 if (!entry->valuestring) {
301 if (entry->string[0] !=
'-') {
303 if (strcmp(
"topic", entry->string) != 0 && strcmp(
"topics", entry->string) != 0 && strcmp(
"value.schema.id", entry->string) != 0 && strcmp(
"value.schema.type", entry->string) != 0 && strcmp(
"poll.interval", entry->string) != 0 && strcmp(
"poll.timeout", entry->string) != 0 && strcmp(
"group.id.autogen", entry->string) != 0) {
304 if (!strcmp(
"group.id", entry->string)) {
306 if (mode == RD_KAFKA_PRODUCER)
308 kafka->
groupid = strdup(entry->valuestring);
314 n_log(
LOG_DEBUG,
"kafka config enabled: %s => %s", entry->string, entry->valuestring);
318 n_log(
LOG_DEBUG,
"kafka disabled config: %s => %s", entry->string, entry->valuestring);
325 jstr = cJSON_GetObjectItem(json,
"topic");
326 if (jstr && jstr->valuestring) {
327 kafka->
topic = strdup(jstr->valuestring);
330 if (mode == RD_KAFKA_PRODUCER) {
337 jstr = cJSON_GetObjectItem(json,
"topics");
338 if (jstr && jstr->valuestring) {
339 kafka->topics =
split(jstr->valuestring,
",", 0);
342 if (mode == RD_KAFKA_CONSUMER) {
348 jstr = cJSON_GetObjectItem(json,
"value.schema.id");
349 if (jstr && jstr->valuestring) {
350 int schem_v = atoi(jstr->valuestring);
351 if (schem_v < -1 || schem_v > 9999) {
360 jstr = cJSON_GetObjectItem(json,
"poll.interval");
361 if (jstr && jstr->valuestring) {
366 jstr = cJSON_GetObjectItem(json,
"poll.timeout");
367 if (jstr && jstr->valuestring) {
373 jstr = cJSON_GetObjectItem(json,
"bootstrap.servers");
374 if (jstr && jstr->valuestring) {
379 if (mode == RD_KAFKA_PRODUCER) {
397 kafka->
mode = RD_KAFKA_PRODUCER;
398 }
else if (mode == RD_KAFKA_CONSUMER) {
412 char computer_name[1024] =
"";
415 char* topics =
join(kafka->topics,
"_");
417 jstr = cJSON_GetObjectItem(json,
"group.id.autogen");
418 if (jstr && jstr->valuestring) {
419 if (strcmp(jstr->valuestring,
"host-topic-group") == 0) {
421 nstrprintf(groupid,
"%s_%s", computer_name, topics);
422 }
else if (strcmp(jstr->valuestring,
"unique-group") == 0) {
424 nstrprintf(groupid,
"%s_%s_%d", computer_name, topics, getpid());
429 nstrprintf(groupid,
"%s_%s_%d", computer_name, topics, getpid());
430 n_log(
LOG_DEBUG,
"group.id is not set and group.id.autogen is not set, generated unique group id: %s",
_nstr(groupid));
434 groupid->
data = NULL;
471 kafka->subscription = rd_kafka_topic_partition_list_new(topic_cnt);
472 for (
int i = 0; i < topic_cnt; i++)
473 rd_kafka_topic_partition_list_add(kafka->subscription, kafka->topics[i],
476 RD_KAFKA_PARTITION_UA);
487 int err = rd_kafka_subscribe(kafka->
rd_kafka_handle, kafka->subscription);
489 n_log(
LOG_ERR,
"kafka consumer: failed to subscribe to %d topics: %s", kafka->subscription->cnt, rd_kafka_err2str(err));
494 n_log(
LOG_DEBUG,
"kafka consumer created and subscribed to %d topic(s), waiting for rebalance and messages...", kafka->subscription->cnt);
496 kafka->
mode = RD_KAFKA_CONSUMER;
516 event->event_string = NULL;
517 event->event_files_to_delete = NULL;
518 event->from_topic = NULL;
519 event->rd_kafka_headers = NULL;
520 event->received_headers = NULL;
521 event->schema_id = schema_id;
523 event->parent_table = NULL;
541 event->rd_kafka_headers = rd_kafka_headers_new(count);
544 n_log(
LOG_ERR,
"event headers already allocated for event %p", event);
565 if (key_length < 1 || key_length > SSIZE_MAX) {
566 n_log(
LOG_ERR,
"Invalid key length (%zu) for header in event %p", key_length, event);
570 if (value_length < 1 || value_length > SSIZE_MAX) {
571 n_log(
LOG_ERR,
"Invalid value length (%zu) for key '%s' in event %p", value_length, key, event);
575 rd_kafka_resp_err_t err = rd_kafka_header_add(event->
rd_kafka_headers, key, (ssize_t)key_length, value, (ssize_t)value_length);
578 n_log(
LOG_ERR,
"Failed to add header [%s:%zu=%s:%zu] to event %p: %s",
579 key, key_length, value, value_length, event, rd_kafka_err2str(err));
613 event->parent_table = kafka;
633 rd_kafka_resp_err_t err = 0;
635 char* event_string = NULL;
636 size_t event_length = 0;
638 event->parent_table = kafka;
640 event_string =
event->event_string->data;
641 event_length =
event->event_string->length;
644 rd_kafka_headers_t* hdrs_copy;
647 err = rd_kafka_producev(
649 RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
650 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
651 RD_KAFKA_V_VALUE(event_string, event_length),
652 RD_KAFKA_V_HEADERS(hdrs_copy),
653 RD_KAFKA_V_OPAQUE((
void*)event),
657 rd_kafka_headers_destroy(hdrs_copy);
659 n_log(
LOG_ERR,
"failed to produce event: %p with headers %p, producer: %p, topic: %s, error: %s", event, event->rd_kafka_headers, kafka->
rd_kafka_handle, kafka->
topic, rd_kafka_err2str(err));
663 if (rd_kafka_produce(kafka->
rd_kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, event_string, event_length, NULL, 0, event) == -1) {
696 __n_assert(event->event_string, free(event);
return NULL);
699 size_t length = written + offset;
700 Malloc(event->event_string->data,
char, length);
701 __n_assert(event->event_string, free(event->event_string); free(event);
return NULL);
702 event->event_string->length = length;
705 memcpy(event->event_string->data + offset,
string, written);
706 event->event_string->written = written + offset;
756 if (event->event_string)
759 if (event->event_files_to_delete)
760 free_nstr(&event->event_files_to_delete);
764 if (event->rd_kafka_headers)
765 rd_kafka_headers_destroy(event->rd_kafka_headers);
767 if (event->received_headers)
796 if (kafka->
mode == RD_KAFKA_PRODUCER) {
828 }
else if (kafka->
mode == RD_KAFKA_CONSUMER) {
829 rd_kafka_message_t* rkm = NULL;
835 n_log(
LOG_ERR,
"consumer: %s", rd_kafka_message_errstr(rkm));
841 n_log(
LOG_DEBUG,
"Message on %s [%" PRId32
"] at offset %" PRId64
" (leader epoch %" PRId32
")", rd_kafka_topic_name(rkm->rkt), rkm->partition, rkm->offset, rd_kafka_message_leader_epoch(rkm));
844 if (rkm->key && rkm->key_len > 0)
845 n_log(
LOG_DEBUG,
"Key: %.*s", (
int)rkm->key_len, (
const char*)rkm->key);
849 if (rkm->payload && rkm->len > 0) {
854 event->parent_table = kafka;
856 rd_kafka_headers_t* hdrs = NULL;
857 if (!rd_kafka_message_headers(rkm, &hdrs)) {
859 const char* name = NULL;
860 const void* val = NULL;
864 while (!rd_kafka_header_get_all(hdrs, idx, &name, &val, &size)) {
865 N_STR* header_entry = NULL;
872 event->from_topic = strdup(rd_kafka_topic_name(rkm->rkt));
876 n_log(
LOG_DEBUG,
"Consumer received event of (%d bytes) from topic %s", (
int)rkm->len, event->from_topic);
879 rd_kafka_message_destroy(rkm);
882 if (kafka->nb_error > 0) {
901 if (kafka->
mode == RD_KAFKA_PRODUCER)
903 if (kafka->
mode == RD_KAFKA_CONSUMER) {
904 char* topiclist =
join(kafka->topics,
",");
912 while (status == 1) {
914 if (kafka->
mode == RD_KAFKA_PRODUCER) {
916 }
else if (kafka->topics) {
917 char* topiclist =
join(kafka->topics,
",");
930 int64_t elapsed_time =
get_usec(&chrono);
932 remaining_time -= elapsed_time;
933 if (remaining_time < 0) {
934 if (kafka->
mode == RD_KAFKA_PRODUCER) {
936 }
else if (kafka->
mode == RD_KAFKA_CONSUMER) {
969 n_log(
LOG_ERR,
"kafka pooling thread already started for handle %p", kafka);
977 n_log(
LOG_ERR,
"unable to create pooling_thread for kafka handle %p", kafka);
1000 if (pooling_thread_status == 0) {
1001 n_log(
LOG_DEBUG,
"kafka pooling thread already stopped for handle %p", kafka);
1004 if (pooling_thread_status == 2) {
1005 n_log(
LOG_DEBUG,
"kafka pooling ask for stop thread already done for handle %p", kafka);
1029 size_t nb_todump = 0;
1032 nb_todump = kafka->nb_queued + kafka->nb_waiting + kafka->nb_error;
1034 n_log(
LOG_ERR,
"kafka handle %p thread pooling func is still running, aborting dump", kafka);
1038 if (nb_todump == 0) {
1039 n_log(
LOG_DEBUG,
"kafka handle %p: nothing to dump, all events processed correctly", kafka);
1049 if (event->schema_id != -1)
1052 N_STR* filename = NULL;
1056 dumpstr->
data =
event->event_string->data + offset;
1057 dumpstr->
written =
event->event_string->written - offset;
1058 dumpstr->
length =
event->event_string->length;
1064 dumpstr->
data = NULL;
#define init_lock(__rwlock_mutex)
Macro for initializing a rwlock.
#define FreeNoLog(__ptr)
Free Handler without log.
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
#define __n_assert(__ptr, __ret)
macro to assert things
#define _str(__PTR)
define true
int get_computer_name(char *computer_name, size_t len)
abort program with a text
#define rw_lock_destroy(__rwlock_mutex)
Macro to destroy rwlock mutex.
#define unlock(__rwlock_mutex)
Macro for releasing read/write lock a rwlock mutex.
#define _nstrp(__PTR)
N_STR or NULL pointer for testing purposes.
#define write_lock(__rwlock_mutex)
Macro for acquiring a write lock on a rwlock mutex.
#define Free(__ptr)
Free Handler to get errors.
#define read_lock(__rwlock_mutex)
Macro for acquiring a read lock on a rwlock mutex.
#define _nstr(__PTR)
N_STR or "NULL" string for logging purposes.
void * ptr
void pointer to store
LIST_NODE * start
pointer to the start of the list
struct LIST_NODE * next
pointer to the next node
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
#define list_foreach(__ITEM_, __LIST_)
ForEach macro helper.
#define remove_list_node(__LIST_, __NODE_, __TYPE_)
Remove macro helper for void pointer casting.
int list_destroy(LIST **list)
Empty and Free a list container.
#define MAX_LIST_ITEMS
flag to pass to new_generic_list for the maximum possible number of item in a list
Structure of a generic list node.
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
#define LOG_DEBUG
debug-level messages
#define LOG_ERR
error conditions
#define LOG_INFO
informational
int32_t poll_interval
poll interval in usecs
cJSON * configuration
kafka json configuration holder
pthread_t pooling_thread
pooling thread id
int mode
kafka handle mode: RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER
rd_kafka_topic_t * rd_kafka_topic
kafka topic handle
rd_kafka_headers_t * rd_kafka_headers
kafka produce event headers structure handle
pthread_rwlock_t rwlock
access lock
N_STR * errstr
kafka error string holder
int pooling_thread_status
pooling thread status, 0 => off , 1 => on , 2 => wants to stop, will be turned out to 0 by exiting po...
rd_kafka_conf_t * rd_kafka_conf
kafka structure handle
LIST * received_events
list of received N_KAFKA_EVENT
LIST * events_to_send
list of N_KAFKA_EVENT to send
int32_t poll_timeout
poll timeout in usecs
rd_kafka_t * rd_kafka_handle
kafka handle (producer or consumer)
char * groupid
consumer group id
int schema_id
kafka schema id in network order
char * bootstrap_servers
kafka bootstrap servers string
char * topic
kafka topic string
int n_kafka_add_header_ex(N_KAFKA_EVENT *event, char *key, size_t key_length, char *value, size_t value_length)
add a header entry to an event.
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed/unset events
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get a received event from the N_KAFKA kafka handle
#define N_KAFKA_EVENT_OK
state of an OK event
#define N_KAFKA_EVENT_ERROR
state of an errored event
int n_kafka_new_headers(N_KAFKA_EVENT *event, size_t count)
allocate a headers array for the event
int32_t n_kafka_get_schema_from_char(char *string)
get a schema from the first 4 bytes of a char *string, returning ntohl of the value
int n_kafka_put_schema_in_nstr(N_STR *string, int schema_id)
put a htonl schema id into the first 4 bytes of a N_STR *string
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load a kafka configuration from a file
int n_kafka_get_status(N_KAFKA *kafka, size_t *nb_queued, size_t *nb_waiting, size_t *nb_error)
return the queues status
#define N_KAFKA_EVENT_WAITING_ACK
state of a sent event waiting for acknowledgement
int n_kafka_put_schema_in_char(char *string, int schema_id)
put a htonl schema id into the first 4 bytes of a char *string
int n_kafka_poll(N_KAFKA *kafka)
Poll kafka handle in producer or consumer mode.
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event and set its pointer to NULL
void n_kafka_event_destroy_ptr(void *event_ptr)
destroy a kafka event
int n_kafka_load_unprocessed(N_KAFKA *kafka, char *directory)
load unprocessed/unset events
int n_kafka_add_header(N_KAFKA_EVENT *event, N_STR *key, N_STR *value)
add a header entry to an event.
int n_kafka_stop_pooling_thread(N_KAFKA *kafka)
stop the pooling thread of a kafka handle
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
put an event in the events_to_send list
N_KAFKA_EVENT * n_kafka_new_event_from_char(char *string, size_t written, int schema_id)
make a new event from a char *string
int32_t n_kafka_get_schema_from_nstr(N_STR *string)
get a schema from the first 4 bytes of a N_STR *string, returning ntohl of the value
int n_kafka_start_pooling_thread(N_KAFKA *kafka)
start the pooling thread of a kafka handle
N_KAFKA_EVENT * n_kafka_new_event(int schema_id)
create a new empty event
#define N_KAFKA_EVENT_CREATED
state of a freshly created event
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
make a new event from a N_STR *string
N_KAFKA * n_kafka_new(int32_t poll_interval, int32_t poll_timeout, size_t errstr_len)
allocate a new kafka handle
#define N_KAFKA_EVENT_QUEUED
state of a queued event
void n_kafka_delete(N_KAFKA *kafka)
delete a N_KAFKA handle
N_KAFKA_EVENT * n_kafka_new_event_from_string(N_STR *string, int schema_id)
make a new event from a N_STR *string
structure of a KAFKA consumer or producer handle
structure of a KAFKA message
size_t written
size of the written data inside the string
size_t length
length of string (in case we wanna keep information after the 0 end of string value)
void free_nstr_ptr(void *ptr)
Free a N_STR pointer structure.
#define free_nstr(__ptr)
free a N_STR structure and set the pointer to NULL
int split_count(char **split_result)
Count split elements.
int nstr_to_file(N_STR *str, char *filename)
Write a N_STR content into a file.
N_STR * new_nstr(NSTRBYTE size)
create a new N_STR string
#define nstrprintf(__nstr_var, __format,...)
Macro to quickly allocate and sprintf to N_STR.
char ** split(const char *str, const char *delim, int empty)
split the string into an array of char* pointers, ended by a NULL one.
char * join(char **splitresult, char *delim)
join the array into a string
N_STR * file_to_nstr(char *filename)
Load a whole file into a N_STR.
int free_split_result(char ***tab)
Free a split result allocated array.
A box including a string and its length.
int start_HiTimer(N_TIME *timer)
Initialize or restart from zero any N_TIME HiTimer.
time_t get_usec(N_TIME *timer)
Poll any N_TIME HiTimer, returning usec, and moving currentTime to startTime.
base64 encoding and decoding functions using N_STR
Common headers and low-level functions & define.
void * n_kafka_pooling_thread(void *ptr)
kafka produce or consume pooling thread function
static void n_kafka_delivery_message_callback(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
Message delivery report callback.
int n_kafka_produce_ex(N_KAFKA *kafka, N_KAFKA_EVENT *event)
produce an event on a N_KAFKA *kafka handle
kafka generic produce and consume event header