Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
n_kafka.h
Go to the documentation of this file.
1
8#ifndef __N_KAFKA
9#define __N_KAFKA
10
11#ifdef __cplusplus
12extern "C" {
13#endif
14
20#include "nilorea/n_log.h"
21#include "nilorea/n_network.h"
22#include "nilorea/n_network.h"
23#include "cJSON.h"
24
25#include <stdio.h>
26#include <stdlib.h>
27#include <string.h>
28#include <locale.h>
29#include <libgen.h>
30#include <errno.h>
31#include <unistd.h>
32
33#include "rdkafka.h"
34
36#define N_KAFKA_EVENT_QUEUED 0
38#define N_KAFKA_EVENT_WAITING_ACK 1
40#define N_KAFKA_EVENT_ERROR 2
42#define N_KAFKA_EVENT_OK 4
44#define N_KAFKA_EVENT_CREATED 5
45
65
67typedef struct N_KAFKA {
73 char* groupid;
74 /* list of topics to subscribe to */
75 char** topics;
76 /* subscribed topics */
77 rd_kafka_topic_partition_list_t* subscription;
79 rd_kafka_conf_t* rd_kafka_conf;
81 rd_kafka_t* rd_kafka_handle;
83 char* topic;
87 rd_kafka_topic_t* rd_kafka_topic;
93 pthread_rwlock_t rwlock;
95 int mode;
99 int32_t poll_timeout;
105 pthread_t pooling_thread;
106 /* number of waiting events in the producer waiting list */
107 size_t nb_queued;
108 /* number of events waiting for an ack in the waiting list */
109 size_t nb_waiting;
110 /* number of errored events */
111 size_t nb_error;
112} N_KAFKA;
113
114int32_t n_kafka_get_schema_from_char(char* string);
115int32_t n_kafka_get_schema_from_nstr(N_STR* string);
116int n_kafka_put_schema_in_char(char* string, int schema_id);
117int n_kafka_put_schema_in_nstr(N_STR* string, int schema_id);
118
119int n_kafka_get_status(N_KAFKA* kafka, size_t* nb_queued, size_t* nb_waiting, size_t* nb_error);
120void n_kafka_delete(N_KAFKA* kafka);
121N_KAFKA* n_kafka_new(int32_t poll_timeout, int32_t poll_interval, size_t errstr_len);
122N_KAFKA* n_kafka_load_config(char* config_file, int mode);
123
124int n_kafka_new_headers(N_KAFKA_EVENT* event, size_t count);
125int n_kafka_add_header_ex(N_KAFKA_EVENT* event, char* key, size_t key_length, char* value, size_t value_length);
126int n_kafka_add_header(N_KAFKA_EVENT* event, N_STR* key, N_STR* value);
127
128int n_kafka_produce(N_KAFKA* kafka, N_KAFKA_EVENT* event);
129
130N_KAFKA_EVENT* n_kafka_new_event(int schema_id);
131N_KAFKA_EVENT* n_kafka_new_event_from_char(char* string, size_t written, int schema_id);
132N_KAFKA_EVENT* n_kafka_new_event_from_string(N_STR* string, int schema_id);
133N_KAFKA_EVENT* n_kafka_new_event_from_file(char* filename, int schema_id);
134void n_kafka_event_destroy_ptr(void* event);
136
137int n_kafka_poll(N_KAFKA* kafka);
140
141int n_kafka_dump_unprocessed(N_KAFKA* kafka, char* directory);
142int n_kafka_load_unprocessed(N_KAFKA* kafka, char* directory);
143
145
150#ifdef __cplusplus
151}
152#endif
153
154#endif // header guard
Structure of a generic LIST container.
Definition n_list.h:39
int32_t poll_interval
poll interval in usecs
Definition n_kafka.h:101
cJSON * configuration
kafka json configuration holder
Definition n_kafka.h:97
pthread_t pooling_thread
pooling thread id
Definition n_kafka.h:105
int mode
kafka handle mode: RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER
Definition n_kafka.h:95
int schema_id
kafka schema_id
Definition n_kafka.h:61
rd_kafka_topic_t * rd_kafka_topic
kafka topic handle
Definition n_kafka.h:87
rd_kafka_headers_t * rd_kafka_headers
kafka produce event headers structure handle
Definition n_kafka.h:57
pthread_rwlock_t rwlock
access lock
Definition n_kafka.h:93
N_STR * errstr
kafka error string holder
Definition n_kafka.h:89
int pooling_thread_status
pooling thread status, 0 => off , 1 => on , 2 => wants to stop, will be turned out to 0 by exiting po...
Definition n_kafka.h:103
rd_kafka_conf_t * rd_kafka_conf
kafka structure handle
Definition n_kafka.h:79
LIST * received_events
list of received N_KAFKA_EVENT
Definition n_kafka.h:71
LIST * events_to_send
list of N_KAFKA_EVENT to send
Definition n_kafka.h:69
int32_t poll_timeout
poll timeout in usecs
Definition n_kafka.h:99
rd_kafka_t * rd_kafka_handle
kafka handle (producer or consumer)
Definition n_kafka.h:81
LIST * received_headers
kafka consume event headers structure handle
Definition n_kafka.h:59
N_STR * event_string
string containing the topic id + payload
Definition n_kafka.h:49
char * groupid
consumer group id
Definition n_kafka.h:73
N_STR * event_files_to_delete
string containing the original event source file name if it is to be deleted when event is produced.
Definition n_kafka.h:51
int schema_id
kafka schema id in network order
Definition n_kafka.h:91
struct N_KAFKA * parent_table
access lock
Definition n_kafka.h:63
char * bootstrap_servers
kafka bootstrap servers string
Definition n_kafka.h:85
char * from_topic
in case of received event, else NULL
Definition n_kafka.h:53
unsigned int status
state of the event: N_KAFKA_EVENT_CREATED ,N_KAFKA_EVENT_QUEUED , N_KAFKA_EVENT_WAITING_ACK ,...
Definition n_kafka.h:55
char * topic
kafka topic string
Definition n_kafka.h:83
int n_kafka_add_header_ex(N_KAFKA_EVENT *event, char *key, size_t key_length, char *value, size_t value_length)
add a header entry to an event.
Definition n_kafka.c:559
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed/unset events
Definition n_kafka.c:1024
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get a received event from the N_KAFKA kafka handle
Definition n_kafka.c:1092
int n_kafka_new_headers(N_KAFKA_EVENT *event, size_t count)
allocate a headers array for the event
Definition n_kafka.c:534
int32_t n_kafka_get_schema_from_char(char *string)
get a schema from the first 4 bytes of a char *string, returning ntohl of the value
Definition n_kafka.c:17
int n_kafka_put_schema_in_nstr(N_STR *string, int schema_id)
put a htonl schema id into the first 4 bytes of a N_STR *string
Definition n_kafka.c:57
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load a kafka configuration from a file
Definition n_kafka.c:262
int n_kafka_get_status(N_KAFKA *kafka, size_t *nb_queued, size_t *nb_waiting, size_t *nb_error)
return the queues status
Definition n_kafka.c:72
int n_kafka_put_schema_in_char(char *string, int schema_id)
put a htonl schema id into the first 4 bytes of a char *string
Definition n_kafka.c:44
int n_kafka_poll(N_KAFKA *kafka)
Poll kafka handle in producer or consumer mode.
Definition n_kafka.c:791
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event and set it's pointer to NULL
Definition n_kafka.c:779
void n_kafka_event_destroy_ptr(void *event)
festroy a kafka event
Definition n_kafka.c:751
int n_kafka_load_unprocessed(N_KAFKA *kafka, char *directory)
load unprocessed/unset events
Definition n_kafka.c:1075
int n_kafka_add_header(N_KAFKA_EVENT *event, N_STR *key, N_STR *value)
add a header entry to an event.
Definition n_kafka.c:593
int n_kafka_stop_pooling_thread(N_KAFKA *kafka)
stop the pooling thread of a kafka handle
Definition n_kafka.c:993
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
put an event in the events_to_send list
Definition n_kafka.c:609
N_KAFKA_EVENT * n_kafka_new_event_from_char(char *string, size_t written, int schema_id)
make a new event from a char *string
Definition n_kafka.c:685
int32_t n_kafka_get_schema_from_nstr(N_STR *string)
get a schema from the first 4 bytes of a N_STR *string, returning ntohl of the value
Definition n_kafka.c:31
int n_kafka_start_pooling_thread(N_KAFKA *kafka)
start the pooling thread of a kafka handle
Definition n_kafka.c:961
N_KAFKA_EVENT * n_kafka_new_event(int schema_id)
create a new empty event
Definition n_kafka.c:511
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
make a new event from a N_STR *string
Definition n_kafka.c:738
N_KAFKA * n_kafka_new(int32_t poll_timeout, int32_t poll_interval, size_t errstr_len)
allocate a new kafka handle
Definition n_kafka.c:214
void n_kafka_delete(N_KAFKA *kafka)
delete a N_KAFKA handle
Definition n_kafka.c:159
N_KAFKA_EVENT * n_kafka_new_event_from_string(N_STR *string, int schema_id)
make a new event from a N_STR *string
Definition n_kafka.c:723
structure of a KAFKA consumer or producer handle
Definition n_kafka.h:67
structure of a KAFKA message
Definition n_kafka.h:47
A box including a string and his lenght.
Definition n_str.h:39
Generic log system.
Network Engine.