1 /*
2  *  The Regina Rexx Interpreter
3  *  Copyright (C) 1992-1994  Anders Christensen <anders@pvv.unit.no>
4  *
5  *  This library is free software; you can redistribute it and/or
6  *  modify it under the terms of the GNU Library General Public
7  *  License as published by the Free Software Foundation; either
8  *  version 2 of the License, or (at your option) any later version.
9  *
10  *  This library is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  *  Library General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Library General Public
16  *  License along with this library; if not, write to the Free
17  *  Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18  */
19 /*
20  * This process runs as a daemon (or NT service). It maintains multiple,
21  * named Rexx queues.
22  * All communication is done via TCP/IP sockets.
23  * This process waits on a known port; 5656 by default for connections
24  * from clients. A client is any process that respects the Interface
25  * defined below. The "normal" clients are regina and rxqueue.
26  * Details about each client is kept, like current queue name.
27  *
28  * Structure
29  * ---------
30  * startup
31  * - set signal handler for SIGTERM
32  * initialise socket interface
33  * - socket()
34  * - bind()
35  * - listen()
36  * loop until killed
37  * - setup read FDs
38  * - select()
39  * - if listen socket, add new client
40  * - otherwise read command
41  * cleanup
42  * - disconnect all clients
43  * - free up resources
44  *
45  * Interface.
46  * ---------
47  * Once a client connects, it sends commands
48  * Commands are single character, followed by optional 6 hex character
49  * length and optional data.
50  *   F - queue data onto client's current queue (FIFO)
51  *       in -> FFFFFFFxxx--data--xxx
52  *       out-> 0000000 (if successful)
53  *       out-> 2xxxxxx (if error, eg queue deleted)
54  *       out-> 3000000 (memory allocation error)
55  *       regina QUEUE, rxqueue /fifo, websocket /fifo
56  *   L - push data onto client's current queue (LIFO)
57  *       in->  LFFFFFFxxx--data--xxx
58  *       out-> 0000000 (if successful)
59  *       out-> 2xxxxxx (if error, eg queue deleted)
60  *       out-> 3000000 (memory allocation error)
61  *       regina PUSH, rxqueue /lifo, websocket /lifo
62  *   C - create queue
63  *       in->  CFFFFFFxxx--queue name--xxx (if length 0, create name)
64  *       out-> 0FFFFFFxxx--queue name--xxx (if queue name created)
65  *       out-> 1FFFFFFxxx--queue name--xxx (if queue name existed)
66  *       out-> 2xxxxxx (if error)
67  *       out-> 6000000 (queue name not passed)
68  *       regina RXQUEUE('C'), rxqueue N/A, websocket /create
69  *   D - delete queue
70  *       in->  DFFFFFFxxx--queue name--xxx
71  *       out-> 0000000 (if queue name deleted)
72  *       out-> 5xxxxxx (trying to delete 'SESSION' queue)
73  *       out-> 6000000 (queue name not passed)
74  *       out-> 9xxxxxx (if error, eg queue already deleted)
75  *       regina RXQUEUE('D'), rxqueue N/A, websocket /delete
76  *   E - empty data from specified queue
77  *       in->  EFFFFFFxxx--queue name--xxx
78  *       out-> 0000000 (if queue emptied)
79  *       out-> 2xxxxxx (if error, eg queue deleted)
80  *       out-> 3000000 (memory allocation error)
81  *       regina N/A, rxqueue /clear, websocket /clear
82  *   P - pop item off client's default queue
83  *       in->  P000000
84  *       out-> 0FFFFFFxxx--data--xxx (if queue name existed)
85  *       out-> 1000000 (if queue empty)
86  *       out-> 2xxxxxx (if queue name deleted - length ignored)
87  *       out-> 4xxxxxx (if timeout on queue exceeded - length ignored)
88  *       regina PULL, rxqueue /pull, websocket /pull
89  *   p - fetch item off client's default queue
90  *       in->  p000000
91  *       out-> 0FFFFFFxxx--data--xxx (if queue name existed)
92  *       out-> 1000000 (if queue empty)
93  *       out-> 2xxxxxx (if queue name deleted - length ignored)
94  *       regina PULL without timeout, rxqueue N/A, websocket N/A
95  *   S - set default queue name (allow false queues)
96  *       in->  SFFFFFFxxx--queue name--xxx
97  *       out-> 0000000 (if successful)
98  *       out-> 3000000 (memory allocation error)
99  *       out-> 6000000 (queue name not passed)
100  *       regina RXQUEUE('S'), rxqueue N/A, websocket N/A
101  *   G - get default queue name
102  *       in->  G000000
103  *       out-> 0FFFFFFxxx--queue name--xxx
104  *       regina RXQUEUE('G'), rxqueue N/A, websocket N/A
105  *   N - return number of lines on stack
106  *       in->  N000000
107  *       out-> 0FFFFFF (if queue exists)
108  *       out-> 2xxxxxx (if error or queue doesn't exist - length ignored)
109  *       regina QUEUED(), rxqueue /queued, websocket /queued
110  *   Q - return names of stacks
111  *       in->  Q000000
112  *       out-> 0FFFFFFxxx--queue name--xxxDDxxx--queue name---xxxDD...
113  *          where FFFFFF is length of all names
114  *          and DD is xFF delimiting names of queues and ... is more queues
115  *       out-> 2xxxxxx (if error)
116  *       regina RXQUEUE(), rxqueue /queues, websocket /queues
117  *   T - set timeout on queue pull
118  *       in->  TFFFFFFTTTTTT
119  *       out-> 0000000 (if queue timeout set)
120  *       out-> 2xxxxxx (if error, eg invalid argument)
121  *       out-> 6000000 (queue name not passed)
122  *       regina RXQUEUE('T'), rxqueue N/A, websocket N/A
123  *   X - client disconnect
124  *       in->  X000000
125  *       out->
126  *   Z - client requests shutdown - should only be called by ourselves!!
127  *       in->  Z000000
128  *       out->
129  */
130 
131 #define NO_CTYPE_REPLACEMENT
132 #include "rexx.h"
133 
134 #if defined(WIN32) || defined(__LCC__)
135 # if defined(_MSC_VER)
136 #  if _MSC_VER >= 1100
137 /* Stupid MSC can't compile own headers without warning at least in VC 5.0 */
138 #   pragma warning(disable: 4115 4201 4214 4514)
139 #  endif
140 #  include <windows.h>
141 #  if _MSC_VER >= 1100
142 #   pragma warning(default: 4115 4201 4214)
143 #  endif
144 # elif defined(__LCC__)
145 #  include <windows.h>
146 #  include <winsvc.h>
147 #  include <winsock.h>
148 # else
149 #  include <windows.h>
150 # endif
151 # include <io.h>
152 #else
153 # ifdef HAVE_SYS_SOCKET_H
154 #  include <sys/socket.h>
155 # endif
156 # ifdef HAVE_NETINET_IN_H
157 #  include <netinet/in.h>
158 #  endif
159 # if defined(HAVE_POLL_H) && defined(HAVE_POLL)
160 #  include <poll.h>
161 # elif defined(HAVE_SYS_POLL_H) && defined(HAVE_POLL)
162 #  include <sys/poll.h>
163 # elif defined(HAVE_SYS_SELECT_H)
164 #  include <sys/select.h>
165 # endif
166 # ifdef HAVE_NETDB_H
167 #  include <netdb.h>
168 # endif
169 # ifdef HAVE_ARPA_INET_H
170 #  include <arpa/inet.h>
171 # endif
172 # define closesocket(x) close(x)
173 #endif
174 #include <string.h>
175 
176 #ifdef HAVE_UNISTD_H
177 #include <unistd.h>
178 #endif
179 
180 #ifdef HAVE_ERRNO_H
181 #include <errno.h>
182 #endif
183 
184 #ifdef HAVE_SIGNAL_H
185 #include <signal.h>
186 #endif
187 
188 #ifdef HAVE_CTYPE_H
189 #include <ctype.h>
190 #endif
191 
192 #ifdef HAVE_PROCESS_H
193 # include <process.h>
194 #endif
195 
196 #if defined(TIME_WITH_SYS_TIME)
197 # include <sys/time.h>
198 # include <time.h>
199 #else
200 # if defined(HAVE_SYS_TIME_H)
201 #  include <sys/time.h>
202 # else
203 #  include <time.h>
204 # endif
205 #endif
206 
207 #include <assert.h>
208 
209 #define HAVE_FORK
210 #if defined(__WATCOMC__) || defined(_MSC_VER) || (defined(__IBMC__) && defined(WIN32)) || defined(__SASC) || defined(__MINGW32__) || defined(__BORLANDC__) || defined(DOS) || defined(__LCC__)
211 # undef HAVE_FORK
212 #endif
213 #if defined(__WATCOMC__) && defined(__QNX__)
214 # define HAVE_FORK
215 #endif
216 
217 #include "extstack.h"
218 #include "mygetopt.h"
219 #include "contrib/LibSha1.h"
220 
221 #ifdef BUILD_NT_SERVICE
222 # include "service.h"
223 /*
224  * this event is signalled when the
225  * service should end
226  */
227 HANDLE  hServerStopEvent = NULL;
228 #endif
229 
230 #ifdef WIN32
231 # define os_errno ((int)WSAGetLastError())
232 # define errno_str(code) Win32ErrorString(code)
233 # undef EINTR
234 # define EINTR WSAEINTR
235 # undef ECONNRESET
236 # define ECONNRESET WSAECONNRESET
237 #else
238 # define os_errno errno
239 # define errno_str(code) strerror(code)
240 #endif
241 
242 #define WS_RESPONSE_HEADER "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: %.*s\r\n\r\n"
243 #define WS_RESPONSE_MAGIC  "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
244 /*
245  * Translation Table for Base64 encoding as described in RFC1113
246  */
247 static const char cb64[]="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
248 #ifdef REQUIRE_BASE64_DECODE
249 static const char cd64[]="|$$$}rstuvwxyz{$$$$$$$>?@ABCDEFGHIJKLMNOPQRSTUVW$$$$$$XYZ[\\]^_`abcdefghijklmnopq";
250 #endif
251 /*
252  * debugging is turned off. You can turn it on by the command line option "-D".
253  */
254 static int debug = 0 ;
255 #define DEBUGDUMP(x) { if ( debug ) \
256                           {x;}      \
257                      }
258 /*
259  * Length of action string from websocket command
260  */
261 #define ACTION_FIFO_LEN 6
262 #define ACTION_LIFO_LEN 6
263 #define ACTION_QUEUED_LEN 8
264 #define ACTION_CLEAR_LEN 7
265 #define ACTION_DELETE_LEN 8
266 #define ACTION_CREATE_LEN 8
267 #define ACTION_PULL_LEN 6
268 
269 /*
270  * DEFAULT_WAKEUP is the time in ms after which the process shall wakeup.
271  * The time has a maximum of 49 days and shall be around one day. It may
272  * be set much shorter for debugging purpose.
273  */
274 #define DEFAULT_WAKEUP 86400000
275 
276 /*
277  * QUEUE_TIMEOUT is the time in ms after which an unused queue will be
278  * removed. A queue is unused if no client is connected to it.
279  * be set much shorter for debugging purpose.
280  * The time has a maximum of 49 days and may be set to one week. It may
281  * be set much shorter for debugging purpose.
282  */
283 #define QUEUE_TIMEOUT (86400000*7)
284 
285 /*
286  * RxTime defines a structure holding the time in milliseconds resolution.
287  * I know, most systems have at least one sort of high precision
288  * time structure, but the ugly "#ifdef" are unreadable if we use
289  * them all over here.
290  * Defining a structure on our own is much more helpful.
291  * Of course, we can't use a single 32 bit value for milliseconds. This
292  * will break the server after 49 days. unix will live a little bit longer
293  * and there shall Windows machines exist which doesn't have reboot since
294  * longer periods ;-)
295  */
296 typedef struct {
297    /* seconds are typical time_t values. */
298    time_t seconds ;
299 
300    /*
301     * milli's values are between 0 and 999. The special value -1 indicates
302     * a not-used condition.
303     */
304    int milli ;
305 } RxTime ;
306 
307 /*
308  * now holds the current time. It isn't updated after every operation and
309  * may be out of date by some milliseconds some times.
310  */
311 RxTime now ;
312 
313 /*
314  * This value is now plus 7 days.
315  */
316 RxTime queue_deadline ;
317 
318 struct _Client ;
319 typedef struct _RxQueue {
320    /*
321     * linked list maintainance elements
322     */
323    struct _RxQueue *prev, *next ;
324    /* name is the uppercased name of the queue.
325     */
326    streng *name ;
327    /*
328     * Indicates if the queue is a "real" queue
329     * or a false queue as a result of a rxqueue('set', 'qname')
330     * on a queue that doesn't exist. This is crap behaviour!
331     * but that's how Object Rexx works :-(
332     */
333    int isReal ;
334    /*
335     * Content: single buffered stack in opposite to the multi buffered
336     * internal stacks of Regina.
337     */
338    Buffer buf ;
339    /*
340     * deadline is the time the queue was last used plus a timeout.
341     * The queue is removed if the queue isn't used for one week.
342     * Thus, the value is the time the queue was used last plus one week.
343     */
344    RxTime deadline ;
345    /*
346     * Several clients may want to wait for incoming data. They are queued
347     * in the following structure and automatically reponsed by the
348     * data acceptor of the queue in a FIFO manner, which is a fair-queue
349     * algorithm.
350     * The clients will get a notice of am error if the queue is destroyed
351     * or emptied by an explicite call.
352     * structure (n = newer, o = older)
353     *         oldest                 newest
354     *           ||                     ||
355     * NULL<-o--client<-o--client<-o--client--n->NULL
356     *               |     ^    |     ^
357     *               |     |    |     |
358     *               +--n--+    +--n--+
359     */
360    struct _Client *oldest, *newest;
361 } RxQueue;
362 
363 /*
364  * queues we work on.
365  * Format, p=prev, n=next:
366  *           queues
367  *             ||
368  *  NULL<--p-queue-n---->queue-n---->queue-n-->NULL
369  *               ^       v   ^       v
370  *               |       |   |       |
371  *               +---p---+   +---p---+
372  */
373 RxQueue *queues ;
374 
375 /* SESSION is the special queue which can't be deleted and to which clients
376  * drop when their current queue is deleted.
377  */
378 RxQueue *SESSION ;
379 
380 /*
381  * Structure for multiple clients
382  */
383 typedef struct _Client
384 {
385    /*
386     * linked list maintainance elements
387     */
388    struct _Client *prev, *next ;
389 
390    /*
391     * socket contains the socket's handle
392     */
393    int socket;
394 
395    /*
396     * Indicates if this client is a Websocket client, 0 no, 1 yes
397     */
398    int isWebsocket;
399 
400    /*
401     * each client has a default queue associated. It must be valid all
402     * the times after initialization.
403     */
404    RxQueue *default_queue;
405 
406    /*
407     * if queue_timeout is set, the client expects an error code after
408     * this time instead of waiting until world's end.
409     * The value is in milliseconds.
410     * A value of zero means no timeout; return immediately if no data
411     * A value of -1 means wait forever
412     */
413    long queue_timeout;
414 
415    /*
416     * We manage a deadline. A PULL operation is in an error state after
417     * this timestamp. The value is set at a PULL operation to
418     * now+queue_timeout.
419     */
420    RxTime deadline ;
421 
422    /*
423     * linked list maintainance elements for waiters.
424     */
425    struct _Client *older, *newer ;
426 } Client;
427 
428 /*
429  * clients we work on.
430  * Format, p=prev, n=next:
431  *           clients
432  *             ||
433  *  NULL<--p-client-n--->client-n--->client-n-->NULL
434  *               ^       v   ^       v
435  *               |       |   |       |
436  *               +---p---+   +---p---+
437  */
438 Client *clients;
439 
440 int running = 1;
441 int allclean = 0;
442 time_t base_secs; /* the time the process started */
443 
444 static long portno = 0;
445 static int world = 0 ;
446 static int isdaemon = 0;
447 static int tosuicide = 0;
448 
449 void empty_queue( RxQueue *q ) ;
450 
451 #if !defined(HAVE_STRERROR)
452 /*
453  * Sigh! This must probably be done this way, although it's incredibly
454  * backwards. Some versions of gcc comes with a complete set of ANSI C
455  * include files, which contains the definition of strerror(). However,
456  * that function does not exist in the default libraries of SunOS.
457  * To circumvent that problem, strerror() is #define'd to get_sys_errlist()
458  * in config.h, and here follows the definition of that function.
459  * Originally, strerror() was #defined to sys_errlist[x], but that does
460  * not work if string.h contains a declaration of the (non-existing)
461  * function strerror().
462  *
463  * So, this is a mismatch between the include files and the library, and
464  * it should not create problems for Regina. However, the _user_ will not
465  * encounter any problems until he compiles Regina, so we'll have to
466  * clean up after a buggy installation of the C compiler!
467  */
get_sys_errlist(int num)468 const char *get_sys_errlist( int num )
469 {
470    extern char *sys_errlist[] ;
471    return sys_errlist[num] ;
472 }
473 #endif
474 
475 #ifdef WIN32
Win32ErrorString(int code)476 const volatile char *Win32ErrorString(int code)
477 {
478    static char buffer[512];
479    size_t len;
480    const CHAR *array[10];
481    static HINSTANCE tcpip = NULL;
482    DWORD rc;
483 
484    for (rc = 0;rc < sizeof(array) / sizeof(array[0]);rc++)
485       array[rc] = "?";
486 
487    sprintf(buffer,"code %d: ",code);
488    len = strlen(buffer);
489 
490    if (tcpip == NULL)
491       tcpip = GetModuleHandle("wsock32");
492    rc = FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM |
493                       FORMAT_MESSAGE_FROM_HMODULE |
494                       FORMAT_MESSAGE_ARGUMENT_ARRAY |
495                       FORMAT_MESSAGE_MAX_WIDTH_MASK,
496                       tcpip,                       /* lpSource               */
497                       code,
498                       GetUserDefaultLangID(),
499                       buffer + len,
500                       sizeof(buffer) - len - 1,    /* w/o term. 0            */
501                       (va_list *) &array);
502    if ((rc == 0) && (len >= 2))
503       buffer[len - 2] = '\0';                      /* cut off ": " at the end*/
504    return(buffer);
505 }
506 #endif
507 
508 /*
509  * get_now returns the current time.
510  */
get_now(void)511 RxTime get_now( void )
512 {
513    RxTime retval ;
514 #if defined(HAVE_GETTIMEOFDAY)
515    struct timeval times ;
516 
517    gettimeofday(&times, NULL) ;
518    retval.seconds = times.tv_sec ;
519    retval.milli = times.tv_usec / 1000 ;
520 
521 #elif defined(MAXLONGLONG)
522    static enum { T_first, T_OK, T_illegal } state = T_first ;
523    static LARGE_INTEGER freq;
524    LARGE_INTEGER curr;
525 
526    retval.seconds = 0; /* Keep compiler happy */
527    if ( state == T_first )
528    {
529       if ( !QueryPerformanceFrequency( &freq ) )
530          state = T_illegal ;
531       else
532          state = T_OK ;
533    }
534    if ( state == T_OK )
535    {
536       if ( !QueryPerformanceCounter( &curr ) )
537          state = T_illegal ;
538       else
539       {
540          ULONGLONG h ;
541          /*
542           * if we don't native support for the 64 bit arithmetic, use
543           * doubles. Everything else is a pain.
544           * I hope we never get a compiler for Windows which can't compile
545           * this directly.
546           */
547          retval.seconds = (time_t) ( curr.QuadPart / freq.QuadPart ) ;
548          h = curr.QuadPart % freq.QuadPart ;
549          h *= 1000 ;
550          retval.milli = (int) ( h / freq.QuadPart ) ;
551       }
552    }
553    if ( state == T_illegal )
554    {
555       /* Windows systems have ftime in the C library */
556       struct timeb timebuffer;
557 
558       ftime(&timebuffer);
559       retval.seconds = timebuffer.time ;
560       retval.milli = timebuffer.millitm ;
561    }
562 
563 #elif defined(HAVE_FTIME)
564    struct timeb timebuffer;
565 
566    ftime(&timebuffer);
567    retval.seconds = timebuffer.time ;
568    retval.milli = timebuffer.millitm ;
569 
570 #else
571    clock_t c ;
572 
573    if ( ( c = clock() ) == (clock_t) -1 )
574    {
575       /*
576        * clock() values are not adjusted to 1.1.1970 and CLOCKS_PER_SEC
577        * may be a float or double
578        */
579       retval.seconds = (time_t) (c / CLOCKS_PER_SEC) ;
580       retval.milli = (int) ( ( c * 1000.0 ) / CLOCKS_PER_SEC) % 1000 ;
581    }
582    else
583    {
584       retval.seconds = time( NULL ) ;
585       retval.milli = 0 ;
586    }
587 #endif
588    return retval ;
589 }
590 
591 /*
592  * time_add increments an amount of milliseconds to time. the amount is
593  * usually a Client->queue_timeout.
594  */
time_add(RxTime * t,long incr)595 static void time_add( RxTime *t, long incr )
596 {
597    time_t s = (time_t) ( incr / 1000 ) ;
598    int m = ( incr % 1000 ) ;
599 
600    assert( t->milli != -1 ) ;
601    /* wrapping may occur, be careful */
602    s += t->seconds ;
603    if ( ( t->milli += m ) >= 1000 )
604    {
605       s++ ;
606       t->milli -= 1000 ;
607    }
608    if ( t->seconds > s )
609    {
610       /* wrapping! */
611       s = (time_t) -1 ; /* maximum 1 */
612       if ( t->seconds > s )
613       {
614          /* time_t is a signed value, most representations have
615           * MinX = -MaxX - 1
616           */
617          s++;
618          s = -s;
619       }
620       assert( t->seconds <= s ) ;
621    }
622    t->seconds = s ;
623 }
624 
625 /*
626  * time_diff returns t1 - t2 in milliseconds. -1 is returned on overflow;
627  * -1 is never returned else. -2 is used is the returned time will be
628  * negative.
629  */
time_diff(RxTime t1,RxTime t2)630 static int time_diff( RxTime t1, RxTime t2 )
631 {
632    int retval, h ;
633 
634    assert( t1.milli != -1 ) ;
635    assert( t2.milli != -1 ) ;
636    if ( t1.seconds < t2.seconds )
637       retval = -2 ;
638    else
639    {
640       retval = (int) ( ( t1.seconds - t2.seconds ) * 1000 ) ;
641 
642       if ( ( t1.milli < t2.milli ) && ( retval == 0 ) )
643          retval = -2 ;
644       else
645       {
646          retval += t1.milli - t2.milli ;
647 
648          /* final check for overflow */
649          h = (int) ( t1.seconds - t2.seconds ) ;
650          if ( ( ( retval / 1000 ) < h - 1 )
651            || ( ( retval / 1000 ) > h + 1 )
652            || ( retval < 0 ) )
653             return -1 ;
654       }
655    }
656    return retval ;
657 }
658 
659 /* compares 2 strengs and returns 0 if they are equal, 1 if not.
660  * The second one is converted to uppercase while comparing, the first
661  * one must be uppercase.
662  */
Str_ccmp(const streng * first,const streng * second)663 int Str_ccmp( const streng *first, const streng *second )
664 {
665    int tmp ;
666 
667    if ( PSTRENGLEN( first ) != PSTRENGLEN( second ) )
668       return 1 ;
669 
670    for (tmp=0; tmp < PSTRENGLEN( first ); tmp++ )
671       if ( first->value[tmp] != toupper( second->value[tmp] ) )
672          return 1 ;
673 
674    return 0 ;
675 }
676 
677 /* Str_cre_or_exit create a streng or exits after a message about
678  * missing memory.
679  */
Str_cre_or_exit(const char * str,unsigned length)680 streng *Str_cre_or_exit( const char *str, unsigned length )
681 {
682    streng *retval ;
683 
684    if ( ( retval = MAKESTRENG( length ) ) == NULL )
685    {
686       showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
687       exit( ERR_STORAGE_EXHAUSTED );
688    }
689 
690    memcpy( PSTRENGVAL( retval ), str, length ) ;
691    retval->len = length ;
692    return retval ;
693 }
694 
695 /*
696  * Str_buf create a streng from a buffer. It may return NULL on error.
697  */
Str_buf(const char * str,unsigned length)698 streng *Str_buf( const char *str, unsigned length )
699 {
700    streng *retval ;
701 
702    if ( ( retval = MAKESTRENG( length ) ) == NULL )
703    {
704       showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
705       return NULL;
706    }
707 
708    memcpy( PSTRENGVAL( retval ), str, length );
709    retval->len = length;
710    return retval;
711 }
712 
713 /*
714  * Str_dup duplicates a streng. It may return NULL on error.
715  */
Str_dup(const streng * str)716 streng *Str_dup( const streng *str )
717 {
718    return Str_buf( PSTRENGVAL( str ), PSTRENGLEN( str ) ) ;
719 }
720 
Str_cat(streng * first,const streng * second)721 streng *Str_cat( streng *first, const streng *second )
722 {
723    streng *ptr;
724    int tmp;
725 
726    tmp = Str_len( first ) + Str_len( second );
727 
728    if ( ( ptr = MAKESTRENG( tmp ) ) != NULL )
729    {
730       memcpy( ptr->value, first->value, Str_len( first ) );
731       memcpy( ptr->value + Str_len( first ), second->value, Str_len( second ) );
732       ptr->len = tmp;
733    }
734    return ptr;
735 }
736 
737 /*
738  * delete_a_queue deletes the queue's content and unlinks it from the list
739  * of existing queues if q isn't SESSION.
740  */
delete_a_queue(RxQueue * q)741 void delete_a_queue( RxQueue *q )
742 {
743    Client *c ;
744 
745    if ( q->name )
746    {
747       DEBUGDUMP(printf("Deleting queue <%.*s>\n", PSTRENGLEN( q->name), PSTRENGVAL( q->name)););
748    }
749    else
750    {
751       DEBUGDUMP(printf("Deleting natal queue\n"););
752    }
753    empty_queue( q ) ;
754 
755    if ( q == SESSION )
756       return ;
757 
758    if ( q->name != NULL )
759       DROPSTRENG( q->name );
760 
761    /*
762     * dequeue from the linked list and free space
763     */
764    if (q->prev != NULL)
765       q->prev->next = q->next ;
766    else
767       queues = q->next ;
768    if (q->next != NULL)
769       q->next->prev = q->prev ;
770    free( q ) ;
771 
772    /*
773     * Let all clients connected to this queue fall back to "SESSION" as
774     * the default queue.
775     * FIXME: It may be better to leave a queue to a non-isReal state.
776     *        This kind of work destroys the data integrity on SESSION.
777     *        We get many more connections working on SESSION than expected.
778     */
779    for( c = clients; c != NULL; c = c->next )
780    {
781       if ( c->default_queue == q )
782          c->default_queue = SESSION ;
783    }
784 }
785 
786 /*
787  * delete_a_client deletes the clients' content and unlinks it from the list
788  * of existing clients.
789  */
delete_a_client(Client * c)790 void delete_a_client( Client *c )
791 {
792    closesocket( c->socket ) ;
793 
794    /*
795     * dequeue from the linked list and free space
796     */
797    if (c->prev != NULL)
798       c->prev->next = c->next ;
799    else
800       clients = c->next ;
801    if (c->next != NULL)
802       c->next->prev = c->prev ;
803    free( c ) ;
804 }
805 
delete_all_queues(void)806 void delete_all_queues( void )
807 {
808    RxQueue *q, *h ;
809 
810    /*
811     * SESSION won't be deleted. Be careful to initiate one delete per
812     * queue.
813     */
814    for ( q = queues; q != NULL; )
815    {
816       h = q ;
817       q = q->next ;
818       delete_a_queue( h );
819    }
820 }
821 
get_unspecified_queue(void)822 char *get_unspecified_queue( void )
823 {
824    char *rxq = getenv( "RXQUEUE" );
825 
826    if ( rxq == NULL )
827       rxq = "SESSION";
828 
829    if ( strchr(rxq, '@' ) == NULL )
830    {
831       char *h ;
832 
833       if ( ( h = (char *)malloc( strlen( rxq ) + 2 ) ) != NULL )
834       {
835          strcpy( h, rxq ) ;
836          strcat( h, "@" ) ;
837          rxq = h ;
838       }
839    }
840    return rxq;
841 }
842 
suicide(void)843 int suicide( void )
844 {
845    int sock;
846    int myport;
847    streng *queue;
848    char *in_queue=get_unspecified_queue();
849    Queue q;
850 
851    if ( init_external_queue( NULL ) )
852       return 1;
853 
854 
855    queue = Str_cre_or_exit( in_queue, strlen( in_queue ) ) ;
856 
857    if ( parse_queue( NULL, queue, &q ) == 1 )
858    {
859       if ( portno == 0 )
860          myport = default_port_number();
861       else
862          myport = portno;
863       q.u.e.portno = myport;
864       sock = connect_to_rxstack( NULL, &q );
865       if ( sock < 0 )
866       {
867          /* error already shown by the function */
868          return(ERR_RXSTACK_CANT_CONNECT);
869       }
870       send_command_to_rxstack( NULL, sock, RXSTACK_KILL_STR, NULL, 0 );
871       read_result_from_rxstack( NULL, sock, RXSTACK_HEADER_SIZE );
872       closesocket(sock);
873    }
874    term_external_queue( ) ;
875    return 0;
876 }
877 
rxstack_cleanup(void)878 int rxstack_cleanup( void )
879 {
880    if ( !allclean )
881    {
882       DEBUGDUMP(printf("Cleaning up\n"););
883       /*
884        * Disconnect all clients
885        * Delete all clients
886        */
887       delete_all_queues();
888       DEBUGDUMP(printf("Finished Cleaning up\n"););
889       term_external_queue( ) ;
890       allclean = 1;
891    }
892    return 0;
893 }
894 
895 #ifdef BUILD_NT_SERVICE
report_service_start(void)896 BOOL report_service_start( void )
897 {
898    /*
899     * report the status to the service control manager.
900     */
901    return (ReportStatusToSCMgr(
902            SERVICE_RUNNING,       /* service state */
903            NO_ERROR,              /* exit code */
904            0));                   /* wait hint */
905 }
906 
report_service_pending_start(void)907 BOOL report_service_pending_start( void )
908 {
909    /*
910     * report the status to the service control manager.
911     */
912    return (ReportStatusToSCMgr(
913            SERVICE_START_PENDING, /* service state */
914            NO_ERROR,              /* exit code */
915            3000));                /* wait hint */
916 }
917 
nt_service_start(void)918 int nt_service_start( void )
919 {
920    /*
921     * code copied from sample NT Service code. The goto's are
922     * not mine!!
923     * report the status to the service control manager.
924     */
925    if ( !report_service_pending_start() )
926       goto cleanupper;
927 
928    /*
929     * create the event object. The control handler function signals
930     * this event when it receives the "stop" control code.
931     */
932    hServerStopEvent = CreateEvent(
933         NULL,    /* no security attributes */
934         TRUE,    /* manual reset event */
935         FALSE,   /* not-signalled */
936         NULL);   /* no name */
937 
938    if ( hServerStopEvent == NULL)
939       goto cleanupper;
940 
941    /*
942     * report the status to the service control manager.
943     */
944    if ( !report_service_pending_start() )
945       goto cleanupper;
946 
947     return 0;
948 cleanupper:
949     return 1;
950 }
951 
ServiceStop()952 VOID ServiceStop()
953 {
954    DEBUGDUMP(printf("In ServiceStop()\n"););
955    suicide();
956 /*
957    running = 0;
958 */
959 }
960 #endif
961 
rxstack_signal_handler(int sig)962 void rxstack_signal_handler( int sig )
963 {
964    running = 0;
965 }
966 
967 /* Creates a new client and appends it in front of the current clients.
968  * Don't forget to set a default_queue and the socket at once.
969  */
get_new_client()970 Client *get_new_client( )
971 {
972    Client *retval = (Client *)malloc( sizeof( Client ) ) ;
973 
974    if ( retval == NULL )
975       return NULL ;
976    memset( retval, 0, sizeof( Client ) ) ;
977    retval->socket = -1 ;
978    retval->deadline.milli = -1 ; /* deadline not used --> infinite timeout */
979 
980    retval->next = clients ;
981    if ( clients != NULL )
982       clients->prev = retval ;
983    clients = retval ;
984    return retval ;
985 }
986 
987 /*
988  * Find the named queue - case insensitive
989  * returns the queue or NULL if no queue with this name exists.
990  */
find_queue(const streng * queue_name)991 RxQueue *find_queue( const streng *queue_name )
992 {
993    RxQueue *q ;
994 
995    for ( q = queues; q != NULL; q = q->next )
996    {
997       /* This is inefficient, FIXME: Introduce a hash value */
998       if ( Str_ccmp( q->name, queue_name ) == 0 )
999          return q;
1000    }
1001    return NULL ;
1002 }
1003 
1004 /* Creates a new queue and appends it in front of the current queues.
1005  * Don't forget to set a name at once.
1006  */
get_new_queue(void)1007 RxQueue *get_new_queue( void )
1008 {
1009    RxQueue *retval = (RxQueue *)malloc( sizeof( RxQueue ) ) ;
1010 
1011    if ( retval == NULL )
1012       return NULL ;
1013    memset( retval, 0, sizeof( RxQueue ) ) ;
1014    retval->deadline = queue_deadline ;
1015 
1016    retval->next = queues ;
1017    if ( queues != NULL )
1018       queues->prev = retval ;
1019    queues = retval ;
1020    return retval ;
1021 }
1022 
rxstack_delete_queue(Client * client,streng * queue_name)1023 int rxstack_delete_queue( Client *client, streng *queue_name )
1024 {
1025    RxQueue *q ;
1026    int rc ;
1027 
1028    if ( ( q = find_queue( queue_name ) ) == NULL )
1029    {
1030       rc = 9;
1031    }
1032    else
1033    {
1034       if ( q == SESSION )
1035          rc = 5;
1036       else
1037       {
1038          if ( !q->isReal )
1039          {
1040             /*
1041              * If we found a false queue, return 9
1042              * but delete it.
1043              */
1044             delete_a_queue( q );
1045             rc = 9;
1046          }
1047          else
1048          {
1049             /*
1050              * Delete the contents of the queue
1051              * and mark it as gone.
1052              */
1053             delete_a_queue( q );
1054             rc = 0;
1055          }
1056       }
1057    }
1058    return rc ;
1059 }
1060 
rxstack_create_client(int socket)1061 int rxstack_create_client( int socket )
1062 {
1063    Client *c ;
1064 
1065    if ( ( c = get_new_client( ) ) == NULL )
1066    {
1067       closesocket( socket ) ;
1068       /* This may have been the connection telling us to go down ;-) */
1069       showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
1070       exit( ERR_STORAGE_EXHAUSTED );
1071    }
1072 
1073    c->socket = socket;
1074    c->default_queue = SESSION ;
1075    return 0;
1076 }
1077 
encode_ws_payload(u_char const * src,size_t srclength,char * target,size_t targsize,unsigned int opcode)1078 int encode_ws_payload(u_char const *src, size_t srclength, char *target, size_t targsize, unsigned int opcode)
1079 {
1080    unsigned long long b64_sz, payload_offset = 2, len = 0;
1081 
1082    if ((int)srclength <= 0)
1083    {
1084       return 0;
1085    }
1086 
1087    DEBUGDUMP(printf("Encode new frame\n"););
1088    b64_sz = srclength;
1089 
1090    target[0] = (char)((opcode & 0x0F) | 0x80);
1091 
1092    if (b64_sz <= 125)
1093    {
1094       target[1] = (char) b64_sz;
1095       payload_offset = 2;
1096    }
1097    else if ((b64_sz > 125) && (b64_sz < 65536))
1098    {
1099       target[1] = (char) 126;
1100       *(u_short*)&(target[2]) = htons(b64_sz);
1101       payload_offset = 4;
1102    }
1103    else
1104    {
1105       showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_INTERNAL, "rxstack websocket interface error: %s", "Sending frames larger than 65535 bytes not supported" );
1106       return -1;
1107    }
1108 
1109    memcpy( target+payload_offset, src, srclength );
1110    len = srclength;
1111    return len + payload_offset;
1112 }
1113 
1114 /* rxstack_send_return writes back to the client the action return code
1115  * and optionally a string of length len.
1116  * The functions returns 0 on success, -1 on error
1117  */
rxstack_send_return(Client * client,char * action,char * str,int len)1118 int rxstack_send_return( Client *client, char *action, char *str, int len )
1119 {
1120    streng *qlen, *header;
1121    int rc, retval = 0, rcode ;
1122    int sock = client->socket;
1123    char ws_response[4096];
1124 
1125    if ( client->isWebsocket )
1126    {
1127       DEBUGDUMP(printf("Sending to websocket client %d Result: %c <%.*s>\n", sock, *action, len, (str) ? str : ""););
1128       rcode = encode_ws_payload( (const unsigned char *)str, len, ws_response, sizeof(ws_response), 1);
1129       rc = send( client->socket, ws_response, rcode, 0 );
1130       if ( rc != rcode )
1131       {
1132          DEBUGDUMP(printf("Send failed: rc> %d != len> %d errno = %d\n", rc, len, os_errno ););
1133          retval = -1;
1134       }
1135    }
1136    else
1137    {
1138       DEBUGDUMP(printf("Sending to %d Result: %c <%.*s>\n", sock, *action, len, (str) ? str : ""););
1139       qlen = REXX_D2X( len );
1140       if ( qlen )
1141       {
1142          header = REXX_RIGHT( qlen, RXSTACK_HEADER_SIZE, '0');
1143          DROPSTRENG( qlen );
1144          if ( header )
1145          {
1146             header->value[0] = action[0];
1147             DEBUGDUMP(printf("Sending Header: %.*s\n", PSTRENGLEN(header), PSTRENGVAL(header)););
1148             rc = send( sock, PSTRENGVAL(header), PSTRENGLEN(header), 0 );
1149             if ( rc != PSTRENGLEN(header) )
1150             {
1151                DEBUGDUMP(printf("Send failed: rc> %d != PSTRENGLEN(header)> %d errno = %d\n", rc, PSTRENGLEN(header),os_errno ););
1152                retval = -1 ;
1153             }
1154             else if ( str )
1155             {
1156                rc = send( sock, str, len, 0 );
1157                if ( rc != len )
1158                {
1159                   DEBUGDUMP(printf("Send failed: errno = %d\n", os_errno ););
1160                   retval = -1 ;
1161                }
1162             }
1163             DROPSTRENG( header );
1164          }
1165       }
1166    }
1167    return retval ;
1168 }
1169 
rxstack_delete_client(Client * client)1170 int rxstack_delete_client( Client *client )
1171 {
1172    DEBUGDUMP(printf("Deleting client: Socket: %d\n", client->socket ););
1173    delete_a_client( client ) ;
1174    return 0;
1175 }
1176 
1177 /* rxstack_set_default_queue sets the client's (new) queue name.
1178  * A false queue is created if the queue isn't found.
1179  * The new queue is returned or NULL if we are out of memory.
1180  */
rxstack_set_default_queue(Client * client,streng * data)1181 RxQueue *rxstack_set_default_queue( Client *client, streng *data )
1182 {
1183    RxQueue *q, *prev;
1184    streng *newq;
1185 
1186    prev = client->default_queue;
1187    if ( ( q = find_queue( data ) ) == NULL )
1188    {
1189       /*
1190        * We didn't find a real or a false queue, so create
1191        * a false queue
1192        */
1193       q = get_new_queue( );
1194       if ( q != NULL )
1195       {
1196          newq = Str_dup( data );
1197          if ( newq == NULL )
1198          {
1199             delete_a_queue( q );
1200             return NULL;
1201          }
1202          q->name = REXX_UPPER( newq ) ;
1203          DEBUGDUMP(printf("Creating the false queue <%.*s>", PSTRENGLEN( q->name ), PSTRENGVAL( q->name ) ););
1204          /* q->isReal set to 0 by get_new_queue --> false queue */
1205          client->default_queue = q;
1206       }
1207    }
1208    else
1209    {
1210       client->default_queue = q;
1211    }
1212 
1213    if ( q == NULL )
1214    {
1215       DEBUGDUMP(printf("No FREE MEMORY when setting default queue for client: <%.*s>\n", PSTRENGLEN(data), PSTRENGVAL(data) ););
1216    }
1217    else
1218    {
1219       DEBUGDUMP(printf("Setting default queue for client: <%.*s> Prev: %p <%.*s>\n", PSTRENGLEN(q->name), PSTRENGVAL(q->name), prev, PSTRENGLEN(prev->name), PSTRENGVAL(prev->name) ););
1220       /* SET or CREATE resets a timeout to 0; effectively turns off any timeout */
1221       client->queue_timeout = 0 ;
1222    }
1223    return q;
1224 }
1225 
rxstack_timeout_queue(Client * client,const streng * data)1226 int rxstack_timeout_queue( Client *client, const streng *data )
1227 {
1228    int val,error;
1229 
1230    /*
1231     * Convert the timeout
1232     * If the supplied timeout is 0 (infinite wait), set the client->queue_timeout
1233     * to -1.
1234     */
1235    val = REXX_X2D( data, &error );
1236    if ( error )
1237       return 2;
1238    if ( val == 0 )
1239       val = -1;
1240    client->queue_timeout = val;
1241    DEBUGDUMP(printf("Timeout on queue: %ld\n", client->queue_timeout ););
1242 
1243    return 0;
1244 }
1245 
1246 /* unique_name creates a unique name for a queue.
1247  * The function may exit after a message about missing memory.
1248  */
unique_name(void)1249 static streng *unique_name( void )
1250 {
1251    static int first = 1 ;
1252    static char buf[ 80 ] ;
1253    static char *ptr ;
1254    static unsigned runner = 0;
1255 
1256    if ( first )
1257    {
1258       first = 0 ;
1259       sprintf( buf, "S%d%ld", (int) getpid(), (long) time( NULL ) ) ;
1260       ptr = buf + strlen( buf ) ;
1261    }
1262    sprintf( ptr, "%u", runner++ ) ;
1263    return Str_buf( buf, strlen( buf ) ) ;
1264 }
1265 
rxstack_create_queue(Client * client,streng * data,streng ** result)1266 int rxstack_create_queue( Client *client, streng *data, streng **result )
1267 {
1268    RxQueue *q ;
1269    streng *new_queue = NULL;
1270    int rc = 0;
1271 
1272    if ( data )
1273    {
1274       DEBUGDUMP(printf("Creating new user-specified queue: <%.*s>\n", PSTRENGLEN(data), PSTRENGVAL(data) ););
1275       if ( ( q = find_queue( data ) ) == NULL )
1276       {
1277          /*
1278           * No queue of that name, so use a duplicate of it.
1279           */
1280          DEBUGDUMP(printf("Couldn't find <%.*s>; so creating it\n", PSTRENGLEN(data), PSTRENGVAL(data) ););
1281          new_queue = data;
1282       }
1283       else
1284       {
1285          /*
1286           * If the queue we found is a false queue, we can still
1287           * use the supplied name and the slot
1288           */
1289          DROPSTRENG( data );
1290          if ( !q->isReal )
1291          {
1292             DEBUGDUMP(printf("Found false queue\n" ););
1293             q->isReal = 1;
1294             /* SET or CREATE resets a timeout to 0 */
1295             client->queue_timeout = 0;
1296             *result = q->name;
1297             return 0; /* Pass back the name. May be different due to
1298                        * different locales or codepages, but it IS the selected
1299                        * name.
1300                        */
1301          }
1302          new_queue = unique_name( );
1303          if ( new_queue == NULL )
1304             return 3;
1305          DEBUGDUMP(printf("Having to create unique queue <%.*s>\n", PSTRENGLEN( new_queue ), PSTRENGVAL( new_queue ) ););
1306          rc = 1;
1307       }
1308    }
1309    else
1310    {
1311       DEBUGDUMP(printf("Creating system generated queue.\n"););
1312       new_queue = unique_name( );
1313       if ( new_queue == NULL )
1314          return 3;
1315    }
1316 
1317    if ( ( q = get_new_queue( ) ) == NULL )
1318    {
1319       DROPSTRENG( new_queue );
1320       showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
1321       return 3;
1322    }
1323 
1324    /*
1325     * Uppercase the queue name
1326     */
1327    q->name = REXX_UPPER( new_queue );
1328    q->isReal = 1;
1329    /* SET or CREATE resets a timeout to 0 */
1330    client->queue_timeout = 0 ;
1331    *result = q->name;
1332    return rc; /* Both code 0 and code 1 return the name to the caller.
1333                * May be different due to codepages, etc.
1334                */
1335 }
1336 
1337 /*
1338  * Pushes 'line' onto the REXX stack in LIFO manner.
1339  *    point to the new line. The line is put on top of the current
1340  *    buffer.
1341  */
rxstack_stack_lifo(RxQueue * current_queue,streng * line)1342 StackLine *rxstack_stack_lifo( RxQueue *current_queue, streng *line )
1343 {
1344    StackLine *newbox ;
1345 
1346    if ( ( newbox = (StackLine *) malloc( sizeof(StackLine) ) ) != NULL )
1347    {
1348       newbox->contents = line ;
1349       LIFO_LINE( &current_queue->buf, newbox ) ;
1350    }
1351 
1352    return newbox;
1353 }
1354 
1355 
1356 /*
1357  * Pushes 'line' onto the REXX stack in FIFO manner.
1358  *    point to the new line. The line is put on top of the current
1359  *    buffer.
1360  */
rxstack_stack_fifo(RxQueue * current_queue,streng * line)1361 StackLine *rxstack_stack_fifo( RxQueue *current_queue, streng *line )
1362 {
1363    StackLine *newbox ;
1364 
1365    if ( ( newbox = (StackLine *) malloc( sizeof(StackLine) ) ) != NULL )
1366    {
1367       newbox->contents = line ;
1368       FIFO_LINE( &current_queue->buf, newbox ) ;
1369    }
1370 
1371    return newbox;
1372 }
1373 
1374 /*
1375  * dequeue_waiter dequeues the Client c from the waiter's list of the queue q.
1376  * The client must be linked in the waiter's list of the queue.
1377  * The client's variables for this purpose are cleaned, too.
1378  */
dequeue_waiter(RxQueue * q,Client * c)1379 static void dequeue_waiter( RxQueue *q, Client *c )
1380 {
1381 #if defined(DEBUG)
1382    Client *run ;
1383    for ( run = q->oldest; run != NULL; run = run->newer )
1384       if ( run == c )
1385          break ;
1386    assert( run != NULL ) ; /* This is the test if c is a waiter of q */
1387 #endif
1388 
1389    if ( c->older != NULL )
1390    {
1391       c->older->newer = c->newer ;
1392    }
1393    else
1394    {
1395       q->oldest = c->newer ;
1396       if ( q->oldest != NULL )
1397          q->oldest->older = NULL ;
1398    }
1399 
1400    if ( c->newer != NULL )
1401    {
1402       c->newer->older = c->older ;
1403    }
1404    else
1405    {
1406       q->newest = c->older ;
1407       if ( q->newest != NULL )
1408          q->newest->newer = NULL ;
1409    }
1410    assert( ( ( q->newest != NULL ) && ( q->oldest != NULL ) ) ||
1411            ( ( q->newest == NULL ) && ( q->oldest == NULL ) ) ) ;
1412    c->older = c->newer = NULL ;
1413    c->deadline.milli = -1 ;
1414 }
1415 
1416 /* redir sends data as the answer of a pull operation back to
1417  * the oldest waiting client and dequeues this client.
1418  * data is dropped after the operation.
1419  */
redir(RxQueue * q,streng * data)1420 void redir( RxQueue *q, streng *data )
1421 {
1422    Client *c ;
1423 
1424    c = q->oldest ;
1425    DEBUGDUMP(printf("Redirecting <%.*s> to waiting client %d\n", PSTRENGLEN(data), (PSTRENGVAL(data)) ? PSTRENGVAL(data) : "", c->socket ););
1426 
1427    dequeue_waiter( q, c ) ;
1428 
1429    rxstack_send_return( c, "0", PSTRENGVAL( data ), PSTRENGLEN( data ) ) ;
1430    DROPSTRENG( data ) ;
1431 }
1432 
1433 /* bad_news_for_waiter informs a waiter about an error while waiting for
1434  * data for a pull request. The client is dequeued from its queue.
1435  */
bad_news_for_waiter(RxQueue * q,Client * c)1436 void bad_news_for_waiter( RxQueue *q, Client *c )
1437 {
1438    dequeue_waiter( q, c ) ;
1439    DEBUGDUMP(printf("Sending negative response to waiting client %d\n", c->socket ););
1440 
1441    rxstack_send_return( c, "4", NULL, 0 ) ;
1442 }
1443 
rxstack_queue_data(Client * client,streng * data,char order)1444 int rxstack_queue_data( Client *client, streng *data, char order )
1445 {
1446    int rc = 0;
1447 
1448    if ( client->default_queue->oldest != NULL )
1449    {
1450       redir( client->default_queue, data ) ;
1451       return 0 ;
1452    }
1453    DEBUGDUMP(printf("Queueing: <%.*s> Order: %c\n", PSTRENGLEN(data), (PSTRENGVAL(data)) ? PSTRENGVAL(data) : "", order ););
1454    if ( order == RXSTACK_QUEUE_FIFO )
1455    {
1456       if ( rxstack_stack_fifo( client->default_queue, data ) == NULL )
1457       {
1458          DROPSTRENG( data );
1459          rc = 3;
1460       }
1461    }
1462    else
1463    {
1464       if ( rxstack_stack_lifo( client->default_queue, data ) == NULL )
1465       {
1466          DROPSTRENG( data );
1467          rc = 3;
1468       }
1469    }
1470    return rc;
1471 }
1472 
1473 /* Clears the content of the queue. All waiters are informed by code
1474  * 2 of a cleaned queue and removed the the waiter's list.
1475  */
empty_queue(RxQueue * q)1476 void empty_queue( RxQueue *q )
1477 {
1478    StackLine *tmp, *line;
1479    streng *contents;
1480    Buffer *b ;
1481 
1482    b = &q->buf ;
1483    for ( line = b->top; line != NULL; )
1484    {
1485       contents = line->contents;
1486       DROPSTRENG( contents );
1487       tmp = line;
1488       line = line->lower;
1489       free( tmp );
1490    }
1491    memset( &q->buf, 0, sizeof( Buffer ) ) ;
1492 
1493    /* acknowledge waiters for data not ready and dequeue them */
1494 
1495    while ( q->oldest != NULL ) {
1496       bad_news_for_waiter( q, q->oldest ) ;
1497    }
1498 }
1499 
1500 /* Clears the content of the queue named data. In opposite to the previous
1501  * version, the client's current queue isn't set to the named queue any
1502  * longer. The is the default behaviour in Regina.
1503  * returns 0 on success, 2 if the queue doesn't exist.
1504  */
rxstack_empty_queue(Client * client,streng * data)1505 int rxstack_empty_queue( Client *client, streng *data )
1506 {
1507    RxQueue *q ;
1508 
1509    DEBUGDUMP(printf("Emptying queue: <%.*s>\n", PSTRENGLEN(data), (PSTRENGVAL(data)) ? PSTRENGVAL(data) : "" ););
1510    if ( ( q =  find_queue( data ) ) == NULL )
1511       return 2;
1512 
1513    empty_queue( q ) ;
1514 
1515    return 0;
1516 }
1517 
rxstack_number_in_queue(Client * client)1518 int rxstack_number_in_queue( Client *client )
1519 {
1520    int lines = (int) client->default_queue->buf.elements;
1521 
1522    DEBUGDUMP(printf("Querying number in queue: %d\n", lines ););
1523    return lines ;
1524 }
1525 
1526 /*
1527  * Pulls a line off the queue and dequeues it.
1528  *
1529  * If nowait isn't set and no data is available and the client's queue_timeout
1530  * is set, the client is set to the newest end of the client's default_queue.
1531  *
1532  * It will be awaken by either ariving data on this pipe, or deleting/emptying
1533  * the pipe, or by a timeout.
1534  * Returns:
1535  * 0 if line OK
1536  * 1 if queue empty
1537  * 3 if waiting
1538  */
rxstack_pull_line_off_queue(Client * client,streng ** result,int nowait)1539 int rxstack_pull_line_off_queue( Client *client, streng **result, int nowait )
1540 {
1541    int rc;
1542    Buffer *b;
1543    StackLine *line;
1544    RxQueue *q;
1545 
1546    b = &client->default_queue->buf;
1547    POP_LINE( b, line );
1548    if ( line != NULL )
1549    {
1550       *result = line->contents;
1551       free( line );
1552       rc = 0;
1553    }
1554    else
1555    {
1556       *result = NULL;
1557       if ( nowait )
1558       {
1559          rc = RXSTACK_EMPTY; /* queue empty */
1560          DEBUGDUMP(printf("nowait set to 1\n" ););
1561       }
1562       else
1563       {
1564          if ( client->queue_timeout == 0 )
1565          {
1566             rc = RXSTACK_EMPTY; /* queue empty */
1567             DEBUGDUMP(printf("client timeout = 0\n" ););
1568          }
1569          else
1570          {
1571             assert( client->deadline.milli == -1 );
1572             assert ( client->newer == NULL );
1573             if ( client->queue_timeout != -1 )
1574             {
1575                now = get_now( );
1576                client->deadline = now;
1577                time_add( &client->deadline, client->queue_timeout );
1578             }
1579             q = client->default_queue;
1580             client->newer = NULL;
1581             client->older = q->newest;
1582             if ( client->older != NULL )
1583                client->older->newer = client;
1584             q->newest = client;
1585             if ( q->oldest == NULL )
1586                q->oldest = client;
1587             rc = RXSTACK_WAITING; /* waiting */
1588             DEBUGDUMP(printf("waiting until %ld.%d\n", (long) client->deadline.seconds,client->deadline.milli ););
1589          }
1590       }
1591    }
1592    DEBUGDUMP(printf("Pulling line off queue; rc %d\n", rc ););
1593    return rc;
1594 }
1595 
1596 /*
1597  * Gets all queues for all clients
1598  */
rxstack_get_queues(Client * client,streng ** result)1599 int rxstack_get_queues( Client *client, streng **result )
1600 {
1601    int rc,i=0;
1602    int len=0;
1603    RxQueue *q;
1604    char eol[3];
1605    streng *seol,*tmp1=NULL,*tmp2=NULL;
1606 
1607    DEBUGDUMP(printf("Getting queues; rc %d\n", rc ););
1608    /* get the length of all queues and the nul terminator and EOL delimiter */
1609    for ( q = queues; q != NULL; )
1610    {
1611       len += PSTRENGLEN( q->name)+1;
1612 #if !defined(UNIX) && !defined(MAC)
1613       len++;
1614 #endif
1615       DEBUGDUMP(printf("Queue name <%.*s>\n", PSTRENGLEN( q->name), PSTRENGVAL( q->name)););
1616       q = q->next ;
1617    }
1618    /* determine eol for appending */
1619 #if !defined(UNIX)
1620    eol[i++] = REGINA_CR;
1621 #endif
1622 #if !defined(MAC)
1623    eol[i++] = REGINA_EOL;
1624 #endif
1625    eol[i] = '\0';
1626    seol = Str_cre_or_exit( eol, i );
1627    for ( q = queues; q != NULL; )
1628    {
1629       if ( tmp1 )
1630          tmp1 = Str_cat( tmp2, q->name );
1631       else
1632          tmp1 = Str_dup( q->name ) ;
1633       /* free tmp2 ? if null */
1634       DROPSTRENG( tmp2 );
1635       tmp2 = Str_cat( tmp1, seol );
1636       /* free tmp1 ? */
1637       DROPSTRENG( tmp1 );
1638       q = q->next ;
1639    }
1640    *result = tmp2;
1641    rc = 0;
1642 
1643    return rc;
1644 }
1645 
eec_base64_encode(unsigned char * rawstr,int strlen,unsigned char ** encstr,int * encstrlen)1646 static int eec_base64_encode( unsigned char *rawstr, int strlen, unsigned char **encstr, int *encstrlen )
1647 {
1648    unsigned char in[3], out[4];
1649    int len,i,j,k,encoded_length;
1650    unsigned char *encoded_code;
1651 
1652    /*
1653     * Allocate 4/3 times the rawstring length
1654     */
1655    encoded_length = 3 + ((strlen * 4) / 3);
1656    encoded_code = (unsigned char *)malloc( encoded_length + sizeof(long) );
1657    if ( encoded_code == NULL )
1658    {
1659       return 1;
1660    }
1661 
1662    for ( i = 0, j = 0; i < strlen; i +=3 )
1663    {
1664       len = 1;
1665       in[0] = rawstr[i];
1666       if (i+1 < strlen)
1667       {
1668          in[1] = rawstr[i+1];
1669          len++;
1670       }
1671       else
1672          in[1] = 0;
1673       if (i+2 < strlen)
1674       {
1675          in[2] = rawstr[i+2];
1676          len++;
1677       }
1678       else
1679          in[2] = 0;
1680       /* encode */
1681       out[0] = cb64[ in[0] >> 2 ];
1682       out[1] = cb64[ ((in[0] & 0x03) << 4) | ((in[1] & 0xf0) >> 4) ];
1683       out[2] = (unsigned char) (len > 1 ? cb64[ ((in[1] & 0x0f) << 2) | ((in[2] & 0xc0) >> 6) ] : '=');
1684       out[3] = (unsigned char) (len > 2 ? cb64[ in[2] & 0x3f ] : '=');
1685       for ( k = 0; k < 4; k++ )
1686       {
1687          encoded_code[j++] = out[k];
1688       }
1689    }
1690    *encstrlen = j;
1691    *encstr = encoded_code;
1692    return 0;
1693 }
1694 
1695 #if REQUIRE_BASE64_DECODE
eec_base64_decode(unsigned char * encstr,long encstrlen,unsigned char ** rawstr,long * rawstrlen)1696 static int eec_base64_decode( unsigned char *encstr, long encstrlen, unsigned char **rawstr, long *rawstrlen )
1697 {
1698    unsigned char *raw_code,v;
1699    unsigned char *encoded_code;
1700    unsigned char in[4], out[3];
1701    int i,j,k,len;
1702 
1703    encoded_code = encstr;
1704    if ( (encstrlen % 4 ) != 0 )
1705    {
1706       showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_INTERNAL, "rxstack websocket interface error: %s", "Length of encoded value must be a multiple of 4." );
1707       return 1;
1708    }
1709    /*
1710     * Decode encoded_code to produce raw
1711     * Result will always be smaller so allocate memory for encoded length
1712     */
1713    raw_code = (unsigned char *)malloc( encstrlen );
1714    if ( raw_code == NULL )
1715    {
1716       showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
1717       return 1;
1718    }
1719    /*
1720     * Now decode encoded_code
1721     */
1722    for ( i = 0, j = 0; i < encstrlen;  )
1723    {
1724       for ( k = 0, len = 0; k < 4; )
1725       {
1726          v = (unsigned char) encoded_code[i++];
1727          v = (unsigned char) ((v < 43 || v > 122) ? 0 : cd64[ v - 43 ]);
1728          if( v )
1729          {
1730             v = (unsigned char) ((v == '$') ? 0 : v - 61);
1731             if ( v )
1732             {
1733                in[k++] = (unsigned char) (v - 1);
1734                len++;
1735             }
1736             else
1737                break;
1738          }
1739       }
1740       /* decode */
1741       out[ 0 ] = (unsigned char ) (in[0] << 2 | in[1] >> 4);
1742       out[ 1 ] = (unsigned char ) (in[1] << 4 | in[2] >> 2);
1743       out[ 2 ] = (unsigned char ) (((in[2] << 6) & 0xc0) | in[3]);
1744       for ( k = 0; k < len - 1; k++ )
1745       {
1746          raw_code[j++] = out[k];
1747       }
1748    }
1749    raw_code[j] = '\0';
1750    *rawstrlen = j;
1751    *rawstr = raw_code;
1752    return 0;
1753 }
1754 #endif
1755 
memcmpi(char * buf1,char * buf2,int len)1756 int memcmpi( char *buf1, char *buf2, int len )
1757 {
1758    register short i=0;
1759    char c1=0,c2=0;
1760    for(i=0;i<len;i++)
1761    {
1762       if (isupper(*buf1))
1763          c1 = tolower(*buf1);
1764       else
1765          c1 = *buf1;
1766       if (isupper(*buf2))
1767          c2 = tolower(*buf2);
1768       else
1769          c2 = *buf2;
1770       if (c1 != c2)
1771          return(c1-c2);
1772       ++buf1;
1773       ++buf2;
1774    }
1775  return(0);
1776 }
1777 
countstr(char * str,char ch)1778 int countstr( char *str, char ch )
1779 {
1780    int i,count=0;
1781    int len = strlen( str );
1782 
1783    for ( i = 0; i < len; i++ )
1784    {
1785       if ( str[i] == ch )
1786          count++;
1787    }
1788    return count;
1789 }
1790 
determine_rxstack_command(char * str)1791 char determine_rxstack_command( char *str )
1792 {
1793    if ( memcmpi( "/fifo:", str, ACTION_FIFO_LEN ) == 0
1794    &&   countstr( str, ':' ) >= 2 )
1795       return RXSTACK_QUEUE_FIFO;
1796    else if ( memcmpi( "/lifo:", str, ACTION_FIFO_LEN ) == 0
1797    &&   countstr( str, ':' ) >= 2 )
1798       return RXSTACK_QUEUE_LIFO;
1799    else if ( memcmpi( "/queued:", str, ACTION_QUEUED_LEN ) == 0
1800    &&   countstr( str, ':' ) == 1 )
1801       return RXSTACK_NUMBER_IN_QUEUE;
1802    else if ( memcmpi( "/clear:", str, ACTION_CLEAR_LEN ) == 0
1803    &&   countstr( str, ':' ) == 1 )
1804       return RXSTACK_EMPTY_QUEUE;
1805    else if ( memcmpi( "/pull:", str, ACTION_PULL_LEN ) == 0
1806    &&   countstr( str, ':' ) == 1 )
1807       return RXSTACK_PULL;
1808    else if ( memcmpi( "/create:", str, ACTION_CREATE_LEN ) == 0
1809    &&   countstr( str, ':' ) == 1 )
1810       return RXSTACK_CREATE_QUEUE;
1811    else if ( memcmpi( "/delete:", str, ACTION_DELETE_LEN ) == 0
1812    &&   countstr( str, ':' ) == 1 )
1813       return RXSTACK_DELETE_QUEUE;
1814   /* wrong action or incorrect number of colons - error */
1815   return 0;
1816 }
1817 
1818 #define BAD_REQUEST "HTTP/1.1 400 Bad Request\r\n\r\n"
bad_request_from_websocket(Client * client)1819 int bad_request_from_websocket( Client *client )
1820 {
1821    int rc;
1822    int len = strlen( BAD_REQUEST );
1823 
1824    rc = send( client->socket, BAD_REQUEST, len, 0 );
1825    if ( rc != len )
1826    {
1827       DEBUGDUMP(printf("Send failed: rc> %d != len> %d errno = %d\n", rc, len, os_errno ););
1828       return 0;
1829    }
1830    return 1;
1831 }
1832 
rxstack_process_websockets_headers(Client * client)1833 int rxstack_process_websockets_headers( Client *client )
1834 {
1835    int rc = 1; /* by default successful return */
1836    char ws_headers[4096];
1837    char key[1024];
1838    char protocol[1024];
1839    char response[4096];
1840    char *start, *end;
1841    SHA1_HASH hash[SHA1_HASH_SIZE];
1842    Sha1Context context;
1843    unsigned char *strptr;
1844    int strlength;
1845    int len;
1846 
1847    memset( ws_headers, 0, sizeof(ws_headers) );
1848    rc = recv( client->socket, ws_headers, 4096, 0 );
1849    if ( rc < 0 )
1850    {
1851       if ( os_errno != ECONNRESET )
1852       {
1853          showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_READING_SOCKET, ERR_RXSTACK_READING_SOCKET_TMPL, errno_str( os_errno ) );
1854       }
1855       /*
1856        * Assume client has been lost
1857        */
1858       rxstack_delete_client( client );
1859       return 0 ;
1860    }
1861    if ( rc == 0 )
1862    {
1863       DEBUGDUMP(printf("read empty WS header\n"););
1864       /*
1865        * Assume client has been lost
1866        */
1867       rxstack_delete_client( client );
1868       return 0 ;
1869    }
1870    DEBUGDUMP(printf("header: [%s]\nLength: %d\n",ws_headers,rc););
1871    /*
1872     * Parse the headers
1873     */
1874    start = ws_headers+4;
1875    /*
1876     * must specify "root" as location: ws://localhost:8888 or ws://localhost:8888/ only.
1877     * no path allowed after optional port
1878     */
1879    end = strstr( start, "/ HTTP/1.1" );
1880    if ( !end )
1881    {
1882       bad_request_from_websocket( client );
1883       return 0;
1884    }
1885    /* validate and get protocol header */
1886    start = strstr( ws_headers, "\r\nSec-WebSocket-Protocol: ");
1887    if (!start)
1888    {
1889       DEBUGDUMP(printf("No header: \"Sec-WebSocket-Protocol:\"\n"););
1890       bad_request_from_websocket( client );
1891       return 0;
1892    }
1893    /* protocol must be "rxstack" */
1894    start += 26;
1895    end = strstr( start, "\r\n");
1896    strncpy( protocol, start, end-start );
1897    protocol[end-start] = '\0';
1898    if ( strcmp( protocol, "rxstack" ) != 0 )
1899    {
1900       DEBUGDUMP(printf("Invalid Protocol: %s; should be \"rxstack\"\n",protocol););
1901       bad_request_from_websocket( client );
1902       return 0;
1903    }
1904    /* validate and get key header */
1905    start = strstr(ws_headers, "\r\nSec-WebSocket-Key: ");
1906    if (!start)
1907    {
1908       bad_request_from_websocket( client );
1909       return 0;
1910    }
1911    start += 21;
1912    end = strstr(start, "\r\n");
1913    strncpy( key, start, end-start );
1914    key[end-start] = '\0';
1915 
1916    start = strstr(ws_headers, "\r\n\r\n");
1917    DEBUGDUMP(printf("end of headers at %x: [%s]\n",(unsigned int) start,start ););
1918 
1919    /*
1920     * Generate our response key
1921     */
1922    Sha1Initialise( &context );
1923    Sha1Update( &context, key, strlen(key) );
1924    Sha1Update( &context, WS_RESPONSE_MAGIC, 36 );
1925    Sha1Finalise( &context, hash );
1926    if ( eec_base64_encode( (unsigned char *)hash, SHA1_HASH_SIZE, (unsigned char **)&strptr, &strlength ) != 0 )
1927    {
1928       DEBUGDUMP(printf("base64 encoding failed:\n" ););
1929       return 0;
1930    }
1931    DEBUGDUMP(printf("Base64 key: <%.*s>\n", strlength, strptr););
1932    /*
1933     * Respond with our headers
1934     */
1935    len = sprintf( response, WS_RESPONSE_HEADER, strlength, strptr );
1936    DEBUGDUMP(printf("Sending to %d Result:\n[%.*s]\n", client->socket, len, response););
1937    rc = send( client->socket, response, len, 0 );
1938    if ( rc != len )
1939    {
1940       DEBUGDUMP(printf("Send failed: rc> %d != len> %d errno = %d\n", rc, len, os_errno ););
1941       return 0;
1942    }
1943    free( strptr );
1944    /*
1945     * Indicate we are a Websocket client
1946     */
1947    client->isWebsocket = 1;
1948    return rc;
1949 }
1950 
decode_ws_payload(unsigned char * src,size_t srclength,unsigned char * target,size_t targsize,unsigned int * opcode,unsigned int * left)1951 int decode_ws_payload( unsigned char *src, size_t srclength, unsigned char *target, size_t targsize, unsigned int *opcode, unsigned int *left )
1952 {
1953    unsigned char *frame, *mask, *payload, save_char;
1954    int masked = 0;
1955    int i = 0, framecount = 0;
1956    size_t remaining;
1957    unsigned int target_offset = 0, hdr_length = 0, payload_length = 0, decoded_length;
1958 
1959    *left = srclength;
1960    frame = src;
1961 
1962    DEBUGDUMP(printf("Decode new frame\n"););
1963    while (1)
1964    {
1965       // Need at least two bytes of the header
1966       // Find beginning of next frame. First time hdr_length, masked and
1967       // payload_length are zero
1968       frame += hdr_length + 4*masked + payload_length;
1969       DEBUGDUMP(printf("frame[0..3]: 0x%x 0x%x 0x%x 0x%x (tot: %d)\n",(unsigned char) frame[0],(unsigned char) frame[1],(unsigned char) frame[2],(unsigned char) frame[3], (int) srclength););
1970 
1971       if (frame > src + srclength)
1972       {
1973          DEBUGDUMP(printf("Truncated frame from client, need %d more bytes\n", (int) (frame - (src + srclength))););
1974          break;
1975       }
1976       remaining = (src + srclength) - frame;
1977       if (remaining < 2)
1978       {
1979          DEBUGDUMP(printf("Truncated frame header from client\n"););
1980          break;
1981       }
1982       framecount ++;
1983 
1984       *opcode = frame[0] & 0x0f;
1985       masked = (frame[1] & 0x80) >> 7;
1986 
1987       if (*opcode == 0x8)
1988       {
1989          // client sent orderly close frame
1990          break;
1991       }
1992 
1993       payload_length = frame[1] & 0x7f;
1994       if (payload_length < 126)
1995       {
1996          hdr_length = 2;
1997          //frame += 2 * sizeof(char);
1998       }
1999       else if (payload_length == 126)
2000       {
2001          payload_length = (frame[2] << 8) + frame[3];
2002          hdr_length = 4;
2003       }
2004       else
2005       {
2006          showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_INTERNAL, "rxstack websocket interface error: %s", "Receiving frames larger than 65535 bytes not supported" );
2007          return -1;
2008       }
2009       if ((hdr_length + 4*masked + payload_length) > remaining)
2010       {
2011          continue;
2012       }
2013       DEBUGDUMP(printf("    payload_length: %u, raw remaining: %u, , hdr_length: %d, opcode %d\n",payload_length, (int) remaining,hdr_length, *opcode););
2014       payload = frame + hdr_length + 4*masked;
2015 
2016       if (*opcode != 1 && *opcode != 2)
2017       {
2018          showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_INTERNAL, "rxstack websocket interface error: Ignoring non-data frame, opcode 0x%x", *opcode );
2019          continue;
2020       }
2021 
2022       if (payload_length == 0)
2023       {
2024          showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_INTERNAL, "rxstack websocket interface error: %s", "Ignoring empty frame" );
2025          continue;
2026       }
2027 
2028       if ((payload_length > 0) && (!masked))
2029       {
2030          showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_INTERNAL, "rxstack websocket interface error: %s", "Received unmasked payload from client" );
2031          return -1;
2032       }
2033 
2034       // Terminate with a null for base64 decode
2035       save_char = payload[payload_length];
2036       payload[payload_length] = '\0';
2037       if ( debug )
2038       {
2039          DEBUGDUMP(printf("payload before masking: [%s]\nLength: %d\n",payload,payload_length););
2040          for ( i = 0; i < payload_length; i++ )
2041             DEBUGDUMP(printf("%x ",payload[i]););
2042          DEBUGDUMP(printf("\n"););
2043       }
2044       // unmask the data
2045       mask = payload - 4;
2046       for (i = 0; i < payload_length; i++)
2047       {
2048          payload[i] ^= mask[i%4];
2049       }
2050       if ( debug )
2051       {
2052          DEBUGDUMP(printf("payload after masking: [%s]\nLength: %d\n",payload,payload_length););
2053          for ( i = 0; i < payload_length; i++ )
2054             DEBUGDUMP(printf("%x ",payload[i]););
2055          DEBUGDUMP(printf("\n"););
2056       }
2057 #ifdef REQUIRE_BASE64_DECODE
2058       // base64 decode the data - not needed for text-only data ?
2059       if ( eec_base64_decode( (unsigned char *)payload, payload_length, &decoded_chunk, &decoded_length ) != 0 )
2060       {
2061          showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_INTERNAL, "rxstack websocket interface error: Base64 decode error code %d", len );
2062          return -1;
2063       }
2064       memcpy( target+target_offset, decoded_chunk, decoded_length );
2065       free( decoded_chunk );
2066 #else
2067       memcpy( target+target_offset, payload, payload_length+1 ); /* +1 ensures nul terminated string */
2068       decoded_length = payload_length;
2069 #endif
2070 
2071       // Restore the first character of the next frame
2072       payload[payload_length] = save_char;
2073       target_offset += decoded_length;
2074    }
2075 
2076    if (framecount > 1)
2077    {
2078       DEBUGDUMP(printf("framecount: %d\n",framecount););
2079    }
2080 
2081    *left = remaining;
2082    return target_offset;
2083 }
2084 
split_ws_payload(char * ws_payload,int offset,streng ** queue,streng ** buffer)2085 int split_ws_payload( char *ws_payload, int offset, streng **queue, streng **buffer )
2086 {
2087    int i,buffer_start=0;
2088    int len = strlen( ws_payload );
2089    char *q = ws_payload+offset;
2090 
2091    if ( ws_payload[offset] == '\0'
2092    || ( ws_payload[offset] == ':' && len == offset + 1 ) )
2093    {
2094       /* queue and buffer both empty */
2095       /* caters for:
2096        * /clear:
2097        * /fifo::
2098        */
2099       *queue = *buffer = NULL;
2100       return 0;
2101    }
2102    for ( i = offset; i < len; i++ )
2103    {
2104       if ( ws_payload[i] == ':' )
2105       {
2106          ws_payload[i] = '\0';
2107          *queue = Str_buf( q, i - offset );
2108          buffer_start = i+1;
2109          break;
2110       }
2111    }
2112    /* if no ':' found, buffer is  empty but we have a queue */
2113    if ( buffer_start == 0 )
2114    {
2115       *buffer = NULL;
2116       *queue = Str_buf( q, len - offset );
2117    }
2118    else
2119    {
2120       *buffer = Str_buf( ws_payload+buffer_start, len - buffer_start + 1 );
2121    }
2122    return 0;
2123 }
2124 
send_response_to_client(Client * client,char action,streng * buffer)2125 int send_response_to_client( Client *client, char action, streng *buffer )
2126 {
2127    int rc,length;
2128    char rcode[2];
2129    RxQueue *q ;
2130    streng *result=NULL;
2131 
2132    switch( action )
2133    {
2134       case RXSTACK_QUEUE_FIFO:
2135       case RXSTACK_QUEUE_LIFO:
2136          DEBUGDUMP(printf("--- Queue %s ---\n", action == RXSTACK_QUEUE_FIFO ? "FIFO" : "LIFO"););
2137          /*
2138           * fixes bug 700539
2139           */
2140          if ( buffer == NULL )
2141             buffer = Str_buf( "", 0 );
2142          if ( buffer == NULL )
2143             rc = 3;
2144          else
2145             rc = rxstack_queue_data( client, buffer, action );
2146          rcode[0] = (char)(rc+'0');
2147          rxstack_send_return( client, rcode, NULL, 0 );
2148          buffer = NULL ; /* consumed by rxstack_queue_data */
2149          break;
2150       case RXSTACK_EXIT:
2151          DEBUGDUMP(printf("--- Exit ---\n"););
2152          /*
2153           * Client has requested disconnect, so remove all
2154           * references to the client
2155           */
2156          rxstack_send_return( client, "0", NULL, 0 );
2157          rxstack_delete_client( client );
2158          if ( buffer != NULL )
2159          {
2160             DROPSTRENG( buffer );
2161             buffer = NULL;
2162          }
2163          break;
2164       case RXSTACK_KILL:
2165          DEBUGDUMP(printf("--- Kill ---\n"););
2166          /*
2167           * Client has requested server to stop
2168           */
2169          rxstack_send_return( client, "0", NULL, 0 );
2170          rxstack_delete_client( client );
2171          running = 0;
2172          return 0;
2173       case RXSTACK_SET_QUEUE:
2174          DEBUGDUMP(printf("--- Set Queue ---\n"););
2175          /*
2176           * Set the default queue for the client
2177           */
2178          if ( buffer == NULL )
2179             rxstack_send_return( client, "6", NULL, 0 );
2180          else
2181          {
2182             q = rxstack_set_default_queue( client, buffer );
2183             if ( q == NULL )
2184                rxstack_send_return( client, "3", NULL, 0 );
2185             else
2186                rxstack_send_return( client, "0", q->name->value, q->name->len );
2187             DROPSTRENG( buffer );
2188             buffer = NULL;
2189          }
2190          break;
2191       case RXSTACK_EMPTY_QUEUE:
2192          DEBUGDUMP(printf("--- Empty Queue ---\n"););
2193          /*
2194           * Use the current queue as the default queue.
2195           */
2196          if ( buffer == NULL )
2197             buffer = client->default_queue->name;
2198          rc = rxstack_empty_queue( client, buffer );
2199          rcode[0] = (char)(rc+'0');
2200          rxstack_send_return( client, rcode, NULL, 0 );
2201          if ( buffer != client->default_queue->name )
2202             DROPSTRENG( buffer );
2203          buffer = NULL ;
2204          break;
2205       case RXSTACK_NUMBER_IN_QUEUE:
2206          DEBUGDUMP(printf("--- Number in Queue ---\n"););
2207          length = rxstack_number_in_queue( client );
2208          rxstack_send_return( client, "0", NULL, length );
2209          if ( buffer != NULL )
2210          {
2211             DROPSTRENG( buffer );
2212             buffer = NULL;
2213          }
2214          break;
2215       case RXSTACK_SHOW_QUEUES:
2216          DEBUGDUMP(printf("--- Show ---\n"););
2217          rc = rxstack_get_queues( client, &result );
2218          switch( rc )
2219          {
2220             case 0: /* all OK */
2221                rxstack_send_return( client, "0", PSTRENGVAL( result ), PSTRENGLEN( result ) );
2222                DROPSTRENG( result );
2223                break;
2224             default: /* empty/error */
2225                rcode[0] = (char)(rc+'0');
2226                rxstack_send_return( client, rcode, NULL, 0 );
2227                break;
2228          }
2229          if ( buffer != NULL )
2230          {
2231             DROPSTRENG( buffer );
2232             buffer = NULL;
2233          }
2234          break;
2235       case RXSTACK_PULL:
2236       case RXSTACK_FETCH:
2237          DEBUGDUMP(printf("--- Pull ---\n"););
2238          rc = rxstack_pull_line_off_queue( client, &result, action == RXSTACK_FETCH );
2239          switch( rc )
2240          {
2241             case 0: /* all OK */
2242                rxstack_send_return( client, "0", PSTRENGVAL( result ), PSTRENGLEN( result ) );
2243                DROPSTRENG( result );
2244                break;
2245             case RXSTACK_WAITING: /* still waiting; don't return */
2246                break;
2247             default: /* empty/error */
2248                rcode[0] = (char)(rc+'0');
2249                rxstack_send_return( client, rcode, NULL, 0 );
2250                break;
2251          }
2252          if ( buffer != NULL )
2253          {
2254             DROPSTRENG( buffer );
2255             buffer = NULL;
2256          }
2257          break;
2258       case RXSTACK_GET_QUEUE:
2259          DEBUGDUMP(printf("--- Get Queue ---\n"););
2260          rxstack_send_return( client, "0", PSTRENGVAL(client->default_queue->name), PSTRENGLEN(client->default_queue->name) ) ;
2261          if ( buffer != NULL )
2262          {
2263             DROPSTRENG( buffer );
2264             buffer = NULL;
2265          }
2266          break;
2267       case RXSTACK_CREATE_QUEUE:
2268          DEBUGDUMP(printf("--- Create Queue ---\n"););
2269          /*
2270           * Create a new queue
2271           */
2272          rc = rxstack_create_queue( client, buffer, &result );
2273          rcode[0] = (char)(rc+'0');
2274          if ( ( rc != 1 ) && ( rc != 0 ) )
2275             rxstack_send_return( client, rcode, NULL, 0 );
2276          else
2277             rxstack_send_return( client, rcode, PSTRENGVAL(result), PSTRENGLEN(result) );
2278          buffer = NULL;  /* consumed by rxstack_create_queue */
2279          break;
2280       case RXSTACK_DELETE_QUEUE:
2281          DEBUGDUMP(printf("--- Delete Queue ---\n"););
2282          /*
2283           * Delete the queue
2284           */
2285          if ( buffer == NULL )
2286             rc = 6;
2287          else
2288             rc = rxstack_delete_queue( client, buffer );
2289          rcode[0] = (char)(rc+'0');
2290          rxstack_send_return( client, rcode, NULL, 0 );
2291          if ( buffer != NULL )
2292          {
2293             DROPSTRENG( buffer );
2294             buffer = NULL;
2295          }
2296          break;
2297       case RXSTACK_TIMEOUT_QUEUE:
2298          DEBUGDUMP(printf("--- Timeout Queue ---\n"););
2299          /*
2300           * Set timeout for pull from queue
2301           */
2302          if ( buffer == NULL )
2303             rc = 6;
2304          else
2305             rc = rxstack_timeout_queue( client, buffer );
2306          rcode[0] = (char)(rc+'0');
2307          rxstack_send_return( client, rcode, NULL, 0 );
2308          if ( buffer != NULL )
2309             DROPSTRENG( buffer );
2310          buffer = NULL;
2311          break;
2312       case RXSTACK_UNKNOWN:
2313          /* do nothing */
2314          break;
2315       default:
2316          rxstack_send_return( client, "9", NULL, 0 );
2317          break;
2318    }
2319    assert( buffer == NULL ) ;
2320    if ( buffer != NULL )
2321       DROPSTRENG( buffer ) ;
2322    return 1;
2323 }
2324 
rxstack_process_websockets_data(Client * client)2325 int rxstack_process_websockets_data( Client *client )
2326 {
2327    int rc = 1;
2328    char ws_headers[8192];
2329    char ws_payload[4096];
2330    char ws_response[4096];
2331    char response[4096];
2332    int response_length;
2333    int len,i,rcode;
2334    unsigned int opcode, left;
2335    streng *queue=NULL, *buffer=NULL, *result;
2336    streng *resp_streng, *rcode_streng;
2337    char action;
2338    int length;
2339    char srcode[3];
2340    RxQueue *q ;
2341 
2342    srcode[1] = ':';
2343    srcode[2] = '\0';
2344 /* assumes only 4096 bytes can b sent by client and in one chunk */
2345    memset( ws_headers, 0, sizeof(ws_headers) );
2346    rc = recv( client->socket, ws_headers, 4096, 0 );
2347    if ( rc < 0 )
2348    {
2349       if ( os_errno != ECONNRESET )
2350       {
2351          showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_READING_SOCKET, ERR_RXSTACK_READING_SOCKET_TMPL, errno_str( os_errno ) );
2352       }
2353       /*
2354        * Assume client has been lost
2355        */
2356       rxstack_delete_client( client );
2357       return 0 ;
2358    }
2359    if ( rc == 0 )
2360    {
2361       DEBUGDUMP(printf("read empty WS data\n"););
2362       /*
2363        * Assume client has been lost
2364        */
2365       rxstack_delete_client( client );
2366       return 0 ;
2367    }
2368    if ( debug )
2369    {
2370       DEBUGDUMP(printf("header: [%s]\nLength: %d\n",ws_headers,rc););
2371       for ( i = 0; i < rc; i++ )
2372          DEBUGDUMP(printf("%x ",ws_headers[i]););
2373       DEBUGDUMP(printf("\n"););
2374    }
2375    /*
2376     * Decode the websocket payload
2377     */
2378    rcode = decode_ws_payload( (unsigned char *)ws_headers, rc, (unsigned char *)ws_payload, sizeof(ws_payload), &opcode, &left );
2379    DEBUGDUMP(printf("after decode payload: rcode: %d remaining: %d payload:[%s]\n",rcode,left,ws_payload););
2380    if ( opcode == 0x8 )
2381    {
2382       /* client sent close request; send back a close opcode message with empty payload */
2383       len = encode_ws_payload( (unsigned char *)"", 0, ws_response, sizeof(ws_response), opcode);
2384       rc = send( client->socket, ws_response, len, 0 );
2385       if ( rc != len )
2386       {
2387          DEBUGDUMP(printf("Send failed: rc> %d != len> %d errno = %d\n", rc, len, os_errno ););
2388          return 0;
2389       }
2390       rxstack_delete_client( client );
2391    }
2392    else
2393    {
2394       DEBUGDUMP(printf("got valid command:[%s]\n",ws_payload););
2395       /* process command from client - TODO */
2396       action = determine_rxstack_command( ws_payload );
2397       if ( action == 0 )
2398       {
2399          DEBUGDUMP(printf("decoded invalid command: opcode %d\n",opcode););
2400          /* wrong action or syntax error */
2401          len = sprintf( ws_headers, "9:Incorrect rxstack command or invalid format of command: %s", ws_payload );
2402          rxstack_send_return( client, &action, ws_headers, len );
2403       }
2404       else
2405       {
2406          DEBUGDUMP(printf("decoded valid command:[%c]\n",action););
2407          /* valid action: process the request and send response */
2408          switch( action )
2409          {
2410             case RXSTACK_QUEUE_FIFO:
2411             case RXSTACK_QUEUE_LIFO:
2412                DEBUGDUMP(printf("WS: Queue %s ---\n", action == RXSTACK_QUEUE_FIFO ? "FIFO" : "LIFO"););
2413                split_ws_payload( ws_payload, ACTION_FIFO_LEN, &queue, &buffer );
2414                if ( queue
2415                &&   PSTRENGLEN( queue ) )
2416                {
2417                   q = rxstack_set_default_queue( client, queue );
2418                   if ( q == NULL )
2419                      // errorrxstack_send_return( client, "3", NULL, 0 );
2420                      break;
2421                }
2422                /*
2423                 * fixes bug 700539
2424                 */
2425                if ( buffer == NULL )
2426                   buffer = Str_buf( "", 0 );
2427                if ( buffer == NULL )
2428                   rc = 3;
2429                else
2430                   rc = rxstack_queue_data( client, buffer, action );
2431 
2432                response_length = sprintf( response, "%d:", rc );
2433                DEBUGDUMP(printf("    Response: [%s] Len: %d\n",response,response_length););
2434                rxstack_send_return( client, &action, response, response_length );
2435                buffer = NULL ; /* consumed by rxstack_queue_data */
2436                break;
2437             case RXSTACK_EMPTY_QUEUE:
2438                DEBUGDUMP(printf("WS: Empty Queue ---\n"););
2439                split_ws_payload( ws_payload, ACTION_CLEAR_LEN, &queue, &buffer );
2440                if ( !queue
2441                ||   PSTRENGLEN( queue ) == 0 )
2442                   queue = client->default_queue->name;
2443                rc = rxstack_empty_queue( client, queue );
2444                srcode[0] = (char)(rc+'0');
2445                rxstack_send_return( client, &action, srcode, 2 );
2446                break;
2447             case RXSTACK_NUMBER_IN_QUEUE:
2448                DEBUGDUMP(printf("WS: Number in Queue ---\n"););
2449                split_ws_payload( ws_payload, ACTION_QUEUED_LEN, &queue, &buffer );
2450                if ( queue
2451                &&   PSTRENGLEN( queue ) )
2452                {
2453                   q = rxstack_set_default_queue( client, queue );
2454                   if ( q == NULL )
2455                      // errorrxstack_send_return( client, "3", NULL, 0 );
2456                      break;
2457                }
2458                length = rxstack_number_in_queue( client );
2459                response_length = sprintf( response, "0:%d", length );
2460                DEBUGDUMP(printf("    Response: [%s] Len: %d\n",response,response_length););
2461                rxstack_send_return( client, &action, response, response_length );
2462                break;
2463             case RXSTACK_PULL:
2464                DEBUGDUMP(printf("WS: Pull ---\n"););
2465                split_ws_payload( ws_payload, ACTION_PULL_LEN, &queue, &buffer );
2466                if ( queue
2467                &&   PSTRENGLEN( queue ) )
2468                {
2469                   q = rxstack_set_default_queue( client, queue );
2470                   if ( q == NULL )
2471                      // errorrxstack_send_return( client, "3", NULL, 0 );
2472                      break;
2473                }
2474                rc = rxstack_pull_line_off_queue( client, &result, action == RXSTACK_FETCH );
2475                srcode[0] = (char)(rc+'0');
2476                switch( rc )
2477                {
2478                   case 0: /* all OK */
2479                      rcode_streng = Str_buf( srcode, 2 );
2480                      if ( rcode_streng == NULL )
2481                      {
2482                         rxstack_send_return( client, &action, "9:rxstack: out of memory", 23 );
2483                      }
2484                      else
2485                      {
2486                         resp_streng = Str_cat( rcode_streng, result );
2487                         if ( resp_streng == NULL )
2488                         {
2489                            rxstack_send_return( client, &action, "9:rxstack: out of memory", 23 );
2490                            DROPSTRENG( rcode_streng );
2491                         }
2492                         else
2493                         {
2494                            rxstack_send_return( client, &action, PSTRENGVAL( resp_streng ), PSTRENGLEN( resp_streng ) );
2495                            DROPSTRENG( result );
2496                            DROPSTRENG( resp_streng );
2497                            DROPSTRENG( rcode_streng );
2498                         }
2499                      }
2500                      break;
2501                   case RXSTACK_WAITING: /* still waiting; don't return */
2502                      break;
2503                   default: /* empty/error */
2504                      rxstack_send_return( client, &action, srcode, 2 );
2505                      break;
2506                }
2507                break;
2508             case RXSTACK_CREATE_QUEUE:
2509                DEBUGDUMP(printf("WS: Create Queue ---\n"););
2510                /*
2511                 * Create a new queue
2512                 */
2513                split_ws_payload( ws_payload, ACTION_CREATE_LEN, &queue, &buffer );
2514                rc = rxstack_create_queue( client, queue, &result );
2515                srcode[0] = (char)(rc+'0');
2516                if ( ( rc != 1 ) && ( rc != 0 ) )
2517                   rxstack_send_return( client, &action, srcode, 2 );
2518                else
2519                {
2520                   rcode_streng = Str_buf( srcode, 2 );
2521                   if ( rcode_streng == NULL )
2522                   {
2523                      rxstack_send_return( client, &action, "9:rxstack: out of memory", 23 );
2524                   }
2525                   else
2526                   {
2527                      resp_streng = Str_cat( rcode_streng, result );
2528                      if ( resp_streng == NULL )
2529                      {
2530                         rxstack_send_return( client, &action, "9:rxstack: out of memory", 23 );
2531                         DROPSTRENG( rcode_streng );
2532                      }
2533                      else
2534                      {
2535                         rxstack_send_return( client, &action, PSTRENGVAL( resp_streng ), PSTRENGLEN( resp_streng ) );
2536                         DROPSTRENG( result );
2537                         DROPSTRENG( resp_streng );
2538                         DROPSTRENG( rcode_streng );
2539                      }
2540                   }
2541                }
2542                buffer = NULL;  /* consumed by rxstack_create_queue */
2543                break;
2544             case RXSTACK_DELETE_QUEUE:
2545                DEBUGDUMP(printf("WS: Delete Queue ---\n"););
2546                /*
2547                 * Delete the queue
2548                 */
2549                split_ws_payload( ws_payload, ACTION_DELETE_LEN, &queue, &buffer );
2550                if ( !queue
2551                ||   PSTRENGLEN( queue ) == 0 )
2552                   rc = 6;
2553                else
2554                   rc = rxstack_delete_queue( client, queue );
2555                srcode[0] = (char)(rc+'0');
2556                rxstack_send_return( client, &action, srcode, 2 );
2557                break;
2558             default:
2559                break;
2560          }
2561          if ( buffer )
2562             DROPSTRENG( buffer ) ;
2563          if ( queue
2564          &&   queue != client->default_queue->name )
2565             DROPSTRENG( queue ) ;
2566       }
2567    }
2568 
2569    DEBUGDUMP(printf("after valid response: rcode: %d\n",rcode););
2570 
2571    return rc;
2572 }
2573 
rxstack_process_traditional_command(Client * client)2574 int rxstack_process_traditional_command( Client * client )
2575 {
2576    char cheader[RXSTACK_HEADER_SIZE];
2577    streng *header;
2578    streng *buffer = NULL ;
2579    int rc,length;
2580    memset( cheader, 0, sizeof(cheader) );
2581    DEBUGDUMP(printf("\nreading from socket %d\n", client->socket););
2582    rc = recv( client->socket, cheader, RXSTACK_HEADER_SIZE, 0 );
2583    if ( rc < 0 )
2584    {
2585       if ( os_errno != ECONNRESET )
2586       {
2587          showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_READING_SOCKET, ERR_RXSTACK_READING_SOCKET_TMPL, errno_str( os_errno ) );
2588       }
2589       /*
2590        * Assume client has been lost
2591        */
2592       rxstack_delete_client( client );
2593       return 0 ;
2594    }
2595    if ( rc == 0 )
2596    {
2597       DEBUGDUMP(printf("read empty header\n"););
2598       /*
2599        * Assume client has been lost
2600        */
2601       rxstack_delete_client( client );
2602       return 0 ;
2603    }
2604    else if ( rc != RXSTACK_HEADER_SIZE )
2605    {
2606       DEBUGDUMP(printf("read corrupted header\n"););
2607       /*
2608        * Assume client has been lost
2609        */
2610       rxstack_delete_client( client );
2611       return 0 ;
2612    }
2613    DEBUGDUMP(printf("header: %.*s\n",RXSTACK_HEADER_SIZE,cheader););
2614    header = MakeStreng( RXSTACK_HEADER_SIZE - 1 );
2615    if ( header == NULL )
2616    {
2617       showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
2618       exit( ERR_STORAGE_EXHAUSTED );
2619    }
2620    memcpy( PSTRENGVAL(header), cheader+1, RXSTACK_HEADER_SIZE-1 );
2621    header->len = RXSTACK_HEADER_SIZE-1 ;
2622    buffer = NULL;
2623    /*
2624     * Convert the data length
2625     */
2626    length = REXX_X2D( header, &rc );
2627    if ( rc )
2628    {
2629       /*
2630        * Errorneous number. Kill the client.
2631        */
2632       DEBUGDUMP(printf("Invalid header: <%.*s>, client killed\n", header->len, header->value););
2633       rxstack_send_return( client, "9", NULL, 0 );
2634       rxstack_delete_client( client );
2635       return 1;
2636    }
2637 
2638    DEBUGDUMP(printf("Header: <%.*s> length: %d\n", header->len, header->value, length););
2639    DROPSTRENG( header );
2640    if ( length > 0 )
2641    {
2642       /*
2643        * Allocate a streng big enough for the expected data
2644        * string, based on the length just read; even if the length
2645        * is zero
2646        */
2647       buffer = MAKESTRENG ( length );
2648       if ( buffer == NULL )
2649       {
2650          showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
2651          DEBUGDUMP(printf("can't buffer input of client\n"););
2652          rxstack_delete_client( client );
2653          return 0;
2654       }
2655       rc = recv( client->socket, PSTRENGVAL(buffer), length, 0 );
2656       if ( rc < 0 )
2657       {
2658          showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_READING_SOCKET, ERR_RXSTACK_READING_SOCKET_TMPL, errno_str( os_errno ) );
2659       }
2660       else if ( rc == 0 )
2661       {
2662          /*
2663           * All we can assume here is that the client has been lost
2664           */
2665          DEBUGDUMP(printf("read empty header\n"););
2666          rxstack_delete_client( client );
2667          DROPSTRENG( buffer ) ;
2668          return 0 ;
2669       }
2670       else
2671          buffer->len = length ;
2672    }
2673    send_response_to_client( client, cheader[0], buffer );
2674    return 1;
2675 }
2676 
2677 /* rxstack_process_command reads a new command from the client and processes
2678  * it.
2679  * returns 0 if the client has been terminated, 1 if the client persists.
2680  */
rxstack_process_command(Client * client)2681 int rxstack_process_command( Client *client )
2682 {
2683    char pheader[RXSTACK_PEEK_HEADER_SIZE];
2684    int rc;
2685 
2686    if ( client->deadline.milli != -1 )
2687    {
2688       /*
2689        * interrupted wait, assume the client don't want to wait for data any
2690        * longer
2691        */
2692       bad_news_for_waiter( client->default_queue, client ) ;
2693    }
2694    memset( pheader, 0, sizeof(pheader) );
2695    DEBUGDUMP(printf("\npeeking from socket %d\n", client->socket););
2696    rc = recv( client->socket, pheader, RXSTACK_PEEK_HEADER_SIZE, MSG_PEEK );
2697    if ( rc < 0 )
2698    {
2699       if ( os_errno != ECONNRESET )
2700       {
2701          showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_READING_SOCKET, ERR_RXSTACK_READING_SOCKET_TMPL, errno_str( os_errno ) );
2702       }
2703       /*
2704        * Assume client has been lost
2705        */
2706       rxstack_delete_client( client );
2707       return 0 ;
2708    }
2709    if ( rc == 0 )
2710    {
2711       DEBUGDUMP(printf("read empty header\n"););
2712       /*
2713        * Assume client has been lost
2714        */
2715       rxstack_delete_client( client );
2716       return 0 ;
2717    }
2718    else if ( rc != RXSTACK_PEEK_HEADER_SIZE )
2719    {
2720       DEBUGDUMP(printf("read corrupted header\n"););
2721       /*
2722        * Assume client has been lost
2723        */
2724       rxstack_delete_client( client );
2725       return 0 ;
2726    }
2727    DEBUGDUMP(printf("peek at 1st two bytes of header: 0x%x%x\n", pheader[0],pheader[1]););
2728    /*
2729     * We have peeked at the first 2 bytes of the message. determine what type of connection it is:
2730     * GE - header for Websocket interface (start of GET)
2731     * first byte < x20 - Websocket data
2732     * anything else - assume traditional interface
2733     */
2734    if ( memcmp( pheader, "GE", 2 ) == 0 )
2735    {
2736       DEBUGDUMP(printf("have a WS header...\n"););
2737       rc = rxstack_process_websockets_headers( client );
2738    }
2739    else if ( client->isWebsocket )
2740    {
2741       DEBUGDUMP(printf("have WS data...\n"););
2742       rc = rxstack_process_websockets_data( client );
2743    }
2744    else
2745    {
2746       DEBUGDUMP(printf("have normal data...\n"););
2747       rc = rxstack_process_traditional_command( client );
2748    }
2749    return rc;
2750 }
2751 
2752 /*
2753  * earlier returns the time stamp which is more early.
2754  */
earlier(RxTime t1,RxTime t2)2755 static RxTime earlier( RxTime t1, RxTime t2 )
2756 {
2757    if ( time_diff( t1, t2 ) == -2 )
2758       return t1 ;
2759    return t2 ;
2760 }
2761 
2762 /* check_for_waiting checks a client for a pending IO request.
2763  * We currently only support PULL requests.
2764  * The function returns immediately if the client doesn't wait.
2765  * In the other case it checks whether the maximum wait time has been
2766  * expired.
2767  * If the client is still waiting and the deadline isn't reached, it
2768  * checks if the deadline is more early then next_timeout and sets this
2769  * value is necessary.
2770  *
2771  * The client's queue's deadline is set to the default deadline of queues in
2772  * all cases.
2773  */
check_for_waiting(Client * client,RxTime * next_timeout)2774 void check_for_waiting( Client *client, RxTime *next_timeout )
2775 {
2776    int diff ;
2777 
2778    client->default_queue->deadline = queue_deadline ;
2779    /* Do we are a waiter? */
2780    if ( client->deadline.milli == -1 )
2781       return ;
2782 
2783    /*
2784     * Check if there is anything in the queue...
2785     */
2786    diff = time_diff( client->deadline, now ) ;
2787    if ( ( diff != -2 ) && ( diff != 0 ) )
2788    {
2789       DEBUGDUMP(
2790          if ( diff == -1 )
2791             printf("Still waiting infinitely for %d\n", client->socket );
2792          else
2793             printf("Still waiting %d ms at max for %d\n", diff, client->socket );
2794       );
2795       *next_timeout = earlier( *next_timeout, client->deadline ) ;
2796    }
2797    else
2798    {
2799       bad_news_for_waiter( client->default_queue, client ) ;
2800    }
2801 }
2802 
2803 /* check_queue checks a queue has reached its deadline and if no clients
2804  * have this queue as the default queue. The queue is deleted in this
2805  * case.
2806  * If the queuet is still valid and the deadline isn't reached, it
2807  * checks if the deadline is more early then next_timeout and sets this
2808  * value is necessary.
2809  */
check_queue(RxQueue * q,RxTime * next_timeout)2810 void check_queue( RxQueue *q, RxTime *next_timeout )
2811 {
2812    int diff ;
2813    Client *c ;
2814 
2815    diff = time_diff( q->deadline, now ) ;
2816    if ( ( diff != -2 ) && ( diff != 0 ) )
2817    {
2818       *next_timeout = earlier( *next_timeout, q->deadline ) ;
2819       return ;
2820    }
2821 
2822    for ( c = clients; c != NULL; c = c->next )
2823    {
2824       if ( c->default_queue == q )
2825       {
2826          q->deadline = queue_deadline ;
2827          *next_timeout = earlier( *next_timeout, q->deadline ) ;
2828          return ;
2829       }
2830    }
2831 
2832    DEBUGDUMP( printf( "Purging unused queue <%.*s>\n", PSTRENGLEN( q->name ), PSTRENGVAL( q->name ) ) );
2833    q->deadline = queue_deadline ; /* needed at least for SESSION */
2834    *next_timeout = earlier( *next_timeout, q->deadline ) ;
2835    delete_a_queue( q ) ;
2836 }
2837 
rxstack_doit()2838 int rxstack_doit( )
2839 {
2840    RxTime timeout ;
2841    int listen_sock,msgsock;
2842    struct sockaddr_in server,client;
2843 #ifdef HAVE_SOCKLEN_T
2844    socklen_t client_size ;
2845 #else
2846    int client_size ;
2847 #endif
2848    int rc;
2849    Client *c, *ch ;
2850    RxQueue *q, *qh ;
2851 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2852 # define POLL_INCR 16
2853    struct pollfd *pd = NULL ;
2854    unsigned poll_max = 0 ;
2855    unsigned poll_cnt = 0 ;
2856 #else
2857    int max_sock ;
2858    fd_set ready ;
2859    struct timeval to ;
2860 #endif
2861 #ifdef BUILD_NT_SERVICE
2862    char buf[30];
2863 #endif
2864 #if defined(SO_REUSEADDR) && defined(SOL_SOCKET)
2865    int on = 1;
2866 #endif
2867 
2868    client_size = sizeof( struct sockaddr );
2869 #ifdef WIN32
2870    if ( init_external_queue( NULL ) )
2871       return 1;
2872 #endif
2873 
2874 #ifdef BUILD_NT_SERVICE
2875    if ( IsItNT()
2876    &&   !report_service_pending_start() )
2877       goto notrunning;
2878 #endif
2879    /*
2880     * Set up signal handler
2881     */
2882 #ifdef SIGTERM
2883    signal( SIGTERM, rxstack_signal_handler );
2884 #endif
2885 #ifdef SIGINT
2886    signal( SIGINT, rxstack_signal_handler );
2887 #endif
2888 #ifdef SIGBREAK
2889    signal( SIGBREAK, rxstack_signal_handler );
2890 #endif
2891 #ifdef SIGPIPE
2892    signal( SIGPIPE, SIG_IGN );
2893 #endif
2894    clients = NULL ;
2895    queues = NULL ;
2896 
2897    /*
2898     * Initialise default "SESSION" queue
2899     */
2900    if ( ( SESSION = get_new_queue( ) ) == NULL )
2901    {
2902       showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL ) ;
2903       return ERR_STORAGE_EXHAUSTED ;
2904    }
2905    SESSION->name = Str_cre_or_exit( "SESSION", 7 ) ;
2906    SESSION->isReal = 1;
2907 
2908 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2909    pd = (struct pollfd *)malloc( ( poll_max = POLL_INCR ) * sizeof( struct pollfd ) );
2910    if ( pd == NULL )
2911    {
2912       showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
2913       exit( ERR_STORAGE_EXHAUSTED );
2914    }
2915 #endif
2916 
2917 #ifdef BUILD_NT_SERVICE
2918    if ( IsItNT()
2919    &&   !report_service_pending_start() )
2920       goto notrunning;
2921 #endif
2922    /*
2923     * Create listener socket
2924     */
2925    listen_sock = socket(AF_INET, SOCK_STREAM, 0);
2926    if (listen_sock < 0)
2927    {
2928       showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_GENERAL, ERR_RXSTACK_GENERAL_TMPL, "Listening on socket", errno_str( os_errno ) );
2929       rxstack_cleanup();
2930       exit(ERR_RXSTACK_GENERAL);
2931    }
2932    memset( &server, 0, sizeof(server) );
2933    server.sin_family = AF_INET;
2934    if ( world )
2935       server.sin_addr.s_addr = htonl(INADDR_ANY);
2936    else
2937       server.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
2938    if ( portno == 0 )
2939       portno = default_port_number();
2940    server.sin_port = htons((unsigned short) portno);
2941 
2942 #ifdef BUILD_NT_SERVICE
2943    if ( IsItNT()
2944    &&   !report_service_pending_start() )
2945       goto notrunning;
2946 #endif
2947 
2948 #if defined(SO_REUSEADDR) && defined(SOL_SOCKET)
2949    setsockopt( listen_sock, SOL_SOCKET, SO_REUSEADDR, (void *) &on, sizeof( on ) );
2950 #endif
2951    if ( bind(listen_sock, (struct sockaddr *)&server, sizeof(server)) < 0)
2952    {
2953       showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_GENERAL, ERR_RXSTACK_GENERAL_TMPL, "Error binding socket", errno_str( os_errno ) );
2954       rxstack_cleanup();
2955       exit( ERR_RXSTACK_GENERAL );
2956    }
2957 #ifdef BUILD_NT_SERVICE
2958    sprintf(buf, "Listening on port: %ld", portno );
2959    if ( IsItNT() )
2960    {
2961       if ( !report_service_start() )
2962          goto notrunning;
2963       if ( debug == 1 )
2964       {
2965          printf( "%s\n", buf );
2966          fflush(stdout);
2967       }
2968       else
2969       {
2970          AddToMessageLog(TEXT(buf));
2971       }
2972    }
2973    else
2974    {
2975       printf( "%s\n", buf );
2976       fflush(stdout);
2977    }
2978 #else
2979    printf( "rxstack listening on port: %ld\n", portno );
2980    fflush(stdout);
2981 #endif
2982    /*
2983     * Start accepting connections
2984     */
2985    listen(listen_sock, 5);
2986    timeout = get_now( ) ;
2987    time_add( &timeout, DEFAULT_WAKEUP ) ;
2988    queue_deadline = now ;
2989    time_add( &queue_deadline, QUEUE_TIMEOUT ) ;
2990    while ( running )
2991    {
2992 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2993       poll_cnt = 0 ;
2994       pd[ poll_cnt   ].events = POLLIN ;
2995       pd[ poll_cnt++ ].fd = listen_sock ;
2996       DEBUGDUMP(printf("****** poll((%d", listen_sock););
2997       for ( c = clients; c != NULL; c = c->next )
2998       {
2999          if ( c->socket == -1 )
3000          {
3001             continue ;
3002          }
3003 
3004          if ( poll_cnt == poll_max )
3005          {
3006             pd = (struct pollfd *)realloc( pd, ( poll_max += POLL_INCR ) * sizeof( struct pollfd ) );
3007             if ( pd == NULL )
3008             {
3009                showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
3010                exit( ERR_STORAGE_EXHAUSTED );
3011             }
3012          }
3013 
3014          pd[ poll_cnt   ].events = POLLIN ;
3015          pd[ poll_cnt++ ].fd = c->socket ;
3016          DEBUGDUMP(printf(", %d", c->socket););
3017       }
3018 #else
3019       FD_ZERO(&ready);
3020       FD_SET(listen_sock, &ready);
3021       DEBUGDUMP(printf("****** select((%d", listen_sock););
3022       max_sock = listen_sock;
3023       /*
3024        * For each connected client, allow its socket
3025        * to be triggered
3026        */
3027       for ( c = clients; c != NULL; c = c->next )
3028       {
3029          if ( c->socket != -1 )
3030          {
3031             DEBUGDUMP(printf(", %d", c->socket););
3032             FD_SET( c->socket, &ready );
3033             if ( c->socket > max_sock )
3034                max_sock = c->socket;
3035          }
3036       }
3037 #endif
3038       now = get_now( ) ;
3039       rc = time_diff( timeout, now ) ;
3040       if ( rc == -2 )
3041          rc = 0 ; /* already timed out */
3042       if ( ( rc == -1 ) || ( rc > DEFAULT_WAKEUP ) )
3043          rc = DEFAULT_WAKEUP ;
3044       DEBUGDUMP(printf("), to=%d) ms at %ld,%03d\n", rc, (long) now.seconds, now.milli ););
3045 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
3046       rc = poll( pd, poll_cnt, rc ) ;
3047 #else
3048       to.tv_usec = ( rc % 1000 ) * 1000 ; /* microseconds fraction */
3049       to.tv_sec = rc / 1000;
3050       rc = select( max_sock + 1, &ready, (fd_set *)0, (fd_set *)0, &to ) ;
3051 #endif
3052       now = get_now( ) ;
3053       DEBUGDUMP(printf("****** after waiting(), rc=%d at %ld,%03d\n", rc, (long) now.seconds, now.milli ););
3054       if ( rc < 0 )
3055       {
3056          if ( os_errno != EINTR ) /* Win32 doesn't know about it ? */
3057          {
3058             showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_GENERAL, ERR_RXSTACK_GENERAL_TMPL, "Calling select", errno_str( os_errno ) );
3059             exit( ERR_RXSTACK_GENERAL );
3060          }
3061          continue ;
3062       }
3063       if ( rc )
3064       {
3065 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
3066          if ( pd[ 0 ].revents )
3067 #else
3068          if ( FD_ISSET(listen_sock, &ready ) )
3069 #endif
3070          {
3071             msgsock = accept(listen_sock, (struct  sockaddr *)&client, &client_size);
3072             if (msgsock == -1)
3073             {
3074                showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_GENERAL, ERR_RXSTACK_GENERAL_TMPL, "Calling listen", errno_str( os_errno ) );
3075                rxstack_cleanup();
3076                exit( ERR_RXSTACK_GENERAL );
3077             }
3078             else
3079             {
3080                /*
3081                 * A client has connected, create a client entry
3082                 * and set their default queue to SESSION
3083                 */
3084                /*
3085                 * Validate the client here...TBD
3086                 * use details in client sockaddr struct
3087                 */
3088                DEBUGDUMP(printf("Client connecting from %s has socket %d\n", inet_ntoa( client.sin_addr ), msgsock ););
3089                rxstack_create_client( msgsock );
3090             }
3091          }
3092          else
3093          {
3094 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
3095             for ( c = clients, poll_cnt = 1; c != NULL; poll_cnt++ )
3096 #else
3097             for ( c = clients; c != NULL; )
3098 #endif
3099             {
3100                /* c might be deleted by the following calls.
3101                 * Assure we have everything perfect to use the
3102                 * next element. An access to c after rxstack_process_command
3103                 * is forbidden.
3104                 */
3105                ch = c ;
3106                c = c->next ;
3107 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
3108                if ( pd[ poll_cnt ].revents )
3109 #else
3110                if ( FD_ISSET( ch->socket, &ready ) )
3111 #endif
3112                {
3113                   DEBUGDUMP(printf("Client with socket %d ready for reading\n", ch->socket ););
3114                   /*
3115                    * Process the client's command...
3116                    */
3117                   rxstack_process_command( ch ) ;
3118                }
3119             }
3120          }
3121       }
3122       /*
3123        * If select() timed out or received input, check all connected clients who
3124        * may be waiting for input on one of the queues.
3125        *
3126        * now contains the time between the start of select() call and now
3127        * in milliseconds
3128        */
3129       now = get_now();
3130       timeout = get_now( ) ;
3131       time_add( &timeout, DEFAULT_WAKEUP ) ;
3132       queue_deadline = now ;
3133       time_add( &queue_deadline, QUEUE_TIMEOUT ) ;
3134       for ( c = clients; c != NULL; c = c->next )
3135       {
3136          check_for_waiting( c, &timeout );
3137       }
3138       for ( q = queues; q != NULL; )
3139       {
3140          qh = q ;
3141          q = q->next ;
3142          check_queue( qh, &timeout );
3143       }
3144    }
3145 #ifdef BUILD_NT_SERVICE
3146 notrunning:
3147 #endif
3148    return 0;
3149 }
3150 
3151 /*
3152  * Gives a short usage description on stderr and returns 1
3153  */
usage(const char * argv0)3154 int usage( const char *argv0 )
3155 {
3156    fprintf( stdout, "\n%s: %s (%d bit). All rights reserved.\n", argv0, PARSE_VERSION_STRING, REGINA_BITS );
3157    fprintf( stdout,"Regina is distributed under the terms of the GNU Library Public License \n" );
3158    fprintf( stdout,"and comes with NO WARRANTY. See the file COPYING-LIB for details.\n" );
3159    fprintf( stdout,"\n%s [switches]\n", argv0 );
3160    fprintf( stdout,"where switches are:\n\n" );
3161    fprintf( stdout,"  --help, -h               show this message\n" );
3162    fprintf( stdout,"  --version, -v            display Regina version and exit\n" );
3163    fprintf( stdout,"  --debug, -D              turn on debugging\n" );
3164 #if defined(HAVE_FORK)
3165    fprintf( stdout,"  --daemon, -d             run %s as a daemon process\n", argv0 );
3166 #endif
3167    fprintf( stdout,"  --kill, -k               kill the running %s process\n", argv0 );
3168    fprintf( stdout,"  --port=portno, -pportno  listen on TCP port portno\n");
3169    fprintf( stdout,"  --world, -w              allow connections from anywhere. By default only connections from localhost allowed\n");
3170    fflush( stdout );
3171    return 1 ;
3172 }
3173 
checkDebug(void)3174 static void checkDebug(void)
3175 {
3176    if ( getenv( "RXDEBUG" ) != NULL )
3177       debug = 1 ;
3178 }
3179 
runNormal(int argc,char ** argv)3180 int runNormal( int argc, char **argv )
3181 {
3182    int rc = 0 ;
3183    int c;
3184    const char *argv0 = argv[ 0 ] ;
3185    static struct my_getopt_option long_options[] =
3186    {
3187       {"help",    no_argument,       0,  'h' },
3188       {"version", no_argument,       0,  'v' },
3189       {"debug",   no_argument,       0,  'D' },
3190       {"kill",    no_argument,       0,  'k' },
3191       {"world",   no_argument,       0,  'w' },
3192       {"port",    required_argument, 0,  'p' },
3193       {"daemon",  no_argument,       0,  'd' },
3194       {0,         0,                 0,  0 }
3195   };
3196 
3197    checkDebug();
3198    while (1)
3199    {
3200       int option_index = 0;
3201 
3202       c = my_getopt_long( argc, argv, "+hvDdkwp:", long_options, &option_index );
3203       if ( c == -1 )
3204          break;
3205 
3206       switch(c)
3207       {
3208          case 'h':
3209             return usage( argv0 );
3210             break;
3211          case 'D':
3212             debug = 1;
3213             putenv( "RXDEBUG=1" ) ;
3214             break;
3215          case 'd':
3216 #ifndef BUILD_NT_SERVICE
3217             isdaemon = 1;
3218 #endif
3219             break;
3220          case 'k':
3221             tosuicide = 1;
3222             break;
3223          case 'w':
3224             world = 1;
3225             break;
3226          case 'p':
3227             portno = atol( optarg );
3228             if ( portno == 0
3229             || portno > 0xFFFF )
3230             {
3231                fprintf( stderr, "Option \"-p\" or \"--port\" requires integer between 1 and 65535.\n" ) ;
3232                return usage( argv0 );
3233             }
3234             break;
3235          case 'v':
3236             fprintf( stderr, "%s: %s (%d bit)\n", argv0, PARSE_VERSION_STRING, REGINA_BITS );
3237             return 0;
3238          default: /* unknown switch */
3239             return usage( argv0 );
3240             break;
3241       }
3242    }
3243    if ( argc > optind )
3244    {
3245       fprintf( stderr, "Extra, unknown command line argument(s).\n" ) ;
3246       return usage( argv0 );
3247    }
3248    if ( tosuicide )
3249       return suicide();
3250 #ifndef BUILD_NT_SERVICE
3251    if ( isdaemon )
3252    {
3253 #if defined(HAVE_FORK)
3254       if ( ( rc = fork() ) != 0 )
3255          exit(rc < 0);
3256       rc = rxstack_doit();
3257 #else
3258       fprintf( stderr, "Option \"-d\" or \"--daemon\" is invalid on this platform.\n" ) ;
3259       return usage( argv0 );
3260 #endif
3261    }
3262    else
3263 #endif
3264    {
3265       rc = rxstack_doit();
3266    }
3267    rxstack_cleanup();
3268    printf( "%s terminated.\n", argv0 );
3269    fflush(stdout);
3270    return rc;
3271 }
3272 
3273 #ifdef BUILD_NT_SERVICE
ServiceStart(DWORD argc,LPTSTR * argv)3274 VOID ServiceStart(DWORD argc, LPTSTR *argv)
3275 #else
3276 int main(int argc, char *argv[])
3277 #endif
3278 {
3279 #ifdef BUILD_NT_SERVICE
3280    if ( IsItNT() )
3281    {
3282       if ( !nt_service_start() )
3283       {
3284 /*
3285          checkDebug();
3286          rxstack_doit();
3287 */
3288          runNormal(argc, argv);
3289       }
3290       rxstack_cleanup();
3291       return;
3292    }
3293    else
3294    {
3295       runNormal(argc, argv);
3296    }
3297 #else
3298    return runNormal( argc, argv );
3299 #endif
3300 }
3301