1/* 2 * Copyright (c) 2014 IBM Corp. 3 * 4 * All rights reserved. This program and the accompanying materials 5 * are made available under the terms of the Eclipse Public License v1.0 6 * which accompanies this distribution, and is available at 7 * http://www.eclipse.org/legal/epl-v10.html 8 * 9 * Contributors: 10 * Allan Stockdill-Mander 11 */ 12 13package mqtt 14 15import ( 16 "sync" 17 "time" 18 19 "github.com/eclipse/paho.mqtt.golang/packets" 20) 21 22//PacketAndToken is a struct that contains both a ControlPacket and a 23//Token. This struct is passed via channels between the client interface 24//code and the underlying code responsible for sending and receiving 25//MQTT messages. 26type PacketAndToken struct { 27 p packets.ControlPacket 28 t Token 29} 30 31//Token defines the interface for the tokens used to indicate when 32//actions have completed. 33type Token interface { 34 Wait() bool 35 WaitTimeout(time.Duration) bool 36 flowComplete() 37 Error() error 38} 39 40type baseToken struct { 41 m sync.RWMutex 42 complete chan struct{} 43 ready bool 44 err error 45} 46 47// Wait will wait indefinitely for the Token to complete, ie the Publish 48// to be sent and confirmed receipt from the broker 49func (b *baseToken) Wait() bool { 50 b.m.Lock() 51 defer b.m.Unlock() 52 if !b.ready { 53 <-b.complete 54 b.ready = true 55 } 56 return b.ready 57} 58 59// WaitTimeout takes a time in ms to wait for the flow associated with the 60// Token to complete, returns true if it returned before the timeout or 61// returns false if the timeout occurred. In the case of a timeout the Token 62// does not have an error set in case the caller wishes to wait again 63func (b *baseToken) WaitTimeout(d time.Duration) bool { 64 b.m.Lock() 65 defer b.m.Unlock() 66 if !b.ready { 67 select { 68 case <-b.complete: 69 b.ready = true 70 case <-time.After(d): 71 } 72 } 73 return b.ready 74} 75 76func (b *baseToken) flowComplete() { 77 close(b.complete) 78} 79 80func (b *baseToken) Error() error { 81 b.m.RLock() 82 defer b.m.RUnlock() 83 return b.err 84} 85 86func newToken(tType byte) Token { 87 switch tType { 88 case packets.Connect: 89 return &ConnectToken{baseToken: baseToken{complete: make(chan struct{})}} 90 case packets.Subscribe: 91 return &SubscribeToken{baseToken: baseToken{complete: make(chan struct{})}, subResult: make(map[string]byte)} 92 case packets.Publish: 93 return &PublishToken{baseToken: baseToken{complete: make(chan struct{})}} 94 case packets.Unsubscribe: 95 return &UnsubscribeToken{baseToken: baseToken{complete: make(chan struct{})}} 96 case packets.Disconnect: 97 return &DisconnectToken{baseToken: baseToken{complete: make(chan struct{})}} 98 } 99 return nil 100} 101 102//ConnectToken is an extension of Token containing the extra fields 103//required to provide information about calls to Connect() 104type ConnectToken struct { 105 baseToken 106 returnCode byte 107} 108 109//ReturnCode returns the acknowlegement code in the connack sent 110//in response to a Connect() 111func (c *ConnectToken) ReturnCode() byte { 112 c.m.RLock() 113 defer c.m.RUnlock() 114 return c.returnCode 115} 116 117//PublishToken is an extension of Token containing the extra fields 118//required to provide information about calls to Publish() 119type PublishToken struct { 120 baseToken 121 messageID uint16 122} 123 124//MessageID returns the MQTT message ID that was assigned to the 125//Publish packet when it was sent to the broker 126func (p *PublishToken) MessageID() uint16 { 127 return p.messageID 128} 129 130//SubscribeToken is an extension of Token containing the extra fields 131//required to provide information about calls to Subscribe() 132type SubscribeToken struct { 133 baseToken 134 subs []string 135 subResult map[string]byte 136} 137 138//Result returns a map of topics that were subscribed to along with 139//the matching return code from the broker. This is either the Qos 140//value of the subscription or an error code. 141func (s *SubscribeToken) Result() map[string]byte { 142 s.m.RLock() 143 defer s.m.RUnlock() 144 return s.subResult 145} 146 147//UnsubscribeToken is an extension of Token containing the extra fields 148//required to provide information about calls to Unsubscribe() 149type UnsubscribeToken struct { 150 baseToken 151} 152 153//DisconnectToken is an extension of Token containing the extra fields 154//required to provide information about calls to Disconnect() 155type DisconnectToken struct { 156 baseToken 157} 158