1 /**
2  *  DeferredGet.h
3  *
4  *  @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
5  *  @copyright 2014 - 2018 Copernica BV
6  */
7 
8 /**
9  *  Include guard
10  */
11 #pragma once
12 
13 /**
14  *  Dependencies
15  */
16 #include "deferredextreceiver.h"
17 
18 /**
19  *  Set up namespace
20  */
21 namespace AMQP {
22 
23 /**
24  *  Class definition
25  *
26  *  This class implements the 'shared_from_this' functionality, because
27  *  it grabs a self-pointer when the callback is running, otherwise the onFinalize()
28  *  is called before the actual message is consumed.
29  */
30 class DeferredGet : public DeferredExtReceiver, public std::enable_shared_from_this<DeferredGet>
31 {
32 private:
33     /**
34      *  Callback in case the queue is empty
35      *  @var    EmptyCallback
36      */
37     EmptyCallback _emptyCallback;
38 
39     /**
40      *  Callback with the number of messages still in the queue
41      *  @var    CountCallback
42      */
43     CountCallback _countCallback;
44 
45     /**
46      *  Report success for a get operation
47      *  @param  messagecount    Number of messages left in the queue
48      *  @param  deliveryTag     Delivery tag of the message coming in
49      *  @param  redelivered     Was the message redelivered?
50      */
51     virtual const std::shared_ptr<Deferred> &reportSuccess(uint32_t messagecount, uint64_t deliveryTag, bool redelivered) override;
52 
53     /**
54      *  Report success when queue was empty
55      *  @return Deferred
56      */
57     virtual const std::shared_ptr<Deferred> &reportSuccess() const override;
58 
59     /**
60      *  Get reference to self to prevent that object falls out of scope
61      *  @return std::shared_ptr
62      */
lock()63     virtual std::shared_ptr<DeferredReceiver> lock() override { return shared_from_this(); }
64 
65     /**
66      *  Extended implementation of the complete method that is called when a message was fully received
67      */
68     virtual void complete() override;
69 
70     /**
71      *  The channel implementation may call our
72      *  private members and construct us
73      */
74     friend class ChannelImpl;
75     friend class ConsumedMessage;
76 
77 public:
78     /**
79      *  Protected constructor that can only be called
80      *  from within the channel implementation
81      *
82      *  Note: this constructor _should_ be protected, but because make_shared
83      *  will then not work, we have decided to make it public after all,
84      *  because the work-around would result in not-so-easy-to-read code.
85      *
86      *  @param  channel     the channel implementation
87      *  @param  failed      are we already failed?
88      */
89     DeferredGet(ChannelImpl *channel, bool failed = false) :
DeferredExtReceiver(failed,channel)90         DeferredExtReceiver(failed, channel) {}
91 
92 public:
93     /**
94      *  Register a function to be called when a message arrives
95      *  This fuction is also available as onReceived() and onMessage() because I always forget which name I gave to it
96      *  @param  callback
97      */
onSuccess(const MessageCallback & callback)98     DeferredGet &onSuccess(const MessageCallback &callback)
99     {
100         // store the callback
101         _messageCallback = callback;
102 
103         // allow chaining
104         return *this;
105     }
106 
107     /**
108      *  Register a function to be called when an error occurs. This should be defined, otherwise the base methods are used.
109      *  @param  callback
110      */
onError(const ErrorCallback & callback)111     DeferredGet &onError(const ErrorCallback &callback)
112     {
113         // store the callback
114         _errorCallback = callback;
115 
116         // allow chaining
117         return *this;
118     }
119 
120     /**
121      *  Register a function to be called when a message arrives
122      *  This fuction is also available as onSuccess() and onMessage() because I always forget which name I gave to it
123      *  @param  callback    the callback to execute
124      */
onReceived(const MessageCallback & callback)125     DeferredGet &onReceived(const MessageCallback &callback)
126     {
127         // store callback
128         _messageCallback = callback;
129 
130         // allow chaining
131         return *this;
132     }
133 
134     /**
135      *  Register a function to be called when a message arrives
136      *  This fuction is also available as onSuccess() and onReceived() because I always forget which name I gave to it
137      *  @param  callback    the callback to execute
138      */
onMessage(const MessageCallback & callback)139     DeferredGet &onMessage(const MessageCallback &callback)
140     {
141         // store callback
142         _messageCallback = callback;
143 
144         // allow chaining
145         return *this;
146     }
147 
148     /**
149      *  Register a function to be called if no message could be fetched
150      *  @param  callback    the callback to execute
151      */
onEmpty(const EmptyCallback & callback)152     DeferredGet &onEmpty(const EmptyCallback &callback)
153     {
154         // store callback
155         _emptyCallback = callback;
156 
157         // allow chaining
158         return *this;
159     }
160 
161     /**
162      *  Register a function to be called when queue size information is known
163      *  @param  callback    the callback to execute
164      */
onCount(const CountCallback & callback)165     DeferredGet &onCount(const CountCallback &callback)
166     {
167         // store callback
168         _countCallback = callback;
169 
170         // allow chaining
171         return *this;
172     }
173 
174     /**
175      *  Register the function to be called when a new message is expected
176      *
177      *  @param  callback    The callback to invoke
178      *  @return Same object for chaining
179      */
onBegin(const StartCallback & callback)180     DeferredGet &onBegin(const StartCallback &callback)
181     {
182         // store callback
183         _startCallback = callback;
184 
185         // allow chaining
186         return *this;
187     }
188 
189     /**
190      *  Register the function to be called when a new message is expected
191      *
192      *  @param  callback    The callback to invoke
193      *  @return Same object for chaining
194      */
onStart(const StartCallback & callback)195     DeferredGet &onStart(const StartCallback &callback)
196     {
197         // store callback
198         _startCallback = callback;
199 
200         // allow chaining
201         return *this;
202     }
203 
204     /**
205      *  Register a function that is called when the message size is known
206      *
207      *  @param  callback    The callback to invoke for message headers
208      *  @return Same object for chaining
209      */
onSize(const SizeCallback & callback)210     DeferredGet &onSize(const SizeCallback &callback)
211     {
212         // store callback
213         _sizeCallback = callback;
214 
215         // allow chaining
216         return *this;
217     }
218 
219     /**
220      *  Register the function to be called when message headers come in
221      *
222      *  @param  callback    The callback to invoke for message headers
223      *  @return Same object for chaining
224      */
onHeaders(const HeaderCallback & callback)225     DeferredGet &onHeaders(const HeaderCallback &callback)
226     {
227         // store callback
228         _headerCallback = callback;
229 
230         // allow chaining
231         return *this;
232     }
233 
234     /**
235      *  Register the function to be called when a chunk of data comes in
236      *
237      *  Note that this function may be called zero, one or multiple times
238      *  for each incoming message depending on the size of the message data.
239      *
240      *  If you install this callback you very likely also want to install
241      *  the onComplete callback so you know when the last data part was
242      *  received.
243      *
244      *  @param  callback    The callback to invoke for chunks of message data
245      *  @return Same object for chaining
246      */
onData(const DataCallback & callback)247     DeferredGet &onData(const DataCallback &callback)
248     {
249         // store callback
250         _dataCallback = callback;
251 
252         // allow chaining
253         return *this;
254     }
255 
256     /**
257      *  Register a funtion to be called when a message was completely received
258      *
259      *  @param  callback    The callback to invoke
260      *  @return Same object for chaining
261      */
onComplete(const DeliveredCallback & callback)262     DeferredGet &onComplete(const DeliveredCallback &callback)
263     {
264         // store callback
265         _deliveredCallback = callback;
266 
267         // allow chaining
268         return *this;
269     }
270 
271     /**
272      *  Register a funtion to be called when a message was completely received
273      *
274      *  @param  callback    The callback to invoke
275      *  @return Same object for chaining
276      */
onDelivered(const DeliveredCallback & callback)277     DeferredGet &onDelivered(const DeliveredCallback &callback)
278     {
279         // store callback
280         _deliveredCallback = callback;
281 
282         // allow chaining
283         return *this;
284     }
285 };
286 
287 /**
288  *  End of namespace
289  */
290 }
291 
292