1 /*
2  *  The Regina Rexx Interpreter
3  *  Copyright (C) 2001-2004  Florian Gro�e-Coosmann
4  *
5  *  This library is free software; you can redistribute it and/or
6  *  modify it under the terms of the GNU Library General Public
7  *  License as published by the Free Software Foundation; either
8  *  version 2 of the License, or (at your option) any later version.
9  *
10  *  This library is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  *  Library General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Library General Public
16  *  License along with this library; if not, write to the Free
17  *  Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18  *
19  ******************************************************************************
20  *
21  * Asynchroneous thread multiplexer with a parallel use of the REXXSAA API.
22  *
23  * This example works with Win32 as with OS/2 or Posix threads.
24  */
25 
26 #pragma clang diagnostic ignored "-Wint-to-void-pointer-cast"
27 
28 #include <stdio.h>
29 #include <string.h>
30 #include <stdlib.h>
31 #include <limits.h>
32 #include <errno.h>
33 #include <time.h>
34 
35 #ifdef POSIX_THREADS
36 # include <sys/time.h>
37 # include <stdint.h>
38 # include <unistd.h>
39 # include <pthread.h>
40 # define USE_SEMAPHORES 0
41 # if defined(_POSIX_SEMAPHORES) && !(defined(__APPLE__) && defined(__MACH__))
42 /*
43  * threader uses unnamed posix semaphores. OSX does not support unnamed
44  * semaphores; use mutexes for OSX
45  */
46 #  include <semaphore.h>
47 #  include <signal.h>
48 #  undef USE_SEMAPHORES
49 #  define USE_SEMAPHORES 1
50 # endif
51 #endif
52 
53 #ifdef OS2_THREADS
54 # include <io.h>
55 # include <stddef.h>
56 # include <process.h>
57 # define INCL_DOSSEMAPHORES
58 # define INCL_ERRORS
59 # include <os2.h>
60 # define CHAR_TYPEDEFED
61 # define SHORT_TYPEDEFED
62 # define LONG_TYPEDEFED
63 # ifndef _OS2EMX_H
64 #  define _OS2EMX_H       /* prevents PFN from defining (Watcom) */
65 # endif
66 #endif
67 
68 #ifdef _MSC_VER
69 /* This picky compiler claims about unused formal parameters.
70  * This is correct but hides (for human eyes) other errors since they
71  * are many and we can't reduce them all.
72  * Error 4100 is "unused formal parameter".
73  */
74 # pragma warning(disable:4100 4115 4201 4214 4514)
75 #endif
76 
77 #ifdef WIN32_THREADS
78 # define WIN32_LEAN_AND_MEAN
79 # include <process.h>
80 # include <windows.h>
81 # include <io.h>
82 #endif
83 
84 #ifdef _MSC_VER
85 # pragma warning(default:4100 4115 4201 4214)
86 #endif
87 
88 #define INCL_RXSHV
89 #define INCL_RXFUNC
90 #define INCL_RXSYSEXIT
91 #define INCL_RXSUBCOM
92 
93 #ifdef USE_OREXX
94 # include "rexx.h"
95 #else
96 # include "rexxsaa.h"
97 #endif
98 
99 /*
100  * MAX_THREADS is the number of parallel starting threads. 20 is a good
101  * maximum.
102  */
103 #define MAX_THREADS 20
104 
105 /*
106  * MAX_RUN defines the number of loops each thread has to perform.
107  * Don't modify.
108  */
109 #define MAX_RUN ( sizeof( unsigned ) * CHAR_BIT )
110 
111 /*
112  * TOTAL_THREADS is the number of threads which shall be created. 2000 should
113  * be sufficient to detect memory leaks, etc.
114  * Can be overwritten with -t command line switch
115  */
116 #define TOTAL_THREADS 1500
117 int total_threads = TOTAL_THREADS;
118 
119 /*
120  * timeout_seconds is the number of seconds a thread is allowed to live before
121  * stopping the program. Overwrite with -s command line switch
122  */
123 int timeout_seconds = 3;
124 
125 #ifdef POSIX_THREADS
126 /*
127  * See below at WIN32_THREADS for a description.
128  */
129 #define ThreadIndexType pthread_t
130 #define my_threadidx() pthread_self()
131 #define my_threadid() pthread_self()
132 #define THREAD_RETURN void *
133 #define THREAD_CONVENTION
134 static pthread_t thread[MAX_THREADS];
135 #endif
136 
137 #ifdef OS2_THREADS
138 /*
139  * See below at WIN32_THREADS for a description.
140  */
141 #define ThreadIndexType int
142 #define my_threadidx() *_threadid
143 #define my_threadid() *_threadid
144 #define THREAD_RETURN void
145 #define THREAD_RETURN_VOID 1
146 #define THREAD_CONVENTION
147 static int thread[MAX_THREADS];
148 #endif
149 
150 #ifdef WIN32_THREADS
151 /*
152  * ThreadIndexType is the type of my_threadidx which shall be the thread
153  * identifier.
154  */
155 #define ThreadIndexType DWORD
156 
157 /*
158  * my_threadidx() has to return the thread identifier of the current thread.
159  */
160 #define my_threadidx() GetCurrentThreadId()
161 
162 /*
163  * my_threadid() has to return the thread's handle of the current thread if
164  * such a thing exists.
165  */
166 #define my_threadid() GetCurrentThread()
167 
168 /*
169  * THREAD_RETURN defines the return value type of the thread creation function.
170  */
171 #define THREAD_RETURN unsigned
172 
173 /*
174  * THREAD_CONVENTION defines the calling convention of the thread creation
175  * function. It may be the empty string for cdecl.
176  */
177 #define THREAD_CONVENTION __stdcall
178 
179 /*
180  * thread will hold the handle of each thread.
181  */
182 static HANDLE thread[MAX_THREADS];
183 #endif
184 
185 
186 /*
187  * threadx will hold the identifier of each thread (in opposite to "thread").
188  */
189 static ThreadIndexType threadx[MAX_THREADS];
190 
191 /*
192  * State manages the state of each possible parallel thread in the arrays
193  * "thread" and "threadx".
194  */
195 static enum {
196    Ready = 0,  /* The thread's slot may be used */
197    Running,    /* Thread has been started */
198    Stopped     /* This value is used by a thread to signal its death. */
199 } State[MAX_THREADS];
200 
201 /*
202  * found contains the number of properly detected lines by the thread.
203  * Lines matching a pattern are expected. The values are used bitwise.
204  */
205 static unsigned found[MAX_THREADS];
206 
207 /*
208  * GlobalError contains the global error code (and return value).
209  * This variables type shall be threadsafe.
210  */
211 static int GlobalError = 0;
212 
213 /*
214  * stdout_is_tty is a flag which is set when we may drop additional garbage
215  * to the output.
216  */
217 static int stdout_is_tty = 0;
218 
219 /*
220  * UseInstore manages the state of the instore-processing. Instore macros
221  * are precompiled macros and shall run faster than normal ones.
222  */
223 static enum {
224    UnUsed = 0, /* Don't use instore macros at all. */
225    FirstRun,   /* This is the first run when compiling the macros */
226    DoInstore   /* We have to use the instore macro */
227 } UseInstore = UnUsed;
228 
229 /*
230  * InstoreBuf will hold the compiled macro.
231  */
232 static void *InstoreBuf = NULL;
233 
234 /*
235  * InstoreLen will hold the compiled macro's length if InstoreBuf is non-NULL.
236  */
237 static unsigned InstoreLen;
238 
239 static void ThreadHasStopped(unsigned position);
240 
241 static char *program_name = NULL;
242 
243 /*
244  * We redirect Rexx' output. This is the redirection handler.
245  * We expect the string "Loop <x> in thread <y>" where x is a running
246  * counter and y is the thread identifier returned by Regina.
247  */
instore_exit(LONG ExNum,LONG Subfun,PEXIT PBlock)248 LONG APIENTRY instore_exit( LONG ExNum, LONG Subfun, PEXIT PBlock )
249 {
250    RXSIOSAY_PARM *psiosay;
251    char buf[256];           /* enough to hold the data */
252    RXSTRING rx;
253    unsigned len,loop;
254    unsigned long tid;
255    int rc,idx;
256 
257    if ( ( ExNum != RXSIO ) || ( Subfun != RXSIOSAY ) ) /* unexpected? */
258       return RXEXIT_NOT_HANDLED;
259 
260    if ( GlobalError )
261       return RXEXIT_HANDLED;
262 
263    /*
264     * We have to check the data after fetching it from the parameter block.
265     */
266    psiosay = ( RXSIOSAY_PARM * ) PBlock;
267    rx = psiosay->rxsio_string;
268    if ( !RXVALIDSTRING( rx ) )
269    {
270       fprintf( stderr, "\n"
271                       "Thread %lu gives an invalid string for a SAY output.\n",
272                        ( unsigned long ) my_threadidx() );
273       GlobalError = 1;
274       return RXEXIT_HANDLED;
275    }
276 
277    len = RXSTRLEN( rx );
278    if ( len >= sizeof(buf) )
279       len = sizeof(buf) - 1;  /* This shall NOT happen, but it's irrelevant */
280    memcpy( buf, RXSTRPTR( rx ), len );
281    buf[len] = '\0'; /* We have a sscanf-able string */
282    rc = sscanf( buf, "Loop %u in thread %lu", &loop, &tid );
283    if ( rc != 2 )
284    {
285       fprintf( stderr, "\n"
286                        "Thread %lu gives an unexpected SAY output \"%s\".\n",
287                        ( unsigned long ) my_threadidx(), buf );
288       GlobalError = 1;
289       return RXEXIT_HANDLED;
290    }
291 
292    /*
293     * Is the wrong thread addressed? This may be a common error.
294     */
295    if ( ( ThreadIndexType ) tid != my_threadidx() )
296    {
297       fprintf( stderr, "\n"
298                      "Thread %lu claims an incorrect thread identifier %lu.\n",
299                        ( unsigned long ) my_threadidx(),
300                        ( unsigned long ) tid );
301       GlobalError = 1;
302       return RXEXIT_HANDLED;
303    }
304 
305    /*
306     * Is this a known thread?
307     */
308    for ( idx = 0; idx < MAX_THREADS; idx++ )
309    {
310       if ( threadx[idx] == ( ThreadIndexType ) tid )
311          break;
312    }
313 
314    if ( idx >= MAX_THREADS )
315    {
316       fprintf( stderr, "\n"
317                        "Thread %lu can't be found in the thread table.\n",
318                        ( unsigned long ) my_threadidx() );
319       GlobalError = 1;
320       return RXEXIT_HANDLED;
321    }
322 
323    /*
324     * Check the loop number. We may have lost or duplicated data.
325     */
326    rc = 0; /* OK */
327    if ( ( loop < 1 ) || ( loop > MAX_RUN ) )
328       rc = 1;
329    else
330    {
331       loop = 1 << ( loop - 1 ); /* Bitmask for the loop */
332       if ( found[idx] & loop )
333          rc = 1;                /* already found */
334       found[idx] |= loop;
335    }
336 
337    if ( rc )
338    {
339       fprintf( stderr, "\n"
340                        "Thread %lu's loop doesn't run continuously.\n",
341                        ( unsigned long ) my_threadidx() );
342          GlobalError = 1;
343    }
344 
345    return RXEXIT_HANDLED;
346 }
347 
348 /*
349  * instore is a separate thread and invokes a Rexx script.
350  * It runs a loop (inside Rexx) and checks for errors.
351  * The return value is 0 if a return value is used at all.
352  * The argument data is the index of the thread within threadx.
353  */
instore(void * data)354 THREAD_RETURN THREAD_CONVENTION instore( void *data )
355 {
356    RXSTRING Instore[2];
357    RXSYSEXIT Exits[2];
358    char instore_buf[256];
359    int rc;
360 
361    /*
362     * signal that we are alive.
363     */
364    threadx[(uintmax_t) data] = my_threadidx();
365 
366    /*
367     * Register an exit handler which shall check Regina's output of the thread.
368     */
369    RexxRegisterExitExe( "ExitHandler",       /* name of the handler        */
370 #ifdef RX_WEAKTYPING
371                         (PFN) instore_exit,  /* entry point of the handler */
372 #else
373                         instore_exit,        /* entry point of the handler */
374 #endif
375                         NULL );              /* user area of the handler   */
376 
377    /*
378     * Build up the structure which informs Regina to use the exit handler.
379     */
380    Exits[0].sysexit_name = "ExitHandler";
381    Exits[0].sysexit_code = RXSIO;
382    Exits[1].sysexit_code = RXENDLST;
383 
384    sprintf( instore_buf, "Numeric Digits 30;"
385                          "Do i = 1 To %u;"
386                          "say 'Loop' i 'in thread' gettid();"
387                          "End;"
388                          "Return 0",
389                          (unsigned) MAX_RUN );
390    Instore[0].strptr = instore_buf;
391    Instore[0].strlength = strlen( Instore[0].strptr );
392    if ( UseInstore == DoInstore )
393    {
394       /*
395        * We don't need a script any longer. Instead we pass a compiled script
396        * for the interpreter. You may or may not pass the original script,
397        * it is ignored.
398        */
399       Instore[1].strptr = (char *)InstoreBuf;
400       Instore[1].strlength = InstoreLen;
401    }
402    else
403    {
404       Instore[1].strptr = NULL;
405    }
406    rc = RexxStart( 0,         /* ArgCount                  */
407                    NULL,      /* ArgList                   */
408                    "Testing", /* ProgramName               */
409                    Instore,   /* Instore (source/compiled) */
410                    "Foo",     /* EnvironmentName           */
411                    RXCOMMAND, /* CallType                  */
412                    Exits,     /* ExitHandlerList           */
413                    NULL,      /* ReturnCode (ignored)      */
414                    NULL );    /* ReturnValue (ignored)     */
415    switch ( UseInstore )
416    {
417       case UnUsed:
418          /*
419           * We will get an instore macro even if not desired. Delete it.
420           */
421          if ( Instore[1].strptr )
422             RexxFreeMemory( Instore[1].strptr );
423          else
424          {
425             GlobalError = 1;
426             fprintf( stderr, "\n"
427                              "Didn't got the instore macro.\n" );
428          }
429          break;
430 
431       case FirstRun:
432          /*
433           * On the first run save the instore macro for later use.
434           */
435          if ( Instore[1].strptr )
436          {
437             InstoreBuf = Instore[1].strptr;
438             InstoreLen = Instore[1].strlength;
439          }
440          else
441          {
442             GlobalError = 1;
443             fprintf( stderr, "\n"
444                              "Didn't got the instore macro.\n" );
445          }
446          break;
447 
448       default:
449          /*
450           * I don't know if the standard allows a success and a return value
451           * of NULL in Instore[1]. Ignore it. It will be detected later.
452           * True application should check the return code of RexxStart().
453           */
454          break;
455    }
456    RexxDeregisterExit( "ExitHandler", /* name of the handler */
457                        NULL );        /* module name, NULL=executable */
458 
459    /*
460     * Finally inform the invoker that we have stopped gracefully.
461     */
462    ThreadHasStopped( ( uintmax_t ) data );
463 #ifdef REGINAVERSION
464    ReginaCleanup();
465 #endif
466 #ifndef THREAD_RETURN_VOID
467    return ( THREAD_RETURN ) 0;
468 #endif
469 }
470 
471 /*
472  * external is a separate thread and invokes a Rexx program from disk.
473  * It runs a loop (inside Rexx) and checks for errors.
474  * The return value is 0 if a return value is used at all.
475  * The argument data is the index of the thread within threadx.
476  * The Rexx program filename is in a global variable.
477  */
external(void * data)478 THREAD_RETURN THREAD_CONVENTION external( void *data )
479 {
480    int rc;
481 
482    /*
483     * signal that we are alive.
484     */
485    threadx[(uintmax_t) data] = my_threadidx();
486 
487    rc = RexxStart( 0,            /* ArgCount                  */
488                    NULL,         /* ArgList                   */
489                    program_name, /* ProgramName               */
490                    NULL,         /* Instore (source/compiled) */
491                    "Foo",        /* EnvironmentName           */
492                    RXCOMMAND,    /* CallType                  */
493                    NULL,         /* ExitHandlerList           */
494                    NULL,         /* ReturnCode (ignored)      */
495                    NULL );       /* ReturnValue (ignored)     */
496 
497    /*
498     * Finally inform the invoker that we have stopped gracefully.
499     */
500    ThreadHasStopped( ( uintmax_t ) data );
501 #ifdef REGINAVERSION
502    ReginaCleanup();
503 #endif
504 #ifndef THREAD_RETURN_VOID
505    return ( THREAD_RETURN ) 0;
506 #endif
507 }
508 
509 /******************************************************************************
510  ******************************************************************************
511  * thread management **********************************************************
512  ******************************************************************************
513  *****************************************************************************/
514 
515 /*
516  * reap checks the thread's result buffer to see if it has seen all the lines
517  * the interpreter has emitted.
518  * The global error is set in case of an error.
519  * The result buffer is reset on success.
520  * Only called when using "instore" macro.
521  */
reap(unsigned position)522 void reap( unsigned position )
523 {
524    if (found[position] != ~((unsigned) 0))
525    {
526       fprintf(stderr,"\n"
527                      "Thread %lu has stopped without completing its loop.\n",
528                      (unsigned long) threadx[position]);
529       GlobalError = 1;
530    }
531    found[position] = 0;
532 }
533 
534 #ifdef WIN32_THREADS
535 /*
536  * init_threads initializes the usage of our thread management system.
537  * Returns 1 on success, 0 otherwise.
538  */
init_threads(void)539 int init_threads( void )
540 {
541    return 1;
542 }
543 
544 /*
545  * start_a_thread starts a thread and sets some state informations which are
546  * set back in case of an error.
547  * The return code is 1 on success, 0 otherwise.
548  */
start_a_thread(unsigned position)549 int start_a_thread( unsigned position )
550 {
551    int rc;
552    unsigned threadID;
553 
554    State[position] = Running;
555    /* Avoid some race conditions. I don't know if this is a problem of the
556     * runtime system or the kernel. If the systems runs into severe swapping
557     * the threads seems to run before the thread id is known which is used
558     * in instore_exit. We suspend the thread until all details of the new
559     * thread are known before we continue. This gives a little bit worse
560     * performance.
561     */
562    thread[position] = ( HANDLE ) _beginthreadex( NULL,
563                                                  0,
564                                                  (program_name) ? external : instore,
565                                                  ( void * ) position,
566                                                  CREATE_SUSPENDED,
567                                                  &threadID );
568    rc = ( long ) thread[position] != 0l;
569    if ( !rc )
570    {
571       fprintf( stderr, "\n"
572                        "Error starting thread, error code is %ld\n",
573                        GetLastError() );
574       GlobalError = 1;
575       ThreadHasStopped( position );
576    }
577    ResumeThread( thread[position] );
578    return rc;
579 }
580 
581 /*
582  * Thread has stopped sets the global state information of the thread with the
583  * index "position" to "Stopped".
584  */
ThreadHasStopped(unsigned position)585 static void ThreadHasStopped( unsigned position )
586 {
587    State[position] = Stopped;
588 }
589 
590 /*
591  * wait_for_threads restarts new threads until the requested count of
592  * TOTAL_THREADS has been reached. GlobalError is set if any error occurs.
593  *
594  * We expect to have MAX_THREADS already running.
595  */
wait_for_threads(void)596 void wait_for_threads( void )
597 {
598    unsigned i,j,done,running;
599    DWORD rc;
600    HANDLE compressed[MAX_THREADS];
601    unsigned BackSort[MAX_THREADS];
602 
603    running = done = MAX_THREADS;
604 
605    if ( stdout_is_tty )
606       printf( "%u\r", MAX_THREADS );
607    if ( GlobalError )
608       return;
609 
610    for ( ; ; )
611    {
612       /*
613        * We have to pass an array of thread handles to the OS' waiter function.
614        * But some threads may not be running at the last steps. Therefore we
615        * have to resort the handles to be consecutive in a temporary array.
616        */
617       for ( i = 0, j = 0; i < MAX_THREADS; i++ )
618       {
619          if ( State[i] != Ready )
620          {
621             compressed[j] = thread[i];
622             BackSort[j] = i;
623             j++;
624          }
625       }
626       rc = WaitForMultipleObjects( running, compressed, FALSE, timeout_seconds*1000 );
627       if ( rc == 0xFFFFFFFF )
628       {
629          /*
630           * Failure or dead thread, look for a stopped one
631           */
632          for ( i = 0; i < running; i++ )
633          {
634             if ( State[BackSort[i]] == Stopped )
635                rc = WAIT_OBJECT_0 + i;
636          }
637       }
638       if ( ( rc < WAIT_OBJECT_0 ) || ( rc >= running + WAIT_OBJECT_0 ) )
639       {
640          fprintf( stderr, "\n"
641                           "At least one thread won't finish normally within 3 seconds (rc=%u, error=%lu).\n",
642                           rc, GetLastError() );
643          GlobalError = 1;
644       }
645       if ( GlobalError )
646          break;
647 
648       /*
649        * A thread has died. Find it and check the reason.
650        */
651       i = BackSort[rc - WAIT_OBJECT_0];
652       if ( State[i] != Stopped )
653       {
654          fprintf( stderr, "\n"
655                           "Thread %u hasn't finished normally.\n",i);
656          GlobalError = 1;
657          break;
658       }
659 
660       /*
661        * Destroy associated buffers, check values and
662        * restart a new instance if we still have to do so.
663        */
664       CloseHandle( thread[i] );
665       State[i] = Ready;
666       running--;
667       /*
668        * Only reap our threads if running the instore test code
669        */
670       if ( program_name == NULL )
671          reap( i );
672 
673       if ( done < total_threads )
674       {
675          if ( !start_a_thread( i ) )
676             break;
677          done++;
678          running++;
679       }
680       if ( stdout_is_tty )
681          printf( "%u(%u)\r", done, running );
682       if ( GlobalError || !running )
683          break;
684    }
685 }
686 #endif
687 
688 #ifdef OS2_THREADS
689 /*
690  * init_threads initializes the usage of our thread management system.
691  * Returns 1 on success, 0 otherwise.
692  */
693 HMUX hmux;
694 SEMRECORD thread_sems[MAX_THREADS];
695 
init_threads(void)696 int init_threads( void )
697 {
698    int i;
699    LONG rc;
700 
701    for ( i = 0; i < MAX_THREADS; i++ )
702    {
703       thread_sems[i].ulUser = i;
704       rc = DosCreateEventSem( NULL,
705                               (PHEV) &thread_sems[i].hsemCur,
706                               0,
707                               0 );
708       if ( rc != 0 )
709       {
710          fprintf( stderr, "\n"
711                           "Error creating an EventSem, error code is %lu\n",
712                           rc );
713          return 0;
714       }
715    }
716 
717    rc = DosCreateMuxWaitSem( NULL,
718                              &hmux,
719                              MAX_THREADS,
720                              thread_sems,
721                              DCMW_WAIT_ANY);
722    if ( rc != 0 )
723    {
724       fprintf( stderr, "\n"
725                        "Error creating a MuxWaitSem, error code is %lu\n",
726                        rc );
727       return 0;
728    }
729    return 1;
730 }
731 
732 /*
733  * start_a_thread starts a thread and sets some state informations which are
734  * set back in case of an error.
735  * The return code is 1 on success, 0 otherwise.
736  */
start_a_thread(unsigned position)737 int start_a_thread( unsigned position )
738 {
739    ULONG rc, post;
740 
741    rc = DosResetEventSem( (HEV) thread_sems[position].hsemCur, &post );
742    if ( ( rc != 0 ) && ( rc != ERROR_ALREADY_RESET ) )
743    {
744       fprintf( stderr, "\n"
745                        "Error resetting an EventSem, error code is %lu\n",
746                        rc );
747       GlobalError = 1;
748       return 0;
749    }
750    State[position] = Running;
751    thread[position] = _beginthread( (program_name) ? external : instore,
752                                     NULL,
753                                     0x8000,
754                                     ( void * ) position );
755    if ( thread[position] == -1 )
756    {
757       fprintf( stderr, "\n"
758                        "Error starting thread, error code is %d\n",
759                        errno );
760       GlobalError = 1;
761       State[position] = Stopped;
762       DosPostEventSem( (HEV) thread_sems[position].hsemCur );
763       return 0;
764    }
765    return 1;
766 }
767 
768 /*
769  * Thread has stopped sets the global state information of the thread with the
770  * index "position" to "Stopped".
771  */
ThreadHasStopped(unsigned position)772 static void ThreadHasStopped( unsigned position )
773 {
774    ULONG rc;
775 
776    State[position] = Stopped;
777    if ( ( rc = DosPostEventSem( (HEV) thread_sems[position].hsemCur ) ) != 0 )
778    {
779       fprintf( stderr, "\n"
780                        "Error posting an EventSem, error code is %lu\n",
781                        rc );
782       GlobalError = 1;
783    }
784 }
785 
786 /*
787  * wait_for_threads restarts new threads until the requested count of
788  * TOTAL_THREADS has been reached. GlobalError is set if any error occurs.
789  *
790  * We expect to have MAX_THREADS already running.
791  */
wait_for_threads(void)792 void wait_for_threads( void )
793 {
794    unsigned done,running;
795    ULONG rc, post, user;
796 
797    running = done = MAX_THREADS;
798 
799    if ( stdout_is_tty )
800       printf( "%u\r", MAX_THREADS );
801    if ( GlobalError )
802       return;
803 
804    for ( ; ; )
805    {
806       rc = DosWaitMuxWaitSem( hmux, timeout_seconds*1000, &user );
807       if ( rc != 0 )
808       {
809          fprintf( stderr, "\n"
810                           "At least one thread won't finish normally within 3 seconds (error=%lu).\n",
811                           rc );
812          GlobalError = 1;
813       }
814       if ( user >= MAX_THREADS )
815       {
816          fprintf( stderr, "\n"
817                           "Strange behaviour after wating for MuxWaitSem, released thread index is %lu.\n",
818                           user );
819          GlobalError = 1;
820       }
821       if ( GlobalError )
822          break;
823 
824       /*
825        * A thread has died. Check the reason.
826        */
827       if ( State[user] != Stopped )
828       {
829          fprintf( stderr, "\n"
830                           "Thread %lu hasn't finished normally.\n", user );
831          GlobalError = 1;
832          break;
833       }
834 
835       /*
836        * Check values and restart a new instance if we still have to do so.
837        */
838       State[user] = Ready;
839       running--;
840       /*
841        * Only reap our threads if running the instore test code
842        */
843       if ( program_name == NULL )
844          reap( (int) user );
845 
846       if ( done < total_threads )
847       {
848          if ( !start_a_thread( (int) user ) )
849             break;
850          done++;
851          running++;
852       }
853       else
854       {
855          rc = DosResetEventSem( (HEV) thread_sems[user].hsemCur, &post );
856          if ( ( rc != 0 ) && ( rc != ERROR_ALREADY_RESET ) )
857          {
858             fprintf( stderr, "\n"
859                              "Error resetting an EventSem, error code is %lu\n",
860                              rc );
861             GlobalError = 1;
862             break;
863          }
864       }
865       if ( stdout_is_tty )
866          printf( "%u(%u)\r", done, running );
867       if ( GlobalError || !running )
868          break;
869    }
870 }
871 #endif
872 
873 #ifdef POSIX_THREADS
874 /*
875  * The number of processed runs needs to be global for error analysis.
876  */
877 static unsigned done = 0;
878 
879 #if USE_SEMAPHORES
880 static sem_t something_stopped;
881 
882 /*
883  * timer_alarm will end the program if nothing happens within 3 seconds.
884  */
timer_alarm(int sig)885 void timer_alarm( int sig )
886 {
887    static unsigned lastvalue = ( unsigned ) -1;
888    static int call_count = 0;
889 
890    if ( ( lastvalue != done ) || ( (int) done == total_threads ) )
891    {
892       lastvalue = done;
893       signal( SIGALRM, timer_alarm );
894       return;
895    }
896 
897    GlobalError = 1;
898    signal( SIGALRM, timer_alarm );
899    if ( call_count++ == 0 )
900    {
901       fprintf( stderr, "\n"
902                        "At least one thread won't finish within 3 seconds.\n" );
903       sem_post( &something_stopped );
904    }
905 
906    if ( call_count > 2 )
907       exit( 1 );
908 }
909 #else
910 static pthread_mutex_t thread_lock = PTHREAD_MUTEX_INITIALIZER;
911 static pthread_cond_t something_stopped = PTHREAD_COND_INITIALIZER;
912 #endif
913 
914 /*
915  * init_threads initializes the usage of our thread management system.
916  * Returns 1 on success, 0 otherwise.
917  */
init_threads(void)918 int init_threads( void )
919 {
920 #if USE_SEMAPHORES
921    if ( sem_init( &something_stopped, 0, 0 ) != 0 )
922       return 0;
923 #endif
924    return 1;
925 }
926 
927 /*
928  * start_a_thread starts a thread and sets some state informations which are
929  * set back in case of an error.
930  * The return code is 1 on success, 0 otherwise.
931  */
start_a_thread(unsigned position)932 int start_a_thread( unsigned position )
933 {
934    int rc;
935 
936    State[position] = Running;
937    rc = pthread_create( &thread[position], NULL, (program_name) ? external : instore, (void *)(uintptr_t) position );
938    if ( rc )
939    {
940       fprintf( stderr, "\n"
941                        "Error starting thread, error code is %d\n",
942                        rc );
943       GlobalError = 1;
944       ThreadHasStopped( position );
945    }
946    return(!rc);
947 }
948 
949 
950 /*
951  * Thread has stopped sets the global state information of the thread with the
952  * index "position" to "Stopped".
953  * The master semaphore is set to signal the main thread about the thread's
954  * death.
955  */
ThreadHasStopped(unsigned position)956 static void ThreadHasStopped( unsigned position )
957 {
958 #if USE_SEMAPHORES
959    State[position] = Stopped;
960    sem_post( &something_stopped );
961 #else
962    /*
963     * The use of the mutex lock semaphores is forced by Posix.
964     */
965    pthread_mutex_lock( &thread_lock );
966    State[position] = Stopped;
967    pthread_cond_signal( &something_stopped );
968    pthread_mutex_unlock( &thread_lock );
969 #endif
970 }
971 
972 /*
973  * wait_for_threads restarts new threads until the requested count of
974  * TOTAL_THREADS has been reached. GlobalError is set if any error occurs.
975  *
976  * We expect to have MAX_THREADS already running.
977  *
978  * Note: You will get a better performance if you set the schedule policy to
979  *       RR or FIFO, but you have to be root to do this. If USE_SEMAPHORES is
980  *       active, changing the policy won't increase the performance.
981  */
wait_for_threads(void)982 void wait_for_threads( void )
983 {
984    unsigned i,running;
985    int rc;
986 #if !USE_SEMAPHORES
987    struct timeval now;
988    struct timespec timeout;
989 #else
990    struct itimerval ival;
991 #endif
992 
993    running = done = MAX_THREADS;
994    if ( stdout_is_tty )
995       printf( "%u\r", MAX_THREADS );
996 
997    if ( GlobalError )
998       return;
999 
1000 #if !USE_SEMAPHORES
1001    /*
1002     * The lock is needed by pthread_cond_timewait.
1003     * The multithreading paradigma here is very unefficient.
1004     */
1005    pthread_mutex_lock( &thread_lock );
1006 #else
1007    signal( SIGALRM, timer_alarm );
1008    ival.it_value.tv_sec = timeout_seconds;
1009    ival.it_value.tv_usec = 0;
1010    ival.it_interval = ival.it_value;
1011    setitimer( ITIMER_REAL, &ival, NULL );
1012 #endif
1013 
1014    for ( ; ; )
1015    {
1016 #if !USE_SEMAPHORES
1017       /*
1018        * Sleep a maximum of 3 seconds.
1019        */
1020       gettimeofday(&now,NULL);
1021       timeout.tv_sec = now.tv_sec + timeout_seconds;
1022       timeout.tv_nsec = now.tv_usec * 1000;
1023       /*
1024        * The following call will wait up to timeout time for a signalled
1025        * something_stopped condition semaphore. thread_lock is atomically
1026        * unlocked on function entry and locked on function exit.
1027        */
1028       rc = pthread_cond_timedwait( &something_stopped, &thread_lock, &timeout );
1029       if ( rc == ETIMEDOUT )
1030       {
1031          fprintf( stderr, "\n"
1032                           "At least one thread won't finish within 3 seconds.\n" );
1033          GlobalError = 1;
1034       }
1035 #else
1036       if ( sem_wait( &something_stopped ) != 0 )
1037       {
1038          fprintf( stderr, "\n"
1039                           "Interrupted wait.\n" );
1040          GlobalError = 1;
1041       }
1042 #endif
1043       if ( GlobalError )
1044          break;
1045 
1046       /*
1047        * Restart the threads is appropriate.
1048        */
1049       for ( i = 0; i < MAX_THREADS; i++ )
1050       {
1051          if ( State[i] != Stopped )
1052             continue;
1053          State[i] = Ready;
1054          running--;
1055          rc = pthread_join( thread[i], NULL );
1056          if ( rc != 0 )
1057          {
1058             fprintf( stderr, "\n"
1059                             "A thread can't be found in the internal table.\n" );
1060             GlobalError = 1;
1061             break;
1062          }
1063 
1064          /*
1065           * Has the thread done its work completely?
1066           */
1067          /*
1068           * Only reap our threads if running the instore test code
1069           */
1070          if ( program_name == NULL )
1071             reap(i);
1072 
1073          /*
1074           * Restart a new thread if we need some more runs.
1075           */
1076          if ( (int) done < total_threads )
1077          {
1078             if ( !start_a_thread( i ) )
1079                break;
1080             done++;
1081             running++;
1082          }
1083       }
1084       if ( stdout_is_tty )
1085          printf( "%u(%u)\r", done, running );
1086       if ( GlobalError || !running )
1087          break;
1088    }
1089 #if !USE_SEMAPHORES
1090    pthread_mutex_unlock( &thread_lock );
1091 #endif
1092 }
1093 #endif
1094 
1095 /*
1096  * Usage shows this program's usage and stops the program.
1097  */
usage(void)1098 static void usage( void )
1099 {
1100    printf( "usage: threader [-p] [-q] [-t total_threads] [-s timeout_seconds] [filename]\n"
1101            "\n"
1102            "Options:\n"
1103            "-p\tLoad the macro only once and then use the generated instore\n"
1104            "\tmacro. Default: Always load the macro new.\n"
1105            "\n"
1106            "-q\tRun quietly. Don't display running progress information.\n"
1107            "\n"
1108            "-t\ttotal_threads\tTotal number of threads to execute.\n"
1109            "\n"
1110            "-s\ttimeoutseconds\tNumber of seconds to timeout a thread. Default 3.\n"
1111            "\n"
1112            "filename\tThe Rexx program to execute rather than the instore test\n"
1113            "\tprogram.\n"
1114            "\nThe default instore macro generates lines with numbers which can\n"
1115            "be parsed to detect problems in the multi-threading implementation.\n"
1116            "A loop counter runs by default until %u. The test should run from a few\n"
1117            "seconds up to a few minutes. You should hit ^C to abort the program\n"
1118            "if you think your harddisk is working heavily.\n"
1119            "\n"
1120            "This program is for testing Regina's multithreading capabilities.\n"
1121            ,total_threads );
1122    exit( 1 );
1123 }
1124 
main(int argc,char * argv[])1125 int main( int argc, char *argv[] )
1126 {
1127    RXSTRING version;
1128    ULONG versioncode;
1129    int i;
1130    time_t start;
1131 
1132    /*
1133     * In case of a connected tty we let run a counter to show the user some
1134     * "success".
1135     */
1136    if ( isatty( fileno( stdout ) ) )
1137       stdout_is_tty = 1;
1138 
1139    for ( i = 1; i < argc; i++ )
1140    {
1141       if ( argv[i][0] != '-' )
1142          break;
1143 
1144       if ( strcmp( argv[i], "-p" ) == 0 )
1145       {
1146          UseInstore = FirstRun;
1147       }
1148       else if ( strcmp( argv[i], "-t" ) == 0 )
1149       {
1150          i++;
1151          total_threads = atoi( argv[i] );
1152       }
1153       else if ( strcmp( argv[i], "-s" ) == 0 )
1154       {
1155          i++;
1156          timeout_seconds = atoi( argv[i] );
1157       }
1158       else if ( strcmp( argv[i], "-q" ) == 0 )
1159       {
1160          stdout_is_tty = 0;
1161       }
1162       else if ( strcmp( argv[i], "--" ) == 0 )
1163       {
1164          i++;
1165          break;
1166       }
1167       else
1168       {
1169          usage();
1170       }
1171    }
1172 
1173    if ( argc > i )
1174    {
1175       program_name = argv[i];
1176       i++;
1177    }
1178    if ( argc > i )
1179       usage();
1180 
1181    /*
1182     * Initialize some tables and tune the IO system to show every output
1183     * at once.
1184     */
1185    memset( found, 0,sizeof( found ) );
1186    memset( State, 0,sizeof( State ) );
1187    setvbuf( stdout,NULL, _IONBF, 0 );
1188    setvbuf( stderr,NULL, _IONBF, 0 );
1189    printf( "Regina Rexx Thread Tester\n" );
1190    printf( "-------------------------\n" );
1191 
1192    version.strlength = 0;
1193    version.strptr = NULL;
1194    /*
1195     * This will not work if we check another Rexx. You can safely comment out
1196     * the following code up to the 'printf( "\n" );'
1197     */
1198 #ifdef REGINAVERSION
1199    versioncode = ReginaVersion( &version );
1200    printf( "Regina's version is %lu.%lu",
1201            versioncode >> 8,
1202            versioncode & 0xFF );
1203    if ( version.strptr )
1204    {
1205       printf( " (in complete \"%s\")", version.strptr );
1206       RexxFreeMemory( version.strptr );
1207    }
1208 #endif
1209    printf( "\n" );
1210 
1211    if ( UseInstore && program_name )
1212    {
1213       printf( "Ignoring the \"-p\" flag for external REXX scripts\n" );
1214       UseInstore = UnUsed;
1215    }
1216    /*
1217     * In case of a processing with compiled macros we need something
1218     * compiled ;-) We let run one instance and let instore() save the compiled
1219     * macros for later use.
1220     */
1221    if ( UseInstore )
1222    {
1223       State[0] = Running;
1224       thread[0] = my_threadid();
1225       threadx[0] = my_threadidx();
1226       if ( program_name )
1227          external( NULL );
1228       else
1229          instore( NULL );
1230       State[0] = Stopped;
1231       found[0] = 0;
1232       UseInstore = DoInstore;
1233    }
1234 
1235    if ( stdout_is_tty )
1236    {
1237       printf( "\n"
1238               "You should see a loop counter which stops at %u.\n\n",
1239               total_threads );
1240    }
1241 
1242    /*
1243     * Start some threads and check for errors. Then restart threads up to the
1244     * maximum count.
1245     */
1246    if ( !init_threads() )
1247    {
1248       fprintf( stderr, "\n"
1249                        "Failure initializing the thread management system.\n\n" );
1250       return 1;
1251    }
1252 
1253    start = time( NULL );
1254    for ( i = 0; i < MAX_THREADS; i++ )
1255    {
1256       if ( !start_a_thread( i ) || GlobalError )
1257          break;
1258    }
1259    if ( !GlobalError )
1260       wait_for_threads();
1261 
1262    if ( GlobalError )
1263    {
1264       fprintf( stderr, "\n"
1265                        "An error encountered. Do you use the right shared "
1266                                                            "libs or DLLs?\n" );
1267       return 1;
1268    }
1269 
1270    printf( "\n"
1271            "Thread tester passed without an error.\n"
1272            "About %u seconds used for %u cyles, each creating a thread.\n",
1273            (unsigned) ( time( NULL ) - start ), total_threads );
1274 
1275    if ( !stdout_is_tty || !isatty( fileno( stdout ) ) )
1276       return 0;
1277 
1278    printf( "Press ENTER to continue and end the program. You may have a look\n"
1279           " at your preferred memory analyser like ps, pstat or tasklist...\n" );
1280    {
1281       char buf[128];
1282       fgets( buf, sizeof( buf ), stdin );
1283    }
1284 
1285    return 0;
1286 }
1287