1 /*
2 SObjectizer 5.
3 */
4
5 #include <so_5/disp/active_group/pub.hpp>
6
7 #include <so_5/send_functions.hpp>
8
9 #include <so_5/details/rollback_on_exception.hpp>
10
11 #include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
12 #include <so_5/disp/reuse/make_actual_dispatcher.hpp>
13
14 #include <so_5/disp/reuse/work_thread/work_thread.hpp>
15
16 #include <so_5/stats/repository.hpp>
17 #include <so_5/stats/messages.hpp>
18 #include <so_5/stats/std_names.hpp>
19
20 #include <map>
21 #include <mutex>
22 #include <algorithm>
23
24 namespace so_5
25 {
26
27 namespace disp
28 {
29
30 namespace active_group
31 {
32
33 namespace impl
34 {
35
// Short aliases used throughout this translation unit:
// work_thread -- the reusable work thread implementations;
// stats -- the run-time monitoring (statistics) facilities.
namespace work_thread = so_5::disp::reuse::work_thread;
namespace stats = so_5::stats;
38
39 namespace
40 {
41
/*!
 * \brief Helper that stops an object and then waits for its completion.
 *
 * Calls \c shutdown() on \a w and, right after that, \c wait() on the
 * same object.
 *
 * \tparam Stoppable Any type that provides \c shutdown() and \c wait()
 * methods.
 *
 * \since
 * v.5.5.4
 */
template< typename Stoppable >
void
shutdown_and_wait( Stoppable & w )
{
	// The order matters: the stop signal first, the waiting afterwards.
	w.shutdown();
	w.wait();
}
55
56 void
send_thread_activity_stats(const so_5::mbox_t &,const stats::prefix_t &,work_thread::work_thread_no_activity_tracking_t &)57 send_thread_activity_stats(
58 const so_5::mbox_t &,
59 const stats::prefix_t &,
60 work_thread::work_thread_no_activity_tracking_t & )
61 {
62 /* Nothing to do */
63 }
64
65 void
send_thread_activity_stats(const so_5::mbox_t & mbox,const stats::prefix_t & prefix,work_thread::work_thread_with_activity_tracking_t & wt)66 send_thread_activity_stats(
67 const so_5::mbox_t & mbox,
68 const stats::prefix_t & prefix,
69 work_thread::work_thread_with_activity_tracking_t & wt )
70 {
71 so_5::send< stats::messages::work_thread_activity >(
72 mbox,
73 prefix,
74 stats::suffixes::work_thread_activity(),
75 wt.thread_id(),
76 wt.take_activity_stats() );
77 }
78
79 } /* anonymous */
80
81 //
82 // actual_dispatcher_iface_t
83 //
84 /*!
85 * \brief An actual interface of active group dispatcher.
86 *
87 * \since
88 * v.5.6.0
89 */
90 class actual_dispatcher_iface_t : public basic_dispatcher_iface_t
91 {
92 public :
93 /*!
94 * \brief Create a new thread for a group if it necessary.
95 *
96 * If name \a group_name is unknown then a new work
97 * thread is started. This thread is marked as it has one
98 * working agent on it.
99 *
100 * If there already is a thread for \a group_name then the
101 * counter of working agents is incremented.
102 */
103 virtual void
104 allocate_thread_for_group( const std::string & group_name ) = 0;
105
106 /*!
107 * \brief Get the event_queue for the specified active group.
108 *
109 * It is expected that thread for the group is already
110 * created by calling allocate_thread_for_group() method.
111 */
112 virtual so_5::event_queue_t *
113 query_thread_for_group( const std::string & group_name ) noexcept = 0;
114
115 /*!
116 * \brief Release the thread for the specified active group.
117 *
118 * Method decrements the working agent count for the thread of
119 * \a group_name. If there no more working agents left then
120 * the event_queue and working thread for that group will be
121 * destroyed.
122 */
123 virtual void
124 release_thread_for_group( const std::string & group_name ) noexcept = 0;
125 };
126
127 //
128 // actual_dispatcher_iface_shptr_t
129 //
//! An alias for a shared pointer to the actual dispatcher interface.
using actual_dispatcher_iface_shptr_t =
	std::shared_ptr< actual_dispatcher_iface_t >;
132
133 //
134 // actual_binder_t
135 //
136 /*!
137 * \brief Implementation of binder interface for %active_group dispatcher.
138 *
139 * \since
140 * v.5.6.0
141 */
142 class actual_binder_t final : public disp_binder_t
143 {
144 //! Dispatcher to be used.
145 actual_dispatcher_iface_shptr_t m_disp;
146 //! Name of group for new agents.
147 const std::string m_group_name;
148
149 public :
actual_binder_t(actual_dispatcher_iface_shptr_t disp,nonempty_name_t group_name)150 actual_binder_t(
151 actual_dispatcher_iface_shptr_t disp,
152 nonempty_name_t group_name ) noexcept
153 : m_disp{ std::move(disp) }
154 , m_group_name{ group_name.giveout_value() }
155 {}
156
157 void
preallocate_resources(agent_t &)158 preallocate_resources(
159 agent_t & /*agent*/ ) override
160 {
161 m_disp->allocate_thread_for_group( m_group_name );
162 }
163
164 void
undo_preallocation(agent_t &)165 undo_preallocation(
166 agent_t & /*agent*/ ) noexcept override
167 {
168 m_disp->release_thread_for_group( m_group_name );
169 }
170
171 void
bind(agent_t & agent)172 bind(
173 agent_t & agent ) noexcept override
174 {
175 auto queue = m_disp->query_thread_for_group( m_group_name );
176 agent.so_bind_to_dispatcher( *queue );
177 }
178
179 void
unbind(agent_t &)180 unbind(
181 agent_t & /*agent*/ ) noexcept override
182 {
183 m_disp->release_thread_for_group( m_group_name );
184 }
185 };
186
187 //
188 // dispatcher_template_t
189 //
190
191 /*!
192 * \brief Implementation of active object dispatcher in form of template class.
193 */
194 template< typename Work_Thread >
195 class dispatcher_template_t final : public actual_dispatcher_iface_t
196 {
197 public:
dispatcher_template_t(outliving_reference_t<environment_t> env,const std::string_view name_base,disp_params_t params)198 dispatcher_template_t(
199 //! SObjectizer Environment to work in.
200 outliving_reference_t< environment_t > env,
201 //! Base part of data sources names.
202 const std::string_view name_base,
203 //! Dispatcher's parameters.
204 disp_params_t params )
205 : m_params{ std::move(params) }
206 , m_data_source{
207 outliving_mutable(env.get().stats_repository()),
208 name_base,
209 outliving_mutable( *this )
210 }
211 {}
212
~dispatcher_template_t()213 ~dispatcher_template_t() noexcept override
214 {
215 // All working threads should receive stop signal.
216 for( auto & p: m_groups )
217 p.second.m_thread->shutdown();
218
219 // All working threads should be joined.
220 for( auto & p: m_groups )
221 p.second.m_thread->wait();
222 }
223
224 disp_binder_shptr_t
binder(nonempty_name_t group_name)225 binder( nonempty_name_t group_name ) override
226 {
227 return std::make_shared< actual_binder_t >(
228 this->shared_from_this(),
229 std::move(group_name) );
230 }
231
232 void
allocate_thread_for_group(const std::string & group_name)233 allocate_thread_for_group( const std::string & group_name ) override
234 {
235 std::lock_guard< std::mutex > lock{ m_lock };
236
237 auto it = m_groups.find( group_name );
238
239 if( m_groups.end() == it )
240 {
241 // New thread should be created.
242 auto thread = std::make_shared< Work_Thread >(
243 m_params.queue_params().lock_factory() );
244
245 thread->start();
246
247 so_5::details::do_with_rollback_on_exception(
248 [&] {
249 m_groups.emplace(
250 group_name,
251 thread_with_refcounter_t{ thread, 1u } );
252 },
253 [&thread] { shutdown_and_wait( *thread ); } );
254 }
255 else
256 {
257 // Number of agents bound has to be incremented now.
258 it->second.m_user_agent += 1u;
259 }
260 }
261
262 so_5::event_queue_t *
query_thread_for_group(const std::string & group_name)263 query_thread_for_group( const std::string & group_name ) noexcept override
264 {
265 std::lock_guard< std::mutex > lock{ m_lock };
266
267 return m_groups.find( group_name )->second.m_thread->
268 get_agent_binding();
269 }
270
271 void
release_thread_for_group(const std::string & group_name)272 release_thread_for_group( const std::string & group_name ) noexcept override
273 {
274 auto thread = search_and_try_remove_group_from_map( group_name );
275 if( thread )
276 shutdown_and_wait( *thread );
277 }
278
279 private:
280 friend class disp_data_source_t;
281
282 //! An alias for shared pointer to work thread.
283 using work_thread_shptr_t = std::shared_ptr< Work_Thread >;
284
285 //! Auxiliary class for the working agent counting.
286 struct thread_with_refcounter_t
287 {
288 work_thread_shptr_t m_thread;
289 std::size_t m_user_agent;
290 };
291
292 //! Typedef for mapping from group names to a single thread
293 //! dispatcher.
294 using active_group_map_t =
295 std::map< std::string, thread_with_refcounter_t >;
296
297 /*!
298 * \brief Data source for run-time monitoring of whole dispatcher.
299 *
300 * \since
301 * v.5.5.4
302 */
303 class disp_data_source_t final : public stats::source_t
304 {
305 //! Dispatcher to work with.
306 outliving_reference_t< dispatcher_template_t > m_dispatcher;
307
308 //! Basic prefix for data sources.
309 stats::prefix_t m_base_prefix;
310
311 public :
disp_data_source_t(const std::string_view name_base,outliving_reference_t<dispatcher_template_t> disp)312 disp_data_source_t(
313 const std::string_view name_base,
314 outliving_reference_t< dispatcher_template_t > disp )
315 : m_dispatcher{ disp }
316 {
317 using namespace so_5::disp::reuse;
318
319 m_base_prefix = make_disp_prefix(
320 "ag", // ao -- active_groups
321 name_base,
322 &m_dispatcher );
323 }
324
325 void
distribute(const so_5::mbox_t & mbox)326 distribute( const so_5::mbox_t & mbox ) override
327 {
328 auto & disp = m_dispatcher.get();
329
330 std::lock_guard< std::mutex > lock{ disp.m_lock };
331
332 so_5::send< stats::messages::quantity< std::size_t > >(
333 mbox,
334 m_base_prefix,
335 stats::suffixes::disp_active_group_count(),
336 disp.m_groups.size() );
337
338 std::size_t agent_count = 0;
339 for( const auto & p : disp.m_groups )
340 {
341 distribute_value_for_work_thread(
342 mbox,
343 p.first,
344 p.second );
345
346 agent_count += p.second.m_user_agent;
347 }
348
349 so_5::send< stats::messages::quantity< std::size_t > >(
350 mbox,
351 m_base_prefix,
352 stats::suffixes::agent_count(),
353 agent_count );
354 }
355
356 private:
357 void
distribute_value_for_work_thread(const so_5::mbox_t & mbox,const std::string & group_name,const thread_with_refcounter_t & wt)358 distribute_value_for_work_thread(
359 const so_5::mbox_t & mbox,
360 const std::string & group_name,
361 const thread_with_refcounter_t & wt )
362 {
363 std::ostringstream ss;
364 ss << m_base_prefix.c_str() << "/wt-" << group_name;
365
366 const stats::prefix_t prefix{ ss.str() };
367
368 so_5::send< stats::messages::quantity< std::size_t > >(
369 mbox,
370 prefix,
371 stats::suffixes::agent_count(),
372 wt.m_user_agent );
373
374 so_5::send< stats::messages::quantity< std::size_t > >(
375 mbox,
376 prefix,
377 stats::suffixes::work_thread_queue_size(),
378 wt.m_thread->demands_count() );
379
380 send_thread_activity_stats(
381 mbox,
382 prefix,
383 *(wt.m_thread) );
384 }
385 };
386
387 //! Parameters for the dispatcher.
388 const disp_params_t m_params;
389
390 //! A map of dispatchers for active groups.
391 active_group_map_t m_groups;
392
393 //! This object lock.
394 std::mutex m_lock;
395
396 /*!
397 * \brief Data source for run-time monitoring.
398 *
399 * \since
400 * v.5.5.4, v.5.6.0
401 */
402 stats::auto_registered_source_holder_t< disp_data_source_t >
403 m_data_source;
404
405 /*!
406 * \brief Helper function for searching and erasing agent's
407 * thread from map of active threads.
408 *
409 * \since
410 * v.5.5.4
411 *
412 * \note Does all actions on locked object.
413 *
414 * \return nullptr if thread for the group is not found
415 * or there are still some agents on it.
416 */
417 work_thread_shptr_t
search_and_try_remove_group_from_map(const std::string & group_name)418 search_and_try_remove_group_from_map(
419 const std::string & group_name ) noexcept
420 {
421 work_thread_shptr_t result;
422
423 std::lock_guard< std::mutex > lock{ m_lock };
424
425 auto it = m_groups.find( group_name );
426
427 if( m_groups.end() != it && 0u == --(it->second.m_user_agent) )
428 {
429 result = it->second.m_thread;
430 m_groups.erase( it );
431 }
432
433 return result;
434 }
435 };
436
437 //
438 // dispatcher_handle_maker_t
439 //
440 class dispatcher_handle_maker_t
441 {
442 public :
443 static dispatcher_handle_t
make(actual_dispatcher_iface_shptr_t disp)444 make( actual_dispatcher_iface_shptr_t disp ) noexcept
445 {
446 return { std::move( disp ) };
447 }
448 };
449
450 } /* namespace impl */
451
452 //
453 // make_dispatcher
454 //
455 SO_5_FUNC dispatcher_handle_t
make_dispatcher(environment_t & env,const std::string_view data_sources_name_base,disp_params_t params)456 make_dispatcher(
457 environment_t & env,
458 const std::string_view data_sources_name_base,
459 disp_params_t params )
460 {
461 using namespace so_5::disp::reuse;
462
463 using dispatcher_no_activity_tracking_t =
464 impl::dispatcher_template_t<
465 work_thread::work_thread_no_activity_tracking_t >;
466
467 using dispatcher_with_activity_tracking_t =
468 impl::dispatcher_template_t<
469 work_thread::work_thread_with_activity_tracking_t >;
470
471 auto binder = so_5::disp::reuse::make_actual_dispatcher<
472 impl::actual_dispatcher_iface_t,
473 dispatcher_no_activity_tracking_t,
474 dispatcher_with_activity_tracking_t >(
475 outliving_mutable(env),
476 data_sources_name_base,
477 std::move(params) );
478
479 return impl::dispatcher_handle_maker_t::make( std::move(binder) );
480 }
481
482 } /* namespace active_group */
483
484 } /* namespace disp */
485
486 } /* namespace so_5 */
487
488