1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5
6 #include "prio.h"
7 #include "prprf.h"
8 #include "prlog.h"
9 #include "prmem.h"
10 #include "pratom.h"
11 #include "prlock.h"
12 #include "prmwait.h"
13 #include "prclist.h"
14 #include "prerror.h"
15 #include "prinrval.h"
16 #include "prnetdb.h"
17 #include "prthread.h"
18
19 #include "plstr.h"
20 #include "plerror.h"
21 #include "plgetopt.h"
22
23 #include <string.h>
24
25 typedef struct Shared
26 {
27 const char *title;
28 PRLock *list_lock;
29 PRWaitGroup *group;
30 PRIntervalTime timeout;
31 } Shared;
32
33 typedef enum Verbosity {silent, quiet, chatty, noisy} Verbosity;
34
35 #ifdef DEBUG
36 #define PORT_INC_DO +100
37 #else
38 #define PORT_INC_DO
39 #endif
40 #ifdef IS_64
41 #define PORT_INC_3264 +200
42 #else
43 #define PORT_INC_3264
44 #endif
45
46 static PRFileDesc *debug = NULL;
47 static PRInt32 desc_allocated = 0;
48 static PRUint16 default_port = 12273 PORT_INC_DO PORT_INC_3264;
49 static enum Verbosity verbosity = quiet;
50 static PRInt32 ops_required = 1000, ops_done = 0;
51 static PRThreadScope thread_scope = PR_LOCAL_THREAD;
52 static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50;
53
54 #if defined(DEBUG)
55 #define MW_ASSERT(_expr) \
56 ((_expr)?((void)0):_MW_Assert(# _expr,__FILE__,__LINE__))
_MW_Assert(const char * s,const char * file,PRIntn ln)57 static void _MW_Assert(const char *s, const char *file, PRIntn ln)
58 {
59 if (NULL != debug) {
60 PL_FPrintError(debug, NULL);
61 }
62 PR_Assert(s, file, ln);
63 } /* _MW_Assert */
64 #else
65 #define MW_ASSERT(_expr)
66 #endif
67
PrintRecvDesc(PRRecvWait * desc,const char * msg)68 static void PrintRecvDesc(PRRecvWait *desc, const char *msg)
69 {
70 const char *tag[] = {
71 "PR_MW_INTERRUPT", "PR_MW_TIMEOUT",
72 "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"
73 };
74 PR_fprintf(
75 debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n",
76 msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout);
77 } /* PrintRecvDesc */
78
MakeShared(const char * title)79 static Shared *MakeShared(const char *title)
80 {
81 Shared *shared = PR_NEWZAP(Shared);
82 shared->group = PR_CreateWaitGroup(1);
83 shared->timeout = PR_SecondsToInterval(1);
84 shared->list_lock = PR_NewLock();
85 shared->title = title;
86 return shared;
87 } /* MakeShared */
88
DestroyShared(Shared * shared)89 static void DestroyShared(Shared *shared)
90 {
91 PRStatus rv;
92 if (verbosity > quiet) {
93 PR_fprintf(debug, "%s: destroying group\n", shared->title);
94 }
95 rv = PR_DestroyWaitGroup(shared->group);
96 MW_ASSERT(PR_SUCCESS == rv);
97 PR_DestroyLock(shared->list_lock);
98 PR_DELETE(shared);
99 } /* DestroyShared */
100
CreateRecvWait(PRFileDesc * fd,PRIntervalTime timeout)101 static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout)
102 {
103 PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait);
104 MW_ASSERT(NULL != desc_out);
105
106 MW_ASSERT(NULL != fd);
107 desc_out->fd = fd;
108 desc_out->timeout = timeout;
109 desc_out->buffer.length = 120;
110 desc_out->buffer.start = PR_CALLOC(120);
111
112 PR_AtomicIncrement(&desc_allocated);
113
114 if (verbosity > chatty) {
115 PrintRecvDesc(desc_out, "Allocated");
116 }
117 return desc_out;
118 } /* CreateRecvWait */
119
DestroyRecvWait(PRRecvWait * desc_out)120 static void DestroyRecvWait(PRRecvWait *desc_out)
121 {
122 if (verbosity > chatty) {
123 PrintRecvDesc(desc_out, "Destroying");
124 }
125 PR_Close(desc_out->fd);
126 if (NULL != desc_out->buffer.start) {
127 PR_DELETE(desc_out->buffer.start);
128 }
129 PR_Free(desc_out);
130 (void)PR_AtomicDecrement(&desc_allocated);
131 } /* DestroyRecvWait */
132
CancelGroup(Shared * shared)133 static void CancelGroup(Shared *shared)
134 {
135 PRRecvWait *desc_out;
136
137 if (verbosity > quiet) {
138 PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title);
139 }
140
141 do
142 {
143 desc_out = PR_CancelWaitGroup(shared->group);
144 if (NULL != desc_out) {
145 DestroyRecvWait(desc_out);
146 }
147 } while (NULL != desc_out);
148
149 MW_ASSERT(0 == desc_allocated);
150 MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError());
151 } /* CancelGroup */
152
ClientThread(void * arg)153 static void PR_CALLBACK ClientThread(void* arg)
154 {
155 PRStatus rv;
156 PRInt32 bytes;
157 PRIntn empty_flags = 0;
158 PRNetAddr server_address;
159 unsigned char buffer[100];
160 Shared *shared = (Shared*)arg;
161 PRFileDesc *server = PR_NewTCPSocket();
162 if ((NULL == server)
163 && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
164 return;
165 }
166 MW_ASSERT(NULL != server);
167
168 if (verbosity > chatty) {
169 PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server);
170 }
171
172 /* Initialize the buffer so that Purify won't complain */
173 memset(buffer, 0, sizeof(buffer));
174
175 rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address);
176 MW_ASSERT(PR_SUCCESS == rv);
177
178 if (verbosity > quiet) {
179 PR_fprintf(debug, "%s: Client opening connection\n", shared->title);
180 }
181 rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT);
182
183 if (PR_FAILURE == rv)
184 {
185 if (verbosity > silent) {
186 PL_FPrintError(debug, "Client connect failed");
187 }
188 return;
189 }
190
191 while (ops_done < ops_required)
192 {
193 bytes = PR_Send(
194 server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
195 if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
196 break;
197 }
198 MW_ASSERT(sizeof(buffer) == bytes);
199 if (verbosity > chatty)
200 PR_fprintf(
201 debug, "%s: Client sent %d bytes\n",
202 shared->title, sizeof(buffer));
203 bytes = PR_Recv(
204 server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
205 if (verbosity > chatty)
206 PR_fprintf(
207 debug, "%s: Client received %d bytes\n",
208 shared->title, sizeof(buffer));
209 if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
210 break;
211 }
212 MW_ASSERT(sizeof(buffer) == bytes);
213 PR_Sleep(shared->timeout);
214 }
215 rv = PR_Close(server);
216 MW_ASSERT(PR_SUCCESS == rv);
217
218 } /* ClientThread */
219
OneInThenCancelled(Shared * shared)220 static void OneInThenCancelled(Shared *shared)
221 {
222 PRStatus rv;
223 PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
224
225 shared->timeout = PR_INTERVAL_NO_TIMEOUT;
226
227 desc_in->fd = PR_NewTCPSocket();
228 desc_in->timeout = shared->timeout;
229
230 if (verbosity > chatty) {
231 PrintRecvDesc(desc_in, "Adding desc");
232 }
233
234 rv = PR_AddWaitFileDesc(shared->group, desc_in);
235 MW_ASSERT(PR_SUCCESS == rv);
236
237 if (verbosity > chatty) {
238 PrintRecvDesc(desc_in, "Cancelling");
239 }
240 rv = PR_CancelWaitFileDesc(shared->group, desc_in);
241 MW_ASSERT(PR_SUCCESS == rv);
242
243 desc_out = PR_WaitRecvReady(shared->group);
244 MW_ASSERT(desc_out == desc_in);
245 MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome);
246 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
247 if (verbosity > chatty) {
248 PrintRecvDesc(desc_out, "Ready");
249 }
250
251 rv = PR_Close(desc_in->fd);
252 MW_ASSERT(PR_SUCCESS == rv);
253
254 if (verbosity > quiet) {
255 PR_fprintf(debug, "%s: destroying group\n", shared->title);
256 }
257
258 PR_DELETE(desc_in);
259 } /* OneInThenCancelled */
260
OneOpOneThread(Shared * shared)261 static void OneOpOneThread(Shared *shared)
262 {
263 PRStatus rv;
264 PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
265
266 desc_in->fd = PR_NewTCPSocket();
267 desc_in->timeout = shared->timeout;
268
269 if (verbosity > chatty) {
270 PrintRecvDesc(desc_in, "Adding desc");
271 }
272
273 rv = PR_AddWaitFileDesc(shared->group, desc_in);
274 MW_ASSERT(PR_SUCCESS == rv);
275 desc_out = PR_WaitRecvReady(shared->group);
276 MW_ASSERT(desc_out == desc_in);
277 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
278 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
279 if (verbosity > chatty) {
280 PrintRecvDesc(desc_out, "Ready");
281 }
282
283 rv = PR_Close(desc_in->fd);
284 MW_ASSERT(PR_SUCCESS == rv);
285
286 PR_DELETE(desc_in);
287 } /* OneOpOneThread */
288
ManyOpOneThread(Shared * shared)289 static void ManyOpOneThread(Shared *shared)
290 {
291 PRStatus rv;
292 PRIntn index;
293 PRRecvWait *desc_in;
294 PRRecvWait *desc_out;
295
296 if (verbosity > quiet) {
297 PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects);
298 }
299
300 for (index = 0; index < wait_objects; ++index)
301 {
302 desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
303
304 rv = PR_AddWaitFileDesc(shared->group, desc_in);
305 MW_ASSERT(PR_SUCCESS == rv);
306 }
307
308 while (ops_done < ops_required)
309 {
310 desc_out = PR_WaitRecvReady(shared->group);
311 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
312 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
313 if (verbosity > chatty) {
314 PrintRecvDesc(desc_out, "Ready/readding");
315 }
316 rv = PR_AddWaitFileDesc(shared->group, desc_out);
317 MW_ASSERT(PR_SUCCESS == rv);
318 (void)PR_AtomicIncrement(&ops_done);
319 }
320
321 CancelGroup(shared);
322 } /* ManyOpOneThread */
323
SomeOpsThread(void * arg)324 static void PR_CALLBACK SomeOpsThread(void *arg)
325 {
326 PRRecvWait *desc_out;
327 PRStatus rv = PR_SUCCESS;
328 Shared *shared = (Shared*)arg;
329 do /* until interrupted */
330 {
331 desc_out = PR_WaitRecvReady(shared->group);
332 if (NULL == desc_out)
333 {
334 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
335 if (verbosity > quiet) {
336 PR_fprintf(debug, "Aborted\n");
337 }
338 break;
339 }
340 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
341 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
342 if (verbosity > chatty) {
343 PrintRecvDesc(desc_out, "Ready");
344 }
345
346 if (verbosity > chatty) {
347 PrintRecvDesc(desc_out, "Re-Adding");
348 }
349 desc_out->timeout = shared->timeout;
350 rv = PR_AddWaitFileDesc(shared->group, desc_out);
351 PR_AtomicIncrement(&ops_done);
352 if (ops_done > ops_required) {
353 break;
354 }
355 } while (PR_SUCCESS == rv);
356 MW_ASSERT(PR_SUCCESS == rv);
357 } /* SomeOpsThread */
358
SomeOpsSomeThreads(Shared * shared)359 static void SomeOpsSomeThreads(Shared *shared)
360 {
361 PRStatus rv;
362 PRThread **thread;
363 PRIntn index;
364 PRRecvWait *desc_in;
365
366 thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
367
368 /* Create some threads */
369
370 if (verbosity > quiet) {
371 PR_fprintf(debug, "%s: creating threads\n", shared->title);
372 }
373 for (index = 0; index < worker_threads; ++index)
374 {
375 thread[index] = PR_CreateThread(
376 PR_USER_THREAD, SomeOpsThread, shared,
377 PR_PRIORITY_HIGH, thread_scope,
378 PR_JOINABLE_THREAD, 16 * 1024);
379 }
380
381 /* then create some operations */
382 if (verbosity > quiet) {
383 PR_fprintf(debug, "%s: creating desc\n", shared->title);
384 }
385 for (index = 0; index < wait_objects; ++index)
386 {
387 desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
388 rv = PR_AddWaitFileDesc(shared->group, desc_in);
389 MW_ASSERT(PR_SUCCESS == rv);
390 }
391
392 if (verbosity > quiet) {
393 PR_fprintf(debug, "%s: sleeping\n", shared->title);
394 }
395 while (ops_done < ops_required) {
396 PR_Sleep(shared->timeout);
397 }
398
399 if (verbosity > quiet) {
400 PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title);
401 }
402 for (index = 0; index < worker_threads; ++index)
403 {
404 rv = PR_Interrupt(thread[index]);
405 MW_ASSERT(PR_SUCCESS == rv);
406 rv = PR_JoinThread(thread[index]);
407 MW_ASSERT(PR_SUCCESS == rv);
408 }
409 PR_DELETE(thread);
410
411 CancelGroup(shared);
412 } /* SomeOpsSomeThreads */
413
ServiceRequest(Shared * shared,PRRecvWait * desc)414 static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc)
415 {
416 PRInt32 bytes_out;
417
418 if (verbosity > chatty)
419 PR_fprintf(
420 debug, "%s: Service received %d bytes\n",
421 shared->title, desc->bytesRecv);
422
423 if (0 == desc->bytesRecv) {
424 goto quitting;
425 }
426 if ((-1 == desc->bytesRecv)
427 && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
428 goto aborted;
429 }
430
431 bytes_out = PR_Send(
432 desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout);
433 if (verbosity > chatty)
434 PR_fprintf(
435 debug, "%s: Service sent %d bytes\n",
436 shared->title, bytes_out);
437
438 if ((-1 == bytes_out)
439 && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) {
440 goto aborted;
441 }
442 MW_ASSERT(bytes_out == desc->bytesRecv);
443
444 return PR_SUCCESS;
445
446 aborted:
447 quitting:
448 return PR_FAILURE;
449 } /* ServiceRequest */
450
ServiceThread(void * arg)451 static void PR_CALLBACK ServiceThread(void *arg)
452 {
453 PRStatus rv = PR_SUCCESS;
454 PRRecvWait *desc_out = NULL;
455 Shared *shared = (Shared*)arg;
456 do /* until interrupted */
457 {
458 if (NULL != desc_out)
459 {
460 desc_out->timeout = PR_INTERVAL_NO_TIMEOUT;
461 if (verbosity > chatty) {
462 PrintRecvDesc(desc_out, "Service re-adding");
463 }
464 rv = PR_AddWaitFileDesc(shared->group, desc_out);
465 MW_ASSERT(PR_SUCCESS == rv);
466 }
467
468 desc_out = PR_WaitRecvReady(shared->group);
469 if (NULL == desc_out)
470 {
471 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
472 break;
473 }
474
475 switch (desc_out->outcome)
476 {
477 case PR_MW_SUCCESS:
478 {
479 PR_AtomicIncrement(&ops_done);
480 if (verbosity > chatty) {
481 PrintRecvDesc(desc_out, "Service ready");
482 }
483 rv = ServiceRequest(shared, desc_out);
484 break;
485 }
486 case PR_MW_INTERRUPT:
487 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
488 rv = PR_FAILURE; /* if interrupted, then exit */
489 break;
490 case PR_MW_TIMEOUT:
491 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
492 case PR_MW_FAILURE:
493 if (verbosity > silent) {
494 PL_FPrintError(debug, "RecvReady failure");
495 }
496 break;
497 default:
498 break;
499 }
500 } while (PR_SUCCESS == rv);
501
502 if (NULL != desc_out) {
503 DestroyRecvWait(desc_out);
504 }
505
506 } /* ServiceThread */
507
EnumerationThread(void * arg)508 static void PR_CALLBACK EnumerationThread(void *arg)
509 {
510 PRStatus rv;
511 PRIntn count;
512 PRRecvWait *desc;
513 Shared *shared = (Shared*)arg;
514 PRIntervalTime five_seconds = PR_SecondsToInterval(5);
515 PRMWaitEnumerator *enumerator = PR_CreateMWaitEnumerator(shared->group);
516 MW_ASSERT(NULL != enumerator);
517
518 while (PR_SUCCESS == PR_Sleep(five_seconds))
519 {
520 count = 0;
521 desc = NULL;
522 while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc)))
523 {
524 if (verbosity > chatty) {
525 PrintRecvDesc(desc, shared->title);
526 }
527 count += 1;
528 }
529 if (verbosity > silent)
530 PR_fprintf(debug,
531 "%s Enumerated %d objects\n", shared->title, count);
532 }
533
534 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
535
536
537 rv = PR_DestroyMWaitEnumerator(enumerator);
538 MW_ASSERT(PR_SUCCESS == rv);
539 } /* EnumerationThread */
540
ServerThread(void * arg)541 static void PR_CALLBACK ServerThread(void *arg)
542 {
543 PRStatus rv;
544 PRIntn index;
545 PRRecvWait *desc_in;
546 PRThread **worker_thread;
547 Shared *shared = (Shared*)arg;
548 PRFileDesc *listener, *service;
549 PRNetAddr server_address, client_address;
550
551 worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
552 if (verbosity > quiet) {
553 PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title);
554 }
555 for (index = 0; index < worker_threads; ++index)
556 {
557 worker_thread[index] = PR_CreateThread(
558 PR_USER_THREAD, ServiceThread, shared,
559 PR_PRIORITY_HIGH, thread_scope,
560 PR_JOINABLE_THREAD, 16 * 1024);
561 }
562
563 rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address);
564 MW_ASSERT(PR_SUCCESS == rv);
565
566 listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener);
567 if (verbosity > chatty)
568 PR_fprintf(
569 debug, "%s: Server listener socket @0x%x\n",
570 shared->title, listener);
571 rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv);
572 rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv);
573 while (ops_done < ops_required)
574 {
575 if (verbosity > quiet) {
576 PR_fprintf(debug, "%s: Server accepting connection\n", shared->title);
577 }
578 service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT);
579 if (NULL == service)
580 {
581 if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) {
582 break;
583 }
584 PL_PrintError("Accept failed");
585 MW_ASSERT(PR_FALSE && "Accept failed");
586 }
587 else
588 {
589 desc_in = CreateRecvWait(service, shared->timeout);
590 desc_in->timeout = PR_INTERVAL_NO_TIMEOUT;
591 if (verbosity > chatty) {
592 PrintRecvDesc(desc_in, "Service adding");
593 }
594 rv = PR_AddWaitFileDesc(shared->group, desc_in);
595 MW_ASSERT(PR_SUCCESS == rv);
596 }
597 }
598
599 if (verbosity > quiet) {
600 PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title);
601 }
602 for (index = 0; index < worker_threads; ++index)
603 {
604 rv = PR_Interrupt(worker_thread[index]);
605 MW_ASSERT(PR_SUCCESS == rv);
606 rv = PR_JoinThread(worker_thread[index]);
607 MW_ASSERT(PR_SUCCESS == rv);
608 }
609 PR_DELETE(worker_thread);
610
611 PR_Close(listener);
612
613 CancelGroup(shared);
614
615 } /* ServerThread */
616
RealOneGroupIO(Shared * shared)617 static void RealOneGroupIO(Shared *shared)
618 {
619 /*
620 ** Create a server that listens for connections and then services
621 ** requests that come in over those connections. The server never
622 ** deletes a connection and assumes a basic RPC model of operation.
623 **
624 ** Use worker_threads threads to service how every many open ports
625 ** there might be.
626 **
627 ** Oh, ya. Almost forget. Create (some) clients as well.
628 */
629 PRStatus rv;
630 PRIntn index;
631 PRThread *server_thread, *enumeration_thread, **client_thread;
632
633 if (verbosity > quiet) {
634 PR_fprintf(debug, "%s: creating server_thread\n", shared->title);
635 }
636
637 server_thread = PR_CreateThread(
638 PR_USER_THREAD, ServerThread, shared,
639 PR_PRIORITY_HIGH, thread_scope,
640 PR_JOINABLE_THREAD, 16 * 1024);
641
642 if (verbosity > quiet) {
643 PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title);
644 }
645
646 enumeration_thread = PR_CreateThread(
647 PR_USER_THREAD, EnumerationThread, shared,
648 PR_PRIORITY_HIGH, thread_scope,
649 PR_JOINABLE_THREAD, 16 * 1024);
650
651 if (verbosity > quiet) {
652 PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title);
653 }
654 PR_Sleep(5 * shared->timeout);
655
656 if (verbosity > quiet) {
657 PR_fprintf(debug, "%s: creating client_threads\n", shared->title);
658 }
659 client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads);
660 for (index = 0; index < client_threads; ++index)
661 {
662 client_thread[index] = PR_CreateThread(
663 PR_USER_THREAD, ClientThread, shared,
664 PR_PRIORITY_NORMAL, thread_scope,
665 PR_JOINABLE_THREAD, 16 * 1024);
666 }
667
668 while (ops_done < ops_required) {
669 PR_Sleep(shared->timeout);
670 }
671
672 if (verbosity > quiet) {
673 PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title);
674 }
675 for (index = 0; index < client_threads; ++index)
676 {
677 rv = PR_Interrupt(client_thread[index]);
678 MW_ASSERT(PR_SUCCESS == rv);
679 rv = PR_JoinThread(client_thread[index]);
680 MW_ASSERT(PR_SUCCESS == rv);
681 }
682 PR_DELETE(client_thread);
683
684 if (verbosity > quiet) {
685 PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title);
686 }
687 rv = PR_Interrupt(enumeration_thread);
688 MW_ASSERT(PR_SUCCESS == rv);
689 rv = PR_JoinThread(enumeration_thread);
690 MW_ASSERT(PR_SUCCESS == rv);
691
692 if (verbosity > quiet) {
693 PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title);
694 }
695 rv = PR_Interrupt(server_thread);
696 MW_ASSERT(PR_SUCCESS == rv);
697 rv = PR_JoinThread(server_thread);
698 MW_ASSERT(PR_SUCCESS == rv);
699 } /* RealOneGroupIO */
700
RunThisOne(void (* func)(Shared *),const char * name,const char * test_name)701 static void RunThisOne(
702 void (*func)(Shared*), const char *name, const char *test_name)
703 {
704 Shared *shared;
705 if ((NULL == test_name) || (0 == PL_strcmp(name, test_name)))
706 {
707 if (verbosity > silent) {
708 PR_fprintf(debug, "%s()\n", name);
709 }
710 shared = MakeShared(name);
711 ops_done = 0;
712 func(shared); /* run the test */
713 MW_ASSERT(0 == desc_allocated);
714 DestroyShared(shared);
715 }
716 } /* RunThisOne */
717
ChangeVerbosity(Verbosity verbosity,PRIntn delta)718 static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta)
719 {
720 return (Verbosity)(((PRIntn)verbosity) + delta);
721 } /* ChangeVerbosity */
722
main(int argc,char ** argv)723 int main(int argc, char **argv)
724 {
725 PLOptStatus os;
726 const char *test_name = NULL;
727 PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:");
728
729 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
730 {
731 if (PL_OPT_BAD == os) {
732 continue;
733 }
734 switch (opt->option)
735 {
736 case 0:
737 test_name = opt->value;
738 break;
739 case 'd': /* debug mode */
740 if (verbosity < noisy) {
741 verbosity = ChangeVerbosity(verbosity, 1);
742 }
743 break;
744 case 'q': /* debug mode */
745 if (verbosity > silent) {
746 verbosity = ChangeVerbosity(verbosity, -1);
747 }
748 break;
749 case 'G': /* use global threads */
750 thread_scope = PR_GLOBAL_THREAD;
751 break;
752 case 'c': /* number of client threads */
753 client_threads = atoi(opt->value);
754 break;
755 case 'o': /* operations to compelete */
756 ops_required = atoi(opt->value);
757 break;
758 case 'p': /* default port */
759 default_port = atoi(opt->value);
760 break;
761 case 't': /* number of threads waiting */
762 worker_threads = atoi(opt->value);
763 break;
764 case 'w': /* number of wait objects */
765 wait_objects = atoi(opt->value);
766 break;
767 default:
768 break;
769 }
770 }
771 PL_DestroyOptState(opt);
772
773 if (verbosity > 0) {
774 debug = PR_GetSpecialFD(PR_StandardError);
775 }
776
777 RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name);
778 RunThisOne(OneOpOneThread, "OneOpOneThread", test_name);
779 RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name);
780 RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name);
781 RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name);
782 return 0;
783 } /* main */
784
785 /* multwait.c */
786