21 uint32_t raw_schema_id = 0;
22 memcpy(&raw_schema_id,
string + 1,
sizeof(uint32_t));
24 return (int32_t)ntohl(raw_schema_id);
47 uint32_t schema_id_htonl = htonl((uint32_t)schema_id);
48 memcpy(
string + 1, &schema_id_htonl,
sizeof(uint32_t));
103 if (rkmessage->err) {
104 n_log(
LOG_ERR,
"message delivery failed: %s", rd_kafka_err2str(rkmessage->err));
109 if (event && event->parent_table)
112 if (event && event->parent_table)
113 unlock(event->parent_table->rwlock);
115 n_log(
LOG_DEBUG,
"message delivered (%ld bytes, partition %d)", rkmessage->len, rkmessage->partition);
118 if (event && event->parent_table)
121 if (event->event_files_to_delete) {
122 char** files_to_delete =
split(
_nstr(event->event_files_to_delete),
";", 0);
123 if (files_to_delete) {
126 while (files_to_delete[index]) {
127 int ret = unlink(files_to_delete[index]);
130 n_log(
LOG_DEBUG,
"deleted on produce ack: %s", files_to_delete[index]);
132 n_log(
LOG_ERR,
"couldn't delete \"%s\": %s", files_to_delete[index], strerror(error));
145 if (event && event->parent_table)
146 unlock(event->parent_table->rwlock);
170 if (kafka->
mode == RD_KAFKA_CONSUMER) {
172 rd_kafka_topic_partition_list_destroy(kafka->
subscription);
174 if (kafka->
mode == RD_KAFKA_PRODUCER) {
249 n_log(
LOG_ERR,
"could not init kafka rwlock in kafka structure at address %p", kafka);
275 N_STR* config_string = NULL;
277 if (!config_string) {
283 json = cJSON_Parse(
_nstrp(config_string));
285 n_log(
LOG_ERR,
"unable to parse json %s", config_string);
292 for (jsonIndex = 0; jsonIndex < cJSON_GetArraySize(json); jsonIndex++) {
293 cJSON* entry = cJSON_GetArrayItem(json, jsonIndex);
295 if (!entry)
continue;
297 if (!entry->valuestring) {
302 if (entry->string[0] !=
'-') {
304 if (strcmp(
"topic", entry->string) != 0 && strcmp(
"topics", entry->string) != 0 && strcmp(
"value.schema.id", entry->string) != 0 && strcmp(
"value.schema.type", entry->string) != 0 && strcmp(
"poll.interval", entry->string) != 0 && strcmp(
"poll.timeout", entry->string) != 0 && strcmp(
"group.id.autogen", entry->string) != 0) {
305 if (!strcmp(
"group.id", entry->string)) {
307 if (
mode == RD_KAFKA_PRODUCER)
309 kafka->
groupid = strdup(entry->valuestring);
315 n_log(
LOG_DEBUG,
"kafka config enabled: %s => %s", entry->string, entry->valuestring);
319 n_log(
LOG_DEBUG,
"kafka disabled config: %s => %s", entry->string, entry->valuestring);
326 jstr = cJSON_GetObjectItem(json,
"topic");
327 if (jstr && jstr->valuestring) {
328 kafka->
topic = strdup(jstr->valuestring);
331 if (
mode == RD_KAFKA_PRODUCER) {
338 jstr = cJSON_GetObjectItem(json,
"topics");
339 if (jstr && jstr->valuestring) {
343 if (
mode == RD_KAFKA_CONSUMER) {
349 jstr = cJSON_GetObjectItem(json,
"value.schema.id");
350 if (jstr && jstr->valuestring) {
351 int schem_v = atoi(jstr->valuestring);
352 if (schem_v < -1 || schem_v > 9999) {
361 jstr = cJSON_GetObjectItem(json,
"poll.interval");
362 if (jstr && jstr->valuestring) {
367 jstr = cJSON_GetObjectItem(json,
"poll.timeout");
368 if (jstr && jstr->valuestring) {
374 jstr = cJSON_GetObjectItem(json,
"bootstrap.servers");
375 if (jstr && jstr->valuestring) {
380 if (
mode == RD_KAFKA_PRODUCER) {
398 kafka->
mode = RD_KAFKA_PRODUCER;
399 }
else if (
mode == RD_KAFKA_CONSUMER) {
413 char computer_name[1024] =
"";
418 jstr = cJSON_GetObjectItem(json,
"group.id.autogen");
419 if (jstr && jstr->valuestring) {
420 if (strcmp(jstr->valuestring,
"host-topic-group") == 0) {
422 nstrprintf(groupid,
"%s_%s", computer_name, topics);
423 }
else if (strcmp(jstr->valuestring,
"unique-group") == 0) {
425 nstrprintf(groupid,
"%s_%s_%d", computer_name, topics, getpid());
430 nstrprintf(groupid,
"%s_%s_%d", computer_name, topics, getpid());
431 n_log(
LOG_DEBUG,
"group.id is not set and group.id.autogen is not set, generated unique group id: %s",
_nstr(groupid));
435 groupid->
data = NULL;
472 kafka->
subscription = rd_kafka_topic_partition_list_new(topic_cnt);
473 for (
int i = 0; i < topic_cnt; i++)
477 RD_KAFKA_PARTITION_UA);
490 n_log(
LOG_ERR,
"kafka consumer: failed to subscribe to %d topics: %s", kafka->
subscription->cnt, rd_kafka_err2str(err));
495 n_log(
LOG_DEBUG,
"kafka consumer created and subscribed to %d topic(s), waiting for rebalance and messages...", kafka->
subscription->cnt);
497 kafka->
mode = RD_KAFKA_CONSUMER;
517 event->event_string = NULL;
518 event->event_files_to_delete = NULL;
519 event->from_topic = NULL;
520 event->rd_kafka_headers = NULL;
521 event->received_headers = NULL;
522 event->schema_id = schema_id;
524 event->parent_table = NULL;
542 event->rd_kafka_headers = rd_kafka_headers_new(count);
545 n_log(
LOG_ERR,
"event headers already allocated for event %p", event);
566 if (key_length < 1 || key_length > SSIZE_MAX) {
567 n_log(
LOG_ERR,
"Invalid key length (%zu) for header in event %p", key_length, event);
571 if (value_length < 1 || value_length > SSIZE_MAX) {
572 n_log(
LOG_ERR,
"Invalid value length (%zu) for key '%s' in event %p", value_length,
key, event);
576 rd_kafka_resp_err_t err = rd_kafka_header_add(event->
rd_kafka_headers,
key, (ssize_t)key_length, value, (ssize_t)value_length);
579 n_log(
LOG_ERR,
"Failed to add header [%s:%zu=%s:%zu] to event %p: %s",
580 key, key_length, value, value_length, event, rd_kafka_err2str(err));
614 event->parent_table = kafka;
634 rd_kafka_resp_err_t err = 0;
637 size_t event_length = 0;
639 event->parent_table = kafka;
642 event_length =
event->event_string->length;
645 rd_kafka_headers_t* hdrs_copy;
648 err = rd_kafka_producev(
650 RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
651 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
653 RD_KAFKA_V_HEADERS(hdrs_copy),
654 RD_KAFKA_V_OPAQUE((
void*)event),
658 rd_kafka_headers_destroy(hdrs_copy);
660 n_log(
LOG_ERR,
"failed to produce event: %p with headers %p, producer: %p, topic: %s, error: %s", event, event->rd_kafka_headers, kafka->
rd_kafka_handle, kafka->
topic, rd_kafka_err2str(err));
664 if (rd_kafka_produce(kafka->
rd_kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
event_string, event_length, NULL, 0, event) == -1) {
697 __n_assert(event->event_string, free(event);
return NULL);
700 size_t length = written + offset;
701 Malloc(event->event_string->data,
char, length);
702 __n_assert(event->event_string, free(event->event_string); free(event);
return NULL);
703 event->event_string->length = length;
706 memcpy(event->event_string->data + offset,
string, written);
707 event->event_string->written = written + offset;
757 if (event->event_string)
760 if (event->event_files_to_delete)
761 free_nstr(&event->event_files_to_delete);
765 if (event->rd_kafka_headers)
766 rd_kafka_headers_destroy(event->rd_kafka_headers);
768 if (event->received_headers)
797 if (kafka->
mode == RD_KAFKA_PRODUCER) {
829 }
else if (kafka->
mode == RD_KAFKA_CONSUMER) {
830 rd_kafka_message_t* rkm = NULL;
836 n_log(
LOG_ERR,
"consumer: %s", rd_kafka_message_errstr(rkm));
842 n_log(
LOG_DEBUG,
"Message on %s [%" PRId32
"] at offset %" PRId64
" (leader epoch %" PRId32
")", rd_kafka_topic_name(rkm->rkt), rkm->partition, rkm->offset, rd_kafka_message_leader_epoch(rkm));
845 if (rkm->key && rkm->key_len > 0)
846 n_log(
LOG_DEBUG,
"Key: %.*s", (
int)rkm->key_len, (
const char*)rkm->key);
850 if (rkm->payload && rkm->len > 0) {
855 event->parent_table = kafka;
857 rd_kafka_headers_t* hdrs = NULL;
858 if (!rd_kafka_message_headers(rkm, &hdrs)) {
860 const char* name = NULL;
861 const void* val = NULL;
865 while (!rd_kafka_header_get_all(hdrs, idx, &name, &val, &size)) {
866 N_STR* header_entry = NULL;
873 event->from_topic = strdup(rd_kafka_topic_name(rkm->rkt));
877 n_log(
LOG_DEBUG,
"Consumer received event of (%d bytes) from topic %s", (
int)rkm->len, event->from_topic);
880 rd_kafka_message_destroy(rkm);
902 if (kafka->
mode == RD_KAFKA_PRODUCER)
904 if (kafka->
mode == RD_KAFKA_CONSUMER) {
913 while (status == 1) {
915 if (kafka->
mode == RD_KAFKA_PRODUCER) {
917 }
else if (kafka->
topics) {
931 int64_t elapsed_time =
get_usec(&chrono);
933 remaining_time -= elapsed_time;
934 if (remaining_time < 0) {
935 if (kafka->
mode == RD_KAFKA_PRODUCER) {
937 }
else if (kafka->
mode == RD_KAFKA_CONSUMER) {
970 n_log(
LOG_ERR,
"kafka pooling thread already started for handle %p", kafka);
978 n_log(
LOG_ERR,
"unable to create pooling_thread for kafka handle %p", kafka);
1001 if (pooling_thread_status == 0) {
1002 n_log(
LOG_DEBUG,
"kafka pooling thread already stopped for handle %p", kafka);
1005 if (pooling_thread_status == 2) {
1006 n_log(
LOG_DEBUG,
"kafka pooling ask for stop thread already done for handle %p", kafka);
1030 size_t nb_todump = 0;
1035 n_log(
LOG_ERR,
"kafka handle %p thread pooling func is still running, aborting dump", kafka);
1039 if (nb_todump == 0) {
1040 n_log(
LOG_DEBUG,
"kafka handle %p: nothing to dump, all events processed correctly", kafka);
1050 if (event->schema_id != -1)
1053 N_STR* filename = NULL;
1057 dumpstr->
data =
event->event_string->data + offset;
1058 dumpstr->
written =
event->event_string->written - offset;
1059 dumpstr->
length =
event->event_string->length;
1065 dumpstr->
data = NULL;
int mode
Network for managing connections.
#define init_lock(__rwlock_mutex)
Macro for initializing a rwlock.
#define FreeNoLog(__ptr)
Free Handler without log.
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
#define __n_assert(__ptr, __ret)
macro to assert things
#define _str(__PTR)
define true
int get_computer_name(char *computer_name, size_t len)
abort program with a text
#define rw_lock_destroy(__rwlock_mutex)
Macro to destroy rwlock mutex.
#define unlock(__rwlock_mutex)
Macro for releasing read/write lock a rwlock mutex.
#define _nstrp(__PTR)
N_STR or NULL pointer for testing purposes.
#define write_lock(__rwlock_mutex)
Macro for acquiring a write lock on a rwlock mutex.
#define Free(__ptr)
Free Handler to get errors.
#define read_lock(__rwlock_mutex)
Macro for acquiring a read lock on a rwlock mutex.
#define _nstr(__PTR)
N_STR or "NULL" string for logging purposes.
void * ptr
void pointer to store
LIST_NODE * start
pointer to the start of the list
struct LIST_NODE * next
pointer to the next node
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
#define list_foreach(__ITEM_, __LIST_)
ForEach macro helper.
#define remove_list_node(__LIST_, __NODE_, __TYPE_)
Remove macro helper for void pointer casting.
int list_destroy(LIST **list)
Empty and Free a list container.
LIST * new_generic_list(size_t max_items)
Initialize a generic list container to max_items pointers.
#define MAX_LIST_ITEMS
flag to pass to new_generic_list for the maximum possible number of items in a list
Structure of a generic list node.
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
#define LOG_DEBUG
debug-level messages
#define LOG_ERR
error conditions
#define LOG_INFO
informational
int32_t poll_interval
poll interval in usecs
rd_kafka_topic_partition_list_t * subscription
cJSON * configuration
kafka json configuration holder
pthread_t pooling_thread
pooling thread id
int mode
kafka handle mode: RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER
rd_kafka_topic_t * rd_kafka_topic
kafka topic handle
rd_kafka_headers_t * rd_kafka_headers
kafka produce event headers structure handle
pthread_rwlock_t rwlock
access lock
N_STR * errstr
kafka error string holder
int pooling_thread_status
pooling thread status, 0 => off , 1 => on , 2 => wants to stop, will be turned out to 0 by exiting po...
rd_kafka_conf_t * rd_kafka_conf
kafka structure handle
LIST * received_events
list of received N_KAFKA_EVENT
LIST * events_to_send
list of N_KAFKA_EVENT to send
int32_t poll_timeout
poll timeout in usecs
rd_kafka_t * rd_kafka_handle
kafka handle (producer or consumer)
char * groupid
consumer group id
int schema_id
kafka schema id in network order
char * bootstrap_servers
kafka bootstrap servers string
char * topic
kafka topic string
int n_kafka_add_header_ex(N_KAFKA_EVENT *event, char *key, size_t key_length, char *value, size_t value_length)
add a header entry to an event.
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed/unsent events
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get a received event from the N_KAFKA kafka handle
#define N_KAFKA_EVENT_OK
state of an OK event
#define N_KAFKA_EVENT_ERROR
state of an errored event
int n_kafka_new_headers(N_KAFKA_EVENT *event, size_t count)
allocate a headers array for the event
int32_t n_kafka_get_schema_from_char(char *string)
get a schema from the first 4 bytes of a char *string, returning ntohl of the value
int n_kafka_put_schema_in_nstr(N_STR *string, int schema_id)
put a htonl schema id into the first 4 bytes of a N_STR *string
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load a kafka configuration from a file
int n_kafka_get_status(N_KAFKA *kafka, size_t *nb_queued, size_t *nb_waiting, size_t *nb_error)
return the queues status
#define N_KAFKA_EVENT_WAITING_ACK
state of a sent event waiting for acknowledgement
int n_kafka_put_schema_in_char(char *string, int schema_id)
put a htonl schema id into the first 4 bytes of a char *string
int n_kafka_poll(N_KAFKA *kafka)
Poll kafka handle in producer or consumer mode.
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event and set its pointer to NULL
void n_kafka_event_destroy_ptr(void *event_ptr)
destroy a kafka event
int n_kafka_load_unprocessed(N_KAFKA *kafka, char *directory)
load unprocessed/unsent events
int n_kafka_add_header(N_KAFKA_EVENT *event, N_STR *key, N_STR *value)
add a header entry to an event.
int n_kafka_stop_pooling_thread(N_KAFKA *kafka)
stop the pooling thread of a kafka handle
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
put an event in the events_to_send list
N_KAFKA_EVENT * n_kafka_new_event_from_char(char *string, size_t written, int schema_id)
make a new event from a char *string
int32_t n_kafka_get_schema_from_nstr(N_STR *string)
get a schema from the first 4 bytes of a N_STR *string, returning ntohl of the value
int n_kafka_start_pooling_thread(N_KAFKA *kafka)
start the pooling thread of a kafka handle
N_KAFKA_EVENT * n_kafka_new_event(int schema_id)
create a new empty event
#define N_KAFKA_EVENT_CREATED
state of a freshly created event
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
make a new event from a file
N_KAFKA * n_kafka_new(int32_t poll_interval, int32_t poll_timeout, size_t errstr_len)
allocate a new kafka handle
#define N_KAFKA_EVENT_QUEUED
state of a queued event
void n_kafka_delete(N_KAFKA *kafka)
delete a N_KAFKA handle
N_KAFKA_EVENT * n_kafka_new_event_from_string(N_STR *string, int schema_id)
make a new event from a N_STR *string
structure of a KAFKA consumer or producer handle
structure of a KAFKA message
size_t written
size of the written data inside the string
size_t length
length of string (in case we wanna keep information after the 0 end of string value)
void free_nstr_ptr(void *ptr)
Free a N_STR pointer structure.
#define free_nstr(__ptr)
free a N_STR structure and set the pointer to NULL
int split_count(char **split_result)
Count split elements.
int nstr_to_file(N_STR *str, char *filename)
Write a N_STR content into a file.
N_STR * new_nstr(NSTRBYTE size)
create a new N_STR string
#define nstrprintf(__nstr_var, __format,...)
Macro to quickly allocate and sprintf to N_STR.
char ** split(const char *str, const char *delim, int empty)
split the string into an array of char *pointers, ended by a NULL one.
char * join(char **splitresult, char *delim)
join the array into a string
N_STR * file_to_nstr(char *filename)
Load a whole file into a N_STR.
int free_split_result(char ***tab)
Free a split result allocated array.
A box including a string and its length.
int start_HiTimer(N_TIME *timer)
Initialize or restart from zero any N_TIME HiTimer.
time_t get_usec(N_TIME *timer)
Poll any N_TIME HiTimer, returning usec, and moving currentTime to startTime.
Base64 encoding and decoding functions using N_STR.
Common headers and low-level functions & define.
void * n_kafka_pooling_thread(void *ptr)
kafka produce or consume pooling thread function
static void n_kafka_delivery_message_callback(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
Message delivery report callback.
int n_kafka_produce_ex(N_KAFKA *kafka, N_KAFKA_EVENT *event)
produce an event on a N_KAFKA *kafka handle
Kafka generic produce and consume event header.