1/*
2** Zabbix
3** Copyright (C) 2001-2021 Zabbix SIA
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18**
19**/
20
21/*
22** We use the library Eclipse Paho (eclipse/paho.mqtt.golang), which is
23** distributed under the terms of the Eclipse Distribution License 1.0 (The 3-Clause BSD License)
24** available at https://www.eclipse.org/org/documents/edl-v10.php
25**/
26
27package mqtt
28
29import (
30	"crypto/rand"
31	"encoding/json"
32	"errors"
33	"fmt"
34	"net/url"
35	"strings"
36	"time"
37
38	mqtt "github.com/eclipse/paho.mqtt.golang"
39	"zabbix.com/pkg/conf"
40	"zabbix.com/pkg/itemutil"
41	"zabbix.com/pkg/plugin"
42	"zabbix.com/pkg/version"
43	"zabbix.com/pkg/watch"
44)
45
46type mqttClient struct {
47	client    mqtt.Client
48	broker    broker
49	subs      map[string]*mqttSub
50	opts      *mqtt.ClientOptions
51	connected bool
52}
53
54type mqttSub struct {
55	broker   broker
56	topic    string
57	wildCard bool
58}
59
60type broker struct {
61	url      string
62	username string
63	password string
64}
65type Plugin struct {
66	plugin.Base
67	options     Options
68	manager     *watch.Manager
69	mqttClients map[broker]*mqttClient
70}
71
72var impl Plugin
73
74type Options struct {
75	Timeout int `conf:"optional,range=1:30"`
76}
77
78func (p *Plugin) Configure(global *plugin.GlobalOptions, options interface{}) {
79	if err := conf.Unmarshal(options, &p.options); err != nil {
80		p.Warningf("cannot unmarshal configuration options: %s", err)
81	}
82	if p.options.Timeout == 0 {
83		p.options.Timeout = global.Timeout
84	}
85}
86
87func (p *Plugin) Validate(options interface{}) error {
88	var o Options
89	return conf.Unmarshal(options, &o)
90}
91
92func (p *Plugin) createOptions(clientid, username, password string, b broker) *mqtt.ClientOptions {
93	opts := mqtt.NewClientOptions().AddBroker(b.url).SetClientID(clientid).SetCleanSession(true).SetConnectTimeout(
94		time.Duration(impl.options.Timeout) * time.Second)
95	if username != "" {
96		opts.SetUsername(username)
97		if password != "" {
98			opts.SetPassword(password)
99		}
100	}
101
102	opts.OnConnectionLost = func(client mqtt.Client, reason error) {
103		impl.Warningf("connection lost to [%s]: %s", b.url, reason.Error())
104	}
105
106	opts.OnConnect = func(client mqtt.Client) {
107		impl.Debugf("connected to [%s]", b.url)
108
109		impl.manager.Lock()
110		defer impl.manager.Unlock()
111
112		mc, ok := p.mqttClients[b]
113		if !ok || mc == nil || mc.client == nil {
114			impl.Warningf("cannot subscribe to [%s]: broker is not connected", b.url)
115			return
116		}
117
118		mc.connected = true
119		for _, ms := range mc.subs {
120			if err := ms.subscribe(mc); err != nil {
121				impl.Warningf("cannot subscribe topic '%s' to [%s]: %s", ms.topic, b.url, err)
122				impl.manager.Notify(ms, err)
123			}
124		}
125	}
126
127	return opts
128}
129
130func newClient(options *mqtt.ClientOptions) (mqtt.Client, error) {
131	c := mqtt.NewClient(options)
132	token := c.Connect()
133	if !token.WaitTimeout(time.Duration(impl.options.Timeout) * time.Second) {
134		c.Disconnect(200)
135		return nil, fmt.Errorf("timed out while connecting")
136	}
137
138	if token.Error() != nil {
139		return nil, token.Error()
140	}
141
142	return c, nil
143}
144
145func (ms *mqttSub) handler(client mqtt.Client, msg mqtt.Message) {
146	impl.manager.Lock()
147	impl.Tracef("received publish from [%s] on topic '%s' got: %s", ms.broker.url, msg.Topic(), string(msg.Payload()))
148	impl.manager.Notify(ms, msg)
149	impl.manager.Unlock()
150}
151
152func (ms *mqttSub) subscribe(mc *mqttClient) error {
153	impl.Tracef("subscribing '%s' to [%s]", ms.topic, ms.broker.url)
154
155	token := mc.client.Subscribe(ms.topic, 0, ms.handler)
156	if !token.WaitTimeout(time.Duration(impl.options.Timeout) * time.Second) {
157		return fmt.Errorf("timed out while subscribing")
158	}
159
160	if token.Error() != nil {
161		return token.Error()
162	}
163
164	impl.Tracef("subscribed '%s' to [%s]", ms.topic, ms.broker.url)
165	return nil
166}
167
168//Watch MQTT plugin
169func (p *Plugin) Watch(requests []*plugin.Request, ctx plugin.ContextProvider) {
170	impl.manager.Lock()
171	impl.manager.Update(ctx.ClientID(), ctx.Output(), requests)
172	impl.manager.Unlock()
173}
174
175func (ms *mqttSub) Initialize() (err error) {
176	mc, ok := impl.mqttClients[ms.broker]
177	if !ok || mc == nil {
178		return fmt.Errorf("Cannot connect to [%s]: broker could not be initialized", ms.broker.url)
179	}
180
181	if mc.client == nil {
182		impl.Debugf("establishing connection to [%s]", ms.broker.url)
183		mc.client, err = newClient(mc.opts)
184		if err != nil {
185			impl.Warningf("cannot establish connection to [%s]: %s", ms.broker.url, err)
186			return
187		}
188
189		impl.Debugf("established connection to [%s]", ms.broker.url)
190		return
191	}
192
193	if mc.connected {
194		return ms.subscribe(mc)
195	}
196
197	return
198}
199
200func (ms *mqttSub) Release() {
201	mc, ok := impl.mqttClients[ms.broker]
202	if !ok || mc == nil || mc.client == nil {
203		impl.Errf("cannot release [%s]: broker was not initialized", ms.broker.url)
204		return
205	}
206
207	impl.Tracef("unsubscribing topic '%s' from [%s]", ms.topic, ms.broker.url)
208	token := mc.client.Unsubscribe(ms.topic)
209	if !token.WaitTimeout(time.Duration(impl.options.Timeout) * time.Second) {
210		impl.Errf("cannot unsubscribe topic '%s' from [%s]: timed out", ms.topic, ms.broker.url)
211	}
212
213	if token.Error() != nil {
214		impl.Errf("cannot unsubscribe topic '%s' from [%s]: %s", ms.topic, ms.broker.url, token.Error())
215	}
216
217	delete(mc.subs, ms.topic)
218	impl.Tracef("unsubscribed topic '%s' from [%s]", ms.topic, ms.broker.url)
219	if len(mc.subs) == 0 {
220		impl.Debugf("disconnecting from [%s]", ms.broker.url)
221		mc.client.Disconnect(200)
222		delete(impl.mqttClients, mc.broker)
223	}
224}
225
226type respFilter struct {
227	wildcard bool
228}
229
230func (f *respFilter) Process(v interface{}) (s *string, err error) {
231	m, ok := v.(mqtt.Message)
232	if !ok {
233		if err, ok = v.(error); !ok {
234			err = fmt.Errorf("unexpected input type %T", v)
235		}
236		return
237	}
238
239	var value string
240	if f.wildcard {
241		j, err := json.Marshal(map[string]string{m.Topic(): string(m.Payload())})
242		if err != nil {
243			return nil, err
244		}
245		value = string(j)
246	} else {
247		value = string(m.Payload())
248	}
249
250	return &value, nil
251}
252
253func (ms *mqttSub) NewFilter(key string) (filter watch.EventFilter, err error) {
254	return &respFilter{ms.wildCard}, nil
255}
256
257func (p *Plugin) EventSourceByKey(key string) (es watch.EventSource, err error) {
258	var params []string
259	if _, params, err = itemutil.ParseKey(key); err != nil {
260		return
261	}
262	if len(params) > 4 {
263		return nil, fmt.Errorf("Too many parameters.")
264	}
265
266	if len(params) < 2 || "" == params[1] {
267		return nil, errors.New("Invalid second parameter.")
268	}
269
270	topic := params[1]
271	url, err := parseURL(params[0])
272	if err != nil {
273		return nil, err
274	}
275
276	var username, password string
277	if len(params) > 2 {
278		username = params[2]
279	}
280
281	if len(params) > 3 {
282		password = params[3]
283	}
284
285	broker := broker{url.String(), username, password}
286	var client *mqttClient
287	var ok bool
288	if client, ok = p.mqttClients[broker]; !ok {
289		impl.Tracef("creating client for [%s]", broker.url)
290
291		client = &mqttClient{nil, broker, make(map[string]*mqttSub), p.createOptions(getClientID(), username, password,
292			broker), false}
293		p.mqttClients[broker] = client
294	}
295
296	var sub *mqttSub
297	if sub, ok = client.subs[topic]; !ok {
298		impl.Tracef("creating new subscriber on topic '%s' for [%s]", topic, broker.url)
299
300		sub = &mqttSub{broker, topic, hasWildCards(topic)}
301		client.subs[topic] = sub
302	}
303
304	return sub, nil
305}
306
307func getClientID() string {
308	b := make([]byte, 16)
309	_, err := rand.Read(b)
310	if err != nil {
311		impl.Errf("failed to generate a uuid for mqtt Client ID: %s", err.Error)
312		return "Zabbix agent 2 " + version.Long()
313	}
314	return fmt.Sprintf("Zabbix agent 2 %s %x-%x-%x-%x-%x", version.Long(), b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
315}
316
317func hasWildCards(topic string) bool {
318	return strings.HasSuffix(topic, "#") || strings.Contains(topic, "+")
319}
320
321func parseURL(rawUrl string) (out *url.URL, err error) {
322	if len(rawUrl) == 0 {
323		rawUrl = "localhost"
324	}
325
326	if !strings.Contains(rawUrl, "://") {
327		rawUrl = "tcp://" + rawUrl
328	}
329
330	out, err = url.Parse(rawUrl)
331	if err != nil {
332		return
333	}
334
335	if out.Port() != "" && out.Hostname() == "" {
336		return nil, errors.New("Host is required.")
337	}
338
339	if out.Port() == "" {
340		out.Host = fmt.Sprintf("%s:1883", out.Host)
341	}
342
343	if len(out.Query()) > 0 {
344		return nil, errors.New("URL should not contain query parameters.")
345	}
346
347	return
348}
349
350func init() {
351	impl.manager = watch.NewManager(&impl)
352	impl.mqttClients = make(map[broker]*mqttClient)
353
354	plugin.RegisterMetrics(&impl, "MQTT", "mqtt.get", "Subscribe to MQTT topics for published messages.")
355}
356