1 //////////////////////////////////////////////////////////////////////////////
2 //
3 // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost
4 // Software License, Version 1.0. (See accompanying file
5 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // See http://www.boost.org/libs/interprocess for documentation.
8 //
9 //////////////////////////////////////////////////////////////////////////////
10 
11 #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
12 #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
13 
14 #ifndef BOOST_CONFIG_HPP
15 #  include <boost/config.hpp>
16 #endif
17 #
18 #if defined(BOOST_HAS_PRAGMA_ONCE)
19 #  pragma once
20 #endif
21 
22 #include <boost/interprocess/detail/config_begin.hpp>
23 #include <boost/interprocess/detail/workaround.hpp>
24 
25 #include <boost/interprocess/shared_memory_object.hpp>
26 #include <boost/interprocess/detail/managed_open_or_create_impl.hpp>
27 #include <boost/interprocess/sync/interprocess_condition.hpp>
28 #include <boost/interprocess/sync/interprocess_mutex.hpp>
29 #include <boost/interprocess/sync/scoped_lock.hpp>
30 #include <boost/interprocess/detail/utilities.hpp>
31 #include <boost/interprocess/offset_ptr.hpp>
32 #include <boost/interprocess/creation_tags.hpp>
33 #include <boost/interprocess/exceptions.hpp>
34 #include <boost/interprocess/permissions.hpp>
35 #include <boost/core/no_exceptions_support.hpp>
36 #include <boost/interprocess/detail/type_traits.hpp>
37 #include <boost/intrusive/pointer_traits.hpp>
38 #include <boost/move/detail/type_traits.hpp> //make_unsigned, alignment_of
39 #include <boost/intrusive/pointer_traits.hpp>
40 #include <boost/assert.hpp>
41 #include <algorithm> //std::lower_bound
42 #include <cstddef>   //std::size_t
43 #include <cstring>   //memcpy
44 
45 
46 //!\file
47 //!Describes an inter-process message queue. This class allows sending
48 //!messages between processes and allows blocking, non-blocking and timed
49 //!sending and receiving.
50 
51 namespace boost{  namespace interprocess{
52 
namespace ipcdetail
{
   //Forward declaration only: the class template is defined later in this
   //header. It is declared here so that message_queue_t can name it in a
   //friend declaration.
   template<class VoidPointer>
   class msg_queue_initialization_func_t;
}
58 
59 //!A class that allows sending messages
60 //!between processes.
61 template<class VoidPointer>
62 class message_queue_t
63 {
64    #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
65    //Blocking modes
66    enum block_t   {  blocking,   timed,   non_blocking   };
67 
68    message_queue_t();
69    #endif   //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
70 
71    public:
72    typedef VoidPointer                                                 void_pointer;
73    typedef typename boost::intrusive::
74       pointer_traits<void_pointer>::template
75          rebind_pointer<char>::type                                    char_ptr;
76    typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
77    typedef typename boost::container::dtl::make_unsigned<difference_type>::type        size_type;
78 
79    //!Creates a process shared message queue with name "name". For this message queue,
80    //!the maximum number of messages will be "max_num_msg" and the maximum message size
81    //!will be "max_msg_size". Throws on error and if the queue was previously created.
82    message_queue_t(create_only_t create_only,
83                  const char *name,
84                  size_type max_num_msg,
85                  size_type max_msg_size,
86                  const permissions &perm = permissions());
87 
88    //!Opens or creates a process shared message queue with name "name".
89    //!If the queue is created, the maximum number of messages will be "max_num_msg"
90    //!and the maximum message size will be "max_msg_size". If queue was previously
91    //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters
92    //!are ignored. Throws on error.
93    message_queue_t(open_or_create_t open_or_create,
94                  const char *name,
95                  size_type max_num_msg,
96                  size_type max_msg_size,
97                  const permissions &perm = permissions());
98 
99    //!Opens a previously created process shared message queue with name "name".
100    //!If the queue was not previously created or there are no free resources,
101    //!throws an error.
102    message_queue_t(open_only_t open_only,
103                  const char *name);
104 
105    //!Destroys *this and indicates that the calling process is finished using
106    //!the resource. All opened message queues are still
107    //!valid after destruction. The destructor function will deallocate
108    //!any system resources allocated by the system for use by this process for
109    //!this resource. The resource can still be opened again calling
110    //!the open constructor overload. To erase the message queue from the system
111    //!use remove().
112    ~message_queue_t();
113 
114    //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
115    //!message queue with priority "priority". If the message queue is full
116    //!the sender is blocked. Throws interprocess_error on error.
117    void send (const void *buffer,     size_type buffer_size,
118               unsigned int priority);
119 
120    //!Sends a message stored in buffer "buffer" with size "buffer_size" through the
121    //!message queue with priority "priority". If the message queue is full
122    //!the sender is not blocked and returns false, otherwise returns true.
123    //!Throws interprocess_error on error.
124    bool try_send    (const void *buffer,     size_type buffer_size,
125                          unsigned int priority);
126 
127    //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
128    //!message queue with priority "priority". If the message queue is full
129    //!the sender retries until time "abs_time" is reached. Returns true if
130    //!the message has been successfully sent. Returns false if timeout is reached.
131    //!Throws interprocess_error on error.
132    bool timed_send    (const void *buffer,     size_type buffer_size,
133                            unsigned int priority,  const boost::posix_time::ptime& abs_time);
134 
135    //!Receives a message from the message queue. The message is stored in buffer
136    //!"buffer", which has size "buffer_size". The received message has size
137    //!"recvd_size" and priority "priority". If the message queue is empty
138    //!the receiver is blocked. Throws interprocess_error on error.
139    void receive (void *buffer,           size_type buffer_size,
140                  size_type &recvd_size,unsigned int &priority);
141 
142    //!Receives a message from the message queue. The message is stored in buffer
143    //!"buffer", which has size "buffer_size". The received message has size
144    //!"recvd_size" and priority "priority". If the message queue is empty
145    //!the receiver is not blocked and returns false, otherwise returns true.
146    //!Throws interprocess_error on error.
147    bool try_receive (void *buffer,           size_type buffer_size,
148                      size_type &recvd_size,unsigned int &priority);
149 
150    //!Receives a message from the message queue. The message is stored in buffer
151    //!"buffer", which has size "buffer_size". The received message has size
152    //!"recvd_size" and priority "priority". If the message queue is empty
153    //!the receiver retries until time "abs_time" is reached. Returns true if
154    //!the message has been successfully sent. Returns false if timeout is reached.
155    //!Throws interprocess_error on error.
156    bool timed_receive (void *buffer,           size_type buffer_size,
157                        size_type &recvd_size,unsigned int &priority,
158                        const boost::posix_time::ptime &abs_time);
159 
160    //!Returns the maximum number of messages allowed by the queue. The message
161    //!queue must be opened or created previously. Otherwise, returns 0.
162    //!Never throws
163    size_type get_max_msg() const;
164 
165    //!Returns the maximum size of message allowed by the queue. The message
166    //!queue must be opened or created previously. Otherwise, returns 0.
167    //!Never throws
168    size_type get_max_msg_size() const;
169 
170    //!Returns the number of messages currently stored.
171    //!Never throws
172    size_type get_num_msg() const;
173 
174    //!Removes the message queue from the system.
175    //!Returns false on error. Never throws
176    static bool remove(const char *name);
177 
178    #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
179    private:
180    typedef boost::posix_time::ptime ptime;
181 
182    friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>;
183 
184    bool do_receive(block_t block,
185                    void *buffer,         size_type buffer_size,
186                    size_type &recvd_size, unsigned int &priority,
187                    const ptime &abs_time);
188 
189    bool do_send(block_t block,
190                 const void *buffer,      size_type buffer_size,
191                 unsigned int priority,   const ptime &abs_time);
192 
193    //!Returns the needed memory size for the shared message queue.
194    //!Never throws
195    static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg);
196    typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
197    open_create_impl_t m_shmem;
198    #endif   //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
199 };
200 
201 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
202 
203 namespace ipcdetail {
204 
205 //!This header is the prefix of each message in the queue
206 template<class VoidPointer>
207 class msg_hdr_t
208 {
209    typedef VoidPointer                                                           void_pointer;
210    typedef typename boost::intrusive::
211       pointer_traits<void_pointer>::template
212          rebind_pointer<char>::type                                              char_ptr;
213    typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type  difference_type;
214    typedef typename boost::container::dtl::make_unsigned<difference_type>::type                  size_type;
215 
216    public:
217    size_type               len;     // Message length
218    unsigned int            priority;// Message priority
219    //!Returns the data buffer associated with this this message
data()220    void * data(){ return this+1; }  //
221 };
222 
223 //!This functor is the predicate to order stored messages by priority
224 template<class VoidPointer>
225 class priority_functor
226 {
227    typedef typename boost::intrusive::
228       pointer_traits<VoidPointer>::template
229          rebind_pointer<msg_hdr_t<VoidPointer> >::type                  msg_hdr_ptr_t;
230 
231    public:
operator ()(const msg_hdr_ptr_t & msg1,const msg_hdr_ptr_t & msg2) const232    bool operator()(const msg_hdr_ptr_t &msg1,
233                    const msg_hdr_ptr_t &msg2) const
234       {  return msg1->priority < msg2->priority;  }
235 };
236 
237 //!This header is placed in the beginning of the shared memory and contains
238 //!the data to control the queue. This class initializes the shared memory
239 //!in the following way: in ascending memory address with proper alignment
240 //!fillings:
241 //!
242 //!-> mq_hdr_t:
243 //!   Main control block that controls the rest of the elements
244 //!
245 //!-> offset_ptr<msg_hdr_t> index [max_num_msg]
246 //!   An array of pointers with size "max_num_msg" called index. Each pointer
247 //!   points to a preallocated message. Elements of this array are
248 //!   reordered in runtime in the following way:
249 //!
250 //!   IF BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is defined:
251 //!
//!   When the current number of messages is "cur_num_msg", the array
//!   is treated like a circular buffer. Starting from position "cur_first_msg",
//!   the next "cur_num_msg" pointers (wrapping around the end of the array)
//!   point to inserted messages and the rest point to free messages.
//!   Those "cur_num_msg" pointers are
256 //!   ordered by the priority of the pointed message and by insertion order
257 //!   if two messages have the same priority. So the next message to be
258 //!   used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg]
259 //!   and the first free message ready to be used in a "send" operation is
260 //!   [cur_first_msg] if circular buffer is extended from front,
261 //!   [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise.
262 //!
263 //!   This transforms the index in a circular buffer with an embedded free
264 //!   message queue.
265 //!
266 //!   ELSE (BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is NOT defined):
267 //!
268 //!   When the current number of messages is "cur_num_msg", the first
269 //!   "cur_num_msg" pointers point to inserted messages and the rest
270 //!   point to free messages. The first "cur_num_msg" pointers are
271 //!   ordered by the priority of the pointed message and by insertion order
272 //!   if two messages have the same priority. So the next message to be
273 //!   used in a "receive" is pointed by index [cur_num_msg-1] and the first free
274 //!   message ready to be used in a "send" operation is index [cur_num_msg].
275 //!
276 //!   This transforms the index in a fixed size priority queue with an embedded free
277 //!   message queue.
278 //!
279 //!-> struct message_t
280 //!   {
281 //!      msg_hdr_t            header;
282 //!      char[max_msg_size]   data;
283 //!   } messages [max_num_msg];
284 //!
//!   An array of buffers of preallocated messages, each one prefixed with the
//!   msg_hdr_t structure. Each of these messages is pointed to by one pointer of
//!   the index structure.
template<class VoidPointer>
class mq_hdr_t
   : public ipcdetail::priority_functor<VoidPointer>
{
   typedef VoidPointer                                                     void_pointer;
   typedef msg_hdr_t<void_pointer>                                         msg_header;
   //Pointer (rebound from VoidPointer) to a message header
   typedef typename boost::intrusive::
      pointer_traits<void_pointer>::template
         rebind_pointer<msg_header>::type                                  msg_hdr_ptr_t;
   typedef typename boost::intrusive::pointer_traits
      <msg_hdr_ptr_t>::difference_type                                     difference_type;
   //Unsigned counterpart of difference_type, used for counts, sizes and positions
   typedef typename boost::container::
      dtl::make_unsigned<difference_type>::type               size_type;
   //Pointer (rebound from VoidPointer) to an index slot, that is, to a msg_hdr_ptr_t
   typedef typename boost::intrusive::
      pointer_traits<void_pointer>::template
         rebind_pointer<msg_hdr_ptr_t>::type                              msg_hdr_ptr_ptr_t;
   typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;

   public:
   //!Constructor. This object must be constructed in the beginning of the
   //!shared memory of the size returned by the function "get_mem_size".
   //!This constructor initializes the needed resources and creates
   //!the internal structures like the priority index. This can throw.
   //!Note the parameter order: message count first, message size second
   //!(get_mem_size takes them the other way round).
   mq_hdr_t(size_type max_num_msg, size_type max_msg_size)
      : m_max_num_msg(max_num_msg),
         m_max_msg_size(max_msg_size),
         m_cur_num_msg(0)
         #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
         ,m_cur_first_msg(0u)
         ,m_blocked_senders(0u)
         ,m_blocked_receivers(0u)
         #endif
      {  this->initialize_memory();  }

   //!Returns true if the message queue is full
   bool is_full() const
      {  return m_cur_num_msg == m_max_num_msg;  }

   //!Returns true if the message queue is empty
   bool is_empty() const
      {  return !m_cur_num_msg;  }

   //!Frees the top priority message and saves it in the free message list.
   //!Only the message count is decremented: the index slot that pointed to
   //!the top message is left untouched and becomes the slot returned by
   //!inserted_ptr_end(), i.e. the first free one.
   void free_top_msg()
      {  --m_cur_num_msg;  }

   #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)

   //Iterator over the index slots
   typedef msg_hdr_ptr_t *iterator;

   //!Returns the position in the index, already wrapped into
   //![0, m_max_num_msg), that follows the last inserted pointer.
   size_type end_pos() const
   {
      const size_type space_until_bufend = m_max_num_msg - m_cur_first_msg;
      //If the inserted range does not reach the end of the array no
      //wrapping is needed; otherwise continue counting from position 0
      return space_until_bufend > m_cur_num_msg
         ? m_cur_first_msg + m_cur_num_msg : m_cur_num_msg - space_until_bufend;
   }

   //!Returns the inserted message with top priority
   msg_header &top_msg()
   {
      size_type pos = this->end_pos();
      //The top message is the one just before end_pos(), wrapping to the
      //last slot of the array when end_pos() is 0
      return *mp_index[pos ? --pos : m_max_num_msg - 1];
   }

   //!Returns the inserted message with bottom priority
   msg_header &bottom_msg()
      {  return *mp_index[m_cur_first_msg];   }

   //!Returns an iterator to the index slot of the first inserted message
   iterator inserted_ptr_begin() const
   {  return &mp_index[m_cur_first_msg]; }

   //!Returns an iterator to the index slot at end_pos(). Because of the
   //!wrap-around this iterator can compare less than inserted_ptr_begin().
   iterator inserted_ptr_end() const
      {  return &mp_index[this->end_pos()];  }

   //!std::lower_bound over the (possibly wrapped) range of inserted
   //!pointers, using "func" as the ordering predicate.
   iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
   {
      iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end());
      if(end < begin){
         //Wrapped range: search [begin, end of array) first and continue
         //with [start of array, end) only if nothing was found there
         iterator idx_end = &mp_index[m_max_num_msg];
         iterator ret = std::lower_bound(begin, idx_end, value, func);
         if(idx_end == ret){
            iterator idx_beg = &mp_index[0];
            ret = std::lower_bound(idx_beg, end, value, func);
            //sanity check, these cases should not call lower_bound (optimized out)
            BOOST_ASSERT(ret != end);
            BOOST_ASSERT(ret != begin);
            return ret;
         }
         else{
            return ret;
         }
      }
      else{
         //Contiguous range: a single search is enough
         return std::lower_bound(begin, end, value, func);
      }
   }

   //!Makes room in the index at "where" for one more message, places there
   //!the pointer to a free message, updates the counters and returns that
   //!message's header. No check for a full queue is done here.
   msg_header & insert_at(iterator where)
   {
      iterator it_inserted_ptr_end = this->inserted_ptr_end();
      iterator it_inserted_ptr_beg = this->inserted_ptr_begin();
      if(where == it_inserted_ptr_beg){
         //Insertion at the front: grow the range backwards, taking the slot
         //just before the current first one (wrapping to the last slot of
         //the array when the first position is 0)
         //unsigned integer guarantees underflow
         m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
         --m_cur_first_msg;
         ++m_cur_num_msg;
         return *mp_index[m_cur_first_msg];
      }
      else if(where == it_inserted_ptr_end){
         //Insertion at the back: the slot at end_pos() already points to a
         //free message, so only the count changes
         ++m_cur_num_msg;
         return **it_inserted_ptr_end;
      }
      else{
         //Insertion in the middle: "pos" is the array position of "where" and
         //"circ_pos" its distance from the first inserted element
         size_type pos  = where - &mp_index[0];
         size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg);
         //Check if it's more efficient to move back or move front
         if(circ_pos < m_cur_num_msg/2){
            //Fewer elements before "where": shift them one slot towards the
            //front and use the slot just before "where" for the new message
            //The queue can't be full so m_cur_num_msg == 0 or m_cur_num_msg <= pos
            //indicates two step insertion
            if(!pos){
               //The slot before position 0 is the last slot of the array
               pos   = m_max_num_msg;
               where = &mp_index[m_max_num_msg-1];
            }
            else{
               --where;
            }
            //"unique_segment" is true when the elements to move, [m_cur_first_msg, pos),
            //are contiguous and there is a slot before them inside the array
            const bool unique_segment = m_cur_first_msg && m_cur_first_msg <= pos;
            const size_type first_segment_beg  = unique_segment ? m_cur_first_msg : 1u;
            const size_type first_segment_end  = pos;
            const size_type second_segment_beg = unique_segment || !m_cur_first_msg ? m_max_num_msg : m_cur_first_msg;
            const size_type second_segment_end = m_max_num_msg;
            //Save the pointer stored in the slot that the shift will overwrite
            const msg_hdr_ptr_t backup   = *(&mp_index[0] + (unique_segment ?  first_segment_beg : second_segment_beg) - 1);

            //Two step case: shift the second segment (the one that reaches the
            //end of the array) and move slot 0 to the last slot of the array
            if(!unique_segment){
               std::copy( &mp_index[0] + second_segment_beg
                        , &mp_index[0] + second_segment_end
                        , &mp_index[0] + second_segment_beg - 1);
               mp_index[m_max_num_msg-1] = mp_index[0];
            }
            //Shift the first segment one slot towards the front
            std::copy( &mp_index[0] + first_segment_beg
                     , &mp_index[0] + first_segment_end
                     , &mp_index[0] + first_segment_beg - 1);
            //The saved pointer goes into the slot that has been opened
            *where = backup;
            m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
            --m_cur_first_msg;
            ++m_cur_num_msg;
            return **where;
         }
         else{
            //Fewer (or equal) elements from "where" on: shift them one slot
            //towards the back and use the slot at "where" for the new message
            //The queue can't be full so end_pos < m_cur_first_msg
            //indicates two step insertion
            const size_type pos_end = this->end_pos();
            const bool unique_segment = pos < pos_end;
            const size_type first_segment_beg  = pos;
            const size_type first_segment_end  = unique_segment  ? pos_end : m_max_num_msg-1;
            const size_type second_segment_beg = 0u;
            const size_type second_segment_end = unique_segment ? 0u : pos_end;
            //Save the pointer stored at end_pos(): the shift will overwrite it
            const msg_hdr_ptr_t backup   = *it_inserted_ptr_end;

            //Two step case: shift the second segment (the one that starts at
            //position 0) and move the last slot of the array to slot 0
            if(!unique_segment){
               std::copy_backward( &mp_index[0] + second_segment_beg
                                 , &mp_index[0] + second_segment_end
                                 , &mp_index[0] + second_segment_end + 1);
               mp_index[0] = mp_index[m_max_num_msg-1];
            }
            //Shift the first segment one slot towards the back
            std::copy_backward( &mp_index[0] + first_segment_beg
                              , &mp_index[0] + first_segment_end
                              , &mp_index[0] + first_segment_end + 1);
            //The saved pointer goes into the slot that has been opened
            *where = backup;
            ++m_cur_num_msg;
            return **where;
         }
      }
   }

   #else //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX

   //Iterator over the index slots
   typedef msg_hdr_ptr_t *iterator;

   //!Returns the inserted message with top priority
   msg_header &top_msg()
      {  return *mp_index[m_cur_num_msg-1];   }

   //!Returns the inserted message with bottom priority
   msg_header &bottom_msg()
      {  return *mp_index[0];   }

   //!Returns an iterator to the first index slot
   iterator inserted_ptr_begin() const
   {  return &mp_index[0]; }

   //!Returns an iterator to the index slot that follows the last inserted
   //!pointer
   iterator inserted_ptr_end() const
   {  return &mp_index[m_cur_num_msg]; }

   //!std::lower_bound over the range of inserted pointers, using "func"
   //!as the ordering predicate.
   iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
   {  return std::lower_bound(this->inserted_ptr_begin(), this->inserted_ptr_end(), value, func);  }

   //!Opens a gap at "pos" by shifting [pos, inserted_ptr_end()) one slot
   //!towards the back, stores in the gap the pointer that was at
   //!inserted_ptr_end(), increments the message count and returns the
   //!header that pointer refers to. No check for a full queue is done here.
   msg_header & insert_at(iterator pos)
   {
      //Save the pointer stored at the end slot: the shift will overwrite it
      const msg_hdr_ptr_t backup = *inserted_ptr_end();
      std::copy_backward(pos, inserted_ptr_end(), inserted_ptr_end()+1);
      *pos = backup;
      ++m_cur_num_msg;
      return **pos;
   }

   #endif   //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX

   //!Inserts the first free message in the priority queue at the position
   //!that corresponds to "priority" and returns its header. The returned
   //!header is not filled here.
   msg_header & queue_free_msg(unsigned int priority)
   {
      //Get priority queue's range
      iterator it  (inserted_ptr_begin()), it_end(inserted_ptr_end());
      //Optimize for non-priority usage: if the queue is empty or "priority"
      //is not higher than the bottom one, insert at the beginning
      if(m_cur_num_msg && priority > this->bottom_msg().priority){
         //Check for higher priority than all stored messages
         if(priority > this->top_msg().priority){
            it = it_end;
         }
         else{
            //Since we don't know which free message we will pick
            //build a dummy header for searches (only its priority is set)
            msg_header dummy_hdr;
            dummy_hdr.priority = priority;

            //Pointer to the dummy header, as the predicate compares pointers
            msg_hdr_ptr_t dummy_ptr(&dummy_hdr);

            //Check where the free message should be placed
            it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this));
         }
      }
      //Insert the free message in the correct position
      return this->insert_at(it);
   }

   //!Returns the number of bytes needed to construct a message queue with
   //!"max_num_msg" maximum number of messages and "max_msg_size" maximum
   //!message size. The result adds up the rounded header size, the rounded
   //!index size, the space for all messages and
   //!open_create_impl_t::ManagedOpenOrCreateUserOffset. Never throws.
   static size_type get_mem_size
      (size_type max_msg_size, size_type max_num_msg)
   {
      const size_type
       msg_hdr_align  = ::boost::container::dtl::alignment_of<msg_header>::value,
       index_align    = ::boost::container::dtl::alignment_of<msg_hdr_ptr_t>::value,
         r_hdr_size     = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
         r_index_size   = ipcdetail::get_rounded_size<size_type>(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
         r_max_msg_size = ipcdetail::get_rounded_size<size_type>(max_msg_size, msg_hdr_align) + sizeof(msg_header);
      //NOTE(review): the multiplications and the sum are not checked for overflow
      return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) +
         open_create_impl_t::ManagedOpenOrCreateUserOffset;
   }

   //!Initializes the memory structures to preallocate messages and constructs the
   //!message index. Never throws.
   void initialize_memory()
   {
      //Same size computations as in get_mem_size, using the stored limits
      const size_type
        msg_hdr_align  = ::boost::container::dtl::alignment_of<msg_header>::value,
        index_align    = ::boost::container::dtl::alignment_of<msg_hdr_ptr_t>::value,
         r_hdr_size     = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
         r_index_size   = ipcdetail::get_rounded_size<size_type>(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
         r_max_msg_size = ipcdetail::get_rounded_size<size_type>(m_max_msg_size, msg_hdr_align) + sizeof(msg_header);

      //Pointer to the index: it starts r_hdr_size bytes after this object
      msg_hdr_ptr_t *index =  reinterpret_cast<msg_hdr_ptr_t*>
                                 (reinterpret_cast<char*>(this)+r_hdr_size);

      //Pointer to the first message header: it starts right after the index
      msg_header *msg_hdr   =  reinterpret_cast<msg_header*>
                                 (reinterpret_cast<char*>(this)+r_hdr_size+r_index_size);

      //Initialize the pointer to the index
      mp_index             = index;

      //Initialize the index so each slot points to a preallocated message.
      //Consecutive messages are r_max_msg_size bytes apart
      for(size_type i = 0; i < m_max_num_msg; ++i){
         index[i] = msg_hdr;
         msg_hdr  = reinterpret_cast<msg_header*>
                        (reinterpret_cast<char*>(msg_hdr)+r_max_msg_size);
      }
   }

   public:
   //Pointer to the index
   msg_hdr_ptr_ptr_t          mp_index;
   //Maximum number of messages of the queue
   const size_type            m_max_num_msg;
   //Maximum size of messages of the queue
   const size_type            m_max_msg_size;
   //Current number of messages
   size_type                  m_cur_num_msg;
   //Mutex to protect data structures
   interprocess_mutex         m_mutex;
   //Condition block receivers when there are no messages
   interprocess_condition     m_cond_recv;
   //Condition block senders when the queue is full
   interprocess_condition     m_cond_send;
   #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
   //Current start offset in the circular index
   size_type                  m_cur_first_msg;
   //NOTE(review): presumably the number of senders/receivers currently
   //blocked; they are only zero-initialized in the code visible here
   size_type                  m_blocked_senders;
   size_type                  m_blocked_receivers;
   #endif
};
593 
594 
595 //!This is the atomic functor to be executed when creating or opening
596 //!shared memory. Never throws
597 template<class VoidPointer>
598 class msg_queue_initialization_func_t
599 {
600    public:
601    typedef typename boost::intrusive::
602       pointer_traits<VoidPointer>::template
603          rebind_pointer<char>::type                               char_ptr;
604    typedef typename boost::intrusive::pointer_traits<char_ptr>::
605       difference_type                                             difference_type;
606    typedef typename boost::container::dtl::
607       make_unsigned<difference_type>::type                        size_type;
608 
msg_queue_initialization_func_t(size_type maxmsg=0,size_type maxmsgsize=0)609    msg_queue_initialization_func_t(size_type maxmsg = 0,
610                          size_type maxmsgsize = 0)
611       : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {}
612 
operator ()(void * address,size_type,bool created)613    bool operator()(void *address, size_type, bool created)
614    {
615       char      *mptr;
616 
617       if(created){
618          mptr     = reinterpret_cast<char*>(address);
619          //Construct the message queue header at the beginning
620          BOOST_TRY{
621             new (mptr) mq_hdr_t<VoidPointer>(m_maxmsg, m_maxmsgsize);
622          }
623          BOOST_CATCH(...){
624             return false;
625          }
626          BOOST_CATCH_END
627       }
628       return true;
629    }
630 
get_min_size() const631    std::size_t get_min_size() const
632    {
633       return mq_hdr_t<VoidPointer>::get_mem_size(m_maxmsgsize, m_maxmsg)
634       - message_queue_t<VoidPointer>::open_create_impl_t::ManagedOpenOrCreateUserOffset;
635    }
636 
637    const size_type m_maxmsg;
638    const size_type m_maxmsgsize;
639 };
640 
641 }  //namespace ipcdetail {
642 
643 template<class VoidPointer>
~message_queue_t()644 inline message_queue_t<VoidPointer>::~message_queue_t()
645 {}
646 
647 template<class VoidPointer>
get_mem_size(size_type max_msg_size,size_type max_num_msg)648 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_mem_size
649    (size_type max_msg_size, size_type max_num_msg)
650 {  return ipcdetail::mq_hdr_t<VoidPointer>::get_mem_size(max_msg_size, max_num_msg);   }
651 
652 template<class VoidPointer>
message_queue_t(create_only_t,const char * name,size_type max_num_msg,size_type max_msg_size,const permissions & perm)653 inline message_queue_t<VoidPointer>::message_queue_t(create_only_t,
654                                     const char *name,
655                                     size_type max_num_msg,
656                                     size_type max_msg_size,
657                                     const permissions &perm)
658       //Create shared memory and execute functor atomically
659    :  m_shmem(create_only,
660               name,
661               get_mem_size(max_msg_size, max_num_msg),
662               read_write,
663               static_cast<void*>(0),
664               //Prepare initialization functor
665               ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
666               perm)
667 {}
668 
669 template<class VoidPointer>
message_queue_t(open_or_create_t,const char * name,size_type max_num_msg,size_type max_msg_size,const permissions & perm)670 inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t,
671                                     const char *name,
672                                     size_type max_num_msg,
673                                     size_type max_msg_size,
674                                     const permissions &perm)
675       //Create shared memory and execute functor atomically
676    :  m_shmem(open_or_create,
677               name,
678               get_mem_size(max_msg_size, max_num_msg),
679               read_write,
680               static_cast<void*>(0),
681               //Prepare initialization functor
682               ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
683               perm)
684 {}
685 
686 template<class VoidPointer>
message_queue_t(open_only_t,const char * name)687 inline message_queue_t<VoidPointer>::message_queue_t(open_only_t, const char *name)
688    //Create shared memory and execute functor atomically
689    :  m_shmem(open_only,
690               name,
691               read_write,
692               static_cast<void*>(0),
693               //Prepare initialization functor
694               ipcdetail::msg_queue_initialization_func_t<VoidPointer> ())
695 {}
696 
697 template<class VoidPointer>
send(const void * buffer,size_type buffer_size,unsigned int priority)698 inline void message_queue_t<VoidPointer>::send
699    (const void *buffer, size_type buffer_size, unsigned int priority)
700 {  this->do_send(blocking, buffer, buffer_size, priority, ptime()); }
701 
702 template<class VoidPointer>
try_send(const void * buffer,size_type buffer_size,unsigned int priority)703 inline bool message_queue_t<VoidPointer>::try_send
704    (const void *buffer, size_type buffer_size, unsigned int priority)
705 {  return this->do_send(non_blocking, buffer, buffer_size, priority, ptime()); }
706 
707 template<class VoidPointer>
timed_send(const void * buffer,size_type buffer_size,unsigned int priority,const boost::posix_time::ptime & abs_time)708 inline bool message_queue_t<VoidPointer>::timed_send
709    (const void *buffer, size_type buffer_size
710    ,unsigned int priority, const boost::posix_time::ptime &abs_time)
711 {
712    if(abs_time == boost::posix_time::pos_infin){
713       this->send(buffer, buffer_size, priority);
714       return true;
715    }
716    return this->do_send(timed, buffer, buffer_size, priority, abs_time);
717 }
718 
719 template<class VoidPointer>
do_send(block_t block,const void * buffer,size_type buffer_size,unsigned int priority,const boost::posix_time::ptime & abs_time)720 inline bool message_queue_t<VoidPointer>::do_send(block_t block,
721                                 const void *buffer,      size_type buffer_size,
722                                 unsigned int priority,   const boost::posix_time::ptime &abs_time)
723 {
724    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
725    //Check if buffer is smaller than maximum allowed
726    if (buffer_size > p_hdr->m_max_msg_size) {
727       throw interprocess_exception(size_error);
728    }
729 
730    #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
731    bool notify_blocked_receivers = false;
732    #endif
733    //---------------------------------------------
734    scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
735    //---------------------------------------------
736    {
737       //If the queue is full execute blocking logic
738       if (p_hdr->is_full()) {
739          BOOST_TRY{
740             #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
741             ++p_hdr->m_blocked_senders;
742             #endif
743             switch(block){
744                case non_blocking :
745                   #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
746                   --p_hdr->m_blocked_senders;
747                   #endif
748                   return false;
749                break;
750 
751                case blocking :
752                   do{
753                      p_hdr->m_cond_send.wait(lock);
754                   }
755                   while (p_hdr->is_full());
756                break;
757 
758                case timed :
759                   do{
760                      if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){
761                         if(p_hdr->is_full()){
762                            #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
763                            --p_hdr->m_blocked_senders;
764                            #endif
765                            return false;
766                         }
767                         break;
768                      }
769                   }
770                   while (p_hdr->is_full());
771                break;
772                default:
773                break;
774             }
775             #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
776             --p_hdr->m_blocked_senders;
777             #endif
778          }
779          BOOST_CATCH(...){
780             #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
781             --p_hdr->m_blocked_senders;
782             #endif
783             BOOST_RETHROW;
784          }
785          BOOST_CATCH_END
786       }
787 
788       #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
789       notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers;
790       #endif
791       //Insert the first free message in the priority queue
792       ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority);
793 
794       //Sanity check, free msgs are always cleaned when received
795       BOOST_ASSERT(free_msg_hdr.priority == 0);
796       BOOST_ASSERT(free_msg_hdr.len == 0);
797 
798       //Copy control data to the free message
799       free_msg_hdr.priority = priority;
800       free_msg_hdr.len      = buffer_size;
801 
802       //Copy user buffer to the message
803       std::memcpy(free_msg_hdr.data(), buffer, buffer_size);
804    }  // Lock end
805 
806    //Notify outside lock to avoid contention. This might produce some
807    //spurious wakeups, but it's usually far better than notifying inside.
808    //If this message changes the queue empty state, notify it to receivers
809    #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
810    if (notify_blocked_receivers){
811       p_hdr->m_cond_recv.notify_one();
812    }
813    #else
814    p_hdr->m_cond_recv.notify_one();
815    #endif
816 
817    return true;
818 }
819 
820 template<class VoidPointer>
receive(void * buffer,size_type buffer_size,size_type & recvd_size,unsigned int & priority)821 inline void message_queue_t<VoidPointer>::receive(void *buffer,        size_type buffer_size,
822                         size_type &recvd_size,   unsigned int &priority)
823 {  this->do_receive(blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
824 
825 template<class VoidPointer>
826 inline bool
try_receive(void * buffer,size_type buffer_size,size_type & recvd_size,unsigned int & priority)827    message_queue_t<VoidPointer>::try_receive(void *buffer,              size_type buffer_size,
828                               size_type &recvd_size,   unsigned int &priority)
829 {  return this->do_receive(non_blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
830 
831 template<class VoidPointer>
832 inline bool
timed_receive(void * buffer,size_type buffer_size,size_type & recvd_size,unsigned int & priority,const boost::posix_time::ptime & abs_time)833    message_queue_t<VoidPointer>::timed_receive(void *buffer,            size_type buffer_size,
834                                 size_type &recvd_size,   unsigned int &priority,
835                                 const boost::posix_time::ptime &abs_time)
836 {
837    if(abs_time == boost::posix_time::pos_infin){
838       this->receive(buffer, buffer_size, recvd_size, priority);
839       return true;
840    }
841    return this->do_receive(timed, buffer, buffer_size, recvd_size, priority, abs_time);
842 }
843 
844 template<class VoidPointer>
845 inline bool
do_receive(block_t block,void * buffer,size_type buffer_size,size_type & recvd_size,unsigned int & priority,const boost::posix_time::ptime & abs_time)846    message_queue_t<VoidPointer>::do_receive(block_t block,
847                           void *buffer,            size_type buffer_size,
848                           size_type &recvd_size,   unsigned int &priority,
849                           const boost::posix_time::ptime &abs_time)
850 {
851    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
852    //Check if buffer is big enough for any message
853    if (buffer_size < p_hdr->m_max_msg_size) {
854       throw interprocess_exception(size_error);
855    }
856 
857    #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
858    bool notify_blocked_senders = false;
859    #endif
860    //---------------------------------------------
861    scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
862    //---------------------------------------------
863    {
864       //If there are no messages execute blocking logic
865       if (p_hdr->is_empty()) {
866          BOOST_TRY{
867             #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
868             ++p_hdr->m_blocked_receivers;
869             #endif
870             switch(block){
871                case non_blocking :
872                   #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
873                   --p_hdr->m_blocked_receivers;
874                   #endif
875                   return false;
876                break;
877 
878                case blocking :
879                   do{
880                      p_hdr->m_cond_recv.wait(lock);
881                   }
882                   while (p_hdr->is_empty());
883                break;
884 
885                case timed :
886                   do{
887                      if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){
888                         if(p_hdr->is_empty()){
889                            #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
890                            --p_hdr->m_blocked_receivers;
891                            #endif
892                            return false;
893                         }
894                         break;
895                      }
896                   }
897                   while (p_hdr->is_empty());
898                break;
899 
900                //Paranoia check
901                default:
902                break;
903             }
904             #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
905             --p_hdr->m_blocked_receivers;
906             #endif
907          }
908          BOOST_CATCH(...){
909             #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
910             --p_hdr->m_blocked_receivers;
911             #endif
912             BOOST_RETHROW;
913          }
914          BOOST_CATCH_END
915       }
916 
917       #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
918       notify_blocked_senders = 0 != p_hdr->m_blocked_senders;
919       #endif
920 
921       //There is at least one message ready to pick, get the top one
922       ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg();
923 
924       //Get data from the message
925       recvd_size     = top_msg.len;
926       priority       = top_msg.priority;
927 
928       //Some cleanup to ease debugging
929       top_msg.len       = 0;
930       top_msg.priority  = 0;
931 
932       //Copy data to receiver's bufers
933       std::memcpy(buffer, top_msg.data(), recvd_size);
934 
935       //Free top message and put it in the free message list
936       p_hdr->free_top_msg();
937    }  //Lock end
938 
939    //Notify outside lock to avoid contention. This might produce some
940    //spurious wakeups, but it's usually far better than notifying inside.
941    //If this reception changes the queue full state, notify senders
942    #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
943    if (notify_blocked_senders){
944       p_hdr->m_cond_send.notify_one();
945    }
946    #else
947    p_hdr->m_cond_send.notify_one();
948    #endif
949 
950    return true;
951 }
952 
953 template<class VoidPointer>
get_max_msg() const954 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg() const
955 {
956    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
957    return p_hdr ? p_hdr->m_max_num_msg : 0;  }
958 
959 template<class VoidPointer>
get_max_msg_size() const960 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg_size() const
961 {
962    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
963    return p_hdr ? p_hdr->m_max_msg_size : 0;
964 }
965 
966 template<class VoidPointer>
get_num_msg() const967 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_num_msg() const
968 {
969    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
970    if(p_hdr){
971       //---------------------------------------------
972       scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
973       //---------------------------------------------
974       return p_hdr->m_cur_num_msg;
975    }
976 
977    return 0;
978 }
979 
980 template<class VoidPointer>
remove(const char * name)981 inline bool message_queue_t<VoidPointer>::remove(const char *name)
982 {  return shared_memory_object::remove(name);  }
983 
984 #else
985 
//!Typedef for a default message queue
//!to be used between processes.
//!It names message_queue_t instantiated with offset_ptr<void>
//!as its VoidPointer template argument.
typedef message_queue_t<offset_ptr<void> > message_queue;
989 
990 #endif   //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
991 
992 }} //namespace boost{  namespace interprocess{
993 
994 #include <boost/interprocess/detail/config_end.hpp>
995 
996 #endif   //#ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
997