1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6
7 #include <common/test.h>
8 #include <common/test_helpers.h>
9
10 extern "C" {
11 #include <ucs/arch/atomic.h>
12 #include <ucs/async/async.h>
13 #include <ucs/async/pipe.h>
14 #include <ucs/sys/sys.h>
15 }
16
17 #include <sys/poll.h>
18
19
20 class base {
21 public:
base(ucs_async_mode_t mode)22 base(ucs_async_mode_t mode) : m_mode(mode), m_count(0), m_handler_set(0) {
23 }
24
~base()25 virtual ~base() {
26 }
27
count() const28 int count() const {
29 return m_count;
30 }
31
set_handler()32 void set_handler() {
33 ASSERT_FALSE(m_handler_set);
34 m_handler_set = 1;
35 }
36
unset_handler(bool sync=true)37 void unset_handler(bool sync = true) {
38 if (ucs_atomic_cswap32(&m_handler_set, 1, 0)) {
39 ucs_status_t status = ucs_async_remove_handler(event_id(), sync);
40 ASSERT_UCS_OK(status);
41 }
42 }
43
44 private:
45 base(const base& other);
46
47 protected:
48 virtual void ack_event() = 0;
49 virtual int event_id() = 0;
50
cb(int id,int events,void * arg)51 static void cb(int id, int events, void *arg) {
52 base *self = reinterpret_cast<base*>(arg);
53 self->handler();
54 }
55
mode() const56 ucs_async_mode_t mode() const {
57 return m_mode;
58 }
59
handler()60 virtual void handler() {
61 ++m_count;
62 ack_event();
63 }
64
65 const ucs_async_mode_t m_mode;
66 int m_count;
67 uint32_t m_handler_set;
68 };
69
70 class base_event : public base {
71 public:
base_event(ucs_async_mode_t mode)72 base_event(ucs_async_mode_t mode) : base(mode) {
73 ucs_status_t status = ucs_async_pipe_create(&m_event_pipe);
74 ASSERT_UCS_OK(status);
75 }
76
~base_event()77 virtual ~base_event() {
78 ucs_async_pipe_destroy(&m_event_pipe);
79 }
80
set_handler(ucs_async_context_t * async)81 void set_handler(ucs_async_context_t *async) {
82 ucs_status_t status =
83 ucs_async_set_event_handler(mode(), event_fd(),
84 UCS_EVENT_SET_EVREAD,
85 cb, this, async);
86 ASSERT_UCS_OK(status);
87 base::set_handler();
88 }
89
event_id()90 virtual int event_id() {
91 return event_fd();
92 }
93
push_event()94 void push_event() {
95 ucs_async_pipe_push(&m_event_pipe);
96 }
97
reset()98 void reset() {
99 ucs_async_pipe_drain(&m_event_pipe);
100 }
101
102 protected:
ack_event()103 virtual void ack_event() {
104 reset();
105 }
106
107 private:
event_fd()108 int event_fd() {
109 return ucs_async_pipe_rfd(&m_event_pipe);
110 }
111
112 ucs_async_pipe_t m_event_pipe;
113 };
114
115 class base_timer : public base {
116 public:
base_timer(ucs_async_mode_t mode)117 base_timer(ucs_async_mode_t mode) :
118 base(mode), m_timer_id(-1)
119 {
120 }
121
122 /*
123 * Cannot call this from constructor - vptr not ready!
124 */
set_timer(ucs_async_context_t * async,ucs_time_t interval)125 void set_timer(ucs_async_context_t *async, ucs_time_t interval) {
126 ucs_assert(m_timer_id == -1);
127 ucs_status_t status = ucs_async_add_timer(mode(), interval, cb,
128 this, async, &m_timer_id);
129 ASSERT_UCS_OK(status);
130 base::set_handler();
131 }
132
event_id()133 virtual int event_id() {
134 return m_timer_id;
135 }
136
137 protected:
ack_event()138 virtual void ack_event() {
139 }
140
141 private:
142 int m_timer_id;
143 };
144
145
146 class async_poll {
147 public:
148 virtual void poll() = 0;
~async_poll()149 virtual ~async_poll() {
150 }
151 };
152
153 class global : public async_poll {
154 public:
poll()155 virtual void poll() {
156 ucs_async_poll(NULL);
157 }
158
~global()159 virtual ~global() {
160 }
161 };
162
163 class global_event : public global, public base_event {
164 public:
global_event(ucs_async_mode_t mode)165 global_event(ucs_async_mode_t mode) : base_event(mode) {
166 set_handler(NULL);
167 }
168
~global_event()169 ~global_event() {
170 unset_handler();
171 }
172 };
173
174 class global_timer : public global, public base_timer {
175 public:
global_timer(ucs_async_mode_t mode)176 global_timer(ucs_async_mode_t mode) : base_timer(mode) {
177 set_timer(NULL, ucs_time_from_usec(1000));
178 }
179
~global_timer()180 ~global_timer() {
181 unset_handler();
182 }
183 };
184
185 class local : public async_poll {
186 public:
local(ucs_async_mode_t mode)187 local(ucs_async_mode_t mode) {
188 ucs_status_t status = ucs_async_context_init(&m_async, mode);
189 ASSERT_UCS_OK(status);
190 }
191
~local()192 virtual ~local() {
193 ucs_async_context_cleanup(&m_async);
194 }
195
block()196 void block() {
197 UCS_ASYNC_BLOCK(&m_async);
198 }
199
unblock()200 void unblock() {
201 UCS_ASYNC_UNBLOCK(&m_async);
202 }
203
check_miss()204 void check_miss() {
205 ucs_async_check_miss(&m_async);
206 }
207
poll()208 virtual void poll() {
209 ucs_async_poll(&m_async);
210 }
211
212 protected:
213 ucs_async_context_t m_async;
214 };
215
216 class local_event : public local,
217 public base_event
218 {
219 public:
local_event(ucs_async_mode_t mode)220 local_event(ucs_async_mode_t mode) : local(mode), base_event(mode) {
221 set_handler(&m_async);
222 }
223
~local_event()224 ~local_event() {
225 unset_handler();
226 }
227 };
228
229 class local_timer : public local,
230 public base_timer
231 {
232 public:
233 static const int TIMER_INTERVAL_USEC = 1000;
234
local_timer(ucs_async_mode_t mode)235 local_timer(ucs_async_mode_t mode) : local(mode), base_timer(mode) {
236 set_timer(&m_async, ucs_time_from_usec(TIMER_INTERVAL_USEC));
237 }
238
~local_timer()239 ~local_timer() {
240 unset_handler();
241 }
242 };
243
244 class test_async : public testing::TestWithParam<ucs_async_mode_t>,
245 public ucs::test_base {
246 public:
247 UCS_TEST_BASE_IMPL;
248
249 protected:
250 static const int COUNT = 40;
251 static const unsigned SLEEP_USEC = 1000;
252 static const int NUM_RETRIES = 100;
253 static const int TIMER_EXP_COUNT = COUNT / 4;
254
suspend(double scale=1.0)255 void suspend(double scale = 1.0) {
256 ucs::safe_usleep(ucs_max(scale * SLEEP_USEC, 0) *
257 ucs::test_time_multiplier());
258 }
259
suspend_and_poll(async_poll * p,double scale=1.0)260 void suspend_and_poll(async_poll *p, double scale = 1.0) {
261 if (GetParam() == UCS_ASYNC_MODE_POLL) {
262 for (double t = 0; t < scale; t += 1.0) {
263 suspend();
264 p->poll();
265 }
266 } else {
267 suspend(scale);
268 }
269 }
270
suspend_and_poll2(async_poll * p1,async_poll * p2,double scale=1.0)271 void suspend_and_poll2(async_poll *p1, async_poll *p2, double scale = 1.0) {
272 if (GetParam() == UCS_ASYNC_MODE_POLL) {
273 for (double t = 0; t < scale; t += 1.0) {
274 suspend();
275 p1->poll();
276 p2->poll();
277 }
278 } else {
279 suspend(scale);
280 }
281 }
282
283 template<typename E>
expect_count_GE(E & event,int value)284 void expect_count_GE(E& event, int value) {
285 for (int retry = 0; retry < NUM_RETRIES; ++retry) {
286 suspend_and_poll(&event, COUNT);
287 if (event.count() >= value) {
288 return;
289 }
290 UCS_TEST_MESSAGE << "retry " << (retry + 1);
291 }
292 EXPECT_GE(event.count(), value) << "after " << int(NUM_RETRIES)
293 << " retries";
294 }
295
296 };
297
298 template<typename LOCAL>
299 class test_async_mt : public test_async {
300 protected:
301 static const unsigned NUM_THREADS = 32;
302
test_async_mt()303 test_async_mt() {
304 for (unsigned i = 0; i < NUM_THREADS; ++i) {
305 m_ev[i] = NULL;
306 }
307 }
308
init()309 virtual void init() {
310 pthread_barrier_init(&m_barrier, NULL, NUM_THREADS + 1);
311 }
312
thread_run(unsigned index)313 int thread_run(unsigned index) {
314 LOCAL* le;
315 m_ev[index] = le = new LOCAL(GetParam());
316
317 barrier();
318
319 while (!m_stop[index]) {
320 le->block();
321 unsigned before = le->count();
322 suspend_and_poll(le, 1.0);
323 unsigned after = le->count();
324 le->unblock();
325
326 EXPECT_EQ(before, after); /* Should not handle while blocked */
327 le->check_miss();
328 suspend_and_poll(le, 1.0);
329 }
330
331 int result = le->count();
332 delete le;
333 m_ev[index] = NULL;
334 return result;
335 }
336
spawn()337 void spawn() {
338 for (unsigned i = 0; i < NUM_THREADS; ++i) {
339 m_stop[i] = false;
340 pthread_create(&m_threads[i], NULL, thread_func, (void*)this);
341 }
342 barrier();
343 }
344
stop()345 void stop() {
346 for (unsigned i = 0; i < NUM_THREADS; ++i) {
347 m_stop[i] = true;
348 void *result;
349 pthread_join(m_threads[i], &result);
350 m_thread_counts[i] = (int)(uintptr_t)result;
351 }
352 }
353
event(unsigned thread)354 LOCAL* event(unsigned thread) {
355 return m_ev[thread];
356 }
357
thread_count(unsigned thread)358 int thread_count(unsigned thread) {
359 return m_thread_counts[thread];
360 }
361
362 private:
barrier()363 void barrier() {
364 pthread_barrier_wait(&m_barrier);
365 }
366
thread_func(void * arg)367 static void *thread_func(void *arg)
368 {
369 test_async_mt *self = reinterpret_cast<test_async_mt*>(arg);
370
371 for (unsigned index = 0; index < NUM_THREADS; ++index) {
372 if (self->m_threads[index] == pthread_self()) {
373 return (void*)(uintptr_t)self->thread_run(index);
374 }
375 }
376
377 /* Not found */
378 return (void*)-1;
379 }
380
381 pthread_t m_threads[NUM_THREADS];
382 pthread_barrier_t m_barrier;
383 int m_thread_counts[NUM_THREADS];
384 bool m_stop[NUM_THREADS];
385 LOCAL* m_ev[NUM_THREADS];
386 };
387
UCS_TEST_P(test_async,global_event)388 UCS_TEST_P(test_async, global_event) {
389 global_event ge(GetParam());
390 ge.push_event();
391 expect_count_GE(ge, 1);
392 }
393
UCS_TEST_P(test_async,global_timer)394 UCS_TEST_P(test_async, global_timer) {
395 global_timer gt(GetParam());
396 expect_count_GE(gt, COUNT);
397 }
398
399 UCS_TEST_P(test_async, max_events, "ASYNC_MAX_EVENTS=4") {
400 ucs_status_t status;
401 ucs_async_context_t async;
402
403 status = ucs_async_context_init(&async, GetParam());
404 ASSERT_UCS_OK(status);
405
406 /* 4 timers should be OK */
407 std::vector<int> timers;
408 for (unsigned count = 0; count < 4; ++count) {
409 int timer_id;
410 status = ucs_async_add_timer(GetParam(), ucs_time_from_sec(1.0),
411 (ucs_async_event_cb_t)ucs_empty_function,
412 NULL, &async, &timer_id);
413 ASSERT_UCS_OK(status);
414 timers.push_back(timer_id);
415 }
416
417 /* 5th timer should fail */
418 int timer_id;
419 status = ucs_async_add_timer(GetParam(), ucs_time_from_sec(1.0),
420 (ucs_async_event_cb_t)ucs_empty_function,
421 NULL, &async, &timer_id);
422 EXPECT_EQ(UCS_ERR_EXCEEDS_LIMIT, status);
423
424 if (status == UCS_OK) {
425 timers.push_back(timer_id);
426 }
427
428 /* Release timers */
429 for (std::vector<int>::iterator iter = timers.begin(); iter != timers.end(); ++iter) {
430 status = ucs_async_remove_handler(*iter, 1);
431 ASSERT_UCS_OK(status);
432 }
433
434 ucs_async_context_cleanup(&async);
435 }
436
UCS_TEST_P(test_async,many_timers)437 UCS_TEST_P(test_async, many_timers) {
438 int max_iters = 4010 / ucs::test_time_multiplier();
439 for (int count = 0; count < max_iters; ++count) {
440 std::vector<int> timers;
441 ucs_status_t status;
442 int timer_id;
443
444 for (int count2 = 0; count2 < 250; ++count2) {
445 status = ucs_async_add_timer(GetParam(), ucs_time_from_sec(1.0),
446 (ucs_async_event_cb_t)ucs_empty_function,
447 NULL, NULL, &timer_id);
448 ASSERT_UCS_OK(status);
449 timers.push_back(timer_id);
450 }
451
452 while (!timers.empty()) {
453 ucs_async_remove_handler(timers.back(), 0);
454 timers.pop_back();
455 }
456 }
457 }
458
UCS_TEST_P(test_async,ctx_event)459 UCS_TEST_P(test_async, ctx_event) {
460 local_event le(GetParam());
461 le.push_event();
462 expect_count_GE(le, 1);
463 }
464
UCS_TEST_P(test_async,ctx_timer)465 UCS_TEST_P(test_async, ctx_timer) {
466 local_timer lt(GetParam());
467 expect_count_GE(lt, TIMER_EXP_COUNT);
468 }
469
UCS_TEST_P(test_async,two_timers)470 UCS_TEST_P(test_async, two_timers) {
471 local_timer lt1(GetParam());
472 local_timer lt2(GetParam());
473 for (int retry = 0; retry < NUM_RETRIES; ++retry) {
474 suspend_and_poll2(<1, <2, COUNT * 4);
475 if ((lt1.count() >= TIMER_EXP_COUNT) &&
476 (lt2.count() >= TIMER_EXP_COUNT)) {
477 break;
478 }
479 UCS_TEST_MESSAGE << "retry " << (retry + 1);
480 }
481 EXPECT_GE(lt1.count(), int(TIMER_EXP_COUNT));
482 EXPECT_GE(lt2.count(), int(TIMER_EXP_COUNT));
483 }
484
UCS_TEST_P(test_async,ctx_event_block)485 UCS_TEST_P(test_async, ctx_event_block) {
486 local_event le(GetParam());
487 int count = 0;
488
489 for (int retry = 0; retry < NUM_RETRIES; ++retry) {
490 le.block();
491 count = le.count();
492 le.push_event();
493 suspend_and_poll(&le, COUNT);
494 EXPECT_EQ(count, le.count());
495 le.unblock();
496
497 le.check_miss();
498 if (le.count() > count) {
499 break;
500 }
501 UCS_TEST_MESSAGE << "retry " << (retry + 1);
502 }
503 EXPECT_GT(le.count(), count);
504 }
505
UCS_TEST_P(test_async,ctx_event_block_two_miss)506 UCS_TEST_P(test_async, ctx_event_block_two_miss) {
507 local_event le(GetParam());
508
509 /* Step 1: While async is blocked, generate two events */
510
511 le.block();
512 le.push_event();
513 suspend_and_poll(&le, COUNT);
514
515 le.push_event();
516 suspend_and_poll(&le, COUNT);
517 EXPECT_EQ(0, le.count());
518 le.unblock();
519
520 /* Step 2: When checking missed events, should get at least one event */
521
522 le.check_miss();
523 EXPECT_GT(le.count(), 0);
524 int prev_count = le.count();
525
526 /* Step 2: Block the async again and generate an event */
527
528 le.block();
529 le.push_event();
530 suspend_and_poll(&le, COUNT);
531 le.unblock();
532
533 /* Step 2: Check missed events - another event should be found */
534
535 le.check_miss();
536 EXPECT_GT(le.count(), prev_count);
537 }
538
UCS_TEST_P(test_async,ctx_timer_block)539 UCS_TEST_P(test_async, ctx_timer_block) {
540 local_timer lt(GetParam());
541 int count = 0;
542
543 for (int retry = 0; retry < NUM_RETRIES; ++retry) {
544 lt.block();
545 count = lt.count();
546 suspend_and_poll(<, COUNT);
547 EXPECT_EQ(count, lt.count());
548 lt.unblock();
549
550 lt.check_miss();
551 if (lt.count() > count) {
552 break;
553 }
554 UCS_TEST_MESSAGE << "retry " << (retry + 1);
555 }
556 EXPECT_GT(lt.count(), count); /* Timer could expire again after unblock */
557 }
558
UCS_TEST_P(test_async,modify_event)559 UCS_TEST_P(test_async, modify_event) {
560 local_event le(GetParam());
561 int count;
562
563 le.push_event();
564 expect_count_GE(le, 1);
565
566 ucs_async_modify_handler(le.event_id(), 0);
567 sleep(1);
568 count = le.count();
569 le.push_event();
570 suspend_and_poll(&le, COUNT);
571 EXPECT_EQ(le.count(), count);
572 le.reset();
573
574 ucs_async_modify_handler(le.event_id(), UCS_EVENT_SET_EVREAD);
575 count = le.count();
576 le.push_event();
577 expect_count_GE(le, count + 1);
578
579 ucs_async_modify_handler(le.event_id(), 0);
580 sleep(1);
581 count = le.count();
582 le.push_event();
583 suspend_and_poll(&le, COUNT);
584 EXPECT_EQ(le.count(), count);
585 }
586
UCS_TEST_P(test_async,warn_block)587 UCS_TEST_P(test_async, warn_block) {
588 {
589 scoped_log_handler slh(hide_warns_logger);
590 {
591 local_event le(GetParam());
592 le.block();
593 }
594 }
595
596 int warn_count = m_warnings.size();
597 for (int i = 0; i < warn_count; ++i) {
598 UCS_TEST_MESSAGE << "< " << m_warnings[i] << " >";
599 }
600
601 if (GetParam() != UCS_ASYNC_MODE_POLL) {
602 EXPECT_GE(warn_count, 1);
603 }
604 }
605
606 class local_timer_long_handler : public local_timer {
607 public:
local_timer_long_handler(ucs_async_mode_t mode,int sleep_usec)608 local_timer_long_handler(ucs_async_mode_t mode, int sleep_usec) :
609 local_timer(mode), m_sleep_usec(sleep_usec) {
610 }
611
handler()612 virtual void handler() {
613 /* The handler would sleep long enough to increment the counter after
614 * main thread already considers it removed - unless the main thread
615 * waits for handler completion properly.
616 * It sleeps only once to avoid timer overrun deadlock in signal mode.
617 */
618 ucs::safe_usleep(m_sleep_usec * 2);
619 m_sleep_usec = 0;
620 local_timer::handler();
621 }
622
623 int m_sleep_usec;
624 };
625
UCS_TEST_P(test_async,remove_sync)626 UCS_TEST_P(test_async, remove_sync) {
627
628 /* create another handler so that removing the timer would not have to
629 * completely cleanup the async context, and race condition could happen
630 */
631 local_timer le(GetParam());
632
633 for (int retry = 0; retry < NUM_RETRIES; ++retry) {
634 local_timer_long_handler lt(GetParam(), SLEEP_USEC * 2);
635 suspend_and_poll(<, 1);
636 lt.unset_handler(true);
637 int count = lt.count();
638 suspend_and_poll(<, 1);
639 ASSERT_EQ(count, lt.count());
640 }
641 }
642
643 class local_timer_remove_handler : public local_timer {
644 public:
local_timer_remove_handler(ucs_async_mode_t mode)645 local_timer_remove_handler(ucs_async_mode_t mode) : local_timer(mode) {
646 }
647
648 protected:
handler()649 virtual void handler() {
650 base::handler();
651 unset_handler(false);
652 }
653 };
654
UCS_TEST_P(test_async,timer_unset_from_handler)655 UCS_TEST_P(test_async, timer_unset_from_handler) {
656 local_timer_remove_handler lt(GetParam());
657
658 expect_count_GE(lt, 1);
659 suspend_and_poll(<, COUNT);
660 EXPECT_LE(lt.count(), 5); /* timer could fire multiple times before we remove it */
661 int count = lt.count();
662 suspend_and_poll(<, COUNT);
663 EXPECT_EQ(count, lt.count());
664 }
665
666 class local_event_remove_handler : public local_event {
667 public:
local_event_remove_handler(ucs_async_mode_t mode,bool sync)668 local_event_remove_handler(ucs_async_mode_t mode, bool sync) :
669 local_event(mode), m_sync(sync) {
670 }
671
672 protected:
handler()673 virtual void handler() {
674 base::handler();
675 unset_handler(m_sync);
676 }
677
678 private:
679 bool m_sync;
680 };
681
682 class test_async_event_unset_from_handler : public test_async {
683 protected:
test_unset_from_handler(bool sync)684 void test_unset_from_handler(bool sync) {
685 local_event_remove_handler le(GetParam(), sync);
686
687 for (int iter = 0; iter < 5; ++iter) {
688 le.push_event();
689 expect_count_GE(le, 1);
690 EXPECT_EQ(1, le.count());
691 }
692 }
693 };
694
UCS_TEST_P(test_async_event_unset_from_handler,sync)695 UCS_TEST_P(test_async_event_unset_from_handler, sync) {
696 test_unset_from_handler(true);
697 }
698
UCS_TEST_P(test_async_event_unset_from_handler,async)699 UCS_TEST_P(test_async_event_unset_from_handler, async) {
700 test_unset_from_handler(false);
701 }
702
703 class local_event_add_handler : public local_event {
704 public:
local_event_add_handler(ucs_async_mode_t mode)705 local_event_add_handler(ucs_async_mode_t mode) :
706 local_event(mode), m_event_set(false)
707 {
708 int ret = pipe(m_pipefd);
709 ucs_assertv_always(0 == ret, "%m");
710 }
711
~local_event_add_handler()712 ~local_event_add_handler() {
713 close(m_pipefd[0]);
714 close(m_pipefd[1]);
715 }
716
unset_handler(int sync)717 void unset_handler(int sync) {
718 local_event::unset_handler(sync);
719 if (m_event_set) {
720 ucs_status_t status = ucs_async_remove_handler(m_pipefd[0], sync);
721 ASSERT_UCS_OK(status);
722 m_event_set = false;
723 }
724 }
725
726 protected:
dummy_cb(int id,int events,void * arg)727 static void dummy_cb(int id, int events, void *arg) {
728 }
729
handler()730 virtual void handler() {
731 base::handler();
732 if (!m_event_set) {
733 ucs_status_t status =
734 ucs_async_set_event_handler(mode(), m_pipefd[0],
735 UCS_EVENT_SET_EVREAD,
736 dummy_cb, this, &m_async);
737 ASSERT_UCS_OK(status);
738 m_event_set = true;
739 }
740 }
741
742 int m_pipefd[2];
743 bool m_event_set;
744 };
745
UCS_TEST_P(test_async,event_add_from_handler)746 UCS_TEST_P(test_async, event_add_from_handler) {
747 local_event_add_handler le(GetParam());
748
749 le.push_event();
750 sched_yield(); /* let the async handler run, to provoke the race */
751 le.unset_handler(1);
752 }
753
754 typedef test_async_mt<local_event> test_async_event_mt;
755 typedef test_async_mt<local_timer> test_async_timer_mt;
756
757 /*
758 * Run multiple threads which all process events independently.
759 */
760 UCS_TEST_SKIP_COND_P(test_async_event_mt, multithread,
761 !(HAVE_DECL_F_SETOWN_EX)) {
762 const int exp_min_count = (int)(COUNT * 0.5);
763 int min_count = 0;
764 for (int retry = 0; retry < NUM_RETRIES; ++retry) {
765 spawn();
766 for (int j = 0; j < COUNT; ++j) {
767 for (unsigned i = 0; i < NUM_THREADS; ++i) {
768 event(i)->push_event();
769 suspend();
770 }
771 }
772 suspend();
773 stop();
774
775 min_count = std::numeric_limits<int>::max();
776 for (unsigned i = 0; i < NUM_THREADS; ++i) {
777 int count = thread_count(i);
778 min_count = ucs_min(count, min_count);
779 }
780 if (min_count >= exp_min_count) {
781 break;
782 }
783
784 UCS_TEST_MESSAGE << "retry " << (retry + 1);
785 }
786 EXPECT_GE(min_count, exp_min_count);
787 }
788
UCS_TEST_P(test_async_timer_mt,multithread)789 UCS_TEST_P(test_async_timer_mt, multithread) {
790 const int exp_min_count = (int)(COUNT * 0.10);
791 int min_count = 0;
792 for (int retry = 0; retry < NUM_RETRIES; ++retry) {
793 spawn();
794 suspend(2 * COUNT);
795 stop();
796
797 min_count = std::numeric_limits<int>::max();
798 for (unsigned i = 0; i < NUM_THREADS; ++i) {
799 int count = thread_count(i);
800 min_count = ucs_min(count, min_count);
801 }
802 if (min_count >= exp_min_count) {
803 break;
804 }
805 }
806 EXPECT_GE(min_count, exp_min_count);
807 }
808
operator <<(std::ostream & os,ucs_async_mode_t mode)809 std::ostream& operator<<(std::ostream& os, ucs_async_mode_t mode)
810 {
811 return os << ucs_async_mode_names[mode];
812 }
813
814 #define INSTANTIATE_ASYNC_TEST_CASES(_test_fixture) \
815 INSTANTIATE_TEST_CASE_P(signal, _test_fixture, ::testing::Values(UCS_ASYNC_MODE_SIGNAL)); \
816 INSTANTIATE_TEST_CASE_P(thread_spinlock, _test_fixture, ::testing::Values(UCS_ASYNC_MODE_THREAD_SPINLOCK)); \
817 INSTANTIATE_TEST_CASE_P(thread_mutex, _test_fixture, ::testing::Values(UCS_ASYNC_MODE_THREAD_MUTEX)); \
818 INSTANTIATE_TEST_CASE_P(poll, _test_fixture, ::testing::Values(UCS_ASYNC_MODE_POLL));
819
820 INSTANTIATE_ASYNC_TEST_CASES(test_async);
821 INSTANTIATE_ASYNC_TEST_CASES(test_async_event_unset_from_handler);
822 INSTANTIATE_ASYNC_TEST_CASES(test_async_event_mt);
823 INSTANTIATE_ASYNC_TEST_CASES(test_async_timer_mt);
824