1 /*******************************************************************************
2  * thrill/net/tcp/group.cpp
3  *
4  * net::Group is a collection of Connections providing simple MPI-like
5  * collectives and point-to-point communication.
6  *
7  * Part of Project Thrill - http://project-thrill.org
8  *
9  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
10  * Copyright (C) 2015 Emanuel Jöbstl <emanuel.joebstl@gmail.com>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #include <thrill/common/logger.hpp>
16 #include <thrill/net/tcp/construct.hpp>
17 #include <thrill/net/tcp/group.hpp>
18 #include <thrill/net/tcp/select_dispatcher.hpp>
19 
20 #include <random>
21 #include <string>
22 #include <thread>
23 #include <utility>
24 #include <vector>
25 
26 namespace thrill {
27 namespace net {
28 namespace tcp {
29 
30 std::unique_ptr<Dispatcher>
ConstructDispatcher() const31 Group::ConstructDispatcher() const {
32     // construct tcp::SelectDispatcher
33     return std::make_unique<SelectDispatcher>();
34 }
35 
ConstructLoopbackMesh(size_t num_hosts)36 std::vector<std::unique_ptr<Group> > Group::ConstructLoopbackMesh(
37     size_t num_hosts) {
38 
39     // construct a group of num_hosts
40     std::vector<std::unique_ptr<Group> > group(num_hosts);
41 
42     for (size_t i = 0; i < num_hosts; ++i) {
43         group[i] = std::make_unique<Group>(i, num_hosts);
44     }
45 
46     // construct a stream socket pair for (i,j) with i < j
47     for (size_t i = 0; i != num_hosts; ++i) {
48         for (size_t j = i + 1; j < num_hosts; ++j) {
49             LOG << "doing Socket::CreatePair() for i=" << i << " j=" << j;
50 
51             std::pair<Socket, Socket> sp = Socket::CreatePair();
52 
53             group[i]->connections_[j] = Connection(std::move(sp.first));
54             group[j]->connections_[i] = Connection(std::move(sp.second));
55 
56             group[i]->connections_[j].is_loopback_ = true;
57             group[j]->connections_[i].is_loopback_ = true;
58         }
59     }
60 
61     return group;
62 }
63 
ConstructLocalRealTCPMesh(size_t num_hosts)64 std::vector<std::unique_ptr<Group> > Group::ConstructLocalRealTCPMesh(
65     size_t num_hosts) {
66 
67     // randomize base port number for test
68     std::default_random_engine generator(std::random_device { } ());
69     std::uniform_int_distribution<int> distribution(10000, 30000);
70     const size_t port_base = distribution(generator);
71 
72     std::vector<std::string> endpoints;
73 
74     for (size_t i = 0; i < num_hosts; ++i) {
75         endpoints.push_back("127.0.0.1:" + std::to_string(port_base + i));
76     }
77 
78     sLOG << "Group test uses ports" << port_base << "-" << port_base + num_hosts;
79 
80     std::vector<std::thread> threads(num_hosts);
81 
82     // we have to create and run threads to construct Group because these create
83     // real connections.
84 
85     std::vector<std::unique_ptr<Group> > groups(num_hosts);
86 
87     for (size_t i = 0; i < num_hosts; i++) {
88         threads[i] = std::thread(
89             [i, &endpoints, &groups]() {
90                 // construct Group i with endpoints -- with temporary Dispatcher
91                 net::tcp::SelectDispatcher dispatcher;
92                 Construct(dispatcher, i, endpoints, groups.data() + i, 1);
93             });
94     }
95 
96     for (size_t i = 0; i < num_hosts; i++) {
97         threads[i].join();
98     }
99 
100     return groups;
101 }
102 
103 } // namespace tcp
104 } // namespace net
105 } // namespace thrill
106 
107 /******************************************************************************/
108