1 /*
2  * Copyright (c) 2012 Nagy, Attila <bra@fsn.hu>
3  * Copyright (c) 2014 Balabit
4  * Copyright (c) 2013 Viktor Tusa <tusa@balabit.hu>
5  * Copyright (c) 2014 Gergely Nagy <algernon@balabit.hu>
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License version 2 as published
9  * by the Free Software Foundation, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19  *
20  * As an additional exemption you are allowed to compile & link against the
21  * OpenSSL libraries as published by the OpenSSL project. See the file
22  * COPYING for details.
23  *
24  */
25 
26 #include "afstomp.h"
27 #include "afstomp-parser.h"
28 #include "plugin.h"
29 #include "messages.h"
30 #include "stats/stats-registry.h"
31 #include "logmsg/nvtable.h"
32 #include "logqueue.h"
33 #include "scratch-buffers.h"
34 #include "plugin-types.h"
35 
36 #include <glib.h>
37 #include <stomp.h>
38 #include "logthrdest/logthrdestdrv.h"
39 
40 typedef struct
41 {
42   LogThreadedDestDriver super;
43 
44   gchar *destination;
45   LogTemplate *body_template;
46 
47   gboolean persistent;
48   gboolean ack_needed;
49 
50   gchar *host;
51   gint port;
52 
53   gchar *user;
54   gchar *password;
55 
56   LogTemplateOptions template_options;
57 
58   ValuePairs *vp;
59 
60   stomp_connection *conn;
61 } STOMPDestDriver;
62 
63 /*
64  * Configuration
65  */
66 
67 void
afstomp_dd_set_user(LogDriver * d,const gchar * user)68 afstomp_dd_set_user(LogDriver *d, const gchar *user)
69 {
70   STOMPDestDriver *self = (STOMPDestDriver *) d;
71 
72   g_free(self->user);
73   self->user = g_strdup(user);
74 }
75 
76 void
afstomp_dd_set_password(LogDriver * d,const gchar * password)77 afstomp_dd_set_password(LogDriver *d, const gchar *password)
78 {
79   STOMPDestDriver *self = (STOMPDestDriver *) d;
80 
81   g_free(self->password);
82   self->password = g_strdup(password);
83 }
84 
85 void
afstomp_dd_set_host(LogDriver * d,const gchar * host)86 afstomp_dd_set_host(LogDriver *d, const gchar *host)
87 {
88   STOMPDestDriver *self = (STOMPDestDriver *) d;
89 
90   g_free(self->host);
91   self->host = g_strdup(host);
92 }
93 
94 void
afstomp_dd_set_port(LogDriver * d,gint port)95 afstomp_dd_set_port(LogDriver *d, gint port)
96 {
97   STOMPDestDriver *self = (STOMPDestDriver *) d;
98 
99   self->port = (int) port;
100 }
101 
102 void
afstomp_dd_set_destination(LogDriver * d,const gchar * destination)103 afstomp_dd_set_destination(LogDriver *d, const gchar *destination)
104 {
105   STOMPDestDriver *self = (STOMPDestDriver *) d;
106 
107   g_free(self->destination);
108   self->destination = g_strdup(destination);
109 }
110 
111 void
afstomp_dd_set_body(LogDriver * d,LogTemplate * body_template)112 afstomp_dd_set_body(LogDriver *d, LogTemplate *body_template)
113 {
114   STOMPDestDriver *self = (STOMPDestDriver *) d;
115 
116   log_template_unref(self->body_template);
117   self->body_template = body_template;
118 }
119 
120 void
afstomp_dd_set_persistent(LogDriver * s,gboolean persistent)121 afstomp_dd_set_persistent(LogDriver *s, gboolean persistent)
122 {
123   STOMPDestDriver *self = (STOMPDestDriver *) s;
124 
125   self->persistent = persistent;
126 }
127 
128 void
afstomp_dd_set_ack(LogDriver * s,gboolean ack_needed)129 afstomp_dd_set_ack(LogDriver *s, gboolean ack_needed)
130 {
131   STOMPDestDriver *self = (STOMPDestDriver *) s;
132 
133   self->ack_needed = ack_needed;
134 }
135 
136 void
afstomp_dd_set_value_pairs(LogDriver * s,ValuePairs * vp)137 afstomp_dd_set_value_pairs(LogDriver *s, ValuePairs *vp)
138 {
139   STOMPDestDriver *self = (STOMPDestDriver *) s;
140 
141   value_pairs_unref(self->vp);
142   self->vp = vp;
143 }
144 
145 
146 LogTemplateOptions *
afstomp_dd_get_template_options(LogDriver * s)147 afstomp_dd_get_template_options(LogDriver *s)
148 {
149   STOMPDestDriver *self = (STOMPDestDriver *) s;
150 
151   return &self->template_options;
152 }
153 
154 /*
155  * Utilities
156  */
157 
158 static const gchar *
afstomp_dd_format_stats_instance(LogThreadedDestDriver * s)159 afstomp_dd_format_stats_instance(LogThreadedDestDriver *s)
160 {
161   STOMPDestDriver *self = (STOMPDestDriver *) s;
162   static gchar persist_name[1024];
163 
164   if (s->super.super.super.persist_name)
165     g_snprintf(persist_name, sizeof(persist_name), "afstomp,%s", s->super.super.super.persist_name);
166   else
167     g_snprintf(persist_name, sizeof(persist_name), "afstomp,%s,%u,%s", self->host, self->port,
168                self->destination);
169 
170   return persist_name;
171 }
172 
173 static const gchar *
afstomp_dd_format_persist_name(const LogPipe * s)174 afstomp_dd_format_persist_name(const LogPipe *s)
175 {
176   const STOMPDestDriver *self = (const STOMPDestDriver *)s;
177   static gchar persist_name[1024];
178 
179   if (s->persist_name)
180     g_snprintf(persist_name, sizeof(persist_name), "afstomp.%s", s->persist_name);
181   else
182     g_snprintf(persist_name, sizeof(persist_name), "afstomp(%s,%u,%s)", self->host, self->port,
183                self->destination);
184 
185   return persist_name;
186 }
187 
188 static void
afstomp_create_connect_frame(STOMPDestDriver * self,stomp_frame * frame)189 afstomp_create_connect_frame(STOMPDestDriver *self, stomp_frame *frame)
190 {
191   stomp_frame_init(frame, "CONNECT", sizeof("CONNECT"));
192   stomp_frame_add_header(frame, "login", self->user);
193   stomp_frame_add_header(frame, "passcode", self->password);
194 };
195 
afstomp_try_connect(STOMPDestDriver * self)196 static gboolean afstomp_try_connect(STOMPDestDriver *self)
197 {
198   return stomp_connect(&self->conn, self->host, self->port);
199 };
200 
201 static gboolean
afstomp_send_frame(STOMPDestDriver * self,stomp_frame * frame)202 afstomp_send_frame(STOMPDestDriver *self, stomp_frame *frame)
203 {
204   return stomp_write(self->conn, frame);
205 }
206 
207 static gboolean
afstomp_dd_connect(STOMPDestDriver * self,gboolean reconnect)208 afstomp_dd_connect(STOMPDestDriver *self, gboolean reconnect)
209 {
210   stomp_frame frame;
211 
212   if (reconnect && self->conn)
213     return TRUE;
214 
215   if (!afstomp_try_connect(self))
216     return FALSE;
217 
218   afstomp_create_connect_frame(self, &frame);
219   if (!afstomp_send_frame(self, &frame))
220     {
221       msg_error("Sending CONNECT frame to STOMP server failed!");
222       return FALSE;
223     }
224 
225   gboolean frame_read = stomp_receive_frame(self->conn, &frame);
226   if (!frame_read || strcmp(frame.command, "CONNECTED") != 0)
227     {
228       msg_debug("Error connecting to STOMP server, stomp server did not accept CONNECT request");
229       stomp_frame_deinit(&frame);
230 
231       return FALSE;
232     }
233   msg_debug("Connecting to STOMP succeeded",
234             evt_tag_str("driver", self->super.super.super.id));
235 
236   stomp_frame_deinit(&frame);
237 
238   return TRUE;
239 }
240 
241 static void
afstomp_dd_disconnect(LogThreadedDestDriver * s)242 afstomp_dd_disconnect(LogThreadedDestDriver *s)
243 {
244   STOMPDestDriver *self = (STOMPDestDriver *)s;
245 
246   stomp_disconnect(&self->conn);
247   self->conn = NULL;
248 }
249 
250 /* TODO escape '\0' when passing down the value */
251 static gboolean
afstomp_vp_foreach(const gchar * name,TypeHint type,const gchar * value,gsize value_len,gpointer user_data)252 afstomp_vp_foreach(const gchar *name, TypeHint type, const gchar *value, gsize value_len,
253                    gpointer user_data)
254 {
255   stomp_frame *frame = (stomp_frame *) (user_data);
256 
257   stomp_frame_add_header(frame, name, value);
258 
259   return FALSE;
260 }
261 
262 static void
afstomp_set_frame_body(STOMPDestDriver * self,GString * body,stomp_frame * frame,LogMessage * msg)263 afstomp_set_frame_body(STOMPDestDriver *self, GString *body, stomp_frame *frame, LogMessage *msg)
264 {
265   if (self->body_template)
266     {
267       LogTemplateEvalOptions options = {&self->template_options, LTZ_LOCAL,
268                                         self->super.worker.instance.seq_num, NULL
269                                        };
270       log_template_format(self->body_template, msg, &options, body);
271       stomp_frame_set_body(frame, body->str, body->len);
272     }
273 }
274 
275 static gboolean
afstomp_worker_publish(STOMPDestDriver * self,LogMessage * msg)276 afstomp_worker_publish(STOMPDestDriver *self, LogMessage *msg)
277 {
278   gboolean success = TRUE;
279   GString *body = NULL;
280   stomp_frame frame;
281   stomp_frame recv_frame;
282   gchar seq_num[16];
283 
284   if (!self->conn)
285     {
286       msg_error("STOMP server is not connected, not sending message!");
287       return FALSE;
288     }
289 
290   body = scratch_buffers_alloc();
291   stomp_frame_init(&frame, "SEND", sizeof("SEND"));
292 
293   if (self->persistent)
294     stomp_frame_add_header(&frame, "persistent", "true");
295 
296   stomp_frame_add_header(&frame, "destination", self->destination);
297   if (self->ack_needed)
298     {
299       g_snprintf(seq_num, sizeof(seq_num), "%i", self->super.worker.instance.seq_num);
300       stomp_frame_add_header(&frame, "receipt", seq_num);
301     };
302 
303   LogTemplateEvalOptions options = {&self->template_options, LTZ_SEND, self->super.worker.instance.seq_num, NULL};
304   value_pairs_foreach(self->vp, afstomp_vp_foreach, msg, &options, &frame);
305 
306   afstomp_set_frame_body(self, body, &frame, msg);
307 
308   if (!afstomp_send_frame(self, &frame))
309     {
310       msg_error("Error while inserting into STOMP server");
311       success = FALSE;
312     }
313 
314   if (success && self->ack_needed)
315     success = stomp_receive_frame(self->conn, &recv_frame);
316 
317   return success;
318 }
319 
320 static LogThreadedResult
afstomp_worker_insert(LogThreadedDestDriver * s,LogMessage * msg)321 afstomp_worker_insert(LogThreadedDestDriver *s, LogMessage *msg)
322 {
323   STOMPDestDriver *self = (STOMPDestDriver *)s;
324 
325   if (!afstomp_dd_connect(self, TRUE))
326     return LTR_NOT_CONNECTED;
327 
328   if (!afstomp_worker_publish (self, msg))
329     return LTR_ERROR;
330 
331   return LTR_SUCCESS;
332 }
333 
334 static void
afstomp_worker_thread_init(LogThreadedDestDriver * s)335 afstomp_worker_thread_init(LogThreadedDestDriver *s)
336 {
337   STOMPDestDriver *self = (STOMPDestDriver *) s;
338 
339   afstomp_dd_connect(self, FALSE);
340 }
341 
342 static gboolean
afstomp_dd_init(LogPipe * s)343 afstomp_dd_init(LogPipe *s)
344 {
345   STOMPDestDriver *self = (STOMPDestDriver *) s;
346   GlobalConfig *cfg = log_pipe_get_config(s);
347 
348   if (!log_threaded_dest_driver_init_method(s))
349     return FALSE;
350 
351   log_template_options_init(&self->template_options, cfg);
352 
353   self->conn = NULL;
354 
355   msg_verbose("Initializing STOMP destination",
356               evt_tag_str("host", self->host),
357               evt_tag_int("port", self->port),
358               evt_tag_str("destination", self->destination));
359 
360   return TRUE;
361 }
362 
363 static void
afstomp_dd_free(LogPipe * d)364 afstomp_dd_free(LogPipe *d)
365 {
366   STOMPDestDriver *self = (STOMPDestDriver *) d;
367 
368   log_template_options_destroy(&self->template_options);
369 
370   g_free(self->destination);
371   log_template_unref(self->body_template);
372   g_free(self->user);
373   g_free(self->password);
374   g_free(self->host);
375   value_pairs_unref(self->vp);
376   log_threaded_dest_driver_free(d);
377 }
378 
379 LogDriver *
afstomp_dd_new(GlobalConfig * cfg)380 afstomp_dd_new(GlobalConfig *cfg)
381 {
382   STOMPDestDriver *self = g_new0(STOMPDestDriver, 1);
383 
384   log_threaded_dest_driver_init_instance(&self->super, cfg);
385   self->super.super.super.super.init = afstomp_dd_init;
386   self->super.super.super.super.free_fn = afstomp_dd_free;
387   self->super.super.super.super.generate_persist_name = afstomp_dd_format_persist_name;
388 
389   self->super.worker.thread_init = afstomp_worker_thread_init;
390   self->super.worker.disconnect = afstomp_dd_disconnect;
391   self->super.worker.insert = afstomp_worker_insert;
392 
393   self->super.format_stats_instance = afstomp_dd_format_stats_instance;
394   self->super.stats_source = stats_register_type("stomp");
395 
396   afstomp_dd_set_host((LogDriver *) self, "127.0.0.1");
397   afstomp_dd_set_port((LogDriver *) self, 61613);
398   afstomp_dd_set_destination((LogDriver *) self, "/topic/syslog");
399   afstomp_dd_set_persistent((LogDriver *) self, TRUE);
400   afstomp_dd_set_ack((LogDriver *) self, FALSE);
401 
402   log_template_options_defaults(&self->template_options);
403   afstomp_dd_set_value_pairs(&self->super.super.super, value_pairs_new_default(cfg));
404 
405   return (LogDriver *) self;
406 }
407 
408 extern CfgParser afstomp_dd_parser;
409 
410 static Plugin afstomp_plugin =
411 {
412   .type = LL_CONTEXT_DESTINATION,
413   .name = "stomp",
414   .parser = &afstomp_parser
415 };
416 
417 gboolean
afstomp_module_init(PluginContext * context,CfgArgs * args)418 afstomp_module_init(PluginContext *context, CfgArgs *args)
419 {
420   plugin_register(context, &afstomp_plugin, 1);
421   return TRUE;
422 }
423 
424 const ModuleInfo module_info =
425 {
426   .canonical_name = "afstomp",
427   .version = SYSLOG_NG_VERSION,
428   .description = "The afstomp module provides STOMP destination support for syslog-ng.",
429   .core_revision = SYSLOG_NG_SOURCE_REVISION,
430   .plugins = &afstomp_plugin,
431   .plugins_len = 1,
432 };
433