23 int32_t schema_id = -1 ;
24 memcpy( &schema_id ,
string + 1 ,
sizeof( int32_t) );
25 return ntohl( schema_id );
37 __n_assert( (
string -> written >=
sizeof( int32_t ) ) ,
return -1 );
50 int32_t schema_id_htonl = htonl( schema_id );
51 memcpy(
string + 1, &schema_id_htonl ,
sizeof( int32_t) );
65 __n_assert( (
string -> written >=
sizeof( int32_t ) ) ,
return FALSE );
84 int status = kafka -> pooling_thread_status ;
85 *nb_queued = kafka -> nb_queued ;
86 *nb_waiting = kafka -> nb_waiting ;
87 *nb_error = kafka -> nb_error ;
113 n_log(
LOG_ERR ,
"message delivery failed: %s" , rd_kafka_err2str( rkmessage -> err) );
119 if( event && event -> parent_table )
120 write_lock( event -> parent_table -> rwlock );
122 if( event && event -> parent_table )
123 unlock( event -> parent_table -> rwlock );
127 n_log(
LOG_DEBUG ,
"message delivered (%ld bytes, partition %d)" , rkmessage -> len , rkmessage -> partition );
131 if( event && event -> parent_table )
132 write_lock( event -> parent_table -> rwlock );
134 if( event -> event_files_to_delete )
136 char **files_to_delete =
split(
_nstr( event -> event_files_to_delete ) ,
";" , 0 );
137 if( files_to_delete )
142 while( files_to_delete[ index ] )
144 int ret = unlink( files_to_delete[ index ] );
148 n_log(
LOG_DEBUG ,
"deleted on produce ack: %s" , files_to_delete[ index ] );
152 n_log(
LOG_ERR ,
"couldn't delete \"%s\": %s" , files_to_delete[ index ] , strerror( error ) );
167 if( event && event -> parent_table )
168 unlock( event -> parent_table -> rwlock );
169 n_log(
LOG_INFO ,
"kafka event %p received an ack !" , event );
195 if( kafka -> rd_kafka_handle )
197 if( kafka -> mode == RD_KAFKA_CONSUMER )
199 rd_kafka_consumer_close(kafka -> rd_kafka_handle);
200 rd_kafka_topic_partition_list_destroy(kafka ->subscription);
202 if( kafka -> mode == RD_KAFKA_PRODUCER )
204 rd_kafka_flush( kafka -> rd_kafka_handle , kafka -> poll_timeout );
206 rd_kafka_destroy( kafka -> rd_kafka_handle );
210 if( kafka -> rd_kafka_conf )
212 rd_kafka_conf_destroy( kafka -> rd_kafka_conf );
215 if( kafka -> configuration )
216 cJSON_Delete( kafka -> configuration );
218 if( kafka -> errstr )
221 if( kafka -> events_to_send )
224 if( kafka -> received_events )
229 if( kafka -> rd_kafka_topic )
230 rd_kafka_topic_destroy( kafka -> rd_kafka_topic );
232 Free( kafka -> bootstrap_servers );
252 kafka -> errstr =
new_nstr( errstr_len );
255 kafka -> events_to_send = NULL ;
256 kafka -> received_events = NULL ;
257 kafka -> rd_kafka_conf = NULL ;
258 kafka -> rd_kafka_handle = NULL ;
259 kafka -> configuration = NULL ;
260 kafka -> groupid= NULL ;
261 kafka -> topics= NULL ;
262 kafka -> subscription= NULL ;
263 kafka -> topic = NULL ;
265 kafka -> schema_id = -1 ;
266 kafka -> poll_timeout = poll_timeout ;
267 kafka -> poll_interval = poll_interval ;
268 kafka -> nb_queued = 0 ;
269 kafka -> nb_waiting = 0 ;
270 kafka -> nb_error = 0 ;
271 kafka -> pooling_thread_status = 0 ;
272 kafka -> bootstrap_servers = NULL ;
282 n_log(
LOG_ERR ,
"could not init kafka rwlock in kafka structure at address %p" , kafka );
307 kafka -> rd_kafka_conf = rd_kafka_conf_new();
310 N_STR *config_string = NULL ;
314 n_log(
LOG_ERR ,
"unable to read config from file %s !" , config_file );
319 json = cJSON_Parse(
_nstrp( config_string ) );
322 n_log(
LOG_ERR ,
"unable to parse json %s" , config_string );
329 for (jsonIndex = 0; jsonIndex < cJSON_GetArraySize(json); jsonIndex++)
331 cJSON *entry = cJSON_GetArrayItem(json, jsonIndex);
333 if( !entry ) continue ;
335 if( !entry -> valuestring )
341 if( entry ->
string[ 0 ] !=
'-' )
344 if( strcmp(
"topic" , entry ->
string ) != 0 && strcmp(
"topics" , entry ->
string ) != 0 && strcmp(
"value.schema.id" , entry ->
string ) != 0 && strcmp(
"poll.interval" , entry ->
string ) != 0 && strcmp(
"poll.timeout" , entry ->
string ) !=0 && strcmp(
"group.id.autogen" , entry ->
string ) != 0 )
346 if( !strcmp(
"group.id" , entry ->
string ) )
349 if( mode == RD_KAFKA_PRODUCER )
351 kafka -> groupid = strdup( entry -> valuestring );
354 if( rd_kafka_conf_set( kafka -> rd_kafka_conf , entry ->
string , entry -> valuestring ,
_nstr( kafka -> errstr ) , kafka -> errstr -> length ) != RD_KAFKA_CONF_OK )
360 n_log(
LOG_DEBUG ,
"kafka config enabled: %s => %s" , entry ->
string , entry -> valuestring );
366 n_log(
LOG_DEBUG ,
"kafka disabled config: %s => %s" , entry ->
string , entry -> valuestring );
373 jstr = cJSON_GetObjectItem( json,
"topic" );
374 if( jstr && jstr -> valuestring )
376 kafka -> topic = strdup( jstr -> valuestring );
381 if( mode == RD_KAFKA_PRODUCER )
389 jstr = cJSON_GetObjectItem( json,
"topics" );
390 if( jstr && jstr -> valuestring )
392 kafka -> topics =
split( jstr -> valuestring ,
"," , 0 );
393 n_log(
LOG_DEBUG ,
"kafka consumer topics: %s" , jstr -> valuestring );
397 if( mode == RD_KAFKA_CONSUMER )
404 jstr = cJSON_GetObjectItem( json,
"value.schema.id" );
405 if( jstr && jstr -> valuestring )
407 int schem_v = atoi( jstr -> valuestring );
408 if( schem_v < -1 || schem_v > 9999 )
415 kafka -> schema_id = schem_v ;
418 jstr = cJSON_GetObjectItem( json,
"poll.interval" );
419 if( jstr && jstr -> valuestring )
421 kafka -> poll_interval = atoi( jstr -> valuestring );
422 n_log(
LOG_DEBUG ,
"kafka poll interval: %d" , kafka -> poll_interval );
425 jstr = cJSON_GetObjectItem( json,
"poll.timeout" );
426 if( jstr && jstr -> valuestring )
428 kafka -> poll_timeout = atoi( jstr -> valuestring );
429 n_log(
LOG_DEBUG ,
"kafka poll timeout: %d" , kafka -> poll_timeout );
433 jstr = cJSON_GetObjectItem( json,
"bootstrap.servers" );
434 if( jstr && jstr -> valuestring )
436 kafka -> bootstrap_servers = strdup( jstr -> valuestring );
437 n_log(
LOG_DEBUG ,
"kafka bootstrap server: %s" , kafka -> bootstrap_servers );
441 if( mode == RD_KAFKA_PRODUCER )
446 kafka -> rd_kafka_handle = rd_kafka_new( RD_KAFKA_PRODUCER , kafka -> rd_kafka_conf ,
_nstr( kafka -> errstr ) , kafka -> errstr -> length );
447 if( !kafka -> rd_kafka_handle )
449 n_log(
LOG_ERR ,
"failed to create new producer: %s" ,
_nstr( kafka -> errstr ) );
454 kafka -> rd_kafka_conf = NULL ;
456 kafka -> rd_kafka_topic = rd_kafka_topic_new( kafka -> rd_kafka_handle , kafka -> topic , NULL );
457 if( !kafka -> rd_kafka_topic )
462 kafka -> mode = RD_KAFKA_PRODUCER ;
464 else if( mode == RD_KAFKA_CONSUMER )
478 if( !kafka -> groupid )
480 char computer_name[ 1024 ] =
"" ;
483 char *topics =
join( kafka -> topics ,
"_" );
485 jstr = cJSON_GetObjectItem( json,
"group.id.autogen" );
486 if( jstr && jstr -> valuestring )
488 if( strcmp( jstr -> valuestring ,
"host-topic-group" ) == 0 )
491 nstrprintf( groupid ,
"%s_%s" , computer_name , topics );
493 else if( strcmp( jstr -> valuestring ,
"unique-group" ) == 0 )
496 nstrprintf( groupid ,
"%s_%s_%d" , computer_name , topics , getpid() );
502 nstrprintf( groupid ,
"%s_%s_%d" , computer_name , topics , getpid() );
503 n_log(
LOG_DEBUG ,
"group.id is not set and group.id.autogen is not set, generated unique group id: %s" ,
_nstr( groupid ));
506 kafka -> groupid = groupid -> data ;
507 groupid -> data = NULL ;
510 if( rd_kafka_conf_set( kafka -> rd_kafka_conf ,
"group.id" , kafka -> groupid ,
_nstr( kafka -> errstr ) , kafka -> errstr -> length ) != RD_KAFKA_CONF_OK )
512 n_log(
LOG_ERR ,
"kafka consumer group.id error: %s" ,
_nstr( kafka -> errstr ) );
518 n_log(
LOG_DEBUG ,
"kafka consumer group.id => %s" , kafka -> groupid );
527 kafka -> rd_kafka_handle = rd_kafka_new(RD_KAFKA_CONSUMER, kafka -> rd_kafka_conf,
_nstr( kafka -> errstr ) , kafka -> errstr -> length );
528 if (!kafka -> rd_kafka_handle) {
529 n_log(
LOG_ERR ,
"%% Failed to create new consumer: %s", kafka ->errstr);
534 kafka -> rd_kafka_conf = NULL ;
544 rd_kafka_poll_set_consumer(kafka -> rd_kafka_handle);
548 kafka -> subscription = rd_kafka_topic_partition_list_new(topic_cnt);
549 for (
int i = 0; i < topic_cnt; i++)
550 rd_kafka_topic_partition_list_add(kafka ->subscription, kafka ->topics[i],
553 RD_KAFKA_PARTITION_UA);
564 int err = rd_kafka_subscribe(kafka -> rd_kafka_handle, kafka -> subscription);
567 n_log(
LOG_ERR ,
"kafka consumer: failed to subscribe to %d topics: %s", kafka -> subscription->cnt, rd_kafka_err2str(err));
572 n_log(
LOG_DEBUG,
"kafka consumer created and subscribed to %d topic(s), waiting for rebalance and messages...", kafka ->subscription->cnt);
574 kafka -> mode = RD_KAFKA_CONSUMER ;
598 event -> event_string = NULL ;
599 event -> event_files_to_delete = NULL ;
600 event -> from_topic = NULL ;
601 event -> rd_kafka_headers = NULL ;
602 event -> received_headers = NULL ;
603 event -> schema_id = schema_id ;
605 event -> parent_table = NULL ;
624 if( !event -> rd_kafka_headers )
626 event -> rd_kafka_headers = rd_kafka_headers_new( count );
627 __n_assert( event -> rd_kafka_headers ,
return FALSE );
631 n_log(
LOG_ERR ,
"event headers already allocated for event %p" , event );
649 __n_assert( event -> rd_kafka_headers,
return FALSE );
655 n_log(
LOG_ERR ,
"could not add null key in event %p headers" , event );
658 if( value_length < 1 )
660 n_log(
LOG_ERR ,
"could not add null value for key '%s' in event %p headers" , key , event);
663 rd_kafka_resp_err_t err = rd_kafka_header_add( event -> rd_kafka_headers , key , key_length , value , value_length );
666 n_log(
LOG_ERR ,
"failed to add header [%s:%d=%s:%d] to event %p: %s" , key , key_length , value , value_length , event , rd_kafka_err2str( err ) );
689 return n_kafka_add_header_ex( event , key -> data , key -> written , value -> data , value -> written );
704 event -> parent_table = kafka ;
707 kafka -> nb_queued ++ ;
709 unlock( kafka -> rwlock );
712 n_log(
LOG_DEBUG ,
"successfully enqueued event %p in producer %p waitlist, topic: %s" , event , kafka -> rd_kafka_handle , kafka -> topic );
726 rd_kafka_resp_err_t err = 0 ;
728 char *event_string = NULL ;
729 size_t event_length = -1 ;
731 event -> parent_table = kafka ;
733 event_string =
event -> event_string -> data ;
734 event_length =
event -> event_string -> length ;
736 if( event -> rd_kafka_headers )
738 rd_kafka_headers_t *hdrs_copy;
739 hdrs_copy = rd_kafka_headers_copy( event -> rd_kafka_headers );
741 err = rd_kafka_producev(
742 kafka -> rd_kafka_handle , RD_KAFKA_V_RKT(kafka -> rd_kafka_topic),
743 RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
744 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
745 RD_KAFKA_V_VALUE( event_string , event_length ),
746 RD_KAFKA_V_HEADERS(hdrs_copy),
747 RD_KAFKA_V_OPAQUE((
void *)event ),
752 rd_kafka_headers_destroy(hdrs_copy);
754 n_log(
LOG_ERR ,
"failed to produce event: %p with headers %p, producer: %p, topic: %s, error: %s" , event , event -> rd_kafka_headers , kafka -> rd_kafka_handle , kafka -> topic , rd_kafka_err2str( err ) );
760 if( rd_kafka_produce( kafka -> rd_kafka_topic , RD_KAFKA_PARTITION_UA , RD_KAFKA_MSG_F_COPY , event_string , event_length , NULL , 0 , event ) == -1 )
764 n_log(
LOG_ERR ,
"failed to produce event: %p, producer: %p, topic: %s, error: %s" , event , kafka -> rd_kafka_handle , kafka -> topic , strerror( error ) );
769 n_log(
LOG_DEBUG ,
"successfully enqueued event %p in local producer %p : %s" , event , kafka -> rd_kafka_handle , kafka -> topic );
789 if( schema_id != -1 )
796 __n_assert( event -> event_string , free( event );
return NULL );
799 size_t length = written + offset ;
800 Malloc( event -> event_string -> data ,
char , length );
801 __n_assert( event -> event_string , free( event -> event_string ); free( event );
return NULL );
802 event -> event_string -> length = length ;
805 memcpy( event -> event_string -> data + offset ,
string , written );
806 event -> event_string -> written = written + offset ;
808 if( schema_id != -1 )
862 if( event -> event_string )
865 if( event -> event_files_to_delete )
866 free_nstr( &event -> event_files_to_delete );
870 if( event -> rd_kafka_headers )
871 rd_kafka_headers_destroy( event -> rd_kafka_headers );
873 if( event -> received_headers )
905 if( kafka -> mode == RD_KAFKA_PRODUCER )
907 int nb_events = rd_kafka_poll( kafka -> rd_kafka_handle , kafka -> poll_interval );
911 LIST_NODE *node = kafka -> events_to_send -> start ;
917 kafka -> nb_waiting -- ;
920 node = node -> next ;
929 kafka -> nb_error++ ;
933 kafka -> nb_waiting ++ ;
934 kafka -> nb_queued -- ;
939 kafka -> nb_error ++ ;
944 node = node -> next ;
946 unlock( kafka -> rwlock );
948 else if( kafka -> mode == RD_KAFKA_CONSUMER )
950 rd_kafka_message_t *rkm = NULL ;
951 while( ( rkm = rd_kafka_consumer_poll(kafka -> rd_kafka_handle , kafka -> poll_interval) ) )
957 n_log(
LOG_ERR ,
"consumer: %s", rd_kafka_message_errstr(rkm));
963 n_log(
LOG_DEBUG ,
"Message on %s [%" PRId32
"] at offset %" PRId64
" (leader epoch %" PRId32
")", rd_kafka_topic_name(rkm->rkt), rkm->partition, rkm->offset, rd_kafka_message_leader_epoch(rkm));
966 if (rkm->key && rkm -> key_len > 0 )
967 n_log(
LOG_DEBUG ,
"Key: %.*s", (
int)rkm->key_len, (
const char *)rkm->key);
971 if( rkm -> payload && rkm -> len > 0)
977 event -> parent_table = kafka ;
979 rd_kafka_headers_t *hdrs = NULL ;
980 if( !rd_kafka_message_headers( rkm , &hdrs ) )
983 const char *name = NULL ;
984 const void *val = NULL ;
988 while( !rd_kafka_header_get_all(hdrs, idx, &name, &val, &size) )
990 N_STR *header_entry = NULL ;
997 event -> from_topic = strdup( rd_kafka_topic_name( rkm -> rkt ) );
998 if( kafka -> schema_id != -1 )
1001 n_log(
LOG_DEBUG ,
"Consumer received event of (%d bytes) from topic %s", (
int)rkm->len , event -> from_topic);
1002 unlock( kafka -> rwlock );
1004 rd_kafka_message_destroy( rkm );
1007 if( kafka -> nb_error > 0 )
1029 if( kafka -> mode == RD_KAFKA_PRODUCER )
1030 n_log(
LOG_DEBUG ,
"starting pooling thread for kafka handler %p mode PRODUCER (%d) topic %s" , kafka -> rd_kafka_handle ,RD_KAFKA_PRODUCER, kafka -> topic );
1031 if( kafka -> mode == RD_KAFKA_CONSUMER )
1033 char *topiclist =
join( kafka -> topics ,
"," );
1034 n_log(
LOG_DEBUG ,
"starting pooling thread for kafka handler %p mode CONSUMER (%d) topic %s" , kafka -> rd_kafka_handle , RD_KAFKA_CONSUMER,
_str( topiclist ) );
1040 int64_t remaining_time = kafka -> poll_timeout * 1000 ;
1041 while( status == 1 )
1046 if( kafka -> mode == RD_KAFKA_PRODUCER )
1048 n_log(
LOG_ERR ,
"failed to poll kafka producer handle %p with topic %s" , kafka -> rd_kafka_handle , rd_kafka_topic_name( kafka -> rd_kafka_topic ) );
1050 else if( kafka -> topics )
1052 char *topiclist =
join( kafka -> topics ,
"," );
1053 n_log(
LOG_ERR ,
"failed to poll kafka consumer handle %p with topic %s" , kafka -> rd_kafka_handle ,
_str( topiclist ) );
1059 status = kafka -> pooling_thread_status ;
1060 unlock( kafka -> rwlock );
1065 int64_t elapsed_time =
get_usec( &chrono );
1066 if( kafka -> poll_timeout != -1 )
1068 remaining_time -= elapsed_time ;
1069 if( remaining_time < 0 )
1071 if( kafka -> mode == RD_KAFKA_PRODUCER )
1073 n_log(
LOG_DEBUG ,
"timeouted on kafka handle %p" , kafka -> rd_kafka_handle );
1075 else if( kafka -> mode == RD_KAFKA_CONSUMER )
1077 n_log(
LOG_DEBUG ,
"timeouted on kafka handle %p" , kafka -> rd_kafka_handle );
1079 remaining_time = 0 ;
1088 kafka -> pooling_thread_status = 0;
1089 unlock( kafka -> rwlock );
1091 n_log(
LOG_DEBUG ,
"exiting pooling thread for kafka handler %p mode %s" , kafka -> rd_kafka_handle , (kafka->
mode==RD_KAFKA_PRODUCER)?
"PRODUCER":
"CONSUMER" );
1092 pthread_exit( NULL );
1108 int status = kafka -> pooling_thread_status ;
1109 unlock( kafka -> rwlock );
1113 n_log(
LOG_ERR,
"kafka pooling thread already started for handle %p", kafka );
1118 kafka -> pooling_thread_status = 1 ;
1122 n_log(
LOG_ERR,
"unable to create pooling_thread for kafka handle %p" , kafka );
1123 unlock( kafka -> rwlock );
1126 unlock( kafka -> rwlock );
1128 n_log(
LOG_DEBUG ,
"pthread_create sucess for kafka handle %p->%p" , kafka , kafka -> rd_kafka_handle );
1144 int pooling_thread_status = kafka -> pooling_thread_status ;
1145 unlock( kafka -> rwlock );
1147 if( pooling_thread_status == 0 )
1149 n_log(
LOG_DEBUG ,
"kafka pooling thread already stopped for handle %p", kafka );
1152 if( pooling_thread_status == 2 )
1154 n_log(
LOG_DEBUG ,
"kafka pooling ask for stop thread already done for handle %p", kafka );
1159 kafka -> pooling_thread_status = 2 ;
1160 unlock( kafka -> rwlock );
1162 pthread_join( kafka -> pooling_thread , NULL );
1182 status = kafka -> pooling_thread_status ;
1183 nb_todump = kafka -> nb_queued + kafka -> nb_waiting + kafka -> nb_error ;
1186 n_log(
LOG_ERR ,
"kafka handle %p thread pooling func is still running, aborting dump" , kafka );
1187 unlock( kafka -> rwlock );
1190 if( nb_todump == 0 )
1192 n_log(
LOG_DEBUG ,
"kafka handle %p: nothing to dump, all events processed correctly" , kafka );
1193 unlock( kafka -> rwlock );
1204 if( event -> schema_id != -1 )
1207 N_STR *filename = NULL ;
1208 nstrprintf( filename ,
"%s/%s+%p" , directory , kafka -> topic ,event);
1211 dumpstr -> data =
event -> event_string -> data + offset ;
1212 dumpstr -> written =
event -> event_string -> written - offset ;
1213 dumpstr -> length =
event -> event_string -> length ;
1218 unlock( kafka -> rwlock );
1219 dumpstr -> data = NULL ;
1239 unlock( kafka -> rwlock );
1258 if( kafka -> received_events -> start )
1260 unlock( kafka -> rwlock );
#define init_lock(__rwlock_mutex)
Macro for initializing a rwlock.
#define FreeNoLog(__ptr)
Free Handler without log.
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
#define __n_assert(__ptr, __ret)
macro to assert things
#define _str(__PTR)
define true
int get_computer_name(char *computer_name, size_t len)
abort program with a text
#define rw_lock_destroy(__rwlock_mutex)
Macro to destroy rwlock mutex.
#define unlock(__rwlock_mutex)
Macro for releasing a read/write lock on a rwlock mutex.
#define _nstrp(__PTR)
N_STR or NULL pointer for testing purposes.
#define write_lock(__rwlock_mutex)
Macro for acquiring a write lock on a rwlock mutex.
#define Free(__ptr)
Free Handler to get errors.
#define read_lock(__rwlock_mutex)
Macro for acquiring a read lock on a rwlock mutex.
#define _nstr(__PTR)
N_STR or "NULL" string for logging purposes.
#define remove_list_node(__LIST_,__NODE_, __TYPE_)
Remove macro helper for void pointer casting.
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
#define list_foreach(__ITEM_, __LIST_)
ForEach macro helper.
int list_destroy(LIST **list)
Empty and Free a list container.
LIST * new_generic_list(int max_items)
Initialize a generic list container to max_items pointers.
Structure of a generic list node.
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
#define LOG_DEBUG
debug-level messages
#define LOG_ERR
error conditions
#define LOG_INFO
informational
int mode
kafka handle mode: RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER
int n_kafka_add_header_ex(N_KAFKA_EVENT *event, char *key, size_t key_length, char *value, size_t value_length)
add a header entry to an event.
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed/unsent events
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get a received event from the N_KAFKA kafka handle
#define N_KAFKA_EVENT_OK
state of an OK event
#define N_KAFKA_EVENT_ERROR
state of an errored event
int n_kafka_new_headers(N_KAFKA_EVENT *event, size_t count)
allocate a headers array for the event
int32_t n_kafka_get_schema_from_char(char *string)
get a schema from the first 4 bytes of a char *string, returning ntohl of the value
int n_kafka_put_schema_in_nstr(N_STR *string, int schema_id)
put a htonl schema id into the first 4 bytes of a N_STR *string
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load a kafka configuration from a file
#define N_KAFKA_EVENT_WAITING_ACK
state of a sent event waiting for acknowledgement
int n_kafka_put_schema_in_char(char *string, int schema_id)
put a htonl schema id into the first 4 bytes of a char *string
int n_kafka_poll(N_KAFKA *kafka)
Poll kafka handle in producer or consumer mode.
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event and set its pointer to NULL
void n_kafka_event_destroy_ptr(void *event_ptr)
destroy a kafka event
int n_kafka_load_unprocessed(N_KAFKA *kafka, char *directory)
load unprocessed/unsent events
int n_kafka_add_header(N_KAFKA_EVENT *event, N_STR *key, N_STR *value)
add a header entry to an event.
int n_kafka_stop_pooling_thread(N_KAFKA *kafka)
stop the pooling thread of a kafka handle
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
put an event in the events_to_send list
N_KAFKA_EVENT * n_kafka_new_event_from_char(char *string, size_t written, int schema_id)
make a new event from a char *string
int32_t n_kafka_get_schema_from_nstr(N_STR *string)
get a schema from the first 4 bytes of a N_STR *string, returning ntohl of the value
int n_kafka_get_status(N_KAFKA *kafka, int *nb_queued, int *nb_waiting, int *nb_error)
return the queues status
int n_kafka_start_pooling_thread(N_KAFKA *kafka)
start the pooling thread of a kafka handle
N_KAFKA_EVENT * n_kafka_new_event(int schema_id)
create a new empty event
#define N_KAFKA_EVENT_CREATED
state of a freshly created event
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
make a new event from a N_STR *string
N_KAFKA * n_kafka_new(int64_t poll_interval, int64_t poll_timeout, size_t errstr_len)
allocate a new kafka handle
#define N_KAFKA_EVENT_QUEUED
state of a queued event
void n_kafka_delete(N_KAFKA *kafka)
delete a N_KAFKA handle
N_KAFKA_EVENT * n_kafka_new_event_from_string(N_STR *string, int schema_id)
make a new event from a N_STR *string
structure of a KAFKA consumer or producer handle
structure of a KAFKA message
void free_nstr_ptr(void *ptr)
Free a N_STR pointer structure.
#define free_nstr(__ptr)
free a N_STR structure and set the pointer to NULL
int split_count(char **split_result)
Count split elements.
#define nstrprintf(__nstr_var,...)
Macro to quickly allocate and sprintf to N_STR *.
int nstr_to_file(N_STR *str, char *filename)
Write a N_STR content into a file.
N_STR * new_nstr(NSTRBYTE size)
create a new N_STR string
char ** split(const char *str, const char *delim, int empty)
split the string into an array of char * pointers, terminated by a NULL entry.
char * join(char **splitresult, char *delim)
join the array into a string
N_STR * file_to_nstr(char *filename)
Load a whole file into a N_STR.
int free_split_result(char ***tab)
Free a split result allocated array.
A box holding a string and its length.
int start_HiTimer(N_TIME *timer)
Initialize or restart from zero any N_TIME HiTimer.
time_t get_usec(N_TIME *timer)
Poll any N_TIME HiTimer, returning usec, and moving currentTime to startTime.
base64 encoding and decoding functions using N_STR
Common headers and low-level utility functions & defines.
void * n_kafka_pooling_thread(void *ptr)
kafka produce or consume pooling thread function
static void n_kafka_delivery_message_callback(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
Message delivery report callback.
int n_kafka_produce_ex(N_KAFKA *kafka, N_KAFKA_EVENT *event)
produce an event on a N_KAFKA *kafka handle
kafka generic produce and consume event header