1 /*
2  * Copyright (c) 2013 Tihamer Petrovics <tihameri@gmail.com>
3  * Copyright (c) 2014 Pierre-Yves Ritschard <pyr@spootnik.org>
4  * Copyright (c) 2013-2019 Balabit
5  * Copyright (c) 2019 Balazs Scheidler
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License version 2 as published
9  * by the Free Software Foundation, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19  *
20  * As an additional exemption you are allowed to compile & link against the
21  * OpenSSL libraries as published by the OpenSSL project. See the file
22  * COPYING for details.
23  *
24  */
25 
26 
27 #include "kafka-dest-driver.h"
28 #include "kafka-props.h"
29 #include "kafka-dest-worker.h"
30 
31 #include <librdkafka/rdkafka.h>
32 #include <stdlib.h>
33 
34 /*
35  * Configuration
36  */
37 
38 void
kafka_dd_set_topic(LogDriver * d,LogTemplate * topic)39 kafka_dd_set_topic(LogDriver *d, LogTemplate *topic)
40 {
41   KafkaDestDriver *self = (KafkaDestDriver *)d;
42 
43   log_template_unref(self->topic_name);
44   self->topic_name = topic;
45 }
46 
47 void
kafka_dd_set_fallback_topic(LogDriver * d,const gchar * fallback_topic)48 kafka_dd_set_fallback_topic(LogDriver *d, const gchar *fallback_topic)
49 {
50   KafkaDestDriver *self = (KafkaDestDriver *)d;
51 
52   g_free(self->fallback_topic_name);
53   self->fallback_topic_name = g_strdup(fallback_topic);
54 }
55 
56 void
kafka_dd_merge_config(LogDriver * d,GList * props)57 kafka_dd_merge_config(LogDriver *d, GList *props)
58 {
59   KafkaDestDriver *self = (KafkaDestDriver *)d;
60 
61   self->config = g_list_concat(self->config, props);
62 }
63 
64 void
kafka_dd_set_bootstrap_servers(LogDriver * d,const gchar * bootstrap_servers)65 kafka_dd_set_bootstrap_servers(LogDriver *d, const gchar *bootstrap_servers)
66 {
67   KafkaDestDriver *self = (KafkaDestDriver *)d;
68 
69   g_free(self->bootstrap_servers);
70   self->bootstrap_servers = g_strdup(bootstrap_servers);
71 }
72 
73 void
kafka_dd_set_key_ref(LogDriver * d,LogTemplate * key)74 kafka_dd_set_key_ref(LogDriver *d, LogTemplate *key)
75 {
76   KafkaDestDriver *self = (KafkaDestDriver *)d;
77 
78   log_template_unref(self->key);
79   self->key = key;
80 }
81 
82 void
kafka_dd_set_message_ref(LogDriver * d,LogTemplate * message)83 kafka_dd_set_message_ref(LogDriver *d, LogTemplate *message)
84 {
85   KafkaDestDriver *self = (KafkaDestDriver *)d;
86 
87   log_template_unref(self->message);
88   self->message = message;
89 }
90 
91 void
kafka_dd_set_flush_timeout_on_shutdown(LogDriver * d,gint flush_timeout_on_shutdown)92 kafka_dd_set_flush_timeout_on_shutdown(LogDriver *d, gint flush_timeout_on_shutdown)
93 {
94   KafkaDestDriver *self = (KafkaDestDriver *)d;
95 
96   self->flush_timeout_on_shutdown = flush_timeout_on_shutdown;
97 }
98 
99 void
kafka_dd_set_flush_timeout_on_reload(LogDriver * d,gint flush_timeout_on_reload)100 kafka_dd_set_flush_timeout_on_reload(LogDriver *d, gint flush_timeout_on_reload)
101 {
102   KafkaDestDriver *self = (KafkaDestDriver *)d;
103 
104   self->flush_timeout_on_reload = flush_timeout_on_reload;
105 }
106 
107 void
kafka_dd_set_poll_timeout(LogDriver * d,gint poll_timeout)108 kafka_dd_set_poll_timeout(LogDriver *d, gint poll_timeout)
109 {
110   KafkaDestDriver *self = (KafkaDestDriver *)d;
111 
112   self->poll_timeout = poll_timeout;
113 }
114 
115 void
kafka_dd_set_transaction_commit(LogDriver * d,gboolean transaction_commit)116 kafka_dd_set_transaction_commit(LogDriver *d, gboolean transaction_commit)
117 {
118 #ifndef SYSLOG_NG_HAVE_RD_KAFKA_INIT_TRANSACTIONS
119   msg_warning_once("syslog-ng version does not support transactional api, because the librdkafka version does not support it");
120 #else
121   KafkaDestDriver *self = (KafkaDestDriver *)d;
122 
123   self->transaction_commit = transaction_commit;
124 #endif
125 }
126 
127 LogTemplateOptions *
kafka_dd_get_template_options(LogDriver * d)128 kafka_dd_get_template_options(LogDriver *d)
129 {
130   KafkaDestDriver *self = (KafkaDestDriver *)d;
131 
132   return &self->template_options;
133 }
134 
135 gboolean
kafka_dd_is_topic_name_a_template(KafkaDestDriver * self)136 kafka_dd_is_topic_name_a_template(KafkaDestDriver *self)
137 {
138   return self->topic == NULL;
139 }
140 /* methods */
141 
142 static const gchar *
_format_stats_instance(LogThreadedDestDriver * d)143 _format_stats_instance(LogThreadedDestDriver *d)
144 {
145   KafkaDestDriver *self = (KafkaDestDriver *)d;
146   static gchar stats_name[1024];
147 
148   g_snprintf(stats_name, sizeof(stats_name), "kafka,%s", self->topic_name->template);
149   return stats_name;
150 }
151 
152 static const gchar *
_format_persist_name(const LogPipe * d)153 _format_persist_name(const LogPipe *d)
154 {
155   const KafkaDestDriver *self = (const KafkaDestDriver *)d;
156   static gchar persist_name[1024];
157 
158   if (d->persist_name)
159     g_snprintf(persist_name, sizeof(persist_name), "kafka.%s", d->persist_name);
160   else
161     g_snprintf(persist_name, sizeof(persist_name), "kafka(%s)", self->topic_name->template);
162   return persist_name;
163 }
164 
165 void
_kafka_log_callback(const rd_kafka_t * rkt,int level,const char * fac,const char * msg)166 _kafka_log_callback(const rd_kafka_t *rkt, int level, const char *fac, const char *msg)
167 {
168   gchar *buf = g_strdup_printf("librdkafka: %s(%d): %s", fac, level, msg);
169   msg_event_send(msg_event_create(level, buf, NULL));
170   g_free(buf);
171 }
172 
173 
174 static gboolean
_contains_valid_pattern(const gchar * name)175 _contains_valid_pattern(const gchar *name)
176 {
177   const gchar *p;
178   for (p = name; *p; p++)
179     {
180       if (!((*p >= 'a' && *p <= 'z') ||
181             (*p >= 'A' && *p <= 'Z') ||
182             (*p >= '0' && *p <= '9') ||
183             (*p == '_') || (*p == '-') || (*p == '.')))
184         {
185           return FALSE;
186         }
187     }
188   return TRUE;
189 }
190 
191 GQuark
topic_name_error_quark(void)192 topic_name_error_quark(void)
193 {
194   return g_quark_from_static_string("invalid-topic-name-error-quark");
195 }
196 
197 gboolean
kafka_dd_validate_topic_name(const gchar * name,GError ** error)198 kafka_dd_validate_topic_name(const gchar *name, GError **error)
199 {
200   gint len = strlen(name);
201 
202   if (len == 0)
203     {
204       g_set_error(error, TOPIC_NAME_ERROR, TOPIC_LENGTH_ZERO,
205                   "kafka: topic name is illegal, it can't be empty");
206 
207       return FALSE;
208     }
209 
210   if ((!g_strcmp0(name, ".")) || !g_strcmp0(name, ".."))
211     {
212       g_set_error(error, TOPIC_NAME_ERROR, TOPIC_DOT_TWO_DOTS,
213                   "kafka: topic name cannot be . or ..");
214 
215       return FALSE;
216     }
217 
218   if (len > 249)
219     {
220       g_set_error(error, TOPIC_NAME_ERROR, TOPIC_EXCEEDS_MAX_LENGTH,
221                   "kafka: topic name cannot be longer than 249 characters");
222 
223       return FALSE;
224     }
225 
226   if (!_contains_valid_pattern(name))
227     {
228       g_set_error(error, TOPIC_NAME_ERROR, TOPIC_INVALID_PATTERN,
229                   "kafka: topic name %s is illegal as it contains characters other than pattern [-._a-zA-Z0-9]+", name);
230 
231       return FALSE;
232     }
233 
234   return TRUE;
235 }
236 
237 rd_kafka_topic_t *
_construct_topic(KafkaDestDriver * self,const gchar * name)238 _construct_topic(KafkaDestDriver *self, const gchar *name)
239 {
240   g_assert(self->kafka != NULL);
241 
242   GError *error = NULL;
243 
244   if (kafka_dd_validate_topic_name(name, &error))
245     {
246       return rd_kafka_topic_new(self->kafka, name, NULL);
247     }
248 
249   msg_error("Error constructing topic", evt_tag_str("topic_name", name),
250             evt_tag_str("driver", self->super.super.super.id),
251             log_pipe_location_tag(&self->super.super.super.super),
252             evt_tag_str("error message", error->message));
253   g_error_free(error);
254 
255   return NULL;
256 }
257 
258 rd_kafka_topic_t *
kafka_dd_query_insert_topic(KafkaDestDriver * self,const gchar * name)259 kafka_dd_query_insert_topic(KafkaDestDriver *self, const gchar *name)
260 {
261   g_mutex_lock(&self->topics_lock);
262   rd_kafka_topic_t *topic = g_hash_table_lookup(self->topics, name);
263 
264   if (topic)
265     {
266       g_mutex_unlock(&self->topics_lock);
267       return topic;
268     }
269 
270   topic = _construct_topic(self, name);
271 
272   if (topic)
273     {
274       g_hash_table_insert(self->topics, g_strdup(name), topic);
275     }
276 
277   g_mutex_unlock(&self->topics_lock);
278   return topic;
279 }
280 
281 static void
_kafka_delivery_report_cb(rd_kafka_t * rk,void * payload,size_t len,rd_kafka_resp_err_t err,void * opaque,void * msg_opaque)282 _kafka_delivery_report_cb(rd_kafka_t *rk,
283                           void *payload, size_t len,
284                           rd_kafka_resp_err_t err,
285                           void *opaque, void *msg_opaque)
286 {
287   KafkaDestDriver *self = (KafkaDestDriver *) opaque;
288 
289   /* delivery callback will be called from the the thread where rd_kafka_poll is called,
290    * which could be any worker and not just worker#0 due to the kafka_dd_shutdown in thread_init
291    * and the main thread too. Driver/worker state modification should be done carefully.
292    */
293   if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
294     {
295       msg_debug("kafka: delivery report for message came back with an error, message is lost",
296                 evt_tag_str("topic", self->topic_name->template),
297                 evt_tag_str("fallback_topic", self->fallback_topic_name),
298                 evt_tag_printf("message", "%.*s", (int) MIN(len, 128), (char *) payload),
299                 evt_tag_str("error", rd_kafka_err2str(err)),
300                 evt_tag_str("driver", self->super.super.super.id),
301                 log_pipe_location_tag(&self->super.super.super.super));
302     }
303   else
304     {
305       msg_debug("kafka: delivery report successful",
306                 evt_tag_str("topic", self->topic_name->template),
307                 evt_tag_str("fallback_topic", self->fallback_topic_name),
308                 evt_tag_printf("message", "%.*s", (int) MIN(len, 128), (char *) payload),
309                 evt_tag_str("error", rd_kafka_err2str(err)),
310                 evt_tag_str("driver", self->super.super.super.id),
311                 log_pipe_location_tag(&self->super.super.super.super));
312     }
313 }
314 
315 static gboolean
_conf_set_prop(rd_kafka_conf_t * conf,const gchar * name,const gchar * value)316 _conf_set_prop(rd_kafka_conf_t *conf, const gchar *name, const gchar *value)
317 {
318   gchar errbuf[1024];
319 
320   msg_debug("kafka: setting librdkafka config property",
321             evt_tag_str("name", name),
322             evt_tag_str("value", value));
323   if (rd_kafka_conf_set(conf, name, value, errbuf, sizeof(errbuf)) < 0)
324     {
325       msg_error("kafka: error setting librdkafka config property",
326                 evt_tag_str("name", name),
327                 evt_tag_str("value", value),
328                 evt_tag_str("error", errbuf));
329       return FALSE;
330     }
331   return TRUE;
332 }
333 
334 /*
335  * Main thread
336  */
337 
338 
339 static gboolean
_is_property_protected(const gchar * property_name)340 _is_property_protected(const gchar *property_name)
341 {
342   static gchar *protected_properties[] =
343   {
344     "bootstrap.servers",
345     "metadata.broker.list",
346   };
347 
348   for (gint i = 0; i < G_N_ELEMENTS(protected_properties); i++)
349     {
350       if (strcmp(property_name, protected_properties[i]) == 0)
351         {
352           msg_warning("kafka: protected config properties cannot be overridden",
353                       evt_tag_str("name", property_name));
354           return TRUE;
355         }
356     }
357   return FALSE;
358 }
359 
360 static gboolean
_apply_config_props(rd_kafka_conf_t * conf,GList * props)361 _apply_config_props(rd_kafka_conf_t *conf, GList *props)
362 {
363   GList *ll;
364 
365   for (ll = props; ll != NULL; ll = g_list_next(ll))
366     {
367       KafkaProperty *kp = ll->data;
368       if (!_is_property_protected(kp->name))
369         if (!_conf_set_prop(conf, kp->name, kp->value))
370           return FALSE;
371     }
372   return TRUE;
373 }
374 
375 static rd_kafka_t *
_construct_client(KafkaDestDriver * self)376 _construct_client(KafkaDestDriver *self)
377 {
378   rd_kafka_t *client;
379   rd_kafka_conf_t *conf;
380   gchar errbuf[1024];
381 
382   conf = rd_kafka_conf_new();
383   if (!_conf_set_prop(conf, "metadata.broker.list", self->bootstrap_servers))
384     return NULL;
385   if (!_conf_set_prop(conf, "topic.partitioner", "murmur2_random"))
386     return NULL;
387 
388   if (self->transaction_commit)
389     _conf_set_prop(conf, "transactional.id",
390                    log_pipe_get_persist_name(&self->super.super.super.super));
391 
392   if (!_apply_config_props(conf, self->config))
393     return NULL;
394   rd_kafka_conf_set_log_cb(conf, _kafka_log_callback);
395   rd_kafka_conf_set_dr_cb(conf, _kafka_delivery_report_cb);
396   rd_kafka_conf_set_opaque(conf, self);
397   client = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errbuf, sizeof(errbuf));
398   if (!client)
399     {
400       msg_error("kafka: error constructing the kafka connection object",
401                 evt_tag_str("topic", self->topic_name->template),
402                 evt_tag_str("error", errbuf),
403                 evt_tag_str("driver", self->super.super.super.id),
404                 log_pipe_location_tag(&self->super.super.super.super));
405     }
406   return client;
407 }
408 
409 static LogThreadedDestWorker *
_construct_worker(LogThreadedDestDriver * s,gint worker_index)410 _construct_worker(LogThreadedDestDriver *s, gint worker_index)
411 {
412   return kafka_dest_worker_new(s, worker_index);
413 }
414 
415 static gint
_get_flush_timeout(KafkaDestDriver * self)416 _get_flush_timeout(KafkaDestDriver *self)
417 {
418   GlobalConfig *cfg = log_pipe_get_config(&self->super.super.super.super);
419   if (cfg_is_shutting_down(cfg))
420     return self->flush_timeout_on_shutdown;
421   return self->flush_timeout_on_reload;
422 }
423 
424 static void
_flush_inflight_messages(KafkaDestDriver * self)425 _flush_inflight_messages(KafkaDestDriver *self)
426 {
427   rd_kafka_resp_err_t err;
428   gint outq_len = rd_kafka_outq_len(self->kafka);
429   gint timeout_ms = _get_flush_timeout(self);
430 
431   if (outq_len > 0)
432     {
433       msg_notice("kafka: shutting down kafka producer, while messages are still in-flight, waiting for messages to flush",
434 
435                  evt_tag_str("topic", self->topic_name->template),
436                  evt_tag_str("fallback_topic", self->fallback_topic_name),
437                  evt_tag_int("outq_len", outq_len),
438                  evt_tag_int("timeout_ms", timeout_ms),
439                  evt_tag_str("driver", self->super.super.super.id),
440                  log_pipe_location_tag(&self->super.super.super.super));
441     }
442   err = rd_kafka_flush(self->kafka, timeout_ms);
443   if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
444     {
445       msg_error("kafka: error flushing accumulated messages during shutdown, rd_kafka_flush() returned failure, this might indicate that some in-flight messages are lost",
446                 evt_tag_str("topic", self->topic_name->template),
447                 evt_tag_str("fallback_topic", self->fallback_topic_name),
448                 evt_tag_int("outq_len", rd_kafka_outq_len(self->kafka)),
449                 evt_tag_str("error", rd_kafka_err2str(err)),
450                 evt_tag_str("driver", self->super.super.super.id),
451                 log_pipe_location_tag(&self->super.super.super.super));
452     }
453   outq_len = rd_kafka_outq_len(self->kafka);
454 
455   if (outq_len != 0)
456     msg_notice("kafka: timeout while waiting for the librdkafka queue to empty, the "
457                "remaining entries will be purged and lost",
458                evt_tag_int("timeout_ms", timeout_ms),
459                evt_tag_int("outq_len", outq_len));
460 }
461 
462 static void
_purge_remaining_messages(KafkaDestDriver * self)463 _purge_remaining_messages(KafkaDestDriver *self)
464 {
465   /* we are purging all messages, those ones that are sitting in the queue
466    * and also those that were sent and not yet acknowledged.  The purged
467    * messages will generate failed delivery reports. */
468 
469   rd_kafka_purge(self->kafka, RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_INFLIGHT);
470   rd_kafka_poll(self->kafka, 0);
471 }
472 
473 static gboolean
_init_template_topic_name(KafkaDestDriver * self)474 _init_template_topic_name(KafkaDestDriver *self)
475 {
476   msg_debug("kafka: The topic name is a template",
477             evt_tag_str("topic", self->topic_name->template),
478             evt_tag_str("driver", self->super.super.super.id),
479             log_pipe_location_tag(&self->super.super.super.super));
480 
481   if (!self->fallback_topic_name)
482     {
483       msg_error("kafka: fallback_topic() required when the topic name is a template",
484                 evt_tag_str("driver", self->super.super.super.id),
485                 log_pipe_location_tag(&self->super.super.super.super));
486       return FALSE;
487     }
488 
489   rd_kafka_topic_t *fallback_topic = _construct_topic(self, self->fallback_topic_name);
490 
491   if (fallback_topic == NULL)
492     {
493       msg_error("kafka: error constructing the fallback topic object",
494                 evt_tag_str("fallback_topic", self->fallback_topic_name),
495                 evt_tag_str("driver", self->super.super.super.id),
496                 log_pipe_location_tag(&self->super.super.super.super));
497       return FALSE;
498     }
499 
500   self->topics = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, (GDestroyNotify) rd_kafka_topic_destroy);
501   g_hash_table_insert(self->topics, g_strdup(self->fallback_topic_name), fallback_topic);
502 
503   return TRUE;
504 }
505 
506 static gboolean
_topic_name_is_a_template(KafkaDestDriver * self)507 _topic_name_is_a_template(KafkaDestDriver *self)
508 {
509   return !log_template_is_literal_string(self->topic_name);
510 }
511 
512 static gboolean
_init_literal_topic_name(KafkaDestDriver * self)513 _init_literal_topic_name(KafkaDestDriver *self)
514 {
515   self->topic = _construct_topic(self, self->topic_name->template);
516 
517   if (self->topic == NULL)
518     {
519       msg_error("kafka: error constructing the kafka topic object",
520                 evt_tag_str("topic", self->topic_name->template),
521                 evt_tag_str("driver", self->super.super.super.id),
522                 log_pipe_location_tag(&self->super.super.super.super));
523       return FALSE;
524     }
525 
526   return TRUE;
527 }
528 
529 static gboolean
_init_topic_name(KafkaDestDriver * self)530 _init_topic_name(KafkaDestDriver *self)
531 {
532   if (_topic_name_is_a_template(self))
533     return _init_template_topic_name(self);
534   else
535     return _init_literal_topic_name(self);
536 }
537 
538 static void
_destroy_kafka(LogDriver * s)539 _destroy_kafka(LogDriver *s)
540 {
541   KafkaDestDriver *self = (KafkaDestDriver *)s;
542 
543   if (self->topics)
544     g_hash_table_unref(self->topics);
545   if (self->topic)
546     rd_kafka_topic_destroy(self->topic);
547 
548   if (self->kafka)
549     rd_kafka_destroy(self->kafka);
550 }
551 
552 gboolean
kafka_dd_reopen(LogDriver * s)553 kafka_dd_reopen(LogDriver *s)
554 {
555   KafkaDestDriver *self = (KafkaDestDriver *)s;
556   _destroy_kafka(s);
557 
558   self->kafka = _construct_client(self);
559   if (self->kafka == NULL)
560     {
561       msg_error("kafka: error constructing kafka connection object",
562                 evt_tag_str("topic", self->topic_name->template),
563                 evt_tag_str("driver", self->super.super.super.id),
564                 log_pipe_location_tag(&self->super.super.super.super));
565       return FALSE;
566     }
567   if (!_init_topic_name(self))
568     return FALSE;
569 
570   self->transaction_inited = FALSE;
571 
572   return TRUE;
573 }
574 
575 gboolean
kafka_dd_init(LogPipe * s)576 kafka_dd_init(LogPipe *s)
577 {
578   KafkaDestDriver *self = (KafkaDestDriver *)s;
579   GlobalConfig *cfg = log_pipe_get_config(s);
580 
581   if (!self->topic_name)
582     {
583       msg_error("kafka: the topic() argument is required for kafka destinations",
584                 evt_tag_str("driver", self->super.super.super.id),
585                 log_pipe_location_tag(&self->super.super.super.super));
586       return FALSE;
587     }
588   if (!self->bootstrap_servers)
589     {
590       msg_error("kafka: the bootstrap-servers() option is required for kafka destinations",
591                 evt_tag_str("driver", self->super.super.super.id),
592                 log_pipe_location_tag(&self->super.super.super.super));
593       return FALSE;
594     }
595 
596   if (!kafka_dd_reopen(&self->super.super.super))
597     {
598       return FALSE;
599     }
600 
601 
602   if (self->transaction_commit)
603     {
604       /*
605        * The transaction api works on the rd_kafka client level,
606        * and packing multiple kafka operation into an atomic one.
607        * But it bundles them by calling the same operations,
608        * there is no way to selectivly bundle calls.
609        * Thus a worker cannot have its own dedicated transaction.
610        *
611        */
612       if (self->super.num_workers > 1)
613         {
614           msg_info("kafka: in case of sync_send(yes) option the number of workers limited to 1", evt_tag_int("configured_workers",
615                    self->super.num_workers), evt_tag_int("workers", 1));
616           log_threaded_dest_driver_set_num_workers(&self->super.super.super, 1);
617         }
618 
619     }
620 
621   if (!log_threaded_dest_driver_init_method(s))
622     return FALSE;
623 
624   if (self->message == NULL)
625     {
626       self->message = log_template_new(cfg, NULL);
627       log_template_compile(self->message, "$ISODATE $HOST $MSGHDR$MSG", NULL);
628     }
629 
630   log_template_options_init(&self->template_options, cfg);
631   msg_verbose("kafka: Kafka destination initialized",
632               evt_tag_str("topic", self->topic_name->template),
633               evt_tag_str("fallback_topic", self->fallback_topic_name),
634               evt_tag_str("key", self->key ? self->key->template : "NULL"),
635               evt_tag_str("message", self->message->template),
636               evt_tag_str("driver", self->super.super.super.id),
637               log_pipe_location_tag(&self->super.super.super.super));
638 
639   return TRUE;
640 }
641 
642 void
kafka_dd_shutdown(LogThreadedDestDriver * s)643 kafka_dd_shutdown(LogThreadedDestDriver *s)
644 {
645   KafkaDestDriver *self = (KafkaDestDriver *)s;
646 
647   /* this can be called from the worker threads and
648    * during reloda/shutdown from the main thread (deinit)
649    * No need to lock here, as librdkafka API is thread safe, it does the locking for us.
650    */
651 
652   _flush_inflight_messages(self);
653   _purge_remaining_messages(self);
654 }
655 
656 static void
_check_for_remaining_messages(KafkaDestDriver * self)657 _check_for_remaining_messages(KafkaDestDriver *self)
658 {
659   gint outq_len = rd_kafka_outq_len(self->kafka);
660   if (outq_len != 0)
661     msg_notice("kafka: failed to completely empty rdkafka queues, as we still have entries in "
662                "the queue after flush() and purge(), this is probably causing a memory leak, "
663                "please contact syslog-ng authors for support",
664                evt_tag_int("outq_len", outq_len));
665 }
666 
667 static gboolean
kafka_dd_deinit(LogPipe * s)668 kafka_dd_deinit(LogPipe *s)
669 {
670   KafkaDestDriver *self = (KafkaDestDriver *)s;
671 
672   kafka_dd_shutdown(&self->super);
673   _check_for_remaining_messages(self);
674 
675   return log_threaded_dest_driver_deinit_method(s);
676 }
677 
678 static void
kafka_dd_free(LogPipe * d)679 kafka_dd_free(LogPipe *d)
680 {
681   KafkaDestDriver *self = (KafkaDestDriver *)d;
682 
683   log_template_options_destroy(&self->template_options);
684   _destroy_kafka(&self->super.super.super);
685   if (self->fallback_topic_name)
686     g_free(self->fallback_topic_name);
687   log_template_unref(self->key);
688   log_template_unref(self->message);
689   log_template_unref(self->topic_name);
690   g_mutex_clear(&self->topics_lock);
691   g_free(self->bootstrap_servers);
692   kafka_property_list_free(self->config);
693   log_threaded_dest_driver_free(d);
694 }
695 
696 /*
697  * Plugin glue.
698  */
699 
700 LogDriver *
kafka_dd_new(GlobalConfig * cfg)701 kafka_dd_new(GlobalConfig *cfg)
702 {
703   KafkaDestDriver *self = g_new0(KafkaDestDriver, 1);
704 
705   log_threaded_dest_driver_init_instance(&self->super, cfg);
706   self->super.super.super.super.init = kafka_dd_init;
707   self->super.super.super.super.deinit = kafka_dd_deinit;
708   self->super.super.super.super.free_fn = kafka_dd_free;
709   self->super.super.super.super.generate_persist_name = _format_persist_name;
710 
711   self->super.format_stats_instance = _format_stats_instance;
712   self->super.stats_source = stats_register_type("kafka");
713   self->super.worker.construct = _construct_worker;
714   /* one minute */
715   self->flush_timeout_on_shutdown = 60000;
716   self->flush_timeout_on_reload = 1000;
717   self->poll_timeout = 1000;
718 
719   g_mutex_init(&self->topics_lock);
720 
721   log_template_options_defaults(&self->template_options);
722 
723   return (LogDriver *)self;
724 }
725