/**
 * Nilorea Library — C utilities for networking, threading, graphics.
 * n_kafka.c: kafka producer/consumer helpers built on librdkafka.
 */
8#include "nilorea/n_kafka.h"
9#include "nilorea/n_common.h"
10#include "nilorea/n_base64.h"
11
12
13
19int32_t n_kafka_get_schema_from_char( char *string )
20{
21 __n_assert( string , return -1 );
22
23 int32_t schema_id = -1 ;
24 memcpy( &schema_id , string + 1 , sizeof( int32_t) );
25 return ntohl( schema_id );
26}
27
34{
35 __n_assert( string , return -1 );
36 __n_assert( string -> data , return -1 );
37 __n_assert( ( string -> written >= sizeof( int32_t ) ) , return -1 );
38 return n_kafka_get_schema_from_char( string -> data );
39}
40
47int n_kafka_put_schema_in_char( char *string , int schema_id )
48{
49 __n_assert( string , return FALSE );
50 int32_t schema_id_htonl = htonl( schema_id );
51 memcpy( string + 1, &schema_id_htonl , sizeof( int32_t) );
52 return TRUE ;
53}
54
61int n_kafka_put_schema_in_nstr( N_STR *string , int schema_id )
62{
63 __n_assert( string , return FALSE );
64 __n_assert( string -> data , return FALSE );
65 __n_assert( ( string -> written >= sizeof( int32_t ) ) , return FALSE );
66 return n_kafka_put_schema_in_char( string -> data , schema_id );
67}
68
69
78int n_kafka_get_status( N_KAFKA *kafka , int *nb_queued , int *nb_waiting , int *nb_error)
79{
80 __n_assert( kafka , return FALSE );
81 __n_assert( nb_waiting , return FALSE );
82 __n_assert( nb_error , return FALSE );
83 read_lock( kafka -> rwlock );
84 int status = kafka -> pooling_thread_status ;
85 *nb_queued = kafka -> nb_queued ;
86 *nb_waiting = kafka -> nb_waiting ;
87 *nb_error = kafka -> nb_error ;
88 unlock( kafka -> rwlock );
89 return status ;
90}
91
92
102static void n_kafka_delivery_message_callback( rd_kafka_t *rk , const rd_kafka_message_t *rkmessage , void *opaque )
103{
104 (void)opaque;
105
106 __n_assert( rk , n_log( LOG_ERR , "rk=NULL is not a valid kafka handle" ); return );
107 __n_assert( rkmessage , n_log( LOG_ERR , "rkmessage=NULL is not a valid kafka message" ); return );
108
109 N_KAFKA_EVENT *event = (N_KAFKA_EVENT *)rkmessage->_private ;
110
111 if (rkmessage->err)
112 {
113 n_log( LOG_ERR , "message delivery failed: %s" , rd_kafka_err2str( rkmessage -> err) );
114 if( !event )
115 {
116 n_log( LOG_ERR , "fatal: event is NULL" );
117 return ;
118 }
119 if( event && event -> parent_table )
120 write_lock( event -> parent_table -> rwlock );
121 event -> status = N_KAFKA_EVENT_ERROR ;
122 if( event && event -> parent_table )
123 unlock( event -> parent_table -> rwlock );
124 }
125 else
126 {
127 n_log( LOG_DEBUG , "message delivered (%ld bytes, partition %d)" , rkmessage -> len , rkmessage -> partition );
128 if( event )
129 {
130 // lock
131 if( event && event -> parent_table )
132 write_lock( event -> parent_table -> rwlock );
133 // delete produce event linked files
134 if( event -> event_files_to_delete )
135 {
136 char **files_to_delete = split( _nstr( event -> event_files_to_delete ) , ";" , 0 );
137 if( files_to_delete )
138 {
139 if( split_count( files_to_delete ) > 0 )
140 {
141 int index = 0 ;
142 while( files_to_delete[ index ] )
143 {
144 int ret = unlink( files_to_delete[ index ] );
145 int error = errno ;
146 if( ret == 0 )
147 {
148 n_log( LOG_DEBUG , "deleted on produce ack: %s" , files_to_delete[ index ] );
149 }
150 else
151 {
152 n_log( LOG_ERR , "couldn't delete \"%s\": %s" , files_to_delete[ index ] , strerror( error ) );
153 }
154 index ++;
155 }
156 }
157 else
158 {
159 n_log( LOG_ERR , "split result is empty !" );
160 }
161 }
162 free_split_result( &files_to_delete );
163 }
164 // set status
165 event -> status = N_KAFKA_EVENT_OK ;
166 // unlock
167 if( event && event -> parent_table )
168 unlock( event -> parent_table -> rwlock );
169 n_log( LOG_INFO , "kafka event %p received an ack !" , event );
170 }
171 else
172 {
173 n_log( LOG_ERR , "fatal: event is NULL" );
174 }
175 }
176 return ;
177 /* The rkmessage is destroyed automatically by librdkafka */
178}
179
180
186{
187 __n_assert( kafka , return );
188
189 FreeNoLog( kafka -> topic );
190 FreeNoLog( kafka -> topics );
191 FreeNoLog( kafka -> groupid );
192
194
195 if( kafka -> rd_kafka_handle )
196 {
197 if( kafka -> mode == RD_KAFKA_CONSUMER )
198 {
199 rd_kafka_consumer_close(kafka -> rd_kafka_handle);
200 rd_kafka_topic_partition_list_destroy(kafka ->subscription);
201 }
202 if( kafka -> mode == RD_KAFKA_PRODUCER )
203 {
204 rd_kafka_flush( kafka -> rd_kafka_handle , kafka -> poll_timeout );
205 }
206 rd_kafka_destroy( kafka -> rd_kafka_handle );
207 n_log( LOG_DEBUG , "kafka handle destroyed" );
208 }
209
210 if( kafka -> rd_kafka_conf )
211 {
212 rd_kafka_conf_destroy( kafka -> rd_kafka_conf );
213 }
214
215 if( kafka -> configuration )
216 cJSON_Delete( kafka -> configuration );
217
218 if( kafka -> errstr )
219 free_nstr( &kafka -> errstr );
220
221 if( kafka -> events_to_send )
222 list_destroy( &kafka -> events_to_send );
223
224 if( kafka -> received_events )
225 list_destroy( &kafka -> received_events );
226
227 rw_lock_destroy( kafka -> rwlock );
228
229 if( kafka -> rd_kafka_topic )
230 rd_kafka_topic_destroy( kafka -> rd_kafka_topic );
231
232 Free( kafka -> bootstrap_servers );
233
234 Free( kafka );
235 return ;
236}
237
238
246N_KAFKA *n_kafka_new( int64_t poll_interval , int64_t poll_timeout , size_t errstr_len )
247{
248 N_KAFKA *kafka = NULL ;
249 Malloc( kafka , N_KAFKA , 1 );
250 __n_assert( kafka , return NULL );
251
252 kafka -> errstr = new_nstr( errstr_len );
253 __n_assert( kafka -> errstr , Free( kafka ); return NULL );
254
255 kafka -> events_to_send = NULL ;
256 kafka -> received_events = NULL ;
257 kafka -> rd_kafka_conf = NULL ;
258 kafka -> rd_kafka_handle = NULL ;
259 kafka -> configuration = NULL ;
260 kafka -> groupid= NULL ;
261 kafka -> topics= NULL ;
262 kafka -> subscription= NULL ;
263 kafka -> topic = NULL ;
264 kafka -> mode = -1 ;
265 kafka -> schema_id = -1 ;
266 kafka -> poll_timeout = poll_timeout ;
267 kafka -> poll_interval = poll_interval ;
268 kafka -> nb_queued = 0 ;
269 kafka -> nb_waiting = 0 ;
270 kafka -> nb_error = 0 ;
271 kafka -> pooling_thread_status = 0 ;
272 kafka -> bootstrap_servers = NULL ;
273
274 kafka -> events_to_send = new_generic_list( 0 );
275 __n_assert( kafka -> events_to_send , n_kafka_delete( kafka ); return NULL );
276
277 kafka -> received_events = new_generic_list( 0 );
278 __n_assert( kafka -> events_to_send , n_kafka_delete( kafka ); return NULL );
279
280 if( init_lock( kafka -> rwlock ) != 0 )
281 {
282 n_log( LOG_ERR , "could not init kafka rwlock in kafka structure at address %p" , kafka );
283 n_kafka_delete( kafka );
284 return NULL ;
285 }
286
287 return kafka ;
288}
289
290
297N_KAFKA *n_kafka_load_config( char *config_file , int mode )
298{
299 __n_assert( config_file , return NULL );
300
301 N_KAFKA *kafka = NULL ;
302
303 kafka = n_kafka_new( 100 , -1 , 1024 );
304 __n_assert( kafka , return NULL );
305
306 // initialize kafka object
307 kafka -> rd_kafka_conf = rd_kafka_conf_new();
308
309 // load config file
310 N_STR *config_string = NULL ;
311 config_string = file_to_nstr( config_file );
312 if( !config_string )
313 {
314 n_log( LOG_ERR , "unable to read config from file %s !" , config_file );
315 n_kafka_delete( kafka );
316 return NULL ;
317 }
318 cJSON *json = NULL ;
319 json = cJSON_Parse( _nstrp( config_string ) );
320 if( !json )
321 {
322 n_log( LOG_ERR , "unable to parse json %s" , config_string );
323 free_nstr( &config_string );
324 n_kafka_delete( kafka );
325 return NULL ;
326 }
327
328 int jsonIndex;
329 for (jsonIndex = 0; jsonIndex < cJSON_GetArraySize(json); jsonIndex++)
330 {
331 cJSON *entry = cJSON_GetArrayItem(json, jsonIndex);
332
333 if( !entry ) continue ;
334 __n_assert( entry -> string , continue );
335 if( !entry -> valuestring )
336 {
337 n_log( LOG_DEBUG , "no valuestring for entry %s" , _str( entry -> string ) );
338 continue ;
339 }
340
341 if( entry -> string[ 0 ] != '-' )
342 {
343 // if it's not one of the optionnal parameters not managed by kafka, then we can use rd_kafka_conf_set on them
344 if( strcmp( "topic" , entry -> string ) != 0 && strcmp( "topics" , entry -> string ) != 0 && strcmp( "value.schema.id" , entry -> string ) != 0 && strcmp( "poll.interval" , entry -> string ) != 0 && strcmp( "poll.timeout" , entry -> string ) !=0 && strcmp( "group.id.autogen" , entry -> string ) != 0 )
345 {
346 if( !strcmp( "group.id" , entry -> string ) )
347 {
348 // exclude group id for producer
349 if( mode == RD_KAFKA_PRODUCER )
350 continue ;
351 kafka -> groupid = strdup( entry -> valuestring );
352 }
353
354 if( rd_kafka_conf_set( kafka -> rd_kafka_conf , entry -> string , entry -> valuestring , _nstr( kafka -> errstr ) , kafka -> errstr -> length ) != RD_KAFKA_CONF_OK )
355 {
356 n_log( LOG_ERR , "kafka config: %s" , _nstr( kafka -> errstr ) );
357 }
358 else
359 {
360 n_log( LOG_DEBUG , "kafka config enabled: %s => %s" , entry -> string , entry -> valuestring );
361 }
362 }
363 }
364 else
365 {
366 n_log( LOG_DEBUG , "kafka disabled config: %s => %s" , entry -> string , entry -> valuestring );
367 }
368 }
369
370 // other parameters, not directly managed by kafka API (will cause an error if used along rd_kafka_conf_set )
371 // producer topic
372 cJSON *jstr = NULL ;
373 jstr = cJSON_GetObjectItem( json, "topic" );
374 if( jstr && jstr -> valuestring )
375 {
376 kafka -> topic = strdup( jstr -> valuestring );
377 n_log( LOG_DEBUG , "kafka producer topic: %s" , kafka -> topic );
378 }
379 else
380 {
381 if( mode == RD_KAFKA_PRODUCER )
382 {
383 n_log( LOG_ERR , "no topic configured !" );
384 n_kafka_delete( kafka );
385 return NULL ;
386 }
387 }
388 // consumer topics
389 jstr = cJSON_GetObjectItem( json, "topics" );
390 if( jstr && jstr -> valuestring )
391 {
392 kafka -> topics = split( jstr -> valuestring , "," , 0 );
393 n_log( LOG_DEBUG , "kafka consumer topics: %s" , jstr -> valuestring );
394 }
395 else
396 {
397 if( mode == RD_KAFKA_CONSUMER )
398 {
399 n_log( LOG_ERR , "no topics configured !" );
400 n_kafka_delete( kafka );
401 return NULL ;
402 }
403 }
404 jstr = cJSON_GetObjectItem( json, "value.schema.id" );
405 if( jstr && jstr -> valuestring )
406 {
407 int schem_v = atoi( jstr -> valuestring );
408 if( schem_v < -1 || schem_v > 9999 )
409 {
410 n_log( LOG_ERR , "invalid schema id %d" , schem_v );
411 n_kafka_delete( kafka );
412 return NULL ;
413 }
414 n_log( LOG_DEBUG , "kafka schema id: %d" , schem_v );
415 kafka -> schema_id = schem_v ;
416 }
417
418 jstr = cJSON_GetObjectItem( json, "poll.interval" );
419 if( jstr && jstr -> valuestring )
420 {
421 kafka -> poll_interval = atoi( jstr -> valuestring );
422 n_log( LOG_DEBUG , "kafka poll interval: %d" , kafka -> poll_interval );
423 }
424
425 jstr = cJSON_GetObjectItem( json, "poll.timeout" );
426 if( jstr && jstr -> valuestring )
427 {
428 kafka -> poll_timeout = atoi( jstr -> valuestring );
429 n_log( LOG_DEBUG , "kafka poll timeout: %d" , kafka -> poll_timeout );
430 }
431
432 // saving bootstrap servers in struct
433 jstr = cJSON_GetObjectItem( json, "bootstrap.servers" );
434 if( jstr && jstr -> valuestring )
435 {
436 kafka -> bootstrap_servers = strdup( jstr -> valuestring );
437 n_log( LOG_DEBUG , "kafka bootstrap server: %s" , kafka -> bootstrap_servers );
438 }
439
440
441 if( mode == RD_KAFKA_PRODUCER )
442 {
443 // set delivery callback
444 rd_kafka_conf_set_dr_msg_cb( kafka -> rd_kafka_conf , n_kafka_delivery_message_callback );
445
446 kafka -> rd_kafka_handle = rd_kafka_new( RD_KAFKA_PRODUCER , kafka -> rd_kafka_conf , _nstr( kafka -> errstr ) , kafka -> errstr -> length );
447 if( !kafka -> rd_kafka_handle )
448 {
449 n_log( LOG_ERR , "failed to create new producer: %s" , _nstr( kafka -> errstr ) );
450 n_kafka_delete( kafka );
451 return NULL ;
452 }
453 // conf is now owned by kafka handle
454 kafka -> rd_kafka_conf = NULL ;
455 // Create topic object
456 kafka -> rd_kafka_topic = rd_kafka_topic_new( kafka -> rd_kafka_handle , kafka -> topic , NULL );
457 if( !kafka -> rd_kafka_topic )
458 {
459 n_kafka_delete( kafka );
460 return NULL ;
461 }
462 kafka -> mode = RD_KAFKA_PRODUCER ;
463 }
464 else if( mode == RD_KAFKA_CONSUMER )
465 {
466 /* If there is no previously committed offset for a partition
467 * the auto.offset.reset strategy will be used to decide where
468 * in the partition to start fetching messages.
469 * By setting this to earliest the consumer will read all messages
470 * in the partition if there was no previously committed offset. */
471 /*if (rd_kafka_conf_set(kafka -> rd_kafka_conf, "auto.offset.reset", "earliest", _nstr( kafka -> errstr ) , kafka -> errstr -> length) != RD_KAFKA_CONF_OK) {
472 n_log( LOG_ERR , "kafka conf set: %s", kafka -> errstr);
473 n_kafka_delete( kafka );
474 return NULL ;
475 }*/
476
477 // if groupid is not set, generate a unique one
478 if( !kafka -> groupid )
479 {
480 char computer_name[ 1024 ] = "" ;
481 get_computer_name( computer_name , 1024 );
482 N_STR *groupid = new_nstr( 1024 );
483 char *topics = join( kafka -> topics , "_" );
484 // generating group id
485 jstr = cJSON_GetObjectItem( json, "group.id.autogen" );
486 if( jstr && jstr -> valuestring )
487 {
488 if( strcmp( jstr -> valuestring , "host-topic-group" ) == 0 )
489 {
490 //nstrprintf( groupid , "%s_%s_%s" , computer_name , topics ,kafka -> bootstrap_servers );
491 nstrprintf( groupid , "%s_%s" , computer_name , topics );
492 }
493 else if( strcmp( jstr -> valuestring , "unique-group" ) == 0 )
494 {
495 //nstrprintf( groupid , "%s_%s_%s_%d" , computer_name , topics , kafka -> bootstrap_servers , getpid() );
496 nstrprintf( groupid , "%s_%s_%d" , computer_name , topics , getpid() );
497 }
498 }
499 else // default unique group
500 {
501 //nstrprintf( groupid , "%s_%s_%s_%d" , computer_name , topics , kafka -> bootstrap_servers , getpid() );
502 nstrprintf( groupid , "%s_%s_%d" , computer_name , topics , getpid() );
503 n_log( LOG_DEBUG , "group.id is not set and group.id.autogen is not set, generated unique group id: %s" , _nstr( groupid ));
504 }
505 free( topics );
506 kafka -> groupid = groupid -> data ;
507 groupid -> data = NULL ;
508 free_nstr( &groupid );
509 }
510 if( rd_kafka_conf_set( kafka -> rd_kafka_conf , "group.id" , kafka -> groupid , _nstr( kafka -> errstr ) , kafka -> errstr -> length ) != RD_KAFKA_CONF_OK )
511 {
512 n_log( LOG_ERR , "kafka consumer group.id error: %s" , _nstr( kafka -> errstr ) );
513 n_kafka_delete( kafka );
514 return NULL;
515 }
516 else
517 {
518 n_log( LOG_DEBUG , "kafka consumer group.id => %s" , kafka -> groupid );
519 }
520
521
522 /* Create consumer instance.
523 * NOTE: rd_kafka_new() takes ownership of the conf object
524 * and the application must not reference it again after
525 * this call.
526 */
527 kafka -> rd_kafka_handle = rd_kafka_new(RD_KAFKA_CONSUMER, kafka -> rd_kafka_conf, _nstr( kafka -> errstr ) , kafka -> errstr -> length );
528 if (!kafka -> rd_kafka_handle) {
529 n_log( LOG_ERR , "%% Failed to create new consumer: %s", kafka ->errstr);
530 n_kafka_delete( kafka );
531 return NULL;
532 }
533 // conf is now owned by kafka handle
534 kafka -> rd_kafka_conf = NULL ;
535
536 /* Redirect all messages from per-partition queues to
537 * the main queue so that messages can be consumed with one
538 * call from all assigned partitions.
539 *
540 * The alternative is to poll the main queue (for events)
541 * and each partition queue separately, which requires setting
542 * up a rebalance callback and keeping track of the assignment:
543 * but that is more complex and typically not recommended. */
544 rd_kafka_poll_set_consumer(kafka -> rd_kafka_handle);
545
546 /* Convert the list of topics to a format suitable for librdkafka */
547 int topic_cnt = split_count( kafka -> topics );
548 kafka -> subscription = rd_kafka_topic_partition_list_new(topic_cnt);
549 for (int i = 0; i < topic_cnt; i++)
550 rd_kafka_topic_partition_list_add(kafka ->subscription, kafka ->topics[i],
551 /* the partition is ignored
552 * by subscribe() */
553 RD_KAFKA_PARTITION_UA);
554 /* Assign the topic. This method is disabled as it does noat allow dynamic partition assignement
555 int err = rd_kafka_assign(kafka -> rd_kafka_handle, kafka -> subscription);
556 if( err )
557 {
558 n_log( LOG_ERR , "kafka consumer: failed to assign %d topics: %s", kafka -> subscription->cnt, rd_kafka_err2str(err));
559 n_kafka_delete( kafka );
560 return NULL;
561 } */
562
563 /* Subscribe to the list of topics */
564 int err = rd_kafka_subscribe(kafka -> rd_kafka_handle, kafka -> subscription);
565 if( err )
566 {
567 n_log( LOG_ERR , "kafka consumer: failed to subscribe to %d topics: %s", kafka -> subscription->cnt, rd_kafka_err2str(err));
568 n_kafka_delete( kafka );
569 return NULL;
570 }
571
572 n_log( LOG_DEBUG, "kafka consumer created and subscribed to %d topic(s), waiting for rebalance and messages...", kafka ->subscription->cnt);
573
574 kafka -> mode = RD_KAFKA_CONSUMER ;
575 }
576 else
577 {
578 n_log( LOG_ERR , "invalid mode %d" , mode );
579 n_kafka_delete( kafka );
580 return NULL ;
581 }
582
583 return kafka ;
584} /* n_kafka_load_config */
585
586
593{
594 N_KAFKA_EVENT *event = NULL ;
595 Malloc( event , N_KAFKA_EVENT , 1 );
596 __n_assert( event , return NULL );
597
598 event -> event_string = NULL ;
599 event -> event_files_to_delete = NULL ;
600 event -> from_topic = NULL ;
601 event -> rd_kafka_headers = NULL ;
602 event -> received_headers = NULL ;
603 event -> schema_id = schema_id ;
604 event -> status = N_KAFKA_EVENT_CREATED ;
605 event -> parent_table = NULL ;
606
607 return event ;
608} /* n_kafka_new_event */
609
610
617int n_kafka_new_headers( N_KAFKA_EVENT *event , size_t count )
618{
619 __n_assert( event , return FALSE );
620
621 if( count == 0 )
622 count = 1 ;
623
624 if( !event -> rd_kafka_headers )
625 {
626 event -> rd_kafka_headers = rd_kafka_headers_new( count );
627 __n_assert( event -> rd_kafka_headers , return FALSE );
628 }
629 else
630 {
631 n_log( LOG_ERR , "event headers already allocated for event %p" , event );
632 return FALSE ;
633 }
634 return TRUE ;
635}
636
646int n_kafka_add_header_ex( N_KAFKA_EVENT *event , char *key , size_t key_length , char *value , size_t value_length )
647{
648 __n_assert( event , return FALSE );
649 __n_assert( event -> rd_kafka_headers, return FALSE );
650 __n_assert( key , return FALSE );
651 __n_assert( value , return FALSE );
652
653 if( key_length < 1 )
654 {
655 n_log( LOG_ERR , "could not add null key in event %p headers" , event );
656 return FALSE ;
657 }
658 if( value_length < 1 )
659 {
660 n_log( LOG_ERR , "could not add null value for key '%s' in event %p headers" , key , event);
661 return FALSE ;
662 }
663 rd_kafka_resp_err_t err = rd_kafka_header_add( event -> rd_kafka_headers , key , key_length , value , value_length );
664 if( err )
665 {
666 n_log( LOG_ERR , "failed to add header [%s:%d=%s:%d] to event %p: %s" , key , key_length , value , value_length , event , rd_kafka_err2str( err ) );
667 return FALSE ;
668 }
669
670 return TRUE ;
671}
672
673
681int n_kafka_add_header( N_KAFKA_EVENT *event , N_STR *key , N_STR *value )
682{
683 __n_assert( event , return FALSE );
684 __n_assert( key , return FALSE );
685 __n_assert( key -> data , return FALSE );
686 __n_assert( value , return FALSE );
687 __n_assert( value -> data , return FALSE );
688
689 return n_kafka_add_header_ex( event , key -> data , key -> written , value -> data , value -> written );
690}
691
692
700{
701 __n_assert( kafka , return FALSE );
702 __n_assert( event , return FALSE );
703
704 event -> parent_table = kafka ;
705
706 write_lock( kafka -> rwlock );
707 kafka -> nb_queued ++ ;
708 list_push( kafka -> events_to_send , event , &n_kafka_event_destroy_ptr );
709 unlock( kafka -> rwlock );
710
711 // Success to *enqueue* event
712 n_log( LOG_DEBUG , "successfully enqueued event %p in producer %p waitlist, topic: %s" , event , kafka -> rd_kafka_handle , kafka -> topic );
713
714 return TRUE ;
715} /* n_kafka_produce */
716
717
725{
726 rd_kafka_resp_err_t err = 0 ;
727
728 char *event_string = NULL ;
729 size_t event_length = -1 ;
730
731 event -> parent_table = kafka ;
732
733 event_string = event -> event_string -> data ;
734 event_length = event -> event_string -> length ;
735
736 if( event -> rd_kafka_headers )
737 {
738 rd_kafka_headers_t *hdrs_copy;
739 hdrs_copy = rd_kafka_headers_copy( event -> rd_kafka_headers );
740
741 err = rd_kafka_producev(
742 kafka -> rd_kafka_handle , RD_KAFKA_V_RKT(kafka -> rd_kafka_topic),
743 RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
744 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
745 RD_KAFKA_V_VALUE( event_string , event_length ),
746 RD_KAFKA_V_HEADERS(hdrs_copy),
747 RD_KAFKA_V_OPAQUE((void *)event ),
748 RD_KAFKA_V_END);
749
750 if( err )
751 {
752 rd_kafka_headers_destroy(hdrs_copy);
753 event -> status = N_KAFKA_EVENT_ERROR ;
754 n_log( LOG_ERR , "failed to produce event: %p with headers %p, producer: %p, topic: %s, error: %s" , event , event -> rd_kafka_headers , kafka -> rd_kafka_handle , kafka -> topic , rd_kafka_err2str( err ) );
755 return FALSE ;
756 }
757 }
758 else
759 {
760 if( rd_kafka_produce( kafka -> rd_kafka_topic , RD_KAFKA_PARTITION_UA , RD_KAFKA_MSG_F_COPY , event_string , event_length , NULL , 0 , event ) == -1 )
761 {
762 int error = errno ;
763 event -> status = N_KAFKA_EVENT_ERROR ;
764 n_log( LOG_ERR , "failed to produce event: %p, producer: %p, topic: %s, error: %s" , event , kafka -> rd_kafka_handle , kafka -> topic , strerror( error ) );
765 return FALSE ;
766 }
767 }
768 // Success to *enqueue* event
769 n_log( LOG_DEBUG , "successfully enqueued event %p in local producer %p : %s" , event , kafka -> rd_kafka_handle , kafka -> topic );
770
771 event -> status = N_KAFKA_EVENT_WAITING_ACK ;
772
773 return TRUE ;
774} /* n_kafka_produce_ex */
775
776
784N_KAFKA_EVENT *n_kafka_new_event_from_char( char *string , size_t written , int schema_id )
785{
786 __n_assert( string , return NULL );
787
788 size_t offset = 0 ;
789 if( schema_id != -1 )
790 offset = 5 ;
791
792 N_KAFKA_EVENT *event = n_kafka_new_event( schema_id );
793 __n_assert( event , return NULL );
794
795 Malloc( event -> event_string , N_STR , 1 );
796 __n_assert( event -> event_string , free( event ); return NULL );
797
798 // allocate the size of the event + (the size of the schema id + magic byte) + one ending \0
799 size_t length = written + offset ;
800 Malloc( event -> event_string -> data , char , length );
801 __n_assert( event -> event_string , free( event -> event_string ); free( event ); return NULL );
802 event -> event_string -> length = length ;
803
804 // copy incomming body
805 memcpy( event -> event_string -> data + offset , string , written );
806 event -> event_string -> written = written + offset ;
807
808 if( schema_id != -1 )
809 n_kafka_put_schema_in_nstr( event -> event_string , schema_id );
810
811 // set status and schema id
812 event -> status = N_KAFKA_EVENT_QUEUED ;
813
814 return event ;
815} /* n_kafka_new_event_from_char */
816
817
825{
826 __n_assert( string , return NULL );
827 __n_assert( string -> data , return NULL );
828
829 N_KAFKA_EVENT *event = n_kafka_new_event_from_char( string -> data , string -> written , schema_id );
830
831 return event ;
832} /* n_kafka_new_event_from_string */
833
834
841N_KAFKA_EVENT *n_kafka_new_event_from_file( char *filename , int schema_id )
842{
843 __n_assert( filename , return NULL );
844
845 N_STR *from = file_to_nstr( filename );
846 __n_assert( from , return NULL );
847
848 return n_kafka_new_event_from_string( from , schema_id );
849} /* n_kafka_new_event_from_file */
850
851
856void n_kafka_event_destroy_ptr( void *event_ptr )
857{
858 __n_assert( event_ptr , return );
859 N_KAFKA_EVENT *event = (N_KAFKA_EVENT *)event_ptr ;
860 __n_assert( event , return );
861
862 if( event -> event_string )
863 free_nstr( &event -> event_string );
864
865 if( event -> event_files_to_delete )
866 free_nstr( &event -> event_files_to_delete );
867
868 FreeNoLog( event -> from_topic );
869
870 if( event -> rd_kafka_headers )
871 rd_kafka_headers_destroy( event -> rd_kafka_headers );
872
873 if( event -> received_headers )
874 list_destroy( &event -> received_headers );
875
876 free( event );
877 return ;
878} /* n_kafka_event_destroy_ptr */
879
886{
887 __n_assert( event&&(*event) , return FALSE );
888 n_kafka_event_destroy_ptr( (*event) );
889 (*event) = NULL ;
890 return TRUE ;
891} /* n_kafka_event_destroy */
892
893
900{
901 __n_assert( kafka , return FALSE );
902 int ret = TRUE ;
903
904 // wait poll interval msecs for kafka response
905 if( kafka -> mode == RD_KAFKA_PRODUCER )
906 {
907 int nb_events = rd_kafka_poll( kafka -> rd_kafka_handle , kafka -> poll_interval );
908 (void)nb_events;
909 write_lock( kafka -> rwlock );
910 // check events status in event table
911 LIST_NODE *node = kafka -> events_to_send -> start ;
912 while( node )
913 {
914 N_KAFKA_EVENT *event = (N_KAFKA_EVENT *)node -> ptr ;
915 if( event -> status == N_KAFKA_EVENT_OK )
916 {
917 kafka -> nb_waiting -- ;
918 n_log( LOG_DEBUG , "removing event OK %p" , event );
919 LIST_NODE *node_to_kill = node ;
920 node = node -> next ;
921 N_KAFKA_EVENT *event_to_kill = remove_list_node( kafka -> events_to_send , node_to_kill , N_KAFKA_EVENT );
922 n_kafka_event_destroy( &event_to_kill );
923 continue ;
924 }
925 else if( event -> status == N_KAFKA_EVENT_QUEUED )
926 {
927 if( n_kafka_produce_ex( kafka , event ) == FALSE )
928 {
929 kafka -> nb_error++ ;
930 }
931 else
932 {
933 kafka -> nb_waiting ++ ;
934 kafka -> nb_queued -- ;
935 }
936 }
937 else if( event -> status == N_KAFKA_EVENT_ERROR )
938 {
939 kafka -> nb_error ++ ;
940 // TODO :
941 //Re try sending errored events after error_timeout is done
942 //
943 }
944 node = node -> next ;
945 }
946 unlock( kafka -> rwlock );
947 }
948 else if( kafka -> mode == RD_KAFKA_CONSUMER )
949 {
950 rd_kafka_message_t *rkm = NULL ;
951 while( ( rkm = rd_kafka_consumer_poll(kafka -> rd_kafka_handle , kafka -> poll_interval) ) )
952 {
953 if (rkm->err) {
954 /* Consumer errors are generally to be considered
955 * informational as the consumer will automatically
956 * try to recover from all types of errors. */
957 n_log( LOG_ERR , "consumer: %s", rd_kafka_message_errstr(rkm));
958 continue ;
959 }
960
961 /* Reminder of rkm contents */
962 // Proper message.
963 n_log( LOG_DEBUG , "Message on %s [%" PRId32 "] at offset %" PRId64 " (leader epoch %" PRId32 ")", rd_kafka_topic_name(rkm->rkt), rkm->partition, rkm->offset, rd_kafka_message_leader_epoch(rkm));
964
965 // Print the message key
966 if (rkm->key && rkm -> key_len > 0 )
967 n_log( LOG_DEBUG , "Key: %.*s", (int)rkm->key_len, (const char *)rkm->key);
968 else if (rkm->key)
969 n_log( LOG_DEBUG , "Key: (%d bytes)", (int)rkm->key_len);
970
971 if( rkm -> payload && rkm -> len > 0)
972 {
973 write_lock( kafka -> rwlock );
974 // make a copy of the event for further processing
975 N_KAFKA_EVENT *event = NULL ;
976 event = n_kafka_new_event_from_char( rkm -> payload , rkm -> len , -1 ); // no schema id because we want a full raw copy here
977 event -> parent_table = kafka ;
978 // test if there are headers, save them
979 rd_kafka_headers_t *hdrs = NULL ;
980 if( !rd_kafka_message_headers( rkm , &hdrs ) )
981 {
982 size_t idx = 0 ;
983 const char *name = NULL ;
984 const void *val = NULL ;
985 size_t size = 0 ;
986
987 event -> received_headers = new_generic_list( 0 );
988 while( !rd_kafka_header_get_all(hdrs, idx, &name, &val, &size) )
989 {
990 N_STR *header_entry = NULL ;
991 nstrprintf( header_entry , "%s=%s" , _str( name ) , _str( (char *)val ) );
992 list_push( event -> received_headers , header_entry , &free_nstr_ptr );
993 idx ++ ;
994 }
995 }
996 // save originating topic (can help sorting if there are multiples one)
997 event -> from_topic = strdup( rd_kafka_topic_name( rkm -> rkt ) );
998 if( kafka -> schema_id != -1 )
999 event -> schema_id = n_kafka_get_schema_from_nstr( event -> event_string );
1000 list_push( kafka -> received_events , event , &n_kafka_event_destroy_ptr );
1001 n_log( LOG_DEBUG , "Consumer received event of (%d bytes) from topic %s", (int)rkm->len , event -> from_topic);
1002 unlock( kafka -> rwlock );
1003 }
1004 rd_kafka_message_destroy( rkm );
1005 }
1006 }
1007 if( kafka -> nb_error > 0 )
1008 {
1009 ret = FALSE ;
1010 }
1011 // n_log( LOG_DEBUG , "kafka poll for handle %p returned %d elements" , kafka -> rd_kafka_handle , nb_events );
1012 return ret ;
1013} /* n_kafka_poll */
1014
1015
1021void *n_kafka_pooling_thread( void *ptr )
1022{
1023 N_KAFKA *kafka = (N_KAFKA *)ptr ;
1024
1025 int status = 1 ;
1026
1027 N_TIME chrono ;
1028
1029 if( kafka -> mode == RD_KAFKA_PRODUCER )
1030 n_log( LOG_DEBUG , "starting pooling thread for kafka handler %p mode PRODUCER (%d) topic %s" , kafka -> rd_kafka_handle ,RD_KAFKA_PRODUCER, kafka -> topic );
1031 if( kafka -> mode == RD_KAFKA_CONSUMER )
1032 {
1033 char *topiclist = join( kafka -> topics , "," );
1034 n_log( LOG_DEBUG , "starting pooling thread for kafka handler %p mode CONSUMER (%d) topic %s" , kafka -> rd_kafka_handle , RD_KAFKA_CONSUMER, _str( topiclist ) );
1035 FreeNoLog( topiclist );
1036 }
1037
1038 start_HiTimer( &chrono );
1039
1040 int64_t remaining_time = kafka -> poll_timeout * 1000 ;
1041 while( status == 1 )
1042 {
1043
1044 if( n_kafka_poll( kafka ) == FALSE )
1045 {
1046 if( kafka -> mode == RD_KAFKA_PRODUCER )
1047 {
1048 n_log( LOG_ERR , "failed to poll kafka producer handle %p with topic %s" , kafka -> rd_kafka_handle , rd_kafka_topic_name( kafka -> rd_kafka_topic ) );
1049 }
1050 else if( kafka -> topics )
1051 {
1052 char *topiclist = join( kafka -> topics , "," );
1053 n_log( LOG_ERR , "failed to poll kafka consumer handle %p with topic %s" , kafka -> rd_kafka_handle , _str( topiclist ) );
1054 FreeNoLog( topiclist );
1055 }
1056 }
1057
1058 read_lock( kafka -> rwlock );
1059 status = kafka -> pooling_thread_status ;
1060 unlock( kafka -> rwlock );
1061
1062 if( status == 2 )
1063 break ;
1064
1065 int64_t elapsed_time = get_usec( &chrono );
1066 if( kafka -> poll_timeout != -1 )
1067 {
1068 remaining_time -= elapsed_time ;
1069 if( remaining_time < 0 )
1070 {
1071 if( kafka -> mode == RD_KAFKA_PRODUCER )
1072 {
1073 n_log( LOG_DEBUG , "timeouted on kafka handle %p" , kafka -> rd_kafka_handle );
1074 }
1075 else if( kafka -> mode == RD_KAFKA_CONSUMER )
1076 {
1077 n_log( LOG_DEBUG , "timeouted on kafka handle %p" , kafka -> rd_kafka_handle );
1078 }
1079 remaining_time = 0 ;
1080 status = 2 ;
1081 break ;
1082 }
1083 }
1084 //n_log( LOG_DEBUG , "remaining time: %d on kafka handle %p" , remaining_time , kafka -> rd_kafka_handle );
1085 }
1086
1087 write_lock( kafka -> rwlock );
1088 kafka -> pooling_thread_status = 0;
1089 unlock( kafka -> rwlock );
1090
1091 n_log( LOG_DEBUG , "exiting pooling thread for kafka handler %p mode %s" , kafka -> rd_kafka_handle , (kafka->mode==RD_KAFKA_PRODUCER)?"PRODUCER":"CONSUMER" );
1092 pthread_exit( NULL );
1093 return NULL ;
1094} /* n_kafka_pooling_thread */
1095
1096
1097
1104{
1105 __n_assert( kafka, return FALSE );
1106
1107 read_lock( kafka -> rwlock );
1108 int status = kafka -> pooling_thread_status ;
1109 unlock( kafka -> rwlock );
1110
1111 if( status != 0 )
1112 {
1113 n_log( LOG_ERR, "kafka pooling thread already started for handle %p", kafka );
1114 return FALSE ;
1115 }
1116
1117 write_lock( kafka -> rwlock );
1118 kafka -> pooling_thread_status = 1 ;
1119
1120 if( pthread_create( &kafka -> pooling_thread , NULL, n_kafka_pooling_thread , (void *)kafka ) != 0 )
1121 {
1122 n_log( LOG_ERR, "unable to create pooling_thread for kafka handle %p" , kafka );
1123 unlock( kafka -> rwlock );
1124 return FALSE ;
1125 }
1126 unlock( kafka -> rwlock );
1127
1128 n_log( LOG_DEBUG , "pthread_create sucess for kafka handle %p->%p" , kafka , kafka -> rd_kafka_handle );
1129
1130 return TRUE ;
1131} /* n_kafka_start_pooling_thread */
1132
1133
1140{
1141 __n_assert( kafka, return FALSE );
1142
1143 read_lock( kafka -> rwlock );
1144 int pooling_thread_status = kafka -> pooling_thread_status ;
1145 unlock( kafka -> rwlock );
1146
1147 if( pooling_thread_status == 0 )
1148 {
1149 n_log( LOG_DEBUG , "kafka pooling thread already stopped for handle %p", kafka );
1150 return FALSE ;
1151 }
1152 if( pooling_thread_status == 2 )
1153 {
1154 n_log( LOG_DEBUG , "kafka pooling ask for stop thread already done for handle %p", kafka );
1155 return FALSE ;
1156 }
1157
1158 write_lock( kafka -> rwlock );
1159 kafka -> pooling_thread_status = 2 ;
1160 unlock( kafka -> rwlock );
1161
1162 pthread_join( kafka -> pooling_thread , NULL );
1163
1164 return TRUE ;
1165} /* n_kafka_stop_pooling_thread */
1166
1167
/**
 * Dump all not-yet-acknowledged events of a handle to files in 'directory'
 * so they can be reloaded later (see n_kafka_load_unprocessed).
 * Refuses to run while the pooling thread is active (status != 0).
 * @param kafka kafka handle holder
 * @param directory output directory for the dump files
 * @return TRUE on success or when there was nothing to dump, FALSE otherwise
 */
int n_kafka_dump_unprocessed( N_KAFKA *kafka , char *directory )
{
    __n_assert( kafka , return FALSE );
    __n_assert( directory , return FALSE );

    int status = 0 ;
    int nb_todump = 0 ;
    read_lock( kafka -> rwlock );
    status = kafka -> pooling_thread_status ;
    nb_todump = kafka -> nb_queued + kafka -> nb_waiting + kafka -> nb_error ;
    if( status != 0 )
    {
        n_log( LOG_ERR , "kafka handle %p thread pooling func is still running, aborting dump" , kafka );
        unlock( kafka -> rwlock );
        return FALSE ;
    }
    if( nb_todump == 0 )
    {
        n_log( LOG_DEBUG , "kafka handle %p: nothing to dump, all events processed correctly" , kafka );
        unlock( kafka -> rwlock );
        return TRUE ;
    }

    // dumpstr only borrows each event's payload below; it never owns the bytes
    N_STR *dumpstr = new_nstr(0);
    list_foreach( node , kafka -> events_to_send )
    {
        N_KAFKA_EVENT *event = node -> ptr ;
        if( event -> status != N_KAFKA_EVENT_OK )
        {
            // skip the 5 byte schema header (magic byte + int32 id) if present
            size_t offset = 0 ;
            if( event -> schema_id != -1 )
                offset = 5 ;

            // NOTE(review): kafka -> topic may be NULL on a consumer handle,
            // giving a "(null)" file name — confirm dump is producer-only
            N_STR *filename = NULL ;
            nstrprintf( filename , "%s/%s+%p" , directory , kafka -> topic ,event);
            n_log( LOG_DEBUG , "Dumping unprocessed events to %s" , _nstr( filename) );
            // dump event here: point dumpstr inside the payload, past the header
            dumpstr -> data = event -> event_string -> data + offset ;
            dumpstr -> written = event -> event_string -> written - offset ;
            dumpstr -> length = event -> event_string -> length ;
            nstr_to_file( dumpstr , _nstr( filename ) );
            free_nstr( &filename );
        }
    }
    unlock( kafka -> rwlock );
    // detach the borrowed pointer before freeing the wrapper
    dumpstr -> data = NULL ;
    free_nstr( &dumpstr );
    return TRUE ;
} /* n_kafka_dump_unprocessed */
1223
1224
1231int n_kafka_load_unprocessed( N_KAFKA *kafka , char *directory )
1232{
1233 __n_assert( kafka , return FALSE );
1234 __n_assert( directory , return FALSE );
1235
1236 write_lock( kafka -> rwlock );
1237 // TODO:load events from filename
1238 // TODO: use base64decode( filename ) split( result , '+' ) to get brokersname+topic to check against what's saved with the event and what's inside kafka's handle conf
1239 unlock( kafka -> rwlock );
1240
1241 return TRUE ;
1242} /* n_kafka_load_unprocessed */
1243
1244
1245
1252{
1253 __n_assert( kafka , return NULL );
1254
1255 N_KAFKA_EVENT *event = NULL ;
1256
1257 write_lock( kafka -> rwlock );
1258 if( kafka -> received_events -> start )
1259 event = remove_list_node( kafka -> received_events , kafka -> received_events -> start , N_KAFKA_EVENT );
1260 unlock( kafka -> rwlock );
1261
1262 return event ;
1263} /* n_kafka_get_event */
#define init_lock(__rwlock_mutex)
Macro for initializing a rwlock.
Definition: n_common.h:337
#define FreeNoLog(__ptr)
Free Handler without log.
Definition: n_common.h:268
#define Malloc(__ptr, __struct, __size)
Malloc Handler to get errors and set to 0.
Definition: n_common.h:183
#define __n_assert(__ptr, __ret)
macro to assert things
Definition: n_common.h:276
#define _str(__PTR)
define true
Definition: n_common.h:172
int get_computer_name(char *computer_name, size_t len)
abort program with a text
Definition: n_common.c:58
#define rw_lock_destroy(__rwlock_mutex)
Macro to destroy rwlock mutex.
Definition: n_common.h:404
#define unlock(__rwlock_mutex)
Macro for releasing read/write lock a rwlock mutex.
Definition: n_common.h:389
#define _nstrp(__PTR)
N_STR or NULL pointer for testing purposes.
Definition: n_common.h:180
#define write_lock(__rwlock_mutex)
Macro for acquiring a write lock on a rwlock mutex.
Definition: n_common.h:373
#define Free(__ptr)
Free Handler to get errors.
Definition: n_common.h:256
#define read_lock(__rwlock_mutex)
Macro for acquiring a read lock on a rwlock mutex.
Definition: n_common.h:357
#define _nstr(__PTR)
N_STR or "NULL" string for logging purposes.
Definition: n_common.h:178
#define remove_list_node(__LIST_,__NODE_, __TYPE_)
Remove macro helper for void pointer casting.
Definition: n_list.h:83
int list_push(LIST *list, void *ptr, void(*destructor)(void *ptr))
Add a pointer to the end of the list.
Definition: n_list.c:244
#define list_foreach(__ITEM_, __LIST_)
ForEach macro helper.
Definition: n_list.h:70
int list_destroy(LIST **list)
Empty and Free a list container.
Definition: n_list.c:603
LIST * new_generic_list(int max_items)
Initialize a generic list container to max_items pointers.
Definition: n_list.c:20
Structure of a generic list node.
Definition: n_list.h:27
#define n_log(__LEVEL__,...)
Logging function wrapper to get line and func.
Definition: n_log.h:74
#define LOG_DEBUG
debug-level messages
Definition: n_log.h:66
#define LOG_ERR
error conditions
Definition: n_log.h:58
#define LOG_INFO
informational
Definition: n_log.h:64
int mode
kafka handle mode: RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER
Definition: n_kafka.h:98
int n_kafka_add_header_ex(N_KAFKA_EVENT *event, char *key, size_t key_length, char *value, size_t value_length)
add a header entry to an event.
Definition: n_kafka.c:646
int n_kafka_dump_unprocessed(N_KAFKA *kafka, char *directory)
dump unprocessed/unset events
Definition: n_kafka.c:1174
N_KAFKA_EVENT * n_kafka_get_event(N_KAFKA *kafka)
get a received event from the N_KAFKA kafka handle
Definition: n_kafka.c:1251
#define N_KAFKA_EVENT_OK
state of an OK event
Definition: n_kafka.h:43
#define N_KAFKA_EVENT_ERROR
state of an errored event
Definition: n_kafka.h:41
int n_kafka_new_headers(N_KAFKA_EVENT *event, size_t count)
allocate a headers array for the event
Definition: n_kafka.c:617
int32_t n_kafka_get_schema_from_char(char *string)
get a schema from the first 4 bytes of a char *string, returning ntohl of the value
Definition: n_kafka.c:19
int n_kafka_put_schema_in_nstr(N_STR *string, int schema_id)
put a htonl schema id into the first 4 bytes of a N_STR *string
Definition: n_kafka.c:61
N_KAFKA * n_kafka_load_config(char *config_file, int mode)
load a kafka configuration from a file
Definition: n_kafka.c:297
#define N_KAFKA_EVENT_WAITING_ACK
state of a sent event waiting for acknowledgement
Definition: n_kafka.h:39
int n_kafka_put_schema_in_char(char *string, int schema_id)
put a htonl schema id into the first 4 bytes of a char *string
Definition: n_kafka.c:47
int n_kafka_poll(N_KAFKA *kafka)
Poll kafka handle in producer or consumer mode.
Definition: n_kafka.c:899
int n_kafka_event_destroy(N_KAFKA_EVENT **event)
destroy a kafka event and set it's pointer to NULL
Definition: n_kafka.c:885
void n_kafka_event_destroy_ptr(void *event_ptr)
destroy a kafka event
Definition: n_kafka.c:856
int n_kafka_load_unprocessed(N_KAFKA *kafka, char *directory)
load unprocessed/unset events
Definition: n_kafka.c:1231
int n_kafka_add_header(N_KAFKA_EVENT *event, N_STR *key, N_STR *value)
add a header entry to an event.
Definition: n_kafka.c:681
int n_kafka_stop_pooling_thread(N_KAFKA *kafka)
stop the pooling thread of a kafka handle
Definition: n_kafka.c:1139
int n_kafka_produce(N_KAFKA *kafka, N_KAFKA_EVENT *event)
put an event in the events_to_send list
Definition: n_kafka.c:699
N_KAFKA_EVENT * n_kafka_new_event_from_char(char *string, size_t written, int schema_id)
make a new event from a char *string
Definition: n_kafka.c:784
int32_t n_kafka_get_schema_from_nstr(N_STR *string)
get a schema from the first 4 bytes of a N_STR *string, returning ntohl of the value
Definition: n_kafka.c:33
int n_kafka_get_status(N_KAFKA *kafka, int *nb_queued, int *nb_waiting, int *nb_error)
return the queues status
Definition: n_kafka.c:78
int n_kafka_start_pooling_thread(N_KAFKA *kafka)
start the pooling thread of a kafka handle
Definition: n_kafka.c:1103
N_KAFKA_EVENT * n_kafka_new_event(int schema_id)
create a new empty event
Definition: n_kafka.c:592
#define N_KAFKA_EVENT_CREATED
state of a freshly created event
Definition: n_kafka.h:45
N_KAFKA_EVENT * n_kafka_new_event_from_file(char *filename, int schema_id)
make a new event from a file
Definition: n_kafka.c:841
N_KAFKA * n_kafka_new(int64_t poll_interval, int64_t poll_timeout, size_t errstr_len)
allocate a new kafka handle
Definition: n_kafka.c:246
#define N_KAFKA_EVENT_QUEUED
state of a queued event
Definition: n_kafka.h:37
void n_kafka_delete(N_KAFKA *kafka)
delete a N_KAFKA handle
Definition: n_kafka.c:185
N_KAFKA_EVENT * n_kafka_new_event_from_string(N_STR *string, int schema_id)
make a new event from a N_STR *string
Definition: n_kafka.c:824
structure of a KAFKA consumer or producer handle
Definition: n_kafka.h:70
structure of a KAFKA message
Definition: n_kafka.h:49
void free_nstr_ptr(void *ptr)
Free a N_STR pointer structure.
Definition: n_str.c:55
#define free_nstr(__ptr)
free a N_STR structure and set the pointer to NULL
Definition: n_str.h:222
int split_count(char **split_result)
Count split elements.
Definition: n_str.c:1111
#define nstrprintf(__nstr_var,...)
Macro to quickly allocate and sprintf to N_STR *.
Definition: n_str.h:97
int nstr_to_file(N_STR *str, char *filename)
Write a N_STR content into a file.
Definition: n_str.c:459
N_STR * new_nstr(NSTRBYTE size)
create a new N_STR string
Definition: n_str.c:215
char ** split(const char *str, const char *delim, int empty)
split the strings into a an array of char *pointer , ended by a NULL one.
Definition: n_str.c:1032
char * join(char **splitresult, char *delim)
join the array into a string
Definition: n_str.c:1153
N_STR * file_to_nstr(char *filename)
Load a whole file into a N_STR.
Definition: n_str.c:332
int free_split_result(char ***tab)
Free a split result allocated array.
Definition: n_str.c:1131
A box including a string and its length.
Definition: n_str.h:173
int start_HiTimer(N_TIME *timer)
Initialize or restart from zero any N_TIME HiTimer.
Definition: n_time.c:82
time_t get_usec(N_TIME *timer)
Poll any N_TIME HiTimer, returning usec, and moving currentTime to startTime.
Definition: n_time.c:107
Timing Structure.
Definition: n_time.h:34
base64 encoding and decoding functions using N_STR
Common headers and low-level hugly functions & define.
void * n_kafka_pooling_thread(void *ptr)
kafka produce or consume pooling thread function
Definition: n_kafka.c:1021
static void n_kafka_delivery_message_callback(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
Message delivery report callback.
Definition: n_kafka.c:102
int n_kafka_produce_ex(N_KAFKA *kafka, N_KAFKA_EVENT *event)
produce an event on a N_KAFKA *kafka handle
Definition: n_kafka.c:724
kafka generic produce and consume event header