1 /*
2 * Copyright (c) 2013 Tihamer Petrovics <tihameri@gmail.com>
3 * Copyright (c) 2014 Pierre-Yves Ritschard <pyr@spootnik.org>
4 * Copyright (c) 2013-2019 Balabit
5 * Copyright (c) 2019 Balazs Scheidler
6 *
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License version 2 as published
9 * by the Free Software Foundation, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 *
20 * As an additional exemption you are allowed to compile & link against the
21 * OpenSSL libraries as published by the OpenSSL project. See the file
22 * COPYING for details.
23 *
24 */
25
26
27 #include "kafka-dest-driver.h"
28 #include "kafka-props.h"
29 #include "kafka-dest-worker.h"
30
31 #include <librdkafka/rdkafka.h>
32 #include <stdlib.h>
33
34 /*
35 * Configuration
36 */
37
38 void
kafka_dd_set_topic(LogDriver * d,LogTemplate * topic)39 kafka_dd_set_topic(LogDriver *d, LogTemplate *topic)
40 {
41 KafkaDestDriver *self = (KafkaDestDriver *)d;
42
43 log_template_unref(self->topic_name);
44 self->topic_name = topic;
45 }
46
47 void
kafka_dd_set_fallback_topic(LogDriver * d,const gchar * fallback_topic)48 kafka_dd_set_fallback_topic(LogDriver *d, const gchar *fallback_topic)
49 {
50 KafkaDestDriver *self = (KafkaDestDriver *)d;
51
52 g_free(self->fallback_topic_name);
53 self->fallback_topic_name = g_strdup(fallback_topic);
54 }
55
56 void
kafka_dd_merge_config(LogDriver * d,GList * props)57 kafka_dd_merge_config(LogDriver *d, GList *props)
58 {
59 KafkaDestDriver *self = (KafkaDestDriver *)d;
60
61 self->config = g_list_concat(self->config, props);
62 }
63
64 void
kafka_dd_set_bootstrap_servers(LogDriver * d,const gchar * bootstrap_servers)65 kafka_dd_set_bootstrap_servers(LogDriver *d, const gchar *bootstrap_servers)
66 {
67 KafkaDestDriver *self = (KafkaDestDriver *)d;
68
69 g_free(self->bootstrap_servers);
70 self->bootstrap_servers = g_strdup(bootstrap_servers);
71 }
72
73 void
kafka_dd_set_key_ref(LogDriver * d,LogTemplate * key)74 kafka_dd_set_key_ref(LogDriver *d, LogTemplate *key)
75 {
76 KafkaDestDriver *self = (KafkaDestDriver *)d;
77
78 log_template_unref(self->key);
79 self->key = key;
80 }
81
82 void
kafka_dd_set_message_ref(LogDriver * d,LogTemplate * message)83 kafka_dd_set_message_ref(LogDriver *d, LogTemplate *message)
84 {
85 KafkaDestDriver *self = (KafkaDestDriver *)d;
86
87 log_template_unref(self->message);
88 self->message = message;
89 }
90
91 void
kafka_dd_set_flush_timeout_on_shutdown(LogDriver * d,gint flush_timeout_on_shutdown)92 kafka_dd_set_flush_timeout_on_shutdown(LogDriver *d, gint flush_timeout_on_shutdown)
93 {
94 KafkaDestDriver *self = (KafkaDestDriver *)d;
95
96 self->flush_timeout_on_shutdown = flush_timeout_on_shutdown;
97 }
98
99 void
kafka_dd_set_flush_timeout_on_reload(LogDriver * d,gint flush_timeout_on_reload)100 kafka_dd_set_flush_timeout_on_reload(LogDriver *d, gint flush_timeout_on_reload)
101 {
102 KafkaDestDriver *self = (KafkaDestDriver *)d;
103
104 self->flush_timeout_on_reload = flush_timeout_on_reload;
105 }
106
107 void
kafka_dd_set_poll_timeout(LogDriver * d,gint poll_timeout)108 kafka_dd_set_poll_timeout(LogDriver *d, gint poll_timeout)
109 {
110 KafkaDestDriver *self = (KafkaDestDriver *)d;
111
112 self->poll_timeout = poll_timeout;
113 }
114
115 void
kafka_dd_set_transaction_commit(LogDriver * d,gboolean transaction_commit)116 kafka_dd_set_transaction_commit(LogDriver *d, gboolean transaction_commit)
117 {
118 #ifndef SYSLOG_NG_HAVE_RD_KAFKA_INIT_TRANSACTIONS
119 msg_warning_once("syslog-ng version does not support transactional api, because the librdkafka version does not support it");
120 #else
121 KafkaDestDriver *self = (KafkaDestDriver *)d;
122
123 self->transaction_commit = transaction_commit;
124 #endif
125 }
126
127 LogTemplateOptions *
kafka_dd_get_template_options(LogDriver * d)128 kafka_dd_get_template_options(LogDriver *d)
129 {
130 KafkaDestDriver *self = (KafkaDestDriver *)d;
131
132 return &self->template_options;
133 }
134
135 gboolean
kafka_dd_is_topic_name_a_template(KafkaDestDriver * self)136 kafka_dd_is_topic_name_a_template(KafkaDestDriver *self)
137 {
138 return self->topic == NULL;
139 }
140 /* methods */
141
142 static const gchar *
_format_stats_instance(LogThreadedDestDriver * d)143 _format_stats_instance(LogThreadedDestDriver *d)
144 {
145 KafkaDestDriver *self = (KafkaDestDriver *)d;
146 static gchar stats_name[1024];
147
148 g_snprintf(stats_name, sizeof(stats_name), "kafka,%s", self->topic_name->template);
149 return stats_name;
150 }
151
152 static const gchar *
_format_persist_name(const LogPipe * d)153 _format_persist_name(const LogPipe *d)
154 {
155 const KafkaDestDriver *self = (const KafkaDestDriver *)d;
156 static gchar persist_name[1024];
157
158 if (d->persist_name)
159 g_snprintf(persist_name, sizeof(persist_name), "kafka.%s", d->persist_name);
160 else
161 g_snprintf(persist_name, sizeof(persist_name), "kafka(%s)", self->topic_name->template);
162 return persist_name;
163 }
164
165 void
_kafka_log_callback(const rd_kafka_t * rkt,int level,const char * fac,const char * msg)166 _kafka_log_callback(const rd_kafka_t *rkt, int level, const char *fac, const char *msg)
167 {
168 gchar *buf = g_strdup_printf("librdkafka: %s(%d): %s", fac, level, msg);
169 msg_event_send(msg_event_create(level, buf, NULL));
170 g_free(buf);
171 }
172
173
174 static gboolean
_contains_valid_pattern(const gchar * name)175 _contains_valid_pattern(const gchar *name)
176 {
177 const gchar *p;
178 for (p = name; *p; p++)
179 {
180 if (!((*p >= 'a' && *p <= 'z') ||
181 (*p >= 'A' && *p <= 'Z') ||
182 (*p >= '0' && *p <= '9') ||
183 (*p == '_') || (*p == '-') || (*p == '.')))
184 {
185 return FALSE;
186 }
187 }
188 return TRUE;
189 }
190
191 GQuark
topic_name_error_quark(void)192 topic_name_error_quark(void)
193 {
194 return g_quark_from_static_string("invalid-topic-name-error-quark");
195 }
196
197 gboolean
kafka_dd_validate_topic_name(const gchar * name,GError ** error)198 kafka_dd_validate_topic_name(const gchar *name, GError **error)
199 {
200 gint len = strlen(name);
201
202 if (len == 0)
203 {
204 g_set_error(error, TOPIC_NAME_ERROR, TOPIC_LENGTH_ZERO,
205 "kafka: topic name is illegal, it can't be empty");
206
207 return FALSE;
208 }
209
210 if ((!g_strcmp0(name, ".")) || !g_strcmp0(name, ".."))
211 {
212 g_set_error(error, TOPIC_NAME_ERROR, TOPIC_DOT_TWO_DOTS,
213 "kafka: topic name cannot be . or ..");
214
215 return FALSE;
216 }
217
218 if (len > 249)
219 {
220 g_set_error(error, TOPIC_NAME_ERROR, TOPIC_EXCEEDS_MAX_LENGTH,
221 "kafka: topic name cannot be longer than 249 characters");
222
223 return FALSE;
224 }
225
226 if (!_contains_valid_pattern(name))
227 {
228 g_set_error(error, TOPIC_NAME_ERROR, TOPIC_INVALID_PATTERN,
229 "kafka: topic name %s is illegal as it contains characters other than pattern [-._a-zA-Z0-9]+", name);
230
231 return FALSE;
232 }
233
234 return TRUE;
235 }
236
237 rd_kafka_topic_t *
_construct_topic(KafkaDestDriver * self,const gchar * name)238 _construct_topic(KafkaDestDriver *self, const gchar *name)
239 {
240 g_assert(self->kafka != NULL);
241
242 GError *error = NULL;
243
244 if (kafka_dd_validate_topic_name(name, &error))
245 {
246 return rd_kafka_topic_new(self->kafka, name, NULL);
247 }
248
249 msg_error("Error constructing topic", evt_tag_str("topic_name", name),
250 evt_tag_str("driver", self->super.super.super.id),
251 log_pipe_location_tag(&self->super.super.super.super),
252 evt_tag_str("error message", error->message));
253 g_error_free(error);
254
255 return NULL;
256 }
257
258 rd_kafka_topic_t *
kafka_dd_query_insert_topic(KafkaDestDriver * self,const gchar * name)259 kafka_dd_query_insert_topic(KafkaDestDriver *self, const gchar *name)
260 {
261 g_mutex_lock(&self->topics_lock);
262 rd_kafka_topic_t *topic = g_hash_table_lookup(self->topics, name);
263
264 if (topic)
265 {
266 g_mutex_unlock(&self->topics_lock);
267 return topic;
268 }
269
270 topic = _construct_topic(self, name);
271
272 if (topic)
273 {
274 g_hash_table_insert(self->topics, g_strdup(name), topic);
275 }
276
277 g_mutex_unlock(&self->topics_lock);
278 return topic;
279 }
280
281 static void
_kafka_delivery_report_cb(rd_kafka_t * rk,void * payload,size_t len,rd_kafka_resp_err_t err,void * opaque,void * msg_opaque)282 _kafka_delivery_report_cb(rd_kafka_t *rk,
283 void *payload, size_t len,
284 rd_kafka_resp_err_t err,
285 void *opaque, void *msg_opaque)
286 {
287 KafkaDestDriver *self = (KafkaDestDriver *) opaque;
288
289 /* delivery callback will be called from the the thread where rd_kafka_poll is called,
290 * which could be any worker and not just worker#0 due to the kafka_dd_shutdown in thread_init
291 * and the main thread too. Driver/worker state modification should be done carefully.
292 */
293 if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
294 {
295 msg_debug("kafka: delivery report for message came back with an error, message is lost",
296 evt_tag_str("topic", self->topic_name->template),
297 evt_tag_str("fallback_topic", self->fallback_topic_name),
298 evt_tag_printf("message", "%.*s", (int) MIN(len, 128), (char *) payload),
299 evt_tag_str("error", rd_kafka_err2str(err)),
300 evt_tag_str("driver", self->super.super.super.id),
301 log_pipe_location_tag(&self->super.super.super.super));
302 }
303 else
304 {
305 msg_debug("kafka: delivery report successful",
306 evt_tag_str("topic", self->topic_name->template),
307 evt_tag_str("fallback_topic", self->fallback_topic_name),
308 evt_tag_printf("message", "%.*s", (int) MIN(len, 128), (char *) payload),
309 evt_tag_str("error", rd_kafka_err2str(err)),
310 evt_tag_str("driver", self->super.super.super.id),
311 log_pipe_location_tag(&self->super.super.super.super));
312 }
313 }
314
315 static gboolean
_conf_set_prop(rd_kafka_conf_t * conf,const gchar * name,const gchar * value)316 _conf_set_prop(rd_kafka_conf_t *conf, const gchar *name, const gchar *value)
317 {
318 gchar errbuf[1024];
319
320 msg_debug("kafka: setting librdkafka config property",
321 evt_tag_str("name", name),
322 evt_tag_str("value", value));
323 if (rd_kafka_conf_set(conf, name, value, errbuf, sizeof(errbuf)) < 0)
324 {
325 msg_error("kafka: error setting librdkafka config property",
326 evt_tag_str("name", name),
327 evt_tag_str("value", value),
328 evt_tag_str("error", errbuf));
329 return FALSE;
330 }
331 return TRUE;
332 }
333
334 /*
335 * Main thread
336 */
337
338
339 static gboolean
_is_property_protected(const gchar * property_name)340 _is_property_protected(const gchar *property_name)
341 {
342 static gchar *protected_properties[] =
343 {
344 "bootstrap.servers",
345 "metadata.broker.list",
346 };
347
348 for (gint i = 0; i < G_N_ELEMENTS(protected_properties); i++)
349 {
350 if (strcmp(property_name, protected_properties[i]) == 0)
351 {
352 msg_warning("kafka: protected config properties cannot be overridden",
353 evt_tag_str("name", property_name));
354 return TRUE;
355 }
356 }
357 return FALSE;
358 }
359
360 static gboolean
_apply_config_props(rd_kafka_conf_t * conf,GList * props)361 _apply_config_props(rd_kafka_conf_t *conf, GList *props)
362 {
363 GList *ll;
364
365 for (ll = props; ll != NULL; ll = g_list_next(ll))
366 {
367 KafkaProperty *kp = ll->data;
368 if (!_is_property_protected(kp->name))
369 if (!_conf_set_prop(conf, kp->name, kp->value))
370 return FALSE;
371 }
372 return TRUE;
373 }
374
375 static rd_kafka_t *
_construct_client(KafkaDestDriver * self)376 _construct_client(KafkaDestDriver *self)
377 {
378 rd_kafka_t *client;
379 rd_kafka_conf_t *conf;
380 gchar errbuf[1024];
381
382 conf = rd_kafka_conf_new();
383 if (!_conf_set_prop(conf, "metadata.broker.list", self->bootstrap_servers))
384 return NULL;
385 if (!_conf_set_prop(conf, "topic.partitioner", "murmur2_random"))
386 return NULL;
387
388 if (self->transaction_commit)
389 _conf_set_prop(conf, "transactional.id",
390 log_pipe_get_persist_name(&self->super.super.super.super));
391
392 if (!_apply_config_props(conf, self->config))
393 return NULL;
394 rd_kafka_conf_set_log_cb(conf, _kafka_log_callback);
395 rd_kafka_conf_set_dr_cb(conf, _kafka_delivery_report_cb);
396 rd_kafka_conf_set_opaque(conf, self);
397 client = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errbuf, sizeof(errbuf));
398 if (!client)
399 {
400 msg_error("kafka: error constructing the kafka connection object",
401 evt_tag_str("topic", self->topic_name->template),
402 evt_tag_str("error", errbuf),
403 evt_tag_str("driver", self->super.super.super.id),
404 log_pipe_location_tag(&self->super.super.super.super));
405 }
406 return client;
407 }
408
409 static LogThreadedDestWorker *
_construct_worker(LogThreadedDestDriver * s,gint worker_index)410 _construct_worker(LogThreadedDestDriver *s, gint worker_index)
411 {
412 return kafka_dest_worker_new(s, worker_index);
413 }
414
415 static gint
_get_flush_timeout(KafkaDestDriver * self)416 _get_flush_timeout(KafkaDestDriver *self)
417 {
418 GlobalConfig *cfg = log_pipe_get_config(&self->super.super.super.super);
419 if (cfg_is_shutting_down(cfg))
420 return self->flush_timeout_on_shutdown;
421 return self->flush_timeout_on_reload;
422 }
423
424 static void
_flush_inflight_messages(KafkaDestDriver * self)425 _flush_inflight_messages(KafkaDestDriver *self)
426 {
427 rd_kafka_resp_err_t err;
428 gint outq_len = rd_kafka_outq_len(self->kafka);
429 gint timeout_ms = _get_flush_timeout(self);
430
431 if (outq_len > 0)
432 {
433 msg_notice("kafka: shutting down kafka producer, while messages are still in-flight, waiting for messages to flush",
434
435 evt_tag_str("topic", self->topic_name->template),
436 evt_tag_str("fallback_topic", self->fallback_topic_name),
437 evt_tag_int("outq_len", outq_len),
438 evt_tag_int("timeout_ms", timeout_ms),
439 evt_tag_str("driver", self->super.super.super.id),
440 log_pipe_location_tag(&self->super.super.super.super));
441 }
442 err = rd_kafka_flush(self->kafka, timeout_ms);
443 if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
444 {
445 msg_error("kafka: error flushing accumulated messages during shutdown, rd_kafka_flush() returned failure, this might indicate that some in-flight messages are lost",
446 evt_tag_str("topic", self->topic_name->template),
447 evt_tag_str("fallback_topic", self->fallback_topic_name),
448 evt_tag_int("outq_len", rd_kafka_outq_len(self->kafka)),
449 evt_tag_str("error", rd_kafka_err2str(err)),
450 evt_tag_str("driver", self->super.super.super.id),
451 log_pipe_location_tag(&self->super.super.super.super));
452 }
453 outq_len = rd_kafka_outq_len(self->kafka);
454
455 if (outq_len != 0)
456 msg_notice("kafka: timeout while waiting for the librdkafka queue to empty, the "
457 "remaining entries will be purged and lost",
458 evt_tag_int("timeout_ms", timeout_ms),
459 evt_tag_int("outq_len", outq_len));
460 }
461
462 static void
_purge_remaining_messages(KafkaDestDriver * self)463 _purge_remaining_messages(KafkaDestDriver *self)
464 {
465 /* we are purging all messages, those ones that are sitting in the queue
466 * and also those that were sent and not yet acknowledged. The purged
467 * messages will generate failed delivery reports. */
468
469 rd_kafka_purge(self->kafka, RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_INFLIGHT);
470 rd_kafka_poll(self->kafka, 0);
471 }
472
473 static gboolean
_init_template_topic_name(KafkaDestDriver * self)474 _init_template_topic_name(KafkaDestDriver *self)
475 {
476 msg_debug("kafka: The topic name is a template",
477 evt_tag_str("topic", self->topic_name->template),
478 evt_tag_str("driver", self->super.super.super.id),
479 log_pipe_location_tag(&self->super.super.super.super));
480
481 if (!self->fallback_topic_name)
482 {
483 msg_error("kafka: fallback_topic() required when the topic name is a template",
484 evt_tag_str("driver", self->super.super.super.id),
485 log_pipe_location_tag(&self->super.super.super.super));
486 return FALSE;
487 }
488
489 rd_kafka_topic_t *fallback_topic = _construct_topic(self, self->fallback_topic_name);
490
491 if (fallback_topic == NULL)
492 {
493 msg_error("kafka: error constructing the fallback topic object",
494 evt_tag_str("fallback_topic", self->fallback_topic_name),
495 evt_tag_str("driver", self->super.super.super.id),
496 log_pipe_location_tag(&self->super.super.super.super));
497 return FALSE;
498 }
499
500 self->topics = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, (GDestroyNotify) rd_kafka_topic_destroy);
501 g_hash_table_insert(self->topics, g_strdup(self->fallback_topic_name), fallback_topic);
502
503 return TRUE;
504 }
505
506 static gboolean
_topic_name_is_a_template(KafkaDestDriver * self)507 _topic_name_is_a_template(KafkaDestDriver *self)
508 {
509 return !log_template_is_literal_string(self->topic_name);
510 }
511
512 static gboolean
_init_literal_topic_name(KafkaDestDriver * self)513 _init_literal_topic_name(KafkaDestDriver *self)
514 {
515 self->topic = _construct_topic(self, self->topic_name->template);
516
517 if (self->topic == NULL)
518 {
519 msg_error("kafka: error constructing the kafka topic object",
520 evt_tag_str("topic", self->topic_name->template),
521 evt_tag_str("driver", self->super.super.super.id),
522 log_pipe_location_tag(&self->super.super.super.super));
523 return FALSE;
524 }
525
526 return TRUE;
527 }
528
529 static gboolean
_init_topic_name(KafkaDestDriver * self)530 _init_topic_name(KafkaDestDriver *self)
531 {
532 if (_topic_name_is_a_template(self))
533 return _init_template_topic_name(self);
534 else
535 return _init_literal_topic_name(self);
536 }
537
538 static void
_destroy_kafka(LogDriver * s)539 _destroy_kafka(LogDriver *s)
540 {
541 KafkaDestDriver *self = (KafkaDestDriver *)s;
542
543 if (self->topics)
544 g_hash_table_unref(self->topics);
545 if (self->topic)
546 rd_kafka_topic_destroy(self->topic);
547
548 if (self->kafka)
549 rd_kafka_destroy(self->kafka);
550 }
551
552 gboolean
kafka_dd_reopen(LogDriver * s)553 kafka_dd_reopen(LogDriver *s)
554 {
555 KafkaDestDriver *self = (KafkaDestDriver *)s;
556 _destroy_kafka(s);
557
558 self->kafka = _construct_client(self);
559 if (self->kafka == NULL)
560 {
561 msg_error("kafka: error constructing kafka connection object",
562 evt_tag_str("topic", self->topic_name->template),
563 evt_tag_str("driver", self->super.super.super.id),
564 log_pipe_location_tag(&self->super.super.super.super));
565 return FALSE;
566 }
567 if (!_init_topic_name(self))
568 return FALSE;
569
570 self->transaction_inited = FALSE;
571
572 return TRUE;
573 }
574
575 gboolean
kafka_dd_init(LogPipe * s)576 kafka_dd_init(LogPipe *s)
577 {
578 KafkaDestDriver *self = (KafkaDestDriver *)s;
579 GlobalConfig *cfg = log_pipe_get_config(s);
580
581 if (!self->topic_name)
582 {
583 msg_error("kafka: the topic() argument is required for kafka destinations",
584 evt_tag_str("driver", self->super.super.super.id),
585 log_pipe_location_tag(&self->super.super.super.super));
586 return FALSE;
587 }
588 if (!self->bootstrap_servers)
589 {
590 msg_error("kafka: the bootstrap-servers() option is required for kafka destinations",
591 evt_tag_str("driver", self->super.super.super.id),
592 log_pipe_location_tag(&self->super.super.super.super));
593 return FALSE;
594 }
595
596 if (!kafka_dd_reopen(&self->super.super.super))
597 {
598 return FALSE;
599 }
600
601
602 if (self->transaction_commit)
603 {
604 /*
605 * The transaction api works on the rd_kafka client level,
606 * and packing multiple kafka operation into an atomic one.
607 * But it bundles them by calling the same operations,
608 * there is no way to selectivly bundle calls.
609 * Thus a worker cannot have its own dedicated transaction.
610 *
611 */
612 if (self->super.num_workers > 1)
613 {
614 msg_info("kafka: in case of sync_send(yes) option the number of workers limited to 1", evt_tag_int("configured_workers",
615 self->super.num_workers), evt_tag_int("workers", 1));
616 log_threaded_dest_driver_set_num_workers(&self->super.super.super, 1);
617 }
618
619 }
620
621 if (!log_threaded_dest_driver_init_method(s))
622 return FALSE;
623
624 if (self->message == NULL)
625 {
626 self->message = log_template_new(cfg, NULL);
627 log_template_compile(self->message, "$ISODATE $HOST $MSGHDR$MSG", NULL);
628 }
629
630 log_template_options_init(&self->template_options, cfg);
631 msg_verbose("kafka: Kafka destination initialized",
632 evt_tag_str("topic", self->topic_name->template),
633 evt_tag_str("fallback_topic", self->fallback_topic_name),
634 evt_tag_str("key", self->key ? self->key->template : "NULL"),
635 evt_tag_str("message", self->message->template),
636 evt_tag_str("driver", self->super.super.super.id),
637 log_pipe_location_tag(&self->super.super.super.super));
638
639 return TRUE;
640 }
641
642 void
kafka_dd_shutdown(LogThreadedDestDriver * s)643 kafka_dd_shutdown(LogThreadedDestDriver *s)
644 {
645 KafkaDestDriver *self = (KafkaDestDriver *)s;
646
647 /* this can be called from the worker threads and
648 * during reloda/shutdown from the main thread (deinit)
649 * No need to lock here, as librdkafka API is thread safe, it does the locking for us.
650 */
651
652 _flush_inflight_messages(self);
653 _purge_remaining_messages(self);
654 }
655
656 static void
_check_for_remaining_messages(KafkaDestDriver * self)657 _check_for_remaining_messages(KafkaDestDriver *self)
658 {
659 gint outq_len = rd_kafka_outq_len(self->kafka);
660 if (outq_len != 0)
661 msg_notice("kafka: failed to completely empty rdkafka queues, as we still have entries in "
662 "the queue after flush() and purge(), this is probably causing a memory leak, "
663 "please contact syslog-ng authors for support",
664 evt_tag_int("outq_len", outq_len));
665 }
666
667 static gboolean
kafka_dd_deinit(LogPipe * s)668 kafka_dd_deinit(LogPipe *s)
669 {
670 KafkaDestDriver *self = (KafkaDestDriver *)s;
671
672 kafka_dd_shutdown(&self->super);
673 _check_for_remaining_messages(self);
674
675 return log_threaded_dest_driver_deinit_method(s);
676 }
677
678 static void
kafka_dd_free(LogPipe * d)679 kafka_dd_free(LogPipe *d)
680 {
681 KafkaDestDriver *self = (KafkaDestDriver *)d;
682
683 log_template_options_destroy(&self->template_options);
684 _destroy_kafka(&self->super.super.super);
685 if (self->fallback_topic_name)
686 g_free(self->fallback_topic_name);
687 log_template_unref(self->key);
688 log_template_unref(self->message);
689 log_template_unref(self->topic_name);
690 g_mutex_clear(&self->topics_lock);
691 g_free(self->bootstrap_servers);
692 kafka_property_list_free(self->config);
693 log_threaded_dest_driver_free(d);
694 }
695
696 /*
697 * Plugin glue.
698 */
699
700 LogDriver *
kafka_dd_new(GlobalConfig * cfg)701 kafka_dd_new(GlobalConfig *cfg)
702 {
703 KafkaDestDriver *self = g_new0(KafkaDestDriver, 1);
704
705 log_threaded_dest_driver_init_instance(&self->super, cfg);
706 self->super.super.super.super.init = kafka_dd_init;
707 self->super.super.super.super.deinit = kafka_dd_deinit;
708 self->super.super.super.super.free_fn = kafka_dd_free;
709 self->super.super.super.super.generate_persist_name = _format_persist_name;
710
711 self->super.format_stats_instance = _format_stats_instance;
712 self->super.stats_source = stats_register_type("kafka");
713 self->super.worker.construct = _construct_worker;
714 /* one minute */
715 self->flush_timeout_on_shutdown = 60000;
716 self->flush_timeout_on_reload = 1000;
717 self->poll_timeout = 1000;
718
719 g_mutex_init(&self->topics_lock);
720
721 log_template_options_defaults(&self->template_options);
722
723 return (LogDriver *)self;
724 }
725