1 /*
2  * Copyright (C) by Argonne National Laboratory
3  *     See COPYRIGHT in top-level directory
4  */
5 
6 #ifndef MPIDU_GENQ_SHMEM_QUEUE_H_INCLUDED
7 #define MPIDU_GENQ_SHMEM_QUEUE_H_INCLUDED
8 
9 #include "mpidimpl.h"
10 #include "mpidu_genqi_shmem_types.h"
11 
12 #include <stdint.h>
13 #include <stdio.h>
14 
/* Generic "MPSC" requests are served by the Nemesis-style MPSC queue. */
#define MPIDU_GENQ_SHMEM_QUEUE_TYPE__MPSC MPIDU_GENQ_SHMEM_QUEUE_TYPE__NEM_MPSC

/* Queue flavors. The external dequeue/enqueue entry points compare the
 * queue's flags field against these values to select an implementation. */
typedef enum {
    MPIDU_GENQ_SHMEM_QUEUE_TYPE__SERIAL = 0,    /* plain linked list, no atomics */
    MPIDU_GENQ_SHMEM_QUEUE_TYPE__INV_MPSC = 1,  /* inverse queue with atomic tail */
    MPIDU_GENQ_SHMEM_QUEUE_TYPE__NEM_MPSC = 2,  /* Nemesis queue, atomic head and tail */
} MPIDU_genq_shmem_queue_type_e;
22 
23 /* SERIAL */
24 
/* Reset a serial queue to the empty state.
 *
 * Both ends are set to handle 0, which the serial dequeue/enqueue routines
 * treat as "no cell". Always returns 0. */
static inline int MPIDU_genqi_serial_init(MPIDU_genq_shmem_queue_u * queue)
{
    queue->q.tail.s = 0;
    queue->q.head.s = 0;

    return 0;
}
31 
/* Remove the cell at the head of a serial queue.
 *
 * pool_obj: pool used to translate cell handles into cell headers.
 * queue:    queue to dequeue from.
 * cell:     out parameter; receives the dequeued cell, or NULL when the
 *           queue is empty.
 *
 * Always returns 0. */
static inline int MPIDU_genqi_serial_dequeue(MPIDU_genqi_shmem_pool_s * pool_obj,
                                             MPIDU_genq_shmem_queue_u * queue, void **cell)
{
    if (!queue->q.head.s) {
        /* queue is empty; do not translate the zero handle into a header */
        *cell = NULL;
        return 0;
    }

    /* The head handle is known to be non-zero here, so the translation is
     * only ever applied to a handle that refers to a real cell. */
    MPIDU_genqi_shmem_cell_header_s *cell_h = HANDLE_TO_HEADER(pool_obj, queue->q.head.s);

    queue->q.head.s = cell_h->u.serial_queue.next;
    if (!cell_h->u.serial_queue.next) {
        /* removed the last cell: the tail must not keep pointing at it */
        queue->q.tail.s = 0;
    }
    *cell = HEADER_TO_CELL(cell_h);

    return 0;
}
48 
/* Append a cell at the tail of a serial queue.
 *
 * pool_obj: pool used to translate cell handles into cell headers.
 * queue:    queue to append to.
 * cell:     cell to enqueue; its header supplies the handle that is linked
 *           into the list.
 *
 * Always returns 0. */
static inline int MPIDU_genqi_serial_enqueue(MPIDU_genqi_shmem_pool_s * pool_obj,
                                             MPIDU_genq_shmem_queue_u * queue, void *cell)
{
    MPIDU_genqi_shmem_cell_header_s *new_h = CELL_TO_HEADER(cell);

    /* the new cell becomes the last element, so it has no successor */
    new_h->u.serial_queue.next = 0;

    uintptr_t new_handle = new_h->handle;

    if (queue->q.tail.s) {
        /* link the current last cell to the new one */
        MPIDU_genqi_shmem_cell_header_s *last_h = HANDLE_TO_HEADER(pool_obj, queue->q.tail.s);
        last_h->u.serial_queue.next = new_handle;
    }
    queue->q.tail.s = new_handle;

    if (!queue->q.head.s) {
        /* queue was empty: the new cell is also the head */
        queue->q.head.s = queue->q.tail.s;
    }

    return 0;
}
66 
67 /* NEMESIS QUEUE */
68 
/* Reset a Nemesis MPSC queue to the empty state.
 *
 * Head and tail are both atomic pointers in this flavor; NULL in either one
 * means "no cell". Always returns 0. */
static inline int MPIDU_genqi_nem_mpsc_init(MPIDU_genq_shmem_queue_u * queue)
{
    int rc = 0;

    MPL_atomic_store_ptr(&queue->q.head.m, NULL);
    MPL_atomic_store_ptr(&queue->q.tail.m, NULL);

    return rc;
}
75 
/* Remove the cell at the head of a Nemesis MPSC queue.
 *
 * pool_obj: pool used to translate cell handles into cell headers.
 * queue:    queue to dequeue from.
 * cell:     out parameter; receives the dequeued cell, or NULL when the
 *           head pointer is NULL.
 *
 * Always returns 0. */
static inline int MPIDU_genqi_nem_mpsc_dequeue(MPIDU_genqi_shmem_pool_s * pool_obj,
                                               MPIDU_genq_shmem_queue_u * queue, void **cell)
{
    void *head_handle = MPL_atomic_load_ptr(&queue->q.head.m);
    if (head_handle == NULL) {
        /* queue is empty */
        *cell = NULL;
        return 0;
    }

    MPIDU_genqi_shmem_cell_header_s *head_h = HANDLE_TO_HEADER(pool_obj, head_handle);
    *cell = HEADER_TO_CELL(head_h);

    void *succ_handle = MPL_atomic_load_ptr(&head_h->u.nem_queue.next_m);
    if (succ_handle == NULL) {
        /* The head has no successor yet, so it looks like the only element
         * (tail == head). Clear the head first, then try to clear the tail;
         * the CAS only succeeds if the tail still refers to this cell. */
        MPL_atomic_store_ptr(&queue->q.head.m, NULL);
        if (MPL_atomic_cas_ptr(&queue->q.tail.m, head_handle, NULL) == head_handle) {
            /* no enqueue in progress, the queue is now empty */
            return 0;
        }

        /* The tail already moved on: an enqueue swapped the tail but has not
         * linked its cell behind ours yet. Spin until the link shows up. */
        succ_handle = MPL_atomic_load_ptr(&head_h->u.nem_queue.next_m);
        while (succ_handle == NULL) {
            succ_handle = MPL_atomic_load_ptr(&head_h->u.nem_queue.next_m);
        }
    }

    /* advance the head to the successor */
    MPL_atomic_store_ptr(&queue->q.head.m, succ_handle);
    return 0;
}
110 
/* Append a cell at the tail of a Nemesis MPSC queue.
 *
 * pool_obj: pool used to translate cell handles into cell headers.
 * queue:    queue to append to.
 * cell:     cell to enqueue.
 *
 * The tail is claimed with an atomic swap; the previous tail (if any) is then
 * linked to the new cell. Always returns 0. */
static inline int MPIDU_genqi_nem_mpsc_enqueue(MPIDU_genqi_shmem_pool_s * pool_obj,
                                               MPIDU_genq_shmem_queue_u * queue, void *cell)
{
    MPIDU_genqi_shmem_cell_header_s *new_h = CELL_TO_HEADER(cell);

    /* the new cell is the last element, so it starts without a successor */
    MPL_atomic_store_ptr(&new_h->u.nem_queue.next_m, NULL);

    void *new_handle = (void *) new_h->handle;
    void *old_tail = MPL_atomic_swap_ptr(&queue->q.tail.m, new_handle);

    if (old_tail != NULL) {
        /* queue was not empty: hook the new cell behind the previous tail */
        MPIDU_genqi_shmem_cell_header_s *old_tail_h = HANDLE_TO_HEADER(pool_obj, old_tail);
        MPL_atomic_store_ptr(&old_tail_h->u.nem_queue.next_m, new_handle);
        return 0;
    }

    /* queue was empty: the new cell is also the head */
    MPL_atomic_store_ptr(&queue->q.head.m, new_handle);
    return 0;
}
131 
132 /* INVERSE QUEUE */
133 
/* Reset an inverse MPSC queue to the empty state.
 *
 * The head is a plain handle (0 means "no cell"), while the tail is an atomic
 * pointer (NULL means "no cell"). Always returns 0. */
static inline int MPIDU_genqi_inv_mpsc_init(MPIDU_genq_shmem_queue_u * queue)
{
    int rc = 0;

    queue->q.head.s = 0;

    /* the tail is atomic for single- and multi-producer use alike */
    MPL_atomic_store_ptr(&queue->q.tail.m, NULL);

    return rc;
}
142 
143 static inline MPIDU_genqi_shmem_cell_header_s
MPIDU_genqi_shmem_get_head_cell_header(MPIDU_genqi_shmem_pool_s * pool_obj,MPIDU_genq_shmem_queue_u * queue)144     * MPIDU_genqi_shmem_get_head_cell_header(MPIDU_genqi_shmem_pool_s * pool_obj,
145                                              MPIDU_genq_shmem_queue_u * queue)
146 {
147     void *tail = NULL;
148     MPIDU_genqi_shmem_cell_header_s *head_cell_h = NULL;
149 
150     /* prepares the cells for dequeuing from the head in the following steps.
151      * 1. atomic detaching all cells frm the queue tail
152      * 2. find the head of the queue and rebuild the "next" pointers for cells
153      */
154     tail = MPL_atomic_swap_ptr(&queue->q.tail.m, NULL);
155     if (!tail) {
156         return NULL;
157     }
158     head_cell_h = HANDLE_TO_HEADER(pool_obj, tail);
159 
160     if (head_cell_h != NULL) {
161         uintptr_t curr_handle = head_cell_h->handle;
162         while (head_cell_h->u.inverse_queue.prev) {
163             MPIDU_genqi_shmem_cell_header_s *prev_cell_h;
164             prev_cell_h = HANDLE_TO_HEADER(pool_obj, head_cell_h->u.inverse_queue.prev);
165             prev_cell_h->u.inverse_queue.next = curr_handle;
166             curr_handle = head_cell_h->u.inverse_queue.prev;
167             head_cell_h = prev_cell_h;
168         }
169         return head_cell_h;
170     } else {
171         return NULL;
172     }
173 }
174 
/* Remove the cell at the head of an inverse MPSC queue.
 *
 * pool_obj: pool used to translate cell handles into cell headers.
 * queue:    queue to dequeue from.
 * cell:     out parameter; receives the dequeued cell, or NULL when neither
 *           the head list nor the atomic tail holds a cell.
 *
 * When the head handle is 0 the cells pending behind the atomic tail are
 * detached via MPIDU_genqi_shmem_get_head_cell_header() and consumed from
 * there. Returns MPI_SUCCESS. */
static inline int MPIDU_genqi_inv_mpsc_dequeue(MPIDU_genqi_shmem_pool_s * pool_obj,
                                               MPIDU_genq_shmem_queue_u * queue, void **cell)
{
    int rc = MPI_SUCCESS;
    MPIDU_genqi_shmem_cell_header_s *first_h = NULL;

    if (queue->q.head.s) {
        /* cells from an earlier detach are still pending */
        first_h = HANDLE_TO_HEADER(pool_obj, queue->q.head.s);
    } else {
        /* head list exhausted: pull whatever was enqueued at the tail */
        first_h = MPIDU_genqi_shmem_get_head_cell_header(pool_obj, queue);
        if (!first_h) {
            *cell = NULL;
            return rc;
        }
    }

    *cell = HEADER_TO_CELL(first_h);
    queue->q.head.s = first_h->u.inverse_queue.next;

    return rc;
}
197 
/* Push a cell onto the atomic tail of an inverse MPSC queue.
 *
 * pool_obj: pool object; not used by this routine.
 * queue:    queue to append to.
 * cell:     cell to enqueue.
 *
 * The cell records the tail it observed in its "prev" field and then tries to
 * install itself as the new tail with a compare-and-swap, retrying until the
 * swap succeeds. Returns MPI_SUCCESS. */
static inline int MPIDU_genqi_inv_mpsc_enqueue(MPIDU_genqi_shmem_pool_s * pool_obj,
                                               MPIDU_genq_shmem_queue_u * queue, void *cell)
{
    int rc = MPI_SUCCESS;
    MPIDU_genqi_shmem_cell_header_s *new_h = CELL_TO_HEADER(cell);

    new_h->u.inverse_queue.next = 0;
    new_h->u.inverse_queue.prev = 0;

    void *seen_tail = NULL;
    void *new_handle = (void *) new_h->handle;

    for (;;) {
        seen_tail = MPL_atomic_load_ptr(&queue->q.tail.m);
        new_h->u.inverse_queue.prev = (uintptr_t) seen_tail;

        /* done once the tail was still the one recorded in "prev" */
        if (MPL_atomic_cas_ptr(&queue->q.tail.m, seen_tail, new_handle) == seen_tail) {
            break;
        }
    }

    return rc;
}
216 
217 /* EXTERNAL INTERFACE */
218 
/* Dequeue one cell from a shared-memory queue.
 *
 * pool:  pool handle; forwarded to the type-specific routine.
 * queue: queue handle; its flags field selects the implementation.
 * cell:  out parameter filled in by the type-specific routine (the dequeued
 *        cell, or NULL when nothing is available). It is left untouched when
 *        the flags value is not a known queue type.
 *
 * Returns the code of the type-specific routine; an unknown queue type trips
 * an assertion and leaves the return value at MPI_SUCCESS.
 *
 * NOTE(review): the verbose state id below is ..._QUEUE_INIT although this is
 * the dequeue path - looks like a copy-paste; confirm before renaming. */
static inline int MPIDU_genq_shmem_queue_dequeue(MPIDU_genq_shmem_pool_t pool,
                                                 MPIDU_genq_shmem_queue_t queue, void **cell)
{
    int rc = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDU_GENQ_SHMEM_QUEUE_INIT);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDU_GENQ_SHMEM_QUEUE_INIT);

    MPIDU_genqi_shmem_pool_s *pool_obj = (MPIDU_genqi_shmem_pool_s *) pool;
    MPIDU_genq_shmem_queue_u *queue_obj = (MPIDU_genq_shmem_queue_u *) queue;
    int flags = queue_obj->q.flags;

    switch (flags) {
        case MPIDU_GENQ_SHMEM_QUEUE_TYPE__SERIAL:
            rc = MPIDU_genqi_serial_dequeue(pool_obj, queue_obj, cell);
            break;
        case MPIDU_GENQ_SHMEM_QUEUE_TYPE__INV_MPSC:
            rc = MPIDU_genqi_inv_mpsc_dequeue(pool_obj, queue_obj, cell);
            break;
        case MPIDU_GENQ_SHMEM_QUEUE_TYPE__NEM_MPSC:
            rc = MPIDU_genqi_nem_mpsc_dequeue(pool_obj, queue_obj, cell);
            break;
        default:
            MPIR_Assert(0 && "Invalid GenQ flag");
            break;
    }

    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDU_GENQ_SHMEM_QUEUE_INIT);
    return rc;
}
242 
/* Enqueue one cell into a shared-memory queue.
 *
 * pool:  pool handle; forwarded to the type-specific routine.
 * queue: queue handle; its flags field selects the implementation.
 * cell:  cell to append.
 *
 * Returns the code of the type-specific routine; an unknown queue type trips
 * an assertion and leaves the return value at MPI_SUCCESS.
 *
 * NOTE(review): the verbose state id below is ..._QUEUE_INIT although this is
 * the enqueue path - looks like a copy-paste; confirm before renaming. */
static inline int MPIDU_genq_shmem_queue_enqueue(MPIDU_genq_shmem_pool_t pool,
                                                 MPIDU_genq_shmem_queue_t queue, void *cell)
{
    int rc = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDU_GENQ_SHMEM_QUEUE_INIT);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDU_GENQ_SHMEM_QUEUE_INIT);

    MPIDU_genqi_shmem_pool_s *pool_obj = (MPIDU_genqi_shmem_pool_s *) pool;
    MPIDU_genq_shmem_queue_u *queue_obj = (MPIDU_genq_shmem_queue_u *) queue;
    int flags = queue_obj->q.flags;

    switch (flags) {
        case MPIDU_GENQ_SHMEM_QUEUE_TYPE__SERIAL:
            rc = MPIDU_genqi_serial_enqueue(pool_obj, queue_obj, cell);
            break;
        case MPIDU_GENQ_SHMEM_QUEUE_TYPE__INV_MPSC:
            rc = MPIDU_genqi_inv_mpsc_enqueue(pool_obj, queue_obj, cell);
            break;
        case MPIDU_GENQ_SHMEM_QUEUE_TYPE__NEM_MPSC:
            rc = MPIDU_genqi_nem_mpsc_enqueue(pool_obj, queue_obj, cell);
            break;
        default:
            MPIR_Assert(0 && "Invalid GenQ flag");
            break;
    }

    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDU_GENQ_SHMEM_QUEUE_INIT);
    return rc;
}
266 
267 #endif /* ifndef MPIDU_GENQ_SHMEM_QUEUE_H_INCLUDED */
268