1 // Copyright (C) 2006  Davis E. King (davis@dlib.net)
2 // License: Boost Software License   See LICENSE.txt for the full license.
3 
4 
5 #include <sstream>
6 #include <string>
7 #include <cstdlib>
8 #include <ctime>
9 #include <dlib/misc_api.h>
10 #include <dlib/pipe.h>
11 
12 #include "tester.h"
13 
14 namespace
15 {
16     using namespace test;
17     using namespace dlib;
18     using namespace std;
19 
20     logger dlog("test.pipe");
21 
22     namespace pipe_kernel_test_helpers
23     {
24         const unsigned long proc1_count = 10000;
25         dlib::mutex m;
26         signaler s(m);
27         unsigned long threads_running = 0;
28         bool found_error;
29 
add_running_thread()30         inline void add_running_thread (
31         )
32         {
33             auto_mutex M(m);
34             ++threads_running;
35         }
36 
remove_running_thread()37         inline void remove_running_thread (
38         )
39         {
40             auto_mutex M(m);
41             --threads_running;
42             s.broadcast();
43         }
44 
wait_for_threads()45         inline void wait_for_threads (
46         )
47         {
48             auto_mutex M(m);
49             while (threads_running > 0)
50                 s.wait();
51         }
52 
53         template <
54             typename pipe
55             >
threadproc1(void * param)56         void threadproc1 (
57             void* param
58         )
59         {
60             add_running_thread();
61             pipe& p = *static_cast<pipe*>(param);
62             try
63             {
64 
65                 int last = -1;
66                 for (unsigned long i = 0; i < proc1_count; ++i)
67                 {
68                     int cur=0;
69                     DLIB_TEST(p.dequeue(cur) == true);
70                     DLIB_TEST(last + 1 == cur);
71                     last = cur;
72                 }
73                 DLIB_TEST(p.size() == 0);
74             }
75             catch(exception& e)
76             {
77                 auto_mutex M(m);
78                 found_error = true;
79                 cout << "\n\nERRORS FOUND" << endl;
80                 cout << e.what() << endl;
81                 dlog << LWARN << "ERRORS FOUND";
82                 dlog << LWARN << e.what();
83                 p.disable();
84             }
85 
86             remove_running_thread();
87         }
88 
89 
90         template <
91             typename pipe
92             >
threadproc2(void * param)93         void threadproc2 (
94             void* param
95         )
96         {
97             add_running_thread();
98             pipe& p = *static_cast<pipe*>(param);
99             try
100             {
101 
102                 int last = -1;
103                 int cur;
104                 while (p.dequeue(cur))
105                 {
106                     DLIB_TEST(last < cur);
107                     last = cur;
108                 }
109                 auto_mutex M(m);
110             }
111             catch(exception& e)
112             {
113                 auto_mutex M(m);
114                 found_error = true;
115                 cout << "\n\nERRORS FOUND" << endl;
116                 cout << e.what() << endl;
117                 dlog << LWARN << "ERRORS FOUND";
118                 dlog << LWARN << e.what();
119                 p.disable();
120             }
121             remove_running_thread();
122         }
123 
124 
125 
126         template <
127             typename pipe
128             >
threadproc3(void * param)129         void threadproc3 (
130             void* param
131         )
132         {
133             add_running_thread();
134             pipe& p = *static_cast<pipe*>(param);
135             try
136             {
137 
138                 int last = -1;
139                 int cur;
140                 while (p.dequeue_or_timeout(cur,100000))
141                 {
142                     DLIB_TEST(last < cur);
143                     last = cur;
144                 }
145                 auto_mutex M(m);
146             }
147             catch(exception& e)
148             {
149                 auto_mutex M(m);
150                 found_error = true;
151                 cout << "\n\nERRORS FOUND" << endl;
152                 cout << e.what() << endl;
153                 dlog << LWARN << "ERRORS FOUND";
154                 dlog << LWARN << e.what();
155                 p.disable();
156             }
157             remove_running_thread();
158         }
159 
160 
161     }
162 
163 // ----------------------------------------------------------------------------------------
164 // ----------------------------------------------------------------------------------------
165 
166     template<typename in_type, typename out_type>
167     class PipelineProcessor : private dlib::threaded_object
168     {
169     public:
PipelineProcessor(dlib::pipe<in_type> & in,dlib::pipe<out_type> & out)170         PipelineProcessor(
171             dlib::pipe<in_type> & in,
172             dlib::pipe<out_type> & out) :
173             InPipe(in),
174             OutPipe(out),
175             InMsg(),
176             OutMsg() {
177                 start();
178             }
179 
~PipelineProcessor()180         ~PipelineProcessor() {
181             // signal the thread to stop
182             stop();
183             wait();
184         }
185 
186     private:
187         dlib::pipe<in_type> & InPipe;
188         dlib::pipe<out_type> & OutPipe;
189 
190         in_type InMsg;
191         out_type OutMsg;
192 
thread()193         void thread()
194         {
195             while (!should_stop()) {
196                 if(InPipe.dequeue_or_timeout(InMsg, 100))
197                 {
198                     // if function signals ready to send OutMsg
199                     while (!OutPipe.enqueue_or_timeout(OutMsg, 100))
200                     {
201                         // try to send until should stop
202                         if (should_stop())
203                         {
204                             return;
205                         }
206                     }
207                 }
208             }
209         };
210     };
211 
212 
do_zero_size_test_with_timeouts()213     void do_zero_size_test_with_timeouts()
214     {
215         dlog << LINFO << "in do_zero_size_test_with_timeouts()";
216         // make sure we can get though this without deadlocking
217         for (int k = 0; k < 10; ++k)
218         {
219             dlib::pipe<int> in_pipe(10);
220             dlib::pipe<float> out_pipe(0);
221             {
222                 PipelineProcessor<int, float> pp(in_pipe, out_pipe);
223 
224                 int in = 1;
225                 in_pipe.enqueue(in);
226                 in = 2;
227                 in_pipe.enqueue(in);
228                 in = 3;
229                 in_pipe.enqueue(in);
230                 // sleep to make sure thread enqueued
231                 dlib::sleep(100);
232 
233                 float out = 1.0f;
234                 out_pipe.dequeue(out);
235                 dlib::sleep(100);
236             }
237             print_spinner();
238         }
239 
240     }
241 
242 // ----------------------------------------------------------------------------------------
243 // ----------------------------------------------------------------------------------------
244 
245     template <
246         typename pipe
247         >
pipe_kernel_test()248     void pipe_kernel_test (
249     )
250     /*!
251         requires
252             - pipe is an implementation of pipe/pipe_kernel_abstract.h and
253               is instantiated with int
254         ensures
255             - runs tests on pipe for compliance with the specs
256     !*/
257     {
258         using namespace pipe_kernel_test_helpers;
259         found_error = false;
260 
261 
262         print_spinner();
263         pipe test(10), test2(100);
264         pipe test_0(0), test2_0(0);
265         pipe test_1(1), test2_1(1);
266 
267         DLIB_TEST(test.size() == 0);
268         DLIB_TEST(test2.size() == 0);
269         DLIB_TEST(test_0.size() == 0);
270         DLIB_TEST(test2_0.size() == 0);
271         DLIB_TEST(test_1.size() == 0);
272         DLIB_TEST(test2_1.size() == 0);
273 
274         DLIB_TEST(test.is_enqueue_enabled() == true);
275         DLIB_TEST(test.is_dequeue_enabled() == true);
276         DLIB_TEST(test.is_enabled() == true);
277 
278         test.empty();
279         test2.empty();
280         DLIB_TEST(test.size() == 0);
281         DLIB_TEST(test2.size() == 0);
282 
283         test_0.empty();
284         test2_0.empty();
285         DLIB_TEST(test_0.size() == 0);
286         DLIB_TEST(test2_0.size() == 0);
287 
288         test_1.empty();
289         test2_1.empty();
290         DLIB_TEST(test_1.size() == 0);
291         DLIB_TEST(test2_1.size() == 0);
292 
293 
294 
295         int a;
296         a = 3;
297         test.enqueue(a);
298         DLIB_TEST(test.size() == 1);
299         a = 5;
300         test.enqueue(a);
301         DLIB_TEST(test.size() == 2);
302 
303         a = 0;
304         test.dequeue(a);
305         DLIB_TEST(a == 3);
306         DLIB_TEST(test.size() == 1);
307 
308         a = 0;
309         test.dequeue(a);
310         DLIB_TEST(a == 5);
311         DLIB_TEST(test.size() == 0);
312 
313 
314         print_spinner();
315         {
316             dlog << LINFO << "starting normal length pipe tests";
317             create_new_thread(&threadproc1<pipe>,&test);
318             create_new_thread(&threadproc2<pipe>,&test2);
319             create_new_thread(&threadproc2<pipe>,&test2);
320             create_new_thread(&threadproc2<pipe>,&test2);
321 
322             for (unsigned long i = 0; i < proc1_count; ++i)
323             {
324                 a = i;
325                 test.enqueue(a);
326             }
327             DLIB_TEST(test.is_enqueue_enabled() == true);
328             test.disable_enqueue();
329             DLIB_TEST(test.is_enqueue_enabled() == false);
330             for (unsigned long i = 0; i < proc1_count; ++i)
331             {
332                 a = i;
333                 test.enqueue(a);
334             }
335 
336             for (unsigned long i = 0; i < 100000; ++i)
337             {
338                 a = i;
339                 if (i%2 == 0)
340                     test2.enqueue(a);
341                 else
342                     test2.enqueue_or_timeout(a,100000);
343             }
344 
345             test2.wait_for_num_blocked_dequeues(3);
346             DLIB_TEST(test2.size() == 0);
347             test2.disable();
348 
349             wait_for_threads();
350             DLIB_TEST(test2.size() == 0);
351 
352             test2.enable();
353 
354             print_spinner();
355 
356             create_new_thread(&threadproc3<pipe>,&test2);
357             create_new_thread(&threadproc3<pipe>,&test2);
358 
359 
360             for (unsigned long i = 0; i < 100000; ++i)
361             {
362                 a = i;
363                 if (i%2 == 0)
364                     test2.enqueue(a);
365                 else
366                     test2.enqueue_or_timeout(a,100000);
367             }
368 
369             test2.wait_for_num_blocked_dequeues(2);
370             DLIB_TEST(test2.size() == 0);
371             test2.disable();
372 
373             wait_for_threads();
374             DLIB_TEST(test2.size() == 0);
375 
376         }
377 
378 
379         print_spinner();
380         {
381             dlog << LINFO << "starting 0 length pipe tests";
382             create_new_thread(&threadproc1<pipe>,&test_0);
383             create_new_thread(&threadproc2<pipe>,&test2_0);
384             create_new_thread(&threadproc2<pipe>,&test2_0);
385             create_new_thread(&threadproc2<pipe>,&test2_0);
386             dlog << LTRACE << "0: 1";
387 
388             for (unsigned long i = 0; i < proc1_count; ++i)
389             {
390                 a = i;
391                 test_0.enqueue(a);
392             }
393 
394             dlog << LTRACE << "0: 2";
395             DLIB_TEST(test_0.is_enqueue_enabled() == true);
396             test_0.disable_enqueue();
397             DLIB_TEST(test_0.is_enqueue_enabled() == false);
398             for (unsigned long i = 0; i < proc1_count; ++i)
399             {
400                 a = i;
401                 test_0.enqueue(a);
402             }
403 
404             dlog << LTRACE << "0: 3";
405             for (unsigned long i = 0; i < 100000; ++i)
406             {
407                 a = i;
408                 if (i%2 == 0)
409                     test2_0.enqueue(a);
410                 else
411                     test2_0.enqueue_or_timeout(a,100000);
412             }
413 
414             print_spinner();
415             dlog << LTRACE << "0: 4";
416             test2_0.wait_for_num_blocked_dequeues(3);
417             DLIB_TEST(test2_0.size() == 0);
418             test2_0.disable();
419 
420             wait_for_threads();
421             DLIB_TEST(test2_0.size() == 0);
422 
423             dlog << LTRACE << "0: 5";
424             test2_0.enable();
425 
426 
427             create_new_thread(&threadproc3<pipe>,&test2_0);
428             create_new_thread(&threadproc3<pipe>,&test2_0);
429 
430 
431             for (unsigned long i = 0; i < 20000; ++i)
432             {
433                 if ((i%100) == 0)
434                     print_spinner();
435 
436                 a = i;
437                 if (i%2 == 0)
438                     test2_0.enqueue(a);
439                 else
440                     test2_0.enqueue_or_timeout(a,100000);
441             }
442 
443             dlog << LTRACE << "0: 6";
444             test2_0.wait_for_num_blocked_dequeues(2);
445             DLIB_TEST(test2_0.size() == 0);
446             test2_0.disable();
447 
448             wait_for_threads();
449             DLIB_TEST(test2_0.size() == 0);
450 
451             dlog << LTRACE << "0: 7";
452         }
453 
454         print_spinner();
455         {
456             dlog << LINFO << "starting 1 length pipe tests";
457             create_new_thread(&threadproc1<pipe>,&test_1);
458             create_new_thread(&threadproc2<pipe>,&test2_1);
459             create_new_thread(&threadproc2<pipe>,&test2_1);
460             create_new_thread(&threadproc2<pipe>,&test2_1);
461 
462             for (unsigned long i = 0; i < proc1_count; ++i)
463             {
464                 a = i;
465                 test_1.enqueue(a);
466             }
467             DLIB_TEST(test_1.is_enqueue_enabled() == true);
468             test_1.disable_enqueue();
469             DLIB_TEST(test_1.is_enqueue_enabled() == false);
470             for (unsigned long i = 0; i < proc1_count; ++i)
471             {
472                 a = i;
473                 test_1.enqueue(a);
474             }
475             print_spinner();
476 
477             for (unsigned long i = 0; i < 100000; ++i)
478             {
479                 a = i;
480                 if (i%2 == 0)
481                     test2_1.enqueue(a);
482                 else
483                     test2_1.enqueue_or_timeout(a,100000);
484             }
485 
486             test2_1.wait_for_num_blocked_dequeues(3);
487             DLIB_TEST(test2_1.size() == 0);
488             test2_1.disable();
489 
490             wait_for_threads();
491             DLIB_TEST(test2_1.size() == 0);
492 
493             test2_1.enable();
494 
495 
496             create_new_thread(&threadproc3<pipe>,&test2_1);
497             create_new_thread(&threadproc3<pipe>,&test2_1);
498 
499 
500             for (unsigned long i = 0; i < 100000; ++i)
501             {
502                 a = i;
503                 if (i%2 == 0)
504                     test2_1.enqueue(a);
505                 else
506                     test2_1.enqueue_or_timeout(a,100000);
507             }
508 
509             test2_1.wait_for_num_blocked_dequeues(2);
510             DLIB_TEST(test2_1.size() == 0);
511             test2_1.disable();
512 
513             wait_for_threads();
514             DLIB_TEST(test2_1.size() == 0);
515 
516         }
517 
518         test.enable_enqueue();
519         test_0.enable_enqueue();
520         test_1.enable_enqueue();
521 
522         DLIB_TEST(test.is_enabled());
523         DLIB_TEST(test.is_enqueue_enabled());
524         DLIB_TEST(test_0.is_enabled());
525         DLIB_TEST(test_0.is_enqueue_enabled());
526         DLIB_TEST(test_1.is_enabled());
527         DLIB_TEST(test_1.is_enqueue_enabled());
528 
529         DLIB_TEST(test.size() == 0);
530         DLIB_TEST(test_0.size() == 0);
531         DLIB_TEST(test_1.size() == 0);
532         DLIB_TEST(test.max_size() == 10);
533         DLIB_TEST(test_0.max_size() == 0);
534         DLIB_TEST(test_1.max_size() == 1);
535 
536 
537         for (int i = 0; i < 100; ++i)
538         {
539             a = 1;
540             test.enqueue_or_timeout(a,0);
541             a = 1;
542             test_0.enqueue_or_timeout(a,0);
543             a = 1;
544             test_1.enqueue_or_timeout(a,0);
545         }
546 
547         DLIB_TEST_MSG(test.size() == 10,"size: " << test.size() );
548         DLIB_TEST_MSG(test_0.size() == 0,"size: " << test.size() );
549         DLIB_TEST_MSG(test_1.size() == 1,"size: " << test.size() );
550 
551         for (int i = 0; i < 10; ++i)
552         {
553             a = 0;
554             DLIB_TEST(test.enqueue_or_timeout(a,10) == false);
555             a = 0;
556             DLIB_TEST(test_0.enqueue_or_timeout(a,10) == false);
557             a = 0;
558             DLIB_TEST(test_1.enqueue_or_timeout(a,10) == false);
559         }
560 
561         DLIB_TEST_MSG(test.size() == 10,"size: " << test.size() );
562         DLIB_TEST_MSG(test_0.size() == 0,"size: " << test.size() );
563         DLIB_TEST_MSG(test_1.size() == 1,"size: " << test.size() );
564 
565         for (int i = 0; i < 10; ++i)
566         {
567             a = 0;
568             DLIB_TEST(test.dequeue_or_timeout(a,0) == true);
569             DLIB_TEST(a == 1);
570         }
571 
572         DLIB_TEST(test.max_size() == 10);
573         DLIB_TEST(test_0.max_size() == 0);
574         DLIB_TEST(test_1.max_size() == 1);
575 
576         a = 0;
577         DLIB_TEST(test_1.dequeue_or_timeout(a,0) == true);
578 
579         DLIB_TEST(test.max_size() == 10);
580         DLIB_TEST(test_0.max_size() == 0);
581         DLIB_TEST(test_1.max_size() == 1);
582 
583 
584         DLIB_TEST_MSG(a == 1,"a: " << a);
585 
586         DLIB_TEST(test.size() == 0);
587         DLIB_TEST(test_0.size() == 0);
588         DLIB_TEST(test_1.size() == 0);
589 
590         DLIB_TEST(test.dequeue_or_timeout(a,0) == false);
591         DLIB_TEST(test_0.dequeue_or_timeout(a,0) == false);
592         DLIB_TEST(test_1.dequeue_or_timeout(a,0) == false);
593         DLIB_TEST(test.dequeue_or_timeout(a,10) == false);
594         DLIB_TEST(test_0.dequeue_or_timeout(a,10) == false);
595         DLIB_TEST(test_1.dequeue_or_timeout(a,10) == false);
596 
597         DLIB_TEST(test.size() == 0);
598         DLIB_TEST(test_0.size() == 0);
599         DLIB_TEST(test_1.size() == 0);
600 
601         DLIB_TEST(found_error == false);
602 
603 
604 
605 
606         {
607             test.enable();
608             test.enable_enqueue();
609             test.empty();
610             DLIB_TEST(test.size() == 0);
611             DLIB_TEST(test.is_enabled() == true);
612             DLIB_TEST(test.is_enqueue_enabled() == true);
613             DLIB_TEST(test.is_dequeue_enabled() == true);
614             test.disable_dequeue();
615             dlog << LINFO << "Make sure disable_dequeue() works right...";
616             DLIB_TEST(test.is_dequeue_enabled() == false);
617             DLIB_TEST(test.dequeue(a) == false);
618             test.wait_until_empty();
619             a = 4;
620             test.enqueue(a);
621             test.wait_until_empty();
622             test.wait_for_num_blocked_dequeues(4);
623             DLIB_TEST(test.size() == 1);
624             DLIB_TEST(test.dequeue(a) == false);
625             DLIB_TEST(test.dequeue_or_timeout(a,10000) == false);
626             DLIB_TEST(test.size() == 1);
627             a = 0;
628             test.enable_dequeue();
629             DLIB_TEST(test.is_dequeue_enabled() == true);
630             DLIB_TEST(test.dequeue(a) == true);
631             DLIB_TEST(a == 4);
632             test_1.wait_until_empty();
633         }
634         {
635             test_1.enable();
636             test_1.enable_enqueue();
637             test_1.empty();
638             DLIB_TEST(test_1.size() == 0);
639             DLIB_TEST(test_1.is_enabled() == true);
640             DLIB_TEST(test_1.is_enqueue_enabled() == true);
641             DLIB_TEST(test_1.is_dequeue_enabled() == true);
642             test_1.disable_dequeue();
643             dlog << LINFO << "Make sure disable_dequeue() works right...";
644             DLIB_TEST(test_1.is_dequeue_enabled() == false);
645             DLIB_TEST(test_1.dequeue(a) == false);
646             a = 4;
647             test_1.wait_for_num_blocked_dequeues(4);
648             test_1.wait_for_num_blocked_dequeues(0);
649             test_1.enqueue(a);
650             test_1.wait_until_empty();
651             DLIB_TEST(test_1.size() == 1);
652             DLIB_TEST(test_1.dequeue(a) == false);
653             DLIB_TEST(test_1.dequeue_or_timeout(a,10000) == false);
654             DLIB_TEST(test_1.size() == 1);
655             a = 0;
656             test_1.enable_dequeue();
657             DLIB_TEST(test_1.is_dequeue_enabled() == true);
658             DLIB_TEST(test_1.dequeue(a) == true);
659             DLIB_TEST(a == 4);
660             test_1.wait_until_empty();
661         }
662 
663     }
664 
665 
666 
667 
668     class pipe_tester : public tester
669     {
670     public:
pipe_tester()671         pipe_tester (
672         ) :
673             tester ("test_pipe",
674                     "Runs tests on the pipe component.")
675         {}
676 
perform_test()677         void perform_test (
678         )
679         {
680             pipe_kernel_test<dlib::pipe<int> >();
681 
682             do_zero_size_test_with_timeouts();
683         }
684     } a;
685 
686 }
687 
688 
689