1 /* 2 * Copyright Lingxi Li 2015. 3 * Copyright Andrey Semashev 2016. 4 * Distributed under the Boost Software License, Version 1.0. 5 * (See accompanying file LICENSE_1_0.txt or copy at 6 * http://www.boost.org/LICENSE_1_0.txt) 7 */ 8 /*! 9 * \file utility/ipc/reliable_message_queue.hpp 10 * \author Lingxi Li 11 * \author Andrey Semashev 12 * \date 01.01.2016 13 * 14 * The header contains declaration of a reliable interprocess message queue. 15 */ 16 17 #ifndef BOOST_LOG_UTILITY_IPC_RELIABLE_MESSAGE_QUEUE_HPP_INCLUDED_ 18 #define BOOST_LOG_UTILITY_IPC_RELIABLE_MESSAGE_QUEUE_HPP_INCLUDED_ 19 20 #include <boost/log/detail/config.hpp> 21 #include <cstddef> 22 #include <boost/cstdint.hpp> 23 #include <boost/move/core.hpp> 24 #include <boost/log/keywords/open_mode.hpp> 25 #include <boost/log/keywords/name.hpp> 26 #include <boost/log/keywords/capacity.hpp> 27 #include <boost/log/keywords/block_size.hpp> 28 #include <boost/log/keywords/overflow_policy.hpp> 29 #include <boost/log/keywords/permissions.hpp> 30 #include <boost/log/utility/open_mode.hpp> 31 #include <boost/log/utility/permissions.hpp> 32 #include <boost/log/utility/ipc/object_name.hpp> 33 #include <boost/log/detail/parameter_tools.hpp> 34 #include <boost/log/detail/header.hpp> 35 36 #ifdef BOOST_HAS_PRAGMA_ONCE 37 #pragma once 38 #endif 39 40 namespace boost { 41 42 BOOST_LOG_OPEN_NAMESPACE 43 44 namespace ipc { 45 46 namespace aux { 47 48 template< typename T, typename R > 49 struct enable_if_byte {}; 50 template< typename R > 51 struct enable_if_byte< char, R > { typedef R type; }; 52 template< typename R > 53 struct enable_if_byte< signed char, R > { typedef R type; }; 54 template< typename R > 55 struct enable_if_byte< unsigned char, R > { typedef R type; }; 56 57 } // namespace aux 58 59 /*! 60 * \brief A reliable interprocess message queue 61 * 62 * The queue implements a reliable one-way channel of passing messages from one or multiple writers to a single reader. 63 * The format of the messages is user-defined and must be consistent across all writers and the reader. The queue does 64 * not enforce any specific format of the messages, other than they should be supplied as a contiguous array of bytes. 65 * 66 * The queue internally uses a process-shared storage identified by an \c object_name (the queue name). Refer to \c object_name 67 * documentation for details on restrictions imposed on object names. 68 * 69 * The queue storage is organized as a fixed number of blocks of a fixed size. The block size must be an integer power of 2 and 70 * is expressed in bytes. Each written message, together with some metadata added by the queue, consumes an integer number 71 * of blocks. Each read message received by the reader releases the blocks allocated for that message. As such the maximum size 72 * of a message is slightly less than block size times capacity of the queue. For efficiency, it is recommended to choose 73 * block size large enough to accommodate most of the messages to be passed through the queue. 74 * 75 * The queue is considered empty when no messages are enqueued (all blocks are free). The queue is considered full at the point 76 * of enqueueing a message when there is not enough free blocks to accommodate the message. 77 * 78 * The queue is reliable in that it will not drop successfully sent messages that are not received by the reader, other than the 79 * case when a non-empty queue is destroyed by the last user. If a message cannot be enqueued by the writer because the queue is 80 * full, the queue can either block the writer or return an error or throw an exception, depending on the policy specified at 81 * the queue creation. The policy is object local, i.e. different writers and the reader can have different overflow policies. 82 * 83 * If the queue is empty and the reader attempts to dequeue a message, it will block until a message is enqueued by a writer. 84 * 85 * A blocked reader or writer can be unblocked by calling \c stop_local. After this method is called, all threads blocked on 86 * this particular object are released and return \c operation_result::aborted. The other instances of the queue (in the current 87 * or other processes) are unaffected. In order to restore the normal functioning of the queue instance after the \c stop_local 88 * call the user has to invoke \c reset_local. 89 * 90 * The queue does not guarantee any particular order of received messages from different writer threads. Messages sent by a 91 * particular writer thread will be received in the order of sending. 92 * 93 * Methods of this class are not thread-safe, unless otherwise specified. 94 */ 95 class reliable_message_queue 96 { 97 public: 98 //! Result codes for various operations on the queue 99 enum operation_result 100 { 101 succeeded, //!< The operation has completed successfully 102 no_space, //!< The message could not be sent because the queue is full 103 aborted //!< The operation has been aborted because the queue method <tt>stop_local()</tt> has been called 104 }; 105 106 //! Interprocess queue overflow policies 107 enum overflow_policy 108 { 109 //! Block the send operation when the queue is full 110 block_on_overflow, 111 //! Return \c operation_result::no_space when the queue is full 112 fail_on_overflow, 113 //! Throw \c capacity_limit_reached exception when the queue is full 114 throw_on_overflow 115 }; 116 117 //! Queue message size type 118 typedef uint32_t size_type; 119 120 #if !defined(BOOST_LOG_DOXYGEN_PASS) 121 122 BOOST_MOVABLE_BUT_NOT_COPYABLE(reliable_message_queue) 123 124 private: 125 typedef void (*receive_handler)(void* state, const void* data, size_type size); 126 127 struct fixed_buffer_state 128 { 129 uint8_t* data; 130 size_type size; 131 }; 132 133 struct implementation; 134 implementation* m_impl; 135 136 #endif // !defined(BOOST_LOG_DOXYGEN_PASS) 137 138 public: 139 /*! 140 * Default constructor. The method constructs an object that is not associated with any 141 * message queue. 142 * 143 * \post <tt>is_open() == false</tt> 144 */ reliable_message_queue()145 BOOST_CONSTEXPR reliable_message_queue() BOOST_NOEXCEPT : m_impl(NULL) 146 { 147 } 148 149 /*! 150 * Constructor. The method is used to construct an object and create the associated 151 * message queue. The constructed object will be in running state if the message queue is 152 * successfully created. 153 * 154 * \post <tt>is_open() == true</tt> 155 * 156 * \param name Name of the message queue to be associated with. 157 * \param capacity Maximum number of allocation blocks the queue can hold. 158 * \param block_size Size in bytes of allocation block. Must be a power of 2. 159 * \param oflow_policy Queue behavior policy in case of overflow. 160 * \param perms Access permissions for the associated message queue. 161 */ reliable_message_queue(open_mode::create_only_tag,object_name const & name,uint32_t capacity,size_type block_size,overflow_policy oflow_policy=block_on_overflow,permissions const & perms=permissions ())162 reliable_message_queue 163 ( 164 open_mode::create_only_tag, 165 object_name const& name, 166 uint32_t capacity, 167 size_type block_size, 168 overflow_policy oflow_policy = block_on_overflow, 169 permissions const& perms = permissions() 170 ) : 171 m_impl(NULL) 172 { 173 this->create(name, capacity, block_size, oflow_policy, perms); 174 } 175 176 /*! 177 * Constructor. The method is used to construct an object and create or open the associated 178 * message queue. The constructed object will be in running state if the message queue is 179 * successfully created or opened. If the message queue that is identified by the name already 180 * exists then the other queue parameters are ignored. The actual queue parameters can be obtained 181 * with accessors from the constructed object. 182 * 183 * \post <tt>is_open() == true</tt> 184 * 185 * \param name Name of the message queue to be associated with. 186 * \param capacity Maximum number of allocation blocks the queue can hold. 187 * \param block_size Size in bytes of allocation block. Must be a power of 2. 188 * \param oflow_policy Queue behavior policy in case of overflow. 189 * \param perms Access permissions for the associated message queue. 190 */ reliable_message_queue(open_mode::open_or_create_tag,object_name const & name,uint32_t capacity,size_type block_size,overflow_policy oflow_policy=block_on_overflow,permissions const & perms=permissions ())191 reliable_message_queue 192 ( 193 open_mode::open_or_create_tag, 194 object_name const& name, 195 uint32_t capacity, 196 size_type block_size, 197 overflow_policy oflow_policy = block_on_overflow, 198 permissions const& perms = permissions() 199 ) : 200 m_impl(NULL) 201 { 202 this->open_or_create(name, capacity, block_size, oflow_policy, perms); 203 } 204 205 /*! 206 * Constructor. The method is used to construct an object and open the existing 207 * message queue. The constructed object will be in running state if the message queue is 208 * successfully opened. 209 * 210 * \post <tt>is_open() == true</tt> 211 * 212 * \param name Name of the message queue to be associated with. 213 * \param oflow_policy Queue behavior policy in case of overflow. 214 * \param perms Access permissions for the associated message queue. The permissions will only be used 215 * if the queue implementation has to create system objects while operating. 216 * This parameter is currently not used on POSIX systems. 217 */ reliable_message_queue(open_mode::open_only_tag,object_name const & name,overflow_policy oflow_policy=block_on_overflow,permissions const & perms=permissions ())218 reliable_message_queue 219 ( 220 open_mode::open_only_tag, 221 object_name const& name, 222 overflow_policy oflow_policy = block_on_overflow, 223 permissions const& perms = permissions() 224 ) : 225 m_impl(NULL) 226 { 227 this->open(name, oflow_policy, perms); 228 } 229 230 /*! 231 * Constructor with named parameters. The method is used to construct an object and create or open 232 * the associated message queue. The constructed object will be in running state if the message queue is 233 * successfully created. 234 * 235 * The following named parameters are accepted: 236 * 237 * * open_mode - One of the open mode tags: \c open_mode::create_only, \c open_mode::open_only or 238 * \c open_mode::open_or_create. 239 * * name - Name of the message queue to be associated with. 240 * * capacity - Maximum number of allocation blocks the queue can hold. Used only if the queue is created. 241 * * block_size - Size in bytes of allocation block. Must be a power of 2. Used only if the queue is created. 242 * * overflow_policy - Queue behavior policy in case of overflow, see \c overflow_policy. 243 * * permissions - Access permissions for the associated message queue. 244 * 245 * \post <tt>is_open() == true</tt> 246 */ 247 #if !defined(BOOST_LOG_DOXYGEN_PASS) BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_CALL(reliable_message_queue,construct)248 BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_CALL(reliable_message_queue, construct) 249 #else 250 template< typename... Args > 251 explicit reliable_message_queue(Args const&... args); 252 #endif 253 254 /*! 255 * Destructor. Calls <tt>close()</tt>. 256 */ 257 ~reliable_message_queue() BOOST_NOEXCEPT 258 { 259 this->close(); 260 } 261 262 /*! 263 * Move constructor. The method move-constructs an object from \c other. After 264 * the call, the constructed object becomes \c other, while \c other is left in 265 * default constructed state. 266 * 267 * \param that The object to be moved. 268 */ reliable_message_queue(BOOST_RV_REF (reliable_message_queue)that)269 reliable_message_queue(BOOST_RV_REF(reliable_message_queue) that) BOOST_NOEXCEPT : 270 m_impl(that.m_impl) 271 { 272 that.m_impl = NULL; 273 } 274 275 /*! 276 * Move assignment operator. If the object is associated with a message queue, 277 * <tt>close()</tt> is first called and the precondition to calling <tt>close()</tt> 278 * applies. After the call, the object becomes \a that while \a that is left 279 * in default constructed state. 280 * 281 * \param that The object to be moved. 282 * 283 * \return A reference to the assigned object. 284 */ operator =(BOOST_RV_REF (reliable_message_queue)that)285 reliable_message_queue& operator= (BOOST_RV_REF(reliable_message_queue) that) BOOST_NOEXCEPT 286 { 287 reliable_message_queue other(static_cast< BOOST_RV_REF(reliable_message_queue) >(that)); 288 this->swap(other); 289 return *this; 290 } 291 292 /*! 293 * The method swaps the object with \a that. 294 * 295 * \param that The other object to swap with. 296 */ swap(reliable_message_queue & that)297 void swap(reliable_message_queue& that) BOOST_NOEXCEPT 298 { 299 implementation* p = m_impl; 300 m_impl = that.m_impl; 301 that.m_impl = p; 302 } 303 304 //! Swaps the two \c reliable_message_queue objects. swap(reliable_message_queue & a,reliable_message_queue & b)305 friend void swap(reliable_message_queue& a, reliable_message_queue& b) BOOST_NOEXCEPT 306 { 307 a.swap(b); 308 } 309 310 /*! 311 * The method creates the message queue to be associated with the object. After the call, 312 * the object will be in running state if a message queue is successfully created. 313 * 314 * \pre <tt>is_open() == false</tt> 315 * \post <tt>is_open() == true</tt> 316 * 317 * \param name Name of the message queue to be associated with. 318 * \param capacity Maximum number of allocation blocks the queue can hold. 319 * \param block_size Size in bytes of allocation block. Must be a power of 2. 320 * \param oflow_policy Queue behavior policy in case of overflow. 321 * \param perms Access permissions for the associated message queue. 322 */ 323 BOOST_LOG_API void create 324 ( 325 object_name const& name, 326 uint32_t capacity, 327 size_type block_size, 328 overflow_policy oflow_policy = block_on_overflow, 329 permissions const& perms = permissions() 330 ); 331 332 /*! 333 * The method creates or opens the message queue to be associated with the object. 334 * After the call, the object will be in running state if a message queue is successfully 335 * created or opened. If the message queue that is identified by the name already exists then 336 * the other queue parameters are ignored. The actual queue parameters can be obtained 337 * with accessors from this object after this method returns. 338 * 339 * \pre <tt>is_open() == false</tt> 340 * \post <tt>is_open() == true</tt> 341 * 342 * \param name Name of the message queue to be associated with. 343 * \param capacity Maximum number of allocation blocks the queue can hold. 344 * \param block_size Size in bytes of allocation block. Must be a power of 2. 345 * \param oflow_policy Queue behavior policy in case of overflow. 346 * \param perms Access permissions for the associated message queue. 347 */ 348 BOOST_LOG_API void open_or_create 349 ( 350 object_name const& name, 351 uint32_t capacity, 352 size_type block_size, 353 overflow_policy oflow_policy = block_on_overflow, 354 permissions const& perms = permissions() 355 ); 356 357 /*! 358 * The method opens the existing message queue to be associated with the object. 359 * After the call, the object will be in running state if a message queue is successfully 360 * opened. 361 * 362 * \pre <tt>is_open() == false</tt> 363 * \post <tt>is_open() == true</tt> 364 * 365 * \param name Name of the message queue to be associated with. 366 * \param oflow_policy Queue behavior policy in case of overflow. 367 * \param perms Access permissions for the associated message queue. The permissions will only be used 368 * if the queue implementation has to create system objects while operating. 369 * This parameter is currently not used on POSIX systems. 370 */ 371 BOOST_LOG_API void open 372 ( 373 object_name const& name, 374 overflow_policy oflow_policy = block_on_overflow, 375 permissions const& perms = permissions() 376 ); 377 378 /*! 379 * Tests whether the object is associated with any message queue. 380 * 381 * \return \c true if the object is associated with a message queue, and \c false otherwise. 382 */ is_open() const383 bool is_open() const BOOST_NOEXCEPT 384 { 385 return m_impl != NULL; 386 } 387 388 /*! 389 * This method empties the associated message queue. Concurrent calls to this method, <tt>send()</tt>, 390 * <tt>try_send()</tt>, <tt>receive()</tt>, <tt>try_receive()</tt>, and <tt>stop_local()</tt> are allowed. 391 * 392 * \pre <tt>is_open() == true</tt> 393 */ 394 BOOST_LOG_API void clear(); 395 396 /*! 397 * The method returns the name of the associated message queue. 398 * 399 * \pre <tt>is_open() == true</tt> 400 * 401 * \return Name of the associated message queue 402 */ 403 BOOST_LOG_API object_name const& name() const; 404 405 /*! 406 * The method returns the maximum number of allocation blocks the associated message queue 407 * can hold. Note that the returned value may be different from the corresponding 408 * value passed to the constructor or <tt>open_or_create()</tt>, for the message queue may 409 * not have been created by this object. 410 * 411 * \pre <tt>is_open() == true</tt> 412 * 413 * \return Maximum number of allocation blocks the associated message queue can hold. 414 */ 415 BOOST_LOG_API uint32_t capacity() const; 416 417 /*! 418 * The method returns the allocation block size, in bytes. Each message in the 419 * associated message queue consumes an integer number of allocation blocks. 420 * Note that the returned value may be different from the corresponding value passed 421 * to the constructor or <tt>open_or_create()</tt>, for the message queue may not 422 * have been created by this object. 423 * 424 * \pre <tt>is_open() == true</tt> 425 * 426 * \return Allocation block size, in bytes. 427 */ 428 BOOST_LOG_API size_type block_size() const; 429 430 /*! 431 * The method wakes up all threads that are blocked in calls to <tt>send()</tt> or 432 * <tt>receive()</tt>. Those calls would then return <tt>operation_result::aborted</tt>. 433 * Note that, the method does not block until the woken-up threads have actually 434 * returned from <tt>send()</tt> or <tt>receive()</tt>. Other means is needed to ensure 435 * that calls to <tt>send()</tt> or <tt>receive()</tt> have returned, e.g., joining the 436 * threads that might be blocking on the calls. 437 * 438 * The method also puts the object in stopped state. When in stopped state, calls to 439 * <tt>send()</tt> or <tt>receive()</tt> will return immediately with return value 440 * <tt>operation_result::aborted</tt> when they would otherwise block in running state. 441 * 442 * Concurrent calls to this method, <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>, 443 * <tt>try_receive()</tt>, and <tt>clear()</tt> are allowed. 444 * 445 * \pre <tt>is_open() == true</tt> 446 */ 447 BOOST_LOG_API void stop_local(); 448 449 /*! 450 * The method puts the object in running state where calls to <tt>send()</tt> or 451 * <tt>receive()</tt> may block. This method is not thread-safe. 452 * 453 * \pre <tt>is_open() == true</tt> 454 */ 455 BOOST_LOG_API void reset_local(); 456 457 /*! 458 * The method disassociates the associated message queue, if any. No other threads 459 * should be using this object before calling this method. The <tt>stop_local()</tt> method 460 * can be used to have any threads currently blocked in <tt>send()</tt> or 461 * <tt>receive()</tt> return, and prevent further calls to them from blocking. Typically, 462 * before calling this method, one would first call <tt>stop_local()</tt> and then join all 463 * threads that might be blocking on <tt>send()</tt> or <tt>receive()</tt> to ensure that 464 * they have returned from the calls. The associated message queue is destroyed if the 465 * object represents the last outstanding reference to it. 466 * 467 * \post <tt>is_open() == false</tt> 468 */ close()469 void close() BOOST_NOEXCEPT 470 { 471 if (is_open()) 472 do_close(); 473 } 474 475 /*! 476 * The method sends a message to the associated message queue. When the object is in 477 * running state and the queue has no free space for the message, the method either blocks 478 * or throws an exception, depending on the overflow policy that was specified on the queue 479 * opening/creation. If blocking policy is in effect, the blocking can be interrupted by 480 * calling <tt>stop_local()</tt>, in which case the method returns \c operation_result::aborted. 481 * When the object is already in the stopped state, the method does not block but returns 482 * immediately with return value \c operation_result::aborted. 483 * 484 * It is possible to send an empty message by passing \c 0 to the parameter \c message_size. 485 * 486 * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>, <tt>try_receive()</tt>, 487 * <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed. 488 * 489 * \pre <tt>is_open() == true</tt> 490 * 491 * \param message_data The message data to send. Ignored when \c message_size is \c 0. 492 * \param message_size Size of the message data in bytes. If the size is larger than 493 * the associated message queue capacity, an <tt>std::logic_error</tt> exception is thrown. 494 * 495 * \retval operation_result::succeeded if the operation is successful 496 * \retval operation_result::no_space if \c overflow_policy::fail_on_overflow is in effect and the queue is full 497 * \retval operation_result::aborted if the call was interrupted by <tt>stop_local()</tt> 498 * 499 * <b>Throws:</b> <tt>std::logic_error</tt> in case if the message size exceeds the queue 500 * capacity, <tt>system_error</tt> in case if a native OS method fails. 501 */ 502 BOOST_LOG_API operation_result send(void const* message_data, size_type message_size); 503 504 /*! 505 * The method performs an attempt to send a message to the associated message queue. 506 * The method is non-blocking, and always returns immediately. 507 * <tt>boost::system::system_error</tt> is thrown for errors resulting from native 508 * operating system calls. Note that it is possible to send an empty message by passing 509 * \c 0 to the parameter \c message_size. Concurrent calls to <tt>send()</tt>, 510 * <tt>try_send()</tt>, <tt>receive()</tt>, <tt>try_receive()</tt>, <tt>stop_local()</tt>, 511 * and <tt>clear()</tt> are allowed. 512 * 513 * \pre <tt>is_open() == true</tt> 514 * 515 * \param message_data The message data to send. Ignored when \c message_size is \c 0. 516 * \param message_size Size of the message data in bytes. If the size is larger than the 517 * maximum size allowed by the associated message queue, an 518 * <tt>std::logic_error</tt> exception is thrown. 519 * 520 * \return \c true if the message is successfully sent, and \c false otherwise (e.g., 521 * when the queue is full). 522 * 523 * <b>Throws:</b> <tt>std::logic_error</tt> in case if the message size exceeds the queue 524 * capacity, <tt>system_error</tt> in case if a native OS method fails. 525 */ 526 BOOST_LOG_API bool try_send(void const* message_data, size_type message_size); 527 528 /*! 529 * The method takes a message from the associated message queue. When the object is in 530 * running state and the queue is empty, the method blocks. The blocking is interrupted 531 * when <tt>stop_local()</tt> is called, in which case the method returns \c operation_result::aborted. 532 * When the object is already in the stopped state and the queue is empty, the method 533 * does not block but returns immediately with return value \c operation_result::aborted. 534 * 535 * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>, 536 * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed. 537 * 538 * \pre <tt>is_open() == true</tt> 539 * 540 * \param buffer The memory buffer to store the received message in. 541 * \param buffer_size The size of the buffer, in bytes. 542 * \param message_size Receives the size of the received message, in bytes. 543 * 544 * \retval operation_result::succeeded if the operation is successful 545 * \retval operation_result::aborted if the call was interrupted by <tt>stop_local()</tt> 546 */ receive(void * buffer,size_type buffer_size,size_type & message_size)547 operation_result receive(void* buffer, size_type buffer_size, size_type& message_size) 548 { 549 fixed_buffer_state state = { static_cast< uint8_t* >(buffer), buffer_size }; 550 operation_result result = do_receive(&reliable_message_queue::fixed_buffer_receive_handler, &state); 551 message_size = buffer_size - state.size; 552 return result; 553 } 554 555 /*! 556 * The method takes a message from the associated message queue. When the object is in 557 * running state and the queue is empty, the method blocks. The blocking is interrupted 558 * when <tt>stop_local()</tt> is called, in which case the method returns \c operation_result::aborted. 559 * When the object is already in the stopped state and the queue is empty, the method 560 * does not block but returns immediately with return value \c operation_result::aborted. 561 * 562 * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>, 563 * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed. 564 * 565 * \pre <tt>is_open() == true</tt> 566 * 567 * \param buffer The memory buffer to store the received message in. 568 * \param message_size Receives the size of the received message, in bytes. 569 * 570 * \retval operation_result::succeeded if the operation is successful 571 * \retval operation_result::aborted if the call was interrupted by <tt>stop_local()</tt> 572 */ 573 template< typename ElementT, size_type SizeV > 574 #if !defined(BOOST_LOG_DOXYGEN_PASS) 575 typename aux::enable_if_byte< ElementT, operation_result >::type 576 #else 577 operation_result 578 #endif receive(ElementT (& buffer)[SizeV],size_type & message_size)579 receive(ElementT (&buffer)[SizeV], size_type& message_size) 580 { 581 return receive(buffer, SizeV, message_size); 582 } 583 584 /*! 585 * The method takes a message from the associated message queue. When the object is in 586 * running state and the queue is empty, the method blocks. The blocking is interrupted 587 * when <tt>stop_local()</tt> is called, in which case the method returns \c operation_result::aborted. 588 * When the object is already in the stopped state and the queue is empty, the method 589 * does not block but returns immediately with return value \c operation_result::aborted. 590 * 591 * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>, 592 * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed. 593 * 594 * \pre <tt>is_open() == true</tt> 595 * 596 * \param container The container to store the received message in. The container should have 597 * value type of <tt>char</tt>, <tt>signed char</tt> or <tt>unsigned char</tt> 598 * and support inserting elements at the end. 599 * 600 * \retval operation_result::succeeded if the operation is successful 601 * \retval operation_result::aborted if the call was interrupted by <tt>stop_local()</tt> 602 */ 603 template< typename ContainerT > 604 #if !defined(BOOST_LOG_DOXYGEN_PASS) 605 typename aux::enable_if_byte< typename ContainerT::value_type, operation_result >::type 606 #else 607 operation_result 608 #endif receive(ContainerT & container)609 receive(ContainerT& container) 610 { 611 return do_receive(&reliable_message_queue::container_receive_handler< ContainerT >, &container); 612 } 613 614 /*! 615 * The method performs an attempt to take a message from the associated message queue. The 616 * method is non-blocking, and always returns immediately. 617 * 618 * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>, 619 * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed. 620 * 621 * \pre <tt>is_open() == true</tt> 622 * 623 * \param buffer The memory buffer to store the received message in. 624 * \param buffer_size The size of the buffer, in bytes. 625 * \param message_size Receives the size of the received message, in bytes. 626 * 627 * \return \c true if a message is successfully received, and \c false otherwise (e.g., 628 * when the queue is empty). 629 */ try_receive(void * buffer,size_type buffer_size,size_type & message_size)630 bool try_receive(void* buffer, size_type buffer_size, size_type& message_size) 631 { 632 fixed_buffer_state state = { static_cast< uint8_t* >(buffer), buffer_size }; 633 bool result = do_try_receive(&reliable_message_queue::fixed_buffer_receive_handler, &state); 634 message_size = buffer_size - state.size; 635 return result; 636 } 637 638 /*! 639 * The method performs an attempt to take a message from the associated message queue. The 640 * method is non-blocking, and always returns immediately. 641 * 642 * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>, 643 * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed. 644 * 645 * \pre <tt>is_open() == true</tt> 646 * 647 * \param buffer The memory buffer to store the received message in. 648 * \param message_size Receives the size of the received message, in bytes. 649 * 650 * \return \c true if a message is successfully received, and \c false otherwise (e.g., 651 * when the queue is empty). 652 */ 653 template< typename ElementT, size_type SizeV > 654 #if !defined(BOOST_LOG_DOXYGEN_PASS) 655 typename aux::enable_if_byte< ElementT, bool >::type 656 #else 657 bool 658 #endif try_receive(ElementT (& buffer)[SizeV],size_type & message_size)659 try_receive(ElementT (&buffer)[SizeV], size_type& message_size) 660 { 661 return try_receive(buffer, SizeV, message_size); 662 } 663 664 /*! 665 * The method performs an attempt to take a message from the associated message queue. The 666 * method is non-blocking, and always returns immediately. 667 * 668 * Concurrent calls to <tt>send()</tt>, <tt>try_send()</tt>, <tt>receive()</tt>, 669 * <tt>try_receive()</tt>, <tt>stop_local()</tt>, and <tt>clear()</tt> are allowed. 670 * 671 * \pre <tt>is_open() == true</tt> 672 * 673 * \param container The container to store the received message in. The container should have 674 * value type of <tt>char</tt>, <tt>signed char</tt> or <tt>unsigned char</tt> 675 * and support inserting elements at the end. 676 * 677 * \return \c true if a message is successfully received, and \c false otherwise (e.g., 678 * when the queue is empty). 679 */ 680 template< typename ContainerT > 681 #if !defined(BOOST_LOG_DOXYGEN_PASS) 682 typename aux::enable_if_byte< typename ContainerT::value_type, bool >::type 683 #else 684 bool 685 #endif try_receive(ContainerT & container)686 try_receive(ContainerT& container) 687 { 688 return do_try_receive(&reliable_message_queue::container_receive_handler< ContainerT >, &container); 689 } 690 691 /*! 692 * The method frees system-wide resources, associated with the interprocess queue with the supplied name. 693 * The queue referred to by the specified name must not be opened in any process at the point of this call. 694 * After this call succeeds a new queue with the specified name can be created. 695 * 696 * This call can be useful to recover from an earlier process misbehavior (e.g. a crash without properly 697 * closing the message queue). In this case resources allocated for the interprocess queue may remain 698 * allocated after the last process closed the queue, which in turn may prevent creating a new queue with 699 * the same name. By calling this method before creating a queue the application can attempt to ensure 700 * it starts with a clean slate. 701 * 702 * On some platforms resources associated with the queue are automatically reclaimed by the operating system 703 * when the last process using those resources terminates (even if it terminates abnormally). On these 704 * platforms this call may be a no-op. However, portable code should still call this method at appropriate 705 * places to ensure compatibility with other platforms and future library versions, which may change implementation 706 * of the queue. 707 * 708 * \param name Name of the message queue to be removed. 709 */ 710 static BOOST_LOG_API void remove(object_name const& name); 711 712 #if !defined(BOOST_LOG_DOXYGEN_PASS) 713 private: 714 //! Implementation of the constructor with named arguments 715 template< typename ArgsT > construct(ArgsT const & args)716 void construct(ArgsT const& args) 717 { 718 m_impl = NULL; 719 construct_dispatch(args[keywords::open_mode], args); 720 } 721 722 //! Implementation of the constructor with named arguments 723 template< typename ArgsT > construct_dispatch(open_mode::create_only_tag,ArgsT const & args)724 void construct_dispatch(open_mode::create_only_tag, ArgsT const& args) 725 { 726 this->create(args[keywords::name], args[keywords::capacity], args[keywords::block_size], args[keywords::overflow_policy | block_on_overflow], args[keywords::permissions | permissions()]); 727 } 728 729 //! Implementation of the constructor with named arguments 730 template< typename ArgsT > construct_dispatch(open_mode::open_or_create_tag,ArgsT const & args)731 void construct_dispatch(open_mode::open_or_create_tag, ArgsT const& args) 732 { 733 this->open_or_create(args[keywords::name], args[keywords::capacity], args[keywords::block_size], args[keywords::overflow_policy | block_on_overflow], args[keywords::permissions | permissions()]); 734 } 735 736 //! Implementation of the constructor with named arguments 737 template< typename ArgsT > construct_dispatch(open_mode::open_only_tag,ArgsT const & args)738 void construct_dispatch(open_mode::open_only_tag, ArgsT const& args) 739 { 740 this->open(args[keywords::name], args[keywords::overflow_policy | block_on_overflow], args[keywords::permissions | permissions()]); 741 } 742 743 //! Closes the message queue, if it's open 744 BOOST_LOG_API void do_close() BOOST_NOEXCEPT; 745 746 //! Receives the message from the queue and calls the handler to place the data in the user's storage 747 BOOST_LOG_API operation_result do_receive(receive_handler handler, void* state); 748 //! Attempts to receives the message from the queue and calls the handler to place the data in the user's storage 749 BOOST_LOG_API bool do_try_receive(receive_handler handler, void* state); 750 751 //! Fixed buffer receive handler 752 static BOOST_LOG_API void fixed_buffer_receive_handler(void* state, const void* data, size_type size); 753 //! Receive handler for a container 754 template< typename ContainerT > container_receive_handler(void * state,const void * data,size_type size)755 static void container_receive_handler(void* state, const void* data, size_type size) 756 { 757 ContainerT* const container = static_cast< ContainerT* >(state); 758 container->insert 759 ( 760 container->end(), 761 static_cast< typename ContainerT::value_type const* >(data), 762 static_cast< typename ContainerT::value_type const* >(data) + size 763 ); 764 } 765 #endif 766 }; 767 768 } // namespace ipc 769 770 BOOST_LOG_CLOSE_NAMESPACE // namespace log 771 772 } // namespace boost 773 774 #include <boost/log/detail/footer.hpp> 775 776 #endif // BOOST_LOG_UTILITY_IPC_RELIABLE_MESSAGE_QUEUE_HPP_INCLUDED_ 777