Nilorea Library
C utilities for networking, threading, graphics
KAFKA: generic event producer and consumer functions

Data Structures

struct N_KAFKA
 structure of a KAFKA consumer or producer handle

struct N_KAFKA_EVENT
 structure of a KAFKA message

Macros

#define N_KAFKA_EVENT_CREATED   5
 state of a freshly created event

#define N_KAFKA_EVENT_ERROR   2
 state of an errored event

#define N_KAFKA_EVENT_OK   4
 state of an OK event

#define N_KAFKA_EVENT_QUEUED   0
 state of a queued event

#define N_KAFKA_EVENT_WAITING_ACK   1
 state of a sent event waiting for acknowledgement

Functions

int n_kafka_add_header (N_KAFKA_EVENT *event, N_STR *key, N_STR *value)
 add a header entry to an event, from N_STR key and value.

int n_kafka_add_header_ex (N_KAFKA_EVENT *event, char *key, size_t key_length, char *value, size_t value_length)
 add a header entry to an event, from char * key and value with explicit lengths.

void n_kafka_delete (N_KAFKA *kafka)
 delete a N_KAFKA handle

int n_kafka_dump_unprocessed (N_KAFKA *kafka, char *directory)
 dump unprocessed/unsent events

int n_kafka_event_destroy (N_KAFKA_EVENT **event)
 destroy a kafka event and set its pointer to NULL

void n_kafka_event_destroy_ptr (void *event)
 destroy a kafka event

N_KAFKA_EVENT * n_kafka_get_event (N_KAFKA *kafka)
 get a received event from the N_KAFKA kafka handle

int32_t n_kafka_get_schema_from_char (char *string)
 get a schema id from the first 4 bytes of a char *string, converted to host byte order with ntohl

int32_t n_kafka_get_schema_from_nstr (N_STR *string)
 get a schema id from the first 4 bytes of a N_STR *string, converted to host byte order with ntohl

int n_kafka_get_status (N_KAFKA *kafka, int *nb_queued, int *nb_waiting, int *nb_error)
 return the status of the queues

N_KAFKA * n_kafka_load_config (char *config_file, int mode)
 load a kafka configuration from a file

int n_kafka_load_unprocessed (N_KAFKA *kafka, char *directory)
 load unprocessed/unsent events

N_KAFKA * n_kafka_new (int64_t poll_timeout, int64_t poll_interval, size_t errstr_len)
 allocate a new kafka handle

N_KAFKA_EVENT * n_kafka_new_event (int schema_id)
 create a new empty event

N_KAFKA_EVENT * n_kafka_new_event_from_char (char *string, size_t written, int schema_id)
 make a new event from a char *string

N_KAFKA_EVENT * n_kafka_new_event_from_file (char *filename, int schema_id)
 make a new event from the contents of a file

N_KAFKA_EVENT * n_kafka_new_event_from_string (N_STR *string, int schema_id)
 make a new event from a N_STR *string

int n_kafka_new_headers (N_KAFKA_EVENT *event, size_t count)
 allocate a headers array for the event

int n_kafka_poll (N_KAFKA *kafka)
 poll a kafka handle in producer or consumer mode.

int n_kafka_produce (N_KAFKA *kafka, N_KAFKA_EVENT *event)
 put an event in the events_to_send list

int n_kafka_put_schema_in_char (char *string, int schema_id)
 write a schema id in network byte order (htonl) into the first 4 bytes of a char *string

int n_kafka_put_schema_in_nstr (N_STR *string, int schema_id)
 write a schema id in network byte order (htonl) into the first 4 bytes of a N_STR *string

int n_kafka_start_pooling_thread (N_KAFKA *kafka)
 start the polling thread of a kafka handle

int n_kafka_stop_pooling_thread (N_KAFKA *kafka)
 stop the polling thread of a kafka handle

Data Structure Documentation

◆ N_KAFKA

struct N_KAFKA

structure of a KAFKA consumer or producer handle

Definition at line 69 of file n_kafka.h.

Data Fields
char * bootstrap_servers
 kafka bootstrap servers string
cJSON * configuration
 kafka json configuration holder
N_STR * errstr
 kafka error string holder
LIST * events_to_send
 list of N_KAFKA_EVENT to send
char * groupid
 consumer group id
int mode
 kafka handle mode: RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER
size_t nb_error
size_t nb_queued
size_t nb_waiting
int64_t poll_interval
 poll interval in usecs
int64_t poll_timeout
 poll timeout in usecs
pthread_t pooling_thread
 polling thread id
int pooling_thread_status
 polling thread status: 0 => off, 1 => on, 2 => stop requested; reset to 0 by the polling thread when it exits
rd_kafka_conf_t * rd_kafka_conf
 kafka configuration structure handle
rd_kafka_t * rd_kafka_handle
 kafka handle (producer or consumer)
rd_kafka_topic_t * rd_kafka_topic
 kafka topic handle
LIST * received_events
 list of received N_KAFKA_EVENT
pthread_rwlock_t rwlock
 access lock
int schema_id
 kafka schema id in network order
rd_kafka_topic_partition_list_t * subscription
char * topic
 kafka topic string
char ** topics

◆ N_KAFKA_EVENT

struct N_KAFKA_EVENT

structure of a KAFKA message

Definition at line 48 of file n_kafka.h.

Data Fields
N_STR * event_files_to_delete
 names of the original event source files to delete once the event is produced, as a ','-separated list; NULL if there are none
N_STR * event_string
 string containing the topic id + payload
char * from_topic
 source topic in case of a received event, else NULL
struct N_KAFKA * parent_table
 access lock
rd_kafka_headers_t * rd_kafka_headers
 kafka produce event headers structure handle
LIST * received_headers
 kafka consume event headers structure handle
int schema_id
 kafka schema_id
unsigned int status
 state of the event: N_KAFKA_EVENT_CREATED, N_KAFKA_EVENT_QUEUED, N_KAFKA_EVENT_WAITING_ACK, N_KAFKA_EVENT_ERROR, N_KAFKA_EVENT_OK

Macro Definition Documentation

◆ N_KAFKA_EVENT_CREATED

#define N_KAFKA_EVENT_CREATED   5

state of a freshly created event

Definition at line 45 of file n_kafka.h.

◆ N_KAFKA_EVENT_ERROR

#define N_KAFKA_EVENT_ERROR   2

state of an errored event

Definition at line 41 of file n_kafka.h.

◆ N_KAFKA_EVENT_OK

#define N_KAFKA_EVENT_OK   4

state of an OK event

Definition at line 43 of file n_kafka.h.

◆ N_KAFKA_EVENT_QUEUED

#define N_KAFKA_EVENT_QUEUED   0

state of a queued event

Definition at line 37 of file n_kafka.h.

◆ N_KAFKA_EVENT_WAITING_ACK

#define N_KAFKA_EVENT_WAITING_ACK   1

state of a sent event waiting for acknowledgement

Definition at line 39 of file n_kafka.h.
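
The five states above are plain integer codes, and the value 3 is not assigned by any macro listed on this page. As a small illustration, the sketch below maps a status code to a printable name, which can help when logging events. The helper sketch_event_status_name is hypothetical and not part of the library; the macro values are copied from this page and guarded so that the definitions from n_kafka.h take precedence when that header is already included.

```c
/* Values copied from the macro list on this page. The guard keeps the
 * real definitions from n_kafka.h if that header was included first. */
#ifndef N_KAFKA_EVENT_QUEUED
#define N_KAFKA_EVENT_QUEUED 0
#define N_KAFKA_EVENT_WAITING_ACK 1
#define N_KAFKA_EVENT_ERROR 2
#define N_KAFKA_EVENT_OK 4
#define N_KAFKA_EVENT_CREATED 5
#endif

/* Hypothetical helper (not a library function): return a printable
 * name for an event status code, or "UNKNOWN" for any other value. */
const char *sketch_event_status_name(unsigned int status) {
    switch (status) {
        case N_KAFKA_EVENT_QUEUED:      return "QUEUED";
        case N_KAFKA_EVENT_WAITING_ACK: return "WAITING_ACK";
        case N_KAFKA_EVENT_ERROR:       return "ERROR";
        case N_KAFKA_EVENT_OK:          return "OK";
        case N_KAFKA_EVENT_CREATED:     return "CREATED";
        default:                        return "UNKNOWN";
    }
}
```

A caller could pass the status field of an N_KAFKA_EVENT to this helper when writing log lines.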

Function Documentation

◆ n_kafka_add_header()

int n_kafka_add_header (N_KAFKA_EVENT *event, N_STR *key, N_STR *value)

add a header entry to an event.

The headers array must have been allocated beforehand (see n_kafka_new_headers()).

Parameters
event  target event
key    N_STR *string of the key
value  N_STR *string of the value
Returns
TRUE or FALSE

Definition at line 681 of file n_kafka.c.

References __n_assert, and n_kafka_add_header_ex().

◆ n_kafka_add_header_ex()

int n_kafka_add_header_ex (N_KAFKA_EVENT *event, char *key, size_t key_length, char *value, size_t value_length)

add a header entry to an event.

The headers array must have been allocated beforehand (see n_kafka_new_headers()).

Parameters
event         target event
key           string of the key
key_length    size of the key string
value         string of the value
value_length  size of the value string
Returns
TRUE or FALSE

Definition at line 646 of file n_kafka.c.

References __n_assert, LOG_ERR, and n_log.

Referenced by n_kafka_add_header().

◆ n_kafka_delete()

void n_kafka_delete (N_KAFKA *kafka)

delete a N_KAFKA handle

Parameters
kafka  N_KAFKA handle to delete

Definition at line 185 of file n_kafka.c.

References __n_assert, Free, free_nstr, FreeNoLog, list_destroy(), LOG_DEBUG, n_kafka_stop_pooling_thread(), n_log, and rw_lock_destroy.

Referenced by n_kafka_load_config(), and n_kafka_new().

◆ n_kafka_dump_unprocessed()

int n_kafka_dump_unprocessed (N_KAFKA *kafka, char *directory)

dump unprocessed/unsent events

Parameters
kafka      kafka handle to use
directory  the directory in which to dump the events
Returns
TRUE or FALSE

Definition at line 1174 of file n_kafka.c.

References __n_assert, _nstr, free_nstr, list_foreach, LOG_DEBUG, LOG_ERR, N_KAFKA_EVENT_OK, n_log, new_nstr(), nstr_to_file(), nstrprintf, read_lock, and unlock.

◆ n_kafka_event_destroy()

int n_kafka_event_destroy (N_KAFKA_EVENT **event)

destroy a kafka event and set its pointer to NULL

Parameters
event  event to delete
Returns
TRUE or FALSE

Definition at line 885 of file n_kafka.c.

References __n_assert, and n_kafka_event_destroy_ptr().

Referenced by n_kafka_poll().

◆ n_kafka_event_destroy_ptr()

void n_kafka_event_destroy_ptr (void *event_ptr)

destroy a kafka event

Parameters
event_ptr  void pointer to target

Definition at line 856 of file n_kafka.c.

References __n_assert, free_nstr, FreeNoLog, and list_destroy().

Referenced by n_kafka_event_destroy(), n_kafka_poll(), and n_kafka_produce().

◆ n_kafka_get_event()

N_KAFKA_EVENT * n_kafka_get_event (N_KAFKA *kafka)

get a received event from the N_KAFKA kafka handle

Parameters
kafka  kafka handle to use
Returns
a received event or NULL

Definition at line 1251 of file n_kafka.c.

References __n_assert, remove_list_node, unlock, and write_lock.

◆ n_kafka_get_schema_from_char()

int32_t n_kafka_get_schema_from_char (char *string)

get a schema id from the first 4 bytes of a char *string, converted to host byte order with ntohl

Parameters
string  char *source from where to read schema id
Returns
zero or positive number on success, -1 on error

Definition at line 19 of file n_kafka.c.

References __n_assert.

Referenced by n_kafka_get_schema_from_nstr().
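
To make the byte layout concrete, here is a minimal, self-contained sketch of writing a schema id into the first 4 bytes of a buffer in network byte order and reading it back. It is not the library's implementation: sketch_put_schema_id and sketch_get_schema_id are hypothetical stand-ins for n_kafka_put_schema_in_char() and n_kafka_get_schema_from_char(), and the use of memcpy and of a 0 return value on success are assumptions.

```c
#include <arpa/inet.h> /* htonl, ntohl (POSIX) */
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for n_kafka_put_schema_in_char(): store schema_id
 * in network byte order in the first 4 bytes of buf.
 * Returns 0 on success, -1 if buf is NULL. */
int sketch_put_schema_id(char *buf, int32_t schema_id) {
    if (buf == NULL) return -1;
    uint32_t net = htonl((uint32_t)schema_id);
    memcpy(buf, &net, sizeof net); /* memcpy avoids unaligned stores */
    return 0;
}

/* Hypothetical stand-in for n_kafka_get_schema_from_char(): read the first
 * 4 bytes of buf and convert them back to host byte order.
 * Returns the schema id, or -1 if buf is NULL. */
int32_t sketch_get_schema_id(const char *buf) {
    if (buf == NULL) return -1;
    uint32_t net;
    memcpy(&net, buf, sizeof net);
    return (int32_t)ntohl(net);
}
```

The buffer passed to either function must hold at least 4 bytes. Because the value is stored big-endian, a schema id of 42 occupies the bytes 00 00 00 2A regardless of the host's endianness.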

◆ n_kafka_get_schema_from_nstr()

int32_t n_kafka_get_schema_from_nstr (N_STR *string)

get a schema id from the first 4 bytes of a N_STR *string, converted to host byte order with ntohl

Parameters
string  N_STR *source from where to read schema id
Returns
zero or positive number on success, -1 on error

Definition at line 33 of file n_kafka.c.

References __n_assert, and n_kafka_get_schema_from_char().

Referenced by n_kafka_poll().

◆ n_kafka_get_status()

int n_kafka_get_status (N_KAFKA *kafka, int *nb_queued, int *nb_waiting, int *nb_error)

return the status of the queues

Parameters
kafka       kafka handle to use
nb_queued   pointer to queue number holder
nb_waiting  pointer to waiting number holder
nb_error    pointer to error number holder
Returns
TRUE or FALSE

Definition at line 78 of file n_kafka.c.

References __n_assert, read_lock, and unlock.

◆ n_kafka_load_config()

N_KAFKA * n_kafka_load_config (char *config_file, int mode)

load a kafka configuration from a file

Parameters
config_file  path and filename of the config file
mode         RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER
Returns
an allocated and configured N_KAFKA *kafka handle or NULL

Definition at line 297 of file n_kafka.c.

References __n_assert, _nstr, _nstrp, _str, file_to_nstr(), free_nstr, get_computer_name(), join(), LOG_DEBUG, LOG_ERR, n_kafka_delete(), n_kafka_delivery_message_callback(), n_kafka_new(), n_log, new_nstr(), nstrprintf, split(), and split_count().

◆ n_kafka_load_unprocessed()

int n_kafka_load_unprocessed (N_KAFKA *kafka, char *directory)

load unprocessed/unsent events

Parameters
kafka      kafka handle to use
directory  the directory from which to load the events
Returns
TRUE or FALSE

Definition at line 1231 of file n_kafka.c.

References __n_assert, unlock, and write_lock.

◆ n_kafka_new()

N_KAFKA * n_kafka_new (int64_t poll_interval, int64_t poll_timeout, size_t errstr_len)

allocate a new kafka handle

Parameters
poll_interval  set polling interval
poll_timeout   set polling timeout
errstr_len     set the size of the error string buffer
Returns
a new empty N_KAFKA *kafka handle or NULL

Definition at line 246 of file n_kafka.c.

References __n_assert, Free, init_lock, LOG_ERR, Malloc, n_kafka_delete(), n_log, new_generic_list(), and new_nstr().

Referenced by n_kafka_load_config().

◆ n_kafka_new_event()

N_KAFKA_EVENT * n_kafka_new_event (int schema_id)

create a new empty event

Parameters
schema_id  schema id that the event is using, -1 if not using avro schema_id formatted events
Returns
a new empty N_KAFKA_EVENT *event

Definition at line 592 of file n_kafka.c.

References __n_assert, Malloc, and N_KAFKA_EVENT_CREATED.

Referenced by n_kafka_new_event_from_char().

◆ n_kafka_new_event_from_char()

N_KAFKA_EVENT * n_kafka_new_event_from_char (char *string, size_t written, int schema_id)

make a new event from a char *string

Parameters
string     source string
written    length of the string
schema_id  the schema id to use. Pass -1 for non avro formatted events
Returns
a new N_KAFKA_EVENT *event or NULL

Definition at line 784 of file n_kafka.c.

References __n_assert, Malloc, N_KAFKA_EVENT_QUEUED, n_kafka_new_event(), and n_kafka_put_schema_in_nstr().

Referenced by n_kafka_new_event_from_string(), and n_kafka_poll().
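
This page describes an event string as an id plus a payload, and shows that this function calls n_kafka_put_schema_in_nstr(). The sketch below illustrates one way such a buffer could be laid out: a 4-byte network-order schema id followed by the payload, with no prefix when schema_id is -1. This layout is an assumption made for illustration; the page does not state whether the library reserves the 4 bytes itself or expects them in the source string. The helper sketch_frame_payload is hypothetical and not part of the library.

```c
#include <arpa/inet.h> /* htonl (POSIX) */
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: allocate a buffer holding an optional 4-byte
 * network-order schema id followed by the payload and a trailing '\0'.
 * schema_id == -1 means "no prefix" (non avro formatted event).
 * On success, returns the buffer (caller frees it) and stores the number
 * of meaningful bytes in *out_len. Returns NULL on error. */
char *sketch_frame_payload(const char *payload, size_t payload_len,
                           int schema_id, size_t *out_len) {
    if (payload == NULL || out_len == NULL) return NULL;

    size_t prefix = (schema_id != -1) ? sizeof(uint32_t) : 0;
    char *buf = malloc(prefix + payload_len + 1);
    if (buf == NULL) return NULL;

    if (prefix != 0) {
        uint32_t net = htonl((uint32_t)schema_id);
        memcpy(buf, &net, prefix); /* schema id first, big-endian */
    }
    memcpy(buf + prefix, payload, payload_len);
    buf[prefix + payload_len] = '\0';

    *out_len = prefix + payload_len;
    return buf;
}
```

With a 3-byte payload and schema id 7, the resulting length is 7 bytes; with schema id -1 it is 3 bytes and the buffer is just a copy of the payload.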

◆ n_kafka_new_event_from_file()

N_KAFKA_EVENT * n_kafka_new_event_from_file (char *filename, int schema_id)

make a new event from the contents of a file

Parameters
filename   source file path and filename
schema_id  the schema id to use. Pass -1 for non avro formatted events
Returns
a new N_KAFKA_EVENT *event or NULL

Definition at line 841 of file n_kafka.c.

References __n_assert, file_to_nstr(), and n_kafka_new_event_from_string().

◆ n_kafka_new_event_from_string()

N_KAFKA_EVENT * n_kafka_new_event_from_string (N_STR *string, int schema_id)

make a new event from a N_STR *string

Parameters
string     source string
schema_id  the schema id to use. Pass -1 for non avro formatted events
Returns
a new N_KAFKA_EVENT *event or NULL

Definition at line 824 of file n_kafka.c.

References __n_assert, and n_kafka_new_event_from_char().

Referenced by n_kafka_new_event_from_file().

◆ n_kafka_new_headers()

int n_kafka_new_headers (N_KAFKA_EVENT *event, size_t count)

allocate a headers array for the event

Parameters
event  target event
count  size in elements of the created headers array
Returns
TRUE or FALSE

Definition at line 617 of file n_kafka.c.

References __n_assert, LOG_ERR, and n_log.

◆ n_kafka_poll()

int n_kafka_poll (N_KAFKA *kafka)

poll a kafka handle in producer or consumer mode.

Parameters
kafka  kafka handle to use
Returns
TRUE or FALSE

Definition at line 899 of file n_kafka.c.

References __n_assert, _str, free_nstr_ptr(), list_push(), LOG_DEBUG, LOG_ERR, n_kafka_event_destroy(), n_kafka_event_destroy_ptr(), N_KAFKA_EVENT_ERROR, N_KAFKA_EVENT_OK, N_KAFKA_EVENT_QUEUED, n_kafka_get_schema_from_nstr(), n_kafka_new_event_from_char(), n_kafka_produce_ex(), n_log, new_generic_list(), nstrprintf, remove_list_node, unlock, and write_lock.

Referenced by n_kafka_pooling_thread().

◆ n_kafka_produce()

int n_kafka_produce (N_KAFKA *kafka, N_KAFKA_EVENT *event)

put an event in the events_to_send list

Parameters
kafka  the producer N_KAFKA handle to use
event  event to send; it is owned by the N_KAFKA *kafka handle once done
Returns
TRUE or FALSE

Definition at line 699 of file n_kafka.c.

References __n_assert, list_push(), LOG_DEBUG, n_kafka_event_destroy_ptr(), n_log, unlock, and write_lock.

◆ n_kafka_put_schema_in_char()

int n_kafka_put_schema_in_char (char *string, int schema_id)

write a schema id in network byte order (htonl) into the first 4 bytes of a char *string

Parameters
string     char *destination where to write schema id
schema_id  the schema_id to write
Returns
zero or positive number on success, -1 on error

Definition at line 47 of file n_kafka.c.

References __n_assert.

Referenced by n_kafka_put_schema_in_nstr().

◆ n_kafka_put_schema_in_nstr()

int n_kafka_put_schema_in_nstr (N_STR *string, int schema_id)

write a schema id in network byte order (htonl) into the first 4 bytes of a N_STR *string

Parameters
string     N_STR *destination where to write schema id
schema_id  the schema_id to write
Returns
zero or positive number on success, -1 on error

Definition at line 61 of file n_kafka.c.

References __n_assert, and n_kafka_put_schema_in_char().

Referenced by n_kafka_new_event_from_char().

◆ n_kafka_start_pooling_thread()

int n_kafka_start_pooling_thread (N_KAFKA *kafka)

start the polling thread of a kafka handle

Parameters
kafka  kafka handle to use
Returns
TRUE or FALSE

Definition at line 1103 of file n_kafka.c.

References __n_assert, LOG_DEBUG, LOG_ERR, n_kafka_pooling_thread(), n_log, read_lock, unlock, and write_lock.

◆ n_kafka_stop_pooling_thread()

int n_kafka_stop_pooling_thread (N_KAFKA *kafka)

stop the polling thread of a kafka handle

Parameters
kafka  target kafka handle
Returns
TRUE or FALSE

Definition at line 1139 of file n_kafka.c.

References __n_assert, LOG_DEBUG, n_log, read_lock, unlock, and write_lock.

Referenced by n_kafka_delete().
