1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2016.  ALL RIGHTS RESERVED.
3 * Copyright (C) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
4 *
5 * See file LICENSE for terms.
6 */
7 
8 #include "ucp_test.h"
9 
10 #include <common/test_helpers.h>
11 
12 #if _OPENMP
13 #include "omp.h"
14 #endif
15 
16 using namespace ucs; /* For vector<char> serialization */
17 
18 class test_ucp_rma_mt : public ucp_test {
19 public:
get_ctx_params()20     static ucp_params_t get_ctx_params() {
21         ucp_params_t params = ucp_test::get_ctx_params();
22         params.features     = UCP_FEATURE_RMA;
23         return params;
24     }
25 
init()26     void init()
27     {
28         ucp_test::init();
29         sender().connect(&receiver(), get_ep_params());
30         for (int i = 0; i < sender().get_num_workers(); i++) {
31             /* avoid deadlock for blocking rma */
32             flush_worker(sender(), i);
33         }
34     }
35 
send_cb(void * req,ucs_status_t status)36     static void send_cb(void *req, ucs_status_t status)
37     {
38     }
39 
enum_test_params(const ucp_params_t & ctx_params,const std::string & name,const std::string & test_case_name,const std::string & tls)40     static std::vector<ucp_test_param> enum_test_params(const ucp_params_t& ctx_params,
41                                                         const std::string& name,
42                                                         const std::string& test_case_name,
43                                                         const std::string& tls)
44     {
45         std::vector<ucp_test_param> result;
46 
47         generate_test_params_variant(ctx_params, name, test_case_name, tls, 0,
48                                      result, MULTI_THREAD_CONTEXT);
49         generate_test_params_variant(ctx_params, name, test_case_name, tls, 0,
50                                      result, MULTI_THREAD_WORKER);
51         return result;
52     }
53 };
54 
UCS_TEST_P(test_ucp_rma_mt,put_get)55 UCS_TEST_P(test_ucp_rma_mt, put_get) {
56     ucs_status_t st;
57     uint64_t orig_data[MT_TEST_NUM_THREADS] GTEST_ATTRIBUTE_UNUSED_;
58     uint64_t target_data[MT_TEST_NUM_THREADS] GTEST_ATTRIBUTE_UNUSED_;
59 
60     ucp_mem_map_params_t params;
61     ucp_mem_h memh;
62     void *memheap = target_data;
63 
64     params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
65                         UCP_MEM_MAP_PARAM_FIELD_LENGTH |
66                         UCP_MEM_MAP_PARAM_FIELD_FLAGS;
67     params.address    = memheap;
68     params.length     = sizeof(uint64_t) * MT_TEST_NUM_THREADS;
69     params.flags      = GetParam().variant;
70 
71     st = ucp_mem_map(receiver().ucph(), &params, &memh);
72     ASSERT_UCS_OK(st);
73 
74     void *rkey_buffer;
75     size_t rkey_buffer_size;
76 
77     st = ucp_rkey_pack(receiver().ucph(), memh, &rkey_buffer, &rkey_buffer_size);
78     ASSERT_UCS_OK(st);
79 
80     std::vector<ucp_rkey_h> rkey;
81     rkey.resize(MT_TEST_NUM_THREADS);
82 
83     /* test parallel rkey unpack */
84 #if _OPENMP && ENABLE_MT
85 #pragma omp parallel for
86     for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
87         int worker_index = 0;
88         if (GetParam().thread_type == MULTI_THREAD_CONTEXT) {
89             worker_index = i;
90         }
91         ucs_status_t status = ucp_ep_rkey_unpack(sender().ep(worker_index),
92                                                  rkey_buffer, &rkey[i]);
93         ASSERT_UCS_OK(status);
94     }
95 #endif
96 
97     ucp_rkey_buffer_release(rkey_buffer);
98 
99     /* test blocking PUT */
100 
101     for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
102         orig_data[i] = 0xdeadbeefdeadbeef + 10 * i;
103         target_data[i] = 0;
104     }
105 
106 #if _OPENMP && ENABLE_MT
107 #pragma omp parallel for
108     for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
109         int worker_index = 0;
110 
111         if (GetParam().thread_type == MULTI_THREAD_CONTEXT) {
112             worker_index = i;
113         }
114 
115         void* req = ucp_put_nb(sender().ep(worker_index), &orig_data[i],
116                                sizeof(uint64_t), (uintptr_t)((uint64_t*)memheap + i),
117                                rkey[i], send_cb);
118         wait(req, worker_index);
119 
120         flush_worker(sender(), worker_index);
121 
122         EXPECT_EQ(orig_data[i], target_data[i]);
123     }
124 #endif
125 
126     /* test nonblocking PUT */
127 
128     for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
129         orig_data[i] = 0xdeadbeefdeadbeef + 10 * i;
130         target_data[i] = 0;
131     }
132 
133 #if _OPENMP && ENABLE_MT
134 #pragma omp parallel for
135     for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
136         ucs_status_t status;
137         int worker_index = 0;
138 
139         if (GetParam().thread_type == MULTI_THREAD_CONTEXT)
140             worker_index = i;
141 
142         status = ucp_put_nbi(sender().ep(worker_index), &orig_data[i], sizeof(uint64_t),
143                              (uintptr_t)((uint64_t*)memheap + i), rkey[i]);
144         ASSERT_UCS_OK_OR_INPROGRESS(status);
145 
146         flush_worker(sender(), worker_index);
147 
148         EXPECT_EQ(orig_data[i], target_data[i]);
149     }
150 #endif
151 
152     /* test blocking GET */
153 
154     for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
155         orig_data[i] = 0;
156         target_data[i] = 0xdeadbeefdeadbeef + 10 * i;
157     }
158 
159 #if _OPENMP && ENABLE_MT
160 #pragma omp parallel for
161     for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
162         int worker_index = 0;
163 
164         if (GetParam().thread_type == MULTI_THREAD_CONTEXT) {
165             worker_index = i;
166         }
167 
168         void *req = ucp_get_nb(sender().ep(worker_index), &orig_data[i],
169                                sizeof(uint64_t), (uintptr_t)((uint64_t*)memheap + i),
170                                rkey[i], send_cb);
171         wait(req, worker_index);
172 
173         flush_worker(sender(), worker_index);
174 
175         EXPECT_EQ(orig_data[i], target_data[i]);
176     }
177 #endif
178 
179     /* test nonblocking GET */
180 
181     for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
182         orig_data[i] = 0;
183         target_data[i] = 0xdeadbeefdeadbeef + 10 * i;
184     }
185 
186 #if _OPENMP && ENABLE_MT
187 #pragma omp parallel for
188     for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
189         ucs_status_t status;
190         int worker_index = 0;
191 
192         if (GetParam().thread_type == MULTI_THREAD_CONTEXT)
193             worker_index = i;
194 
195         status = ucp_get_nbi(sender().ep(worker_index), &orig_data[i], sizeof(uint64_t),
196                              (uintptr_t)((uint64_t *)memheap + i), rkey[i]);
197         ASSERT_UCS_OK_OR_INPROGRESS(status);
198 
199         flush_worker(sender(), worker_index);
200 
201         EXPECT_EQ(orig_data[i], target_data[i]);
202     }
203 #endif
204 
205 #if _OPENMP && ENABLE_MT
206 #pragma omp parallel for
207     for (int i = 0; i < MT_TEST_NUM_THREADS; i++) {
208         ucp_rkey_destroy(rkey[i]);
209     }
210 #endif
211 
212     st = ucp_mem_unmap(receiver().ucph(), memh);
213     ASSERT_UCS_OK(st);
214 }
215 
/* Instantiate the parameterized test_ucp_rma_mt tests; the set of instances is
 * presumably derived from enum_test_params() by the macro - defined outside
 * this file, confirm there. */
216 UCP_INSTANTIATE_TEST_CASE(test_ucp_rma_mt)
217