1 /** 2 * DeferredGet.h 3 * 4 * @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com> 5 * @copyright 2014 - 2018 Copernica BV 6 */ 7 8 /** 9 * Include guard 10 */ 11 #pragma once 12 13 /** 14 * Dependencies 15 */ 16 #include "deferredextreceiver.h" 17 18 /** 19 * Set up namespace 20 */ 21 namespace AMQP { 22 23 /** 24 * Class definition 25 * 26 * This class implements the 'shared_from_this' functionality, because 27 * it grabs a self-pointer when the callback is running, otherwise the onFinalize() 28 * is called before the actual message is consumed. 29 */ 30 class DeferredGet : public DeferredExtReceiver, public std::enable_shared_from_this<DeferredGet> 31 { 32 private: 33 /** 34 * Callback in case the queue is empty 35 * @var EmptyCallback 36 */ 37 EmptyCallback _emptyCallback; 38 39 /** 40 * Callback with the number of messages still in the queue 41 * @var CountCallback 42 */ 43 CountCallback _countCallback; 44 45 /** 46 * Report success for a get operation 47 * @param messagecount Number of messages left in the queue 48 * @param deliveryTag Delivery tag of the message coming in 49 * @param redelivered Was the message redelivered? 50 */ 51 virtual const std::shared_ptr<Deferred> &reportSuccess(uint32_t messagecount, uint64_t deliveryTag, bool redelivered) override; 52 53 /** 54 * Report success when queue was empty 55 * @return Deferred 56 */ 57 virtual const std::shared_ptr<Deferred> &reportSuccess() const override; 58 59 /** 60 * Get reference to self to prevent that object falls out of scope 61 * @return std::shared_ptr 62 */ lock()63 virtual std::shared_ptr<DeferredReceiver> lock() override { return shared_from_this(); } 64 65 /** 66 * Extended implementation of the complete method that is called when a message was fully received 67 */ 68 virtual void complete() override; 69 70 /** 71 * The channel implementation may call our 72 * private members and construct us 73 */ 74 friend class ChannelImpl; 75 friend class ConsumedMessage; 76 77 public: 78 /** 79 * Protected constructor that can only be called 80 * from within the channel implementation 81 * 82 * Note: this constructor _should_ be protected, but because make_shared 83 * will then not work, we have decided to make it public after all, 84 * because the work-around would result in not-so-easy-to-read code. 85 * 86 * @param channel the channel implementation 87 * @param failed are we already failed? 88 */ 89 DeferredGet(ChannelImpl *channel, bool failed = false) : DeferredExtReceiver(failed,channel)90 DeferredExtReceiver(failed, channel) {} 91 92 public: 93 /** 94 * Register a function to be called when a message arrives 95 * This fuction is also available as onReceived() and onMessage() because I always forget which name I gave to it 96 * @param callback 97 */ onSuccess(const MessageCallback & callback)98 DeferredGet &onSuccess(const MessageCallback &callback) 99 { 100 // store the callback 101 _messageCallback = callback; 102 103 // allow chaining 104 return *this; 105 } 106 107 /** 108 * Register a function to be called when an error occurs. This should be defined, otherwise the base methods are used. 109 * @param callback 110 */ onError(const ErrorCallback & callback)111 DeferredGet &onError(const ErrorCallback &callback) 112 { 113 // store the callback 114 _errorCallback = callback; 115 116 // allow chaining 117 return *this; 118 } 119 120 /** 121 * Register a function to be called when a message arrives 122 * This fuction is also available as onSuccess() and onMessage() because I always forget which name I gave to it 123 * @param callback the callback to execute 124 */ onReceived(const MessageCallback & callback)125 DeferredGet &onReceived(const MessageCallback &callback) 126 { 127 // store callback 128 _messageCallback = callback; 129 130 // allow chaining 131 return *this; 132 } 133 134 /** 135 * Register a function to be called when a message arrives 136 * This fuction is also available as onSuccess() and onReceived() because I always forget which name I gave to it 137 * @param callback the callback to execute 138 */ onMessage(const MessageCallback & callback)139 DeferredGet &onMessage(const MessageCallback &callback) 140 { 141 // store callback 142 _messageCallback = callback; 143 144 // allow chaining 145 return *this; 146 } 147 148 /** 149 * Register a function to be called if no message could be fetched 150 * @param callback the callback to execute 151 */ onEmpty(const EmptyCallback & callback)152 DeferredGet &onEmpty(const EmptyCallback &callback) 153 { 154 // store callback 155 _emptyCallback = callback; 156 157 // allow chaining 158 return *this; 159 } 160 161 /** 162 * Register a function to be called when queue size information is known 163 * @param callback the callback to execute 164 */ onCount(const CountCallback & callback)165 DeferredGet &onCount(const CountCallback &callback) 166 { 167 // store callback 168 _countCallback = callback; 169 170 // allow chaining 171 return *this; 172 } 173 174 /** 175 * Register the function to be called when a new message is expected 176 * 177 * @param callback The callback to invoke 178 * @return Same object for chaining 179 */ onBegin(const StartCallback & callback)180 DeferredGet &onBegin(const StartCallback &callback) 181 { 182 // store callback 183 _startCallback = callback; 184 185 // allow chaining 186 return *this; 187 } 188 189 /** 190 * Register the function to be called when a new message is expected 191 * 192 * @param callback The callback to invoke 193 * @return Same object for chaining 194 */ onStart(const StartCallback & callback)195 DeferredGet &onStart(const StartCallback &callback) 196 { 197 // store callback 198 _startCallback = callback; 199 200 // allow chaining 201 return *this; 202 } 203 204 /** 205 * Register a function that is called when the message size is known 206 * 207 * @param callback The callback to invoke for message headers 208 * @return Same object for chaining 209 */ onSize(const SizeCallback & callback)210 DeferredGet &onSize(const SizeCallback &callback) 211 { 212 // store callback 213 _sizeCallback = callback; 214 215 // allow chaining 216 return *this; 217 } 218 219 /** 220 * Register the function to be called when message headers come in 221 * 222 * @param callback The callback to invoke for message headers 223 * @return Same object for chaining 224 */ onHeaders(const HeaderCallback & callback)225 DeferredGet &onHeaders(const HeaderCallback &callback) 226 { 227 // store callback 228 _headerCallback = callback; 229 230 // allow chaining 231 return *this; 232 } 233 234 /** 235 * Register the function to be called when a chunk of data comes in 236 * 237 * Note that this function may be called zero, one or multiple times 238 * for each incoming message depending on the size of the message data. 239 * 240 * If you install this callback you very likely also want to install 241 * the onComplete callback so you know when the last data part was 242 * received. 243 * 244 * @param callback The callback to invoke for chunks of message data 245 * @return Same object for chaining 246 */ onData(const DataCallback & callback)247 DeferredGet &onData(const DataCallback &callback) 248 { 249 // store callback 250 _dataCallback = callback; 251 252 // allow chaining 253 return *this; 254 } 255 256 /** 257 * Register a funtion to be called when a message was completely received 258 * 259 * @param callback The callback to invoke 260 * @return Same object for chaining 261 */ onComplete(const DeliveredCallback & callback)262 DeferredGet &onComplete(const DeliveredCallback &callback) 263 { 264 // store callback 265 _deliveredCallback = callback; 266 267 // allow chaining 268 return *this; 269 } 270 271 /** 272 * Register a funtion to be called when a message was completely received 273 * 274 * @param callback The callback to invoke 275 * @return Same object for chaining 276 */ onDelivered(const DeliveredCallback & callback)277 DeferredGet &onDelivered(const DeliveredCallback &callback) 278 { 279 // store callback 280 _deliveredCallback = callback; 281 282 // allow chaining 283 return *this; 284 } 285 }; 286 287 /** 288 * End of namespace 289 */ 290 } 291 292