1 /*
2 * SObjectizer-5
3 */
4
5 /*!
6 * \file
7 * \brief Public part of message chain related stuff.
8 * \since
9 * v.5.5.13
10 */
11
12 #pragma once
13
14 #include <so_5/mbox.hpp>
15 #include <so_5/handler_makers.hpp>
16
17 #include <so_5/fwd.hpp>
18
19 #include <so_5/details/invoke_noexcept_code.hpp>
20 #include <so_5/details/remaining_time_counter.hpp>
21
22 #include <chrono>
23 #include <functional>
24
25 namespace so_5 {
26
27 namespace mchain_props {
28
/*!
 * \brief Duration type used to represent every timeout value of a
 * message chain.
 *
 * It is the duration type of std::chrono::high_resolution_clock.
 *
 * \since
 * v.5.5.13
 */
using duration_t = std::chrono::high_resolution_clock::duration;
36
37 namespace details {
38
39 //
40 // no_wait_special_timevalue
41 //
42 /*!
43 * \since
44 * v.5.5.13
45 *
46 * \brief Special value of %duration to indicate 'no_wait' case.
47 */
48 inline duration_t
no_wait_special_timevalue()49 no_wait_special_timevalue() { return duration_t::zero(); }
50
51 //
52 // infinite_wait_special_timevalue
53 //
54 /*!
55 * \since
56 * v.5.5.13
57 *
58 * \brief Special value of %duration to indicate 'infinite_wait' case.
59 */
60 inline duration_t
infinite_wait_special_timevalue()61 infinite_wait_special_timevalue() { return duration_t::max(); }
62
63 //
64 // is_no_wait_timevalue
65 //
66 /*!
67 * \since
68 * v.5.5.13
69 *
70 * \brief Is time value means 'no_wait'?
71 */
72 inline bool
is_no_wait_timevalue(duration_t v)73 is_no_wait_timevalue( duration_t v )
74 {
75 return v == no_wait_special_timevalue();
76 }
77
78 //
79 // is_infinite_wait_timevalue
80 //
81 /*!
82 * \since
83 * v.5.5.13
84 *
85 * \brief Is time value means 'infinite_wait'?
86 */
87 inline bool
is_infinite_wait_timevalue(duration_t v)88 is_infinite_wait_timevalue( duration_t v )
89 {
90 return v == infinite_wait_special_timevalue();
91 }
92
93 //
94 // actual_timeout
95 //
96
97 /*!
98 * \since
99 * v.5.5.13
100 *
101 * \brief Helper function for detection of actual value for waiting timeout.
102 *
103 * \note This helper implements convention that infinite waiting is
104 * represented as duration_t::max() value.
105 */
106 inline duration_t
actual_timeout(infinite_wait_indication)107 actual_timeout( infinite_wait_indication )
108 {
109 return infinite_wait_special_timevalue();
110 }
111
112 /*!
113 * \since
114 * v.5.5.13
115 *
116 * \brief Helper function for detection of actual value for waiting timeout.
117 *
118 * \note This helper implements convention that no waiting is
119 * represented as duration_t::zero() value.
120 */
121 inline duration_t
actual_timeout(no_wait_indication)122 actual_timeout( no_wait_indication )
123 {
124 return no_wait_special_timevalue();
125 }
126
127 /*!
128 * \since
129 * v.5.5.13
130 *
131 * \brief Helper function for detection of actual value for waiting timeout.
132 */
133 template< typename V >
134 duration_t
actual_timeout(V value)135 actual_timeout( V value )
136 {
137 return duration_t( value );
138 }
139
140 } /* namespace details */
141
142 //
143 // demand_t
144 //
145 /*!
146 * \since
147 * v.5.5.13
148 *
149 * \brief Description of one demand in message chain.
150 */
151 struct demand_t
152 {
153 //! Type of the message.
154 std::type_index m_msg_type;
155 //! Event incident.
156 so_5::message_ref_t m_message_ref;
157
158 //! Default constructor.
demand_tso_5::mchain_props::demand_t159 demand_t()
160 : m_msg_type( typeid(void) )
161 {}
162 //! Initializing constructor.
demand_tso_5::mchain_props::demand_t163 demand_t(
164 std::type_index msg_type,
165 so_5::message_ref_t message_ref )
166 : m_msg_type{ std::move(msg_type) }
167 , m_message_ref{ std::move(message_ref) }
168 {}
169
170 //! Swap operation.
171 friend void
swap(demand_t & a,demand_t & b)172 swap( demand_t & a, demand_t & b )
173 {
174 using std::swap;
175
176 swap( a.m_msg_type, b.m_msg_type );
177 swap( a.m_message_ref, b.m_message_ref );
178 }
179 };
180
181 //
182 // memory_usage_t
183 //
/*!
 * \brief How memory for the storage of a size-limited chain
 * must be allocated.
 *
 * \since
 * v.5.5.13
 */
enum class memory_usage_t
{
    //! Storage can be allocated and deallocated dynamically.
    dynamic,

    //! Storage must be preallocated once and doesn't change after that.
    preallocated
};
197
198 //
199 // overflow_reaction_t
200 //
/*!
 * \brief The reaction that must be performed on an attempt to push
 * a new message to a full message chain.
 *
 * \since
 * v.5.5.13
 */
enum class overflow_reaction_t
{
    //! Application must be aborted.
    abort_app,

    //! An exception must be thrown.
    /*!
     * \note
     * Since v.5.5.18 this value leads to an exception only if
     * an ordinary `send` is used for pushing a message to an overloaded
     * message chain. If there is an attempt to push a delayed or
     * periodic message to an overloaded message chain then
     * the throw_exception reaction is replaced by drop_newest.
     * It is because the context of the timer thread is a special
     * context. No exceptions should be thrown on it.
     */
    throw_exception,

    //! New message must be ignored and dropped.
    drop_newest,

    //! Oldest message in chain must be removed.
    remove_oldest
};
229
230 //
231 // capacity_t
232 //
233 /*!
234 * \since
235 * v.5.5.13
236 *
237 * \brief Parameters for defining chain size.
238 */
239 class capacity_t
240 {
241 //! Has chain unlimited size?
242 bool m_unlimited = { true };
243
244 // NOTE: all other atributes have sence only if m_unlimited != true.
245
246 //! Max size of the chain with limited size.
247 std::size_t m_max_size;
248
249 //! Type of the storage for size-limited chain.
250 memory_usage_t m_memory;
251
252 //! Type of reaction for chain overflow.
253 overflow_reaction_t m_overflow_reaction;
254
255 //! Timeout for waiting on full chain during 'message push' operation.
256 /*!
257 * \note Value 'zero' means that there must not be waiting on
258 * full chain.
259 */
260 duration_t m_overflow_timeout;
261
262 //! Initializing constructor for size-limited message chain.
capacity_t(std::size_t max_size,memory_usage_t memory_usage,overflow_reaction_t overflow_reaction,duration_t overflow_timeout)263 capacity_t(
264 std::size_t max_size,
265 memory_usage_t memory_usage,
266 overflow_reaction_t overflow_reaction,
267 duration_t overflow_timeout )
268 : m_unlimited{ false }
269 , m_max_size{ max_size }
270 , m_memory{ memory_usage }
271 , m_overflow_reaction{ overflow_reaction }
272 , m_overflow_timeout( overflow_timeout )
273 {}
274
275 public :
276 //! Default constructor.
277 /*!
278 * Creates description for size-unlimited chain.
279 */
capacity_t()280 capacity_t()
281 {}
282
283 //! Create capacity description for size-unlimited message chain.
284 inline static capacity_t
make_unlimited()285 make_unlimited() { return capacity_t{}; }
286
287 //! Create capacity description for size-limited message chain
288 //! without waiting on full queue during 'push message' operation.
289 inline static capacity_t
make_limited_without_waiting(std::size_t max_size,memory_usage_t memory_usage,overflow_reaction_t overflow_reaction)290 make_limited_without_waiting(
291 //! Max size of the chain.
292 std::size_t max_size,
293 //! Type of chain storage.
294 memory_usage_t memory_usage,
295 //! Reaction on chain overflow.
296 overflow_reaction_t overflow_reaction )
297 {
298 return capacity_t{
299 max_size,
300 memory_usage,
301 overflow_reaction,
302 details::no_wait_special_timevalue()
303 };
304 }
305
306 //! Create capacity description for size-limited message chain
307 //! with waiting on full queue during 'push message' operation.
308 inline static capacity_t
make_limited_with_waiting(std::size_t max_size,memory_usage_t memory_usage,overflow_reaction_t overflow_reaction,duration_t wait_timeout)309 make_limited_with_waiting(
310 //! Max size of the chain.
311 std::size_t max_size,
312 //! Type of chain storage.
313 memory_usage_t memory_usage,
314 //! Reaction on chain overflow.
315 overflow_reaction_t overflow_reaction,
316 //! Waiting time on full message chain.
317 duration_t wait_timeout )
318 {
319 return capacity_t{
320 max_size,
321 memory_usage,
322 overflow_reaction,
323 wait_timeout
324 };
325 }
326
327 //! Is message chain have no size limit?
328 bool
unlimited() const329 unlimited() const { return m_unlimited; }
330
331 //! Max size for size-limited chain.
332 /*!
333 * \attention Has sence only for size-limited chain.
334 */
335 std::size_t
max_size() const336 max_size() const { return m_max_size; }
337
338 //! Memory allocation type for size-limited chain.
339 /*!
340 * \attention Has sence only for size-limited chain.
341 */
342 memory_usage_t
memory_usage() const343 memory_usage() const { return m_memory; }
344
345 //! Overflow reaction for size-limited chain.
346 /*!
347 * \attention Has sence only for size-limited chain.
348 */
349 overflow_reaction_t
overflow_reaction() const350 overflow_reaction() const { return m_overflow_reaction; }
351
352 //! Is waiting timeout for overflow case defined?
353 /*!
354 * \attention Has sence only for size-limited chain.
355 */
356 bool
is_overflow_timeout_defined() const357 is_overflow_timeout_defined() const
358 {
359 return !details::is_no_wait_timevalue( m_overflow_timeout );
360 }
361
362 //! Get the value of waiting timeout for overflow case.
363 /*!
364 * \attention Has sence only for size-limited chain.
365 */
366 duration_t
overflow_timeout() const367 overflow_timeout() const
368 {
369 return m_overflow_timeout;
370 }
371 };
372
373 //
374 // extraction_status_t
375 //
/*!
 * \brief Result of an attempt to extract a message from a message chain.
 *
 * \since
 * v.5.5.13
 */
enum class extraction_status_t
{
    //! No available messages in the chain.
    no_messages,

    //! Message extracted successfully.
    msg_extracted,

    //! Message cannot be extracted because chain is closed.
    chain_closed
};
391
392 //
393 // push_status_t
394 //
/*!
 * \brief Result of an attempt to push a message into a message chain.
 *
 * \since
 * v.5.7.0
 */
enum class push_status_t
{
    //! Message wasn't stored.
    not_stored,

    //! Message stored into a message chain.
    stored,

    //! Message is not stored but the store operation is registered
    //! into a message chain.
    // NOTE: the spelling of this enumerator is a part of the public
    // interface and must be kept as is.
    deffered,

    //! Message wasn't stored because chain is closed.
    chain_closed
};
413
414 //
415 // close_mode_t
416 //
/*!
 * \brief What must be done with the chain's content when
 * the chain is closed.
 *
 * \since
 * v.5.5.13
 */
enum class close_mode_t
{
    //! All messages must be removed from chain.
    drop_content,

    //! All messages must be retained until they will be
    //! processed at receiver's side.
    retain_content
};
431
432 //
433 // not_empty_notification_func_t
434 //
/*!
 * \brief Type of a functor that notifies about the arrival of
 * a message to an empty chain.
 *
 * \attention This function must be noexcept.
 *
 * \since
 * v.5.5.13
 */
using not_empty_notification_func_t = std::function< void() >;
445
446 //
447 // Forward declarations related to multi chain select operations.
448 //
449 class select_case_t;
450
451 } /* namespace mchain_props */
452
453 //
454 // abstract_message_chain_t
455 //
456 /*!
457 * \since
458 * v.5.5.13
459 *
460 * \brief An interace of message chain.
461 */
462 class SO_5_TYPE abstract_message_chain_t : protected so_5::abstract_message_box_t
463 {
464 friend class intrusive_ptr_t< abstract_message_chain_t >;
465 friend class mchain_props::select_case_t;
466
467 abstract_message_chain_t( const abstract_message_chain_t & ) = delete;
468 abstract_message_chain_t &
469 operator=( const abstract_message_chain_t & ) = delete;
470
471 protected :
472 abstract_message_chain_t() = default;
473 ~abstract_message_chain_t() noexcept override = default;
474
475 public :
476 using abstract_message_box_t::id;
477 using abstract_message_box_t::environment;
478
479 [[nodiscard]]
480 virtual mchain_props::extraction_status_t
481 extract(
482 //! Destination for extracted messages.
483 mchain_props::demand_t & dest,
484 //! Max time to wait on empty queue.
485 mchain_props::duration_t empty_queue_timeout ) = 0;
486
487 //! Cast message chain to message box.
488 so_5::mbox_t
489 as_mbox();
490
491 //! Is message chain empty?
492 virtual bool
493 empty() const = 0;
494
495 //! Count of messages in the chain.
496 virtual std::size_t
497 size() const = 0;
498
499 //! Close the chain.
500 virtual void
501 close(
502 //! What to do with chain's content.
503 mchain_props::close_mode_t mode ) = 0;
504
505 protected :
506 /*!
507 * \brief An extraction attempt as a part of multi chain select.
508 *
509 * \attention
510 * This method is a pure virtual since v.5.6.2.
511 *
512 * \note This method is intended to be used by select_case_t.
513 *
514 * \since
515 * v.5.5.16
516 */
517 [[nodiscard]]
518 virtual mchain_props::extraction_status_t
519 extract(
520 //! Destination for extracted messages.
521 mchain_props::demand_t & dest,
522 //! Select case to be stored for notification if mchain is empty.
523 mchain_props::select_case_t & select_case ) = 0;
524
525 /*!
526 * \brief An attempt to push a new message into the mchain.
527 *
528 * Unlike do_deliver_message() method the push() doesn't apply
529 * the overload reaction if the mchain if full. The \a select_case
530 * is stored to select_cases list instead.
531 *
532 * \note
533 * This method is intended to be used by select_case_t.
534 *
535 * \since
536 * v.5.7.0
537 */
538 [[nodiscard]]
539 virtual mchain_props::push_status_t
540 push(
541 //! Type of message/signal to be pushed.
542 const std::type_index & msg_type,
543 //! Message/signal to be pushed.
544 const message_ref_t & message,
545 //! Select case to be stored for notification if mchain is full.
546 mchain_props::select_case_t & select_case ) = 0;
547
548 /*!
549 * \brief Removement of mchain from multi chain select.
550 *
551 * \attention
552 * This method is a pure virtual and noexcept since v.5.6.2.
553 *
554 * \note This method is intended to be used by select_case_t.
555 *
556 * \since
557 * v.5.5.16
558 */
559 virtual void
560 remove_from_select(
561 //! Select case to be removed from notification queue.
562 mchain_props::select_case_t & select_case ) noexcept = 0;
563 };
564
565 //
566 // mchain_t
567 //
568 /*!
569 * \since
570 * v.5.5.13
571 *
572 * \brief Short name for smart pointer to message chain.
573 */
574 using mchain_t = intrusive_ptr_t< abstract_message_chain_t >;
575
576 //
577 // close_drop_content
578 //
579 /*!
580 * \since
581 * v.5.5.13
582 *
583 * \brief Helper function for closing a message chain with dropping
584 * all its content.
585 *
586 * \note Because of ADL it can be used without specifying namespaces.
587 *
588 * \par Usage example.
589 \code
590 so_5::mchain_t & ch = ...;
591 ... // Some work with chain.
592 close_drop_content( ch );
593 // Or:
594 ch->close( so_5::mchain_props::close_mode_t::drop_content );
595 \endcode
596 */
597 inline void
close_drop_content(const mchain_t & ch)598 close_drop_content( const mchain_t & ch )
599 {
600 ch->close( mchain_props::close_mode_t::drop_content );
601 }
602
603 //
604 // close_retain_content
605 //
606 /*!
607 * \since
608 * v.5.5.13
609 *
610 * \brief Helper function for closing a message chain with retaining
611 * all its content.
612 *
613 * \note Because of ADL it can be used without specifying namespaces.
614 *
615 * \par Usage example.
616 \code
617 so_5::mchain_t & ch = ...;
618 ... // Some work with chain.
619 close_retain_content( ch );
620 // Or:
621 ch->close( so_5::mchain_props::close_mode_t::retain_content );
622 \endcode
623 */
624 inline void
close_retain_content(const mchain_t & ch)625 close_retain_content( const mchain_t & ch )
626 {
627 ch->close( mchain_props::close_mode_t::retain_content );
628 }
629
630 //
631 // mchain_params_t
632 //
633 /*!
634 * \since
635 * v.5.5.13
636 *
637 * \brief Parameters for message chain.
638 */
639 class mchain_params_t
640 {
641 //! Chain's capacity.
642 mchain_props::capacity_t m_capacity;
643
644 //! An optional notificator for 'not_empty' condition.
645 mchain_props::not_empty_notification_func_t m_not_empty_notificator;
646
647 //! Is message delivery tracing disabled explicitly?
648 bool m_msg_tracing_disabled = { false };
649
650 public :
651 //! Initializing constructor.
mchain_params_t(mchain_props::capacity_t capacity)652 mchain_params_t(
653 //! Chain's capacity and related params.
654 mchain_props::capacity_t capacity )
655 : m_capacity{ capacity }
656 {}
657
658 //! Set chain's capacity and related params.
659 mchain_params_t &
capacity(mchain_props::capacity_t capacity)660 capacity( mchain_props::capacity_t capacity )
661 {
662 m_capacity = capacity;
663 return *this;
664 }
665
666 //! Get chain's capacity and related params.
667 const mchain_props::capacity_t &
capacity() const668 capacity() const
669 {
670 return m_capacity;
671 }
672
673 //! Set chain's notificator for 'not_empty' condition.
674 /*!
675 * This notificator will be called when a message is stored to
676 * the empty chain and chain becomes not empty.
677 */
678 mchain_params_t &
not_empty_notificator(mchain_props::not_empty_notification_func_t notificator)679 not_empty_notificator(
680 mchain_props::not_empty_notification_func_t notificator )
681 {
682 m_not_empty_notificator = std::move(notificator);
683 return *this;
684 }
685
686 //! Get chain's notificator for 'not_empty' condition.
687 const mchain_props::not_empty_notification_func_t &
not_empty_notificator() const688 not_empty_notificator() const
689 {
690 return m_not_empty_notificator;
691 }
692
693 //! Disable message delivery tracing explicitly.
694 /*!
695 * If this method called then message delivery tracing will
696 * not be used for that mchain even if message delivery
697 * tracing will be used for the whole SObjectizer Environment.
698 */
699 mchain_params_t &
disable_msg_tracing()700 disable_msg_tracing()
701 {
702 m_msg_tracing_disabled = true;
703 return *this;
704 }
705
706 //! Is message delivery tracing disabled explicitly?
707 bool
msg_tracing_disabled() const708 msg_tracing_disabled() const
709 {
710 return m_msg_tracing_disabled;
711 }
712 };
713
714 /*!
715 * \name Helper functions for creating parameters for %mchain.
716 * \{
717 */
718
719 /*!
720 * \since
721 * v.5.5.13
722 *
723 * \brief Create parameters for size-unlimited %mchain.
724 *
725 * \par Usage example:
726 \code
727 so_5::environment_t & env = ...;
728 auto chain = env.create_mchain( so_5::make_unlimited_mchain_params() );
729 \endcode
730 */
731 inline mchain_params_t
make_unlimited_mchain_params()732 make_unlimited_mchain_params()
733 {
734 return mchain_params_t{ mchain_props::capacity_t::make_unlimited() };
735 }
736
737 /*!
738 * \since
739 * v.5.5.13
740 *
741 * \brief Create parameters for size-limited %mchain without waiting on overflow.
742 *
743 * \par Usage example:
744 \code
745 so_5::environment_t & env = ...;
746 auto chain = env.create_mchain( so_5::make_limited_without_waiting_mchain_params(
747 // No more than 200 messages in the chain.
748 200,
749 // Memory will be allocated dynamically.
750 so_5::mchain_props::memory_usage_t::dynamic,
751 // New messages will be ignored on chain's overflow.
752 so_5::mchain_props::overflow_reaction_t::drop_newest ) );
753 \endcode
754 */
755 inline mchain_params_t
make_limited_without_waiting_mchain_params(std::size_t max_size,mchain_props::memory_usage_t memory_usage,mchain_props::overflow_reaction_t overflow_reaction)756 make_limited_without_waiting_mchain_params(
757 //! Max capacity of %mchain.
758 std::size_t max_size,
759 //! Type of chain storage.
760 mchain_props::memory_usage_t memory_usage,
761 //! Reaction on chain overflow.
762 mchain_props::overflow_reaction_t overflow_reaction )
763 {
764 return mchain_params_t{
765 mchain_props::capacity_t::make_limited_without_waiting(
766 max_size,
767 memory_usage,
768 overflow_reaction )
769 };
770 }
771
772 /*!
773 * \since
774 * v.5.5.13
775 *
776 * \brief Create parameters for size-limited %mchain with waiting on overflow.
777 *
778 * \par Usage example:
779 \code
780 so_5::environment_t & env = ...;
781 auto chain = env.create_mchain( so_5::make_limited_with_waiting_mchain_params(
782 // No more than 200 messages in the chain.
783 200,
784 // Memory will be preallocated.
785 so_5::mchain_props::memory_usage_t::preallocated,
786 // New messages will be ignored on chain's overflow.
787 so_5::mchain_props::overflow_reaction_t::drop_newest,
788 // But before dropping a new message there will be 500ms timeout
789 std::chrono::milliseconds(500) ) );
790 \endcode
791 *
792 * \note
793 * Since v.5.5.18 there is an important difference in mchain behavior.
794 * If an ordinary `send` is used for message pushing then there will
795 * be waiting for free space if the message chain is full.
796 * But if message push is performed from timer thread (it means that
797 * message is a delayed or a periodic message) then there will not be
798 * any waiting. It is because the context of timer thread is very
799 * special: there is no possibility to spend some time on waiting for
800 * some free space in message chain. All operations on the context of
801 * timer thread must be done as fast as possible.
802 */
803 inline mchain_params_t
make_limited_with_waiting_mchain_params(std::size_t max_size,mchain_props::memory_usage_t memory_usage,mchain_props::overflow_reaction_t overflow_reaction,mchain_props::duration_t wait_timeout)804 make_limited_with_waiting_mchain_params(
805 //! Max size of the chain.
806 std::size_t max_size,
807 //! Type of chain storage.
808 mchain_props::memory_usage_t memory_usage,
809 //! Reaction on chain overflow.
810 mchain_props::overflow_reaction_t overflow_reaction,
811 //! Waiting time on full message chain.
812 mchain_props::duration_t wait_timeout )
813 {
814 return mchain_params_t {
815 mchain_props::capacity_t::make_limited_with_waiting(
816 max_size,
817 memory_usage,
818 overflow_reaction,
819 wait_timeout )
820 };
821 }
822
823 /*!
824 * \}
825 */
826
827 //
828 // mchain_receive_result_t
829 //
830 /*!
831 * \since
832 * v.5.5.13
833 *
834 * \brief A result of receive from %mchain.
835 */
836 class mchain_receive_result_t
837 {
838 //! Count of extracted messages.
839 std::size_t m_extracted;
840 //! Count of handled messages.
841 std::size_t m_handled;
842 //! Extraction status (e.g. no messages, chain closed and so on).
843 mchain_props::extraction_status_t m_status;
844
845 public :
846 //! Default constructor.
mchain_receive_result_t()847 mchain_receive_result_t() noexcept
848 : m_extracted{ 0 }
849 , m_handled{ 0 }
850 , m_status{ mchain_props::extraction_status_t::no_messages }
851 {}
852
853 //! Initializing constructor.
mchain_receive_result_t(std::size_t extracted,std::size_t handled,mchain_props::extraction_status_t status)854 mchain_receive_result_t(
855 //! Count of extracted messages.
856 std::size_t extracted,
857 //! Count of handled messages.
858 std::size_t handled,
859 //! Status of extraction operation.
860 mchain_props::extraction_status_t status ) noexcept
861 : m_extracted{ extracted }
862 , m_handled{ handled }
863 , m_status{ status }
864 {}
865
866 //! Count of extracted messages.
867 [[nodiscard]]
868 std::size_t
extracted() const869 extracted() const noexcept { return m_extracted; }
870
871 //! Count of handled messages.
872 [[nodiscard]]
873 std::size_t
handled() const874 handled() const noexcept { return m_handled; }
875
876 //! Extraction status (e.g. no messages, chain closed and so on).
877 [[nodiscard]]
878 mchain_props::extraction_status_t
status() const879 status() const noexcept { return m_status; }
880 };
881
882 //
883 // mchain_send_result_t
884 //
885 /*!
886 * \brief A result of attempt of sending messages to a message chain.
887 *
888 * This type plays the same role as mchain_receive_result_t but is used
889 * for send operations.
890 *
891 * \since
892 * v.5.7.0
893 */
894 class mchain_send_result_t
895 {
896 //! Count of messages sent.
897 std::size_t m_sent;
898
899 //! The status of send operation.
900 mchain_props::push_status_t m_status;
901
902 public:
903 //! Default constructor.
904 /*!
905 * Sets push_status_t::not_stored status.
906 */
mchain_send_result_t()907 mchain_send_result_t() noexcept
908 : m_sent{ 0u }
909 , m_status{ mchain_props::push_status_t::not_stored }
910 {}
911
912 //! Initializing constructor.
mchain_send_result_t(std::size_t sent,mchain_props::push_status_t status)913 mchain_send_result_t(
914 std::size_t sent,
915 mchain_props::push_status_t status )
916 : m_sent{ sent }
917 , m_status{ status }
918 {}
919
920 //! Count of messages sent.
921 [[nodiscard]]
922 std::size_t
sent() const923 sent() const noexcept { return m_sent; }
924
925 //! Status of send operation.
926 [[nodiscard]]
927 mchain_props::push_status_t
status() const928 status() const noexcept { return m_status; }
929 };
930
931 namespace mchain_props {
932
933 //
934 // msg_count_status_t
935 //
/*!
 * \brief Status of the limit for messages to be extracted/handled
 * during a bulk operation on a mchain.
 *
 * \since
 * v.5.6.0
 */
enum class msg_count_status_t
{
    //! Message count limit is not set yet.
    undefined,

    //! Message count limit is set.
    defined
};
950
951 namespace details {
952
953 //
954 // bulk_processing_basic_data_t
955 //
956 struct bulk_processing_basic_data_t
957 {
958 //! Type of stop-predicate.
959 /*!
960 * Must return \a true if receive procedure should be stopped.
961 */
962 using stop_predicate_t = std::function< bool() >;
963
964 //! Type of chain-closed event.
965 using chain_closed_handler_t = std::function< void(const mchain_t &) >;
966
967 //! Minimal count of messages to be extracted.
968 /*!
969 * Value 0 means that this parameter is not set.
970 */
971 std::size_t m_to_extract = { 0 };
972 //! Minimal count of messages to be handled.
973 /*!
974 * Value 0 means that this parameter it not set.
975 */
976 std::size_t m_to_handle = { 0 };
977
978 //! Timeout for waiting on empty queue.
979 mchain_props::duration_t m_empty_timeout =
980 { mchain_props::details::infinite_wait_special_timevalue() };
981
982 //! Total time for all work of advanced receive.
983 mchain_props::duration_t m_total_time =
984 { mchain_props::details::infinite_wait_special_timevalue() };
985
986 //! Optional stop-predicate.
987 stop_predicate_t m_stop_predicate;
988
989 //! Optional chain-closed handler.
990 chain_closed_handler_t m_chain_closed_handler;
991 };
992
993 //
994 // mchain_bulk_processing_basic_params_t
995 //
996 template< typename Basic_Data >
997 class mchain_bulk_processing_basic_params_t
998 {
999 public :
1000 //! Type of stop-predicate.
1001 using stop_predicate_t = typename Basic_Data::stop_predicate_t;
1002
1003 //! Type of chain-closed event.
1004 using chain_closed_handler_t = typename Basic_Data::chain_closed_handler_t;
1005
1006 private :
1007 Basic_Data m_data;
1008
1009 protected :
1010 //! Set limit for count of messages to be extracted.
1011 void
set_extract_n(std::size_t v)1012 set_extract_n( std::size_t v ) noexcept
1013 {
1014 m_data.m_to_extract = v;
1015 }
1016
1017 //! Set limit for count of messages to be handled.
1018 void
set_handle_n(std::size_t v)1019 set_handle_n( std::size_t v ) noexcept
1020 {
1021 m_data.m_to_handle = v;
1022 }
1023
1024 //! Set timeout for waiting on empty chain.
1025 /*!
1026 * \note This value will be ignored if total_time() is also used
1027 * to set total receive time.
1028 *
1029 * \note Argument \a v can be of type duration_t or
1030 * so_5::infinite_wait or so_5::no_wait.
1031 */
1032 template< typename Timeout >
1033 void
set_empty_timeout(Timeout v)1034 set_empty_timeout( Timeout v ) noexcept
1035 {
1036 m_data.m_empty_timeout = mchain_props::details::actual_timeout( v );
1037 }
1038
1039 //! Set total time for the whole receive operation.
1040 /*!
1041 * \note Argument \a v can be of type duration_t or
1042 * so_5::infinite_wait or so_5::no_wait.
1043 */
1044 template< typename Timeout >
1045 void
set_total_time(Timeout v)1046 set_total_time( Timeout v ) noexcept
1047 {
1048 m_data.m_total_time = mchain_props::details::actual_timeout( v );
1049 }
1050
1051 //! Set user condition for stopping receive operation.
1052 /*!
1053 * \note \a predicate should return \a true if receive must
1054 * be stopped.
1055 */
1056 void
set_stop_on(stop_predicate_t predicate)1057 set_stop_on( stop_predicate_t predicate ) noexcept
1058 {
1059 m_data.m_stop_predicate = std::move(predicate);
1060 }
1061
1062 //! Set handler for chain-closed event.
1063 /*!
1064 * If there is a previously set handler the old handler will be lost.
1065 */
1066 void
set_on_close(chain_closed_handler_t handler)1067 set_on_close( chain_closed_handler_t handler ) noexcept
1068 {
1069 m_data.m_chain_closed_handler = std::move(handler);
1070 }
1071
1072 public :
1073 //! Default constructor.
1074 mchain_bulk_processing_basic_params_t() = default;
1075
1076 //! Initializing constructor.
mchain_bulk_processing_basic_params_t(Basic_Data data)1077 mchain_bulk_processing_basic_params_t( Basic_Data data )
1078 : m_data{ std::move(data) }
1079 {}
1080
1081 //! Get limit for count of messages to be extracted.
1082 std::size_t
to_extract() const1083 to_extract() const noexcept { return m_data.m_to_extract; }
1084
1085 //! Get limit for count of message to be handled.
1086 std::size_t
to_handle() const1087 to_handle() const noexcept { return m_data.m_to_handle; }
1088
1089 //! Get timeout for waiting on empty chain.
1090 const mchain_props::duration_t &
empty_timeout() const1091 empty_timeout() const noexcept { return m_data.m_empty_timeout; }
1092
1093 //! Get total time for the whole receive operation.
1094 const mchain_props::duration_t &
total_time() const1095 total_time() const noexcept { return m_data.m_total_time; }
1096
1097 //! Get user condition for stopping receive operation.
1098 const stop_predicate_t &
stop_on() const1099 stop_on() const noexcept
1100 {
1101 return m_data.m_stop_predicate;
1102 }
1103
1104 //! Get handler for chain-closed event.
1105 const chain_closed_handler_t &
closed_handler() const1106 closed_handler() const noexcept
1107 {
1108 return m_data.m_chain_closed_handler;
1109 }
1110
1111 //! Access to internal data.
1112 const auto &
so5_data() const1113 so5_data() const noexcept { return m_data; }
1114 };
1115
1116 } /* namespace details */
1117
1118 } /* namespace mchain_props */
1119
1120 //
1121 // mchain_bulk_processing_params_t
1122 //
1123 /*!
1124 * \brief Basic parameters for advanced receive from %mchain and for
1125 * multi chain select.
1126 *
1127 * \since
1128 * v.5.5.16
1129 */
1130 template< typename Data, typename Derived >
1131 class mchain_bulk_processing_params_t
1132 : public mchain_props::details::mchain_bulk_processing_basic_params_t< Data >
1133 {
1134 using basic_t =
1135 mchain_props::details::mchain_bulk_processing_basic_params_t<Data>;
1136
1137 public :
1138 using actual_type = Derived;
1139
1140 using data_type = Data;
1141
1142 protected :
1143 //! Helper method to get a reference to itself but with a
1144 //! type of the derived class.
1145 actual_type &
self_reference()1146 self_reference() { return static_cast< actual_type & >(*this); }
1147
1148 //! Helper method to make a clone with msg_count_status_t::defined
1149 //! status.
1150 decltype(auto)
clone_as_defined()1151 clone_as_defined() noexcept
1152 {
1153 return self_reference().template so5_clone_if_necessary<
1154 mchain_props::msg_count_status_t::defined >();
1155 }
1156
1157 public :
1158 //! Default constructor.
1159 mchain_bulk_processing_params_t() = default;
1160
1161 //! Initializing constructor.
mchain_bulk_processing_params_t(Data data)1162 mchain_bulk_processing_params_t( Data data )
1163 : basic_t{ std::move(data) }
1164 {}
1165
1166 //! A directive to handle all messages until chain will be closed
1167 //! or receiving will be stopped manually.
1168 /*!
1169 * Usage example:
1170 * \code
1171 * so_5::receive(so_5::from(ch).handle_all(), ...);
1172 * \endcode
1173 *
1174 * \since
1175 * v.5.6.0
1176 */
1177 decltype(auto)
handle_all()1178 handle_all() noexcept
1179 {
1180 this->set_handle_n( 0u );
1181 return clone_as_defined();
1182 }
1183
1184 //! Set limit for count of messages to be extracted.
1185 /*!
1186 * When extract_n() is used then receive() will be finished
1187 * after extraction of the specified number of message.
1188 *
1189 * Usage example:
1190 * \code
1191 * so_5::receive(so_5::from(ch).extract_n(2), ...);
1192 * \endcode
1193 */
1194 decltype(auto)
extract_n(std::size_t v)1195 extract_n( std::size_t v ) noexcept
1196 {
1197 this->set_extract_n( v );
1198 return clone_as_defined();
1199 }
1200
1201 //! Set limit for count of messages to be handled.
1202 /*!
1203 * When handled_n() is used then receive() will be finished
1204 * after handling of the specified number of message.
1205 *
1206 * Usage example:
1207 * \code
1208 * so_5::receive(so_5::from(ch).handle_n(2), ...);
1209 * \endcode
1210 */
1211 decltype(auto)
handle_n(std::size_t v)1212 handle_n( std::size_t v ) noexcept
1213 {
1214 this->set_handle_n( v );
1215 return clone_as_defined();
1216 }
1217
1218 //! Set timeout for waiting on empty chain.
1219 /*!
1220 * \note This value will be ignored if total_time() is also used
1221 * to set total receive time.
1222 *
1223 * \note Argument \a v can be of type duration_t or
1224 * so_5::infinite_wait or so_5::no_wait.
1225 */
1226 template< typename Timeout >
1227 actual_type &
empty_timeout(Timeout v)1228 empty_timeout( Timeout v ) noexcept
1229 {
1230 this->set_empty_timeout( std::move(v) );
1231 return self_reference();
1232 }
1233
1234 using basic_t::empty_timeout;
1235
1236 /*!
1237 * \brief Disable waiting on the empty queue.
1238 *
1239 * \par Usage example:
1240 * \code
1241 so_5::mchain_t ch = env.create_mchain(...);
1242 receive( from(ch).no_wait_on_empty(), ... );
1243 * \endcode
1244 *
1245 * \note It is just a shorthand for:
1246 * \code
1247 receive( from(chain).empty_timeout(std::chrono::seconds(0)), ...);
1248 * \endcode
1249 */
1250 actual_type &
no_wait_on_empty()1251 no_wait_on_empty() noexcept
1252 {
1253 return empty_timeout(
1254 mchain_props::details::no_wait_special_timevalue() );
1255 }
1256
1257 //! Set total time for the whole receive operation.
1258 /*!
1259 * \note Argument \a v can be of type duration_t or
1260 * so_5::infinite_wait or so_5::no_wait.
1261 */
1262 template< typename Timeout >
1263 actual_type &
total_time(Timeout v)1264 total_time( Timeout v ) noexcept
1265 {
1266 this->set_total_time( std::move(v) );
1267 return self_reference();
1268 }
1269
1270 using basic_t::total_time;
1271
1272 //! Set user condition for stopping receive operation.
1273 /*!
1274 * \note \a predicate should return \a true if receive must
1275 * be stopped.
1276 */
1277 actual_type &
stop_on(typename basic_t::stop_predicate_t predicate)1278 stop_on( typename basic_t::stop_predicate_t predicate ) noexcept
1279 {
1280 this->set_stop_on( std::move(predicate) );
1281 return self_reference();
1282 }
1283
1284 using basic_t::stop_on;
1285
1286 //! Set handler for chain-closed event.
1287 /*!
1288 * If there is a previously set handler the old handler will be lost.
1289 *
1290 * Usage example:
1291 * \code
1292 * so_5::mchain_t ch1 = so_5::create_mchain(...);
1293 * so_5::mchain_t ch2 = so_5::create_mchain(...);
1294 * ...
1295 * // Stop reading channels when any of channels is closed.
1296 * bool some_ch_closed = false;
1297 * so_5::select(
1298 * so_5::from_all()
1299 * .handle_all()
1300 * .on_close([&some_ch_closed](const so_5::mchain_t &) {
1301 * some_ch_closed = true;
1302 * })
1303 * .stop_on([&some_ch_closed]{ return some_ch_closed; }),
1304 * receive_case(ch1, ...)
1305 * receive_case(ch2, ...)
1306 * ...);
1307 * \endcode
1308 *
1309 * \since
1310 * v.5.5.17
1311 */
1312 actual_type &
on_close(typename basic_t::chain_closed_handler_t handler)1313 on_close( typename basic_t::chain_closed_handler_t handler ) noexcept
1314 {
1315 this->set_on_close( std::move(handler) );
1316 return self_reference();
1317 }
1318 };
1319
1320 namespace mchain_props {
1321
1322 namespace details {
1323
1324 //
1325 // adv_receive_data_t
1326 //
1327 /*!
1328 * \brief Container of parameters for receive() function.
1329 *
1330 * \since
1331 * v.5.6.0
1332 */
1333 struct adv_receive_data_t : public bulk_processing_basic_data_t
1334 {
1335 //! A chain to be used in receive operation.
1336 mchain_t m_chain;
1337
1338 //! Default constructor.
1339 adv_receive_data_t() = default;
1340
1341 //! Initializing constructor.
adv_receive_data_tso_5::mchain_props::details::adv_receive_data_t1342 adv_receive_data_t( mchain_t chain )
1343 : m_chain{ std::move(chain) }
1344 {}
1345 };
1346
1347 } /* namespace details */
1348
1349 } /* namespace mchain_props */
1350
1351 //
1352 // mchain_receive_params_t
1353 //
1354 /*!
1355 * \brief Parameters for advanced receive from %mchain.
1356 *
1357 * \sa so_5::from().
1358 *
1359 * \note Derived from basic_receive_params_t since v.5.5.16.
1360 *
1361 * \tparam Msg_Count_Status status of message count limit.
1362 *
1363 * \since
1364 * v.5.5.13
1365 */
1366 template< mchain_props::msg_count_status_t Msg_Count_Status >
1367 class mchain_receive_params_t final
1368 : public mchain_bulk_processing_params_t<
1369 mchain_props::details::adv_receive_data_t,
1370 mchain_receive_params_t< Msg_Count_Status > >
1371 {
1372 //! Short alias for base type.
1373 using base_type = mchain_bulk_processing_params_t<
1374 mchain_props::details::adv_receive_data_t,
1375 mchain_receive_params_t< Msg_Count_Status > >;
1376
1377 public :
1378 //! Make of clone with different Msg_Count_Status or return
1379 //! a reference to the same object.
1380 template< mchain_props::msg_count_status_t New_Msg_Count_Status >
1381 [[nodiscard]]
1382 decltype(auto)
so5_clone_if_necessary()1383 so5_clone_if_necessary() noexcept
1384 {
1385 if constexpr( New_Msg_Count_Status != Msg_Count_Status )
1386 return mchain_receive_params_t< New_Msg_Count_Status >{
1387 this->so5_data()
1388 };
1389 else
1390 return *this;
1391 }
1392
1393 //! Initializing constructor.
mchain_receive_params_t(mchain_t chain)1394 mchain_receive_params_t(
1395 //! Chain from which messages must be extracted and handled.
1396 mchain_t chain )
1397 : base_type{ typename base_type::data_type{ std::move(chain) } }
1398 {}
1399
1400 //! Initializing constructor for the case of cloning.
mchain_receive_params_t(typename base_type::data_type data)1401 mchain_receive_params_t(
1402 typename base_type::data_type data )
1403 : base_type{ std::move(data) }
1404 {}
1405
1406 //! Chain from which messages must be extracted and handled.
1407 const mchain_t &
chain() const1408 chain() const { return this->so5_data().m_chain; }
1409 };
1410
1411 //
1412 // from
1413 //
1414 /*!
1415 * \since
1416 * v.5.5.13
1417 *
1418 * \brief A helper function for simplification of creation of %mchain_receive_params instance.
1419 *
1420 * \attention
1421 * Since v.5.6.0 at least handle_all(), handle_n() or extract_n() should be
1422 * called before passing result of from() to receive() function.
1423 *
1424 * \par Usage examples:
1425 \code
1426 so_5::mchain_t chain = env.create_mchain(...);
1427
1428 // Receive and handle 3 messages.
	// If there are fewer than 3 messages in the mchain the receive will wait infinitely.
	// A return from receive will be after handling of 3 messages or
	// if the mchain is closed explicitly.
1432 receive( from(chain).handle_n( 3 ),
1433 handlers... );
1434
1435 // Receive and handle 3 messages.
	// If there are fewer than 3 messages in the mchain the receive will wait
	// no more than 200ms.
	// A return from receive will be after handling of 3 messages or
	// if the mchain is closed explicitly, or if there are no messages
	// for more than 200ms.
1441 receive( from(chain).handle_n( 3 ).empty_timeout( milliseconds(200) ),
1442 handlers... );
1443
1444 // Receive all messages from the chain.
1445 // If there is no message in the chain then wait no more than 500ms.
1446 // A return from receive will be after explicit close of the chain
1447 // or if there is no messages for more than 500ms.
1448 receive( from(chain).handle_all().empty_timeout( milliseconds(500) ),
1449 handlers... );
1450
	// Receive any number of messages from the chain but do waiting and
	// handling for no more than 2s.
1453 receive( from(chain).handle_all().total_time( seconds(2) ),
1454 handlers... );
1455
	// Receive 1000 messages from the chain but do waiting and
	// handling for no more than 2s.
1458 receive( from(chain).extract_n( 1000 ).total_time( seconds(2) ),
1459 handlers... );
1460 \endcode
1461 */
1462 inline mchain_receive_params_t< mchain_props::msg_count_status_t::undefined >
from(mchain_t chain)1463 from( mchain_t chain )
1464 {
1465 return { std::move(chain) };
1466 }
1467
1468 namespace mchain_props {
1469
1470 namespace details {
1471
1472 //
1473 // receive_actions_performer_t
1474 //
1475 /*!
1476 * \since
1477 * v.5.5.13
1478 *
1479 * \brief Helper class with implementation of main actions of
1480 * advanced receive operation.
1481 */
1482 template< typename Bunch >
1483 class receive_actions_performer_t
1484 {
1485 const mchain_receive_params_t< msg_count_status_t::defined > & m_params;
1486 const Bunch & m_bunch;
1487
1488 std::size_t m_extracted_messages = 0;
1489 std::size_t m_handled_messages = 0;
1490 extraction_status_t m_status;
1491
1492 public :
receive_actions_performer_t(const mchain_receive_params_t<msg_count_status_t::defined> & params,const Bunch & bunch)1493 receive_actions_performer_t(
1494 const mchain_receive_params_t< msg_count_status_t::defined > & params,
1495 const Bunch & bunch )
1496 : m_params( params )
1497 , m_bunch( bunch )
1498 {}
1499
1500 void
handle_next(duration_t empty_timeout)1501 handle_next( duration_t empty_timeout )
1502 {
1503 demand_t extracted_demand;
1504 m_status = m_params.chain()->extract(
1505 extracted_demand, empty_timeout );
1506
1507 if( extraction_status_t::msg_extracted == m_status )
1508 {
1509 ++m_extracted_messages;
1510 const bool handled = m_bunch.handle(
1511 extracted_demand.m_msg_type,
1512 extracted_demand.m_message_ref );
1513 if( handled )
1514 ++m_handled_messages;
1515 }
1516 // Since v.5.5.17 we must check presence of chain-closed handler.
1517 // This handler must be used if chain is closed.
1518 else if( extraction_status_t::chain_closed == m_status )
1519 {
1520 if( const auto & handler = m_params.closed_handler() )
1521 so_5::details::invoke_noexcept_code(
1522 [&handler, this] {
1523 handler( m_params.chain() );
1524 } );
1525 }
1526 }
1527
1528 extraction_status_t
last_status() const1529 last_status() const { return m_status; }
1530
1531 bool
can_continue() const1532 can_continue() const
1533 {
1534 if( extraction_status_t::chain_closed == m_status )
1535 return false;
1536
1537 if( m_params.to_handle() &&
1538 m_handled_messages >= m_params.to_handle() )
1539 return false;
1540
1541 if( m_params.to_extract() &&
1542 m_extracted_messages >= m_params.to_extract() )
1543 return false;
1544
1545 if( m_params.stop_on() && m_params.stop_on()() )
1546 return false;
1547
1548 return true;
1549 }
1550
1551 mchain_receive_result_t
make_result() const1552 make_result() const
1553 {
1554 return mchain_receive_result_t{
1555 m_extracted_messages,
1556 m_handled_messages,
1557 m_extracted_messages ? extraction_status_t::msg_extracted :
1558 m_status
1559 };
1560 }
1561 };
1562
1563 /*!
1564 * \since
1565 * v.5.5.13
1566 *
1567 * \brief An implementation of advanced receive when a limit for total
1568 * operation time is defined.
1569 */
1570 template< typename Bunch >
1571 inline mchain_receive_result_t
receive_with_finite_total_time(const mchain_receive_params_t<msg_count_status_t::defined> & params,const Bunch & bunch)1572 receive_with_finite_total_time(
1573 const mchain_receive_params_t<msg_count_status_t::defined> & params,
1574 const Bunch & bunch )
1575 {
1576 receive_actions_performer_t< Bunch > performer( params, bunch );
1577
1578 so_5::details::remaining_time_counter_t remaining_time(
1579 params.total_time() );
1580 do
1581 {
1582 performer.handle_next( remaining_time.remaining() );
1583 if( !performer.can_continue() )
1584 break;
1585 remaining_time.update();
1586 }
1587 while( remaining_time );
1588
1589 return performer.make_result();
1590 }
1591
1592 /*!
1593 * \since
1594 * v.5.5.13
1595 *
 * \brief An implementation of advanced receive when there is no
 * limit for the total operation time.
1598 */
1599 template< typename Bunch >
1600 inline mchain_receive_result_t
receive_without_total_time(const mchain_receive_params_t<msg_count_status_t::defined> & params,const Bunch & bunch)1601 receive_without_total_time(
1602 const mchain_receive_params_t<msg_count_status_t::defined> & params,
1603 const Bunch & bunch )
1604 {
1605 receive_actions_performer_t< Bunch > performer{ params, bunch };
1606
1607 do
1608 {
1609 performer.handle_next( params.empty_timeout() );
1610
1611 if( extraction_status_t::no_messages == performer.last_status() )
1612 // There is no need to continue.
1613 // This status means that empty_timeout has some value
1614 // and there is no any new message during empty_timeout.
1615 // And this means a condition for return from advanced
1616 // receive.
1617 break;
1618 }
1619 while( performer.can_continue() );
1620
1621 return performer.make_result();
1622 }
1623
1624 /*!
1625 * \brief An implementation of main receive actions.
1626 *
1627 * \since
1628 * v.5.5.17
1629 */
1630 template< typename Bunch >
1631 inline mchain_receive_result_t
perform_receive(const mchain_receive_params_t<msg_count_status_t::defined> & params,const Bunch & bunch)1632 perform_receive(
1633 const mchain_receive_params_t<msg_count_status_t::defined> & params,
1634 const Bunch & bunch )
1635 {
1636 if( !is_infinite_wait_timevalue( params.total_time() ) )
1637 return receive_with_finite_total_time( params, bunch );
1638 else
1639 return receive_without_total_time( params, bunch );
1640 }
1641
1642 } /* namespace details */
1643
1644 } /* namespace mchain_props */
1645
1646 //
// receive (advanced version)
1648 //
1649
1650 /*!
1651 * \since
1652 * v.5.5.13
1653 *
1654 * \brief Advanced version of receive from %mchain.
1655 *
1656 * \attention It is an error if there are more than one handler for the
1657 * same message type in \a handlers.
1658 *
1659 * \attention
1660 * Since v.5.6.0 at least handle_all(), handle_n() or extract_n() should be
1661 * called before passing result of from() to receive() function.
1662 *
1663 * \par Usage examples:
1664 \code
1665 so_5::mchain_t chain = env.create_mchain(...);
1666
1667 // Receive and handle 3 messages.
	// If there are fewer than 3 messages in the mchain the receive will wait infinitely.
	// A return from receive will be after handling of 3 messages or
	// if the mchain is closed explicitly.
1671 receive( from(chain).handle_n( 3 ),
1672 []( const first_message_type & msg ) { ... },
1673 []( const second_message_type & msg ) { ... }, ... );
1674
1675 // Receive and handle 3 messages.
	// If there are fewer than 3 messages in the mchain the receive will wait
	// no more than 200ms.
	// A return from receive will be after handling of 3 messages or
	// if the mchain is closed explicitly, or if there are no messages
	// for more than 200ms.
1681 receive( from(chain).handle_n( 3 ).empty_timeout( milliseconds(200) ),
1682 []( const first_message_type & msg ) { ... },
1683 []( const second_message_type & msg ) { ... }, ... );
1684
1685 // Receive all messages from the chain.
1686 // If there is no message in the chain then wait no more than 500ms.
1687 // A return from receive will be after explicit close of the chain
1688 // or if there is no messages for more than 500ms.
1689 receive( from(chain).handle_all().empty_timeout( milliseconds(500) ),
1690 []( const first_message_type & msg ) { ... },
1691 []( const second_message_type & msg ) { ... }, ... );
1692
	// Receive any number of messages from the chain but do waiting and
	// handling for no more than 2s.
1695 receive( from(chain).handle_all().total_time( seconds(2) ),
1696 []( const first_message_type & msg ) { ... },
1697 []( const second_message_type & msg ) { ... }, ... );
1698
	// Receive 1000 messages from the chain but do waiting and
	// handling for no more than 2s.
1701 receive( from(chain).extract_n( 1000 ).total_time( seconds(2) ),
1702 []( const first_message_type & msg ) { ... },
1703 []( const second_message_type & msg ) { ... }, ... );
1704 \endcode
1705 *
1706 * \par Handlers format examples:
1707 * \code
1708 receive( ch, so_5::infinite_wait,
1709 // Message instance by const reference.
1710 []( const std::string & v ) {...},
1711 // Message instance by value (efficient for small types like int).
1712 []( int v ) {...},
1713 // Message instance via mhood_t value.
1714 []( so_5::mhood_t< some_message > v ) {...},
1715 // Message instance via const reference to mhood_t.
1716 []( const so_5::mhood_t< some_another_message > & v ) {...},
1717 // Explicitly specified signal handler.
1718 so_5::handler< some_signal >( []{...} ),
1719 // Signal handler via mhood_t value.
1720 []( so_5::mhood_t< some_another_signal > ) {...},
1721 // Signal handler via const reference to mhood_t.
1722 []( const so_5::mhood_t< yet_another_signal > & ) {...} );
1723 * \endcode
1724 */
1725 template<
1726 mchain_props::msg_count_status_t Msg_Count_Status,
1727 typename... Handlers >
1728 inline mchain_receive_result_t
receive(const mchain_receive_params_t<Msg_Count_Status> & params,Handlers &&...handlers)1729 receive(
1730 //! Parameters for receive.
1731 const mchain_receive_params_t<Msg_Count_Status> & params,
1732 //! Handlers for message processing.
1733 Handlers &&... handlers )
1734 {
1735 static_assert(
1736 Msg_Count_Status == mchain_props::msg_count_status_t::defined,
1737 "message count to be processed/extracted should be defined "
1738 "by using handle_all()/handle_n()/extract_n() methods" );
1739
1740 using namespace so_5::details;
1741 using namespace so_5::mchain_props;
1742 using namespace so_5::mchain_props::details;
1743
1744 handlers_bunch_t< sizeof...(handlers) > bunch;
1745 fill_handlers_bunch( bunch, 0,
1746 std::forward< Handlers >(handlers)... );
1747
1748 return perform_receive( params, bunch );
1749 }
1750
1751 //
1752 // prepared_receive_t
1753 //
1754 /*!
1755 * \brief Special container for holding receive parameters and receive cases.
1756 *
1757 * \note Instances of that type usually used without specifying the actual
1758 * type:
1759 * \code
1760 auto prepared = so_5::prepare_receive(
1761 from(ch).handle_n(10).empty_timeout(10s),
1762 some_handlers... );
1763 ...
1764 auto r = so_5::receive( prepared );
1765 * \endcode
1766 * \note This is a moveable type, not copyable.
1767 *
1768 * \since
1769 * v.5.5.17
1770 */
1771 template< std::size_t Handlers_Count >
1772 class prepared_receive_t
1773 {
1774 //! Parameters for receive.
1775 mchain_receive_params_t< mchain_props::msg_count_status_t::defined > m_params;
1776
1777 //! Cases for receive.
1778 so_5::details::handlers_bunch_t< Handlers_Count > m_bunch;
1779
1780 public :
1781 prepared_receive_t( const prepared_receive_t & ) = delete;
1782 prepared_receive_t &
1783 operator=( const prepared_receive_t & ) = delete;
1784
1785 //! Initializing constructor.
1786 template< typename... Handlers >
prepared_receive_t(mchain_receive_params_t<mchain_props::msg_count_status_t::defined> params,Handlers &&...cases)1787 prepared_receive_t(
1788 mchain_receive_params_t< mchain_props::msg_count_status_t::defined > params,
1789 Handlers &&... cases )
1790 : m_params( std::move(params) )
1791 {
1792 static_assert( sizeof...(Handlers) == Handlers_Count,
1793 "Handlers_count and sizeof...(Handlers) mismatch" );
1794
1795 fill_handlers_bunch(
1796 m_bunch, 0u, std::forward<Handlers>(cases)... );
1797 }
1798
1799 //! Move constructor.
prepared_receive_t(prepared_receive_t && other)1800 prepared_receive_t(
1801 prepared_receive_t && other )
1802 : m_params( std::move(other.m_params) )
1803 , m_bunch( std::move(other.m_bunch) )
1804 {}
1805
1806 //! Move operator.
1807 prepared_receive_t &
operator =(prepared_receive_t && other)1808 operator=( prepared_receive_t && other ) noexcept
1809 {
1810 prepared_receive_t tmp( std::move(other) );
1811 swap( &this, tmp );
1812
1813 return *this;
1814 }
1815
1816 //! Swap operation.
1817 friend void
swap(prepared_receive_t & a,prepared_receive_t & b)1818 swap( prepared_receive_t & a, prepared_receive_t & b ) noexcept
1819 {
1820 using std::swap;
1821
1822 swap( a.m_params, b.m_params );
1823 swap( a.m_bunch, b.m_bunch );
1824 }
1825
1826 /*!
1827 * \name Getters
1828 * \{
1829 */
1830 const auto &
params() const1831 params() const noexcept { return m_params; }
1832
1833 const auto &
handlers() const1834 handlers() const noexcept { return m_bunch; }
1835 /*!
1836 * \}
1837 */
1838 };
1839
1840 //
1841 // prepare_receive
1842 //
1843 /*!
1844 * \brief Create parameters for receive function to be used later.
1845 *
1846 * \attention
1847 * Since v.5.6.0 at least handle_all(), handle_n() or extract_n() should be
1848 * called before passing result of from() to prepare_receive() function.
1849 *
1850 * Accepts all parameters as advanced receive() version. For example:
1851 * \code
1852 // Receive and handle 3 messages.
	// If there are fewer than 3 messages in the mchain the receive will wait
	// no more than 200ms.
	// A return from receive will be after handling of 3 messages or
	// if the mchain is closed explicitly, or if there are no messages
	// for more than 200ms.
1858 auto prepared1 = prepare_receive(
1859 from(chain).handle_n( 3 ).empty_timeout( milliseconds(200) ),
1860 []( const first_message_type & msg ) { ... },
1861 []( const second_message_type & msg ) { ... }, ... );
1862
1863 // Receive all messages from the chain.
1864 // If there is no message in the chain then wait no more than 500ms.
1865 // A return from receive will be after explicit close of the chain
1866 // or if there is no messages for more than 500ms.
1867 auto prepared2 = prepare_receive(
1868 from(chain).handle_all().empty_timeout( milliseconds(500) ),
1869 []( const first_message_type & msg ) { ... },
1870 []( const second_message_type & msg ) { ... }, ... );
1871 * \endcode
1872 *
1873 * \since
1874 * v.5.5.17
1875 */
1876 template<
1877 mchain_props::msg_count_status_t Msg_Count_Status,
1878 typename... Handlers >
1879 prepared_receive_t< sizeof...(Handlers) >
prepare_receive(const mchain_receive_params_t<Msg_Count_Status> & params,Handlers &&...handlers)1880 prepare_receive(
1881 //! Parameters for advanced receive.
1882 const mchain_receive_params_t< Msg_Count_Status > & params,
1883 //! Handlers
1884 Handlers &&... handlers )
1885 {
1886 static_assert(
1887 Msg_Count_Status == mchain_props::msg_count_status_t::defined,
1888 "message count to be processed/extracted should be defined "
1889 "by using handle_all()/handle_n()/extract_n() methods" );
1890
1891 return prepared_receive_t< sizeof...(Handlers) >(
1892 params,
1893 std::forward<Handlers>(handlers)... );
1894 }
1895
1896 /*!
1897 * \brief A receive operation to be done on previously prepared receive params.
1898 *
1899 * Usage of ordinary forms of receive() functions inside loops could be
1900 * inefficient because of wasting resources on constructions of internal
1901 * objects with descriptions of handlers on each receive() call. More
1902 * efficient way is preparation of all receive params and reusing them later. A
1903 * combination of so_5::prepare_receive() and so_5::receive(prepared_receive_t)
1904 * allows to do that.
1905 *
1906 * Usage example:
1907 * \code
1908 auto prepared = so_5::prepare_receive(
1909 so_5::from(ch).extract_n(10).empty_timeout(200ms),
1910 some_handlers... );
1911 ...
1912 while( !some_condition )
1913 {
1914 auto r = so_5::receive( prepared );
1915 ...
1916 }
1917 * \endcode
1918 * \since
1919 * v.5.5.17
1920 */
1921 template< std::size_t Handlers_Count >
1922 mchain_receive_result_t
receive(const prepared_receive_t<Handlers_Count> & prepared)1923 receive(
1924 const prepared_receive_t< Handlers_Count > & prepared )
1925 {
1926 return mchain_props::details::perform_receive(
1927 prepared.params(),
1928 prepared.handlers() );
1929 }
1930
1931 } /* namespace so_5 */
1932
1933