1 // Copyright (C) 2006 Davis E. King (davis@dlib.net) 2 // License: Boost Software License See LICENSE.txt for the full license. 3 4 5 #include <sstream> 6 #include <string> 7 #include <cstdlib> 8 #include <ctime> 9 #include <dlib/misc_api.h> 10 #include <dlib/pipe.h> 11 12 #include "tester.h" 13 14 namespace 15 { 16 using namespace test; 17 using namespace dlib; 18 using namespace std; 19 20 logger dlog("test.pipe"); 21 22 namespace pipe_kernel_test_helpers 23 { 24 const unsigned long proc1_count = 10000; 25 dlib::mutex m; 26 signaler s(m); 27 unsigned long threads_running = 0; 28 bool found_error; 29 add_running_thread()30 inline void add_running_thread ( 31 ) 32 { 33 auto_mutex M(m); 34 ++threads_running; 35 } 36 remove_running_thread()37 inline void remove_running_thread ( 38 ) 39 { 40 auto_mutex M(m); 41 --threads_running; 42 s.broadcast(); 43 } 44 wait_for_threads()45 inline void wait_for_threads ( 46 ) 47 { 48 auto_mutex M(m); 49 while (threads_running > 0) 50 s.wait(); 51 } 52 53 template < 54 typename pipe 55 > threadproc1(void * param)56 void threadproc1 ( 57 void* param 58 ) 59 { 60 add_running_thread(); 61 pipe& p = *static_cast<pipe*>(param); 62 try 63 { 64 65 int last = -1; 66 for (unsigned long i = 0; i < proc1_count; ++i) 67 { 68 int cur=0; 69 DLIB_TEST(p.dequeue(cur) == true); 70 DLIB_TEST(last + 1 == cur); 71 last = cur; 72 } 73 DLIB_TEST(p.size() == 0); 74 } 75 catch(exception& e) 76 { 77 auto_mutex M(m); 78 found_error = true; 79 cout << "\n\nERRORS FOUND" << endl; 80 cout << e.what() << endl; 81 dlog << LWARN << "ERRORS FOUND"; 82 dlog << LWARN << e.what(); 83 p.disable(); 84 } 85 86 remove_running_thread(); 87 } 88 89 90 template < 91 typename pipe 92 > threadproc2(void * param)93 void threadproc2 ( 94 void* param 95 ) 96 { 97 add_running_thread(); 98 pipe& p = *static_cast<pipe*>(param); 99 try 100 { 101 102 int last = -1; 103 int cur; 104 while (p.dequeue(cur)) 105 { 106 DLIB_TEST(last < cur); 107 last = cur; 108 } 109 auto_mutex M(m); 110 } 111 catch(exception& e) 112 { 113 auto_mutex M(m); 114 found_error = true; 115 cout << "\n\nERRORS FOUND" << endl; 116 cout << e.what() << endl; 117 dlog << LWARN << "ERRORS FOUND"; 118 dlog << LWARN << e.what(); 119 p.disable(); 120 } 121 remove_running_thread(); 122 } 123 124 125 126 template < 127 typename pipe 128 > threadproc3(void * param)129 void threadproc3 ( 130 void* param 131 ) 132 { 133 add_running_thread(); 134 pipe& p = *static_cast<pipe*>(param); 135 try 136 { 137 138 int last = -1; 139 int cur; 140 while (p.dequeue_or_timeout(cur,100000)) 141 { 142 DLIB_TEST(last < cur); 143 last = cur; 144 } 145 auto_mutex M(m); 146 } 147 catch(exception& e) 148 { 149 auto_mutex M(m); 150 found_error = true; 151 cout << "\n\nERRORS FOUND" << endl; 152 cout << e.what() << endl; 153 dlog << LWARN << "ERRORS FOUND"; 154 dlog << LWARN << e.what(); 155 p.disable(); 156 } 157 remove_running_thread(); 158 } 159 160 161 } 162 163 // ---------------------------------------------------------------------------------------- 164 // ---------------------------------------------------------------------------------------- 165 166 template<typename in_type, typename out_type> 167 class PipelineProcessor : private dlib::threaded_object 168 { 169 public: PipelineProcessor(dlib::pipe<in_type> & in,dlib::pipe<out_type> & out)170 PipelineProcessor( 171 dlib::pipe<in_type> & in, 172 dlib::pipe<out_type> & out) : 173 InPipe(in), 174 OutPipe(out), 175 InMsg(), 176 OutMsg() { 177 start(); 178 } 179 ~PipelineProcessor()180 ~PipelineProcessor() { 181 // signal the thread to stop 182 stop(); 183 wait(); 184 } 185 186 private: 187 dlib::pipe<in_type> & InPipe; 188 dlib::pipe<out_type> & OutPipe; 189 190 in_type InMsg; 191 out_type OutMsg; 192 thread()193 void thread() 194 { 195 while (!should_stop()) { 196 if(InPipe.dequeue_or_timeout(InMsg, 100)) 197 { 198 // if function signals ready to send OutMsg 199 while (!OutPipe.enqueue_or_timeout(OutMsg, 100)) 200 { 201 // try to send until should stop 202 if (should_stop()) 203 { 204 return; 205 } 206 } 207 } 208 } 209 }; 210 }; 211 212 do_zero_size_test_with_timeouts()213 void do_zero_size_test_with_timeouts() 214 { 215 dlog << LINFO << "in do_zero_size_test_with_timeouts()"; 216 // make sure we can get though this without deadlocking 217 for (int k = 0; k < 10; ++k) 218 { 219 dlib::pipe<int> in_pipe(10); 220 dlib::pipe<float> out_pipe(0); 221 { 222 PipelineProcessor<int, float> pp(in_pipe, out_pipe); 223 224 int in = 1; 225 in_pipe.enqueue(in); 226 in = 2; 227 in_pipe.enqueue(in); 228 in = 3; 229 in_pipe.enqueue(in); 230 // sleep to make sure thread enqueued 231 dlib::sleep(100); 232 233 float out = 1.0f; 234 out_pipe.dequeue(out); 235 dlib::sleep(100); 236 } 237 print_spinner(); 238 } 239 240 } 241 242 // ---------------------------------------------------------------------------------------- 243 // ---------------------------------------------------------------------------------------- 244 245 template < 246 typename pipe 247 > pipe_kernel_test()248 void pipe_kernel_test ( 249 ) 250 /*! 251 requires 252 - pipe is an implementation of pipe/pipe_kernel_abstract.h and 253 is instantiated with int 254 ensures 255 - runs tests on pipe for compliance with the specs 256 !*/ 257 { 258 using namespace pipe_kernel_test_helpers; 259 found_error = false; 260 261 262 print_spinner(); 263 pipe test(10), test2(100); 264 pipe test_0(0), test2_0(0); 265 pipe test_1(1), test2_1(1); 266 267 DLIB_TEST(test.size() == 0); 268 DLIB_TEST(test2.size() == 0); 269 DLIB_TEST(test_0.size() == 0); 270 DLIB_TEST(test2_0.size() == 0); 271 DLIB_TEST(test_1.size() == 0); 272 DLIB_TEST(test2_1.size() == 0); 273 274 DLIB_TEST(test.is_enqueue_enabled() == true); 275 DLIB_TEST(test.is_dequeue_enabled() == true); 276 DLIB_TEST(test.is_enabled() == true); 277 278 test.empty(); 279 test2.empty(); 280 DLIB_TEST(test.size() == 0); 281 DLIB_TEST(test2.size() == 0); 282 283 test_0.empty(); 284 test2_0.empty(); 285 DLIB_TEST(test_0.size() == 0); 286 DLIB_TEST(test2_0.size() == 0); 287 288 test_1.empty(); 289 test2_1.empty(); 290 DLIB_TEST(test_1.size() == 0); 291 DLIB_TEST(test2_1.size() == 0); 292 293 294 295 int a; 296 a = 3; 297 test.enqueue(a); 298 DLIB_TEST(test.size() == 1); 299 a = 5; 300 test.enqueue(a); 301 DLIB_TEST(test.size() == 2); 302 303 a = 0; 304 test.dequeue(a); 305 DLIB_TEST(a == 3); 306 DLIB_TEST(test.size() == 1); 307 308 a = 0; 309 test.dequeue(a); 310 DLIB_TEST(a == 5); 311 DLIB_TEST(test.size() == 0); 312 313 314 print_spinner(); 315 { 316 dlog << LINFO << "starting normal length pipe tests"; 317 create_new_thread(&threadproc1<pipe>,&test); 318 create_new_thread(&threadproc2<pipe>,&test2); 319 create_new_thread(&threadproc2<pipe>,&test2); 320 create_new_thread(&threadproc2<pipe>,&test2); 321 322 for (unsigned long i = 0; i < proc1_count; ++i) 323 { 324 a = i; 325 test.enqueue(a); 326 } 327 DLIB_TEST(test.is_enqueue_enabled() == true); 328 test.disable_enqueue(); 329 DLIB_TEST(test.is_enqueue_enabled() == false); 330 for (unsigned long i = 0; i < proc1_count; ++i) 331 { 332 a = i; 333 test.enqueue(a); 334 } 335 336 for (unsigned long i = 0; i < 100000; ++i) 337 { 338 a = i; 339 if (i%2 == 0) 340 test2.enqueue(a); 341 else 342 test2.enqueue_or_timeout(a,100000); 343 } 344 345 test2.wait_for_num_blocked_dequeues(3); 346 DLIB_TEST(test2.size() == 0); 347 test2.disable(); 348 349 wait_for_threads(); 350 DLIB_TEST(test2.size() == 0); 351 352 test2.enable(); 353 354 print_spinner(); 355 356 create_new_thread(&threadproc3<pipe>,&test2); 357 create_new_thread(&threadproc3<pipe>,&test2); 358 359 360 for (unsigned long i = 0; i < 100000; ++i) 361 { 362 a = i; 363 if (i%2 == 0) 364 test2.enqueue(a); 365 else 366 test2.enqueue_or_timeout(a,100000); 367 } 368 369 test2.wait_for_num_blocked_dequeues(2); 370 DLIB_TEST(test2.size() == 0); 371 test2.disable(); 372 373 wait_for_threads(); 374 DLIB_TEST(test2.size() == 0); 375 376 } 377 378 379 print_spinner(); 380 { 381 dlog << LINFO << "starting 0 length pipe tests"; 382 create_new_thread(&threadproc1<pipe>,&test_0); 383 create_new_thread(&threadproc2<pipe>,&test2_0); 384 create_new_thread(&threadproc2<pipe>,&test2_0); 385 create_new_thread(&threadproc2<pipe>,&test2_0); 386 dlog << LTRACE << "0: 1"; 387 388 for (unsigned long i = 0; i < proc1_count; ++i) 389 { 390 a = i; 391 test_0.enqueue(a); 392 } 393 394 dlog << LTRACE << "0: 2"; 395 DLIB_TEST(test_0.is_enqueue_enabled() == true); 396 test_0.disable_enqueue(); 397 DLIB_TEST(test_0.is_enqueue_enabled() == false); 398 for (unsigned long i = 0; i < proc1_count; ++i) 399 { 400 a = i; 401 test_0.enqueue(a); 402 } 403 404 dlog << LTRACE << "0: 3"; 405 for (unsigned long i = 0; i < 100000; ++i) 406 { 407 a = i; 408 if (i%2 == 0) 409 test2_0.enqueue(a); 410 else 411 test2_0.enqueue_or_timeout(a,100000); 412 } 413 414 print_spinner(); 415 dlog << LTRACE << "0: 4"; 416 test2_0.wait_for_num_blocked_dequeues(3); 417 DLIB_TEST(test2_0.size() == 0); 418 test2_0.disable(); 419 420 wait_for_threads(); 421 DLIB_TEST(test2_0.size() == 0); 422 423 dlog << LTRACE << "0: 5"; 424 test2_0.enable(); 425 426 427 create_new_thread(&threadproc3<pipe>,&test2_0); 428 create_new_thread(&threadproc3<pipe>,&test2_0); 429 430 431 for (unsigned long i = 0; i < 20000; ++i) 432 { 433 if ((i%100) == 0) 434 print_spinner(); 435 436 a = i; 437 if (i%2 == 0) 438 test2_0.enqueue(a); 439 else 440 test2_0.enqueue_or_timeout(a,100000); 441 } 442 443 dlog << LTRACE << "0: 6"; 444 test2_0.wait_for_num_blocked_dequeues(2); 445 DLIB_TEST(test2_0.size() == 0); 446 test2_0.disable(); 447 448 wait_for_threads(); 449 DLIB_TEST(test2_0.size() == 0); 450 451 dlog << LTRACE << "0: 7"; 452 } 453 454 print_spinner(); 455 { 456 dlog << LINFO << "starting 1 length pipe tests"; 457 create_new_thread(&threadproc1<pipe>,&test_1); 458 create_new_thread(&threadproc2<pipe>,&test2_1); 459 create_new_thread(&threadproc2<pipe>,&test2_1); 460 create_new_thread(&threadproc2<pipe>,&test2_1); 461 462 for (unsigned long i = 0; i < proc1_count; ++i) 463 { 464 a = i; 465 test_1.enqueue(a); 466 } 467 DLIB_TEST(test_1.is_enqueue_enabled() == true); 468 test_1.disable_enqueue(); 469 DLIB_TEST(test_1.is_enqueue_enabled() == false); 470 for (unsigned long i = 0; i < proc1_count; ++i) 471 { 472 a = i; 473 test_1.enqueue(a); 474 } 475 print_spinner(); 476 477 for (unsigned long i = 0; i < 100000; ++i) 478 { 479 a = i; 480 if (i%2 == 0) 481 test2_1.enqueue(a); 482 else 483 test2_1.enqueue_or_timeout(a,100000); 484 } 485 486 test2_1.wait_for_num_blocked_dequeues(3); 487 DLIB_TEST(test2_1.size() == 0); 488 test2_1.disable(); 489 490 wait_for_threads(); 491 DLIB_TEST(test2_1.size() == 0); 492 493 test2_1.enable(); 494 495 496 create_new_thread(&threadproc3<pipe>,&test2_1); 497 create_new_thread(&threadproc3<pipe>,&test2_1); 498 499 500 for (unsigned long i = 0; i < 100000; ++i) 501 { 502 a = i; 503 if (i%2 == 0) 504 test2_1.enqueue(a); 505 else 506 test2_1.enqueue_or_timeout(a,100000); 507 } 508 509 test2_1.wait_for_num_blocked_dequeues(2); 510 DLIB_TEST(test2_1.size() == 0); 511 test2_1.disable(); 512 513 wait_for_threads(); 514 DLIB_TEST(test2_1.size() == 0); 515 516 } 517 518 test.enable_enqueue(); 519 test_0.enable_enqueue(); 520 test_1.enable_enqueue(); 521 522 DLIB_TEST(test.is_enabled()); 523 DLIB_TEST(test.is_enqueue_enabled()); 524 DLIB_TEST(test_0.is_enabled()); 525 DLIB_TEST(test_0.is_enqueue_enabled()); 526 DLIB_TEST(test_1.is_enabled()); 527 DLIB_TEST(test_1.is_enqueue_enabled()); 528 529 DLIB_TEST(test.size() == 0); 530 DLIB_TEST(test_0.size() == 0); 531 DLIB_TEST(test_1.size() == 0); 532 DLIB_TEST(test.max_size() == 10); 533 DLIB_TEST(test_0.max_size() == 0); 534 DLIB_TEST(test_1.max_size() == 1); 535 536 537 for (int i = 0; i < 100; ++i) 538 { 539 a = 1; 540 test.enqueue_or_timeout(a,0); 541 a = 1; 542 test_0.enqueue_or_timeout(a,0); 543 a = 1; 544 test_1.enqueue_or_timeout(a,0); 545 } 546 547 DLIB_TEST_MSG(test.size() == 10,"size: " << test.size() ); 548 DLIB_TEST_MSG(test_0.size() == 0,"size: " << test.size() ); 549 DLIB_TEST_MSG(test_1.size() == 1,"size: " << test.size() ); 550 551 for (int i = 0; i < 10; ++i) 552 { 553 a = 0; 554 DLIB_TEST(test.enqueue_or_timeout(a,10) == false); 555 a = 0; 556 DLIB_TEST(test_0.enqueue_or_timeout(a,10) == false); 557 a = 0; 558 DLIB_TEST(test_1.enqueue_or_timeout(a,10) == false); 559 } 560 561 DLIB_TEST_MSG(test.size() == 10,"size: " << test.size() ); 562 DLIB_TEST_MSG(test_0.size() == 0,"size: " << test.size() ); 563 DLIB_TEST_MSG(test_1.size() == 1,"size: " << test.size() ); 564 565 for (int i = 0; i < 10; ++i) 566 { 567 a = 0; 568 DLIB_TEST(test.dequeue_or_timeout(a,0) == true); 569 DLIB_TEST(a == 1); 570 } 571 572 DLIB_TEST(test.max_size() == 10); 573 DLIB_TEST(test_0.max_size() == 0); 574 DLIB_TEST(test_1.max_size() == 1); 575 576 a = 0; 577 DLIB_TEST(test_1.dequeue_or_timeout(a,0) == true); 578 579 DLIB_TEST(test.max_size() == 10); 580 DLIB_TEST(test_0.max_size() == 0); 581 DLIB_TEST(test_1.max_size() == 1); 582 583 584 DLIB_TEST_MSG(a == 1,"a: " << a); 585 586 DLIB_TEST(test.size() == 0); 587 DLIB_TEST(test_0.size() == 0); 588 DLIB_TEST(test_1.size() == 0); 589 590 DLIB_TEST(test.dequeue_or_timeout(a,0) == false); 591 DLIB_TEST(test_0.dequeue_or_timeout(a,0) == false); 592 DLIB_TEST(test_1.dequeue_or_timeout(a,0) == false); 593 DLIB_TEST(test.dequeue_or_timeout(a,10) == false); 594 DLIB_TEST(test_0.dequeue_or_timeout(a,10) == false); 595 DLIB_TEST(test_1.dequeue_or_timeout(a,10) == false); 596 597 DLIB_TEST(test.size() == 0); 598 DLIB_TEST(test_0.size() == 0); 599 DLIB_TEST(test_1.size() == 0); 600 601 DLIB_TEST(found_error == false); 602 603 604 605 606 { 607 test.enable(); 608 test.enable_enqueue(); 609 test.empty(); 610 DLIB_TEST(test.size() == 0); 611 DLIB_TEST(test.is_enabled() == true); 612 DLIB_TEST(test.is_enqueue_enabled() == true); 613 DLIB_TEST(test.is_dequeue_enabled() == true); 614 test.disable_dequeue(); 615 dlog << LINFO << "Make sure disable_dequeue() works right..."; 616 DLIB_TEST(test.is_dequeue_enabled() == false); 617 DLIB_TEST(test.dequeue(a) == false); 618 test.wait_until_empty(); 619 a = 4; 620 test.enqueue(a); 621 test.wait_until_empty(); 622 test.wait_for_num_blocked_dequeues(4); 623 DLIB_TEST(test.size() == 1); 624 DLIB_TEST(test.dequeue(a) == false); 625 DLIB_TEST(test.dequeue_or_timeout(a,10000) == false); 626 DLIB_TEST(test.size() == 1); 627 a = 0; 628 test.enable_dequeue(); 629 DLIB_TEST(test.is_dequeue_enabled() == true); 630 DLIB_TEST(test.dequeue(a) == true); 631 DLIB_TEST(a == 4); 632 test_1.wait_until_empty(); 633 } 634 { 635 test_1.enable(); 636 test_1.enable_enqueue(); 637 test_1.empty(); 638 DLIB_TEST(test_1.size() == 0); 639 DLIB_TEST(test_1.is_enabled() == true); 640 DLIB_TEST(test_1.is_enqueue_enabled() == true); 641 DLIB_TEST(test_1.is_dequeue_enabled() == true); 642 test_1.disable_dequeue(); 643 dlog << LINFO << "Make sure disable_dequeue() works right..."; 644 DLIB_TEST(test_1.is_dequeue_enabled() == false); 645 DLIB_TEST(test_1.dequeue(a) == false); 646 a = 4; 647 test_1.wait_for_num_blocked_dequeues(4); 648 test_1.wait_for_num_blocked_dequeues(0); 649 test_1.enqueue(a); 650 test_1.wait_until_empty(); 651 DLIB_TEST(test_1.size() == 1); 652 DLIB_TEST(test_1.dequeue(a) == false); 653 DLIB_TEST(test_1.dequeue_or_timeout(a,10000) == false); 654 DLIB_TEST(test_1.size() == 1); 655 a = 0; 656 test_1.enable_dequeue(); 657 DLIB_TEST(test_1.is_dequeue_enabled() == true); 658 DLIB_TEST(test_1.dequeue(a) == true); 659 DLIB_TEST(a == 4); 660 test_1.wait_until_empty(); 661 } 662 663 } 664 665 666 667 668 class pipe_tester : public tester 669 { 670 public: pipe_tester()671 pipe_tester ( 672 ) : 673 tester ("test_pipe", 674 "Runs tests on the pipe component.") 675 {} 676 perform_test()677 void perform_test ( 678 ) 679 { 680 pipe_kernel_test<dlib::pipe<int> >(); 681 682 do_zero_size_test_with_timeouts(); 683 } 684 } a; 685 686 } 687 688 689