1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3  * License, v. 2.0. If a copy of the MPL was not distributed with this
4  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 
6 #include "prio.h"
7 #include "prprf.h"
8 #include "prlog.h"
9 #include "prnetdb.h"
10 #include "prthread.h"
11 
12 #include "plerror.h"
13 #include "plgetopt.h"
14 #include "prwin16.h"
15 
16 #include <stdlib.h>
17 #include <string.h>
18 
/*
** Testing layering of I/O
**
**      The layered server
** A thread that acts as a server. It creates a TCP listener with a dummy
** layer pushed on top, then listens for incoming connections. Each accepted
** connection will be layered as well; the server accepts one request, echoes
** it back and closes.
**
**      The layered client
** Pretty much what you'd expect.
*/
31 
32 static PRFileDesc *logFile;
33 static PRDescIdentity identity;
34 static PRNetAddr server_address;
35 
36 static PRIOMethods myMethods;
37 
38 typedef enum Verbosity {silent, quiet, chatty, noisy} Verbosity;
39 
40 static PRIntn minor_iterations = 5;
41 static PRIntn major_iterations = 1;
42 static Verbosity verbosity = quiet;
43 
44 #ifdef DEBUG
45 #define PORT_INC_DO +100
46 #else
47 #define PORT_INC_DO
48 #endif
49 #ifdef IS_64
50 #define PORT_INC_3264 +200
51 #else
52 #define PORT_INC_3264
53 #endif
54 
55 static PRUint16 default_port = 12273 PORT_INC_DO PORT_INC_3264;
56 
/*
** Create one data-processing layer (identity / myMethods) and push it on
** top of 'stack'.
**
** Returns 'stack' itself so the call can be chained. The push status is
** only verified through PR_ASSERT(), i.e. in builds where assertions are
** compiled in.
*/
static PRFileDesc *PushLayer(PRFileDesc *stack)
{
    PRFileDesc *layer = PR_CreateIOLayerStub(identity, &myMethods);
#if defined(DEBUG) || defined(FORCE_PR_ASSERT)
    PRStatus rv = /* we only need rv for PR_ASSERT() */
#endif
        PR_PushIOLayer(stack, PR_GetLayersIdentity(stack), layer);
    if (verbosity > quiet) {
        /* %p, not %x: the arguments are pointers and may be wider than int */
        PR_fprintf(logFile, "Pushed layer(0x%p) onto stack(0x%p)\n", layer, stack);
    }
    PR_ASSERT(PR_SUCCESS == rv);
    return stack;
}  /* PushLayer */
70 
/*
** Push three layers on top of 'stack', in this order:
**   1. a pass-through layer ("Dummy 1") using the default I/O methods,
**   2. the data-processing layer (identity / myMethods),
**   3. another pass-through layer ("Dummy 2") using the default I/O methods.
**
** Returns 'stack' itself. Each push status is only verified through
** PR_ASSERT().
*/
static PRFileDesc *PushNewLayers(PRFileDesc *stack)
{
    PRDescIdentity tmp_identity;
    PRFileDesc *layer;
    PRStatus rv;

    /* push a dummy layer */
    tmp_identity = PR_GetUniqueIdentity("Dummy 1");
    layer = PR_CreateIOLayerStub(tmp_identity, PR_GetDefaultIOMethods());
    rv = PR_PushIOLayer(stack, PR_GetLayersIdentity(stack), layer);
    if (verbosity > quiet) {
        /* %p, not %x: the arguments are pointers and may be wider than int */
        PR_fprintf(logFile, "Pushed layer(0x%p) onto stack(0x%p)\n", layer,
                   stack);
    }
    PR_ASSERT(PR_SUCCESS == rv);

    /* push a data processing layer */
    layer = PR_CreateIOLayerStub(identity, &myMethods);
    rv = PR_PushIOLayer(stack, PR_GetLayersIdentity(stack), layer);
    if (verbosity > quiet) {
        PR_fprintf(logFile, "Pushed layer(0x%p) onto stack(0x%p)\n", layer,
                   stack);
    }
    PR_ASSERT(PR_SUCCESS == rv);

    /* push another dummy layer */
    tmp_identity = PR_GetUniqueIdentity("Dummy 2");
    layer = PR_CreateIOLayerStub(tmp_identity, PR_GetDefaultIOMethods());
    rv = PR_PushIOLayer(stack, PR_GetLayersIdentity(stack), layer);
    if (verbosity > quiet) {
        PR_fprintf(logFile, "Pushed layer(0x%p) onto stack(0x%p)\n", layer,
                   stack);
    }
    PR_ASSERT(PR_SUCCESS == rv);
    return stack;
}  /* PushNewLayers */
104 
#if 0
/*
** Currently compiled out. Pops the data-processing layer (identity) off
** 'stack', destroys it through its own destructor and returns 'stack'.
*/
static PRFileDesc *PopLayer(PRFileDesc *stack)
{
    PRFileDesc *popped = PR_PopIOLayer(stack, identity);
    if (verbosity > quiet) {
        /* %p, not %x: the arguments are pointers and may be wider than int */
        PR_fprintf(logFile, "Popped layer(0x%p) from stack(0x%p)\n", popped, stack);
    }
    /* Nothing to destroy if no layer was popped. */
    if (NULL != popped) {
        popped->dtor(popped);
    }

    return stack;
}  /* PopLayer */
#endif
117 
Client(void * arg)118 static void PR_CALLBACK Client(void *arg)
119 {
120     PRStatus rv;
121     PRUint8 buffer[100];
122     PRIntn empty_flags = 0;
123     PRIntn bytes_read, bytes_sent;
124     PRFileDesc *stack = (PRFileDesc*)arg;
125 
126     /* Initialize the buffer so that Purify won't complain */
127     memset(buffer, 0, sizeof(buffer));
128 
129     rv = PR_Connect(stack, &server_address, PR_INTERVAL_NO_TIMEOUT);
130     PR_ASSERT(PR_SUCCESS == rv);
131     while (minor_iterations-- > 0)
132     {
133         bytes_sent = PR_Send(
134                          stack, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
135         PR_ASSERT(sizeof(buffer) == bytes_sent);
136         if (verbosity > chatty) {
137             PR_fprintf(logFile, "Client sending %d bytes\n", bytes_sent);
138         }
139         bytes_read = PR_Recv(
140                          stack, buffer, bytes_sent, empty_flags, PR_INTERVAL_NO_TIMEOUT);
141         if (verbosity > chatty) {
142             PR_fprintf(logFile, "Client receiving %d bytes\n", bytes_read);
143         }
144         PR_ASSERT(bytes_read == bytes_sent);
145     }
146 
147     if (verbosity > quiet) {
148         PR_fprintf(logFile, "Client shutting down stack\n");
149     }
150 
151     rv = PR_Shutdown(stack, PR_SHUTDOWN_BOTH); PR_ASSERT(PR_SUCCESS == rv);
152 }  /* Client */
153 
Server(void * arg)154 static void PR_CALLBACK Server(void *arg)
155 {
156     PRStatus rv;
157     PRUint8 buffer[100];
158     PRFileDesc *service;
159     PRUintn empty_flags = 0;
160     PRIntn bytes_read, bytes_sent;
161     PRFileDesc *stack = (PRFileDesc*)arg;
162     PRNetAddr client_address;
163 
164     service = PR_Accept(stack, &client_address, PR_INTERVAL_NO_TIMEOUT);
165     if (verbosity > quiet) {
166         PR_fprintf(logFile, "Server accepting connection\n");
167     }
168 
169     do
170     {
171         bytes_read = PR_Recv(
172                          service, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
173         if (0 != bytes_read)
174         {
175             if (verbosity > chatty) {
176                 PR_fprintf(logFile, "Server receiving %d bytes\n", bytes_read);
177             }
178             PR_ASSERT(bytes_read > 0);
179             bytes_sent = PR_Send(
180                              service, buffer, bytes_read, empty_flags, PR_INTERVAL_NO_TIMEOUT);
181             if (verbosity > chatty) {
182                 PR_fprintf(logFile, "Server sending %d bytes\n", bytes_sent);
183             }
184             PR_ASSERT(bytes_read == bytes_sent);
185         }
186 
187     } while (0 != bytes_read);
188 
189     if (verbosity > quiet) {
190         PR_fprintf(logFile, "Server shutting down and closing stack\n");
191     }
192     rv = PR_Shutdown(service, PR_SHUTDOWN_BOTH); PR_ASSERT(PR_SUCCESS == rv);
193     rv = PR_Close(service); PR_ASSERT(PR_SUCCESS == rv);
194 
195 }  /* Server */
196 
MyRecv(PRFileDesc * fd,void * buf,PRInt32 amount,PRIntn flags,PRIntervalTime timeout)197 static PRInt32 PR_CALLBACK MyRecv(
198     PRFileDesc *fd, void *buf, PRInt32 amount,
199     PRIntn flags, PRIntervalTime timeout)
200 {
201     char *b = (char*)buf;
202     PRFileDesc *lo = fd->lower;
203     PRInt32 rv, readin = 0, request = 0;
204     rv = lo->methods->recv(lo, &request, sizeof(request), flags, timeout);
205     if (verbosity > chatty) PR_fprintf(
206             logFile, "MyRecv sending permission for %d bytes\n", request);
207     if (0 < rv)
208     {
209         if (verbosity > chatty) PR_fprintf(
210                 logFile, "MyRecv received permission request for %d bytes\n", request);
211         rv = lo->methods->send(
212                  lo, &request, sizeof(request), flags, timeout);
213         if (0 < rv)
214         {
215             if (verbosity > chatty) PR_fprintf(
216                     logFile, "MyRecv sending permission for %d bytes\n", request);
217             while (readin < request)
218             {
219                 rv = lo->methods->recv(
220                          lo, b + readin, amount - readin, flags, timeout);
221                 if (rv <= 0) {
222                     break;
223                 }
224                 if (verbosity > chatty) PR_fprintf(
225                         logFile, "MyRecv received %d bytes\n", rv);
226                 readin += rv;
227             }
228             rv = readin;
229         }
230     }
231     return rv;
232 }  /* MyRecv */
233 
MySend(PRFileDesc * fd,const void * buf,PRInt32 amount,PRIntn flags,PRIntervalTime timeout)234 static PRInt32 PR_CALLBACK MySend(
235     PRFileDesc *fd, const void *buf, PRInt32 amount,
236     PRIntn flags, PRIntervalTime timeout)
237 {
238     PRFileDesc *lo = fd->lower;
239     const char *b = (const char*)buf;
240     PRInt32 rv, wroteout = 0, request;
241     if (verbosity > chatty) PR_fprintf(
242             logFile, "MySend asking permission to send %d bytes\n", amount);
243     rv = lo->methods->send(lo, &amount, sizeof(amount), flags, timeout);
244     if (0 < rv)
245     {
246         rv = lo->methods->recv(
247                  lo, &request, sizeof(request), flags, timeout);
248         if (0 < rv)
249         {
250             PR_ASSERT(request == amount);
251             if (verbosity > chatty) PR_fprintf(
252                     logFile, "MySend got permission to send %d bytes\n", request);
253             while (wroteout < request)
254             {
255                 rv = lo->methods->send(
256                          lo, b + wroteout, request - wroteout, flags, timeout);
257                 if (rv <= 0) {
258                     break;
259                 }
260                 if (verbosity > chatty) PR_fprintf(
261                         logFile, "MySend wrote %d bytes\n", rv);
262                 wroteout += rv;
263             }
264             rv = amount;
265         }
266     }
267     return rv;
268 }  /* MySend */
269 
/*
** Return 'verbosity' moved by 'delta' steps, clamped to the range
** [silent, noisy]. The 'verbosity' parameter shadows the file-scope
** variable of the same name; the global is not touched here.
*/
static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta)
{
    const PRIntn lowest = (PRIntn)silent;
    const PRIntn highest = (PRIntn)noisy;
    PRIntn level = (PRIntn)verbosity + delta;

    if (level < lowest) {
        return (Verbosity)lowest;
    }
    if (level > highest) {
        return (Verbosity)highest;
    }
    return (Verbosity)level;
}  /* ChangeVerbosity */
281 
main(int argc,char ** argv)282 int main(int argc, char **argv)
283 {
284     PRStatus rv;
285     PRIntn mits;
286     PLOptStatus os;
287     PRFileDesc *client, *service;
288     PRFileDesc *client_stack, *service_stack;
289     PRNetAddr any_address;
290     const char *server_name = NULL;
291     const PRIOMethods *stubMethods;
292     PRThread *client_thread, *server_thread;
293     PRThreadScope thread_scope = PR_LOCAL_THREAD;
294     PLOptState *opt = PL_CreateOptState(argc, argv, "dqGC:c:p:");
295     while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
296     {
297         if (PL_OPT_BAD == os) {
298             continue;
299         }
300         switch (opt->option)
301         {
302             case 0:
303                 server_name = opt->value;
304                 break;
305             case 'd':  /* debug mode */
306                 if (verbosity < noisy) {
307                     verbosity = ChangeVerbosity(verbosity, 1);
308                 }
309                 break;
310             case 'q':  /* debug mode */
311                 if (verbosity > silent) {
312                     verbosity = ChangeVerbosity(verbosity, -1);
313                 }
314                 break;
315             case 'G':  /* use global threads */
316                 thread_scope = PR_GLOBAL_THREAD;
317                 break;
318             case 'C':  /* number of threads waiting */
319                 major_iterations = atoi(opt->value);
320                 break;
321             case 'c':  /* number of client threads */
322                 minor_iterations = atoi(opt->value);
323                 break;
324             case 'p':  /* default port */
325                 default_port = atoi(opt->value);
326                 break;
327             default:
328                 break;
329         }
330     }
331     PL_DestroyOptState(opt);
332     PR_STDIO_INIT();
333 
334     logFile = PR_GetSpecialFD(PR_StandardError);
335 
336     identity = PR_GetUniqueIdentity("Dummy");
337     stubMethods = PR_GetDefaultIOMethods();
338 
339     /*
340     ** The protocol we're going to implement is one where in order to initiate
341     ** a send, the sender must first solicit permission. Therefore, every
342     ** send is really a send - receive - send sequence.
343     */
344     myMethods = *stubMethods;  /* first get the entire batch */
345     myMethods.recv = MyRecv;  /* then override the ones we care about */
346     myMethods.send = MySend;  /* then override the ones we care about */
347 
348     if (NULL == server_name)
349         rv = PR_InitializeNetAddr(
350                  PR_IpAddrLoopback, default_port, &server_address);
351     else
352     {
353         rv = PR_StringToNetAddr(server_name, &server_address);
354         PR_ASSERT(PR_SUCCESS == rv);
355         rv = PR_InitializeNetAddr(
356                  PR_IpAddrNull, default_port, &server_address);
357     }
358     PR_ASSERT(PR_SUCCESS == rv);
359 
360     /* one type w/o layering */
361 
362     mits = minor_iterations;
363     while (major_iterations-- > 0)
364     {
365         if (verbosity > silent) {
366             PR_fprintf(logFile, "Beginning non-layered test\n");
367         }
368         client = PR_NewTCPSocket(); PR_ASSERT(NULL != client);
369         service = PR_NewTCPSocket(); PR_ASSERT(NULL != service);
370         rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &any_address);
371         PR_ASSERT(PR_SUCCESS == rv);
372         rv = PR_Bind(service, &any_address); PR_ASSERT(PR_SUCCESS == rv);
373         rv = PR_Listen(service, 10); PR_ASSERT(PR_SUCCESS == rv);
374 
375         minor_iterations = mits;
376         server_thread = PR_CreateThread(
377                             PR_USER_THREAD, Server, service,
378                             PR_PRIORITY_HIGH, thread_scope,
379                             PR_JOINABLE_THREAD, 16 * 1024);
380         PR_ASSERT(NULL != server_thread);
381 
382         client_thread = PR_CreateThread(
383                             PR_USER_THREAD, Client, client,
384                             PR_PRIORITY_NORMAL, thread_scope,
385                             PR_JOINABLE_THREAD, 16 * 1024);
386         PR_ASSERT(NULL != client_thread);
387 
388         rv = PR_JoinThread(client_thread);
389         PR_ASSERT(PR_SUCCESS == rv);
390         rv = PR_JoinThread(server_thread);
391         PR_ASSERT(PR_SUCCESS == rv);
392 
393         rv = PR_Close(client); PR_ASSERT(PR_SUCCESS == rv);
394         rv = PR_Close(service); PR_ASSERT(PR_SUCCESS == rv);
395         if (verbosity > silent) {
396             PR_fprintf(logFile, "Ending non-layered test\n");
397         }
398 
399         /* with layering */
400         if (verbosity > silent) {
401             PR_fprintf(logFile, "Beginning layered test\n");
402         }
403         client = PR_NewTCPSocket(); PR_ASSERT(NULL != client);
404         PushLayer(client);
405         service = PR_NewTCPSocket(); PR_ASSERT(NULL != service);
406         PushLayer(service);
407         rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &any_address);
408         PR_ASSERT(PR_SUCCESS == rv);
409         rv = PR_Bind(service, &any_address); PR_ASSERT(PR_SUCCESS == rv);
410         rv = PR_Listen(service, 10); PR_ASSERT(PR_SUCCESS == rv);
411 
412         minor_iterations = mits;
413         server_thread = PR_CreateThread(
414                             PR_USER_THREAD, Server, service,
415                             PR_PRIORITY_HIGH, thread_scope,
416                             PR_JOINABLE_THREAD, 16 * 1024);
417         PR_ASSERT(NULL != server_thread);
418 
419         client_thread = PR_CreateThread(
420                             PR_USER_THREAD, Client, client,
421                             PR_PRIORITY_NORMAL, thread_scope,
422                             PR_JOINABLE_THREAD, 16 * 1024);
423         PR_ASSERT(NULL != client_thread);
424 
425         rv = PR_JoinThread(client_thread);
426         PR_ASSERT(PR_SUCCESS == rv);
427         rv = PR_JoinThread(server_thread);
428         PR_ASSERT(PR_SUCCESS == rv);
429 
430         rv = PR_Close(client); PR_ASSERT(PR_SUCCESS == rv);
431         rv = PR_Close(service); PR_ASSERT(PR_SUCCESS == rv);
432         /* with layering, using new style stack */
433         if (verbosity > silent)
434             PR_fprintf(logFile,
435                        "Beginning layered test with new style stack\n");
436         client = PR_NewTCPSocket(); PR_ASSERT(NULL != client);
437         client_stack = PR_CreateIOLayer(client);
438         PushNewLayers(client_stack);
439         service = PR_NewTCPSocket(); PR_ASSERT(NULL != service);
440         service_stack = PR_CreateIOLayer(service);
441         PushNewLayers(service_stack);
442         rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &any_address);
443         PR_ASSERT(PR_SUCCESS == rv);
444         rv = PR_Bind(service, &any_address); PR_ASSERT(PR_SUCCESS == rv);
445         rv = PR_Listen(service, 10); PR_ASSERT(PR_SUCCESS == rv);
446 
447         minor_iterations = mits;
448         server_thread = PR_CreateThread(
449                             PR_USER_THREAD, Server, service_stack,
450                             PR_PRIORITY_HIGH, thread_scope,
451                             PR_JOINABLE_THREAD, 16 * 1024);
452         PR_ASSERT(NULL != server_thread);
453 
454         client_thread = PR_CreateThread(
455                             PR_USER_THREAD, Client, client_stack,
456                             PR_PRIORITY_NORMAL, thread_scope,
457                             PR_JOINABLE_THREAD, 16 * 1024);
458         PR_ASSERT(NULL != client_thread);
459 
460         rv = PR_JoinThread(client_thread);
461         PR_ASSERT(PR_SUCCESS == rv);
462         rv = PR_JoinThread(server_thread);
463         PR_ASSERT(PR_SUCCESS == rv);
464 
465         rv = PR_Close(client_stack); PR_ASSERT(PR_SUCCESS == rv);
466         rv = PR_Close(service_stack); PR_ASSERT(PR_SUCCESS == rv);
467         if (verbosity > silent) {
468             PR_fprintf(logFile, "Ending layered test\n");
469         }
470     }
471     return 0;
472 }  /* main */
473 
474 /* layer.c */
475