Nilorea Library
C utilities for networking, threading, graphics
n_kafka.h
Go to the documentation of this file.
1
8#ifndef __N_KAFKA
9#define __N_KAFKA
10
11#ifdef __cplusplus
12extern "C"
13{
14#endif
15
21#include "nilorea/n_log.h"
22#include "nilorea/n_network.h"
23#include "nilorea/n_network.h"
24#include "cJSON.h"
25
26#include <stdio.h>
27#include <stdlib.h>
28#include <string.h>
29#include <locale.h>
30#include <libgen.h>
31#include <errno.h>
32#include <unistd.h>
33
34#include "rdkafka.h"
35
37#define N_KAFKA_EVENT_QUEUED 0
39#define N_KAFKA_EVENT_WAITING_ACK 1
41#define N_KAFKA_EVENT_ERROR 2
43#define N_KAFKA_EVENT_OK 4
45#define N_KAFKA_EVENT_CREATED 5
46
48 typedef struct N_KAFKA_EVENT
49 {
55 char *from_topic ;
57 unsigned int status ;
59 rd_kafka_headers_t *rd_kafka_headers ;
67
69 typedef struct N_KAFKA
70 {
76 char *groupid;
77 /* list of topics to subscribe to */
78 char **topics;
79 /* subscribed topics */
80 rd_kafka_topic_partition_list_t *subscription;
82 rd_kafka_conf_t *rd_kafka_conf ;
84 rd_kafka_t* rd_kafka_handle ;
86 char *topic ;
90 rd_kafka_topic_t* rd_kafka_topic ;
96 pthread_rwlock_t rwlock ;
98 int mode ;
102 int64_t poll_timeout ;
108 pthread_t pooling_thread;
109 /* number of waiting events in the producer waiting list */
110 size_t nb_queued ;
111 /* number of events waiting for an ack in the waiting list */
112 size_t nb_waiting ;
113 /* number of errored events */
114 size_t nb_error ;
115 } N_KAFKA ;
116
117 int32_t n_kafka_get_schema_from_char( char *string );
118 int32_t n_kafka_get_schema_from_nstr( N_STR *string );
119 int n_kafka_put_schema_in_char( char *string , int schema_id );
120 int n_kafka_put_schema_in_nstr( N_STR *string , int schema_id );
121
122 int n_kafka_get_status( N_KAFKA *kafka , int *nb_queued , int *nb_waiting , int *nb_error );
123 void n_kafka_delete( N_KAFKA *kafka );
124 N_KAFKA *n_kafka_new( int64_t poll_timeout , int64_t poll_interval , size_t errstr_len );
125 N_KAFKA *n_kafka_load_config( char *config_file , int mode );
126
127 int n_kafka_new_headers( N_KAFKA_EVENT *event , size_t count );
128 int n_kafka_add_header_ex( N_KAFKA_EVENT *event , char *key , size_t key_length , char *value , size_t value_length );
129 int n_kafka_add_header( N_KAFKA_EVENT *event , N_STR *key , N_STR *value );
130
131 int n_kafka_produce( N_KAFKA *kafka , N_KAFKA_EVENT *event );
132
133 N_KAFKA_EVENT *n_kafka_new_event(int schema_id );
134 N_KAFKA_EVENT *n_kafka_new_event_from_char( char *string , size_t written , int schema_id );
135 N_KAFKA_EVENT *n_kafka_new_event_from_string( N_STR *string , int schema_id );
136 N_KAFKA_EVENT *n_kafka_new_event_from_file( char *filename , int schema_id );
137 void n_kafka_event_destroy_ptr( void *event );
139
140 int n_kafka_poll( N_KAFKA *kafka );
143
144 int n_kafka_dump_unprocessed( N_KAFKA *kafka , char *directory );
145 int n_kafka_load_unprocessed( N_KAFKA *kafka , char *directory );
146
148
153#ifdef __cplusplus
154}
155#endif
156
157#endif // header guard
158
Structure of a generic LIST container.
Definition: n_list.h:45
cJSON * configuration
kafka json configuration holder
Definition: n_kafka.h:100
pthread_t pooling_thread
pooling thread id
Definition: n_kafka.h:108
int mode
kafka handle mode: RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER
Definition: n_kafka.h:98
int64_t poll_timeout
poll timeout in usecs
Definition: n_kafka.h:102
int schema_id
kafka schema_id
Definition: n_kafka.h:63
rd_kafka_topic_t * rd_kafka_topic
kafka topic handle
Definition: n_kafka.h:90
rd_kafka_headers_t * rd_kafka_headers
kafka produce event headers structure handle
Definition: n_kafka.h:59
pthread_rwlock_t rwlock
access lock
Definition: n_kafka.h:96
N_STR * errstr
kafka error string holder
Definition: n_kafka.h:92
int64_t poll_interval
poll interval in usecs
Definition: n_kafka.h:104
int pooling_thread_status
pooling thread status, 0 => off , 1 => on , 2 => wants to stop, will be turned out to 0 by exiting po...
Definition: n_kafka.h:106
rd_kafka_conf_t * rd_kafka_conf
kafka structure handle
Definition: n_kafka.h:82
LIST * received_events
list of received N_KAFKA_EVENT
Definition: n_kafka.h:74
LIST * events_to_send
list of N_KAFKA_EVENT to send
Definition: n_kafka.h:72
rd_kafka_t * rd_kafka_handle
kafka handle (producer or consumer)
Definition: n_kafka.h:84
LIST * received_headers
kafka consume event headers structure handle
Definition: n_kafka.h:61
N_STR * event_string
string containing the topic id + payload
Definition: n_kafka.h:51
char * groupid
consumer group id
Definition: n_kafka.h:76
N_STR * event_files_to_delete
string containing the original event source file name if it is to be deleted when event is produced.
Definition: n_kafka.h:53
int schema_id
kafka schema id in network order
Definition: n_kafka.h:94
struct N_KAFKA * parent_table
access lock
Definition: n_kafka.h:65
char * bootstrap_servers
kafka bootstrap servers string
Definition: n_kafka.h:88
char * from_topic
in case of received event, else NULL
Definition: n_kafka.h:55
unsigned int status
state of the event: N_KAFKA_EVENT_CREATED ,N_KAFKA_EVENT_QUEUED , N_KAFKA_EVENT_WAITING_ACK ,...
Definition: n_kafka.h:57
char * topic
kafka topic string
Definition: n_kafka.h:86
int n_kafka_add_header_ex(N_KAFKA_EVENT *event, char *key, size_t key_length, char *value, size_t value_length)
add a header entry to an event.
Definition: n_kafka.c:646
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed/unset events
Definition: n_kafka.c:1174
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get a received event from the N_KAFKA kafka handle
Definition: n_kafka.c:1251
int n_kafka_new_headers(N_KAFKA_EVENT *event, size_t count)
allocate a headers array for the event
Definition: n_kafka.c:617
int32_t n_kafka_get_schema_from_char(char *string)
get a schema from the first 4 bytes of a char *string, returning ntohl of the value
Definition: n_kafka.c:19
int n_kafka_put_schema_in_nstr(N_STR *string, int schema_id)
put a htonl schema id into the first 4 bytes of a N_STR *string
Definition: n_kafka.c:61
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load a kafka configuration from a file
Definition: n_kafka.c:297
int n_kafka_put_schema_in_char(char *string, int schema_id)
put a htonl schema id into the first 4 bytes of a char *string
Definition: n_kafka.c:47
int n_kafka_poll(N_KAFKA *kafka)
Poll kafka handle in producer or consumer mode.
Definition: n_kafka.c:899
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event and set it's pointer to NULL
Definition: n_kafka.c:885
void n_kafka_event_destroy_ptr(void *event)
festroy a kafka event
Definition: n_kafka.c:856
int n_kafka_load_unprocessed(N_KAFKA *kafka, char *directory)
load unprocessed/unset events
Definition: n_kafka.c:1231
int n_kafka_add_header(N_KAFKA_EVENT *event, N_STR *key, N_STR *value)
add a header entry to an event.
Definition: n_kafka.c:681
int n_kafka_stop_pooling_thread(N_KAFKA *kafka)
stop the pooling thread of a kafka handle
Definition: n_kafka.c:1139
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
put an event in the events_to_send list
Definition: n_kafka.c:699
N_KAFKA_EVENT * n_kafka_new_event_from_char(char *string, size_t written, int schema_id)
make a new event from a char *string
Definition: n_kafka.c:784
int32_t n_kafka_get_schema_from_nstr(N_STR *string)
get a schema from the first 4 bytes of a N_STR *string, returning ntohl of the value
Definition: n_kafka.c:33
int n_kafka_get_status(N_KAFKA *kafka, int *nb_queued, int *nb_waiting, int *nb_error)
return the queues status
Definition: n_kafka.c:78
int n_kafka_start_pooling_thread(N_KAFKA *kafka)
start the pooling thread of a kafka handle
Definition: n_kafka.c:1103
N_KAFKA_EVENT * n_kafka_new_event(int schema_id)
create a new empty event
Definition: n_kafka.c:592
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
make a new event from a N_STR *string
Definition: n_kafka.c:841
N_KAFKA * n_kafka_new(int64_t poll_timeout, int64_t poll_interval, size_t errstr_len)
allocate a new kafka handle
Definition: n_kafka.c:246
void n_kafka_delete(N_KAFKA *kafka)
delete a N_KAFKA handle
Definition: n_kafka.c:185
N_KAFKA_EVENT * n_kafka_new_event_from_string(N_STR *string, int schema_id)
make a new event from a N_STR *string
Definition: n_kafka.c:824
structure of a KAFKA consumer or producer handle
Definition: n_kafka.h:70
structure of a KAFKA message
Definition: n_kafka.h:49
A box including a string and his lenght.
Definition: n_str.h:173
Generic log system.
Network Engine.