1/*
2 * Copyright (c) 2013 IBM Corp.
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 *    Seth Hoenig
11 *    Allan Stockdill-Mander
12 *    Mike Robertson
13 */
14
15// Portions copyright © 2018 TIBCO Software Inc.
16
17// Package mqtt provides an MQTT v3.1.1 client library.
18package mqtt
19
20import (
21	"errors"
22	"fmt"
23	"net"
24	"strings"
25	"sync"
26	"sync/atomic"
27	"time"
28
29	"github.com/eclipse/paho.mqtt.golang/packets"
30)
31
32const (
33	disconnected uint32 = iota
34	connecting
35	reconnecting
36	connected
37)
38
39// Client is the interface definition for a Client as used by this
40// library, the interface is primarily to allow mocking tests.
41//
42// It is an MQTT v3.1.1 client for communicating
43// with an MQTT server using non-blocking methods that allow work
44// to be done in the background.
45// An application may connect to an MQTT server using:
46//   A plain TCP socket
47//   A secure SSL/TLS socket
48//   A websocket
49// To enable ensured message delivery at Quality of Service (QoS) levels
50// described in the MQTT spec, a message persistence mechanism must be
51// used. This is done by providing a type which implements the Store
52// interface. For convenience, FileStore and MemoryStore are provided
53// implementations that should be sufficient for most use cases. More
54// information can be found in their respective documentation.
55// Numerous connection options may be specified by configuring a
56// and then supplying a ClientOptions type.
57type Client interface {
58	// IsConnected returns a bool signifying whether
59	// the client is connected or not.
60	IsConnected() bool
61	// IsConnectionOpen return a bool signifying wether the client has an active
62	// connection to mqtt broker, i.e not in disconnected or reconnect mode
63	IsConnectionOpen() bool
64	// Connect will create a connection to the message broker, by default
65	// it will attempt to connect at v3.1.1 and auto retry at v3.1 if that
66	// fails
67	Connect() Token
68	// Disconnect will end the connection with the server, but not before waiting
69	// the specified number of milliseconds to wait for existing work to be
70	// completed.
71	Disconnect(quiesce uint)
72	// Publish will publish a message with the specified QoS and content
73	// to the specified topic.
74	// Returns a token to track delivery of the message to the broker
75	Publish(topic string, qos byte, retained bool, payload interface{}) Token
76	// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
77	// a message is published on the topic provided, or nil for the default handler
78	Subscribe(topic string, qos byte, callback MessageHandler) Token
79	// SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
80	// be executed when a message is published on one of the topics provided, or nil for the
81	// default handler
82	SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token
83	// Unsubscribe will end the subscription from each of the topics provided.
84	// Messages published to those topics from other clients will no longer be
85	// received.
86	Unsubscribe(topics ...string) Token
87	// AddRoute allows you to add a handler for messages on a specific topic
88	// without making a subscription. For example having a different handler
89	// for parts of a wildcard subscription
90	AddRoute(topic string, callback MessageHandler)
91	// OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions
92	// in use by the client.
93	OptionsReader() ClientOptionsReader
94}
95
96// client implements the Client interface
97type client struct {
98	lastSent        atomic.Value
99	lastReceived    atomic.Value
100	pingOutstanding int32
101	status          uint32
102	sync.RWMutex
103	messageIds
104	conn            net.Conn
105	ibound          chan packets.ControlPacket
106	obound          chan *PacketAndToken
107	oboundP         chan *PacketAndToken
108	msgRouter       *router
109	stopRouter      chan bool
110	incomingPubChan chan *packets.PublishPacket
111	errors          chan error
112	stop            chan struct{}
113	persist         Store
114	options         ClientOptions
115	workers         sync.WaitGroup
116}
117
118// NewClient will create an MQTT v3.1.1 client with all of the options specified
119// in the provided ClientOptions. The client must have the Connect method called
120// on it before it may be used. This is to make sure resources (such as a net
121// connection) are created before the application is actually ready.
122func NewClient(o *ClientOptions) Client {
123	c := &client{}
124	c.options = *o
125
126	if c.options.Store == nil {
127		c.options.Store = NewMemoryStore()
128	}
129	switch c.options.ProtocolVersion {
130	case 3, 4:
131		c.options.protocolVersionExplicit = true
132	case 0x83, 0x84:
133		c.options.protocolVersionExplicit = true
134	default:
135		c.options.ProtocolVersion = 4
136		c.options.protocolVersionExplicit = false
137	}
138	c.persist = c.options.Store
139	c.status = disconnected
140	c.messageIds = messageIds{index: make(map[uint16]tokenCompletor)}
141	c.msgRouter, c.stopRouter = newRouter()
142	c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
143	if !c.options.AutoReconnect {
144		c.options.MessageChannelDepth = 0
145	}
146	return c
147}
148
149// AddRoute allows you to add a handler for messages on a specific topic
150// without making a subscription. For example having a different handler
151// for parts of a wildcard subscription
152func (c *client) AddRoute(topic string, callback MessageHandler) {
153	if callback != nil {
154		c.msgRouter.addRoute(topic, callback)
155	}
156}
157
158// IsConnected returns a bool signifying whether
159// the client is connected or not.
160func (c *client) IsConnected() bool {
161	c.RLock()
162	defer c.RUnlock()
163	status := atomic.LoadUint32(&c.status)
164	switch {
165	case status == connected:
166		return true
167	case c.options.AutoReconnect && status > connecting:
168		return true
169	default:
170		return false
171	}
172}
173
174// IsConnectionOpen return a bool signifying whether the client has an active
175// connection to mqtt broker, i.e not in disconnected or reconnect mode
176func (c *client) IsConnectionOpen() bool {
177	c.RLock()
178	defer c.RUnlock()
179	status := atomic.LoadUint32(&c.status)
180	switch {
181	case status == connected:
182		return true
183	default:
184		return false
185	}
186}
187
188func (c *client) connectionStatus() uint32 {
189	c.RLock()
190	defer c.RUnlock()
191	status := atomic.LoadUint32(&c.status)
192	return status
193}
194
195func (c *client) setConnected(status uint32) {
196	c.Lock()
197	defer c.Unlock()
198	atomic.StoreUint32(&c.status, uint32(status))
199}
200
201//ErrNotConnected is the error returned from function calls that are
202//made when the client is not connected to a broker
203var ErrNotConnected = errors.New("Not Connected")
204
205// Connect will create a connection to the message broker, by default
206// it will attempt to connect at v3.1.1 and auto retry at v3.1 if that
207// fails
208func (c *client) Connect() Token {
209	var err error
210	t := newToken(packets.Connect).(*ConnectToken)
211	DEBUG.Println(CLI, "Connect()")
212
213	c.obound = make(chan *PacketAndToken, c.options.MessageChannelDepth)
214	c.oboundP = make(chan *PacketAndToken, c.options.MessageChannelDepth)
215	c.ibound = make(chan packets.ControlPacket)
216
217	go func() {
218		c.persist.Open()
219
220		c.setConnected(connecting)
221		c.errors = make(chan error, 1)
222		c.stop = make(chan struct{})
223
224		var rc byte
225		protocolVersion := c.options.ProtocolVersion
226
227		if len(c.options.Servers) == 0 {
228			t.setError(fmt.Errorf("No servers defined to connect to"))
229			return
230		}
231
232		for _, broker := range c.options.Servers {
233			cm := newConnectMsgFromOptions(&c.options, broker)
234			c.options.ProtocolVersion = protocolVersion
235		CONN:
236			DEBUG.Println(CLI, "about to write new connect msg")
237			c.conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders)
238			if err == nil {
239				DEBUG.Println(CLI, "socket connected to broker")
240				switch c.options.ProtocolVersion {
241				case 3:
242					DEBUG.Println(CLI, "Using MQTT 3.1 protocol")
243					cm.ProtocolName = "MQIsdp"
244					cm.ProtocolVersion = 3
245				case 0x83:
246					DEBUG.Println(CLI, "Using MQTT 3.1b protocol")
247					cm.ProtocolName = "MQIsdp"
248					cm.ProtocolVersion = 0x83
249				case 0x84:
250					DEBUG.Println(CLI, "Using MQTT 3.1.1b protocol")
251					cm.ProtocolName = "MQTT"
252					cm.ProtocolVersion = 0x84
253				default:
254					DEBUG.Println(CLI, "Using MQTT 3.1.1 protocol")
255					c.options.ProtocolVersion = 4
256					cm.ProtocolName = "MQTT"
257					cm.ProtocolVersion = 4
258				}
259				cm.Write(c.conn)
260
261				rc, t.sessionPresent = c.connect()
262				if rc != packets.Accepted {
263					if c.conn != nil {
264						c.conn.Close()
265						c.conn = nil
266					}
267					//if the protocol version was explicitly set don't do any fallback
268					if c.options.protocolVersionExplicit {
269						ERROR.Println(CLI, "Connecting to", broker, "CONNACK was not CONN_ACCEPTED, but rather", packets.ConnackReturnCodes[rc])
270						continue
271					}
272					if c.options.ProtocolVersion == 4 {
273						DEBUG.Println(CLI, "Trying reconnect using MQTT 3.1 protocol")
274						c.options.ProtocolVersion = 3
275						goto CONN
276					}
277				}
278				break
279			} else {
280				ERROR.Println(CLI, err.Error())
281				WARN.Println(CLI, "failed to connect to broker, trying next")
282				rc = packets.ErrNetworkError
283			}
284		}
285
286		if c.conn == nil {
287			ERROR.Println(CLI, "Failed to connect to a broker")
288			c.setConnected(disconnected)
289			c.persist.Close()
290			t.returnCode = rc
291			if rc != packets.ErrNetworkError {
292				t.setError(packets.ConnErrors[rc])
293			} else {
294				t.setError(fmt.Errorf("%s : %s", packets.ConnErrors[rc], err))
295			}
296			return
297		}
298
299		c.options.protocolVersionExplicit = true
300
301		if c.options.KeepAlive != 0 {
302			atomic.StoreInt32(&c.pingOutstanding, 0)
303			c.lastReceived.Store(time.Now())
304			c.lastSent.Store(time.Now())
305			c.workers.Add(1)
306			go keepalive(c)
307		}
308
309		c.incomingPubChan = make(chan *packets.PublishPacket, c.options.MessageChannelDepth)
310		c.msgRouter.matchAndDispatch(c.incomingPubChan, c.options.Order, c)
311
312		c.setConnected(connected)
313		DEBUG.Println(CLI, "client is connected")
314		if c.options.OnConnect != nil {
315			go c.options.OnConnect(c)
316		}
317
318		c.workers.Add(4)
319		go errorWatch(c)
320		go alllogic(c)
321		go outgoing(c)
322		go incoming(c)
323
324		// Take care of any messages in the store
325		if c.options.CleanSession == false {
326			c.resume(c.options.ResumeSubs)
327		} else {
328			c.persist.Reset()
329		}
330
331		DEBUG.Println(CLI, "exit startClient")
332		t.flowComplete()
333	}()
334	return t
335}
336
337// internal function used to reconnect the client when it loses its connection
338func (c *client) reconnect() {
339	DEBUG.Println(CLI, "enter reconnect")
340	var (
341		err error
342
343		rc    = byte(1)
344		sleep = time.Duration(1 * time.Second)
345	)
346
347	for rc != 0 && atomic.LoadUint32(&c.status) != disconnected {
348		for _, broker := range c.options.Servers {
349			cm := newConnectMsgFromOptions(&c.options, broker)
350			DEBUG.Println(CLI, "about to write new connect msg")
351			c.Lock()
352			c.conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders)
353			c.Unlock()
354			if err == nil {
355				DEBUG.Println(CLI, "socket connected to broker")
356				switch c.options.ProtocolVersion {
357				case 0x83:
358					DEBUG.Println(CLI, "Using MQTT 3.1b protocol")
359					cm.ProtocolName = "MQIsdp"
360					cm.ProtocolVersion = 0x83
361				case 0x84:
362					DEBUG.Println(CLI, "Using MQTT 3.1.1b protocol")
363					cm.ProtocolName = "MQTT"
364					cm.ProtocolVersion = 0x84
365				case 3:
366					DEBUG.Println(CLI, "Using MQTT 3.1 protocol")
367					cm.ProtocolName = "MQIsdp"
368					cm.ProtocolVersion = 3
369				default:
370					DEBUG.Println(CLI, "Using MQTT 3.1.1 protocol")
371					cm.ProtocolName = "MQTT"
372					cm.ProtocolVersion = 4
373				}
374				cm.Write(c.conn)
375
376				rc, _ = c.connect()
377				if rc != packets.Accepted {
378					c.conn.Close()
379					c.conn = nil
380					//if the protocol version was explicitly set don't do any fallback
381					if c.options.protocolVersionExplicit {
382						ERROR.Println(CLI, "Connecting to", broker, "CONNACK was not Accepted, but rather", packets.ConnackReturnCodes[rc])
383						continue
384					}
385				}
386				break
387			} else {
388				ERROR.Println(CLI, err.Error())
389				WARN.Println(CLI, "failed to connect to broker, trying next")
390				rc = packets.ErrNetworkError
391			}
392		}
393		if rc != 0 {
394			DEBUG.Println(CLI, "Reconnect failed, sleeping for", int(sleep.Seconds()), "seconds")
395			time.Sleep(sleep)
396			if sleep < c.options.MaxReconnectInterval {
397				sleep *= 2
398			}
399
400			if sleep > c.options.MaxReconnectInterval {
401				sleep = c.options.MaxReconnectInterval
402			}
403		}
404	}
405	// Disconnect() must have been called while we were trying to reconnect.
406	if c.connectionStatus() == disconnected {
407		DEBUG.Println(CLI, "Client moved to disconnected state while reconnecting, abandoning reconnect")
408		return
409	}
410
411	c.stop = make(chan struct{})
412
413	if c.options.KeepAlive != 0 {
414		atomic.StoreInt32(&c.pingOutstanding, 0)
415		c.lastReceived.Store(time.Now())
416		c.lastSent.Store(time.Now())
417		c.workers.Add(1)
418		go keepalive(c)
419	}
420
421	c.setConnected(connected)
422	DEBUG.Println(CLI, "client is reconnected")
423	if c.options.OnConnect != nil {
424		go c.options.OnConnect(c)
425	}
426
427	c.workers.Add(4)
428	go errorWatch(c)
429	go alllogic(c)
430	go outgoing(c)
431	go incoming(c)
432
433	c.resume(false)
434}
435
436// This function is only used for receiving a connack
437// when the connection is first started.
438// This prevents receiving incoming data while resume
439// is in progress if clean session is false.
440func (c *client) connect() (byte, bool) {
441	DEBUG.Println(NET, "connect started")
442
443	ca, err := packets.ReadPacket(c.conn)
444	if err != nil {
445		ERROR.Println(NET, "connect got error", err)
446		return packets.ErrNetworkError, false
447	}
448	if ca == nil {
449		ERROR.Println(NET, "received nil packet")
450		return packets.ErrNetworkError, false
451	}
452
453	msg, ok := ca.(*packets.ConnackPacket)
454	if !ok {
455		ERROR.Println(NET, "received msg that was not CONNACK")
456		return packets.ErrNetworkError, false
457	}
458
459	DEBUG.Println(NET, "received connack")
460	return msg.ReturnCode, msg.SessionPresent
461}
462
463// Disconnect will end the connection with the server, but not before waiting
464// the specified number of milliseconds to wait for existing work to be
465// completed.
466func (c *client) Disconnect(quiesce uint) {
467	status := atomic.LoadUint32(&c.status)
468	if status == connected {
469		DEBUG.Println(CLI, "disconnecting")
470		c.setConnected(disconnected)
471
472		dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
473		dt := newToken(packets.Disconnect)
474		c.oboundP <- &PacketAndToken{p: dm, t: dt}
475
476		// wait for work to finish, or quiesce time consumed
477		dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond)
478	} else {
479		WARN.Println(CLI, "Disconnect() called but not connected (disconnected/reconnecting)")
480		c.setConnected(disconnected)
481	}
482
483	c.disconnect()
484}
485
486// ForceDisconnect will end the connection with the mqtt broker immediately.
487func (c *client) forceDisconnect() {
488	if !c.IsConnected() {
489		WARN.Println(CLI, "already disconnected")
490		return
491	}
492	c.setConnected(disconnected)
493	c.conn.Close()
494	DEBUG.Println(CLI, "forcefully disconnecting")
495	c.disconnect()
496}
497
498func (c *client) internalConnLost(err error) {
499	// Only do anything if this was called and we are still "connected"
500	// forceDisconnect can cause incoming/outgoing/alllogic to end with
501	// error from closing the socket but state will be "disconnected"
502	if c.IsConnected() {
503		c.closeStop()
504		c.conn.Close()
505		c.workers.Wait()
506		if c.options.CleanSession && !c.options.AutoReconnect {
507			c.messageIds.cleanUp()
508		}
509		if c.options.AutoReconnect {
510			c.setConnected(reconnecting)
511			go c.reconnect()
512		} else {
513			c.setConnected(disconnected)
514		}
515		if c.options.OnConnectionLost != nil {
516			go c.options.OnConnectionLost(c, err)
517		}
518	}
519}
520
521func (c *client) closeStop() {
522	c.Lock()
523	defer c.Unlock()
524	select {
525	case <-c.stop:
526		DEBUG.Println("In disconnect and stop channel is already closed")
527	default:
528		if c.stop != nil {
529			close(c.stop)
530		}
531	}
532}
533
534func (c *client) closeStopRouter() {
535	c.Lock()
536	defer c.Unlock()
537	select {
538	case <-c.stopRouter:
539		DEBUG.Println("In disconnect and stop channel is already closed")
540	default:
541		if c.stopRouter != nil {
542			close(c.stopRouter)
543		}
544	}
545}
546
547func (c *client) closeConn() {
548	c.Lock()
549	defer c.Unlock()
550	if c.conn != nil {
551		c.conn.Close()
552	}
553}
554
555func (c *client) disconnect() {
556	c.closeStop()
557	c.closeConn()
558	c.workers.Wait()
559	c.messageIds.cleanUp()
560	c.closeStopRouter()
561	DEBUG.Println(CLI, "disconnected")
562	c.persist.Close()
563}
564
565// Publish will publish a message with the specified QoS and content
566// to the specified topic.
567// Returns a token to track delivery of the message to the broker
568func (c *client) Publish(topic string, qos byte, retained bool, payload interface{}) Token {
569	token := newToken(packets.Publish).(*PublishToken)
570	DEBUG.Println(CLI, "enter Publish")
571	switch {
572	case !c.IsConnected():
573		token.setError(ErrNotConnected)
574		return token
575	case c.connectionStatus() == reconnecting && qos == 0:
576		token.flowComplete()
577		return token
578	}
579	pub := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
580	pub.Qos = qos
581	pub.TopicName = topic
582	pub.Retain = retained
583	switch payload.(type) {
584	case string:
585		pub.Payload = []byte(payload.(string))
586	case []byte:
587		pub.Payload = payload.([]byte)
588	default:
589		token.setError(fmt.Errorf("Unknown payload type"))
590		return token
591	}
592
593	if pub.Qos != 0 && pub.MessageID == 0 {
594		pub.MessageID = c.getID(token)
595		token.messageID = pub.MessageID
596	}
597	persistOutbound(c.persist, pub)
598	if c.connectionStatus() == reconnecting {
599		DEBUG.Println(CLI, "storing publish message (reconnecting), topic:", topic)
600	} else {
601		DEBUG.Println(CLI, "sending publish message, topic:", topic)
602		c.obound <- &PacketAndToken{p: pub, t: token}
603	}
604	return token
605}
606
607// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
608// a message is published on the topic provided.
609func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Token {
610	token := newToken(packets.Subscribe).(*SubscribeToken)
611	DEBUG.Println(CLI, "enter Subscribe")
612	if !c.IsConnected() {
613		token.setError(ErrNotConnected)
614		return token
615	}
616	sub := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket)
617	if err := validateTopicAndQos(topic, qos); err != nil {
618		token.setError(err)
619		return token
620	}
621	sub.Topics = append(sub.Topics, topic)
622	sub.Qoss = append(sub.Qoss, qos)
623	DEBUG.Println(CLI, sub.String())
624
625	if strings.HasPrefix(topic, "$share") {
626		topic = strings.Join(strings.Split(topic, "/")[2:], "/")
627	}
628
629	if callback != nil {
630		c.msgRouter.addRoute(topic, callback)
631	}
632
633	token.subs = append(token.subs, topic)
634	c.oboundP <- &PacketAndToken{p: sub, t: token}
635	DEBUG.Println(CLI, "exit Subscribe")
636	return token
637}
638
639// SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
640// be executed when a message is published on one of the topics provided.
641func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token {
642	var err error
643	token := newToken(packets.Subscribe).(*SubscribeToken)
644	DEBUG.Println(CLI, "enter SubscribeMultiple")
645	if !c.IsConnected() {
646		token.setError(ErrNotConnected)
647		return token
648	}
649	sub := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket)
650	if sub.Topics, sub.Qoss, err = validateSubscribeMap(filters); err != nil {
651		token.setError(err)
652		return token
653	}
654
655	if callback != nil {
656		for topic := range filters {
657			c.msgRouter.addRoute(topic, callback)
658		}
659	}
660	token.subs = make([]string, len(sub.Topics))
661	copy(token.subs, sub.Topics)
662	c.oboundP <- &PacketAndToken{p: sub, t: token}
663	DEBUG.Println(CLI, "exit SubscribeMultiple")
664	return token
665}
666
667// Load all stored messages and resend them
668// Call this to ensure QOS > 1,2 even after an application crash
669func (c *client) resume(subscription bool) {
670
671	storedKeys := c.persist.All()
672	for _, key := range storedKeys {
673		packet := c.persist.Get(key)
674		if packet == nil {
675			continue
676		}
677		details := packet.Details()
678		if isKeyOutbound(key) {
679			switch packet.(type) {
680			case *packets.SubscribePacket:
681				if subscription {
682					DEBUG.Println(STR, fmt.Sprintf("loaded pending subscribe (%d)", details.MessageID))
683					token := newToken(packets.Subscribe).(*SubscribeToken)
684					c.oboundP <- &PacketAndToken{p: packet, t: token}
685				}
686			case *packets.UnsubscribePacket:
687				if subscription {
688					DEBUG.Println(STR, fmt.Sprintf("loaded pending unsubscribe (%d)", details.MessageID))
689					token := newToken(packets.Unsubscribe).(*UnsubscribeToken)
690					c.oboundP <- &PacketAndToken{p: packet, t: token}
691				}
692			case *packets.PubrelPacket:
693				DEBUG.Println(STR, fmt.Sprintf("loaded pending pubrel (%d)", details.MessageID))
694				select {
695				case c.oboundP <- &PacketAndToken{p: packet, t: nil}:
696				case <-c.stop:
697				}
698			case *packets.PublishPacket:
699				token := newToken(packets.Publish).(*PublishToken)
700				token.messageID = details.MessageID
701				c.claimID(token, details.MessageID)
702				DEBUG.Println(STR, fmt.Sprintf("loaded pending publish (%d)", details.MessageID))
703				DEBUG.Println(STR, details)
704				c.obound <- &PacketAndToken{p: packet, t: token}
705			default:
706				ERROR.Println(STR, "invalid message type in store (discarded)")
707				c.persist.Del(key)
708			}
709		} else {
710			switch packet.(type) {
711			case *packets.PubrelPacket, *packets.PublishPacket:
712				DEBUG.Println(STR, fmt.Sprintf("loaded pending incomming (%d)", details.MessageID))
713				select {
714				case c.ibound <- packet:
715				case <-c.stop:
716				}
717			default:
718				ERROR.Println(STR, "invalid message type in store (discarded)")
719				c.persist.Del(key)
720			}
721		}
722	}
723}
724
725// Unsubscribe will end the subscription from each of the topics provided.
726// Messages published to those topics from other clients will no longer be
727// received.
728func (c *client) Unsubscribe(topics ...string) Token {
729	token := newToken(packets.Unsubscribe).(*UnsubscribeToken)
730	DEBUG.Println(CLI, "enter Unsubscribe")
731	if !c.IsConnected() {
732		token.setError(ErrNotConnected)
733		return token
734	}
735	unsub := packets.NewControlPacket(packets.Unsubscribe).(*packets.UnsubscribePacket)
736	unsub.Topics = make([]string, len(topics))
737	copy(unsub.Topics, topics)
738
739	c.oboundP <- &PacketAndToken{p: unsub, t: token}
740	for _, topic := range topics {
741		c.msgRouter.deleteRoute(topic)
742	}
743
744	DEBUG.Println(CLI, "exit Unsubscribe")
745	return token
746}
747
748// OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions
749// in use by the client.
750func (c *client) OptionsReader() ClientOptionsReader {
751	r := ClientOptionsReader{options: &c.options}
752	return r
753}
754
755//DefaultConnectionLostHandler is a definition of a function that simply
756//reports to the DEBUG log the reason for the client losing a connection.
757func DefaultConnectionLostHandler(client Client, reason error) {
758	DEBUG.Println("Connection lost:", reason.Error())
759}
760