1 /*******************************************************************************
2  * thrill/net/tcp/group.hpp
3  *
4  * net::Group is a collection of NetConnections providing simple MPI-like
5  * collectives and point-to-point communication.
6  *
7  * Part of Project Thrill - http://project-thrill.org
8  *
9  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
10  * Copyright (C) 2015 Emanuel Jöbstl <emanuel.joebstl@gmail.com>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_NET_TCP_GROUP_HEADER
17 #define THRILL_NET_TCP_GROUP_HEADER
18 
19 #include <thrill/common/logger.hpp>
20 #include <thrill/net/group.hpp>
21 #include <thrill/net/tcp/connection.hpp>
22 
23 #include <algorithm>
24 #include <cassert>
25 #include <cstring>
26 #include <functional>
27 #include <string>
28 #include <vector>
29 
30 namespace thrill {
31 namespace net {
32 namespace tcp {
33 
34 //! \addtogroup net_tcp TCP Socket API
35 //! \{
36 
37 class SelectDispatcher;
38 
39 /*!
40  * Collection of NetConnections to workers, allows point-to-point client
41  * communication and simple collectives like MPI.
42  */
43 class Group final : public net::Group
44 {
45     static constexpr bool debug = false;
46 
47 public:
48     //! \name Construction and Initialization
49     //! \{
50 
51     /*!
52      * Construct a test network with an underlying full mesh of local loopback
53      * stream sockets for testing. Returns vector of net::Group interfaces for
54      * each virtual client. This is ideal for testing network communication
55      * protocols.
56      */
57     static std::vector<std::unique_ptr<Group> > ConstructLoopbackMesh(
58         size_t num_hosts);
59 
60     /*!
61      * Construct a test network with an underlying full mesh of *REAL* tcp
62      * streams interconnected via localhost ports.
63      */
64     static std::vector<std::unique_ptr<Group> > ConstructLocalRealTCPMesh(
65         size_t num_hosts);
66 
67     //! Initializing constructor, used by tests for creating Groups.
Group(size_t my_rank,size_t group_size)68     Group(size_t my_rank, size_t group_size)
69         : net::Group(my_rank),
70           connections_(group_size) { }
71 
72     //! \}
73 
74     //! non-copyable: delete copy-constructor
75     Group(const Group&) = delete;
76     //! non-copyable: delete assignment operator
77     Group& operator = (const Group&) = delete;
78     //! move-constructor: default
79     Group(Group&&) = default;
80     //! move-assignment operator: default
81     Group& operator = (Group&&) = default;
82 
83     //! \name Status and Access to NetConnections
84     //! \{
85 
86     //! Return Connection to client id.
tcp_connection(size_t id)87     Connection& tcp_connection(size_t id) {
88         if (id >= connections_.size())
89             throw Exception("Group::Connection() requested "
90                             "invalid client id " + std::to_string(id));
91 
92         if (id == my_rank_)
93             throw Exception("Group::Connection() requested "
94                             "connection to self.");
95 
96         // return Connection to client id.
97         return connections_[id];
98     }
99 
connection(size_t id)100     net::Connection& connection(size_t id) final {
101         return tcp_connection(id);
102     }
103 
104     using Dispatcher = tcp::SelectDispatcher;
105 
106     std::unique_ptr<net::Dispatcher> ConstructDispatcher() const final;
107 
108     /*!
109      * Assigns a connection to this net group.  This method swaps the net
110      * connection to memory managed by this group.  The reference given to that
111      * method will be invalid afterwards.
112      *
113      * \param connection The connection to assign.
114      *
115      * \return A ref to the assigned connection, which is always valid, but
116      * might be different from the inut connection.
117      */
AssignConnection(Connection & connection)118     Connection& AssignConnection(Connection& connection) {
119         if (connection.peer_id() >= connections_.size())
120             throw Exception("Group::GetClient() requested "
121                             "invalid client id "
122                             + std::to_string(connection.peer_id()));
123 
124         connections_[connection.peer_id()] = std::move(connection);
125 
126         return connections_[connection.peer_id()];
127     }
128 
129     //! Return number of connections in this group (= number computing hosts)
num_hosts() const130     size_t num_hosts() const final {
131         return connections_.size();
132     }
133 
134     //! Closes all client connections
Close()135     void Close() {
136         for (size_t i = 0; i != connections_.size(); ++i)
137         {
138             if (i == my_rank_) continue;
139 
140             if (connections_[i].IsValid())
141                 connections_[i].Close();
142         }
143 
144         connections_.clear();
145     }
146 
147     //! Closes all client connections
~Group()148     ~Group() {
149         Close();
150     }
151 
152     //! \}
153 
154 private:
155     //! Connections to all other clients in the Group.
156     std::vector<Connection> connections_;
157 };
158 
159 //! \}
160 
161 } // namespace tcp
162 } // namespace net
163 } // namespace thrill
164 
165 #endif // !THRILL_NET_TCP_GROUP_HEADER
166 
167 /******************************************************************************/
168