1 /*
2 * Copyright (c) 2012 Nagy, Attila <bra@fsn.hu>
3 * Copyright (c) 2014 Balabit
4 * Copyright (c) 2013 Viktor Tusa <tusa@balabit.hu>
5 * Copyright (c) 2014 Gergely Nagy <algernon@balabit.hu>
6 *
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License version 2 as published
9 * by the Free Software Foundation, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 *
20 * As an additional exemption you are allowed to compile & link against the
21 * OpenSSL libraries as published by the OpenSSL project. See the file
22 * COPYING for details.
23 *
24 */
25
26 #include "afstomp.h"
27 #include "afstomp-parser.h"
28 #include "plugin.h"
29 #include "messages.h"
30 #include "stats/stats-registry.h"
31 #include "logmsg/nvtable.h"
32 #include "logqueue.h"
33 #include "scratch-buffers.h"
34 #include "plugin-types.h"
35
36 #include <glib.h>
37 #include <stomp.h>
38 #include "logthrdest/logthrdestdrv.h"
39
40 typedef struct
41 {
42 LogThreadedDestDriver super;
43
44 gchar *destination;
45 LogTemplate *body_template;
46
47 gboolean persistent;
48 gboolean ack_needed;
49
50 gchar *host;
51 gint port;
52
53 gchar *user;
54 gchar *password;
55
56 LogTemplateOptions template_options;
57
58 ValuePairs *vp;
59
60 stomp_connection *conn;
61 } STOMPDestDriver;
62
63 /*
64 * Configuration
65 */
66
67 void
afstomp_dd_set_user(LogDriver * d,const gchar * user)68 afstomp_dd_set_user(LogDriver *d, const gchar *user)
69 {
70 STOMPDestDriver *self = (STOMPDestDriver *) d;
71
72 g_free(self->user);
73 self->user = g_strdup(user);
74 }
75
76 void
afstomp_dd_set_password(LogDriver * d,const gchar * password)77 afstomp_dd_set_password(LogDriver *d, const gchar *password)
78 {
79 STOMPDestDriver *self = (STOMPDestDriver *) d;
80
81 g_free(self->password);
82 self->password = g_strdup(password);
83 }
84
85 void
afstomp_dd_set_host(LogDriver * d,const gchar * host)86 afstomp_dd_set_host(LogDriver *d, const gchar *host)
87 {
88 STOMPDestDriver *self = (STOMPDestDriver *) d;
89
90 g_free(self->host);
91 self->host = g_strdup(host);
92 }
93
94 void
afstomp_dd_set_port(LogDriver * d,gint port)95 afstomp_dd_set_port(LogDriver *d, gint port)
96 {
97 STOMPDestDriver *self = (STOMPDestDriver *) d;
98
99 self->port = (int) port;
100 }
101
102 void
afstomp_dd_set_destination(LogDriver * d,const gchar * destination)103 afstomp_dd_set_destination(LogDriver *d, const gchar *destination)
104 {
105 STOMPDestDriver *self = (STOMPDestDriver *) d;
106
107 g_free(self->destination);
108 self->destination = g_strdup(destination);
109 }
110
111 void
afstomp_dd_set_body(LogDriver * d,LogTemplate * body_template)112 afstomp_dd_set_body(LogDriver *d, LogTemplate *body_template)
113 {
114 STOMPDestDriver *self = (STOMPDestDriver *) d;
115
116 log_template_unref(self->body_template);
117 self->body_template = body_template;
118 }
119
120 void
afstomp_dd_set_persistent(LogDriver * s,gboolean persistent)121 afstomp_dd_set_persistent(LogDriver *s, gboolean persistent)
122 {
123 STOMPDestDriver *self = (STOMPDestDriver *) s;
124
125 self->persistent = persistent;
126 }
127
128 void
afstomp_dd_set_ack(LogDriver * s,gboolean ack_needed)129 afstomp_dd_set_ack(LogDriver *s, gboolean ack_needed)
130 {
131 STOMPDestDriver *self = (STOMPDestDriver *) s;
132
133 self->ack_needed = ack_needed;
134 }
135
136 void
afstomp_dd_set_value_pairs(LogDriver * s,ValuePairs * vp)137 afstomp_dd_set_value_pairs(LogDriver *s, ValuePairs *vp)
138 {
139 STOMPDestDriver *self = (STOMPDestDriver *) s;
140
141 value_pairs_unref(self->vp);
142 self->vp = vp;
143 }
144
145
146 LogTemplateOptions *
afstomp_dd_get_template_options(LogDriver * s)147 afstomp_dd_get_template_options(LogDriver *s)
148 {
149 STOMPDestDriver *self = (STOMPDestDriver *) s;
150
151 return &self->template_options;
152 }
153
154 /*
155 * Utilities
156 */
157
158 static const gchar *
afstomp_dd_format_stats_instance(LogThreadedDestDriver * s)159 afstomp_dd_format_stats_instance(LogThreadedDestDriver *s)
160 {
161 STOMPDestDriver *self = (STOMPDestDriver *) s;
162 static gchar persist_name[1024];
163
164 if (s->super.super.super.persist_name)
165 g_snprintf(persist_name, sizeof(persist_name), "afstomp,%s", s->super.super.super.persist_name);
166 else
167 g_snprintf(persist_name, sizeof(persist_name), "afstomp,%s,%u,%s", self->host, self->port,
168 self->destination);
169
170 return persist_name;
171 }
172
173 static const gchar *
afstomp_dd_format_persist_name(const LogPipe * s)174 afstomp_dd_format_persist_name(const LogPipe *s)
175 {
176 const STOMPDestDriver *self = (const STOMPDestDriver *)s;
177 static gchar persist_name[1024];
178
179 if (s->persist_name)
180 g_snprintf(persist_name, sizeof(persist_name), "afstomp.%s", s->persist_name);
181 else
182 g_snprintf(persist_name, sizeof(persist_name), "afstomp(%s,%u,%s)", self->host, self->port,
183 self->destination);
184
185 return persist_name;
186 }
187
188 static void
afstomp_create_connect_frame(STOMPDestDriver * self,stomp_frame * frame)189 afstomp_create_connect_frame(STOMPDestDriver *self, stomp_frame *frame)
190 {
191 stomp_frame_init(frame, "CONNECT", sizeof("CONNECT"));
192 stomp_frame_add_header(frame, "login", self->user);
193 stomp_frame_add_header(frame, "passcode", self->password);
194 };
195
afstomp_try_connect(STOMPDestDriver * self)196 static gboolean afstomp_try_connect(STOMPDestDriver *self)
197 {
198 return stomp_connect(&self->conn, self->host, self->port);
199 };
200
201 static gboolean
afstomp_send_frame(STOMPDestDriver * self,stomp_frame * frame)202 afstomp_send_frame(STOMPDestDriver *self, stomp_frame *frame)
203 {
204 return stomp_write(self->conn, frame);
205 }
206
207 static gboolean
afstomp_dd_connect(STOMPDestDriver * self,gboolean reconnect)208 afstomp_dd_connect(STOMPDestDriver *self, gboolean reconnect)
209 {
210 stomp_frame frame;
211
212 if (reconnect && self->conn)
213 return TRUE;
214
215 if (!afstomp_try_connect(self))
216 return FALSE;
217
218 afstomp_create_connect_frame(self, &frame);
219 if (!afstomp_send_frame(self, &frame))
220 {
221 msg_error("Sending CONNECT frame to STOMP server failed!");
222 return FALSE;
223 }
224
225 gboolean frame_read = stomp_receive_frame(self->conn, &frame);
226 if (!frame_read || strcmp(frame.command, "CONNECTED") != 0)
227 {
228 msg_debug("Error connecting to STOMP server, stomp server did not accept CONNECT request");
229 stomp_frame_deinit(&frame);
230
231 return FALSE;
232 }
233 msg_debug("Connecting to STOMP succeeded",
234 evt_tag_str("driver", self->super.super.super.id));
235
236 stomp_frame_deinit(&frame);
237
238 return TRUE;
239 }
240
241 static void
afstomp_dd_disconnect(LogThreadedDestDriver * s)242 afstomp_dd_disconnect(LogThreadedDestDriver *s)
243 {
244 STOMPDestDriver *self = (STOMPDestDriver *)s;
245
246 stomp_disconnect(&self->conn);
247 self->conn = NULL;
248 }
249
250 /* TODO escape '\0' when passing down the value */
251 static gboolean
afstomp_vp_foreach(const gchar * name,TypeHint type,const gchar * value,gsize value_len,gpointer user_data)252 afstomp_vp_foreach(const gchar *name, TypeHint type, const gchar *value, gsize value_len,
253 gpointer user_data)
254 {
255 stomp_frame *frame = (stomp_frame *) (user_data);
256
257 stomp_frame_add_header(frame, name, value);
258
259 return FALSE;
260 }
261
262 static void
afstomp_set_frame_body(STOMPDestDriver * self,GString * body,stomp_frame * frame,LogMessage * msg)263 afstomp_set_frame_body(STOMPDestDriver *self, GString *body, stomp_frame *frame, LogMessage *msg)
264 {
265 if (self->body_template)
266 {
267 LogTemplateEvalOptions options = {&self->template_options, LTZ_LOCAL,
268 self->super.worker.instance.seq_num, NULL
269 };
270 log_template_format(self->body_template, msg, &options, body);
271 stomp_frame_set_body(frame, body->str, body->len);
272 }
273 }
274
275 static gboolean
afstomp_worker_publish(STOMPDestDriver * self,LogMessage * msg)276 afstomp_worker_publish(STOMPDestDriver *self, LogMessage *msg)
277 {
278 gboolean success = TRUE;
279 GString *body = NULL;
280 stomp_frame frame;
281 stomp_frame recv_frame;
282 gchar seq_num[16];
283
284 if (!self->conn)
285 {
286 msg_error("STOMP server is not connected, not sending message!");
287 return FALSE;
288 }
289
290 body = scratch_buffers_alloc();
291 stomp_frame_init(&frame, "SEND", sizeof("SEND"));
292
293 if (self->persistent)
294 stomp_frame_add_header(&frame, "persistent", "true");
295
296 stomp_frame_add_header(&frame, "destination", self->destination);
297 if (self->ack_needed)
298 {
299 g_snprintf(seq_num, sizeof(seq_num), "%i", self->super.worker.instance.seq_num);
300 stomp_frame_add_header(&frame, "receipt", seq_num);
301 };
302
303 LogTemplateEvalOptions options = {&self->template_options, LTZ_SEND, self->super.worker.instance.seq_num, NULL};
304 value_pairs_foreach(self->vp, afstomp_vp_foreach, msg, &options, &frame);
305
306 afstomp_set_frame_body(self, body, &frame, msg);
307
308 if (!afstomp_send_frame(self, &frame))
309 {
310 msg_error("Error while inserting into STOMP server");
311 success = FALSE;
312 }
313
314 if (success && self->ack_needed)
315 success = stomp_receive_frame(self->conn, &recv_frame);
316
317 return success;
318 }
319
320 static LogThreadedResult
afstomp_worker_insert(LogThreadedDestDriver * s,LogMessage * msg)321 afstomp_worker_insert(LogThreadedDestDriver *s, LogMessage *msg)
322 {
323 STOMPDestDriver *self = (STOMPDestDriver *)s;
324
325 if (!afstomp_dd_connect(self, TRUE))
326 return LTR_NOT_CONNECTED;
327
328 if (!afstomp_worker_publish (self, msg))
329 return LTR_ERROR;
330
331 return LTR_SUCCESS;
332 }
333
334 static void
afstomp_worker_thread_init(LogThreadedDestDriver * s)335 afstomp_worker_thread_init(LogThreadedDestDriver *s)
336 {
337 STOMPDestDriver *self = (STOMPDestDriver *) s;
338
339 afstomp_dd_connect(self, FALSE);
340 }
341
342 static gboolean
afstomp_dd_init(LogPipe * s)343 afstomp_dd_init(LogPipe *s)
344 {
345 STOMPDestDriver *self = (STOMPDestDriver *) s;
346 GlobalConfig *cfg = log_pipe_get_config(s);
347
348 if (!log_threaded_dest_driver_init_method(s))
349 return FALSE;
350
351 log_template_options_init(&self->template_options, cfg);
352
353 self->conn = NULL;
354
355 msg_verbose("Initializing STOMP destination",
356 evt_tag_str("host", self->host),
357 evt_tag_int("port", self->port),
358 evt_tag_str("destination", self->destination));
359
360 return TRUE;
361 }
362
363 static void
afstomp_dd_free(LogPipe * d)364 afstomp_dd_free(LogPipe *d)
365 {
366 STOMPDestDriver *self = (STOMPDestDriver *) d;
367
368 log_template_options_destroy(&self->template_options);
369
370 g_free(self->destination);
371 log_template_unref(self->body_template);
372 g_free(self->user);
373 g_free(self->password);
374 g_free(self->host);
375 value_pairs_unref(self->vp);
376 log_threaded_dest_driver_free(d);
377 }
378
379 LogDriver *
afstomp_dd_new(GlobalConfig * cfg)380 afstomp_dd_new(GlobalConfig *cfg)
381 {
382 STOMPDestDriver *self = g_new0(STOMPDestDriver, 1);
383
384 log_threaded_dest_driver_init_instance(&self->super, cfg);
385 self->super.super.super.super.init = afstomp_dd_init;
386 self->super.super.super.super.free_fn = afstomp_dd_free;
387 self->super.super.super.super.generate_persist_name = afstomp_dd_format_persist_name;
388
389 self->super.worker.thread_init = afstomp_worker_thread_init;
390 self->super.worker.disconnect = afstomp_dd_disconnect;
391 self->super.worker.insert = afstomp_worker_insert;
392
393 self->super.format_stats_instance = afstomp_dd_format_stats_instance;
394 self->super.stats_source = stats_register_type("stomp");
395
396 afstomp_dd_set_host((LogDriver *) self, "127.0.0.1");
397 afstomp_dd_set_port((LogDriver *) self, 61613);
398 afstomp_dd_set_destination((LogDriver *) self, "/topic/syslog");
399 afstomp_dd_set_persistent((LogDriver *) self, TRUE);
400 afstomp_dd_set_ack((LogDriver *) self, FALSE);
401
402 log_template_options_defaults(&self->template_options);
403 afstomp_dd_set_value_pairs(&self->super.super.super, value_pairs_new_default(cfg));
404
405 return (LogDriver *) self;
406 }
407
408 extern CfgParser afstomp_dd_parser;
409
410 static Plugin afstomp_plugin =
411 {
412 .type = LL_CONTEXT_DESTINATION,
413 .name = "stomp",
414 .parser = &afstomp_parser
415 };
416
417 gboolean
afstomp_module_init(PluginContext * context,CfgArgs * args)418 afstomp_module_init(PluginContext *context, CfgArgs *args)
419 {
420 plugin_register(context, &afstomp_plugin, 1);
421 return TRUE;
422 }
423
424 const ModuleInfo module_info =
425 {
426 .canonical_name = "afstomp",
427 .version = SYSLOG_NG_VERSION,
428 .description = "The afstomp module provides STOMP destination support for syslog-ng.",
429 .core_revision = SYSLOG_NG_SOURCE_REVISION,
430 .plugins = &afstomp_plugin,
431 .plugins_len = 1,
432 };
433