1 /*
2 * Copyright (C) by Argonne National Laboratory
3 * See COPYRIGHT in top-level directory
4 */
5
6 /* TODO figure out how to rewrite some/all of this queue code to use
7 * explicit relaxed atomic operations */
8
9 #ifndef MPID_NEM_QUEUE_H_INCLUDED
10 #define MPID_NEM_QUEUE_H_INCLUDED
11 #include "mpid_nem_datatypes.h"
12 #include "mpid_nem_defs.h"
13 #include "mpid_nem_atomics.h"
14
15 int MPID_nem_network_poll(int in_blocking_progress);
16
/* Assertion macros for nemesis queues. We don't use the normal
 * assertion macros because we don't usually want to assert several
 * times per queue operation. These assertions serve more as structured
 * comments that can easily be transformed into real assertions */
#if 0
#define MPID_nem_q_assert(a_)                                   \
    do {                                                        \
        if (unlikely(!(a_))) {                                  \
            MPID_nem_q_assert_fail(#a_, __FILE__, __LINE__);    \
        }                                                       \
    } while (0)
#define MPID_nem_q_assert_fail(a_str_, file_, line_) \
    do {/*nothing*/} while(0)
#else
/* assertions are compiled out by default: the macro expands to a no-op */
#define MPID_nem_q_assert(a_) \
    do {/*nothing*/} while (0)
#endif
34
/* Initialize a queue cell: clear its 'next' link and zero its packet
 * header so the cell is safe to enqueue. */
static inline void MPID_nem_cell_init(MPID_nem_cell_ptr_t cell)
{
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_NEM_CELL_INIT);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_NEM_CELL_INIT);

    /* a REL_NULL 'next' marks the cell as not linked into any queue
     * (the enqueue path asserts this precondition) */
    MPID_NEM_SET_REL_NULL(cell->next);
    memset((void *)&cell->header, 0, sizeof(MPID_nem_pkt_header_t));

    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_NEM_CELL_INIT);
}
46
/* Initialize a queue to the empty state: published head, consumer-private
 * shadow head ('my_head'), and tail are all set to REL_NULL. */
static inline void MPID_nem_queue_init(MPID_nem_queue_ptr_t qhead)
{
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPID_NEM_QUEUE_INIT);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPID_NEM_QUEUE_INIT);

    MPID_NEM_SET_REL_NULL(qhead->head);
    MPID_NEM_SET_REL_NULL(qhead->my_head);
    MPID_NEM_SET_REL_NULL(qhead->tail);

    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPID_NEM_QUEUE_INIT);
}
59
60 #define MPID_NEM_USE_SHADOW_HEAD
61
/* Atomically swap the relative pointer at *ptr with 'val', returning the
 * previous contents of *ptr.  Implemented on the underlying MPL atomic
 * pointer stored inside the relative-pointer struct ('.p'). */
static inline MPID_nem_cell_rel_ptr_t MPID_NEM_SWAP_REL (MPID_nem_cell_rel_ptr_t *ptr, MPID_nem_cell_rel_ptr_t val)
{
    MPID_nem_cell_rel_ptr_t ret;
    MPL_atomic_release_store_ptr(&ret.p, MPL_atomic_swap_ptr(&(ptr->p), MPL_atomic_acquire_load_ptr(&val.p)));
    return ret;
}
68
/* do a compare-and-swap with MPID_NEM_RELNULL */
/* Atomically: if *ptr equals 'oldv', set *ptr to MPID_NEM_REL_NULL.
 * Returns the value observed at *ptr (equal to 'oldv' iff the CAS
 * succeeded). */
static inline MPID_nem_cell_rel_ptr_t MPID_NEM_CAS_REL_NULL (MPID_nem_cell_rel_ptr_t *ptr, MPID_nem_cell_rel_ptr_t oldv)
{
    MPID_nem_cell_rel_ptr_t ret;
    MPL_atomic_release_store_ptr(&ret.p, MPL_atomic_cas_ptr(&(ptr->p), MPL_atomic_acquire_load_ptr(&oldv.p), MPID_NEM_REL_NULL));
    return ret;
}
76
/* Enqueue 'element' at the tail of 'qhead'.  Precondition: element->next
 * is REL_NULL (the cell is not linked into any queue).
 * NOTE(review): the tail update is a single atomic SWAP, which appears to
 * allow multiple concurrent enqueuers alongside the single dequeuer --
 * confirm against callers. */
static inline void
MPID_nem_queue_enqueue (MPID_nem_queue_ptr_t qhead, MPID_nem_cell_ptr_t element)
{
    MPID_nem_cell_rel_ptr_t r_prev;
    /* convert the absolute pointer to the queue's relative-pointer form
     * once, up front (presumably relative pointers are what make these
     * queues usable across address spaces -- see mpid_nem_datatypes.h) */
    MPID_nem_cell_rel_ptr_t r_element = MPID_NEM_ABS_TO_REL (element);

    /* the _dequeue can break if this does not hold */
    MPID_nem_q_assert(MPID_NEM_IS_REL_NULL(element->next));

    /* Orders payload and e->next=NULL w.r.t. the SWAP, updating head, and
     * updating prev->next. We assert e->next==NULL above, but it may have been
     * done by us in the preceding _dequeue operation.
     *
     * The SWAP itself does not need to be ordered w.r.t. the payload because
     * the consumer does not directly inspect the tail. But the subsequent
     * update to the head or e->next field does need to be ordered w.r.t. the
     * payload or the consumer may read incorrect data. */
    MPL_atomic_write_barrier();

    /* enqueue at tail */
    r_prev = MPID_NEM_SWAP_REL (&(qhead->tail), r_element);
    if (MPID_NEM_IS_REL_NULL (r_prev))
    {
        /* queue was empty, element is the new head too */

        /* no write barrier needed, we believe atomic SWAP with a control
         * dependence (if) will enforce ordering between the SWAP and the head
         * assignment */
        qhead->head = r_element;
    }
    else
    {
        /* queue was not empty, swing old tail's next field to point to
         * our element */
        MPID_nem_q_assert(MPID_NEM_IS_REL_NULL(MPID_NEM_REL_TO_ABS(r_prev)->next));

        /* no write barrier needed, we believe atomic SWAP with a control
         * dependence (if/else) will enforce ordering between the SWAP and the
         * prev->next assignment */
        MPID_NEM_REL_TO_ABS (r_prev)->next = r_element;
    }
}
119
/* This operation is only safe because this is a single-dequeuer queue impl.
   Assumes that MPID_nem_queue_empty was called immediately prior to fix up any
   shadow head issues. */
/* Return (as an absolute pointer) the current head cell without removing
 * it.  After _empty has run, the head lives in the consumer-private
 * shadow head 'my_head' and the published 'head' is REL_NULL. */
static inline MPID_nem_cell_ptr_t
MPID_nem_queue_head (MPID_nem_queue_ptr_t qhead)
{
    /* _empty drained 'head' into 'my_head', so 'head' must be null here */
    MPID_nem_q_assert(MPID_NEM_IS_REL_NULL(qhead->head));
    return MPID_NEM_REL_TO_ABS(qhead->my_head);
}
129
/* Return nonzero iff the queue is empty.  Side effect: when the shadow
 * head is empty but the published head is not, the published head is
 * transferred into the consumer-private shadow head ('my_head') and the
 * published 'head' is reset to REL_NULL.  Consumer-side only; must be
 * called immediately before _head/_dequeue (they rely on this transfer). */
static inline int
MPID_nem_queue_empty (MPID_nem_queue_ptr_t qhead)
{
    /* outside of this routine my_head and head should never both
     * contain a non-null value */
    MPID_nem_q_assert(MPID_NEM_IS_REL_NULL(qhead->my_head) ||
                      MPID_NEM_IS_REL_NULL(qhead->head));

    if (MPID_NEM_IS_REL_NULL (qhead->my_head))
    {
        /* the order of comparison between my_head and head does not
         * matter, no read barrier needed here */
        if (MPID_NEM_IS_REL_NULL (qhead->head))
        {
            /* both null, nothing in queue */
            return 1;
        }
        else
        {
            /* shadow head null and head has value, move the value to
             * our private shadow head and zero the real head */
            qhead->my_head = qhead->head;
            /* no barrier needed, my_head is entirely private to consumer */
            MPID_NEM_SET_REL_NULL (qhead->head);
        }
    }

    /* the following assertions are present at the beginning of _dequeue:
       MPID_nem_q_assert(!MPID_NEM_IS_REL_NULL(qhead->my_head));
       MPID_nem_q_assert( MPID_NEM_IS_REL_NULL(qhead->head));
    */
    return 0;
}
163
164
/* Gets the head: remove the head cell of the queue and return it (as an
 * absolute pointer) in *e.  Single-consumer only.  The caller must have
 * just called MPID_nem_queue_empty (and it returned false), so my_head is
 * non-null and the published head is null on entry.  The dequeued cell's
 * 'next' link is reset to REL_NULL, satisfying the _enqueue precondition. */
static inline void
MPID_nem_queue_dequeue (MPID_nem_queue_ptr_t qhead, MPID_nem_cell_ptr_t *e)
{
    MPID_nem_cell_ptr_t _e;
    MPID_nem_cell_rel_ptr_t _r_e;

    /* _empty always called first, moving head-->my_head */
    MPID_nem_q_assert(!MPID_NEM_IS_REL_NULL(qhead->my_head));
    MPID_nem_q_assert( MPID_NEM_IS_REL_NULL(qhead->head));

    _r_e = qhead->my_head;
    _e = MPID_NEM_REL_TO_ABS (_r_e);

    /* no barrier needed, my_head is private to consumer, plus
     * head/my_head and _e->next are ordered by a data dependency */
    if (!MPID_NEM_IS_REL_NULL(_e->next))
    {
        /* common case: a successor exists; it becomes the new head */
        qhead->my_head = _e->next;
    }
    else
    {
        /* we've reached the end (tail) of the queue */
        MPID_nem_cell_rel_ptr_t old_tail;

        MPID_NEM_SET_REL_NULL (qhead->my_head);
        /* no barrier needed, the caller doesn't need any ordering w.r.t.
         * my_head or the tail */
        /* try to null the tail; succeeds only if no enqueuer has swapped
         * a newer element into the tail since we read _r_e */
        old_tail = MPID_NEM_CAS_REL_NULL (&(qhead->tail), _r_e);

        if (!MPID_NEM_REL_ARE_EQUAL (old_tail, _r_e))
        {
            /* CAS lost the race: an enqueuer has claimed the tail but may
             * not yet have linked _e->next; spin until the link appears */
            /* FIXME is a barrier needed here because of the control-only dependency? */
            while (MPID_NEM_IS_REL_NULL (_e->next))
            {
                SKIP;
            }
            /* no read barrier needed between loads from the same location */
            qhead->my_head = _e->next;
        }
    }
    /* unlink the dequeued cell so it can be re-enqueued later */
    MPID_NEM_SET_REL_NULL (_e->next);

    /* Conservative read barrier here to ensure loads from head are ordered
     * w.r.t. payload reads by the caller. The McKenney "whymb" document's
     * Figure 11 indicates that we don't need a barrier, but we are currently
     * unconvinced of this. Further work, ideally using more formal methods,
     * should justify removing this. (note that this barrier won't cost us
     * anything on many platforms, esp. x86) */
    MPL_atomic_read_barrier();

    *e = _e;
}
218
219 #endif /* MPID_NEM_QUEUE_H_INCLUDED */
220