1 /*******************************************************************************
2  * thrill/api/context.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <aleexnoe@gmail.com>
7  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #include <thrill/api/context.hpp>
13 
14 #include <thrill/api/dia_base.hpp>
15 #include <thrill/common/linux_proc_stats.hpp>
16 #include <thrill/common/logger.hpp>
17 #include <thrill/common/math.hpp>
18 #include <thrill/common/porting.hpp>
19 #include <thrill/common/profile_thread.hpp>
20 #include <thrill/common/string.hpp>
21 #include <thrill/common/system_exception.hpp>
22 #include <thrill/vfs/file_io.hpp>
23 
24 #include <foxxll/io/iostats.hpp>
25 #include <foxxll/mng/config.hpp>
26 #include <tlx/math/abs_diff.hpp>
27 #include <tlx/port/setenv.hpp>
28 #include <tlx/string/format_si_iec_units.hpp>
29 #include <tlx/string/parse_si_iec_units.hpp>
30 #include <tlx/string/split.hpp>
31 
32 // mock net backend is always available -tb :)
33 #include <thrill/net/mock/group.hpp>
34 
35 #if THRILL_HAVE_NET_TCP
36 #include <thrill/net/tcp/construct.hpp>
37 #include <thrill/net/tcp/select_dispatcher.hpp>
38 #endif
39 
40 #if THRILL_HAVE_NET_MPI
41 #include <thrill/net/mpi/dispatcher.hpp>
42 #include <thrill/net/mpi/group.hpp>
43 #endif
44 
45 #if THRILL_HAVE_NET_IB
46 #include <thrill/net/ib/group.hpp>
47 #endif
48 
49 #if __linux__
50 
51 // linux-specific process control
52 #include <sys/prctl.h>
53 
54 // for calling getrlimit() to determine memory limit
55 #include <sys/resource.h>
56 #include <sys/time.h>
57 
58 #endif
59 
60 #if __APPLE__
61 
62 // for sysctl()
63 #include <sys/sysctl.h>
64 #include <sys/types.h>
65 
66 #endif
67 
68 #if defined(_MSC_VER)
69 
70 // for detecting amount of physical memory
71 #include <windows.h>
72 
73 #endif
74 
75 #include <algorithm>
76 #include <csignal>
77 #include <iostream>
78 #include <memory>
79 #include <string>
80 #include <thread>
81 #include <tuple>
82 #include <utility>
83 #include <vector>
84 
85 namespace thrill {
86 namespace api {
87 
88 /******************************************************************************/
89 // Generic Network Construction
90 
91 //! Generic network constructor for net backends supporting loopback tests.
92 template <typename NetGroup>
93 static inline
94 std::vector<std::unique_ptr<HostContext> >
ConstructLoopbackHostContexts(const MemoryConfig & mem_config,size_t num_hosts,size_t workers_per_host)95 ConstructLoopbackHostContexts(
96     const MemoryConfig& mem_config,
97     size_t num_hosts, size_t workers_per_host) {
98 
99     static constexpr size_t kGroupCount = net::Manager::kGroupCount;
100 
101     // construct three full mesh loopback cliques, deliver net::Groups.
102     std::array<std::vector<std::unique_ptr<NetGroup> >, kGroupCount> group;
103 
104     for (size_t g = 0; g < kGroupCount; ++g) {
105         group[g] = NetGroup::ConstructLoopbackMesh(num_hosts);
106     }
107 
108     std::vector<std::unique_ptr<net::DispatcherThread> > dispatcher;
109     for (size_t h = 0; h < num_hosts; ++h) {
110         dispatcher.emplace_back(
111             std::make_unique<net::DispatcherThread>(
112                 std::make_unique<typename NetGroup::Dispatcher>(), h));
113     }
114 
115     // construct host context
116 
117     std::vector<std::unique_ptr<HostContext> > host_context;
118 
119     for (size_t h = 0; h < num_hosts; h++) {
120         std::array<net::GroupPtr, kGroupCount> host_group = {
121             { std::move(group[0][h]), std::move(group[1][h]) }
122         };
123 
124         host_context.emplace_back(
125             std::make_unique<HostContext>(
126                 h, mem_config, std::move(dispatcher[h]),
127                 std::move(host_group), workers_per_host));
128     }
129 
130     return host_context;
131 }
132 
133 //! Generic runner for backends supporting loopback tests.
134 template <typename NetGroup>
135 static inline void
RunLoopbackThreads(const MemoryConfig & mem_config,size_t num_hosts,size_t workers_per_host,size_t core_offset,const std::function<void (Context &)> & job_startpoint)136 RunLoopbackThreads(
137     const MemoryConfig& mem_config,
138     size_t num_hosts, size_t workers_per_host, size_t core_offset,
139     const std::function<void(Context&)>& job_startpoint) {
140 
141     MemoryConfig host_mem_config = mem_config.divide(num_hosts);
142     mem_config.print(workers_per_host);
143 
144     // construct a mock network of hosts
145     typename NetGroup::Dispatcher dispatcher;
146 
147     std::vector<std::unique_ptr<HostContext> > host_contexts =
148         ConstructLoopbackHostContexts<NetGroup>(
149             host_mem_config, num_hosts, workers_per_host);
150 
151     // launch thread for each of the workers on this host.
152     std::vector<std::thread> threads(num_hosts * workers_per_host);
153 
154     for (size_t host = 0; host < num_hosts; ++host) {
155         std::string log_prefix = "host " + std::to_string(host);
156         for (size_t worker = 0; worker < workers_per_host; ++worker) {
157             size_t id = host * workers_per_host + worker;
158             threads[id] = common::CreateThread(
159                 [&host_contexts, &job_startpoint, host, worker, log_prefix] {
160                     Context ctx(*host_contexts[host], worker);
161                     common::NameThisThread(
162                         log_prefix + " worker " + std::to_string(worker));
163 
164                     ctx.Launch(job_startpoint);
165                 });
166             common::SetCpuAffinity(threads[id], core_offset + id);
167         }
168     }
169 
170     // join worker threads
171     for (size_t i = 0; i < num_hosts * workers_per_host; i++) {
172         threads[i].join();
173     }
174 }
175 
176 /******************************************************************************/
177 // Other Configuration Initializations
178 
SetupBlockSize()179 static inline bool SetupBlockSize() {
180 
181     const char* env_block_size = getenv("THRILL_BLOCK_SIZE");
182     if (env_block_size == nullptr || *env_block_size == 0) return true;
183 
184     char* endptr;
185     data::default_block_size = std::strtoul(env_block_size, &endptr, 10);
186 
187     if (endptr == nullptr || *endptr != 0 || data::default_block_size == 0) {
188         std::cerr << "Thrill: environment variable"
189                   << " THRILL_BLOCK_SIZE=" << env_block_size
190                   << " is not a valid number."
191                   << std::endl;
192         return false;
193     }
194 
195     data::start_block_size = data::default_block_size;
196 
197     std::cerr << "Thrill: setting default_block_size = "
198               << data::default_block_size
199               << std::endl;
200 
201     return true;
202 }
203 
//! Read environment variable \p name and parse it as a positive number of
//! workers per host. Stores the raw getenv() result in \p env_value (nullptr
//! if unset).
//!
//! \return the parsed number, or 0 if the variable is unset, empty, or not a
//! valid positive number. Invalid values are reported on std::cerr.
static inline size_t ParseWorkersPerHostVariable(
    const char* name, const char*& env_value) {

    env_value = getenv(name);
    if (env_value == nullptr || *env_value == 0) return 0;

    char* endptr;
    size_t result = std::strtoul(env_value, &endptr, 10);
    if (!endptr || *endptr != 0 || result == 0) {
        std::cerr << "Thrill: environment variable"
                  << ' ' << name
                  << '=' << env_value
                  << " is not a valid number of workers per host."
                  << std::endl;
        return 0;
    }
    return result;
}

//! Determine the number of worker threads per host.
//!
//! Checks, in order: THRILL_WORKERS_PER_HOST, OMP_NUM_THREADS,
//! SLURM_CPUS_ON_NODE, and finally std::thread::hardware_concurrency().
//!
//! \param str_workers_per_host out: name of the last variable inspected
//! \param env_workers_per_host out: getenv() value of that variable; nullptr
//!        if it is unset
//! \return the number of workers, or 0 on error. An invalid
//!         THRILL_WORKERS_PER_HOST is an error; invalid values of the other
//!         variables are reported and skipped.
static inline size_t FindWorkersPerHost(
    const char*& str_workers_per_host, const char*& env_workers_per_host) {

    // first check THRILL_WORKERS_PER_HOST: if set it is authoritative, and an
    // invalid value yields 0 (error) instead of falling through.

    str_workers_per_host = "THRILL_WORKERS_PER_HOST";
    size_t result = ParseWorkersPerHostVariable(
        str_workers_per_host, env_workers_per_host);

    if (env_workers_per_host != nullptr && *env_workers_per_host != 0)
        return result;

    // then look for OMP_NUM_THREADS and SLURM_CPUS_ON_NODE, skipping any
    // variable that is unset or invalid.

    static const char* const kFallbackVariables[] = {
        "OMP_NUM_THREADS", "SLURM_CPUS_ON_NODE"
    };

    for (const char* name : kFallbackVariables) {
        str_workers_per_host = name;
        result = ParseWorkersPerHostVariable(
            str_workers_per_host, env_workers_per_host);
        if (result != 0) return result;
    }

    // last check: std::thread::hardware_concurrency(), which returns 0 if the
    // value is not computable. Report that instead of failing silently.

    result = std::thread::hardware_concurrency();
    if (result == 0) {
        std::cerr << "Thrill: could not detect the number of hardware threads,"
                  << " please set THRILL_WORKERS_PER_HOST."
                  << std::endl;
    }
    return result;
}
273 
Initialize()274 static inline bool Initialize() {
275 
276     if (!SetupBlockSize()) return false;
277 
278     vfs::Initialize();
279 
280     return true;
281 }
282 
Deinitialize()283 static inline bool Deinitialize() {
284 
285     vfs::Deinitialize();
286 
287     return true;
288 }
289 
290 /******************************************************************************/
291 // Constructions using TestGroup (either mock or tcp-loopback) for local testing
292 
293 #if defined(_MSC_VER)
294 using TestGroup = net::mock::Group;
295 #else
296 using TestGroup = net::tcp::Group;
297 #endif
298 
RunLocalMock(const MemoryConfig & mem_config,size_t num_hosts,size_t workers_per_host,const std::function<void (Context &)> & job_startpoint)299 void RunLocalMock(const MemoryConfig& mem_config,
300                   size_t num_hosts, size_t workers_per_host,
301                   const std::function<void(Context&)>& job_startpoint) {
302 
303     return RunLoopbackThreads<TestGroup>(
304         mem_config, num_hosts, workers_per_host, 0, job_startpoint);
305 }
306 
307 std::vector<std::unique_ptr<HostContext> >
ConstructLoopback(size_t num_hosts,size_t workers_per_host)308 HostContext::ConstructLoopback(size_t num_hosts, size_t workers_per_host) {
309 
310     // set fixed amount of RAM for testing
311     MemoryConfig mem_config;
312     mem_config.setup(4 * 1024 * 1024 * 1024llu);
313     mem_config.verbose_ = false;
314 
315     return ConstructLoopbackHostContexts<TestGroup>(
316         mem_config, num_hosts, workers_per_host);
317 }
318 
RunLocalTests(const std::function<void (Context &)> & job_startpoint)319 void RunLocalTests(const std::function<void(Context&)>& job_startpoint) {
320     // set fixed amount of RAM for testing
321     RunLocalTests(4 * 1024 * 1024 * 1024llu, job_startpoint);
322 }
323 
RunLocalTests(size_t ram,const std::function<void (Context &)> & job_startpoint)324 void RunLocalTests(
325     size_t ram, const std::function<void(Context&)>& job_startpoint) {
326 
327     // discard json log
328     tlx::setenv("THRILL_LOG", "", /* overwrite */ 1);
329 
330     // set fixed amount of RAM for testing, disable /proc profiler
331     MemoryConfig mem_config;
332     mem_config.verbose_ = false;
333     mem_config.enable_proc_profiler_ = false;
334     mem_config.setup(ram);
335 
336     static constexpr size_t num_hosts_list[] = { 1, 2, 5, 8 };
337     static constexpr size_t num_workers_list[] = { 1, 3 };
338 
339     size_t max_mock_workers = 1000000;
340 
341     const char* env_max_mock_workers = getenv("THRILL_MAX_MOCK_WORKERS");
342     if (env_max_mock_workers && *env_max_mock_workers) {
343         // parse envvar only if it exists.
344         char* endptr;
345         max_mock_workers = std::strtoul(env_max_mock_workers, &endptr, 10);
346 
347         if (!endptr || *endptr != 0 || max_mock_workers == 0) {
348             std::cerr << "Thrill: environment variable"
349                       << " THRILL_MAX_MOCK_WORKERS=" << env_max_mock_workers
350                       << " is not a valid maximum number of mock hosts."
351                       << std::endl;
352             return;
353         }
354     }
355 
356     for (const size_t& num_hosts : num_hosts_list) {
357         for (const size_t& workers_per_host : num_workers_list) {
358             if (num_hosts * workers_per_host > max_mock_workers) {
359                 std::cerr << "Thrill: skipping test with "
360                           << num_hosts * workers_per_host
361                           << " workers > max workers " << max_mock_workers
362                           << std::endl;
363                 continue;
364             }
365 
366             LOG0 << "Thrill: running local test with "
367                  << num_hosts << " hosts and " << workers_per_host
368                  << " workers per host";
369 
370             RunLocalMock(mem_config, num_hosts, workers_per_host,
371                          job_startpoint);
372         }
373     }
374 }
375 
RunLocalSameThread(const std::function<void (Context &)> & job_startpoint)376 void RunLocalSameThread(const std::function<void(Context&)>& job_startpoint) {
377 
378     size_t my_host_rank = 0;
379     size_t workers_per_host = 1;
380     size_t num_hosts = 1;
381     static constexpr size_t kGroupCount = net::Manager::kGroupCount;
382 
383     // set fixed amount of RAM for testing
384     MemoryConfig mem_config;
385     mem_config.verbose_ = false;
386     mem_config.setup(4 * 1024 * 1024 * 1024llu);
387     mem_config.print(workers_per_host);
388 
389     // construct two full mesh connection cliques, deliver net::tcp::Groups.
390     std::array<std::vector<std::unique_ptr<TestGroup> >, kGroupCount> group;
391 
392     for (size_t g = 0; g < kGroupCount; ++g) {
393         group[g] = TestGroup::ConstructLoopbackMesh(num_hosts);
394     }
395 
396     std::array<net::GroupPtr, kGroupCount> host_group = {
397         { std::move(group[0][0]), std::move(group[1][0]) }
398     };
399 
400     auto dispatcher = std::make_unique<net::DispatcherThread>(
401         std::make_unique<TestGroup::Dispatcher>(), my_host_rank);
402 
403     HostContext host_context(
404         0, mem_config,
405         std::move(dispatcher), std::move(host_group), workers_per_host);
406 
407     Context ctx(host_context, 0);
408     common::NameThisThread("worker " + std::to_string(my_host_rank));
409 
410     job_startpoint(ctx);
411 }
412 
413 /******************************************************************************/
414 // Run() Variants for Different Net Backends
415 
416 //! Run() implementation which uses a loopback net backend ("mock" or "tcp").
417 template <typename NetGroup>
418 static inline
RunBackendLoopback(const char * backend,const std::function<void (Context &)> & job_startpoint)419 int RunBackendLoopback(
420     const char* backend, const std::function<void(Context&)>& job_startpoint) {
421 
422     char* endptr;
423 
424     // determine number of loopback hosts
425 
426     size_t num_hosts = 2;
427 
428     const char* env_local = getenv("THRILL_LOCAL");
429     if (env_local && *env_local) {
430         // parse envvar only if it exists.
431         num_hosts = std::strtoul(env_local, &endptr, 10);
432 
433         if (!endptr || *endptr != 0 || num_hosts == 0) {
434             std::cerr << "Thrill: environment variable"
435                       << " THRILL_LOCAL=" << env_local
436                       << " is not a valid number of local loopback hosts."
437                       << std::endl;
438             return -1;
439         }
440     }
441 
442     // determine number of threads per loopback host
443 
444     const char* str_workers_per_host;
445     const char* env_workers_per_host;
446 
447     size_t workers_per_host = FindWorkersPerHost(
448         str_workers_per_host, env_workers_per_host);
449 
450     if (workers_per_host == 0)
451         return -1;
452 
453     // core offset for pinning
454 
455     const char* env_core_offset = getenv("THRILL_CORE_OFFSET");
456     size_t core_offset = 0;
457     if (env_core_offset && *env_core_offset) {
458         core_offset = std::strtoul(env_core_offset, &endptr, 10);
459 
460         size_t last_core = core_offset + num_hosts * workers_per_host;
461         if (!endptr || *endptr != 0 ||
462             last_core > std::thread::hardware_concurrency())
463         {
464             std::cerr << "Thrill: environment variable"
465                       << " THRILL_CORE_OFFSET=" << env_core_offset
466                       << " is not a valid number of cores to skip for pinning."
467                       << std::endl;
468             return -1;
469         }
470     }
471 
472     // detect memory config
473 
474     MemoryConfig mem_config;
475     if (mem_config.setup_detect() < 0) return -1;
476     mem_config.print(workers_per_host);
477 
478     // okay, configuration is good.
479 
480     std::cerr << "Thrill: running locally with " << num_hosts
481               << " test hosts and " << workers_per_host << " workers per host"
482               << " in a local " << backend << " network." << std::endl;
483 
484     if (!Initialize()) return -1;
485 
486     RunLoopbackThreads<NetGroup>(
487         mem_config, num_hosts, workers_per_host, core_offset, job_startpoint);
488 
489     if (!Deinitialize()) return -1;
490 
491     return 0;
492 }
493 
494 #if THRILL_HAVE_NET_TCP
495 static inline
RunBackendTcp(const std::function<void (Context &)> & job_startpoint)496 int RunBackendTcp(const std::function<void(Context&)>& job_startpoint) {
497 
498     char* endptr;
499 
500     // select environment variables
501 
502     const char* str_rank = "THRILL_RANK";
503     const char* env_rank = getenv(str_rank);
504 
505     if (env_rank == nullptr) {
506         // take SLURM_PROCID if THRILL_RANK is not set
507         str_rank = "SLURM_PROCID";
508         env_rank = getenv(str_rank);
509     }
510 
511     const char* env_hostlist = getenv("THRILL_HOSTLIST");
512 
513     // parse environment variables
514 
515     size_t my_host_rank = 0;
516 
517     if (env_rank != nullptr && *env_rank != 0) {
518         my_host_rank = std::strtoul(env_rank, &endptr, 10);
519 
520         if (endptr == nullptr || *endptr != 0) {
521             std::cerr << "Thrill: environment variable "
522                       << str_rank << '=' << env_rank
523                       << " is not a valid number."
524                       << std::endl;
525             return -1;
526         }
527     }
528     else {
529         std::cerr << "Thrill: environment variable THRILL_RANK"
530                   << " is required for tcp network backend."
531                   << std::endl;
532         return -1;
533     }
534 
535     std::vector<std::string> hostlist;
536 
537     if (env_hostlist != nullptr && *env_hostlist != 0) {
538         // first try to split by spaces, then by commas
539         std::vector<std::string> list = tlx::split(' ', env_hostlist);
540 
541         if (list.size() == 1) {
542             tlx::split(&list, ',', env_hostlist);
543         }
544 
545         for (const std::string& host : list) {
546             // skip empty splits
547             if (host.empty()) continue;
548 
549             if (host.find(':') == std::string::npos) {
550                 std::cerr << "Thrill: invalid address \"" << host << "\""
551                           << "in THRILL_HOSTLIST. It must contain a port number."
552                           << std::endl;
553                 return -1;
554             }
555 
556             hostlist.push_back(host);
557         }
558 
559         if (my_host_rank >= hostlist.size()) {
560             std::cerr << "Thrill: endpoint list (" << list.size() << " entries) "
561                       << "does not include my host_rank (" << my_host_rank << ")"
562                       << std::endl;
563             return -1;
564         }
565     }
566     else {
567         std::cerr << "Thrill: environment variable THRILL_HOSTLIST"
568                   << " is required for tcp network backend."
569                   << std::endl;
570         return -1;
571     }
572 
573     // determine number of local worker threads per process
574 
575     const char* str_workers_per_host;
576     const char* env_workers_per_host;
577 
578     size_t workers_per_host = FindWorkersPerHost(
579         str_workers_per_host, env_workers_per_host);
580 
581     if (workers_per_host == 0)
582         return -1;
583 
584     // detect memory config
585 
586     MemoryConfig mem_config;
587     if (mem_config.setup_detect() < 0) return -1;
588     mem_config.print(workers_per_host);
589 
590     // okay, configuration is good.
591 
592     std::cerr << "Thrill: running in tcp network with " << hostlist.size()
593               << " hosts and " << workers_per_host << " workers per host"
594               << " with " << common::GetHostname()
595               << " as rank " << my_host_rank << " and endpoints";
596     for (const std::string& ep : hostlist)
597         std::cerr << ' ' << ep;
598     std::cerr << std::endl;
599 
600     if (!Initialize()) return -1;
601 
602     static constexpr size_t kGroupCount = net::Manager::kGroupCount;
603 
604     // construct three TCP network groups
605     auto select_dispatcher = std::make_unique<net::tcp::SelectDispatcher>();
606 
607     std::array<std::unique_ptr<net::tcp::Group>, kGroupCount> groups;
608     net::tcp::Construct(
609         *select_dispatcher, my_host_rank, hostlist,
610         groups.data(), net::Manager::kGroupCount);
611 
612     std::array<net::GroupPtr, kGroupCount> host_groups = {
613         { std::move(groups[0]), std::move(groups[1]) }
614     };
615 
616     // construct HostContext
617 
618     auto dispatcher = std::make_unique<net::DispatcherThread>(
619         std::move(select_dispatcher), my_host_rank);
620 
621     HostContext host_context(
622         0, mem_config,
623         std::move(dispatcher), std::move(host_groups), workers_per_host);
624 
625     std::vector<std::thread> threads(workers_per_host);
626 
627     for (size_t worker = 0; worker < workers_per_host; worker++) {
628         threads[worker] = common::CreateThread(
629             [&host_context, &job_startpoint, worker] {
630                 Context ctx(host_context, worker);
631                 common::NameThisThread("worker " + std::to_string(worker));
632 
633                 ctx.Launch(job_startpoint);
634             });
635         common::SetCpuAffinity(threads[worker], worker);
636     }
637 
638     // join worker threads
639     for (size_t i = 0; i < workers_per_host; i++) {
640         threads[i].join();
641     }
642 
643     if (!Deinitialize()) return -1;
644 
645     return 0;
646 }
647 #endif
648 
649 #if THRILL_HAVE_NET_MPI
650 static inline
RunBackendMpi(const std::function<void (Context &)> & job_startpoint)651 int RunBackendMpi(const std::function<void(Context&)>& job_startpoint) {
652 
653     // determine number of local worker threads per MPI process
654 
655     const char* str_workers_per_host;
656     const char* env_workers_per_host;
657 
658     size_t workers_per_host = FindWorkersPerHost(
659         str_workers_per_host, env_workers_per_host);
660 
661     if (workers_per_host == 0)
662         return -1;
663 
664     // reserve one thread for MPI net::Dispatcher which runs a busy-waiting loop
665 
666     if (workers_per_host == 1) {
667         std::cerr << "Thrill: environment variable"
668                   << ' ' << str_workers_per_host
669                   << '=' << env_workers_per_host
670                   << " is not recommended, as one thread is used exclusively"
671                   << " for MPI communication."
672                   << std::endl;
673     }
674     else {
675         --workers_per_host;
676     }
677 
678     // detect memory config
679 
680     MemoryConfig mem_config;
681     if (mem_config.setup_detect() < 0) return -1;
682     mem_config.print(workers_per_host);
683 
684     // okay, configuration is good.
685 
686     size_t num_hosts = net::mpi::NumMpiProcesses();
687     size_t mpi_rank = net::mpi::MpiRank();
688     std::string hostname = common::GetHostname();
689 
690     std::cerr << "Thrill: running in MPI network with " << num_hosts
691               << " hosts and " << workers_per_host << "+1 workers per host"
692               << " with " << hostname << " as rank " << mpi_rank << "."
693               << std::endl;
694 
695     if (!Initialize()) return -1;
696 
697     static constexpr size_t kGroupCount = net::Manager::kGroupCount;
698 
699     // construct three MPI network groups
700     auto dispatcher = std::make_unique<net::DispatcherThread>(
701         std::make_unique<net::mpi::Dispatcher>(num_hosts), mpi_rank);
702 
703     std::array<std::unique_ptr<net::mpi::Group>, kGroupCount> groups;
704     net::mpi::Construct(num_hosts, *dispatcher, groups.data(), kGroupCount);
705 
706     std::array<net::GroupPtr, kGroupCount> host_groups = {
707         { std::move(groups[0]), std::move(groups[1]) }
708     };
709 
710     // construct HostContext
711     HostContext host_context(
712         0, mem_config,
713         std::move(dispatcher), std::move(host_groups), workers_per_host);
714 
715     // launch worker threads
716     std::vector<std::thread> threads(workers_per_host);
717 
718     for (size_t worker = 0; worker < workers_per_host; worker++) {
719         threads[worker] = common::CreateThread(
720             [&host_context, &job_startpoint, worker] {
721                 Context ctx(host_context, worker);
722                 common::NameThisThread("host " + std::to_string(ctx.host_rank())
723                                        + " worker " + std::to_string(worker));
724 
725                 ctx.Launch(job_startpoint);
726             });
727         common::SetCpuAffinity(threads[worker], worker);
728     }
729 
730     // join worker threads
731     for (size_t i = 0; i < workers_per_host; i++) {
732         threads[i].join();
733     }
734 
735     if (!Deinitialize()) return -1;
736 
737     return 0;
738 }
739 #endif
740 
741 #if THRILL_HAVE_NET_IB
//! Run() implementation for the InfiniBand (IB/MPI) net backend.
//!
//! The function detects the workers per host (see FindWorkersPerHost()) and
//! the memory configuration. It then builds kGroupCount IB groups and the
//! HostContext, and runs job_startpoint on all local worker threads, pinning
//! worker i to core i.
//!
//! \return 0 on success, -1 on any configuration or initialization error
static inline
int RunBackendIb(const std::function<void(Context&)>& job_startpoint) {

    // determine number of local worker threads per IB/MPI process

    const char* str_workers_per_host;
    const char* env_workers_per_host;

    size_t workers_per_host = FindWorkersPerHost(
        str_workers_per_host, env_workers_per_host);

    if (workers_per_host == 0)
        return -1;

    // detect memory config

    MemoryConfig mem_config;
    if (mem_config.setup_detect() < 0) return -1;
    mem_config.print(workers_per_host);

    // okay, configuration is good.

    size_t num_hosts = net::ib::NumMpiProcesses();
    size_t mpi_rank = net::ib::MpiRank();

    std::cerr << "Thrill: running in IB/MPI network with " << num_hosts
              << " hosts and " << workers_per_host << " workers per host"
              << " with " << common::GetHostname()
              << " as rank " << mpi_rank << "."
              << std::endl;

    if (!Initialize()) return -1;

    static constexpr size_t kGroupCount = net::Manager::kGroupCount;

    // construct kGroupCount IB network groups
    std::array<std::unique_ptr<net::ib::Group>, kGroupCount> groups;
    net::ib::Construct(num_hosts, groups.data(), kGroupCount);

    // NOTE(review): the initializer below moves exactly groups[0] and
    // groups[1], i.e. it assumes kGroupCount == 2.
    std::array<net::GroupPtr, kGroupCount> host_groups = {
        { std::move(groups[0]), std::move(groups[1]) }
    };

    // construct HostContext
    // NOTE(review): unlike the tcp, mpi and loopback paths in this file, no
    // net::DispatcherThread is passed here. Confirm that a matching
    // HostContext constructor overload exists; this backend may be stale.
    HostContext host_context(
        0, mem_config, std::move(host_groups), workers_per_host);

    // launch worker threads
    std::vector<std::thread> threads(workers_per_host);

    for (size_t worker = 0; worker < workers_per_host; worker++) {
        threads[worker] = common::CreateThread(
            [&host_context, &job_startpoint, worker] {
                Context ctx(host_context, worker);
                common::NameThisThread("host " + std::to_string(ctx.host_rank())
                                       + " worker " + std::to_string(worker));

                ctx.Launch(job_startpoint);
            });
        common::SetCpuAffinity(threads[worker], worker);
    }

    // join worker threads
    for (size_t i = 0; i < workers_per_host; i++) {
        threads[i].join();
    }

    if (!Deinitialize()) return -1;

    return 0;
}
813 #endif
814 
//! Report that network backend env_net is not available in this binary.
//! \return always -1, the same error value the RunBackend*() functions use.
int RunNotSupported(const char* env_net) {
    std::cerr << "Thrill: network backend " << env_net;
    std::cerr << " is not supported by this binary." << std::endl;
    const int error_code = -1;
    return error_code;
}
820 
//! Guess the network backend from the environment.
//!
//! \return "ib" or "mpi" (whichever is compiled in, ib preferred) if an
//! OpenMPI or Intel MPI launch is detected; nullptr with an error message if
//! MPI is detected but unsupported. Otherwise "mock" on MSVC, "tcp" if
//! THRILL_RANK or THRILL_HOSTLIST is set, and "local" in all other cases.
static inline
const char * DetectNetBackend() {
    // detect openmpi and intel mpi run, add others as well.
    const bool launched_by_mpi =
        getenv("OMPI_COMM_WORLD_SIZE") != nullptr ||
        getenv("I_MPI_INFO_NP") != nullptr;

    if (launched_by_mpi) {
#if THRILL_HAVE_NET_IB
        return "ib";
#elif THRILL_HAVE_NET_MPI
        return "mpi";
#else
        std::cerr << "Thrill: MPI environment detected, but network backend mpi"
                  << " is not supported by this binary." << std::endl;
        return nullptr;
#endif
    }

#if defined(_MSC_VER)
    return "mock";
#else
    const bool tcp_configured =
        getenv("THRILL_RANK") != nullptr ||
        getenv("THRILL_HOSTLIST") != nullptr;

    return tcp_configured ? "tcp" : "local";
#endif
}
848 
RunCheckDieWithParent()849 int RunCheckDieWithParent() {
850 
851     const char* env_die_with_parent = getenv("THRILL_DIE_WITH_PARENT");
852     if (env_die_with_parent == nullptr || *env_die_with_parent == 0) return 0;
853 
854     char* endptr;
855 
856     long die_with_parent = std::strtol(env_die_with_parent, &endptr, 10);
857     if (endptr == nullptr || *endptr != 0 ||
858         (die_with_parent != 0 && die_with_parent != 1)) {
859         std::cerr << "Thrill: environment variable"
860                   << " THRILL_DIE_WITH_PARENT=" << env_die_with_parent
861                   << " is not either 0 or 1."
862                   << std::endl;
863         return -1;
864     }
865 
866     if (die_with_parent == 0) return 0;
867 
868 #if __linux__
869     if (prctl(PR_SET_PDEATHSIG, SIGTERM) != 0) // NOLINT
870         throw common::ErrnoException("Error calling prctl(PR_SET_PDEATHSIG)");
871     return 1;
872 #else
873     std::cerr << "Thrill: DIE_WITH_PARENT is not supported on this platform.\n"
874               << "Please submit a patch."
875               << std::endl;
876     return 0;
877 #endif
878 }
879 
RunCheckUnlinkBinary()880 int RunCheckUnlinkBinary() {
881 
882     const char* env_unlink_binary = getenv("THRILL_UNLINK_BINARY");
883     if (env_unlink_binary == nullptr || *env_unlink_binary == 0) return 0;
884 
885     if (unlink(env_unlink_binary) != 0) {
886         throw common::ErrnoException(
887                   "Error calling unlink binary \""
888                   + std::string(env_unlink_binary) + "\"");
889     }
890 
891     return 0;
892 }
893 
894 /*----------------------------------------------------------------------------*/
895 // Customized FOXXLL Disk Config
896 
897 //! config class to override foxxll's default config
898 class FoxxllConfig : public foxxll::config
899 {
900 public:
901     //! override load_default_config()
902     void load_default_config() override;
903 
904     //! Returns default path of disk.
905     std::string default_disk_path() override;
906 
907     //! returns the name of the default config file prefix
908     std::string default_config_file_name() override;
909 };
910 
load_default_config()911 void FoxxllConfig::load_default_config() {
912     TLX_LOG1 << "foxxll: Using default disk configuration.";
913     foxxll::disk_config entry1(
914         default_disk_path(), 1000 * 1024 * 1024, default_disk_io_impl());
915     entry1.unlink_on_open = true;
916     entry1.autogrow = true;
917     add_disk(entry1);
918 }
919 
default_disk_path()920 std::string FoxxllConfig::default_disk_path() {
921 #if !FOXXLL_WINDOWS
922     int pid = getpid();
923     return "/var/tmp/thrill." + common::to_str(pid) + ".tmp";
924 #else
925     DWORD pid = GetCurrentProcessId();
926     char* tmpstr = new char[255];
927     if (GetTempPathA(255, tmpstr) == 0)
928         FOXXLL_THROW_WIN_LASTERROR(resource_error, "GetTempPathA()");
929     std::string result = tmpstr;
930     result += "thrill." + common::to_str(pid) + ".tmp";
931     delete[] tmpstr;
932     return result;
933 #endif
934 }
935 
default_config_file_name()936 std::string FoxxllConfig::default_config_file_name() {
937     return ".thrill";
938 }
939 
RunSetupFoxxll()940 void RunSetupFoxxll() {
941     // install derived config instance in foxxll's singleton
942     foxxll::config::create_instance<FoxxllConfig>();
943 }
944 
945 /*----------------------------------------------------------------------------*/
946 
Run(const std::function<void (Context &)> & job_startpoint)947 int Run(const std::function<void(Context&)>& job_startpoint) {
948 
949     common::NameThisThread("main");
950 
951     if (RunCheckDieWithParent() < 0)
952         return -1;
953 
954     if (RunCheckUnlinkBinary() < 0)
955         return -1;
956 
957     RunSetupFoxxll();
958 
959     // parse environment: THRILL_NET
960     const char* env_net = getenv("THRILL_NET");
961 
962     // if no backend configured: automatically select one.
963     if (env_net == nullptr || *env_net == 0) {
964         env_net = DetectNetBackend();
965         if (env_net == nullptr) return -1;
966     }
967 
968     // run with selected backend
969     if (strcmp(env_net, "mock") == 0) {
970         // mock network backend
971         return RunBackendLoopback<net::mock::Group>("mock", job_startpoint);
972     }
973 
974     if (strcmp(env_net, "local") == 0) {
975 #if THRILL_HAVE_NET_TCP
976         // tcp loopback network backend
977         return RunBackendLoopback<net::tcp::Group>("tcp", job_startpoint);
978 #else
979         return RunNotSupported(env_net);
980 #endif
981     }
982 
983     if (strcmp(env_net, "tcp") == 0) {
984 #if THRILL_HAVE_NET_TCP
985         // real tcp network backend
986         return RunBackendTcp(job_startpoint);
987 #else
988         return RunNotSupported(env_net);
989 #endif
990     }
991 
992     if (strcmp(env_net, "mpi") == 0) {
993 #if THRILL_HAVE_NET_MPI
994         // mpi network backend
995         return RunBackendMpi(job_startpoint);
996 #else
997         return RunNotSupported(env_net);
998 #endif
999     }
1000 
1001     if (strcmp(env_net, "ib") == 0) {
1002 #if THRILL_HAVE_NET_IB
1003         // ib/mpi network backend
1004         return RunBackendIb(job_startpoint);
1005 #else
1006         return RunNotSupported(env_net);
1007 #endif
1008     }
1009 
1010     std::cerr << "Thrill: network backend " << env_net << " is unknown."
1011               << std::endl;
1012     return -1;
1013 }
1014 
1015 /******************************************************************************/
1016 // MemoryConfig
1017 
setup(size_t ram)1018 void MemoryConfig::setup(size_t ram) {
1019     ram_ = ram;
1020     apply();
1021 }
1022 
setup_detect()1023 int MemoryConfig::setup_detect() {
1024 
1025     // determine amount of physical RAM or take user's limit
1026 
1027     const char* env_ram = getenv("THRILL_RAM");
1028 
1029     if (env_ram != nullptr && *env_ram != 0) {
1030         uint64_t ram64;
1031         if (!tlx::parse_si_iec_units(env_ram, &ram64)) {
1032             std::cerr << "Thrill: environment variable"
1033                       << " THRILL_RAM=" << env_ram
1034                       << " is not a valid amount of RAM memory."
1035                       << std::endl;
1036             return -1;
1037         }
1038         ram_ = static_cast<size_t>(ram64);
1039     }
1040     else {
1041         // detect amount of physical memory on system
1042 #if defined(_MSC_VER)
1043         MEMORYSTATUSEX memstx;
1044         memstx.dwLength = sizeof(memstx);
1045         GlobalMemoryStatusEx(&memstx);
1046 
1047         ram_ = memstx.ullTotalPhys;
1048 #elif __APPLE__
1049         int mib[2];
1050         int64_t physical_memory;
1051         size_t length;
1052 
1053         // Get the physical memory size
1054         mib[0] = CTL_HW;
1055         mib[1] = HW_MEMSIZE;
1056         length = sizeof(physical_memory);
1057         sysctl(mib, 2, &physical_memory, &length, nullptr, 0);
1058         ram_ = static_cast<size_t>(physical_memory);
1059 #else
1060         ram_ = sysconf(_SC_PHYS_PAGES) * static_cast<size_t>(sysconf(_SC_PAGESIZE));
1061 #endif
1062 
1063 #if __linux__
1064         // use getrlimit() to check user limit on address space
1065         struct rlimit rl; // NOLINT
1066         if (getrlimit(RLIMIT_AS, &rl) == 0) {
1067             if (rl.rlim_cur != 0 && rl.rlim_cur * 3 / 4 < ram_) {
1068                 ram_ = rl.rlim_cur * 3 / 4;
1069             }
1070         }
1071         else {
1072             sLOG1 << "getrlimit(): " << strerror(errno);
1073         }
1074 #endif
1075     }
1076 
1077     apply();
1078 
1079     return 0;
1080 }
1081 
apply()1082 void MemoryConfig::apply() {
1083     // divide up ram_
1084 
1085     ram_workers_ = ram_ / 3;
1086     ram_block_pool_hard_ = ram_ / 3;
1087     ram_block_pool_soft_ = ram_block_pool_hard_ * 9 / 10;
1088     ram_floating_ = ram_ - ram_block_pool_hard_ - ram_workers_;
1089 
1090     // set memory limit, only BlockPool is excluded from malloc tracking, as
1091     // only it uses bypassing allocators.
1092     mem::set_memory_limit_indication(ram_floating_ + ram_workers_);
1093 }
1094 
divide(size_t hosts) const1095 MemoryConfig MemoryConfig::divide(size_t hosts) const {
1096 
1097     MemoryConfig mc = *this;
1098     mc.ram_ /= hosts;
1099     mc.ram_block_pool_hard_ /= hosts;
1100     mc.ram_block_pool_soft_ /= hosts;
1101     mc.ram_workers_ /= hosts;
1102     // free floating memory is not divided by host, as it is measured overall
1103 
1104     return mc;
1105 }
1106 
print(size_t workers_per_host) const1107 void MemoryConfig::print(size_t workers_per_host) const {
1108     if (!verbose_) return;
1109 
1110     std::cerr
1111         << "Thrill: using "
1112         << tlx::format_iec_units(ram_) << "B RAM total,"
1113         << " BlockPool=" << tlx::format_iec_units(ram_block_pool_hard_) << "B,"
1114         << " workers="
1115         << tlx::format_iec_units(ram_workers_ / workers_per_host) << "B,"
1116         << " floating=" << tlx::format_iec_units(ram_floating_) << "B."
1117         << std::endl;
1118 }
1119 
1120 /******************************************************************************/
1121 // HostContext methods
1122 
HostContext(size_t local_host_id,const MemoryConfig & mem_config,std::unique_ptr<net::DispatcherThread> dispatcher,std::array<net::GroupPtr,net::Manager::kGroupCount> && groups,size_t workers_per_host)1123 HostContext::HostContext(
1124     size_t local_host_id,
1125     const MemoryConfig& mem_config,
1126     std::unique_ptr<net::DispatcherThread> dispatcher,
1127     std::array<net::GroupPtr, net::Manager::kGroupCount>&& groups,
1128     size_t workers_per_host)
1129     : mem_config_(mem_config),
1130       base_logger_(MakeHostLogPath(groups[0]->my_host_rank())),
1131       logger_(&base_logger_, "host_rank", groups[0]->my_host_rank()),
1132       profiler_(std::make_unique<common::ProfileThread>()),
1133       local_host_id_(local_host_id),
1134       workers_per_host_(workers_per_host),
1135       dispatcher_(std::move(dispatcher)),
1136       net_manager_(std::move(groups), logger_) {
1137 
1138     // write command line parameters to json log
1139     common::LogCmdlineParams(logger_);
1140 
1141     if (mem_config_.enable_proc_profiler_)
1142         StartLinuxProcStatsProfiler(*profiler_, logger_);
1143 
1144     // run memory profiler only on local host 0 (especially for test runs)
1145     if (local_host_id == 0)
1146         mem::StartMemProfiler(*profiler_, logger_);
1147 }
1148 
~HostContext()1149 HostContext::~HostContext() {
1150     // stop dispatcher _before_ stopping multiplexer
1151     dispatcher_->Terminate();
1152 }
1153 
MakeHostLogPath(size_t host_rank)1154 std::string HostContext::MakeHostLogPath(size_t host_rank) {
1155     const char* env_log = getenv("THRILL_LOG");
1156     if (env_log == nullptr) {
1157         if (host_rank == 0 && mem_config().verbose_) {
1158             std::cerr << "Thrill: no THRILL_LOG was found, "
1159                       << "so no json log is written."
1160                       << std::endl;
1161         }
1162         return std::string();
1163     }
1164 
1165     std::string output = env_log;
1166     if (output == "" || output == "-")
1167         return std::string();
1168     if (output == "/dev/stdout")
1169         return output;
1170     if (output == "stdout")
1171         return "/dev/stdout";
1172 
1173     return output + "-host-" + std::to_string(host_rank) + ".json";
1174 }
1175 
1176 /******************************************************************************/
1177 // Context methods
1178 
Context(HostContext & host_context,size_t local_worker_id)1179 Context::Context(HostContext& host_context, size_t local_worker_id)
1180     : local_host_id_(host_context.local_host_id()),
1181       local_worker_id_(local_worker_id),
1182       workers_per_host_(host_context.workers_per_host()),
1183       mem_limit_(host_context.worker_mem_limit()),
1184       mem_config_(host_context.mem_config()),
1185       mem_manager_(host_context.mem_manager()),
1186       net_manager_(host_context.net_manager()),
1187       flow_manager_(host_context.flow_manager()),
1188       block_pool_(host_context.block_pool()),
1189       multiplexer_(host_context.data_multiplexer()),
1190       rng_(std::random_device { }
1191            () + (local_worker_id_ << 16)),
1192       base_logger_(&host_context.base_logger_) {
1193     assert(local_worker_id < workers_per_host());
1194 }
1195 
GetFile(DIABase * dia)1196 data::File Context::GetFile(DIABase* dia) {
1197     return GetFile(dia != nullptr ? dia->dia_id() : 0);
1198 }
1199 
GetFilePtr(size_t dia_id)1200 data::FilePtr Context::GetFilePtr(size_t dia_id) {
1201     return tlx::make_counting<data::File>(
1202         block_pool_, local_worker_id_, dia_id);
1203 }
1204 
GetFilePtr(DIABase * dia)1205 data::FilePtr Context::GetFilePtr(DIABase* dia) {
1206     return GetFilePtr(dia != nullptr ? dia->dia_id() : 0);
1207 }
1208 
GetNewCatStream(size_t dia_id)1209 data::CatStreamPtr Context::GetNewCatStream(size_t dia_id) {
1210     return multiplexer_.GetNewCatStream(local_worker_id_, dia_id);
1211 }
1212 
GetNewCatStream(DIABase * dia)1213 data::CatStreamPtr Context::GetNewCatStream(DIABase* dia) {
1214     return GetNewCatStream(dia != nullptr ? dia->dia_id() : 0);
1215 }
1216 
GetNewMixStream(size_t dia_id)1217 data::MixStreamPtr Context::GetNewMixStream(size_t dia_id) {
1218     return multiplexer_.GetNewMixStream(local_worker_id_, dia_id);
1219 }
1220 
GetNewMixStream(DIABase * dia)1221 data::MixStreamPtr Context::GetNewMixStream(DIABase* dia) {
1222     return GetNewMixStream(dia != nullptr ? dia->dia_id() : 0);
1223 }
1224 
1225 template <>
GetNewStream(size_t dia_id)1226 data::CatStreamPtr Context::GetNewStream<data::CatStream>(size_t dia_id) {
1227     return GetNewCatStream(dia_id);
1228 }
1229 
1230 template <>
GetNewStream(size_t dia_id)1231 data::MixStreamPtr Context::GetNewStream<data::MixStream>(size_t dia_id) {
1232     return GetNewMixStream(dia_id);
1233 }
1234 
//! Per-worker run statistics; instances are combined with operator+ (runtime
//! and io_max_allocation take the maximum, all other fields are summed).
struct OverallStats {

    //! overall run time
    double runtime;

    //! maximum ByteBlock allocation on all workers
    size_t max_block_bytes;

    //! network traffic performed by net layer: bytes sent and received
    size_t net_traffic_tx;
    size_t net_traffic_rx;

    //! I/O volume performed by io layer
    size_t io_volume;

    //! maximum external memory allocation
    size_t io_max_allocation;

    friend std::ostream& operator << (std::ostream& os, const OverallStats& c) {
        os << "[OverallStats";
        os << " runtime=" << c.runtime;
        os << " max_block_bytes=" << c.max_block_bytes;
        os << " net_traffic_tx=" << c.net_traffic_tx;
        os << " net_traffic_rx=" << c.net_traffic_rx;
        os << " io_volume=" << c.io_volume;
        os << " io_max_allocation=" << c.io_max_allocation;
        os << "]";
        return os;
    }

    OverallStats operator + (const OverallStats& other) const {
        return OverallStats {
            std::max(runtime, other.runtime),
            max_block_bytes + other.max_block_bytes,
            net_traffic_tx + other.net_traffic_tx,
            net_traffic_rx + other.net_traffic_rx,
            io_volume + other.io_volume,
            std::max(io_max_allocation, other.io_max_allocation)
        };
    }
};
1274 
Launch(const std::function<void (Context &)> & job_startpoint)1275 void Context::Launch(const std::function<void(Context&)>& job_startpoint) {
1276     logger_ << "class" << "Context"
1277             << "event" << "job-start";
1278 
1279     common::StatsTimerStart overall_timer;
1280 
1281     try {
1282         job_startpoint(*this);
1283     }
1284     catch (std::exception& e) {
1285         LOG1 << "worker " << my_rank() << " threw " << typeid(e).name();
1286         LOG1 << "  what(): " << e.what();
1287 
1288         logger_ << "class" << "Context"
1289                 << "event" << "job-exception"
1290                 << "exception" << typeid(e).name()
1291                 << "what" << e.what();
1292         throw;
1293     }
1294 
1295     logger_ << "class" << "Context"
1296             << "event" << "job-done"
1297             << "elapsed" << overall_timer;
1298 
1299     overall_timer.Stop();
1300 
1301     // collect overall statistics
1302     OverallStats stats;
1303     stats.runtime = overall_timer.SecondsDouble();
1304 
1305     stats.max_block_bytes =
1306         local_worker_id_ == 0 ? block_pool().max_total_bytes() : 0;
1307 
1308     stats.net_traffic_tx = local_worker_id_ == 0 ? net_manager_.Traffic().tx : 0;
1309     stats.net_traffic_rx = local_worker_id_ == 0 ? net_manager_.Traffic().rx : 0;
1310 
1311     if (local_host_id_ == 0 && local_worker_id_ == 0) {
1312         foxxll::stats_data io_stats(*foxxll::stats::get_instance());
1313         stats.io_volume = io_stats.get_read_bytes() + io_stats.get_write_bytes();
1314         stats.io_max_allocation =
1315             foxxll::block_manager::get_instance()->maximum_allocation();
1316     }
1317     else {
1318         stats.io_volume = 0;
1319         stats.io_max_allocation = 0;
1320     }
1321 
1322     LOG0 << stats;
1323 
1324     stats = net.Reduce(stats);
1325 
1326     if (my_rank() == 0) {
1327         using tlx::format_iec_units;
1328 
1329         if (stats.net_traffic_rx != stats.net_traffic_tx)
1330             LOG1 << "Manager::Traffic() tx/rx asymmetry = "
1331                  << tlx::abs_diff(stats.net_traffic_tx, stats.net_traffic_rx);
1332 
1333         if (mem_config().verbose_) {
1334             std::cerr
1335                 << "Thrill:"
1336                 << " ran " << stats.runtime << "s with max "
1337                 << format_iec_units(stats.max_block_bytes) << "B in DIA Blocks, "
1338                 << format_iec_units(stats.net_traffic_tx) << "B network traffic, "
1339                 << format_iec_units(stats.io_volume) << "B disk I/O, and "
1340                 << format_iec_units(stats.io_max_allocation) << "B max disk use."
1341                 << std::endl;
1342         }
1343 
1344         logger_ << "class" << "Context"
1345                 << "event" << "summary"
1346                 << "runtime" << stats.runtime
1347                 << "net_traffic" << stats.net_traffic_tx
1348                 << "io_volume" << stats.io_volume
1349                 << "io_max_allocation" << stats.io_max_allocation;
1350     }
1351 }
1352 
1353 } // namespace api
1354 } // namespace thrill
1355 
1356 /******************************************************************************/
1357