/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2016.  ALL RIGHTS RESERVED.
 * Copyright (C) ARM Ltd. 2016-2017.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */
7
8 #ifdef HAVE_CONFIG_H
9 # include "config.h"
10 #endif
11
12 #include <ucs/type/spinlock.h>
13 #include <ucs/arch/atomic.h>
14 #include <ucs/arch/bitops.h>
15 #include <ucs/async/async.h>
16 #include <ucs/debug/assert.h>
17 #include <ucs/debug/debug.h>
18 #include <ucs/sys/sys.h>
19
20 #include "callbackq.h"
21
22
23 #define UCS_CALLBACKQ_IDX_FLAG_SLOW 0x80000000u
24 #define UCS_CALLBACKQ_IDX_MASK 0x7fffffffu
25 #define UCS_CALLBACKQ_FAST_MAX (UCS_CALLBACKQ_FAST_COUNT - 1)
26
27
/**
 * Private part of the callback queue, stored inside the opaque 'priv' area of
 * ucs_callbackq_t (the size match is enforced by a static assertion in
 * ucs_callbackq_priv()).
 */
typedef struct ucs_callbackq_priv {
    ucs_recursive_spinlock_t lock;           /**< Protects adding / removing */

    ucs_callbackq_elem_t     *slow_elems;    /**< Array of slow-path elements */
    unsigned                 num_slow_elems; /**< Number of slow-path elements */
    unsigned                 max_slow_elems; /**< Maximal number of slow-path elements */
    int                      slow_proxy_id;  /**< ID of slow-path proxy in fast-path array.
                                                  keep track while this moves around. */

    uint64_t                 fast_remove_mask; /**< Mask of which fast-path elements
                                                    should be removed */
    unsigned                 num_fast_elems;   /**< Number of fast-path elements */

    /* Lookup table for callback IDs. This allows moving callbacks around in
     * the arrays, while the user can always use a single ID to remove the
     * callback in O(1). An entry holds either an array index (fast-path, or
     * slow-path when tagged with UCS_CALLBACKQ_IDX_FLAG_SLOW), or the next
     * free ID when the entry is on the free-list.
     */
    int                      free_idx_id;    /**< Index of first free item in the list */
    int                      num_idxs;       /**< Size of idxs array */
    unsigned                 *idxs;          /**< ID-to-index lookup */

} ucs_callbackq_priv_t;
50
51
52 static unsigned ucs_callbackq_slow_proxy(void *arg);
53
ucs_callbackq_priv(ucs_callbackq_t * cbq)54 static inline ucs_callbackq_priv_t* ucs_callbackq_priv(ucs_callbackq_t *cbq)
55 {
56 UCS_STATIC_ASSERT(sizeof(cbq->priv) == sizeof(ucs_callbackq_priv_t));
57 return (void*)cbq->priv;
58 }
59
/* Acquire the queue lock which protects add/remove operations */
static void ucs_callbackq_enter(ucs_callbackq_t *cbq)
{
    ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);

    ucs_recursive_spin_lock(&priv->lock);
}
64
/* Release the queue lock taken by ucs_callbackq_enter() */
static void ucs_callbackq_leave(ucs_callbackq_t *cbq)
{
    ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);

    ucs_recursive_spin_unlock(&priv->lock);
}
69
/*
 * Reset an element to the "empty" state. 'arg' is deliberately set back to the
 * queue itself - see the comment in ucs_callbackq_enable_proxy() regarding
 * weak memory ordering.
 */
static void ucs_callbackq_elem_reset(ucs_callbackq_t *cbq,
                                     ucs_callbackq_elem_t *elem)
{
    elem->cb    = NULL;
    elem->arg   = cbq;  /* must be 'cbq' for every free/removed element */
    elem->id    = UCS_CALLBACKQ_ID_NULL;
    elem->flags = 0;
}
78
/*
 * Grow a dynamic array, at least doubling its capacity.
 *
 * @param [in]  cbq         Callback queue (used for error reporting only).
 * @param [in]  ptr         Existing array, or NULL when empty.
 * @param [in]  elem_size   Size of a single element, in bytes.
 * @param [in]  count       Current capacity, in elements.
 * @param [out] new_count   Filled with the new capacity, in elements.
 * @param [in]  alloc_name  Allocation name, for the fatal error message.
 *
 * @return Pointer to the (re)allocated array. Never returns NULL - allocation
 *         failure is fatal.
 */
static void *ucs_callbackq_array_grow(ucs_callbackq_t *cbq, void *ptr,
                                      size_t elem_size, int count,
                                      int *new_count, const char *alloc_name)
{
    void *new_ptr;

    if (count == 0) {
        /* Initial allocation: one page worth of elements. Guard against an
         * element size larger than a page, which would yield a capacity of 0
         * and prevent the array from ever growing. */
        *new_count = ucs_get_page_size() / elem_size;
        if (*new_count == 0) {
            *new_count = 1;
        }
    } else {
        *new_count = count * 2;
    }

    new_ptr = ucs_sys_realloc(ptr, elem_size * count, elem_size * *new_count);
    if (new_ptr == NULL) {
        ucs_fatal("cbq %p: could not allocate memory for %s", cbq, alloc_name);
    }
    return new_ptr;
}
97
/* Release an array previously grown by ucs_callbackq_array_grow() */
static void ucs_callbackq_array_free(void *ptr, size_t elem_size, int count)
{
    ucs_sys_free(ptr, count * elem_size);
}
102
/*
 * Release a callback ID back to the free-list.
 *
 * @param [in]  id    ID to release in the lookup array.
 * @return index which this ID used to hold, possibly tagged with
 *         UCS_CALLBACKQ_IDX_FLAG_SLOW if it referred to a slow-path element.
 */
int ucs_callbackq_put_id(ucs_callbackq_t *cbq, int id)
{
    ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
    unsigned idx_with_flag;

    ucs_trace_func("cbq=%p id=%d", cbq, id);

    ucs_assert(id != UCS_CALLBACKQ_ID_NULL);

    idx_with_flag     = priv->idxs[id];    /* Retrieve the index */
    priv->idxs[id]    = priv->free_idx_id; /* Add ID to free-list head */
    priv->free_idx_id = id;                /* Update free-list head */

    return idx_with_flag;
}
122
/* Same as ucs_callbackq_put_id(), but strips the slow-path flag bit from the
 * returned index. */
int ucs_callbackq_put_id_noflag(ucs_callbackq_t *cbq, int id)
{
    unsigned idx_with_flag = ucs_callbackq_put_id(cbq, id);

    return idx_with_flag & UCS_CALLBACKQ_IDX_MASK;
}
127
/**
 * Allocate a unique ID and associate it with an array index.
 *
 * @param [in] idx  Index to save in the lookup array (the caller may tag it
 *                  with UCS_CALLBACKQ_IDX_FLAG_SLOW).
 * @return unique ID which holds index 'idx'.
 */
int ucs_callbackq_get_id(ucs_callbackq_t *cbq, unsigned idx)
{
    ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
    int new_num_idxs;
    int id;

    ucs_trace_func("cbq=%p idx=%u", cbq, idx);

    /* Grow the lookup array when the free-list is exhausted */
    if (priv->free_idx_id == UCS_CALLBACKQ_ID_NULL) {
        priv->idxs = ucs_callbackq_array_grow(cbq, priv->idxs, sizeof(*priv->idxs),
                                              priv->num_idxs, &new_num_idxs,
                                              "indexes");

        /* Add new items to free-list */
        for (id = priv->num_idxs; id < new_num_idxs; ++id) {
            priv->idxs[id]    = priv->free_idx_id; /* chain previous head */
            priv->free_idx_id = id;
        }

        priv->num_idxs = new_num_idxs;
    }

    id                = priv->free_idx_id; /* Get free ID from the list */
    ucs_assert(id != UCS_CALLBACKQ_ID_NULL);
    priv->free_idx_id = priv->idxs[id];    /* Update free-list head */
    priv->idxs[id]    = idx;               /* Install provided idx to array */
    return id;
}
160
/* Reserve the next free slot in the fast-path array */
static unsigned ucs_callbackq_get_fast_idx(ucs_callbackq_t *cbq)
{
    ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
    unsigned slot;

    slot = priv->num_fast_elems++;
    ucs_assert(slot < UCS_CALLBACKQ_FAST_COUNT);
    return slot;
}
170
/*
 * Add a callback directly to the fast-path array and assign it a unique ID.
 * Called with the queue lock held (see ucs_callbackq_add()); the caller has
 * verified that a fast-path slot is available.
 */
static int ucs_callbackq_add_fast(ucs_callbackq_t *cbq, ucs_callback_t cb,
                                  void *arg, unsigned flags)
{
    unsigned idx;
    int id;

    ucs_trace_func("cbq=%p cb=%s arg=%p flags=%u", cbq,
                   ucs_debug_get_symbol_name(cb), arg, flags);

    /* One-shot callbacks are handled on the slow path only */
    ucs_assert(!(flags & UCS_CALLBACKQ_FLAG_ONESHOT));

    idx = ucs_callbackq_get_fast_idx(cbq);
    id  = ucs_callbackq_get_id(cbq, idx);
    cbq->fast_elems[idx].cb    = cb;
    cbq->fast_elems[idx].arg   = arg;
    cbq->fast_elems[idx].flags = flags;
    cbq->fast_elems[idx].id    = id;
    return id;
}
190
/*
 * Remove a fast-path element in O(1) by moving the last element into its slot.
 * should be called from dispatch thread only
 */
static void ucs_callbackq_remove_fast(ucs_callbackq_t *cbq, unsigned idx)
{
    ucs_callbackq_priv_t *priv     = ucs_callbackq_priv(cbq);
    ucs_callbackq_elem_t *dst_elem = &cbq->fast_elems[idx];
    unsigned last_idx;
    int id;

    ucs_trace_func("cbq=%p idx=%u", cbq, idx);

    ucs_assert(priv->num_fast_elems > 0);
    last_idx = --priv->num_fast_elems;

    /* replace removed with last */
    *dst_elem = cbq->fast_elems[last_idx];
    ucs_callbackq_elem_reset(cbq, &cbq->fast_elems[last_idx]);

    if (priv->fast_remove_mask & UCS_BIT(last_idx)) {
        /* replaced by marked-for-removal element, still need to remove 'idx' */
        ucs_assert(priv->fast_remove_mask & UCS_BIT(idx));
        priv->fast_remove_mask &= ~UCS_BIT(last_idx);
    } else {
        /* replaced by a live element, remove from the mask and update 'idxs' */
        priv->fast_remove_mask &= ~UCS_BIT(idx);
        if (last_idx != idx) {
            /* the moved element now lives at 'idx' - fix its ID mapping */
            id = dst_elem->id;
            ucs_assert(id != UCS_CALLBACKQ_ID_NULL);
            priv->idxs[id] = idx;
        }
    }
}
222
223 /* should be called from dispatch thread only */
ucs_callbackq_purge_fast(ucs_callbackq_t * cbq)224 static void ucs_callbackq_purge_fast(ucs_callbackq_t *cbq)
225 {
226 ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
227 unsigned idx;
228
229 ucs_trace_func("cbq=%p map=0x%"PRIx64, cbq, priv->fast_remove_mask);
230
231 /* Remove fast-path callbacks marked for removal */
232 while (priv->fast_remove_mask) {
233 idx = ucs_ffs64(priv->fast_remove_mask);
234 ucs_callbackq_remove_fast(cbq, idx);
235 }
236 }
237
/*
 * Install the slow-path proxy callback as a fast-path element, so slow-path
 * callbacks and deferred fast-path removals get processed during dispatch.
 * No-op if the proxy is already installed.
 */
static void ucs_callbackq_enable_proxy(ucs_callbackq_t *cbq)
{
    ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
    unsigned idx;
    int id;

    ucs_trace_func("cbq=%p", cbq);

    if (priv->slow_proxy_id != UCS_CALLBACKQ_ID_NULL) {
        return;
    }

    /* The proxy is only ever needed when there is pending slow-path work */
    ucs_assert((priv->num_slow_elems > 0) || priv->fast_remove_mask);

    idx = ucs_callbackq_get_fast_idx(cbq);
    id  = ucs_callbackq_get_id(cbq, idx);

    ucs_assert(cbq->fast_elems[idx].arg == cbq);
    cbq->fast_elems[idx].cb    = ucs_callbackq_slow_proxy;
    cbq->fast_elems[idx].flags = 0;
    cbq->fast_elems[idx].id    = id;
    /* Avoid writing 'arg' because the dispatching thread may not see it in case
     * of weak memory ordering. Instead, 'arg' is reset to 'cbq' for all free and
     * removed elements, from the main thread.
     */

    priv->slow_proxy_id = id;
}
266
/* Remove the slow-path proxy from the fast-path array, if it is installed */
static void ucs_callbackq_disable_proxy(ucs_callbackq_t *cbq)
{
    ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
    unsigned proxy_idx;

    ucs_trace_func("cbq=%p slow_proxy_id=%d", cbq, priv->slow_proxy_id);

    if (priv->slow_proxy_id == UCS_CALLBACKQ_ID_NULL) {
        return; /* proxy not installed */
    }

    /* Release the proxy ID and drop its fast-path element */
    proxy_idx           = ucs_callbackq_put_id(cbq, priv->slow_proxy_id);
    ucs_callbackq_remove_fast(cbq, proxy_idx);
    priv->slow_proxy_id = UCS_CALLBACKQ_ID_NULL;
}
282
/*
 * Append a callback to the slow-path array, growing it if needed, and make
 * sure the proxy callback is installed so the new element gets dispatched.
 * Called with the queue lock held.
 */
static int ucs_callbackq_add_slow(ucs_callbackq_t *cbq, ucs_callback_t cb,
                                  void *arg, unsigned flags)
{
    ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
    ucs_callbackq_elem_t *new_slow_elems;
    int new_max_slow_elems;
    unsigned idx;
    int id;

    ucs_trace_func("cbq=%p cb=%s arg=%p flags=%u", cbq,
                   ucs_debug_get_symbol_name(cb), arg, flags);

    /* Grow slow-path array if needed */
    if (priv->num_slow_elems >= priv->max_slow_elems) {
        new_slow_elems = ucs_callbackq_array_grow(cbq, priv->slow_elems,
                                                  sizeof(*priv->slow_elems),
                                                  priv->max_slow_elems,
                                                  &new_max_slow_elems,
                                                  "slow_elems");
        /* Reset the newly added capacity so every unused element is in the
         * well-defined "empty" state */
        for (idx = priv->max_slow_elems; idx < new_max_slow_elems; ++idx) {
            ucs_callbackq_elem_reset(cbq, &new_slow_elems[idx]);
        }

        priv->max_slow_elems = new_max_slow_elems;
        priv->slow_elems     = new_slow_elems;
    }

    /* Add slow-path element to the queue; the stored index is tagged with the
     * SLOW flag so removal by ID knows which array to look in */
    idx = priv->num_slow_elems++;
    id  = ucs_callbackq_get_id(cbq, idx | UCS_CALLBACKQ_IDX_FLAG_SLOW);
    priv->slow_elems[idx].cb    = cb;
    priv->slow_elems[idx].arg   = arg;
    priv->slow_elems[idx].flags = flags;
    priv->slow_elems[idx].id    = id;

    ucs_callbackq_enable_proxy(cbq);
    return id;
}
321
static void ucs_callbackq_remove_slow(ucs_callbackq_t *cbq, unsigned idx)
{
    ucs_trace_func("cbq=%p idx=%u", cbq, idx);

    /* Reset the element in place to mark it for removal; the array itself is
     * compacted later by ucs_callbackq_purge_slow(). */
    ucs_callbackq_elem_reset(cbq, &ucs_callbackq_priv(cbq)->slow_elems[idx]);
}
331
static void ucs_callbackq_purge_slow(ucs_callbackq_t *cbq)
{
    ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
    ucs_callbackq_elem_t *elem;
    unsigned i, num_kept;

    ucs_trace_func("cbq=%p", cbq);

    /*
     * Compact the slow-path array in place, keeping only the valid elements
     * and refreshing their ID-to-index mapping. If nothing was removed, the
     * read and write positions stay equal, so no element is actually copied.
     */
    num_kept = 0;
    for (i = 0; i < priv->num_slow_elems; ++i) {
        elem = &priv->slow_elems[i];
        if (elem->id == UCS_CALLBACKQ_ID_NULL) {
            continue; /* marked for removal - drop it */
        }

        ucs_assert(num_kept <= i);
        if (num_kept != i) {
            priv->idxs[elem->id]      = num_kept | UCS_CALLBACKQ_IDX_FLAG_SLOW;
            priv->slow_elems[num_kept] = *elem;
        }
        ++num_kept;
    }

    priv->num_slow_elems = num_kept;
}
361
/*
 * Fast-path element which dispatches all slow-path callbacks and performs
 * deferred maintenance: purging removed elements and promoting FAST-flagged
 * slow-path elements into the fast-path array. Disables itself once there is
 * no more slow-path work left.
 */
static unsigned ucs_callbackq_slow_proxy(void *arg)
{
    ucs_callbackq_t      *cbq  = arg;
    ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
    ucs_callbackq_elem_t *elem;
    unsigned UCS_V_UNUSED removed_idx;
    unsigned slow_idx, fast_idx;
    ucs_callbackq_elem_t tmp_elem;
    unsigned count = 0;

    ucs_trace_poll("cbq=%p", cbq);

    ucs_callbackq_enter(cbq);

    /* Execute and update slow-path callbacks */
    for (slow_idx = 0; slow_idx < priv->num_slow_elems; ++slow_idx) {
        elem = &priv->slow_elems[slow_idx];
        if (elem->id == UCS_CALLBACKQ_ID_NULL) {
            continue; /* element was already removed */
        }

        /* Copy the element, since it may be modified or removed while the
         * lock is released around the callback invocation below */
        tmp_elem = *elem;
        if (elem->flags & UCS_CALLBACKQ_FLAG_FAST) {
            /* Promote a FAST-flagged element into the fast-path array when a
             * slot is available */
            ucs_assert(!(elem->flags & UCS_CALLBACKQ_FLAG_ONESHOT));
            if (priv->num_fast_elems < UCS_CALLBACKQ_FAST_MAX) {
                fast_idx = ucs_callbackq_get_fast_idx(cbq);
                cbq->fast_elems[fast_idx] = *elem;
                priv->idxs[elem->id] = fast_idx; /* ID now maps to fast array */
                ucs_callbackq_remove_slow(cbq, slow_idx);
            }
        } else if (elem->flags & UCS_CALLBACKQ_FLAG_ONESHOT) {
            /* One-shot: release the ID and remove the element before invoking
             * it, so it runs exactly once */
            removed_idx = ucs_callbackq_put_id_noflag(cbq, elem->id);
            ucs_assert(removed_idx == slow_idx);
            ucs_callbackq_remove_slow(cbq, slow_idx);
        }

        ucs_callbackq_leave(cbq);

        count += tmp_elem.cb(tmp_elem.arg); /* Execute callback without lock */

        ucs_callbackq_enter(cbq);
    }

    ucs_callbackq_purge_fast(cbq);
    ucs_callbackq_purge_slow(cbq);

    /* Disable this proxy if no more work to do */
    if (!priv->fast_remove_mask && (priv->num_slow_elems == 0)) {
        ucs_callbackq_disable_proxy(cbq);
    }

    ucs_callbackq_leave(cbq);

    return count;
}
417
ucs_callbackq_init(ucs_callbackq_t * cbq)418 ucs_status_t ucs_callbackq_init(ucs_callbackq_t *cbq)
419 {
420 ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
421 unsigned idx;
422
423 for (idx = 0; idx < UCS_CALLBACKQ_FAST_COUNT + 1; ++idx) {
424 ucs_callbackq_elem_reset(cbq, &cbq->fast_elems[idx]);
425 }
426
427 ucs_recursive_spinlock_init(&priv->lock, 0);
428 priv->slow_elems = NULL;
429 priv->num_slow_elems = 0;
430 priv->max_slow_elems = 0;
431 priv->slow_proxy_id = UCS_CALLBACKQ_ID_NULL;
432 priv->fast_remove_mask = 0;
433 priv->num_fast_elems = 0;
434 priv->free_idx_id = UCS_CALLBACKQ_ID_NULL;
435 priv->num_idxs = 0;
436 priv->idxs = NULL;
437 return UCS_OK;
438 }
439
/*
 * Destroy the callback queue: disable the proxy, purge pending removals, warn
 * if any callbacks were leaked by the user, and release the dynamic arrays.
 */
void ucs_callbackq_cleanup(ucs_callbackq_t *cbq)
{
    ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);

    ucs_callbackq_disable_proxy(cbq);
    ucs_callbackq_purge_fast(cbq);
    ucs_callbackq_purge_slow(cbq);

    if ((priv->num_fast_elems > 0) || (priv->num_slow_elems > 0)) {
        /* %u - the element counters are unsigned; %d would be a mismatched
         * format specifier */
        ucs_warn("%u fast-path and %u slow-path callbacks remain in the queue",
                 priv->num_fast_elems, priv->num_slow_elems);
    }

    ucs_callbackq_array_free(priv->slow_elems, sizeof(*priv->slow_elems),
                             priv->max_slow_elems);
    ucs_callbackq_array_free(priv->idxs, sizeof(*priv->idxs), priv->num_idxs);
}
457
int ucs_callbackq_add(ucs_callbackq_t *cbq, ucs_callback_t cb, void *arg,
                      unsigned flags)
{
    ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
    int use_fast;
    int id;

    ucs_callbackq_enter(cbq);

    ucs_trace_func("cbq=%p cb=%s arg=%p flags=%u", cbq,
                   ucs_debug_get_symbol_name(cb), arg, flags);

    /* Fast-path placement requires the FAST flag and a free fast slot;
     * otherwise the callback goes to the slow-path array. */
    use_fast = (flags & UCS_CALLBACKQ_FLAG_FAST) &&
               (priv->num_fast_elems < UCS_CALLBACKQ_FAST_MAX);
    id       = use_fast ? ucs_callbackq_add_fast(cbq, cb, arg, flags) :
                          ucs_callbackq_add_slow(cbq, cb, arg, flags);

    ucs_callbackq_leave(cbq);
    return id;
}
480
/*
 * Remove a callback by ID. Modifies the fast-path array directly, so per the
 * sibling helpers this is intended for the dispatch context; use
 * ucs_callbackq_remove_safe() from other contexts.
 */
void ucs_callbackq_remove(ucs_callbackq_t *cbq, int id)
{
    unsigned idx_with_flag, idx;

    ucs_callbackq_enter(cbq);

    ucs_trace_func("cbq=%p id=%d", cbq, id);

    /* Complete deferred fast-path removals first, so the index looked up
     * below is current */
    ucs_callbackq_purge_fast(cbq);

    idx_with_flag = ucs_callbackq_put_id(cbq, id);
    idx           = idx_with_flag & UCS_CALLBACKQ_IDX_MASK;

    /* The flag bit records whether the ID referred to a slow- or fast-path
     * element */
    if (idx_with_flag & UCS_CALLBACKQ_IDX_FLAG_SLOW) {
        ucs_callbackq_remove_slow(cbq, idx);
    } else {
        ucs_callbackq_remove_fast(cbq, idx);
    }

    ucs_callbackq_leave(cbq);
}
502
int ucs_callbackq_add_safe(ucs_callbackq_t *cbq, ucs_callback_t cb, void *arg,
                           unsigned flags)
{
    int id;

    ucs_callbackq_enter(cbq);

    ucs_trace_func("cbq=%p cb=%s arg=%p flags=%u", cbq,
                   ucs_debug_get_symbol_name(cb), arg, flags);

    /* Always go through the slow path here: it is not safe to add a fast-path
     * callback directly from this context. The proxy callback may upgrade the
     * element to the fast path later. */
    id = ucs_callbackq_add_slow(cbq, cb, arg, flags);

    ucs_callbackq_leave(cbq);
    return id;
}
522
/*
 * Remove a callback by ID without touching the fast-path array directly:
 * a fast-path element is only marked for removal here, and actually purged
 * later by the proxy callback running in the dispatch context.
 */
void ucs_callbackq_remove_safe(ucs_callbackq_t *cbq, int id)
{
    ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
    unsigned idx_with_flag, idx;

    ucs_callbackq_enter(cbq);

    ucs_trace_func("cbq=%p id=%d", cbq, id);

    idx_with_flag = ucs_callbackq_put_id(cbq, id);
    idx           = idx_with_flag & UCS_CALLBACKQ_IDX_MASK;

    if (idx_with_flag & UCS_CALLBACKQ_IDX_FLAG_SLOW) {
        ucs_callbackq_remove_slow(cbq, idx);
    } else {
        /* the removal mask is a 64-bit bitmap */
        UCS_STATIC_ASSERT(UCS_CALLBACKQ_FAST_MAX <= 64);
        ucs_assert(idx < priv->num_fast_elems);
        priv->fast_remove_mask |= UCS_BIT(idx);
        cbq->fast_elems[idx].id = UCS_CALLBACKQ_ID_NULL; /* for assertion */
        /* ensure the proxy runs to perform the deferred removal */
        ucs_callbackq_enable_proxy(cbq);
    }

    ucs_callbackq_leave(cbq);
}
547
/*
 * Remove every callback (fast- and slow-path) for which the predicate returns
 * nonzero. The element pointer is advanced only when nothing was removed,
 * because a fast-path removal moves another element into the current slot.
 */
void ucs_callbackq_remove_if(ucs_callbackq_t *cbq, ucs_callbackq_predicate_t pred,
                             void *arg)
{
    ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
    ucs_callbackq_elem_t *elem;
    unsigned idx;

    ucs_callbackq_enter(cbq);

    ucs_trace_func("cbq=%p", cbq);

    /* complete deferred removals so the fast-path array is consistent */
    ucs_callbackq_purge_fast(cbq);

    /* remove fast-path elements; the scan stops at the first reset element,
     * whose 'cb' is NULL */
    elem = cbq->fast_elems;
    while (elem->cb != NULL) {
        if (pred(elem, arg)) {
            idx = ucs_callbackq_put_id_noflag(cbq, elem->id);
            ucs_assert(idx == (elem - cbq->fast_elems));
            /* swaps the last element into this slot - re-test same position */
            ucs_callbackq_remove_fast(cbq, idx);
        } else {
            ++elem;
        }
    }

    /* remove slow-path elements */
    elem = priv->slow_elems;
    while (elem < priv->slow_elems + priv->num_slow_elems) {
        if (pred(elem, arg)) {
            idx = ucs_callbackq_put_id_noflag(cbq, elem->id);
            ucs_assert(idx == (elem - priv->slow_elems));
            /* only marks the element; it stays in place until purged */
            ucs_callbackq_remove_slow(cbq, idx);
        } else {
            ++elem;
        }
    }

    ucs_callbackq_leave(cbq);
}
587