1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3  * License, v. 2.0. If a copy of the MPL was not distributed with this
4  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 
6 #include "prio.h"
7 #include "prprf.h"
8 #include "prlog.h"
9 #include "prmem.h"
10 #include "pratom.h"
11 #include "prlock.h"
12 #include "prmwait.h"
13 #include "prclist.h"
14 #include "prerror.h"
15 #include "prinrval.h"
16 #include "prnetdb.h"
17 #include "prthread.h"
18 
19 #include "plstr.h"
20 #include "plerror.h"
21 #include "plgetopt.h"
22 
23 #include <string.h>
24 
/*
 * State handed to every test function and to the threads a test spawns.
 * An instance is built by MakeShared() and released by DestroyShared().
 */
typedef struct Shared
{
    const char *title;       /* test name; prefixes the log output */
    PRLock *list_lock;       /* created in MakeShared(), destroyed in DestroyShared() */
    PRWaitGroup *group;      /* wait group the test's descriptors are added to */
    PRIntervalTime timeout;  /* descriptor timeout; also used as a sleep interval */
} Shared;
32 
/*
 * Logging levels in increasing order of output. The code compares
 * levels with < and >, so the order of the enumerators matters.
 */
typedef enum Verbosity {
    silent,
    quiet,
    chatty,
    noisy
} Verbosity;
34 
35 #ifdef DEBUG
36 #define PORT_INC_DO +100
37 #else
38 #define PORT_INC_DO
39 #endif
40 #ifdef IS_64
41 #define PORT_INC_3264 +200
42 #else
43 #define PORT_INC_3264
44 #endif
45 
46 static PRFileDesc *debug = NULL;
47 static PRInt32 desc_allocated = 0;
48 static PRUint16 default_port = 12273 PORT_INC_DO PORT_INC_3264;
49 static enum Verbosity verbosity = quiet;
50 static PRInt32 ops_required = 1000, ops_done = 0;
51 static PRThreadScope thread_scope = PR_LOCAL_THREAD;
52 static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50;
53 
/*
 * MW_ASSERT(expr): assertion that only exists in DEBUG builds. In other
 * builds it expands to nothing, so expr is not evaluated at all.
 */
#if defined(DEBUG)
#define MW_ASSERT(_expr) \
    ((_expr) ? ((void)0) : _MW_Assert(#_expr, __FILE__, __LINE__))

/*
 * Failure path of MW_ASSERT: report the current error on the debug log
 * (when one is open) and then hand the failed expression to PR_Assert().
 */
static void _MW_Assert(const char *s, const char *file, PRIntn ln)
{
    if (debug != NULL) {
        PL_FPrintError(debug, NULL);
    }
    PR_Assert(s, file, ln);
}  /* _MW_Assert */
#else
#define MW_ASSERT(_expr)
#endif
67 
/*
 * Write a one-line description of a receive-wait descriptor to the
 * debug log.
 *
 *   desc  descriptor to describe; its address, fd, outcome and timeout
 *         are printed
 *   msg   text placed at the start of the line
 *
 * The outcome is shown by name. The name table is indexed with
 * (outcome + 3); a value that falls outside the table is reported as
 * "PR_MW_<unknown>" instead of reading past the array.
 *
 * Pointers are formatted with %p: %x consumes an int-sized argument,
 * which does not match a pointer argument on 64-bit targets.
 */
static void PrintRecvDesc(PRRecvWait *desc, const char *msg)
{
    static const char *const tag[] = {
        "PR_MW_INTERRUPT", "PR_MW_TIMEOUT",
        "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"
    };
    const PRIntn slots = (PRIntn)(sizeof(tag) / sizeof(tag[0]));
    const PRIntn slot = (PRIntn)desc->outcome + 3;
    const char *outcome =
        ((slot >= 0) && (slot < slots)) ? tag[slot] : "PR_MW_<unknown>";

    PR_fprintf(
        debug, "%s: PRRecvWait(@0x%p): {fd: 0x%p, outcome: %s, tmo: %u}\n",
        msg, desc, desc->fd, outcome, desc->timeout);
}  /* PrintRecvDesc */
78 
MakeShared(const char * title)79 static Shared *MakeShared(const char *title)
80 {
81     Shared *shared = PR_NEWZAP(Shared);
82     shared->group = PR_CreateWaitGroup(1);
83     shared->timeout = PR_SecondsToInterval(1);
84     shared->list_lock = PR_NewLock();
85     shared->title = title;
86     return shared;
87 }  /* MakeShared */
88 
/*
 * Tear down what MakeShared() built: destroy the wait group, destroy
 * the lock, then free the structure itself.
 *
 *   shared  state to release; must not be used afterwards
 */
static void DestroyShared(Shared *shared)
{
    PRStatus rv;

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: destroying group\n", shared->title);
    }

    rv = PR_DestroyWaitGroup(shared->group);
    MW_ASSERT(PR_SUCCESS == rv);

    PR_DestroyLock(shared->list_lock);
    PR_DELETE(shared);
}  /* DestroyShared */
100 
CreateRecvWait(PRFileDesc * fd,PRIntervalTime timeout)101 static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout)
102 {
103     PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait);
104     MW_ASSERT(NULL != desc_out);
105 
106     MW_ASSERT(NULL != fd);
107     desc_out->fd = fd;
108     desc_out->timeout = timeout;
109     desc_out->buffer.length = 120;
110     desc_out->buffer.start = PR_CALLOC(120);
111 
112     PR_AtomicIncrement(&desc_allocated);
113 
114     if (verbosity > chatty) {
115         PrintRecvDesc(desc_out, "Allocated");
116     }
117     return desc_out;
118 }  /* CreateRecvWait */
119 
/*
 * Release a descriptor made by CreateRecvWait(): close its file
 * descriptor, free its buffer and the descriptor itself, and drop it
 * from the desc_allocated count.
 *
 *   desc_out  descriptor to release; must not be used afterwards
 */
static void DestroyRecvWait(PRRecvWait *desc_out)
{
    if (verbosity > chatty)
    {
        PrintRecvDesc(desc_out, "Destroying");
    }

    PR_Close(desc_out->fd);

    if (desc_out->buffer.start != NULL)
    {
        PR_DELETE(desc_out->buffer.start);
    }
    PR_Free(desc_out);

    (void)PR_AtomicDecrement(&desc_allocated);
}  /* DestroyRecvWait */
132 
/*
 * Cancel the wait group and destroy every descriptor it hands back.
 * In DEBUG builds this then checks that no descriptor is left allocated
 * and that the final cancel call reported PR_GROUP_EMPTY_ERROR.
 *
 *   shared  state whose group is drained
 */
static void CancelGroup(Shared *shared)
{
    PRRecvWait *reclaimed;

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title);
    }

    /* Keep pulling descriptors until the group has none left to return. */
    while (NULL != (reclaimed = PR_CancelWaitGroup(shared->group)))
    {
        DestroyRecvWait(reclaimed);
    }

    MW_ASSERT(0 == desc_allocated);
    MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError());
}  /* CancelGroup */
152 
ClientThread(void * arg)153 static void PR_CALLBACK ClientThread(void* arg)
154 {
155     PRStatus rv;
156     PRInt32 bytes;
157     PRIntn empty_flags = 0;
158     PRNetAddr server_address;
159     unsigned char buffer[100];
160     Shared *shared = (Shared*)arg;
161     PRFileDesc *server = PR_NewTCPSocket();
162     if ((NULL == server)
163         && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
164         return;
165     }
166     MW_ASSERT(NULL != server);
167 
168     if (verbosity > chatty) {
169         PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server);
170     }
171 
172     /* Initialize the buffer so that Purify won't complain */
173     memset(buffer, 0, sizeof(buffer));
174 
175     rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address);
176     MW_ASSERT(PR_SUCCESS == rv);
177 
178     if (verbosity > quiet) {
179         PR_fprintf(debug, "%s: Client opening connection\n", shared->title);
180     }
181     rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT);
182 
183     if (PR_FAILURE == rv)
184     {
185         if (verbosity > silent) {
186             PL_FPrintError(debug, "Client connect failed");
187         }
188         return;
189     }
190 
191     while (ops_done < ops_required)
192     {
193         bytes = PR_Send(
194                     server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
195         if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
196             break;
197         }
198         MW_ASSERT(sizeof(buffer) == bytes);
199         if (verbosity > chatty)
200             PR_fprintf(
201                 debug, "%s: Client sent %d bytes\n",
202                 shared->title, sizeof(buffer));
203         bytes = PR_Recv(
204                     server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
205         if (verbosity > chatty)
206             PR_fprintf(
207                 debug, "%s: Client received %d bytes\n",
208                 shared->title, sizeof(buffer));
209         if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
210             break;
211         }
212         MW_ASSERT(sizeof(buffer) == bytes);
213         PR_Sleep(shared->timeout);
214     }
215     rv = PR_Close(server);
216     MW_ASSERT(PR_SUCCESS == rv);
217 
218 }  /* ClientThread */
219 
/*
 * Test: add one descriptor to the group, cancel it, and check that the
 * next wait returns that same descriptor with outcome PR_MW_INTERRUPT
 * and error PR_PENDING_INTERRUPT_ERROR.
 *
 *   shared  test state; its timeout is overwritten with
 *           PR_INTERVAL_NO_TIMEOUT
 */
static void OneInThenCancelled(Shared *shared)
{
    PRStatus rv;
    PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);

    shared->timeout = PR_INTERVAL_NO_TIMEOUT;

    /* The descriptor wraps a fresh socket that is never connected. */
    desc_in->fd = PR_NewTCPSocket();
    desc_in->timeout = shared->timeout;

    if (verbosity > chatty) {
        PrintRecvDesc(desc_in, "Adding desc");
    }

    rv = PR_AddWaitFileDesc(shared->group, desc_in);
    MW_ASSERT(PR_SUCCESS == rv);

    if (verbosity > chatty) {
        PrintRecvDesc(desc_in, "Cancelling");
    }
    rv = PR_CancelWaitFileDesc(shared->group, desc_in);
    MW_ASSERT(PR_SUCCESS == rv);

    /* The cancelled descriptor is expected to come back from the wait. */
    desc_out = PR_WaitRecvReady(shared->group);
    MW_ASSERT(desc_out == desc_in);
    MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome);
    MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
    if (verbosity > chatty) {
        PrintRecvDesc(desc_out, "Ready");
    }

    rv = PR_Close(desc_in->fd);
    MW_ASSERT(PR_SUCCESS == rv);

    /* Log line only: the group itself is destroyed by DestroyShared(). */
    if (verbosity > quiet) {
        PR_fprintf(debug, "%s: destroying group\n", shared->title);
    }

    PR_DELETE(desc_in);
}  /* OneInThenCancelled */
260 
/*
 * Test: add one descriptor to the group and wait on it from this
 * thread, expecting the wait to return that descriptor with outcome
 * PR_MW_TIMEOUT and error PR_IO_TIMEOUT_ERROR.
 *
 *   shared  test state; shared->timeout is the descriptor's timeout
 */
static void OneOpOneThread(Shared *shared)
{
    PRStatus rv;
    PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);

    /* The descriptor wraps a fresh socket that is never connected. */
    desc_in->fd = PR_NewTCPSocket();
    desc_in->timeout = shared->timeout;

    if (verbosity > chatty) {
        PrintRecvDesc(desc_in, "Adding desc");
    }

    rv = PR_AddWaitFileDesc(shared->group, desc_in);
    MW_ASSERT(PR_SUCCESS == rv);
    desc_out = PR_WaitRecvReady(shared->group);
    MW_ASSERT(desc_out == desc_in);
    MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
    MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
    if (verbosity > chatty) {
        PrintRecvDesc(desc_out, "Ready");
    }

    rv = PR_Close(desc_in->fd);
    MW_ASSERT(PR_SUCCESS == rv);

    PR_DELETE(desc_in);
}  /* OneOpOneThread */
288 
/*
 * Test: put wait_objects descriptors into the group, then have this
 * single thread collect timed-out descriptors and add them back until
 * ops_done reaches ops_required. The group is drained at the end.
 *
 *   shared  test state; shared->timeout is used for every descriptor
 */
static void ManyOpOneThread(Shared *shared)
{
    PRStatus rv;
    PRRecvWait *desc_out;
    PRIntn added;

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects);
    }

    /* Fill the group with descriptors on fresh sockets. */
    for (added = 0; added < wait_objects; ++added)
    {
        PRRecvWait *fresh = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);

        rv = PR_AddWaitFileDesc(shared->group, fresh);
        MW_ASSERT(PR_SUCCESS == rv);
    }

    /* Each descriptor that comes back counts as one operation. */
    while (ops_done < ops_required)
    {
        desc_out = PR_WaitRecvReady(shared->group);
        MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
        MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());

        if (verbosity > chatty)
        {
            PrintRecvDesc(desc_out, "Ready/readding");
        }

        rv = PR_AddWaitFileDesc(shared->group, desc_out);
        MW_ASSERT(PR_SUCCESS == rv);

        (void)PR_AtomicIncrement(&ops_done);
    }

    CancelGroup(shared);
}  /* ManyOpOneThread */
323 
SomeOpsThread(void * arg)324 static void PR_CALLBACK SomeOpsThread(void *arg)
325 {
326     PRRecvWait *desc_out;
327     PRStatus rv = PR_SUCCESS;
328     Shared *shared = (Shared*)arg;
329     do  /* until interrupted */
330     {
331         desc_out = PR_WaitRecvReady(shared->group);
332         if (NULL == desc_out)
333         {
334             MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
335             if (verbosity > quiet) {
336                 PR_fprintf(debug, "Aborted\n");
337             }
338             break;
339         }
340         MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
341         MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
342         if (verbosity > chatty) {
343             PrintRecvDesc(desc_out, "Ready");
344         }
345 
346         if (verbosity > chatty) {
347             PrintRecvDesc(desc_out, "Re-Adding");
348         }
349         desc_out->timeout = shared->timeout;
350         rv = PR_AddWaitFileDesc(shared->group, desc_out);
351         PR_AtomicIncrement(&ops_done);
352         if (ops_done > ops_required) {
353             break;
354         }
355     } while (PR_SUCCESS == rv);
356     MW_ASSERT(PR_SUCCESS == rv);
357 }  /* SomeOpsThread */
358 
/*
 * Test: start worker_threads SomeOpsThread workers, feed the group
 * wait_objects descriptors, sleep until ops_done reaches ops_required,
 * then interrupt and join every worker and drain the group.
 *
 *   shared  test state passed to every worker
 */
static void SomeOpsSomeThreads(Shared *shared)
{
    PRStatus rv;
    PRIntn i;
    PRThread **workers =
        (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);

    /* Start the waiters first ... */
    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: creating threads\n", shared->title);
    }
    for (i = 0; i < worker_threads; ++i)
    {
        workers[i] = PR_CreateThread(
                         PR_USER_THREAD, SomeOpsThread, shared,
                         PR_PRIORITY_HIGH, thread_scope,
                         PR_JOINABLE_THREAD, 16 * 1024);
    }

    /* ... then give them something to wait on. */
    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: creating desc\n", shared->title);
    }
    for (i = 0; i < wait_objects; ++i)
    {
        PRRecvWait *fresh = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);

        rv = PR_AddWaitFileDesc(shared->group, fresh);
        MW_ASSERT(PR_SUCCESS == rv);
    }

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: sleeping\n", shared->title);
    }
    while (ops_done < ops_required)
    {
        PR_Sleep(shared->timeout);
    }

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title);
    }
    for (i = 0; i < worker_threads; ++i)
    {
        rv = PR_Interrupt(workers[i]);
        MW_ASSERT(PR_SUCCESS == rv);
        rv = PR_JoinThread(workers[i]);
        MW_ASSERT(PR_SUCCESS == rv);
    }
    PR_DELETE(workers);

    CancelGroup(shared);
}  /* SomeOpsSomeThreads */
413 
ServiceRequest(Shared * shared,PRRecvWait * desc)414 static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc)
415 {
416     PRInt32 bytes_out;
417 
418     if (verbosity > chatty)
419         PR_fprintf(
420             debug, "%s: Service received %d bytes\n",
421             shared->title, desc->bytesRecv);
422 
423     if (0 == desc->bytesRecv) {
424         goto quitting;
425     }
426     if ((-1 == desc->bytesRecv)
427         && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
428         goto aborted;
429     }
430 
431     bytes_out = PR_Send(
432                     desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout);
433     if (verbosity > chatty)
434         PR_fprintf(
435             debug, "%s: Service sent %d bytes\n",
436             shared->title, bytes_out);
437 
438     if ((-1 == bytes_out)
439         && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
440         goto aborted;
441     }
442     MW_ASSERT(bytes_out == desc->bytesRecv);
443 
444     return PR_SUCCESS;
445 
446 aborted:
447 quitting:
448     return PR_FAILURE;
449 }  /* ServiceRequest */
450 
/*
 * Worker thread of the RealOneGroupIO server.
 *
 *   arg  the test's Shared state
 *
 * Each pass re-adds the descriptor obtained on the previous pass (with
 * no timeout), waits for the next ready descriptor and acts on its
 * outcome. The loop ends when the wait returns NULL or when rv stops
 * being PR_SUCCESS (ServiceRequest() failed or the outcome was
 * PR_MW_INTERRUPT). A descriptor still in hand at that point is
 * destroyed.
 */
static void PR_CALLBACK ServiceThread(void *arg)
{
    PRStatus rv = PR_SUCCESS;
    PRRecvWait *desc_out = NULL;
    Shared *shared = (Shared*)arg;
    do  /* until interrupted */
    {
        /* Nothing to re-add on the first pass: desc_out starts out NULL. */
        if (NULL != desc_out)
        {
            desc_out->timeout = PR_INTERVAL_NO_TIMEOUT;
            if (verbosity > chatty) {
                PrintRecvDesc(desc_out, "Service re-adding");
            }
            rv = PR_AddWaitFileDesc(shared->group, desc_out);
            MW_ASSERT(PR_SUCCESS == rv);
        }

        desc_out = PR_WaitRecvReady(shared->group);
        if (NULL == desc_out)
        {
            MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
            break;
        }

        switch (desc_out->outcome)
        {
            case PR_MW_SUCCESS:
            {
                /* Data arrived: count the operation and echo it back. */
                PR_AtomicIncrement(&ops_done);
                if (verbosity > chatty) {
                    PrintRecvDesc(desc_out, "Service ready");
                }
                rv = ServiceRequest(shared, desc_out);
                break;
            }
            case PR_MW_INTERRUPT:
                MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
                rv = PR_FAILURE;  /* if interrupted, then exit */
                break;
            case PR_MW_TIMEOUT:
                MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
                /* falls through: a timeout is reported like a failure */
            case PR_MW_FAILURE:
                /* rv is left alone, so the descriptor is re-added next pass. */
                if (verbosity > silent) {
                    PL_FPrintError(debug, "RecvReady failure");
                }
                break;
            default:
                break;
        }
    } while (PR_SUCCESS == rv);

    /* Not re-added to the group, so this thread releases it. */
    if (NULL != desc_out) {
        DestroyRecvWait(desc_out);
    }

}  /* ServiceThread */
507 
EnumerationThread(void * arg)508 static void PR_CALLBACK EnumerationThread(void *arg)
509 {
510     PRStatus rv;
511     PRIntn count;
512     PRRecvWait *desc;
513     Shared *shared = (Shared*)arg;
514     PRIntervalTime five_seconds = PR_SecondsToInterval(5);
515     PRMWaitEnumerator *enumerator = PR_CreateMWaitEnumerator(shared->group);
516     MW_ASSERT(NULL != enumerator);
517 
518     while (PR_SUCCESS == PR_Sleep(five_seconds))
519     {
520         count = 0;
521         desc = NULL;
522         while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc)))
523         {
524             if (verbosity > chatty) {
525                 PrintRecvDesc(desc, shared->title);
526             }
527             count += 1;
528         }
529         if (verbosity > silent)
530             PR_fprintf(debug,
531                        "%s Enumerated %d objects\n", shared->title, count);
532     }
533 
534     MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
535 
536 
537     rv = PR_DestroyMWaitEnumerator(enumerator);
538     MW_ASSERT(PR_SUCCESS == rv);
539 }  /* EnumerationThread */
540 
ServerThread(void * arg)541 static void PR_CALLBACK ServerThread(void *arg)
542 {
543     PRStatus rv;
544     PRIntn index;
545     PRRecvWait *desc_in;
546     PRThread **worker_thread;
547     Shared *shared = (Shared*)arg;
548     PRFileDesc *listener, *service;
549     PRNetAddr server_address, client_address;
550 
551     worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
552     if (verbosity > quiet) {
553         PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title);
554     }
555     for (index = 0; index < worker_threads; ++index)
556     {
557         worker_thread[index] = PR_CreateThread(
558                                    PR_USER_THREAD, ServiceThread, shared,
559                                    PR_PRIORITY_HIGH, thread_scope,
560                                    PR_JOINABLE_THREAD, 16 * 1024);
561     }
562 
563     rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address);
564     MW_ASSERT(PR_SUCCESS == rv);
565 
566     listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener);
567     if (verbosity > chatty)
568         PR_fprintf(
569             debug, "%s: Server listener socket @0x%x\n",
570             shared->title, listener);
571     rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv);
572     rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv);
573     while (ops_done < ops_required)
574     {
575         if (verbosity > quiet) {
576             PR_fprintf(debug, "%s: Server accepting connection\n", shared->title);
577         }
578         service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT);
579         if (NULL == service)
580         {
581             if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) {
582                 break;
583             }
584             PL_PrintError("Accept failed");
585             MW_ASSERT(PR_FALSE && "Accept failed");
586         }
587         else
588         {
589             desc_in = CreateRecvWait(service, shared->timeout);
590             desc_in->timeout = PR_INTERVAL_NO_TIMEOUT;
591             if (verbosity > chatty) {
592                 PrintRecvDesc(desc_in, "Service adding");
593             }
594             rv = PR_AddWaitFileDesc(shared->group, desc_in);
595             MW_ASSERT(PR_SUCCESS == rv);
596         }
597     }
598 
599     if (verbosity > quiet) {
600         PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title);
601     }
602     for (index = 0; index < worker_threads; ++index)
603     {
604         rv = PR_Interrupt(worker_thread[index]);
605         MW_ASSERT(PR_SUCCESS == rv);
606         rv = PR_JoinThread(worker_thread[index]);
607         MW_ASSERT(PR_SUCCESS == rv);
608     }
609     PR_DELETE(worker_thread);
610 
611     PR_Close(listener);
612 
613     CancelGroup(shared);
614 
615 }  /* ServerThread */
616 
/*
 * Test: real socket traffic through one wait group.
 *
 * Creates a server that listens for connections and then services the
 * requests that come in over those connections. The server never
 * deletes a connection and assumes a basic RPC model of operation.
 * worker_threads threads service however many open ports there might
 * be, and client_threads clients generate the traffic. An enumeration
 * thread runs alongside.
 *
 *   shared  test state passed to every thread
 *
 * After ops_done reaches ops_required the clients, the enumeration
 * thread and the server are interrupted and joined, in that order.
 */
static void RealOneGroupIO(Shared *shared)
{
    PRStatus rv;
    PRIntn i;
    PRThread *server_thread;
    PRThread *enumeration_thread;
    PRThread **clients;

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: creating server_thread\n", shared->title);
    }
    server_thread = PR_CreateThread(
                        PR_USER_THREAD, ServerThread, shared,
                        PR_PRIORITY_HIGH, thread_scope,
                        PR_JOINABLE_THREAD, 16 * 1024);

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title);
    }
    enumeration_thread = PR_CreateThread(
                             PR_USER_THREAD, EnumerationThread, shared,
                             PR_PRIORITY_HIGH, thread_scope,
                             PR_JOINABLE_THREAD, 16 * 1024);

    /* Pause before starting the clients. */
    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title);
    }
    PR_Sleep(5 * shared->timeout);

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: creating client_threads\n", shared->title);
    }
    clients = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads);
    for (i = 0; i < client_threads; ++i)
    {
        clients[i] = PR_CreateThread(
                         PR_USER_THREAD, ClientThread, shared,
                         PR_PRIORITY_NORMAL, thread_scope,
                         PR_JOINABLE_THREAD, 16 * 1024);
    }

    /* Let the traffic run until enough operations have completed. */
    while (ops_done < ops_required)
    {
        PR_Sleep(shared->timeout);
    }

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title);
    }
    for (i = 0; i < client_threads; ++i)
    {
        rv = PR_Interrupt(clients[i]);
        MW_ASSERT(PR_SUCCESS == rv);
        rv = PR_JoinThread(clients[i]);
        MW_ASSERT(PR_SUCCESS == rv);
    }
    PR_DELETE(clients);

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title);
    }
    rv = PR_Interrupt(enumeration_thread);
    MW_ASSERT(PR_SUCCESS == rv);
    rv = PR_JoinThread(enumeration_thread);
    MW_ASSERT(PR_SUCCESS == rv);

    if (verbosity > quiet)
    {
        PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title);
    }
    rv = PR_Interrupt(server_thread);
    MW_ASSERT(PR_SUCCESS == rv);
    rv = PR_JoinThread(server_thread);
    MW_ASSERT(PR_SUCCESS == rv);
}  /* RealOneGroupIO */
700 
/*
 * Run one test if it has been selected.
 *
 *   func       test function to run
 *   name       name of that test; also becomes the Shared title
 *   test_name  test requested by the user, or NULL to run every test
 *
 * A selected test gets a fresh Shared and a zeroed ops_done; the Shared
 * is destroyed once the test returns.
 */
static void RunThisOne(
    void (*func)(Shared*), const char *name, const char *test_name)
{
    Shared *shared;

    /* Skip tests other than the one the user asked for. */
    if ((NULL != test_name) && (0 != PL_strcmp(name, test_name)))
    {
        return;
    }

    if (verbosity > silent)
    {
        PR_fprintf(debug, "%s()\n", name);
    }

    shared = MakeShared(name);
    ops_done = 0;
    func(shared);  /* run the test */
    MW_ASSERT(0 == desc_allocated);
    DestroyShared(shared);
}  /* RunThisOne */
717 
/*
 * Return the verbosity level that lies `delta` steps away from
 * `verbosity`. No range check is made here.
 */
static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta)
{
    PRIntn level = (PRIntn)verbosity;

    level += delta;
    return (Verbosity)level;
}  /* ChangeVerbosity */
722 
main(int argc,char ** argv)723 int main(int argc, char **argv)
724 {
725     PLOptStatus os;
726     const char *test_name = NULL;
727     PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:");
728 
729     while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
730     {
731         if (PL_OPT_BAD == os) {
732             continue;
733         }
734         switch (opt->option)
735         {
736             case 0:
737                 test_name = opt->value;
738                 break;
739             case 'd':  /* debug mode */
740                 if (verbosity < noisy) {
741                     verbosity = ChangeVerbosity(verbosity, 1);
742                 }
743                 break;
744             case 'q':  /* debug mode */
745                 if (verbosity > silent) {
746                     verbosity = ChangeVerbosity(verbosity, -1);
747                 }
748                 break;
749             case 'G':  /* use global threads */
750                 thread_scope = PR_GLOBAL_THREAD;
751                 break;
752             case 'c':  /* number of client threads */
753                 client_threads = atoi(opt->value);
754                 break;
755             case 'o':  /* operations to compelete */
756                 ops_required = atoi(opt->value);
757                 break;
758             case 'p':  /* default port */
759                 default_port = atoi(opt->value);
760                 break;
761             case 't':  /* number of threads waiting */
762                 worker_threads = atoi(opt->value);
763                 break;
764             case 'w':  /* number of wait objects */
765                 wait_objects = atoi(opt->value);
766                 break;
767             default:
768                 break;
769         }
770     }
771     PL_DestroyOptState(opt);
772 
773     if (verbosity > 0) {
774         debug = PR_GetSpecialFD(PR_StandardError);
775     }
776 
777     RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name);
778     RunThisOne(OneOpOneThread, "OneOpOneThread", test_name);
779     RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name);
780     RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name);
781     RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name);
782     return 0;
783 }  /* main */
784 
785 /* multwait.c */
786