1 /******************************************************************************* 2 * thrill/net/tcp/group.hpp 3 * 4 * net::Group is a collection of NetConnections providing simple MPI-like 5 * collectives and point-to-point communication. 6 * 7 * Part of Project Thrill - http://project-thrill.org 8 * 9 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net> 10 * Copyright (C) 2015 Emanuel Jöbstl <emanuel.joebstl@gmail.com> 11 * 12 * All rights reserved. Published under the BSD-2 license in the LICENSE file. 13 ******************************************************************************/ 14 15 #pragma once 16 #ifndef THRILL_NET_TCP_GROUP_HEADER 17 #define THRILL_NET_TCP_GROUP_HEADER 18 19 #include <thrill/common/logger.hpp> 20 #include <thrill/net/group.hpp> 21 #include <thrill/net/tcp/connection.hpp> 22 23 #include <algorithm> 24 #include <cassert> 25 #include <cstring> 26 #include <functional> 27 #include <string> 28 #include <vector> 29 30 namespace thrill { 31 namespace net { 32 namespace tcp { 33 34 //! \addtogroup net_tcp TCP Socket API 35 //! \{ 36 37 class SelectDispatcher; 38 39 /*! 40 * Collection of NetConnections to workers, allows point-to-point client 41 * communication and simple collectives like MPI. 42 */ 43 class Group final : public net::Group 44 { 45 static constexpr bool debug = false; 46 47 public: 48 //! \name Construction and Initialization 49 //! \{ 50 51 /*! 52 * Construct a test network with an underlying full mesh of local loopback 53 * stream sockets for testing. Returns vector of net::Group interfaces for 54 * each virtual client. This is ideal for testing network communication 55 * protocols. 56 */ 57 static std::vector<std::unique_ptr<Group> > ConstructLoopbackMesh( 58 size_t num_hosts); 59 60 /*! 61 * Construct a test network with an underlying full mesh of *REAL* tcp 62 * streams interconnected via localhost ports. 63 */ 64 static std::vector<std::unique_ptr<Group> > ConstructLocalRealTCPMesh( 65 size_t num_hosts); 66 67 //! Initializing constructor, used by tests for creating Groups. Group(size_t my_rank,size_t group_size)68 Group(size_t my_rank, size_t group_size) 69 : net::Group(my_rank), 70 connections_(group_size) { } 71 72 //! \} 73 74 //! non-copyable: delete copy-constructor 75 Group(const Group&) = delete; 76 //! non-copyable: delete assignment operator 77 Group& operator = (const Group&) = delete; 78 //! move-constructor: default 79 Group(Group&&) = default; 80 //! move-assignment operator: default 81 Group& operator = (Group&&) = default; 82 83 //! \name Status and Access to NetConnections 84 //! \{ 85 86 //! Return Connection to client id. tcp_connection(size_t id)87 Connection& tcp_connection(size_t id) { 88 if (id >= connections_.size()) 89 throw Exception("Group::Connection() requested " 90 "invalid client id " + std::to_string(id)); 91 92 if (id == my_rank_) 93 throw Exception("Group::Connection() requested " 94 "connection to self."); 95 96 // return Connection to client id. 97 return connections_[id]; 98 } 99 connection(size_t id)100 net::Connection& connection(size_t id) final { 101 return tcp_connection(id); 102 } 103 104 using Dispatcher = tcp::SelectDispatcher; 105 106 std::unique_ptr<net::Dispatcher> ConstructDispatcher() const final; 107 108 /*! 109 * Assigns a connection to this net group. This method swaps the net 110 * connection to memory managed by this group. The reference given to that 111 * method will be invalid afterwards. 112 * 113 * \param connection The connection to assign. 114 * 115 * \return A ref to the assigned connection, which is always valid, but 116 * might be different from the inut connection. 117 */ AssignConnection(Connection & connection)118 Connection& AssignConnection(Connection& connection) { 119 if (connection.peer_id() >= connections_.size()) 120 throw Exception("Group::GetClient() requested " 121 "invalid client id " 122 + std::to_string(connection.peer_id())); 123 124 connections_[connection.peer_id()] = std::move(connection); 125 126 return connections_[connection.peer_id()]; 127 } 128 129 //! Return number of connections in this group (= number computing hosts) num_hosts() const130 size_t num_hosts() const final { 131 return connections_.size(); 132 } 133 134 //! Closes all client connections Close()135 void Close() { 136 for (size_t i = 0; i != connections_.size(); ++i) 137 { 138 if (i == my_rank_) continue; 139 140 if (connections_[i].IsValid()) 141 connections_[i].Close(); 142 } 143 144 connections_.clear(); 145 } 146 147 //! Closes all client connections ~Group()148 ~Group() { 149 Close(); 150 } 151 152 //! \} 153 154 private: 155 //! Connections to all other clients in the Group. 156 std::vector<Connection> connections_; 157 }; 158 159 //! \} 160 161 } // namespace tcp 162 } // namespace net 163 } // namespace thrill 164 165 #endif // !THRILL_NET_TCP_GROUP_HEADER 166 167 /******************************************************************************/ 168