//  Copyright (c) 2016 John Biddiscombe
//
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#ifndef HPX_PARCELSET_POLICIES_VERBS_RDMA_CONTROLLER_HPP
#define HPX_PARCELSET_POLICIES_VERBS_RDMA_CONTROLLER_HPP

// config
#include <hpx/config/defines.hpp>
//
#include <hpx/lcos/local/shared_mutex.hpp>
#include <hpx/lcos/promise.hpp>
#include <hpx/lcos/future.hpp>
//
#include <plugins/parcelport/parcelport_logging.hpp>
#include <plugins/parcelport/verbs/rdma/rdma_error.hpp>
#include <plugins/parcelport/verbs/rdma/rdma_locks.hpp>
#include <plugins/parcelport/verbs/rdma/verbs_endpoint.hpp>
//
#include <plugins/parcelport/unordered_map.hpp>
//
#include <atomic>
#include <memory>
#include <deque>
#include <chrono>
#include <iostream>
#include <functional>
#include <map>
#include <atomic>
#include <string>
#include <utility>
#include <cstdint>
#include <tuple>
//
// @TODO : Remove the client map pair as we have a copy in the verbs_parcelport class
// that does almost the same job.
37 // @TODO : Most of this code could be moved into the main parcelport, or the endpoint 38 // classes 39 namespace hpx { 40 namespace parcelset { 41 namespace policies { 42 namespace verbs 43 { 44 class rdma_controller 45 { 46 public: 47 typedef hpx::lcos::local::spinlock mutex_type; 48 typedef hpx::parcelset::policies::verbs::unique_lock<mutex_type> unique_lock; 49 typedef hpx::parcelset::policies::verbs::scoped_lock<mutex_type> scoped_lock; 50 51 // constructor gets infor from device and sets up all necessary 52 // maps, queues and server endpoint etc 53 rdma_controller(const char *device, const char *interface, int port); 54 55 // clean up all resources 56 ~rdma_controller(); 57 58 // initiate a listener for connections 59 int startup(); 60 61 // returns true when all connections have been disconnected and none are active isTerminated()62 bool isTerminated() { 63 return (qp_endpoint_map_.size() == 0); 64 } 65 66 // types we need for connection and disconnection callback functions 67 // into the main parcelport code. 68 typedef std::function<void(verbs_endpoint_ptr)> ConnectionFunction; 69 typedef std::function<int(verbs_endpoint_ptr client)> DisconnectionFunction; 70 71 // Set a callback which will be called immediately after 72 // RDMA_CM_EVENT_ESTABLISHED has been received. 73 // This should be used to initialize all structures for handling a new connection setConnectionFunction(ConnectionFunction f)74 void setConnectionFunction(ConnectionFunction f) { 75 this->connection_function_ = f; 76 } 77 78 // currently not used. 
setDisconnectionFunction(DisconnectionFunction f)79 void setDisconnectionFunction(DisconnectionFunction f) { 80 this->disconnection_function_ = f; 81 } 82 83 // This is the main polling function that checks for work completions 84 // and connection manager events, if stopped is true, then completions 85 // are thrown away, otherwise the completion callback is triggered 86 int poll_endpoints(bool stopped=false); 87 88 int poll_for_work_completions(bool stopped=false); 89 get_completion_queue() const90 inline verbs_completion_queue *get_completion_queue() const { 91 return completion_queue_.get(); 92 } 93 get_client_from_completion(struct ibv_wc & completion)94 inline verbs_endpoint *get_client_from_completion(struct ibv_wc &completion) 95 { 96 return qp_endpoint_map_.at(completion.qp_num).get(); 97 } 98 get_protection_domain()99 inline verbs_protection_domain_ptr get_protection_domain() { 100 return this->protection_domain_; 101 } 102 get_endpoint(uint32_t remote_ip)103 verbs_endpoint* get_endpoint(uint32_t remote_ip) { 104 return (std::get<0>(connections_started_.find(remote_ip)->second)).get(); 105 106 } 107 get_memory_pool()108 inline rdma_memory_pool_ptr get_memory_pool() { 109 return memory_pool_; 110 } 111 112 typedef std::function<int(struct ibv_wc completion, verbs_endpoint *client)> 113 CompletionFunction; 114 setCompletionFunction(CompletionFunction f)115 void setCompletionFunction(CompletionFunction f) { 116 this->completion_function_ = f; 117 } 118 119 void refill_client_receives(bool force=true); 120 121 hpx::shared_future<verbs_endpoint_ptr> connect_to_server(uint32_t remote_ip); 122 123 void disconnect_from_server(verbs_endpoint_ptr client); 124 125 void disconnect_all(); 126 127 bool active(); 128 129 private: 130 void debug_connections(); 131 132 int handle_event(struct rdma_cm_event *cm_event, verbs_endpoint *client); 133 134 int handle_connect_request( 135 struct rdma_cm_event *cm_event, std::uint32_t remote_ip); 136 137 int 
start_server_connection(uint32_t remote_ip); 138 139 // store info about local device 140 std::string device_; 141 std::string interface_; 142 sockaddr_in local_addr_; 143 144 // callback functions used for connection event handling 145 ConnectionFunction connection_function_; 146 DisconnectionFunction disconnection_function_; 147 148 // callback function for handling a completion event 149 CompletionFunction completion_function_; 150 151 // Protection domain for all resources. 152 verbs_protection_domain_ptr protection_domain_; 153 // Pinned memory pool used for allocating buffers 154 rdma_memory_pool_ptr memory_pool_; 155 // Server/Listener for RDMA connections. 156 verbs_endpoint_ptr server_endpoint_; 157 // Shared completion queue for all endoints 158 verbs_completion_queue_ptr completion_queue_; 159 // Count outstanding receives posted to SRQ + Completion queue 160 std::atomic<uint16_t> preposted_receives_; 161 162 // only allow one thread to handle connect/disconnect events etc 163 mutex_type controller_mutex_; 164 165 typedef std::tuple< 166 verbs_endpoint_ptr, 167 hpx::promise<verbs_endpoint_ptr>, 168 hpx::shared_future<verbs_endpoint_ptr> 169 > promise_tuple_type; 170 // 171 typedef std::pair<const uint32_t, promise_tuple_type> ClientMapPair; 172 typedef std::pair<const uint32_t, verbs_endpoint_ptr> QPMapPair; 173 174 // Map of all active clients indexed by queue pair number. 
175 // hpx::concurrent::unordered_map<uint32_t, promise_tuple_type> clients_; 176 177 typedef hpx::concurrent::unordered_map<uint32_t, promise_tuple_type> 178 ::map_read_lock_type map_read_lock_type; 179 typedef hpx::concurrent::unordered_map<uint32_t, promise_tuple_type> 180 ::map_write_lock_type map_write_lock_type; 181 182 // Map of connections started, needed during address resolution until 183 // qp is created, and then to hold a future to an endpoint that the parcelport 184 // can get and wait on 185 hpx::concurrent::unordered_map<uint32_t, promise_tuple_type> connections_started_; 186 187 typedef std::pair<uint32_t, verbs_endpoint_ptr> qp_map_type; 188 hpx::concurrent::unordered_map<uint32_t, verbs_endpoint_ptr> qp_endpoint_map_; 189 190 // used to skip polling event channel too frequently 191 typedef std::chrono::time_point<std::chrono::system_clock> time_type; 192 time_type event_check_time_; 193 uint32_t event_pause_; 194 195 }; 196 197 // Smart pointer for rdma_controller object. 198 typedef std::shared_ptr<rdma_controller> rdma_controller_ptr; 199 200 }}}} 201 202 #endif 203