1 /*******************************************************************************
2 * thrill/api/context.cpp
3 *
4 * Part of Project Thrill - http://project-thrill.org
5 *
6 * Copyright (C) 2015 Alexander Noe <aleexnoe@gmail.com>
7 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
8 *
9 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10 ******************************************************************************/
11
12 #include <thrill/api/context.hpp>
13
14 #include <thrill/api/dia_base.hpp>
15 #include <thrill/common/linux_proc_stats.hpp>
16 #include <thrill/common/logger.hpp>
17 #include <thrill/common/math.hpp>
18 #include <thrill/common/porting.hpp>
19 #include <thrill/common/profile_thread.hpp>
20 #include <thrill/common/string.hpp>
21 #include <thrill/common/system_exception.hpp>
22 #include <thrill/vfs/file_io.hpp>
23
24 #include <foxxll/io/iostats.hpp>
25 #include <foxxll/mng/config.hpp>
26 #include <tlx/math/abs_diff.hpp>
27 #include <tlx/port/setenv.hpp>
28 #include <tlx/string/format_si_iec_units.hpp>
29 #include <tlx/string/parse_si_iec_units.hpp>
30 #include <tlx/string/split.hpp>
31
32 // mock net backend is always available -tb :)
33 #include <thrill/net/mock/group.hpp>
34
35 #if THRILL_HAVE_NET_TCP
36 #include <thrill/net/tcp/construct.hpp>
37 #include <thrill/net/tcp/select_dispatcher.hpp>
38 #endif
39
40 #if THRILL_HAVE_NET_MPI
41 #include <thrill/net/mpi/dispatcher.hpp>
42 #include <thrill/net/mpi/group.hpp>
43 #endif
44
45 #if THRILL_HAVE_NET_IB
46 #include <thrill/net/ib/group.hpp>
47 #endif
48
49 #if __linux__
50
51 // linux-specific process control
52 #include <sys/prctl.h>
53
54 // for calling getrlimit() to determine memory limit
55 #include <sys/resource.h>
56 #include <sys/time.h>
57
58 #endif
59
60 #if __APPLE__
61
62 // for sysctl()
63 #include <sys/sysctl.h>
64 #include <sys/types.h>
65
66 #endif
67
68 #if defined(_MSC_VER)
69
70 // for detecting amount of physical memory
71 #include <windows.h>
72
73 #endif
74
75 #include <algorithm>
76 #include <csignal>
77 #include <iostream>
78 #include <memory>
79 #include <string>
80 #include <thread>
81 #include <tuple>
82 #include <utility>
83 #include <vector>
84
85 namespace thrill {
86 namespace api {
87
88 /******************************************************************************/
89 // Generic Network Construction
90
91 //! Generic network constructor for net backends supporting loopback tests.
92 template <typename NetGroup>
93 static inline
94 std::vector<std::unique_ptr<HostContext> >
ConstructLoopbackHostContexts(const MemoryConfig & mem_config,size_t num_hosts,size_t workers_per_host)95 ConstructLoopbackHostContexts(
96 const MemoryConfig& mem_config,
97 size_t num_hosts, size_t workers_per_host) {
98
99 static constexpr size_t kGroupCount = net::Manager::kGroupCount;
100
101 // construct three full mesh loopback cliques, deliver net::Groups.
102 std::array<std::vector<std::unique_ptr<NetGroup> >, kGroupCount> group;
103
104 for (size_t g = 0; g < kGroupCount; ++g) {
105 group[g] = NetGroup::ConstructLoopbackMesh(num_hosts);
106 }
107
108 std::vector<std::unique_ptr<net::DispatcherThread> > dispatcher;
109 for (size_t h = 0; h < num_hosts; ++h) {
110 dispatcher.emplace_back(
111 std::make_unique<net::DispatcherThread>(
112 std::make_unique<typename NetGroup::Dispatcher>(), h));
113 }
114
115 // construct host context
116
117 std::vector<std::unique_ptr<HostContext> > host_context;
118
119 for (size_t h = 0; h < num_hosts; h++) {
120 std::array<net::GroupPtr, kGroupCount> host_group = {
121 { std::move(group[0][h]), std::move(group[1][h]) }
122 };
123
124 host_context.emplace_back(
125 std::make_unique<HostContext>(
126 h, mem_config, std::move(dispatcher[h]),
127 std::move(host_group), workers_per_host));
128 }
129
130 return host_context;
131 }
132
133 //! Generic runner for backends supporting loopback tests.
134 template <typename NetGroup>
135 static inline void
RunLoopbackThreads(const MemoryConfig & mem_config,size_t num_hosts,size_t workers_per_host,size_t core_offset,const std::function<void (Context &)> & job_startpoint)136 RunLoopbackThreads(
137 const MemoryConfig& mem_config,
138 size_t num_hosts, size_t workers_per_host, size_t core_offset,
139 const std::function<void(Context&)>& job_startpoint) {
140
141 MemoryConfig host_mem_config = mem_config.divide(num_hosts);
142 mem_config.print(workers_per_host);
143
144 // construct a mock network of hosts
145 typename NetGroup::Dispatcher dispatcher;
146
147 std::vector<std::unique_ptr<HostContext> > host_contexts =
148 ConstructLoopbackHostContexts<NetGroup>(
149 host_mem_config, num_hosts, workers_per_host);
150
151 // launch thread for each of the workers on this host.
152 std::vector<std::thread> threads(num_hosts * workers_per_host);
153
154 for (size_t host = 0; host < num_hosts; ++host) {
155 std::string log_prefix = "host " + std::to_string(host);
156 for (size_t worker = 0; worker < workers_per_host; ++worker) {
157 size_t id = host * workers_per_host + worker;
158 threads[id] = common::CreateThread(
159 [&host_contexts, &job_startpoint, host, worker, log_prefix] {
160 Context ctx(*host_contexts[host], worker);
161 common::NameThisThread(
162 log_prefix + " worker " + std::to_string(worker));
163
164 ctx.Launch(job_startpoint);
165 });
166 common::SetCpuAffinity(threads[id], core_offset + id);
167 }
168 }
169
170 // join worker threads
171 for (size_t i = 0; i < num_hosts * workers_per_host; i++) {
172 threads[i].join();
173 }
174 }
175
176 /******************************************************************************/
177 // Other Configuration Initializations
178
SetupBlockSize()179 static inline bool SetupBlockSize() {
180
181 const char* env_block_size = getenv("THRILL_BLOCK_SIZE");
182 if (env_block_size == nullptr || *env_block_size == 0) return true;
183
184 char* endptr;
185 data::default_block_size = std::strtoul(env_block_size, &endptr, 10);
186
187 if (endptr == nullptr || *endptr != 0 || data::default_block_size == 0) {
188 std::cerr << "Thrill: environment variable"
189 << " THRILL_BLOCK_SIZE=" << env_block_size
190 << " is not a valid number."
191 << std::endl;
192 return false;
193 }
194
195 data::start_block_size = data::default_block_size;
196
197 std::cerr << "Thrill: setting default_block_size = "
198 << data::default_block_size
199 << std::endl;
200
201 return true;
202 }
203
//! Reads the environment variable `name` and parses it as a positive decimal
//! worker count.
//!
//! \param name name of the environment variable to read
//! \param value set to the result of getenv(name), which may be nullptr
//! \param workers set to the parsed count, or to 0 if the variable is unset,
//! empty or invalid
//! \return true if the variable is set and non-empty (valid or not); an
//! invalid value is reported on std::cerr.
static inline bool ReadWorkersPerHostEnv(
    const char* name, const char*& value, size_t& workers) {

    workers = 0;
    value = getenv(name);

    if (value == nullptr || *value == 0) return false;

    // strtoul() always stores the end position, so only trailing garbage and
    // a zero result have to be rejected.
    char* endptr = nullptr;
    const size_t parsed = std::strtoul(value, &endptr, 10);

    if (*endptr != 0 || parsed == 0) {
        std::cerr << "Thrill: environment variable"
                  << ' ' << name
                  << '=' << value
                  << " is not a valid number of workers per host."
                  << std::endl;
        return true;
    }

    workers = parsed;
    return true;
}

//! Determines the number of worker threads per host. Checks the environment
//! variables THRILL_WORKERS_PER_HOST, OMP_NUM_THREADS and SLURM_CPUS_ON_NODE
//! in this order and falls back to std::thread::hardware_concurrency().
//!
//! \param str_workers_per_host set to the name of the last variable checked
//! \param env_workers_per_host set to the value of the last variable checked;
//! this is nullptr if that variable is not set
//! \return the number of workers per host, or 0 on error. An invalid
//! THRILL_WORKERS_PER_HOST is an error, whereas invalid values of the other
//! variables are reported and skipped.
static inline size_t FindWorkersPerHost(
    const char*& str_workers_per_host, const char*& env_workers_per_host) {

    size_t workers = 0;

    // first check THRILL_WORKERS_PER_HOST: if set, its value is final
    str_workers_per_host = "THRILL_WORKERS_PER_HOST";
    if (ReadWorkersPerHostEnv(
            str_workers_per_host, env_workers_per_host, workers))
        return workers;

    // then the variables of other systems: invalid values fall through
    static const char* const kFallbackVariables[] = {
        "OMP_NUM_THREADS", "SLURM_CPUS_ON_NODE"
    };

    for (const char* variable : kFallbackVariables) {
        str_workers_per_host = variable;
        if (ReadWorkersPerHostEnv(
                str_workers_per_host, env_workers_per_host, workers) &&
            workers != 0)
            return workers;
    }

    // last check: ask the standard library, which may answer 0 if the number
    // of hardware threads is unknown. Callers treat 0 as an error, hence tell
    // the user how to proceed instead of failing silently.
    const size_t detected = std::thread::hardware_concurrency();
    if (detected == 0) {
        std::cerr << "Thrill: could not detect the number of hardware threads,"
                  << " please set THRILL_WORKERS_PER_HOST."
                  << std::endl;
    }
    return detected;
}
273
Initialize()274 static inline bool Initialize() {
275
276 if (!SetupBlockSize()) return false;
277
278 vfs::Initialize();
279
280 return true;
281 }
282
Deinitialize()283 static inline bool Deinitialize() {
284
285 vfs::Deinitialize();
286
287 return true;
288 }
289
290 /******************************************************************************/
291 // Constructions using TestGroup (either mock or tcp-loopback) for local testing
292
293 #if defined(_MSC_VER)
294 using TestGroup = net::mock::Group;
295 #else
296 using TestGroup = net::tcp::Group;
297 #endif
298
RunLocalMock(const MemoryConfig & mem_config,size_t num_hosts,size_t workers_per_host,const std::function<void (Context &)> & job_startpoint)299 void RunLocalMock(const MemoryConfig& mem_config,
300 size_t num_hosts, size_t workers_per_host,
301 const std::function<void(Context&)>& job_startpoint) {
302
303 return RunLoopbackThreads<TestGroup>(
304 mem_config, num_hosts, workers_per_host, 0, job_startpoint);
305 }
306
307 std::vector<std::unique_ptr<HostContext> >
ConstructLoopback(size_t num_hosts,size_t workers_per_host)308 HostContext::ConstructLoopback(size_t num_hosts, size_t workers_per_host) {
309
310 // set fixed amount of RAM for testing
311 MemoryConfig mem_config;
312 mem_config.setup(4 * 1024 * 1024 * 1024llu);
313 mem_config.verbose_ = false;
314
315 return ConstructLoopbackHostContexts<TestGroup>(
316 mem_config, num_hosts, workers_per_host);
317 }
318
RunLocalTests(const std::function<void (Context &)> & job_startpoint)319 void RunLocalTests(const std::function<void(Context&)>& job_startpoint) {
320 // set fixed amount of RAM for testing
321 RunLocalTests(4 * 1024 * 1024 * 1024llu, job_startpoint);
322 }
323
RunLocalTests(size_t ram,const std::function<void (Context &)> & job_startpoint)324 void RunLocalTests(
325 size_t ram, const std::function<void(Context&)>& job_startpoint) {
326
327 // discard json log
328 tlx::setenv("THRILL_LOG", "", /* overwrite */ 1);
329
330 // set fixed amount of RAM for testing, disable /proc profiler
331 MemoryConfig mem_config;
332 mem_config.verbose_ = false;
333 mem_config.enable_proc_profiler_ = false;
334 mem_config.setup(ram);
335
336 static constexpr size_t num_hosts_list[] = { 1, 2, 5, 8 };
337 static constexpr size_t num_workers_list[] = { 1, 3 };
338
339 size_t max_mock_workers = 1000000;
340
341 const char* env_max_mock_workers = getenv("THRILL_MAX_MOCK_WORKERS");
342 if (env_max_mock_workers && *env_max_mock_workers) {
343 // parse envvar only if it exists.
344 char* endptr;
345 max_mock_workers = std::strtoul(env_max_mock_workers, &endptr, 10);
346
347 if (!endptr || *endptr != 0 || max_mock_workers == 0) {
348 std::cerr << "Thrill: environment variable"
349 << " THRILL_MAX_MOCK_WORKERS=" << env_max_mock_workers
350 << " is not a valid maximum number of mock hosts."
351 << std::endl;
352 return;
353 }
354 }
355
356 for (const size_t& num_hosts : num_hosts_list) {
357 for (const size_t& workers_per_host : num_workers_list) {
358 if (num_hosts * workers_per_host > max_mock_workers) {
359 std::cerr << "Thrill: skipping test with "
360 << num_hosts * workers_per_host
361 << " workers > max workers " << max_mock_workers
362 << std::endl;
363 continue;
364 }
365
366 LOG0 << "Thrill: running local test with "
367 << num_hosts << " hosts and " << workers_per_host
368 << " workers per host";
369
370 RunLocalMock(mem_config, num_hosts, workers_per_host,
371 job_startpoint);
372 }
373 }
374 }
375
RunLocalSameThread(const std::function<void (Context &)> & job_startpoint)376 void RunLocalSameThread(const std::function<void(Context&)>& job_startpoint) {
377
378 size_t my_host_rank = 0;
379 size_t workers_per_host = 1;
380 size_t num_hosts = 1;
381 static constexpr size_t kGroupCount = net::Manager::kGroupCount;
382
383 // set fixed amount of RAM for testing
384 MemoryConfig mem_config;
385 mem_config.verbose_ = false;
386 mem_config.setup(4 * 1024 * 1024 * 1024llu);
387 mem_config.print(workers_per_host);
388
389 // construct two full mesh connection cliques, deliver net::tcp::Groups.
390 std::array<std::vector<std::unique_ptr<TestGroup> >, kGroupCount> group;
391
392 for (size_t g = 0; g < kGroupCount; ++g) {
393 group[g] = TestGroup::ConstructLoopbackMesh(num_hosts);
394 }
395
396 std::array<net::GroupPtr, kGroupCount> host_group = {
397 { std::move(group[0][0]), std::move(group[1][0]) }
398 };
399
400 auto dispatcher = std::make_unique<net::DispatcherThread>(
401 std::make_unique<TestGroup::Dispatcher>(), my_host_rank);
402
403 HostContext host_context(
404 0, mem_config,
405 std::move(dispatcher), std::move(host_group), workers_per_host);
406
407 Context ctx(host_context, 0);
408 common::NameThisThread("worker " + std::to_string(my_host_rank));
409
410 job_startpoint(ctx);
411 }
412
413 /******************************************************************************/
414 // Run() Variants for Different Net Backends
415
416 //! Run() implementation which uses a loopback net backend ("mock" or "tcp").
417 template <typename NetGroup>
418 static inline
RunBackendLoopback(const char * backend,const std::function<void (Context &)> & job_startpoint)419 int RunBackendLoopback(
420 const char* backend, const std::function<void(Context&)>& job_startpoint) {
421
422 char* endptr;
423
424 // determine number of loopback hosts
425
426 size_t num_hosts = 2;
427
428 const char* env_local = getenv("THRILL_LOCAL");
429 if (env_local && *env_local) {
430 // parse envvar only if it exists.
431 num_hosts = std::strtoul(env_local, &endptr, 10);
432
433 if (!endptr || *endptr != 0 || num_hosts == 0) {
434 std::cerr << "Thrill: environment variable"
435 << " THRILL_LOCAL=" << env_local
436 << " is not a valid number of local loopback hosts."
437 << std::endl;
438 return -1;
439 }
440 }
441
442 // determine number of threads per loopback host
443
444 const char* str_workers_per_host;
445 const char* env_workers_per_host;
446
447 size_t workers_per_host = FindWorkersPerHost(
448 str_workers_per_host, env_workers_per_host);
449
450 if (workers_per_host == 0)
451 return -1;
452
453 // core offset for pinning
454
455 const char* env_core_offset = getenv("THRILL_CORE_OFFSET");
456 size_t core_offset = 0;
457 if (env_core_offset && *env_core_offset) {
458 core_offset = std::strtoul(env_core_offset, &endptr, 10);
459
460 size_t last_core = core_offset + num_hosts * workers_per_host;
461 if (!endptr || *endptr != 0 ||
462 last_core > std::thread::hardware_concurrency())
463 {
464 std::cerr << "Thrill: environment variable"
465 << " THRILL_CORE_OFFSET=" << env_core_offset
466 << " is not a valid number of cores to skip for pinning."
467 << std::endl;
468 return -1;
469 }
470 }
471
472 // detect memory config
473
474 MemoryConfig mem_config;
475 if (mem_config.setup_detect() < 0) return -1;
476 mem_config.print(workers_per_host);
477
478 // okay, configuration is good.
479
480 std::cerr << "Thrill: running locally with " << num_hosts
481 << " test hosts and " << workers_per_host << " workers per host"
482 << " in a local " << backend << " network." << std::endl;
483
484 if (!Initialize()) return -1;
485
486 RunLoopbackThreads<NetGroup>(
487 mem_config, num_hosts, workers_per_host, core_offset, job_startpoint);
488
489 if (!Deinitialize()) return -1;
490
491 return 0;
492 }
493
#if THRILL_HAVE_NET_TCP
//! Run() implementation for the real tcp net backend: this process is one
//! host of the network.
//!
//! Reads the host rank from THRILL_RANK (or SLURM_PROCID if THRILL_RANK is not
//! set) and the endpoints of all hosts from THRILL_HOSTLIST, which is a list
//! of "host:port" entries separated by spaces or by commas. The number of
//! workers is determined by FindWorkersPerHost().
//!
//! \param job_startpoint function launched by every worker thread
//! \return 0 on success, -1 on any configuration or initialization error.
static inline
int RunBackendTcp(const std::function<void(Context&)>& job_startpoint) {

    char* endptr;

    // select environment variables

    const char* str_rank = "THRILL_RANK";
    const char* env_rank = getenv(str_rank);

    if (env_rank == nullptr) {
        // take SLURM_PROCID if THRILL_RANK is not set
        str_rank = "SLURM_PROCID";
        env_rank = getenv(str_rank);
    }

    const char* env_hostlist = getenv("THRILL_HOSTLIST");

    // parse environment variables

    size_t my_host_rank = 0;

    if (env_rank != nullptr && *env_rank != 0) {
        my_host_rank = std::strtoul(env_rank, &endptr, 10);

        if (endptr == nullptr || *endptr != 0) {
            std::cerr << "Thrill: environment variable "
                      << str_rank << '=' << env_rank
                      << " is not a valid number."
                      << std::endl;
            return -1;
        }
    }
    else {
        std::cerr << "Thrill: environment variable THRILL_RANK"
                  << " is required for tcp network backend."
                  << std::endl;
        return -1;
    }

    std::vector<std::string> hostlist;

    if (env_hostlist != nullptr && *env_hostlist != 0) {
        // first try to split by spaces, then by commas
        std::vector<std::string> list = tlx::split(' ', env_hostlist);

        if (list.size() == 1) {
            tlx::split(&list, ',', env_hostlist);
        }

        for (const std::string& host : list) {
            // skip empty splits
            if (host.empty()) continue;

            if (host.find(':') == std::string::npos) {
                std::cerr << "Thrill: invalid address \"" << host << "\""
                          << " in THRILL_HOSTLIST. It must contain a port number."
                          << std::endl;
                return -1;
            }

            hostlist.push_back(host);
        }

        if (my_host_rank >= hostlist.size()) {
            // report the number of usable endpoints, which is what the rank
            // is checked against (the raw split may contain empty entries).
            std::cerr << "Thrill: endpoint list (" << hostlist.size() << " entries) "
                      << "does not include my host_rank (" << my_host_rank << ")"
                      << std::endl;
            return -1;
        }
    }
    else {
        std::cerr << "Thrill: environment variable THRILL_HOSTLIST"
                  << " is required for tcp network backend."
                  << std::endl;
        return -1;
    }

    // determine number of local worker threads per process

    const char* str_workers_per_host;
    const char* env_workers_per_host;

    size_t workers_per_host = FindWorkersPerHost(
        str_workers_per_host, env_workers_per_host);

    if (workers_per_host == 0)
        return -1;

    // detect memory config

    MemoryConfig mem_config;
    if (mem_config.setup_detect() < 0) return -1;
    mem_config.print(workers_per_host);

    // okay, configuration is good.

    std::cerr << "Thrill: running in tcp network with " << hostlist.size()
              << " hosts and " << workers_per_host << " workers per host"
              << " with " << common::GetHostname()
              << " as rank " << my_host_rank << " and endpoints";
    for (const std::string& ep : hostlist)
        std::cerr << ' ' << ep;
    std::cerr << std::endl;

    if (!Initialize()) return -1;

    static constexpr size_t kGroupCount = net::Manager::kGroupCount;

    // construct the TCP network groups, one per group slot of net::Manager
    auto select_dispatcher = std::make_unique<net::tcp::SelectDispatcher>();

    std::array<std::unique_ptr<net::tcp::Group>, kGroupCount> groups;
    net::tcp::Construct(
        *select_dispatcher, my_host_rank, hostlist,
        groups.data(), net::Manager::kGroupCount);

    std::array<net::GroupPtr, kGroupCount> host_groups = {
        { std::move(groups[0]), std::move(groups[1]) }
    };

    // construct HostContext

    auto dispatcher = std::make_unique<net::DispatcherThread>(
        std::move(select_dispatcher), my_host_rank);

    HostContext host_context(
        0, mem_config,
        std::move(dispatcher), std::move(host_groups), workers_per_host);

    // launch worker threads, pinning worker i to core i
    std::vector<std::thread> threads(workers_per_host);

    for (size_t worker = 0; worker < workers_per_host; worker++) {
        threads[worker] = common::CreateThread(
            [&host_context, &job_startpoint, worker] {
                Context ctx(host_context, worker);
                common::NameThisThread("worker " + std::to_string(worker));

                ctx.Launch(job_startpoint);
            });
        common::SetCpuAffinity(threads[worker], worker);
    }

    // join worker threads
    for (std::thread& thread : threads) {
        thread.join();
    }

    if (!Deinitialize()) return -1;

    return 0;
}
#endif
648
#if THRILL_HAVE_NET_MPI
//! Run() implementation for the MPI net backend: this process is one host of
//! the network.
//!
//! The number of threads is determined by FindWorkersPerHost(). If it is
//! larger than one, one thread is taken away from the workers and left to the
//! MPI net::Dispatcher, which runs a busy-waiting loop.
//!
//! \param job_startpoint function launched by every worker thread
//! \return 0 on success, -1 on any configuration or initialization error.
static inline
int RunBackendMpi(const std::function<void(Context&)>& job_startpoint) {

    // determine number of local worker threads per MPI process

    const char* str_workers_per_host;
    const char* env_workers_per_host;

    size_t workers_per_host = FindWorkersPerHost(
        str_workers_per_host, env_workers_per_host);

    if (workers_per_host == 0)
        return -1;

    // reserve one thread for MPI net::Dispatcher which runs a busy-waiting loop

    if (workers_per_host == 1) {
        std::cerr << "Thrill: ";
        if (env_workers_per_host != nullptr && *env_workers_per_host != 0) {
            std::cerr << "environment variable"
                      << ' ' << str_workers_per_host
                      << '=' << env_workers_per_host;
        }
        else {
            // The count was not taken from the environment, hence the value
            // pointer is null or empty and there is no variable to blame.
            // Streaming a null const char* would be undefined behavior.
            std::cerr << "running with a single thread per host";
        }
        std::cerr << " is not recommended, as one thread is used exclusively"
                  << " for MPI communication."
                  << std::endl;
    }
    else {
        --workers_per_host;
    }

    // detect memory config

    MemoryConfig mem_config;
    if (mem_config.setup_detect() < 0) return -1;
    mem_config.print(workers_per_host);

    // okay, configuration is good.

    size_t num_hosts = net::mpi::NumMpiProcesses();
    size_t mpi_rank = net::mpi::MpiRank();
    std::string hostname = common::GetHostname();

    std::cerr << "Thrill: running in MPI network with " << num_hosts
              << " hosts and " << workers_per_host << "+1 workers per host"
              << " with " << hostname << " as rank " << mpi_rank << "."
              << std::endl;

    if (!Initialize()) return -1;

    static constexpr size_t kGroupCount = net::Manager::kGroupCount;

    // construct the MPI network groups, one per group slot of net::Manager
    auto dispatcher = std::make_unique<net::DispatcherThread>(
        std::make_unique<net::mpi::Dispatcher>(num_hosts), mpi_rank);

    std::array<std::unique_ptr<net::mpi::Group>, kGroupCount> groups;
    net::mpi::Construct(num_hosts, *dispatcher, groups.data(), kGroupCount);

    std::array<net::GroupPtr, kGroupCount> host_groups = {
        { std::move(groups[0]), std::move(groups[1]) }
    };

    // construct HostContext
    HostContext host_context(
        0, mem_config,
        std::move(dispatcher), std::move(host_groups), workers_per_host);

    // launch worker threads, pinning worker i to core i
    std::vector<std::thread> threads(workers_per_host);

    for (size_t worker = 0; worker < workers_per_host; worker++) {
        threads[worker] = common::CreateThread(
            [&host_context, &job_startpoint, worker] {
                Context ctx(host_context, worker);
                common::NameThisThread("host " + std::to_string(ctx.host_rank())
                                       + " worker " + std::to_string(worker));

                ctx.Launch(job_startpoint);
            });
        common::SetCpuAffinity(threads[worker], worker);
    }

    // join worker threads
    for (std::thread& thread : threads) {
        thread.join();
    }

    if (!Deinitialize()) return -1;

    return 0;
}
#endif
740
#if THRILL_HAVE_NET_IB
//! Run() implementation for the IB/MPI net backend: this process is one host
//! of the network. The number of workers is determined by
//! FindWorkersPerHost().
//!
//! \param job_startpoint function launched by every worker thread
//! \return 0 on success, -1 on any configuration or initialization error.
static inline
int RunBackendIb(const std::function<void(Context&)>& job_startpoint) {

    // number of local worker threads per IB/MPI process

    const char* workers_var_name;
    const char* workers_var_value;

    const size_t workers_per_host = FindWorkersPerHost(
        workers_var_name, workers_var_value);

    if (workers_per_host == 0)
        return -1;

    // memory configuration

    MemoryConfig mem_config;
    if (mem_config.setup_detect() < 0) return -1;
    mem_config.print(workers_per_host);

    // the configuration is complete and valid

    size_t num_hosts = net::ib::NumMpiProcesses();
    size_t mpi_rank = net::ib::MpiRank();

    std::cerr << "Thrill: running in IB/MPI network with " << num_hosts
              << " hosts and " << workers_per_host << " workers per host"
              << " with " << common::GetHostname()
              << " as rank " << mpi_rank << "."
              << std::endl;

    if (!Initialize()) return -1;

    static constexpr size_t kGroupCount = net::Manager::kGroupCount;

    // the network groups, one per group slot of net::Manager
    std::array<std::unique_ptr<net::ib::Group>, kGroupCount> ib_groups;
    net::ib::Construct(num_hosts, ib_groups.data(), kGroupCount);

    std::array<net::GroupPtr, kGroupCount> host_groups = {
        { std::move(ib_groups[0]), std::move(ib_groups[1]) }
    };

    // the host context; no dispatcher thread is passed for this backend
    HostContext host_context(
        0, mem_config, std::move(host_groups), workers_per_host);

    // start the workers, pinning worker i to core i
    std::vector<std::thread> threads(workers_per_host);

    for (size_t worker = 0; worker != workers_per_host; ++worker) {
        threads[worker] = common::CreateThread(
            [&host_context, &job_startpoint, worker] {
                Context ctx(host_context, worker);
                common::NameThisThread("host " + std::to_string(ctx.host_rank())
                                       + " worker " + std::to_string(worker));

                ctx.Launch(job_startpoint);
            });
        common::SetCpuAffinity(threads[worker], worker);
    }

    // wait for all workers to finish
    for (std::thread& thread : threads)
        thread.join();

    return Deinitialize() ? 0 : -1;
}
#endif
814
//! Reports on std::cerr that the net backend named env_net was not compiled
//! into this binary.
//!
//! \return always -1, such that Run() can return the result directly.
int RunNotSupported(const char* env_net) {
    const int kFailure = -1;

    std::cerr << "Thrill: network backend " << env_net
              << " is not supported by this binary." << std::endl;

    return kFailure;
}
820
//! Selects a net backend from the environment when THRILL_NET is not set.
//!
//! If OMPI_COMM_WORLD_SIZE or I_MPI_INFO_NP is set, an MPI run is assumed and
//! "ib" or "mpi" is selected, whichever is compiled in first. Otherwise "mock"
//! is selected with MSVC; on other compilers "tcp" is selected if THRILL_RANK
//! or THRILL_HOSTLIST is set, and "local" if neither is.
//!
//! \return the backend name, or nullptr if an MPI run was detected but this
//! binary supports neither "ib" nor "mpi".
static inline
const char * DetectNetBackend() {
    // detect openmpi and intel mpi run, add others as well.
    const bool started_by_mpi =
        getenv("OMPI_COMM_WORLD_SIZE") != nullptr ||
        getenv("I_MPI_INFO_NP") != nullptr;

    if (started_by_mpi) {
#if THRILL_HAVE_NET_IB
        return "ib";
#elif THRILL_HAVE_NET_MPI
        return "mpi";
#else
        std::cerr << "Thrill: MPI environment detected, but network backend mpi"
                  << " is not supported by this binary." << std::endl;
        return nullptr;
#endif
    }

#if defined(_MSC_VER)
    return "mock";
#else
    const bool has_tcp_settings =
        getenv("THRILL_RANK") != nullptr ||
        getenv("THRILL_HOSTLIST") != nullptr;

    return has_tcp_settings ? "tcp" : "local";
#endif
}
848
RunCheckDieWithParent()849 int RunCheckDieWithParent() {
850
851 const char* env_die_with_parent = getenv("THRILL_DIE_WITH_PARENT");
852 if (env_die_with_parent == nullptr || *env_die_with_parent == 0) return 0;
853
854 char* endptr;
855
856 long die_with_parent = std::strtol(env_die_with_parent, &endptr, 10);
857 if (endptr == nullptr || *endptr != 0 ||
858 (die_with_parent != 0 && die_with_parent != 1)) {
859 std::cerr << "Thrill: environment variable"
860 << " THRILL_DIE_WITH_PARENT=" << env_die_with_parent
861 << " is not either 0 or 1."
862 << std::endl;
863 return -1;
864 }
865
866 if (die_with_parent == 0) return 0;
867
868 #if __linux__
869 if (prctl(PR_SET_PDEATHSIG, SIGTERM) != 0) // NOLINT
870 throw common::ErrnoException("Error calling prctl(PR_SET_PDEATHSIG)");
871 return 1;
872 #else
873 std::cerr << "Thrill: DIE_WITH_PARENT is not supported on this platform.\n"
874 << "Please submit a patch."
875 << std::endl;
876 return 0;
877 #endif
878 }
879
RunCheckUnlinkBinary()880 int RunCheckUnlinkBinary() {
881
882 const char* env_unlink_binary = getenv("THRILL_UNLINK_BINARY");
883 if (env_unlink_binary == nullptr || *env_unlink_binary == 0) return 0;
884
885 if (unlink(env_unlink_binary) != 0) {
886 throw common::ErrnoException(
887 "Error calling unlink binary \""
888 + std::string(env_unlink_binary) + "\"");
889 }
890
891 return 0;
892 }
893
894 /*----------------------------------------------------------------------------*/
895 // Customized FOXXLL Disk Config
896
897 //! config class to override foxxll's default config
898 class FoxxllConfig : public foxxll::config
899 {
900 public:
901 //! override load_default_config()
902 void load_default_config() override;
903
904 //! Returns default path of disk.
905 std::string default_disk_path() override;
906
907 //! returns the name of the default config file prefix
908 std::string default_config_file_name() override;
909 };
910
load_default_config()911 void FoxxllConfig::load_default_config() {
912 TLX_LOG1 << "foxxll: Using default disk configuration.";
913 foxxll::disk_config entry1(
914 default_disk_path(), 1000 * 1024 * 1024, default_disk_io_impl());
915 entry1.unlink_on_open = true;
916 entry1.autogrow = true;
917 add_disk(entry1);
918 }
919
default_disk_path()920 std::string FoxxllConfig::default_disk_path() {
921 #if !FOXXLL_WINDOWS
922 int pid = getpid();
923 return "/var/tmp/thrill." + common::to_str(pid) + ".tmp";
924 #else
925 DWORD pid = GetCurrentProcessId();
926 char* tmpstr = new char[255];
927 if (GetTempPathA(255, tmpstr) == 0)
928 FOXXLL_THROW_WIN_LASTERROR(resource_error, "GetTempPathA()");
929 std::string result = tmpstr;
930 result += "thrill." + common::to_str(pid) + ".tmp";
931 delete[] tmpstr;
932 return result;
933 #endif
934 }
935
default_config_file_name()936 std::string FoxxllConfig::default_config_file_name() {
937 return ".thrill";
938 }
939
RunSetupFoxxll()940 void RunSetupFoxxll() {
941 // install derived config instance in foxxll's singleton
942 foxxll::config::create_instance<FoxxllConfig>();
943 }
944
945 /*----------------------------------------------------------------------------*/
946
Run(const std::function<void (Context &)> & job_startpoint)947 int Run(const std::function<void(Context&)>& job_startpoint) {
948
949 common::NameThisThread("main");
950
951 if (RunCheckDieWithParent() < 0)
952 return -1;
953
954 if (RunCheckUnlinkBinary() < 0)
955 return -1;
956
957 RunSetupFoxxll();
958
959 // parse environment: THRILL_NET
960 const char* env_net = getenv("THRILL_NET");
961
962 // if no backend configured: automatically select one.
963 if (env_net == nullptr || *env_net == 0) {
964 env_net = DetectNetBackend();
965 if (env_net == nullptr) return -1;
966 }
967
968 // run with selected backend
969 if (strcmp(env_net, "mock") == 0) {
970 // mock network backend
971 return RunBackendLoopback<net::mock::Group>("mock", job_startpoint);
972 }
973
974 if (strcmp(env_net, "local") == 0) {
975 #if THRILL_HAVE_NET_TCP
976 // tcp loopback network backend
977 return RunBackendLoopback<net::tcp::Group>("tcp", job_startpoint);
978 #else
979 return RunNotSupported(env_net);
980 #endif
981 }
982
983 if (strcmp(env_net, "tcp") == 0) {
984 #if THRILL_HAVE_NET_TCP
985 // real tcp network backend
986 return RunBackendTcp(job_startpoint);
987 #else
988 return RunNotSupported(env_net);
989 #endif
990 }
991
992 if (strcmp(env_net, "mpi") == 0) {
993 #if THRILL_HAVE_NET_MPI
994 // mpi network backend
995 return RunBackendMpi(job_startpoint);
996 #else
997 return RunNotSupported(env_net);
998 #endif
999 }
1000
1001 if (strcmp(env_net, "ib") == 0) {
1002 #if THRILL_HAVE_NET_IB
1003 // ib/mpi network backend
1004 return RunBackendIb(job_startpoint);
1005 #else
1006 return RunNotSupported(env_net);
1007 #endif
1008 }
1009
1010 std::cerr << "Thrill: network backend " << env_net << " is unknown."
1011 << std::endl;
1012 return -1;
1013 }
1014
1015 /******************************************************************************/
1016 // MemoryConfig
1017
setup(size_t ram)1018 void MemoryConfig::setup(size_t ram) {
1019 ram_ = ram;
1020 apply();
1021 }
1022
setup_detect()1023 int MemoryConfig::setup_detect() {
1024
1025 // determine amount of physical RAM or take user's limit
1026
1027 const char* env_ram = getenv("THRILL_RAM");
1028
1029 if (env_ram != nullptr && *env_ram != 0) {
1030 uint64_t ram64;
1031 if (!tlx::parse_si_iec_units(env_ram, &ram64)) {
1032 std::cerr << "Thrill: environment variable"
1033 << " THRILL_RAM=" << env_ram
1034 << " is not a valid amount of RAM memory."
1035 << std::endl;
1036 return -1;
1037 }
1038 ram_ = static_cast<size_t>(ram64);
1039 }
1040 else {
1041 // detect amount of physical memory on system
1042 #if defined(_MSC_VER)
1043 MEMORYSTATUSEX memstx;
1044 memstx.dwLength = sizeof(memstx);
1045 GlobalMemoryStatusEx(&memstx);
1046
1047 ram_ = memstx.ullTotalPhys;
1048 #elif __APPLE__
1049 int mib[2];
1050 int64_t physical_memory;
1051 size_t length;
1052
1053 // Get the physical memory size
1054 mib[0] = CTL_HW;
1055 mib[1] = HW_MEMSIZE;
1056 length = sizeof(physical_memory);
1057 sysctl(mib, 2, &physical_memory, &length, nullptr, 0);
1058 ram_ = static_cast<size_t>(physical_memory);
1059 #else
1060 ram_ = sysconf(_SC_PHYS_PAGES) * static_cast<size_t>(sysconf(_SC_PAGESIZE));
1061 #endif
1062
1063 #if __linux__
1064 // use getrlimit() to check user limit on address space
1065 struct rlimit rl; // NOLINT
1066 if (getrlimit(RLIMIT_AS, &rl) == 0) {
1067 if (rl.rlim_cur != 0 && rl.rlim_cur * 3 / 4 < ram_) {
1068 ram_ = rl.rlim_cur * 3 / 4;
1069 }
1070 }
1071 else {
1072 sLOG1 << "getrlimit(): " << strerror(errno);
1073 }
1074 #endif
1075 }
1076
1077 apply();
1078
1079 return 0;
1080 }
1081
apply()1082 void MemoryConfig::apply() {
1083 // divide up ram_
1084
1085 ram_workers_ = ram_ / 3;
1086 ram_block_pool_hard_ = ram_ / 3;
1087 ram_block_pool_soft_ = ram_block_pool_hard_ * 9 / 10;
1088 ram_floating_ = ram_ - ram_block_pool_hard_ - ram_workers_;
1089
1090 // set memory limit, only BlockPool is excluded from malloc tracking, as
1091 // only it uses bypassing allocators.
1092 mem::set_memory_limit_indication(ram_floating_ + ram_workers_);
1093 }
1094
divide(size_t hosts) const1095 MemoryConfig MemoryConfig::divide(size_t hosts) const {
1096
1097 MemoryConfig mc = *this;
1098 mc.ram_ /= hosts;
1099 mc.ram_block_pool_hard_ /= hosts;
1100 mc.ram_block_pool_soft_ /= hosts;
1101 mc.ram_workers_ /= hosts;
1102 // free floating memory is not divided by host, as it is measured overall
1103
1104 return mc;
1105 }
1106
print(size_t workers_per_host) const1107 void MemoryConfig::print(size_t workers_per_host) const {
1108 if (!verbose_) return;
1109
1110 std::cerr
1111 << "Thrill: using "
1112 << tlx::format_iec_units(ram_) << "B RAM total,"
1113 << " BlockPool=" << tlx::format_iec_units(ram_block_pool_hard_) << "B,"
1114 << " workers="
1115 << tlx::format_iec_units(ram_workers_ / workers_per_host) << "B,"
1116 << " floating=" << tlx::format_iec_units(ram_floating_) << "B."
1117 << std::endl;
1118 }
1119
1120 /******************************************************************************/
1121 // HostContext methods
1122
HostContext(size_t local_host_id,const MemoryConfig & mem_config,std::unique_ptr<net::DispatcherThread> dispatcher,std::array<net::GroupPtr,net::Manager::kGroupCount> && groups,size_t workers_per_host)1123 HostContext::HostContext(
1124 size_t local_host_id,
1125 const MemoryConfig& mem_config,
1126 std::unique_ptr<net::DispatcherThread> dispatcher,
1127 std::array<net::GroupPtr, net::Manager::kGroupCount>&& groups,
1128 size_t workers_per_host)
1129 : mem_config_(mem_config),
1130 base_logger_(MakeHostLogPath(groups[0]->my_host_rank())),
1131 logger_(&base_logger_, "host_rank", groups[0]->my_host_rank()),
1132 profiler_(std::make_unique<common::ProfileThread>()),
1133 local_host_id_(local_host_id),
1134 workers_per_host_(workers_per_host),
1135 dispatcher_(std::move(dispatcher)),
1136 net_manager_(std::move(groups), logger_) {
1137
1138 // write command line parameters to json log
1139 common::LogCmdlineParams(logger_);
1140
1141 if (mem_config_.enable_proc_profiler_)
1142 StartLinuxProcStatsProfiler(*profiler_, logger_);
1143
1144 // run memory profiler only on local host 0 (especially for test runs)
1145 if (local_host_id == 0)
1146 mem::StartMemProfiler(*profiler_, logger_);
1147 }
1148
~HostContext()1149 HostContext::~HostContext() {
1150 // stop dispatcher _before_ stopping multiplexer
1151 dispatcher_->Terminate();
1152 }
1153
MakeHostLogPath(size_t host_rank)1154 std::string HostContext::MakeHostLogPath(size_t host_rank) {
1155 const char* env_log = getenv("THRILL_LOG");
1156 if (env_log == nullptr) {
1157 if (host_rank == 0 && mem_config().verbose_) {
1158 std::cerr << "Thrill: no THRILL_LOG was found, "
1159 << "so no json log is written."
1160 << std::endl;
1161 }
1162 return std::string();
1163 }
1164
1165 std::string output = env_log;
1166 if (output == "" || output == "-")
1167 return std::string();
1168 if (output == "/dev/stdout")
1169 return output;
1170 if (output == "stdout")
1171 return "/dev/stdout";
1172
1173 return output + "-host-" + std::to_string(host_rank) + ".json";
1174 }
1175
1176 /******************************************************************************/
1177 // Context methods
1178
Context(HostContext & host_context,size_t local_worker_id)1179 Context::Context(HostContext& host_context, size_t local_worker_id)
1180 : local_host_id_(host_context.local_host_id()),
1181 local_worker_id_(local_worker_id),
1182 workers_per_host_(host_context.workers_per_host()),
1183 mem_limit_(host_context.worker_mem_limit()),
1184 mem_config_(host_context.mem_config()),
1185 mem_manager_(host_context.mem_manager()),
1186 net_manager_(host_context.net_manager()),
1187 flow_manager_(host_context.flow_manager()),
1188 block_pool_(host_context.block_pool()),
1189 multiplexer_(host_context.data_multiplexer()),
1190 rng_(std::random_device { }
1191 () + (local_worker_id_ << 16)),
1192 base_logger_(&host_context.base_logger_) {
1193 assert(local_worker_id < workers_per_host());
1194 }
1195
GetFile(DIABase * dia)1196 data::File Context::GetFile(DIABase* dia) {
1197 return GetFile(dia != nullptr ? dia->dia_id() : 0);
1198 }
1199
GetFilePtr(size_t dia_id)1200 data::FilePtr Context::GetFilePtr(size_t dia_id) {
1201 return tlx::make_counting<data::File>(
1202 block_pool_, local_worker_id_, dia_id);
1203 }
1204
GetFilePtr(DIABase * dia)1205 data::FilePtr Context::GetFilePtr(DIABase* dia) {
1206 return GetFilePtr(dia != nullptr ? dia->dia_id() : 0);
1207 }
1208
GetNewCatStream(size_t dia_id)1209 data::CatStreamPtr Context::GetNewCatStream(size_t dia_id) {
1210 return multiplexer_.GetNewCatStream(local_worker_id_, dia_id);
1211 }
1212
GetNewCatStream(DIABase * dia)1213 data::CatStreamPtr Context::GetNewCatStream(DIABase* dia) {
1214 return GetNewCatStream(dia != nullptr ? dia->dia_id() : 0);
1215 }
1216
GetNewMixStream(size_t dia_id)1217 data::MixStreamPtr Context::GetNewMixStream(size_t dia_id) {
1218 return multiplexer_.GetNewMixStream(local_worker_id_, dia_id);
1219 }
1220
GetNewMixStream(DIABase * dia)1221 data::MixStreamPtr Context::GetNewMixStream(DIABase* dia) {
1222 return GetNewMixStream(dia != nullptr ? dia->dia_id() : 0);
1223 }
1224
1225 template <>
GetNewStream(size_t dia_id)1226 data::CatStreamPtr Context::GetNewStream<data::CatStream>(size_t dia_id) {
1227 return GetNewCatStream(dia_id);
1228 }
1229
1230 template <>
GetNewStream(size_t dia_id)1231 data::MixStreamPtr Context::GetNewStream<data::MixStream>(size_t dia_id) {
1232 return GetNewMixStream(dia_id);
1233 }
1234
//! Statistics of one job run, which can be combined with operator +.
//!
//! NOTE(review): deliberately left as a plain aggregate without member
//! initializers or constructors; presumably the network reduction requires a
//! trivially copyable type -- confirm before adding any.
struct OverallStats {

    //! overall run time
    double runtime;

    //! maximum ByteBlock allocation on all workers
    size_t max_block_bytes;

    //! network traffic sent by net layer
    size_t net_traffic_tx;

    //! network traffic received by net layer
    size_t net_traffic_rx;

    //! I/O volume performed by io layer
    size_t io_volume;

    //! maximum external memory allocation
    size_t io_max_allocation;

    //! Write all fields as "[OverallStats name=value ...]" to \p os.
    friend std::ostream& operator << (std::ostream& os, const OverallStats& c) {
        os << "[OverallStats";
        os << " runtime=" << c.runtime;
        os << " max_block_bytes=" << c.max_block_bytes;
        os << " net_traffic_tx=" << c.net_traffic_tx;
        os << " net_traffic_rx=" << c.net_traffic_rx;
        os << " io_volume=" << c.io_volume;
        os << " io_max_allocation=" << c.io_max_allocation;
        os << "]";
        return os;
    }

    //! Combine two statistics: runtime and io_max_allocation take the larger
    //! of both values, all other fields are added up.
    OverallStats operator + (const OverallStats& b) const {
        OverallStats combined = *this;

        // maxima: replace only if the other value is strictly larger
        if (combined.runtime < b.runtime)
            combined.runtime = b.runtime;
        if (combined.io_max_allocation < b.io_max_allocation)
            combined.io_max_allocation = b.io_max_allocation;

        // sums
        combined.max_block_bytes += b.max_block_bytes;
        combined.net_traffic_tx += b.net_traffic_tx;
        combined.net_traffic_rx += b.net_traffic_rx;
        combined.io_volume += b.io_volume;

        return combined;
    }
};
1274
Launch(const std::function<void (Context &)> & job_startpoint)1275 void Context::Launch(const std::function<void(Context&)>& job_startpoint) {
1276 logger_ << "class" << "Context"
1277 << "event" << "job-start";
1278
1279 common::StatsTimerStart overall_timer;
1280
1281 try {
1282 job_startpoint(*this);
1283 }
1284 catch (std::exception& e) {
1285 LOG1 << "worker " << my_rank() << " threw " << typeid(e).name();
1286 LOG1 << " what(): " << e.what();
1287
1288 logger_ << "class" << "Context"
1289 << "event" << "job-exception"
1290 << "exception" << typeid(e).name()
1291 << "what" << e.what();
1292 throw;
1293 }
1294
1295 logger_ << "class" << "Context"
1296 << "event" << "job-done"
1297 << "elapsed" << overall_timer;
1298
1299 overall_timer.Stop();
1300
1301 // collect overall statistics
1302 OverallStats stats;
1303 stats.runtime = overall_timer.SecondsDouble();
1304
1305 stats.max_block_bytes =
1306 local_worker_id_ == 0 ? block_pool().max_total_bytes() : 0;
1307
1308 stats.net_traffic_tx = local_worker_id_ == 0 ? net_manager_.Traffic().tx : 0;
1309 stats.net_traffic_rx = local_worker_id_ == 0 ? net_manager_.Traffic().rx : 0;
1310
1311 if (local_host_id_ == 0 && local_worker_id_ == 0) {
1312 foxxll::stats_data io_stats(*foxxll::stats::get_instance());
1313 stats.io_volume = io_stats.get_read_bytes() + io_stats.get_write_bytes();
1314 stats.io_max_allocation =
1315 foxxll::block_manager::get_instance()->maximum_allocation();
1316 }
1317 else {
1318 stats.io_volume = 0;
1319 stats.io_max_allocation = 0;
1320 }
1321
1322 LOG0 << stats;
1323
1324 stats = net.Reduce(stats);
1325
1326 if (my_rank() == 0) {
1327 using tlx::format_iec_units;
1328
1329 if (stats.net_traffic_rx != stats.net_traffic_tx)
1330 LOG1 << "Manager::Traffic() tx/rx asymmetry = "
1331 << tlx::abs_diff(stats.net_traffic_tx, stats.net_traffic_rx);
1332
1333 if (mem_config().verbose_) {
1334 std::cerr
1335 << "Thrill:"
1336 << " ran " << stats.runtime << "s with max "
1337 << format_iec_units(stats.max_block_bytes) << "B in DIA Blocks, "
1338 << format_iec_units(stats.net_traffic_tx) << "B network traffic, "
1339 << format_iec_units(stats.io_volume) << "B disk I/O, and "
1340 << format_iec_units(stats.io_max_allocation) << "B max disk use."
1341 << std::endl;
1342 }
1343
1344 logger_ << "class" << "Context"
1345 << "event" << "summary"
1346 << "runtime" << stats.runtime
1347 << "net_traffic" << stats.net_traffic_tx
1348 << "io_volume" << stats.io_volume
1349 << "io_max_allocation" << stats.io_max_allocation;
1350 }
1351 }
1352
1353 } // namespace api
1354 } // namespace thrill
1355
1356 /******************************************************************************/
1357