Nilorea Library
C utilities for networking, threading, graphics
Loading...
Searching...
No Matches
ex_kafka.c
Go to the documentation of this file.
1
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <locale.h>
#include <libgen.h>
#include <errno.h>
#include <signal.h>

#include "nilorea/n_log.h"
#include "nilorea/n_network.h"
#include "nilorea/n_kafka.h"
#include "cJSON.h"

#include "rdkafka.h"
22
23#define OK 0
24#define ERROR -1
25#define NB_TEST_EVENTS 10
26
/* Command line parameters, filled by main() while it parses argv. */
char *config_file = NULL,  /* -c: kafka configuration file */
    *event_string = NULL,  /* -s: event to send, given on the command line */
    *event_file = NULL,    /* -f: file containing the event to send */
    /* NOTE(review): this declaration is missing from the rendered listing
     * (source line 30) but main() tests and assigns it; confirm against the
     * real file. */
    *event_log_file = NULL, /* -o: log file used instead of stderr/stdout */
    *log_prefix = NULL;     /* -p: prefix inserted in log messages */

int log_level = LOG_ERR, /* default log level */
    getoptret = 0,       /* getopt return value */
    /* NOTE(review): source lines 35-36 are missing from the rendered listing.
     * main() treats KAFKA_MODE == -1 as "neither -C nor -P given", hence the
     * -1 initial value; the initial value of PRODUCER_MODE is not visible and
     * 0 is only the static-storage default. Confirm both. */
    KAFKA_MODE = -1,
    PRODUCER_MODE = 0;

/* Cleared by the SIGINT handler stop() and read by the polling loop of main().
 * An object written from a signal handler must be a volatile sig_atomic_t,
 * a plain int gives no guarantee that the loop ever sees the update. */
volatile sig_atomic_t run = 1;
38
/* Help text printed by usage(). Every option line ends with a newline so that
 * each option is displayed on its own line. */
static const char usage_text[] =
    "Syntax is: ex_kafka -v -c config_file [-s event or -f eventfile] -o event_log_file -V LOGLEVEL\n"
    " -v version: print version and exit\n"
    " -h : print this help and exit\n"
    " -c config_file: [required] Kproducer config file\n"
    " -s : string of the event to send\n"
    " -f : file containing the event to send\n"
    " -H name=value : add a header to the produced event\n"
    " -C : start a consumer (default output received in terminal)\n"
    " -P : start a producer and produce event\n"
    " -o : optionnal, set a log file instead of default (stderr/stdout)\n"
    " -p : optionnal, set a log prefix\n"
    " -V verbosity: specify a log level for console output\n"
    " Supported: LOG_ EMERG,ALERT,CRIT,ERR,WARNING,NOTICE,INFO,DEBUG\n";

/**
 * Print the command line help on stderr.
 *
 * The text is written with fputs() rather than a printf-style call because it
 * contains no conversion to expand.
 */
void usage(void) {
    // TODO: add -F unprocessed event file
    fputs(usage_text, stderr);
}
55
56// stop handler
57static void stop(int sig) {
58 (void)sig;
59 run = 0;
60}
61
62int main(int argc, char* argv[]) {
63 /* Signal handler for clean shutdown */
64 signal(SIGINT, stop);
65
66 /* temporary header structure */
67 rd_kafka_headers_t* headers = NULL;
68
69 /* Analysing arguments */
70 while ((getoptret = getopt(argc, argv, "vhCPH:c:s:f:V:o:p:")) != -1) {
71 switch (getoptret) {
72 case 'v':
73 fprintf(stderr, " Version compiled on %s at %s\n", __DATE__, __TIME__);
74 exit(TRUE);
75 case 'h':
76 usage();
77 exit(0);
78 break;
79 case 'C':
80 if (KAFKA_MODE == -1) {
81 KAFKA_MODE = RD_KAFKA_CONSUMER;
82 } else {
83 fprintf(stderr, "-C and -P can not be used at the ame time!");
84 exit(TRUE);
85 }
86 break;
87 case 'P':
88 if (KAFKA_MODE == -1) {
89 KAFKA_MODE = RD_KAFKA_PRODUCER;
90 } else {
91 fprintf(stderr, "-C and -P can not be used at the ame time!");
92 exit(TRUE);
93 }
94 break;
95 case 'H': {
96 char *name = NULL, *val = NULL;
97 size_t name_sz = -1;
98
99 name = optarg;
100 val = strchr(name, '=');
101 if (val) {
102 name_sz = (size_t)(val - name);
103 val++; /* past the '=' */
104 }
105
106 if (!headers)
107 headers = rd_kafka_headers_new(16);
108
109 int err = rd_kafka_header_add(headers, name, name_sz, val, -1);
110 if (err) {
111 fprintf(stderr,
112 "%% Failed to add header %s: %s\n",
113 name, rd_kafka_err2str(err));
114 exit(1);
115 }
116 } break;
117 case 'c':
118 config_file = local_strdup(optarg);
119 break;
120 case 's':
121 Malloc(event_string, char, strlen(optarg) + 1);
122 strcpy(event_string, optarg);
123 break;
124 case 'f':
125 event_file = local_strdup(optarg);
126 break;
127 case 'o':
129 break;
130 case 'p':
131 log_prefix = local_strdup(optarg);
132 break;
133 case 'V':
134 if (!strncmp("LOG_NULL", optarg, 8)) {
136 } else if (!strncmp("LOG_NOTICE", optarg, 10)) {
138 } else if (!strncmp("LOG_INFO", optarg, 8)) {
140 } else if (!strncmp("LOG_ERR", optarg, 7)) {
142 } else if (!strncmp("LOG_DEBUG", optarg, 9)) {
144 } else {
145 fprintf(stderr, "%s n'est pas un niveau de log valide.\n", optarg);
146 exit(-1);
147 }
148 break;
149 case '?':
150 if (optopt == 'c' || optopt == 's' || optopt == 'f') {
151 fprintf(stderr, "Option -%c need a parameter\n", optopt);
152 exit(FALSE);
153 }
155 default:
156 usage();
157 exit(-1);
158 break;
159 }
160 }
161
162 if (KAFKA_MODE == -1) {
163 n_log(LOG_ERR, "consumer (-C) or producer (-P) mode is not defined !", log_prefix);
164 exit(1);
165 }
166
167 if (!log_prefix) {
168 log_prefix = strdup("");
169 }
170
172 if (event_log_file) {
173 int log_file_ret = set_log_file(event_log_file);
174 n_log(LOG_DEBUG, "%s log to file: %s , %d , %p", log_prefix, event_log_file, log_file_ret, get_log_file());
175 }
176
177 /* testing parameters */
178 if (!config_file) {
179 n_log(LOG_ERR, "%s parameter config_file needs to be set !", log_prefix);
180 exit(1);
181 }
182
183 if (KAFKA_MODE == RD_KAFKA_PRODUCER) {
184 if (!event_string && !event_file) {
185 n_log(LOG_ERR, "%s one of (event_string|event_file) needs to be set !", log_prefix);
186 exit(1);
187 }
188 }
189
190 if (event_string && event_file) {
191 n_log(LOG_ERR, "%s do not define event_string AND event_file, only one needs to be set !", log_prefix);
192 exit(1);
193 }
194
195 // load kafka config file
196 unsigned int exit_code = 0;
198 __n_assert(kafka_handle, n_log(LOG_ERR, "kafka handke is NULL !!"); exit(1));
199
200 n_kafka_start_pooling_thread(kafka_handle);
201
202 size_t nb_queued = 0;
203 size_t nb_waiting = 0;
204 size_t nb_error = 0;
205 int poll_status = 1;
206 N_KAFKA_EVENT* event = NULL;
207
208 if (KAFKA_MODE == RD_KAFKA_PRODUCER) {
209 // create a new kafka event from a string or from a file
210 if (event_string) {
211 event = n_kafka_new_event_from_char(event_string, strlen(event_string), kafka_handle->schema_id);
212 }
213 if (event_file) {
214 event = n_kafka_new_event_from_file(event_file, kafka_handle->schema_id);
215 }
216 // set headers if any
217 if (headers) {
218 event->rd_kafka_headers = rd_kafka_headers_copy(headers);
219 // clean them if no more needed
220 rd_kafka_headers_destroy(headers);
221 }
222 // produce the event, API is charging itself of destroying it
223 if (n_kafka_produce(kafka_handle, event) == FALSE) {
224 n_log(LOG_ERR, "n_kafka_produce returned an error for event %p", event);
225 } else {
226 n_log(LOG_INFO, "n_kafka_produce returned OK for event %p", event);
227 }
228 }
229
230 // loop on pool
231 do {
232 poll_status = n_kafka_get_status(kafka_handle, &nb_queued, &nb_waiting, &nb_error);
233 n_log(LOG_DEBUG, "polling kafka handle, status: %d, %d in queue, %d waiting for ack, %d on error", poll_status, nb_queued, nb_waiting, nb_error);
234
235 // if we were waiting only for producing elemens we could use a test like this to break out of the loop
236 if (KAFKA_MODE == RD_KAFKA_PRODUCER) {
237 usleep(30000);
238 if (nb_queued == 0 && nb_waiting == 0 && nb_error == 0)
239 break;
240 } else {
241 event = n_kafka_get_event(kafka_handle);
242 if (event) {
243 if (kafka_handle->schema_id != -1)
244 n_log(LOG_INFO, "received event schema id %d string:\n%s", event->schema_id, _str(event->event_string->data + 4));
245 else
246 n_log(LOG_INFO, "received event string:\n%s", event->event_string->data);
247
248 list_foreach(node, event->received_headers) {
249 N_STR* header = (N_STR*)node->ptr;
250 n_log(LOG_INFO, "headers: %s", _nstr(header));
251 }
252 n_kafka_event_destroy(&event);
253 } else {
254 usleep(30000);
255 }
256 }
257 } while (run && poll_status > 0);
258
259 n_log(LOG_INFO, "kafka_handle: %d queued, %d waiting ack, %d on error", nb_queued, nb_waiting, nb_error);
260 if (nb_error > 0 || nb_waiting > 0) {
261 n_log(LOG_ERR, "kafka_handle: %d events are still waiting for ack, and %d are on error !", nb_waiting, nb_error);
262 n_kafka_dump_unprocessed(kafka_handle, "DATAS/kafka/unprocessed");
263 }
264
265 // log unprocessed events
266 list_foreach(node, kafka_handle->received_events) {
267 if (node) {
268 if (kafka_handle->schema_id != -1)
269 n_log(LOG_INFO, "[unprocessed]received event schema id %d string:\n%s", event->schema_id, event->event_string->data + 4);
270 else
271 n_log(LOG_INFO, "[unprocessed]received event string:\n%s", event->event_string->data);
272 }
273 }
274
275 // closing kafka handle
276 n_kafka_delete(kafka_handle);
277
278 exit(exit_code);
279}
void usage(void)
Definition ex_common.c:22
int main(void)
int getoptret
Definition ex_fluid.c:42
int log_level
Definition ex_fluid.c:43
int PRODUCER_MODE
Definition ex_kafka.c:36
static void stop(int sig)
Definition ex_kafka.c:57
char * log_prefix
Definition ex_kafka.c:31
int KAFKA_MODE
Definition ex_kafka.c:35
char * event_log_file
Definition ex_kafka.c:30
char * event_file
Definition ex_kafka.c:29
char * event_string
Definition ex_kafka.c:28
char * config_file
Definition ex_kafka.c:27
int run
Definition ex_kafka.c:37
#define FALL_THROUGH
set windows if true
Definition n_common.h:53
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
Definition n_common.h:185
#define __n_assert(__ptr, __ret)
macro to assert things
Definition n_common.h:256
#define _str(__PTR)
define true
Definition n_common.h:174
#define _nstr(__PTR)
N_STR or "NULL" string for logging purposes.
Definition n_common.h:180
#define list_foreach(__ITEM_, __LIST_)
ForEach macro helper.
Definition n_list.h:66
FILE * get_log_file(void)
return the current log_file
Definition n_log.c:174
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
Definition n_log.h:70
#define LOG_DEBUG
debug-level messages
Definition n_log.h:65
#define LOG_ERR
error conditions
Definition n_log.h:57
int set_log_file(char *file)
Set the logging to a file instead of stderr.
Definition n_log.c:151
void set_log_level(const int log_level)
Set the global log level value ( static int LOG_LEVEL )
Definition n_log.c:104
#define LOG_NOTICE
normal but significant condition
Definition n_log.h:61
#define LOG_NULL
no log output
Definition n_log.h:27
#define LOG_INFO
informational
Definition n_log.h:63
LIST * received_events
list of received N_KAFKA_EVENT
Definition n_kafka.h:72
int schema_id
kafka schema id in network order
Definition n_kafka.h:92
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed/unset events
Definition n_kafka.c:1025
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get a received event from the N_KAFKA kafka handle
Definition n_kafka.c:1093
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load a kafka configuration from a file
Definition n_kafka.c:263
int n_kafka_get_status(N_KAFKA *kafka, size_t *nb_queued, size_t *nb_waiting, size_t *nb_error)
return the queues status
Definition n_kafka.c:73
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event and set its pointer to NULL
Definition n_kafka.c:780
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
put an event in the events_to_send list
Definition n_kafka.c:610
N_KAFKA_EVENT * n_kafka_new_event_from_char(char *string, size_t written, int schema_id)
make a new event from a char *string
Definition n_kafka.c:686
int n_kafka_start_pooling_thread(N_KAFKA *kafka)
start the pooling thread of a kafka handle
Definition n_kafka.c:962
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
make a new event from a N_STR *string
Definition n_kafka.c:739
void n_kafka_delete(N_KAFKA *kafka)
delete a N_KAFKA handle
Definition n_kafka.c:160
structure of a KAFKA consumer or producer handle
Definition n_kafka.h:68
structure of a KAFKA message
Definition n_kafka.h:48
#define local_strdup(__src_)
Do tar(1) matching rules, which ignore a trailing slash?
Definition n_str.h:56
A box including a string and its length.
Definition n_str.h:39
Kafka generic produce and consume event header.
Generic log system.
Network Engine.