17#define NB_TEST_EVENTS 10
19char *config_file = NULL ,
20 *event_string = NULL ,
22 *event_log_file = NULL ,
37 fprintf( stderr,
"Syntax is: ex_kafka -v -c config_file [-s event or -f eventfile] -o event_log_file -V LOGLEVEL\n"
38 " -v version: print version and exit\n"
39 " -c config_file: [required] Kproducer config file\n"
40 " -s : string of the event to send\n"
41 " -f : file containing the event to send\n"
42 " -C : start a consumer (default output received in terminal)"
43 " -P : start a producer and produce event"
44 " -o : optionnal, set a log file instead of default (stderr/stdout)\n"
45 " -p : optionnal, set a log prefix\n"
46 " -V verbosity: specify a log level for console output\n"
47 " Supported: LOG_ EMERG,ALERT,CRIT,ERR,WARNING,NOTICE,INFO,DEBUG\n"
52static void stop(
int sig) {
58int main(
int argc ,
char *argv[] )
64 rd_kafka_headers_t *headers = NULL ;
67 while( ( getoptret = getopt( argc, argv,
"vhCPH:c:s:f:V:o:p:" ) ) != -1 )
72 fprintf( stderr,
" Version compiled on %s at %s\n", __DATE__, __TIME__ );
79 if( KAFKA_MODE == -1 )
81 KAFKA_MODE = RD_KAFKA_CONSUMER;
85 fprintf( stderr,
"-C and -P can not be used at the ame time!" );
90 if( KAFKA_MODE == -1 )
92 KAFKA_MODE = RD_KAFKA_PRODUCER;
96 fprintf( stderr,
"-C and -P can not be used at the ame time!" );
102 char *name = NULL , *val = NULL ;
106 val = strchr(name,
'=');
108 name_sz = (size_t)(val - name);
113 headers = rd_kafka_headers_new(16);
115 int err = rd_kafka_header_add(headers, name, name_sz, val, -1);
118 "%% Failed to add header %s: %s\n",
119 name, rd_kafka_err2str(err));
128 Malloc( event_string ,
char , strlen( optarg ) + 1 );
129 strcpy( event_string , optarg );
141 if( !strncmp(
"LOG_NULL", optarg, 8 ) )
145 else if( !strncmp(
"LOG_NOTICE", optarg, 10 ) )
149 else if( !strncmp(
"LOG_INFO", optarg, 8 ) )
153 else if( !strncmp(
"LOG_ERR", optarg, 7 ) )
157 else if( !strncmp(
"LOG_DEBUG", optarg, 9 ) )
163 fprintf( stderr,
"%s n'est pas un niveau de log valide.\n", optarg );
168 if( optopt ==
'c' || optopt ==
's' || optopt ==
'f' )
170 fprintf( stderr,
"Option -%c need a parameter\n", optopt );
181 if( KAFKA_MODE == -1 )
183 n_log(
LOG_ERR ,
"consumer (-C) or producer (-P) mode is not defined !" , log_prefix );
189 log_prefix = strdup(
"" );
202 n_log(
LOG_ERR ,
"%s parameter config_file needs to be set !" , log_prefix );
206 if( KAFKA_MODE == RD_KAFKA_PRODUCER )
208 if( !event_string && !event_file )
210 n_log(
LOG_ERR ,
"%s one of (event_string|event_file) needs to be set !" , log_prefix );
215 if( event_string && event_file )
217 n_log(
LOG_ERR ,
"%s do not define event_string AND event_file, only one needs to be set !" , log_prefix );
222 unsigned int exit_code = 0 ;
231 int poll_status = 1 ;
234 if( KAFKA_MODE == RD_KAFKA_PRODUCER )
248 event -> rd_kafka_headers = rd_kafka_headers_copy( headers );
250 rd_kafka_headers_destroy(headers);
255 n_log(
LOG_ERR ,
"n_kafka_produce returned an error for event %p" , event );
259 n_log(
LOG_INFO ,
"n_kafka_produce returned OK for event %p" , event );
266 poll_status =
n_kafka_get_status( kafka_handle , &nb_queued , &nb_waiting , &nb_error );
267 n_log(
LOG_DEBUG ,
"polling kafka handle, status: %d, %d in queue, %d waiting for ack, %d on error" , poll_status, nb_queued , nb_waiting , nb_error );
270 if( KAFKA_MODE == RD_KAFKA_PRODUCER )
273 if( nb_queued == 0 && nb_waiting == 0 && nb_error == 0 )
281 if( kafka_handle -> schema_id != -1 )
282 n_log(
LOG_INFO ,
"received event schema id %d string:\n%s" , event -> schema_id ,
_str( event -> event_string -> data + 4 ) );
284 n_log(
LOG_INFO ,
"received event string:\n%s" , event -> event_string -> data );
299 while( run && poll_status > 0 );
301 n_log(
LOG_INFO ,
"kafka_handle: %d queued, %d waiting ack, %d on error" , nb_queued , nb_waiting , nb_error );
302 if( nb_error > 0 || nb_waiting > 0 )
304 n_log(
LOG_ERR ,
"kafka_handle: %d events are still waiting for ack, and %d are on error !" , nb_waiting , nb_error );
313 if( kafka_handle -> schema_id != -1 )
314 n_log(
LOG_INFO ,
"[unprocessed]received event schema id %d string:\n%s" , event -> schema_id , event -> event_string -> data + 4 );
316 n_log(
LOG_INFO ,
"[unprocessed]received event string:\n%s" , event -> event_string -> data );
#define FALL_THROUGH
set windows if true
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
#define __n_assert(__ptr, __ret)
macro to assert things
#define _str(__PTR)
define true
#define _nstr(__PTR)
N_STR or "NULL" string for logging purposes.
#define list_foreach(__ITEM_, __LIST_)
ForEach macro helper.
FILE * get_log_file(void)
return the current log_file
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
#define LOG_DEBUG
debug-level messages
#define LOG_ERR
error conditions
int set_log_file(char *file)
Set the logging to a file instead of stderr.
void set_log_level(const int log_level)
Set the global log level value ( static int LOG_LEVEL )
#define LOG_NOTICE
normal but significant condition
#define LOG_NULL
no log output
#define LOG_INFO
informational
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed/unsent events
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get a received event from the N_KAFKA kafka handle
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load a kafka configuration from a file
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event and set its pointer to NULL
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
put an event in the events_to_send list
N_KAFKA_EVENT * n_kafka_new_event_from_char(char *string, size_t written, int schema_id)
make a new event from a char *string
int n_kafka_get_status(N_KAFKA *kafka, int *nb_queued, int *nb_waiting, int *nb_error)
return the queues status
int n_kafka_start_pooling_thread(N_KAFKA *kafka)
start the polling thread of a kafka handle
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
make a new event from a file
void n_kafka_delete(N_KAFKA *kafka)
delete a N_KAFKA handle
structure of a KAFKA consumer or producer handle
structure of a KAFKA message
#define local_strdup(__src_)
local strdup
A box including a string and its length.
kafka generic produce and consume event header