1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5
6 #include "prio.h"
7 #include "prprf.h"
8 #include "prlog.h"
9 #include "prmem.h"
10 #include "pratom.h"
11 #include "prlock.h"
12 #include "prmwait.h"
13 #include "prclist.h"
14 #include "prerror.h"
15 #include "prinrval.h"
16 #include "prnetdb.h"
17 #include "prthread.h"
18
19 #include "plstr.h"
20 #include "plerror.h"
21 #include "plgetopt.h"
22
23 #include <string.h>
24
25 typedef struct Shared
26 {
27 const char *title;
28 PRLock *list_lock;
29 PRWaitGroup *group;
30 PRIntervalTime timeout;
31 } Shared;
32
33 typedef enum Verbosity {silent, quiet, chatty, noisy} Verbosity;
34
35 static PRFileDesc *debug = NULL;
36 static PRInt32 desc_allocated = 0;
37 static PRUint16 default_port = 12273;
38 static enum Verbosity verbosity = quiet;
39 static PRInt32 ops_required = 1000, ops_done = 0;
40 static PRThreadScope thread_scope = PR_LOCAL_THREAD;
41 static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50;
42
/*
** MW_ASSERT(expr)
**
** Without DEBUG the macro expands to nothing, so the expression is not
** evaluated. With DEBUG a false expression is routed to _MW_Assert along
** with its text, file and line.
*/
#if !defined(DEBUG)
#define MW_ASSERT(_expr)
#else
#define MW_ASSERT(_expr) \
    ((_expr)?((void)0):_MW_Assert(# _expr,__FILE__,__LINE__))

/*
** Report a failed assertion: when a log stream is set, print the current
** error to it via PL_FPrintError, then hand off to PR_Assert.
*/
static void _MW_Assert(const char *s, const char *file, PRIntn ln)
{
    if (debug != NULL)
    {
        PL_FPrintError(debug, NULL);
    }
    PR_Assert(s, file, ln);
}  /* _MW_Assert */
#endif
54
/*
** Log one line describing a receive-wait descriptor.
**
**   desc  descriptor to describe; must not be NULL
**   msg   text printed ahead of the description
**
** Pointers are printed with %p: handing a pointer to %x is a
** format/argument mismatch wherever pointers are wider than an int.
*/
static void PrintRecvDesc(PRRecvWait *desc, const char *msg)
{
    /*
    ** Names indexed by (outcome + 3).
    ** NOTE(review): this ordering assumes PRMWStatus runs from
    ** PR_MW_INTERRUPT == -3 up to PR_MW_PENDING == 1 -- confirm against
    ** prmwait.h.
    */
    static const char *const tag[] = {
        "PR_MW_INTERRUPT", "PR_MW_TIMEOUT",
        "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"
    };
    const PRIntn tags = (PRIntn)(sizeof(tag) / sizeof(tag[0]));
    const PRIntn slot = (PRIntn)desc->outcome + 3;
    /* Never index outside the table, whatever value outcome holds. */
    const char *outcome = ((slot >= 0) && (slot < tags)) ? tag[slot] : "unknown";

    PR_fprintf(
        debug, "%s: PRRecvWait(@0x%p): {fd: 0x%p, outcome: %s, tmo: %u}\n",
        msg, desc, desc->fd, outcome, desc->timeout);
}  /* PrintRecvDesc */
64
MakeShared(const char * title)65 static Shared *MakeShared(const char *title)
66 {
67 Shared *shared = PR_NEWZAP(Shared);
68 shared->group = PR_CreateWaitGroup(1);
69 shared->timeout = PR_SecondsToInterval(1);
70 shared->list_lock = PR_NewLock();
71 shared->title = title;
72 return shared;
73 } /* MakeShared */
74
/*
** Tear down what MakeShared built: destroy the wait group (asserting
** success in DEBUG builds), destroy the lock, then free the structure.
*/
static void DestroyShared(Shared *shared)
{
    PRStatus status;

    if (quiet < verbosity)
    {
        PR_fprintf(debug, "%s: destroying group\n", shared->title);
    }

    status = PR_DestroyWaitGroup(shared->group);
    MW_ASSERT(PR_SUCCESS == status);

    PR_DestroyLock(shared->list_lock);
    PR_DELETE(shared);
}  /* DestroyShared */
85
CreateRecvWait(PRFileDesc * fd,PRIntervalTime timeout)86 static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout)
87 {
88 PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait);
89 MW_ASSERT(NULL != desc_out);
90
91 MW_ASSERT(NULL != fd);
92 desc_out->fd = fd;
93 desc_out->timeout = timeout;
94 desc_out->buffer.length = 120;
95 desc_out->buffer.start = PR_CALLOC(120);
96
97 PR_AtomicIncrement(&desc_allocated);
98
99 if (verbosity > chatty)
100 PrintRecvDesc(desc_out, "Allocated");
101 return desc_out;
102 } /* CreateRecvWait */
103
/*
** Release a descriptor made by CreateRecvWait: close its file descriptor,
** free its buffer if one is attached, free the descriptor itself, and
** drop the desc_allocated count.
*/
static void DestroyRecvWait(PRRecvWait *desc_out)
{
    if (chatty < verbosity)
    {
        PrintRecvDesc(desc_out, "Destroying");
    }

    PR_Close(desc_out->fd);

    if (desc_out->buffer.start != NULL)
    {
        PR_DELETE(desc_out->buffer.start);
    }

    PR_Free(desc_out);
    (void)PR_AtomicDecrement(&desc_allocated);
}  /* DestroyRecvWait */
114
/*
** Drain the wait group: keep calling PR_CancelWaitGroup and destroying
** whatever it returns until it returns NULL. DEBUG builds then assert
** that no descriptors remain allocated and that the last error is
** PR_GROUP_EMPTY_ERROR.
*/
static void CancelGroup(Shared *shared)
{
    PRRecvWait *reclaimed;

    if (quiet < verbosity)
    {
        PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title);
    }

    while (NULL != (reclaimed = PR_CancelWaitGroup(shared->group)))
    {
        DestroyRecvWait(reclaimed);
    }

    MW_ASSERT(0 == desc_allocated);
    MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError());
}  /* CancelGroup */
131
ClientThread(void * arg)132 static void PR_CALLBACK ClientThread(void* arg)
133 {
134 PRStatus rv;
135 PRInt32 bytes;
136 PRIntn empty_flags = 0;
137 PRNetAddr server_address;
138 unsigned char buffer[100];
139 Shared *shared = (Shared*)arg;
140 PRFileDesc *server = PR_NewTCPSocket();
141 if ((NULL == server)
142 && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) return;
143 MW_ASSERT(NULL != server);
144
145 if (verbosity > chatty)
146 PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server);
147
148 /* Initialize the buffer so that Purify won't complain */
149 memset(buffer, 0, sizeof(buffer));
150
151 rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address);
152 MW_ASSERT(PR_SUCCESS == rv);
153
154 if (verbosity > quiet)
155 PR_fprintf(debug, "%s: Client opening connection\n", shared->title);
156 rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT);
157
158 if (PR_FAILURE == rv)
159 {
160 if (verbosity > silent) PL_FPrintError(debug, "Client connect failed");
161 return;
162 }
163
164 while (ops_done < ops_required)
165 {
166 bytes = PR_Send(
167 server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
168 if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
169 MW_ASSERT(sizeof(buffer) == bytes);
170 if (verbosity > chatty)
171 PR_fprintf(
172 debug, "%s: Client sent %d bytes\n",
173 shared->title, sizeof(buffer));
174 bytes = PR_Recv(
175 server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
176 if (verbosity > chatty)
177 PR_fprintf(
178 debug, "%s: Client received %d bytes\n",
179 shared->title, sizeof(buffer));
180 if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
181 MW_ASSERT(sizeof(buffer) == bytes);
182 PR_Sleep(shared->timeout);
183 }
184 rv = PR_Close(server);
185 MW_ASSERT(PR_SUCCESS == rv);
186
187 } /* ClientThread */
188
/*
** Test: add a single descriptor with no timeout, cancel it, and check
** (in DEBUG builds) that PR_WaitRecvReady hands the same descriptor back
** with outcome PR_MW_INTERRUPT and error PR_PENDING_INTERRUPT_ERROR.
*/
static void OneInThenCancelled(Shared *shared)
{
    PRStatus status;
    PRRecvWait *ready;
    PRRecvWait *armed = PR_NEWZAP(PRRecvWait);

    shared->timeout = PR_INTERVAL_NO_TIMEOUT;

    armed->fd = PR_NewTCPSocket();
    armed->timeout = shared->timeout;

    if (chatty < verbosity)
    {
        PrintRecvDesc(armed, "Adding desc");
    }
    status = PR_AddWaitFileDesc(shared->group, armed);
    MW_ASSERT(PR_SUCCESS == status);

    if (chatty < verbosity)
    {
        PrintRecvDesc(armed, "Cancelling");
    }
    status = PR_CancelWaitFileDesc(shared->group, armed);
    MW_ASSERT(PR_SUCCESS == status);

    ready = PR_WaitRecvReady(shared->group);
    MW_ASSERT(ready == armed);
    MW_ASSERT(PR_MW_INTERRUPT == ready->outcome);
    MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
    if (chatty < verbosity)
    {
        PrintRecvDesc(ready, "Ready");
    }

    status = PR_Close(armed->fd);
    MW_ASSERT(PR_SUCCESS == status);

    if (quiet < verbosity)
    {
        PR_fprintf(debug, "%s: destroying group\n", shared->title);
    }

    PR_DELETE(armed);
}  /* OneInThenCancelled */
222
/*
** Test: add a single descriptor using shared->timeout and wait on the
** group. DEBUG builds check that the same descriptor comes back with
** outcome PR_MW_TIMEOUT and error PR_IO_TIMEOUT_ERROR.
*/
static void OneOpOneThread(Shared *shared)
{
    PRStatus status;
    PRRecvWait *ready;
    PRRecvWait *armed = PR_NEWZAP(PRRecvWait);

    armed->fd = PR_NewTCPSocket();
    armed->timeout = shared->timeout;

    if (chatty < verbosity)
    {
        PrintRecvDesc(armed, "Adding desc");
    }
    status = PR_AddWaitFileDesc(shared->group, armed);
    MW_ASSERT(PR_SUCCESS == status);

    ready = PR_WaitRecvReady(shared->group);
    MW_ASSERT(ready == armed);
    MW_ASSERT(PR_MW_TIMEOUT == ready->outcome);
    MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
    if (chatty < verbosity)
    {
        PrintRecvDesc(ready, "Ready");
    }

    status = PR_Close(armed->fd);
    MW_ASSERT(PR_SUCCESS == status);

    PR_DELETE(armed);
}  /* OneOpOneThread */
246
/*
** Test: add wait_objects descriptors to the group, then, on this one
** thread, repeatedly wait for a descriptor and add it straight back until
** ops_done reaches ops_required. Finishes by reclaiming every descriptor
** through CancelGroup.
*/
static void ManyOpOneThread(Shared *shared)
{
    PRStatus status;
    PRIntn made;
    PRRecvWait *fresh;
    PRRecvWait *ready;

    if (quiet < verbosity)
    {
        PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects);
    }

    made = 0;
    while (made < wait_objects)
    {
        fresh = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
        status = PR_AddWaitFileDesc(shared->group, fresh);
        MW_ASSERT(PR_SUCCESS == status);
        ++made;
    }

    while (ops_done < ops_required)
    {
        ready = PR_WaitRecvReady(shared->group);
        MW_ASSERT(PR_MW_TIMEOUT == ready->outcome);
        MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
        if (chatty < verbosity)
        {
            PrintRecvDesc(ready, "Ready/readding");
        }

        status = PR_AddWaitFileDesc(shared->group, ready);
        MW_ASSERT(PR_SUCCESS == status);
        (void)PR_AtomicIncrement(&ops_done);
    }

    CancelGroup(shared);
}  /* ManyOpOneThread */
278
SomeOpsThread(void * arg)279 static void PR_CALLBACK SomeOpsThread(void *arg)
280 {
281 PRRecvWait *desc_out;
282 PRStatus rv = PR_SUCCESS;
283 Shared *shared = (Shared*)arg;
284 do /* until interrupted */
285 {
286 desc_out = PR_WaitRecvReady(shared->group);
287 if (NULL == desc_out)
288 {
289 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
290 if (verbosity > quiet) PR_fprintf(debug, "Aborted\n");
291 break;
292 }
293 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
294 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
295 if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
296
297 if (verbosity > chatty) PrintRecvDesc(desc_out, "Re-Adding");
298 desc_out->timeout = shared->timeout;
299 rv = PR_AddWaitFileDesc(shared->group, desc_out);
300 PR_AtomicIncrement(&ops_done);
301 if (ops_done > ops_required) break;
302 } while (PR_SUCCESS == rv);
303 MW_ASSERT(PR_SUCCESS == rv);
304 } /* SomeOpsThread */
305
/*
** Test: start worker_threads SomeOpsThread workers, add wait_objects
** descriptors to the group, sleep until ops_done reaches ops_required,
** then interrupt and join each worker and reclaim the descriptors.
*/
static void SomeOpsSomeThreads(Shared *shared)
{
    PRStatus status;
    PRIntn slot;
    PRRecvWait *fresh;
    PRThread **workers =
        (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);

    /* Create some threads */
    if (quiet < verbosity)
    {
        PR_fprintf(debug, "%s: creating threads\n", shared->title);
    }
    for (slot = 0; slot < worker_threads; slot++)
    {
        workers[slot] = PR_CreateThread(
            PR_USER_THREAD, SomeOpsThread, shared,
            PR_PRIORITY_HIGH, thread_scope,
            PR_JOINABLE_THREAD, 16 * 1024);
    }

    /* then create some operations */
    if (quiet < verbosity)
    {
        PR_fprintf(debug, "%s: creating desc\n", shared->title);
    }
    for (slot = 0; slot < wait_objects; slot++)
    {
        fresh = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
        status = PR_AddWaitFileDesc(shared->group, fresh);
        MW_ASSERT(PR_SUCCESS == status);
    }

    /* Let the workers run until the quota is met. */
    if (quiet < verbosity)
    {
        PR_fprintf(debug, "%s: sleeping\n", shared->title);
    }
    while (ops_done < ops_required)
    {
        PR_Sleep(shared->timeout);
    }

    if (quiet < verbosity)
    {
        PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title);
    }
    for (slot = 0; slot < worker_threads; slot++)
    {
        status = PR_Interrupt(workers[slot]);
        MW_ASSERT(PR_SUCCESS == status);
        status = PR_JoinThread(workers[slot]);
        MW_ASSERT(PR_SUCCESS == status);
    }
    PR_DELETE(workers);

    CancelGroup(shared);
}  /* SomeOpsSomeThreads */
354
ServiceRequest(Shared * shared,PRRecvWait * desc)355 static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc)
356 {
357 PRInt32 bytes_out;
358
359 if (verbosity > chatty)
360 PR_fprintf(
361 debug, "%s: Service received %d bytes\n",
362 shared->title, desc->bytesRecv);
363
364 if (0 == desc->bytesRecv) goto quitting;
365 if ((-1 == desc->bytesRecv)
366 && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
367
368 bytes_out = PR_Send(
369 desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout);
370 if (verbosity > chatty)
371 PR_fprintf(
372 debug, "%s: Service sent %d bytes\n",
373 shared->title, bytes_out);
374
375 if ((-1 == bytes_out)
376 && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
377 MW_ASSERT(bytes_out == desc->bytesRecv);
378
379 return PR_SUCCESS;
380
381 aborted:
382 quitting:
383 return PR_FAILURE;
384 } /* ServiceRequest */
385
ServiceThread(void * arg)386 static void PR_CALLBACK ServiceThread(void *arg)
387 {
388 PRStatus rv = PR_SUCCESS;
389 PRRecvWait *desc_out = NULL;
390 Shared *shared = (Shared*)arg;
391 do /* until interrupted */
392 {
393 if (NULL != desc_out)
394 {
395 desc_out->timeout = PR_INTERVAL_NO_TIMEOUT;
396 if (verbosity > chatty)
397 PrintRecvDesc(desc_out, "Service re-adding");
398 rv = PR_AddWaitFileDesc(shared->group, desc_out);
399 MW_ASSERT(PR_SUCCESS == rv);
400 }
401
402 desc_out = PR_WaitRecvReady(shared->group);
403 if (NULL == desc_out)
404 {
405 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
406 break;
407 }
408
409 switch (desc_out->outcome)
410 {
411 case PR_MW_SUCCESS:
412 {
413 PR_AtomicIncrement(&ops_done);
414 if (verbosity > chatty)
415 PrintRecvDesc(desc_out, "Service ready");
416 rv = ServiceRequest(shared, desc_out);
417 break;
418 }
419 case PR_MW_INTERRUPT:
420 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
421 rv = PR_FAILURE; /* if interrupted, then exit */
422 break;
423 case PR_MW_TIMEOUT:
424 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
425 case PR_MW_FAILURE:
426 if (verbosity > silent)
427 PL_FPrintError(debug, "RecvReady failure");
428 break;
429 default:
430 break;
431 }
432 } while (PR_SUCCESS == rv);
433
434 if (NULL != desc_out) DestroyRecvWait(desc_out);
435
436 } /* ServiceThread */
437
EnumerationThread(void * arg)438 static void PR_CALLBACK EnumerationThread(void *arg)
439 {
440 PRStatus rv;
441 PRIntn count;
442 PRRecvWait *desc;
443 Shared *shared = (Shared*)arg;
444 PRIntervalTime five_seconds = PR_SecondsToInterval(5);
445 PRMWaitEnumerator *enumerator = PR_CreateMWaitEnumerator(shared->group);
446 MW_ASSERT(NULL != enumerator);
447
448 while (PR_SUCCESS == PR_Sleep(five_seconds))
449 {
450 count = 0;
451 desc = NULL;
452 while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc)))
453 {
454 if (verbosity > chatty) PrintRecvDesc(desc, shared->title);
455 count += 1;
456 }
457 if (verbosity > silent)
458 PR_fprintf(debug,
459 "%s Enumerated %d objects\n", shared->title, count);
460 }
461
462 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
463
464
465 rv = PR_DestroyMWaitEnumerator(enumerator);
466 MW_ASSERT(PR_SUCCESS == rv);
467 } /* EnumerationThread */
468
ServerThread(void * arg)469 static void PR_CALLBACK ServerThread(void *arg)
470 {
471 PRStatus rv;
472 PRIntn index;
473 PRRecvWait *desc_in;
474 PRThread **worker_thread;
475 Shared *shared = (Shared*)arg;
476 PRFileDesc *listener, *service;
477 PRNetAddr server_address, client_address;
478
479 worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
480 if (verbosity > quiet)
481 PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title);
482 for (index = 0; index < worker_threads; ++index)
483 {
484 worker_thread[index] = PR_CreateThread(
485 PR_USER_THREAD, ServiceThread, shared,
486 PR_PRIORITY_HIGH, thread_scope,
487 PR_JOINABLE_THREAD, 16 * 1024);
488 }
489
490 rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address);
491 MW_ASSERT(PR_SUCCESS == rv);
492
493 listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener);
494 if (verbosity > chatty)
495 PR_fprintf(
496 debug, "%s: Server listener socket @0x%x\n",
497 shared->title, listener);
498 rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv);
499 rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv);
500 while (ops_done < ops_required)
501 {
502 if (verbosity > quiet)
503 PR_fprintf(debug, "%s: Server accepting connection\n", shared->title);
504 service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT);
505 if (NULL == service)
506 {
507 if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) break;
508 PL_PrintError("Accept failed");
509 MW_ASSERT(PR_FALSE && "Accept failed");
510 }
511 else
512 {
513 desc_in = CreateRecvWait(service, shared->timeout);
514 desc_in->timeout = PR_INTERVAL_NO_TIMEOUT;
515 if (verbosity > chatty)
516 PrintRecvDesc(desc_in, "Service adding");
517 rv = PR_AddWaitFileDesc(shared->group, desc_in);
518 MW_ASSERT(PR_SUCCESS == rv);
519 }
520 }
521
522 if (verbosity > quiet)
523 PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title);
524 for (index = 0; index < worker_threads; ++index)
525 {
526 rv = PR_Interrupt(worker_thread[index]);
527 MW_ASSERT(PR_SUCCESS == rv);
528 rv = PR_JoinThread(worker_thread[index]);
529 MW_ASSERT(PR_SUCCESS == rv);
530 }
531 PR_DELETE(worker_thread);
532
533 PR_Close(listener);
534
535 CancelGroup(shared);
536
537 } /* ServerThread */
538
/*
** Test: real I/O through one wait group.
**
** Starts a server thread (ServerThread) and an enumeration thread
** (EnumerationThread), sleeps five timeouts, then starts client_threads
** ClientThread clients. Once ops_done reaches ops_required it interrupts
** and joins the clients, then the enumeration thread, then the server.
*/
static void RealOneGroupIO(Shared *shared)
{
    PRStatus status;
    PRIntn slot;
    PRThread *server;
    PRThread *enumerator;
    PRThread **clients;

    if (quiet < verbosity)
    {
        PR_fprintf(debug, "%s: creating server_thread\n", shared->title);
    }
    server = PR_CreateThread(
        PR_USER_THREAD, ServerThread, shared,
        PR_PRIORITY_HIGH, thread_scope,
        PR_JOINABLE_THREAD, 16 * 1024);

    if (quiet < verbosity)
    {
        PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title);
    }
    enumerator = PR_CreateThread(
        PR_USER_THREAD, EnumerationThread, shared,
        PR_PRIORITY_HIGH, thread_scope,
        PR_JOINABLE_THREAD, 16 * 1024);

    if (quiet < verbosity)
    {
        PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title);
    }
    PR_Sleep(5 * shared->timeout);

    if (quiet < verbosity)
    {
        PR_fprintf(debug, "%s: creating client_threads\n", shared->title);
    }
    clients = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads);
    for (slot = 0; slot < client_threads; slot++)
    {
        clients[slot] = PR_CreateThread(
            PR_USER_THREAD, ClientThread, shared,
            PR_PRIORITY_NORMAL, thread_scope,
            PR_JOINABLE_THREAD, 16 * 1024);
    }

    /* Wait for the quota of operations to be met. */
    while (ops_done < ops_required)
    {
        PR_Sleep(shared->timeout);
    }

    if (quiet < verbosity)
    {
        PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title);
    }
    for (slot = 0; slot < client_threads; slot++)
    {
        status = PR_Interrupt(clients[slot]);
        MW_ASSERT(PR_SUCCESS == status);
        status = PR_JoinThread(clients[slot]);
        MW_ASSERT(PR_SUCCESS == status);
    }
    PR_DELETE(clients);

    if (quiet < verbosity)
    {
        PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title);
    }
    status = PR_Interrupt(enumerator);
    MW_ASSERT(PR_SUCCESS == status);
    status = PR_JoinThread(enumerator);
    MW_ASSERT(PR_SUCCESS == status);

    if (quiet < verbosity)
    {
        PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title);
    }
    status = PR_Interrupt(server);
    MW_ASSERT(PR_SUCCESS == status);
    status = PR_JoinThread(server);
    MW_ASSERT(PR_SUCCESS == status);
}  /* RealOneGroupIO */
613
/*
** Run one test when it is selected: test_name is NULL (run everything)
** or equals 'name'. Each run gets a fresh Shared and starts with
** ops_done reset to zero.
*/
static void RunThisOne(
    void (*func)(Shared*), const char *name, const char *test_name)
{
    Shared *context;

    /* Not the requested test: nothing to do. */
    if ((NULL != test_name) && (0 != PL_strcmp(name, test_name)))
    {
        return;
    }

    if (silent < verbosity)
    {
        PR_fprintf(debug, "%s()\n", name);
    }

    context = MakeShared(name);
    ops_done = 0;
    func(context);  /* run the test */
    MW_ASSERT(0 == desc_allocated);
    DestroyShared(context);
}  /* RunThisOne */
629
/*
** Return the level 'delta' steps away from 'verbosity'. No range check is
** done here; the callers in main() guard the ends of the scale.
*/
static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta)
{
    const PRIntn shifted = (PRIntn)verbosity + delta;

    return (Verbosity)shifted;
}  /* ChangeVerbosity */
635
main(int argc,char ** argv)636 int main(int argc, char **argv)
637 {
638 PLOptStatus os;
639 const char *test_name = NULL;
640 PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:");
641
642 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
643 {
644 if (PL_OPT_BAD == os) continue;
645 switch (opt->option)
646 {
647 case 0:
648 test_name = opt->value;
649 break;
650 case 'd': /* debug mode */
651 if (verbosity < noisy)
652 verbosity = ChangeVerbosity(verbosity, 1);
653 break;
654 case 'q': /* debug mode */
655 if (verbosity > silent)
656 verbosity = ChangeVerbosity(verbosity, -1);
657 break;
658 case 'G': /* use global threads */
659 thread_scope = PR_GLOBAL_THREAD;
660 break;
661 case 'c': /* number of client threads */
662 client_threads = atoi(opt->value);
663 break;
664 case 'o': /* operations to compelete */
665 ops_required = atoi(opt->value);
666 break;
667 case 'p': /* default port */
668 default_port = atoi(opt->value);
669 break;
670 case 't': /* number of threads waiting */
671 worker_threads = atoi(opt->value);
672 break;
673 case 'w': /* number of wait objects */
674 wait_objects = atoi(opt->value);
675 break;
676 default:
677 break;
678 }
679 }
680 PL_DestroyOptState(opt);
681
682 if (verbosity > 0)
683 debug = PR_GetSpecialFD(PR_StandardError);
684
685 RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name);
686 RunThisOne(OneOpOneThread, "OneOpOneThread", test_name);
687 RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name);
688 RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name);
689 RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name);
690 return 0;
691 } /* main */
692
693 /* multwait.c */
694