1/*
2 * Copyright (c) 2014 IBM Corp.
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 *    Allan Stockdill-Mander
11 */
12
13package mqtt
14
15import (
16	"sync"
17	"time"
18
19	"github.com/eclipse/paho.mqtt.golang/packets"
20)
21
// PacketAndToken is a struct that contains both a ControlPacket and a
// Token. This struct is passed via channels between the client interface
// code and the underlying code responsible for sending and receiving
// MQTT messages.
type PacketAndToken struct {
	// p is the control packet being carried.
	p packets.ControlPacket
	// t is the Token that travels with the packet.
	t Token
}
30
// Token defines the interface for the tokens used to indicate when
// actions have completed.
type Token interface {
	// Wait blocks until the flow associated with the Token has completed
	// and then reports true.
	Wait() bool
	// WaitTimeout waits for completion for at most the given duration and
	// reports whether the flow completed before the timeout expired.
	WaitTimeout(time.Duration) bool
	// flowComplete signals that the flow associated with the Token has
	// finished. It is unexported, so only this package can implement Token.
	flowComplete()
	// Error returns the error stored on the Token.
	Error() error
}
39
// baseToken holds the state shared by every concrete token type: a
// completion signal and the error recorded for the flow.
type baseToken struct {
	m        sync.RWMutex  // guards ready and err
	complete chan struct{} // closed by flowComplete once the flow has finished
	ready    bool          // latched to true once completion has been observed
	err      error         // error recorded for the flow, nil if none
}

// markReady latches the ready flag under the write lock.
func (b *baseToken) markReady() {
	b.m.Lock()
	b.ready = true
	b.m.Unlock()
}

// completed reports, without blocking, whether the flow has already
// finished, latching the ready flag when it observes the closed channel.
func (b *baseToken) completed() bool {
	b.m.RLock()
	done := b.ready
	b.m.RUnlock()
	if done {
		return true
	}
	select {
	case <-b.complete:
		b.markReady()
		return true
	default:
		return false
	}
}

// Wait will wait indefinitely for the Token to complete, ie the Publish
// to be sent and confirmed receipt from the broker. It always returns true.
//
// The mutex is deliberately NOT held while blocking on the completion
// channel: holding it would stall every concurrent Wait, WaitTimeout and
// Error call until the flow finished.
func (b *baseToken) Wait() bool {
	if b.completed() {
		return true
	}
	<-b.complete
	b.markReady()
	return true
}

// WaitTimeout waits at most d for the flow associated with the Token to
// complete. It returns true if the flow completed before the timeout and
// false if the timeout occurred. In the case of a timeout the Token does
// not have an error set, so the caller may wait again. A Token that has
// already completed always yields true, whatever the value of d.
func (b *baseToken) WaitTimeout(d time.Duration) bool {
	if b.completed() {
		return true
	}
	// An explicit timer (rather than time.After) lets us release it as soon
	// as the flow completes instead of leaving it pending until it fires.
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-b.complete:
		b.markReady()
		return true
	case <-timer.C:
		return false
	}
}

// flowComplete marks the flow as finished by closing the completion
// channel, which releases every current and future waiter. Repeated
// sequential calls are harmless; the check-then-close is not atomic, so
// callers must not invoke it concurrently from several goroutines.
func (b *baseToken) flowComplete() {
	select {
	case <-b.complete:
		// Already closed: closing again would panic.
	default:
		close(b.complete)
	}
}

// Error returns the error recorded on the Token, or nil if none was set.
func (b *baseToken) Error() error {
	b.m.RLock()
	defer b.m.RUnlock()
	return b.err
}
85
86func newToken(tType byte) Token {
87	switch tType {
88	case packets.Connect:
89		return &ConnectToken{baseToken: baseToken{complete: make(chan struct{})}}
90	case packets.Subscribe:
91		return &SubscribeToken{baseToken: baseToken{complete: make(chan struct{})}, subResult: make(map[string]byte)}
92	case packets.Publish:
93		return &PublishToken{baseToken: baseToken{complete: make(chan struct{})}}
94	case packets.Unsubscribe:
95		return &UnsubscribeToken{baseToken: baseToken{complete: make(chan struct{})}}
96	case packets.Disconnect:
97		return &DisconnectToken{baseToken: baseToken{complete: make(chan struct{})}}
98	}
99	return nil
100}
101
102//ConnectToken is an extension of Token containing the extra fields
103//required to provide information about calls to Connect()
104type ConnectToken struct {
105	baseToken
106	returnCode byte
107}
108
109//ReturnCode returns the acknowlegement code in the connack sent
110//in response to a Connect()
111func (c *ConnectToken) ReturnCode() byte {
112	c.m.RLock()
113	defer c.m.RUnlock()
114	return c.returnCode
115}
116
117//PublishToken is an extension of Token containing the extra fields
118//required to provide information about calls to Publish()
119type PublishToken struct {
120	baseToken
121	messageID uint16
122}
123
124//MessageID returns the MQTT message ID that was assigned to the
125//Publish packet when it was sent to the broker
126func (p *PublishToken) MessageID() uint16 {
127	return p.messageID
128}
129
130//SubscribeToken is an extension of Token containing the extra fields
131//required to provide information about calls to Subscribe()
132type SubscribeToken struct {
133	baseToken
134	subs      []string
135	subResult map[string]byte
136}
137
138//Result returns a map of topics that were subscribed to along with
139//the matching return code from the broker. This is either the Qos
140//value of the subscription or an error code.
141func (s *SubscribeToken) Result() map[string]byte {
142	s.m.RLock()
143	defer s.m.RUnlock()
144	return s.subResult
145}
146
// UnsubscribeToken is the Token used to provide information about calls
// to Unsubscribe(). It embeds the shared token state and declares no
// additional fields of its own.
type UnsubscribeToken struct {
	baseToken
}
152
// DisconnectToken is the Token used to provide information about calls
// to Disconnect(). It embeds the shared token state and declares no
// additional fields of its own.
type DisconnectToken struct {
	baseToken
}
158