1 /*===========================================================================
2 *
3 *                            PUBLIC DOMAIN NOTICE
4 *               National Center for Biotechnology Information
5 *
6 *  This software/database is a "United States Government Work" under the
7 *  terms of the United States Copyright Act.  It was written as part of
8 *  the author's official duties as a United States Government employee and
9 *  thus cannot be copyrighted.  This software/database is freely available
10 *  to the public for use. The National Library of Medicine and the U.S.
11 *  Government have not placed any restriction on its use or reproduction.
12 *
13 *  Although all reasonable efforts have been taken to ensure the accuracy
14 *  and reliability of the software and data, the NLM and the U.S.
15 *  Government do not and cannot warrant the performance or results that
16 *  may be obtained by using this software or data. The NLM and the U.S.
17 *  Government disclaim all warranties, express or implied, including
18 *  warranties of performance, merchantability or fitness for any particular
19 *  purpose.
20 *
21 *  Please cite the author in any work or product based on this material.
22 *
23 * ===========================================================================
24 *
25 */
26 
27 /**
28 * Unit tests for KProc interfaces
29 */
30 
31 #include <ktst/unit_test.hpp>
32 
33 #include <klib/rc.h>
34 #include <klib/time.h>
35 #include <klib/log.h>
36 
37 #include <atomic32.h>
38 #include <os-native.h>
39 
40 #include <kproc/cond.h>
41 #include <kproc/lock.h>
42 #include <kproc/thread.h>
43 #include <kproc/timeout.h>
44 #include <kproc/queue.h>
45 
46 #include <stdexcept>
47 #include <sstream>
48 
#include <cstring> // memset
50 
51 using namespace std;
52 using namespace ncbi::NK;
53 
// Forward declaration of the argument handler handed to the test suite below;
// its definition is not in this part of the file.
extern "C" { static rc_t argsHandler(int argc, char* argv[]); }
// Declares the KProcTestSuite test suite, passing argsHandler as its arguments handler.
TEST_SUITE_WITH_ARGS_HANDLER(KProcTestSuite, argsHandler);
56 
57 //TODO: KThread
58 
59 ///////////////////////// KLock
// KLockMake must report failure when given a NULL output pointer.
TEST_CASE( KLock_NULL )
{
    REQUIRE_RC_FAIL(KLockMake(NULL));
}
64 
65 class KLockFixture
66 {
67 public:
KLockFixture()68     KLockFixture()
69     :   threadRc(0),
70         thread(0),
71         lock(0)
72     {
73         if (KLockMake(&lock) != 0)
74             throw logic_error("KLockFixture: KLockMake failed");
75     }
~KLockFixture()76     ~KLockFixture()
77     {
78         if (thread != 0 && KThreadRelease(thread) != 0)
79             throw logic_error("~KLockFixture: KThreadRelease failed");
80         if (KLockRelease((const KLock*)lock) != 0)
81             throw logic_error("~KLockFixture: KLockRelease failed");
82     }
83 
84 protected:
85     class Thread {
86     public:
87         // danger - this should be an extern "C" function
88         // with CC calling convention on Windows
KLock_ThreadFn(const KThread * thread,void * data)89         static rc_t KLock_ThreadFn ( const KThread *thread, void *data )
90         {
91             KLockFixture* self = (KLockFixture*)data;
92 
93             LOG(LogLevel::e_message, "KLock_ThreadFn acquiring lock, set threadWaiting to 1" << endl);
94             atomic32_set ( & self->threadWaiting, 1 );
95 
96             while (KLockAcquire(self->lock) != 0)
97             {
98                 TestEnv::SleepMs(1);
99             }
100             LOG(LogLevel::e_message, "KLock_ThreadFn: lock acquired" << endl);
101 
102             atomic32_set ( & self->threadWaiting, 0 );
103             LOG(LogLevel::e_message, "KLock_ThreadFn: set threadWaiting to 0" << endl);
104 
105             self->threadRc = KLockUnlock(self->lock);
106             LOG(LogLevel::e_message, "KLock_Timed_ThreadFn: exiting" << endl);
107             return 0;
108         }
109     };
110 
StartThread()111     rc_t StartThread()
112     {
113         atomic32_set ( & threadWaiting, 0 );
114         LOG(LogLevel::e_message, "StartThread: set threadWaiting to 0" << endl);
115 
116         threadRc = 0;
117         rc_t rc = KThreadMake(&thread, Thread::KLock_ThreadFn, this);
118         while (threadRc == 0 && !atomic32_read (&threadWaiting))
119         {
120             TestEnv::SleepMs(1);
121         }
122         LOG(LogLevel::e_message, "StartThread: threadWaiting == 1" << endl);
123         return rc;
124     }
125 
126 public:
127     rc_t threadRc;
128     KThread* thread;
129     volatile atomic32_t threadWaiting;
130     timeout_t tm;
131     KLock* lock;
132 };
133 
// A second thread blocks on a held KLock and proceeds once the lock is released.
FIXTURE_TEST_CASE(KLock_Acquire, KLockFixture)
{
    // take the lock on the main thread
    REQUIRE_RC(KLockAcquire(lock));
    // launch the contender; StartThread returns once it has flagged threadWaiting
    REQUIRE_RC(StartThread()); // makes sure threadWaiting == 1

    // release the lock and poll until the contender clears its flag
    REQUIRE_RC(KLockUnlock(lock));
    while (atomic32_read (&threadWaiting) != 0)
        TestEnv::SleepMs(1);

    // the contender's unlock must have succeeded
    REQUIRE_RC(threadRc);
    LOG(LogLevel::e_message, "KLock_Acquire: done" << endl);
}
151 
152 ///////////////////////// KTimedLock
// KTimedLockMake must report failure when given a NULL output pointer.
TEST_CASE( KTimedLock_NULL )
{
    REQUIRE_RC_FAIL(KTimedLockMake(NULL));
}
157 
158 class KTimedLockFixture
159 {
160 public:
KTimedLockFixture()161     KTimedLockFixture()
162     :   threadRc(0),
163         thread(0),
164         lock(0)
165     {
166         if (KTimedLockMake(&lock) != 0)
167             throw logic_error("KLockFixture: KLockMake failed");
168     }
~KTimedLockFixture()169     ~KTimedLockFixture()
170     {
171         if (thread != 0 && KThreadRelease(thread) != 0)
172             throw logic_error("~KLockFixture: KThreadRelease failed");
173         if (KLockRelease((const KLock*)lock) != 0)
174             throw logic_error("~KLockFixture: KLockRelease failed");
175     }
176 
177 protected:
178     class Thread {
179     public:
180         // danger - this should be an extern "C" function
181         // with CC calling convention on Windows
KLock_Timed_ThreadFn(const KThread * thread,void * data)182         static rc_t KLock_Timed_ThreadFn ( const KThread *thread, void *data )
183         {
184             KTimedLockFixture* self = (KTimedLockFixture*)data;
185 
186             LOG(LogLevel::e_message, "KLock_Timed_ThreadFn acquiring lock, set threadWaiting to 1, timeout = " << self->tm.mS << "ms" << endl);
187             atomic32_set ( & self->threadWaiting, 1 );
188 
189             self->threadRc = KTimedLockAcquire(self->lock, &self->tm);
190             if (self->threadRc == 0)
191                 LOG(LogLevel::e_message, "KLock_Timed_ThreadFn: lock acquired" << endl);
192             else
193                 LOG(LogLevel::e_message, "KLock_Timed_ThreadFn: lock acquire failed" << endl);
194 
195             LOG(LogLevel::e_message, "KLock_Timed_ThreadFn: set threadWaiting to 0" << endl);
196 
197             if (self->threadRc == 0)
198                 self->threadRc = KTimedLockUnlock(self->lock);
199             LOG(LogLevel::e_message, "KLock_Timed_ThreadFn: exiting" << endl);
200             atomic32_set ( & self->threadWaiting, 0 );
201             return 0;
202         }
203     };
204 
StartThread(size_t timeout)205     rc_t StartThread(size_t timeout)
206     {
207         rc_t rc = TimeoutInit( &tm, timeout );
208         if ( rc == 0)
209         {
210             atomic32_set ( & threadWaiting, 0 );
211             LOG(LogLevel::e_message, "StartTimedThread: set threadWaiting to 0" << endl);
212 
213             threadRc = 0;
214             rc = KThreadMake(&thread, Thread::KLock_Timed_ThreadFn, this);
215             while (threadRc == 0 && !atomic32_read (&threadWaiting))
216             {
217                 TestEnv::SleepMs(1);
218             }
219             LOG(LogLevel::e_message, "StartTimedThread: threadWaiting == 1" << endl);
220         }
221         return rc;
222     }
223 
224 public:
225     rc_t threadRc;
226     KThread* thread;
227     volatile atomic32_t threadWaiting;
228     timeout_t tm;
229     KTimedLock* lock;
230 };
231 
// A timed acquire on another thread succeeds when the lock is released in time.
FIXTURE_TEST_CASE(KTimedLock_Acquire, KTimedLockFixture)
{
    // hold the lock on the main thread (no timeout)
    REQUIRE_RC(KTimedLockAcquire(lock, NULL));

    // launch the contender with a 1000 ms timeout
    LOG(LogLevel::e_message, "TEST_KLock_TimedAcquire: starting thread" << endl);
    REQUIRE_RC(StartThread(1000));// makes sure threadWaiting == 1

    // hand the lock over
    LOG(LogLevel::e_message, "TEST_KLock_TimedAcquire: unlocking" << endl);
    REQUIRE_RC(KTimedLockUnlock(lock));

    // poll until the contender has cleared its flag, then check its result
    while (atomic32_read (&threadWaiting) != 0)
        TestEnv::SleepMs(1);
    REQUIRE_RC(threadRc);

    LOG(LogLevel::e_message, "TEST_KLock_TimedAcquire: done" << endl);
}
254 
255 #ifdef WINDOWS
// WINDOWS build only: a timed acquire on a lock that stays held is expected
// to end with an rcLock/rcBusy code.
FIXTURE_TEST_CASE(KTimedLock_Acquire_Busy, KTimedLockFixture)
{
    // hold the lock on the main thread
    REQUIRE_RC(KTimedLockAcquire(lock, NULL));

    // launch the contender with a 100 ms timeout
    REQUIRE_RC(StartThread(100));// makes sure threadWaiting == 1

    // keep the lock; just poll until the contender gives up
    while (atomic32_read (&threadWaiting) != 0)
        TestEnv::SleepMs(1);
    REQUIRE_EQ(threadRc, RC(rcPS, rcLock, rcLocking, rcLock, rcBusy));

    REQUIRE_RC(KTimedLockUnlock(lock));
}
273 #else
// Non-WINDOWS build: a timed acquire on a lock that stays held is expected
// to end with an rcTimeout/rcExhausted code.
FIXTURE_TEST_CASE(KTimedLock_Acquire_Timeout, KTimedLockFixture)
{
    // hold the lock on the main thread
    REQUIRE_RC(KTimedLockAcquire(lock, NULL));

    // launch the contender with a 100 ms timeout
    REQUIRE_RC(StartThread(100));// makes sure threadWaiting == 1

    // keep the lock; just poll until the contender gives up
    while (atomic32_read(&threadWaiting) != 0)
        TestEnv::SleepMs(1);
    REQUIRE_EQ(threadRc, RC(rcPS, rcLock, rcLocking, rcTimeout, rcExhausted)); // timed out

    REQUIRE_RC(KTimedLockUnlock(lock));
}
291 #endif
292 
293 ///////////////////////// KRWLock
// KRWLockMake must report failure when given a NULL output pointer.
TEST_CASE( KRWLock_NULL )
{
    REQUIRE_RC_FAIL(KRWLockMake(NULL));
}
298 
299 class KRWLockFixture
300 {
301 public:
KRWLockFixture()302     KRWLockFixture()
303     :   threadRc(0),
304         thread(0),
305         lock(0)
306     {
307         atomic32_set ( & threadWaiting, 0 );
308 
309         if (KRWLockMake(&lock) != 0)
310             throw logic_error("KLockFixture: KLockMake failed");
311     }
~KRWLockFixture()312     ~KRWLockFixture()
313     {
314         if (thread != 0 && KThreadRelease(thread) != 0)
315             throw logic_error("~KRWLockFixture: KThreadRelease failed");
316         if (KRWLockRelease((const KRWLock*)lock) != 0)
317             throw logic_error("~KRWLockFixture: KLockRelease failed");
318     }
319 
320 protected:
321     class Thread {
322     public:
KRWLock_Reader_ThreadFn(const KThread * thread,void * data)323         static rc_t KRWLock_Reader_ThreadFn ( const KThread *thread, void *data )
324         {
325             KRWLockFixture* self = (KRWLockFixture*)data;
326             atomic32_set( & self->threadWaiting, true );
327 
328             while (KRWLockAcquireShared(self->lock) != 0)
329             {
330                 TestEnv::SleepMs(1);
331             }
332             self->threadRc = KRWLockUnlock(self->lock);
333             atomic32_set( & self->threadWaiting, false );
334             return 0;
335         }
KRWLock_Writer_ThreadFn(const KThread * thread,void * data)336         static rc_t KRWLock_Writer_ThreadFn ( const KThread *thread, void *data )
337         {
338             KRWLockFixture* self = (KRWLockFixture*)data;
339             atomic32_set( & self->threadWaiting, true );
340 
341             LOG(LogLevel::e_message, "KRWLock_Writer_ThreadFn: calling KRWLockAcquireExcl\n");
342             self->threadRc = KRWLockAcquireExcl(self->lock);
343             LOG(LogLevel::e_message, "KRWLock_Writer_ThreadFn: out of KRWLockAcquireExcl\n");
344             if (self->threadRc == 0)
345             {
346                 LOG(LogLevel::e_message, "KRWLock_Writer_ThreadFn: calling KRWLockUnlock\n");
347                 self->threadRc = KRWLockUnlock(self->lock);
348             }
349             atomic32_set( & self->threadWaiting, false );
350             return 0;
351         }
KRWLock_ReaderTimed_ThreadFn(const KThread * thread,void * data)352         static rc_t KRWLock_ReaderTimed_ThreadFn ( const KThread *thread, void *data )
353         {
354             KRWLockFixture* self = (KRWLockFixture*)data;
355             atomic32_set( & self->threadWaiting, true );
356 
357             self->threadRc = KRWLockTimedAcquireShared(self->lock, &self->tm);
358             if (self->threadRc == 0)
359                 self->threadRc = KRWLockUnlock(self->lock);
360             atomic32_set( & self->threadWaiting, false );
361             return 0;
362         }
KRWLock_WriterTimed_ThreadFn(const KThread * thread,void * data)363         static rc_t KRWLock_WriterTimed_ThreadFn ( const KThread *thread, void *data )
364         {
365             KRWLockFixture* self = (KRWLockFixture*)data;
366             atomic32_set( & self->threadWaiting, true );
367 
368             self->threadRc = KRWLockTimedAcquireExcl(self->lock, &self->tm);
369             if (self->threadRc == 0)
370                 self->threadRc = KRWLockUnlock(self->lock);
371             atomic32_set( & self->threadWaiting, false );
372             return 0;
373         }
374     };
375 
StartThread(bool writer)376     rc_t StartThread(bool writer)
377     {
378         threadRc = 0;
379         rc_t rc = KThreadMake(&thread, writer ? Thread::KRWLock_Writer_ThreadFn : Thread::KRWLock_Reader_ThreadFn, this);
380         while (!atomic32_read (&threadWaiting))
381         {
382             TestEnv::SleepMs(1);
383         }
384         return rc;
385     }
StartThread(bool writer,size_t timeout)386     rc_t StartThread(bool writer, size_t timeout)
387     {
388         rc_t rc = TimeoutInit( &tm, timeout );
389         if ( rc == 0)
390         {
391             threadRc = 0;
392             rc = KThreadMake(&thread, writer ? Thread::KRWLock_WriterTimed_ThreadFn : Thread::KRWLock_ReaderTimed_ThreadFn, this);
393             while (!atomic32_read (&threadWaiting))
394             {
395                 TestEnv::SleepMs(1);
396             }
397         }
398         return rc;
399     }
400 
401     volatile atomic32_t threadWaiting;
402     rc_t threadRc;
403     KThread* thread;
404     timeout_t tm;
405     KRWLock* lock;
406 };
407 
// Several shared (read) locks can be held at the same time, and each of them
// can be released again.
FIXTURE_TEST_CASE( KRWLock_ManyReaders, KRWLockFixture )
{
    // get multiple read locks
    REQUIRE_RC(KRWLockAcquireShared(lock));
    REQUIRE_RC(KRWLockAcquireShared(lock));
    REQUIRE_RC(KRWLockAcquireShared(lock));
    //NB: On Linux, KRWLock goes away without a leak even if Unlock is not called.
    // On Windows, Unlocks are required.
    // The unlock results are checked like in every other KRWLock test.
    REQUIRE_RC(KRWLockUnlock(lock));
    REQUIRE_RC(KRWLockUnlock(lock));
    REQUIRE_RC(KRWLockUnlock(lock));
}
420 
// An exclusive lock can be taken and released, after which the lock can be
// taken again (here as a shared lock).
FIXTURE_TEST_CASE( KRWLock_OneWriter, KRWLockFixture )
{
    REQUIRE_RC(KRWLockAcquireExcl(lock));
    //NB: trying to lock again from the same thread errors out on Linux but hangs on Windows
    //REQUIRE_RC_FAIL(KRWLockAcquireExcl(lock));
    //REQUIRE_RC_FAIL(KRWLockAcquireShared(lock));
    //TODO: try to acquire from a different thread

    REQUIRE_RC(KRWLockUnlock(lock));

    // now, can lock again
    REQUIRE_RC(KRWLockAcquireShared(lock));
    REQUIRE_RC(KRWLockUnlock(lock));
}
435 
// A writer on another thread waits while the main thread holds a read lock,
// and completes successfully once that read lock is released.
FIXTURE_TEST_CASE( KRWLock_WriterWaitsForReader, KRWLockFixture )
{
    LOG(LogLevel::e_message, "KRWLock_WriterWaitsForReader: calling KRWLockAcquireShared\n");
    REQUIRE_RC(KRWLockAcquireShared(lock));

    // start a thread that tries to write-lock, see it wait
    LOG(LogLevel::e_message, "KRWLock_WriterWaitsForReader: starting thread\n");
    REQUIRE_RC(StartThread(true));

    REQUIRE(atomic32_read ( & threadWaiting ));

    LOG(LogLevel::e_message, "KRWLock_WriterWaitsForReader: calling KRWLockUnlock\n");
    REQUIRE_RC(KRWLockUnlock(lock));
    // let the thread finish
    while (atomic32_read (&threadWaiting))
    {
        TestEnv::SleepMs(1);
    }
    // the writer stores its acquire/unlock result before clearing threadWaiting,
    // so it is final here; verify the writer really got (and released) the lock
    REQUIRE_RC(threadRc);
}
456 
// A timed reader on another thread succeeds when the exclusive lock held by
// the main thread is released within the timeout.
FIXTURE_TEST_CASE(KWRLock_Reader_TimedAcquire, KRWLockFixture)
{
    // hold the lock exclusively on the main thread
    REQUIRE_RC(KRWLockAcquireExcl(lock));

    // launch a timed reader (1000 ms)
    REQUIRE_RC(StartThread(false, 1000));

    // the reader should still be in flight
    REQUIRE(atomic32_read ( & threadWaiting ));

    // release the lock and poll until the reader has finished
    REQUIRE_RC(KRWLockUnlock(lock));
    while (atomic32_read (&threadWaiting) != 0)
        TestEnv::SleepMs(1);
    REQUIRE_RC(threadRc);
}
476 
// A timed reader on another thread times out while the main thread keeps the
// exclusive lock.
FIXTURE_TEST_CASE(KWRLock_Reader_TimedAcquire_Timeout, KRWLockFixture)
{
    // hold the lock exclusively on the main thread
    REQUIRE_RC(KRWLockAcquireExcl(lock));

    // launch a timed reader (500 ms)
    REQUIRE_RC(StartThread(false, 500));

    // keep the lock; poll until the reader gives up
    while (atomic32_read (&threadWaiting) != 0)
        TestEnv::SleepMs(1);

    rc_t req_rc = RC ( rcPS, rcRWLock, rcLocking, rcTimeout, rcExhausted );
    REQUIRE_EQ(threadRc, req_rc); // timed out

    REQUIRE_RC(KRWLockUnlock(lock));
}
495 
// A timed writer on another thread succeeds when the read lock held by the
// main thread is released within the timeout.
FIXTURE_TEST_CASE(KWRLock_Writer_TimedAcquire, KRWLockFixture)
{
    // hold a read lock on the main thread
    REQUIRE_RC(KRWLockAcquireShared(lock));

    // launch a timed writer (1000 ms)
    REQUIRE_RC(StartThread(true, 1000));

    // after 300 ms the writer should still be in flight
    TestEnv::SleepMs(300);
    REQUIRE(atomic32_read (&threadWaiting));

    // release the lock and poll until the writer has finished
    REQUIRE_RC(KRWLockUnlock(lock));
    while (atomic32_read (&threadWaiting) != 0)
        TestEnv::SleepMs(1);
    REQUIRE_RC(threadRc);
}
516 
// A timed writer on another thread times out while the main thread keeps its
// read lock.
FIXTURE_TEST_CASE(KWRLock_Writer_TimedAcquire_Timeout, KRWLockFixture)
{
    // hold a read lock on the main thread
    REQUIRE_RC(KRWLockAcquireShared(lock));

    // launch a timed writer (500 ms)
    REQUIRE_RC(StartThread(true, 500));

    // keep the lock; poll until the writer gives up
    while (atomic32_read (&threadWaiting) != 0)
        TestEnv::SleepMs(1);
    REQUIRE_EQ(threadRc, RC ( rcPS, rcRWLock, rcLocking, rcTimeout, rcExhausted )); // timed out

    REQUIRE_RC(KRWLockUnlock(lock));
}
534 
535 //KCondition
// KConditionMake must report failure when given a NULL output pointer.
TEST_CASE( KCondition_NULL )
{
    REQUIRE_RC_FAIL(KConditionMake(NULL));
}
// A condition can be created and then released, both without error.
TEST_CASE( KCondition_MakeRelease )
{
    KCondition* cond; // filled in by KConditionMake
    REQUIRE_RC(KConditionMake(&cond));
    REQUIRE_RC(KConditionRelease(cond));
}
546 
547 class KConditionFixture
548 {
549 public:
KConditionFixture()550     KConditionFixture()
551     :   threadRc(0),
552         thread(0),
553         lock(0),
554         is_signaled(false),
555         do_broadcast(false)
556     {
557         if (KLockMake(&lock) != 0)
558             throw logic_error("KConditionFixture: KLockMake failed");
559         if (KConditionMake(&cond) != 0)
560             throw logic_error("KConditionFixture: KConditionMake failed");
561     }
~KConditionFixture()562     ~KConditionFixture()
563     {
564         if (thread != 0)
565         {
566             if (KThreadWait(thread, NULL) != 0)
567                 throw logic_error("~KConditionFixture: KThreadWait failed");
568             if (threadRc != 0)
569                 throw logic_error("~KConditionFixture: thread failed, threadRc != 0");
570             if (KThreadRelease(thread) != 0)
571                 throw logic_error("~KConditionFixture: KThreadRelease failed");
572         }
573         if (KLockRelease((const KLock*)lock) != 0)
574             throw logic_error("~KConditionFixture: KLockRelease failed");
575         if (KConditionRelease(cond) != 0)
576             throw logic_error("~KConditionFixture: KConditionRelease failed");
577     }
578 
579 protected:
580     class Thread {
581     public:
582         // danger - this should be an extern "C" function
583         // with CC calling convention on Windows
KCondition_ThreadFn(const KThread * thread,void * data)584         static rc_t KCondition_ThreadFn ( const KThread *thread, void *data )
585         {
586             KConditionFixture* self = (KConditionFixture*)data;
587 
588             LOG(LogLevel::e_message, "KCondition_ThreadFn: sleeping" << endl);
589             TestEnv::SleepMs(300);
590             LOG(LogLevel::e_message, "KCondition_ThreadFn: signaling condition" << endl);
591             self->is_signaled = true;
592             if (!self->do_broadcast)
593                 self->threadRc = KConditionSignal(self->cond);
594             else
595                 self->threadRc = KConditionBroadcast(self->cond);
596 
597             LOG(LogLevel::e_message, "KCondition_ThreadFn: exiting" << endl);
598             return 0;
599         }
600     };
601 
StartThread()602     rc_t StartThread()
603     {
604         LOG(LogLevel::e_message, "StartThread: starting thread" << endl);
605 
606         threadRc = 0;
607         rc_t rc = KThreadMake(&thread, Thread::KCondition_ThreadFn, this);
608         return rc;
609     }
610 
611 public:
612     rc_t threadRc;
613     KThread* thread;
614     timeout_t tm;
615     KLock* lock;
616     KCondition* cond;
617     bool is_signaled;
618     bool do_broadcast;
619 };
620 
// A timed wait with nobody signalling afterwards ends with an
// rcTimeout/rcExhausted code; a signal sent before the wait does not satisfy it.
FIXTURE_TEST_CASE( KCondition_TimedWait_Timeout, KConditionFixture )
{
    REQUIRE_RC(KLockAcquire(lock));
    REQUIRE_RC(TimeoutInit(&tm, 100));
    REQUIRE_RC(KConditionSignal(cond)); // signaling before waiting should not do anything
    REQUIRE_EQ(KConditionTimedWait(cond, lock, &tm), RC ( rcPS, rcCondition, rcWaiting, rcTimeout, rcExhausted )); // timed out

    REQUIRE_RC(KLockUnlock(lock));
}
630 
// The main thread waits on the condition (untimed KConditionWait) and is woken
// by the helper thread's KConditionSignal.
FIXTURE_TEST_CASE( KCondition_TimedWait_Signaled, KConditionFixture )
{
    is_signaled = false;

    // the lock is held when entering the wait
    REQUIRE_RC(KLockAcquire(lock));

    // the helper sleeps before signalling, so the wait below starts first
    REQUIRE_RC(StartThread());
    REQUIRE_RC(KConditionWait(cond, lock));
    REQUIRE(is_signaled == true);

    REQUIRE_RC(KLockUnlock(lock));
}
643 
// Same as the signalled case, but the helper thread wakes the waiter with
// KConditionBroadcast (do_broadcast = true).
FIXTURE_TEST_CASE( KCondition_TimedWait_Signaled_Broadcast, KConditionFixture )
{
    is_signaled = false;
    do_broadcast = true;

    // the lock is held when entering the wait
    REQUIRE_RC(KLockAcquire(lock));

    // the helper sleeps before broadcasting, so the wait below starts first
    REQUIRE_RC(StartThread());
    REQUIRE_RC(KConditionWait(cond, lock));
    REQUIRE(is_signaled == true);

    REQUIRE_RC(KLockUnlock(lock));
}
657 
658 ///////////////////////// KQueue
// KQueueMake must report failure when given a NULL output pointer.
TEST_CASE( KQueue_NULL )
{
    REQUIRE_RC_FAIL(KQueueMake(NULL, 1));
}
663 
// Single-threaded checks on a queue of capacity 2: items come out in the order
// they went in, pushing into a full queue fails, and popping from an empty
// queue fails.
TEST_CASE(KQueueSimpleTest) {
    KQueue * queue = NULL;
    REQUIRE_RC(KQueueMake(&queue, 2));

    timeout_t tm = { 0 };
    void *item = NULL;

    // round 1: push 2, pop 2 - everything succeeds
    {
        for (uint64_t i = 1; i <= 2; ++i) {
            item = (void*)i;
            REQUIRE_RC(KQueuePush(queue, item, & tm));
        }
        for (uint64_t i = 1; i <= 2; ++i) {
            REQUIRE_RC(KQueuePop(queue, &item, & tm));
            uint64_t j = (uint64_t)item;
            REQUIRE_EQ(i, j);
        }
    }

    // round 2: a third push exceeds the capacity and fails; the 2 stored items pop fine
    {
        for (uint64_t i = 1; i <= 2; ++i) {
            void *item = (void*)i;
            REQUIRE_RC(KQueuePush(queue, item, & tm));
        }
        // this push uses the outer 'item'
        REQUIRE_RC_FAIL(KQueuePush(queue, item, & tm));
        for (uint64_t i = 1; i <= 2; ++i) {
            void *item = 0;
            REQUIRE_RC(KQueuePop(queue, &item, & tm));
            uint64_t j = (uint64_t)item;
            REQUIRE_EQ(i, j);
        }
    }

    // round 3: push 2, pop 2, then a third pop finds the queue empty and fails
    {
        for (uint64_t i = 1; i <= 2; ++i) {
            void *item = (void*)i;
            REQUIRE_RC(KQueuePush(queue, item, & tm));
        }
        for (uint64_t i = 1; i <= 2; ++i) {
            void *item = 0;
            REQUIRE_RC(KQueuePop(queue, &item, & tm));
            uint64_t j = (uint64_t)item;
            REQUIRE_EQ(i, j);
        }
        // this pop targets the outer 'item'
        REQUIRE_RC_FAIL(KQueuePop(queue, &item, & tm));
    }

    REQUIRE_RC(KQueueRelease(queue));
}
715 
716 class KQueueFixture
717 {
718 public:
KQueueFixture()719     KQueueFixture()
720     :   threadRcs(NULL),
721         threads(NULL),
722         threadsData(NULL),
723         nThreads(32),
724         nStartedThreads(0),
725         sealed(false)
726     {
727         threads = (KThread**)malloc(sizeof(*threads) * nThreads);
728         if (threads == NULL)
729             throw logic_error("KQueueFixture: threads malloc failed");
730         threadsData = (ThreadData*)calloc(nThreads, sizeof(*threadsData));
731         if (threadsData == NULL)
732             throw logic_error("KQueueFixture: threadsData malloc failed");
733         threadRcs = (rc_t*)calloc(nThreads, sizeof(*threadRcs));
734         if (threadRcs == NULL)
735             throw logic_error("KQueueFixture: threadRcs calloc failed");
736         if (KQueueMake(&queue, nThreads) != 0)
737             throw logic_error("KQueueFixture: KQueueMake failed");
738     }
~KQueueFixture()739     ~KQueueFixture()
740     {
741         if (threads != NULL)
742         {
743             for (unsigned i = 0; i < nStartedThreads; ++i)
744             {
745                 if (threads[i] != 0 && KThreadRelease(threads[i]) != 0)
746                     throw logic_error("~KQueueFixture: KThreadRelease failed");
747             }
748         }
749         free(threads);
750         free(threadsData);
751         free(threadRcs);
752 
753         if (KQueueRelease((const KQueue*)queue) != 0)
754             throw logic_error("~KQueueFixture: KQueueRelease failed");
755     }
756 
757 protected:
758     struct ThreadData {
759         KQueue * queue;
760         int tid;
761         size_t max_tid;
762         uint32_t timeout_ms;
763         bool is_reader;
764         bool finish; // will stop generating events and seal the queue once detected
765         bool allow_timeout; // if set, we won't treat timeout as an error
766     };
767 
768     class Thread {
769     public:
770         // danger - this should be an extern "C" function
771         // with CC calling convention on Windows
KQueue_ThreadFn(const KThread * thread,void * data)772         static rc_t KQueue_ThreadFn ( const KThread *thread, void *data )
773         {
774             ThreadData* td = (ThreadData*)data;
775             rc_t rc = 0;
776             int numOps = 8192;
777 
778             for (int i = 0; i < numOps; ++i)
779             {
780                 timeout_t tm = { 0 };
781                 timeout_t* tm_p = &tm;
782                 void * item;
783                 if (tm_p != NULL)
784                     rc = TimeoutInit(tm_p, td->timeout_ms);
785                 if (rc != 0)
786                 {
787                     LOG(LogLevel::e_fatal_error, "KQueue_ThreadFn: TimeoutInit failed\n");
788                     break;
789                 }
790                 if (td->finish)
791                 {
792                     rc = KQueueSeal(td->queue);
793                     if (rc != 0)
794                         LOG(LogLevel::e_fatal_error, "KQueue_ThreadFn: failed to seal queue\n");
795                     break;
796                 }
797                 else if (td->is_reader)
798                 {
799                     rc = KQueuePop(td->queue, &item, tm_p);
800                     if (rc == 0 && (item == NULL || (uint64_t)item > td->max_tid))
801                     {
802                         std::stringstream ss;
803                         ss << "KQueue_ThreadFn: KQueuePop returned invalid item: " << (uint64_t)item << "\n";
804                         LOG(LogLevel::e_fatal_error, ss.str().c_str());
805                         rc = RC(rcExe, rcQueue, rcValidating, rcItem, rcOutofrange);
806                         break;
807                     }
808                 }
809                 else
810                 {
811                     item = reinterpret_cast<void*>(td->tid);
812                     rc = KQueuePush(td->queue, item, tm_p);
813                 }
814 
815                 if (rc != 0)
816                 {
817                     if (td->allow_timeout && GetRCObject ( rc ) == (enum RCObject)rcTimeout)
818                         rc = 0;
819                     else if (GetRCObject ( rc ) == (enum RCObject)rcData && GetRCState ( rc ) == rcDone)
820                         break;
821                     else if (GetRCObject ( rc ) == (enum RCObject)rcQueue && GetRCState ( rc ) == rcReadonly)
822                         break;
823                     else
824                     {
825                         LOGERR ( klogFatal, rc, "KQueue_ThreadFn: failed to push/pop to/from queue" );
826                         break;
827                     }
828                 }
829 
830             }
831 
832             return rc;
833         }
834     };
835 
StartThread(bool is_reader,bool allow_timeout,uint32_t timeout_ms)836     void StartThread(bool is_reader, bool allow_timeout, uint32_t timeout_ms)
837     {
838         if (nStartedThreads >= nThreads)
839             throw logic_error("StartThread: too many threads requested");
840         if (sealed)
841             throw logic_error("StartThread: cannot start new thread, fixture is already sealed");
842 
843         rc_t rc;
844         int tid = nStartedThreads++;
845         ThreadData* td;
846 
847         td = &threadsData[tid];
848         td->tid = tid;
849         td->max_tid = nThreads - 1;
850         td->is_reader = is_reader;
851         td->allow_timeout = allow_timeout;
852         td->queue = queue;
853         td->timeout_ms = timeout_ms;
854         rc = KThreadMake(&threads[tid], Thread::KQueue_ThreadFn, td);
855         if (rc != 0)
856             throw logic_error("StartThread: KThreadMake failed");
857     }
858 
StartThreads(int numReaders,int numWriters,bool allow_timeout=true,uint32_t timeout_ms=0)859     void StartThreads(int numReaders, int numWriters, bool allow_timeout = true, uint32_t timeout_ms = 0)
860     {
861         if (numReaders + numWriters + nStartedThreads > nThreads)
862             throw logic_error("RunThreads: too many threads requested");
863         if (numReaders <= 0)
864             throw logic_error("RunThreads: please specify at least one reader");
865         if (numWriters <= 0)
866             throw logic_error("RunThreads: please specify at least one writer");
867 
868         for (int i = 0; i < numReaders; ++i)
869         {
870             StartThread(true, allow_timeout, timeout_ms);
871         }
872         for (int i = 0; i < numWriters; ++i)
873         {
874             StartThread(false, allow_timeout, timeout_ms);
875         }
876     }
877 
WaitThreads(bool checkRcs=true)878     void WaitThreads(bool checkRcs = true)
879     {
880         rc_t rc = 0;
881         sealed = true;
882         for (unsigned i = 0; i < nStartedThreads; ++i)
883         {
884             rc_t rc2 = KThreadWait(threads[i], &threadRcs[i]);
885             if (rc2 != 0)
886             {
887                 LOG(LogLevel::e_fatal_error, "KThreadWait: KThreadWait failed\n");
888                 if (rc == 0)
889                     rc = rc2;
890             }
891         }
892         if (rc != 0)
893             throw logic_error("WaitThreads: KThreadWait failed");
894         if (checkRcs)
895             CheckThreadsRc();
896 
897     }
CheckThreadsRc()898     void CheckThreadsRc()
899     {
900         for (unsigned i = 0; i < nStartedThreads; ++i)
901         {
902             if (threadRcs[i] != 0)
903                 throw logic_error("CheckThreadsRc: thread returned unexpected exit code");
904         }
905     }
906 
public:
    // Per-slot thread exit codes, filled in by KThreadWait in WaitThreads.
    rc_t* threadRcs;
    // Per-slot thread handles, filled in by KThreadMake in StartThread.
    KThread** threads;
    // Per-slot parameter blocks handed to the thread function by StartThread.
    ThreadData* threadsData;
    // Upper bound on the number of threads StartThread will launch.
    size_t nThreads;
    // Number of slots handed out so far; bounds the loops in WaitThreads
    // and CheckThreadsRc.
    size_t nStartedThreads;
    // Queue pointer copied into every thread's ThreadData.
    KQueue* queue;
    // Set by WaitThreads; once true, StartThread throws instead of launching.
    bool sealed;
915 };
916 
// One reader and one writer with the default StartThreads settings.
// WaitThreads() checks exit codes by default, so any non-zero thread rc
// surfaces as a logic_error.
FIXTURE_TEST_CASE(KQueue_Single_Reader_Single_Writer, KQueueFixture)
{
    const int readerCount = 1;
    const int writerCount = 1;

    StartThreads(readerCount, writerCount);
    WaitThreads();
}
922 
// Thirty-one readers and a single writer with the default StartThreads
// settings; every thread is expected to exit with rc 0.
FIXTURE_TEST_CASE(KQueue_Multi_Reader_Single_Writer, KQueueFixture)
{
    const int readerCount = 31;
    const int writerCount = 1;

    StartThreads(readerCount, writerCount);
    WaitThreads();
}
928 
// A single reader and thirty-one writers with the default StartThreads
// settings; every thread is expected to exit with rc 0.
FIXTURE_TEST_CASE(KQueue_Single_Reader_Multi_Writer, KQueueFixture)
{
    const int readerCount = 1;
    const int writerCount = 31;

    StartThreads(readerCount, writerCount);
    WaitThreads();
}
934 
// Sixteen readers and sixteen writers, started with allow_timeout = false
// and a 5000 ms timeout value; every thread is expected to exit with rc 0.
FIXTURE_TEST_CASE(KQueue_Multi_Reader_Multi_Writer, KQueueFixture)
{
    const int readerCount = 16;
    const int writerCount = 16;
    const bool allowTimeout = false;
    const uint32_t queueTimeoutMs = 5000;

    StartThreads(readerCount, writerCount, allowTimeout, queueTimeoutMs);
    WaitThreads();
}
940 
// Thirty-one readers plus one writer whose `finish` flag is raised right
// after start-up (NOTE(review): presumably this makes the writer seal the
// queue - confirm against KQueue_ThreadFn). The writer must exit with rc 0,
// every reader with the silent "rcData / rcDone" rc, and the whole run must
// complete in less than the per-thread timeout.
FIXTURE_TEST_CASE(KQueue_Multi_Reader_Single_Writer_Seal, KQueueFixture)
{
    KTimeMs_t timeBefore = KTimeMsStamp();

    const int numReaders = 31;
    const int timeoutMs = 5000;
    // Readers are started first, so the writer lands in the slot after them.
    const unsigned writerSlot = numReaders;

    StartThreads(numReaders, 1, false, timeoutMs);
    threadsData[writerSlot].finish = true;
    WaitThreads(false);

    KTimeMs_t timeAfter = KTimeMsStamp();

    const rc_t readerRc = SILENT_RC ( rcCont, rcQueue, rcRemoving, rcData, rcDone );
    for (unsigned i = 0; i < nStartedThreads; ++i)
    {
        rc_t expectedRc = readerRc;
        if (i == writerSlot)
            expectedRc = 0;
        REQUIRE_EQ ( threadRcs[i], expectedRc );
    }
    REQUIRE_LT ( (int)(timeAfter - timeBefore), timeoutMs );
}
957 
// One reader plus thirty-one writers; the reader's `finish` flag is raised
// right after start-up (NOTE(review): presumably this makes the reader seal
// the queue - confirm against KQueue_ThreadFn). The reader must exit with
// rc 0, every writer with the silent "rcQueue / rcReadonly" rc, and the whole
// run must complete in less than the per-thread timeout.
FIXTURE_TEST_CASE(KQueue_Single_Reader_Multi_Writer_Seal, KQueueFixture)
{
    KTimeMs_t timeBefore = KTimeMsStamp();

    const int numWriters = 31;
    const int timeoutMs = 5000;
    // The single reader is started first and therefore owns slot 0.
    const unsigned readerSlot = 0;

    StartThreads(1, numWriters, false, timeoutMs);
    threadsData[readerSlot].finish = true;
    WaitThreads(false);

    KTimeMs_t timeAfter = KTimeMsStamp();

    const rc_t writerRc = SILENT_RC ( rcCont, rcQueue, rcInserting, rcQueue, rcReadonly );
    for (unsigned i = 0; i < nStartedThreads; ++i)
    {
        rc_t expectedRc = writerRc;
        if (i == readerSlot)
            expectedRc = 0;
        REQUIRE_EQ ( threadRcs[i], expectedRc );
    }
    REQUIRE_LT ( (int)(timeAfter - timeBefore), timeoutMs );
}
974 
975 //TODO: KConditionWait, KConditionTimedWait, KConditionSignal, KConditionBroadcast
976 
977 //TODO: KSemaphore
978 //TODO: Timeout
979 //TODO: KBarrier (is it used anywhere? is there a Windows implementation?)
980 
981 //////////////////////////////////////////// Main
982 extern "C"
983 {
984 
985 #include <kapp/args.h>
986 
KAppVersion(void)987 ver_t CC KAppVersion ( void )
988 {
989     return 0x1000000;
990 }
991 
UsageSummary(const char * progname)992 rc_t CC UsageSummary ( const char * progname )
993 {
994     return 0;
995 }
996 
Usage(const Args * args)997 rc_t CC Usage ( const Args * args )
998 {
999     return 0;
1000 }
1001 
1002 
// Name of this test executable.
// NOTE(review): presumably read by the kapp usage/help machinery - confirm.
const char UsageDefaultName[] = "test-kproc";
1004 
argsHandler(int argc,char * argv[])1005 static rc_t argsHandler(int argc, char* argv[]) {
1006     Args* args = NULL;
1007     rc_t rc = ArgsMakeAndHandle(&args, argc, argv, 0, NULL, 0);
1008     ArgsWhack(args);
1009     return rc;
1010 }
1011 
KMain(int argc,char * argv[])1012 rc_t CC KMain ( int argc, char *argv [] )
1013 {
1014 	// this makes messages from the test code appear
1015 	// (same as running the executable with "-l=message")
1016 	//TestEnv::verbosity = LogLevel::e_message;
1017 
1018     rc_t rc = KProcTestSuite( argc, argv );
1019     return rc;
1020 }
1021 
1022 }
1023