Nilorea Library
C utilities for networking, threading, graphics
n_kafka.c File Reference

generic kafka consume and produce event functions More...

#include "nilorea/n_kafka.h"
#include "nilorea/n_common.h"
#include "nilorea/n_base64.h"
+ Include dependency graph for n_kafka.c:

Go to the source code of this file.

Functions

int n_kafka_add_header (N_KAFKA_EVENT *event, N_STR *key, N_STR *value)
 add a header entry to an event. More...
 
int n_kafka_add_header_ex (N_KAFKA_EVENT *event, char *key, size_t key_length, char *value, size_t value_length)
 add a header entry to an event. More...
 
void n_kafka_delete (N_KAFKA *kafka)
 delete a N_KAFKA handle More...
 
static void n_kafka_delivery_message_callback (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
 Message delivery report callback. More...
 
int n_kafka_dump_unprocessed (N_KAFKA *kafka, char *directory)
 dump unprocessed/unsent events More...
 
int n_kafka_event_destroy (N_KAFKA_EVENT **event)
 destroy a kafka event and set its pointer to NULL More...
 
void n_kafka_event_destroy_ptr (void *event_ptr)
 destroy a kafka event More...
 
N_KAFKA_EVENT * n_kafka_get_event (N_KAFKA *kafka)
 get a received event from the N_KAFKA kafka handle More...
 
int32_t n_kafka_get_schema_from_char (char *string)
 get a schema from the first 4 bytes of a char *string, returning ntohl of the value More...
 
int32_t n_kafka_get_schema_from_nstr (N_STR *string)
 get a schema from the first 4 bytes of a N_STR *string, returning ntohl of the value More...
 
int n_kafka_get_status (N_KAFKA *kafka, int *nb_queued, int *nb_waiting, int *nb_error)
 return the queues status More...
 
N_KAFKA * n_kafka_load_config (char *config_file, int mode)
 load a kafka configuration from a file More...
 
int n_kafka_load_unprocessed (N_KAFKA *kafka, char *directory)
 load unprocessed/unsent events More...
 
N_KAFKA * n_kafka_new (int64_t poll_interval, int64_t poll_timeout, size_t errstr_len)
 allocate a new kafka handle More...
 
N_KAFKA_EVENT * n_kafka_new_event (int schema_id)
 create a new empty event More...
 
N_KAFKA_EVENT * n_kafka_new_event_from_char (char *string, size_t written, int schema_id)
 make a new event from a char *string More...
 
N_KAFKA_EVENT * n_kafka_new_event_from_file (char *filename, int schema_id)
 make a new event from a file More...
 
N_KAFKA_EVENT * n_kafka_new_event_from_string (N_STR *string, int schema_id)
 make a new event from a N_STR *string More...
 
int n_kafka_new_headers (N_KAFKA_EVENT *event, size_t count)
 allocate a headers array for the event More...
 
int n_kafka_poll (N_KAFKA *kafka)
 Poll kafka handle in producer or consumer mode. More...
 
void * n_kafka_pooling_thread (void *ptr)
 kafka produce or consume polling thread function More...
 
int n_kafka_produce (N_KAFKA *kafka, N_KAFKA_EVENT *event)
 put an event in the events_to_send list More...
 
int n_kafka_produce_ex (N_KAFKA *kafka, N_KAFKA_EVENT *event)
 produce an event on a N_KAFKA *kafka handle More...
 
int n_kafka_put_schema_in_char (char *string, int schema_id)
 put a htonl schema id into the first 4 bytes of a char *string More...
 
int n_kafka_put_schema_in_nstr (N_STR *string, int schema_id)
 put a htonl schema id into the first 4 bytes of a N_STR *string More...
 
int n_kafka_start_pooling_thread (N_KAFKA *kafka)
 start the polling thread of a kafka handle More...
 
int n_kafka_stop_pooling_thread (N_KAFKA *kafka)
 stop the polling thread of a kafka handle More...
 

Detailed Description

generic kafka consume and produce event functions

Author
Castagnier Mickael
Version
1.0
Date
23/11/2022

Definition in file n_kafka.c.

Function Documentation

◆ n_kafka_delivery_message_callback()

static void n_kafka_delivery_message_callback ( rd_kafka_t *  rk,
const rd_kafka_message_t *  rkmessage,
void *  opaque 
)
static

Message delivery report callback.

This callback is called exactly once per message, indicating whether the message was successfully delivered. The callback is triggered from rd_kafka_poll() and executes on the application's thread.

Parameters
rk: rd_kafka_t kafka handle
rkmessage: pointer to the received event in kafka form
opaque: opaque structure holding the pointer of a sent event

Definition at line 102 of file n_kafka.c.

References __n_assert, _nstr, free_split_result(), LOG_DEBUG, LOG_ERR, LOG_INFO, N_KAFKA_EVENT_ERROR, N_KAFKA_EVENT_OK, n_log, split(), split_count(), unlock, and write_lock.

Referenced by n_kafka_load_config().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ n_kafka_pooling_thread()

void * n_kafka_pooling_thread ( void *  ptr)

kafka produce or consume polling thread function

Parameters
ptr: (void *) kafka handle
Returns
NULL

Definition at line 1021 of file n_kafka.c.

References _str, FreeNoLog, get_usec(), join(), LOG_DEBUG, LOG_ERR, N_KAFKA::mode, n_kafka_poll(), n_log, read_lock, start_HiTimer(), unlock, and write_lock.

Referenced by n_kafka_start_pooling_thread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ n_kafka_produce_ex()

int n_kafka_produce_ex ( N_KAFKA *  kafka,
N_KAFKA_EVENT *  event 
)

produce an event on a N_KAFKA *kafka handle

Parameters
kafka: the producer handle to use
event: event to send
Returns
TRUE or FALSE

Definition at line 724 of file n_kafka.c.

References LOG_DEBUG, LOG_ERR, N_KAFKA_EVENT_ERROR, N_KAFKA_EVENT_WAITING_ACK, and n_log.

Referenced by n_kafka_poll().

+ Here is the caller graph for this function: