Nilorea Library
C utilities for networking, threading, graphics
ex_kafka.c
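Example of a small Kafka producer/consumer built on the Nilorea n_kafka wrapper around librdkafka: it loads a kafka configuration file, starts the handle's pooling thread, then either produces one event (read from a string or a file, with optional headers) or consumes incoming events and prints them until interrupted.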
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <locale.h>
#include <libgen.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>

#include "nilorea/n_log.h"
#include "nilorea/n_network.h"
#include "nilorea/n_kafka.h"
#include "cJSON.h"

#include "rdkafka.h"

#define OK 0
#define ERROR -1
#define NB_TEST_EVENTS 10
char *config_file = NULL ,
     *event_string = NULL ,
     *event_file = NULL ,
     *event_log_file = NULL ,
     *log_prefix = NULL ;

int log_level = LOG_ERR ,   /* default log level */
    getoptret = 0 ,         /* getopt return value */
    KAFKA_MODE = -1 ,
    PRODUCER_MODE = 0 ,
    run = 1 ;

// help func
void usage( void )
{
    // TODO: add -F unprocessed event file
    fprintf( stderr, "Syntax is: ex_kafka -v -c config_file [-s event or -f eventfile] -o event_log_file -V LOGLEVEL\n"
             "        -v version: print version and exit\n"
             "        -c config_file: [required] kafka configuration file\n"
             "        -s : string of the event to send\n"
             "        -f : file containing the event to send\n"
             "        -H name=value: add a header to the produced event (can be repeated)\n"
             "        -C : start a consumer (by default received events are printed to the terminal)\n"
             "        -P : start a producer and produce the event\n"
             "        -o : optional, set a log file instead of the default (stderr/stdout)\n"
             "        -p : optional, set a log prefix\n"
             "        -V verbosity: specify a log level for console output\n"
             "             Supported: LOG_ EMERG,ALERT,CRIT,ERR,WARNING,NOTICE,INFO,DEBUG\n"
           );
}

// stop handler
static void stop(int sig) {
    (void)sig;
    run = 0;
}

int main( int argc , char *argv[] )
{
    /* Signal handler for clean shutdown */
    signal(SIGINT, stop);

    /* temporary header structure */
    rd_kafka_headers_t *headers = NULL ;

    /* Analysing arguments */
    while( ( getoptret = getopt( argc, argv, "vhCPH:c:s:f:V:o:p:" ) ) != -1 )
    {
        switch( getoptret )
        {
            case 'v':
                fprintf( stderr, " Version compiled on %s at %s\n", __DATE__, __TIME__ );
                exit( TRUE );
            case 'h':
                usage();
                exit( 0 );
                break ;
            case 'C':
                if( KAFKA_MODE == -1 )
                {
                    KAFKA_MODE = RD_KAFKA_CONSUMER;
                }
                else
                {
                    fprintf( stderr, "-C and -P can not be used at the same time!\n" );
                    exit( TRUE );
                }
                break ;
            case 'P':
                if( KAFKA_MODE == -1 )
                {
                    KAFKA_MODE = RD_KAFKA_PRODUCER;
                }
                else
                {
                    fprintf( stderr, "-C and -P can not be used at the same time!\n" );
                    exit( TRUE );
                }
                break ;
            case 'H':
            {
                char *name = NULL , *val = NULL ;
                ssize_t name_sz = -1 ;

                name = optarg;
                val = strchr(name, '=');
                if (val) {
                    name_sz = (ssize_t)(val - name);
                    val++; /* past the '=' */
                }

                if (!headers)
                    headers = rd_kafka_headers_new(16);

                rd_kafka_resp_err_t err = rd_kafka_header_add(headers, name, name_sz, val, -1);
                if (err) {
                    fprintf(stderr,
                            "%% Failed to add header %s: %s\n",
                            name, rd_kafka_err2str(err));
                    exit(1);
                }
            }
            break;
            case 'c':
                config_file = local_strdup( optarg );
                break ;
            case 's':
                Malloc( event_string , char , strlen( optarg ) + 1 );
                strcpy( event_string , optarg );
                break ;
            case 'f':
                event_file = local_strdup( optarg );
                break ;
            case 'o':
                event_log_file = local_strdup( optarg );
                break ;
            case 'p':
                log_prefix = local_strdup( optarg );
                break ;
            case 'V' :
                if( !strncmp( "LOG_NULL", optarg, 8 ) )
                {
                    log_level = LOG_NULL ;
                }
                else if( !strncmp( "LOG_NOTICE", optarg, 10 ) )
                {
                    log_level = LOG_NOTICE;
                }
                else if( !strncmp( "LOG_INFO", optarg, 8 ) )
                {
                    log_level = LOG_INFO;
                }
                else if( !strncmp( "LOG_ERR", optarg, 7 ) )
                {
                    log_level = LOG_ERR;
                }
                else if( !strncmp( "LOG_DEBUG", optarg, 9 ) )
                {
                    log_level = LOG_DEBUG;
                }
                else
                {
                    fprintf( stderr, "%s is not a valid log level.\n", optarg );
                    exit( -1 );
                }
                break;
            case '?':
                if( optopt == 'c' || optopt == 's' || optopt == 'f' )
                {
                    fprintf( stderr, "Option -%c needs a parameter\n", optopt );
                    exit( TRUE );
                }
                /* fall through to default: print usage and exit */
            default:
                usage();
                exit( -1 );
                break;
        }
    }

    if( KAFKA_MODE == -1 )
    {
        n_log( LOG_ERR , "consumer (-C) or producer (-P) mode is not defined !" );
        exit( 1 );
    }

    if( !log_prefix )
    {
        log_prefix = strdup( "" );
    }

    set_log_level( log_level );
    if( event_log_file )
    {
        int log_file_ret = set_log_file( event_log_file );
        n_log( LOG_DEBUG , "%s log to file: %s , %d , %p" , log_prefix , event_log_file , log_file_ret , get_log_file() );
    }

    /* testing parameters */
    if( !config_file )
    {
        n_log( LOG_ERR , "%s parameter config_file needs to be set !" , log_prefix );
        exit( 1 );
    }

    if( KAFKA_MODE == RD_KAFKA_PRODUCER )
    {
        if( !event_string && !event_file )
        {
            n_log( LOG_ERR , "%s one of (event_string|event_file) needs to be set !" , log_prefix );
            exit( 1 );
        }
    }

    if( event_string && event_file )
    {
        n_log( LOG_ERR , "%s do not define event_string AND event_file, only one needs to be set !" , log_prefix );
        exit( 1 );
    }

    // load kafka config file
    unsigned int exit_code = 0 ;
    N_KAFKA *kafka_handle = n_kafka_load_config( config_file , KAFKA_MODE );
    __n_assert( kafka_handle , n_log( LOG_ERR , "kafka handle is NULL !!" ); exit( 1 ) );

    n_kafka_start_pooling_thread( kafka_handle );

    int nb_queued = 0 ;
    int nb_waiting = 0 ;
    int nb_error = 0 ;
    int poll_status = 1 ;
    N_KAFKA_EVENT *event = NULL ;

    if( KAFKA_MODE == RD_KAFKA_PRODUCER )
    {
        // create a new kafka event from a string or from a file
        if( event_string )
        {
            event = n_kafka_new_event_from_char( event_string , strlen( event_string ) , kafka_handle -> schema_id );
        }
        if( event_file )
        {
            event = n_kafka_new_event_from_file( event_file , kafka_handle -> schema_id );
        }
        __n_assert( event , n_log( LOG_ERR , "could not create kafka event !" ); exit( 1 ) );
        // set headers if any
        if( headers )
        {
            event -> rd_kafka_headers = rd_kafka_headers_copy( headers );
            // free the temporary headers, they are not needed anymore
            rd_kafka_headers_destroy(headers);
        }
        // produce the event, the API takes care of destroying it
        if( n_kafka_produce( kafka_handle , event ) == FALSE )
        {
            n_log( LOG_ERR , "n_kafka_produce returned an error for event %p" , event );
        }
        else
        {
            n_log( LOG_INFO , "n_kafka_produce returned OK for event %p" , event );
        }
    }

    // poll loop
    do
    {
        poll_status = n_kafka_get_status( kafka_handle , &nb_queued , &nb_waiting , &nb_error );
        n_log( LOG_DEBUG , "polling kafka handle, status: %d, %d in queue, %d waiting for ack, %d on error" , poll_status, nb_queued , nb_waiting , nb_error );

        // when only waiting for produced events, a test like this can be used to break out of the loop
        if( KAFKA_MODE == RD_KAFKA_PRODUCER )
        {
            usleep( 30000 );
            if( nb_queued == 0 && nb_waiting == 0 && nb_error == 0 )
                break ;
        }
        else
        {
            event = n_kafka_get_event( kafka_handle );
            if( event )
            {
                if( kafka_handle -> schema_id != -1 )
                    n_log( LOG_INFO , "received event schema id %d string:\n%s" , event -> schema_id , _str( event -> event_string -> data + 4 ) );
                else
                    n_log( LOG_INFO , "received event string:\n%s" , event -> event_string -> data );

                list_foreach( node , event -> received_headers )
                {
                    N_STR *header = (N_STR *)node -> ptr ;
                    n_log( LOG_INFO , "headers: %s", _nstr( header ) );
                }
                n_kafka_event_destroy( &event );
            }
            else
            {
                usleep( 30000 );
            }
        }
    }
    while( run && poll_status > 0 );

    n_log( LOG_INFO , "kafka_handle: %d queued, %d waiting ack, %d on error" , nb_queued , nb_waiting , nb_error );
    if( nb_error > 0 || nb_waiting > 0 )
    {
        n_log( LOG_ERR , "kafka_handle: %d events are still waiting for ack, and %d are on error !" , nb_waiting , nb_error );
        n_kafka_dump_unprocessed( kafka_handle , "DATAS/kafka/unprocessed" );
    }

    // log unprocessed received events
    list_foreach( node , kafka_handle -> received_events )
    {
        if( node && node -> ptr )
        {
            N_KAFKA_EVENT *unprocessed_event = (N_KAFKA_EVENT *)node -> ptr ;
            if( kafka_handle -> schema_id != -1 )
                n_log( LOG_INFO , "[unprocessed]received event schema id %d string:\n%s" , unprocessed_event -> schema_id , unprocessed_event -> event_string -> data + 4 );
            else
                n_log( LOG_INFO , "[unprocessed]received event string:\n%s" , unprocessed_event -> event_string -> data );
        }
    }

    // closing kafka handle
    n_kafka_delete( kafka_handle );

    exit( exit_code );
}
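Typical invocations of the built example (the configuration file kafka.cfg and event file event.json are hypothetical names used for illustration):

    ./ex_kafka -P -c kafka.cfg -s "hello kafka" -V LOG_DEBUG     # produce a single event from a string
    ./ex_kafka -P -c kafka.cfg -f event.json -H trace-id=42      # produce an event read from a file, adding a header
    ./ex_kafka -C -c kafka.cfg -V LOG_INFO                       # consume and print events until Ctrl-C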