1 /**
2  * Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
3  *
4  * See file LICENSE for terms.
5  */
6 
7 #include <common/test.h>
8 #include <common/test_helpers.h>
9 
10 extern "C" {
11 #include <ucs/arch/atomic.h>
12 #include <ucs/async/async.h>
13 #include <ucs/async/pipe.h>
14 #include <ucs/sys/sys.h>
15 }
16 
17 #include <sys/poll.h>
18 
19 
20 class base {
21 public:
base(ucs_async_mode_t mode)22     base(ucs_async_mode_t mode) : m_mode(mode), m_count(0), m_handler_set(0) {
23     }
24 
~base()25     virtual ~base() {
26     }
27 
count() const28     int count() const {
29         return m_count;
30     }
31 
set_handler()32     void set_handler() {
33         ASSERT_FALSE(m_handler_set);
34         m_handler_set = 1;
35     }
36 
unset_handler(bool sync=true)37     void unset_handler(bool sync = true) {
38         if (ucs_atomic_cswap32(&m_handler_set, 1, 0)) {
39             ucs_status_t status = ucs_async_remove_handler(event_id(), sync);
40             ASSERT_UCS_OK(status);
41         }
42     }
43 
44 private:
45     base(const base& other);
46 
47 protected:
48     virtual void ack_event() = 0;
49     virtual int event_id() = 0;
50 
cb(int id,int events,void * arg)51     static void cb(int id, int events, void *arg) {
52         base *self = reinterpret_cast<base*>(arg);
53         self->handler();
54     }
55 
mode() const56     ucs_async_mode_t mode() const {
57         return m_mode;
58     }
59 
handler()60     virtual void handler() {
61         ++m_count;
62         ack_event();
63     }
64 
65     const ucs_async_mode_t m_mode;
66     int                    m_count;
67     uint32_t               m_handler_set;
68 };
69 
70 class base_event : public base {
71 public:
base_event(ucs_async_mode_t mode)72     base_event(ucs_async_mode_t mode) : base(mode) {
73         ucs_status_t status = ucs_async_pipe_create(&m_event_pipe);
74         ASSERT_UCS_OK(status);
75     }
76 
~base_event()77     virtual ~base_event() {
78         ucs_async_pipe_destroy(&m_event_pipe);
79     }
80 
set_handler(ucs_async_context_t * async)81     void set_handler(ucs_async_context_t *async) {
82         ucs_status_t status =
83             ucs_async_set_event_handler(mode(), event_fd(),
84                                         UCS_EVENT_SET_EVREAD,
85                                         cb, this, async);
86         ASSERT_UCS_OK(status);
87         base::set_handler();
88     }
89 
event_id()90     virtual int event_id() {
91         return event_fd();
92     }
93 
push_event()94     void push_event() {
95         ucs_async_pipe_push(&m_event_pipe);
96     }
97 
reset()98     void reset() {
99         ucs_async_pipe_drain(&m_event_pipe);
100     }
101 
102 protected:
ack_event()103     virtual void ack_event() {
104         reset();
105     }
106 
107 private:
event_fd()108     int event_fd() {
109         return ucs_async_pipe_rfd(&m_event_pipe);
110     }
111 
112     ucs_async_pipe_t m_event_pipe;
113 };
114 
115 class base_timer : public base {
116 public:
base_timer(ucs_async_mode_t mode)117     base_timer(ucs_async_mode_t mode) :
118         base(mode), m_timer_id(-1)
119     {
120     }
121 
122     /*
123      * Cannot call this from constructor - vptr not ready!
124      */
set_timer(ucs_async_context_t * async,ucs_time_t interval)125     void set_timer(ucs_async_context_t *async, ucs_time_t interval) {
126         ucs_assert(m_timer_id == -1);
127         ucs_status_t status = ucs_async_add_timer(mode(), interval, cb,
128                                                   this, async, &m_timer_id);
129         ASSERT_UCS_OK(status);
130         base::set_handler();
131     }
132 
event_id()133     virtual int event_id() {
134         return m_timer_id;
135     }
136 
137 protected:
ack_event()138     virtual void ack_event() {
139     }
140 
141 private:
142     int          m_timer_id;
143 };
144 
145 
/** Interface of test helpers that can be polled for async events. */
class async_poll {
public:
    virtual ~async_poll() {
    }

    /** Poll for events; implemented by the concrete helpers. */
    virtual void poll() = 0;
};
152 
153 class global : public async_poll {
154 public:
poll()155     virtual void poll() {
156         ucs_async_poll(NULL);
157     }
158 
~global()159     virtual ~global() {
160     }
161 };
162 
163 class global_event : public global, public base_event {
164 public:
global_event(ucs_async_mode_t mode)165     global_event(ucs_async_mode_t mode) : base_event(mode) {
166         set_handler(NULL);
167     }
168 
~global_event()169     ~global_event() {
170         unset_handler();
171     }
172 };
173 
174 class global_timer : public global,  public base_timer {
175 public:
global_timer(ucs_async_mode_t mode)176     global_timer(ucs_async_mode_t mode) : base_timer(mode) {
177         set_timer(NULL, ucs_time_from_usec(1000));
178     }
179 
~global_timer()180     ~global_timer() {
181         unset_handler();
182     }
183 };
184 
185 class local : public async_poll {
186 public:
local(ucs_async_mode_t mode)187     local(ucs_async_mode_t mode) {
188         ucs_status_t status = ucs_async_context_init(&m_async, mode);
189         ASSERT_UCS_OK(status);
190     }
191 
~local()192     virtual ~local() {
193         ucs_async_context_cleanup(&m_async);
194     }
195 
block()196     void block() {
197         UCS_ASYNC_BLOCK(&m_async);
198     }
199 
unblock()200     void unblock() {
201         UCS_ASYNC_UNBLOCK(&m_async);
202     }
203 
check_miss()204     void check_miss() {
205         ucs_async_check_miss(&m_async);
206     }
207 
poll()208     virtual void poll() {
209         ucs_async_poll(&m_async);
210     }
211 
212 protected:
213     ucs_async_context_t m_async;
214 };
215 
216 class local_event : public local,
217                     public base_event
218 {
219 public:
local_event(ucs_async_mode_t mode)220     local_event(ucs_async_mode_t mode) : local(mode), base_event(mode) {
221         set_handler(&m_async);
222     }
223 
~local_event()224     ~local_event() {
225         unset_handler();
226     }
227 };
228 
229 class local_timer : public local,
230                     public base_timer
231 {
232 public:
233     static const int TIMER_INTERVAL_USEC = 1000;
234 
local_timer(ucs_async_mode_t mode)235     local_timer(ucs_async_mode_t mode) : local(mode), base_timer(mode) {
236         set_timer(&m_async, ucs_time_from_usec(TIMER_INTERVAL_USEC));
237     }
238 
~local_timer()239     ~local_timer() {
240         unset_handler();
241     }
242 };
243 
244 class test_async : public testing::TestWithParam<ucs_async_mode_t>,
245 public ucs::test_base {
246 public:
247     UCS_TEST_BASE_IMPL;
248 
249 protected:
250     static const int      COUNT           = 40;
251     static const unsigned SLEEP_USEC      = 1000;
252     static const int      NUM_RETRIES     = 100;
253     static const int      TIMER_EXP_COUNT = COUNT / 4;
254 
suspend(double scale=1.0)255     void suspend(double scale = 1.0) {
256         ucs::safe_usleep(ucs_max(scale * SLEEP_USEC, 0) *
257                          ucs::test_time_multiplier());
258     }
259 
suspend_and_poll(async_poll * p,double scale=1.0)260     void suspend_and_poll(async_poll *p, double scale = 1.0) {
261         if (GetParam() == UCS_ASYNC_MODE_POLL) {
262             for (double t = 0; t < scale; t += 1.0) {
263                 suspend();
264                 p->poll();
265             }
266         } else {
267             suspend(scale);
268         }
269     }
270 
suspend_and_poll2(async_poll * p1,async_poll * p2,double scale=1.0)271     void suspend_and_poll2(async_poll *p1, async_poll *p2, double scale = 1.0) {
272         if (GetParam() == UCS_ASYNC_MODE_POLL) {
273             for (double t = 0; t < scale; t += 1.0) {
274                 suspend();
275                 p1->poll();
276                 p2->poll();
277             }
278         } else {
279             suspend(scale);
280         }
281     }
282 
283     template<typename E>
expect_count_GE(E & event,int value)284     void expect_count_GE(E& event, int value) {
285         for (int retry = 0; retry < NUM_RETRIES; ++retry) {
286              suspend_and_poll(&event, COUNT);
287              if (event.count() >= value) {
288                   return;
289              }
290              UCS_TEST_MESSAGE << "retry " << (retry + 1);
291          }
292          EXPECT_GE(event.count(), value) << "after " << int(NUM_RETRIES)
293                                          << " retries";
294     }
295 
296 };
297 
298 template<typename LOCAL>
299 class test_async_mt : public test_async {
300 protected:
301     static const unsigned NUM_THREADS = 32;
302 
test_async_mt()303     test_async_mt() {
304         for (unsigned i = 0; i < NUM_THREADS; ++i) {
305             m_ev[i] = NULL;
306         }
307     }
308 
init()309     virtual void init() {
310         pthread_barrier_init(&m_barrier, NULL, NUM_THREADS + 1);
311     }
312 
thread_run(unsigned index)313     int thread_run(unsigned index) {
314         LOCAL* le;
315         m_ev[index] = le = new LOCAL(GetParam());
316 
317         barrier();
318 
319         while (!m_stop[index]) {
320             le->block();
321             unsigned before = le->count();
322             suspend_and_poll(le, 1.0);
323             unsigned after  = le->count();
324             le->unblock();
325 
326             EXPECT_EQ(before, after); /* Should not handle while blocked */
327             le->check_miss();
328             suspend_and_poll(le, 1.0);
329         }
330 
331         int result = le->count();
332         delete le;
333         m_ev[index] = NULL;
334         return result;
335     }
336 
spawn()337     void spawn() {
338         for (unsigned i = 0; i < NUM_THREADS; ++i) {
339             m_stop[i] = false;
340             pthread_create(&m_threads[i], NULL, thread_func, (void*)this);
341         }
342         barrier();
343     }
344 
stop()345     void stop() {
346         for (unsigned i = 0; i < NUM_THREADS; ++i) {
347             m_stop[i] = true;
348             void *result;
349             pthread_join(m_threads[i], &result);
350             m_thread_counts[i] = (int)(uintptr_t)result;
351         }
352     }
353 
event(unsigned thread)354     LOCAL* event(unsigned thread) {
355         return m_ev[thread];
356     }
357 
thread_count(unsigned thread)358     int thread_count(unsigned thread) {
359         return m_thread_counts[thread];
360     }
361 
362 private:
barrier()363     void barrier() {
364         pthread_barrier_wait(&m_barrier);
365     }
366 
thread_func(void * arg)367     static void *thread_func(void *arg)
368     {
369         test_async_mt *self = reinterpret_cast<test_async_mt*>(arg);
370 
371         for (unsigned index = 0; index < NUM_THREADS; ++index) {
372             if (self->m_threads[index] == pthread_self()) {
373                 return (void*)(uintptr_t)self->thread_run(index);
374             }
375         }
376 
377         /* Not found */
378         return (void*)-1;
379     }
380 
381     pthread_t                      m_threads[NUM_THREADS];
382     pthread_barrier_t              m_barrier;
383     int                            m_thread_counts[NUM_THREADS];
384     bool                           m_stop[NUM_THREADS];
385     LOCAL*                         m_ev[NUM_THREADS];
386 };
387 
/* An event pushed on a context-less handler must be handled at least once */
UCS_TEST_P(test_async, global_event) {
    global_event ev(GetParam());

    ev.push_event();
    expect_count_GE(ev, 1);
}
393 
/* A context-less timer must fire at least COUNT times */
UCS_TEST_P(test_async, global_timer) {
    global_timer timer(GetParam());

    expect_count_GE(timer, COUNT);
}
398 
399 UCS_TEST_P(test_async, max_events, "ASYNC_MAX_EVENTS=4") {
400     ucs_status_t status;
401     ucs_async_context_t async;
402 
403     status = ucs_async_context_init(&async, GetParam());
404     ASSERT_UCS_OK(status);
405 
406     /* 4 timers should be OK */
407     std::vector<int> timers;
408     for (unsigned count = 0; count < 4; ++count) {
409         int timer_id;
410         status = ucs_async_add_timer(GetParam(), ucs_time_from_sec(1.0),
411                                      (ucs_async_event_cb_t)ucs_empty_function,
412                                      NULL, &async, &timer_id);
413         ASSERT_UCS_OK(status);
414         timers.push_back(timer_id);
415     }
416 
417     /* 5th timer should fail */
418     int timer_id;
419     status = ucs_async_add_timer(GetParam(), ucs_time_from_sec(1.0),
420                                  (ucs_async_event_cb_t)ucs_empty_function,
421                                  NULL, &async, &timer_id);
422     EXPECT_EQ(UCS_ERR_EXCEEDS_LIMIT, status);
423 
424     if (status == UCS_OK) {
425         timers.push_back(timer_id);
426     }
427 
428     /* Release timers */
429     for (std::vector<int>::iterator iter = timers.begin(); iter != timers.end(); ++iter) {
430         status = ucs_async_remove_handler(*iter, 1);
431         ASSERT_UCS_OK(status);
432     }
433 
434     ucs_async_context_cleanup(&async);
435 }
436 
/*
 * Repeatedly add a batch of 250 context-less timers and remove them again
 * with sync=0. The number of rounds shrinks with the test time multiplier.
 */
UCS_TEST_P(test_async, many_timers) {
    int max_iters = 4010 / ucs::test_time_multiplier();
    for (int count = 0; count < max_iters; ++count) {
        std::vector<int> timers;
        ucs_status_t status;
        int timer_id;

        for (int count2 = 0; count2 < 250; ++count2) {
            status = ucs_async_add_timer(GetParam(), ucs_time_from_sec(1.0),
                                         (ucs_async_event_cb_t)ucs_empty_function,
                                         NULL, NULL, &timer_id);
            ASSERT_UCS_OK(status);
            timers.push_back(timer_id);
        }

        while (!timers.empty()) {
            /* Report a failed removal, but keep releasing the rest */
            status = ucs_async_remove_handler(timers.back(), 0);
            EXPECT_EQ(UCS_OK, status);
            timers.pop_back();
        }
    }
}
458 
/* An event pushed on a context-bound handler must be handled at least once */
UCS_TEST_P(test_async, ctx_event) {
    local_event ev(GetParam());

    ev.push_event();
    expect_count_GE(ev, 1);
}
464 
/* A context-bound timer must fire at least TIMER_EXP_COUNT times */
UCS_TEST_P(test_async, ctx_timer) {
    local_timer timer(GetParam());

    expect_count_GE(timer, TIMER_EXP_COUNT);
}
469 
/* Two timers, each on its own context, must both reach TIMER_EXP_COUNT */
UCS_TEST_P(test_async, two_timers) {
    local_timer first(GetParam());
    local_timer second(GetParam());

    for (int attempt = 1; attempt <= NUM_RETRIES; ++attempt) {
        suspend_and_poll2(&first, &second, COUNT * 4);

        const bool both_done = (first.count() >= TIMER_EXP_COUNT) &&
                               (second.count() >= TIMER_EXP_COUNT);
        if (both_done) {
            break;
        }
        UCS_TEST_MESSAGE << "retry " << attempt;
    }

    EXPECT_GE(first.count(), int(TIMER_EXP_COUNT));
    EXPECT_GE(second.count(), int(TIMER_EXP_COUNT));
}
484 
/*
 * An event pushed while the context is blocked must not be counted until the
 * context is unblocked and missed events are checked.
 */
UCS_TEST_P(test_async, ctx_event_block) {
    local_event ev(GetParam());
    int blocked_count = 0;

    for (int attempt = 1; attempt <= NUM_RETRIES; ++attempt) {
        ev.block();
        blocked_count = ev.count();
        ev.push_event();
        suspend_and_poll(&ev, COUNT);
        EXPECT_EQ(blocked_count, ev.count());
        ev.unblock();

        ev.check_miss();
        if (ev.count() > blocked_count) {
            break;
        }
        UCS_TEST_MESSAGE << "retry " << attempt;
    }

    EXPECT_GT(ev.count(), blocked_count);
}
505 
/* Events missed during two separate blocked periods must both be recovered */
UCS_TEST_P(test_async, ctx_event_block_two_miss) {
    local_event ev(GetParam());

    /* Step 1: While async is blocked, generate two events */

    ev.block();
    ev.push_event();
    suspend_and_poll(&ev, COUNT);

    ev.push_event();
    suspend_and_poll(&ev, COUNT);
    EXPECT_EQ(0, ev.count());
    ev.unblock();

    /* Step 2: When checking missed events, should get at least one event */

    ev.check_miss();
    EXPECT_GT(ev.count(), 0);
    int count_after_first_check = ev.count();

    /* Step 3: Block the async again and generate an event */

    ev.block();
    ev.push_event();
    suspend_and_poll(&ev, COUNT);
    ev.unblock();

    /* Step 4: Check missed events - another event should be found */

    ev.check_miss();
    EXPECT_GT(ev.count(), count_after_first_check);
}
538 
/*
 * A timer must not be counted while its context is blocked, and must be
 * counted once the context is unblocked and missed events are checked.
 */
UCS_TEST_P(test_async, ctx_timer_block) {
    local_timer timer(GetParam());
    int blocked_count = 0;

    for (int attempt = 1; attempt <= NUM_RETRIES; ++attempt) {
        timer.block();
        blocked_count = timer.count();
        suspend_and_poll(&timer, COUNT);
        EXPECT_EQ(blocked_count, timer.count());
        timer.unblock();

        timer.check_miss();
        if (timer.count() > blocked_count) {
            break;
        }
        UCS_TEST_MESSAGE << "retry " << attempt;
    }

    /* Timer could expire again after unblock */
    EXPECT_GT(timer.count(), blocked_count);
}
558 
/*
 * Toggle the event mask of a registered handler: with an empty mask pushed
 * events must not be counted, with UCS_EVENT_SET_EVREAD they must be.
 */
UCS_TEST_P(test_async, modify_event) {
    local_event le(GetParam());
    ucs_status_t status;
    int count;

    le.push_event();
    expect_count_GE(le, 1);

    /* Clear the event mask - the next event must go unnoticed */
    status = ucs_async_modify_handler(le.event_id(), 0);
    ASSERT_UCS_OK(status);
    sleep(1);
    count = le.count();
    le.push_event();
    suspend_and_poll(&le, COUNT);
    EXPECT_EQ(le.count(), count);
    le.reset();

    /* Restore read events - the handler must be called again */
    status = ucs_async_modify_handler(le.event_id(), UCS_EVENT_SET_EVREAD);
    ASSERT_UCS_OK(status);
    count = le.count();
    le.push_event();
    expect_count_GE(le, count + 1);

    /* Clear the event mask once more */
    status = ucs_async_modify_handler(le.event_id(), 0);
    ASSERT_UCS_OK(status);
    sleep(1);
    count = le.count();
    le.push_event();
    suspend_and_poll(&le, COUNT);
    EXPECT_EQ(le.count(), count);
}
586 
/*
 * Destroying a helper whose context is still blocked is expected to produce
 * at least one warning, except in poll mode.
 */
UCS_TEST_P(test_async, warn_block) {
    {
        /* Collect warnings only while the helper is created and destroyed */
        scoped_log_handler slh(hide_warns_logger);
        {
            local_event ev(GetParam());
            ev.block();
        }
    }

    const int warn_count = m_warnings.size();
    for (int idx = 0; idx < warn_count; ++idx) {
        UCS_TEST_MESSAGE << "< " << m_warnings[idx] << " >";
    }

    if (GetParam() == UCS_ASYNC_MODE_POLL) {
        return;
    }
    EXPECT_GE(warn_count, 1);
}
605 
606 class local_timer_long_handler : public local_timer {
607 public:
local_timer_long_handler(ucs_async_mode_t mode,int sleep_usec)608     local_timer_long_handler(ucs_async_mode_t mode, int sleep_usec) :
609         local_timer(mode), m_sleep_usec(sleep_usec) {
610     }
611 
handler()612     virtual void handler() {
613         /* The handler would sleep long enough to increment the counter after
614          * main thread already considers it removed - unless the main thread
615          * waits for handler completion properly.
616          * It sleeps only once to avoid timer overrun deadlock in signal mode.
617          */
618         ucs::safe_usleep(m_sleep_usec * 2);
619         m_sleep_usec = 0;
620         local_timer::handler();
621     }
622 
623     int m_sleep_usec;
624 };
625 
/* After a synchronous removal the handler must not count any more events */
UCS_TEST_P(test_async, remove_sync) {

    /* create another handler so that removing the timer would not have to
     * completely cleanup the async context, and race condition could happen
     */
    local_timer other_timer(GetParam());

    for (int attempt = 0; attempt < NUM_RETRIES; ++attempt) {
        local_timer_long_handler timer(GetParam(), SLEEP_USEC * 2);
        suspend_and_poll(&timer, 1);
        timer.unset_handler(true);

        const int count_at_removal = timer.count();
        suspend_and_poll(&timer, 1);
        ASSERT_EQ(count_at_removal, timer.count());
    }
}
642 
643 class local_timer_remove_handler : public local_timer {
644 public:
local_timer_remove_handler(ucs_async_mode_t mode)645     local_timer_remove_handler(ucs_async_mode_t mode) : local_timer(mode) {
646     }
647 
648 protected:
handler()649     virtual void handler() {
650          base::handler();
651          unset_handler(false);
652     }
653 };
654 
/* A timer removed from within its own handler must stop firing */
UCS_TEST_P(test_async, timer_unset_from_handler) {
    local_timer_remove_handler timer(GetParam());

    expect_count_GE(timer, 1);
    suspend_and_poll(&timer, COUNT);

    /* timer could fire multiple times before we remove it */
    EXPECT_LE(timer.count(), 5);

    /* From here on the count must stay where it is */
    const int settled_count = timer.count();
    suspend_and_poll(&timer, COUNT);
    EXPECT_EQ(settled_count, timer.count());
}
665 
666 class local_event_remove_handler : public local_event {
667 public:
local_event_remove_handler(ucs_async_mode_t mode,bool sync)668     local_event_remove_handler(ucs_async_mode_t mode, bool sync) :
669         local_event(mode), m_sync(sync) {
670     }
671 
672 protected:
handler()673     virtual void handler() {
674          base::handler();
675          unset_handler(m_sync);
676     }
677 
678 private:
679     bool m_sync;
680 };
681 
682 class test_async_event_unset_from_handler : public test_async {
683 protected:
test_unset_from_handler(bool sync)684     void test_unset_from_handler(bool sync) {
685         local_event_remove_handler le(GetParam(), sync);
686 
687         for (int iter = 0; iter < 5; ++iter) {
688             le.push_event();
689             expect_count_GE(le, 1);
690             EXPECT_EQ(1, le.count());
691         }
692     }
693 };
694 
/* Self-removal with the sync flag set */
UCS_TEST_P(test_async_event_unset_from_handler, sync) {
    const bool sync = true;
    test_unset_from_handler(sync);
}
698 
/* Self-removal with the sync flag cleared */
UCS_TEST_P(test_async_event_unset_from_handler, async) {
    const bool sync = false;
    test_unset_from_handler(sync);
}
702 
703 class local_event_add_handler : public local_event {
704 public:
local_event_add_handler(ucs_async_mode_t mode)705     local_event_add_handler(ucs_async_mode_t mode) :
706         local_event(mode), m_event_set(false)
707     {
708         int ret = pipe(m_pipefd);
709         ucs_assertv_always(0 == ret, "%m");
710     }
711 
~local_event_add_handler()712     ~local_event_add_handler() {
713         close(m_pipefd[0]);
714         close(m_pipefd[1]);
715     }
716 
unset_handler(int sync)717     void unset_handler(int sync) {
718         local_event::unset_handler(sync);
719         if (m_event_set) {
720             ucs_status_t status = ucs_async_remove_handler(m_pipefd[0], sync);
721             ASSERT_UCS_OK(status);
722             m_event_set = false;
723         }
724     }
725 
726 protected:
dummy_cb(int id,int events,void * arg)727     static void dummy_cb(int id, int events, void *arg) {
728     }
729 
handler()730     virtual void handler() {
731          base::handler();
732          if (!m_event_set) {
733              ucs_status_t status =
734                  ucs_async_set_event_handler(mode(), m_pipefd[0],
735                                              UCS_EVENT_SET_EVREAD,
736                                              dummy_cb, this, &m_async);
737              ASSERT_UCS_OK(status);
738              m_event_set = true;
739          }
740     }
741 
742     int m_pipefd[2];
743     bool m_event_set;
744 };
745 
/* Remove the handlers while the handler may be busy adding another one */
UCS_TEST_P(test_async, event_add_from_handler) {
    local_event_add_handler ev(GetParam());

    ev.push_event();

    /* let the async handler run, to provoke the race */
    sched_yield();

    ev.unset_handler(1);
}
753 
/* Multi-threaded fixtures: one per-thread event helper / timer helper */
typedef test_async_mt<local_event> test_async_event_mt;
typedef test_async_mt<local_timer> test_async_timer_mt;
756 
757 /*
758  * Run multiple threads which all process events independently.
759  */
760 UCS_TEST_SKIP_COND_P(test_async_event_mt, multithread,
761                      !(HAVE_DECL_F_SETOWN_EX)) {
762     const int exp_min_count = (int)(COUNT * 0.5);
763     int min_count = 0;
764     for (int retry = 0; retry < NUM_RETRIES; ++retry) {
765         spawn();
766         for (int j = 0; j < COUNT; ++j) {
767             for (unsigned i = 0; i < NUM_THREADS; ++i) {
768                 event(i)->push_event();
769                 suspend();
770             }
771         }
772         suspend();
773         stop();
774 
775         min_count = std::numeric_limits<int>::max();
776         for (unsigned i = 0; i < NUM_THREADS; ++i) {
777             int count = thread_count(i);
778             min_count = ucs_min(count, min_count);
779         }
780         if (min_count >= exp_min_count) {
781             break;
782         }
783 
784         UCS_TEST_MESSAGE << "retry " << (retry + 1);
785     }
786     EXPECT_GE(min_count, exp_min_count);
787 }
788 
/*
 * Run multiple threads which all process timers independently.
 * Every thread is expected to count at least 10% of COUNT timer events; the
 * whole round is retried up to NUM_RETRIES times otherwise.
 */
UCS_TEST_P(test_async_timer_mt, multithread) {
    const int exp_min_count = (int)(COUNT * 0.10);
    int min_count = 0;
    for (int retry = 0; retry < NUM_RETRIES; ++retry) {
        spawn();
        suspend(2 * COUNT);
        stop();

        /* Find the thread which handled the fewest timer events */
        min_count = std::numeric_limits<int>::max();
        for (unsigned i = 0; i < NUM_THREADS; ++i) {
            int count = thread_count(i);
            min_count = ucs_min(count, min_count);
        }
        if (min_count >= exp_min_count) {
            break;
        }

        /* Report retries like the other retry loops in this file */
        UCS_TEST_MESSAGE << "retry " << (retry + 1);
    }
    EXPECT_GE(min_count, exp_min_count);
}
808 
operator <<(std::ostream & os,ucs_async_mode_t mode)809 std::ostream& operator<<(std::ostream& os, ucs_async_mode_t mode)
810 {
811     return os << ucs_async_mode_names[mode];
812 }
813 
/*
 * Instantiate a parameterized fixture once per async mode: signal,
 * thread with spinlock, thread with mutex, and poll.
 */
#define INSTANTIATE_ASYNC_TEST_CASES(_test_fixture) \
    INSTANTIATE_TEST_CASE_P(signal,          _test_fixture, ::testing::Values(UCS_ASYNC_MODE_SIGNAL)); \
    INSTANTIATE_TEST_CASE_P(thread_spinlock, _test_fixture, ::testing::Values(UCS_ASYNC_MODE_THREAD_SPINLOCK)); \
    INSTANTIATE_TEST_CASE_P(thread_mutex,    _test_fixture, ::testing::Values(UCS_ASYNC_MODE_THREAD_MUTEX)); \
    INSTANTIATE_TEST_CASE_P(poll,            _test_fixture, ::testing::Values(UCS_ASYNC_MODE_POLL));
819 
/* Run every fixture of this file in all async modes */
INSTANTIATE_ASYNC_TEST_CASES(test_async);
INSTANTIATE_ASYNC_TEST_CASES(test_async_event_unset_from_handler);
INSTANTIATE_ASYNC_TEST_CASES(test_async_event_mt);
INSTANTIATE_ASYNC_TEST_CASES(test_async_timer_mt);
824