1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2016. ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6
7 #include "test_ucp_atomic.h"
8 #include "common/gtest.h"
9
10 class test_ucp_fence : public test_ucp_atomic {
11 public:
12 typedef void (test_ucp_fence::* send_func_t)(entity *e, uint64_t *initial_buf,
13 uint64_t *result_buf, void *memheap_addr,
14 ucp_rkey_h rkey);
15
send_cb(void * request,ucs_status_t status)16 static void send_cb(void *request, ucs_status_t status)
17 {
18 }
19
20 template <typename T>
blocking_add(entity * e,uint64_t * initial_buf,uint64_t * result_buf,void * memheap_addr,ucp_rkey_h rkey)21 void blocking_add(entity *e, uint64_t *initial_buf, uint64_t *result_buf,
22 void *memheap_addr, ucp_rkey_h rkey) {
23 ucs_status_t status = ucp_atomic_post(e->ep(), UCP_ATOMIC_POST_OP_ADD,
24 *initial_buf, sizeof(T),
25 (uintptr_t)memheap_addr, rkey);
26 ASSERT_UCS_OK(status);
27 }
28
29 template <typename T>
blocking_fadd(entity * e,uint64_t * initial_buf,uint64_t * result_buf,void * memheap_addr,ucp_rkey_h rkey)30 void blocking_fadd(entity *e, uint64_t *initial_buf, uint64_t *result_buf,
31 void *memheap_addr, ucp_rkey_h rkey)
32 {
33 void *request = ucp_atomic_fetch_nb(e->ep(), UCP_ATOMIC_FETCH_OP_FADD,
34 *initial_buf, (T*)result_buf, sizeof(T),
35 (uintptr_t)memheap_addr, rkey, send_cb);
36 wait(request);
37 }
38
39 template <typename T, typename F>
test(F f1,F f2)40 void test(F f1, F f2) {
41 test_fence(static_cast<send_func_t>(f1),
42 static_cast<send_func_t>(f2), sizeof(T));
43 }
44
45 class worker {
46 public:
worker(test_ucp_fence * test,send_func_t send1,send_func_t send2,entity * entity,ucp_rkey_h rkey,void * memheap_ptr,uint64_t initial_value,uint32_t * error)47 worker(test_ucp_fence* test, send_func_t send1, send_func_t send2,
48 entity* entity, ucp_rkey_h rkey, void *memheap_ptr,
49 uint64_t initial_value, uint32_t* error):
50 test(test), value(initial_value), result(0), error(error),
51 running(true), m_rkey(rkey), m_memheap(memheap_ptr),
52 m_send_1(send1), m_send_2(send2), m_entity(entity) {
53 pthread_create(&m_thread, NULL, run, reinterpret_cast<void*>(this));
54 }
55
~worker()56 ~worker() {
57 assert(!running);
58 }
59
run(void * arg)60 static void *run(void *arg) {
61 worker *self = reinterpret_cast<worker*>(arg);
62 self->run();
63 return NULL;
64 }
65
join()66 void join() {
67 void *retval;
68 pthread_join(m_thread, &retval);
69 running = false;
70 }
71
72 test_ucp_fence* const test;
73 uint64_t value, result;
74 uint32_t* error;
75 bool running;
76
77 private:
run()78 void run() {
79 uint64_t zero = 0;
80
81 for (int i = 0; i < 500 / ucs::test_time_multiplier(); i++) {
82 (test->*m_send_1)(m_entity, &value, &result,
83 m_memheap, m_rkey);
84
85 m_entity->fence();
86
87 (test->*m_send_2)(m_entity, &zero, &result,
88 m_memheap, m_rkey);
89
90 test->flush_worker(*m_entity);
91
92 if (result != (uint64_t)(i+1))
93 (*error)++;
94
95 result = 0; /* reset for the next loop */
96 }
97 }
98
99 ucp_rkey_h m_rkey;
100 void *m_memheap;
101 send_func_t m_send_1, m_send_2;
102 entity* m_entity;
103 pthread_t m_thread;
104 };
105
run_workers(send_func_t send1,send_func_t send2,entity * sender,ucp_rkey_h rkey,void * memheap_ptr,uint64_t initial_value,uint32_t * error)106 void run_workers(send_func_t send1, send_func_t send2, entity* sender,
107 ucp_rkey_h rkey, void *memheap_ptr,
108 uint64_t initial_value, uint32_t* error) {
109 ucs::ptr_vector<worker> m_workers;
110 m_workers.clear();
111 m_workers.push_back(new worker(this, send1, send2, sender, rkey,
112 memheap_ptr, initial_value, error));
113 m_workers.at(0).join();
114 m_workers.clear();
115 }
116
117 protected:
test_fence(send_func_t send1,send_func_t send2,size_t alignment)118 void test_fence(send_func_t send1, send_func_t send2, size_t alignment) {
119 static const size_t memheap_size = sizeof(uint64_t);
120 uint32_t error = 0;
121
122 sender().connect(&receiver(), get_ep_params());
123 flush_worker(sender()); /* avoid deadlock for blocking amo */
124
125 mapped_buffer buffer(memheap_size, receiver(), 0);
126
127 EXPECT_LE(memheap_size, buffer.size());
128 memset(buffer.ptr(), 0, memheap_size);
129
130 run_workers(send1, send2, &sender(), buffer.rkey(sender()),
131 buffer.ptr(), 1, &error);
132
133 EXPECT_EQ(error, (uint32_t)0);
134
135 disconnect(sender());
136 disconnect(receiver());
137 }
138
get_ctx_params()139 static ucp_params_t get_ctx_params() {
140 ucp_params_t params = ucp_test::get_ctx_params();
141 params.features |= UCP_FEATURE_RMA;
142 return params;
143 }
144 };
145
146 class test_ucp_fence32 : public test_ucp_fence {
147 public:
get_ctx_params()148 static ucp_params_t get_ctx_params() {
149 ucp_params_t params = test_ucp_fence::get_ctx_params();
150 params.features |= UCP_FEATURE_AMO32;
151 return params;
152 }
153 };
154
/* Fence test: 32-bit atomic add, fence, then 32-bit fetch-and-add. */
UCS_TEST_P(test_ucp_fence32, atomic_add_fadd) {
    const send_func_t post_add  = &test_ucp_fence32::blocking_add<uint32_t>;
    const send_func_t fetch_add = &test_ucp_fence32::blocking_fadd<uint32_t>;

    test<uint32_t>(post_add, fetch_add);
}

UCP_INSTANTIATE_TEST_CASE(test_ucp_fence32)
161
162 class test_ucp_fence64 : public test_ucp_fence {
163 public:
get_ctx_params()164 static ucp_params_t get_ctx_params() {
165 ucp_params_t params = test_ucp_fence::get_ctx_params();
166 params.features |= UCP_FEATURE_AMO64;
167 return params;
168 }
169 };
170
/* Fence test: 64-bit atomic add, fence, then 64-bit fetch-and-add. */
UCS_TEST_P(test_ucp_fence64, atomic_add_fadd) {
    const send_func_t post_add  = &test_ucp_fence64::blocking_add<uint64_t>;
    const send_func_t fetch_add = &test_ucp_fence64::blocking_fadd<uint64_t>;

    test<uint64_t>(post_add, fetch_add);
}

UCP_INSTANTIATE_TEST_CASE(test_ucp_fence64)
177