1 /*
2 	SObjectizer 5.
3 */
4 
5 #include <so_5/disp/active_group/pub.hpp>
6 
7 #include <so_5/send_functions.hpp>
8 
9 #include <so_5/details/rollback_on_exception.hpp>
10 
11 #include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
12 #include <so_5/disp/reuse/make_actual_dispatcher.hpp>
13 
14 #include <so_5/disp/reuse/work_thread/work_thread.hpp>
15 
16 #include <so_5/stats/repository.hpp>
17 #include <so_5/stats/messages.hpp>
18 #include <so_5/stats/std_names.hpp>
19 
20 #include <map>
21 #include <mutex>
22 #include <algorithm>
23 
24 namespace so_5
25 {
26 
27 namespace disp
28 {
29 
30 namespace active_group
31 {
32 
33 namespace impl
34 {
35 
36 namespace work_thread = so_5::disp::reuse::work_thread;
37 namespace stats = so_5::stats;
38 
39 namespace
40 {
41 
/*!
 * \brief Helper that calls shutdown() and then wait() on the same object.
 *
 * The calls are always performed in that order: shutdown() first,
 * wait() second.
 *
 * \tparam Worker Any type that provides shutdown() and wait() methods.
 *
 * \since
 * v.5.5.4
 */
template< class Worker >
void
shutdown_and_wait( Worker & w )
	{
		// The stop request has to be issued before waiting.
		w.shutdown();
		w.wait();
	}
55 
56 void
send_thread_activity_stats(const so_5::mbox_t &,const stats::prefix_t &,work_thread::work_thread_no_activity_tracking_t &)57 send_thread_activity_stats(
58 	const so_5::mbox_t &,
59 	const stats::prefix_t &,
60 	work_thread::work_thread_no_activity_tracking_t & )
61 	{
62 		/* Nothing to do */
63 	}
64 
65 void
send_thread_activity_stats(const so_5::mbox_t & mbox,const stats::prefix_t & prefix,work_thread::work_thread_with_activity_tracking_t & wt)66 send_thread_activity_stats(
67 	const so_5::mbox_t & mbox,
68 	const stats::prefix_t & prefix,
69 	work_thread::work_thread_with_activity_tracking_t & wt )
70 	{
71 		so_5::send< stats::messages::work_thread_activity >(
72 				mbox,
73 				prefix,
74 				stats::suffixes::work_thread_activity(),
75 				wt.thread_id(),
76 				wt.take_activity_stats() );
77 	}
78 
79 } /* anonymous */
80 
81 //
82 // actual_dispatcher_iface_t
83 //
84 /*!
85  * \brief An actual interface of active group dispatcher.
86  *
87  * \since
88  * v.5.6.0
89  */
90 class actual_dispatcher_iface_t : public basic_dispatcher_iface_t
91 	{
92 	public :
93 		/*!
94 		 * \brief Create a new thread for a group if it necessary.
95 		 *
96 		 * If name \a group_name is unknown then a new work
97 		 * thread is started. This thread is marked as it has one
98 		 * working agent on it.
99 		 *
100 		 * If there already is a thread for \a group_name then the
101 		 * counter of working agents is incremented.
102 		 */
103 		virtual void
104 		allocate_thread_for_group( const std::string & group_name ) = 0;
105 
106 		/*!
107 		 * \brief Get the event_queue for the specified active group.
108 		 *
109 		 * It is expected that thread for the group is already
110 		 * created by calling allocate_thread_for_group() method.
111 		 */
112 		virtual so_5::event_queue_t *
113 		query_thread_for_group( const std::string & group_name ) noexcept = 0;
114 
115 		/*!
116 		 * \brief Release the thread for the specified active group.
117 		 *
118 		 * Method decrements the working agent count for the thread of
119 		 * \a group_name. If there no more working agents left then
120 		 * the event_queue and working thread for that group will be
121 		 * destroyed.
122 		 */
123 		virtual void
124 		release_thread_for_group( const std::string & group_name ) noexcept = 0;
125 	};
126 
127 //
128 // actual_dispatcher_iface_shptr_t
129 //
130 using actual_dispatcher_iface_shptr_t =
131 		std::shared_ptr< actual_dispatcher_iface_t >;
132 
133 //
134 // actual_binder_t
135 //
136 /*!
137  * \brief Implementation of binder interface for %active_group dispatcher.
138  *
139  * \since
140  * v.5.6.0
141  */
142 class actual_binder_t final : public disp_binder_t
143 	{
144 		//! Dispatcher to be used.
145 		actual_dispatcher_iface_shptr_t m_disp;
146 		//! Name of group for new agents.
147 		const std::string m_group_name;
148 
149 	public :
actual_binder_t(actual_dispatcher_iface_shptr_t disp,nonempty_name_t group_name)150 		actual_binder_t(
151 			actual_dispatcher_iface_shptr_t disp,
152 			nonempty_name_t group_name ) noexcept
153 			:	m_disp{ std::move(disp) }
154 			,	m_group_name{ group_name.giveout_value() }
155 			{}
156 
157 		void
preallocate_resources(agent_t &)158 		preallocate_resources(
159 			agent_t & /*agent*/ ) override
160 			{
161 				m_disp->allocate_thread_for_group( m_group_name );
162 			}
163 
164 		void
undo_preallocation(agent_t &)165 		undo_preallocation(
166 			agent_t & /*agent*/ ) noexcept override
167 			{
168 				m_disp->release_thread_for_group( m_group_name );
169 			}
170 
171 		void
bind(agent_t & agent)172 		bind(
173 			agent_t & agent ) noexcept override
174 			{
175 				auto queue = m_disp->query_thread_for_group( m_group_name );
176 				agent.so_bind_to_dispatcher( *queue );
177 			}
178 
179 		void
unbind(agent_t &)180 		unbind(
181 			agent_t & /*agent*/ ) noexcept override
182 			{
183 				m_disp->release_thread_for_group( m_group_name );
184 			}
185 	};
186 
187 //
188 // dispatcher_template_t
189 //
190 
191 /*!
192  * \brief Implementation of active object dispatcher in form of template class.
193  */
194 template< typename Work_Thread >
195 class dispatcher_template_t final : public actual_dispatcher_iface_t
196 	{
197 	public:
dispatcher_template_t(outliving_reference_t<environment_t> env,const std::string_view name_base,disp_params_t params)198 		dispatcher_template_t(
199 			//! SObjectizer Environment to work in.
200 			outliving_reference_t< environment_t > env,
201 			//! Base part of data sources names.
202 			const std::string_view name_base,
203 			//! Dispatcher's parameters.
204 			disp_params_t params )
205 			:	m_params{ std::move(params) }
206 			,	m_data_source{
207 					outliving_mutable(env.get().stats_repository()),
208 					name_base,
209 					outliving_mutable( *this )
210 				}
211 			{}
212 
~dispatcher_template_t()213 		~dispatcher_template_t() noexcept override
214 			{
215 				// All working threads should receive stop signal.
216 				for( auto & p: m_groups )
217 					p.second.m_thread->shutdown();
218 
219 				// All working threads should be joined.
220 				for( auto & p: m_groups )
221 					p.second.m_thread->wait();
222 			}
223 
224 		disp_binder_shptr_t
binder(nonempty_name_t group_name)225 		binder( nonempty_name_t group_name ) override
226 			{
227 				return std::make_shared< actual_binder_t >(
228 						this->shared_from_this(),
229 						std::move(group_name) );
230 			}
231 
232 		void
allocate_thread_for_group(const std::string & group_name)233 		allocate_thread_for_group( const std::string & group_name ) override
234 			{
235 				std::lock_guard< std::mutex > lock{ m_lock };
236 
237 				auto it = m_groups.find( group_name );
238 
239 				if( m_groups.end() == it )
240 					{
241 						// New thread should be created.
242 						auto thread = std::make_shared< Work_Thread >(
243 								m_params.queue_params().lock_factory() );
244 
245 						thread->start();
246 
247 						so_5::details::do_with_rollback_on_exception(
248 								[&] {
249 									m_groups.emplace(
250 											group_name,
251 											thread_with_refcounter_t{ thread, 1u } );
252 								},
253 								[&thread] { shutdown_and_wait( *thread ); } );
254 					}
255 				else
256 					{
257 						// Number of agents bound has to be incremented now.
258 						it->second.m_user_agent += 1u;
259 					}
260 			}
261 
262 		so_5::event_queue_t *
query_thread_for_group(const std::string & group_name)263 		query_thread_for_group( const std::string & group_name ) noexcept override
264 			{
265 				std::lock_guard< std::mutex > lock{ m_lock };
266 
267 				return m_groups.find( group_name )->second.m_thread->
268 						get_agent_binding();
269 			}
270 
271 		void
release_thread_for_group(const std::string & group_name)272 		release_thread_for_group( const std::string & group_name ) noexcept override
273 			{
274 				auto thread = search_and_try_remove_group_from_map( group_name );
275 				if( thread )
276 					shutdown_and_wait( *thread );
277 			}
278 
279 	private:
280 		friend class disp_data_source_t;
281 
282 		//! An alias for shared pointer to work thread.
283 		using work_thread_shptr_t = std::shared_ptr< Work_Thread >;
284 
285 		//! Auxiliary class for the working agent counting.
286 		struct thread_with_refcounter_t
287 			{
288 				work_thread_shptr_t m_thread;
289 				std::size_t m_user_agent;
290 			};
291 
292 		//! Typedef for mapping from group names to a single thread
293 		//! dispatcher.
294 		using active_group_map_t =
295 				std::map< std::string, thread_with_refcounter_t >;
296 
297 		/*!
298 		 * \brief Data source for run-time monitoring of whole dispatcher.
299 		 *
300 		 * \since
301 		 * v.5.5.4
302 		 */
303 		class disp_data_source_t final : public stats::source_t
304 			{
305 				//! Dispatcher to work with.
306 				outliving_reference_t< dispatcher_template_t > m_dispatcher;
307 
308 				//! Basic prefix for data sources.
309 				stats::prefix_t m_base_prefix;
310 
311 			public :
disp_data_source_t(const std::string_view name_base,outliving_reference_t<dispatcher_template_t> disp)312 				disp_data_source_t(
313 					const std::string_view name_base,
314 					outliving_reference_t< dispatcher_template_t > disp )
315 					:	m_dispatcher{ disp }
316 					{
317 						using namespace so_5::disp::reuse;
318 
319 						m_base_prefix = make_disp_prefix(
320 								"ag", // ao -- active_groups
321 								name_base,
322 								&m_dispatcher );
323 					}
324 
325 				void
distribute(const so_5::mbox_t & mbox)326 				distribute( const so_5::mbox_t & mbox ) override
327 					{
328 						auto & disp = m_dispatcher.get();
329 
330 						std::lock_guard< std::mutex > lock{ disp.m_lock };
331 
332 						so_5::send< stats::messages::quantity< std::size_t > >(
333 								mbox,
334 								m_base_prefix,
335 								stats::suffixes::disp_active_group_count(),
336 								disp.m_groups.size() );
337 
338 						std::size_t agent_count = 0;
339 						for( const auto & p : disp.m_groups )
340 							{
341 								distribute_value_for_work_thread(
342 										mbox,
343 										p.first,
344 										p.second );
345 
346 								agent_count += p.second.m_user_agent;
347 							}
348 
349 						so_5::send< stats::messages::quantity< std::size_t > >(
350 								mbox,
351 								m_base_prefix,
352 								stats::suffixes::agent_count(),
353 								agent_count );
354 					}
355 
356 			private:
357 				void
distribute_value_for_work_thread(const so_5::mbox_t & mbox,const std::string & group_name,const thread_with_refcounter_t & wt)358 				distribute_value_for_work_thread(
359 					const so_5::mbox_t & mbox,
360 					const std::string & group_name,
361 					const thread_with_refcounter_t & wt )
362 					{
363 						std::ostringstream ss;
364 						ss << m_base_prefix.c_str() << "/wt-" << group_name;
365 
366 						const stats::prefix_t prefix{ ss.str() };
367 
368 						so_5::send< stats::messages::quantity< std::size_t > >(
369 								mbox,
370 								prefix,
371 								stats::suffixes::agent_count(),
372 								wt.m_user_agent );
373 
374 						so_5::send< stats::messages::quantity< std::size_t > >(
375 								mbox,
376 								prefix,
377 								stats::suffixes::work_thread_queue_size(),
378 								wt.m_thread->demands_count() );
379 
380 						send_thread_activity_stats(
381 								mbox,
382 								prefix,
383 								*(wt.m_thread) );
384 					}
385 			};
386 
387 		//! Parameters for the dispatcher.
388 		const disp_params_t m_params;
389 
390 		//! A map of dispatchers for active groups.
391 		active_group_map_t m_groups;
392 
393 		//! This object lock.
394 		std::mutex m_lock;
395 
396 		/*!
397 		 * \brief Data source for run-time monitoring.
398 		 *
399 		 * \since
400 		 * v.5.5.4, v.5.6.0
401 		 */
402 		stats::auto_registered_source_holder_t< disp_data_source_t >
403 				m_data_source;
404 
405 		/*!
406 		 * \brief Helper function for searching and erasing agent's
407 		 * thread from map of active threads.
408 		 *
409 		 * \since
410 		 * v.5.5.4
411 		 *
412 		 * \note Does all actions on locked object.
413 		 *
414 		 * \return nullptr if thread for the group is not found
415 		 * or there are still some agents on it.
416 		 */
417 		work_thread_shptr_t
search_and_try_remove_group_from_map(const std::string & group_name)418 		search_and_try_remove_group_from_map(
419 			const std::string & group_name ) noexcept
420 			{
421 				work_thread_shptr_t result;
422 
423 				std::lock_guard< std::mutex > lock{ m_lock };
424 
425 				auto it = m_groups.find( group_name );
426 
427 				if( m_groups.end() != it && 0u == --(it->second.m_user_agent) )
428 					{
429 						result = it->second.m_thread;
430 						m_groups.erase( it );
431 					}
432 
433 				return result;
434 			}
435 };
436 
437 //
438 // dispatcher_handle_maker_t
439 //
440 class dispatcher_handle_maker_t
441 	{
442 	public :
443 		static dispatcher_handle_t
make(actual_dispatcher_iface_shptr_t disp)444 		make( actual_dispatcher_iface_shptr_t disp ) noexcept
445 			{
446 				return { std::move( disp ) };
447 			}
448 	};
449 
450 } /* namespace impl */
451 
452 //
453 // make_dispatcher
454 //
455 SO_5_FUNC dispatcher_handle_t
make_dispatcher(environment_t & env,const std::string_view data_sources_name_base,disp_params_t params)456 make_dispatcher(
457 	environment_t & env,
458 	const std::string_view data_sources_name_base,
459 	disp_params_t params )
460 	{
461 		using namespace so_5::disp::reuse;
462 
463 		using dispatcher_no_activity_tracking_t =
464 				impl::dispatcher_template_t<
465 						work_thread::work_thread_no_activity_tracking_t >;
466 
467 		using dispatcher_with_activity_tracking_t =
468 				impl::dispatcher_template_t<
469 						work_thread::work_thread_with_activity_tracking_t >;
470 
471 		auto binder = so_5::disp::reuse::make_actual_dispatcher<
472 						impl::actual_dispatcher_iface_t,
473 						dispatcher_no_activity_tracking_t,
474 						dispatcher_with_activity_tracking_t >(
475 				outliving_mutable(env),
476 				data_sources_name_base,
477 				std::move(params) );
478 
479 		return impl::dispatcher_handle_maker_t::make( std::move(binder) );
480 	}
481 
482 } /* namespace active_group */
483 
484 } /* namespace disp */
485 
486 } /* namespace so_5 */
487 
488