1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2016. ALL RIGHTS RESERVED.
3 * Copyright (C) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
4 *
5 * See file LICENSE for terms.
6 */
7
8 #include "ucp_test.h"
9
10 #include <common/test_helpers.h>
11
12 #if _OPENMP
13 #include "omp.h"
14 #endif
15
16 using namespace ucs; /* For vector<char> serialization */
17
18 class test_ucp_rma_mt : public ucp_test {
19 public:
get_ctx_params()20 static ucp_params_t get_ctx_params() {
21 ucp_params_t params = ucp_test::get_ctx_params();
22 params.features = UCP_FEATURE_RMA;
23 return params;
24 }
25
init()26 void init()
27 {
28 ucp_test::init();
29 sender().connect(&receiver(), get_ep_params());
30 for (int i = 0; i < sender().get_num_workers(); i++) {
31 /* avoid deadlock for blocking rma */
32 flush_worker(sender(), i);
33 }
34 }
35
send_cb(void * req,ucs_status_t status)36 static void send_cb(void *req, ucs_status_t status)
37 {
38 }
39
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)40 static std::vector<ucp_test_param> enum_test_params(const ucp_params_t& ctx_params,
41 const std::string& name,
42 const std::string& test_case_name,
43 const std::string& tls)
44 {
45 std::vector<ucp_test_param> result;
46
47 generate_test_params_variant(ctx_params, name, test_case_name, tls, 0,
48 result, MULTI_THREAD_CONTEXT);
49 generate_test_params_variant(ctx_params, name, test_case_name, tls, 0,
50 result, MULTI_THREAD_WORKER);
51 return result;
52 }
53 };
54
/*
 * Multi-threaded PUT/GET test.
 *
 * The receiver maps a local array (target_data, one uint64_t slot per thread)
 * and packs an rkey for it. When built with OpenMP and ENABLE_MT, each of
 * MT_TEST_NUM_THREADS loop iterations i then:
 *   - unpacks its own rkey[i],
 *   - performs blocking PUT, non-blocking PUT, blocking GET and non-blocking
 *     GET on slot i only, checking orig_data[i] == target_data[i] after each,
 *   - destroys rkey[i].
 * Iteration i uses sender endpoint/worker i in MULTI_THREAD_CONTEXT mode, and
 * endpoint/worker 0 otherwise (all threads share one worker).
 *
 * NOTE(review): ASSERT_UCS_OK / ASSERT_UCS_OK_OR_INPROGRESS are used inside
 * OpenMP parallel regions. If these macros abort via return or exception,
 * that would leave an OpenMP structured block. TODO confirm their
 * implementation.
 */
UCS_TEST_P(test_ucp_rma_mt, put_get) {
    ucs_status_t st;
    uint64_t orig_data[MT_TEST_NUM_THREADS] GTEST_ATTRIBUTE_UNUSED_;
    uint64_t target_data[MT_TEST_NUM_THREADS] GTEST_ATTRIBUTE_UNUSED_;

    ucp_mem_map_params_t params;
    ucp_mem_h memh;
    void *memheap = target_data;

    /* register target_data with the receiver's context */
    params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
                        UCP_MEM_MAP_PARAM_FIELD_LENGTH |
                        UCP_MEM_MAP_PARAM_FIELD_FLAGS;
    params.address    = memheap;
    params.length     = sizeof(uint64_t) * MT_TEST_NUM_THREADS;
    params.flags      = GetParam().variant;

    st = ucp_mem_map(receiver().ucph(), &params, &memh);
    ASSERT_UCS_OK(st);

    void *rkey_buffer;
    size_t rkey_buffer_size;

    st = ucp_rkey_pack(receiver().ucph(), memh, &rkey_buffer, &rkey_buffer_size);
    ASSERT_UCS_OK(st);

    /* one unpacked remote key per thread (value-initialized to NULL) */
    std::vector<ucp_rkey_h> rkey(MT_TEST_NUM_THREADS);

    /* test parallel rkey unpack */
#if _OPENMP && ENABLE_MT
#pragma omp parallel for
    for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
        const int worker_index =
            (GetParam().thread_type == MULTI_THREAD_CONTEXT) ? i : 0;

        ucs_status_t status = ucp_ep_rkey_unpack(sender().ep(worker_index),
                                                 rkey_buffer, &rkey[i]);
        ASSERT_UCS_OK(status);
    }
#endif

    /* the packed buffer is released once every thread has unpacked it */
    ucp_rkey_buffer_release(rkey_buffer);

    /* test blocking PUT */

    for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
        orig_data[i]   = 0xdeadbeefdeadbeef + 10 * i;
        target_data[i] = 0;
    }

#if _OPENMP && ENABLE_MT
#pragma omp parallel for
    for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
        const int worker_index =
            (GetParam().thread_type == MULTI_THREAD_CONTEXT) ? i : 0;

        void* req = ucp_put_nb(sender().ep(worker_index), &orig_data[i],
                               sizeof(uint64_t), (uintptr_t)((uint64_t*)memheap + i),
                               rkey[i], send_cb);
        wait(req, worker_index);

        flush_worker(sender(), worker_index);

        EXPECT_EQ(orig_data[i], target_data[i]);
    }
#endif

    /* test nonblocking PUT */

    for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
        orig_data[i]   = 0xdeadbeefdeadbeef + 10 * i;
        target_data[i] = 0;
    }

#if _OPENMP && ENABLE_MT
#pragma omp parallel for
    for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
        const int worker_index =
            (GetParam().thread_type == MULTI_THREAD_CONTEXT) ? i : 0;

        ucs_status_t status = ucp_put_nbi(sender().ep(worker_index), &orig_data[i],
                                          sizeof(uint64_t),
                                          (uintptr_t)((uint64_t*)memheap + i), rkey[i]);
        ASSERT_UCS_OK_OR_INPROGRESS(status);

        /* the flush is what completes a UCS_INPROGRESS nbi operation here */
        flush_worker(sender(), worker_index);

        EXPECT_EQ(orig_data[i], target_data[i]);
    }
#endif

    /* test blocking GET */

    for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
        orig_data[i]   = 0;
        target_data[i] = 0xdeadbeefdeadbeef + 10 * i;
    }

#if _OPENMP && ENABLE_MT
#pragma omp parallel for
    for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
        const int worker_index =
            (GetParam().thread_type == MULTI_THREAD_CONTEXT) ? i : 0;

        void *req = ucp_get_nb(sender().ep(worker_index), &orig_data[i],
                               sizeof(uint64_t), (uintptr_t)((uint64_t*)memheap + i),
                               rkey[i], send_cb);
        wait(req, worker_index);

        flush_worker(sender(), worker_index);

        EXPECT_EQ(orig_data[i], target_data[i]);
    }
#endif

    /* test nonblocking GET */

    for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
        orig_data[i]   = 0;
        target_data[i] = 0xdeadbeefdeadbeef + 10 * i;
    }

#if _OPENMP && ENABLE_MT
#pragma omp parallel for
    for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
        const int worker_index =
            (GetParam().thread_type == MULTI_THREAD_CONTEXT) ? i : 0;

        ucs_status_t status = ucp_get_nbi(sender().ep(worker_index), &orig_data[i],
                                          sizeof(uint64_t),
                                          (uintptr_t)((uint64_t *)memheap + i), rkey[i]);
        ASSERT_UCS_OK_OR_INPROGRESS(status);

        flush_worker(sender(), worker_index);

        EXPECT_EQ(orig_data[i], target_data[i]);
    }
#endif

    /* destroy the rkeys under the same guard that unpacked them, so only
     * handles that were actually created are destroyed */
#if _OPENMP && ENABLE_MT
#pragma omp parallel for
    for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
        ucp_rkey_destroy(rkey[i]);
    }
#endif

    st = ucp_mem_unmap(receiver().ucph(), memh);
    ASSERT_UCS_OK(st);
}
215
/* Instantiate the parameterized test_ucp_rma_mt tests. The set of generated
 * instances is determined by UCP_INSTANTIATE_TEST_CASE together with
 * test_ucp_rma_mt::enum_test_params() (presumably the macro comes from
 * ucp_test.h - confirm there). */
UCP_INSTANTIATE_TEST_CASE(test_ucp_rma_mt)
217