/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2012      Sandia National Laboratories.  All rights reserved.
 * Copyright (c) 2014-2017 Los Alamos National Security, LLC. All rights
 *                         reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */
12 
#ifndef OSC_PT2PT_FRAG_H
#define OSC_PT2PT_FRAG_H

#include "ompi/communicator/communicator.h"

#include "osc_pt2pt_header.h"
#include "osc_pt2pt_request.h"
#include "opal/align.h"

22 /** Communication buffer for packing messages */
23 struct ompi_osc_pt2pt_frag_t {
24     opal_free_list_item_t super;
25     /* target rank of buffer */
26     int target;
27     unsigned char *buffer;
28 
29     /* space remaining in buffer */
30     size_t remain_len;
31 
32     /* start of unused space */
33     char *top;
34 
35     /* Number of operations which have started writing into the frag, but not yet completed doing so */
36     volatile int32_t pending;
37     int32_t pending_long_sends;
38     ompi_osc_pt2pt_frag_header_t *header;
39     ompi_osc_pt2pt_module_t *module;
40 };
41 typedef struct ompi_osc_pt2pt_frag_t ompi_osc_pt2pt_frag_t;
42 OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_frag_t);
43 
44 int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer);
45 int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target);
46 int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module);
47 int ompi_osc_pt2pt_frag_flush_pending (ompi_osc_pt2pt_module_t *module, int target);
48 int ompi_osc_pt2pt_frag_flush_pending_all (ompi_osc_pt2pt_module_t *module);
49 
/**
 * Mark one operation on a fragment complete; start (send) the fragment
 * once the last pending operation has finished.
 *
 * @param[in] module  osc pt2pt module
 * @param[in] buffer  fragment being finished
 *
 * @returns OMPI_SUCCESS if the fragment still has pending operations, or
 *          the return code of ompi_osc_pt2pt_frag_start() otherwise
 *
 * The write barrier ensures data packed into the fragment is visible
 * before the pending count decrement is observed by other threads.
 */
static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module,
                                              ompi_osc_pt2pt_frag_t *buffer)
{
    opal_atomic_wmb ();
    if (0 == OPAL_THREAD_ADD32(&buffer->pending, -1)) {
        /* full barrier before handing the fragment off to the send path */
        opal_atomic_mb ();
        return ompi_osc_pt2pt_frag_start(module, buffer);
    }

    return OMPI_SUCCESS;
}

ompi_osc_pt2pt_frag_alloc_non_buffered(ompi_osc_pt2pt_module_t * module,ompi_osc_pt2pt_peer_t * peer,size_t request_len)62 static inline ompi_osc_pt2pt_frag_t *ompi_osc_pt2pt_frag_alloc_non_buffered (ompi_osc_pt2pt_module_t *module,
63                                                                              ompi_osc_pt2pt_peer_t *peer,
64                                                                              size_t request_len)
65 {
66     ompi_osc_pt2pt_frag_t *curr;
67 
68     /* to ensure ordering flush the buffer on the peer */
69     curr = peer->active_frag;
70     if (NULL != curr && opal_atomic_cmpset (&peer->active_frag, curr, NULL)) {
71         /* If there's something pending, the pending finish will
72            start the buffer.  Otherwise, we need to start it now. */
73         int ret = ompi_osc_pt2pt_frag_finish (module, curr);
74         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
75             return NULL;
76         }
77     }
78 
79     curr = (ompi_osc_pt2pt_frag_t *) opal_free_list_get (&mca_osc_pt2pt_component.frags);
80     if (OPAL_UNLIKELY(NULL == curr)) {
81         return NULL;
82     }
83 
84     curr->target = peer->rank;
85 
86     curr->header = (ompi_osc_pt2pt_frag_header_t*) curr->buffer;
87     curr->top = (char*) (curr->header + 1);
88     curr->remain_len = mca_osc_pt2pt_component.buffer_size;
89     curr->module = module;
90     curr->pending = 1;
91 
92     curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG;
93     curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
94     if (module->passive_target_access_epoch) {
95         curr->header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
96     }
97     curr->header->source = ompi_comm_rank(module->comm);
98     curr->header->num_ops = 1;
99 
100     return curr;
101 }
102 
103 /*
104  * Note: this function takes the module lock
105  *
106  * buffered sends will cache the fragment on the peer object associated with the
107  * target. unbuffered-sends will cause the target fragment to be flushed and
108  * will not be cached on the peer. this causes the fragment to be flushed as
109  * soon as it is sent. this allows request-based rma fragments to be completed
110  * so MPI_Test/MPI_Wait/etc will work as expected.
111  */
_ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t * module,int target,size_t request_len,ompi_osc_pt2pt_frag_t ** buffer,char ** ptr,bool long_send,bool buffered)112 static inline int _ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target,
113                                              size_t request_len, ompi_osc_pt2pt_frag_t **buffer,
114                                              char **ptr, bool long_send, bool buffered)
115 {
116     ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
117     ompi_osc_pt2pt_frag_t *curr;
118 
119     /* osc pt2pt headers can have 64-bit values. these will need to be aligned
120      * on an 8-byte boundary on some architectures so we up align the allocation
121      * size here. */
122     request_len = OPAL_ALIGN(request_len, 8, size_t);
123 
124     if (request_len > mca_osc_pt2pt_component.buffer_size) {
125         return OMPI_ERR_OUT_OF_RESOURCE;
126     }
127 
128     OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output,
129                          "attempting to allocate buffer for %lu bytes to target %d. long send: %d, "
130                          "buffered: %d", (unsigned long) request_len, target, long_send, buffered));
131 
132     OPAL_THREAD_LOCK(&module->lock);
133     if (buffered) {
134         curr = peer->active_frag;
135         if (NULL == curr || curr->remain_len < request_len || (long_send && curr->pending_long_sends == 32)) {
136             curr = ompi_osc_pt2pt_frag_alloc_non_buffered (module, peer, request_len);
137             if (OPAL_UNLIKELY(NULL == curr)) {
138                 OPAL_THREAD_UNLOCK(&module->lock);
139                 return OMPI_ERR_OUT_OF_RESOURCE;
140             }
141 
142             curr->pending_long_sends = long_send;
143             peer->active_frag = curr;
144         } else {
145             OPAL_THREAD_ADD32(&curr->header->num_ops, 1);
146             curr->pending_long_sends += long_send;
147         }
148 
149         OPAL_THREAD_ADD32(&curr->pending, 1);
150     } else {
151         curr = ompi_osc_pt2pt_frag_alloc_non_buffered (module, peer, request_len);
152         if (OPAL_UNLIKELY(NULL == curr)) {
153             OPAL_THREAD_UNLOCK(&module->lock);
154             return OMPI_ERR_OUT_OF_RESOURCE;
155         }
156     }
157 
158     *ptr = curr->top;
159     *buffer = curr;
160 
161     curr->top += request_len;
162     curr->remain_len -= request_len;
163 
164     OPAL_THREAD_UNLOCK(&module->lock);
165 
166     return OMPI_SUCCESS;
167 }
168 
/**
 * Allocate space in a fragment headed to a target, retrying on resource
 * exhaustion.
 *
 * @param[in]  module       osc pt2pt module
 * @param[in]  target       target rank
 * @param[in]  request_len  number of bytes needed
 * @param[out] buffer       fragment the space was reserved in
 * @param[out] ptr          start of the reserved region within the fragment
 * @param[in]  long_send    whether this operation involves a long send
 * @param[in]  buffered     whether the fragment may be cached on the peer
 *
 * @returns OMPI_SUCCESS on success, OMPI_ERR_OUT_OF_RESOURCE if the request
 *          can never fit in a fragment, or another error from the allocator
 *
 * On transient OMPI_ERR_OUT_OF_RESOURCE the pending fragments are flushed
 * and progress is driven until an allocation succeeds or a hard error occurs.
 */
static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target,
                                             size_t request_len, ompi_osc_pt2pt_frag_t **buffer,
                                             char **ptr, bool long_send, bool buffered)
{
    int ret;

    /* a request larger than a whole buffer can never succeed -- fail fast
     * instead of looping forever below */
    if (request_len > mca_osc_pt2pt_component.buffer_size) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    do {
        ret = _ompi_osc_pt2pt_frag_alloc (module, target, request_len, buffer, ptr, long_send, buffered);
        if (OPAL_LIKELY(OMPI_SUCCESS == ret || OMPI_ERR_OUT_OF_RESOURCE != ret)) {
            break;
        }

        /* free list exhausted: push pending fragments out and let the
         * progress engine reclaim some before retrying */
        ompi_osc_pt2pt_frag_flush_pending_all (module);
        opal_progress ();
    } while (1);

    return ret;
}

#endif /* OSC_PT2PT_FRAG_H */