1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3 * Copyright (c) 2012 Sandia National Laboratories. All rights reserved.
4 * Copyright (c) 2014-2017 Los Alamos National Security, LLC. All rights
5 * reserved.
6 * $COPYRIGHT$
7 *
8 * Additional copyrights may follow
9 *
10 * $HEADER$
11 */
12
13 #ifndef OSC_PT2PT_FRAG_H
14 #define OSC_PT2PT_FRAG_H
15
16 #include "ompi/communicator/communicator.h"
17
18 #include "osc_pt2pt_header.h"
19 #include "osc_pt2pt_request.h"
20 #include "opal/align.h"
21
22 /** Communication buffer for packing messages */
23 struct ompi_osc_pt2pt_frag_t {
24 opal_free_list_item_t super;
25 /* target rank of buffer */
26 int target;
27 unsigned char *buffer;
28
29 /* space remaining in buffer */
30 size_t remain_len;
31
32 /* start of unused space */
33 char *top;
34
35 /* Number of operations which have started writing into the frag, but not yet completed doing so */
36 volatile int32_t pending;
37 int32_t pending_long_sends;
38 ompi_osc_pt2pt_frag_header_t *header;
39 ompi_osc_pt2pt_module_t *module;
40 };
41 typedef struct ompi_osc_pt2pt_frag_t ompi_osc_pt2pt_frag_t;
42 OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_frag_t);
43
44 int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer);
45 int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target);
46 int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module);
47 int ompi_osc_pt2pt_frag_flush_pending (ompi_osc_pt2pt_module_t *module, int target);
48 int ompi_osc_pt2pt_frag_flush_pending_all (ompi_osc_pt2pt_module_t *module);
49
/*
 * Drop one reference to a fragment.
 *
 * A write barrier is issued first so everything the caller wrote into the
 * fragment is visible before the reference count is decremented. The caller
 * that drops the last reference (pending reaches 0) issues a full barrier and
 * starts the fragment.
 *
 * @param module  module owning the fragment
 * @param buffer  fragment whose reference is being released
 *
 * @returns OMPI_SUCCESS if references remain, otherwise the result of
 *          ompi_osc_pt2pt_frag_start()
 */
static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module,
                                             ompi_osc_pt2pt_frag_t *buffer)
{
    int32_t still_pending;

    opal_atomic_wmb ();
    still_pending = OPAL_THREAD_ADD32(&buffer->pending, -1);

    /* somebody else still holds a reference; they will start the fragment */
    if (0 != still_pending) {
        return OMPI_SUCCESS;
    }

    opal_atomic_mb ();
    return ompi_osc_pt2pt_frag_start (module, buffer);
}
61
ompi_osc_pt2pt_frag_alloc_non_buffered(ompi_osc_pt2pt_module_t * module,ompi_osc_pt2pt_peer_t * peer,size_t request_len)62 static inline ompi_osc_pt2pt_frag_t *ompi_osc_pt2pt_frag_alloc_non_buffered (ompi_osc_pt2pt_module_t *module,
63 ompi_osc_pt2pt_peer_t *peer,
64 size_t request_len)
65 {
66 ompi_osc_pt2pt_frag_t *curr;
67
68 /* to ensure ordering flush the buffer on the peer */
69 curr = peer->active_frag;
70 if (NULL != curr && opal_atomic_cmpset (&peer->active_frag, curr, NULL)) {
71 /* If there's something pending, the pending finish will
72 start the buffer. Otherwise, we need to start it now. */
73 int ret = ompi_osc_pt2pt_frag_finish (module, curr);
74 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
75 return NULL;
76 }
77 }
78
79 curr = (ompi_osc_pt2pt_frag_t *) opal_free_list_get (&mca_osc_pt2pt_component.frags);
80 if (OPAL_UNLIKELY(NULL == curr)) {
81 return NULL;
82 }
83
84 curr->target = peer->rank;
85
86 curr->header = (ompi_osc_pt2pt_frag_header_t*) curr->buffer;
87 curr->top = (char*) (curr->header + 1);
88 curr->remain_len = mca_osc_pt2pt_component.buffer_size;
89 curr->module = module;
90 curr->pending = 1;
91
92 curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG;
93 curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
94 if (module->passive_target_access_epoch) {
95 curr->header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
96 }
97 curr->header->source = ompi_comm_rank(module->comm);
98 curr->header->num_ops = 1;
99
100 return curr;
101 }
102
103 /*
104 * Note: this function takes the module lock
105 *
106 * buffered sends will cache the fragment on the peer object associated with the
107 * target. unbuffered-sends will cause the target fragment to be flushed and
108 * will not be cached on the peer. this causes the fragment to be flushed as
109 * soon as it is sent. this allows request-based rma fragments to be completed
110 * so MPI_Test/MPI_Wait/etc will work as expected.
111 */
_ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t * module,int target,size_t request_len,ompi_osc_pt2pt_frag_t ** buffer,char ** ptr,bool long_send,bool buffered)112 static inline int _ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target,
113 size_t request_len, ompi_osc_pt2pt_frag_t **buffer,
114 char **ptr, bool long_send, bool buffered)
115 {
116 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
117 ompi_osc_pt2pt_frag_t *curr;
118
119 /* osc pt2pt headers can have 64-bit values. these will need to be aligned
120 * on an 8-byte boundary on some architectures so we up align the allocation
121 * size here. */
122 request_len = OPAL_ALIGN(request_len, 8, size_t);
123
124 if (request_len > mca_osc_pt2pt_component.buffer_size) {
125 return OMPI_ERR_OUT_OF_RESOURCE;
126 }
127
128 OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output,
129 "attempting to allocate buffer for %lu bytes to target %d. long send: %d, "
130 "buffered: %d", (unsigned long) request_len, target, long_send, buffered));
131
132 OPAL_THREAD_LOCK(&module->lock);
133 if (buffered) {
134 curr = peer->active_frag;
135 if (NULL == curr || curr->remain_len < request_len || (long_send && curr->pending_long_sends == 32)) {
136 curr = ompi_osc_pt2pt_frag_alloc_non_buffered (module, peer, request_len);
137 if (OPAL_UNLIKELY(NULL == curr)) {
138 OPAL_THREAD_UNLOCK(&module->lock);
139 return OMPI_ERR_OUT_OF_RESOURCE;
140 }
141
142 curr->pending_long_sends = long_send;
143 peer->active_frag = curr;
144 } else {
145 OPAL_THREAD_ADD32(&curr->header->num_ops, 1);
146 curr->pending_long_sends += long_send;
147 }
148
149 OPAL_THREAD_ADD32(&curr->pending, 1);
150 } else {
151 curr = ompi_osc_pt2pt_frag_alloc_non_buffered (module, peer, request_len);
152 if (OPAL_UNLIKELY(NULL == curr)) {
153 OPAL_THREAD_UNLOCK(&module->lock);
154 return OMPI_ERR_OUT_OF_RESOURCE;
155 }
156 }
157
158 *ptr = curr->top;
159 *buffer = curr;
160
161 curr->top += request_len;
162 curr->remain_len -= request_len;
163
164 OPAL_THREAD_UNLOCK(&module->lock);
165
166 return OMPI_SUCCESS;
167 }
168
/*
 * Reserve request_len bytes in a fragment for target, waiting for resources
 * if necessary.
 *
 * Calls _ompi_osc_pt2pt_frag_alloc() repeatedly. While it reports
 * OMPI_ERR_OUT_OF_RESOURCE, pending fragments are flushed and the progress
 * engine is driven before retrying. Any other result is returned directly.
 *
 * Parameters are identical to _ompi_osc_pt2pt_frag_alloc().
 *
 * @returns OMPI_SUCCESS on success
 * @returns OMPI_ERR_OUT_OF_RESOURCE if the request, rounded up to a multiple
 *          of 8, can never fit in a fragment of buffer_size bytes
 * @returns any other error reported by _ompi_osc_pt2pt_frag_alloc()
 */
static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target,
                                            size_t request_len, ompi_osc_pt2pt_frag_t **buffer,
                                            char **ptr, bool long_send, bool buffered)
{
    size_t aligned_len = OPAL_ALIGN(request_len, 8, size_t);
    int ret;

    /* _ompi_osc_pt2pt_frag_alloc() compares the 8-byte aligned length with
     * buffer_size and reports "can never fit" with the same
     * OMPI_ERR_OUT_OF_RESOURCE code it uses for "no fragment right now".
     * Checking only the unaligned length here (as before) let a request in
     * (floor8(buffer_size), buffer_size] through when buffer_size is not a
     * multiple of 8, and the loop below then retried it forever. Apply the
     * same rounding, and reject OPAL_ALIGN wrap-around (aligned_len <
     * request_len), so such requests fail once instead. */
    if (aligned_len < request_len || aligned_len > mca_osc_pt2pt_component.buffer_size) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    for (;;) {
        ret = _ompi_osc_pt2pt_frag_alloc (module, target, request_len, buffer, ptr, long_send, buffered);
        /* success and hard errors end the loop; only resource exhaustion is
         * retried, since the request is known to fit */
        if (OPAL_LIKELY(OMPI_ERR_OUT_OF_RESOURCE != ret)) {
            break;
        }

        /* best effort (result intentionally ignored): push out queued
         * fragments and drive progress before retrying, presumably so in-flight
         * fragments complete and become reusable */
        ompi_osc_pt2pt_frag_flush_pending_all (module);
        opal_progress ();
    }

    return ret;
}
191
192 #endif
193