1 //  Copyright (c) 2016 John Biddiscombe
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #ifndef HPX_PARCELSET_POLICIES_VERBS_RDMA_CONTROLLER_HPP
7 #define HPX_PARCELSET_POLICIES_VERBS_RDMA_CONTROLLER_HPP
8 
9 // config
#include <hpx/config/defines.hpp>
//
#include <hpx/lcos/local/shared_mutex.hpp>
#include <hpx/lcos/promise.hpp>
#include <hpx/lcos/future.hpp>
//
#include <plugins/parcelport/parcelport_logging.hpp>
#include <plugins/parcelport/verbs/rdma/rdma_error.hpp>
#include <plugins/parcelport/verbs/rdma/rdma_locks.hpp>
#include <plugins/parcelport/verbs/rdma/verbs_endpoint.hpp>
//
#include <plugins/parcelport/unordered_map.hpp>
//
#include <atomic>
#include <chrono>
#include <cstdint>
#include <deque>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <tuple>
#include <utility>
34 //
35 // @TODO : Remove the client map pair as we have a copy in the verbs_parcelport class
36 // that does almost the same job.
37 // @TODO : Most of this code could be moved into the main parcelport, or the endpoint
38 // classes
39 namespace hpx {
40 namespace parcelset {
41 namespace policies {
42 namespace verbs
43 {
44     class rdma_controller
45     {
46     public:
47         typedef hpx::lcos::local::spinlock mutex_type;
48         typedef hpx::parcelset::policies::verbs::unique_lock<mutex_type> unique_lock;
49         typedef hpx::parcelset::policies::verbs::scoped_lock<mutex_type> scoped_lock;
50 
51         // constructor gets infor from device and sets up all necessary
52         // maps, queues and server endpoint etc
53         rdma_controller(const char *device, const char *interface, int port);
54 
55         // clean up all resources
56         ~rdma_controller();
57 
58         // initiate a listener for connections
59         int startup();
60 
61         // returns true when all connections have been disconnected and none are active
isTerminated()62         bool isTerminated() {
63             return (qp_endpoint_map_.size() == 0);
64         }
65 
66         // types we need for connection and disconnection callback functions
67         // into the main parcelport code.
68         typedef std::function<void(verbs_endpoint_ptr)>       ConnectionFunction;
69         typedef std::function<int(verbs_endpoint_ptr client)> DisconnectionFunction;
70 
71         // Set a callback which will be called immediately after
72         // RDMA_CM_EVENT_ESTABLISHED has been received.
73         // This should be used to initialize all structures for handling a new connection
setConnectionFunction(ConnectionFunction f)74         void setConnectionFunction(ConnectionFunction f) {
75             this->connection_function_ = f;
76         }
77 
78         // currently not used.
setDisconnectionFunction(DisconnectionFunction f)79         void setDisconnectionFunction(DisconnectionFunction f) {
80             this->disconnection_function_ = f;
81         }
82 
83         // This is the main polling function that checks for work completions
84         // and connection manager events, if stopped is true, then completions
85         // are thrown away, otherwise the completion callback is triggered
86         int poll_endpoints(bool stopped=false);
87 
88         int poll_for_work_completions(bool stopped=false);
89 
get_completion_queue() const90         inline verbs_completion_queue *get_completion_queue() const {
91             return completion_queue_.get();
92         }
93 
get_client_from_completion(struct ibv_wc & completion)94         inline verbs_endpoint *get_client_from_completion(struct ibv_wc &completion)
95         {
96             return qp_endpoint_map_.at(completion.qp_num).get();
97         }
98 
get_protection_domain()99         inline verbs_protection_domain_ptr get_protection_domain() {
100             return this->protection_domain_;
101         }
102 
get_endpoint(uint32_t remote_ip)103         verbs_endpoint* get_endpoint(uint32_t remote_ip) {
104             return (std::get<0>(connections_started_.find(remote_ip)->second)).get();
105 
106         }
107 
get_memory_pool()108         inline rdma_memory_pool_ptr get_memory_pool() {
109             return memory_pool_;
110         }
111 
112         typedef std::function<int(struct ibv_wc completion, verbs_endpoint *client)>
113             CompletionFunction;
114 
setCompletionFunction(CompletionFunction f)115         void setCompletionFunction(CompletionFunction f) {
116             this->completion_function_ = f;
117         }
118 
119         void refill_client_receives(bool force=true);
120 
121         hpx::shared_future<verbs_endpoint_ptr> connect_to_server(uint32_t remote_ip);
122 
123         void disconnect_from_server(verbs_endpoint_ptr client);
124 
125         void disconnect_all();
126 
127         bool active();
128 
129     private:
130         void debug_connections();
131 
132         int handle_event(struct rdma_cm_event *cm_event, verbs_endpoint *client);
133 
134         int handle_connect_request(
135             struct rdma_cm_event *cm_event, std::uint32_t remote_ip);
136 
137         int start_server_connection(uint32_t remote_ip);
138 
139         // store info about local device
140         std::string           device_;
141         std::string           interface_;
142         sockaddr_in           local_addr_;
143 
144         // callback functions used for connection event handling
145         ConnectionFunction    connection_function_;
146         DisconnectionFunction disconnection_function_;
147 
148         // callback function for handling a completion event
149         CompletionFunction    completion_function_;
150 
151         // Protection domain for all resources.
152         verbs_protection_domain_ptr protection_domain_;
153         // Pinned memory pool used for allocating buffers
154         rdma_memory_pool_ptr        memory_pool_;
155         // Server/Listener for RDMA connections.
156         verbs_endpoint_ptr          server_endpoint_;
157         // Shared completion queue for all endoints
158         verbs_completion_queue_ptr  completion_queue_;
159         // Count outstanding receives posted to SRQ + Completion queue
160         std::atomic<uint16_t>       preposted_receives_;
161 
162         // only allow one thread to handle connect/disconnect events etc
163         mutex_type            controller_mutex_;
164 
165         typedef std::tuple<
166             verbs_endpoint_ptr,
167             hpx::promise<verbs_endpoint_ptr>,
168             hpx::shared_future<verbs_endpoint_ptr>
169         > promise_tuple_type;
170         //
171         typedef std::pair<const uint32_t, promise_tuple_type> ClientMapPair;
172         typedef std::pair<const uint32_t, verbs_endpoint_ptr> QPMapPair;
173 
174         // Map of all active clients indexed by queue pair number.
175 //        hpx::concurrent::unordered_map<uint32_t, promise_tuple_type> clients_;
176 
177         typedef hpx::concurrent::unordered_map<uint32_t, promise_tuple_type>
178             ::map_read_lock_type map_read_lock_type;
179         typedef hpx::concurrent::unordered_map<uint32_t, promise_tuple_type>
180             ::map_write_lock_type map_write_lock_type;
181 
182         // Map of connections started, needed during address resolution until
183         // qp is created, and then to hold a future to an endpoint that the parcelport
184         // can get and wait on
185         hpx::concurrent::unordered_map<uint32_t, promise_tuple_type> connections_started_;
186 
187         typedef std::pair<uint32_t, verbs_endpoint_ptr> qp_map_type;
188         hpx::concurrent::unordered_map<uint32_t, verbs_endpoint_ptr> qp_endpoint_map_;
189 
190         // used to skip polling event channel too frequently
191         typedef std::chrono::time_point<std::chrono::system_clock> time_type;
192         time_type event_check_time_;
193         uint32_t  event_pause_;
194 
195     };
196 
197     // Smart pointer for rdma_controller object.
198     typedef std::shared_ptr<rdma_controller> rdma_controller_ptr;
199 
200 }}}}
201 
202 #endif
203