1 /*
2 * Copyright (c) 2021 One Identity
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 as published
6 * by the Free Software Foundation, or (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 *
17 * As an additional exemption you are allowed to compile & link against the
18 * OpenSSL libraries as published by the OpenSSL project. See the file
19 * COPYING for details.
20 *
21 */
22
23 #include "mqtt-source.h"
24 #include <MQTTClient.h>
25 #include "messages.h"
26
27 #define RECEIVE_TIMEOUT 1000
28
29 void
mqtt_sd_set_topic(LogDriver * s,const gchar * topic)30 mqtt_sd_set_topic(LogDriver *s, const gchar *topic)
31 {
32 MQTTSourceDriver *self = (MQTTSourceDriver *) s;
33 g_free(self->topic);
34 self->topic = g_strdup(topic);
35 }
36
37 const gchar *
_format_persist_name(const LogPipe * s)38 _format_persist_name(const LogPipe *s)
39 {
40 MQTTSourceDriver *self = (MQTTSourceDriver *) s;
41 static gchar stats_instance[1024];
42
43 if (((LogPipe *)s)->persist_name)
44 g_snprintf(stats_instance, sizeof(stats_instance), "%s", ((LogPipe *)s)->persist_name);
45 else
46 g_snprintf(stats_instance, sizeof(stats_instance), "mqtt,source,%s,%s", mqtt_client_options_get_address(&self->options),
47 self->topic);
48
49 return stats_instance;
50 }
51
52 static const gchar *
_format_stats_instance(LogThreadedSourceDriver * s)53 _format_stats_instance(LogThreadedSourceDriver *s)
54 {
55 MQTTSourceDriver *self = (MQTTSourceDriver *) s;
56 LogPipe *p = &s->super.super.super;
57 static gchar persist_name[1024];
58
59 if (p->persist_name)
60 g_snprintf(persist_name, sizeof(persist_name), "mqtt-source.%s", p->persist_name);
61 else
62 g_snprintf(persist_name, sizeof(persist_name), "mqtt-source.(%s,%s)", mqtt_client_options_get_address(&self->options),
63 self->topic);
64
65 return persist_name;
66 }
67
68 static LogMessage *
_create_message(MQTTSourceDriver * self,const gchar * message,gint length)69 _create_message(MQTTSourceDriver *self, const gchar *message, gint length)
70 {
71 LogMessage *msg = log_msg_new_empty();
72 log_msg_set_value(msg, LM_V_MESSAGE, message, length);
73
74 return msg;
75 }
76
77 static gboolean
_client_init(MQTTSourceDriver * self)78 _client_init(MQTTSourceDriver *self)
79 {
80 gint rc;
81
82 if ((rc = MQTTClient_create(&self->client, mqtt_client_options_get_address(&self->options),
83 mqtt_client_options_get_client_id(&self->options),
84 MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
85 {
86 msg_error("Error creating mqtt client",
87 evt_tag_str("address", mqtt_client_options_get_address(&self->options)),
88 evt_tag_str("error code", MQTTClient_strerror(rc)),
89 evt_tag_str("client_id", mqtt_client_options_get_client_id(&self->options)),
90 log_pipe_location_tag(&self->super.super.super.super.super));
91 return FALSE;
92 }
93 return TRUE;
94 }
95
96 static gint
_log_ssl_errors(const gchar * str,gsize len,gpointer u)97 _log_ssl_errors(const gchar *str, gsize len, gpointer u)
98 {
99 MQTTSourceDriver *self = (MQTTSourceDriver *) u;
100
101 msg_error("MQTT TLS error", evt_tag_printf("line", "%.*s", (gint) len, str),
102 log_pipe_location_tag(&self->super.super.super.super.super));
103 return TRUE;
104 }
105
106 static gboolean
_subscribe_topic(MQTTSourceDriver * self)107 _subscribe_topic(MQTTSourceDriver *self)
108 {
109 gint rc;
110 if ((rc = MQTTClient_subscribe(self->client, self->topic,
111 mqtt_client_options_get_qos(&self->options))) != MQTTCLIENT_SUCCESS)
112 {
113 msg_error("mqtt: Error while subscribing to topic",
114 evt_tag_str("topic", self->topic),
115 evt_tag_int("qos", mqtt_client_options_get_qos(&self->options)),
116 evt_tag_str("error code", MQTTClient_strerror(rc)),
117 evt_tag_str("driver", self->super.super.super.super.id),
118 log_pipe_location_tag(&self->super.super.super.super.super));
119 return FALSE;
120 }
121
122 return TRUE;
123 }
124
125 static void
_thread_init(LogThreadedFetcherDriver * s)126 _thread_init(LogThreadedFetcherDriver *s)
127 {
128 MQTTSourceDriver *self = (MQTTSourceDriver *)s;
129 _client_init(self);
130 }
131
132 static void
_thread_deinit(LogThreadedFetcherDriver * s)133 _thread_deinit(LogThreadedFetcherDriver *s)
134 {
135 MQTTSourceDriver *self = (MQTTSourceDriver *)s;
136
137 MQTTClient_destroy(&self->client);
138 }
139
140 static gboolean
_connect(LogThreadedFetcherDriver * s)141 _connect(LogThreadedFetcherDriver *s)
142 {
143 MQTTSourceDriver *self = (MQTTSourceDriver *)s;
144
145 gint rc;
146
147 MQTTClient_connectOptions conn_opts;
148 MQTTClient_SSLOptions ssl_opts;
149 mqtt_client_options_to_mqtt_client_connection_option(&self->options, &conn_opts, &ssl_opts);
150
151 if ((rc = MQTTClient_connect(self->client, &conn_opts)) != MQTTCLIENT_SUCCESS)
152 {
153 msg_error("Error connecting mqtt client",
154 evt_tag_str("error code", MQTTClient_strerror(rc)),
155 evt_tag_str("client_id", mqtt_client_options_get_client_id(&self->options)),
156 log_pipe_location_tag(&self->super.super.super.super.super));
157 return FALSE;
158 }
159
160 if (!_subscribe_topic(self))
161 return FALSE;
162
163 return TRUE;
164 }
165
166 static void
_disconnect(LogThreadedFetcherDriver * s)167 _disconnect(LogThreadedFetcherDriver *s)
168 {
169 MQTTSourceDriver *self = (MQTTSourceDriver *)s;
170
171 MQTTClient_disconnect(self->client, MQTT_DISCONNECT_TIMEOUT);
172 }
173
174 static ThreadedFetchResult
_receive_result_evaluation(gint rc,MQTTClient_message * message)175 _receive_result_evaluation(gint rc, MQTTClient_message *message)
176 {
177 if (rc == MQTTCLIENT_SUCCESS && message != NULL)
178 return THREADED_FETCH_SUCCESS;
179
180 if (rc == MQTTCLIENT_TOPICNAME_TRUNCATED && message != NULL)
181 return THREADED_FETCH_SUCCESS;
182
183 if (rc == MQTTCLIENT_SUCCESS && message == NULL)
184 // timeout
185 return THREADED_FETCH_NO_DATA;
186
187 return THREADED_FETCH_ERROR;
188 }
189
190 static LogThreadedFetchResult
_fetch(LogThreadedFetcherDriver * s)191 _fetch(LogThreadedFetcherDriver *s)
192 {
193 MQTTSourceDriver *self = (MQTTSourceDriver *)s;
194 ThreadedFetchResult result = THREADED_FETCH_ERROR;
195 char *topicName = NULL;
196 int topicLen;
197 MQTTClient_message *message = NULL;
198 LogMessage *msg = NULL;
199 gint rc = MQTTClient_receive(self->client, &topicName, &topicLen, &message, RECEIVE_TIMEOUT);
200
201 result = _receive_result_evaluation(rc, message);
202
203 if (result == THREADED_FETCH_SUCCESS)
204 {
205 msg = _create_message(self, (gchar *)message->payload, message->payloadlen);
206 MQTTClient_freeMessage(&message);
207 MQTTClient_free(topicName);
208 }
209
210 if (result == THREADED_FETCH_ERROR)
211 {
212 msg_error("Error while receiving msg",
213 evt_tag_str("error code", MQTTClient_strerror(rc)),
214 evt_tag_str("client_id", mqtt_client_options_get_client_id(&self->options)),
215 log_pipe_location_tag(&self->super.super.super.super.super));
216 }
217
218 return (LogThreadedFetchResult)
219 {
220 result, msg
221 };
222 }
223
224 static gboolean
_init(LogPipe * s)225 _init(LogPipe *s)
226 {
227 MQTTSourceDriver *self = (MQTTSourceDriver *)s;
228
229 if (!self->topic)
230 {
231 msg_error("mqtt: the topic() argument is required for mqtt source",
232 log_pipe_location_tag(&self->super.super.super.super.super));
233 return FALSE;
234 }
235
236 if(!mqtt_client_options_checker(&self->options))
237 return FALSE;
238
239 if(!log_threaded_fetcher_driver_init_method(s))
240 return FALSE;
241
242 if (mqtt_client_options_get_client_id(&self->options) == NULL)
243 {
244 gchar *tmp_client_id;
245
246 tmp_client_id = g_strdup_printf("syslog-ng-source-%s", self->topic);
247
248 mqtt_client_options_set_client_id(&self->options, tmp_client_id);
249 g_free(tmp_client_id);
250 }
251
252 return TRUE;
253 }
254
255 static gboolean
_deinit(LogPipe * s)256 _deinit(LogPipe *s)
257 {
258 return log_threaded_fetcher_driver_deinit_method(s);
259 }
260
261 static void
_free(LogPipe * s)262 _free(LogPipe *s)
263 {
264 MQTTSourceDriver *self = (MQTTSourceDriver *)s;
265 mqtt_client_options_destroy(&self->options);
266 g_free(self->topic);
267
268 log_threaded_fetcher_driver_free_method(s);
269 }
270
271 LogDriver *
mqtt_sd_new(GlobalConfig * cfg)272 mqtt_sd_new(GlobalConfig *cfg)
273 {
274 MQTTSourceDriver *self = g_new0(MQTTSourceDriver, 1);
275
276 log_threaded_fetcher_driver_init_instance(&self->super, cfg);
277
278 mqtt_client_options_defaults(&self->options);
279 mqtt_client_options_set_log_ssl_error_fn(&self->options, self, _log_ssl_errors);
280
281 self->super.super.super.super.super.init = _init;
282 self->super.super.super.super.super.deinit = _deinit;
283 self->super.super.super.super.super.free_fn = _free;
284
285 self->super.connect = _connect;
286 self->super.disconnect = _disconnect;
287 self->super.fetch = _fetch;
288 self->super.thread_init = _thread_init;
289 self->super.thread_deinit = _thread_deinit;
290
291 self->super.super.super.super.super.generate_persist_name = _format_persist_name;
292 self->super.super.format_stats_instance = _format_stats_instance;
293 return &self->super.super.super.super;
294 }
295
296 MQTTClientOptions *
mqtt_sd_get_options(LogDriver * s)297 mqtt_sd_get_options(LogDriver *s)
298 {
299 MQTTSourceDriver *self = (MQTTSourceDriver *)s;
300
301 return &self->options;
302 }
303