1 /*
2  *                Copyright Lingxi Li 2015.
3  *             Copyright Andrey Semashev 2016.
4  * Distributed under the Boost Software License, Version 1.0.
5  *    (See accompanying file LICENSE_1_0.txt or copy at
6  *          http://www.boost.org/LICENSE_1_0.txt)
7  */
8 /*!
9  * \file   utility/ipc/reliable_message_queue.hpp
10  * \author Lingxi Li
11  * \author Andrey Semashev
12  * \date   01.01.2016
13  *
14  * The header contains declaration of a reliable interprocess message queue.
15  */
16 
17 #ifndef BOOST_LOG_UTILITY_IPC_RELIABLE_MESSAGE_QUEUE_HPP_INCLUDED_
18 #define BOOST_LOG_UTILITY_IPC_RELIABLE_MESSAGE_QUEUE_HPP_INCLUDED_
19 
20 #include <boost/log/detail/config.hpp>
21 #include <cstddef>
22 #include <boost/cstdint.hpp>
23 #include <boost/move/core.hpp>
24 #include <boost/log/keywords/open_mode.hpp>
25 #include <boost/log/keywords/name.hpp>
26 #include <boost/log/keywords/capacity.hpp>
27 #include <boost/log/keywords/block_size.hpp>
28 #include <boost/log/keywords/overflow_policy.hpp>
29 #include <boost/log/keywords/permissions.hpp>
30 #include <boost/log/utility/open_mode.hpp>
31 #include <boost/log/utility/permissions.hpp>
32 #include <boost/log/utility/ipc/object_name.hpp>
33 #include <boost/log/detail/parameter_tools.hpp>
34 #include <boost/log/detail/header.hpp>
35 
36 #ifdef BOOST_HAS_PRAGMA_ONCE
37 #pragma once
38 #endif
39 
40 namespace boost {
41 
42 BOOST_LOG_OPEN_NAMESPACE
43 
44 namespace ipc {
45 
46 namespace aux {
47 
48 template< typename T, typename R >
49 struct enable_if_byte {};
50 template< typename R >
51 struct enable_if_byte< char, R > { typedef R type; };
52 template< typename R >
53 struct enable_if_byte< signed char, R > { typedef R type; };
54 template< typename R >
55 struct enable_if_byte< unsigned char, R > { typedef R type; };
56 
57 } // namespace aux
58 
59 /*!
60  * \brief A reliable interprocess message queue
61  *
62  * The queue implements a reliable one-way channel of passing messages from one or multiple writers to a single reader.
63  * The format of the messages is user-defined and must be consistent across all writers and the reader. The queue does
64  * not enforce any specific format of the messages, other than they should be supplied as a contiguous array of bytes.
65  *
66  * The queue internally uses a process-shared storage identified by an \c object_name (the queue name). Refer to \c object_name
67  * documentation for details on restrictions imposed on object names.
68  *
69  * The queue storage is organized as a fixed number of blocks of a fixed size. The block size must be an integer power of 2 and
70  * is expressed in bytes. Each written message, together with some metadata added by the queue, consumes an integer number
71  * of blocks. Each read message received by the reader releases the blocks allocated for that message. As such the maximum size
72  * of a message is slightly less than block size times capacity of the queue. For efficiency, it is recommended to choose
73  * block size large enough to accommodate most of the messages to be passed through the queue.
74  *
75  * The queue is considered empty when no messages are enqueued (all blocks are free). The queue is considered full at the point
76  * of enqueueing a message when there is not enough free blocks to accommodate the message.
77  *
78  * The queue is reliable in that it will not drop successfully sent messages that are not received by the reader, other than the
79  * case when a non-empty queue is destroyed by the last user. If a message cannot be enqueued by the writer because the queue is
80  * full, the queue can either block the writer or return an error or throw an exception, depending on the policy specified at
81  * the queue creation. The policy is object local, i.e. different writers and the reader can have different overflow policies.
82  *
83  * If the queue is empty and the reader attempts to dequeue a message, it will block until a message is enqueued by a writer.
84  *
85  * A blocked reader or writer can be unblocked by calling \c stop_local. After this method is called, all threads blocked on
86  * this particular object are released and return \c operation_result::aborted. The other instances of the queue (in the current
87  * or other processes) are unaffected. In order to restore the normal functioning of the queue instance after the \c stop_local
88  * call the user has to invoke \c reset_local.
89  *
90  * The queue does not guarantee any particular order of received messages from different writer threads. Messages sent by a
91  * particular writer thread will be received in the order of sending.
92  *
93  * Methods of this class are not thread-safe, unless otherwise specified.
94  */
95 class reliable_message_queue
96 {
97 public:
98     //! Result codes for various operations on the queue
99     enum operation_result
100     {
101         succeeded,    //!< The operation has completed successfully
102         no_space,     //!< The message could not be sent because the queue is full
103         aborted       //!< The operation has been aborted because the queue method <tt>stop_local()</tt> has been called
104     };
105 
106     //! Interprocess queue overflow policies
107     enum overflow_policy
108     {
109         //! Block the send operation when the queue is full
110         block_on_overflow,
111         //! Return \c operation_result::no_space when the queue is full
112         fail_on_overflow,
113         //! Throw \c capacity_limit_reached exception when the queue is full
114         throw_on_overflow
115     };
116 
117     //! Queue message size type
118     typedef uint32_t size_type;
119 
120 #if !defined(BOOST_LOG_DOXYGEN_PASS)
121 
122     BOOST_MOVABLE_BUT_NOT_COPYABLE(reliable_message_queue)
123 
124 private:
125     typedef void (*receive_handler)(void* state, const void* data, size_type size);
126 
127     struct fixed_buffer_state
128     {
129         uint8_t* data;
130         size_type size;
131     };
132 
133     struct implementation;
134     implementation* m_impl;
135 
136 #endif // !defined(BOOST_LOG_DOXYGEN_PASS)
137 
138 public:
139     /*!
140      * Default constructor. The method constructs an object that is not associated with any
141      * message queue.
142      *
143      * \post <tt>is_open() == false</tt>
144      */
reliable_message_queue()145     BOOST_CONSTEXPR reliable_message_queue() BOOST_NOEXCEPT : m_impl(NULL)
146     {
147     }
148 
149     /*!
150      * Constructor. The method is used to construct an object and create the associated
151      * message queue. The constructed object will be in running state if the message queue is
152      * successfully created.
153      *
154      * \post <tt>is_open() == true</tt>
155      *
156      * \param name Name of the message queue to be associated with.
157      * \param capacity Maximum number of allocation blocks the queue can hold.
158      * \param block_size Size in bytes of allocation block. Must be a power of 2.
159      * \param oflow_policy Queue behavior policy in case of overflow.
160      * \param perms Access permissions for the associated message queue.
161      */
reliable_message_queue(open_mode::create_only_tag,object_name const & name,uint32_t capacity,size_type block_size,overflow_policy oflow_policy=block_on_overflow,permissions const & perms=permissions ())162     reliable_message_queue
163     (
164         open_mode::create_only_tag,
165         object_name const& name,
166         uint32_t capacity,
167         size_type block_size,
168         overflow_policy oflow_policy = block_on_overflow,
169         permissions const& perms = permissions()
170     ) :
171         m_impl(NULL)
172     {
173         this->create(name, capacity, block_size, oflow_policy, perms);
174     }
175 
176     /*!
177      * Constructor. The method is used to construct an object and create or open the associated
178      * message queue. The constructed object will be in running state if the message queue is
179      * successfully created or opened. If the message queue that is identified by the name already
180      * exists then the other queue parameters are ignored. The actual queue parameters can be obtained
181      * with accessors from the constructed object.
182      *
183      * \post <tt>is_open() == true</tt>
184      *
185      * \param name Name of the message queue to be associated with.
186      * \param capacity Maximum number of allocation blocks the queue can hold.
187      * \param block_size Size in bytes of allocation block. Must be a power of 2.
188      * \param oflow_policy Queue behavior policy in case of overflow.
189      * \param perms Access permissions for the associated message queue.
190      */
reliable_message_queue(open_mode::open_or_create_tag,object_name const & name,uint32_t capacity,size_type block_size,overflow_policy oflow_policy=block_on_overflow,permissions const & perms=permissions ())191     reliable_message_queue
192     (
193         open_mode::open_or_create_tag,
194         object_name const& name,
195         uint32_t capacity,
196         size_type block_size,
197         overflow_policy oflow_policy = block_on_overflow,
198         permissions const& perms = permissions()
199     ) :
200         m_impl(NULL)
201     {
202         this->open_or_create(name, capacity, block_size, oflow_policy, perms);
203     }
204 
205     /*!
206      * Constructor. The method is used to construct an object and open the existing
207      * message queue. The constructed object will be in running state if the message queue is
208      * successfully opened.
209      *
210      * \post <tt>is_open() == true</tt>
211      *
212      * \param name Name of the message queue to be associated with.
213      * \param oflow_policy Queue behavior policy in case of overflow.
214      * \param perms Access permissions for the associated message queue. The permissions will only be used
215      *              if the queue implementation has to create system objects while operating.
216      *              This parameter is currently not used on POSIX systems.
217      */
reliable_message_queue(open_mode::open_only_tag,object_name const & name,overflow_policy oflow_policy=block_on_overflow,permissions const & perms=permissions ())218     reliable_message_queue
219     (
220         open_mode::open_only_tag,
221         object_name const& name,
222         overflow_policy oflow_policy = block_on_overflow,
223         permissions const& perms = permissions()
224     ) :
225         m_impl(NULL)
226     {
227         this->open(name, oflow_policy, perms);
228     }
229 
230     /*!
231      * Constructor with named parameters. The method is used to construct an object and create or open
232      * the associated message queue. The constructed object will be in running state if the message queue is
233      * successfully created.
234      *
235      * The following named parameters are accepted:
236      *
237      * * open_mode - One of the open mode tags: \c open_mode::create_only, \c open_mode::open_only or
238      *               \c open_mode::open_or_create.
239      * * name - Name of the message queue to be associated with.
240      * * capacity - Maximum number of allocation blocks the queue can hold. Used only if the queue is created.
241      * * block_size - Size in bytes of allocation block. Must be a power of 2. Used only if the queue is created.
242      * * overflow_policy - Queue behavior policy in case of overflow, see \c overflow_policy.
243      * * permissions - Access permissions for the associated message queue.
244      *
245      * \post <tt>is_open() == true</tt>
246      */
247 #if !defined(BOOST_LOG_DOXYGEN_PASS)
BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_CALL(reliable_message_queue,construct)248     BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_CALL(reliable_message_queue, construct)
249 #else
250     template< typename... Args >
251     explicit reliable_message_queue(Args const&... args);
252 #endif
253 
254     /*!
255      * Destructor. Calls <tt>close()</tt>.
256      */
257     ~reliable_message_queue() BOOST_NOEXCEPT
258     {
259         this->close();
260     }
261 
262     /*!
263      * Move constructor. The method move-constructs an object from \c other. After
264      * the call, the constructed object becomes \c other, while \c other is left in
265      * default constructed state.
266      *
267      * \param that The object to be moved.
268      */
reliable_message_queue(BOOST_RV_REF (reliable_message_queue)that)269     reliable_message_queue(BOOST_RV_REF(reliable_message_queue) that) BOOST_NOEXCEPT :
270         m_impl(that.m_impl)
271     {
272         that.m_impl = NULL;
273     }
274 
275     /*!
276      * Move assignment operator. If the object is associated with a message queue,
277      * <tt>close()</tt> is first called and the precondition to calling <tt>close()</tt>
278      * applies. After the call, the object becomes \a that while \a that is left
279      * in default constructed state.
280      *
281      * \param that The object to be moved.
282      *
283      * \return A reference to the assigned object.
284      */
operator =(BOOST_RV_REF (reliable_message_queue)that)285     reliable_message_queue& operator= (BOOST_RV_REF(reliable_message_queue) that) BOOST_NOEXCEPT
286     {
287         reliable_message_queue other(static_cast< BOOST_RV_REF(reliable_message_queue) >(that));
288         this->swap(other);
289         return *this;
290     }
291 
292     /*!
293      * The method swaps the object with \a that.
294      *
295      * \param that The other object to swap with.
296      */
swap(reliable_message_queue & that)297     void swap(reliable_message_queue& that) BOOST_NOEXCEPT
298     {
299         implementation* p = m_impl;
300         m_impl = that.m_impl;
301         that.m_impl = p;
302     }
303 
304     //! Swaps the two \c reliable_message_queue objects.
swap(reliable_message_queue & a,reliable_message_queue & b)305     friend void swap(reliable_message_queue& a, reliable_message_queue& b) BOOST_NOEXCEPT
306     {
307         a.swap(b);
308     }
309 
310     /*!
311      * The method creates the message queue to be associated with the object. After the call,
312      * the object will be in running state if a message queue is successfully created.
313      *
314      * \pre <tt>is_open() == false</tt>
315      * \post <tt>is_open() == true</tt>
316      *
317      * \param name Name of the message queue to be associated with.
318      * \param capacity Maximum number of allocation blocks the queue can hold.
319      * \param block_size Size in bytes of allocation block. Must be a power of 2.
320      * \param oflow_policy Queue behavior policy in case of overflow.
321      * \param perms Access permissions for the associated message queue.
322      */
323     BOOST_LOG_API void create
324     (
325         object_name const& name,
326         uint32_t capacity,
327         size_type block_size,
328         overflow_policy oflow_policy = block_on_overflow,
329         permissions const& perms = permissions()
330     );
331 
332     /*!
333      * The method creates or opens the message queue to be associated with the object.
334      * After the call, the object will be in running state if a message queue is successfully
335      * created or opened. If the message queue that is identified by the name already exists then
336      * the other queue parameters are ignored. The actual queue parameters can be obtained
337      * with accessors from this object after this method returns.
338      *
339      * \pre <tt>is_open() == false</tt>
340      * \post <tt>is_open() == true</tt>
341      *
342      * \param name Name of the message queue to be associated with.
343      * \param capacity Maximum number of allocation blocks the queue can hold.
344      * \param block_size Size in bytes of allocation block. Must be a power of 2.
345      * \param oflow_policy Queue behavior policy in case of overflow.
346      * \param perms Access permissions for the associated message queue.
347      */
348     BOOST_LOG_API void open_or_create
349     (
350         object_name const& name,
351         uint32_t capacity,
352         size_type block_size,
353         overflow_policy oflow_policy = block_on_overflow,
354         permissions const& perms = permissions()
355     );
356 
357     /*!
358      * The method opens the existing message queue to be associated with the object.
359      * After the call, the object will be in running state if a message queue is successfully
360      * opened.
361      *
362      * \pre <tt>is_open() == false</tt>
363      * \post <tt>is_open() == true</tt>
364      *
365      * \param name Name of the message queue to be associated with.
366      * \param oflow_policy Queue behavior policy in case of overflow.
367      * \param perms Access permissions for the associated message queue. The permissions will only be used
368      *              if the queue implementation has to create system objects while operating.
369      *              This parameter is currently not used on POSIX systems.
370      */
371     BOOST_LOG_API void open
372     (
373         object_name const& name,
374         overflow_policy oflow_policy = block_on_overflow,
375         permissions const& perms = permissions()
376     );
377 
378     /*!
379      * Tests whether the object is associated with any message queue.
380      *
381      * \return \c true if the object is associated with a message queue, and \c false otherwise.
382      */
is_open() const383     bool is_open() const BOOST_NOEXCEPT
384     {
385         return m_impl != NULL;
386     }
387 
388     /*!
389      * This method empties the associated message queue. Concurrent calls to this method, <tt>send()</tt>,
390      * <tt>try_send()</tt>, <tt>receive()</tt>, <tt>try_receive()</tt>, and <tt>stop_local()</tt> are allowed.
391      *
392      * \pre <tt>is_open() == true</tt>
393      */
394     BOOST_LOG_API void clear();
395 
396     /*!
397      * The method returns the name of the associated message queue.
398      *
399      * \pre <tt>is_open() == true</tt>
400      *
401      * \return Name of the associated message queue
402      */
403     BOOST_LOG_API object_name const& name() const;
404 
405     /*!
406      * The method returns the maximum number of allocation blocks the associated message queue
407      * can hold. Note that the returned value may be different from the corresponding
408      * value passed to the constructor or <tt>open_or_create()</tt>, for the message queue may
409      * not have been created by this object.
410      *
411      * \pre <tt>is_open() == true</tt>
412      *
413      * \return Maximum number of allocation blocks the associated message queue can hold.
414      */
415     BOOST_LOG_API uint32_t capacity() const;
416 
417     /*!
418      * The method returns the allocation block size, in bytes. Each message in the
419      * associated message queue consumes an integer number of allocation blocks.
420      * Note that the returned value may be different from the corresponding value passed
421      * to the constructor or <tt>open_or_create()</tt>, for the message queue may not
422      * have been created by this object.
423      *
424      * \pre <tt>is_open() == true</tt>
425      *
426      * \return Allocation block size, in bytes.
427      */
428     BOOST_LOG_API size_type block_size() const;
429 
430     /*!
431      * The method wakes up all threads that are blocked in calls to <tt>send()</tt> or
432      * <tt>receive()</tt>. Those calls would then return <tt>operation_result::aborted</tt>.
433      * Note that, the method does not block until the woken-up threads have actually
434      * returned from <tt>send()</tt> or <tt>receive()</tt>. Other means is needed to ensure
435      * that calls to <tt>send()</tt> or <tt>receive()</tt> have returned, e.g., joining the
436      * threads that might be blocking on the calls.
437      *
438      * The method also puts the object in stopped state. When in stopped state, calls to
439      * <tt>send()</tt> or <tt>receive()</tt> will return immediately with return value
440      * <tt>operation_result::aborted</tt> when they would otherwise block in running state.
441      *
442      * Concurrent calls to this method, <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>,
443      * <tt>try_receive()</tt>, and <tt>clear()</tt> are allowed.
444      *
445      * \pre <tt>is_open() == true</tt>
446      */
447     BOOST_LOG_API void stop_local();
448 
449     /*!
450      * The method puts the object in running state where calls to <tt>send()</tt> or
451      * <tt>receive()</tt> may block. This method is not thread-safe.
452      *
453      * \pre <tt>is_open() == true</tt>
454      */
455     BOOST_LOG_API void reset_local();
456 
457     /*!
458      * The method disassociates the associated message queue, if any. No other threads
459      * should be using this object before calling this method. The <tt>stop_local()</tt> method
460      * can be used to have any threads currently blocked in <tt>send()</tt> or
461      * <tt>receive()</tt> return, and prevent further calls to them from blocking. Typically,
462      * before calling this method, one would first call <tt>stop_local()</tt> and then join all
463      * threads that might be blocking on <tt>send()</tt> or <tt>receive()</tt> to ensure that
464      * they have returned from the calls. The associated message queue is destroyed if the
465      * object represents the last outstanding reference to it.
466      *
467      * \post <tt>is_open() == false</tt>
468      */
close()469     void close() BOOST_NOEXCEPT
470     {
471         if (is_open())
472             do_close();
473     }
474 
475     /*!
476      * The method sends a message to the associated message queue. When the object is in
477      * running state and the queue has no free space for the message, the method either blocks
478      * or throws an exception, depending on the overflow policy that was specified on the queue
479      * opening/creation. If blocking policy is in effect, the blocking can be interrupted by
480      * calling <tt>stop_local()</tt>, in which case the method returns \c operation_result::aborted.
481      * When the object is already in the stopped state, the method does not block but returns
482      * immediately with return value \c operation_result::aborted.
483      *
484      * It is possible to send an empty message by passing \c 0 to the parameter \c message_size.
485      *
486      * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>, <tt>try_receive()</tt>,
487      * <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed.
488      *
489      * \pre <tt>is_open() == true</tt>
490      *
491      * \param message_data The message data to send. Ignored when \c message_size is \c 0.
492      * \param message_size Size of the message data in bytes. If the size is larger than
493      *                     the associated message queue capacity, an <tt>std::logic_error</tt> exception is thrown.
494      *
495      * \retval operation_result::succeeded if the operation is successful
496      * \retval operation_result::no_space if \c overflow_policy::fail_on_overflow is in effect and the queue is full
497      * \retval operation_result::aborted if the call was interrupted by <tt>stop_local()</tt>
498      *
499      * <b>Throws:</b> <tt>std::logic_error</tt> in case if the message size exceeds the queue
500      *                capacity, <tt>system_error</tt> in case if a native OS method fails.
501      */
502     BOOST_LOG_API operation_result send(void const* message_data, size_type message_size);
503 
504     /*!
505      * The method performs an attempt to send a message to the associated message queue.
506      * The method is non-blocking, and always returns immediately.
507      * <tt>boost::system::system_error</tt> is thrown for errors resulting from native
508      * operating system calls. Note that it is possible to send an empty message by passing
509      * \c 0 to the parameter \c message_size. Concurrent calls to <tt>send()</tt>,
510      * <tt>try_send()</tt>, <tt>receive()</tt>, <tt>try_receive()</tt>, <tt>stop_local()</tt>,
511      * and <tt>clear()</tt> are allowed.
512      *
513      * \pre <tt>is_open() == true</tt>
514      *
515      * \param message_data The message data to send. Ignored when \c message_size is \c 0.
516      * \param message_size Size of the message data in bytes. If the size is larger than the
517      *                     maximum size allowed by the associated message queue, an
518      *                     <tt>std::logic_error</tt> exception is thrown.
519      *
520      * \return \c true if the message is successfully sent, and \c false otherwise (e.g.,
521      *         when the queue is full).
522      *
523      * <b>Throws:</b> <tt>std::logic_error</tt> in case if the message size exceeds the queue
524      *                capacity, <tt>system_error</tt> in case if a native OS method fails.
525      */
526     BOOST_LOG_API bool try_send(void const* message_data, size_type message_size);
527 
528     /*!
529      * The method takes a message from the associated message queue. When the object is in
530      * running state and the queue is empty, the method blocks. The blocking is interrupted
531      * when <tt>stop_local()</tt> is called, in which case the method returns \c operation_result::aborted.
532      * When the object is already in the stopped state and the queue is empty, the method
533      * does not block but returns immediately with return value \c operation_result::aborted.
534      *
535      * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>,
536      * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed.
537      *
538      * \pre <tt>is_open() == true</tt>
539      *
540      * \param buffer The memory buffer to store the received message in.
541      * \param buffer_size The size of the buffer, in bytes.
542      * \param message_size Receives the size of the received message, in bytes.
543      *
544      * \retval operation_result::succeeded if the operation is successful
545      * \retval operation_result::aborted if the call was interrupted by <tt>stop_local()</tt>
546      */
receive(void * buffer,size_type buffer_size,size_type & message_size)547     operation_result receive(void* buffer, size_type buffer_size, size_type& message_size)
548     {
549         fixed_buffer_state state = { static_cast< uint8_t* >(buffer), buffer_size };
550         operation_result result = do_receive(&reliable_message_queue::fixed_buffer_receive_handler, &state);
551         message_size = buffer_size - state.size;
552         return result;
553     }
554 
555     /*!
556      * The method takes a message from the associated message queue. When the object is in
557      * running state and the queue is empty, the method blocks. The blocking is interrupted
558      * when <tt>stop_local()</tt> is called, in which case the method returns \c operation_result::aborted.
559      * When the object is already in the stopped state and the queue is empty, the method
560      * does not block but returns immediately with return value \c operation_result::aborted.
561      *
562      * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>,
563      * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed.
564      *
565      * \pre <tt>is_open() == true</tt>
566      *
567      * \param buffer The memory buffer to store the received message in.
568      * \param message_size Receives the size of the received message, in bytes.
569      *
570      * \retval operation_result::succeeded if the operation is successful
571      * \retval operation_result::aborted if the call was interrupted by <tt>stop_local()</tt>
572      */
573     template< typename ElementT, size_type SizeV >
574 #if !defined(BOOST_LOG_DOXYGEN_PASS)
575     typename aux::enable_if_byte< ElementT, operation_result >::type
576 #else
577     operation_result
578 #endif
receive(ElementT (& buffer)[SizeV],size_type & message_size)579     receive(ElementT (&buffer)[SizeV], size_type& message_size)
580     {
581         return receive(buffer, SizeV, message_size);
582     }
583 
584     /*!
585      * The method takes a message from the associated message queue. When the object is in
586      * running state and the queue is empty, the method blocks. The blocking is interrupted
587      * when <tt>stop_local()</tt> is called, in which case the method returns \c operation_result::aborted.
588      * When the object is already in the stopped state and the queue is empty, the method
589      * does not block but returns immediately with return value \c operation_result::aborted.
590      *
591      * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>,
592      * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed.
593      *
594      * \pre <tt>is_open() == true</tt>
595      *
596      * \param container The container to store the received message in. The container should have
597      *                  value type of <tt>char</tt>, <tt>signed char</tt> or <tt>unsigned char</tt>
598      *                  and support inserting elements at the end.
599      *
600      * \retval operation_result::succeeded if the operation is successful
601      * \retval operation_result::aborted if the call was interrupted by <tt>stop_local()</tt>
602      */
603     template< typename ContainerT >
604 #if !defined(BOOST_LOG_DOXYGEN_PASS)
605     typename aux::enable_if_byte< typename ContainerT::value_type, operation_result >::type
606 #else
607     operation_result
608 #endif
receive(ContainerT & container)609     receive(ContainerT& container)
610     {
611         return do_receive(&reliable_message_queue::container_receive_handler< ContainerT >, &container);
612     }
613 
614     /*!
615      * The method performs an attempt to take a message from the associated message queue. The
616      * method is non-blocking, and always returns immediately.
617      *
618      * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>,
619      * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed.
620      *
621      * \pre <tt>is_open() == true</tt>
622      *
623      * \param buffer The memory buffer to store the received message in.
624      * \param buffer_size The size of the buffer, in bytes.
625      * \param message_size Receives the size of the received message, in bytes.
626      *
627      * \return \c true if a message is successfully received, and \c false otherwise (e.g.,
628      *         when the queue is empty).
629      */
try_receive(void * buffer,size_type buffer_size,size_type & message_size)630     bool try_receive(void* buffer, size_type buffer_size, size_type& message_size)
631     {
632         fixed_buffer_state state = { static_cast< uint8_t* >(buffer), buffer_size };
633         bool result = do_try_receive(&reliable_message_queue::fixed_buffer_receive_handler, &state);
634         message_size = buffer_size - state.size;
635         return result;
636     }
637 
638     /*!
639      * The method performs an attempt to take a message from the associated message queue. The
640      * method is non-blocking, and always returns immediately.
641      *
642      * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>,
643      * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed.
644      *
645      * \pre <tt>is_open() == true</tt>
646      *
647      * \param buffer The memory buffer to store the received message in.
648      * \param message_size Receives the size of the received message, in bytes.
649      *
650      * \return \c true if a message is successfully received, and \c false otherwise (e.g.,
651      *         when the queue is empty).
652      */
653     template< typename ElementT, size_type SizeV >
654 #if !defined(BOOST_LOG_DOXYGEN_PASS)
655     typename aux::enable_if_byte< ElementT, bool >::type
656 #else
657     bool
658 #endif
try_receive(ElementT (& buffer)[SizeV],size_type & message_size)659     try_receive(ElementT (&buffer)[SizeV], size_type& message_size)
660     {
661         return try_receive(buffer, SizeV, message_size);
662     }
663 
664     /*!
665      * The method performs an attempt to take a message from the associated message queue. The
666      * method is non-blocking, and always returns immediately.
667      *
668      * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>,
669      * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed.
670      *
671      * \pre <tt>is_open() == true</tt>
672      *
673      * \param container The container to store the received message in. The container should have
674      *                  value type of <tt>char</tt>, <tt>signed char</tt> or <tt>unsigned char</tt>
675      *                  and support inserting elements at the end.
676      *
677      * \return \c true if a message is successfully received, and \c false otherwise (e.g.,
678      *         when the queue is empty).
679      */
680     template< typename ContainerT >
681 #if !defined(BOOST_LOG_DOXYGEN_PASS)
682     typename aux::enable_if_byte< typename ContainerT::value_type, bool >::type
683 #else
684     bool
685 #endif
try_receive(ContainerT & container)686     try_receive(ContainerT& container)
687     {
688         return do_try_receive(&reliable_message_queue::container_receive_handler< ContainerT >, &container);
689     }
690 
691     /*!
692      * The method frees system-wide resources, associated with the interprocess queue with the supplied name.
693      * The queue referred to by the specified name must not be opened in any process at the point of this call.
694      * After this call succeeds a new queue with the specified name can be created.
695      *
696      * This call can be useful to recover from an earlier process misbehavior (e.g. a crash without properly
697      * closing the message queue). In this case resources allocated for the interprocess queue may remain
698      * allocated after the last process closed the queue, which in turn may prevent creating a new queue with
699      * the same name. By calling this method before creating a queue the application can attempt to ensure
700      * it starts with a clean slate.
701      *
702      * On some platforms resources associated with the queue are automatically reclaimed by the operating system
703      * when the last process using those resources terminates (even if it terminates abnormally). On these
704      * platforms this call may be a no-op. However, portable code should still call this method at appropriate
705      * places to ensure compatibility with other platforms and future library versions, which may change implementation
706      * of the queue.
707      *
708      * \param name Name of the message queue to be removed.
709      */
710     static BOOST_LOG_API void remove(object_name const& name);
711 
712 #if !defined(BOOST_LOG_DOXYGEN_PASS)
713 private:
714     //! Implementation of the constructor with named arguments
715     template< typename ArgsT >
construct(ArgsT const & args)716     void construct(ArgsT const& args)
717     {
718         m_impl = NULL;
719         construct_dispatch(args[keywords::open_mode], args);
720     }
721 
722     //! Implementation of the constructor with named arguments
723     template< typename ArgsT >
construct_dispatch(open_mode::create_only_tag,ArgsT const & args)724     void construct_dispatch(open_mode::create_only_tag, ArgsT const& args)
725     {
726         this->create(args[keywords::name], args[keywords::capacity], args[keywords::block_size], args[keywords::overflow_policy | block_on_overflow], args[keywords::permissions | permissions()]);
727     }
728 
729     //! Implementation of the constructor with named arguments
730     template< typename ArgsT >
construct_dispatch(open_mode::open_or_create_tag,ArgsT const & args)731     void construct_dispatch(open_mode::open_or_create_tag, ArgsT const& args)
732     {
733         this->open_or_create(args[keywords::name], args[keywords::capacity], args[keywords::block_size], args[keywords::overflow_policy | block_on_overflow], args[keywords::permissions | permissions()]);
734     }
735 
736     //! Implementation of the constructor with named arguments
737     template< typename ArgsT >
construct_dispatch(open_mode::open_only_tag,ArgsT const & args)738     void construct_dispatch(open_mode::open_only_tag, ArgsT const& args)
739     {
740         this->open(args[keywords::name], args[keywords::overflow_policy | block_on_overflow], args[keywords::permissions | permissions()]);
741     }
742 
743     //! Closes the message queue, if it's open
744     BOOST_LOG_API void do_close() BOOST_NOEXCEPT;
745 
746     //! Receives the message from the queue and calls the handler to place the data in the user's storage
747     BOOST_LOG_API operation_result do_receive(receive_handler handler, void* state);
748     //! Attempts to receives the message from the queue and calls the handler to place the data in the user's storage
749     BOOST_LOG_API bool do_try_receive(receive_handler handler, void* state);
750 
751     //! Fixed buffer receive handler
752     static BOOST_LOG_API void fixed_buffer_receive_handler(void* state, const void* data, size_type size);
753     //! Receive handler for a container
754     template< typename ContainerT >
container_receive_handler(void * state,const void * data,size_type size)755     static void container_receive_handler(void* state, const void* data, size_type size)
756     {
757         ContainerT* const container = static_cast< ContainerT* >(state);
758         container->insert
759         (
760             container->end(),
761             static_cast< typename ContainerT::value_type const* >(data),
762             static_cast< typename ContainerT::value_type const* >(data) + size
763         );
764     }
765 #endif
766 };
767 
768 } // namespace ipc
769 
770 BOOST_LOG_CLOSE_NAMESPACE // namespace log
771 
772 } // namespace boost
773 
774 #include <boost/log/detail/footer.hpp>
775 
776 #endif // BOOST_LOG_UTILITY_IPC_RELIABLE_MESSAGE_QUEUE_HPP_INCLUDED_
777