1 /******************************************************************************* 2 * thrill/api/context.hpp 3 * 4 * Part of Project Thrill - http://project-thrill.org 5 * 6 * Copyright (C) 2015 Alexander Noe <aleexnoe@gmail.com> 7 * Copyright (C) 2015 Tobias Sturm <mail@tobiassturm.de> 8 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net> 9 * 10 * All rights reserved. Published under the BSD-2 license in the LICENSE file. 11 ******************************************************************************/ 12 13 #pragma once 14 #ifndef THRILL_API_CONTEXT_HEADER 15 #define THRILL_API_CONTEXT_HEADER 16 17 #include <thrill/common/config.hpp> 18 #include <thrill/common/defines.hpp> 19 #include <thrill/common/json_logger.hpp> 20 #include <thrill/common/profile_task.hpp> 21 #include <thrill/data/block_pool.hpp> 22 #include <thrill/data/cat_stream.hpp> 23 #include <thrill/data/file.hpp> 24 #include <thrill/data/mix_stream.hpp> 25 #include <thrill/data/multiplexer.hpp> 26 #include <thrill/net/flow_control_channel.hpp> 27 #include <thrill/net/flow_control_manager.hpp> 28 #include <thrill/net/manager.hpp> 29 30 #include <algorithm> 31 #include <cassert> 32 #include <functional> 33 #include <numeric> 34 #include <random> 35 #include <string> 36 #include <tuple> 37 #include <vector> 38 39 namespace thrill { 40 namespace api { 41 42 //! \ingroup api_layer 43 //! \{ 44 45 // forward declarations 46 class DIABase; 47 48 class MemoryConfig 49 { 50 public: 51 //! detect memory configuration from environment 52 int setup_detect(); 53 54 //! setup memory size 55 void setup(size_t ram); 56 57 MemoryConfig divide(size_t hosts) const; 58 void apply(); 59 60 void print(size_t workers_per_host) const; 61 62 //! total amount of physical ram detected or THRILL_RAM 63 size_t ram_; 64 65 //! amount of RAM dedicated to data::BlockPool -- hard limit 66 size_t ram_block_pool_hard_; 67 68 //! amount of RAM dedicated to data::BlockPool -- soft limit 69 size_t ram_block_pool_soft_; 70 71 //! total amount of RAM for DIANode data structures such as the reduce 72 //! tables. divide by the number of worker threads before use. 73 size_t ram_workers_; 74 75 //! remaining free-floating RAM used for user and Thrill data structures. 76 size_t ram_floating_; 77 78 //! StageBuilder verbosity flag 79 bool verbose_ = true; 80 81 //! enable Linux /proc stats profiler (default: on) 82 bool enable_proc_profiler_ = true; 83 }; 84 85 /*! 86 * The HostContext contains all data structures shared among workers on the same 87 * host. It is used to construct and destroy them. For testing multiple 88 * instances are run in the same process. 89 */ 90 class HostContext 91 { 92 public: 93 #ifndef SWIG 94 //! constructor from existing net Groups. Used by the construction methods. 95 HostContext(size_t local_host_id, const MemoryConfig& mem_config, 96 std::unique_ptr<net::DispatcherThread> dispatcher, 97 std::array<net::GroupPtr, net::Manager::kGroupCount>&& groups, 98 size_t workers_per_host); 99 100 //! destructor 101 ~HostContext(); 102 103 //! Construct a number of mock hosts running in this process. 104 static std::vector<std::unique_ptr<HostContext> > 105 ConstructLoopback(size_t num_hosts, size_t workers_per_host); 106 #endif 107 108 //! create host log 109 std::string MakeHostLogPath(size_t host_rank); 110 111 //! Returns local_host_id_ local_host_id() const112 size_t local_host_id() const { return local_host_id_; } 113 114 //! number of workers per host (all have the same). workers_per_host() const115 size_t workers_per_host() const { return workers_per_host_; } 116 117 //! memory limit of each worker Context for local data structures worker_mem_limit() const118 size_t worker_mem_limit() const { 119 return mem_config_.ram_workers_ / workers_per_host_; 120 } 121 122 //! host-global memory config mem_config()123 MemoryConfig& mem_config() { return mem_config_; } 124 125 //! host-global memory manager mem_manager()126 mem::Manager& mem_manager() { return mem_manager_; } 127 128 //! net manager constructs communication groups to other hosts. net_manager()129 net::Manager& net_manager() { return net_manager_; } 130 131 //! Returns id of this host in the cluser. A host is a machine in the 132 //! cluster that hosts multiple workers host_rank() const133 size_t host_rank() const { return net_manager_.my_host_rank(); } 134 135 //! the flow control group is used for collective communication. flow_manager()136 net::FlowControlChannelManager& flow_manager() { return flow_manager_; } 137 138 //! the block manager keeps all data blocks moving through the system. block_pool()139 data::BlockPool& block_pool() { return block_pool_; } 140 141 //! data multiplexer transmits large amounts of data asynchronously. data_multiplexer()142 data::Multiplexer& data_multiplexer() { return data_multiplexer_; } 143 144 private: 145 //! memory configuration 146 MemoryConfig mem_config_; 147 148 public: 149 //! \name Logging System 150 //! \{ 151 152 //! base logger exclusive for this host context 153 common::JsonLogger base_logger_; 154 155 //! public member which delivers key:value pairs as JSON log lines. this 156 //! logger is local to this Context which is exclusive for one worker 157 //! thread. 158 common::JsonLogger logger_; 159 160 //! thread for scheduling profiling methods for statistical output 161 std::unique_ptr<common::ProfileThread> profiler_; 162 163 //! \} 164 165 private: 166 //! id among all _local_ hosts (in test program runs) 167 size_t local_host_id_; 168 169 //! number of workers per host (all have the same). 170 size_t workers_per_host_; 171 172 //! host-global memory manager for internal memory only 173 mem::Manager mem_manager_ { nullptr, "HostContext" }; 174 175 //! main host network dispatcher thread backend 176 std::unique_ptr<net::DispatcherThread> dispatcher_; 177 178 //! net manager constructs communication groups to other hosts. 179 net::Manager net_manager_; 180 181 #if !THRILL_HAVE_THREAD_SANITIZER 182 //! register net_manager_'s profiling method 183 common::ProfileTaskRegistration net_manager_profiler_ { 184 std::chrono::milliseconds(500), *profiler_, &net_manager_ 185 }; 186 #endif 187 188 //! the flow control group is used for collective communication. 189 net::FlowControlChannelManager flow_manager_ { 190 net_manager_.GetFlowGroup(), workers_per_host_ 191 }; 192 193 //! data block pool 194 data::BlockPool block_pool_ { 195 mem_config_.ram_block_pool_soft_, mem_config_.ram_block_pool_hard_, 196 &logger_, &mem_manager_, workers_per_host_ 197 }; 198 199 #if !THRILL_HAVE_THREAD_SANITIZER 200 //! register BlockPool's profiling method 201 common::ProfileTaskRegistration block_pool_profiler_ { 202 std::chrono::milliseconds(500), *profiler_, &block_pool_ 203 }; 204 #endif 205 206 //! data multiplexer transmits large amounts of data asynchronously. 207 data::Multiplexer data_multiplexer_ { 208 mem_manager_, block_pool_, 209 *dispatcher_, net_manager_.GetDataGroup(), workers_per_host_ 210 }; 211 }; 212 213 /*! 214 * The Context of a job is a unique instance per worker which holds references 215 * to all underlying parts of Thrill. The context is able to give references to 216 * the \ref data::Multiplexer "stream multiplexer", the \ref net::Group "net 217 * group" \ref common::Stats "stats" and \ref common::StatsGraph "stats graph". 218 * Threads share the stream multiplexer and the net group via the context 219 * object. 220 */ 221 class Context 222 { 223 public: 224 Context(HostContext& host_context, size_t local_worker_id); 225 226 //! method used to launch a job's main procedure. it wraps it in log output. 227 void Launch(const std::function<void(Context&)>& job_startpoint); 228 229 //! \name System Information 230 //! \{ 231 232 //! Returns the total number of hosts. num_hosts() const233 size_t num_hosts() const { 234 return net_manager_.num_hosts(); 235 } 236 237 //! Returns the number of workers that is hosted on each host workers_per_host() const238 size_t workers_per_host() const { 239 return workers_per_host_; 240 } 241 242 //! Global rank of this worker among all other workers in the system. my_rank() const243 size_t my_rank() const { 244 return workers_per_host() * host_rank() + local_worker_id(); 245 } 246 247 //! memory limit of this worker Context for local data structures mem_limit() const248 size_t mem_limit() const { return mem_limit_; } 249 250 //! Global number of workers in the system. num_workers() const251 size_t num_workers() const { 252 return num_hosts() * workers_per_host(); 253 } 254 255 //! Returns id of this host in the cluser 256 //! A host is a machine in the cluster that hosts multiple workers host_rank() const257 size_t host_rank() const { 258 return net_manager_.my_host_rank(); 259 } 260 261 //! Returns the local id ot this worker on the host 262 //! A worker is _locally_ identified by this id local_worker_id() const263 size_t local_worker_id() const { 264 return local_worker_id_; 265 } 266 267 //! id among all _local_ hosts (in test program runs) local_host_id() const268 size_t local_host_id() const { return local_host_id_; } 269 270 #ifndef SWIG 271 //! Outputs the context as [host id]:[local worker id] to an std::ostream operator <<(std::ostream & os,const Context & ctx)272 friend std::ostream& operator << (std::ostream& os, const Context& ctx) { 273 return os << ctx.host_rank() << ":" << ctx.local_worker_id(); 274 } 275 #endif 276 277 //! \} 278 279 //! \name Data Subsystem 280 //! \{ 281 282 //! Returns a new File object containing a sequence of local Blocks. GetFile(size_t dia_id)283 data::File GetFile(size_t dia_id) { 284 return data::File(block_pool_, local_worker_id_, dia_id); 285 } 286 287 //! Returns a new File object containing a sequence of local Blocks. 288 data::File GetFile(DIABase* dia); 289 290 //! Returns a new File, wrapped in a CountingPtr, containing a sequence of 291 //! local Blocks. 292 data::FilePtr GetFilePtr(size_t dia_id); 293 294 //! Returns a new File, wrapped in a CountingPtr, containing a sequence of 295 //! local Blocks. 296 data::FilePtr GetFilePtr(DIABase* dia); 297 298 //! Returns a reference to a new CatStream. This method alters the state of 299 //! the context and must be called on all Workers to ensure correct 300 //! communication coordination. 301 data::CatStreamPtr GetNewCatStream(size_t dia_id); 302 303 //! Returns a reference to a new CatStream. This method alters the state of 304 //! the context and must be called on all Workers to ensure correct 305 //! communication coordination. 306 data::CatStreamPtr GetNewCatStream(DIABase* dia); 307 308 //! Returns a reference to a new MixStream. This method alters the state 309 //! of the context and must be called on all Workers to ensure correct 310 //! communication coordination. 311 data::MixStreamPtr GetNewMixStream(size_t dia_id); 312 313 //! Returns a reference to a new MixStream. This method alters the state 314 //! of the context and must be called on all Workers to ensure correct 315 //! communication coordination. 316 data::MixStreamPtr GetNewMixStream(DIABase* dia); 317 318 //! Returns a reference to a new CatStream or MixStream, selectable via 319 //! template parameter. 320 template <typename Stream> 321 tlx::CountingPtr<Stream> GetNewStream(size_t dia_id); 322 323 //! the block manager keeps all data blocks moving through the system. block_pool()324 data::BlockPool& block_pool() { return block_pool_; } 325 326 //! \} 327 328 //! host-global memory config mem_config() const329 const MemoryConfig& mem_config() const { return mem_config_; } 330 331 //! returns the host-global memory manager mem_manager()332 mem::Manager& mem_manager() { return mem_manager_; } 333 net_manager()334 net::Manager& net_manager() { return net_manager_; } 335 336 //! given a global range [0,global_size) and p PEs to split the range, calculate 337 //! the [local_begin,local_end) index range assigned to the PE i. Takes the 338 //! information from the Context. CalculateLocalRange(size_t global_size) const339 common::Range CalculateLocalRange(size_t global_size) const { 340 return common::CalculateLocalRange( 341 global_size, num_workers(), my_rank()); 342 } 343 CalculateLocalRangeOnHost(size_t global_size) const344 common::Range CalculateLocalRangeOnHost(size_t global_size) const { 345 return common::CalculateLocalRange( 346 global_size, workers_per_host(), local_worker_id()); 347 } 348 349 //! Perform collectives and print min, max, mean, stdev, and all local 350 //! values. 351 template <typename Type> PrintCollectiveMeanStdev(const char * text,const Type & local)352 void PrintCollectiveMeanStdev(const char* text, const Type& local) { 353 std::vector<Type> svec = { local }; 354 svec = net.Reduce(svec, 0, common::VectorConcat<Type>()); 355 if (my_rank() == 0) { 356 double sum = std::accumulate(svec.begin(), svec.end(), 0.0); 357 double mean = sum / svec.size(); 358 359 double sq_sum = std::inner_product( 360 svec.begin(), svec.end(), svec.begin(), 0.0); 361 double stdev = std::sqrt(sq_sum / svec.size() - mean * mean); 362 363 double min = *std::min_element(svec.begin(), svec.end()); 364 double max = *std::max_element(svec.begin(), svec.end()); 365 366 LOG1 << text << " mean " << mean 367 << " max " << max << " stdev " << stdev 368 << " = " << (stdev / mean * 100.0) << "%" 369 << " max-min " << max - min 370 << " = " << ((max - min) / min * 100.0) << "%" 371 << " max-mean " << max - mean 372 << " = " << ((max - mean) / mean * 100.0) << "%" 373 << " svec " << svec; 374 } 375 } 376 377 //! return value of consume flag. consume() const378 bool consume() const { return consume_; } 379 380 /*! 381 * Sets consume-mode flag such that DIA contents may be consumed during 382 * PushData(). When in consume mode the DIA contents is destroyed online 383 * when it is transmitted to the next operation. This enables reusing the 384 * space of the consume operations. This enabled processing more data with 385 * less space. However, by default this mode is DISABLED, because it 386 * requires deliberate insertion of .Keep() calls. 387 */ enable_consume(bool consume=true)388 void enable_consume(bool consume = true) { consume_ = consume; } 389 390 //! Returns next_dia_id_ to generate DIA::id_ serial. next_dia_id()391 size_t next_dia_id() { return ++last_dia_id_; } 392 393 private: 394 //! id among all _local_ hosts (in test program runs) 395 size_t local_host_id_; 396 397 //! number of this host context, 0..p-1, within this host 398 size_t local_worker_id_; 399 400 //! number of workers hosted per host 401 size_t workers_per_host_; 402 403 //! memory limit of this worker Context for local data structures 404 size_t mem_limit_; 405 406 //! memory configuration in HostContext 407 const MemoryConfig& mem_config_; 408 409 //! host-global memory manager 410 mem::Manager& mem_manager_; 411 412 //! net::Manager instance that is shared among workers 413 net::Manager& net_manager_; 414 415 //! net::FlowControlChannelManager instance that is shared among workers 416 net::FlowControlChannelManager& flow_manager_; 417 418 //! data block pool 419 data::BlockPool& block_pool_; 420 421 //! data::Multiplexer instance that is shared among workers 422 data::Multiplexer& multiplexer_; 423 424 //! flag to set which enables selective consumption of DIA contents! 425 bool consume_ = false; 426 427 //! the number of valid DIA ids. 0 is reserved for invalid. 428 size_t last_dia_id_ = 0; 429 430 public: 431 //! \name Shared Objects 432 //! \{ 433 434 //! a random generator 435 std::default_random_engine rng_; 436 437 //! \} 438 439 public: 440 //! \name Network Subsystem 441 //! \{ 442 443 //! public member which exposes all network primitives from 444 //! FlowControlChannel for DOp implementations. Use it as 445 //! `context_.net.Method()`. 446 net::FlowControlChannel& net { 447 flow_manager_.GetFlowControlChannel(local_worker_id_) 448 }; 449 450 //! \} 451 452 public: 453 //! \name Logging System 454 //! \{ 455 456 //! base logger exclusive for this worker 457 common::JsonLogger base_logger_; 458 459 //! public member which delivers key:value pairs as JSON log lines. this 460 //! logger is local to this Context which is exclusive for one worker 461 //! thread. 462 common::JsonLogger logger_ { 463 &base_logger_, "host_rank", host_rank(), "worker_rank", my_rank() 464 }; 465 466 //! \} 467 }; 468 469 //! \name Run Methods with Internal Networks for Testing 470 //! \{ 471 472 /*! 473 * Function to run a number of mock hosts as locally independent threads, which 474 * communicate via internal stream sockets. 475 */ 476 void RunLocalMock(const MemoryConfig& mem_config, 477 size_t num_hosts, size_t workers_per_host, 478 const std::function<void(Context&)>& job_startpoint); 479 480 /*! 481 * Helper Function to execute RunLocalMock() tests using mock networks in test 482 * suite for many different numbers of workers and hosts as independent threads 483 * in one program. Use this function in most test cases. 484 */ 485 void RunLocalTests(const std::function<void(Context&)>& job_startpoint); 486 487 /*! 488 * Helper Function to execute RunLocalMock() tests using mock networks in test 489 * suite for many different numbers of workers and hosts as independent threads 490 * in one program. Use this function in most test cases. 491 */ 492 void RunLocalTests( 493 size_t ram, const std::function<void(Context&)>& job_startpoint); 494 495 /*! 496 * Runs the given job_startpoint within the same thread with a test network --> 497 * run test with one host and one thread. 498 */ 499 void RunLocalSameThread(const std::function<void(Context&)>& job_startpoint); 500 501 /*! 502 * Check environment variable THRILL_DIE_WITH_PARENT and enable process flag: 503 * this is useful for ssh/invoke.sh: it kills spawned processes when the ssh 504 * connection breaks. Hence: no more zombies. 505 */ 506 int RunCheckDieWithParent(); 507 508 /*! 509 * Check environment variable THRILL_UNLINK_BINARY and unlink given program 510 * path: this is useful for ssh/invoke.sh: it removes the copied program files 511 * _while_ it is running, hence it is gone even if the program crashes. 512 */ 513 int RunCheckUnlinkBinary(); 514 515 //! \} 516 517 /*! 518 * Runs the given job startpoint with a Context instance. Startpoints may be 519 * called multiple times with concurrent threads and different context instances 520 * across different workers. The Thrill configuration is taken from environment 521 * variables starting the THRILL_. 522 * 523 * THRILL_NET is the network backend to use, e.g.: mock, local, tcp, or mpi. 524 * 525 * THRILL_RANK contains the rank of this worker 526 * 527 * THRILL_HOSTLIST contains a space- or comma-separated list of host:ports to 528 * connect to. 529 * 530 * THRILL_WORKERS_PER_HOST is the number of workers (threads) per host. 531 * 532 * Additional variables: 533 * 534 * THRILL_DIE_WITH_PARENT sets a flag which terminates the program if the caller 535 * terminates (this is automatically set by ssh/invoke.sh). No more zombies. 536 * 537 * THRILL_UNLINK_BINARY deletes a file. Used by ssh/invoke.sh to unlink a copied 538 * program binary while it is running. Hence, it can keep /tmp clean. 539 * 540 * \returns 0 if execution was fine on all threads. 541 */ 542 int Run(const std::function<void(Context&)>& job_startpoint); 543 544 //! \} 545 546 } // namespace api 547 548 //! imported from api namespace 549 using api::HostContext; 550 551 //! imported from api namespace 552 using api::Context; 553 554 //! imported from api namespace 555 using api::Run; 556 557 } // namespace thrill 558 559 #endif // !THRILL_API_CONTEXT_HEADER 560 561 /******************************************************************************/ 562