1 /*
2 * Copyright (C) by Argonne National Laboratory
3 * See COPYRIGHT in top-level directory
4 */
5
6 #ifndef MPIDU_GENQ_SHMEM_QUEUE_H_INCLUDED
7 #define MPIDU_GENQ_SHMEM_QUEUE_H_INCLUDED
8
9 #include "mpidimpl.h"
10 #include "mpidu_genqi_shmem_types.h"
11
12 #include <stdint.h>
13 #include <stdio.h>
14
/* Generic MPSC queue type; an alias for the NEM_MPSC implementation. */
#define MPIDU_GENQ_SHMEM_QUEUE_TYPE__MPSC MPIDU_GENQ_SHMEM_QUEUE_TYPE__NEM_MPSC

/* Queue implementations; the enqueue/dequeue entry points compare the queue's
 * flags field against these values to pick the implementation. */
typedef enum {
    /* plain (non-atomic) head and tail handles */
    MPIDU_GENQ_SHMEM_QUEUE_TYPE__SERIAL = 0,
    /* atomic tail linked through "prev" handles; the dequeuer detaches the
     * whole list and rebuilds the "next" links */
    MPIDU_GENQ_SHMEM_QUEUE_TYPE__INV_MPSC = 1,
    /* atomic head and tail; the enqueuer swaps itself into the tail */
    MPIDU_GENQ_SHMEM_QUEUE_TYPE__NEM_MPSC = 2,
} MPIDU_genq_shmem_queue_type_e;
22
23 /* SERIAL */
24
/*
 * Put a serial (non-atomic) queue into the empty state.
 *
 * queue - queue whose head and tail handles are reset; a handle of 0 is the
 *         "no cell" marker used by the serial enqueue/dequeue routines
 *
 * Always returns 0.
 */
static inline int MPIDU_genqi_serial_init(MPIDU_genq_shmem_queue_u * queue)
{
    const uintptr_t no_cell = 0;

    queue->q.head.s = no_cell;
    queue->q.tail.s = no_cell;
    return 0;
}
31
/*
 * Remove the cell at the head of a serial (non-atomic) queue.
 *
 * pool_obj - pool used to translate cell handles into cell headers
 * queue    - queue to dequeue from
 * cell     - out: the dequeued cell, or NULL when the queue is empty
 *
 * Always returns 0.
 */
static inline int MPIDU_genqi_serial_dequeue(MPIDU_genqi_shmem_pool_s * pool_obj,
                                             MPIDU_genq_shmem_queue_u * queue, void **cell)
{
    if (!queue->q.head.s) {
        /* queue is empty */
        *cell = NULL;
        return 0;
    }

    /* Translate the handle only once it is known to be non-zero: 0 is the
     * "no cell" marker and does not name a cell header. */
    MPIDU_genqi_shmem_cell_header_s *cell_h = HANDLE_TO_HEADER(pool_obj, queue->q.head.s);

    queue->q.head.s = cell_h->u.serial_queue.next;
    if (!cell_h->u.serial_queue.next) {
        /* the last cell was removed, so the tail no longer refers to anything */
        queue->q.tail.s = 0;
    }
    *cell = HEADER_TO_CELL(cell_h);
    return 0;
}
48
/*
 * Append a cell to the tail of a serial (non-atomic) queue.
 *
 * pool_obj - pool used to translate cell handles into cell headers
 * queue    - queue to append to
 * cell     - cell to append; its header is located with CELL_TO_HEADER
 *
 * Always returns 0.
 */
static inline int MPIDU_genqi_serial_enqueue(MPIDU_genqi_shmem_pool_s * pool_obj,
                                             MPIDU_genq_shmem_queue_u * queue, void *cell)
{
    MPIDU_genqi_shmem_cell_header_s *new_h = CELL_TO_HEADER(cell);
    uintptr_t new_handle;

    /* the new cell becomes the last one, so it has no successor */
    new_h->u.serial_queue.next = 0;
    new_handle = new_h->handle;

    if (queue->q.tail.s) {
        /* link the current last cell to the new one */
        MPIDU_genqi_shmem_cell_header_s *last_h = HANDLE_TO_HEADER(pool_obj, queue->q.tail.s);
        last_h->u.serial_queue.next = new_handle;
    }
    queue->q.tail.s = new_handle;

    if (!queue->q.head.s) {
        /* no head yet: the new tail is also the head */
        queue->q.head.s = queue->q.tail.s;
    }
    return 0;
}
66
67 /* NEMESIS QUEUE */
68
/*
 * Put a NEM_MPSC queue into the empty state.
 *
 * queue - queue whose atomic head and tail are both set to NULL, the value
 *         the NEM_MPSC enqueue/dequeue routines treat as "no cell"
 *
 * Always returns 0.
 */
static inline int MPIDU_genqi_nem_mpsc_init(MPIDU_genq_shmem_queue_u * queue)
{
    void *const no_cell = NULL;

    MPL_atomic_store_ptr(&queue->q.head.m, no_cell);
    MPL_atomic_store_ptr(&queue->q.tail.m, no_cell);
    return 0;
}
75
/*
 * Remove the cell at the head of a NEM_MPSC queue.
 *
 * pool_obj - pool used to translate cell handles into cell headers
 * queue    - queue to dequeue from
 * cell     - out: the dequeued cell, or NULL when the head is NULL
 *
 * Always returns 0.  If the head cell has no successor and the tail no longer
 * names it, this routine spins until the head cell's "next" link is
 * published.
 */
static inline int MPIDU_genqi_nem_mpsc_dequeue(MPIDU_genqi_shmem_pool_s * pool_obj,
                                               MPIDU_genq_shmem_queue_u * queue, void **cell)
{
    void *head_handle = MPL_atomic_load_ptr(&queue->q.head.m);

    if (head_handle == NULL) {
        /* nothing to dequeue */
        *cell = NULL;
        return 0;
    }

    MPIDU_genqi_shmem_cell_header_s *head_h = HANDLE_TO_HEADER(pool_obj, head_handle);
    *cell = HEADER_TO_CELL(head_h);

    void *succ_handle = MPL_atomic_load_ptr(&head_h->u.nem_queue.next_m);
    if (succ_handle == NULL) {
        /* The head looks like the only element.  Clear the head first, then
         * try to clear the tail as well; the CAS succeeds only if the tail
         * still names the cell being removed. */
        MPL_atomic_store_ptr(&queue->q.head.m, NULL);
        if (MPL_atomic_cas_ptr(&queue->q.tail.m, head_handle, NULL) == head_handle) {
            /* no enqueue in progress; the queue is now empty */
            return 0;
        }

        /* An enqueuer already replaced the tail but has not linked its cell
         * behind ours yet: spin until the link shows up. */
        while ((succ_handle = MPL_atomic_load_ptr(&head_h->u.nem_queue.next_m)) == NULL) {
            /* busy wait */
        }
    }

    /* the successor becomes the new head */
    MPL_atomic_store_ptr(&queue->q.head.m, succ_handle);
    return 0;
}
110
/*
 * Append a cell to the tail of a NEM_MPSC queue.
 *
 * pool_obj - pool used to translate cell handles into cell headers
 * queue    - queue to append to
 * cell     - cell to append; its header is located with CELL_TO_HEADER
 *
 * Always returns 0.  The new cell is swapped into the tail atomically; only
 * afterwards is it made reachable, either through the head (previous tail was
 * NULL) or through the previous tail's "next" link.
 */
static inline int MPIDU_genqi_nem_mpsc_enqueue(MPIDU_genqi_shmem_pool_s * pool_obj,
                                               MPIDU_genq_shmem_queue_u * queue, void *cell)
{
    MPIDU_genqi_shmem_cell_header_s *new_h = CELL_TO_HEADER(cell);

    /* the new cell is going to be the last one: no successor */
    MPL_atomic_store_ptr(&new_h->u.nem_queue.next_m, NULL);

    void *new_handle = (void *) new_h->handle;
    void *old_tail = MPL_atomic_swap_ptr(&queue->q.tail.m, new_handle);

    if (old_tail == NULL) {
        /* queue was empty, so the new cell is also the head */
        MPL_atomic_store_ptr(&queue->q.head.m, new_handle);
        return 0;
    }

    /* link the previous tail cell to the new one */
    MPIDU_genqi_shmem_cell_header_s *old_tail_h = HANDLE_TO_HEADER(pool_obj, old_tail);
    MPL_atomic_store_ptr(&old_tail_h->u.nem_queue.next_m, new_handle);
    return 0;
}
131
132 /* INVERSE QUEUE */
133
/*
 * Put an INV_MPSC queue into the empty state.
 *
 * queue - queue to reset: the plain head handle is zeroed and the atomic tail
 *         is set to NULL
 *
 * Always returns 0.
 */
static inline int MPIDU_genqi_inv_mpsc_init(MPIDU_genq_shmem_queue_u * queue)
{
    /* the head is a plain (non-atomic) handle */
    queue->q.head.s = 0;

    /* sp and mp all use atomic tail */
    MPL_atomic_store_ptr(&queue->q.tail.m, NULL);

    return 0;
}
142
143 static inline MPIDU_genqi_shmem_cell_header_s
MPIDU_genqi_shmem_get_head_cell_header(MPIDU_genqi_shmem_pool_s * pool_obj,MPIDU_genq_shmem_queue_u * queue)144 * MPIDU_genqi_shmem_get_head_cell_header(MPIDU_genqi_shmem_pool_s * pool_obj,
145 MPIDU_genq_shmem_queue_u * queue)
146 {
147 void *tail = NULL;
148 MPIDU_genqi_shmem_cell_header_s *head_cell_h = NULL;
149
150 /* prepares the cells for dequeuing from the head in the following steps.
151 * 1. atomic detaching all cells frm the queue tail
152 * 2. find the head of the queue and rebuild the "next" pointers for cells
153 */
154 tail = MPL_atomic_swap_ptr(&queue->q.tail.m, NULL);
155 if (!tail) {
156 return NULL;
157 }
158 head_cell_h = HANDLE_TO_HEADER(pool_obj, tail);
159
160 if (head_cell_h != NULL) {
161 uintptr_t curr_handle = head_cell_h->handle;
162 while (head_cell_h->u.inverse_queue.prev) {
163 MPIDU_genqi_shmem_cell_header_s *prev_cell_h;
164 prev_cell_h = HANDLE_TO_HEADER(pool_obj, head_cell_h->u.inverse_queue.prev);
165 prev_cell_h->u.inverse_queue.next = curr_handle;
166 curr_handle = head_cell_h->u.inverse_queue.prev;
167 head_cell_h = prev_cell_h;
168 }
169 return head_cell_h;
170 } else {
171 return NULL;
172 }
173 }
174
/*
 * Remove the cell at the head of an INV_MPSC queue.
 *
 * pool_obj - pool used to translate cell handles into cell headers
 * queue    - queue to dequeue from
 * cell     - out: the dequeued cell, or NULL when nothing is available
 *
 * Returns MPI_SUCCESS.  When the plain head handle is zero, the cells hanging
 * off the atomic tail are detached with
 * MPIDU_genqi_shmem_get_head_cell_header() and the oldest of them is handed
 * out; otherwise the cell named by the head handle is handed out.
 */
static inline int MPIDU_genqi_inv_mpsc_dequeue(MPIDU_genqi_shmem_pool_s * pool_obj,
                                               MPIDU_genq_shmem_queue_u * queue, void **cell)
{
    MPIDU_genqi_shmem_cell_header_s *first_h;

    if (queue->q.head.s) {
        /* cells detached by an earlier call are still pending */
        first_h = HANDLE_TO_HEADER(pool_obj, queue->q.head.s);
    } else {
        /* nothing pending: pull over whatever has been enqueued at the tail */
        first_h = MPIDU_genqi_shmem_get_head_cell_header(pool_obj, queue);
        if (first_h == NULL) {
            *cell = NULL;
            return MPI_SUCCESS;
        }
    }

    /* hand out the cell and advance the head to its successor */
    *cell = HEADER_TO_CELL(first_h);
    queue->q.head.s = first_h->u.inverse_queue.next;

    return MPI_SUCCESS;
}
197
/*
 * Append a cell to an INV_MPSC queue.
 *
 * pool_obj - not used by this implementation
 * queue    - queue to append to
 * cell     - cell to append; its header is located with CELL_TO_HEADER
 *
 * Returns MPI_SUCCESS.  The cell records the current tail as its "prev" and
 * is then installed as the new tail with a compare-and-swap, retrying until
 * the tail did not change in between.
 */
static inline int MPIDU_genqi_inv_mpsc_enqueue(MPIDU_genqi_shmem_pool_s * pool_obj,
                                               MPIDU_genq_shmem_queue_u * queue, void *cell)
{
    MPIDU_genqi_shmem_cell_header_s *new_h = CELL_TO_HEADER(cell);

    new_h->u.inverse_queue.next = 0;
    new_h->u.inverse_queue.prev = 0;

    void *observed_tail = NULL;
    void *new_handle = (void *) new_h->handle;

    for (;;) {
        /* remember the tail we saw as our predecessor ... */
        observed_tail = MPL_atomic_load_ptr(&queue->q.tail.m);
        new_h->u.inverse_queue.prev = (uintptr_t) observed_tail;

        /* ... and publish ourselves only if the tail is still that cell */
        if (MPL_atomic_cas_ptr(&queue->q.tail.m, observed_tail, new_handle) == observed_tail) {
            break;
        }
    }

    return MPI_SUCCESS;
}
216
217 /* EXTERNAL INTERFACE */
218
/*
 * Dequeue one cell from a shared-memory queue, dispatching on the queue type
 * stored in the queue's flags field.
 *
 * pool  - pool the queue's cells belong to
 * queue - queue to dequeue from
 * cell  - out: the dequeued cell, or NULL if none was available (also NULL
 *         when the queue type is not recognized)
 *
 * Returns the return code of the selected implementation; an unrecognized
 * queue type trips an assertion and leaves the return code at MPI_SUCCESS.
 */
static inline int MPIDU_genq_shmem_queue_dequeue(MPIDU_genq_shmem_pool_t pool,
                                                 MPIDU_genq_shmem_queue_t queue, void **cell)
{
    int rc = MPI_SUCCESS;
    /* use this function's own state so enter/exit is not logged as queue_init */
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDU_GENQ_SHMEM_QUEUE_DEQUEUE);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDU_GENQ_SHMEM_QUEUE_DEQUEUE);

    MPIDU_genqi_shmem_pool_s *pool_obj = (MPIDU_genqi_shmem_pool_s *) pool;
    MPIDU_genq_shmem_queue_u *queue_obj = (MPIDU_genq_shmem_queue_u *) queue;
    int flags = queue_obj->q.flags;

    switch (flags) {
        case MPIDU_GENQ_SHMEM_QUEUE_TYPE__SERIAL:
            rc = MPIDU_genqi_serial_dequeue(pool_obj, queue_obj, cell);
            break;
        case MPIDU_GENQ_SHMEM_QUEUE_TYPE__INV_MPSC:
            rc = MPIDU_genqi_inv_mpsc_dequeue(pool_obj, queue_obj, cell);
            break;
        case MPIDU_GENQ_SHMEM_QUEUE_TYPE__NEM_MPSC:
            rc = MPIDU_genqi_nem_mpsc_dequeue(pool_obj, queue_obj, cell);
            break;
        default:
            MPIR_Assert(0 && "Invalid GenQ flag");
            /* if the assertion is compiled out, do not hand back an
             * indeterminate pointer */
            *cell = NULL;
            break;
    }

    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDU_GENQ_SHMEM_QUEUE_DEQUEUE);
    return rc;
}
242
/*
 * Enqueue one cell on a shared-memory queue, dispatching on the queue type
 * stored in the queue's flags field.
 *
 * pool  - pool the cell belongs to
 * queue - queue to append to
 * cell  - cell to append
 *
 * Returns the return code of the selected implementation; an unrecognized
 * queue type trips an assertion and leaves the return code at MPI_SUCCESS.
 */
static inline int MPIDU_genq_shmem_queue_enqueue(MPIDU_genq_shmem_pool_t pool,
                                                 MPIDU_genq_shmem_queue_t queue, void *cell)
{
    int rc = MPI_SUCCESS;
    /* use this function's own state so enter/exit is not logged as queue_init */
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDU_GENQ_SHMEM_QUEUE_ENQUEUE);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDU_GENQ_SHMEM_QUEUE_ENQUEUE);

    MPIDU_genqi_shmem_pool_s *pool_obj = (MPIDU_genqi_shmem_pool_s *) pool;
    MPIDU_genq_shmem_queue_u *queue_obj = (MPIDU_genq_shmem_queue_u *) queue;
    int flags = queue_obj->q.flags;

    switch (flags) {
        case MPIDU_GENQ_SHMEM_QUEUE_TYPE__SERIAL:
            rc = MPIDU_genqi_serial_enqueue(pool_obj, queue_obj, cell);
            break;
        case MPIDU_GENQ_SHMEM_QUEUE_TYPE__INV_MPSC:
            rc = MPIDU_genqi_inv_mpsc_enqueue(pool_obj, queue_obj, cell);
            break;
        case MPIDU_GENQ_SHMEM_QUEUE_TYPE__NEM_MPSC:
            rc = MPIDU_genqi_nem_mpsc_enqueue(pool_obj, queue_obj, cell);
            break;
        default:
            MPIR_Assert(0 && "Invalid GenQ flag");
            break;
    }

    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDU_GENQ_SHMEM_QUEUE_ENQUEUE);
    return rc;
}
266
267 #endif /* ifndef MPIDU_GENQ_SHMEM_QUEUE_H_INCLUDED */
268