1 ////////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2011 Bryce Adelstein-Lelbach
3 // Copyright (c) 2012-2017 Hartmut Kaiser
4 // Copyright (c) 2016 Thomas Heller
5 //
6 // Distributed under the Boost Software License, Version 1.0. (See accompanying
7 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8 ////////////////////////////////////////////////////////////////////////////////
9
10 #include <hpx/config.hpp>
11 #include <hpx/performance_counters/counter_creators.hpp>
12 #include <hpx/performance_counters/counters.hpp>
13 #include <hpx/performance_counters/manage_counter_type.hpp>
14 #include <hpx/runtime/actions/continuation.hpp>
15 #include <hpx/runtime/agas/namespace_action_code.hpp>
16 #include <hpx/runtime/agas/server/locality_namespace.hpp>
17 #include <hpx/runtime/agas/server/primary_namespace.hpp>
18 #include <hpx/runtime/components/component_type.hpp>
19 #include <hpx/runtime/serialization/vector.hpp>
20 #include <hpx/util/assert.hpp>
21 #include <hpx/util/bind_back.hpp>
22 #include <hpx/util/bind_front.hpp>
23 #include <hpx/util/format.hpp>
24 #include <hpx/util/get_and_reset_value.hpp>
25 #include <hpx/util/insert_checked.hpp>
26 #include <hpx/util/logging.hpp>
27 #include <hpx/util/scoped_timer.hpp>
28
29 #include <atomic>
30 #include <cstddef>
31 #include <cstdint>
32 #include <list>
33 #include <map>
34 #include <mutex>
35 #include <string>
36 #include <utility>
37 #include <vector>
38
39 namespace hpx { namespace agas { namespace server
40 {
41
42 // register all performance counter types exposed by this component
register_counter_types(error_code & ec)43 void locality_namespace::register_counter_types(
44 error_code& ec
45 )
46 {
47 performance_counters::create_counter_func creator(
48 util::bind_back(&performance_counters::agas_raw_counter_creator
49 , agas::server::locality_namespace_service_name));
50
51 for (std::size_t i = 0;
52 i != detail::num_locality_namespace_services;
53 ++i)
54 {
55 // global counters are handled elsewhere
56 if (detail::locality_namespace_services[i].code_ ==
57 locality_ns_statistics_counter)
58 continue;
59
60 std::string name(detail::locality_namespace_services[i].name_);
61 std::string help;
62 std::string::size_type p = name.find_last_of('/');
63 HPX_ASSERT(p != std::string::npos);
64
65 if (detail::locality_namespace_services[i].target_ ==
66 detail::counter_target_count)
67 help = hpx::util::format(
68 "returns the number of invocations of the AGAS service '{}'",
69 name.substr(p+1));
70 else
71 help = hpx::util::format(
72 "returns the overall execution time of the AGAS service '{}'",
73 name.substr(p+1));
74
75 performance_counters::install_counter_type(
76 agas::performance_counter_basename + name
77 , performance_counters::counter_raw
78 , help
79 , creator
80 , &performance_counters::locality0_counter_discoverer
81 , HPX_PERFORMANCE_COUNTER_V1
82 , detail::locality_namespace_services[i].uom_
83 , ec
84 );
85 if (ec) return;
86 }
87 }
88
register_global_counter_types(error_code & ec)89 void locality_namespace::register_global_counter_types(
90 error_code& ec
91 )
92 {
93 performance_counters::create_counter_func creator(
94 util::bind_back(&performance_counters::agas_raw_counter_creator
95 , agas::server::locality_namespace_service_name));
96
97 for (std::size_t i = 0;
98 i != detail::num_locality_namespace_services;
99 ++i)
100 {
101 // local counters are handled elsewhere
102 if (detail::locality_namespace_services[i].code_ !=
103 locality_ns_statistics_counter)
104 continue;
105
106 std::string help;
107 if (detail::locality_namespace_services[i].target_ ==
108 detail::counter_target_count)
109 help = "returns the overall number of invocations \
110 of all locality AGAS services";
111 else
112 help = "returns the overall execution time of all locality AGAS services";
113
114 performance_counters::install_counter_type(
115 std::string(agas::performance_counter_basename) +
116 detail::locality_namespace_services[i].name_
117 , performance_counters::counter_raw
118 , help
119 , creator
120 , &performance_counters::locality0_counter_discoverer
121 , HPX_PERFORMANCE_COUNTER_V1
122 , detail::locality_namespace_services[i].uom_
123 , ec
124 );
125 if (ec) return;
126 }
127 }
128
register_server_instance(char const * servicename,error_code & ec)129 void locality_namespace::register_server_instance(
130 char const* servicename
131 , error_code& ec
132 )
133 {
134 // now register this AGAS instance with AGAS :-P
135 instance_name_ = agas::service_name;
136 instance_name_ += servicename;
137 instance_name_ += agas::server::locality_namespace_service_name;
138
139 // register a gid (not the id) to avoid AGAS holding a reference to this
140 // component
141 agas::register_name(launch::sync, instance_name_,
142 get_unmanaged_id().get_gid(), ec);
143 }
144
unregister_server_instance(error_code & ec)145 void locality_namespace::unregister_server_instance(
146 error_code& ec
147 )
148 {
149 agas::unregister_name(launch::sync, instance_name_, ec);
150 this->base_type::finalize();
151 }
152
finalize()153 void locality_namespace::finalize()
154 {
155 if (!instance_name_.empty())
156 {
157 error_code ec(lightweight);
158 agas::unregister_name(launch::sync, instance_name_, ec);
159 }
160 }
161
allocate(parcelset::endpoints_type const & endpoints,std::uint64_t count,std::uint32_t num_threads,naming::gid_type suggested_prefix)162 std::uint32_t locality_namespace::allocate(
163 parcelset::endpoints_type const& endpoints
164 , std::uint64_t count
165 , std::uint32_t num_threads
166 , naming::gid_type suggested_prefix
167 )
168 { // {{{ allocate implementation
169 util::scoped_timer<std::atomic<std::int64_t> > update(
170 counter_data_.allocate_.time_,
171 counter_data_.allocate_.enabled_
172 );
173 counter_data_.increment_allocate_count();
174
175 using hpx::util::get;
176
177 std::unique_lock<mutex_type> l(mutex_);
178
179 #if defined(HPX_DEBUG)
180 for (partition_table_type::value_type const& partition : partitions_)
181 {
182 HPX_ASSERT(get<0>(partition.second) != endpoints);
183 }
184 #endif
185 // Check for address space exhaustion.
186 if (HPX_UNLIKELY(0xFFFFFFFE < partitions_.size())) //-V104
187 {
188 l.unlock();
189
190 HPX_THROW_EXCEPTION(internal_server_error
191 , "locality_namespace::allocate"
192 , "primary namespace has been exhausted");
193 }
194
195 // Compute the locality's prefix.
196 std::uint32_t prefix = naming::invalid_locality_id;
197
198 // check if the suggested prefix can be used instead of the next
199 // free one
200 std::uint32_t suggested_locality_id =
201 naming::get_locality_id_from_gid(suggested_prefix);
202
203 partition_table_type::iterator it = partitions_.end();
204 if (suggested_locality_id != naming::invalid_locality_id)
205 {
206 it = partitions_.find(suggested_locality_id);
207
208 if(it == partitions_.end())
209 {
210 prefix = suggested_locality_id;
211 }
212 else
213 {
214 do {
215 prefix = prefix_counter_++;
216 it = partitions_.find(prefix);
217 } while (it != partitions_.end());
218 }
219 }
220 else
221 {
222 do {
223 prefix = prefix_counter_++;
224 it = partitions_.find(prefix);
225 } while (it != partitions_.end());
226 }
227
228 // We need to create an entry in the partition table for this
229 // locality.
230 if(HPX_UNLIKELY(!util::insert_checked(partitions_.insert(
231 std::make_pair(prefix, partition_type(endpoints, num_threads))), it)))
232 {
233 l.unlock();
234
235 HPX_THROW_EXCEPTION(lock_error
236 , "locality_namespace::allocate"
237 , hpx::util::format(
238 "partition table insertion failed due to a locking "
239 "error or memory corruption, endpoint({1}), "
240 "prefix({2})", endpoints, prefix));
241 }
242
243
244 // Now that we've inserted the locality into the partition table
245 // successfully, we need to put the locality's GID into the GVA
246 // table so that parcels can be sent to the memory of a locality.
247 if (primary_)
248 {
249 naming::gid_type id(naming::get_gid_from_locality_id(prefix));
250 gva const g(id, components::component_runtime_support, count);
251
252 if(!primary_->bind_gid(g, id, id))
253 {
254 HPX_THROW_EXCEPTION(bad_request
255 , "locality_namespace::allocate"
256 , hpx::util::format(
257 "unable to bind prefix({1}) to a gid", prefix));
258 }
259 return prefix;
260 }
261
262 LAGAS_(info) << hpx::util::format(
263 "locality_namespace::allocate, ep({1}), count({2}), "
264 "prefix({3})",
265 endpoints, count, prefix);
266
267 return prefix;
268 } // }}}
269
resolve_locality(naming::gid_type locality)270 parcelset::endpoints_type locality_namespace::resolve_locality(
271 naming::gid_type locality)
272 { // {{{ resolve_locality implementation
273 util::scoped_timer<std::atomic<std::int64_t> > update(
274 counter_data_.resolve_locality_.time_,
275 counter_data_.resolve_locality_.enabled_
276 );
277 counter_data_.increment_resolve_locality_count();
278
279 using hpx::util::get;
280 std::uint32_t prefix = naming::get_locality_id_from_gid(locality);
281
282 std::lock_guard<mutex_type> l(mutex_);
283 partition_table_type::iterator it = partitions_.find(prefix);
284
285 if(it != partitions_.end())
286 {
287 return get<0>(it->second);
288 }
289
290 return parcelset::endpoints_type();
291 } // }}}
292
free(naming::gid_type locality)293 void locality_namespace::free(naming::gid_type locality)
294 { // {{{ free implementation
295 util::scoped_timer<std::atomic<std::int64_t> > update(
296 counter_data_.free_.time_,
297 counter_data_.free_.enabled_
298 );
299 counter_data_.increment_free_count();
300
301 using hpx::util::get;
302
303 // parameters
304 std::uint32_t prefix = naming::get_locality_id_from_gid(locality);
305
306 std::unique_lock<mutex_type> l(mutex_);
307
308 partition_table_type::iterator pit = partitions_.find(prefix)
309 , pend = partitions_.end();
310
311 if (pit != pend)
312 {
313 /*
314 // Wipe the locality from the tables.
315 naming::gid_type locality =
316 naming::get_gid_from_locality_id(get<0>(pit->second));
317
318 // first remove entry from reverse partition table
319 prefixes_.erase(get<0>(pit->second));
320 */
321
322 // now remove it from the main partition table
323 partitions_.erase(pit);
324
325 if (primary_)
326 {
327 l.unlock();
328
329 // remove primary namespace
330 {
331 naming::gid_type service(HPX_AGAS_PRIMARY_NS_MSB,
332 HPX_AGAS_PRIMARY_NS_LSB);
333 primary_->unbind_gid(
334 1, naming::replace_locality_id(service, prefix));
335 }
336
337 // remove symbol namespace
338 {
339 naming::gid_type service(HPX_AGAS_SYMBOL_NS_MSB,
340 HPX_AGAS_SYMBOL_NS_LSB);
341 primary_->unbind_gid(
342 1, naming::replace_locality_id(service, prefix));
343 }
344
345 // remove locality itself
346 {
347 primary_->unbind_gid(0, locality);
348 }
349 }
350
351 /*
352 LAGAS_(info) << hpx::util::format(
353 "locality_namespace::free, ep({1})",
354 ep);
355 */
356 }
357
358 /*
359 LAGAS_(info) << hpx::util::format(
360 "locality_namespace::free, ep({1}), "
361 "response(no_success)",
362 ep);
363 */
364 } // }}}
365
localities()366 std::vector<std::uint32_t> locality_namespace::localities()
367 { // {{{ localities implementation
368 util::scoped_timer<std::atomic<std::int64_t> > update(
369 counter_data_.localities_.time_,
370 counter_data_.localities_.enabled_
371 );
372 counter_data_.increment_localities_count();
373
374 std::lock_guard<mutex_type> l(mutex_);
375
376 std::vector<std::uint32_t> p;
377
378 partition_table_type::const_iterator it = partitions_.begin()
379 , end = partitions_.end();
380
381 for (/**/; it != end; ++it)
382 p.push_back(it->first);
383
384 LAGAS_(info) << hpx::util::format(
385 "locality_namespace::localities, localities({1})",
386 p.size());
387
388 return p;
389 } // }}}
390
get_num_localities()391 std::uint32_t locality_namespace::get_num_localities()
392 { // {{{ get_num_localities implementation
393 util::scoped_timer<std::atomic<std::int64_t> > update(
394 counter_data_.num_localities_.time_,
395 counter_data_.num_localities_.enabled_
396 );
397 counter_data_.increment_num_localities_count();
398 std::lock_guard<mutex_type> l(mutex_);
399
400 std::uint32_t num_localities =
401 static_cast<std::uint32_t>(partitions_.size());
402
403 LAGAS_(info) << hpx::util::format(
404 "locality_namespace::get_num_localities, localities({1})",
405 num_localities);
406
407 return num_localities;
408 } // }}}
409
get_num_threads()410 std::vector<std::uint32_t> locality_namespace::get_num_threads()
411 { // {{{ get_num_threads implementation
412 std::lock_guard<mutex_type> l(mutex_);
413
414 std::vector<std::uint32_t> num_threads;
415
416 partition_table_type::iterator end = partitions_.end();
417 for (partition_table_type::iterator it = partitions_.begin();
418 it != end; ++it)
419 {
420 using hpx::util::get;
421 num_threads.push_back(get<1>(it->second));
422 }
423
424 LAGAS_(info) << hpx::util::format(
425 "locality_namespace::get_num_threads, localities({1})",
426 num_threads.size());
427
428 return num_threads;
429 } // }}}
430
get_num_overall_threads()431 std::uint32_t locality_namespace::get_num_overall_threads()
432 {
433 std::lock_guard<mutex_type> l(mutex_);
434
435 std::uint32_t num_threads = 0;
436
437 partition_table_type::iterator end = partitions_.end();
438 for (partition_table_type::iterator it = partitions_.begin();
439 it != end; ++it)
440 {
441 using hpx::util::get;
442 num_threads += get<1>(it->second);
443 }
444
445 LAGAS_(info) << hpx::util::format(
446 "locality_namespace::get_num_overall_threads, localities({1})",
447 num_threads);
448
449 return num_threads;
450 }
451
statistics_counter(std::string name)452 naming::gid_type locality_namespace::statistics_counter(std::string name)
453 { // {{{ statistics_counter implementation
454 LAGAS_(info) << "locality_namespace::statistics_counter";
455
456 performance_counters::counter_path_elements p;
457 performance_counters::get_counter_path_elements(name, p);
458
459 if (p.objectname_ != "agas")
460 {
461 HPX_THROW_EXCEPTION(bad_parameter,
462 "locality_namespace::statistics_counter",
463 "unknown performance counter (unrelated to AGAS)");
464 }
465
466 namespace_action_code code = invalid_request;
467 detail::counter_target target = detail::counter_target_invalid;
468 for (std::size_t i = 0;
469 i != detail::num_locality_namespace_services;
470 ++i)
471 {
472 if (p.countername_ == detail::locality_namespace_services[i].name_)
473 {
474 code = detail::locality_namespace_services[i].code_;
475 target = detail::locality_namespace_services[i].target_;
476 break;
477 }
478 }
479
480 if (code == invalid_request || target == detail::counter_target_invalid)
481 {
482 HPX_THROW_EXCEPTION(bad_parameter,
483 "locality_namespace::statistics_counter",
484 "unknown performance counter (unrelated to AGAS)");
485 }
486
487 typedef locality_namespace::counter_data cd;
488
489 util::function_nonser<std::int64_t(bool)> get_data_func;
490 if (target == detail::counter_target_count)
491 {
492 switch (code) {
493 case locality_ns_allocate:
494 get_data_func = util::bind_front(&cd::get_allocate_count,
495 &counter_data_);
496 counter_data_.allocate_.enabled_ = true;
497 break;
498 case locality_ns_resolve_locality:
499 get_data_func = util::bind_front(&cd::get_resolve_locality_count,
500 &counter_data_);
501 counter_data_.resolve_locality_.enabled_ = true;
502 break;
503 case locality_ns_free:
504 get_data_func = util::bind_front(&cd::get_free_count,
505 &counter_data_);
506 counter_data_.free_.enabled_ = true;
507 break;
508 case locality_ns_localities:
509 get_data_func = util::bind_front(&cd::get_localities_count,
510 &counter_data_);
511 counter_data_.localities_.enabled_ = true;
512 break;
513 case locality_ns_num_localities:
514 get_data_func = util::bind_front(&cd::get_num_localities_count,
515 &counter_data_);
516 counter_data_.num_localities_.enabled_ = true;
517 break;
518 case locality_ns_num_threads:
519 get_data_func = util::bind_front(&cd::get_num_threads_count,
520 &counter_data_);
521 counter_data_.num_threads_.enabled_ = true;
522 break;
523 case locality_ns_statistics_counter:
524 get_data_func = util::bind_front(&cd::get_overall_count,
525 &counter_data_);
526 counter_data_.enable_all();
527 break;
528 default:
529 HPX_THROW_EXCEPTION(bad_parameter
530 , "locality_namespace::statistics"
531 , "bad action code while querying statistics");
532 }
533 }
534 else {
535 HPX_ASSERT(detail::counter_target_time == target);
536 switch (code) {
537 case locality_ns_allocate:
538 get_data_func = util::bind_front(&cd::get_allocate_time,
539 &counter_data_);
540 counter_data_.allocate_.enabled_ = true;
541 break;
542 case locality_ns_resolve_locality:
543 get_data_func = util::bind_front(&cd::get_resolve_locality_time,
544 &counter_data_);
545 counter_data_.resolve_locality_.enabled_ = true;
546 break;
547 case locality_ns_free:
548 get_data_func = util::bind_front(&cd::get_free_time,
549 &counter_data_);
550 counter_data_.free_.enabled_ = true;
551 break;
552 case locality_ns_localities:
553 get_data_func = util::bind_front(&cd::get_localities_time,
554 &counter_data_);
555 counter_data_.localities_.enabled_ = true;
556 break;
557 case locality_ns_num_localities:
558 get_data_func = util::bind_front(&cd::get_num_localities_time,
559 &counter_data_);
560 counter_data_.num_localities_.enabled_ = true;
561 break;
562 case locality_ns_num_threads:
563 get_data_func = util::bind_front(&cd::get_num_threads_time,
564 &counter_data_);
565 counter_data_.num_threads_.enabled_ = true;
566 break;
567 case locality_ns_statistics_counter:
568 get_data_func = util::bind_front(&cd::get_overall_time,
569 &counter_data_);
570 counter_data_.enable_all();
571 break;
572 default:
573 HPX_THROW_EXCEPTION(bad_parameter
574 , "locality_namespace::statistics"
575 , "bad action code while querying statistics");
576 }
577 }
578
579 performance_counters::counter_info info;
580 performance_counters::get_counter_type(name, info);
581
582 performance_counters::complement_counter_info(info);
583
584 using performance_counters::detail::create_raw_counter;
585 naming::gid_type gid = create_raw_counter(info, get_data_func, hpx::throws);
586
587 return naming::detail::strip_credits_from_gid(gid);
588 }
589
590 // access current counter values
get_allocate_count(bool reset)591 std::int64_t locality_namespace::counter_data::get_allocate_count(bool reset)
592 {
593 return util::get_and_reset_value(allocate_.count_, reset);
594 }
595
get_resolve_locality_count(bool reset)596 std::int64_t locality_namespace::counter_data::get_resolve_locality_count(bool reset)
597 {
598 return util::get_and_reset_value(resolve_locality_.count_, reset);
599 }
600
get_free_count(bool reset)601 std::int64_t locality_namespace::counter_data::get_free_count(bool reset)
602 {
603 return util::get_and_reset_value(free_.count_, reset);
604 }
605
get_localities_count(bool reset)606 std::int64_t locality_namespace::counter_data::get_localities_count(bool reset)
607 {
608 return util::get_and_reset_value(localities_.count_, reset);
609 }
610
get_num_localities_count(bool reset)611 std::int64_t locality_namespace::counter_data::get_num_localities_count(bool reset)
612 {
613 return util::get_and_reset_value(num_localities_.count_, reset);
614 }
615
get_num_threads_count(bool reset)616 std::int64_t locality_namespace::counter_data::get_num_threads_count(bool reset)
617 {
618 return util::get_and_reset_value(num_threads_.count_, reset);
619 }
620
get_overall_count(bool reset)621 std::int64_t locality_namespace::counter_data::get_overall_count(bool reset)
622 {
623 return util::get_and_reset_value(allocate_.count_, reset) +
624 util::get_and_reset_value(resolve_locality_.count_, reset) +
625 util::get_and_reset_value(free_.count_, reset) +
626 util::get_and_reset_value(localities_.count_, reset) +
627 util::get_and_reset_value(num_localities_.count_, reset) +
628 util::get_and_reset_value(num_threads_.count_, reset);
629 }
630
enable_all()631 void locality_namespace::counter_data::enable_all()
632 {
633 allocate_.enabled_ = true;
634 resolve_locality_.enabled_ = true;
635 free_.enabled_ = true;
636 localities_.enabled_ = true;
637 num_localities_.enabled_ = true;
638 num_threads_.enabled_ = true;
639 }
640
641 // access execution time counters
get_allocate_time(bool reset)642 std::int64_t locality_namespace::counter_data::get_allocate_time(bool reset)
643 {
644 return util::get_and_reset_value(allocate_.time_, reset);
645 }
646
get_resolve_locality_time(bool reset)647 std::int64_t locality_namespace::counter_data::get_resolve_locality_time(bool reset)
648 {
649 return util::get_and_reset_value(resolve_locality_.time_, reset);
650 }
651
get_free_time(bool reset)652 std::int64_t locality_namespace::counter_data::get_free_time(bool reset)
653 {
654 return util::get_and_reset_value(free_.time_, reset);
655 }
656
get_localities_time(bool reset)657 std::int64_t locality_namespace::counter_data::get_localities_time(bool reset)
658 {
659 return util::get_and_reset_value(localities_.time_, reset);
660 }
661
get_num_localities_time(bool reset)662 std::int64_t locality_namespace::counter_data::get_num_localities_time(bool reset)
663 {
664 return util::get_and_reset_value(num_localities_.time_, reset);
665 }
666
get_num_threads_time(bool reset)667 std::int64_t locality_namespace::counter_data::get_num_threads_time(bool reset)
668 {
669 return util::get_and_reset_value(num_threads_.time_, reset);
670 }
671
get_overall_time(bool reset)672 std::int64_t locality_namespace::counter_data::get_overall_time(bool reset)
673 {
674 return util::get_and_reset_value(allocate_.time_, reset) +
675 util::get_and_reset_value(resolve_locality_.time_, reset) +
676 util::get_and_reset_value(free_.time_, reset) +
677 util::get_and_reset_value(localities_.time_, reset) +
678 util::get_and_reset_value(num_localities_.time_, reset) +
679 util::get_and_reset_value(num_threads_.time_, reset);
680 }
681
682 // increment counter values
increment_allocate_count()683 void locality_namespace::counter_data::increment_allocate_count()
684 {
685 if (allocate_.enabled_)
686 {
687 ++allocate_.count_;
688 }
689 }
690
increment_resolve_locality_count()691 void locality_namespace::counter_data::increment_resolve_locality_count()
692 {
693 if (resolve_locality_.enabled_)
694 {
695 ++resolve_locality_.count_;
696 }
697 }
698
increment_free_count()699 void locality_namespace::counter_data::increment_free_count()
700 {
701 if (free_.enabled_)
702 {
703 ++free_.count_;
704 }
705 }
706
increment_localities_count()707 void locality_namespace::counter_data::increment_localities_count()
708 {
709 if (localities_.enabled_)
710 {
711 ++localities_.count_;
712 }
713 }
714
increment_num_localities_count()715 void locality_namespace::counter_data::increment_num_localities_count()
716 {
717 if (num_localities_.enabled_)
718 {
719 ++num_localities_.count_;
720 }
721 }
722
increment_num_threads_count()723 void locality_namespace::counter_data::increment_num_threads_count()
724 {
725 if (num_threads_.enabled_)
726 {
727 ++num_threads_.count_;
728 }
729 }
730 }}}
731
732