1 /*******************************************************************************
2 * thrill/net/tcp/group.cpp
3 *
4 * net::Group is a collection of Connections providing simple MPI-like
5 * collectives and point-to-point communication.
6 *
7 * Part of Project Thrill - http://project-thrill.org
8 *
9 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
10 * Copyright (C) 2015 Emanuel Jöbstl <emanuel.joebstl@gmail.com>
11 *
12 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13 ******************************************************************************/
14
15 #include <thrill/common/logger.hpp>
16 #include <thrill/net/tcp/construct.hpp>
17 #include <thrill/net/tcp/group.hpp>
18 #include <thrill/net/tcp/select_dispatcher.hpp>
19
20 #include <random>
21 #include <string>
22 #include <thread>
23 #include <utility>
24 #include <vector>
25
26 namespace thrill {
27 namespace net {
28 namespace tcp {
29
30 std::unique_ptr<Dispatcher>
ConstructDispatcher() const31 Group::ConstructDispatcher() const {
32 // construct tcp::SelectDispatcher
33 return std::make_unique<SelectDispatcher>();
34 }
35
ConstructLoopbackMesh(size_t num_hosts)36 std::vector<std::unique_ptr<Group> > Group::ConstructLoopbackMesh(
37 size_t num_hosts) {
38
39 // construct a group of num_hosts
40 std::vector<std::unique_ptr<Group> > group(num_hosts);
41
42 for (size_t i = 0; i < num_hosts; ++i) {
43 group[i] = std::make_unique<Group>(i, num_hosts);
44 }
45
46 // construct a stream socket pair for (i,j) with i < j
47 for (size_t i = 0; i != num_hosts; ++i) {
48 for (size_t j = i + 1; j < num_hosts; ++j) {
49 LOG << "doing Socket::CreatePair() for i=" << i << " j=" << j;
50
51 std::pair<Socket, Socket> sp = Socket::CreatePair();
52
53 group[i]->connections_[j] = Connection(std::move(sp.first));
54 group[j]->connections_[i] = Connection(std::move(sp.second));
55
56 group[i]->connections_[j].is_loopback_ = true;
57 group[j]->connections_[i].is_loopback_ = true;
58 }
59 }
60
61 return group;
62 }
63
ConstructLocalRealTCPMesh(size_t num_hosts)64 std::vector<std::unique_ptr<Group> > Group::ConstructLocalRealTCPMesh(
65 size_t num_hosts) {
66
67 // randomize base port number for test
68 std::default_random_engine generator(std::random_device { } ());
69 std::uniform_int_distribution<int> distribution(10000, 30000);
70 const size_t port_base = distribution(generator);
71
72 std::vector<std::string> endpoints;
73
74 for (size_t i = 0; i < num_hosts; ++i) {
75 endpoints.push_back("127.0.0.1:" + std::to_string(port_base + i));
76 }
77
78 sLOG << "Group test uses ports" << port_base << "-" << port_base + num_hosts;
79
80 std::vector<std::thread> threads(num_hosts);
81
82 // we have to create and run threads to construct Group because these create
83 // real connections.
84
85 std::vector<std::unique_ptr<Group> > groups(num_hosts);
86
87 for (size_t i = 0; i < num_hosts; i++) {
88 threads[i] = std::thread(
89 [i, &endpoints, &groups]() {
90 // construct Group i with endpoints -- with temporary Dispatcher
91 net::tcp::SelectDispatcher dispatcher;
92 Construct(dispatcher, i, endpoints, groups.data() + i, 1);
93 });
94 }
95
96 for (size_t i = 0; i < num_hosts; i++) {
97 threads[i].join();
98 }
99
100 return groups;
101 }
102
103 } // namespace tcp
104 } // namespace net
105 } // namespace thrill
106
107 /******************************************************************************/
108