1 #![feature(test)]
2 
3 extern crate test;
4 
5 use crossbeam_channel::{bounded, unbounded};
6 use crossbeam_utils::thread::scope;
7 use test::Bencher;
8 
9 const TOTAL_STEPS: usize = 40_000;
10 
11 mod unbounded {
12     use super::*;
13 
14     #[bench]
create(b: &mut Bencher)15     fn create(b: &mut Bencher) {
16         b.iter(|| unbounded::<i32>());
17     }
18 
19     #[bench]
oneshot(b: &mut Bencher)20     fn oneshot(b: &mut Bencher) {
21         b.iter(|| {
22             let (s, r) = unbounded::<i32>();
23             s.send(0).unwrap();
24             r.recv().unwrap();
25         });
26     }
27 
28     #[bench]
inout(b: &mut Bencher)29     fn inout(b: &mut Bencher) {
30         let (s, r) = unbounded::<i32>();
31         b.iter(|| {
32             s.send(0).unwrap();
33             r.recv().unwrap();
34         });
35     }
36 
37     #[bench]
par_inout(b: &mut Bencher)38     fn par_inout(b: &mut Bencher) {
39         let threads = num_cpus::get();
40         let steps = TOTAL_STEPS / threads;
41         let (s, r) = unbounded::<i32>();
42 
43         let (s1, r1) = bounded(0);
44         let (s2, r2) = bounded(0);
45         scope(|scope| {
46             for _ in 0..threads {
47                 scope.spawn(|_| {
48                     while r1.recv().is_ok() {
49                         for i in 0..steps {
50                             s.send(i as i32).unwrap();
51                             r.recv().unwrap();
52                         }
53                         s2.send(()).unwrap();
54                     }
55                 });
56             }
57 
58             b.iter(|| {
59                 for _ in 0..threads {
60                     s1.send(()).unwrap();
61                 }
62                 for _ in 0..threads {
63                     r2.recv().unwrap();
64                 }
65             });
66             drop(s1);
67         })
68         .unwrap();
69     }
70 
71     #[bench]
spsc(b: &mut Bencher)72     fn spsc(b: &mut Bencher) {
73         let steps = TOTAL_STEPS;
74         let (s, r) = unbounded::<i32>();
75 
76         let (s1, r1) = bounded(0);
77         let (s2, r2) = bounded(0);
78         scope(|scope| {
79             scope.spawn(|_| {
80                 while r1.recv().is_ok() {
81                     for i in 0..steps {
82                         s.send(i as i32).unwrap();
83                     }
84                     s2.send(()).unwrap();
85                 }
86             });
87 
88             b.iter(|| {
89                 s1.send(()).unwrap();
90                 for _ in 0..steps {
91                     r.recv().unwrap();
92                 }
93                 r2.recv().unwrap();
94             });
95             drop(s1);
96         })
97         .unwrap();
98     }
99 
100     #[bench]
spmc(b: &mut Bencher)101     fn spmc(b: &mut Bencher) {
102         let threads = num_cpus::get() - 1;
103         let steps = TOTAL_STEPS / threads;
104         let (s, r) = unbounded::<i32>();
105 
106         let (s1, r1) = bounded(0);
107         let (s2, r2) = bounded(0);
108         scope(|scope| {
109             for _ in 0..threads {
110                 scope.spawn(|_| {
111                     while r1.recv().is_ok() {
112                         for _ in 0..steps {
113                             r.recv().unwrap();
114                         }
115                         s2.send(()).unwrap();
116                     }
117                 });
118             }
119 
120             b.iter(|| {
121                 for _ in 0..threads {
122                     s1.send(()).unwrap();
123                 }
124                 for i in 0..steps * threads {
125                     s.send(i as i32).unwrap();
126                 }
127                 for _ in 0..threads {
128                     r2.recv().unwrap();
129                 }
130             });
131             drop(s1);
132         })
133         .unwrap();
134     }
135 
136     #[bench]
mpsc(b: &mut Bencher)137     fn mpsc(b: &mut Bencher) {
138         let threads = num_cpus::get() - 1;
139         let steps = TOTAL_STEPS / threads;
140         let (s, r) = unbounded::<i32>();
141 
142         let (s1, r1) = bounded(0);
143         let (s2, r2) = bounded(0);
144         scope(|scope| {
145             for _ in 0..threads {
146                 scope.spawn(|_| {
147                     while r1.recv().is_ok() {
148                         for i in 0..steps {
149                             s.send(i as i32).unwrap();
150                         }
151                         s2.send(()).unwrap();
152                     }
153                 });
154             }
155 
156             b.iter(|| {
157                 for _ in 0..threads {
158                     s1.send(()).unwrap();
159                 }
160                 for _ in 0..steps * threads {
161                     r.recv().unwrap();
162                 }
163                 for _ in 0..threads {
164                     r2.recv().unwrap();
165                 }
166             });
167             drop(s1);
168         })
169         .unwrap();
170     }
171 
172     #[bench]
mpmc(b: &mut Bencher)173     fn mpmc(b: &mut Bencher) {
174         let threads = num_cpus::get();
175         let steps = TOTAL_STEPS / threads;
176         let (s, r) = unbounded::<i32>();
177 
178         let (s1, r1) = bounded(0);
179         let (s2, r2) = bounded(0);
180         scope(|scope| {
181             for _ in 0..threads / 2 {
182                 scope.spawn(|_| {
183                     while r1.recv().is_ok() {
184                         for i in 0..steps {
185                             s.send(i as i32).unwrap();
186                         }
187                         s2.send(()).unwrap();
188                     }
189                 });
190             }
191             for _ in 0..threads / 2 {
192                 scope.spawn(|_| {
193                     while r1.recv().is_ok() {
194                         for _ in 0..steps {
195                             r.recv().unwrap();
196                         }
197                         s2.send(()).unwrap();
198                     }
199                 });
200             }
201 
202             b.iter(|| {
203                 for _ in 0..threads {
204                     s1.send(()).unwrap();
205                 }
206                 for _ in 0..threads {
207                     r2.recv().unwrap();
208                 }
209             });
210             drop(s1);
211         })
212         .unwrap();
213     }
214 }
215 
216 mod bounded_n {
217     use super::*;
218 
219     #[bench]
spsc(b: &mut Bencher)220     fn spsc(b: &mut Bencher) {
221         let steps = TOTAL_STEPS;
222         let (s, r) = bounded::<i32>(steps);
223 
224         let (s1, r1) = bounded(0);
225         let (s2, r2) = bounded(0);
226         scope(|scope| {
227             scope.spawn(|_| {
228                 while r1.recv().is_ok() {
229                     for i in 0..steps {
230                         s.send(i as i32).unwrap();
231                     }
232                     s2.send(()).unwrap();
233                 }
234             });
235 
236             b.iter(|| {
237                 s1.send(()).unwrap();
238                 for _ in 0..steps {
239                     r.recv().unwrap();
240                 }
241                 r2.recv().unwrap();
242             });
243             drop(s1);
244         })
245         .unwrap();
246     }
247 
248     #[bench]
spmc(b: &mut Bencher)249     fn spmc(b: &mut Bencher) {
250         let threads = num_cpus::get() - 1;
251         let steps = TOTAL_STEPS / threads;
252         let (s, r) = bounded::<i32>(steps * threads);
253 
254         let (s1, r1) = bounded(0);
255         let (s2, r2) = bounded(0);
256         scope(|scope| {
257             for _ in 0..threads {
258                 scope.spawn(|_| {
259                     while r1.recv().is_ok() {
260                         for _ in 0..steps {
261                             r.recv().unwrap();
262                         }
263                         s2.send(()).unwrap();
264                     }
265                 });
266             }
267 
268             b.iter(|| {
269                 for _ in 0..threads {
270                     s1.send(()).unwrap();
271                 }
272                 for i in 0..steps * threads {
273                     s.send(i as i32).unwrap();
274                 }
275                 for _ in 0..threads {
276                     r2.recv().unwrap();
277                 }
278             });
279             drop(s1);
280         })
281         .unwrap();
282     }
283 
284     #[bench]
mpsc(b: &mut Bencher)285     fn mpsc(b: &mut Bencher) {
286         let threads = num_cpus::get() - 1;
287         let steps = TOTAL_STEPS / threads;
288         let (s, r) = bounded::<i32>(steps * threads);
289 
290         let (s1, r1) = bounded(0);
291         let (s2, r2) = bounded(0);
292         scope(|scope| {
293             for _ in 0..threads {
294                 scope.spawn(|_| {
295                     while r1.recv().is_ok() {
296                         for i in 0..steps {
297                             s.send(i as i32).unwrap();
298                         }
299                         s2.send(()).unwrap();
300                     }
301                 });
302             }
303 
304             b.iter(|| {
305                 for _ in 0..threads {
306                     s1.send(()).unwrap();
307                 }
308                 for _ in 0..steps * threads {
309                     r.recv().unwrap();
310                 }
311                 for _ in 0..threads {
312                     r2.recv().unwrap();
313                 }
314             });
315             drop(s1);
316         })
317         .unwrap();
318     }
319 
320     #[bench]
par_inout(b: &mut Bencher)321     fn par_inout(b: &mut Bencher) {
322         let threads = num_cpus::get();
323         let steps = TOTAL_STEPS / threads;
324         let (s, r) = bounded::<i32>(threads);
325 
326         let (s1, r1) = bounded(0);
327         let (s2, r2) = bounded(0);
328         scope(|scope| {
329             for _ in 0..threads {
330                 scope.spawn(|_| {
331                     while r1.recv().is_ok() {
332                         for i in 0..steps {
333                             s.send(i as i32).unwrap();
334                             r.recv().unwrap();
335                         }
336                         s2.send(()).unwrap();
337                     }
338                 });
339             }
340 
341             b.iter(|| {
342                 for _ in 0..threads {
343                     s1.send(()).unwrap();
344                 }
345                 for _ in 0..threads {
346                     r2.recv().unwrap();
347                 }
348             });
349             drop(s1);
350         })
351         .unwrap();
352     }
353 
354     #[bench]
mpmc(b: &mut Bencher)355     fn mpmc(b: &mut Bencher) {
356         let threads = num_cpus::get();
357         assert_eq!(threads % 2, 0);
358         let steps = TOTAL_STEPS / threads;
359         let (s, r) = bounded::<i32>(steps * threads);
360 
361         let (s1, r1) = bounded(0);
362         let (s2, r2) = bounded(0);
363         scope(|scope| {
364             for _ in 0..threads / 2 {
365                 scope.spawn(|_| {
366                     while r1.recv().is_ok() {
367                         for i in 0..steps {
368                             s.send(i as i32).unwrap();
369                         }
370                         s2.send(()).unwrap();
371                     }
372                 });
373             }
374             for _ in 0..threads / 2 {
375                 scope.spawn(|_| {
376                     while r1.recv().is_ok() {
377                         for _ in 0..steps {
378                             r.recv().unwrap();
379                         }
380                         s2.send(()).unwrap();
381                     }
382                 });
383             }
384 
385             b.iter(|| {
386                 for _ in 0..threads {
387                     s1.send(()).unwrap();
388                 }
389                 for _ in 0..threads {
390                     r2.recv().unwrap();
391                 }
392             });
393             drop(s1);
394         })
395         .unwrap();
396     }
397 }
398 
399 mod bounded_1 {
400     use super::*;
401 
402     #[bench]
create(b: &mut Bencher)403     fn create(b: &mut Bencher) {
404         b.iter(|| bounded::<i32>(1));
405     }
406 
407     #[bench]
oneshot(b: &mut Bencher)408     fn oneshot(b: &mut Bencher) {
409         b.iter(|| {
410             let (s, r) = bounded::<i32>(1);
411             s.send(0).unwrap();
412             r.recv().unwrap();
413         });
414     }
415 
416     #[bench]
spsc(b: &mut Bencher)417     fn spsc(b: &mut Bencher) {
418         let steps = TOTAL_STEPS;
419         let (s, r) = bounded::<i32>(1);
420 
421         let (s1, r1) = bounded(0);
422         let (s2, r2) = bounded(0);
423         scope(|scope| {
424             scope.spawn(|_| {
425                 while r1.recv().is_ok() {
426                     for i in 0..steps {
427                         s.send(i as i32).unwrap();
428                     }
429                     s2.send(()).unwrap();
430                 }
431             });
432 
433             b.iter(|| {
434                 s1.send(()).unwrap();
435                 for _ in 0..steps {
436                     r.recv().unwrap();
437                 }
438                 r2.recv().unwrap();
439             });
440             drop(s1);
441         })
442         .unwrap();
443     }
444 
445     #[bench]
spmc(b: &mut Bencher)446     fn spmc(b: &mut Bencher) {
447         let threads = num_cpus::get() - 1;
448         let steps = TOTAL_STEPS / threads;
449         let (s, r) = bounded::<i32>(1);
450 
451         let (s1, r1) = bounded(0);
452         let (s2, r2) = bounded(0);
453         scope(|scope| {
454             for _ in 0..threads {
455                 scope.spawn(|_| {
456                     while r1.recv().is_ok() {
457                         for _ in 0..steps {
458                             r.recv().unwrap();
459                         }
460                         s2.send(()).unwrap();
461                     }
462                 });
463             }
464 
465             b.iter(|| {
466                 for _ in 0..threads {
467                     s1.send(()).unwrap();
468                 }
469                 for i in 0..steps * threads {
470                     s.send(i as i32).unwrap();
471                 }
472                 for _ in 0..threads {
473                     r2.recv().unwrap();
474                 }
475             });
476             drop(s1);
477         })
478         .unwrap();
479     }
480 
481     #[bench]
mpsc(b: &mut Bencher)482     fn mpsc(b: &mut Bencher) {
483         let threads = num_cpus::get() - 1;
484         let steps = TOTAL_STEPS / threads;
485         let (s, r) = bounded::<i32>(1);
486 
487         let (s1, r1) = bounded(0);
488         let (s2, r2) = bounded(0);
489         scope(|scope| {
490             for _ in 0..threads {
491                 scope.spawn(|_| {
492                     while r1.recv().is_ok() {
493                         for i in 0..steps {
494                             s.send(i as i32).unwrap();
495                         }
496                         s2.send(()).unwrap();
497                     }
498                 });
499             }
500 
501             b.iter(|| {
502                 for _ in 0..threads {
503                     s1.send(()).unwrap();
504                 }
505                 for _ in 0..steps * threads {
506                     r.recv().unwrap();
507                 }
508                 for _ in 0..threads {
509                     r2.recv().unwrap();
510                 }
511             });
512             drop(s1);
513         })
514         .unwrap();
515     }
516 
517     #[bench]
mpmc(b: &mut Bencher)518     fn mpmc(b: &mut Bencher) {
519         let threads = num_cpus::get();
520         let steps = TOTAL_STEPS / threads;
521         let (s, r) = bounded::<i32>(1);
522 
523         let (s1, r1) = bounded(0);
524         let (s2, r2) = bounded(0);
525         scope(|scope| {
526             for _ in 0..threads / 2 {
527                 scope.spawn(|_| {
528                     while r1.recv().is_ok() {
529                         for i in 0..steps {
530                             s.send(i as i32).unwrap();
531                         }
532                         s2.send(()).unwrap();
533                     }
534                 });
535             }
536             for _ in 0..threads / 2 {
537                 scope.spawn(|_| {
538                     while r1.recv().is_ok() {
539                         for _ in 0..steps {
540                             r.recv().unwrap();
541                         }
542                         s2.send(()).unwrap();
543                     }
544                 });
545             }
546 
547             b.iter(|| {
548                 for _ in 0..threads {
549                     s1.send(()).unwrap();
550                 }
551                 for _ in 0..threads {
552                     r2.recv().unwrap();
553                 }
554             });
555             drop(s1);
556         })
557         .unwrap();
558     }
559 }
560 
561 mod bounded_0 {
562     use super::*;
563 
564     #[bench]
create(b: &mut Bencher)565     fn create(b: &mut Bencher) {
566         b.iter(|| bounded::<i32>(0));
567     }
568 
569     #[bench]
spsc(b: &mut Bencher)570     fn spsc(b: &mut Bencher) {
571         let steps = TOTAL_STEPS;
572         let (s, r) = bounded::<i32>(0);
573 
574         let (s1, r1) = bounded(0);
575         let (s2, r2) = bounded(0);
576         scope(|scope| {
577             scope.spawn(|_| {
578                 while r1.recv().is_ok() {
579                     for i in 0..steps {
580                         s.send(i as i32).unwrap();
581                     }
582                     s2.send(()).unwrap();
583                 }
584             });
585 
586             b.iter(|| {
587                 s1.send(()).unwrap();
588                 for _ in 0..steps {
589                     r.recv().unwrap();
590                 }
591                 r2.recv().unwrap();
592             });
593             drop(s1);
594         })
595         .unwrap();
596     }
597 
598     #[bench]
spmc(b: &mut Bencher)599     fn spmc(b: &mut Bencher) {
600         let threads = num_cpus::get() - 1;
601         let steps = TOTAL_STEPS / threads;
602         let (s, r) = bounded::<i32>(0);
603 
604         let (s1, r1) = bounded(0);
605         let (s2, r2) = bounded(0);
606         scope(|scope| {
607             for _ in 0..threads {
608                 scope.spawn(|_| {
609                     while r1.recv().is_ok() {
610                         for _ in 0..steps {
611                             r.recv().unwrap();
612                         }
613                         s2.send(()).unwrap();
614                     }
615                 });
616             }
617 
618             b.iter(|| {
619                 for _ in 0..threads {
620                     s1.send(()).unwrap();
621                 }
622                 for i in 0..steps * threads {
623                     s.send(i as i32).unwrap();
624                 }
625                 for _ in 0..threads {
626                     r2.recv().unwrap();
627                 }
628             });
629             drop(s1);
630         })
631         .unwrap();
632     }
633 
634     #[bench]
mpsc(b: &mut Bencher)635     fn mpsc(b: &mut Bencher) {
636         let threads = num_cpus::get() - 1;
637         let steps = TOTAL_STEPS / threads;
638         let (s, r) = bounded::<i32>(0);
639 
640         let (s1, r1) = bounded(0);
641         let (s2, r2) = bounded(0);
642         scope(|scope| {
643             for _ in 0..threads {
644                 scope.spawn(|_| {
645                     while r1.recv().is_ok() {
646                         for i in 0..steps {
647                             s.send(i as i32).unwrap();
648                         }
649                         s2.send(()).unwrap();
650                     }
651                 });
652             }
653 
654             b.iter(|| {
655                 for _ in 0..threads {
656                     s1.send(()).unwrap();
657                 }
658                 for _ in 0..steps * threads {
659                     r.recv().unwrap();
660                 }
661                 for _ in 0..threads {
662                     r2.recv().unwrap();
663                 }
664             });
665             drop(s1);
666         })
667         .unwrap();
668     }
669 
670     #[bench]
mpmc(b: &mut Bencher)671     fn mpmc(b: &mut Bencher) {
672         let threads = num_cpus::get();
673         let steps = TOTAL_STEPS / threads;
674         let (s, r) = bounded::<i32>(0);
675 
676         let (s1, r1) = bounded(0);
677         let (s2, r2) = bounded(0);
678         scope(|scope| {
679             for _ in 0..threads / 2 {
680                 scope.spawn(|_| {
681                     while r1.recv().is_ok() {
682                         for i in 0..steps {
683                             s.send(i as i32).unwrap();
684                         }
685                         s2.send(()).unwrap();
686                     }
687                 });
688             }
689             for _ in 0..threads / 2 {
690                 scope.spawn(|_| {
691                     while r1.recv().is_ok() {
692                         for _ in 0..steps {
693                             r.recv().unwrap();
694                         }
695                         s2.send(()).unwrap();
696                     }
697                 });
698             }
699 
700             b.iter(|| {
701                 for _ in 0..threads {
702                     s1.send(()).unwrap();
703                 }
704                 for _ in 0..threads {
705                     r2.recv().unwrap();
706                 }
707             });
708             drop(s1);
709         })
710         .unwrap();
711     }
712 }
713