/*
 * Copyright (c) 2013 IBM Corp.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *    Seth Hoenig
 *    Allan Stockdill-Mander
 *    Mike Robertson
 */

// Portions copyright © 2018 TIBCO Software Inc.

// Package mqtt provides an MQTT v3.1.1 client library.
package mqtt

import (
	"errors"
	"fmt"
	"net"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/eclipse/paho.mqtt.golang/packets"
)

// Connection states stored in client.status. The numeric order matters:
// IsConnected treats any state greater than connecting (that is,
// reconnecting or connected) as "connected" when AutoReconnect is enabled.
const (
	disconnected uint32 = iota
	connecting
	reconnecting
	connected
)

// Client is the interface definition for a Client as used by this
// library, the interface is primarily to allow mocking tests.
//
// It is an MQTT v3.1.1 client for communicating
// with an MQTT server using non-blocking methods that allow work
// to be done in the background.
// An application may connect to an MQTT server using:
//   A plain TCP socket
//   A secure SSL/TLS socket
//   A websocket
// To enable ensured message delivery at Quality of Service (QoS) levels
// described in the MQTT spec, a message persistence mechanism must be
// used. This is done by providing a type which implements the Store
// interface. For convenience, FileStore and MemoryStore are provided
// implementations that should be sufficient for most use cases. More
// information can be found in their respective documentation.
// Numerous connection options may be specified by configuring and then
// supplying a ClientOptions type.
type Client interface {
	// IsConnected returns a bool signifying whether
	// the client is connected or not.
	IsConnected() bool
	// IsConnectionOpen returns a bool signifying whether the client has an active
	// connection to the mqtt broker, i.e. not in disconnected or reconnect mode
	IsConnectionOpen() bool
	// Connect will create a connection to the message broker, by default
	// it will attempt to connect at v3.1.1 and auto retry at v3.1 if that
	// fails
	Connect() Token
	// Disconnect will end the connection with the server, but not before
	// waiting up to the specified number of milliseconds for existing work
	// to be completed.
	Disconnect(quiesce uint)
	// Publish will publish a message with the specified QoS and content
	// to the specified topic.
	// Returns a token to track delivery of the message to the broker
	Publish(topic string, qos byte, retained bool, payload interface{}) Token
	// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
	// a message is published on the topic provided, or nil for the default handler
	Subscribe(topic string, qos byte, callback MessageHandler) Token
	// SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
	// be executed when a message is published on one of the topics provided, or nil for the
	// default handler
	SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token
	// Unsubscribe will end the subscription from each of the topics provided.
	// Messages published to those topics from other clients will no longer be
	// received.
	Unsubscribe(topics ...string) Token
	// AddRoute allows you to add a handler for messages on a specific topic
	// without making a subscription. For example having a different handler
	// for parts of a wildcard subscription
	AddRoute(topic string, callback MessageHandler)
	// OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions
	// in use by the client.
	OptionsReader() ClientOptionsReader
}

// client implements the Client interface.
//
// status holds one of the connection state constants and is read and written
// with sync/atomic (see connectionStatus and setConnected). The obound,
// oboundP and ibound channels are created by Connect; stop is (re)created on
// every successful connect/reconnect and closed by closeStop; workers counts
// the goroutines started by Connect and reconnect so that teardown can wait
// for them.
type client struct {
	lastSent        atomic.Value
	lastReceived    atomic.Value
	pingOutstanding int32
	status          uint32
	sync.RWMutex
	messageIds
	conn            net.Conn
	ibound          chan packets.ControlPacket
	obound          chan *PacketAndToken
	oboundP         chan *PacketAndToken
	msgRouter       *router
	stopRouter      chan bool
	incomingPubChan chan *packets.PublishPacket
	errors          chan error
	stop            chan struct{}
	persist         Store
	options         ClientOptions
	workers         sync.WaitGroup
}

// NewClient will create an MQTT v3.1.1 client with all of the options specified
// in the provided ClientOptions. The client must have the Connect method called
// on it before it may be used. This is to make sure resources (such as a net
// connection) are created before the application is actually ready.
//
// The options are copied by value into the client. A nil Store is replaced by
// a new MemoryStore, and an unrecognised ProtocolVersion is replaced by 4 and
// marked as not explicitly chosen (which allows Connect to fall back to 3).
func NewClient(o *ClientOptions) Client {
	c := &client{}
	c.options = *o

	if c.options.Store == nil {
		c.options.Store = NewMemoryStore()
	}
	switch c.options.ProtocolVersion {
	case 3, 4:
		c.options.protocolVersionExplicit = true
	case 0x83, 0x84:
		c.options.protocolVersionExplicit = true
	default:
		c.options.ProtocolVersion = 4
		c.options.protocolVersionExplicit = false
	}
	c.persist = c.options.Store
	c.status = disconnected
	c.messageIds = messageIds{index: make(map[uint16]tokenCompletor)}
	c.msgRouter, c.stopRouter = newRouter()
	c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
	// Without auto-reconnect the outbound channels are created unbuffered
	// (Connect sizes them from MessageChannelDepth).
	if !c.options.AutoReconnect {
		c.options.MessageChannelDepth = 0
	}
	return c
}

// AddRoute allows you to add a handler for messages on a specific topic
// without making a subscription. For example having a different handler
// for parts of a wildcard subscription. A nil callback is ignored.
func (c *client) AddRoute(topic string, callback MessageHandler) {
	if callback != nil {
		c.msgRouter.addRoute(topic, callback)
	}
}

// IsConnected returns a bool signifying whether
// the client is connected or not.
//
// It reports true when the status is connected, and also when AutoReconnect
// is enabled and the status is reconnecting.
func (c *client) IsConnected() bool {
	c.RLock()
	defer c.RUnlock()
	status := atomic.LoadUint32(&c.status)
	switch {
	case status == connected:
		return true
	case c.options.AutoReconnect && status > connecting:
		return true
	default:
		return false
	}
}

// IsConnectionOpen returns a bool signifying whether the client has an active
// connection to the mqtt broker, i.e. not in disconnected or reconnect mode.
// Unlike IsConnected it is true only when the status is exactly connected.
func (c *client) IsConnectionOpen() bool {
	c.RLock()
	defer c.RUnlock()
	status := atomic.LoadUint32(&c.status)
	switch {
	case status == connected:
		return true
	default:
		return false
	}
}

// connectionStatus returns the current connection state constant, read
// atomically while holding the read lock.
func (c *client) connectionStatus() uint32 {
	c.RLock()
	defer c.RUnlock()
	status := atomic.LoadUint32(&c.status)
	return status
}

// setConnected stores the given connection state constant, written atomically
// while holding the write lock.
func (c *client) setConnected(status uint32) {
	c.Lock()
	defer c.Unlock()
	atomic.StoreUint32(&c.status, uint32(status))
}

// ErrNotConnected is the error returned from function calls that are
// made when the client is not connected to a broker
var ErrNotConnected = errors.New("Not Connected")

// Connect will create a connection to the message broker, by default
// it will attempt to connect at v3.1.1 and auto retry at v3.1 if that
// fails
//
// The returned token is handed back immediately; the connection attempt runs
// in a background goroutine which completes the token with flowComplete on
// success or setError on failure. Each entry of options.Servers is tried in
// order until one accepts the CONNECT.
func (c *client) Connect() Token {
	var err error
	t := newToken(packets.Connect).(*ConnectToken)
	DEBUG.Println(CLI, "Connect()")

	c.obound = make(chan *PacketAndToken, c.options.MessageChannelDepth)
	c.oboundP = make(chan *PacketAndToken, c.options.MessageChannelDepth)
	c.ibound = make(chan packets.ControlPacket)

	go func() {
		c.persist.Open()

		c.setConnected(connecting)
		c.errors = make(chan error, 1)
		c.stop = make(chan struct{})

		var rc byte
		// Remember the configured version so every broker starts from it,
		// even if a previous broker caused a fallback to 3.
		protocolVersion := c.options.ProtocolVersion

		if len(c.options.Servers) == 0 {
			t.setError(fmt.Errorf("No servers defined to connect to"))
			return
		}

		for _, broker := range c.options.Servers {
			cm := newConnectMsgFromOptions(&c.options, broker)
			c.options.ProtocolVersion = protocolVersion
		CONN:
			DEBUG.Println(CLI, "about to write new connect msg")
			c.conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders)
			if err == nil {
				DEBUG.Println(CLI, "socket connected to broker")
				switch c.options.ProtocolVersion {
				case 3:
					DEBUG.Println(CLI, "Using MQTT 3.1 protocol")
					cm.ProtocolName = "MQIsdp"
					cm.ProtocolVersion = 3
				case 0x83:
					DEBUG.Println(CLI, "Using MQTT 3.1b protocol")
					cm.ProtocolName = "MQIsdp"
					cm.ProtocolVersion = 0x83
				case 0x84:
					DEBUG.Println(CLI, "Using MQTT 3.1.1b protocol")
					cm.ProtocolName = "MQTT"
					cm.ProtocolVersion = 0x84
				default:
					DEBUG.Println(CLI, "Using MQTT 3.1.1 protocol")
					c.options.ProtocolVersion = 4
					cm.ProtocolName = "MQTT"
					cm.ProtocolVersion = 4
				}
				cm.Write(c.conn)

				rc, t.sessionPresent = c.connect()
				if rc != packets.Accepted {
					if c.conn != nil {
						c.conn.Close()
						c.conn = nil
					}
					// if the protocol version was explicitly set don't do any fallback
					if c.options.protocolVersionExplicit {
						ERROR.Println(CLI, "Connecting to", broker, "CONNACK was not CONN_ACCEPTED, but rather", packets.ConnackReturnCodes[rc])
						continue
					}
					// Version was defaulted: retry the same broker once with 3.1.
					if c.options.ProtocolVersion == 4 {
						DEBUG.Println(CLI, "Trying reconnect using MQTT 3.1 protocol")
						c.options.ProtocolVersion = 3
						goto CONN
					}
				}
				// Leaves the broker loop both on success and when the
				// fallback attempt was also rejected (c.conn is nil then).
				break
			} else {
				ERROR.Println(CLI, err.Error())
				WARN.Println(CLI, "failed to connect to broker, trying next")
				rc = packets.ErrNetworkError
			}
		}

		if c.conn == nil {
			ERROR.Println(CLI, "Failed to connect to a broker")
			c.setConnected(disconnected)
			c.persist.Close()
			t.returnCode = rc
			if rc != packets.ErrNetworkError {
				t.setError(packets.ConnErrors[rc])
			} else {
				t.setError(fmt.Errorf("%s : %s", packets.ConnErrors[rc], err))
			}
			return
		}

		// From here on the negotiated version is fixed, so reconnect will
		// not attempt any protocol fallback.
		c.options.protocolVersionExplicit = true

		if c.options.KeepAlive != 0 {
			atomic.StoreInt32(&c.pingOutstanding, 0)
			c.lastReceived.Store(time.Now())
			c.lastSent.Store(time.Now())
			c.workers.Add(1)
			go keepalive(c)
		}

		c.incomingPubChan = make(chan *packets.PublishPacket, c.options.MessageChannelDepth)
		c.msgRouter.matchAndDispatch(c.incomingPubChan, c.options.Order, c)

		c.setConnected(connected)
		DEBUG.Println(CLI, "client is connected")
		if c.options.OnConnect != nil {
			go c.options.OnConnect(c)
		}

		c.workers.Add(4)
		go errorWatch(c)
		go alllogic(c)
		go outgoing(c)
		go incoming(c)

		// Take care of any messages in the store
		if c.options.CleanSession == false {
			c.resume(c.options.ResumeSubs)
		} else {
			c.persist.Reset()
		}

		DEBUG.Println(CLI, "exit startClient")
		t.flowComplete()
	}()
	return t
}

// internal function used to reconnect the client when it loses its connection
//
// It cycles through options.Servers until a broker accepts the CONNECT or the
// status becomes disconnected, sleeping between rounds with a delay that
// starts at one second, doubles after each failed round and is capped at
// options.MaxReconnectInterval.
func (c *client) reconnect() {
	DEBUG.Println(CLI, "enter reconnect")
	var (
		err error

		rc    = byte(1)
		sleep = time.Duration(1 * time.Second)
	)

	for rc != 0 && atomic.LoadUint32(&c.status) != disconnected {
		for _, broker := range c.options.Servers {
			cm := newConnectMsgFromOptions(&c.options, broker)
			DEBUG.Println(CLI, "about to write new connect msg")
			c.Lock()
			c.conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders)
			c.Unlock()
			if err == nil {
				DEBUG.Println(CLI, "socket connected to broker")
				switch c.options.ProtocolVersion {
				case 0x83:
					DEBUG.Println(CLI, "Using MQTT 3.1b protocol")
					cm.ProtocolName = "MQIsdp"
					cm.ProtocolVersion = 0x83
				case 0x84:
					DEBUG.Println(CLI, "Using MQTT 3.1.1b protocol")
					cm.ProtocolName = "MQTT"
					cm.ProtocolVersion = 0x84
				case 3:
					DEBUG.Println(CLI, "Using MQTT 3.1 protocol")
					cm.ProtocolName = "MQIsdp"
					cm.ProtocolVersion = 3
				default:
					DEBUG.Println(CLI, "Using MQTT 3.1.1 protocol")
					cm.ProtocolName = "MQTT"
					cm.ProtocolVersion = 4
				}
				cm.Write(c.conn)

				rc, _ = c.connect()
				if rc != packets.Accepted {
					c.conn.Close()
					c.conn = nil
					// if the protocol version was explicitly set don't do any fallback
					if c.options.protocolVersionExplicit {
						ERROR.Println(CLI, "Connecting to", broker, "CONNACK was not Accepted, but rather", packets.ConnackReturnCodes[rc])
						continue
					}
				}
				break
			} else {
				ERROR.Println(CLI, err.Error())
				WARN.Println(CLI, "failed to connect to broker, trying next")
				rc = packets.ErrNetworkError
			}
		}
		if rc != 0 {
			DEBUG.Println(CLI, "Reconnect failed, sleeping for", int(sleep.Seconds()), "seconds")
			time.Sleep(sleep)
			if sleep < c.options.MaxReconnectInterval {
				sleep *= 2
			}

			if sleep > c.options.MaxReconnectInterval {
				sleep = c.options.MaxReconnectInterval
			}
		}
	}
	// Disconnect() must have been called while we were trying to reconnect.
	if c.connectionStatus() == disconnected {
		DEBUG.Println(CLI, "Client moved to disconnected state while reconnecting, abandoning reconnect")
		return
	}

	// The previous stop channel was closed when the connection was lost.
	c.stop = make(chan struct{})

	if c.options.KeepAlive != 0 {
		atomic.StoreInt32(&c.pingOutstanding, 0)
		c.lastReceived.Store(time.Now())
		c.lastSent.Store(time.Now())
		c.workers.Add(1)
		go keepalive(c)
	}

	c.setConnected(connected)
	DEBUG.Println(CLI, "client is reconnected")
	if c.options.OnConnect != nil {
		go c.options.OnConnect(c)
	}

	c.workers.Add(4)
	go errorWatch(c)
	go alllogic(c)
	go outgoing(c)
	go incoming(c)

	c.resume(false)
}

// This function is only used for receiving a connack
// when the connection is first started.
// This prevents receiving incoming data while resume
// is in progress if clean session is false.
//
// It returns the CONNACK return code and session-present flag. Any read
// error, nil packet or non-CONNACK packet is reported as
// packets.ErrNetworkError with a false flag.
func (c *client) connect() (byte, bool) {
	DEBUG.Println(NET, "connect started")

	ca, err := packets.ReadPacket(c.conn)
	if err != nil {
		ERROR.Println(NET, "connect got error", err)
		return packets.ErrNetworkError, false
	}
	if ca == nil {
		ERROR.Println(NET, "received nil packet")
		return packets.ErrNetworkError, false
	}

	msg, ok := ca.(*packets.ConnackPacket)
	if !ok {
		ERROR.Println(NET, "received msg that was not CONNACK")
		return packets.ErrNetworkError, false
	}

	DEBUG.Println(NET, "received connack")
	return msg.ReturnCode, msg.SessionPresent
}

// Disconnect will end the connection with the server, but not before
// waiting up to the specified number of milliseconds for existing work
// to be completed.
466func (c *client) Disconnect(quiesce uint) { 467 status := atomic.LoadUint32(&c.status) 468 if status == connected { 469 DEBUG.Println(CLI, "disconnecting") 470 c.setConnected(disconnected) 471 472 dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket) 473 dt := newToken(packets.Disconnect) 474 c.oboundP <- &PacketAndToken{p: dm, t: dt} 475 476 // wait for work to finish, or quiesce time consumed 477 dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond) 478 } else { 479 WARN.Println(CLI, "Disconnect() called but not connected (disconnected/reconnecting)") 480 c.setConnected(disconnected) 481 } 482 483 c.disconnect() 484} 485 486// ForceDisconnect will end the connection with the mqtt broker immediately. 487func (c *client) forceDisconnect() { 488 if !c.IsConnected() { 489 WARN.Println(CLI, "already disconnected") 490 return 491 } 492 c.setConnected(disconnected) 493 c.conn.Close() 494 DEBUG.Println(CLI, "forcefully disconnecting") 495 c.disconnect() 496} 497 498func (c *client) internalConnLost(err error) { 499 // Only do anything if this was called and we are still "connected" 500 // forceDisconnect can cause incoming/outgoing/alllogic to end with 501 // error from closing the socket but state will be "disconnected" 502 if c.IsConnected() { 503 c.closeStop() 504 c.conn.Close() 505 c.workers.Wait() 506 if c.options.CleanSession && !c.options.AutoReconnect { 507 c.messageIds.cleanUp() 508 } 509 if c.options.AutoReconnect { 510 c.setConnected(reconnecting) 511 go c.reconnect() 512 } else { 513 c.setConnected(disconnected) 514 } 515 if c.options.OnConnectionLost != nil { 516 go c.options.OnConnectionLost(c, err) 517 } 518 } 519} 520 521func (c *client) closeStop() { 522 c.Lock() 523 defer c.Unlock() 524 select { 525 case <-c.stop: 526 DEBUG.Println("In disconnect and stop channel is already closed") 527 default: 528 if c.stop != nil { 529 close(c.stop) 530 } 531 } 532} 533 534func (c *client) closeStopRouter() { 535 c.Lock() 536 defer 
c.Unlock() 537 select { 538 case <-c.stopRouter: 539 DEBUG.Println("In disconnect and stop channel is already closed") 540 default: 541 if c.stopRouter != nil { 542 close(c.stopRouter) 543 } 544 } 545} 546 547func (c *client) closeConn() { 548 c.Lock() 549 defer c.Unlock() 550 if c.conn != nil { 551 c.conn.Close() 552 } 553} 554 555func (c *client) disconnect() { 556 c.closeStop() 557 c.closeConn() 558 c.workers.Wait() 559 c.messageIds.cleanUp() 560 c.closeStopRouter() 561 DEBUG.Println(CLI, "disconnected") 562 c.persist.Close() 563} 564 565// Publish will publish a message with the specified QoS and content 566// to the specified topic. 567// Returns a token to track delivery of the message to the broker 568func (c *client) Publish(topic string, qos byte, retained bool, payload interface{}) Token { 569 token := newToken(packets.Publish).(*PublishToken) 570 DEBUG.Println(CLI, "enter Publish") 571 switch { 572 case !c.IsConnected(): 573 token.setError(ErrNotConnected) 574 return token 575 case c.connectionStatus() == reconnecting && qos == 0: 576 token.flowComplete() 577 return token 578 } 579 pub := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket) 580 pub.Qos = qos 581 pub.TopicName = topic 582 pub.Retain = retained 583 switch payload.(type) { 584 case string: 585 pub.Payload = []byte(payload.(string)) 586 case []byte: 587 pub.Payload = payload.([]byte) 588 default: 589 token.setError(fmt.Errorf("Unknown payload type")) 590 return token 591 } 592 593 if pub.Qos != 0 && pub.MessageID == 0 { 594 pub.MessageID = c.getID(token) 595 token.messageID = pub.MessageID 596 } 597 persistOutbound(c.persist, pub) 598 if c.connectionStatus() == reconnecting { 599 DEBUG.Println(CLI, "storing publish message (reconnecting), topic:", topic) 600 } else { 601 DEBUG.Println(CLI, "sending publish message, topic:", topic) 602 c.obound <- &PacketAndToken{p: pub, t: token} 603 } 604 return token 605} 606 607// Subscribe starts a new subscription. 
Provide a MessageHandler to be executed when 608// a message is published on the topic provided. 609func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Token { 610 token := newToken(packets.Subscribe).(*SubscribeToken) 611 DEBUG.Println(CLI, "enter Subscribe") 612 if !c.IsConnected() { 613 token.setError(ErrNotConnected) 614 return token 615 } 616 sub := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket) 617 if err := validateTopicAndQos(topic, qos); err != nil { 618 token.setError(err) 619 return token 620 } 621 sub.Topics = append(sub.Topics, topic) 622 sub.Qoss = append(sub.Qoss, qos) 623 DEBUG.Println(CLI, sub.String()) 624 625 if strings.HasPrefix(topic, "$share") { 626 topic = strings.Join(strings.Split(topic, "/")[2:], "/") 627 } 628 629 if callback != nil { 630 c.msgRouter.addRoute(topic, callback) 631 } 632 633 token.subs = append(token.subs, topic) 634 c.oboundP <- &PacketAndToken{p: sub, t: token} 635 DEBUG.Println(CLI, "exit Subscribe") 636 return token 637} 638 639// SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to 640// be executed when a message is published on one of the topics provided. 
641func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token { 642 var err error 643 token := newToken(packets.Subscribe).(*SubscribeToken) 644 DEBUG.Println(CLI, "enter SubscribeMultiple") 645 if !c.IsConnected() { 646 token.setError(ErrNotConnected) 647 return token 648 } 649 sub := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket) 650 if sub.Topics, sub.Qoss, err = validateSubscribeMap(filters); err != nil { 651 token.setError(err) 652 return token 653 } 654 655 if callback != nil { 656 for topic := range filters { 657 c.msgRouter.addRoute(topic, callback) 658 } 659 } 660 token.subs = make([]string, len(sub.Topics)) 661 copy(token.subs, sub.Topics) 662 c.oboundP <- &PacketAndToken{p: sub, t: token} 663 DEBUG.Println(CLI, "exit SubscribeMultiple") 664 return token 665} 666 667// Load all stored messages and resend them 668// Call this to ensure QOS > 1,2 even after an application crash 669func (c *client) resume(subscription bool) { 670 671 storedKeys := c.persist.All() 672 for _, key := range storedKeys { 673 packet := c.persist.Get(key) 674 if packet == nil { 675 continue 676 } 677 details := packet.Details() 678 if isKeyOutbound(key) { 679 switch packet.(type) { 680 case *packets.SubscribePacket: 681 if subscription { 682 DEBUG.Println(STR, fmt.Sprintf("loaded pending subscribe (%d)", details.MessageID)) 683 token := newToken(packets.Subscribe).(*SubscribeToken) 684 c.oboundP <- &PacketAndToken{p: packet, t: token} 685 } 686 case *packets.UnsubscribePacket: 687 if subscription { 688 DEBUG.Println(STR, fmt.Sprintf("loaded pending unsubscribe (%d)", details.MessageID)) 689 token := newToken(packets.Unsubscribe).(*UnsubscribeToken) 690 c.oboundP <- &PacketAndToken{p: packet, t: token} 691 } 692 case *packets.PubrelPacket: 693 DEBUG.Println(STR, fmt.Sprintf("loaded pending pubrel (%d)", details.MessageID)) 694 select { 695 case c.oboundP <- &PacketAndToken{p: packet, t: nil}: 696 case <-c.stop: 697 } 698 case 
*packets.PublishPacket: 699 token := newToken(packets.Publish).(*PublishToken) 700 token.messageID = details.MessageID 701 c.claimID(token, details.MessageID) 702 DEBUG.Println(STR, fmt.Sprintf("loaded pending publish (%d)", details.MessageID)) 703 DEBUG.Println(STR, details) 704 c.obound <- &PacketAndToken{p: packet, t: token} 705 default: 706 ERROR.Println(STR, "invalid message type in store (discarded)") 707 c.persist.Del(key) 708 } 709 } else { 710 switch packet.(type) { 711 case *packets.PubrelPacket, *packets.PublishPacket: 712 DEBUG.Println(STR, fmt.Sprintf("loaded pending incomming (%d)", details.MessageID)) 713 select { 714 case c.ibound <- packet: 715 case <-c.stop: 716 } 717 default: 718 ERROR.Println(STR, "invalid message type in store (discarded)") 719 c.persist.Del(key) 720 } 721 } 722 } 723} 724 725// Unsubscribe will end the subscription from each of the topics provided. 726// Messages published to those topics from other clients will no longer be 727// received. 728func (c *client) Unsubscribe(topics ...string) Token { 729 token := newToken(packets.Unsubscribe).(*UnsubscribeToken) 730 DEBUG.Println(CLI, "enter Unsubscribe") 731 if !c.IsConnected() { 732 token.setError(ErrNotConnected) 733 return token 734 } 735 unsub := packets.NewControlPacket(packets.Unsubscribe).(*packets.UnsubscribePacket) 736 unsub.Topics = make([]string, len(topics)) 737 copy(unsub.Topics, topics) 738 739 c.oboundP <- &PacketAndToken{p: unsub, t: token} 740 for _, topic := range topics { 741 c.msgRouter.deleteRoute(topic) 742 } 743 744 DEBUG.Println(CLI, "exit Unsubscribe") 745 return token 746} 747 748// OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions 749// in use by the client. 
750func (c *client) OptionsReader() ClientOptionsReader { 751 r := ClientOptionsReader{options: &c.options} 752 return r 753} 754 755//DefaultConnectionLostHandler is a definition of a function that simply 756//reports to the DEBUG log the reason for the client losing a connection. 757func DefaultConnectionLostHandler(client Client, reason error) { 758 DEBUG.Println("Connection lost:", reason.Error()) 759} 760