1/* 2** Zabbix 3** Copyright (C) 2001-2021 Zabbix SIA 4** 5** This program is free software; you can redistribute it and/or modify 6** it under the terms of the GNU General Public License as published by 7** the Free Software Foundation; either version 2 of the License, or 8** (at your option) any later version. 9** 10** This program is distributed in the hope that it will be useful, 11** but WITHOUT ANY WARRANTY; without even the implied warranty of 12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13** GNU General Public License for more details. 14** 15** You should have received a copy of the GNU General Public License 16** along with this program; if not, write to the Free Software 17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 18** 19**/ 20 21/* 22** We use the library Eclipse Paho (eclipse/paho.mqtt.golang), which is 23** distributed under the terms of the Eclipse Distribution License 1.0 (The 3-Clause BSD License) 24** available at https://www.eclipse.org/org/documents/edl-v10.php 25**/ 26 27package mqtt 28 29import ( 30 "crypto/rand" 31 "encoding/json" 32 "errors" 33 "fmt" 34 "net/url" 35 "strings" 36 "time" 37 38 mqtt "github.com/eclipse/paho.mqtt.golang" 39 "zabbix.com/pkg/conf" 40 "zabbix.com/pkg/itemutil" 41 "zabbix.com/pkg/plugin" 42 "zabbix.com/pkg/version" 43 "zabbix.com/pkg/watch" 44) 45 46type mqttClient struct { 47 client mqtt.Client 48 broker broker 49 subs map[string]*mqttSub 50 opts *mqtt.ClientOptions 51 connected bool 52} 53 54type mqttSub struct { 55 broker broker 56 topic string 57 wildCard bool 58} 59 60type broker struct { 61 url string 62 username string 63 password string 64} 65type Plugin struct { 66 plugin.Base 67 options Options 68 manager *watch.Manager 69 mqttClients map[broker]*mqttClient 70} 71 72var impl Plugin 73 74type Options struct { 75 Timeout int `conf:"optional,range=1:30"` 76} 77 78func (p *Plugin) Configure(global *plugin.GlobalOptions, options interface{}) { 79 if err := conf.Unmarshal(options, &p.options); err != nil { 80 p.Warningf("cannot unmarshal configuration options: %s", err) 81 } 82 if p.options.Timeout == 0 { 83 p.options.Timeout = global.Timeout 84 } 85} 86 87func (p *Plugin) Validate(options interface{}) error { 88 var o Options 89 return conf.Unmarshal(options, &o) 90} 91 92func (p *Plugin) createOptions(clientid, username, password string, b broker) *mqtt.ClientOptions { 93 opts := mqtt.NewClientOptions().AddBroker(b.url).SetClientID(clientid).SetCleanSession(true).SetConnectTimeout( 94 time.Duration(impl.options.Timeout) * time.Second) 95 if username != "" { 96 opts.SetUsername(username) 97 if password != "" { 98 opts.SetPassword(password) 99 } 100 } 101 102 opts.OnConnectionLost = func(client mqtt.Client, reason error) { 103 impl.Warningf("connection lost to [%s]: %s", b.url, reason.Error()) 104 } 105 106 opts.OnConnect = func(client mqtt.Client) { 107 impl.Debugf("connected to [%s]", b.url) 108 109 impl.manager.Lock() 110 defer impl.manager.Unlock() 111 112 mc, ok := p.mqttClients[b] 113 if !ok || mc == nil || mc.client == nil { 114 impl.Warningf("cannot subscribe to [%s]: broker is not connected", b.url) 115 return 116 } 117 118 mc.connected = true 119 for _, ms := range mc.subs { 120 if err := ms.subscribe(mc); err != nil { 121 impl.Warningf("cannot subscribe topic '%s' to [%s]: %s", ms.topic, b.url, err) 122 impl.manager.Notify(ms, err) 123 } 124 } 125 } 126 127 return opts 128} 129 130func newClient(options *mqtt.ClientOptions) (mqtt.Client, error) { 131 c := mqtt.NewClient(options) 132 token := c.Connect() 133 if !token.WaitTimeout(time.Duration(impl.options.Timeout) * time.Second) { 134 c.Disconnect(200) 135 return nil, fmt.Errorf("timed out while connecting") 136 } 137 138 if token.Error() != nil { 139 return nil, token.Error() 140 } 141 142 return c, nil 143} 144 145func (ms *mqttSub) handler(client mqtt.Client, msg mqtt.Message) { 146 impl.manager.Lock() 147 impl.Tracef("received publish from [%s] on topic '%s' got: %s", ms.broker.url, msg.Topic(), string(msg.Payload())) 148 impl.manager.Notify(ms, msg) 149 impl.manager.Unlock() 150} 151 152func (ms *mqttSub) subscribe(mc *mqttClient) error { 153 impl.Tracef("subscribing '%s' to [%s]", ms.topic, ms.broker.url) 154 155 token := mc.client.Subscribe(ms.topic, 0, ms.handler) 156 if !token.WaitTimeout(time.Duration(impl.options.Timeout) * time.Second) { 157 return fmt.Errorf("timed out while subscribing") 158 } 159 160 if token.Error() != nil { 161 return token.Error() 162 } 163 164 impl.Tracef("subscribed '%s' to [%s]", ms.topic, ms.broker.url) 165 return nil 166} 167 168//Watch MQTT plugin 169func (p *Plugin) Watch(requests []*plugin.Request, ctx plugin.ContextProvider) { 170 impl.manager.Lock() 171 impl.manager.Update(ctx.ClientID(), ctx.Output(), requests) 172 impl.manager.Unlock() 173} 174 175func (ms *mqttSub) Initialize() (err error) { 176 mc, ok := impl.mqttClients[ms.broker] 177 if !ok || mc == nil { 178 return fmt.Errorf("Cannot connect to [%s]: broker could not be initialized", ms.broker.url) 179 } 180 181 if mc.client == nil { 182 impl.Debugf("establishing connection to [%s]", ms.broker.url) 183 mc.client, err = newClient(mc.opts) 184 if err != nil { 185 impl.Warningf("cannot establish connection to [%s]: %s", ms.broker.url, err) 186 return 187 } 188 189 impl.Debugf("established connection to [%s]", ms.broker.url) 190 return 191 } 192 193 if mc.connected { 194 return ms.subscribe(mc) 195 } 196 197 return 198} 199 200func (ms *mqttSub) Release() { 201 mc, ok := impl.mqttClients[ms.broker] 202 if !ok || mc == nil || mc.client == nil { 203 impl.Errf("cannot release [%s]: broker was not initialized", ms.broker.url) 204 return 205 } 206 207 impl.Tracef("unsubscribing topic '%s' from [%s]", ms.topic, ms.broker.url) 208 token := mc.client.Unsubscribe(ms.topic) 209 if !token.WaitTimeout(time.Duration(impl.options.Timeout) * time.Second) { 210 impl.Errf("cannot unsubscribe topic '%s' from [%s]: timed out", ms.topic, ms.broker.url) 211 } 212 213 if token.Error() != nil { 214 impl.Errf("cannot unsubscribe topic '%s' from [%s]: %s", ms.topic, ms.broker.url, token.Error()) 215 } 216 217 delete(mc.subs, ms.topic) 218 impl.Tracef("unsubscribed topic '%s' from [%s]", ms.topic, ms.broker.url) 219 if len(mc.subs) == 0 { 220 impl.Debugf("disconnecting from [%s]", ms.broker.url) 221 mc.client.Disconnect(200) 222 delete(impl.mqttClients, mc.broker) 223 } 224} 225 226type respFilter struct { 227 wildcard bool 228} 229 230func (f *respFilter) Process(v interface{}) (s *string, err error) { 231 m, ok := v.(mqtt.Message) 232 if !ok { 233 if err, ok = v.(error); !ok { 234 err = fmt.Errorf("unexpected input type %T", v) 235 } 236 return 237 } 238 239 var value string 240 if f.wildcard { 241 j, err := json.Marshal(map[string]string{m.Topic(): string(m.Payload())}) 242 if err != nil { 243 return nil, err 244 } 245 value = string(j) 246 } else { 247 value = string(m.Payload()) 248 } 249 250 return &value, nil 251} 252 253func (ms *mqttSub) NewFilter(key string) (filter watch.EventFilter, err error) { 254 return &respFilter{ms.wildCard}, nil 255} 256 257func (p *Plugin) EventSourceByKey(key string) (es watch.EventSource, err error) { 258 var params []string 259 if _, params, err = itemutil.ParseKey(key); err != nil { 260 return 261 } 262 if len(params) > 4 { 263 return nil, fmt.Errorf("Too many parameters.") 264 } 265 266 if len(params) < 2 || "" == params[1] { 267 return nil, errors.New("Invalid second parameter.") 268 } 269 270 topic := params[1] 271 url, err := parseURL(params[0]) 272 if err != nil { 273 return nil, err 274 } 275 276 var username, password string 277 if len(params) > 2 { 278 username = params[2] 279 } 280 281 if len(params) > 3 { 282 password = params[3] 283 } 284 285 broker := broker{url.String(), username, password} 286 var client *mqttClient 287 var ok bool 288 if client, ok = p.mqttClients[broker]; !ok { 289 impl.Tracef("creating client for [%s]", broker.url) 290 291 client = &mqttClient{nil, broker, make(map[string]*mqttSub), p.createOptions(getClientID(), username, password, 292 broker), false} 293 p.mqttClients[broker] = client 294 } 295 296 var sub *mqttSub 297 if sub, ok = client.subs[topic]; !ok { 298 impl.Tracef("creating new subscriber on topic '%s' for [%s]", topic, broker.url) 299 300 sub = &mqttSub{broker, topic, hasWildCards(topic)} 301 client.subs[topic] = sub 302 } 303 304 return sub, nil 305} 306 307func getClientID() string { 308 b := make([]byte, 16) 309 _, err := rand.Read(b) 310 if err != nil { 311 impl.Errf("failed to generate a uuid for mqtt Client ID: %s", err.Error) 312 return "Zabbix agent 2 " + version.Long() 313 } 314 return fmt.Sprintf("Zabbix agent 2 %s %x-%x-%x-%x-%x", version.Long(), b[0:4], b[4:6], b[6:8], b[8:10], b[10:]) 315} 316 317func hasWildCards(topic string) bool { 318 return strings.HasSuffix(topic, "#") || strings.Contains(topic, "+") 319} 320 321func parseURL(rawUrl string) (out *url.URL, err error) { 322 if len(rawUrl) == 0 { 323 rawUrl = "localhost" 324 } 325 326 if !strings.Contains(rawUrl, "://") { 327 rawUrl = "tcp://" + rawUrl 328 } 329 330 out, err = url.Parse(rawUrl) 331 if err != nil { 332 return 333 } 334 335 if out.Port() != "" && out.Hostname() == "" { 336 return nil, errors.New("Host is required.") 337 } 338 339 if out.Port() == "" { 340 out.Host = fmt.Sprintf("%s:1883", out.Host) 341 } 342 343 if len(out.Query()) > 0 { 344 return nil, errors.New("URL should not contain query parameters.") 345 } 346 347 return 348} 349 350func init() { 351 impl.manager = watch.NewManager(&impl) 352 impl.mqttClients = make(map[broker]*mqttClient) 353 354 plugin.RegisterMetrics(&impl, "MQTT", "mqtt.get", "Subscribe to MQTT topics for published messages.") 355} 356