1 /*******************************************************************************
2  * thrill/api/context.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <aleexnoe@gmail.com>
7  * Copyright (C) 2015 Tobias Sturm <mail@tobiassturm.de>
8  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_API_CONTEXT_HEADER
15 #define THRILL_API_CONTEXT_HEADER
16 
17 #include <thrill/common/config.hpp>
18 #include <thrill/common/defines.hpp>
19 #include <thrill/common/json_logger.hpp>
20 #include <thrill/common/profile_task.hpp>
21 #include <thrill/data/block_pool.hpp>
22 #include <thrill/data/cat_stream.hpp>
23 #include <thrill/data/file.hpp>
24 #include <thrill/data/mix_stream.hpp>
25 #include <thrill/data/multiplexer.hpp>
26 #include <thrill/net/flow_control_channel.hpp>
27 #include <thrill/net/flow_control_manager.hpp>
28 #include <thrill/net/manager.hpp>
29 
#include <algorithm>
#include <array>
#include <cassert>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <functional>
#include <memory>
#include <numeric>
#include <ostream>
#include <random>
#include <string>
#include <tuple>
#include <vector>
38 
39 namespace thrill {
40 namespace api {
41 
42 //! \ingroup api_layer
43 //! \{
44 
45 // forward declarations
46 class DIABase;
47 
/*!
 * Memory configuration of a host: total RAM and how it is split between the
 * data::BlockPool, the DIANode data structures of the workers, and
 * free-floating user/Thrill data.
 *
 * All sizes default to zero so that a default-constructed MemoryConfig is
 * well-defined before setup_detect() or setup() fills in the real values.
 */
class MemoryConfig
{
public:
    //! detect memory configuration from environment
    //! (NOTE(review): meaning of the int return value is defined in the .cpp)
    int setup_detect();

    //! setup memory size from a total amount of RAM
    void setup(size_t ram);

    //! derive the configuration for one of `hosts` hosts sharing this config
    //! (presumably divides the sizes by `hosts` -- confirm in the .cpp)
    MemoryConfig divide(size_t hosts) const;

    //! apply this configuration (implementation in the .cpp)
    void apply();

    //! print the configuration, given the number of workers per host
    void print(size_t workers_per_host) const;

    //! total amount of physical ram detected or THRILL_RAM
    size_t ram_ = 0;

    //! amount of RAM dedicated to data::BlockPool -- hard limit
    size_t ram_block_pool_hard_ = 0;

    //! amount of RAM dedicated to data::BlockPool -- soft limit
    size_t ram_block_pool_soft_ = 0;

    //! total amount of RAM for DIANode data structures such as the reduce
    //! tables. divide by the number of worker threads before use.
    size_t ram_workers_ = 0;

    //! remaining free-floating RAM used for user and Thrill data structures.
    size_t ram_floating_ = 0;

    //! StageBuilder verbosity flag
    bool verbose_ = true;

    //! enable Linux /proc stats profiler (default: on)
    bool enable_proc_profiler_ = true;
};
84 
/*!
 * The HostContext contains all data structures shared among workers on the same
 * host. It is used to construct and destroy them. For testing, multiple
 * instances may run in the same process.
 *
 * NOTE: the declaration order of the data members below is significant. Several
 * members are built by default member initializers that read earlier members
 * (mem_config_, workers_per_host_, mem_manager_, dispatcher_, net_manager_,
 * logger_, profiler_, block_pool_). Do not reorder them.
 */
class HostContext
{
public:
#ifndef SWIG
    //! constructor from existing net Groups. Used by the construction methods.
    HostContext(size_t local_host_id, const MemoryConfig& mem_config,
                std::unique_ptr<net::DispatcherThread> dispatcher,
                std::array<net::GroupPtr, net::Manager::kGroupCount>&& groups,
                size_t workers_per_host);

    //! destructor
    ~HostContext();

    //! Construct a number of mock hosts running in this process.
    static std::vector<std::unique_ptr<HostContext> >
    ConstructLoopback(size_t num_hosts, size_t workers_per_host);
#endif

    //! create host log
    std::string MakeHostLogPath(size_t host_rank);

    //! Returns local_host_id_
    size_t local_host_id() const { return local_host_id_; }

    //! number of workers per host (all have the same).
    size_t workers_per_host() const { return workers_per_host_; }

    //! memory limit of each worker Context for local data structures: the
    //! host's ram_workers_ share divided evenly among its workers.
    size_t worker_mem_limit() const {
        return mem_config_.ram_workers_ / workers_per_host_;
    }

    //! host-global memory config
    MemoryConfig& mem_config() { return mem_config_; }

    //! host-global memory manager
    mem::Manager& mem_manager() { return mem_manager_; }

    //! net manager constructs communication groups to other hosts.
    net::Manager& net_manager() { return net_manager_; }

    //! Returns id of this host in the cluster. A host is a machine in the
    //! cluster that hosts multiple workers
    size_t host_rank() const { return net_manager_.my_host_rank(); }

    //! the flow control group is used for collective communication.
    net::FlowControlChannelManager& flow_manager() { return flow_manager_; }

    //! the block manager keeps all data blocks moving through the system.
    data::BlockPool& block_pool() { return block_pool_; }

    //! data multiplexer transmits large amounts of data asynchronously.
    data::Multiplexer& data_multiplexer() { return data_multiplexer_; }

private:
    //! memory configuration (read by block_pool_'s initializer, hence declared
    //! first)
    MemoryConfig mem_config_;

public:
    //! \name Logging System
    //! \{

    //! base logger exclusive for this host context
    common::JsonLogger base_logger_;

    //! public member which delivers key:value pairs as JSON log lines. This
    //! logger belongs to this HostContext (it is not per worker) and is handed
    //! to the data::BlockPool below.
    common::JsonLogger logger_;

    //! thread for scheduling profiling methods for statistical output. It has
    //! no default initializer here, so the constructor must set it: the
    //! *_profiler_ registrations below dereference it during construction.
    std::unique_ptr<common::ProfileThread> profiler_;

    //! \}

private:
    //! id among all _local_ hosts (in test program runs)
    size_t local_host_id_;

    //! number of workers per host (all have the same).
    size_t workers_per_host_;

    //! host-global memory manager for internal memory only
    mem::Manager mem_manager_ { nullptr, "HostContext" };

    //! main host network dispatcher thread backend (dereferenced by
    //! data_multiplexer_'s initializer, so it must be non-null)
    std::unique_ptr<net::DispatcherThread> dispatcher_;

    //! net manager constructs communication groups to other hosts.
    net::Manager net_manager_;

#if !THRILL_HAVE_THREAD_SANITIZER
    //! register net_manager_'s profiling method (every 500 ms)
    common::ProfileTaskRegistration net_manager_profiler_ {
        std::chrono::milliseconds(500), *profiler_, &net_manager_
    };
#endif

    //! the flow control group is used for collective communication.
    net::FlowControlChannelManager flow_manager_ {
        net_manager_.GetFlowGroup(), workers_per_host_
    };

    //! data block pool, sized by the soft/hard limits of mem_config_
    data::BlockPool block_pool_ {
        mem_config_.ram_block_pool_soft_, mem_config_.ram_block_pool_hard_,
        &logger_, &mem_manager_, workers_per_host_
    };

#if !THRILL_HAVE_THREAD_SANITIZER
    //! register BlockPool's profiling method (every 500 ms)
    common::ProfileTaskRegistration block_pool_profiler_ {
        std::chrono::milliseconds(500), *profiler_, &block_pool_
    };
#endif

    //! data multiplexer transmits large amounts of data asynchronously.
    data::Multiplexer data_multiplexer_ {
        mem_manager_, block_pool_,
        *dispatcher_, net_manager_.GetDataGroup(), workers_per_host_
    };
};
212 
213 /*!
214  * The Context of a job is a unique instance per worker which holds references
215  * to all underlying parts of Thrill. The context is able to give references to
216  * the \ref data::Multiplexer "stream multiplexer", the \ref net::Group "net
217  * group" \ref common::Stats "stats" and \ref common::StatsGraph "stats graph".
218  * Threads share the stream multiplexer and the net group via the context
219  * object.
220  */
221 class Context
222 {
223 public:
224     Context(HostContext& host_context, size_t local_worker_id);
225 
226     //! method used to launch a job's main procedure. it wraps it in log output.
227     void Launch(const std::function<void(Context&)>& job_startpoint);
228 
229     //! \name System Information
230     //! \{
231 
232     //! Returns the total number of hosts.
num_hosts() const233     size_t num_hosts() const {
234         return net_manager_.num_hosts();
235     }
236 
237     //! Returns the number of workers that is hosted on each host
workers_per_host() const238     size_t workers_per_host() const {
239         return workers_per_host_;
240     }
241 
242     //! Global rank of this worker among all other workers in the system.
my_rank() const243     size_t my_rank() const {
244         return workers_per_host() * host_rank() + local_worker_id();
245     }
246 
247     //! memory limit of this worker Context for local data structures
mem_limit() const248     size_t mem_limit() const { return mem_limit_; }
249 
250     //! Global number of workers in the system.
num_workers() const251     size_t num_workers() const {
252         return num_hosts() * workers_per_host();
253     }
254 
255     //! Returns id of this host in the cluser
256     //! A host is a machine in the cluster that hosts multiple workers
host_rank() const257     size_t host_rank() const {
258         return net_manager_.my_host_rank();
259     }
260 
261     //! Returns the local id ot this worker on the host
262     //! A worker is _locally_ identified by this id
local_worker_id() const263     size_t local_worker_id() const {
264         return local_worker_id_;
265     }
266 
267     //! id among all _local_ hosts (in test program runs)
local_host_id() const268     size_t local_host_id() const { return local_host_id_; }
269 
270 #ifndef SWIG
271     //! Outputs the context as [host id]:[local worker id] to an std::ostream
operator <<(std::ostream & os,const Context & ctx)272     friend std::ostream& operator << (std::ostream& os, const Context& ctx) {
273         return os << ctx.host_rank() << ":" << ctx.local_worker_id();
274     }
275 #endif
276 
277     //! \}
278 
279     //! \name Data Subsystem
280     //! \{
281 
282     //! Returns a new File object containing a sequence of local Blocks.
GetFile(size_t dia_id)283     data::File GetFile(size_t dia_id) {
284         return data::File(block_pool_, local_worker_id_, dia_id);
285     }
286 
287     //! Returns a new File object containing a sequence of local Blocks.
288     data::File GetFile(DIABase* dia);
289 
290     //! Returns a new File, wrapped in a CountingPtr, containing a sequence of
291     //! local Blocks.
292     data::FilePtr GetFilePtr(size_t dia_id);
293 
294     //! Returns a new File, wrapped in a CountingPtr, containing a sequence of
295     //! local Blocks.
296     data::FilePtr GetFilePtr(DIABase* dia);
297 
298     //! Returns a reference to a new CatStream. This method alters the state of
299     //! the context and must be called on all Workers to ensure correct
300     //! communication coordination.
301     data::CatStreamPtr GetNewCatStream(size_t dia_id);
302 
303     //! Returns a reference to a new CatStream. This method alters the state of
304     //! the context and must be called on all Workers to ensure correct
305     //! communication coordination.
306     data::CatStreamPtr GetNewCatStream(DIABase* dia);
307 
308     //! Returns a reference to a new MixStream. This method alters the state
309     //! of the context and must be called on all Workers to ensure correct
310     //! communication coordination.
311     data::MixStreamPtr GetNewMixStream(size_t dia_id);
312 
313     //! Returns a reference to a new MixStream. This method alters the state
314     //! of the context and must be called on all Workers to ensure correct
315     //! communication coordination.
316     data::MixStreamPtr GetNewMixStream(DIABase* dia);
317 
318     //! Returns a reference to a new CatStream or MixStream, selectable via
319     //! template parameter.
320     template <typename Stream>
321     tlx::CountingPtr<Stream> GetNewStream(size_t dia_id);
322 
323     //! the block manager keeps all data blocks moving through the system.
block_pool()324     data::BlockPool& block_pool() { return block_pool_; }
325 
326     //! \}
327 
328     //! host-global memory config
mem_config() const329     const MemoryConfig& mem_config() const { return mem_config_; }
330 
331     //! returns the host-global memory manager
mem_manager()332     mem::Manager& mem_manager() { return mem_manager_; }
333 
net_manager()334     net::Manager& net_manager() { return net_manager_; }
335 
336     //! given a global range [0,global_size) and p PEs to split the range, calculate
337     //! the [local_begin,local_end) index range assigned to the PE i. Takes the
338     //! information from the Context.
CalculateLocalRange(size_t global_size) const339     common::Range CalculateLocalRange(size_t global_size) const {
340         return common::CalculateLocalRange(
341             global_size, num_workers(), my_rank());
342     }
343 
CalculateLocalRangeOnHost(size_t global_size) const344     common::Range CalculateLocalRangeOnHost(size_t global_size) const {
345         return common::CalculateLocalRange(
346             global_size, workers_per_host(), local_worker_id());
347     }
348 
349     //! Perform collectives and print min, max, mean, stdev, and all local
350     //! values.
351     template <typename Type>
PrintCollectiveMeanStdev(const char * text,const Type & local)352     void PrintCollectiveMeanStdev(const char* text, const Type& local) {
353         std::vector<Type> svec = { local };
354         svec = net.Reduce(svec, 0, common::VectorConcat<Type>());
355         if (my_rank() == 0) {
356             double sum = std::accumulate(svec.begin(), svec.end(), 0.0);
357             double mean = sum / svec.size();
358 
359             double sq_sum = std::inner_product(
360                 svec.begin(), svec.end(), svec.begin(), 0.0);
361             double stdev = std::sqrt(sq_sum / svec.size() - mean * mean);
362 
363             double min = *std::min_element(svec.begin(), svec.end());
364             double max = *std::max_element(svec.begin(), svec.end());
365 
366             LOG1 << text << " mean " << mean
367                  << " max " << max << " stdev " << stdev
368                  << " = " << (stdev / mean * 100.0) << "%"
369                  << " max-min " << max - min
370                  << " = " << ((max - min) / min * 100.0) << "%"
371                  << " max-mean " << max - mean
372                  << " = " << ((max - mean) / mean * 100.0) << "%"
373                  << " svec " << svec;
374         }
375     }
376 
377     //! return value of consume flag.
consume() const378     bool consume() const { return consume_; }
379 
380     /*!
381      * Sets consume-mode flag such that DIA contents may be consumed during
382      * PushData(). When in consume mode the DIA contents is destroyed online
383      * when it is transmitted to the next operation. This enables reusing the
384      * space of the consume operations. This enabled processing more data with
385      * less space. However, by default this mode is DISABLED, because it
386      * requires deliberate insertion of .Keep() calls.
387      */
enable_consume(bool consume=true)388     void enable_consume(bool consume = true) { consume_ = consume; }
389 
390     //! Returns next_dia_id_ to generate DIA::id_ serial.
next_dia_id()391     size_t next_dia_id() { return ++last_dia_id_; }
392 
393 private:
394     //! id among all _local_ hosts (in test program runs)
395     size_t local_host_id_;
396 
397     //! number of this host context, 0..p-1, within this host
398     size_t local_worker_id_;
399 
400     //! number of workers hosted per host
401     size_t workers_per_host_;
402 
403     //! memory limit of this worker Context for local data structures
404     size_t mem_limit_;
405 
406     //! memory configuration in HostContext
407     const MemoryConfig& mem_config_;
408 
409     //! host-global memory manager
410     mem::Manager& mem_manager_;
411 
412     //! net::Manager instance that is shared among workers
413     net::Manager& net_manager_;
414 
415     //! net::FlowControlChannelManager instance that is shared among workers
416     net::FlowControlChannelManager& flow_manager_;
417 
418     //! data block pool
419     data::BlockPool& block_pool_;
420 
421     //! data::Multiplexer instance that is shared among workers
422     data::Multiplexer& multiplexer_;
423 
424     //! flag to set which enables selective consumption of DIA contents!
425     bool consume_ = false;
426 
427     //! the number of valid DIA ids. 0 is reserved for invalid.
428     size_t last_dia_id_ = 0;
429 
430 public:
431     //! \name Shared Objects
432     //! \{
433 
434     //! a random generator
435     std::default_random_engine rng_;
436 
437     //! \}
438 
439 public:
440     //! \name Network Subsystem
441     //! \{
442 
443     //! public member which exposes all network primitives from
444     //! FlowControlChannel for DOp implementations. Use it as
445     //! `context_.net.Method()`.
446     net::FlowControlChannel& net {
447         flow_manager_.GetFlowControlChannel(local_worker_id_)
448     };
449 
450     //! \}
451 
452 public:
453     //! \name Logging System
454     //! \{
455 
456     //! base logger exclusive for this worker
457     common::JsonLogger base_logger_;
458 
459     //! public member which delivers key:value pairs as JSON log lines. this
460     //! logger is local to this Context which is exclusive for one worker
461     //! thread.
462     common::JsonLogger logger_ {
463         &base_logger_, "host_rank", host_rank(), "worker_rank", my_rank()
464     };
465 
466     //! \}
467 };
468 
469 //! \name Run Methods with Internal Networks for Testing
470 //! \{
471 
/*!
 * Function to run a number of mock hosts as locally independent threads, which
 * communicate via internal stream sockets.
 *
 * \param mem_config        memory configuration used for the mock hosts
 * \param num_hosts         number of mock hosts to run
 * \param workers_per_host  number of worker threads per mock host
 * \param job_startpoint    job function executed once per worker Context
 */
void RunLocalMock(const MemoryConfig& mem_config,
                  size_t num_hosts, size_t workers_per_host,
                  const std::function<void(Context&)>& job_startpoint);

/*!
 * Helper Function to execute RunLocalMock() tests using mock networks in test
 * suite for many different numbers of workers and hosts as independent threads
 * in one program. Use this function in most test cases.
 *
 * \param job_startpoint  job function executed once per worker Context
 */
void RunLocalTests(const std::function<void(Context&)>& job_startpoint);

/*!
 * Same as RunLocalTests(job_startpoint), but with an explicit amount of RAM:
 * executes RunLocalMock() tests using mock networks for many different
 * numbers of workers and hosts as independent threads in one program.
 *
 * \param ram             amount of RAM to configure the tests with
 *                        (NOTE(review): presumably passed to
 *                        MemoryConfig::setup() -- confirm in the .cpp)
 * \param job_startpoint  job function executed once per worker Context
 */
void RunLocalTests(
    size_t ram, const std::function<void(Context&)>& job_startpoint);

/*!
 * Runs the given job_startpoint within the same thread with a test network -->
 * run test with one host and one thread.
 */
void RunLocalSameThread(const std::function<void(Context&)>& job_startpoint);

/*!
 * Check environment variable THRILL_DIE_WITH_PARENT and enable process flag:
 * this is useful for ssh/invoke.sh: it kills spawned processes when the ssh
 * connection breaks. Hence: no more zombies.
 * (NOTE(review): the meaning of the int return value is defined in the .cpp.)
 */
int RunCheckDieWithParent();

/*!
 * Check environment variable THRILL_UNLINK_BINARY and unlink given program
 * path: this is useful for ssh/invoke.sh: it removes the copied program files
 * _while_ it is running, hence it is gone even if the program crashes.
 * (NOTE(review): the meaning of the int return value is defined in the .cpp.)
 */
int RunCheckUnlinkBinary();
514 
515 //! \}
516 
/*!
 * Runs the given job startpoint with a Context instance. Startpoints may be
 * called multiple times with concurrent threads and different context instances
 * across different workers. The Thrill configuration is taken from environment
 * variables starting with THRILL_.
 *
 * THRILL_NET is the network backend to use, e.g.: mock, local, tcp, or mpi.
 *
 * THRILL_RANK contains the rank of this worker
 *
 * THRILL_HOSTLIST contains a space- or comma-separated list of host:ports to
 * connect to.
 *
 * THRILL_WORKERS_PER_HOST is the number of workers (threads) per host.
 *
 * Additional variables:
 *
 * THRILL_DIE_WITH_PARENT sets a flag which terminates the program if the caller
 * terminates (this is automatically set by ssh/invoke.sh). No more zombies.
 *
 * THRILL_UNLINK_BINARY deletes a file. Used by ssh/invoke.sh to unlink a copied
 * program binary while it is running. Hence, it can keep /tmp clean.
 *
 * \param job_startpoint  job function executed once per worker Context
 *
 * \returns 0 if execution was fine on all threads.
 */
int Run(const std::function<void(Context&)>& job_startpoint);
543 
544 //! \}
545 
546 } // namespace api
547 
//! imported from api namespace: makes thrill::HostContext available
using api::HostContext;

//! imported from api namespace: makes thrill::Context available
using api::Context;

//! imported from api namespace: makes thrill::Run available
using api::Run;
556 
557 } // namespace thrill
558 
559 #endif // !THRILL_API_CONTEXT_HEADER
560 
561 /******************************************************************************/
562