17#define NB_TEST_EVENTS 10
19char *config_file = NULL,
22 *event_log_file = NULL,
35 "Syntax is: ex_kafka -v -c config_file [-s event or -f eventfile] -o event_log_file -V LOGLEVEL\n"
36 " -v version: print version and exit\n"
37 " -c config_file: [required] Kproducer config file\n"
38 " -s : string of the event to send\n"
39 " -f : file containing the event to send\n"
40 " -C : start a consumer (default output received in terminal)"
41 " -P : start a producer and produce event"
42 " -o : optionnal, set a log file instead of default (stderr/stdout)\n"
43 " -p : optionnal, set a log prefix\n"
44 " -V verbosity: specify a log level for console output\n"
45 " Supported: LOG_ EMERG,ALERT,CRIT,ERR,WARNING,NOTICE,INFO,DEBUG\n");
49static void stop(
int sig) {
54int main(
int argc,
char* argv[]) {
59 rd_kafka_headers_t* headers = NULL;
62 while ((getoptret = getopt(argc, argv,
"vhCPH:c:s:f:V:o:p:")) != -1) {
65 fprintf(stderr,
" Version compiled on %s at %s\n", __DATE__, __TIME__);
72 if (KAFKA_MODE == -1) {
73 KAFKA_MODE = RD_KAFKA_CONSUMER;
75 fprintf(stderr,
"-C and -P can not be used at the ame time!");
80 if (KAFKA_MODE == -1) {
81 KAFKA_MODE = RD_KAFKA_PRODUCER;
83 fprintf(stderr,
"-C and -P can not be used at the ame time!");
88 char *name = NULL, *val = NULL;
92 val = strchr(name,
'=');
94 name_sz = (size_t)(val - name);
99 headers = rd_kafka_headers_new(16);
101 int err = rd_kafka_header_add(headers, name, name_sz, val, -1);
104 "%% Failed to add header %s: %s\n",
105 name, rd_kafka_err2str(err));
113 Malloc(event_string,
char, strlen(optarg) + 1);
114 strcpy(event_string, optarg);
126 if (!strncmp(
"LOG_NULL", optarg, 8)) {
128 }
else if (!strncmp(
"LOG_NOTICE", optarg, 10)) {
130 }
else if (!strncmp(
"LOG_INFO", optarg, 8)) {
132 }
else if (!strncmp(
"LOG_ERR", optarg, 7)) {
134 }
else if (!strncmp(
"LOG_DEBUG", optarg, 9)) {
137 fprintf(stderr,
"%s n'est pas un niveau de log valide.\n", optarg);
142 if (optopt ==
'c' || optopt ==
's' || optopt ==
'f') {
143 fprintf(stderr,
"Option -%c need a parameter\n", optopt);
154 if (KAFKA_MODE == -1) {
155 n_log(
LOG_ERR,
"consumer (-C) or producer (-P) mode is not defined !", log_prefix);
160 log_prefix = strdup(
"");
164 if (event_log_file) {
171 n_log(
LOG_ERR,
"%s parameter config_file needs to be set !", log_prefix);
175 if (KAFKA_MODE == RD_KAFKA_PRODUCER) {
176 if (!event_string && !event_file) {
177 n_log(
LOG_ERR,
"%s one of (event_string|event_file) needs to be set !", log_prefix);
182 if (event_string && event_file) {
183 n_log(
LOG_ERR,
"%s do not define event_string AND event_file, only one needs to be set !", log_prefix);
188 unsigned int exit_code = 0;
194 size_t nb_queued = 0;
195 size_t nb_waiting = 0;
200 if (KAFKA_MODE == RD_KAFKA_PRODUCER) {
210 event->rd_kafka_headers = rd_kafka_headers_copy(headers);
212 rd_kafka_headers_destroy(headers);
216 n_log(
LOG_ERR,
"n_kafka_produce returned an error for event %p", event);
218 n_log(
LOG_INFO,
"n_kafka_produce returned OK for event %p", event);
225 n_log(
LOG_DEBUG,
"polling kafka handle, status: %d, %d in queue, %d waiting for ack, %d on error", poll_status, nb_queued, nb_waiting, nb_error);
228 if (KAFKA_MODE == RD_KAFKA_PRODUCER) {
230 if (nb_queued == 0 && nb_waiting == 0 && nb_error == 0)
236 n_log(
LOG_INFO,
"received event schema id %d string:\n%s", event->schema_id,
_str(event->event_string->data + 4));
238 n_log(
LOG_INFO,
"received event string:\n%s", event->event_string->data);
249 }
while (run && poll_status > 0);
251 n_log(
LOG_INFO,
"kafka_handle: %d queued, %d waiting ack, %d on error", nb_queued, nb_waiting, nb_error);
252 if (nb_error > 0 || nb_waiting > 0) {
253 n_log(
LOG_ERR,
"kafka_handle: %d events are still waiting for ack, and %d are on error !", nb_waiting, nb_error);
261 n_log(
LOG_INFO,
"[unprocessed]received event schema id %d string:\n%s", event->schema_id, event->event_string->data + 4);
263 n_log(
LOG_INFO,
"[unprocessed]received event string:\n%s", event->event_string->data);
#define FALL_THROUGH
fall through macro for switch cases, avoid warning at compilation
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
#define __n_assert(__ptr, __ret)
macro to assert things
#define _str(__PTR)
String or "NULL" string for logging purposes.
#define _nstr(__PTR)
N_STR or "NULL" string for logging purposes.
#define list_foreach(__ITEM_, __LIST_)
ForEach macro helper.
FILE * get_log_file(void)
return the current log_file
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
#define LOG_DEBUG
debug-level messages
#define LOG_ERR
error conditions
int set_log_file(char *file)
Set the logging to a file instead of stderr.
void set_log_level(const int log_level)
Set the global log level value ( static int LOG_LEVEL )
#define LOG_NOTICE
normal but significant condition
#define LOG_NULL
no log output
#define LOG_INFO
informational
LIST * received_events
list of received N_KAFKA_EVENT
int schema_id
kafka schema id in network order
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed/unsent events
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get a received event from the N_KAFKA kafka handle
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load a kafka configuration from a file
int n_kafka_get_status(N_KAFKA *kafka, size_t *nb_queued, size_t *nb_waiting, size_t *nb_error)
return the queues status
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event and set its pointer to NULL
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
put an event in the events_to_send list
N_KAFKA_EVENT * n_kafka_new_event_from_char(char *string, size_t written, int schema_id)
make a new event from a char *string
int n_kafka_start_pooling_thread(N_KAFKA *kafka)
start the polling thread of a kafka handle
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
make a new event from the content of a file (char *filename)
void n_kafka_delete(N_KAFKA *kafka)
delete a N_KAFKA handle
structure of a KAFKA consumer or producer handle
structure of a KAFKA message
#define local_strdup(__src_)
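Macro duplicating the string __src_ (presumably a strdup-style copy; to be confirmed against the macro definition). The previous text, "Do tar(1) matching rules, which ignore a trailing slash?", describes a wildcard-matching option and appears to have been attached to this symbol by mistake.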
A box including a string and its length.
kafka generic produce and consume event header