1 /*
2  * Copyright (c) 2021 One Identity
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation, or (at your option) any later version.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program; if not, write to the Free Software
15  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
16  *
17  * As an additional exemption you are allowed to compile & link against the
18  * OpenSSL libraries as published by the OpenSSL project. See the file
19  * COPYING for details.
20  *
21  */
22 
23 #include "mqtt-source.h"
24 #include <MQTTClient.h>
25 #include "messages.h"
26 
27 #define RECEIVE_TIMEOUT 1000
28 
29 void
mqtt_sd_set_topic(LogDriver * s,const gchar * topic)30 mqtt_sd_set_topic(LogDriver *s, const gchar *topic)
31 {
32   MQTTSourceDriver *self = (MQTTSourceDriver *) s;
33   g_free(self->topic);
34   self->topic = g_strdup(topic);
35 }
36 
37 const gchar *
_format_persist_name(const LogPipe * s)38 _format_persist_name(const LogPipe *s)
39 {
40   MQTTSourceDriver *self = (MQTTSourceDriver *) s;
41   static gchar stats_instance[1024];
42 
43   if (((LogPipe *)s)->persist_name)
44     g_snprintf(stats_instance, sizeof(stats_instance), "%s", ((LogPipe *)s)->persist_name);
45   else
46     g_snprintf(stats_instance, sizeof(stats_instance), "mqtt,source,%s,%s", mqtt_client_options_get_address(&self->options),
47                self->topic);
48 
49   return stats_instance;
50 }
51 
52 static const gchar *
_format_stats_instance(LogThreadedSourceDriver * s)53 _format_stats_instance(LogThreadedSourceDriver *s)
54 {
55   MQTTSourceDriver *self = (MQTTSourceDriver *) s;
56   LogPipe *p = &s->super.super.super;
57   static gchar persist_name[1024];
58 
59   if (p->persist_name)
60     g_snprintf(persist_name, sizeof(persist_name), "mqtt-source.%s", p->persist_name);
61   else
62     g_snprintf(persist_name, sizeof(persist_name), "mqtt-source.(%s,%s)", mqtt_client_options_get_address(&self->options),
63                self->topic);
64 
65   return persist_name;
66 }
67 
68 static LogMessage *
_create_message(MQTTSourceDriver * self,const gchar * message,gint length)69 _create_message(MQTTSourceDriver *self, const gchar *message, gint length)
70 {
71   LogMessage *msg = log_msg_new_empty();
72   log_msg_set_value(msg, LM_V_MESSAGE, message, length);
73 
74   return msg;
75 }
76 
77 static gboolean
_client_init(MQTTSourceDriver * self)78 _client_init(MQTTSourceDriver *self)
79 {
80   gint rc;
81 
82   if ((rc = MQTTClient_create(&self->client, mqtt_client_options_get_address(&self->options),
83                               mqtt_client_options_get_client_id(&self->options),
84                               MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
85     {
86       msg_error("Error creating mqtt client",
87                 evt_tag_str("address", mqtt_client_options_get_address(&self->options)),
88                 evt_tag_str("error code", MQTTClient_strerror(rc)),
89                 evt_tag_str("client_id", mqtt_client_options_get_client_id(&self->options)),
90                 log_pipe_location_tag(&self->super.super.super.super.super));
91       return FALSE;
92     }
93   return TRUE;
94 }
95 
96 static gint
_log_ssl_errors(const gchar * str,gsize len,gpointer u)97 _log_ssl_errors(const gchar *str, gsize len, gpointer u)
98 {
99   MQTTSourceDriver *self = (MQTTSourceDriver *) u;
100 
101   msg_error("MQTT TLS error", evt_tag_printf("line", "%.*s", (gint) len, str),
102             log_pipe_location_tag(&self->super.super.super.super.super));
103   return TRUE;
104 }
105 
106 static gboolean
_subscribe_topic(MQTTSourceDriver * self)107 _subscribe_topic(MQTTSourceDriver *self)
108 {
109   gint rc;
110   if ((rc = MQTTClient_subscribe(self->client, self->topic,
111                                  mqtt_client_options_get_qos(&self->options))) != MQTTCLIENT_SUCCESS)
112     {
113       msg_error("mqtt: Error while subscribing to topic",
114                 evt_tag_str("topic", self->topic),
115                 evt_tag_int("qos", mqtt_client_options_get_qos(&self->options)),
116                 evt_tag_str("error code", MQTTClient_strerror(rc)),
117                 evt_tag_str("driver", self->super.super.super.super.id),
118                 log_pipe_location_tag(&self->super.super.super.super.super));
119       return FALSE;
120     }
121 
122   return TRUE;
123 }
124 
125 static void
_thread_init(LogThreadedFetcherDriver * s)126 _thread_init(LogThreadedFetcherDriver *s)
127 {
128   MQTTSourceDriver *self = (MQTTSourceDriver *)s;
129   _client_init(self);
130 }
131 
132 static void
_thread_deinit(LogThreadedFetcherDriver * s)133 _thread_deinit(LogThreadedFetcherDriver *s)
134 {
135   MQTTSourceDriver *self = (MQTTSourceDriver *)s;
136 
137   MQTTClient_destroy(&self->client);
138 }
139 
140 static gboolean
_connect(LogThreadedFetcherDriver * s)141 _connect(LogThreadedFetcherDriver *s)
142 {
143   MQTTSourceDriver *self = (MQTTSourceDriver *)s;
144 
145   gint rc;
146 
147   MQTTClient_connectOptions conn_opts;
148   MQTTClient_SSLOptions ssl_opts;
149   mqtt_client_options_to_mqtt_client_connection_option(&self->options, &conn_opts, &ssl_opts);
150 
151   if ((rc = MQTTClient_connect(self->client, &conn_opts)) != MQTTCLIENT_SUCCESS)
152     {
153       msg_error("Error connecting mqtt client",
154                 evt_tag_str("error code", MQTTClient_strerror(rc)),
155                 evt_tag_str("client_id", mqtt_client_options_get_client_id(&self->options)),
156                 log_pipe_location_tag(&self->super.super.super.super.super));
157       return FALSE;
158     }
159 
160   if (!_subscribe_topic(self))
161     return FALSE;
162 
163   return TRUE;
164 }
165 
166 static void
_disconnect(LogThreadedFetcherDriver * s)167 _disconnect(LogThreadedFetcherDriver *s)
168 {
169   MQTTSourceDriver *self = (MQTTSourceDriver *)s;
170 
171   MQTTClient_disconnect(self->client, MQTT_DISCONNECT_TIMEOUT);
172 }
173 
174 static ThreadedFetchResult
_receive_result_evaluation(gint rc,MQTTClient_message * message)175 _receive_result_evaluation(gint rc, MQTTClient_message *message)
176 {
177   if (rc == MQTTCLIENT_SUCCESS && message != NULL)
178     return THREADED_FETCH_SUCCESS;
179 
180   if (rc == MQTTCLIENT_TOPICNAME_TRUNCATED && message != NULL)
181     return THREADED_FETCH_SUCCESS;
182 
183   if (rc == MQTTCLIENT_SUCCESS && message == NULL)
184     // timeout
185     return THREADED_FETCH_NO_DATA;
186 
187   return THREADED_FETCH_ERROR;
188 }
189 
190 static LogThreadedFetchResult
_fetch(LogThreadedFetcherDriver * s)191 _fetch(LogThreadedFetcherDriver *s)
192 {
193   MQTTSourceDriver *self = (MQTTSourceDriver *)s;
194   ThreadedFetchResult result = THREADED_FETCH_ERROR;
195   char *topicName = NULL;
196   int topicLen;
197   MQTTClient_message *message = NULL;
198   LogMessage *msg = NULL;
199   gint rc = MQTTClient_receive(self->client, &topicName, &topicLen, &message, RECEIVE_TIMEOUT);
200 
201   result = _receive_result_evaluation(rc, message);
202 
203   if (result == THREADED_FETCH_SUCCESS)
204     {
205       msg = _create_message(self, (gchar *)message->payload, message->payloadlen);
206       MQTTClient_freeMessage(&message);
207       MQTTClient_free(topicName);
208     }
209 
210   if (result == THREADED_FETCH_ERROR)
211     {
212       msg_error("Error while receiving msg",
213                 evt_tag_str("error code", MQTTClient_strerror(rc)),
214                 evt_tag_str("client_id", mqtt_client_options_get_client_id(&self->options)),
215                 log_pipe_location_tag(&self->super.super.super.super.super));
216     }
217 
218   return (LogThreadedFetchResult)
219   {
220     result, msg
221   };
222 }
223 
224 static gboolean
_init(LogPipe * s)225 _init(LogPipe *s)
226 {
227   MQTTSourceDriver *self = (MQTTSourceDriver *)s;
228 
229   if (!self->topic)
230     {
231       msg_error("mqtt: the topic() argument is required for mqtt source",
232                 log_pipe_location_tag(&self->super.super.super.super.super));
233       return FALSE;
234     }
235 
236   if(!mqtt_client_options_checker(&self->options))
237     return FALSE;
238 
239   if(!log_threaded_fetcher_driver_init_method(s))
240     return FALSE;
241 
242   if (mqtt_client_options_get_client_id(&self->options) == NULL)
243     {
244       gchar *tmp_client_id;
245 
246       tmp_client_id = g_strdup_printf("syslog-ng-source-%s", self->topic);
247 
248       mqtt_client_options_set_client_id(&self->options, tmp_client_id);
249       g_free(tmp_client_id);
250     }
251 
252   return TRUE;
253 }
254 
255 static gboolean
_deinit(LogPipe * s)256 _deinit(LogPipe *s)
257 {
258   return log_threaded_fetcher_driver_deinit_method(s);
259 }
260 
261 static void
_free(LogPipe * s)262 _free(LogPipe *s)
263 {
264   MQTTSourceDriver *self = (MQTTSourceDriver *)s;
265   mqtt_client_options_destroy(&self->options);
266   g_free(self->topic);
267 
268   log_threaded_fetcher_driver_free_method(s);
269 }
270 
271 LogDriver *
mqtt_sd_new(GlobalConfig * cfg)272 mqtt_sd_new(GlobalConfig *cfg)
273 {
274   MQTTSourceDriver *self = g_new0(MQTTSourceDriver, 1);
275 
276   log_threaded_fetcher_driver_init_instance(&self->super, cfg);
277 
278   mqtt_client_options_defaults(&self->options);
279   mqtt_client_options_set_log_ssl_error_fn(&self->options, self, _log_ssl_errors);
280 
281   self->super.super.super.super.super.init = _init;
282   self->super.super.super.super.super.deinit = _deinit;
283   self->super.super.super.super.super.free_fn = _free;
284 
285   self->super.connect = _connect;
286   self->super.disconnect = _disconnect;
287   self->super.fetch = _fetch;
288   self->super.thread_init = _thread_init;
289   self->super.thread_deinit = _thread_deinit;
290 
291   self->super.super.super.super.super.generate_persist_name = _format_persist_name;
292   self->super.super.format_stats_instance = _format_stats_instance;
293   return &self->super.super.super.super;
294 }
295 
296 MQTTClientOptions *
mqtt_sd_get_options(LogDriver * s)297 mqtt_sd_get_options(LogDriver *s)
298 {
299   MQTTSourceDriver *self = (MQTTSourceDriver *)s;
300 
301   return &self->options;
302 }
303