1 /*
2  * SObjectizer-5
3  */
4 
5 /*!
6  * \since
7  * v.5.5.13
8  *
9  * \file
10  * \brief Implementation details for message chains.
11  */
12 
13 #pragma once
14 
15 #include <so_5/mchain.hpp>
16 #include <so_5/mchain_select_ifaces.hpp>
17 #include <so_5/environment.hpp>
18 
19 #include <so_5/ret_code.hpp>
20 #include <so_5/exception.hpp>
21 #include <so_5/error_logger.hpp>
22 
23 #include <so_5/details/at_scope_exit.hpp>
24 #include <so_5/details/safe_cv_wait_for.hpp>
25 
26 #include <deque>
27 #include <vector>
28 #include <mutex>
29 #include <condition_variable>
30 
31 namespace so_5 {
32 
33 namespace mchain_props {
34 
35 namespace details {
36 
37 //
38 // ensure_queue_not_empty
39 //
40 /*!
41  * \since
42  * v.5.5.13
43  *
44  * \brief Helper function which throws an exception if queue is empty.
45  */
46 template< typename Q >
47 void
ensure_queue_not_empty(Q && queue)48 ensure_queue_not_empty( Q && queue )
49 	{
50 		if( queue.is_empty() )
51 			SO_5_THROW_EXCEPTION(
52 					rc_msg_chain_is_empty,
53 					"an attempt to get message from empty demand queue" );
54 	}
55 
56 //
57 // ensure_queue_not_full
58 //
59 /*!
60  * \since
61  * v.5.5.13
62  *
63  * \brief Helper function which throws an exception if queue is full.
64  */
65 template< typename Q >
66 void
ensure_queue_not_full(Q && queue)67 ensure_queue_not_full( Q && queue )
68 	{
69 		if( queue.is_full() )
70 			SO_5_THROW_EXCEPTION(
71 					rc_msg_chain_is_full,
72 					"an attempt to push a message to full demand queue" );
73 	}
74 
75 //
76 // unlimited_demand_queue
77 //
78 /*!
79  * \since
80  * v.5.5.13
81  *
82  * \brief Implementation of demands queue for size-unlimited message chain.
83  */
84 class unlimited_demand_queue
85 	{
86 	public :
87 		/*!
88 		 * \note This constructor is necessary just for a convinience.
89 		 */
unlimited_demand_queue(const capacity_t &)90 		unlimited_demand_queue( const capacity_t & ) {}
91 
92 		//! Is queue full?
93 		/*!
94 		 * \note Unlimited queue can't be null. Because of that this
95 		 * method always returns \a false.
96 		 */
97 		bool
is_full() const98 		is_full() const { return false; }
99 
100 		//! Is queue empty?
101 		bool
is_empty() const102 		is_empty() const { return m_queue.empty(); }
103 
104 		//! Access to front item from queue.
105 		demand_t &
front()106 		front()
107 			{
108 				ensure_queue_not_empty( *this );
109 				return m_queue.front();
110 			}
111 
112 		//! Remove the front item from queue.
113 		void
pop_front()114 		pop_front()
115 			{
116 				ensure_queue_not_empty( *this );
117 				m_queue.pop_front();
118 			}
119 
120 		//! Add a new item to the end of the queue.
121 		void
push_back(demand_t && demand)122 		push_back( demand_t && demand )
123 			{
124 				m_queue.push_back( std::move(demand) );
125 			}
126 
127 		//! Size of the queue.
128 		std::size_t
size() const129 		size() const { return m_queue.size(); }
130 
131 	private :
132 		//! Queue's storage.
133 		std::deque< demand_t > m_queue;
134 	};
135 
136 //
137 // limited_dynamic_demand_queue
138 //
139 /*!
140  * \since
141  * v.5.5.13
142  *
143  * \brief Implementation of demands queue for size-limited message chain with
144  * dynamically allocated storage.
145  */
146 class limited_dynamic_demand_queue
147 	{
148 	public :
149 		//! Initializing constructor.
limited_dynamic_demand_queue(const capacity_t & capacity)150 		limited_dynamic_demand_queue(
151 			const capacity_t & capacity )
152 			:	m_max_size{ capacity.max_size() }
153 			{}
154 
155 		//! Is queue full?
156 		bool
is_full() const157 		is_full() const { return m_max_size == m_queue.size(); }
158 
159 		//! Is queue empty?
160 		bool
is_empty() const161 		is_empty() const { return m_queue.empty(); }
162 
163 		//! Access to front item from queue.
164 		demand_t &
front()165 		front()
166 			{
167 				ensure_queue_not_empty( *this );
168 				return m_queue.front();
169 			}
170 
171 		//! Remove the front item from queue.
172 		void
pop_front()173 		pop_front()
174 			{
175 				ensure_queue_not_empty( *this );
176 				m_queue.pop_front();
177 			}
178 
179 		//! Add a new item to the end of the queue.
180 		void
push_back(demand_t && demand)181 		push_back( demand_t && demand )
182 			{
183 				ensure_queue_not_full( *this );
184 				m_queue.push_back( std::move(demand) );
185 			}
186 
187 		//! Size of the queue.
188 		std::size_t
size() const189 		size() const { return m_queue.size(); }
190 
191 	private :
192 		//! Queue's storage.
193 		std::deque< demand_t > m_queue;
194 		//! Maximum size of the queue.
195 		const std::size_t m_max_size;
196 	};
197 
198 //
199 // limited_preallocated_demand_queue
200 //
201 /*!
202  * \since
203  * v.5.5.13
204  *
205  * \brief Implementation of demands queue for size-limited message chain with
206  * preallocated storage.
207  */
208 class limited_preallocated_demand_queue
209 	{
210 	public :
211 		//! Initializing constructor.
limited_preallocated_demand_queue(const capacity_t & capacity)212 		limited_preallocated_demand_queue(
213 			const capacity_t & capacity )
214 			:	m_storage( capacity.max_size(), demand_t{} )
215 			,	m_max_size{ capacity.max_size() }
216 			,	m_head{ 0 }
217 			,	m_size{ 0 }
218 			{}
219 
220 		//! Is queue full?
221 		bool
is_full() const222 		is_full() const { return m_max_size == m_size; }
223 
224 		//! Is queue empty?
225 		bool
is_empty() const226 		is_empty() const { return 0 == m_size; }
227 
228 		//! Access to front item from queue.
229 		demand_t &
front()230 		front()
231 			{
232 				ensure_queue_not_empty( *this );
233 				return m_storage[ m_head ];
234 			}
235 
236 		//! Remove the front item from queue.
237 		void
pop_front()238 		pop_front()
239 			{
240 				ensure_queue_not_empty( *this );
241 				m_storage[ m_head ] = demand_t{};
242 				m_head = (m_head + 1) % m_max_size;
243 				--m_size;
244 			}
245 
246 		//! Add a new item to the end of the queue.
247 		void
push_back(demand_t && demand)248 		push_back( demand_t && demand )
249 			{
250 				ensure_queue_not_full( *this );
251 				auto index = (m_head + m_size) % m_max_size;
252 				m_storage[ index ] = std::move(demand);
253 				++m_size;
254 			}
255 
256 		//! Size of the queue.
257 		std::size_t
size() const258 		size() const { return m_size; }
259 
260 	private :
261 		//! Queue's storage.
262 		std::vector< demand_t > m_storage;
263 		//! Maximum size of the queue.
264 		const std::size_t m_max_size;
265 
266 		//! Index of the queue head.
267 		std::size_t m_head;
268 		//! The current size of the queue.
269 		std::size_t m_size;
270 	};
271 
272 //
273 // status
274 //
275 /*!
276  * \since
277  * v.5.5.13
278  *
279  * \brief Status of the message chain.
280  */
281 enum class status
282 	{
283 		//! Bag is open and can be used for message sending.
284 		open,
285 		//! Bag is closed. New messages cannot be sent to it.
286 		closed
287 	};
288 
289 } /* namespace details */
290 
291 //
292 // mchain_template
293 //
294 /*!
295  * \since
296  * v.5.5.13
297  *
298  * \brief Template-based implementation of message chain.
299  *
300  * \tparam Queue type of demand queue for message chain.
301  * \tparam Tracing_Base type with message tracing implementation details.
302  */
303 template< typename Queue, typename Tracing_Base >
304 class mchain_template
305 	:	public abstract_message_chain_t
306 	,	private Tracing_Base
307 	{
308 	public :
309 		//! Initializing constructor.
310 		template< typename... Tracing_Args >
mchain_template(so_5::environment_t & env,mbox_id_t id,const mchain_params_t & params,Tracing_Args &&...tracing_args)311 		mchain_template(
312 			//! SObjectizer Environment for which message chain is created.
313 			so_5::environment_t & env,
314 			//! Mbox ID for this chain.
315 			mbox_id_t id,
316 			//! Chain parameters.
317 			const mchain_params_t & params,
318 			//! Arguments for Tracing_Base's constructor.
319 			Tracing_Args &&... tracing_args )
320 			:	Tracing_Base( std::forward<Tracing_Args>(tracing_args)... )
321 			,	m_env( env )
322 			,	m_id( id )
323 			,	m_capacity( params.capacity() )
324 			,	m_not_empty_notificator( params.not_empty_notificator() )
325 			,	m_queue( params.capacity() )
326 			{}
327 
328 		mbox_id_t
id() const329 		id() const override
330 			{
331 				return m_id;
332 			}
333 
334 		void
subscribe_event_handler(const std::type_index &,const so_5::message_limit::control_block_t *,agent_t &)335 		subscribe_event_handler(
336 			const std::type_index & /*msg_type*/,
337 			const so_5::message_limit::control_block_t * /*limit*/,
338 			agent_t & /*subscriber*/ ) override
339 			{
340 				SO_5_THROW_EXCEPTION(
341 						rc_msg_chain_doesnt_support_subscriptions,
342 						"mchain doesn't support subscription" );
343 			}
344 
345 		void
unsubscribe_event_handlers(const std::type_index &,agent_t &)346 		unsubscribe_event_handlers(
347 			const std::type_index & /*msg_type*/,
348 			agent_t & /*subscriber*/ ) override
349 			{}
350 
351 		std::string
query_name() const352 		query_name() const override
353 			{
354 				std::ostringstream s;
355 				s << "<mchain:id=" << m_id << ">";
356 
357 				return s.str();
358 			}
359 
360 		mbox_type_t
type() const361 		type() const override
362 			{
363 				return mbox_type_t::multi_producer_single_consumer;
364 			}
365 
366 		void
do_deliver_message(const std::type_index & msg_type,const message_ref_t & message,unsigned int)367 		do_deliver_message(
368 			const std::type_index & msg_type,
369 			const message_ref_t & message,
370 			unsigned int /*overlimit_reaction_deep*/ ) override
371 			{
372 				this->try_to_store_message_to_queue(
373 							msg_type,
374 							message );
375 			}
376 
377 		/*!
378 		 * \attention Will throw an exception because delivery
379 		 * filter is not applicable to MPSC-mboxes.
380 		 */
381 		void
set_delivery_filter(const std::type_index &,const delivery_filter_t &,agent_t &)382 		set_delivery_filter(
383 			const std::type_index & /*msg_type*/,
384 			const delivery_filter_t & /*filter*/,
385 			agent_t & /*subscriber*/ ) override
386 			{
387 				SO_5_THROW_EXCEPTION(
388 						rc_msg_chain_doesnt_support_delivery_filters,
389 						"set_delivery_filter is called for mchain" );
390 			}
391 
392 		void
drop_delivery_filter(const std::type_index &,agent_t &)393 		drop_delivery_filter(
394 			const std::type_index & /*msg_type*/,
395 			agent_t & /*subscriber*/ ) noexcept override
396 			{}
397 
398 		[[nodiscard]]
399 		extraction_status_t
extract(demand_t & dest,duration_t empty_queue_timeout)400 		extract(
401 			demand_t & dest,
402 			duration_t empty_queue_timeout ) override
403 			{
404 				std::unique_lock< std::mutex > lock{ m_lock };
405 
406 				// If queue is empty we must wait for some time.
407 				bool queue_empty = m_queue.is_empty();
408 				if( queue_empty )
409 					{
410 						if( details::status::closed == m_status )
411 							// Waiting for new messages has no sence because
412 							// chain is closed.
413 							return extraction_status_t::chain_closed;
414 
415 						auto predicate = [this, &queue_empty]() -> bool {
416 								queue_empty = m_queue.is_empty();
417 								return !queue_empty ||
418 										details::status::closed == m_status;
419 							};
420 
421 						// Count of sleeping thread must be incremented before
422 						// going to sleep and decremented right after.
423 						++m_threads_to_wakeup;
424 						auto decrement_threads = so_5::details::at_scope_exit(
425 								[this] { --m_threads_to_wakeup; } );
426 
427 						// Wait until arrival of any message or closing of chain.
428 						::so_5::details::wait_for_big_interval(
429 								lock,
430 								m_underflow_cond,
431 								empty_queue_timeout,
432 								predicate );
433 					}
434 
435 				// If queue is still empty nothing can be extracted and
436 				// we must stop operation.
437 				if( queue_empty )
438 					return details::status::open == m_status ?
439 							// The chain is still open so there must be this result
440 							extraction_status_t::no_messages :
441 							// The chain is closed and there must be different result
442 							extraction_status_t::chain_closed;
443 
444 				return extract_demand_from_not_empty_queue( dest );
445 			}
446 
447 		bool
empty() const448 		empty() const override
449 			{
450 				return m_queue.is_empty();
451 			}
452 
453 		std::size_t
size() const454 		size() const override
455 			{
456 				return m_queue.size();
457 			}
458 
459 		void
close(close_mode_t mode)460 		close( close_mode_t mode ) override
461 			{
462 				std::lock_guard< std::mutex > lock{ m_lock };
463 
464 				if( details::status::closed == m_status )
465 					return;
466 
467 				m_status = details::status::closed;
468 
469 				const bool was_full = m_queue.is_full();
470 
471 				if( close_mode_t::drop_content == mode )
472 					{
473 						while( !m_queue.is_empty() )
474 							{
475 								this->trace_demand_drop_on_close(
476 										*this, m_queue.front() );
477 								m_queue.pop_front();
478 							}
479 					}
480 
481 				// Since v.5.7.0 select operations must be notified
482 				// always, even if the mchain is not empty.
483 				notify_multi_chain_select_ops();
484 
485 				if( m_threads_to_wakeup )
486 					// Someone is waiting on empty chain for new messages.
487 					// It must be informed that no new messages will be here.
488 					m_underflow_cond.notify_all();
489 
490 				if( was_full )
491 					// Someone can wait on full chain for free place for new message.
492 					// It must be informed that the chain is closed.
493 					m_overflow_cond.notify_all();
494 			}
495 
496 		environment_t &
environment() const497 		environment() const noexcept override
498 			{
499 				return m_env;
500 			}
501 
502 	protected :
503 		[[nodiscard]]
504 		extraction_status_t
extract(demand_t & dest,select_case_t & select_case)505 		extract(
506 			demand_t & dest,
507 			select_case_t & select_case ) override
508 			{
509 				std::unique_lock< std::mutex > lock{ m_lock };
510 
511 				const bool queue_empty = m_queue.is_empty();
512 				if( queue_empty )
513 					{
514 						if( details::status::closed == m_status )
515 							// There is no need to wait for something.
516 							return extraction_status_t::chain_closed;
517 
518 						// In other cases select_tail must be modified.
519 						select_case.set_next( m_select_tail );
520 						m_select_tail = &select_case;
521 
522 						return extraction_status_t::no_messages;
523 					}
524 				else
525 					return extract_demand_from_not_empty_queue( dest );
526 			}
527 
528 		[[nodiscard]]
529 		mchain_props::push_status_t
push(const std::type_index & msg_type,const message_ref_t & message,mchain_props::select_case_t & select_case)530 		push(
531 			const std::type_index & msg_type,
532 			const message_ref_t & message,
533 			mchain_props::select_case_t & select_case ) override
534 			{
535 				typename Tracing_Base::deliver_op_tracer tracer{
536 						*this, // as tracing base.
537 						*this, // as chain.
538 						msg_type,
539 						message };
540 
541 				std::unique_lock< std::mutex > lock{ m_lock };
542 
543 				// Message cannot be stored to closed chain.
544 				if( details::status::closed == m_status )
545 					return mchain_props::push_status_t::chain_closed;
546 
547 				if( m_queue.is_full() )
548 					{
549 						// The select_case should be stored until there will
550 						// be a free space in the chain (or chain will be closed).
551 						select_case.set_next( m_select_tail );
552 						m_select_tail = &select_case;
553 
554 						return mchain_props::push_status_t::deffered;
555 					}
556 				else
557 					{
558 						// Just store a new message to the queue.
559 						complete_store_message_to_queue(
560 								tracer,
561 								msg_type,
562 								message );
563 						return mchain_props::push_status_t::stored;
564 					}
565 			}
566 
567 		void
remove_from_select(select_case_t & select_case)568 		remove_from_select(
569 			select_case_t & select_case ) noexcept override
570 			{
571 				std::lock_guard< std::mutex > lock{ m_lock };
572 
573 				select_case_t * c = m_select_tail;
574 				select_case_t * prev = nullptr;
575 				while( c )
576 					{
577 						select_case_t * const next = c->query_next();
578 						if( c == &select_case )
579 							{
580 								if( prev )
581 									prev->set_next( next );
582 								else
583 									m_select_tail = next;
584 
585 								return;
586 							}
587 
588 						prev = c;
589 						c = next;
590 					}
591 			}
592 
593 		void
do_deliver_message_from_timer(const std::type_index & msg_type,const message_ref_t & message)594 		do_deliver_message_from_timer(
595 			const std::type_index & msg_type,
596 			const message_ref_t & message ) override
597 			{
598 				try_to_store_message_from_timer_to_queue(
599 						msg_type,
600 						message );
601 			}
602 
603 	private :
604 		//! SObjectizer Environment for which message chain is created.
605 		environment_t & m_env;
606 
607 		//! Status of the chain.
608 		details::status m_status = { details::status::open };
609 
610 		//! Mbox ID for chain.
611 		const mbox_id_t m_id;
612 
613 		//! Chain capacity.
614 		const capacity_t m_capacity;
615 
616 		//! Optional notificator for 'not_empty' condition.
617 		const not_empty_notification_func_t m_not_empty_notificator;
618 
619 		//! Chain's demands queue.
620 		Queue m_queue;
621 
622 		//! Chain's lock.
623 		std::mutex m_lock;
624 
625 		//! Condition variable for waiting on empty queue.
626 		std::condition_variable m_underflow_cond;
627 		//! Condition variable for waiting on full queue.
628 		std::condition_variable m_overflow_cond;
629 
630 		/*!
631 		 * \brief Count of threads sleeping on empty mchain.
632 		 *
633 		 * This value is incremented before sleeping on m_underflow_cond and
634 		 * decremented just after a return from this sleep.
635 		 *
636 		 * \since
637 		 * v.5.5.16
638 		 */
639 		std::size_t m_threads_to_wakeup = { 0 };
640 
641 		/*!
642 		 * \brief A queue of multi-chain selects in which this chain is used.
643 		 *
644 		 * \since
645 		 * v.5.5.16
646 		 */
647 		select_case_t * m_select_tail = nullptr;
648 
649 		//! Actual implementation of pushing message to the queue.
650 		/*!
651 		 * \note
652 		 * This implementation must be used for ordinary delivery operations.
653 		 * For delivery operations from timer thread another method must be
654 		 * called (see try_to_store_message_from_timer_to_queue()).
655 		 */
656 		void
try_to_store_message_to_queue(const std::type_index & msg_type,const message_ref_t & message)657 		try_to_store_message_to_queue(
658 			const std::type_index & msg_type,
659 			const message_ref_t & message )
660 			{
661 				typename Tracing_Base::deliver_op_tracer tracer{
662 						*this, // as tracing base.
663 						*this, // as chain.
664 						msg_type,
665 						message };
666 
667 				std::unique_lock< std::mutex > lock{ m_lock };
668 
669 				// Message cannot be stored to closed chain.
670 				if( details::status::closed == m_status )
671 					return;
672 
673 				// If queue full and waiting on full queue is enabled we
674 				// must wait for some time until there will be some space in
675 				// the queue.
676 				bool queue_full = m_queue.is_full();
677 				if( queue_full && m_capacity.is_overflow_timeout_defined() )
678 					{
679 						::so_5::details::wait_for_big_interval(
680 								lock,
681 								m_overflow_cond,
682 								m_capacity.overflow_timeout(),
683 								[this, &queue_full] {
684 									queue_full = m_queue.is_full();
685 									return !queue_full ||
686 											details::status::closed == m_status;
687 								} );
688 
689 						// Message cannot be stored to closed chain.
690 						//
691 						// NOTE: this additional check is necessary after
692 						// wait for overflow_timeout because the chain can
693 						// be closed during that wait.
694 						if( details::status::closed == m_status )
695 							return;
696 					}
697 
698 				// If queue still full we must perform some reaction.
699 				if( queue_full )
700 					{
701 						const auto reaction = m_capacity.overflow_reaction();
702 						if( overflow_reaction_t::drop_newest == reaction )
703 							{
704 								// New message must be simply ignored.
705 								tracer.overflow_drop_newest();
706 								return;
707 							}
708 						else if( overflow_reaction_t::remove_oldest == reaction )
709 							{
710 								// The oldest message must be simply removed.
711 								tracer.overflow_remove_oldest( m_queue.front() );
712 								m_queue.pop_front();
713 							}
714 						else if( overflow_reaction_t::throw_exception == reaction )
715 							{
716 								tracer.overflow_throw_exception();
717 								SO_5_THROW_EXCEPTION(
718 										rc_msg_chain_overflow,
719 										"an attempt to push message to full mchain "
720 										"with overflow_reaction_t::throw_exception policy" );
721 							}
722 						else
723 							{
724 								so_5::details::abort_on_fatal_error( [&] {
725 										tracer.overflow_throw_exception();
726 										SO_5_LOG_ERROR( m_env, log_stream ) {
727 											log_stream << "overflow_reaction_t::abort_app "
728 													"will be performed for mchain (id="
729 													<< m_id << "), msg_type: "
730 													<< msg_type.name()
731 													<< ". Application will be aborted"
732 													<< std::endl;
733 										}
734 									} );
735 							}
736 					}
737 
738 				complete_store_message_to_queue(
739 						tracer,
740 						msg_type,
741 						message );
742 			}
743 
744 		/*!
745 		 * \brief An implementation of storing another message to
746 		 * chain for the case of delated/periodic messages.
747 		 *
748 		 * This implementation handles overloaded chains differently:
749 		 * - there is no waiting on overloaded chain (even if such waiting
750 		 *   is specified in mchain params);
751 		 * - overflow_reaction_t::throw_exception is replaced by
752 		 *   overflow_reaction_t::drop_newest.
753 		 *
754 		 * These defferences are necessary because the context of timer
755 		 * thread is very special: there can't be any long-time operation
756 		 * (like waiting for free space on overloaded chain) and there can't
757 		 * be an exception about mchain's overflow.
758 		 *
759 		 * \since
760 		 * v.5.5.18
761 		 */
762 		void
try_to_store_message_from_timer_to_queue(const std::type_index & msg_type,const message_ref_t & message)763 		try_to_store_message_from_timer_to_queue(
764 			const std::type_index & msg_type,
765 			const message_ref_t & message )
766 			{
767 				typename Tracing_Base::deliver_op_tracer tracer{
768 						*this, // as tracing base.
769 						*this, // as chain.
770 						msg_type,
771 						message };
772 
773 				std::unique_lock< std::mutex > lock{ m_lock };
774 
775 				// Message cannot be stored to closed chain.
776 				if( details::status::closed == m_status )
777 					return;
778 
779 				bool queue_full = m_queue.is_full();
780 				// NOTE: there is no awaiting on full mchain.
781 				// If queue full we must perform some reaction.
782 				if( queue_full )
783 					{
784 						const auto reaction = m_capacity.overflow_reaction();
785 						if( overflow_reaction_t::drop_newest == reaction ||
786 								overflow_reaction_t::throw_exception == reaction )
787 							{
788 								// New message must be simply ignored.
789 								tracer.overflow_drop_newest();
790 								return;
791 							}
792 						else if( overflow_reaction_t::remove_oldest == reaction )
793 							{
794 								// The oldest message must be simply removed.
795 								tracer.overflow_remove_oldest( m_queue.front() );
796 								m_queue.pop_front();
797 							}
798 						else
799 							{
800 								so_5::details::abort_on_fatal_error( [&] {
801 										tracer.overflow_throw_exception();
802 										SO_5_LOG_ERROR( m_env, log_stream ) {
803 											log_stream << "overflow_reaction_t::abort_app "
804 													"will be performed for mchain (id="
805 													<< m_id << "), msg_type: "
806 													<< msg_type.name()
807 													<< ". Application will be aborted"
808 													<< std::endl;
809 										}
810 									} );
811 							}
812 					}
813 
814 				complete_store_message_to_queue(
815 						tracer,
816 						msg_type,
817 						message );
818 			}
819 
820 		/*!
821 		 * \brief Implementation of extract operation for the case when
822 		 * message queue is not empty.
823 		 *
824 		 * \attention This helper method must be called when chain object
825 		 * is locked in some hi-level method.
826 		 *
827 		 * \since
828 		 * v.5.5.16
829 		 */
830 		extraction_status_t
extract_demand_from_not_empty_queue(demand_t & dest)831 		extract_demand_from_not_empty_queue(
832 			demand_t & dest )
833 			{
834 				// If queue was full then someone can wait on it.
835 				const bool queue_was_full = m_queue.is_full();
836 				dest = std::move( m_queue.front() );
837 				m_queue.pop_front();
838 
839 				this->trace_extracted_demand( *this, dest );
840 
841 				if( queue_was_full )
842 					{
843 						// Since v.5.7.0 waiting select_cases should be
844 						// notified too because they are send_cases.
845 						notify_multi_chain_select_ops();
846 
847 						m_overflow_cond.notify_all();
848 					}
849 
850 				return extraction_status_t::msg_extracted;
851 			}
852 
853 		/*!
854 		 * \since
855 		 * v.5.5.16
856 		 */
857 		void
notify_multi_chain_select_ops()858 		notify_multi_chain_select_ops() noexcept
859 			{
860 				if( m_select_tail )
861 					{
862 						auto old = m_select_tail;
863 						m_select_tail = nullptr;
864 						old->notify();
865 					}
866 			}
867 
868 		/*!
869 		 * \brief A reusable method with implementation of
870 		 * last part of storing a message into chain.
871 		 *
872 		 * \note
873 		 * Intended to be called from try_to_store_message_to_queue()
874 		 * and try_to_store_message_from_timer_to_queue().
875 		 *
876 		 * \since
877 		 * v.5.5.18
878 		 */
879 		void
complete_store_message_to_queue(typename Tracing_Base::deliver_op_tracer & tracer,const std::type_index & msg_type,const message_ref_t & message)880 		complete_store_message_to_queue(
881 			typename Tracing_Base::deliver_op_tracer & tracer,
882 			const std::type_index & msg_type,
883 			const message_ref_t & message )
884 			{
885 				const bool was_empty = m_queue.is_empty();
886 
887 				m_queue.push_back( demand_t{ msg_type, message } );
888 
889 				tracer.stored( m_queue );
890 
891 				// If chain was empty then multi-chain cases must be notified.
892 				// And if not_empty_notificator is defined then it must be used too.
893 				if( was_empty )
894 					{
895 						if( m_not_empty_notificator )
896 							so_5::details::invoke_noexcept_code(
897 								[this] { m_not_empty_notificator(); } );
898 
899 						notify_multi_chain_select_ops();
900 					}
901 
902 				// Should be wake up some sleeping thread?
903 				if( m_threads_to_wakeup && m_threads_to_wakeup >= m_queue.size() )
904 					// Someone is waiting on empty queue.
905 					m_underflow_cond.notify_one();
906 			}
907 	};
908 
909 } /* namespace mchain_props */
910 
911 } /* namespace so_5 */
912 
913