Nilorea Library
C utilities for networking, threading, graphics
ex_kafka.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <locale.h>
#include <libgen.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>

#include "nilorea/n_log.h"
#include "nilorea/n_network.h"
#include "nilorea/n_kafka.h"
#include "cJSON.h"

#include "rdkafka.h"
#define OK 0
#define ERROR -1
#define NB_TEST_EVENTS 10

char *config_file = NULL,
     *event_string = NULL,
     *event_file = NULL,
     *event_log_file = NULL,
     *log_prefix = NULL;

int log_level = LOG_ERR, /* default log level */
    getoptret = 0,       /* getopt return value */
    KAFKA_MODE = -1,
    PRODUCER_MODE = 0;

volatile sig_atomic_t run = 1; /* cleared by the SIGINT handler to stop the main loop */

// help function
void usage(void) {
    // TODO: add -F unprocessed event file
    fprintf(stderr,
            "Syntax is: ex_kafka -v -c config_file [-s event or -f eventfile] -o event_log_file -V LOGLEVEL\n"
            " -v version: print version and exit\n"
            " -c config_file: [required] kafka config file\n"
            " -s : string of the event to send\n"
            " -f : file containing the event to send\n"
            " -C : start a consumer (received events are printed to the terminal by default)\n"
            " -P : start a producer and produce the event\n"
            " -o : optional, set a log file instead of the default (stderr/stdout)\n"
            " -p : optional, set a log prefix\n"
            " -V verbosity: specify a log level for console output\n"
            "    Supported: LOG_ EMERG,ALERT,CRIT,ERR,WARNING,NOTICE,INFO,DEBUG\n");
}
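
/* Example invocations (config and event file names below are placeholders):
 *   ex_kafka -P -c kafka.cfg -s '{"key":"value"}' -V LOG_INFO    produce a single event passed on the command line
 *   ex_kafka -P -c kafka.cfg -f event.json -H trace_id=42        produce an event read from a file, with one header
 *   ex_kafka -C -c kafka.cfg -V LOG_DEBUG                        consume events and print them until Ctrl-C
 */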

// stop handler
static void stop(int sig) {
    (void)sig;
    run = 0;
}

int main(int argc, char* argv[]) {
    /* Signal handler for clean shutdown */
    signal(SIGINT, stop);

    /* temporary header structure */
    rd_kafka_headers_t* headers = NULL;

    /* Analysing arguments */
    while ((getoptret = getopt(argc, argv, "vhCPH:c:s:f:V:o:p:")) != -1) {
        switch (getoptret) {
            case 'v':
                fprintf(stderr, " Version compiled on %s at %s\n", __DATE__, __TIME__);
                exit(TRUE);
            case 'h':
                usage();
                exit(0);
                break;
            case 'C':
                if (KAFKA_MODE == -1) {
                    KAFKA_MODE = RD_KAFKA_CONSUMER;
                } else {
                    fprintf(stderr, "-C and -P can not be used at the same time!\n");
                    exit(TRUE);
                }
                break;
            case 'P':
                if (KAFKA_MODE == -1) {
                    KAFKA_MODE = RD_KAFKA_PRODUCER;
                } else {
                    fprintf(stderr, "-C and -P can not be used at the same time!\n");
                    exit(TRUE);
                }
                break;
            case 'H': {
                char *name = NULL, *val = NULL;
                ssize_t name_sz = -1;

                name = optarg;
                val = strchr(name, '=');
                if (val) {
                    name_sz = (ssize_t)(val - name);
                    val++; /* past the '=' */
                }

                if (!headers)
                    headers = rd_kafka_headers_new(16);

                rd_kafka_resp_err_t err = rd_kafka_header_add(headers, name, name_sz, val, -1);
                if (err) {
                    fprintf(stderr,
                            "%% Failed to add header %s: %s\n",
                            name, rd_kafka_err2str(err));
                    exit(1);
                }
            } break;
            case 'c':
                config_file = local_strdup(optarg);
                break;
            case 's':
                Malloc(event_string, char, strlen(optarg) + 1);
                strcpy(event_string, optarg);
                break;
            case 'f':
                event_file = local_strdup(optarg);
                break;
            case 'o':
                event_log_file = local_strdup(optarg);
                break;
            case 'p':
                log_prefix = local_strdup(optarg);
                break;
            case 'V':
                if (!strncmp("LOG_NULL", optarg, 8)) {
                    log_level = LOG_NULL;
                } else if (!strncmp("LOG_NOTICE", optarg, 10)) {
                    log_level = LOG_NOTICE;
                } else if (!strncmp("LOG_INFO", optarg, 8)) {
                    log_level = LOG_INFO;
                } else if (!strncmp("LOG_ERR", optarg, 7)) {
                    log_level = LOG_ERR;
                } else if (!strncmp("LOG_DEBUG", optarg, 9)) {
                    log_level = LOG_DEBUG;
                } else {
                    fprintf(stderr, "%s is not a valid log level.\n", optarg);
                    exit(-1);
                }
                break;
            case '?':
                if (optopt == 'c' || optopt == 's' || optopt == 'f') {
                    fprintf(stderr, "Option -%c needs a parameter\n", optopt);
                    exit(FALSE);
                }
                FALL_THROUGH;
            default:
                usage();
                exit(-1);
                break;
        }
    }

    if (KAFKA_MODE == -1) {
        n_log(LOG_ERR, "consumer (-C) or producer (-P) mode is not defined !");
        exit(1);
    }

    if (!log_prefix) {
        log_prefix = strdup("");
    }

    set_log_level(log_level);
    if (event_log_file) {
        int log_file_ret = set_log_file(event_log_file);
        n_log(LOG_DEBUG, "%s log to file: %s , %d , %p", log_prefix, event_log_file, log_file_ret, get_log_file());
    }

    /* testing parameters */
    if (!config_file) {
        n_log(LOG_ERR, "%s parameter config_file needs to be set !", log_prefix);
        exit(1);
    }

    if (KAFKA_MODE == RD_KAFKA_PRODUCER) {
        if (!event_string && !event_file) {
            n_log(LOG_ERR, "%s one of (event_string|event_file) needs to be set !", log_prefix);
            exit(1);
        }
    }

    if (event_string && event_file) {
        n_log(LOG_ERR, "%s do not define event_string AND event_file, only one needs to be set !", log_prefix);
        exit(1);
    }

    // load kafka config file
    unsigned int exit_code = 0;
    N_KAFKA* kafka_handle = n_kafka_load_config(config_file, KAFKA_MODE);
    __n_assert(kafka_handle, n_log(LOG_ERR, "kafka handle is NULL !!"); exit(1));

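    /* start the handle's background pooling thread: it keeps the kafka handle polled so the
       main loop below only has to check the queue counters and fetch already-received events */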
    n_kafka_start_pooling_thread(kafka_handle);

    size_t nb_queued = 0;
    size_t nb_waiting = 0;
    size_t nb_error = 0;
    int poll_status = 1;
    N_KAFKA_EVENT* event = NULL;

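    /* producer mode: build a single event from -s or -f, attach the optional -H headers,
       then hand it over to the kafka handle */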
    if (KAFKA_MODE == RD_KAFKA_PRODUCER) {
        // create a new kafka event from a string or from a file
        if (event_string) {
            event = n_kafka_new_event_from_char(event_string, strlen(event_string), kafka_handle->schema_id);
        }
        if (event_file) {
            event = n_kafka_new_event_from_file(event_file, kafka_handle->schema_id);
        }
        // set headers if any
        if (headers) {
            event->rd_kafka_headers = rd_kafka_headers_copy(headers);
            // the event now owns its own copy, the original list is no longer needed
            rd_kafka_headers_destroy(headers);
        }
        // produce the event; the API takes care of destroying it
        if (n_kafka_produce(kafka_handle, event) == FALSE) {
            n_log(LOG_ERR, "n_kafka_produce returned an error for event %p", event);
        } else {
            n_log(LOG_INFO, "n_kafka_produce returned OK for event %p", event);
        }
    }

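    /* main loop: in producer mode, poll until nothing is left queued, waiting for ack or on error;
       in consumer mode, print each received event and its headers until SIGINT is caught */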
    // loop on poll
    do {
        poll_status = n_kafka_get_status(kafka_handle, &nb_queued, &nb_waiting, &nb_error);
        n_log(LOG_DEBUG, "polling kafka handle, status: %d, %zu in queue, %zu waiting for ack, %zu on error", poll_status, nb_queued, nb_waiting, nb_error);

        // if we were only waiting for produced elements, a test like this can be used to break out of the loop
        if (KAFKA_MODE == RD_KAFKA_PRODUCER) {
            usleep(30000);
            if (nb_queued == 0 && nb_waiting == 0 && nb_error == 0)
                break;
        } else {
            event = n_kafka_get_event(kafka_handle);
            if (event) {
                if (kafka_handle->schema_id != -1)
                    n_log(LOG_INFO, "received event schema id %d string:\n%s", event->schema_id, _str(event->event_string->data + 4));
                else
                    n_log(LOG_INFO, "received event string:\n%s", event->event_string->data);

                list_foreach(node, event->received_headers) {
                    N_STR* header = (N_STR*)node->ptr;
                    n_log(LOG_INFO, "headers: %s", _nstr(header));
                }
                n_kafka_event_destroy(&event);
            } else {
                usleep(30000);
            }
        }
    } while (run && poll_status > 0);

    n_log(LOG_INFO, "kafka_handle: %zu queued, %zu waiting ack, %zu on error", nb_queued, nb_waiting, nb_error);
    if (nb_error > 0 || nb_waiting > 0) {
        n_log(LOG_ERR, "kafka_handle: %zu events are still waiting for ack, and %zu are on error !", nb_waiting, nb_error);
        n_kafka_dump_unprocessed(kafka_handle, "DATAS/kafka/unprocessed");
    }

    // log unprocessed events
    list_foreach(node, kafka_handle->received_events) {
        if (node) {
            N_KAFKA_EVENT* unprocessed = (N_KAFKA_EVENT*)node->ptr;
            if (kafka_handle->schema_id != -1)
                n_log(LOG_INFO, "[unprocessed] received event schema id %d string:\n%s", unprocessed->schema_id, unprocessed->event_string->data + 4);
            else
                n_log(LOG_INFO, "[unprocessed] received event string:\n%s", unprocessed->event_string->data);
        }
    }

    // closing kafka handle
    n_kafka_delete(kafka_handle);

    exit(exit_code);
}