1 /*
2  * SObjectizer-5
3  */
4 
5 /*!
6  * \file
7  * \brief Public part of message chain related stuff.
8  * \since
9  * v.5.5.13
10  */
11 
12 #pragma once
13 
14 #include <so_5/mbox.hpp>
15 #include <so_5/handler_makers.hpp>
16 
17 #include <so_5/fwd.hpp>
18 
19 #include <so_5/details/invoke_noexcept_code.hpp>
20 #include <so_5/details/remaining_time_counter.hpp>
21 
22 #include <chrono>
23 #include <functional>
24 
25 namespace so_5 {
26 
27 namespace mchain_props {
28 
29 /*!
30  * \since
31  * v.5.5.13
32  *
 * \brief An alias for the type used to represent timeout values.
34  */
35 using duration_t = std::chrono::high_resolution_clock::duration;
36 
37 namespace details {
38 
39 //
40 // no_wait_special_timevalue
41 //
42 /*!
43  * \since
44  * v.5.5.13
45  *
46  * \brief Special value of %duration to indicate 'no_wait' case.
47  */
48 inline duration_t
no_wait_special_timevalue()49 no_wait_special_timevalue() { return duration_t::zero(); }
50 
51 //
52 // infinite_wait_special_timevalue
53 //
54 /*!
55  * \since
56  * v.5.5.13
57  *
58  * \brief Special value of %duration to indicate 'infinite_wait' case.
59  */
60 inline duration_t
infinite_wait_special_timevalue()61 infinite_wait_special_timevalue() { return duration_t::max(); }
62 
63 //
64 // is_no_wait_timevalue
65 //
66 /*!
67  * \since
68  * v.5.5.13
69  *
70  * \brief Is time value means 'no_wait'?
71  */
72 inline bool
is_no_wait_timevalue(duration_t v)73 is_no_wait_timevalue( duration_t v )
74 	{
75 		return v == no_wait_special_timevalue();
76 	}
77 
78 //
79 // is_infinite_wait_timevalue
80 //
81 /*!
82  * \since
83  * v.5.5.13
84  *
85  * \brief Is time value means 'infinite_wait'?
86  */
87 inline bool
is_infinite_wait_timevalue(duration_t v)88 is_infinite_wait_timevalue( duration_t v )
89 	{
90 		return v == infinite_wait_special_timevalue();
91 	}
92 
93 //
94 // actual_timeout
95 //
96 
97 /*!
98  * \since
99  * v.5.5.13
100  *
101  * \brief Helper function for detection of actual value for waiting timeout.
102  *
103  * \note This helper implements convention that infinite waiting is
104  * represented as duration_t::max() value.
105  */
106 inline duration_t
actual_timeout(infinite_wait_indication)107 actual_timeout( infinite_wait_indication )
108 	{
109 		return infinite_wait_special_timevalue();
110 	}
111 
112 /*!
113  * \since
114  * v.5.5.13
115  *
116  * \brief Helper function for detection of actual value for waiting timeout.
117  *
118  * \note This helper implements convention that no waiting is
119  * represented as duration_t::zero() value.
120  */
121 inline duration_t
actual_timeout(no_wait_indication)122 actual_timeout( no_wait_indication )
123 	{
124 		return no_wait_special_timevalue();
125 	}
126 
127 /*!
128  * \since
129  * v.5.5.13
130  *
131  * \brief Helper function for detection of actual value for waiting timeout.
132  */
133 template< typename V >
134 duration_t
actual_timeout(V value)135 actual_timeout( V value )
136 	{
137 		return duration_t( value );
138 	}
139 
140 } /* namespace details */
141 
142 //
143 // demand_t
144 //
145 /*!
146  * \since
147  * v.5.5.13
148  *
149  * \brief Description of one demand in message chain.
150  */
151 struct demand_t
152 	{
153 		//! Type of the message.
154 		std::type_index m_msg_type;
155 		//! Event incident.
156 		so_5::message_ref_t m_message_ref;
157 
158 		//! Default constructor.
demand_tso_5::mchain_props::demand_t159 		demand_t()
160 			:	m_msg_type( typeid(void) )
161 			{}
162 		//! Initializing constructor.
demand_tso_5::mchain_props::demand_t163 		demand_t(
164 			std::type_index msg_type,
165 			so_5::message_ref_t message_ref )
166 			:	m_msg_type{ std::move(msg_type) }
167 			,	m_message_ref{ std::move(message_ref) }
168 			{}
169 
170 		//! Swap operation.
171 		friend void
swap(demand_t & a,demand_t & b)172 		swap( demand_t & a, demand_t & b )
173 			{
174 				using std::swap;
175 
176 				swap( a.m_msg_type, b.m_msg_type );
177 				swap( a.m_message_ref, b.m_message_ref );
178 			}
179 	};
180 
181 //
182 // memory_usage_t
183 //
/*!
 * \brief How memory for the storage of a size-limited chain is allocated.
 *
 * \since
 * v.5.5.13
 */
enum class memory_usage_t
	{
		//! Storage may grow and shrink dynamically.
		dynamic,
		//! Storage is allocated once in advance and is not changed later.
		preallocated
	};
197 
198 //
199 // overflow_reaction_t
200 //
/*!
 * \brief The reaction to an attempt to push a new message to
 * a full message chain.
 *
 * \since
 * v.5.5.13
 */
enum class overflow_reaction_t
	{
		//! The application must be aborted.
		abort_app,
		//! An exception must be thrown.
		/*!
		 * \note
		 * Since v.5.5.18 this value leads to an exception only if
		 * an ordinary `send` is used for pushing a message to an
		 * overloaded message chain. If there is an attempt to push
		 * a delayed or periodic message to an overloaded message chain
		 * then the throw_exception reaction is replaced by drop_newest.
		 * It is because the context of the timer thread is a special
		 * context: no exceptions should be thrown on it.
		 */
		throw_exception,
		//! The new message must be ignored and dropped.
		drop_newest,
		//! The oldest message in the chain must be removed.
		remove_oldest
	};
229 
230 //
231 // capacity_t
232 //
233 /*!
234  * \since
235  * v.5.5.13
236  *
237  * \brief Parameters for defining chain size.
238  */
239 class capacity_t
240 	{
241 		//! Has chain unlimited size?
242 		bool m_unlimited = { true };
243 
244 		// NOTE: all other atributes have sence only if m_unlimited != true.
245 
246 		//! Max size of the chain with limited size.
247 		std::size_t m_max_size;
248 
249 		//! Type of the storage for size-limited chain.
250 		memory_usage_t m_memory;
251 
252 		//! Type of reaction for chain overflow.
253 		overflow_reaction_t m_overflow_reaction;
254 
255 		//! Timeout for waiting on full chain during 'message push' operation.
256 		/*!
257 		 * \note Value 'zero' means that there must not be waiting on
258 		 * full chain.
259 		 */
260 		duration_t m_overflow_timeout;
261 
262 		//! Initializing constructor for size-limited message chain.
capacity_t(std::size_t max_size,memory_usage_t memory_usage,overflow_reaction_t overflow_reaction,duration_t overflow_timeout)263 		capacity_t(
264 			std::size_t max_size,
265 			memory_usage_t memory_usage,
266 			overflow_reaction_t overflow_reaction,
267 			duration_t overflow_timeout )
268 			:	m_unlimited{ false }
269 			,	m_max_size{ max_size }
270 			,	m_memory{ memory_usage }
271 			,	m_overflow_reaction{ overflow_reaction }
272 			,	m_overflow_timeout( overflow_timeout )
273 			{}
274 
275 	public :
276 		//! Default constructor.
277 		/*!
278 		 * Creates description for size-unlimited chain.
279 		 */
capacity_t()280 		capacity_t()
281 			{}
282 
283 		//! Create capacity description for size-unlimited message chain.
284 		inline static capacity_t
make_unlimited()285 		make_unlimited() { return capacity_t{}; }
286 
287 		//! Create capacity description for size-limited message chain
288 		//! without waiting on full queue during 'push message' operation.
289 		inline static capacity_t
make_limited_without_waiting(std::size_t max_size,memory_usage_t memory_usage,overflow_reaction_t overflow_reaction)290 		make_limited_without_waiting(
291 			//! Max size of the chain.
292 			std::size_t max_size,
293 			//! Type of chain storage.
294 			memory_usage_t memory_usage,
295 			//! Reaction on chain overflow.
296 			overflow_reaction_t overflow_reaction )
297 			{
298 				return capacity_t{
299 						max_size,
300 						memory_usage,
301 						overflow_reaction,
302 						details::no_wait_special_timevalue()
303 				};
304 			}
305 
306 		//! Create capacity description for size-limited message chain
307 		//! with waiting on full queue during 'push message' operation.
308 		inline static capacity_t
make_limited_with_waiting(std::size_t max_size,memory_usage_t memory_usage,overflow_reaction_t overflow_reaction,duration_t wait_timeout)309 		make_limited_with_waiting(
310 			//! Max size of the chain.
311 			std::size_t max_size,
312 			//! Type of chain storage.
313 			memory_usage_t memory_usage,
314 			//! Reaction on chain overflow.
315 			overflow_reaction_t overflow_reaction,
316 			//! Waiting time on full message chain.
317 			duration_t wait_timeout )
318 			{
319 				return capacity_t{
320 						max_size,
321 						memory_usage,
322 						overflow_reaction,
323 						wait_timeout
324 				};
325 			}
326 
327 		//! Is message chain have no size limit?
328 		bool
unlimited() const329 		unlimited() const { return m_unlimited; }
330 
331 		//! Max size for size-limited chain.
332 		/*!
333 		 * \attention Has sence only for size-limited chain.
334 		 */
335 		std::size_t
max_size() const336 		max_size() const { return m_max_size; }
337 
338 		//! Memory allocation type for size-limited chain.
339 		/*!
340 		 * \attention Has sence only for size-limited chain.
341 		 */
342 		memory_usage_t
memory_usage() const343 		memory_usage() const { return m_memory; }
344 
345 		//! Overflow reaction for size-limited chain.
346 		/*!
347 		 * \attention Has sence only for size-limited chain.
348 		 */
349 		overflow_reaction_t
overflow_reaction() const350 		overflow_reaction() const { return m_overflow_reaction; }
351 
352 		//! Is waiting timeout for overflow case defined?
353 		/*!
354 		 * \attention Has sence only for size-limited chain.
355 		 */
356 		bool
is_overflow_timeout_defined() const357 		is_overflow_timeout_defined() const
358 			{
359 				return !details::is_no_wait_timevalue( m_overflow_timeout );
360 			}
361 
362 		//! Get the value of waiting timeout for overflow case.
363 		/*!
364 		 * \attention Has sence only for size-limited chain.
365 		 */
366 		duration_t
overflow_timeout() const367 		overflow_timeout() const
368 			{
369 				return m_overflow_timeout;
370 			}
371 	};
372 
373 //
374 // extraction_status_t
375 //
/*!
 * \brief Outcome of an attempt to extract a message from a message chain.
 *
 * \since
 * v.5.5.13
 */
enum class extraction_status_t
	{
		//! There are no messages available in the chain.
		no_messages,
		//! A message has been extracted successfully.
		msg_extracted,
		//! A message cannot be extracted because the chain is closed.
		chain_closed
	};
391 
392 //
393 // push_status_t
394 //
/*!
 * \brief Outcome of an attempt to push a message into a message chain.
 *
 * \since
 * v.5.7.0
 */
enum class push_status_t
	{
		//! The message wasn't stored.
		not_stored,
		//! The message is stored into the message chain.
		stored,
		//! The message is not stored yet, but the store operation
		//! is registered in the message chain.
		deffered,
		//! The message wasn't stored because the chain is closed.
		chain_closed
	};
413 
414 //
415 // close_mode_t
416 //
/*!
 * \brief What has to be done with the chain's content when
 * the chain is closed.
 *
 * \since
 * v.5.5.13
 */
enum class close_mode_t
	{
		//! All messages must be removed from the chain.
		drop_content,
		//! All messages must be kept until they are processed
		//! at the receiver's side.
		retain_content
	};
431 
432 //
433 // not_empty_notification_func_t
434 //
435 /*!
436  * \since
437  * v.5.5.13
438  *
 * \brief Type of a functor that is called to notify about the arrival
 * of a message to an empty chain.
441  *
442  * \attention This function must be noexcept.
443  */
444 using not_empty_notification_func_t = std::function< void() >;
445 
446 //
447 // Forward declarations related to multi chain select operations.
448 //
449 class select_case_t;
450 
451 } /* namespace mchain_props */
452 
453 //
454 // abstract_message_chain_t
455 //
456 /*!
457  * \since
458  * v.5.5.13
459  *
460  * \brief An interace of message chain.
461  */
462 class SO_5_TYPE abstract_message_chain_t : protected so_5::abstract_message_box_t
463 	{
464 		friend class intrusive_ptr_t< abstract_message_chain_t >;
465 		friend class mchain_props::select_case_t;
466 
467 		abstract_message_chain_t( const abstract_message_chain_t & ) = delete;
468 		abstract_message_chain_t &
469 		operator=( const abstract_message_chain_t & ) = delete;
470 
471 	protected :
472 		abstract_message_chain_t() = default;
473 		~abstract_message_chain_t() noexcept override = default;
474 
475 	public :
476 		using abstract_message_box_t::id;
477 		using abstract_message_box_t::environment;
478 
479 		[[nodiscard]]
480 		virtual mchain_props::extraction_status_t
481 		extract(
482 			//! Destination for extracted messages.
483 			mchain_props::demand_t & dest,
484 			//! Max time to wait on empty queue.
485 			mchain_props::duration_t empty_queue_timeout ) = 0;
486 
487 		//! Cast message chain to message box.
488 		so_5::mbox_t
489 		as_mbox();
490 
491 		//! Is message chain empty?
492 		virtual bool
493 		empty() const = 0;
494 
495 		//! Count of messages in the chain.
496 		virtual std::size_t
497 		size() const = 0;
498 
499 		//! Close the chain.
500 		virtual void
501 		close(
502 			//! What to do with chain's content.
503 			mchain_props::close_mode_t mode ) = 0;
504 
505 	protected :
506 		/*!
507 		 * \brief An extraction attempt as a part of multi chain select.
508 		 *
509 		 * \attention
510 		 * This method is a pure virtual since v.5.6.2.
511 		 *
512 		 * \note This method is intended to be used by select_case_t.
513 		 *
514 		 * \since
515 		 * v.5.5.16
516 		 */
517 		[[nodiscard]]
518 		virtual mchain_props::extraction_status_t
519 		extract(
520 			//! Destination for extracted messages.
521 			mchain_props::demand_t & dest,
522 			//! Select case to be stored for notification if mchain is empty.
523 			mchain_props::select_case_t & select_case ) = 0;
524 
525 		/*!
526 		 * \brief An attempt to push a new message into the mchain.
527 		 *
528 		 * Unlike do_deliver_message() method the push() doesn't apply
529 		 * the overload reaction if the mchain if full. The \a select_case
530 		 * is stored to select_cases list instead.
531 		 *
532 		 * \note
533 		 * This method is intended to be used by select_case_t.
534 		 *
535 		 * \since
536 		 * v.5.7.0
537 		 */
538 		[[nodiscard]]
539 		virtual mchain_props::push_status_t
540 		push(
541 			//! Type of message/signal to be pushed.
542 			const std::type_index & msg_type,
543 			//! Message/signal to be pushed.
544 			const message_ref_t & message,
545 			//! Select case to be stored for notification if mchain is full.
546 			mchain_props::select_case_t & select_case ) = 0;
547 
548 		/*!
549 		 * \brief Removement of mchain from multi chain select.
550 		 *
551 		 * \attention
552 		 * This method is a pure virtual and noexcept since v.5.6.2.
553 		 *
554 		 * \note This method is intended to be used by select_case_t.
555 		 *
556 		 * \since
557 		 * v.5.5.16
558 		 */
559 		virtual void
560 		remove_from_select(
561 			//! Select case to be removed from notification queue.
562 			mchain_props::select_case_t & select_case ) noexcept = 0;
563 	};
564 
565 //
566 // mchain_t
567 //
568 /*!
569  * \since
570  * v.5.5.13
571  *
572  * \brief Short name for smart pointer to message chain.
573  */
574 using mchain_t = intrusive_ptr_t< abstract_message_chain_t >;
575 
576 //
577 // close_drop_content
578 //
579 /*!
580  * \since
581  * v.5.5.13
582  *
583  * \brief Helper function for closing a message chain with dropping
584  * all its content.
585  *
586  * \note Because of ADL it can be used without specifying namespaces.
587  *
588  * \par Usage example.
589 	\code
590 	so_5::mchain_t & ch = ...;
591 	... // Some work with chain.
592 	close_drop_content( ch );
593 	// Or:
594 	ch->close( so_5::mchain_props::close_mode_t::drop_content );
595 	\endcode
596  */
597 inline void
close_drop_content(const mchain_t & ch)598 close_drop_content( const mchain_t & ch )
599 	{
600 		ch->close( mchain_props::close_mode_t::drop_content );
601 	}
602 
603 //
604 // close_retain_content
605 //
606 /*!
607  * \since
608  * v.5.5.13
609  *
610  * \brief Helper function for closing a message chain with retaining
611  * all its content.
612  *
613  * \note Because of ADL it can be used without specifying namespaces.
614  *
615  * \par Usage example.
616 	\code
617 	so_5::mchain_t & ch = ...;
618 	... // Some work with chain.
619 	close_retain_content( ch );
620 	// Or:
621 	ch->close( so_5::mchain_props::close_mode_t::retain_content );
622 	\endcode
623  */
624 inline void
close_retain_content(const mchain_t & ch)625 close_retain_content( const mchain_t & ch )
626 	{
627 		ch->close( mchain_props::close_mode_t::retain_content );
628 	}
629 
630 //
631 // mchain_params_t
632 //
633 /*!
634  * \since
635  * v.5.5.13
636  *
637  * \brief Parameters for message chain.
638  */
639 class mchain_params_t
640 	{
641 		//! Chain's capacity.
642 		mchain_props::capacity_t m_capacity;
643 
644 		//! An optional notificator for 'not_empty' condition.
645 		mchain_props::not_empty_notification_func_t m_not_empty_notificator;
646 
647 		//! Is message delivery tracing disabled explicitly?
648 		bool m_msg_tracing_disabled = { false };
649 
650 	public :
651 		//! Initializing constructor.
mchain_params_t(mchain_props::capacity_t capacity)652 		mchain_params_t(
653 			//! Chain's capacity and related params.
654 			mchain_props::capacity_t capacity )
655 			:	m_capacity{ capacity }
656 			{}
657 
658 		//! Set chain's capacity and related params.
659 		mchain_params_t &
capacity(mchain_props::capacity_t capacity)660 		capacity( mchain_props::capacity_t capacity )
661 			{
662 				m_capacity = capacity;
663 				return *this;
664 			}
665 
666 		//! Get chain's capacity and related params.
667 		const mchain_props::capacity_t &
capacity() const668 		capacity() const
669 			{
670 				return m_capacity;
671 			}
672 
673 		//! Set chain's notificator for 'not_empty' condition.
674 		/*!
675 		 * This notificator will be called when a message is stored to
676 		 * the empty chain and chain becomes not empty.
677 		 */
678 		mchain_params_t &
not_empty_notificator(mchain_props::not_empty_notification_func_t notificator)679 		not_empty_notificator(
680 			mchain_props::not_empty_notification_func_t notificator )
681 			{
682 				m_not_empty_notificator = std::move(notificator);
683 				return *this;
684 			}
685 
686 		//! Get chain's notificator for 'not_empty' condition.
687 		const mchain_props::not_empty_notification_func_t &
not_empty_notificator() const688 		not_empty_notificator() const
689 			{
690 				return m_not_empty_notificator;
691 			}
692 
693 		//! Disable message delivery tracing explicitly.
694 		/*!
695 		 * If this method called then message delivery tracing will
696 		 * not be used for that mchain even if message delivery
697 		 * tracing will be used for the whole SObjectizer Environment.
698 		 */
699 		mchain_params_t &
disable_msg_tracing()700 		disable_msg_tracing()
701 			{
702 				m_msg_tracing_disabled = true;
703 				return *this;
704 			}
705 
706 		//! Is message delivery tracing disabled explicitly?
707 		bool
msg_tracing_disabled() const708 		msg_tracing_disabled() const
709 			{
710 				return m_msg_tracing_disabled;
711 			}
712 	};
713 
714 /*!
715  * \name Helper functions for creating parameters for %mchain.
716  * \{
717  */
718 
719 /*!
720  * \since
721  * v.5.5.13
722  *
723  * \brief Create parameters for size-unlimited %mchain.
724  *
725  * \par Usage example:
726 	\code
727 	so_5::environment_t & env = ...;
728 	auto chain = env.create_mchain( so_5::make_unlimited_mchain_params() );
729 	\endcode
730  */
731 inline mchain_params_t
make_unlimited_mchain_params()732 make_unlimited_mchain_params()
733 	{
734 		return mchain_params_t{ mchain_props::capacity_t::make_unlimited() };
735 	}
736 
737 /*!
738  * \since
739  * v.5.5.13
740  *
741  * \brief Create parameters for size-limited %mchain without waiting on overflow.
742  *
743  * \par Usage example:
744 	\code
745 	so_5::environment_t & env = ...;
746 	auto chain = env.create_mchain( so_5::make_limited_without_waiting_mchain_params(
747 			// No more than 200 messages in the chain.
748 			200,
749 			// Memory will be allocated dynamically.
750 			so_5::mchain_props::memory_usage_t::dynamic,
751 			// New messages will be ignored on chain's overflow.
752 			so_5::mchain_props::overflow_reaction_t::drop_newest ) );
753 	\endcode
754  */
755 inline mchain_params_t
make_limited_without_waiting_mchain_params(std::size_t max_size,mchain_props::memory_usage_t memory_usage,mchain_props::overflow_reaction_t overflow_reaction)756 make_limited_without_waiting_mchain_params(
757 	//! Max capacity of %mchain.
758 	std::size_t max_size,
759 	//! Type of chain storage.
760 	mchain_props::memory_usage_t memory_usage,
761 	//! Reaction on chain overflow.
762 	mchain_props::overflow_reaction_t overflow_reaction )
763 	{
764 		return mchain_params_t{
765 				mchain_props::capacity_t::make_limited_without_waiting(
766 						max_size,
767 						memory_usage,
768 						overflow_reaction )
769 		};
770 	}
771 
772 /*!
773  * \since
774  * v.5.5.13
775  *
776  * \brief Create parameters for size-limited %mchain with waiting on overflow.
777  *
778  * \par Usage example:
779 	\code
780 	so_5::environment_t & env = ...;
781 	auto chain = env.create_mchain( so_5::make_limited_with_waiting_mchain_params(
782 			// No more than 200 messages in the chain.
783 			200,
784 			// Memory will be preallocated.
785 			so_5::mchain_props::memory_usage_t::preallocated,
786 			// New messages will be ignored on chain's overflow.
787 			so_5::mchain_props::overflow_reaction_t::drop_newest,
788 			// But before dropping a new message there will be 500ms timeout
789 			std::chrono::milliseconds(500) ) );
790 	\endcode
791  *
792  * \note
793  * Since v.5.5.18 there is an important difference in mchain behavior.
794  * If an ordinary `send` is used for message pushing then there will
795  * be waiting for free space if the message chain is full.
796  * But if message push is performed from timer thread (it means that
797  * message is a delayed or a periodic message) then there will not be
798  * any waiting. It is because the context of timer thread is very
799  * special: there is no possibility to spend some time on waiting for
800  * some free space in message chain. All operations on the context of
801  * timer thread must be done as fast as possible.
802  */
803 inline mchain_params_t
make_limited_with_waiting_mchain_params(std::size_t max_size,mchain_props::memory_usage_t memory_usage,mchain_props::overflow_reaction_t overflow_reaction,mchain_props::duration_t wait_timeout)804 make_limited_with_waiting_mchain_params(
805 	//! Max size of the chain.
806 	std::size_t max_size,
807 	//! Type of chain storage.
808 	mchain_props::memory_usage_t memory_usage,
809 	//! Reaction on chain overflow.
810 	mchain_props::overflow_reaction_t overflow_reaction,
811 	//! Waiting time on full message chain.
812 	mchain_props::duration_t wait_timeout )
813 	{
814 		return mchain_params_t {
815 				mchain_props::capacity_t::make_limited_with_waiting(
816 						max_size,
817 						memory_usage,
818 						overflow_reaction,
819 						wait_timeout )
820 		};
821 	}
822 
823 /*!
824  * \}
825  */
826 
827 //
828 // mchain_receive_result_t
829 //
830 /*!
831  * \since
832  * v.5.5.13
833  *
834  * \brief A result of receive from %mchain.
835  */
836 class mchain_receive_result_t
837 	{
838 		//! Count of extracted messages.
839 		std::size_t m_extracted;
840 		//! Count of handled messages.
841 		std::size_t m_handled;
842 		//! Extraction status (e.g. no messages, chain closed and so on).
843 		mchain_props::extraction_status_t m_status;
844 
845 	public :
846 		//! Default constructor.
mchain_receive_result_t()847 		mchain_receive_result_t() noexcept
848 			:	m_extracted{ 0 }
849 			,	m_handled{ 0 }
850 			,	m_status{ mchain_props::extraction_status_t::no_messages }
851 			{}
852 
853 		//! Initializing constructor.
mchain_receive_result_t(std::size_t extracted,std::size_t handled,mchain_props::extraction_status_t status)854 		mchain_receive_result_t(
855 			//! Count of extracted messages.
856 			std::size_t extracted,
857 			//! Count of handled messages.
858 			std::size_t handled,
859 			//! Status of extraction operation.
860 			mchain_props::extraction_status_t status ) noexcept
861 			:	m_extracted{ extracted }
862 			,	m_handled{ handled }
863 			,	m_status{ status }
864 			{}
865 
866 		//! Count of extracted messages.
867 		[[nodiscard]]
868 		std::size_t
extracted() const869 		extracted() const noexcept { return m_extracted; }
870 
871 		//! Count of handled messages.
872 		[[nodiscard]]
873 		std::size_t
handled() const874 		handled() const noexcept { return m_handled; }
875 
876 		//! Extraction status (e.g. no messages, chain closed and so on).
877 		[[nodiscard]]
878 		mchain_props::extraction_status_t
status() const879 		status() const noexcept { return m_status; }
880 	};
881 
882 //
883 // mchain_send_result_t
884 //
885 /*!
886  * \brief A result of attempt of sending messages to a message chain.
887  *
888  * This type plays the same role as mchain_receive_result_t but is used
889  * for send operations.
890  *
891  * \since
892  * v.5.7.0
893  */
894 class mchain_send_result_t
895 	{
896 		//! Count of messages sent.
897 		std::size_t m_sent;
898 
899 		//! The status of send operation.
900 		mchain_props::push_status_t m_status;
901 
902 	public:
903 		//! Default constructor.
904 		/*!
905 		 * Sets push_status_t::not_stored status.
906 		 */
mchain_send_result_t()907 		mchain_send_result_t() noexcept
908 			:	m_sent{ 0u }
909 			,	m_status{ mchain_props::push_status_t::not_stored }
910 			{}
911 
912 		//! Initializing constructor.
mchain_send_result_t(std::size_t sent,mchain_props::push_status_t status)913 		mchain_send_result_t(
914 			std::size_t sent,
915 			mchain_props::push_status_t status )
916 			:	m_sent{ sent }
917 			,	m_status{ status }
918 			{}
919 
920 		//! Count of messages sent.
921 		[[nodiscard]]
922 		std::size_t
sent() const923 		sent() const noexcept { return m_sent; }
924 
925 		//! Status of send operation.
926 		[[nodiscard]]
927 		mchain_props::push_status_t
status() const928 		status() const noexcept { return m_status; }
929 	};
930 
931 namespace mchain_props {
932 
933 //
934 // msg_count_status_t
935 //
/*!
 * \brief Status of the limit for messages to be extracted/handled
 * during a bulk operation on a mchain.
 *
 * \since
 * v.5.6.0
 */
enum class msg_count_status_t
	{
		//! The message count limit is not set yet.
		undefined,
		//! The message count limit is set.
		defined
	};
950 
951 namespace details {
952 
953 //
954 // bulk_processing_basic_data_t
955 //
956 struct bulk_processing_basic_data_t
957 	{
958 		//! Type of stop-predicate.
959 		/*!
960 		 * Must return \a true if receive procedure should be stopped.
961 		 */
962 		using stop_predicate_t = std::function< bool() >;
963 
964 		//! Type of chain-closed event.
965 		using chain_closed_handler_t = std::function< void(const mchain_t &) >;
966 
967 		//! Minimal count of messages to be extracted.
968 		/*!
969 		 * Value 0 means that this parameter is not set.
970 		 */
971 		std::size_t m_to_extract = { 0 };
972 		//! Minimal count of messages to be handled.
973 		/*!
974 		 * Value 0 means that this parameter it not set.
975 		 */
976 		std::size_t m_to_handle = { 0 };
977 
978 		//! Timeout for waiting on empty queue.
979 		mchain_props::duration_t m_empty_timeout =
980 				{ mchain_props::details::infinite_wait_special_timevalue() };
981 
982 		//! Total time for all work of advanced receive.
983 		mchain_props::duration_t m_total_time =
984 				{ mchain_props::details::infinite_wait_special_timevalue() };
985 
986 		//! Optional stop-predicate.
987 		stop_predicate_t m_stop_predicate;
988 
989 		//! Optional chain-closed handler.
990 		chain_closed_handler_t m_chain_closed_handler;
991 	};
992 
993 //
994 // mchain_bulk_processing_basic_params_t
995 //
996 template< typename Basic_Data >
997 class mchain_bulk_processing_basic_params_t
998 	{
999 	public :
1000 		//! Type of stop-predicate.
1001 		using stop_predicate_t = typename Basic_Data::stop_predicate_t;
1002 
1003 		//! Type of chain-closed event.
1004 		using chain_closed_handler_t = typename Basic_Data::chain_closed_handler_t;
1005 
1006 	private :
1007 		Basic_Data m_data;
1008 
1009 	protected :
1010 		//! Set limit for count of messages to be extracted.
1011 		void
set_extract_n(std::size_t v)1012 		set_extract_n( std::size_t v ) noexcept
1013 			{
1014 				m_data.m_to_extract = v;
1015 			}
1016 
1017 		//! Set limit for count of messages to be handled.
1018 		void
set_handle_n(std::size_t v)1019 		set_handle_n( std::size_t v ) noexcept
1020 			{
1021 				m_data.m_to_handle = v;
1022 			}
1023 
1024 		//! Set timeout for waiting on empty chain.
1025 		/*!
1026 		 * \note This value will be ignored if total_time() is also used
1027 		 * to set total receive time.
1028 		 *
1029 		 * \note Argument \a v can be of type duration_t or
1030 		 * so_5::infinite_wait or so_5::no_wait.
1031 		 */
1032 		template< typename Timeout >
1033 		void
set_empty_timeout(Timeout v)1034 		set_empty_timeout( Timeout v ) noexcept
1035 			{
1036 				m_data.m_empty_timeout = mchain_props::details::actual_timeout( v );
1037 			}
1038 
1039 		//! Set total time for the whole receive operation.
1040 		/*!
1041 		 * \note Argument \a v can be of type duration_t or
1042 		 * so_5::infinite_wait or so_5::no_wait.
1043 		 */
1044 		template< typename Timeout >
1045 		void
set_total_time(Timeout v)1046 		set_total_time( Timeout v ) noexcept
1047 			{
1048 				m_data.m_total_time = mchain_props::details::actual_timeout( v );
1049 			}
1050 
1051 		//! Set user condition for stopping receive operation.
1052 		/*!
1053 		 * \note \a predicate should return \a true if receive must
1054 		 * be stopped.
1055 		 */
1056 		void
set_stop_on(stop_predicate_t predicate)1057 		set_stop_on( stop_predicate_t predicate ) noexcept
1058 			{
1059 				m_data.m_stop_predicate = std::move(predicate);
1060 			}
1061 
1062 		//! Set handler for chain-closed event.
1063 		/*!
1064 		 * If there is a previously set handler the old handler will be lost.
1065 		 */
1066 		void
set_on_close(chain_closed_handler_t handler)1067 		set_on_close( chain_closed_handler_t handler ) noexcept
1068 			{
1069 				m_data.m_chain_closed_handler = std::move(handler);
1070 			}
1071 
1072 	public :
1073 		//! Default constructor.
1074 		mchain_bulk_processing_basic_params_t() = default;
1075 
1076 		//! Initializing constructor.
mchain_bulk_processing_basic_params_t(Basic_Data data)1077 		mchain_bulk_processing_basic_params_t( Basic_Data data )
1078 			:	m_data{ std::move(data) }
1079 			{}
1080 
1081 		//! Get limit for count of messages to be extracted.
1082 		std::size_t
to_extract() const1083 		to_extract() const noexcept { return m_data.m_to_extract; }
1084 
1085 		//! Get limit for count of message to be handled.
1086 		std::size_t
to_handle() const1087 		to_handle() const noexcept { return m_data.m_to_handle; }
1088 
1089 		//! Get timeout for waiting on empty chain.
1090 		const mchain_props::duration_t &
empty_timeout() const1091 		empty_timeout() const noexcept { return m_data.m_empty_timeout; }
1092 
1093 		//! Get total time for the whole receive operation.
1094 		const mchain_props::duration_t &
total_time() const1095 		total_time() const noexcept { return m_data.m_total_time; }
1096 
1097 		//! Get user condition for stopping receive operation.
1098 		const stop_predicate_t &
stop_on() const1099 		stop_on() const noexcept
1100 			{
1101 				return m_data.m_stop_predicate;
1102 			}
1103 
1104 		//! Get handler for chain-closed event.
1105 		const chain_closed_handler_t &
closed_handler() const1106 		closed_handler() const noexcept
1107 			{
1108 				return m_data.m_chain_closed_handler;
1109 			}
1110 
1111 		//! Access to internal data.
1112 		const auto &
so5_data() const1113 		so5_data() const noexcept { return m_data; }
1114 	};
1115 
1116 } /* namespace details */
1117 
1118 } /* namespace mchain_props */
1119 
1120 //
1121 // mchain_bulk_processing_params_t
1122 //
1123 /*!
1124  * \brief Basic parameters for advanced receive from %mchain and for
1125  * multi chain select.
1126  *
1127  * \since
1128  * v.5.5.16
1129  */
1130 template< typename Data, typename Derived >
1131 class mchain_bulk_processing_params_t
1132 	:	public mchain_props::details::mchain_bulk_processing_basic_params_t< Data >
1133 	{
1134 		using basic_t =
1135 				mchain_props::details::mchain_bulk_processing_basic_params_t<Data>;
1136 
1137 	public :
1138 		using actual_type = Derived;
1139 
1140 		using data_type = Data;
1141 
1142 	protected :
1143 		//! Helper method to get a reference to itself but with a
1144 		//! type of the derived class.
1145 		actual_type &
self_reference()1146 		self_reference() { return static_cast< actual_type & >(*this); }
1147 
1148 		//! Helper method to make a clone with msg_count_status_t::defined
1149 		//! status.
1150 		decltype(auto)
clone_as_defined()1151 		clone_as_defined() noexcept
1152 			{
1153 				return self_reference().template so5_clone_if_necessary<
1154 						mchain_props::msg_count_status_t::defined >();
1155 			}
1156 
1157 	public :
1158 		//! Default constructor.
1159 		mchain_bulk_processing_params_t() = default;
1160 
1161 		//! Initializing constructor.
mchain_bulk_processing_params_t(Data data)1162 		mchain_bulk_processing_params_t( Data data )
1163 			:	basic_t{ std::move(data) }
1164 			{}
1165 
1166 		//! A directive to handle all messages until chain will be closed
1167 		//! or receiving will be stopped manually.
1168 		/*!
1169 		 * Usage example:
1170 		 * \code
1171 		 * so_5::receive(so_5::from(ch).handle_all(), ...);
1172 		 * \endcode
1173 		 *
1174 		 * \since
1175 		 * v.5.6.0
1176 		 */
1177 		decltype(auto)
handle_all()1178 		handle_all() noexcept
1179 			{
1180 				this->set_handle_n( 0u );
1181 				return clone_as_defined();
1182 			}
1183 
1184 		//! Set limit for count of messages to be extracted.
1185 		/*!
1186 		 * When extract_n() is used then receive() will be finished
1187 		 * after extraction of the specified number of message.
1188 		 *
1189 		 * Usage example:
1190 		 * \code
1191 		 * so_5::receive(so_5::from(ch).extract_n(2), ...);
1192 		 * \endcode
1193 		 */
1194 		decltype(auto)
extract_n(std::size_t v)1195 		extract_n( std::size_t v ) noexcept
1196 			{
1197 				this->set_extract_n( v );
1198 				return clone_as_defined();
1199 			}
1200 
1201 		//! Set limit for count of messages to be handled.
1202 		/*!
1203 		 * When handled_n() is used then receive() will be finished
1204 		 * after handling of the specified number of message.
1205 		 *
1206 		 * Usage example:
1207 		 * \code
1208 		 * so_5::receive(so_5::from(ch).handle_n(2), ...);
1209 		 * \endcode
1210 		 */
1211 		decltype(auto)
handle_n(std::size_t v)1212 		handle_n( std::size_t v ) noexcept
1213 			{
1214 				this->set_handle_n( v );
1215 				return clone_as_defined();
1216 			}
1217 
1218 		//! Set timeout for waiting on empty chain.
1219 		/*!
1220 		 * \note This value will be ignored if total_time() is also used
1221 		 * to set total receive time.
1222 		 *
1223 		 * \note Argument \a v can be of type duration_t or
1224 		 * so_5::infinite_wait or so_5::no_wait.
1225 		 */
1226 		template< typename Timeout >
1227 		actual_type &
empty_timeout(Timeout v)1228 		empty_timeout( Timeout v ) noexcept
1229 			{
1230 				this->set_empty_timeout( std::move(v) );
1231 				return self_reference();
1232 			}
1233 
1234 		using basic_t::empty_timeout;
1235 
1236 		/*!
1237 		 * \brief Disable waiting on the empty queue.
1238 		 *
1239 		 * \par Usage example:
1240 		 * \code
1241 			so_5::mchain_t ch = env.create_mchain(...);
1242 			receive( from(ch).no_wait_on_empty(), ... );
1243 		 * \endcode
1244 		 *
1245 		 * \note It is just a shorthand for:
1246 		 * \code
1247 			receive( from(chain).empty_timeout(std::chrono::seconds(0)), ...);
1248 		 * \endcode
1249 		 */
1250 		actual_type &
no_wait_on_empty()1251 		no_wait_on_empty() noexcept
1252 			{
1253 				return empty_timeout(
1254 						mchain_props::details::no_wait_special_timevalue() );
1255 			}
1256 
1257 		//! Set total time for the whole receive operation.
1258 		/*!
1259 		 * \note Argument \a v can be of type duration_t or
1260 		 * so_5::infinite_wait or so_5::no_wait.
1261 		 */
1262 		template< typename Timeout >
1263 		actual_type &
total_time(Timeout v)1264 		total_time( Timeout v ) noexcept
1265 			{
1266 				this->set_total_time( std::move(v) );
1267 				return self_reference();
1268 			}
1269 
1270 		using basic_t::total_time;
1271 
1272 		//! Set user condition for stopping receive operation.
1273 		/*!
1274 		 * \note \a predicate should return \a true if receive must
1275 		 * be stopped.
1276 		 */
1277 		actual_type &
stop_on(typename basic_t::stop_predicate_t predicate)1278 		stop_on( typename basic_t::stop_predicate_t predicate ) noexcept
1279 			{
1280 				this->set_stop_on( std::move(predicate) );
1281 				return self_reference();
1282 			}
1283 
1284 		using basic_t::stop_on;
1285 
1286 		//! Set handler for chain-closed event.
1287 		/*!
1288 		 * If there is a previously set handler the old handler will be lost.
1289 		 *
1290 		 * Usage example:
1291 		 * \code
1292 		 * so_5::mchain_t ch1 = so_5::create_mchain(...);
1293 		 * so_5::mchain_t ch2 = so_5::create_mchain(...);
1294 		 * ...
1295 		 * // Stop reading channels when any of channels is closed.
1296 		 * bool some_ch_closed = false;
1297 		 * so_5::select(
1298 		 * 	so_5::from_all()
1299 		 * 		.handle_all()
1300 		 * 		.on_close([&some_ch_closed](const so_5::mchain_t &) {
1301 		 * 				some_ch_closed = true;
1302 		 * 			})
1303 		 * 		.stop_on([&some_ch_closed]{ return some_ch_closed; }),
1304 		 * 	receive_case(ch1, ...)
1305 		 * 	receive_case(ch2, ...)
1306 		 * 	...);
1307 		 * \endcode
1308 		 *
1309 		 * \since
1310 		 * v.5.5.17
1311 		 */
1312 		actual_type &
on_close(typename basic_t::chain_closed_handler_t handler)1313 		on_close( typename basic_t::chain_closed_handler_t handler ) noexcept
1314 			{
1315 				this->set_on_close( std::move(handler) );
1316 				return self_reference();
1317 			}
1318 	};
1319 
1320 namespace mchain_props {
1321 
1322 namespace details {
1323 
1324 //
1325 // adv_receive_data_t
1326 //
1327 /*!
1328  * \brief Container of parameters for receive() function.
1329  *
1330  * \since
1331  * v.5.6.0
1332  */
1333 struct adv_receive_data_t : public bulk_processing_basic_data_t
1334 	{
1335 		//! A chain to be used in receive operation.
1336 		mchain_t m_chain;
1337 
1338 		//! Default constructor.
1339 		adv_receive_data_t() = default;
1340 
1341 		//! Initializing constructor.
adv_receive_data_tso_5::mchain_props::details::adv_receive_data_t1342 		adv_receive_data_t( mchain_t chain )
1343 			:	m_chain{ std::move(chain) }
1344 			{}
1345 	};
1346 
1347 } /* namespace details */
1348 
1349 } /* namespace mchain_props */
1350 
1351 //
1352 // mchain_receive_params_t
1353 //
1354 /*!
1355  * \brief Parameters for advanced receive from %mchain.
1356  *
1357  * \sa so_5::from().
1358  *
1359  * \note Derived from basic_receive_params_t since v.5.5.16.
1360  *
1361  * \tparam Msg_Count_Status status of message count limit.
1362  *
1363  * \since
1364  * v.5.5.13
1365  */
1366 template< mchain_props::msg_count_status_t Msg_Count_Status >
1367 class mchain_receive_params_t final
1368 	: public mchain_bulk_processing_params_t<
1369 	  		mchain_props::details::adv_receive_data_t,
1370 	  		mchain_receive_params_t< Msg_Count_Status > >
1371 	{
1372 		//! Short alias for base type.
1373 		using base_type = mchain_bulk_processing_params_t<
1374 				mchain_props::details::adv_receive_data_t,
1375 				mchain_receive_params_t< Msg_Count_Status > >;
1376 
1377 	public :
1378 		//! Make of clone with different Msg_Count_Status or return
1379 		//! a reference to the same object.
1380 		template< mchain_props::msg_count_status_t New_Msg_Count_Status >
1381 		[[nodiscard]]
1382 		decltype(auto)
so5_clone_if_necessary()1383 		so5_clone_if_necessary() noexcept
1384 			{
1385 				if constexpr( New_Msg_Count_Status != Msg_Count_Status )
1386 					return mchain_receive_params_t< New_Msg_Count_Status >{
1387 							this->so5_data()
1388 						};
1389 				else
1390 					return *this;
1391 			}
1392 
1393 		//! Initializing constructor.
mchain_receive_params_t(mchain_t chain)1394 		mchain_receive_params_t(
1395 			//! Chain from which messages must be extracted and handled.
1396 			mchain_t chain )
1397 			:	base_type{ typename base_type::data_type{ std::move(chain) } }
1398 			{}
1399 
1400 		//! Initializing constructor for the case of cloning.
mchain_receive_params_t(typename base_type::data_type data)1401 		mchain_receive_params_t(
1402 			typename base_type::data_type data )
1403 			:	base_type{ std::move(data) }
1404 			{}
1405 
1406 		//! Chain from which messages must be extracted and handled.
1407 		const mchain_t &
chain() const1408 		chain() const { return this->so5_data().m_chain; }
1409 	};
1410 
1411 //
1412 // from
1413 //
1414 /*!
1415  * \since
1416  * v.5.5.13
1417  *
1418  * \brief A helper function for simplification of creation of %mchain_receive_params instance.
1419  *
1420  * \attention
1421  * Since v.5.6.0 at least handle_all(), handle_n() or extract_n() should be
1422  * called before passing result of from() to receive() function.
1423  *
1424  * \par Usage examples:
1425 	\code
1426 	so_5::mchain_t chain = env.create_mchain(...);
1427 
1428 	// Receive and handle 3 messages.
1429 	// If there is no 3 messages in the mchain the receive will wait infinitely.
1430 	// A return from receive will be after handling of 3 messages or
1431 	// if the mchain is closed explicitely.
1432 	receive( from(chain).handle_n( 3 ),
1433 			handlers... );
1434 
1435 	// Receive and handle 3 messages.
1436 	// If there is no 3 messages in the mchain the receive will wait
1437 	// no more that 200ms.
1438 	// A return from receive will be after handling of 3 messages or
1439 	// if the mchain is closed explicitely, or if there is no messages
1440 	// for more than 200ms.
1441 	receive( from(chain).handle_n( 3 ).empty_timeout( milliseconds(200) ),
1442 			handlers... );
1443 
1444 	// Receive all messages from the chain.
1445 	// If there is no message in the chain then wait no more than 500ms.
1446 	// A return from receive will be after explicit close of the chain
1447 	// or if there is no messages for more than 500ms.
1448 	receive( from(chain).handle_all().empty_timeout( milliseconds(500) ),
1449 			handlers... );
1450 
1451 	// Receve any number of messages from the chain but do waiting and
1452 	// handling for no more than 2s.
1453 	receive( from(chain).handle_all().total_time( seconds(2) ),
1454 			handlers... );
1455 
1456 	// Receve 1000 messages from the chain but do waiting and
1457 	// handling for no more than 2s.
1458 	receive( from(chain).extract_n( 1000 ).total_time( seconds(2) ),
1459 			handlers... );
1460 	\endcode
1461  */
1462 inline mchain_receive_params_t< mchain_props::msg_count_status_t::undefined >
from(mchain_t chain)1463 from( mchain_t chain )
1464 	{
1465 		return { std::move(chain) };
1466 	}
1467 
1468 namespace mchain_props {
1469 
1470 namespace details {
1471 
1472 //
1473 // receive_actions_performer_t
1474 //
1475 /*!
1476  * \since
1477  * v.5.5.13
1478  *
1479  * \brief Helper class with implementation of main actions of
1480  * advanced receive operation.
1481  */
1482 template< typename Bunch >
1483 class receive_actions_performer_t
1484 	{
1485 		const mchain_receive_params_t< msg_count_status_t::defined > & m_params;
1486 		const Bunch & m_bunch;
1487 
1488 		std::size_t m_extracted_messages = 0;
1489 		std::size_t m_handled_messages = 0;
1490 		extraction_status_t m_status;
1491 
1492 	public :
receive_actions_performer_t(const mchain_receive_params_t<msg_count_status_t::defined> & params,const Bunch & bunch)1493 		receive_actions_performer_t(
1494 			const mchain_receive_params_t< msg_count_status_t::defined > & params,
1495 			const Bunch & bunch )
1496 			:	m_params( params )
1497 			,	m_bunch( bunch )
1498 			{}
1499 
1500 		void
handle_next(duration_t empty_timeout)1501 		handle_next( duration_t empty_timeout )
1502 			{
1503 				demand_t extracted_demand;
1504 				m_status = m_params.chain()->extract(
1505 						extracted_demand, empty_timeout );
1506 
1507 				if( extraction_status_t::msg_extracted == m_status )
1508 					{
1509 						++m_extracted_messages;
1510 						const bool handled = m_bunch.handle(
1511 								extracted_demand.m_msg_type,
1512 								extracted_demand.m_message_ref );
1513 						if( handled )
1514 							++m_handled_messages;
1515 					}
1516 				// Since v.5.5.17 we must check presence of chain-closed handler.
1517 				// This handler must be used if chain is closed.
1518 				else if( extraction_status_t::chain_closed == m_status )
1519 					{
1520 						if( const auto & handler = m_params.closed_handler() )
1521 							so_5::details::invoke_noexcept_code(
1522 								[&handler, this] {
1523 									handler( m_params.chain() );
1524 								} );
1525 					}
1526 			}
1527 
1528 		extraction_status_t
last_status() const1529 		last_status() const { return m_status; }
1530 
1531 		bool
can_continue() const1532 		can_continue() const
1533 			{
1534 				if( extraction_status_t::chain_closed == m_status )
1535 					return false;
1536 
1537 				if( m_params.to_handle() &&
1538 						m_handled_messages >= m_params.to_handle() )
1539 					return false;
1540 
1541 				if( m_params.to_extract() &&
1542 						m_extracted_messages >= m_params.to_extract() )
1543 					return false;
1544 
1545 				if( m_params.stop_on() && m_params.stop_on()() )
1546 					return false;
1547 
1548 				return true;
1549 			}
1550 
1551 		mchain_receive_result_t
make_result() const1552 		make_result() const
1553 			{
1554 				return mchain_receive_result_t{
1555 						m_extracted_messages,
1556 						m_handled_messages,
1557 						m_extracted_messages ? extraction_status_t::msg_extracted :
1558 								m_status
1559 					};
1560 			}
1561 	};
1562 
1563 /*!
1564  * \since
1565  * v.5.5.13
1566  *
1567  * \brief An implementation of advanced receive when a limit for total
1568  * operation time is defined.
1569  */
1570 template< typename Bunch >
1571 inline mchain_receive_result_t
receive_with_finite_total_time(const mchain_receive_params_t<msg_count_status_t::defined> & params,const Bunch & bunch)1572 receive_with_finite_total_time(
1573 	const mchain_receive_params_t<msg_count_status_t::defined> & params,
1574 	const Bunch & bunch )
1575 	{
1576 		receive_actions_performer_t< Bunch > performer( params, bunch );
1577 
1578 		so_5::details::remaining_time_counter_t remaining_time(
1579 				params.total_time() );
1580 		do
1581 			{
1582 				performer.handle_next( remaining_time.remaining() );
1583 				if( !performer.can_continue() )
1584 					break;
1585 				remaining_time.update();
1586 			}
1587 		while( remaining_time );
1588 
1589 		return performer.make_result();
1590 	}
1591 
1592 /*!
1593  * \since
1594  * v.5.5.13
1595  *
1596  * \brief An implementation of advanced receive when there is no
1597  * limit for total operation time is defined.
1598  */
1599 template< typename Bunch >
1600 inline mchain_receive_result_t
receive_without_total_time(const mchain_receive_params_t<msg_count_status_t::defined> & params,const Bunch & bunch)1601 receive_without_total_time(
1602 	const mchain_receive_params_t<msg_count_status_t::defined> & params,
1603 	const Bunch & bunch )
1604 	{
1605 		receive_actions_performer_t< Bunch > performer{ params, bunch };
1606 
1607 		do
1608 			{
1609 				performer.handle_next( params.empty_timeout() );
1610 
1611 				if( extraction_status_t::no_messages == performer.last_status() )
1612 					// There is no need to continue.
1613 					// This status means that empty_timeout has some value
1614 					// and there is no any new message during empty_timeout.
1615 					// And this means a condition for return from advanced
1616 					// receive.
1617 					break;
1618 			}
1619 		while( performer.can_continue() );
1620 
1621 		return performer.make_result();
1622 	}
1623 
1624 /*!
1625  * \brief An implementation of main receive actions.
1626  *
1627  * \since
1628  * v.5.5.17
1629  */
1630 template< typename Bunch >
1631 inline mchain_receive_result_t
perform_receive(const mchain_receive_params_t<msg_count_status_t::defined> & params,const Bunch & bunch)1632 perform_receive(
1633 	const mchain_receive_params_t<msg_count_status_t::defined> & params,
1634 	const Bunch & bunch )
1635 	{
1636 		if( !is_infinite_wait_timevalue( params.total_time() ) )
1637 			return receive_with_finite_total_time( params, bunch );
1638 		else
1639 			return receive_without_total_time( params, bunch );
1640 	}
1641 
1642 } /* namespace details */
1643 
1644 } /* namespace mchain_props */
1645 
1646 //
1647 // receve (advanced version)
1648 //
1649 
1650 /*!
1651  * \since
1652  * v.5.5.13
1653  *
1654  * \brief Advanced version of receive from %mchain.
1655  *
1656  * \attention It is an error if there are more than one handler for the
1657  * same message type in \a handlers.
1658  *
1659  * \attention
1660  * Since v.5.6.0 at least handle_all(), handle_n() or extract_n() should be
1661  * called before passing result of from() to receive() function.
1662  *
1663  * \par Usage examples:
1664 	\code
1665 	so_5::mchain_t chain = env.create_mchain(...);
1666 
1667 	// Receive and handle 3 messages.
1668 	// If there is no 3 messages in the mchain the receive will wait infinitely.
1669 	// A return from receive will be after handling of 3 messages or
1670 	// if the mchain is closed explicitely.
1671 	receive( from(chain).handle_n( 3 ),
1672 			[]( const first_message_type & msg ) { ... },
1673 			[]( const second_message_type & msg ) { ... }, ... );
1674 
1675 	// Receive and handle 3 messages.
1676 	// If there is no 3 messages in the mchain the receive will wait
1677 	// no more that 200ms.
1678 	// A return from receive will be after handling of 3 messages or
1679 	// if the mchain is closed explicitely, or if there is no messages
1680 	// for more than 200ms.
1681 	receive( from(chain).handle_n( 3 ).empty_timeout( milliseconds(200) ),
1682 			[]( const first_message_type & msg ) { ... },
1683 			[]( const second_message_type & msg ) { ... }, ... );
1684 
1685 	// Receive all messages from the chain.
1686 	// If there is no message in the chain then wait no more than 500ms.
1687 	// A return from receive will be after explicit close of the chain
1688 	// or if there is no messages for more than 500ms.
1689 	receive( from(chain).handle_all().empty_timeout( milliseconds(500) ),
1690 			[]( const first_message_type & msg ) { ... },
1691 			[]( const second_message_type & msg ) { ... }, ... );
1692 
1693 	// Receve any number of messages from the chain but do waiting and
1694 	// handling for no more than 2s.
1695 	receive( from(chain).handle_all().total_time( seconds(2) ),
1696 			[]( const first_message_type & msg ) { ... },
1697 			[]( const second_message_type & msg ) { ... }, ... );
1698 
1699 	// Receve 1000 messages from the chain but do waiting and
1700 	// handling for no more than 2s.
1701 	receive( from(chain).extract_n( 1000 ).total_time( seconds(2) ),
1702 			[]( const first_message_type & msg ) { ... },
1703 			[]( const second_message_type & msg ) { ... }, ... );
1704 	\endcode
1705  *
1706  * \par Handlers format examples:
1707  * \code
1708 	receive( ch, so_5::infinite_wait,
1709 		// Message instance by const reference.
1710 		[]( const std::string & v ) {...},
1711 		// Message instance by value (efficient for small types like int).
1712 		[]( int v ) {...},
1713 		// Message instance via mhood_t value.
1714 		[]( so_5::mhood_t< some_message > v ) {...},
1715 		// Message instance via const reference to mhood_t.
1716 		[]( const so_5::mhood_t< some_another_message > & v ) {...},
1717 		// Explicitly specified signal handler.
1718 		so_5::handler< some_signal >( []{...} ),
1719 		// Signal handler via mhood_t value.
1720 		[]( so_5::mhood_t< some_another_signal > ) {...},
1721 		// Signal handler via const reference to mhood_t.
1722 		[]( const so_5::mhood_t< yet_another_signal > & ) {...} );
1723  * \endcode
1724  */
1725 template<
1726 	mchain_props::msg_count_status_t Msg_Count_Status,
1727 	typename... Handlers >
1728 inline mchain_receive_result_t
receive(const mchain_receive_params_t<Msg_Count_Status> & params,Handlers &&...handlers)1729 receive(
1730 	//! Parameters for receive.
1731 	const mchain_receive_params_t<Msg_Count_Status> & params,
1732 	//! Handlers for message processing.
1733 	Handlers &&... handlers )
1734 	{
1735 		static_assert(
1736 				Msg_Count_Status == mchain_props::msg_count_status_t::defined,
1737 				"message count to be processed/extracted should be defined "
1738 				"by using handle_all()/handle_n()/extract_n() methods" );
1739 
1740 		using namespace so_5::details;
1741 		using namespace so_5::mchain_props;
1742 		using namespace so_5::mchain_props::details;
1743 
1744 		handlers_bunch_t< sizeof...(handlers) > bunch;
1745 		fill_handlers_bunch( bunch, 0,
1746 				std::forward< Handlers >(handlers)... );
1747 
1748 		return perform_receive( params, bunch );
1749 	}
1750 
1751 //
1752 // prepared_receive_t
1753 //
1754 /*!
1755  * \brief Special container for holding receive parameters and receive cases.
1756  *
1757  * \note Instances of that type usually used without specifying the actual
1758  * type:
1759  * \code
1760 	auto prepared = so_5::prepare_receive(
1761 		from(ch).handle_n(10).empty_timeout(10s),
1762 		some_handlers... );
1763 	...
1764 	auto r = so_5::receive( prepared );
1765  * \endcode
1766  * \note This is a moveable type, not copyable.
1767  *
1768  * \since
1769  * v.5.5.17
1770  */
1771 template< std::size_t Handlers_Count >
1772 class prepared_receive_t
1773 	{
1774 		//! Parameters for receive.
1775 		mchain_receive_params_t< mchain_props::msg_count_status_t::defined > m_params;
1776 
1777 		//! Cases for receive.
1778 		so_5::details::handlers_bunch_t< Handlers_Count > m_bunch;
1779 
1780 	public :
1781 		prepared_receive_t( const prepared_receive_t & ) = delete;
1782 		prepared_receive_t &
1783 		operator=( const prepared_receive_t & ) = delete;
1784 
1785 		//! Initializing constructor.
1786 		template< typename... Handlers >
prepared_receive_t(mchain_receive_params_t<mchain_props::msg_count_status_t::defined> params,Handlers &&...cases)1787 		prepared_receive_t(
1788 			mchain_receive_params_t< mchain_props::msg_count_status_t::defined > params,
1789 			Handlers &&... cases )
1790 			:	m_params( std::move(params) )
1791 			{
1792 				static_assert( sizeof...(Handlers) == Handlers_Count,
1793 						"Handlers_count and sizeof...(Handlers) mismatch" );
1794 
1795 				fill_handlers_bunch(
1796 						m_bunch, 0u, std::forward<Handlers>(cases)... );
1797 			}
1798 
1799 		//! Move constructor.
prepared_receive_t(prepared_receive_t && other)1800 		prepared_receive_t(
1801 			prepared_receive_t && other )
1802 			:	m_params( std::move(other.m_params) )
1803 			,	m_bunch( std::move(other.m_bunch) )
1804 			{}
1805 
1806 		//! Move operator.
1807 		prepared_receive_t &
operator =(prepared_receive_t && other)1808 		operator=( prepared_receive_t && other ) noexcept
1809 			{
1810 				prepared_receive_t tmp( std::move(other) );
1811 				swap( &this, tmp );
1812 
1813 				return *this;
1814 			}
1815 
1816 		//! Swap operation.
1817 		friend void
swap(prepared_receive_t & a,prepared_receive_t & b)1818 		swap( prepared_receive_t & a, prepared_receive_t & b ) noexcept
1819 			{
1820 				using std::swap;
1821 
1822 				swap( a.m_params, b.m_params );
1823 				swap( a.m_bunch, b.m_bunch );
1824 			}
1825 
1826 		/*!
1827 		 * \name Getters
1828 		 * \{
1829 		 */
1830 		const auto &
params() const1831 		params() const noexcept { return m_params; }
1832 
1833 		const auto &
handlers() const1834 		handlers() const noexcept { return m_bunch; }
1835 		/*!
1836 		 * \}
1837 		 */
1838 	};
1839 
1840 //
1841 // prepare_receive
1842 //
1843 /*!
1844  * \brief Create parameters for receive function to be used later.
1845  *
1846  * \attention
1847  * Since v.5.6.0 at least handle_all(), handle_n() or extract_n() should be
1848  * called before passing result of from() to prepare_receive() function.
1849  *
1850  * Accepts all parameters as advanced receive() version. For example:
1851  * \code
1852 	// Receive and handle 3 messages.
1853 	// If there is no 3 messages in the mchain the receive will wait
1854 	// no more that 200ms.
1855 	// A return from receive will be after handling of 3 messages or
1856 	// if the mchain is closed explicitely, or if there is no messages
1857 	// for more than 200ms.
1858 	auto prepared1 = prepare_receive(
1859 			from(chain).handle_n( 3 ).empty_timeout( milliseconds(200) ),
1860 			[]( const first_message_type & msg ) { ... },
1861 			[]( const second_message_type & msg ) { ... }, ... );
1862 
1863 	// Receive all messages from the chain.
1864 	// If there is no message in the chain then wait no more than 500ms.
1865 	// A return from receive will be after explicit close of the chain
1866 	// or if there is no messages for more than 500ms.
1867 	auto prepared2 = prepare_receive(
1868 			from(chain).handle_all().empty_timeout( milliseconds(500) ),
1869 			[]( const first_message_type & msg ) { ... },
1870 			[]( const second_message_type & msg ) { ... }, ... );
1871  * \endcode
1872  *
1873  * \since
1874  * v.5.5.17
1875  */
1876 template<
1877 	mchain_props::msg_count_status_t Msg_Count_Status,
1878 	typename... Handlers >
1879 prepared_receive_t< sizeof...(Handlers) >
prepare_receive(const mchain_receive_params_t<Msg_Count_Status> & params,Handlers &&...handlers)1880 prepare_receive(
1881 	//! Parameters for advanced receive.
1882 	const mchain_receive_params_t< Msg_Count_Status > & params,
1883 	//! Handlers
1884 	Handlers &&... handlers )
1885 	{
1886 		static_assert(
1887 				Msg_Count_Status == mchain_props::msg_count_status_t::defined,
1888 				"message count to be processed/extracted should be defined "
1889 				"by using handle_all()/handle_n()/extract_n() methods" );
1890 
1891 		return prepared_receive_t< sizeof...(Handlers) >(
1892 				params,
1893 				std::forward<Handlers>(handlers)... );
1894 	}
1895 
1896 /*!
1897  * \brief A receive operation to be done on previously prepared receive params.
1898  *
1899  * Usage of ordinary forms of receive() functions inside loops could be
1900  * inefficient because of wasting resources on constructions of internal
1901  * objects with descriptions of handlers on each receive() call.  More
1902  * efficient way is preparation of all receive params and reusing them later. A
1903  * combination of so_5::prepare_receive() and so_5::receive(prepared_receive_t)
1904  * allows to do that.
1905  *
1906  * Usage example:
1907  * \code
1908 	auto prepared = so_5::prepare_receive(
1909 		so_5::from(ch).extract_n(10).empty_timeout(200ms),
1910 		some_handlers... );
1911 	...
1912 	while( !some_condition )
1913 	{
1914 		auto r = so_5::receive( prepared );
1915 		...
1916 	}
1917  * \endcode
1918  * \since
1919  * v.5.5.17
1920  */
1921 template< std::size_t Handlers_Count >
1922 mchain_receive_result_t
receive(const prepared_receive_t<Handlers_Count> & prepared)1923 receive(
1924 	const prepared_receive_t< Handlers_Count > & prepared )
1925 	{
1926 		return mchain_props::details::perform_receive(
1927 				prepared.params(),
1928 				prepared.handlers() );
1929 	}
1930 
1931 } /* namespace so_5 */
1932 
1933