1 #![feature(test)]
2 
3 extern crate crossbeam_channel;
4 extern crate crossbeam_utils;
5 extern crate num_cpus;
6 extern crate test;
7 
8 use crossbeam_channel::{bounded, unbounded};
9 use crossbeam_utils::thread::scope;
10 use test::Bencher;
11 
12 const TOTAL_STEPS: usize = 40_000;
13 
14 mod unbounded {
15     use super::*;
16 
17     #[bench]
create(b: &mut Bencher)18     fn create(b: &mut Bencher) {
19         b.iter(|| unbounded::<i32>());
20     }
21 
22     #[bench]
oneshot(b: &mut Bencher)23     fn oneshot(b: &mut Bencher) {
24         b.iter(|| {
25             let (s, r) = unbounded::<i32>();
26             s.send(0).unwrap();
27             r.recv().unwrap();
28         });
29     }
30 
31     #[bench]
inout(b: &mut Bencher)32     fn inout(b: &mut Bencher) {
33         let (s, r) = unbounded::<i32>();
34         b.iter(|| {
35             s.send(0).unwrap();
36             r.recv().unwrap();
37         });
38     }
39 
40     #[bench]
par_inout(b: &mut Bencher)41     fn par_inout(b: &mut Bencher) {
42         let threads = num_cpus::get();
43         let steps = TOTAL_STEPS / threads;
44         let (s, r) = unbounded::<i32>();
45 
46         let (s1, r1) = bounded(0);
47         let (s2, r2) = bounded(0);
48         scope(|scope| {
49             for _ in 0..threads {
50                 scope.spawn(|_| {
51                     while r1.recv().is_ok() {
52                         for i in 0..steps {
53                             s.send(i as i32).unwrap();
54                             r.recv().unwrap();
55                         }
56                         s2.send(()).unwrap();
57                     }
58                 });
59             }
60 
61             b.iter(|| {
62                 for _ in 0..threads {
63                     s1.send(()).unwrap();
64                 }
65                 for _ in 0..threads {
66                     r2.recv().unwrap();
67                 }
68             });
69             drop(s1);
70         })
71         .unwrap();
72     }
73 
74     #[bench]
spsc(b: &mut Bencher)75     fn spsc(b: &mut Bencher) {
76         let steps = TOTAL_STEPS;
77         let (s, r) = unbounded::<i32>();
78 
79         let (s1, r1) = bounded(0);
80         let (s2, r2) = bounded(0);
81         scope(|scope| {
82             scope.spawn(|_| {
83                 while r1.recv().is_ok() {
84                     for i in 0..steps {
85                         s.send(i as i32).unwrap();
86                     }
87                     s2.send(()).unwrap();
88                 }
89             });
90 
91             b.iter(|| {
92                 s1.send(()).unwrap();
93                 for _ in 0..steps {
94                     r.recv().unwrap();
95                 }
96                 r2.recv().unwrap();
97             });
98             drop(s1);
99         })
100         .unwrap();
101     }
102 
103     #[bench]
spmc(b: &mut Bencher)104     fn spmc(b: &mut Bencher) {
105         let threads = num_cpus::get() - 1;
106         let steps = TOTAL_STEPS / threads;
107         let (s, r) = unbounded::<i32>();
108 
109         let (s1, r1) = bounded(0);
110         let (s2, r2) = bounded(0);
111         scope(|scope| {
112             for _ in 0..threads {
113                 scope.spawn(|_| {
114                     while r1.recv().is_ok() {
115                         for _ in 0..steps {
116                             r.recv().unwrap();
117                         }
118                         s2.send(()).unwrap();
119                     }
120                 });
121             }
122 
123             b.iter(|| {
124                 for _ in 0..threads {
125                     s1.send(()).unwrap();
126                 }
127                 for i in 0..steps * threads {
128                     s.send(i as i32).unwrap();
129                 }
130                 for _ in 0..threads {
131                     r2.recv().unwrap();
132                 }
133             });
134             drop(s1);
135         })
136         .unwrap();
137     }
138 
139     #[bench]
mpsc(b: &mut Bencher)140     fn mpsc(b: &mut Bencher) {
141         let threads = num_cpus::get() - 1;
142         let steps = TOTAL_STEPS / threads;
143         let (s, r) = unbounded::<i32>();
144 
145         let (s1, r1) = bounded(0);
146         let (s2, r2) = bounded(0);
147         scope(|scope| {
148             for _ in 0..threads {
149                 scope.spawn(|_| {
150                     while r1.recv().is_ok() {
151                         for i in 0..steps {
152                             s.send(i as i32).unwrap();
153                         }
154                         s2.send(()).unwrap();
155                     }
156                 });
157             }
158 
159             b.iter(|| {
160                 for _ in 0..threads {
161                     s1.send(()).unwrap();
162                 }
163                 for _ in 0..steps * threads {
164                     r.recv().unwrap();
165                 }
166                 for _ in 0..threads {
167                     r2.recv().unwrap();
168                 }
169             });
170             drop(s1);
171         })
172         .unwrap();
173     }
174 
175     #[bench]
mpmc(b: &mut Bencher)176     fn mpmc(b: &mut Bencher) {
177         let threads = num_cpus::get();
178         let steps = TOTAL_STEPS / threads;
179         let (s, r) = unbounded::<i32>();
180 
181         let (s1, r1) = bounded(0);
182         let (s2, r2) = bounded(0);
183         scope(|scope| {
184             for _ in 0..threads / 2 {
185                 scope.spawn(|_| {
186                     while r1.recv().is_ok() {
187                         for i in 0..steps {
188                             s.send(i as i32).unwrap();
189                         }
190                         s2.send(()).unwrap();
191                     }
192                 });
193             }
194             for _ in 0..threads / 2 {
195                 scope.spawn(|_| {
196                     while r1.recv().is_ok() {
197                         for _ in 0..steps {
198                             r.recv().unwrap();
199                         }
200                         s2.send(()).unwrap();
201                     }
202                 });
203             }
204 
205             b.iter(|| {
206                 for _ in 0..threads {
207                     s1.send(()).unwrap();
208                 }
209                 for _ in 0..threads {
210                     r2.recv().unwrap();
211                 }
212             });
213             drop(s1);
214         })
215         .unwrap();
216     }
217 }
218 
219 mod bounded_n {
220     use super::*;
221 
222     #[bench]
spsc(b: &mut Bencher)223     fn spsc(b: &mut Bencher) {
224         let steps = TOTAL_STEPS;
225         let (s, r) = bounded::<i32>(steps);
226 
227         let (s1, r1) = bounded(0);
228         let (s2, r2) = bounded(0);
229         scope(|scope| {
230             scope.spawn(|_| {
231                 while r1.recv().is_ok() {
232                     for i in 0..steps {
233                         s.send(i as i32).unwrap();
234                     }
235                     s2.send(()).unwrap();
236                 }
237             });
238 
239             b.iter(|| {
240                 s1.send(()).unwrap();
241                 for _ in 0..steps {
242                     r.recv().unwrap();
243                 }
244                 r2.recv().unwrap();
245             });
246             drop(s1);
247         })
248         .unwrap();
249     }
250 
251     #[bench]
spmc(b: &mut Bencher)252     fn spmc(b: &mut Bencher) {
253         let threads = num_cpus::get() - 1;
254         let steps = TOTAL_STEPS / threads;
255         let (s, r) = bounded::<i32>(steps * threads);
256 
257         let (s1, r1) = bounded(0);
258         let (s2, r2) = bounded(0);
259         scope(|scope| {
260             for _ in 0..threads {
261                 scope.spawn(|_| {
262                     while r1.recv().is_ok() {
263                         for _ in 0..steps {
264                             r.recv().unwrap();
265                         }
266                         s2.send(()).unwrap();
267                     }
268                 });
269             }
270 
271             b.iter(|| {
272                 for _ in 0..threads {
273                     s1.send(()).unwrap();
274                 }
275                 for i in 0..steps * threads {
276                     s.send(i as i32).unwrap();
277                 }
278                 for _ in 0..threads {
279                     r2.recv().unwrap();
280                 }
281             });
282             drop(s1);
283         })
284         .unwrap();
285     }
286 
287     #[bench]
mpsc(b: &mut Bencher)288     fn mpsc(b: &mut Bencher) {
289         let threads = num_cpus::get() - 1;
290         let steps = TOTAL_STEPS / threads;
291         let (s, r) = bounded::<i32>(steps * threads);
292 
293         let (s1, r1) = bounded(0);
294         let (s2, r2) = bounded(0);
295         scope(|scope| {
296             for _ in 0..threads {
297                 scope.spawn(|_| {
298                     while r1.recv().is_ok() {
299                         for i in 0..steps {
300                             s.send(i as i32).unwrap();
301                         }
302                         s2.send(()).unwrap();
303                     }
304                 });
305             }
306 
307             b.iter(|| {
308                 for _ in 0..threads {
309                     s1.send(()).unwrap();
310                 }
311                 for _ in 0..steps * threads {
312                     r.recv().unwrap();
313                 }
314                 for _ in 0..threads {
315                     r2.recv().unwrap();
316                 }
317             });
318             drop(s1);
319         })
320         .unwrap();
321     }
322 
323     #[bench]
par_inout(b: &mut Bencher)324     fn par_inout(b: &mut Bencher) {
325         let threads = num_cpus::get();
326         let steps = TOTAL_STEPS / threads;
327         let (s, r) = bounded::<i32>(threads);
328 
329         let (s1, r1) = bounded(0);
330         let (s2, r2) = bounded(0);
331         scope(|scope| {
332             for _ in 0..threads {
333                 scope.spawn(|_| {
334                     while r1.recv().is_ok() {
335                         for i in 0..steps {
336                             s.send(i as i32).unwrap();
337                             r.recv().unwrap();
338                         }
339                         s2.send(()).unwrap();
340                     }
341                 });
342             }
343 
344             b.iter(|| {
345                 for _ in 0..threads {
346                     s1.send(()).unwrap();
347                 }
348                 for _ in 0..threads {
349                     r2.recv().unwrap();
350                 }
351             });
352             drop(s1);
353         })
354         .unwrap();
355     }
356 
357     #[bench]
mpmc(b: &mut Bencher)358     fn mpmc(b: &mut Bencher) {
359         let threads = num_cpus::get();
360         assert_eq!(threads % 2, 0);
361         let steps = TOTAL_STEPS / threads;
362         let (s, r) = bounded::<i32>(steps * threads);
363 
364         let (s1, r1) = bounded(0);
365         let (s2, r2) = bounded(0);
366         scope(|scope| {
367             for _ in 0..threads / 2 {
368                 scope.spawn(|_| {
369                     while r1.recv().is_ok() {
370                         for i in 0..steps {
371                             s.send(i as i32).unwrap();
372                         }
373                         s2.send(()).unwrap();
374                     }
375                 });
376             }
377             for _ in 0..threads / 2 {
378                 scope.spawn(|_| {
379                     while r1.recv().is_ok() {
380                         for _ in 0..steps {
381                             r.recv().unwrap();
382                         }
383                         s2.send(()).unwrap();
384                     }
385                 });
386             }
387 
388             b.iter(|| {
389                 for _ in 0..threads {
390                     s1.send(()).unwrap();
391                 }
392                 for _ in 0..threads {
393                     r2.recv().unwrap();
394                 }
395             });
396             drop(s1);
397         })
398         .unwrap();
399     }
400 }
401 
402 mod bounded_1 {
403     use super::*;
404 
405     #[bench]
create(b: &mut Bencher)406     fn create(b: &mut Bencher) {
407         b.iter(|| bounded::<i32>(1));
408     }
409 
410     #[bench]
oneshot(b: &mut Bencher)411     fn oneshot(b: &mut Bencher) {
412         b.iter(|| {
413             let (s, r) = bounded::<i32>(1);
414             s.send(0).unwrap();
415             r.recv().unwrap();
416         });
417     }
418 
419     #[bench]
spsc(b: &mut Bencher)420     fn spsc(b: &mut Bencher) {
421         let steps = TOTAL_STEPS;
422         let (s, r) = bounded::<i32>(1);
423 
424         let (s1, r1) = bounded(0);
425         let (s2, r2) = bounded(0);
426         scope(|scope| {
427             scope.spawn(|_| {
428                 while r1.recv().is_ok() {
429                     for i in 0..steps {
430                         s.send(i as i32).unwrap();
431                     }
432                     s2.send(()).unwrap();
433                 }
434             });
435 
436             b.iter(|| {
437                 s1.send(()).unwrap();
438                 for _ in 0..steps {
439                     r.recv().unwrap();
440                 }
441                 r2.recv().unwrap();
442             });
443             drop(s1);
444         })
445         .unwrap();
446     }
447 
448     #[bench]
spmc(b: &mut Bencher)449     fn spmc(b: &mut Bencher) {
450         let threads = num_cpus::get() - 1;
451         let steps = TOTAL_STEPS / threads;
452         let (s, r) = bounded::<i32>(1);
453 
454         let (s1, r1) = bounded(0);
455         let (s2, r2) = bounded(0);
456         scope(|scope| {
457             for _ in 0..threads {
458                 scope.spawn(|_| {
459                     while r1.recv().is_ok() {
460                         for _ in 0..steps {
461                             r.recv().unwrap();
462                         }
463                         s2.send(()).unwrap();
464                     }
465                 });
466             }
467 
468             b.iter(|| {
469                 for _ in 0..threads {
470                     s1.send(()).unwrap();
471                 }
472                 for i in 0..steps * threads {
473                     s.send(i as i32).unwrap();
474                 }
475                 for _ in 0..threads {
476                     r2.recv().unwrap();
477                 }
478             });
479             drop(s1);
480         })
481         .unwrap();
482     }
483 
484     #[bench]
mpsc(b: &mut Bencher)485     fn mpsc(b: &mut Bencher) {
486         let threads = num_cpus::get() - 1;
487         let steps = TOTAL_STEPS / threads;
488         let (s, r) = bounded::<i32>(1);
489 
490         let (s1, r1) = bounded(0);
491         let (s2, r2) = bounded(0);
492         scope(|scope| {
493             for _ in 0..threads {
494                 scope.spawn(|_| {
495                     while r1.recv().is_ok() {
496                         for i in 0..steps {
497                             s.send(i as i32).unwrap();
498                         }
499                         s2.send(()).unwrap();
500                     }
501                 });
502             }
503 
504             b.iter(|| {
505                 for _ in 0..threads {
506                     s1.send(()).unwrap();
507                 }
508                 for _ in 0..steps * threads {
509                     r.recv().unwrap();
510                 }
511                 for _ in 0..threads {
512                     r2.recv().unwrap();
513                 }
514             });
515             drop(s1);
516         })
517         .unwrap();
518     }
519 
520     #[bench]
mpmc(b: &mut Bencher)521     fn mpmc(b: &mut Bencher) {
522         let threads = num_cpus::get();
523         let steps = TOTAL_STEPS / threads;
524         let (s, r) = bounded::<i32>(1);
525 
526         let (s1, r1) = bounded(0);
527         let (s2, r2) = bounded(0);
528         scope(|scope| {
529             for _ in 0..threads / 2 {
530                 scope.spawn(|_| {
531                     while r1.recv().is_ok() {
532                         for i in 0..steps {
533                             s.send(i as i32).unwrap();
534                         }
535                         s2.send(()).unwrap();
536                     }
537                 });
538             }
539             for _ in 0..threads / 2 {
540                 scope.spawn(|_| {
541                     while r1.recv().is_ok() {
542                         for _ in 0..steps {
543                             r.recv().unwrap();
544                         }
545                         s2.send(()).unwrap();
546                     }
547                 });
548             }
549 
550             b.iter(|| {
551                 for _ in 0..threads {
552                     s1.send(()).unwrap();
553                 }
554                 for _ in 0..threads {
555                     r2.recv().unwrap();
556                 }
557             });
558             drop(s1);
559         })
560         .unwrap();
561     }
562 }
563 
564 mod bounded_0 {
565     use super::*;
566 
567     #[bench]
create(b: &mut Bencher)568     fn create(b: &mut Bencher) {
569         b.iter(|| bounded::<i32>(0));
570     }
571 
572     #[bench]
spsc(b: &mut Bencher)573     fn spsc(b: &mut Bencher) {
574         let steps = TOTAL_STEPS;
575         let (s, r) = bounded::<i32>(0);
576 
577         let (s1, r1) = bounded(0);
578         let (s2, r2) = bounded(0);
579         scope(|scope| {
580             scope.spawn(|_| {
581                 while r1.recv().is_ok() {
582                     for i in 0..steps {
583                         s.send(i as i32).unwrap();
584                     }
585                     s2.send(()).unwrap();
586                 }
587             });
588 
589             b.iter(|| {
590                 s1.send(()).unwrap();
591                 for _ in 0..steps {
592                     r.recv().unwrap();
593                 }
594                 r2.recv().unwrap();
595             });
596             drop(s1);
597         })
598         .unwrap();
599     }
600 
601     #[bench]
spmc(b: &mut Bencher)602     fn spmc(b: &mut Bencher) {
603         let threads = num_cpus::get() - 1;
604         let steps = TOTAL_STEPS / threads;
605         let (s, r) = bounded::<i32>(0);
606 
607         let (s1, r1) = bounded(0);
608         let (s2, r2) = bounded(0);
609         scope(|scope| {
610             for _ in 0..threads {
611                 scope.spawn(|_| {
612                     while r1.recv().is_ok() {
613                         for _ in 0..steps {
614                             r.recv().unwrap();
615                         }
616                         s2.send(()).unwrap();
617                     }
618                 });
619             }
620 
621             b.iter(|| {
622                 for _ in 0..threads {
623                     s1.send(()).unwrap();
624                 }
625                 for i in 0..steps * threads {
626                     s.send(i as i32).unwrap();
627                 }
628                 for _ in 0..threads {
629                     r2.recv().unwrap();
630                 }
631             });
632             drop(s1);
633         })
634         .unwrap();
635     }
636 
637     #[bench]
mpsc(b: &mut Bencher)638     fn mpsc(b: &mut Bencher) {
639         let threads = num_cpus::get() - 1;
640         let steps = TOTAL_STEPS / threads;
641         let (s, r) = bounded::<i32>(0);
642 
643         let (s1, r1) = bounded(0);
644         let (s2, r2) = bounded(0);
645         scope(|scope| {
646             for _ in 0..threads {
647                 scope.spawn(|_| {
648                     while r1.recv().is_ok() {
649                         for i in 0..steps {
650                             s.send(i as i32).unwrap();
651                         }
652                         s2.send(()).unwrap();
653                     }
654                 });
655             }
656 
657             b.iter(|| {
658                 for _ in 0..threads {
659                     s1.send(()).unwrap();
660                 }
661                 for _ in 0..steps * threads {
662                     r.recv().unwrap();
663                 }
664                 for _ in 0..threads {
665                     r2.recv().unwrap();
666                 }
667             });
668             drop(s1);
669         })
670         .unwrap();
671     }
672 
673     #[bench]
mpmc(b: &mut Bencher)674     fn mpmc(b: &mut Bencher) {
675         let threads = num_cpus::get();
676         let steps = TOTAL_STEPS / threads;
677         let (s, r) = bounded::<i32>(0);
678 
679         let (s1, r1) = bounded(0);
680         let (s2, r2) = bounded(0);
681         scope(|scope| {
682             for _ in 0..threads / 2 {
683                 scope.spawn(|_| {
684                     while r1.recv().is_ok() {
685                         for i in 0..steps {
686                             s.send(i as i32).unwrap();
687                         }
688                         s2.send(()).unwrap();
689                     }
690                 });
691             }
692             for _ in 0..threads / 2 {
693                 scope.spawn(|_| {
694                     while r1.recv().is_ok() {
695                         for _ in 0..steps {
696                             r.recv().unwrap();
697                         }
698                         s2.send(()).unwrap();
699                     }
700                 });
701             }
702 
703             b.iter(|| {
704                 for _ in 0..threads {
705                     s1.send(()).unwrap();
706                 }
707                 for _ in 0..threads {
708                     r2.recv().unwrap();
709                 }
710             });
711             drop(s1);
712         })
713         .unwrap();
714     }
715 }
716