1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3  * License, v. 2.0. If a copy of the MPL was not distributed with this
4  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 
#include "prio.h"
#include "prprf.h"
#include "prlog.h"
#include "prmem.h"
#include "pratom.h"
#include "prlock.h"
#include "prmwait.h"
#include "prclist.h"
#include "prerror.h"
#include "prinrval.h"
#include "prnetdb.h"
#include "prthread.h"

#include "plstr.h"
#include "plerror.h"
#include "plgetopt.h"

#include <stdlib.h>
#include <string.h>
24 
25 typedef struct Shared
26 {
27     const char *title;
28     PRLock *list_lock;
29     PRWaitGroup *group;
30     PRIntervalTime timeout;
31 } Shared;
32 
/*
** Logging levels in increasing order of output. The code compares them
** with < and >, so the order of the enumerators matters.
*/
typedef enum Verbosity
{
    silent,
    quiet,
    chatty,
    noisy
} Verbosity;
34 
35 static PRFileDesc *debug = NULL;
36 static PRInt32 desc_allocated = 0;
37 static PRUint16 default_port = 12273;
38 static enum Verbosity verbosity = quiet;
39 static PRInt32 ops_required = 1000, ops_done = 0;
40 static PRThreadScope thread_scope = PR_LOCAL_THREAD;
41 static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50;
42 
/*
** MW_ASSERT(expr)
**
** DEBUG builds: when expr is false, print the current NSPR error to the
** debug stream (if one is set) and then hand the failure to PR_Assert.
** Other builds: expands to nothing, so expr is not evaluated at all.
*/
#if !defined(DEBUG)
#define MW_ASSERT(_expr)
#else
static void _MW_Assert(const char *s, const char *file, PRIntn ln)
{
    if (NULL != debug)
    {
        PL_FPrintError(debug, NULL);
    }
    PR_Assert(s, file, ln);
}  /* _MW_Assert */

#define MW_ASSERT(_expr) \
    ((_expr) ? ((void)0) : _MW_Assert(#_expr, __FILE__, __LINE__))
#endif
54 
/*
** Write a one-line description of a receive-wait object to the debug
** stream.
**
**   desc  the object to describe (must not be NULL)
**   msg   text printed in front of the description
**
** Callers only invoke this when verbosity > chatty, i.e. when main() has
** set the debug stream.
*/
static void PrintRecvDesc(PRRecvWait *desc, const char *msg)
{
    /*
    ** Names indexed by (outcome + 3). NOTE(review): this presumes the
    ** PRMWStatus values run from PR_MW_INTERRUPT == -3 up to
    ** PR_MW_PENDING == 1 - confirm against prmwait.h.
    */
    static const char *const tag[] = {
        "PR_MW_INTERRUPT", "PR_MW_TIMEOUT",
        "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"};
    const PRIntn tags = (PRIntn)(sizeof(tag) / sizeof(tag[0]));
    const PRIntn which = (PRIntn)desc->outcome + 3;
    const char *outcome = "UNKNOWN";

    /* Never index outside the table, whatever the outcome field holds. */
    if ((which >= 0) && (which < tags))
    {
        outcome = tag[which];
    }

    /*
    ** Pointers are printed with %p: %x fetches a 32-bit integer from the
    ** argument list, which does not match a pointer on 64-bit targets.
    */
    PR_fprintf(
        debug, "%s: PRRecvWait(@0x%p): {fd: 0x%p, outcome: %s, tmo: %u}\n",
        msg, desc, desc->fd, outcome, desc->timeout);
}  /* PrintRecvDesc */
64 
MakeShared(const char * title)65 static Shared *MakeShared(const char *title)
66 {
67     Shared *shared = PR_NEWZAP(Shared);
68     shared->group = PR_CreateWaitGroup(1);
69     shared->timeout = PR_SecondsToInterval(1);
70     shared->list_lock = PR_NewLock();
71     shared->title = title;
72     return shared;
73 }  /* MakeShared */
74 
/*
** Tear down what MakeShared() built: the wait group, the lock and the
** Shared object itself.
*/
static void DestroyShared(Shared *shared)
{
    PRStatus rv;

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: destroying group\n", shared->title);
    }

    /* The tests are expected to have emptied the group already. */
    rv = PR_DestroyWaitGroup(shared->group);
    MW_ASSERT(PR_SUCCESS == rv);

    PR_DestroyLock(shared->list_lock);
    PR_DELETE(shared);
}  /* DestroyShared */
85 
CreateRecvWait(PRFileDesc * fd,PRIntervalTime timeout)86 static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout)
87 {
88     PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait);
89     MW_ASSERT(NULL != desc_out);
90 
91     MW_ASSERT(NULL != fd);
92     desc_out->fd = fd;
93     desc_out->timeout = timeout;
94     desc_out->buffer.length = 120;
95     desc_out->buffer.start = PR_CALLOC(120);
96 
97     PR_AtomicIncrement(&desc_allocated);
98 
99     if (verbosity > chatty)
100         PrintRecvDesc(desc_out, "Allocated");
101     return desc_out;
102 }  /* CreateRecvWait */
103 
/*
** Release a receive-wait object made by CreateRecvWait(): close its
** descriptor, free its buffer and the object, and decrement
** desc_allocated.
*/
static void DestroyRecvWait(PRRecvWait *desc_out)
{
    if (verbosity > chatty)
    {
        PrintRecvDesc(desc_out, "Destroying");
    }

    PR_Close(desc_out->fd);

    if (NULL != desc_out->buffer.start)
    {
        PR_DELETE(desc_out->buffer.start);
    }
    PR_Free(desc_out);

    (void)PR_AtomicDecrement(&desc_allocated);
}  /* DestroyRecvWait */
114 
/*
** Cancel the wait group and destroy every descriptor it hands back.
** Afterwards no descriptors should remain allocated.
*/
static void CancelGroup(Shared *shared)
{
    PRRecvWait *desc_out;

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title);
    }

    /* Keep cancelling until the group has nothing more to return. */
    while (NULL != (desc_out = PR_CancelWaitGroup(shared->group)))
    {
        DestroyRecvWait(desc_out);
    }

    MW_ASSERT(0 == desc_allocated);
    MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError());
}  /* CancelGroup */
131 
ClientThread(void * arg)132 static void PR_CALLBACK ClientThread(void* arg)
133 {
134     PRStatus rv;
135     PRInt32 bytes;
136     PRIntn empty_flags = 0;
137     PRNetAddr server_address;
138     unsigned char buffer[100];
139     Shared *shared = (Shared*)arg;
140     PRFileDesc *server = PR_NewTCPSocket();
141     if ((NULL == server)
142     && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) return;
143     MW_ASSERT(NULL != server);
144 
145     if (verbosity > chatty)
146         PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server);
147 
148     /* Initialize the buffer so that Purify won't complain */
149     memset(buffer, 0, sizeof(buffer));
150 
151     rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address);
152     MW_ASSERT(PR_SUCCESS == rv);
153 
154     if (verbosity > quiet)
155         PR_fprintf(debug, "%s: Client opening connection\n", shared->title);
156     rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT);
157 
158     if (PR_FAILURE == rv)
159     {
160         if (verbosity > silent) PL_FPrintError(debug, "Client connect failed");
161         return;
162     }
163 
164     while (ops_done < ops_required)
165     {
166         bytes = PR_Send(
167             server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
168         if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
169         MW_ASSERT(sizeof(buffer) == bytes);
170         if (verbosity > chatty)
171             PR_fprintf(
172                 debug, "%s: Client sent %d bytes\n",
173                 shared->title, sizeof(buffer));
174         bytes = PR_Recv(
175             server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
176         if (verbosity > chatty)
177             PR_fprintf(
178                 debug, "%s: Client received %d bytes\n",
179                 shared->title, sizeof(buffer));
180         if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
181         MW_ASSERT(sizeof(buffer) == bytes);
182         PR_Sleep(shared->timeout);
183     }
184     rv = PR_Close(server);
185     MW_ASSERT(PR_SUCCESS == rv);
186 
187 }  /* ClientThread */
188 
/*
** Test: add a single descriptor with no timeout, cancel it, and check
** that the next wait returns that same descriptor marked interrupted.
*/
static void OneInThenCancelled(Shared *shared)
{
    PRStatus rv;
    PRRecvWait *desc_out;
    PRRecvWait *desc_in = PR_NEWZAP(PRRecvWait);

    /* nothing should complete by timing out in this test */
    shared->timeout = PR_INTERVAL_NO_TIMEOUT;

    desc_in->fd = PR_NewTCPSocket();
    desc_in->timeout = shared->timeout;

    if (verbosity > chatty)
    {
        PrintRecvDesc(desc_in, "Adding desc");
    }
    rv = PR_AddWaitFileDesc(shared->group, desc_in);
    MW_ASSERT(PR_SUCCESS == rv);

    if (verbosity > chatty)
    {
        PrintRecvDesc(desc_in, "Cancelling");
    }
    rv = PR_CancelWaitFileDesc(shared->group, desc_in);
    MW_ASSERT(PR_SUCCESS == rv);

    /* the cancelled descriptor is expected back from the wait */
    desc_out = PR_WaitRecvReady(shared->group);
    MW_ASSERT(desc_out == desc_in);
    MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome);
    MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
    if (verbosity > chatty)
    {
        PrintRecvDesc(desc_out, "Ready");
    }

    rv = PR_Close(desc_in->fd);
    MW_ASSERT(PR_SUCCESS == rv);

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: destroying group\n", shared->title);
    }

    PR_DELETE(desc_in);
}  /* OneInThenCancelled */
222 
/*
** Test: add a single, unconnected socket to the group and check that
** the wait returns it with a timeout outcome.
*/
static void OneOpOneThread(Shared *shared)
{
    PRStatus rv;
    PRRecvWait *desc_out;
    PRRecvWait *desc_in = PR_NEWZAP(PRRecvWait);

    desc_in->fd = PR_NewTCPSocket();
    desc_in->timeout = shared->timeout;

    if (verbosity > chatty)
    {
        PrintRecvDesc(desc_in, "Adding desc");
    }
    rv = PR_AddWaitFileDesc(shared->group, desc_in);
    MW_ASSERT(PR_SUCCESS == rv);

    desc_out = PR_WaitRecvReady(shared->group);
    MW_ASSERT(desc_out == desc_in);
    MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
    MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
    if (verbosity > chatty)
    {
        PrintRecvDesc(desc_out, "Ready");
    }

    rv = PR_Close(desc_in->fd);
    MW_ASSERT(PR_SUCCESS == rv);

    PR_DELETE(desc_in);
}  /* OneOpOneThread */
246 
/*
** Test: one thread, wait_objects descriptors. Each descriptor that
** comes back from the wait (expected: timed out) is put straight back
** into the group, and that counts as one operation. Runs until
** ops_done reaches ops_required, then reclaims all descriptors.
*/
static void ManyOpOneThread(Shared *shared)
{
    PRStatus rv;
    PRIntn remaining;
    PRRecvWait *desc_in;
    PRRecvWait *desc_out;

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects);
    }

    /* populate the group */
    for (remaining = wait_objects; remaining > 0; --remaining)
    {
        desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);

        rv = PR_AddWaitFileDesc(shared->group, desc_in);
        MW_ASSERT(PR_SUCCESS == rv);
    }

    /* recycle descriptors as they time out */
    while (ops_done < ops_required)
    {
        desc_out = PR_WaitRecvReady(shared->group);
        MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
        MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());

        if (verbosity > chatty)
        {
            PrintRecvDesc(desc_out, "Ready/readding");
        }

        rv = PR_AddWaitFileDesc(shared->group, desc_out);
        MW_ASSERT(PR_SUCCESS == rv);

        (void)PR_AtomicIncrement(&ops_done);
    }

    CancelGroup(shared);
}  /* ManyOpOneThread */
278 
SomeOpsThread(void * arg)279 static void PR_CALLBACK SomeOpsThread(void *arg)
280 {
281     PRRecvWait *desc_out;
282     PRStatus rv = PR_SUCCESS;
283     Shared *shared = (Shared*)arg;
284     do  /* until interrupted */
285     {
286         desc_out = PR_WaitRecvReady(shared->group);
287         if (NULL == desc_out)
288         {
289             MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
290             if (verbosity > quiet) PR_fprintf(debug, "Aborted\n");
291             break;
292         }
293         MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
294         MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
295         if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
296 
297         if (verbosity > chatty) PrintRecvDesc(desc_out, "Re-Adding");
298         desc_out->timeout = shared->timeout;
299         rv = PR_AddWaitFileDesc(shared->group, desc_out);
300         PR_AtomicIncrement(&ops_done);
301         if (ops_done > ops_required) break;
302     } while (PR_SUCCESS == rv);
303     MW_ASSERT(PR_SUCCESS == rv);
304 }  /* SomeOpsThread */
305 
/*
** Test: worker_threads threads running SomeOpsThread share a group of
** wait_objects descriptors. This function sleeps until ops_done reaches
** ops_required, then interrupts and joins the workers and reclaims the
** descriptors.
*/
static void SomeOpsSomeThreads(Shared *shared)
{
    PRStatus rv;
    PRIntn index;
    PRRecvWait *desc_in;
    PRThread **thread =
        (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);

    /* Create some threads */
    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: creating threads\n", shared->title);
    }
    index = 0;
    while (index < worker_threads)
    {
        thread[index] = PR_CreateThread(
            PR_USER_THREAD, SomeOpsThread, shared,
            PR_PRIORITY_HIGH, thread_scope,
            PR_JOINABLE_THREAD, 16 * 1024);
        index += 1;
    }

    /* then create some operations */
    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: creating desc\n", shared->title);
    }
    index = 0;
    while (index < wait_objects)
    {
        desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
        rv = PR_AddWaitFileDesc(shared->group, desc_in);
        MW_ASSERT(PR_SUCCESS == rv);
        index += 1;
    }

    /* let the workers run until enough operations have been counted */
    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: sleeping\n", shared->title);
    }
    while (ops_done < ops_required)
    {
        PR_Sleep(shared->timeout);
    }

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title);
    }
    index = 0;
    while (index < worker_threads)
    {
        rv = PR_Interrupt(thread[index]);
        MW_ASSERT(PR_SUCCESS == rv);
        rv = PR_JoinThread(thread[index]);
        MW_ASSERT(PR_SUCCESS == rv);
        index += 1;
    }
    PR_DELETE(thread);

    CancelGroup(shared);
}  /* SomeOpsSomeThreads */
354 
ServiceRequest(Shared * shared,PRRecvWait * desc)355 static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc)
356 {
357     PRInt32 bytes_out;
358 
359     if (verbosity > chatty)
360         PR_fprintf(
361             debug, "%s: Service received %d bytes\n",
362             shared->title, desc->bytesRecv);
363 
364     if (0 == desc->bytesRecv) goto quitting;
365     if ((-1 == desc->bytesRecv)
366     && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
367 
368     bytes_out = PR_Send(
369         desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout);
370     if (verbosity > chatty)
371         PR_fprintf(
372             debug, "%s: Service sent %d bytes\n",
373             shared->title, bytes_out);
374 
375     if ((-1 == bytes_out)
376     && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
377     MW_ASSERT(bytes_out == desc->bytesRecv);
378 
379     return PR_SUCCESS;
380 
381 aborted:
382 quitting:
383     return PR_FAILURE;
384 }  /* ServiceRequest */
385 
ServiceThread(void * arg)386 static void PR_CALLBACK ServiceThread(void *arg)
387 {
388     PRStatus rv = PR_SUCCESS;
389     PRRecvWait *desc_out = NULL;
390     Shared *shared = (Shared*)arg;
391     do  /* until interrupted */
392     {
393         if (NULL != desc_out)
394         {
395             desc_out->timeout = PR_INTERVAL_NO_TIMEOUT;
396             if (verbosity > chatty)
397                 PrintRecvDesc(desc_out, "Service re-adding");
398             rv = PR_AddWaitFileDesc(shared->group, desc_out);
399             MW_ASSERT(PR_SUCCESS == rv);
400         }
401 
402         desc_out = PR_WaitRecvReady(shared->group);
403         if (NULL == desc_out)
404         {
405             MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
406             break;
407         }
408 
409         switch (desc_out->outcome)
410         {
411             case PR_MW_SUCCESS:
412             {
413                 PR_AtomicIncrement(&ops_done);
414                 if (verbosity > chatty)
415                     PrintRecvDesc(desc_out, "Service ready");
416                 rv = ServiceRequest(shared, desc_out);
417                 break;
418             }
419             case PR_MW_INTERRUPT:
420                 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
421                 rv = PR_FAILURE;  /* if interrupted, then exit */
422                 break;
423             case PR_MW_TIMEOUT:
424                 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
425             case PR_MW_FAILURE:
426                 if (verbosity > silent)
427                     PL_FPrintError(debug, "RecvReady failure");
428                 break;
429             default:
430                 break;
431         }
432     } while (PR_SUCCESS == rv);
433 
434     if (NULL != desc_out) DestroyRecvWait(desc_out);
435 
436 }  /* ServiceThread */
437 
EnumerationThread(void * arg)438 static void PR_CALLBACK EnumerationThread(void *arg)
439 {
440     PRStatus rv;
441     PRIntn count;
442     PRRecvWait *desc;
443     Shared *shared = (Shared*)arg;
444     PRIntervalTime five_seconds = PR_SecondsToInterval(5);
445     PRMWaitEnumerator *enumerator = PR_CreateMWaitEnumerator(shared->group);
446     MW_ASSERT(NULL != enumerator);
447 
448     while (PR_SUCCESS == PR_Sleep(five_seconds))
449     {
450         count = 0;
451         desc = NULL;
452         while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc)))
453         {
454             if (verbosity > chatty) PrintRecvDesc(desc, shared->title);
455             count += 1;
456         }
457         if (verbosity > silent)
458             PR_fprintf(debug,
459                 "%s Enumerated %d objects\n", shared->title, count);
460     }
461 
462     MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
463 
464 
465     rv = PR_DestroyMWaitEnumerator(enumerator);
466     MW_ASSERT(PR_SUCCESS == rv);
467 }  /* EnumerationThread */
468 
ServerThread(void * arg)469 static void PR_CALLBACK ServerThread(void *arg)
470 {
471     PRStatus rv;
472     PRIntn index;
473     PRRecvWait *desc_in;
474     PRThread **worker_thread;
475     Shared *shared = (Shared*)arg;
476     PRFileDesc *listener, *service;
477     PRNetAddr server_address, client_address;
478 
479     worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
480     if (verbosity > quiet)
481         PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title);
482     for (index = 0; index < worker_threads; ++index)
483     {
484         worker_thread[index] = PR_CreateThread(
485             PR_USER_THREAD, ServiceThread, shared,
486             PR_PRIORITY_HIGH, thread_scope,
487             PR_JOINABLE_THREAD, 16 * 1024);
488     }
489 
490     rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address);
491     MW_ASSERT(PR_SUCCESS == rv);
492 
493     listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener);
494     if (verbosity > chatty)
495         PR_fprintf(
496             debug, "%s: Server listener socket @0x%x\n",
497             shared->title, listener);
498     rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv);
499     rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv);
500     while (ops_done < ops_required)
501     {
502         if (verbosity > quiet)
503             PR_fprintf(debug, "%s: Server accepting connection\n", shared->title);
504         service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT);
505         if (NULL == service)
506         {
507             if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) break;
508             PL_PrintError("Accept failed");
509             MW_ASSERT(PR_FALSE && "Accept failed");
510         }
511         else
512         {
513             desc_in = CreateRecvWait(service, shared->timeout);
514             desc_in->timeout = PR_INTERVAL_NO_TIMEOUT;
515             if (verbosity > chatty)
516                 PrintRecvDesc(desc_in, "Service adding");
517             rv = PR_AddWaitFileDesc(shared->group, desc_in);
518             MW_ASSERT(PR_SUCCESS == rv);
519         }
520     }
521 
522     if (verbosity > quiet)
523         PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title);
524     for (index = 0; index < worker_threads; ++index)
525     {
526         rv = PR_Interrupt(worker_thread[index]);
527         MW_ASSERT(PR_SUCCESS == rv);
528         rv = PR_JoinThread(worker_thread[index]);
529         MW_ASSERT(PR_SUCCESS == rv);
530     }
531     PR_DELETE(worker_thread);
532 
533     PR_Close(listener);
534 
535     CancelGroup(shared);
536 
537 }  /* ServerThread */
538 
/*
** Test: real socket I/O through a single wait group.
**
** Create a server that listens for connections and then services
** requests that come in over those connections. The server never
** deletes a connection and assumes a basic RPC model of operation.
**
** Use worker_threads threads to service however many open ports
** there might be.
**
** Oh, yes. Almost forgot. Create (some) clients as well.
**
** Once ops_done reaches ops_required the clients, the enumeration
** thread and the server thread are interrupted and joined, in that
** order.
*/
static void RealOneGroupIO(Shared *shared)
{
    PRStatus rv;
    PRIntn index;
    PRThread *server_thread;
    PRThread *enumeration_thread;
    PRThread **client_thread;

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: creating server_thread\n", shared->title);
    }
    server_thread = PR_CreateThread(
        PR_USER_THREAD, ServerThread, shared,
        PR_PRIORITY_HIGH, thread_scope,
        PR_JOINABLE_THREAD, 16 * 1024);

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title);
    }
    enumeration_thread = PR_CreateThread(
        PR_USER_THREAD, EnumerationThread, shared,
        PR_PRIORITY_HIGH, thread_scope,
        PR_JOINABLE_THREAD, 16 * 1024);

    /* pause before any client tries to connect */
    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title);
    }
    PR_Sleep(5 * shared->timeout);

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: creating client_threads\n", shared->title);
    }
    client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads);
    index = 0;
    while (index < client_threads)
    {
        client_thread[index] = PR_CreateThread(
            PR_USER_THREAD, ClientThread, shared,
            PR_PRIORITY_NORMAL, thread_scope,
            PR_JOINABLE_THREAD, 16 * 1024);
        index += 1;
    }

    /* wait for the required number of operations to be counted */
    while (ops_done < ops_required)
    {
        PR_Sleep(shared->timeout);
    }

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title);
    }
    index = 0;
    while (index < client_threads)
    {
        rv = PR_Interrupt(client_thread[index]);
        MW_ASSERT(PR_SUCCESS == rv);
        rv = PR_JoinThread(client_thread[index]);
        MW_ASSERT(PR_SUCCESS == rv);
        index += 1;
    }
    PR_DELETE(client_thread);

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title);
    }
    rv = PR_Interrupt(enumeration_thread);
    MW_ASSERT(PR_SUCCESS == rv);
    rv = PR_JoinThread(enumeration_thread);
    MW_ASSERT(PR_SUCCESS == rv);

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title);
    }
    rv = PR_Interrupt(server_thread);
    MW_ASSERT(PR_SUCCESS == rv);
    rv = PR_JoinThread(server_thread);
    MW_ASSERT(PR_SUCCESS == rv);
}  /* RealOneGroupIO */
613 
/*
** Run one test if it was selected.
**
**   func       the test function
**   name       the test's name; also becomes the Shared title
**   test_name  name requested on the command line, or NULL to run all
**
** A selected test gets a fresh Shared object and a zeroed ops_done.
*/
static void RunThisOne(
    void (*func)(Shared*), const char *name, const char *test_name)
{
    Shared *shared;

    /* skip when a specific test was requested and this is not it */
    if ((NULL != test_name) && (0 != PL_strcmp(name, test_name)))
    {
        return;
    }

    if (verbosity > silent)
    {
        PR_fprintf(debug, "%s()\n", name);
    }

    shared = MakeShared(name);
    ops_done = 0;
    func(shared);  /* run the test */
    MW_ASSERT(0 == desc_allocated);
    DestroyShared(shared);
}  /* RunThisOne */
629 
/*
** Return the verbosity level that lies delta steps away from the given
** one. No range check is done here; callers keep the result in range.
*/
static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta)
{
    const PRIntn adjusted = (PRIntn)verbosity + delta;

    return (Verbosity)adjusted;
}  /* ChangeVerbosity */
635 
main(int argc,char ** argv)636 int main(int argc, char **argv)
637 {
638     PLOptStatus os;
639     const char *test_name = NULL;
640     PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:");
641 
642     while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
643     {
644         if (PL_OPT_BAD == os) continue;
645         switch (opt->option)
646         {
647         case 0:
648             test_name = opt->value;
649             break;
650         case 'd':  /* debug mode */
651             if (verbosity < noisy)
652                 verbosity = ChangeVerbosity(verbosity, 1);
653             break;
654         case 'q':  /* debug mode */
655             if (verbosity > silent)
656                 verbosity = ChangeVerbosity(verbosity, -1);
657             break;
658         case 'G':  /* use global threads */
659             thread_scope = PR_GLOBAL_THREAD;
660             break;
661         case 'c':  /* number of client threads */
662             client_threads = atoi(opt->value);
663             break;
664         case 'o':  /* operations to compelete */
665             ops_required = atoi(opt->value);
666             break;
667         case 'p':  /* default port */
668             default_port = atoi(opt->value);
669             break;
670         case 't':  /* number of threads waiting */
671             worker_threads = atoi(opt->value);
672             break;
673         case 'w':  /* number of wait objects */
674             wait_objects = atoi(opt->value);
675             break;
676         default:
677             break;
678         }
679     }
680     PL_DestroyOptState(opt);
681 
682     if (verbosity > 0)
683         debug = PR_GetSpecialFD(PR_StandardError);
684 
685     RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name);
686     RunThisOne(OneOpOneThread, "OneOpOneThread", test_name);
687     RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name);
688     RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name);
689     RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name);
690     return 0;
691 }  /* main */
692 
693 /* multwait.c */
694