1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2016.  ALL RIGHTS RESERVED.
3 * Copyright (C) ARM Ltd. 2016-2017.  ALL RIGHTS RESERVED.
4 *
5 * See file LICENSE for terms.
6 */
7 
8 #ifdef HAVE_CONFIG_H
9 #  include "config.h"
10 #endif
11 
12 #include <ucs/type/spinlock.h>
13 #include <ucs/arch/atomic.h>
14 #include <ucs/arch/bitops.h>
15 #include <ucs/async/async.h>
16 #include <ucs/debug/assert.h>
17 #include <ucs/debug/debug.h>
18 #include <ucs/sys/sys.h>
19 
20 #include "callbackq.h"
21 
22 
23 #define UCS_CALLBACKQ_IDX_FLAG_SLOW  0x80000000u
24 #define UCS_CALLBACKQ_IDX_MASK       0x7fffffffu
25 #define UCS_CALLBACKQ_FAST_MAX       (UCS_CALLBACKQ_FAST_COUNT - 1)
26 
27 
28 typedef struct ucs_callbackq_priv {
29     ucs_recursive_spinlock_t lock;           /**< Protects adding / removing */
30 
31     ucs_callbackq_elem_t     *slow_elems;    /**< Array of slow-path elements */
32     unsigned                 num_slow_elems; /**< Number of slow-path elements */
33     unsigned                 max_slow_elems; /**< Maximal number of slow-path elements */
34     int                      slow_proxy_id;  /**< ID of slow-path proxy in fast-path array.
35                                                   keep track while this moves around. */
36 
37     uint64_t                 fast_remove_mask; /**< Mask of which fast-path elements
38                                                     should be removed */
39     unsigned                 num_fast_elems; /**< Number of fast-path elements */
40 
41     /* Lookup table for callback IDs. This allows moving callbacks around in
42      * the arrays, while the user can always use a single ID to remove the
43      * callback in O(1).
44      */
45     int                      free_idx_id;    /**< Index of first free item in the list */
46     int                      num_idxs;       /**< Size of idxs array */
47     unsigned                 *idxs;          /**< ID-to-index lookup */
48 
49 } ucs_callbackq_priv_t;
50 
51 
52 static unsigned ucs_callbackq_slow_proxy(void *arg);
53 
ucs_callbackq_priv(ucs_callbackq_t * cbq)54 static inline ucs_callbackq_priv_t* ucs_callbackq_priv(ucs_callbackq_t *cbq)
55 {
56     UCS_STATIC_ASSERT(sizeof(cbq->priv) == sizeof(ucs_callbackq_priv_t));
57     return (void*)cbq->priv;
58 }
59 
ucs_callbackq_enter(ucs_callbackq_t * cbq)60 static void ucs_callbackq_enter(ucs_callbackq_t *cbq)
61 {
62     ucs_recursive_spin_lock(&ucs_callbackq_priv(cbq)->lock);
63 }
64 
ucs_callbackq_leave(ucs_callbackq_t * cbq)65 static void ucs_callbackq_leave(ucs_callbackq_t *cbq)
66 {
67     ucs_recursive_spin_unlock(&ucs_callbackq_priv(cbq)->lock);
68 }
69 
ucs_callbackq_elem_reset(ucs_callbackq_t * cbq,ucs_callbackq_elem_t * elem)70 static void ucs_callbackq_elem_reset(ucs_callbackq_t *cbq,
71                                      ucs_callbackq_elem_t *elem)
72 {
73     elem->cb    = NULL;
74     elem->arg   = cbq;
75     elem->id    = UCS_CALLBACKQ_ID_NULL;
76     elem->flags = 0;
77 }
78 
ucs_callbackq_array_grow(ucs_callbackq_t * cbq,void * ptr,size_t elem_size,int count,int * new_count,const char * alloc_name)79 static void *ucs_callbackq_array_grow(ucs_callbackq_t *cbq, void *ptr,
80                                       size_t elem_size, int count,
81                                       int *new_count, const char *alloc_name)
82 {
83     void *new_ptr;
84 
85     if (count == 0) {
86         *new_count = ucs_get_page_size() / elem_size;
87     } else {
88         *new_count = count * 2;
89     }
90 
91     new_ptr = ucs_sys_realloc(ptr, elem_size * count, elem_size * *new_count);
92     if (new_ptr == NULL) {
93         ucs_fatal("cbq %p: could not allocate memory for %s", cbq, alloc_name);
94     }
95     return new_ptr;
96 }
97 
ucs_callbackq_array_free(void * ptr,size_t elem_size,int count)98 static void ucs_callbackq_array_free(void *ptr, size_t elem_size, int count)
99 {
100     ucs_sys_free(ptr, elem_size * count);
101 }
102 
103 /*
104  * @param [in]  id  ID to release in the lookup array.
105  * @return index which this ID used to hold.
106  */
ucs_callbackq_put_id(ucs_callbackq_t * cbq,int id)107 int ucs_callbackq_put_id(ucs_callbackq_t *cbq, int id)
108 {
109     ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
110     unsigned idx_with_flag;
111 
112     ucs_trace_func("cbq=%p id=%d", cbq, id);
113 
114     ucs_assert(id != UCS_CALLBACKQ_ID_NULL);
115 
116     idx_with_flag     = priv->idxs[id];    /* Retrieve the index */
117     priv->idxs[id]    = priv->free_idx_id; /* Add ID to free-list head */
118     priv->free_idx_id = id;                /* Update free-list head */
119 
120     return idx_with_flag;
121 }
122 
ucs_callbackq_put_id_noflag(ucs_callbackq_t * cbq,int id)123 int ucs_callbackq_put_id_noflag(ucs_callbackq_t *cbq, int id)
124 {
125     return ucs_callbackq_put_id(cbq, id) & UCS_CALLBACKQ_IDX_MASK;
126 }
127 
128 /**
129  * @param [in]  idx  Index to save in the lookup array.
130  * @return unique ID which holds index 'idx'.
131  */
ucs_callbackq_get_id(ucs_callbackq_t * cbq,unsigned idx)132 int ucs_callbackq_get_id(ucs_callbackq_t *cbq, unsigned idx)
133 {
134     ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
135     int new_num_idxs;
136     int id;
137 
138     ucs_trace_func("cbq=%p idx=%u", cbq, idx);
139 
140     if (priv->free_idx_id == UCS_CALLBACKQ_ID_NULL) {
141         priv->idxs = ucs_callbackq_array_grow(cbq, priv->idxs, sizeof(*priv->idxs),
142                                               priv->num_idxs, &new_num_idxs,
143                                               "indexes");
144 
145         /* Add new items to free-list */
146         for (id = priv->num_idxs; id < new_num_idxs; ++id) {
147             priv->idxs[id]    = priv->free_idx_id;
148             priv->free_idx_id = id;
149         }
150 
151         priv->num_idxs = new_num_idxs;
152     }
153 
154     id = priv->free_idx_id;             /* Get free ID from the list */
155     ucs_assert(id != UCS_CALLBACKQ_ID_NULL);
156     priv->free_idx_id = priv->idxs[id]; /* Update free-list head */
157     priv->idxs[id]    = idx;            /* Install provided idx to array */
158     return id;
159 }
160 
ucs_callbackq_get_fast_idx(ucs_callbackq_t * cbq)161 static unsigned ucs_callbackq_get_fast_idx(ucs_callbackq_t *cbq)
162 {
163     ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
164     unsigned idx;
165 
166     idx = priv->num_fast_elems++;
167     ucs_assert(idx < UCS_CALLBACKQ_FAST_COUNT);
168     return idx;
169 }
170 
ucs_callbackq_add_fast(ucs_callbackq_t * cbq,ucs_callback_t cb,void * arg,unsigned flags)171 static int ucs_callbackq_add_fast(ucs_callbackq_t *cbq, ucs_callback_t cb,
172                                   void *arg, unsigned flags)
173 {
174     unsigned idx;
175     int id;
176 
177     ucs_trace_func("cbq=%p cb=%s arg=%p flags=%u", cbq,
178                    ucs_debug_get_symbol_name(cb), arg, flags);
179 
180     ucs_assert(!(flags & UCS_CALLBACKQ_FLAG_ONESHOT));
181 
182     idx = ucs_callbackq_get_fast_idx(cbq);
183     id  = ucs_callbackq_get_id(cbq, idx);
184     cbq->fast_elems[idx].cb    = cb;
185     cbq->fast_elems[idx].arg   = arg;
186     cbq->fast_elems[idx].flags = flags;
187     cbq->fast_elems[idx].id    = id;
188     return id;
189 }
190 
191 /* should be called from dispatch thread only */
ucs_callbackq_remove_fast(ucs_callbackq_t * cbq,unsigned idx)192 static void ucs_callbackq_remove_fast(ucs_callbackq_t *cbq, unsigned idx)
193 {
194     ucs_callbackq_priv_t *priv     = ucs_callbackq_priv(cbq);
195     ucs_callbackq_elem_t *dst_elem = &cbq->fast_elems[idx];
196     unsigned last_idx;
197     int id;
198 
199     ucs_trace_func("cbq=%p idx=%u", cbq, idx);
200 
201     ucs_assert(priv->num_fast_elems > 0);
202     last_idx = --priv->num_fast_elems;
203 
204     /* replace removed with last */
205     *dst_elem = cbq->fast_elems[last_idx];
206     ucs_callbackq_elem_reset(cbq, &cbq->fast_elems[last_idx]);
207 
208     if (priv->fast_remove_mask & UCS_BIT(last_idx)) {
209         /* replaced by marked-for-removal element, still need to remove 'idx' */
210         ucs_assert(priv->fast_remove_mask & UCS_BIT(idx));
211         priv->fast_remove_mask &= ~UCS_BIT(last_idx);
212     } else {
213         /* replaced by a live element, remove from the mask and update 'idxs' */
214         priv->fast_remove_mask &= ~UCS_BIT(idx);
215         if (last_idx != idx) {
216             id = dst_elem->id;
217             ucs_assert(id != UCS_CALLBACKQ_ID_NULL);
218             priv->idxs[id] = idx;
219         }
220     }
221 }
222 
223 /* should be called from dispatch thread only */
ucs_callbackq_purge_fast(ucs_callbackq_t * cbq)224 static void ucs_callbackq_purge_fast(ucs_callbackq_t *cbq)
225 {
226     ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
227     unsigned idx;
228 
229     ucs_trace_func("cbq=%p map=0x%"PRIx64, cbq, priv->fast_remove_mask);
230 
231     /* Remove fast-path callbacks marked for removal */
232     while (priv->fast_remove_mask) {
233         idx = ucs_ffs64(priv->fast_remove_mask);
234         ucs_callbackq_remove_fast(cbq, idx);
235     }
236 }
237 
ucs_callbackq_enable_proxy(ucs_callbackq_t * cbq)238 static void ucs_callbackq_enable_proxy(ucs_callbackq_t *cbq)
239 {
240     ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
241     unsigned idx;
242     int id;
243 
244     ucs_trace_func("cbq=%p", cbq);
245 
246     if (priv->slow_proxy_id != UCS_CALLBACKQ_ID_NULL) {
247         return;
248     }
249 
250     ucs_assert((priv->num_slow_elems > 0) || priv->fast_remove_mask);
251 
252     idx = ucs_callbackq_get_fast_idx(cbq);
253     id  = ucs_callbackq_get_id(cbq, idx);
254 
255     ucs_assert(cbq->fast_elems[idx].arg == cbq);
256     cbq->fast_elems[idx].cb    = ucs_callbackq_slow_proxy;
257     cbq->fast_elems[idx].flags = 0;
258     cbq->fast_elems[idx].id    = id;
259     /* Avoid writing 'arg' because the dispatching thread may not see it in case
260      * of weak memory ordering. Instead, 'arg' is reset to 'cbq' for all free and
261      * removed elements, from the main thread.
262      */
263 
264     priv->slow_proxy_id        = id;
265 }
266 
ucs_callbackq_disable_proxy(ucs_callbackq_t * cbq)267 static void ucs_callbackq_disable_proxy(ucs_callbackq_t *cbq)
268 {
269     ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
270     unsigned idx;
271 
272     ucs_trace_func("cbq=%p slow_proxy_id=%d", cbq, priv->slow_proxy_id);
273 
274     if (priv->slow_proxy_id == UCS_CALLBACKQ_ID_NULL) {
275         return;
276     }
277 
278     idx = ucs_callbackq_put_id(cbq, priv->slow_proxy_id);
279     ucs_callbackq_remove_fast(cbq, idx);
280     priv->slow_proxy_id = UCS_CALLBACKQ_ID_NULL;
281 }
282 
ucs_callbackq_add_slow(ucs_callbackq_t * cbq,ucs_callback_t cb,void * arg,unsigned flags)283 static int ucs_callbackq_add_slow(ucs_callbackq_t *cbq, ucs_callback_t cb,
284                                   void *arg, unsigned flags)
285 {
286     ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
287     ucs_callbackq_elem_t *new_slow_elems;
288     int new_max_slow_elems;
289     unsigned idx;
290     int id;
291 
292     ucs_trace_func("cbq=%p cb=%s arg=%p flags=%u", cbq,
293                    ucs_debug_get_symbol_name(cb), arg, flags);
294 
295     /* Grow slow-path array if needed */
296     if (priv->num_slow_elems >= priv->max_slow_elems) {
297         new_slow_elems = ucs_callbackq_array_grow(cbq, priv->slow_elems,
298                                                   sizeof(*priv->slow_elems),
299                                                   priv->max_slow_elems,
300                                                   &new_max_slow_elems,
301                                                   "slow_elems");
302         for (idx = priv->max_slow_elems; idx < new_max_slow_elems; ++idx) {
303             ucs_callbackq_elem_reset(cbq, &new_slow_elems[idx]);
304         }
305 
306         priv->max_slow_elems = new_max_slow_elems;
307         priv->slow_elems     = new_slow_elems;
308     }
309 
310     /* Add slow-path element to the queue */
311     idx = priv->num_slow_elems++;
312     id  = ucs_callbackq_get_id(cbq, idx | UCS_CALLBACKQ_IDX_FLAG_SLOW);
313     priv->slow_elems[idx].cb    = cb;
314     priv->slow_elems[idx].arg   = arg;
315     priv->slow_elems[idx].flags = flags;
316     priv->slow_elems[idx].id    = id;
317 
318     ucs_callbackq_enable_proxy(cbq);
319     return id;
320 }
321 
ucs_callbackq_remove_slow(ucs_callbackq_t * cbq,unsigned idx)322 static void ucs_callbackq_remove_slow(ucs_callbackq_t *cbq, unsigned idx)
323 {
324     ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
325 
326     ucs_trace_func("cbq=%p idx=%u", cbq, idx);
327 
328     /* Mark for removal by ucs_callbackq_purge_slow() */
329     ucs_callbackq_elem_reset(cbq, &priv->slow_elems[idx]);
330 }
331 
ucs_callbackq_purge_slow(ucs_callbackq_t * cbq)332 static void ucs_callbackq_purge_slow(ucs_callbackq_t *cbq)
333 {
334     ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
335     ucs_callbackq_elem_t *src_elem;
336     unsigned src_idx, dst_idx;
337 
338     ucs_trace_func("cbq=%p", cbq);
339 
340     /*
341      * Copy valid elements from src_idx to dst_idx, essentially rebuilding the
342      * array of elements in-place, keeping only the valid ones.
343      * As an optimization, if no elements are actually removed, then src_idx will
344      * always be equal to dst_idx, so nothing will be actually copied/moved.
345      */
346     dst_idx = 0;
347     for (src_idx = 0; src_idx < priv->num_slow_elems; ++src_idx) {
348         src_elem = &priv->slow_elems[src_idx];
349         if (src_elem->id != UCS_CALLBACKQ_ID_NULL) {
350             ucs_assert(dst_idx <= src_idx);
351             if (dst_idx != src_idx) {
352                 priv->idxs[src_elem->id]  = dst_idx | UCS_CALLBACKQ_IDX_FLAG_SLOW;
353                 priv->slow_elems[dst_idx] = *src_elem;
354             }
355             ++dst_idx;
356         }
357     }
358 
359     priv->num_slow_elems = dst_idx;
360 }
361 
ucs_callbackq_slow_proxy(void * arg)362 static unsigned ucs_callbackq_slow_proxy(void *arg)
363 {
364     ucs_callbackq_t      *cbq  = arg;
365     ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
366     ucs_callbackq_elem_t *elem;
367     unsigned UCS_V_UNUSED removed_idx;
368     unsigned slow_idx, fast_idx;
369     ucs_callbackq_elem_t tmp_elem;
370     unsigned count = 0;
371 
372     ucs_trace_poll("cbq=%p", cbq);
373 
374     ucs_callbackq_enter(cbq);
375 
376     /* Execute and update slow-path callbacks */
377     for (slow_idx = 0; slow_idx < priv->num_slow_elems; ++slow_idx) {
378         elem = &priv->slow_elems[slow_idx];
379         if (elem->id == UCS_CALLBACKQ_ID_NULL) {
380             continue;
381         }
382 
383         tmp_elem = *elem;
384         if (elem->flags & UCS_CALLBACKQ_FLAG_FAST) {
385             ucs_assert(!(elem->flags & UCS_CALLBACKQ_FLAG_ONESHOT));
386             if (priv->num_fast_elems < UCS_CALLBACKQ_FAST_MAX) {
387                 fast_idx = ucs_callbackq_get_fast_idx(cbq);
388                 cbq->fast_elems[fast_idx] = *elem;
389                 priv->idxs[elem->id]      = fast_idx;
390                 ucs_callbackq_remove_slow(cbq, slow_idx);
391             }
392         } else if (elem->flags & UCS_CALLBACKQ_FLAG_ONESHOT) {
393             removed_idx = ucs_callbackq_put_id_noflag(cbq, elem->id);
394             ucs_assert(removed_idx == slow_idx);
395             ucs_callbackq_remove_slow(cbq, slow_idx);
396         }
397 
398         ucs_callbackq_leave(cbq);
399 
400         count += tmp_elem.cb(tmp_elem.arg); /* Execute callback without lock */
401 
402         ucs_callbackq_enter(cbq);
403     }
404 
405     ucs_callbackq_purge_fast(cbq);
406     ucs_callbackq_purge_slow(cbq);
407 
408     /* Disable this proxy if no more work to do */
409     if (!priv->fast_remove_mask && (priv->num_slow_elems == 0)) {
410         ucs_callbackq_disable_proxy(cbq);
411     }
412 
413     ucs_callbackq_leave(cbq);
414 
415     return count;
416 }
417 
ucs_callbackq_init(ucs_callbackq_t * cbq)418 ucs_status_t ucs_callbackq_init(ucs_callbackq_t *cbq)
419 {
420     ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
421     unsigned idx;
422 
423     for (idx = 0; idx < UCS_CALLBACKQ_FAST_COUNT + 1; ++idx) {
424         ucs_callbackq_elem_reset(cbq, &cbq->fast_elems[idx]);
425     }
426 
427     ucs_recursive_spinlock_init(&priv->lock, 0);
428     priv->slow_elems        = NULL;
429     priv->num_slow_elems    = 0;
430     priv->max_slow_elems    = 0;
431     priv->slow_proxy_id     = UCS_CALLBACKQ_ID_NULL;
432     priv->fast_remove_mask  = 0;
433     priv->num_fast_elems    = 0;
434     priv->free_idx_id       = UCS_CALLBACKQ_ID_NULL;
435     priv->num_idxs          = 0;
436     priv->idxs              = NULL;
437     return UCS_OK;
438 }
439 
ucs_callbackq_cleanup(ucs_callbackq_t * cbq)440 void ucs_callbackq_cleanup(ucs_callbackq_t *cbq)
441 {
442     ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
443 
444     ucs_callbackq_disable_proxy(cbq);
445     ucs_callbackq_purge_fast(cbq);
446     ucs_callbackq_purge_slow(cbq);
447 
448     if ((priv->num_fast_elems) > 0 || (priv->num_slow_elems > 0)) {
449         ucs_warn("%d fast-path and %d slow-path callbacks remain in the queue",
450                  priv->num_fast_elems, priv->num_slow_elems);
451     }
452 
453     ucs_callbackq_array_free(priv->slow_elems, sizeof(*priv->slow_elems),
454                              priv->max_slow_elems);
455     ucs_callbackq_array_free(priv->idxs, sizeof(*priv->idxs), priv->num_idxs);
456 }
457 
ucs_callbackq_add(ucs_callbackq_t * cbq,ucs_callback_t cb,void * arg,unsigned flags)458 int ucs_callbackq_add(ucs_callbackq_t *cbq, ucs_callback_t cb, void *arg,
459                       unsigned flags)
460 {
461     ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
462     int id;
463 
464     ucs_callbackq_enter(cbq);
465 
466     ucs_trace_func("cbq=%p cb=%s arg=%p flags=%u", cbq,
467                    ucs_debug_get_symbol_name(cb), arg, flags);
468 
469     if ((flags & UCS_CALLBACKQ_FLAG_FAST) &&
470         (priv->num_fast_elems < UCS_CALLBACKQ_FAST_MAX))
471     {
472         id = ucs_callbackq_add_fast(cbq, cb, arg, flags);
473     } else {
474         id = ucs_callbackq_add_slow(cbq, cb, arg, flags);
475     }
476 
477     ucs_callbackq_leave(cbq);
478     return id;
479 }
480 
ucs_callbackq_remove(ucs_callbackq_t * cbq,int id)481 void ucs_callbackq_remove(ucs_callbackq_t *cbq, int id)
482 {
483     unsigned idx_with_flag, idx;
484 
485     ucs_callbackq_enter(cbq);
486 
487     ucs_trace_func("cbq=%p id=%d", cbq, id);
488 
489     ucs_callbackq_purge_fast(cbq);
490 
491     idx_with_flag = ucs_callbackq_put_id(cbq, id);
492     idx           = idx_with_flag & UCS_CALLBACKQ_IDX_MASK;
493 
494     if (idx_with_flag & UCS_CALLBACKQ_IDX_FLAG_SLOW) {
495         ucs_callbackq_remove_slow(cbq, idx);
496     } else {
497         ucs_callbackq_remove_fast(cbq, idx);
498     }
499 
500     ucs_callbackq_leave(cbq);
501 }
502 
ucs_callbackq_add_safe(ucs_callbackq_t * cbq,ucs_callback_t cb,void * arg,unsigned flags)503 int ucs_callbackq_add_safe(ucs_callbackq_t *cbq, ucs_callback_t cb, void *arg,
504                            unsigned flags)
505 {
506     int id;
507 
508     ucs_callbackq_enter(cbq);
509 
510     ucs_trace_func("cbq=%p cb=%s arg=%p flags=%u", cbq,
511                    ucs_debug_get_symbol_name(cb), arg, flags);
512 
513     /* Add callback to slow-path, and it may be upgraded to fast-path later by
514      * the proxy callback. It's not safe to add fast-path callback directly
515      * from this context.
516      */
517     id = ucs_callbackq_add_slow(cbq, cb, arg, flags);
518 
519     ucs_callbackq_leave(cbq);
520     return id;
521 }
522 
ucs_callbackq_remove_safe(ucs_callbackq_t * cbq,int id)523 void ucs_callbackq_remove_safe(ucs_callbackq_t *cbq, int id)
524 {
525     ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
526     unsigned idx_with_flag, idx;
527 
528     ucs_callbackq_enter(cbq);
529 
530     ucs_trace_func("cbq=%p id=%d", cbq, id);
531 
532     idx_with_flag = ucs_callbackq_put_id(cbq, id);
533     idx           = idx_with_flag & UCS_CALLBACKQ_IDX_MASK;
534 
535     if (idx_with_flag & UCS_CALLBACKQ_IDX_FLAG_SLOW) {
536         ucs_callbackq_remove_slow(cbq, idx);
537     } else {
538         UCS_STATIC_ASSERT(UCS_CALLBACKQ_FAST_MAX <= 64);
539         ucs_assert(idx < priv->num_fast_elems);
540         priv->fast_remove_mask |= UCS_BIT(idx);
541         cbq->fast_elems[idx].id = UCS_CALLBACKQ_ID_NULL; /* for assertion */
542         ucs_callbackq_enable_proxy(cbq);
543     }
544 
545     ucs_callbackq_leave(cbq);
546 }
547 
ucs_callbackq_remove_if(ucs_callbackq_t * cbq,ucs_callbackq_predicate_t pred,void * arg)548 void ucs_callbackq_remove_if(ucs_callbackq_t *cbq, ucs_callbackq_predicate_t pred,
549                              void *arg)
550 {
551     ucs_callbackq_priv_t *priv = ucs_callbackq_priv(cbq);
552     ucs_callbackq_elem_t *elem;
553     unsigned idx;
554 
555     ucs_callbackq_enter(cbq);
556 
557     ucs_trace_func("cbq=%p", cbq);
558 
559     ucs_callbackq_purge_fast(cbq);
560 
561     /* remote fast-path elements  */
562     elem = cbq->fast_elems;
563     while (elem->cb != NULL) {
564         if (pred(elem, arg)) {
565             idx = ucs_callbackq_put_id_noflag(cbq, elem->id);
566             ucs_assert(idx == (elem - cbq->fast_elems));
567             ucs_callbackq_remove_fast(cbq, idx);
568         } else {
569             ++elem;
570        }
571     }
572 
573     /* remote slow-path elements */
574     elem = priv->slow_elems;
575     while (elem < priv->slow_elems + priv->num_slow_elems) {
576         if (pred(elem, arg)) {
577             idx = ucs_callbackq_put_id_noflag(cbq, elem->id);
578             ucs_assert(idx == (elem - priv->slow_elems));
579             ucs_callbackq_remove_slow(cbq, idx);
580         } else {
581             ++elem;
582        }
583     }
584 
585     ucs_callbackq_leave(cbq);
586 }
587