1 /*
2 * The Regina Rexx Interpreter
3 * Copyright (C) 2001-2004 Florian Gro�e-Coosmann
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
14 *
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the Free
17 * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 *
19 ******************************************************************************
20 *
 * Asynchronous thread multiplexer with a parallel use of the REXXSAA API.
22 *
23 * This example works with Win32 as with OS/2 or Posix threads.
24 */
25
26 #pragma clang diagnostic ignored "-Wint-to-void-pointer-cast"
27
28 #include <stdio.h>
29 #include <string.h>
30 #include <stdlib.h>
31 #include <limits.h>
32 #include <errno.h>
33 #include <time.h>
34
35 #ifdef POSIX_THREADS
36 # include <sys/time.h>
37 # include <stdint.h>
38 # include <unistd.h>
39 # include <pthread.h>
40 # define USE_SEMAPHORES 0
41 # if defined(_POSIX_SEMAPHORES) && !(defined(__APPLE__) && defined(__MACH__))
42 /*
43 * threader uses unnamed posix semaphores. OSX does not support unnamed
44 * semaphores; use mutexes for OSX
45 */
46 # include <semaphore.h>
47 # include <signal.h>
48 # undef USE_SEMAPHORES
49 # define USE_SEMAPHORES 1
50 # endif
51 #endif
52
53 #ifdef OS2_THREADS
54 # include <io.h>
55 # include <stddef.h>
56 # include <process.h>
57 # define INCL_DOSSEMAPHORES
58 # define INCL_ERRORS
59 # include <os2.h>
60 # define CHAR_TYPEDEFED
61 # define SHORT_TYPEDEFED
62 # define LONG_TYPEDEFED
63 # ifndef _OS2EMX_H
64 # define _OS2EMX_H /* prevents PFN from defining (Watcom) */
65 # endif
66 #endif
67
68 #ifdef _MSC_VER
/* This picky compiler complains about unused formal parameters.
70 * This is correct but hides (for human eyes) other errors since they
71 * are many and we can't reduce them all.
72 * Error 4100 is "unused formal parameter".
73 */
74 # pragma warning(disable:4100 4115 4201 4214 4514)
75 #endif
76
77 #ifdef WIN32_THREADS
78 # define WIN32_LEAN_AND_MEAN
79 # include <process.h>
80 # include <windows.h>
81 # include <io.h>
82 #endif
83
84 #ifdef _MSC_VER
85 # pragma warning(default:4100 4115 4201 4214)
86 #endif
87
88 #define INCL_RXSHV
89 #define INCL_RXFUNC
90 #define INCL_RXSYSEXIT
91 #define INCL_RXSUBCOM
92
93 #ifdef USE_OREXX
94 # include "rexx.h"
95 #else
96 # include "rexxsaa.h"
97 #endif
98
99 /*
100 * MAX_THREADS is the number of parallel starting threads. 20 is a good
101 * maximum.
102 */
103 #define MAX_THREADS 20
104
105 /*
106 * MAX_RUN defines the number of loops each thread has to perform.
107 * Don't modify.
108 */
109 #define MAX_RUN ( sizeof( unsigned ) * CHAR_BIT )
110
111 /*
 * TOTAL_THREADS is the number of threads which shall be created. 1500 should
 * be sufficient to detect memory leaks, etc.
 * Can be overridden with the -t command line switch.
115 */
116 #define TOTAL_THREADS 1500
117 int total_threads = TOTAL_THREADS;
118
119 /*
120 * timeout_seconds is the number of seconds a thread is allowed to live before
121 * stopping the program. Overwrite with -s command line switch
122 */
123 int timeout_seconds = 3;
124
125 #ifdef POSIX_THREADS
126 /*
127 * See below at WIN32_THREADS for a description.
128 */
129 #define ThreadIndexType pthread_t
130 #define my_threadidx() pthread_self()
131 #define my_threadid() pthread_self()
132 #define THREAD_RETURN void *
133 #define THREAD_CONVENTION
134 static pthread_t thread[MAX_THREADS];
135 #endif
136
137 #ifdef OS2_THREADS
138 /*
139 * See below at WIN32_THREADS for a description.
140 */
141 #define ThreadIndexType int
142 #define my_threadidx() *_threadid
143 #define my_threadid() *_threadid
144 #define THREAD_RETURN void
145 #define THREAD_RETURN_VOID 1
146 #define THREAD_CONVENTION
147 static int thread[MAX_THREADS];
148 #endif
149
150 #ifdef WIN32_THREADS
151 /*
152 * ThreadIndexType is the type of my_threadidx which shall be the thread
153 * identifier.
154 */
155 #define ThreadIndexType DWORD
156
157 /*
158 * my_threadidx() has to return the thread identifier of the current thread.
159 */
160 #define my_threadidx() GetCurrentThreadId()
161
162 /*
163 * my_threadid() has to return the thread's handle of the current thread if
164 * such a thing exists.
165 */
166 #define my_threadid() GetCurrentThread()
167
168 /*
169 * THREAD_RETURN defines the return value type of the thread creation function.
170 */
171 #define THREAD_RETURN unsigned
172
173 /*
174 * THREAD_CONVENTION defines the calling convention of the thread creation
175 * function. It may be the empty string for cdecl.
176 */
177 #define THREAD_CONVENTION __stdcall
178
179 /*
180 * thread will hold the handle of each thread.
181 */
182 static HANDLE thread[MAX_THREADS];
183 #endif
184
185
186 /*
187 * threadx will hold the identifier of each thread (in opposite to "thread").
188 */
189 static ThreadIndexType threadx[MAX_THREADS];
190
191 /*
192 * State manages the state of each possible parallel thread in the arrays
193 * "thread" and "threadx".
194 */
195 static enum {
196 Ready = 0, /* The thread's slot may be used */
197 Running, /* Thread has been started */
198 Stopped /* This value is used by a thread to signal its death. */
199 } State[MAX_THREADS];
200
201 /*
202 * found contains the number of properly detected lines by the thread.
203 * Lines matching a pattern are expected. The values are used bitwise.
204 */
205 static unsigned found[MAX_THREADS];
206
207 /*
208 * GlobalError contains the global error code (and return value).
209 * This variables type shall be threadsafe.
210 */
211 static int GlobalError = 0;
212
213 /*
214 * stdout_is_tty is a flag which is set when we may drop additional garbage
215 * to the output.
216 */
217 static int stdout_is_tty = 0;
218
219 /*
220 * UseInstore manages the state of the instore-processing. Instore macros
221 * are precompiled macros and shall run faster than normal ones.
222 */
223 static enum {
224 UnUsed = 0, /* Don't use instore macros at all. */
225 FirstRun, /* This is the first run when compiling the macros */
226 DoInstore /* We have to use the instore macro */
227 } UseInstore = UnUsed;
228
229 /*
230 * InstoreBuf will hold the compiled macro.
231 */
232 static void *InstoreBuf = NULL;
233
234 /*
235 * InstoreLen will hold the compiled macro's length if InstoreBuf is non-NULL.
236 */
237 static unsigned InstoreLen;
238
239 static void ThreadHasStopped(unsigned position);
240
241 static char *program_name = NULL;
242
243 /*
244 * We redirect Rexx' output. This is the redirection handler.
245 * We expect the string "Loop <x> in thread <y>" where x is a running
246 * counter and y is the thread identifier returned by Regina.
247 */
instore_exit(LONG ExNum,LONG Subfun,PEXIT PBlock)248 LONG APIENTRY instore_exit( LONG ExNum, LONG Subfun, PEXIT PBlock )
249 {
250 RXSIOSAY_PARM *psiosay;
251 char buf[256]; /* enough to hold the data */
252 RXSTRING rx;
253 unsigned len,loop;
254 unsigned long tid;
255 int rc,idx;
256
257 if ( ( ExNum != RXSIO ) || ( Subfun != RXSIOSAY ) ) /* unexpected? */
258 return RXEXIT_NOT_HANDLED;
259
260 if ( GlobalError )
261 return RXEXIT_HANDLED;
262
263 /*
264 * We have to check the data after fetching it from the parameter block.
265 */
266 psiosay = ( RXSIOSAY_PARM * ) PBlock;
267 rx = psiosay->rxsio_string;
268 if ( !RXVALIDSTRING( rx ) )
269 {
270 fprintf( stderr, "\n"
271 "Thread %lu gives an invalid string for a SAY output.\n",
272 ( unsigned long ) my_threadidx() );
273 GlobalError = 1;
274 return RXEXIT_HANDLED;
275 }
276
277 len = RXSTRLEN( rx );
278 if ( len >= sizeof(buf) )
279 len = sizeof(buf) - 1; /* This shall NOT happen, but it's irrelevant */
280 memcpy( buf, RXSTRPTR( rx ), len );
281 buf[len] = '\0'; /* We have a sscanf-able string */
282 rc = sscanf( buf, "Loop %u in thread %lu", &loop, &tid );
283 if ( rc != 2 )
284 {
285 fprintf( stderr, "\n"
286 "Thread %lu gives an unexpected SAY output \"%s\".\n",
287 ( unsigned long ) my_threadidx(), buf );
288 GlobalError = 1;
289 return RXEXIT_HANDLED;
290 }
291
292 /*
293 * Is the wrong thread addressed? This may be a common error.
294 */
295 if ( ( ThreadIndexType ) tid != my_threadidx() )
296 {
297 fprintf( stderr, "\n"
298 "Thread %lu claims an incorrect thread identifier %lu.\n",
299 ( unsigned long ) my_threadidx(),
300 ( unsigned long ) tid );
301 GlobalError = 1;
302 return RXEXIT_HANDLED;
303 }
304
305 /*
306 * Is this a known thread?
307 */
308 for ( idx = 0; idx < MAX_THREADS; idx++ )
309 {
310 if ( threadx[idx] == ( ThreadIndexType ) tid )
311 break;
312 }
313
314 if ( idx >= MAX_THREADS )
315 {
316 fprintf( stderr, "\n"
317 "Thread %lu can't be found in the thread table.\n",
318 ( unsigned long ) my_threadidx() );
319 GlobalError = 1;
320 return RXEXIT_HANDLED;
321 }
322
323 /*
324 * Check the loop number. We may have lost or duplicated data.
325 */
326 rc = 0; /* OK */
327 if ( ( loop < 1 ) || ( loop > MAX_RUN ) )
328 rc = 1;
329 else
330 {
331 loop = 1 << ( loop - 1 ); /* Bitmask for the loop */
332 if ( found[idx] & loop )
333 rc = 1; /* already found */
334 found[idx] |= loop;
335 }
336
337 if ( rc )
338 {
339 fprintf( stderr, "\n"
340 "Thread %lu's loop doesn't run continuously.\n",
341 ( unsigned long ) my_threadidx() );
342 GlobalError = 1;
343 }
344
345 return RXEXIT_HANDLED;
346 }
347
348 /*
349 * instore is a separate thread and invokes a Rexx script.
350 * It runs a loop (inside Rexx) and checks for errors.
351 * The return value is 0 if a return value is used at all.
352 * The argument data is the index of the thread within threadx.
353 */
instore(void * data)354 THREAD_RETURN THREAD_CONVENTION instore( void *data )
355 {
356 RXSTRING Instore[2];
357 RXSYSEXIT Exits[2];
358 char instore_buf[256];
359 int rc;
360
361 /*
362 * signal that we are alive.
363 */
364 threadx[(uintmax_t) data] = my_threadidx();
365
366 /*
367 * Register an exit handler which shall check Regina's output of the thread.
368 */
369 RexxRegisterExitExe( "ExitHandler", /* name of the handler */
370 #ifdef RX_WEAKTYPING
371 (PFN) instore_exit, /* entry point of the handler */
372 #else
373 instore_exit, /* entry point of the handler */
374 #endif
375 NULL ); /* user area of the handler */
376
377 /*
378 * Build up the structure which informs Regina to use the exit handler.
379 */
380 Exits[0].sysexit_name = "ExitHandler";
381 Exits[0].sysexit_code = RXSIO;
382 Exits[1].sysexit_code = RXENDLST;
383
384 sprintf( instore_buf, "Numeric Digits 30;"
385 "Do i = 1 To %u;"
386 "say 'Loop' i 'in thread' gettid();"
387 "End;"
388 "Return 0",
389 (unsigned) MAX_RUN );
390 Instore[0].strptr = instore_buf;
391 Instore[0].strlength = strlen( Instore[0].strptr );
392 if ( UseInstore == DoInstore )
393 {
394 /*
395 * We don't need a script any longer. Instead we pass a compiled script
396 * for the interpreter. You may or may not pass the original script,
397 * it is ignored.
398 */
399 Instore[1].strptr = (char *)InstoreBuf;
400 Instore[1].strlength = InstoreLen;
401 }
402 else
403 {
404 Instore[1].strptr = NULL;
405 }
406 rc = RexxStart( 0, /* ArgCount */
407 NULL, /* ArgList */
408 "Testing", /* ProgramName */
409 Instore, /* Instore (source/compiled) */
410 "Foo", /* EnvironmentName */
411 RXCOMMAND, /* CallType */
412 Exits, /* ExitHandlerList */
413 NULL, /* ReturnCode (ignored) */
414 NULL ); /* ReturnValue (ignored) */
415 switch ( UseInstore )
416 {
417 case UnUsed:
418 /*
419 * We will get an instore macro even if not desired. Delete it.
420 */
421 if ( Instore[1].strptr )
422 RexxFreeMemory( Instore[1].strptr );
423 else
424 {
425 GlobalError = 1;
426 fprintf( stderr, "\n"
427 "Didn't got the instore macro.\n" );
428 }
429 break;
430
431 case FirstRun:
432 /*
433 * On the first run save the instore macro for later use.
434 */
435 if ( Instore[1].strptr )
436 {
437 InstoreBuf = Instore[1].strptr;
438 InstoreLen = Instore[1].strlength;
439 }
440 else
441 {
442 GlobalError = 1;
443 fprintf( stderr, "\n"
444 "Didn't got the instore macro.\n" );
445 }
446 break;
447
448 default:
449 /*
450 * I don't know if the standard allows a success and a return value
451 * of NULL in Instore[1]. Ignore it. It will be detected later.
452 * True application should check the return code of RexxStart().
453 */
454 break;
455 }
456 RexxDeregisterExit( "ExitHandler", /* name of the handler */
457 NULL ); /* module name, NULL=executable */
458
459 /*
460 * Finally inform the invoker that we have stopped gracefully.
461 */
462 ThreadHasStopped( ( uintmax_t ) data );
463 #ifdef REGINAVERSION
464 ReginaCleanup();
465 #endif
466 #ifndef THREAD_RETURN_VOID
467 return ( THREAD_RETURN ) 0;
468 #endif
469 }
470
471 /*
472 * external is a separate thread and invokes a Rexx program from disk.
473 * It runs a loop (inside Rexx) and checks for errors.
474 * The return value is 0 if a return value is used at all.
475 * The argument data is the index of the thread within threadx.
476 * The Rexx program filename is in a global variable.
477 */
external(void * data)478 THREAD_RETURN THREAD_CONVENTION external( void *data )
479 {
480 int rc;
481
482 /*
483 * signal that we are alive.
484 */
485 threadx[(uintmax_t) data] = my_threadidx();
486
487 rc = RexxStart( 0, /* ArgCount */
488 NULL, /* ArgList */
489 program_name, /* ProgramName */
490 NULL, /* Instore (source/compiled) */
491 "Foo", /* EnvironmentName */
492 RXCOMMAND, /* CallType */
493 NULL, /* ExitHandlerList */
494 NULL, /* ReturnCode (ignored) */
495 NULL ); /* ReturnValue (ignored) */
496
497 /*
498 * Finally inform the invoker that we have stopped gracefully.
499 */
500 ThreadHasStopped( ( uintmax_t ) data );
501 #ifdef REGINAVERSION
502 ReginaCleanup();
503 #endif
504 #ifndef THREAD_RETURN_VOID
505 return ( THREAD_RETURN ) 0;
506 #endif
507 }
508
509 /******************************************************************************
510 ******************************************************************************
511 * thread management **********************************************************
512 ******************************************************************************
513 *****************************************************************************/
514
515 /*
516 * reap checks the thread's result buffer to see if it has seen all the lines
517 * the interpreter has emitted.
518 * The global error is set in case of an error.
519 * The result buffer is reset on success.
520 * Only called when using "instore" macro.
521 */
reap(unsigned position)522 void reap( unsigned position )
523 {
524 if (found[position] != ~((unsigned) 0))
525 {
526 fprintf(stderr,"\n"
527 "Thread %lu has stopped without completing its loop.\n",
528 (unsigned long) threadx[position]);
529 GlobalError = 1;
530 }
531 found[position] = 0;
532 }
533
534 #ifdef WIN32_THREADS
535 /*
536 * init_threads initializes the usage of our thread management system.
537 * Returns 1 on success, 0 otherwise.
538 */
/* Nothing has to be prepared in this variant; always reports success (1). */
int init_threads( void )
{
   const int success = 1;

   return success;
}
543
544 /*
545 * start_a_thread starts a thread and sets some state informations which are
546 * set back in case of an error.
547 * The return code is 1 on success, 0 otherwise.
548 */
start_a_thread(unsigned position)549 int start_a_thread( unsigned position )
550 {
551 int rc;
552 unsigned threadID;
553
554 State[position] = Running;
555 /* Avoid some race conditions. I don't know if this is a problem of the
556 * runtime system or the kernel. If the systems runs into severe swapping
557 * the threads seems to run before the thread id is known which is used
558 * in instore_exit. We suspend the thread until all details of the new
559 * thread are known before we continue. This gives a little bit worse
560 * performance.
561 */
562 thread[position] = ( HANDLE ) _beginthreadex( NULL,
563 0,
564 (program_name) ? external : instore,
565 ( void * ) position,
566 CREATE_SUSPENDED,
567 &threadID );
568 rc = ( long ) thread[position] != 0l;
569 if ( !rc )
570 {
571 fprintf( stderr, "\n"
572 "Error starting thread, error code is %ld\n",
573 GetLastError() );
574 GlobalError = 1;
575 ThreadHasStopped( position );
576 }
577 ResumeThread( thread[position] );
578 return rc;
579 }
580
581 /*
582 * Thread has stopped sets the global state information of the thread with the
583 * index "position" to "Stopped".
584 */
ThreadHasStopped(unsigned position)585 static void ThreadHasStopped( unsigned position )
586 {
587 State[position] = Stopped;
588 }
589
590 /*
591 * wait_for_threads restarts new threads until the requested count of
592 * TOTAL_THREADS has been reached. GlobalError is set if any error occurs.
593 *
594 * We expect to have MAX_THREADS already running.
595 */
wait_for_threads(void)596 void wait_for_threads( void )
597 {
598 unsigned i,j,done,running;
599 DWORD rc;
600 HANDLE compressed[MAX_THREADS];
601 unsigned BackSort[MAX_THREADS];
602
603 running = done = MAX_THREADS;
604
605 if ( stdout_is_tty )
606 printf( "%u\r", MAX_THREADS );
607 if ( GlobalError )
608 return;
609
610 for ( ; ; )
611 {
612 /*
613 * We have to pass an array of thread handles to the OS' waiter function.
614 * But some threads may not be running at the last steps. Therefore we
615 * have to resort the handles to be consecutive in a temporary array.
616 */
617 for ( i = 0, j = 0; i < MAX_THREADS; i++ )
618 {
619 if ( State[i] != Ready )
620 {
621 compressed[j] = thread[i];
622 BackSort[j] = i;
623 j++;
624 }
625 }
626 rc = WaitForMultipleObjects( running, compressed, FALSE, timeout_seconds*1000 );
627 if ( rc == 0xFFFFFFFF )
628 {
629 /*
630 * Failure or dead thread, look for a stopped one
631 */
632 for ( i = 0; i < running; i++ )
633 {
634 if ( State[BackSort[i]] == Stopped )
635 rc = WAIT_OBJECT_0 + i;
636 }
637 }
638 if ( ( rc < WAIT_OBJECT_0 ) || ( rc >= running + WAIT_OBJECT_0 ) )
639 {
640 fprintf( stderr, "\n"
641 "At least one thread won't finish normally within 3 seconds (rc=%u, error=%lu).\n",
642 rc, GetLastError() );
643 GlobalError = 1;
644 }
645 if ( GlobalError )
646 break;
647
648 /*
649 * A thread has died. Find it and check the reason.
650 */
651 i = BackSort[rc - WAIT_OBJECT_0];
652 if ( State[i] != Stopped )
653 {
654 fprintf( stderr, "\n"
655 "Thread %u hasn't finished normally.\n",i);
656 GlobalError = 1;
657 break;
658 }
659
660 /*
661 * Destroy associated buffers, check values and
662 * restart a new instance if we still have to do so.
663 */
664 CloseHandle( thread[i] );
665 State[i] = Ready;
666 running--;
667 /*
668 * Only reap our threads if running the instore test code
669 */
670 if ( program_name == NULL )
671 reap( i );
672
673 if ( done < total_threads )
674 {
675 if ( !start_a_thread( i ) )
676 break;
677 done++;
678 running++;
679 }
680 if ( stdout_is_tty )
681 printf( "%u(%u)\r", done, running );
682 if ( GlobalError || !running )
683 break;
684 }
685 }
686 #endif
687
688 #ifdef OS2_THREADS
689 /*
690 * init_threads initializes the usage of our thread management system.
691 * Returns 1 on success, 0 otherwise.
692 */
693 HMUX hmux;
694 SEMRECORD thread_sems[MAX_THREADS];
695
init_threads(void)696 int init_threads( void )
697 {
698 int i;
699 LONG rc;
700
701 for ( i = 0; i < MAX_THREADS; i++ )
702 {
703 thread_sems[i].ulUser = i;
704 rc = DosCreateEventSem( NULL,
705 (PHEV) &thread_sems[i].hsemCur,
706 0,
707 0 );
708 if ( rc != 0 )
709 {
710 fprintf( stderr, "\n"
711 "Error creating an EventSem, error code is %lu\n",
712 rc );
713 return 0;
714 }
715 }
716
717 rc = DosCreateMuxWaitSem( NULL,
718 &hmux,
719 MAX_THREADS,
720 thread_sems,
721 DCMW_WAIT_ANY);
722 if ( rc != 0 )
723 {
724 fprintf( stderr, "\n"
725 "Error creating a MuxWaitSem, error code is %lu\n",
726 rc );
727 return 0;
728 }
729 return 1;
730 }
731
732 /*
733 * start_a_thread starts a thread and sets some state informations which are
734 * set back in case of an error.
735 * The return code is 1 on success, 0 otherwise.
736 */
start_a_thread(unsigned position)737 int start_a_thread( unsigned position )
738 {
739 ULONG rc, post;
740
741 rc = DosResetEventSem( (HEV) thread_sems[position].hsemCur, &post );
742 if ( ( rc != 0 ) && ( rc != ERROR_ALREADY_RESET ) )
743 {
744 fprintf( stderr, "\n"
745 "Error resetting an EventSem, error code is %lu\n",
746 rc );
747 GlobalError = 1;
748 return 0;
749 }
750 State[position] = Running;
751 thread[position] = _beginthread( (program_name) ? external : instore,
752 NULL,
753 0x8000,
754 ( void * ) position );
755 if ( thread[position] == -1 )
756 {
757 fprintf( stderr, "\n"
758 "Error starting thread, error code is %d\n",
759 errno );
760 GlobalError = 1;
761 State[position] = Stopped;
762 DosPostEventSem( (HEV) thread_sems[position].hsemCur );
763 return 0;
764 }
765 return 1;
766 }
767
768 /*
769 * Thread has stopped sets the global state information of the thread with the
770 * index "position" to "Stopped".
771 */
ThreadHasStopped(unsigned position)772 static void ThreadHasStopped( unsigned position )
773 {
774 ULONG rc;
775
776 State[position] = Stopped;
777 if ( ( rc = DosPostEventSem( (HEV) thread_sems[position].hsemCur ) ) != 0 )
778 {
779 fprintf( stderr, "\n"
780 "Error posting an EventSem, error code is %lu\n",
781 rc );
782 GlobalError = 1;
783 }
784 }
785
786 /*
787 * wait_for_threads restarts new threads until the requested count of
788 * TOTAL_THREADS has been reached. GlobalError is set if any error occurs.
789 *
790 * We expect to have MAX_THREADS already running.
791 */
wait_for_threads(void)792 void wait_for_threads( void )
793 {
794 unsigned done,running;
795 ULONG rc, post, user;
796
797 running = done = MAX_THREADS;
798
799 if ( stdout_is_tty )
800 printf( "%u\r", MAX_THREADS );
801 if ( GlobalError )
802 return;
803
804 for ( ; ; )
805 {
806 rc = DosWaitMuxWaitSem( hmux, timeout_seconds*1000, &user );
807 if ( rc != 0 )
808 {
809 fprintf( stderr, "\n"
810 "At least one thread won't finish normally within 3 seconds (error=%lu).\n",
811 rc );
812 GlobalError = 1;
813 }
814 if ( user >= MAX_THREADS )
815 {
816 fprintf( stderr, "\n"
817 "Strange behaviour after wating for MuxWaitSem, released thread index is %lu.\n",
818 user );
819 GlobalError = 1;
820 }
821 if ( GlobalError )
822 break;
823
824 /*
825 * A thread has died. Check the reason.
826 */
827 if ( State[user] != Stopped )
828 {
829 fprintf( stderr, "\n"
830 "Thread %lu hasn't finished normally.\n", user );
831 GlobalError = 1;
832 break;
833 }
834
835 /*
836 * Check values and restart a new instance if we still have to do so.
837 */
838 State[user] = Ready;
839 running--;
840 /*
841 * Only reap our threads if running the instore test code
842 */
843 if ( program_name == NULL )
844 reap( (int) user );
845
846 if ( done < total_threads )
847 {
848 if ( !start_a_thread( (int) user ) )
849 break;
850 done++;
851 running++;
852 }
853 else
854 {
855 rc = DosResetEventSem( (HEV) thread_sems[user].hsemCur, &post );
856 if ( ( rc != 0 ) && ( rc != ERROR_ALREADY_RESET ) )
857 {
858 fprintf( stderr, "\n"
859 "Error resetting an EventSem, error code is %lu\n",
860 rc );
861 GlobalError = 1;
862 break;
863 }
864 }
865 if ( stdout_is_tty )
866 printf( "%u(%u)\r", done, running );
867 if ( GlobalError || !running )
868 break;
869 }
870 }
871 #endif
872
873 #ifdef POSIX_THREADS
874 /*
875 * The number of processed runs needs to be global for error analysis.
876 */
877 static unsigned done = 0;
878
879 #if USE_SEMAPHORES
880 static sem_t something_stopped;
881
882 /*
883 * timer_alarm will end the program if nothing happens within 3 seconds.
884 */
timer_alarm(int sig)885 void timer_alarm( int sig )
886 {
887 static unsigned lastvalue = ( unsigned ) -1;
888 static int call_count = 0;
889
890 if ( ( lastvalue != done ) || ( (int) done == total_threads ) )
891 {
892 lastvalue = done;
893 signal( SIGALRM, timer_alarm );
894 return;
895 }
896
897 GlobalError = 1;
898 signal( SIGALRM, timer_alarm );
899 if ( call_count++ == 0 )
900 {
901 fprintf( stderr, "\n"
902 "At least one thread won't finish within 3 seconds.\n" );
903 sem_post( &something_stopped );
904 }
905
906 if ( call_count > 2 )
907 exit( 1 );
908 }
909 #else
910 static pthread_mutex_t thread_lock = PTHREAD_MUTEX_INITIALIZER;
911 static pthread_cond_t something_stopped = PTHREAD_COND_INITIALIZER;
912 #endif
913
914 /*
915 * init_threads initializes the usage of our thread management system.
916 * Returns 1 on success, 0 otherwise.
917 */
/*
 * Prepares the thread management. With USE_SEMAPHORES the semaphore
 * something_stopped is initialised with a count of 0; otherwise there is
 * nothing to do. Returns 1 on success, 0 if the semaphore can't be
 * initialised.
 */
int init_threads( void )
{
   int ok = 1;

#if USE_SEMAPHORES
   if ( sem_init( &something_stopped, 0, 0 ) != 0 )
      ok = 0;
#endif
   return ok;
}
926
927 /*
928 * start_a_thread starts a thread and sets some state informations which are
929 * set back in case of an error.
930 * The return code is 1 on success, 0 otherwise.
931 */
start_a_thread(unsigned position)932 int start_a_thread( unsigned position )
933 {
934 int rc;
935
936 State[position] = Running;
937 rc = pthread_create( &thread[position], NULL, (program_name) ? external : instore, (void *)(uintptr_t) position );
938 if ( rc )
939 {
940 fprintf( stderr, "\n"
941 "Error starting thread, error code is %d\n",
942 rc );
943 GlobalError = 1;
944 ThreadHasStopped( position );
945 }
946 return(!rc);
947 }
948
949
950 /*
951 * Thread has stopped sets the global state information of the thread with the
952 * index "position" to "Stopped".
953 * The master semaphore is set to signal the main thread about the thread's
954 * death.
955 */
ThreadHasStopped(unsigned position)956 static void ThreadHasStopped( unsigned position )
957 {
958 #if USE_SEMAPHORES
959 State[position] = Stopped;
960 sem_post( &something_stopped );
961 #else
962 /*
963 * The use of the mutex lock semaphores is forced by Posix.
964 */
965 pthread_mutex_lock( &thread_lock );
966 State[position] = Stopped;
967 pthread_cond_signal( &something_stopped );
968 pthread_mutex_unlock( &thread_lock );
969 #endif
970 }
971
972 /*
973 * wait_for_threads restarts new threads until the requested count of
974 * TOTAL_THREADS has been reached. GlobalError is set if any error occurs.
975 *
976 * We expect to have MAX_THREADS already running.
977 *
978 * Note: You will get a better performance if you set the schedule policy to
979 * RR or FIFO, but you have to be root to do this. If USE_SEMAPHORES is
980 * active, changing the policy won't increase the performance.
981 */
wait_for_threads(void)982 void wait_for_threads( void )
983 {
984 unsigned i,running;
985 int rc;
986 #if !USE_SEMAPHORES
987 struct timeval now;
988 struct timespec timeout;
989 #else
990 struct itimerval ival;
991 #endif
992
993 running = done = MAX_THREADS;
994 if ( stdout_is_tty )
995 printf( "%u\r", MAX_THREADS );
996
997 if ( GlobalError )
998 return;
999
1000 #if !USE_SEMAPHORES
1001 /*
1002 * The lock is needed by pthread_cond_timewait.
1003 * The multithreading paradigma here is very unefficient.
1004 */
1005 pthread_mutex_lock( &thread_lock );
1006 #else
1007 signal( SIGALRM, timer_alarm );
1008 ival.it_value.tv_sec = timeout_seconds;
1009 ival.it_value.tv_usec = 0;
1010 ival.it_interval = ival.it_value;
1011 setitimer( ITIMER_REAL, &ival, NULL );
1012 #endif
1013
1014 for ( ; ; )
1015 {
1016 #if !USE_SEMAPHORES
1017 /*
1018 * Sleep a maximum of 3 seconds.
1019 */
1020 gettimeofday(&now,NULL);
1021 timeout.tv_sec = now.tv_sec + timeout_seconds;
1022 timeout.tv_nsec = now.tv_usec * 1000;
1023 /*
1024 * The following call will wait up to timeout time for a signalled
1025 * something_stopped condition semaphore. thread_lock is atomically
1026 * unlocked on function entry and locked on function exit.
1027 */
1028 rc = pthread_cond_timedwait( &something_stopped, &thread_lock, &timeout );
1029 if ( rc == ETIMEDOUT )
1030 {
1031 fprintf( stderr, "\n"
1032 "At least one thread won't finish within 3 seconds.\n" );
1033 GlobalError = 1;
1034 }
1035 #else
1036 if ( sem_wait( &something_stopped ) != 0 )
1037 {
1038 fprintf( stderr, "\n"
1039 "Interrupted wait.\n" );
1040 GlobalError = 1;
1041 }
1042 #endif
1043 if ( GlobalError )
1044 break;
1045
1046 /*
1047 * Restart the threads is appropriate.
1048 */
1049 for ( i = 0; i < MAX_THREADS; i++ )
1050 {
1051 if ( State[i] != Stopped )
1052 continue;
1053 State[i] = Ready;
1054 running--;
1055 rc = pthread_join( thread[i], NULL );
1056 if ( rc != 0 )
1057 {
1058 fprintf( stderr, "\n"
1059 "A thread can't be found in the internal table.\n" );
1060 GlobalError = 1;
1061 break;
1062 }
1063
1064 /*
1065 * Has the thread done its work completely?
1066 */
1067 /*
1068 * Only reap our threads if running the instore test code
1069 */
1070 if ( program_name == NULL )
1071 reap(i);
1072
1073 /*
1074 * Restart a new thread if we need some more runs.
1075 */
1076 if ( (int) done < total_threads )
1077 {
1078 if ( !start_a_thread( i ) )
1079 break;
1080 done++;
1081 running++;
1082 }
1083 }
1084 if ( stdout_is_tty )
1085 printf( "%u(%u)\r", done, running );
1086 if ( GlobalError || !running )
1087 break;
1088 }
1089 #if !USE_SEMAPHORES
1090 pthread_mutex_unlock( &thread_lock );
1091 #endif
1092 }
1093 #endif
1094
1095 /*
1096 * Usage shows this program's usage and stops the program.
1097 */
usage(void)1098 static void usage( void )
1099 {
1100 printf( "usage: threader [-p] [-q] [-t total_threads] [-s timeout_seconds] [filename]\n"
1101 "\n"
1102 "Options:\n"
1103 "-p\tLoad the macro only once and then use the generated instore\n"
1104 "\tmacro. Default: Always load the macro new.\n"
1105 "\n"
1106 "-q\tRun quietly. Don't display running progress information.\n"
1107 "\n"
1108 "-t\ttotal_threads\tTotal number of threads to execute.\n"
1109 "\n"
1110 "-s\ttimeoutseconds\tNumber of seconds to timeout a thread. Default 3.\n"
1111 "\n"
1112 "filename\tThe Rexx program to execute rather than the instore test\n"
1113 "\tprogram.\n"
1114 "\nThe default instore macro generates lines with numbers which can\n"
1115 "be parsed to detect problems in the multi-threading implementation.\n"
1116 "A loop counter runs by default until %u. The test should run from a few\n"
1117 "seconds up to a few minutes. You should hit ^C to abort the program\n"
1118 "if you think your harddisk is working heavily.\n"
1119 "\n"
1120 "This program is for testing Regina's multithreading capabilities.\n"
1121 ,total_threads );
1122 exit( 1 );
1123 }
1124
main(int argc,char * argv[])1125 int main( int argc, char *argv[] )
1126 {
1127 RXSTRING version;
1128 ULONG versioncode;
1129 int i;
1130 time_t start;
1131
1132 /*
1133 * In case of a connected tty we let run a counter to show the user some
1134 * "success".
1135 */
1136 if ( isatty( fileno( stdout ) ) )
1137 stdout_is_tty = 1;
1138
1139 for ( i = 1; i < argc; i++ )
1140 {
1141 if ( argv[i][0] != '-' )
1142 break;
1143
1144 if ( strcmp( argv[i], "-p" ) == 0 )
1145 {
1146 UseInstore = FirstRun;
1147 }
1148 else if ( strcmp( argv[i], "-t" ) == 0 )
1149 {
1150 i++;
1151 total_threads = atoi( argv[i] );
1152 }
1153 else if ( strcmp( argv[i], "-s" ) == 0 )
1154 {
1155 i++;
1156 timeout_seconds = atoi( argv[i] );
1157 }
1158 else if ( strcmp( argv[i], "-q" ) == 0 )
1159 {
1160 stdout_is_tty = 0;
1161 }
1162 else if ( strcmp( argv[i], "--" ) == 0 )
1163 {
1164 i++;
1165 break;
1166 }
1167 else
1168 {
1169 usage();
1170 }
1171 }
1172
1173 if ( argc > i )
1174 {
1175 program_name = argv[i];
1176 i++;
1177 }
1178 if ( argc > i )
1179 usage();
1180
1181 /*
1182 * Initialize some tables and tune the IO system to show every output
1183 * at once.
1184 */
1185 memset( found, 0,sizeof( found ) );
1186 memset( State, 0,sizeof( State ) );
1187 setvbuf( stdout,NULL, _IONBF, 0 );
1188 setvbuf( stderr,NULL, _IONBF, 0 );
1189 printf( "Regina Rexx Thread Tester\n" );
1190 printf( "-------------------------\n" );
1191
1192 version.strlength = 0;
1193 version.strptr = NULL;
1194 /*
1195 * This will not work if we check another Rexx. You can safely comment out
1196 * the following code up to the 'printf( "\n" );'
1197 */
1198 #ifdef REGINAVERSION
1199 versioncode = ReginaVersion( &version );
1200 printf( "Regina's version is %lu.%lu",
1201 versioncode >> 8,
1202 versioncode & 0xFF );
1203 if ( version.strptr )
1204 {
1205 printf( " (in complete \"%s\")", version.strptr );
1206 RexxFreeMemory( version.strptr );
1207 }
1208 #endif
1209 printf( "\n" );
1210
1211 if ( UseInstore && program_name )
1212 {
1213 printf( "Ignoring the \"-p\" flag for external REXX scripts\n" );
1214 UseInstore = UnUsed;
1215 }
1216 /*
1217 * In case of a processing with compiled macros we need something
1218 * compiled ;-) We let run one instance and let instore() save the compiled
1219 * macros for later use.
1220 */
1221 if ( UseInstore )
1222 {
1223 State[0] = Running;
1224 thread[0] = my_threadid();
1225 threadx[0] = my_threadidx();
1226 if ( program_name )
1227 external( NULL );
1228 else
1229 instore( NULL );
1230 State[0] = Stopped;
1231 found[0] = 0;
1232 UseInstore = DoInstore;
1233 }
1234
1235 if ( stdout_is_tty )
1236 {
1237 printf( "\n"
1238 "You should see a loop counter which stops at %u.\n\n",
1239 total_threads );
1240 }
1241
1242 /*
1243 * Start some threads and check for errors. Then restart threads up to the
1244 * maximum count.
1245 */
1246 if ( !init_threads() )
1247 {
1248 fprintf( stderr, "\n"
1249 "Failure initializing the thread management system.\n\n" );
1250 return 1;
1251 }
1252
1253 start = time( NULL );
1254 for ( i = 0; i < MAX_THREADS; i++ )
1255 {
1256 if ( !start_a_thread( i ) || GlobalError )
1257 break;
1258 }
1259 if ( !GlobalError )
1260 wait_for_threads();
1261
1262 if ( GlobalError )
1263 {
1264 fprintf( stderr, "\n"
1265 "An error encountered. Do you use the right shared "
1266 "libs or DLLs?\n" );
1267 return 1;
1268 }
1269
1270 printf( "\n"
1271 "Thread tester passed without an error.\n"
1272 "About %u seconds used for %u cyles, each creating a thread.\n",
1273 (unsigned) ( time( NULL ) - start ), total_threads );
1274
1275 if ( !stdout_is_tty || !isatty( fileno( stdout ) ) )
1276 return 0;
1277
1278 printf( "Press ENTER to continue and end the program. You may have a look\n"
1279 " at your preferred memory analyser like ps, pstat or tasklist...\n" );
1280 {
1281 char buf[128];
1282 fgets( buf, sizeof( buf ), stdin );
1283 }
1284
1285 return 0;
1286 }
1287