1 #![feature(test)] 2 3 extern crate crossbeam_channel; 4 extern crate crossbeam_utils; 5 extern crate num_cpus; 6 extern crate test; 7 8 use crossbeam_channel::{bounded, unbounded}; 9 use crossbeam_utils::thread::scope; 10 use test::Bencher; 11 12 const TOTAL_STEPS: usize = 40_000; 13 14 mod unbounded { 15 use super::*; 16 17 #[bench] create(b: &mut Bencher)18 fn create(b: &mut Bencher) { 19 b.iter(|| unbounded::<i32>()); 20 } 21 22 #[bench] oneshot(b: &mut Bencher)23 fn oneshot(b: &mut Bencher) { 24 b.iter(|| { 25 let (s, r) = unbounded::<i32>(); 26 s.send(0).unwrap(); 27 r.recv().unwrap(); 28 }); 29 } 30 31 #[bench] inout(b: &mut Bencher)32 fn inout(b: &mut Bencher) { 33 let (s, r) = unbounded::<i32>(); 34 b.iter(|| { 35 s.send(0).unwrap(); 36 r.recv().unwrap(); 37 }); 38 } 39 40 #[bench] par_inout(b: &mut Bencher)41 fn par_inout(b: &mut Bencher) { 42 let threads = num_cpus::get(); 43 let steps = TOTAL_STEPS / threads; 44 let (s, r) = unbounded::<i32>(); 45 46 let (s1, r1) = bounded(0); 47 let (s2, r2) = bounded(0); 48 scope(|scope| { 49 for _ in 0..threads { 50 scope.spawn(|_| { 51 while r1.recv().is_ok() { 52 for i in 0..steps { 53 s.send(i as i32).unwrap(); 54 r.recv().unwrap(); 55 } 56 s2.send(()).unwrap(); 57 } 58 }); 59 } 60 61 b.iter(|| { 62 for _ in 0..threads { 63 s1.send(()).unwrap(); 64 } 65 for _ in 0..threads { 66 r2.recv().unwrap(); 67 } 68 }); 69 drop(s1); 70 }) 71 .unwrap(); 72 } 73 74 #[bench] spsc(b: &mut Bencher)75 fn spsc(b: &mut Bencher) { 76 let steps = TOTAL_STEPS; 77 let (s, r) = unbounded::<i32>(); 78 79 let (s1, r1) = bounded(0); 80 let (s2, r2) = bounded(0); 81 scope(|scope| { 82 scope.spawn(|_| { 83 while r1.recv().is_ok() { 84 for i in 0..steps { 85 s.send(i as i32).unwrap(); 86 } 87 s2.send(()).unwrap(); 88 } 89 }); 90 91 b.iter(|| { 92 s1.send(()).unwrap(); 93 for _ in 0..steps { 94 r.recv().unwrap(); 95 } 96 r2.recv().unwrap(); 97 }); 98 drop(s1); 99 }) 100 .unwrap(); 101 } 102 103 #[bench] spmc(b: &mut Bencher)104 fn spmc(b: &mut Bencher) { 105 let threads = num_cpus::get() - 1; 106 let steps = TOTAL_STEPS / threads; 107 let (s, r) = unbounded::<i32>(); 108 109 let (s1, r1) = bounded(0); 110 let (s2, r2) = bounded(0); 111 scope(|scope| { 112 for _ in 0..threads { 113 scope.spawn(|_| { 114 while r1.recv().is_ok() { 115 for _ in 0..steps { 116 r.recv().unwrap(); 117 } 118 s2.send(()).unwrap(); 119 } 120 }); 121 } 122 123 b.iter(|| { 124 for _ in 0..threads { 125 s1.send(()).unwrap(); 126 } 127 for i in 0..steps * threads { 128 s.send(i as i32).unwrap(); 129 } 130 for _ in 0..threads { 131 r2.recv().unwrap(); 132 } 133 }); 134 drop(s1); 135 }) 136 .unwrap(); 137 } 138 139 #[bench] mpsc(b: &mut Bencher)140 fn mpsc(b: &mut Bencher) { 141 let threads = num_cpus::get() - 1; 142 let steps = TOTAL_STEPS / threads; 143 let (s, r) = unbounded::<i32>(); 144 145 let (s1, r1) = bounded(0); 146 let (s2, r2) = bounded(0); 147 scope(|scope| { 148 for _ in 0..threads { 149 scope.spawn(|_| { 150 while r1.recv().is_ok() { 151 for i in 0..steps { 152 s.send(i as i32).unwrap(); 153 } 154 s2.send(()).unwrap(); 155 } 156 }); 157 } 158 159 b.iter(|| { 160 for _ in 0..threads { 161 s1.send(()).unwrap(); 162 } 163 for _ in 0..steps * threads { 164 r.recv().unwrap(); 165 } 166 for _ in 0..threads { 167 r2.recv().unwrap(); 168 } 169 }); 170 drop(s1); 171 }) 172 .unwrap(); 173 } 174 175 #[bench] mpmc(b: &mut Bencher)176 fn mpmc(b: &mut Bencher) { 177 let threads = num_cpus::get(); 178 let steps = TOTAL_STEPS / threads; 179 let (s, r) = unbounded::<i32>(); 180 181 let (s1, r1) = bounded(0); 182 let (s2, r2) = bounded(0); 183 scope(|scope| { 184 for _ in 0..threads / 2 { 185 scope.spawn(|_| { 186 while r1.recv().is_ok() { 187 for i in 0..steps { 188 s.send(i as i32).unwrap(); 189 } 190 s2.send(()).unwrap(); 191 } 192 }); 193 } 194 for _ in 0..threads / 2 { 195 scope.spawn(|_| { 196 while r1.recv().is_ok() { 197 for _ in 0..steps { 198 r.recv().unwrap(); 199 } 200 s2.send(()).unwrap(); 201 } 202 }); 203 } 204 205 b.iter(|| { 206 for _ in 0..threads { 207 s1.send(()).unwrap(); 208 } 209 for _ in 0..threads { 210 r2.recv().unwrap(); 211 } 212 }); 213 drop(s1); 214 }) 215 .unwrap(); 216 } 217 } 218 219 mod bounded_n { 220 use super::*; 221 222 #[bench] spsc(b: &mut Bencher)223 fn spsc(b: &mut Bencher) { 224 let steps = TOTAL_STEPS; 225 let (s, r) = bounded::<i32>(steps); 226 227 let (s1, r1) = bounded(0); 228 let (s2, r2) = bounded(0); 229 scope(|scope| { 230 scope.spawn(|_| { 231 while r1.recv().is_ok() { 232 for i in 0..steps { 233 s.send(i as i32).unwrap(); 234 } 235 s2.send(()).unwrap(); 236 } 237 }); 238 239 b.iter(|| { 240 s1.send(()).unwrap(); 241 for _ in 0..steps { 242 r.recv().unwrap(); 243 } 244 r2.recv().unwrap(); 245 }); 246 drop(s1); 247 }) 248 .unwrap(); 249 } 250 251 #[bench] spmc(b: &mut Bencher)252 fn spmc(b: &mut Bencher) { 253 let threads = num_cpus::get() - 1; 254 let steps = TOTAL_STEPS / threads; 255 let (s, r) = bounded::<i32>(steps * threads); 256 257 let (s1, r1) = bounded(0); 258 let (s2, r2) = bounded(0); 259 scope(|scope| { 260 for _ in 0..threads { 261 scope.spawn(|_| { 262 while r1.recv().is_ok() { 263 for _ in 0..steps { 264 r.recv().unwrap(); 265 } 266 s2.send(()).unwrap(); 267 } 268 }); 269 } 270 271 b.iter(|| { 272 for _ in 0..threads { 273 s1.send(()).unwrap(); 274 } 275 for i in 0..steps * threads { 276 s.send(i as i32).unwrap(); 277 } 278 for _ in 0..threads { 279 r2.recv().unwrap(); 280 } 281 }); 282 drop(s1); 283 }) 284 .unwrap(); 285 } 286 287 #[bench] mpsc(b: &mut Bencher)288 fn mpsc(b: &mut Bencher) { 289 let threads = num_cpus::get() - 1; 290 let steps = TOTAL_STEPS / threads; 291 let (s, r) = bounded::<i32>(steps * threads); 292 293 let (s1, r1) = bounded(0); 294 let (s2, r2) = bounded(0); 295 scope(|scope| { 296 for _ in 0..threads { 297 scope.spawn(|_| { 298 while r1.recv().is_ok() { 299 for i in 0..steps { 300 s.send(i as i32).unwrap(); 301 } 302 s2.send(()).unwrap(); 303 } 304 }); 305 } 306 307 b.iter(|| { 308 for _ in 0..threads { 309 s1.send(()).unwrap(); 310 } 311 for _ in 0..steps * threads { 312 r.recv().unwrap(); 313 } 314 for _ in 0..threads { 315 r2.recv().unwrap(); 316 } 317 }); 318 drop(s1); 319 }) 320 .unwrap(); 321 } 322 323 #[bench] par_inout(b: &mut Bencher)324 fn par_inout(b: &mut Bencher) { 325 let threads = num_cpus::get(); 326 let steps = TOTAL_STEPS / threads; 327 let (s, r) = bounded::<i32>(threads); 328 329 let (s1, r1) = bounded(0); 330 let (s2, r2) = bounded(0); 331 scope(|scope| { 332 for _ in 0..threads { 333 scope.spawn(|_| { 334 while r1.recv().is_ok() { 335 for i in 0..steps { 336 s.send(i as i32).unwrap(); 337 r.recv().unwrap(); 338 } 339 s2.send(()).unwrap(); 340 } 341 }); 342 } 343 344 b.iter(|| { 345 for _ in 0..threads { 346 s1.send(()).unwrap(); 347 } 348 for _ in 0..threads { 349 r2.recv().unwrap(); 350 } 351 }); 352 drop(s1); 353 }) 354 .unwrap(); 355 } 356 357 #[bench] mpmc(b: &mut Bencher)358 fn mpmc(b: &mut Bencher) { 359 let threads = num_cpus::get(); 360 assert_eq!(threads % 2, 0); 361 let steps = TOTAL_STEPS / threads; 362 let (s, r) = bounded::<i32>(steps * threads); 363 364 let (s1, r1) = bounded(0); 365 let (s2, r2) = bounded(0); 366 scope(|scope| { 367 for _ in 0..threads / 2 { 368 scope.spawn(|_| { 369 while r1.recv().is_ok() { 370 for i in 0..steps { 371 s.send(i as i32).unwrap(); 372 } 373 s2.send(()).unwrap(); 374 } 375 }); 376 } 377 for _ in 0..threads / 2 { 378 scope.spawn(|_| { 379 while r1.recv().is_ok() { 380 for _ in 0..steps { 381 r.recv().unwrap(); 382 } 383 s2.send(()).unwrap(); 384 } 385 }); 386 } 387 388 b.iter(|| { 389 for _ in 0..threads { 390 s1.send(()).unwrap(); 391 } 392 for _ in 0..threads { 393 r2.recv().unwrap(); 394 } 395 }); 396 drop(s1); 397 }) 398 .unwrap(); 399 } 400 } 401 402 mod bounded_1 { 403 use super::*; 404 405 #[bench] create(b: &mut Bencher)406 fn create(b: &mut Bencher) { 407 b.iter(|| bounded::<i32>(1)); 408 } 409 410 #[bench] oneshot(b: &mut Bencher)411 fn oneshot(b: &mut Bencher) { 412 b.iter(|| { 413 let (s, r) = bounded::<i32>(1); 414 s.send(0).unwrap(); 415 r.recv().unwrap(); 416 }); 417 } 418 419 #[bench] spsc(b: &mut Bencher)420 fn spsc(b: &mut Bencher) { 421 let steps = TOTAL_STEPS; 422 let (s, r) = bounded::<i32>(1); 423 424 let (s1, r1) = bounded(0); 425 let (s2, r2) = bounded(0); 426 scope(|scope| { 427 scope.spawn(|_| { 428 while r1.recv().is_ok() { 429 for i in 0..steps { 430 s.send(i as i32).unwrap(); 431 } 432 s2.send(()).unwrap(); 433 } 434 }); 435 436 b.iter(|| { 437 s1.send(()).unwrap(); 438 for _ in 0..steps { 439 r.recv().unwrap(); 440 } 441 r2.recv().unwrap(); 442 }); 443 drop(s1); 444 }) 445 .unwrap(); 446 } 447 448 #[bench] spmc(b: &mut Bencher)449 fn spmc(b: &mut Bencher) { 450 let threads = num_cpus::get() - 1; 451 let steps = TOTAL_STEPS / threads; 452 let (s, r) = bounded::<i32>(1); 453 454 let (s1, r1) = bounded(0); 455 let (s2, r2) = bounded(0); 456 scope(|scope| { 457 for _ in 0..threads { 458 scope.spawn(|_| { 459 while r1.recv().is_ok() { 460 for _ in 0..steps { 461 r.recv().unwrap(); 462 } 463 s2.send(()).unwrap(); 464 } 465 }); 466 } 467 468 b.iter(|| { 469 for _ in 0..threads { 470 s1.send(()).unwrap(); 471 } 472 for i in 0..steps * threads { 473 s.send(i as i32).unwrap(); 474 } 475 for _ in 0..threads { 476 r2.recv().unwrap(); 477 } 478 }); 479 drop(s1); 480 }) 481 .unwrap(); 482 } 483 484 #[bench] mpsc(b: &mut Bencher)485 fn mpsc(b: &mut Bencher) { 486 let threads = num_cpus::get() - 1; 487 let steps = TOTAL_STEPS / threads; 488 let (s, r) = bounded::<i32>(1); 489 490 let (s1, r1) = bounded(0); 491 let (s2, r2) = bounded(0); 492 scope(|scope| { 493 for _ in 0..threads { 494 scope.spawn(|_| { 495 while r1.recv().is_ok() { 496 for i in 0..steps { 497 s.send(i as i32).unwrap(); 498 } 499 s2.send(()).unwrap(); 500 } 501 }); 502 } 503 504 b.iter(|| { 505 for _ in 0..threads { 506 s1.send(()).unwrap(); 507 } 508 for _ in 0..steps * threads { 509 r.recv().unwrap(); 510 } 511 for _ in 0..threads { 512 r2.recv().unwrap(); 513 } 514 }); 515 drop(s1); 516 }) 517 .unwrap(); 518 } 519 520 #[bench] mpmc(b: &mut Bencher)521 fn mpmc(b: &mut Bencher) { 522 let threads = num_cpus::get(); 523 let steps = TOTAL_STEPS / threads; 524 let (s, r) = bounded::<i32>(1); 525 526 let (s1, r1) = bounded(0); 527 let (s2, r2) = bounded(0); 528 scope(|scope| { 529 for _ in 0..threads / 2 { 530 scope.spawn(|_| { 531 while r1.recv().is_ok() { 532 for i in 0..steps { 533 s.send(i as i32).unwrap(); 534 } 535 s2.send(()).unwrap(); 536 } 537 }); 538 } 539 for _ in 0..threads / 2 { 540 scope.spawn(|_| { 541 while r1.recv().is_ok() { 542 for _ in 0..steps { 543 r.recv().unwrap(); 544 } 545 s2.send(()).unwrap(); 546 } 547 }); 548 } 549 550 b.iter(|| { 551 for _ in 0..threads { 552 s1.send(()).unwrap(); 553 } 554 for _ in 0..threads { 555 r2.recv().unwrap(); 556 } 557 }); 558 drop(s1); 559 }) 560 .unwrap(); 561 } 562 } 563 564 mod bounded_0 { 565 use super::*; 566 567 #[bench] create(b: &mut Bencher)568 fn create(b: &mut Bencher) { 569 b.iter(|| bounded::<i32>(0)); 570 } 571 572 #[bench] spsc(b: &mut Bencher)573 fn spsc(b: &mut Bencher) { 574 let steps = TOTAL_STEPS; 575 let (s, r) = bounded::<i32>(0); 576 577 let (s1, r1) = bounded(0); 578 let (s2, r2) = bounded(0); 579 scope(|scope| { 580 scope.spawn(|_| { 581 while r1.recv().is_ok() { 582 for i in 0..steps { 583 s.send(i as i32).unwrap(); 584 } 585 s2.send(()).unwrap(); 586 } 587 }); 588 589 b.iter(|| { 590 s1.send(()).unwrap(); 591 for _ in 0..steps { 592 r.recv().unwrap(); 593 } 594 r2.recv().unwrap(); 595 }); 596 drop(s1); 597 }) 598 .unwrap(); 599 } 600 601 #[bench] spmc(b: &mut Bencher)602 fn spmc(b: &mut Bencher) { 603 let threads = num_cpus::get() - 1; 604 let steps = TOTAL_STEPS / threads; 605 let (s, r) = bounded::<i32>(0); 606 607 let (s1, r1) = bounded(0); 608 let (s2, r2) = bounded(0); 609 scope(|scope| { 610 for _ in 0..threads { 611 scope.spawn(|_| { 612 while r1.recv().is_ok() { 613 for _ in 0..steps { 614 r.recv().unwrap(); 615 } 616 s2.send(()).unwrap(); 617 } 618 }); 619 } 620 621 b.iter(|| { 622 for _ in 0..threads { 623 s1.send(()).unwrap(); 624 } 625 for i in 0..steps * threads { 626 s.send(i as i32).unwrap(); 627 } 628 for _ in 0..threads { 629 r2.recv().unwrap(); 630 } 631 }); 632 drop(s1); 633 }) 634 .unwrap(); 635 } 636 637 #[bench] mpsc(b: &mut Bencher)638 fn mpsc(b: &mut Bencher) { 639 let threads = num_cpus::get() - 1; 640 let steps = TOTAL_STEPS / threads; 641 let (s, r) = bounded::<i32>(0); 642 643 let (s1, r1) = bounded(0); 644 let (s2, r2) = bounded(0); 645 scope(|scope| { 646 for _ in 0..threads { 647 scope.spawn(|_| { 648 while r1.recv().is_ok() { 649 for i in 0..steps { 650 s.send(i as i32).unwrap(); 651 } 652 s2.send(()).unwrap(); 653 } 654 }); 655 } 656 657 b.iter(|| { 658 for _ in 0..threads { 659 s1.send(()).unwrap(); 660 } 661 for _ in 0..steps * threads { 662 r.recv().unwrap(); 663 } 664 for _ in 0..threads { 665 r2.recv().unwrap(); 666 } 667 }); 668 drop(s1); 669 }) 670 .unwrap(); 671 } 672 673 #[bench] mpmc(b: &mut Bencher)674 fn mpmc(b: &mut Bencher) { 675 let threads = num_cpus::get(); 676 let steps = TOTAL_STEPS / threads; 677 let (s, r) = bounded::<i32>(0); 678 679 let (s1, r1) = bounded(0); 680 let (s2, r2) = bounded(0); 681 scope(|scope| { 682 for _ in 0..threads / 2 { 683 scope.spawn(|_| { 684 while r1.recv().is_ok() { 685 for i in 0..steps { 686 s.send(i as i32).unwrap(); 687 } 688 s2.send(()).unwrap(); 689 } 690 }); 691 } 692 for _ in 0..threads / 2 { 693 scope.spawn(|_| { 694 while r1.recv().is_ok() { 695 for _ in 0..steps { 696 r.recv().unwrap(); 697 } 698 s2.send(()).unwrap(); 699 } 700 }); 701 } 702 703 b.iter(|| { 704 for _ in 0..threads { 705 s1.send(()).unwrap(); 706 } 707 for _ in 0..threads { 708 r2.recv().unwrap(); 709 } 710 }); 711 drop(s1); 712 }) 713 .unwrap(); 714 } 715 } 716