/*
 * SObjectizer-5
 */

/*!
 * \since
 * v.5.5.13
 *
 * \file
 * \brief Implementation details for message chains.
 */
12
#pragma once

#include <so_5/mchain.hpp>
#include <so_5/mchain_select_ifaces.hpp>
#include <so_5/environment.hpp>

#include <so_5/ret_code.hpp>
#include <so_5/exception.hpp>
#include <so_5/error_logger.hpp>

#include <so_5/details/at_scope_exit.hpp>
#include <so_5/details/safe_cv_wait_for.hpp>

#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <sstream>
#include <string>
#include <typeindex>
#include <utility>
#include <vector>
30
31 namespace so_5 {
32
33 namespace mchain_props {
34
35 namespace details {
36
37 //
38 // ensure_queue_not_empty
39 //
40 /*!
41 * \since
42 * v.5.5.13
43 *
44 * \brief Helper function which throws an exception if queue is empty.
45 */
46 template< typename Q >
47 void
ensure_queue_not_empty(Q && queue)48 ensure_queue_not_empty( Q && queue )
49 {
50 if( queue.is_empty() )
51 SO_5_THROW_EXCEPTION(
52 rc_msg_chain_is_empty,
53 "an attempt to get message from empty demand queue" );
54 }
55
56 //
57 // ensure_queue_not_full
58 //
59 /*!
60 * \since
61 * v.5.5.13
62 *
63 * \brief Helper function which throws an exception if queue is full.
64 */
65 template< typename Q >
66 void
ensure_queue_not_full(Q && queue)67 ensure_queue_not_full( Q && queue )
68 {
69 if( queue.is_full() )
70 SO_5_THROW_EXCEPTION(
71 rc_msg_chain_is_full,
72 "an attempt to push a message to full demand queue" );
73 }
74
75 //
76 // unlimited_demand_queue
77 //
78 /*!
79 * \since
80 * v.5.5.13
81 *
82 * \brief Implementation of demands queue for size-unlimited message chain.
83 */
84 class unlimited_demand_queue
85 {
86 public :
87 /*!
88 * \note This constructor is necessary just for a convinience.
89 */
unlimited_demand_queue(const capacity_t &)90 unlimited_demand_queue( const capacity_t & ) {}
91
92 //! Is queue full?
93 /*!
94 * \note Unlimited queue can't be null. Because of that this
95 * method always returns \a false.
96 */
97 bool
is_full() const98 is_full() const { return false; }
99
100 //! Is queue empty?
101 bool
is_empty() const102 is_empty() const { return m_queue.empty(); }
103
104 //! Access to front item from queue.
105 demand_t &
front()106 front()
107 {
108 ensure_queue_not_empty( *this );
109 return m_queue.front();
110 }
111
112 //! Remove the front item from queue.
113 void
pop_front()114 pop_front()
115 {
116 ensure_queue_not_empty( *this );
117 m_queue.pop_front();
118 }
119
120 //! Add a new item to the end of the queue.
121 void
push_back(demand_t && demand)122 push_back( demand_t && demand )
123 {
124 m_queue.push_back( std::move(demand) );
125 }
126
127 //! Size of the queue.
128 std::size_t
size() const129 size() const { return m_queue.size(); }
130
131 private :
132 //! Queue's storage.
133 std::deque< demand_t > m_queue;
134 };
135
136 //
137 // limited_dynamic_demand_queue
138 //
139 /*!
140 * \since
141 * v.5.5.13
142 *
143 * \brief Implementation of demands queue for size-limited message chain with
144 * dynamically allocated storage.
145 */
146 class limited_dynamic_demand_queue
147 {
148 public :
149 //! Initializing constructor.
limited_dynamic_demand_queue(const capacity_t & capacity)150 limited_dynamic_demand_queue(
151 const capacity_t & capacity )
152 : m_max_size{ capacity.max_size() }
153 {}
154
155 //! Is queue full?
156 bool
is_full() const157 is_full() const { return m_max_size == m_queue.size(); }
158
159 //! Is queue empty?
160 bool
is_empty() const161 is_empty() const { return m_queue.empty(); }
162
163 //! Access to front item from queue.
164 demand_t &
front()165 front()
166 {
167 ensure_queue_not_empty( *this );
168 return m_queue.front();
169 }
170
171 //! Remove the front item from queue.
172 void
pop_front()173 pop_front()
174 {
175 ensure_queue_not_empty( *this );
176 m_queue.pop_front();
177 }
178
179 //! Add a new item to the end of the queue.
180 void
push_back(demand_t && demand)181 push_back( demand_t && demand )
182 {
183 ensure_queue_not_full( *this );
184 m_queue.push_back( std::move(demand) );
185 }
186
187 //! Size of the queue.
188 std::size_t
size() const189 size() const { return m_queue.size(); }
190
191 private :
192 //! Queue's storage.
193 std::deque< demand_t > m_queue;
194 //! Maximum size of the queue.
195 const std::size_t m_max_size;
196 };
197
198 //
199 // limited_preallocated_demand_queue
200 //
201 /*!
202 * \since
203 * v.5.5.13
204 *
205 * \brief Implementation of demands queue for size-limited message chain with
206 * preallocated storage.
207 */
208 class limited_preallocated_demand_queue
209 {
210 public :
211 //! Initializing constructor.
limited_preallocated_demand_queue(const capacity_t & capacity)212 limited_preallocated_demand_queue(
213 const capacity_t & capacity )
214 : m_storage( capacity.max_size(), demand_t{} )
215 , m_max_size{ capacity.max_size() }
216 , m_head{ 0 }
217 , m_size{ 0 }
218 {}
219
220 //! Is queue full?
221 bool
is_full() const222 is_full() const { return m_max_size == m_size; }
223
224 //! Is queue empty?
225 bool
is_empty() const226 is_empty() const { return 0 == m_size; }
227
228 //! Access to front item from queue.
229 demand_t &
front()230 front()
231 {
232 ensure_queue_not_empty( *this );
233 return m_storage[ m_head ];
234 }
235
236 //! Remove the front item from queue.
237 void
pop_front()238 pop_front()
239 {
240 ensure_queue_not_empty( *this );
241 m_storage[ m_head ] = demand_t{};
242 m_head = (m_head + 1) % m_max_size;
243 --m_size;
244 }
245
246 //! Add a new item to the end of the queue.
247 void
push_back(demand_t && demand)248 push_back( demand_t && demand )
249 {
250 ensure_queue_not_full( *this );
251 auto index = (m_head + m_size) % m_max_size;
252 m_storage[ index ] = std::move(demand);
253 ++m_size;
254 }
255
256 //! Size of the queue.
257 std::size_t
size() const258 size() const { return m_size; }
259
260 private :
261 //! Queue's storage.
262 std::vector< demand_t > m_storage;
263 //! Maximum size of the queue.
264 const std::size_t m_max_size;
265
266 //! Index of the queue head.
267 std::size_t m_head;
268 //! The current size of the queue.
269 std::size_t m_size;
270 };
271
//
// status
//
/*!
 * \since
 * v.5.5.13
 *
 * \brief Status of the message chain.
 */
enum class status
	{
		//! Chain is open and can be used for message sending.
		open,
		//! Chain is closed. New messages cannot be sent to it.
		closed
	};
288
289 } /* namespace details */
290
291 //
292 // mchain_template
293 //
294 /*!
295 * \since
296 * v.5.5.13
297 *
298 * \brief Template-based implementation of message chain.
299 *
300 * \tparam Queue type of demand queue for message chain.
301 * \tparam Tracing_Base type with message tracing implementation details.
302 */
303 template< typename Queue, typename Tracing_Base >
304 class mchain_template
305 : public abstract_message_chain_t
306 , private Tracing_Base
307 {
308 public :
309 //! Initializing constructor.
310 template< typename... Tracing_Args >
mchain_template(so_5::environment_t & env,mbox_id_t id,const mchain_params_t & params,Tracing_Args &&...tracing_args)311 mchain_template(
312 //! SObjectizer Environment for which message chain is created.
313 so_5::environment_t & env,
314 //! Mbox ID for this chain.
315 mbox_id_t id,
316 //! Chain parameters.
317 const mchain_params_t & params,
318 //! Arguments for Tracing_Base's constructor.
319 Tracing_Args &&... tracing_args )
320 : Tracing_Base( std::forward<Tracing_Args>(tracing_args)... )
321 , m_env( env )
322 , m_id( id )
323 , m_capacity( params.capacity() )
324 , m_not_empty_notificator( params.not_empty_notificator() )
325 , m_queue( params.capacity() )
326 {}
327
328 mbox_id_t
id() const329 id() const override
330 {
331 return m_id;
332 }
333
334 void
subscribe_event_handler(const std::type_index &,const so_5::message_limit::control_block_t *,agent_t &)335 subscribe_event_handler(
336 const std::type_index & /*msg_type*/,
337 const so_5::message_limit::control_block_t * /*limit*/,
338 agent_t & /*subscriber*/ ) override
339 {
340 SO_5_THROW_EXCEPTION(
341 rc_msg_chain_doesnt_support_subscriptions,
342 "mchain doesn't support subscription" );
343 }
344
345 void
unsubscribe_event_handlers(const std::type_index &,agent_t &)346 unsubscribe_event_handlers(
347 const std::type_index & /*msg_type*/,
348 agent_t & /*subscriber*/ ) override
349 {}
350
351 std::string
query_name() const352 query_name() const override
353 {
354 std::ostringstream s;
355 s << "<mchain:id=" << m_id << ">";
356
357 return s.str();
358 }
359
360 mbox_type_t
type() const361 type() const override
362 {
363 return mbox_type_t::multi_producer_single_consumer;
364 }
365
366 void
do_deliver_message(const std::type_index & msg_type,const message_ref_t & message,unsigned int)367 do_deliver_message(
368 const std::type_index & msg_type,
369 const message_ref_t & message,
370 unsigned int /*overlimit_reaction_deep*/ ) override
371 {
372 this->try_to_store_message_to_queue(
373 msg_type,
374 message );
375 }
376
377 /*!
378 * \attention Will throw an exception because delivery
379 * filter is not applicable to MPSC-mboxes.
380 */
381 void
set_delivery_filter(const std::type_index &,const delivery_filter_t &,agent_t &)382 set_delivery_filter(
383 const std::type_index & /*msg_type*/,
384 const delivery_filter_t & /*filter*/,
385 agent_t & /*subscriber*/ ) override
386 {
387 SO_5_THROW_EXCEPTION(
388 rc_msg_chain_doesnt_support_delivery_filters,
389 "set_delivery_filter is called for mchain" );
390 }
391
392 void
drop_delivery_filter(const std::type_index &,agent_t &)393 drop_delivery_filter(
394 const std::type_index & /*msg_type*/,
395 agent_t & /*subscriber*/ ) noexcept override
396 {}
397
398 [[nodiscard]]
399 extraction_status_t
extract(demand_t & dest,duration_t empty_queue_timeout)400 extract(
401 demand_t & dest,
402 duration_t empty_queue_timeout ) override
403 {
404 std::unique_lock< std::mutex > lock{ m_lock };
405
406 // If queue is empty we must wait for some time.
407 bool queue_empty = m_queue.is_empty();
408 if( queue_empty )
409 {
410 if( details::status::closed == m_status )
411 // Waiting for new messages has no sence because
412 // chain is closed.
413 return extraction_status_t::chain_closed;
414
415 auto predicate = [this, &queue_empty]() -> bool {
416 queue_empty = m_queue.is_empty();
417 return !queue_empty ||
418 details::status::closed == m_status;
419 };
420
421 // Count of sleeping thread must be incremented before
422 // going to sleep and decremented right after.
423 ++m_threads_to_wakeup;
424 auto decrement_threads = so_5::details::at_scope_exit(
425 [this] { --m_threads_to_wakeup; } );
426
427 // Wait until arrival of any message or closing of chain.
428 ::so_5::details::wait_for_big_interval(
429 lock,
430 m_underflow_cond,
431 empty_queue_timeout,
432 predicate );
433 }
434
435 // If queue is still empty nothing can be extracted and
436 // we must stop operation.
437 if( queue_empty )
438 return details::status::open == m_status ?
439 // The chain is still open so there must be this result
440 extraction_status_t::no_messages :
441 // The chain is closed and there must be different result
442 extraction_status_t::chain_closed;
443
444 return extract_demand_from_not_empty_queue( dest );
445 }
446
447 bool
empty() const448 empty() const override
449 {
450 return m_queue.is_empty();
451 }
452
453 std::size_t
size() const454 size() const override
455 {
456 return m_queue.size();
457 }
458
459 void
close(close_mode_t mode)460 close( close_mode_t mode ) override
461 {
462 std::lock_guard< std::mutex > lock{ m_lock };
463
464 if( details::status::closed == m_status )
465 return;
466
467 m_status = details::status::closed;
468
469 const bool was_full = m_queue.is_full();
470
471 if( close_mode_t::drop_content == mode )
472 {
473 while( !m_queue.is_empty() )
474 {
475 this->trace_demand_drop_on_close(
476 *this, m_queue.front() );
477 m_queue.pop_front();
478 }
479 }
480
481 // Since v.5.7.0 select operations must be notified
482 // always, even if the mchain is not empty.
483 notify_multi_chain_select_ops();
484
485 if( m_threads_to_wakeup )
486 // Someone is waiting on empty chain for new messages.
487 // It must be informed that no new messages will be here.
488 m_underflow_cond.notify_all();
489
490 if( was_full )
491 // Someone can wait on full chain for free place for new message.
492 // It must be informed that the chain is closed.
493 m_overflow_cond.notify_all();
494 }
495
496 environment_t &
environment() const497 environment() const noexcept override
498 {
499 return m_env;
500 }
501
502 protected :
503 [[nodiscard]]
504 extraction_status_t
extract(demand_t & dest,select_case_t & select_case)505 extract(
506 demand_t & dest,
507 select_case_t & select_case ) override
508 {
509 std::unique_lock< std::mutex > lock{ m_lock };
510
511 const bool queue_empty = m_queue.is_empty();
512 if( queue_empty )
513 {
514 if( details::status::closed == m_status )
515 // There is no need to wait for something.
516 return extraction_status_t::chain_closed;
517
518 // In other cases select_tail must be modified.
519 select_case.set_next( m_select_tail );
520 m_select_tail = &select_case;
521
522 return extraction_status_t::no_messages;
523 }
524 else
525 return extract_demand_from_not_empty_queue( dest );
526 }
527
528 [[nodiscard]]
529 mchain_props::push_status_t
push(const std::type_index & msg_type,const message_ref_t & message,mchain_props::select_case_t & select_case)530 push(
531 const std::type_index & msg_type,
532 const message_ref_t & message,
533 mchain_props::select_case_t & select_case ) override
534 {
535 typename Tracing_Base::deliver_op_tracer tracer{
536 *this, // as tracing base.
537 *this, // as chain.
538 msg_type,
539 message };
540
541 std::unique_lock< std::mutex > lock{ m_lock };
542
543 // Message cannot be stored to closed chain.
544 if( details::status::closed == m_status )
545 return mchain_props::push_status_t::chain_closed;
546
547 if( m_queue.is_full() )
548 {
549 // The select_case should be stored until there will
550 // be a free space in the chain (or chain will be closed).
551 select_case.set_next( m_select_tail );
552 m_select_tail = &select_case;
553
554 return mchain_props::push_status_t::deffered;
555 }
556 else
557 {
558 // Just store a new message to the queue.
559 complete_store_message_to_queue(
560 tracer,
561 msg_type,
562 message );
563 return mchain_props::push_status_t::stored;
564 }
565 }
566
567 void
remove_from_select(select_case_t & select_case)568 remove_from_select(
569 select_case_t & select_case ) noexcept override
570 {
571 std::lock_guard< std::mutex > lock{ m_lock };
572
573 select_case_t * c = m_select_tail;
574 select_case_t * prev = nullptr;
575 while( c )
576 {
577 select_case_t * const next = c->query_next();
578 if( c == &select_case )
579 {
580 if( prev )
581 prev->set_next( next );
582 else
583 m_select_tail = next;
584
585 return;
586 }
587
588 prev = c;
589 c = next;
590 }
591 }
592
593 void
do_deliver_message_from_timer(const std::type_index & msg_type,const message_ref_t & message)594 do_deliver_message_from_timer(
595 const std::type_index & msg_type,
596 const message_ref_t & message ) override
597 {
598 try_to_store_message_from_timer_to_queue(
599 msg_type,
600 message );
601 }
602
603 private :
604 //! SObjectizer Environment for which message chain is created.
605 environment_t & m_env;
606
607 //! Status of the chain.
608 details::status m_status = { details::status::open };
609
610 //! Mbox ID for chain.
611 const mbox_id_t m_id;
612
613 //! Chain capacity.
614 const capacity_t m_capacity;
615
616 //! Optional notificator for 'not_empty' condition.
617 const not_empty_notification_func_t m_not_empty_notificator;
618
619 //! Chain's demands queue.
620 Queue m_queue;
621
622 //! Chain's lock.
623 std::mutex m_lock;
624
625 //! Condition variable for waiting on empty queue.
626 std::condition_variable m_underflow_cond;
627 //! Condition variable for waiting on full queue.
628 std::condition_variable m_overflow_cond;
629
630 /*!
631 * \brief Count of threads sleeping on empty mchain.
632 *
633 * This value is incremented before sleeping on m_underflow_cond and
634 * decremented just after a return from this sleep.
635 *
636 * \since
637 * v.5.5.16
638 */
639 std::size_t m_threads_to_wakeup = { 0 };
640
641 /*!
642 * \brief A queue of multi-chain selects in which this chain is used.
643 *
644 * \since
645 * v.5.5.16
646 */
647 select_case_t * m_select_tail = nullptr;
648
649 //! Actual implementation of pushing message to the queue.
650 /*!
651 * \note
652 * This implementation must be used for ordinary delivery operations.
653 * For delivery operations from timer thread another method must be
654 * called (see try_to_store_message_from_timer_to_queue()).
655 */
656 void
try_to_store_message_to_queue(const std::type_index & msg_type,const message_ref_t & message)657 try_to_store_message_to_queue(
658 const std::type_index & msg_type,
659 const message_ref_t & message )
660 {
661 typename Tracing_Base::deliver_op_tracer tracer{
662 *this, // as tracing base.
663 *this, // as chain.
664 msg_type,
665 message };
666
667 std::unique_lock< std::mutex > lock{ m_lock };
668
669 // Message cannot be stored to closed chain.
670 if( details::status::closed == m_status )
671 return;
672
673 // If queue full and waiting on full queue is enabled we
674 // must wait for some time until there will be some space in
675 // the queue.
676 bool queue_full = m_queue.is_full();
677 if( queue_full && m_capacity.is_overflow_timeout_defined() )
678 {
679 ::so_5::details::wait_for_big_interval(
680 lock,
681 m_overflow_cond,
682 m_capacity.overflow_timeout(),
683 [this, &queue_full] {
684 queue_full = m_queue.is_full();
685 return !queue_full ||
686 details::status::closed == m_status;
687 } );
688
689 // Message cannot be stored to closed chain.
690 //
691 // NOTE: this additional check is necessary after
692 // wait for overflow_timeout because the chain can
693 // be closed during that wait.
694 if( details::status::closed == m_status )
695 return;
696 }
697
698 // If queue still full we must perform some reaction.
699 if( queue_full )
700 {
701 const auto reaction = m_capacity.overflow_reaction();
702 if( overflow_reaction_t::drop_newest == reaction )
703 {
704 // New message must be simply ignored.
705 tracer.overflow_drop_newest();
706 return;
707 }
708 else if( overflow_reaction_t::remove_oldest == reaction )
709 {
710 // The oldest message must be simply removed.
711 tracer.overflow_remove_oldest( m_queue.front() );
712 m_queue.pop_front();
713 }
714 else if( overflow_reaction_t::throw_exception == reaction )
715 {
716 tracer.overflow_throw_exception();
717 SO_5_THROW_EXCEPTION(
718 rc_msg_chain_overflow,
719 "an attempt to push message to full mchain "
720 "with overflow_reaction_t::throw_exception policy" );
721 }
722 else
723 {
724 so_5::details::abort_on_fatal_error( [&] {
725 tracer.overflow_throw_exception();
726 SO_5_LOG_ERROR( m_env, log_stream ) {
727 log_stream << "overflow_reaction_t::abort_app "
728 "will be performed for mchain (id="
729 << m_id << "), msg_type: "
730 << msg_type.name()
731 << ". Application will be aborted"
732 << std::endl;
733 }
734 } );
735 }
736 }
737
738 complete_store_message_to_queue(
739 tracer,
740 msg_type,
741 message );
742 }
743
744 /*!
745 * \brief An implementation of storing another message to
746 * chain for the case of delated/periodic messages.
747 *
748 * This implementation handles overloaded chains differently:
749 * - there is no waiting on overloaded chain (even if such waiting
750 * is specified in mchain params);
751 * - overflow_reaction_t::throw_exception is replaced by
752 * overflow_reaction_t::drop_newest.
753 *
754 * These defferences are necessary because the context of timer
755 * thread is very special: there can't be any long-time operation
756 * (like waiting for free space on overloaded chain) and there can't
757 * be an exception about mchain's overflow.
758 *
759 * \since
760 * v.5.5.18
761 */
762 void
try_to_store_message_from_timer_to_queue(const std::type_index & msg_type,const message_ref_t & message)763 try_to_store_message_from_timer_to_queue(
764 const std::type_index & msg_type,
765 const message_ref_t & message )
766 {
767 typename Tracing_Base::deliver_op_tracer tracer{
768 *this, // as tracing base.
769 *this, // as chain.
770 msg_type,
771 message };
772
773 std::unique_lock< std::mutex > lock{ m_lock };
774
775 // Message cannot be stored to closed chain.
776 if( details::status::closed == m_status )
777 return;
778
779 bool queue_full = m_queue.is_full();
780 // NOTE: there is no awaiting on full mchain.
781 // If queue full we must perform some reaction.
782 if( queue_full )
783 {
784 const auto reaction = m_capacity.overflow_reaction();
785 if( overflow_reaction_t::drop_newest == reaction ||
786 overflow_reaction_t::throw_exception == reaction )
787 {
788 // New message must be simply ignored.
789 tracer.overflow_drop_newest();
790 return;
791 }
792 else if( overflow_reaction_t::remove_oldest == reaction )
793 {
794 // The oldest message must be simply removed.
795 tracer.overflow_remove_oldest( m_queue.front() );
796 m_queue.pop_front();
797 }
798 else
799 {
800 so_5::details::abort_on_fatal_error( [&] {
801 tracer.overflow_throw_exception();
802 SO_5_LOG_ERROR( m_env, log_stream ) {
803 log_stream << "overflow_reaction_t::abort_app "
804 "will be performed for mchain (id="
805 << m_id << "), msg_type: "
806 << msg_type.name()
807 << ". Application will be aborted"
808 << std::endl;
809 }
810 } );
811 }
812 }
813
814 complete_store_message_to_queue(
815 tracer,
816 msg_type,
817 message );
818 }
819
820 /*!
821 * \brief Implementation of extract operation for the case when
822 * message queue is not empty.
823 *
824 * \attention This helper method must be called when chain object
825 * is locked in some hi-level method.
826 *
827 * \since
828 * v.5.5.16
829 */
830 extraction_status_t
extract_demand_from_not_empty_queue(demand_t & dest)831 extract_demand_from_not_empty_queue(
832 demand_t & dest )
833 {
834 // If queue was full then someone can wait on it.
835 const bool queue_was_full = m_queue.is_full();
836 dest = std::move( m_queue.front() );
837 m_queue.pop_front();
838
839 this->trace_extracted_demand( *this, dest );
840
841 if( queue_was_full )
842 {
843 // Since v.5.7.0 waiting select_cases should be
844 // notified too because they are send_cases.
845 notify_multi_chain_select_ops();
846
847 m_overflow_cond.notify_all();
848 }
849
850 return extraction_status_t::msg_extracted;
851 }
852
853 /*!
854 * \since
855 * v.5.5.16
856 */
857 void
notify_multi_chain_select_ops()858 notify_multi_chain_select_ops() noexcept
859 {
860 if( m_select_tail )
861 {
862 auto old = m_select_tail;
863 m_select_tail = nullptr;
864 old->notify();
865 }
866 }
867
868 /*!
869 * \brief A reusable method with implementation of
870 * last part of storing a message into chain.
871 *
872 * \note
873 * Intended to be called from try_to_store_message_to_queue()
874 * and try_to_store_message_from_timer_to_queue().
875 *
876 * \since
877 * v.5.5.18
878 */
879 void
complete_store_message_to_queue(typename Tracing_Base::deliver_op_tracer & tracer,const std::type_index & msg_type,const message_ref_t & message)880 complete_store_message_to_queue(
881 typename Tracing_Base::deliver_op_tracer & tracer,
882 const std::type_index & msg_type,
883 const message_ref_t & message )
884 {
885 const bool was_empty = m_queue.is_empty();
886
887 m_queue.push_back( demand_t{ msg_type, message } );
888
889 tracer.stored( m_queue );
890
891 // If chain was empty then multi-chain cases must be notified.
892 // And if not_empty_notificator is defined then it must be used too.
893 if( was_empty )
894 {
895 if( m_not_empty_notificator )
896 so_5::details::invoke_noexcept_code(
897 [this] { m_not_empty_notificator(); } );
898
899 notify_multi_chain_select_ops();
900 }
901
902 // Should be wake up some sleeping thread?
903 if( m_threads_to_wakeup && m_threads_to_wakeup >= m_queue.size() )
904 // Someone is waiting on empty queue.
905 m_underflow_cond.notify_one();
906 }
907 };
908
909 } /* namespace mchain_props */
910
911 } /* namespace so_5 */
912
913