1 /**
2  * Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
3  * Copyright (c) UT-Battelle, LLC. 2015-2017. ALL RIGHTS RESERVED.
4  * Copyright (C) Los Alamos National Security, LLC. 2019 ALL RIGHTS RESERVED.
5  *
6  * See file LICENSE for terms.
7  */
8 
9 
10 #ifndef UCP_REQUEST_H_
11 #define UCP_REQUEST_H_
12 
13 #include "ucp_context.h"
14 #include "ucp_mm.h"
15 
16 #include <ucp/api/ucp.h>
17 #include <uct/api/uct.h>
18 #include <ucs/datastruct/mpool.h>
19 #include <ucs/datastruct/queue_types.h>
20 #include <ucs/debug/assert.h>
21 #include <ucp/dt/dt.h>
22 #include <ucp/rma/rma.h>
23 #include <ucp/wireup/wireup.h>
24 
25 
/* Request-level trace helper: forwards to ucs_trace_req() with the message
 * prefixed by "req <pointer>: ". '_message' must be a string literal, since it
 * is concatenated with the prefix at compile time; any extra printf-style
 * arguments follow it. The GNU '## __VA_ARGS__' form lets the macro be used
 * with no extra arguments. */
#define ucp_trace_req(_sreq, _message, ...) \
    ucs_trace_req("req %p: " _message, (_sreq), ## __VA_ARGS__)
28 
29 
30 /**
31  * Request flags
32  */
33 enum {
34     UCP_REQUEST_FLAG_COMPLETED            = UCS_BIT(0),
35     UCP_REQUEST_FLAG_RELEASED             = UCS_BIT(1),
36     UCP_REQUEST_FLAG_EXPECTED             = UCS_BIT(3),
37     UCP_REQUEST_FLAG_LOCAL_COMPLETED      = UCS_BIT(4),
38     UCP_REQUEST_FLAG_REMOTE_COMPLETED     = UCS_BIT(5),
39     UCP_REQUEST_FLAG_CALLBACK             = UCS_BIT(6),
40     UCP_REQUEST_FLAG_RECV                 = UCS_BIT(7),
41     UCP_REQUEST_FLAG_SYNC                 = UCS_BIT(8),
42     UCP_REQUEST_FLAG_OFFLOADED            = UCS_BIT(10),
43     UCP_REQUEST_FLAG_BLOCK_OFFLOAD        = UCS_BIT(11),
44     UCP_REQUEST_FLAG_STREAM_RECV_WAITALL  = UCS_BIT(12),
45     UCP_REQUEST_FLAG_SEND_AM              = UCS_BIT(13),
46     UCP_REQUEST_FLAG_SEND_TAG             = UCS_BIT(14),
47     UCP_REQUEST_FLAG_RNDV_FRAG            = UCS_BIT(15),
48 #if UCS_ENABLE_ASSERT
49     UCP_REQUEST_FLAG_STREAM_RECV          = UCS_BIT(16),
50     UCP_REQUEST_DEBUG_FLAG_EXTERNAL       = UCS_BIT(17)
51 #else
52     UCP_REQUEST_FLAG_STREAM_RECV          = 0,
53     UCP_REQUEST_DEBUG_FLAG_EXTERNAL       = 0
54 #endif
55 };
56 
57 
58 /**
59  * Protocols enumerator to work with send request state
60  */
61 enum {
62     UCP_REQUEST_SEND_PROTO_BCOPY_AM = 0,
63     UCP_REQUEST_SEND_PROTO_ZCOPY_AM,
64     UCP_REQUEST_SEND_PROTO_RNDV_GET,
65     UCP_REQUEST_SEND_PROTO_RNDV_PUT,
66     UCP_REQUEST_SEND_PROTO_RMA
67 };
68 
69 
70 /**
71  * Receive descriptor flags.
72  */
73 enum {
74     UCP_RECV_DESC_FLAG_UCT_DESC       = UCS_BIT(0), /* Descriptor allocated by UCT */
75     UCP_RECV_DESC_FLAG_EAGER          = UCS_BIT(1), /* Eager tag message */
76     UCP_RECV_DESC_FLAG_EAGER_ONLY     = UCS_BIT(2), /* Eager tag message with single fragment */
77     UCP_RECV_DESC_FLAG_EAGER_SYNC     = UCS_BIT(3), /* Eager tag message which requires reply */
78     UCP_RECV_DESC_FLAG_EAGER_OFFLOAD  = UCS_BIT(4), /* Eager tag from offload */
79     UCP_RECV_DESC_FLAG_EAGER_LAST     = UCS_BIT(5), /* Last fragment of eager tag message.
80                                                        Used by tag offload protocol. */
81     UCP_RECV_DESC_FLAG_RNDV           = UCS_BIT(6), /* Rendezvous request */
82     UCP_RECV_DESC_FLAG_MALLOC         = UCS_BIT(7)  /* Descriptor was allocated with malloc
83                                                        and must be freed, not returned to the
84                                                        memory pool or UCT */
85 };
86 
87 
88 /**
89  * Receive descriptor list pointers
90  */
91 enum {
92     UCP_RDESC_HASH_LIST = 0,
93     UCP_RDESC_ALL_LIST  = 1
94 };
95 
96 
97 /**
98  * Request in progress.
99  */
100 struct ucp_request {
101     ucs_status_t                  status;     /* Operation status */
102     uint32_t                      flags;      /* Request flags */
103     void                          *user_data; /* Completion user data */
104 
105     union {
106 
107         /* "send" part - used for tag_send, am_send, stream_send, put, get, and atomic
108          * operations */
109         struct {
110             ucp_ep_h                ep;
111             void                    *buffer;    /* Send buffer */
112             ucp_datatype_t          datatype;   /* Send type */
113             size_t                  length;     /* Total length, in bytes */
114             ucs_memory_type_t       mem_type;   /* Memory type */
115             ucp_send_nbx_callback_t cb;         /* Completion callback */
116 
117             union {
118                 ucp_wireup_msg_t  wireup;
119 
120                 struct {
121                     ucp_lane_index_t     am_bw_index; /* AM BW lane index */
122                     uint64_t             message_id;  /* used to identify matching parts
123                                                          of a large message */
124 
125                     struct {
126                         ucp_tag_t        tag;
127                         uintptr_t        rreq_ptr;    /* receive request ptr on the
128                                                          recv side (used in AM rndv) */
129                     } tag;
130 
131                     struct {
132                         uint16_t         am_id;
133                         unsigned         flags;
134                     } am;
135                 } msg_proto;
136 
137                 struct {
138                     uint64_t      remote_addr; /* Remote address */
139                     ucp_rkey_h    rkey;     /* Remote memory key */
140                 } rma;
141 
142                 struct {
143                     uintptr_t     remote_request; /* pointer to the send request on receiver side */
144                     ucp_request_t *sreq;       /* original send request of frag put */
145                     uint8_t       am_id;
146                     ucs_status_t  status;
147                     ucp_tag_t     sender_tag;  /* Sender tag, which is sent back in sync ack */
148                     ucp_request_callback_t comp_cb; /* Called to complete the request */
149                 } proto;
150 
151                 struct {
152                     uct_pending_req_t    *req;
153                     ucp_wireup_ep_t      *wireup_ep;
154                 } proxy;
155 
156                 struct {
157                     uint64_t             remote_address;  /* address of the sender's data buffer */
158                     uintptr_t            remote_request;  /* pointer to the sender's request */
159                     ucp_request_t        *rreq;           /* receive request on the recv side */
160                     ucp_rkey_h           rkey;            /* key for remote send buffer */
161                     ucp_lane_map_t       lanes_map_avail; /* used lanes map */
162                     ucp_lane_map_t       lanes_map_all;   /* actual lanes map */
163                     uint8_t              lanes_count;     /* actual lanes count */
164                     uint8_t              rkey_index[UCP_MAX_LANES];
165 
166                 } rndv_get;
167 
168                 struct {
169                     uint64_t             remote_address; /* address of the receiver's data buffer */
170                     uintptr_t            remote_request; /* pointer to the receiver's receive request */
171                     ucp_request_t        *sreq;          /* send request on the send side */
172                     ucp_rkey_h           rkey;           /* key for remote receive buffer */
173                     uct_rkey_t           uct_rkey;       /* UCT remote key */
174                 } rndv_put;
175 
176                 struct {
177                     ucs_queue_elem_t     queue_elem;
178                     uintptr_t            remote_request; /* pointer to the sender's request */
179                     ucp_request_t        *rreq;          /* receive request on the recv side */
180                     ucp_rkey_h           rkey;           /* key for remote send buffer */
181                 } rkey_ptr;
182 
183                 struct {
184                     uintptr_t         remote_request; /* pointer to the send request on receiver side */
185                     ucp_request_t     *rreq;          /* pointer to the receive request */
186                     size_t            length;         /* the length of the data that should be fetched
187                                                        * from sender side */
188                     size_t            offset;         /* offset in recv buffer */
189                 } rndv_rtr;
190 
191                 struct {
192                     ucp_request_callback_t flushed_cb;/* Called when flushed */
193                     ucp_request_t          *worker_req;
194                     ucs_queue_elem_t       queue;     /* Queue element in proto_status */
195                     unsigned               uct_flags; /* Flags to pass to @ref uct_ep_flush */
196                     uct_worker_cb_id_t     prog_id;   /* Progress callback ID */
197                     uint32_t               cmpl_sn;   /* Sequence number of the remote completion
198                                                          this request is waiting for */
199                     uint8_t                sw_started;
200                     uint8_t                sw_done;
201                     uint8_t                num_lanes; /* How many lanes are being flushed */
202                     ucp_lane_map_t         started_lanes;/* Which lanes need were flushed */
203                 } flush;
204 
205                 struct {
206                     uct_worker_cb_id_t        prog_id;/* Slow-path callback */
207                 } disconnect;
208 
209                 struct {
210                     uint64_t              remote_addr; /* Remote address */
211                     ucp_rkey_h            rkey;        /* Remote memory key */
212                     uint64_t              value;       /* Atomic argument */
213                     uct_atomic_op_t       uct_op;      /* Requested UCT AMO */
214                 } amo;
215 
216                 struct {
217                     ucs_queue_elem_t  queue;     /* Elem in outgoing ssend reqs queue */
218                     ucp_tag_t         ssend_tag; /* Tag in offload sync send */
219                     void              *rndv_op;  /* Handler of issued rndv send. Need to cancel
220                                                     the operation if it is completed by SW. */
221                 } tag_offload;
222 
223                 struct {
224                     uintptr_t              req;  /* Remote get request pointer */
225                 } get_reply;
226 
227                 struct {
228                     uintptr_t              req;  /* Remote atomic request pointer */
229                     ucp_atomic_reply_t     data; /* Atomic reply data */
230                 } atomic_reply;
231             };
232 
233             /* This structure holds all mutable fields, and everything else
234              * except common send/recv fields 'status' and 'flags' is
235              * immutable
236              * TODO: rework RMA case where length is used instead of dt.offset */
237             struct {
238                 ucp_dt_state_t    dt;       /* Position in the send buffer */
239                 uct_completion_t  uct_comp; /* UCT completion */
240             } state;
241 
242             ucp_lane_index_t      pending_lane; /* Lane on which request was moved
243                                                  * to pending state */
244             ucp_lane_index_t      lane;     /* Lane on which this request is being sent */
245             uct_pending_req_t     uct;      /* UCT pending request */
246             ucp_mem_desc_t        *mdesc;
247         } send;
248 
249         /* "receive" part - used for tag_recv and stream_recv operations */
250         struct {
251             ucs_queue_elem_t      queue;    /* Expected queue element */
252             void                  *buffer;  /* Buffer to receive data to */
253             ucp_datatype_t        datatype; /* Receive type */
254             size_t                length;   /* Total length, in bytes */
255             ucs_memory_type_t     mem_type; /* Memory type */
256             ucp_dt_state_t        state;
257             ucp_worker_t          *worker;
258             uct_tag_context_t     uct_ctx;  /* Transport offload context */
259 
260             union {
261                 struct {
262                     ucp_tag_t                   tag;        /* Expected tag */
263                     ucp_tag_t                   tag_mask;   /* Expected tag mask */
264                     uint64_t                    sn;         /* Tag match sequence */
265                     ucp_tag_recv_nbx_callback_t cb;         /* Completion callback */
266                     ucp_tag_recv_info_t         info;       /* Completion info to fill */
267                     ssize_t                     remaining;  /* How much more data
268                                                              * to be received */
269 
270                     /* Can use union, because rdesc is used in expected flow,
271                      * while non_contig_buf is used in unexpected flow only. */
272                     union {
273                         ucp_mem_desc_t      *rdesc;   /* Offload bounce buffer */
274                         void                *non_contig_buf; /* Used for assembling
275                                                                 multi-fragment
276                                                                 non-contig unexpected
277                                                                 message in tag offload flow. */
278                     };
279                     ucp_worker_iface_t      *wiface;    /* Cached iface this request
280                                                            is received on. Used in
281                                                            tag offload expected callbacks*/
282                 } tag;
283 
284                 struct {
285                     ucp_request_t           *rreq;    /* recv request on recv side */
286                     size_t                  offset;   /* offset in recv buffer */
287                 } frag;
288 
289                 struct {
290                     ucp_stream_recv_nbx_callback_t cb;     /* Completion callback */
291                     size_t                         offset; /* Receive data offset */
292                     size_t                         length; /* Completion info to fill */
293                 } stream;
294             };
295         } recv;
296 
297         struct {
298             ucp_worker_h            worker;     /* Worker to flush */
299             ucp_send_nbx_callback_t cb;         /* Completion callback */
300             uct_worker_cb_id_t      prog_id;    /* Progress callback ID */
301             int                     comp_count; /* Countdown to request completion */
302             ucp_ep_ext_gen_t        *next_ep;   /* Next endpoint to flush */
303         } flush_worker;
304     };
305 };
306 
307 
308 /**
309  * Unexpected receive descriptor. If it is initialized in the headroom of UCT
310  * descriptor, the layout looks like the following:
311  *
312  *
313  * headroom                                    data
314  * |-------------------------------------------|-------------------------|
315  * | unused | ucp_recv_desc |      priv_length |                         |
316  * |        |               |                  |                         |
317  * |-------------------------------------------|-------------------------|
318  *
319  * Some protocols (i. e. tag offload) may need some space right before the
320  * incoming data to add specific headers needed for further message processing.
321  * Note: priv_length value should be in [0, UCP_WORKER_HEADROOM_PRIV_SIZE] range.
322  */
323 struct ucp_recv_desc {
324     union {
325         ucs_list_link_t     tag_list[2];     /* Hash list TAG-element */
326         ucs_queue_elem_t    stream_queue;    /* Queue STREAM-element */
327         ucs_queue_elem_t    tag_frag_queue;  /* Tag fragments queue */
328         ucp_am_first_desc_t am_first;        /* AM first fragment data needed
329                                                 for assembling the message */
330         ucs_queue_elem_t    am_mid_queue;    /* AM middle fragments queue */
331     };
332     uint32_t                length;          /* Received length */
333     uint32_t                payload_offset;  /* Offset from end of the descriptor
334                                               * to AM data */
335     uint16_t                flags;           /* Flags */
336     int16_t                 uct_desc_offset; /* Offset which needs to be
337                                                 substructed from rdesc when
338                                                 releasing it back to UCT */
339 };
340 
341 
342 /**
343  * Defines protocol functions for ucp_request_send_start() function.
344  * TODO will be removed when switching to new protocols implementation.
345  */
346 struct ucp_request_send_proto {
347     uct_pending_callback_t     contig_short;     /**< Progress short data */
348     uct_pending_callback_t     bcopy_single;     /**< Progress bcopy single fragment */
349     uct_pending_callback_t     bcopy_multi;      /**< Progress bcopy multi-fragment */
350     uct_pending_callback_t     zcopy_single;     /**< Progress zcopy single fragment */
351     uct_pending_callback_t     zcopy_multi;      /**< Progress zcopy multi-fragment */
352     uct_completion_callback_t  zcopy_completion; /**< Callback for UCT zcopy completion */
353     size_t                     only_hdr_size;    /**< Header size for single / short */
354 };
355 
356 
357 extern ucs_mpool_ops_t ucp_request_mpool_ops;
358 extern ucs_mpool_ops_t ucp_rndv_get_mpool_ops;
359 extern const ucp_request_param_t ucp_request_null_param;
360 
361 
362 int ucp_request_pending_add(ucp_request_t *req, ucs_status_t *req_status,
363                             unsigned pending_flags);
364 
365 ucs_status_t ucp_request_memory_reg(ucp_context_t *context, ucp_md_map_t md_map,
366                                     void *buffer, size_t length, ucp_datatype_t datatype,
367                                     ucp_dt_state_t *state, ucs_memory_type_t mem_type,
368                                     ucp_request_t *req_dbg, unsigned uct_flags);
369 
370 void ucp_request_memory_dereg(ucp_context_t *context, ucp_datatype_t datatype,
371                               ucp_dt_state_t *state, ucp_request_t *req_dbg);
372 
373 ucs_status_t ucp_request_send_start(ucp_request_t *req, ssize_t max_short,
374                                     size_t zcopy_thresh, size_t zcopy_max,
375                                     size_t dt_count,
376                                     const ucp_ep_msg_config_t* msg_config,
377                                     const ucp_request_send_proto_t *proto);
378 
379 /* Fast-forward to data end */
380 void ucp_request_send_state_ff(ucp_request_t *req, ucs_status_t status);
381 
382 ucs_status_t ucp_request_recv_msg_truncated(ucp_request_t *req, size_t length,
383                                             size_t offset);
384 
385 #endif
386