1 /*
2 * The Regina Rexx Interpreter
3 * Copyright (C) 1992-1994 Anders Christensen <anders@pvv.unit.no>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
14 *
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the Free
17 * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 */
19 /*
20 * This process runs as a daemon (or NT service). It maintains multiple,
21 * named Rexx queues.
22 * All communication is done via TCP/IP sockets.
23 * This process waits on a known port; 5656 by default for connections
24 * from clients. A client is any process that respects the Interface
25 * defined below. The "normal" clients are regina and rxqueue.
26 * Details about each client is kept, like current queue name.
27 *
28 * Structure
29 * ---------
30 * startup
31 * - set signal handler for SIGTERM
32 * initialise socket interface
33 * - socket()
34 * - bind()
35 * - listen()
36 * loop until killed
37 * - setup read FDs
38 * - select()
39 * - if listen socket, add new client
40 * - otherwise read command
41 * cleanup
42 * - disconnect all clients
43 * - free up resources
44 *
45 * Interface.
46 * ---------
47 * Once a client connects, it sends commands
48 * Commands are single character, followed by optional 6 hex character
49 * length and optional data.
50 * F - queue data onto client's current queue (FIFO)
51 * in -> FFFFFFFxxx--data--xxx
52 * out-> 0000000 (if successful)
53 * out-> 2xxxxxx (if error, eg queue deleted)
54 * out-> 3000000 (memory allocation error)
55 * regina QUEUE, rxqueue /fifo, websocket /fifo
56 * L - push data onto client's current queue (LIFO)
57 * in-> LFFFFFFxxx--data--xxx
58 * out-> 0000000 (if successful)
59 * out-> 2xxxxxx (if error, eg queue deleted)
60 * out-> 3000000 (memory allocation error)
61 * regina PUSH, rxqueue /lifo, websocket /lifo
62 * C - create queue
63 * in-> CFFFFFFxxx--queue name--xxx (if length 0, create name)
64 * out-> 0FFFFFFxxx--queue name--xxx (if queue name created)
65 * out-> 1FFFFFFxxx--queue name--xxx (if queue name existed)
66 * out-> 2xxxxxx (if error)
67 * out-> 6000000 (queue name not passed)
68 * regina RXQUEUE('C'), rxqueue N/A, websocket /create
69 * D - delete queue
70 * in-> DFFFFFFxxx--queue name--xxx
71 * out-> 0000000 (if queue name deleted)
72 * out-> 5xxxxxx (trying to delete 'SESSION' queue)
73 * out-> 6000000 (queue name not passed)
74 * out-> 9xxxxxx (if error, eg queue already deleted)
75 * regina RXQUEUE('D'), rxqueue N/A, websocket /delete
76 * E - empty data from specified queue
77 * in-> EFFFFFFxxx--queue name--xxx
78 * out-> 0000000 (if queue emptied)
79 * out-> 2xxxxxx (if error, eg queue deleted)
80 * out-> 3000000 (memory allocation error)
81 * regina N/A, rxqueue /clear, websocket /clear
82 * P - pop item off client's default queue
83 * in-> P000000
84 * out-> 0FFFFFFxxx--data--xxx (if queue name existed)
85 * out-> 1000000 (if queue empty)
86 * out-> 2xxxxxx (if queue name deleted - length ignored)
87 * out-> 4xxxxxx (if timeout on queue exceeded - length ignored)
88 * regina PULL, rxqueue /pull, websocket /pull
89 * p - fetch item off client's default queue
90 * in-> p000000
91 * out-> 0FFFFFFxxx--data--xxx (if queue name existed)
92 * out-> 1000000 (if queue empty)
93 * out-> 2xxxxxx (if queue name deleted - length ignored)
94 * regina PULL without timeout, rxqueue N/A, websocket N/A
95 * S - set default queue name (allow false queues)
96 * in-> SFFFFFFxxx--queue name--xxx
97 * out-> 0000000 (if successful)
98 * out-> 3000000 (memory allocation error)
99 * out-> 6000000 (queue name not passed)
100 * regina RXQUEUE('S'), rxqueue N/A, websocket N/A
101 * G - get default queue name
102 * in-> G000000
103 * out-> 0FFFFFFxxx--queue name--xxx
104 * regina RXQUEUE('G'), rxqueue N/A, websocket N/A
105 * N - return number of lines on stack
106 * in-> N000000
107 * out-> 0FFFFFF (if queue exists)
108 * out-> 2xxxxxx (if error or queue doesn't exist - length ignored)
109 * regina QUEUED(), rxqueue /queued, websocket /queued
110 * Q - return names of stacks
111 * in-> Q000000
112 * out-> 0FFFFFFxxx--queue name--xxxDDxxx--queue name---xxxDD...
113 * where FFFFFF is length of all names
114 * and DD is xFF delimiting names of queues and ... is more queues
115 * out-> 2xxxxxx (if error)
116 * regina RXQUEUE(), rxqueue /queues, websocket /queues
117 * T - set timeout on queue pull
118 * in-> TFFFFFFTTTTTT
119 * out-> 0000000 (if queue timeout set)
120 * out-> 2xxxxxx (if error, eg invalid argument)
121 * out-> 6000000 (queue name not passed)
122 * regina RXQUEUE('T'), rxqueue N/A, websocket N/A
123 * X - client disconnect
124 * in-> X000000
125 * out->
126 * Z - client requests shutdown - should only be called by ourselves!!
127 * in-> Z000000
128 * out->
129 */
130
131 #define NO_CTYPE_REPLACEMENT
132 #include "rexx.h"
133
134 #if defined(WIN32) || defined(__LCC__)
135 # if defined(_MSC_VER)
136 # if _MSC_VER >= 1100
137 /* Stupid MSC can't compile own headers without warning at least in VC 5.0 */
138 # pragma warning(disable: 4115 4201 4214 4514)
139 # endif
140 # include <windows.h>
141 # if _MSC_VER >= 1100
142 # pragma warning(default: 4115 4201 4214)
143 # endif
144 # elif defined(__LCC__)
145 # include <windows.h>
146 # include <winsvc.h>
147 # include <winsock.h>
148 # else
149 # include <windows.h>
150 # endif
151 # include <io.h>
152 #else
153 # ifdef HAVE_SYS_SOCKET_H
154 # include <sys/socket.h>
155 # endif
156 # ifdef HAVE_NETINET_IN_H
157 # include <netinet/in.h>
158 # endif
159 # if defined(HAVE_POLL_H) && defined(HAVE_POLL)
160 # include <poll.h>
161 # elif defined(HAVE_SYS_POLL_H) && defined(HAVE_POLL)
162 # include <sys/poll.h>
163 # elif defined(HAVE_SYS_SELECT_H)
164 # include <sys/select.h>
165 # endif
166 # ifdef HAVE_NETDB_H
167 # include <netdb.h>
168 # endif
169 # ifdef HAVE_ARPA_INET_H
170 # include <arpa/inet.h>
171 # endif
172 # define closesocket(x) close(x)
173 #endif
174 #include <string.h>
175
176 #ifdef HAVE_UNISTD_H
177 #include <unistd.h>
178 #endif
179
180 #ifdef HAVE_ERRNO_H
181 #include <errno.h>
182 #endif
183
184 #ifdef HAVE_SIGNAL_H
185 #include <signal.h>
186 #endif
187
188 #ifdef HAVE_CTYPE_H
189 #include <ctype.h>
190 #endif
191
192 #ifdef HAVE_PROCESS_H
193 # include <process.h>
194 #endif
195
196 #if defined(TIME_WITH_SYS_TIME)
197 # include <sys/time.h>
198 # include <time.h>
199 #else
200 # if defined(HAVE_SYS_TIME_H)
201 # include <sys/time.h>
202 # else
203 # include <time.h>
204 # endif
205 #endif
206
207 #include <assert.h>
208
209 #define HAVE_FORK
210 #if defined(__WATCOMC__) || defined(_MSC_VER) || (defined(__IBMC__) && defined(WIN32)) || defined(__SASC) || defined(__MINGW32__) || defined(__BORLANDC__) || defined(DOS) || defined(__LCC__)
211 # undef HAVE_FORK
212 #endif
213 #if defined(__WATCOMC__) && defined(__QNX__)
214 # define HAVE_FORK
215 #endif
216
217 #include "extstack.h"
218 #include "mygetopt.h"
219 #include "contrib/LibSha1.h"
220
221 #ifdef BUILD_NT_SERVICE
222 # include "service.h"
223 /*
224 * this event is signalled when the
225 * service should end
226 */
227 HANDLE hServerStopEvent = NULL;
228 #endif
229
230 #ifdef WIN32
231 # define os_errno ((int)WSAGetLastError())
232 # define errno_str(code) Win32ErrorString(code)
233 # undef EINTR
234 # define EINTR WSAEINTR
235 # undef ECONNRESET
236 # define ECONNRESET WSAECONNRESET
237 #else
238 # define os_errno errno
239 # define errno_str(code) strerror(code)
240 #endif
241
242 #define WS_RESPONSE_HEADER "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: %.*s\r\n\r\n"
243 #define WS_RESPONSE_MAGIC "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
244 /*
245 * Translation Table for Base64 encoding as described in RFC1113
246 */
247 static const char cb64[]="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
248 #ifdef REQUIRE_BASE64_DECODE
249 static const char cd64[]="|$$$}rstuvwxyz{$$$$$$$>?@ABCDEFGHIJKLMNOPQRSTUVW$$$$$$XYZ[\\]^_`abcdefghijklmnopq";
250 #endif
251 /*
252 * debugging is turned off. You can turn it on by the command line option "-D".
253 */
254 static int debug = 0 ;
255 #define DEBUGDUMP(x) { if ( debug ) \
256 {x;} \
257 }
258 /*
259 * Length of action string from websocket command
260 */
261 #define ACTION_FIFO_LEN 6
262 #define ACTION_LIFO_LEN 6
263 #define ACTION_QUEUED_LEN 8
264 #define ACTION_CLEAR_LEN 7
265 #define ACTION_DELETE_LEN 8
266 #define ACTION_CREATE_LEN 8
267 #define ACTION_PULL_LEN 6
268
269 /*
270 * DEFAULT_WAKEUP is the time in ms after which the process shall wakeup.
271 * The time has a maximum of 49 days and shall be around one day. It may
272 * be set much shorter for debugging purpose.
273 */
274 #define DEFAULT_WAKEUP 86400000
275
276 /*
277 * QUEUE_TIMEOUT is the time in ms after which an unused queue will be
278 * removed. A queue is unused if no client is connected to it.
279 * be set much shorter for debugging purpose.
280 * The time has a maximum of 49 days and may be set to one week. It may
281 * be set much shorter for debugging purpose.
282 */
283 #define QUEUE_TIMEOUT (86400000*7)
284
285 /*
286 * RxTime defines a structure holding the time in milliseconds resolution.
287 * I know, most systems have at least one sort of high precision
288 * time structure, but the ugly "#ifdef" are unreadable if we use
289 * them all over here.
290 * Defining a structure on our own is much more helpful.
291 * Of course, we can't use a single 32 bit value for milliseconds. This
292 * will break the server after 49 days. unix will live a little bit longer
293 * and there shall Windows machines exist which doesn't have reboot since
294 * longer periods ;-)
295 */
296 typedef struct {
297 /* seconds are typical time_t values. */
298 time_t seconds ;
299
300 /*
301 * milli's values are between 0 and 999. The special value -1 indicates
302 * a not-used condition.
303 */
304 int milli ;
305 } RxTime ;
306
307 /*
308 * now holds the current time. It isn't updated after every operation and
309 * may be out of date by some milliseconds some times.
310 */
311 RxTime now ;
312
313 /*
314 * This value is now plus 7 days.
315 */
316 RxTime queue_deadline ;
317
318 struct _Client ;
319 typedef struct _RxQueue {
320 /*
321 * linked list maintainance elements
322 */
323 struct _RxQueue *prev, *next ;
324 /* name is the uppercased name of the queue.
325 */
326 streng *name ;
327 /*
328 * Indicates if the queue is a "real" queue
329 * or a false queue as a result of a rxqueue('set', 'qname')
330 * on a queue that doesn't exist. This is crap behaviour!
331 * but that's how Object Rexx works :-(
332 */
333 int isReal ;
334 /*
335 * Content: single buffered stack in opposite to the multi buffered
336 * internal stacks of Regina.
337 */
338 Buffer buf ;
339 /*
340 * deadline is the time the queue was last used plus a timeout.
341 * The queue is removed if the queue isn't used for one week.
342 * Thus, the value is the time the queue was used last plus one week.
343 */
344 RxTime deadline ;
345 /*
346 * Several clients may want to wait for incoming data. They are queued
347 * in the following structure and automatically reponsed by the
348 * data acceptor of the queue in a FIFO manner, which is a fair-queue
349 * algorithm.
350 * The clients will get a notice of am error if the queue is destroyed
351 * or emptied by an explicite call.
352 * structure (n = newer, o = older)
353 * oldest newest
354 * || ||
355 * NULL<-o--client<-o--client<-o--client--n->NULL
356 * | ^ | ^
357 * | | | |
358 * +--n--+ +--n--+
359 */
360 struct _Client *oldest, *newest;
361 } RxQueue;
362
363 /*
364 * queues we work on.
365 * Format, p=prev, n=next:
366 * queues
367 * ||
368 * NULL<--p-queue-n---->queue-n---->queue-n-->NULL
369 * ^ v ^ v
370 * | | | |
371 * +---p---+ +---p---+
372 */
373 RxQueue *queues ;
374
375 /* SESSION is the special queue which can't be deleted and to which clients
376 * drop when their current queue is deleted.
377 */
378 RxQueue *SESSION ;
379
380 /*
381 * Structure for multiple clients
382 */
383 typedef struct _Client
384 {
385 /*
386 * linked list maintainance elements
387 */
388 struct _Client *prev, *next ;
389
390 /*
391 * socket contains the socket's handle
392 */
393 int socket;
394
395 /*
396 * Indicates if this client is a Websocket client, 0 no, 1 yes
397 */
398 int isWebsocket;
399
400 /*
401 * each client has a default queue associated. It must be valid all
402 * the times after initialization.
403 */
404 RxQueue *default_queue;
405
406 /*
407 * if queue_timeout is set, the client expects an error code after
408 * this time instead of waiting until world's end.
409 * The value is in milliseconds.
410 * A value of zero means no timeout; return immediately if no data
411 * A value of -1 means wait forever
412 */
413 long queue_timeout;
414
415 /*
416 * We manage a deadline. A PULL operation is in an error state after
417 * this timestamp. The value is set at a PULL operation to
418 * now+queue_timeout.
419 */
420 RxTime deadline ;
421
422 /*
423 * linked list maintainance elements for waiters.
424 */
425 struct _Client *older, *newer ;
426 } Client;
427
428 /*
429 * clients we work on.
430 * Format, p=prev, n=next:
431 * clients
432 * ||
433 * NULL<--p-client-n--->client-n--->client-n-->NULL
434 * ^ v ^ v
435 * | | | |
436 * +---p---+ +---p---+
437 */
438 Client *clients;
439
440 int running = 1;
441 int allclean = 0;
442 time_t base_secs; /* the time the process started */
443
444 static long portno = 0;
445 static int world = 0 ;
446 static int isdaemon = 0;
447 static int tosuicide = 0;
448
449 void empty_queue( RxQueue *q ) ;
450
451 #if !defined(HAVE_STRERROR)
452 /*
453 * Sigh! This must probably be done this way, although it's incredibly
454 * backwards. Some versions of gcc comes with a complete set of ANSI C
455 * include files, which contains the definition of strerror(). However,
456 * that function does not exist in the default libraries of SunOS.
457 * To circumvent that problem, strerror() is #define'd to get_sys_errlist()
458 * in config.h, and here follows the definition of that function.
459 * Originally, strerror() was #defined to sys_errlist[x], but that does
460 * not work if string.h contains a declaration of the (non-existing)
461 * function strerror().
462 *
463 * So, this is a mismatch between the include files and the library, and
464 * it should not create problems for Regina. However, the _user_ will not
465 * encounter any problems until he compiles Regina, so we'll have to
466 * clean up after a buggy installation of the C compiler!
467 */
get_sys_errlist(int num)468 const char *get_sys_errlist( int num )
469 {
470 extern char *sys_errlist[] ;
471 return sys_errlist[num] ;
472 }
473 #endif
474
475 #ifdef WIN32
Win32ErrorString(int code)476 const volatile char *Win32ErrorString(int code)
477 {
478 static char buffer[512];
479 size_t len;
480 const CHAR *array[10];
481 static HINSTANCE tcpip = NULL;
482 DWORD rc;
483
484 for (rc = 0;rc < sizeof(array) / sizeof(array[0]);rc++)
485 array[rc] = "?";
486
487 sprintf(buffer,"code %d: ",code);
488 len = strlen(buffer);
489
490 if (tcpip == NULL)
491 tcpip = GetModuleHandle("wsock32");
492 rc = FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM |
493 FORMAT_MESSAGE_FROM_HMODULE |
494 FORMAT_MESSAGE_ARGUMENT_ARRAY |
495 FORMAT_MESSAGE_MAX_WIDTH_MASK,
496 tcpip, /* lpSource */
497 code,
498 GetUserDefaultLangID(),
499 buffer + len,
500 sizeof(buffer) - len - 1, /* w/o term. 0 */
501 (va_list *) &array);
502 if ((rc == 0) && (len >= 2))
503 buffer[len - 2] = '\0'; /* cut off ": " at the end*/
504 return(buffer);
505 }
506 #endif
507
508 /*
509 * get_now returns the current time.
510 */
get_now(void)511 RxTime get_now( void )
512 {
513 RxTime retval ;
514 #if defined(HAVE_GETTIMEOFDAY)
515 struct timeval times ;
516
517 gettimeofday(×, NULL) ;
518 retval.seconds = times.tv_sec ;
519 retval.milli = times.tv_usec / 1000 ;
520
521 #elif defined(MAXLONGLONG)
522 static enum { T_first, T_OK, T_illegal } state = T_first ;
523 static LARGE_INTEGER freq;
524 LARGE_INTEGER curr;
525
526 retval.seconds = 0; /* Keep compiler happy */
527 if ( state == T_first )
528 {
529 if ( !QueryPerformanceFrequency( &freq ) )
530 state = T_illegal ;
531 else
532 state = T_OK ;
533 }
534 if ( state == T_OK )
535 {
536 if ( !QueryPerformanceCounter( &curr ) )
537 state = T_illegal ;
538 else
539 {
540 ULONGLONG h ;
541 /*
542 * if we don't native support for the 64 bit arithmetic, use
543 * doubles. Everything else is a pain.
544 * I hope we never get a compiler for Windows which can't compile
545 * this directly.
546 */
547 retval.seconds = (time_t) ( curr.QuadPart / freq.QuadPart ) ;
548 h = curr.QuadPart % freq.QuadPart ;
549 h *= 1000 ;
550 retval.milli = (int) ( h / freq.QuadPart ) ;
551 }
552 }
553 if ( state == T_illegal )
554 {
555 /* Windows systems have ftime in the C library */
556 struct timeb timebuffer;
557
558 ftime(&timebuffer);
559 retval.seconds = timebuffer.time ;
560 retval.milli = timebuffer.millitm ;
561 }
562
563 #elif defined(HAVE_FTIME)
564 struct timeb timebuffer;
565
566 ftime(&timebuffer);
567 retval.seconds = timebuffer.time ;
568 retval.milli = timebuffer.millitm ;
569
570 #else
571 clock_t c ;
572
573 if ( ( c = clock() ) == (clock_t) -1 )
574 {
575 /*
576 * clock() values are not adjusted to 1.1.1970 and CLOCKS_PER_SEC
577 * may be a float or double
578 */
579 retval.seconds = (time_t) (c / CLOCKS_PER_SEC) ;
580 retval.milli = (int) ( ( c * 1000.0 ) / CLOCKS_PER_SEC) % 1000 ;
581 }
582 else
583 {
584 retval.seconds = time( NULL ) ;
585 retval.milli = 0 ;
586 }
587 #endif
588 return retval ;
589 }
590
591 /*
592 * time_add increments an amount of milliseconds to time. the amount is
593 * usually a Client->queue_timeout.
594 */
time_add(RxTime * t,long incr)595 static void time_add( RxTime *t, long incr )
596 {
597 time_t s = (time_t) ( incr / 1000 ) ;
598 int m = ( incr % 1000 ) ;
599
600 assert( t->milli != -1 ) ;
601 /* wrapping may occur, be careful */
602 s += t->seconds ;
603 if ( ( t->milli += m ) >= 1000 )
604 {
605 s++ ;
606 t->milli -= 1000 ;
607 }
608 if ( t->seconds > s )
609 {
610 /* wrapping! */
611 s = (time_t) -1 ; /* maximum 1 */
612 if ( t->seconds > s )
613 {
614 /* time_t is a signed value, most representations have
615 * MinX = -MaxX - 1
616 */
617 s++;
618 s = -s;
619 }
620 assert( t->seconds <= s ) ;
621 }
622 t->seconds = s ;
623 }
624
625 /*
626 * time_diff returns t1 - t2 in milliseconds. -1 is returned on overflow;
627 * -1 is never returned else. -2 is used is the returned time will be
628 * negative.
629 */
time_diff(RxTime t1,RxTime t2)630 static int time_diff( RxTime t1, RxTime t2 )
631 {
632 int retval, h ;
633
634 assert( t1.milli != -1 ) ;
635 assert( t2.milli != -1 ) ;
636 if ( t1.seconds < t2.seconds )
637 retval = -2 ;
638 else
639 {
640 retval = (int) ( ( t1.seconds - t2.seconds ) * 1000 ) ;
641
642 if ( ( t1.milli < t2.milli ) && ( retval == 0 ) )
643 retval = -2 ;
644 else
645 {
646 retval += t1.milli - t2.milli ;
647
648 /* final check for overflow */
649 h = (int) ( t1.seconds - t2.seconds ) ;
650 if ( ( ( retval / 1000 ) < h - 1 )
651 || ( ( retval / 1000 ) > h + 1 )
652 || ( retval < 0 ) )
653 return -1 ;
654 }
655 }
656 return retval ;
657 }
658
659 /* compares 2 strengs and returns 0 if they are equal, 1 if not.
660 * The second one is converted to uppercase while comparing, the first
661 * one must be uppercase.
662 */
Str_ccmp(const streng * first,const streng * second)663 int Str_ccmp( const streng *first, const streng *second )
664 {
665 int tmp ;
666
667 if ( PSTRENGLEN( first ) != PSTRENGLEN( second ) )
668 return 1 ;
669
670 for (tmp=0; tmp < PSTRENGLEN( first ); tmp++ )
671 if ( first->value[tmp] != toupper( second->value[tmp] ) )
672 return 1 ;
673
674 return 0 ;
675 }
676
677 /* Str_cre_or_exit create a streng or exits after a message about
678 * missing memory.
679 */
Str_cre_or_exit(const char * str,unsigned length)680 streng *Str_cre_or_exit( const char *str, unsigned length )
681 {
682 streng *retval ;
683
684 if ( ( retval = MAKESTRENG( length ) ) == NULL )
685 {
686 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
687 exit( ERR_STORAGE_EXHAUSTED );
688 }
689
690 memcpy( PSTRENGVAL( retval ), str, length ) ;
691 retval->len = length ;
692 return retval ;
693 }
694
695 /*
696 * Str_buf create a streng from a buffer. It may return NULL on error.
697 */
Str_buf(const char * str,unsigned length)698 streng *Str_buf( const char *str, unsigned length )
699 {
700 streng *retval ;
701
702 if ( ( retval = MAKESTRENG( length ) ) == NULL )
703 {
704 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
705 return NULL;
706 }
707
708 memcpy( PSTRENGVAL( retval ), str, length );
709 retval->len = length;
710 return retval;
711 }
712
713 /*
714 * Str_dup duplicates a streng. It may return NULL on error.
715 */
Str_dup(const streng * str)716 streng *Str_dup( const streng *str )
717 {
718 return Str_buf( PSTRENGVAL( str ), PSTRENGLEN( str ) ) ;
719 }
720
Str_cat(streng * first,const streng * second)721 streng *Str_cat( streng *first, const streng *second )
722 {
723 streng *ptr;
724 int tmp;
725
726 tmp = Str_len( first ) + Str_len( second );
727
728 if ( ( ptr = MAKESTRENG( tmp ) ) != NULL )
729 {
730 memcpy( ptr->value, first->value, Str_len( first ) );
731 memcpy( ptr->value + Str_len( first ), second->value, Str_len( second ) );
732 ptr->len = tmp;
733 }
734 return ptr;
735 }
736
737 /*
738 * delete_a_queue deletes the queue's content and unlinks it from the list
739 * of existing queues if q isn't SESSION.
740 */
delete_a_queue(RxQueue * q)741 void delete_a_queue( RxQueue *q )
742 {
743 Client *c ;
744
745 if ( q->name )
746 {
747 DEBUGDUMP(printf("Deleting queue <%.*s>\n", PSTRENGLEN( q->name), PSTRENGVAL( q->name)););
748 }
749 else
750 {
751 DEBUGDUMP(printf("Deleting natal queue\n"););
752 }
753 empty_queue( q ) ;
754
755 if ( q == SESSION )
756 return ;
757
758 if ( q->name != NULL )
759 DROPSTRENG( q->name );
760
761 /*
762 * dequeue from the linked list and free space
763 */
764 if (q->prev != NULL)
765 q->prev->next = q->next ;
766 else
767 queues = q->next ;
768 if (q->next != NULL)
769 q->next->prev = q->prev ;
770 free( q ) ;
771
772 /*
773 * Let all clients connected to this queue fall back to "SESSION" as
774 * the default queue.
775 * FIXME: It may be better to leave a queue to a non-isReal state.
776 * This kind of work destroys the data integrity on SESSION.
777 * We get many more connections working on SESSION than expected.
778 */
779 for( c = clients; c != NULL; c = c->next )
780 {
781 if ( c->default_queue == q )
782 c->default_queue = SESSION ;
783 }
784 }
785
786 /*
787 * delete_a_client deletes the clients' content and unlinks it from the list
788 * of existing clients.
789 */
delete_a_client(Client * c)790 void delete_a_client( Client *c )
791 {
792 closesocket( c->socket ) ;
793
794 /*
795 * dequeue from the linked list and free space
796 */
797 if (c->prev != NULL)
798 c->prev->next = c->next ;
799 else
800 clients = c->next ;
801 if (c->next != NULL)
802 c->next->prev = c->prev ;
803 free( c ) ;
804 }
805
delete_all_queues(void)806 void delete_all_queues( void )
807 {
808 RxQueue *q, *h ;
809
810 /*
811 * SESSION won't be deleted. Be careful to initiate one delete per
812 * queue.
813 */
814 for ( q = queues; q != NULL; )
815 {
816 h = q ;
817 q = q->next ;
818 delete_a_queue( h );
819 }
820 }
821
get_unspecified_queue(void)822 char *get_unspecified_queue( void )
823 {
824 char *rxq = getenv( "RXQUEUE" );
825
826 if ( rxq == NULL )
827 rxq = "SESSION";
828
829 if ( strchr(rxq, '@' ) == NULL )
830 {
831 char *h ;
832
833 if ( ( h = (char *)malloc( strlen( rxq ) + 2 ) ) != NULL )
834 {
835 strcpy( h, rxq ) ;
836 strcat( h, "@" ) ;
837 rxq = h ;
838 }
839 }
840 return rxq;
841 }
842
suicide(void)843 int suicide( void )
844 {
845 int sock;
846 int myport;
847 streng *queue;
848 char *in_queue=get_unspecified_queue();
849 Queue q;
850
851 if ( init_external_queue( NULL ) )
852 return 1;
853
854
855 queue = Str_cre_or_exit( in_queue, strlen( in_queue ) ) ;
856
857 if ( parse_queue( NULL, queue, &q ) == 1 )
858 {
859 if ( portno == 0 )
860 myport = default_port_number();
861 else
862 myport = portno;
863 q.u.e.portno = myport;
864 sock = connect_to_rxstack( NULL, &q );
865 if ( sock < 0 )
866 {
867 /* error already shown by the function */
868 return(ERR_RXSTACK_CANT_CONNECT);
869 }
870 send_command_to_rxstack( NULL, sock, RXSTACK_KILL_STR, NULL, 0 );
871 read_result_from_rxstack( NULL, sock, RXSTACK_HEADER_SIZE );
872 closesocket(sock);
873 }
874 term_external_queue( ) ;
875 return 0;
876 }
877
rxstack_cleanup(void)878 int rxstack_cleanup( void )
879 {
880 if ( !allclean )
881 {
882 DEBUGDUMP(printf("Cleaning up\n"););
883 /*
884 * Disconnect all clients
885 * Delete all clients
886 */
887 delete_all_queues();
888 DEBUGDUMP(printf("Finished Cleaning up\n"););
889 term_external_queue( ) ;
890 allclean = 1;
891 }
892 return 0;
893 }
894
895 #ifdef BUILD_NT_SERVICE
report_service_start(void)896 BOOL report_service_start( void )
897 {
898 /*
899 * report the status to the service control manager.
900 */
901 return (ReportStatusToSCMgr(
902 SERVICE_RUNNING, /* service state */
903 NO_ERROR, /* exit code */
904 0)); /* wait hint */
905 }
906
report_service_pending_start(void)907 BOOL report_service_pending_start( void )
908 {
909 /*
910 * report the status to the service control manager.
911 */
912 return (ReportStatusToSCMgr(
913 SERVICE_START_PENDING, /* service state */
914 NO_ERROR, /* exit code */
915 3000)); /* wait hint */
916 }
917
nt_service_start(void)918 int nt_service_start( void )
919 {
920 /*
921 * code copied from sample NT Service code. The goto's are
922 * not mine!!
923 * report the status to the service control manager.
924 */
925 if ( !report_service_pending_start() )
926 goto cleanupper;
927
928 /*
929 * create the event object. The control handler function signals
930 * this event when it receives the "stop" control code.
931 */
932 hServerStopEvent = CreateEvent(
933 NULL, /* no security attributes */
934 TRUE, /* manual reset event */
935 FALSE, /* not-signalled */
936 NULL); /* no name */
937
938 if ( hServerStopEvent == NULL)
939 goto cleanupper;
940
941 /*
942 * report the status to the service control manager.
943 */
944 if ( !report_service_pending_start() )
945 goto cleanupper;
946
947 return 0;
948 cleanupper:
949 return 1;
950 }
951
ServiceStop()952 VOID ServiceStop()
953 {
954 DEBUGDUMP(printf("In ServiceStop()\n"););
955 suicide();
956 /*
957 running = 0;
958 */
959 }
960 #endif
961
rxstack_signal_handler(int sig)962 void rxstack_signal_handler( int sig )
963 {
964 running = 0;
965 }
966
967 /* Creates a new client and appends it in front of the current clients.
968 * Don't forget to set a default_queue and the socket at once.
969 */
get_new_client()970 Client *get_new_client( )
971 {
972 Client *retval = (Client *)malloc( sizeof( Client ) ) ;
973
974 if ( retval == NULL )
975 return NULL ;
976 memset( retval, 0, sizeof( Client ) ) ;
977 retval->socket = -1 ;
978 retval->deadline.milli = -1 ; /* deadline not used --> infinite timeout */
979
980 retval->next = clients ;
981 if ( clients != NULL )
982 clients->prev = retval ;
983 clients = retval ;
984 return retval ;
985 }
986
987 /*
988 * Find the named queue - case insensitive
989 * returns the queue or NULL if no queue with this name exists.
990 */
find_queue(const streng * queue_name)991 RxQueue *find_queue( const streng *queue_name )
992 {
993 RxQueue *q ;
994
995 for ( q = queues; q != NULL; q = q->next )
996 {
997 /* This is inefficient, FIXME: Introduce a hash value */
998 if ( Str_ccmp( q->name, queue_name ) == 0 )
999 return q;
1000 }
1001 return NULL ;
1002 }
1003
1004 /* Creates a new queue and appends it in front of the current queues.
1005 * Don't forget to set a name at once.
1006 */
get_new_queue(void)1007 RxQueue *get_new_queue( void )
1008 {
1009 RxQueue *retval = (RxQueue *)malloc( sizeof( RxQueue ) ) ;
1010
1011 if ( retval == NULL )
1012 return NULL ;
1013 memset( retval, 0, sizeof( RxQueue ) ) ;
1014 retval->deadline = queue_deadline ;
1015
1016 retval->next = queues ;
1017 if ( queues != NULL )
1018 queues->prev = retval ;
1019 queues = retval ;
1020 return retval ;
1021 }
1022
rxstack_delete_queue(Client * client,streng * queue_name)1023 int rxstack_delete_queue( Client *client, streng *queue_name )
1024 {
1025 RxQueue *q ;
1026 int rc ;
1027
1028 if ( ( q = find_queue( queue_name ) ) == NULL )
1029 {
1030 rc = 9;
1031 }
1032 else
1033 {
1034 if ( q == SESSION )
1035 rc = 5;
1036 else
1037 {
1038 if ( !q->isReal )
1039 {
1040 /*
1041 * If we found a false queue, return 9
1042 * but delete it.
1043 */
1044 delete_a_queue( q );
1045 rc = 9;
1046 }
1047 else
1048 {
1049 /*
1050 * Delete the contents of the queue
1051 * and mark it as gone.
1052 */
1053 delete_a_queue( q );
1054 rc = 0;
1055 }
1056 }
1057 }
1058 return rc ;
1059 }
1060
rxstack_create_client(int socket)1061 int rxstack_create_client( int socket )
1062 {
1063 Client *c ;
1064
1065 if ( ( c = get_new_client( ) ) == NULL )
1066 {
1067 closesocket( socket ) ;
1068 /* This may have been the connection telling us to go down ;-) */
1069 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
1070 exit( ERR_STORAGE_EXHAUSTED );
1071 }
1072
1073 c->socket = socket;
1074 c->default_queue = SESSION ;
1075 return 0;
1076 }
1077
encode_ws_payload(u_char const * src,size_t srclength,char * target,size_t targsize,unsigned int opcode)1078 int encode_ws_payload(u_char const *src, size_t srclength, char *target, size_t targsize, unsigned int opcode)
1079 {
1080 unsigned long long b64_sz, payload_offset = 2, len = 0;
1081
1082 if ((int)srclength <= 0)
1083 {
1084 return 0;
1085 }
1086
1087 DEBUGDUMP(printf("Encode new frame\n"););
1088 b64_sz = srclength;
1089
1090 target[0] = (char)((opcode & 0x0F) | 0x80);
1091
1092 if (b64_sz <= 125)
1093 {
1094 target[1] = (char) b64_sz;
1095 payload_offset = 2;
1096 }
1097 else if ((b64_sz > 125) && (b64_sz < 65536))
1098 {
1099 target[1] = (char) 126;
1100 *(u_short*)&(target[2]) = htons(b64_sz);
1101 payload_offset = 4;
1102 }
1103 else
1104 {
1105 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_INTERNAL, "rxstack websocket interface error: %s", "Sending frames larger than 65535 bytes not supported" );
1106 return -1;
1107 }
1108
1109 memcpy( target+payload_offset, src, srclength );
1110 len = srclength;
1111 return len + payload_offset;
1112 }
1113
1114 /* rxstack_send_return writes back to the client the action return code
1115 * and optionally a string of length len.
1116 * The functions returns 0 on success, -1 on error
1117 */
rxstack_send_return(Client * client,char * action,char * str,int len)1118 int rxstack_send_return( Client *client, char *action, char *str, int len )
1119 {
1120 streng *qlen, *header;
1121 int rc, retval = 0, rcode ;
1122 int sock = client->socket;
1123 char ws_response[4096];
1124
1125 if ( client->isWebsocket )
1126 {
1127 DEBUGDUMP(printf("Sending to websocket client %d Result: %c <%.*s>\n", sock, *action, len, (str) ? str : ""););
1128 rcode = encode_ws_payload( (const unsigned char *)str, len, ws_response, sizeof(ws_response), 1);
1129 rc = send( client->socket, ws_response, rcode, 0 );
1130 if ( rc != rcode )
1131 {
1132 DEBUGDUMP(printf("Send failed: rc> %d != len> %d errno = %d\n", rc, len, os_errno ););
1133 retval = -1;
1134 }
1135 }
1136 else
1137 {
1138 DEBUGDUMP(printf("Sending to %d Result: %c <%.*s>\n", sock, *action, len, (str) ? str : ""););
1139 qlen = REXX_D2X( len );
1140 if ( qlen )
1141 {
1142 header = REXX_RIGHT( qlen, RXSTACK_HEADER_SIZE, '0');
1143 DROPSTRENG( qlen );
1144 if ( header )
1145 {
1146 header->value[0] = action[0];
1147 DEBUGDUMP(printf("Sending Header: %.*s\n", PSTRENGLEN(header), PSTRENGVAL(header)););
1148 rc = send( sock, PSTRENGVAL(header), PSTRENGLEN(header), 0 );
1149 if ( rc != PSTRENGLEN(header) )
1150 {
1151 DEBUGDUMP(printf("Send failed: rc> %d != PSTRENGLEN(header)> %d errno = %d\n", rc, PSTRENGLEN(header),os_errno ););
1152 retval = -1 ;
1153 }
1154 else if ( str )
1155 {
1156 rc = send( sock, str, len, 0 );
1157 if ( rc != len )
1158 {
1159 DEBUGDUMP(printf("Send failed: errno = %d\n", os_errno ););
1160 retval = -1 ;
1161 }
1162 }
1163 DROPSTRENG( header );
1164 }
1165 }
1166 }
1167 return retval ;
1168 }
1169
rxstack_delete_client(Client * client)1170 int rxstack_delete_client( Client *client )
1171 {
1172 DEBUGDUMP(printf("Deleting client: Socket: %d\n", client->socket ););
1173 delete_a_client( client ) ;
1174 return 0;
1175 }
1176
1177 /* rxstack_set_default_queue sets the client's (new) queue name.
1178 * A false queue is created if the queue isn't found.
1179 * The new queue is returned or NULL if we are out of memory.
1180 */
rxstack_set_default_queue(Client * client,streng * data)1181 RxQueue *rxstack_set_default_queue( Client *client, streng *data )
1182 {
1183 RxQueue *q, *prev;
1184 streng *newq;
1185
1186 prev = client->default_queue;
1187 if ( ( q = find_queue( data ) ) == NULL )
1188 {
1189 /*
1190 * We didn't find a real or a false queue, so create
1191 * a false queue
1192 */
1193 q = get_new_queue( );
1194 if ( q != NULL )
1195 {
1196 newq = Str_dup( data );
1197 if ( newq == NULL )
1198 {
1199 delete_a_queue( q );
1200 return NULL;
1201 }
1202 q->name = REXX_UPPER( newq ) ;
1203 DEBUGDUMP(printf("Creating the false queue <%.*s>", PSTRENGLEN( q->name ), PSTRENGVAL( q->name ) ););
1204 /* q->isReal set to 0 by get_new_queue --> false queue */
1205 client->default_queue = q;
1206 }
1207 }
1208 else
1209 {
1210 client->default_queue = q;
1211 }
1212
1213 if ( q == NULL )
1214 {
1215 DEBUGDUMP(printf("No FREE MEMORY when setting default queue for client: <%.*s>\n", PSTRENGLEN(data), PSTRENGVAL(data) ););
1216 }
1217 else
1218 {
1219 DEBUGDUMP(printf("Setting default queue for client: <%.*s> Prev: %p <%.*s>\n", PSTRENGLEN(q->name), PSTRENGVAL(q->name), prev, PSTRENGLEN(prev->name), PSTRENGVAL(prev->name) ););
1220 /* SET or CREATE resets a timeout to 0; effectively turns off any timeout */
1221 client->queue_timeout = 0 ;
1222 }
1223 return q;
1224 }
1225
rxstack_timeout_queue(Client * client,const streng * data)1226 int rxstack_timeout_queue( Client *client, const streng *data )
1227 {
1228 int val,error;
1229
1230 /*
1231 * Convert the timeout
1232 * If the supplied timeout is 0 (infinite wait), set the client->queue_timeout
1233 * to -1.
1234 */
1235 val = REXX_X2D( data, &error );
1236 if ( error )
1237 return 2;
1238 if ( val == 0 )
1239 val = -1;
1240 client->queue_timeout = val;
1241 DEBUGDUMP(printf("Timeout on queue: %ld\n", client->queue_timeout ););
1242
1243 return 0;
1244 }
1245
1246 /* unique_name creates a unique name for a queue.
1247 * The function may exit after a message about missing memory.
1248 */
unique_name(void)1249 static streng *unique_name( void )
1250 {
1251 static int first = 1 ;
1252 static char buf[ 80 ] ;
1253 static char *ptr ;
1254 static unsigned runner = 0;
1255
1256 if ( first )
1257 {
1258 first = 0 ;
1259 sprintf( buf, "S%d%ld", (int) getpid(), (long) time( NULL ) ) ;
1260 ptr = buf + strlen( buf ) ;
1261 }
1262 sprintf( ptr, "%u", runner++ ) ;
1263 return Str_buf( buf, strlen( buf ) ) ;
1264 }
1265
rxstack_create_queue(Client * client,streng * data,streng ** result)1266 int rxstack_create_queue( Client *client, streng *data, streng **result )
1267 {
1268 RxQueue *q ;
1269 streng *new_queue = NULL;
1270 int rc = 0;
1271
1272 if ( data )
1273 {
1274 DEBUGDUMP(printf("Creating new user-specified queue: <%.*s>\n", PSTRENGLEN(data), PSTRENGVAL(data) ););
1275 if ( ( q = find_queue( data ) ) == NULL )
1276 {
1277 /*
1278 * No queue of that name, so use a duplicate of it.
1279 */
1280 DEBUGDUMP(printf("Couldn't find <%.*s>; so creating it\n", PSTRENGLEN(data), PSTRENGVAL(data) ););
1281 new_queue = data;
1282 }
1283 else
1284 {
1285 /*
1286 * If the queue we found is a false queue, we can still
1287 * use the supplied name and the slot
1288 */
1289 DROPSTRENG( data );
1290 if ( !q->isReal )
1291 {
1292 DEBUGDUMP(printf("Found false queue\n" ););
1293 q->isReal = 1;
1294 /* SET or CREATE resets a timeout to 0 */
1295 client->queue_timeout = 0;
1296 *result = q->name;
1297 return 0; /* Pass back the name. May be different due to
1298 * different locales or codepages, but it IS the selected
1299 * name.
1300 */
1301 }
1302 new_queue = unique_name( );
1303 if ( new_queue == NULL )
1304 return 3;
1305 DEBUGDUMP(printf("Having to create unique queue <%.*s>\n", PSTRENGLEN( new_queue ), PSTRENGVAL( new_queue ) ););
1306 rc = 1;
1307 }
1308 }
1309 else
1310 {
1311 DEBUGDUMP(printf("Creating system generated queue.\n"););
1312 new_queue = unique_name( );
1313 if ( new_queue == NULL )
1314 return 3;
1315 }
1316
1317 if ( ( q = get_new_queue( ) ) == NULL )
1318 {
1319 DROPSTRENG( new_queue );
1320 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
1321 return 3;
1322 }
1323
1324 /*
1325 * Uppercase the queue name
1326 */
1327 q->name = REXX_UPPER( new_queue );
1328 q->isReal = 1;
1329 /* SET or CREATE resets a timeout to 0 */
1330 client->queue_timeout = 0 ;
1331 *result = q->name;
1332 return rc; /* Both code 0 and code 1 return the name to the caller.
1333 * May be different due to codepages, etc.
1334 */
1335 }
1336
1337 /*
1338 * Pushes 'line' onto the REXX stack in LIFO manner.
1339 * point to the new line. The line is put on top of the current
1340 * buffer.
1341 */
rxstack_stack_lifo(RxQueue * current_queue,streng * line)1342 StackLine *rxstack_stack_lifo( RxQueue *current_queue, streng *line )
1343 {
1344 StackLine *newbox ;
1345
1346 if ( ( newbox = (StackLine *) malloc( sizeof(StackLine) ) ) != NULL )
1347 {
1348 newbox->contents = line ;
1349 LIFO_LINE( ¤t_queue->buf, newbox ) ;
1350 }
1351
1352 return newbox;
1353 }
1354
1355
1356 /*
1357 * Pushes 'line' onto the REXX stack in FIFO manner.
1358 * point to the new line. The line is put on top of the current
1359 * buffer.
1360 */
rxstack_stack_fifo(RxQueue * current_queue,streng * line)1361 StackLine *rxstack_stack_fifo( RxQueue *current_queue, streng *line )
1362 {
1363 StackLine *newbox ;
1364
1365 if ( ( newbox = (StackLine *) malloc( sizeof(StackLine) ) ) != NULL )
1366 {
1367 newbox->contents = line ;
1368 FIFO_LINE( ¤t_queue->buf, newbox ) ;
1369 }
1370
1371 return newbox;
1372 }
1373
1374 /*
1375 * dequeue_waiter dequeues the Client c from the waiter's list of the queue q.
1376 * The client must be linked in the waiter's list of the queue.
1377 * The client's variables for this purpose are cleaned, too.
1378 */
dequeue_waiter(RxQueue * q,Client * c)1379 static void dequeue_waiter( RxQueue *q, Client *c )
1380 {
1381 #if defined(DEBUG)
1382 Client *run ;
1383 for ( run = q->oldest; run != NULL; run = run->newer )
1384 if ( run == c )
1385 break ;
1386 assert( run != NULL ) ; /* This is the test if c is a waiter of q */
1387 #endif
1388
1389 if ( c->older != NULL )
1390 {
1391 c->older->newer = c->newer ;
1392 }
1393 else
1394 {
1395 q->oldest = c->newer ;
1396 if ( q->oldest != NULL )
1397 q->oldest->older = NULL ;
1398 }
1399
1400 if ( c->newer != NULL )
1401 {
1402 c->newer->older = c->older ;
1403 }
1404 else
1405 {
1406 q->newest = c->older ;
1407 if ( q->newest != NULL )
1408 q->newest->newer = NULL ;
1409 }
1410 assert( ( ( q->newest != NULL ) && ( q->oldest != NULL ) ) ||
1411 ( ( q->newest == NULL ) && ( q->oldest == NULL ) ) ) ;
1412 c->older = c->newer = NULL ;
1413 c->deadline.milli = -1 ;
1414 }
1415
1416 /* redir sends data as the answer of a pull operation back to
1417 * the oldest waiting client and dequeues this client.
1418 * data is dropped after the operation.
1419 */
redir(RxQueue * q,streng * data)1420 void redir( RxQueue *q, streng *data )
1421 {
1422 Client *c ;
1423
1424 c = q->oldest ;
1425 DEBUGDUMP(printf("Redirecting <%.*s> to waiting client %d\n", PSTRENGLEN(data), (PSTRENGVAL(data)) ? PSTRENGVAL(data) : "", c->socket ););
1426
1427 dequeue_waiter( q, c ) ;
1428
1429 rxstack_send_return( c, "0", PSTRENGVAL( data ), PSTRENGLEN( data ) ) ;
1430 DROPSTRENG( data ) ;
1431 }
1432
1433 /* bad_news_for_waiter informs a waiter about an error while waiting for
1434 * data for a pull request. The client is dequeued from its queue.
1435 */
bad_news_for_waiter(RxQueue * q,Client * c)1436 void bad_news_for_waiter( RxQueue *q, Client *c )
1437 {
1438 dequeue_waiter( q, c ) ;
1439 DEBUGDUMP(printf("Sending negative response to waiting client %d\n", c->socket ););
1440
1441 rxstack_send_return( c, "4", NULL, 0 ) ;
1442 }
1443
rxstack_queue_data(Client * client,streng * data,char order)1444 int rxstack_queue_data( Client *client, streng *data, char order )
1445 {
1446 int rc = 0;
1447
1448 if ( client->default_queue->oldest != NULL )
1449 {
1450 redir( client->default_queue, data ) ;
1451 return 0 ;
1452 }
1453 DEBUGDUMP(printf("Queueing: <%.*s> Order: %c\n", PSTRENGLEN(data), (PSTRENGVAL(data)) ? PSTRENGVAL(data) : "", order ););
1454 if ( order == RXSTACK_QUEUE_FIFO )
1455 {
1456 if ( rxstack_stack_fifo( client->default_queue, data ) == NULL )
1457 {
1458 DROPSTRENG( data );
1459 rc = 3;
1460 }
1461 }
1462 else
1463 {
1464 if ( rxstack_stack_lifo( client->default_queue, data ) == NULL )
1465 {
1466 DROPSTRENG( data );
1467 rc = 3;
1468 }
1469 }
1470 return rc;
1471 }
1472
1473 /* Clears the content of the queue. All waiters are informed by code
1474 * 2 of a cleaned queue and removed the the waiter's list.
1475 */
empty_queue(RxQueue * q)1476 void empty_queue( RxQueue *q )
1477 {
1478 StackLine *tmp, *line;
1479 streng *contents;
1480 Buffer *b ;
1481
1482 b = &q->buf ;
1483 for ( line = b->top; line != NULL; )
1484 {
1485 contents = line->contents;
1486 DROPSTRENG( contents );
1487 tmp = line;
1488 line = line->lower;
1489 free( tmp );
1490 }
1491 memset( &q->buf, 0, sizeof( Buffer ) ) ;
1492
1493 /* acknowledge waiters for data not ready and dequeue them */
1494
1495 while ( q->oldest != NULL ) {
1496 bad_news_for_waiter( q, q->oldest ) ;
1497 }
1498 }
1499
1500 /* Clears the content of the queue named data. In opposite to the previous
1501 * version, the client's current queue isn't set to the named queue any
1502 * longer. The is the default behaviour in Regina.
1503 * returns 0 on success, 2 if the queue doesn't exist.
1504 */
rxstack_empty_queue(Client * client,streng * data)1505 int rxstack_empty_queue( Client *client, streng *data )
1506 {
1507 RxQueue *q ;
1508
1509 DEBUGDUMP(printf("Emptying queue: <%.*s>\n", PSTRENGLEN(data), (PSTRENGVAL(data)) ? PSTRENGVAL(data) : "" ););
1510 if ( ( q = find_queue( data ) ) == NULL )
1511 return 2;
1512
1513 empty_queue( q ) ;
1514
1515 return 0;
1516 }
1517
rxstack_number_in_queue(Client * client)1518 int rxstack_number_in_queue( Client *client )
1519 {
1520 int lines = (int) client->default_queue->buf.elements;
1521
1522 DEBUGDUMP(printf("Querying number in queue: %d\n", lines ););
1523 return lines ;
1524 }
1525
1526 /*
1527 * Pulls a line off the queue and dequeues it.
1528 *
1529 * If nowait isn't set and no data is available and the client's queue_timeout
1530 * is set, the client is set to the newest end of the client's default_queue.
1531 *
1532 * It will be awaken by either ariving data on this pipe, or deleting/emptying
1533 * the pipe, or by a timeout.
1534 * Returns:
1535 * 0 if line OK
1536 * 1 if queue empty
1537 * 3 if waiting
1538 */
rxstack_pull_line_off_queue(Client * client,streng ** result,int nowait)1539 int rxstack_pull_line_off_queue( Client *client, streng **result, int nowait )
1540 {
1541 int rc;
1542 Buffer *b;
1543 StackLine *line;
1544 RxQueue *q;
1545
1546 b = &client->default_queue->buf;
1547 POP_LINE( b, line );
1548 if ( line != NULL )
1549 {
1550 *result = line->contents;
1551 free( line );
1552 rc = 0;
1553 }
1554 else
1555 {
1556 *result = NULL;
1557 if ( nowait )
1558 {
1559 rc = RXSTACK_EMPTY; /* queue empty */
1560 DEBUGDUMP(printf("nowait set to 1\n" ););
1561 }
1562 else
1563 {
1564 if ( client->queue_timeout == 0 )
1565 {
1566 rc = RXSTACK_EMPTY; /* queue empty */
1567 DEBUGDUMP(printf("client timeout = 0\n" ););
1568 }
1569 else
1570 {
1571 assert( client->deadline.milli == -1 );
1572 assert ( client->newer == NULL );
1573 if ( client->queue_timeout != -1 )
1574 {
1575 now = get_now( );
1576 client->deadline = now;
1577 time_add( &client->deadline, client->queue_timeout );
1578 }
1579 q = client->default_queue;
1580 client->newer = NULL;
1581 client->older = q->newest;
1582 if ( client->older != NULL )
1583 client->older->newer = client;
1584 q->newest = client;
1585 if ( q->oldest == NULL )
1586 q->oldest = client;
1587 rc = RXSTACK_WAITING; /* waiting */
1588 DEBUGDUMP(printf("waiting until %ld.%d\n", (long) client->deadline.seconds,client->deadline.milli ););
1589 }
1590 }
1591 }
1592 DEBUGDUMP(printf("Pulling line off queue; rc %d\n", rc ););
1593 return rc;
1594 }
1595
1596 /*
1597 * Gets all queues for all clients
1598 */
rxstack_get_queues(Client * client,streng ** result)1599 int rxstack_get_queues( Client *client, streng **result )
1600 {
1601 int rc,i=0;
1602 int len=0;
1603 RxQueue *q;
1604 char eol[3];
1605 streng *seol,*tmp1=NULL,*tmp2=NULL;
1606
1607 DEBUGDUMP(printf("Getting queues; rc %d\n", rc ););
1608 /* get the length of all queues and the nul terminator and EOL delimiter */
1609 for ( q = queues; q != NULL; )
1610 {
1611 len += PSTRENGLEN( q->name)+1;
1612 #if !defined(UNIX) && !defined(MAC)
1613 len++;
1614 #endif
1615 DEBUGDUMP(printf("Queue name <%.*s>\n", PSTRENGLEN( q->name), PSTRENGVAL( q->name)););
1616 q = q->next ;
1617 }
1618 /* determine eol for appending */
1619 #if !defined(UNIX)
1620 eol[i++] = REGINA_CR;
1621 #endif
1622 #if !defined(MAC)
1623 eol[i++] = REGINA_EOL;
1624 #endif
1625 eol[i] = '\0';
1626 seol = Str_cre_or_exit( eol, i );
1627 for ( q = queues; q != NULL; )
1628 {
1629 if ( tmp1 )
1630 tmp1 = Str_cat( tmp2, q->name );
1631 else
1632 tmp1 = Str_dup( q->name ) ;
1633 /* free tmp2 ? if null */
1634 DROPSTRENG( tmp2 );
1635 tmp2 = Str_cat( tmp1, seol );
1636 /* free tmp1 ? */
1637 DROPSTRENG( tmp1 );
1638 q = q->next ;
1639 }
1640 *result = tmp2;
1641 rc = 0;
1642
1643 return rc;
1644 }
1645
eec_base64_encode(unsigned char * rawstr,int strlen,unsigned char ** encstr,int * encstrlen)1646 static int eec_base64_encode( unsigned char *rawstr, int strlen, unsigned char **encstr, int *encstrlen )
1647 {
1648 unsigned char in[3], out[4];
1649 int len,i,j,k,encoded_length;
1650 unsigned char *encoded_code;
1651
1652 /*
1653 * Allocate 4/3 times the rawstring length
1654 */
1655 encoded_length = 3 + ((strlen * 4) / 3);
1656 encoded_code = (unsigned char *)malloc( encoded_length + sizeof(long) );
1657 if ( encoded_code == NULL )
1658 {
1659 return 1;
1660 }
1661
1662 for ( i = 0, j = 0; i < strlen; i +=3 )
1663 {
1664 len = 1;
1665 in[0] = rawstr[i];
1666 if (i+1 < strlen)
1667 {
1668 in[1] = rawstr[i+1];
1669 len++;
1670 }
1671 else
1672 in[1] = 0;
1673 if (i+2 < strlen)
1674 {
1675 in[2] = rawstr[i+2];
1676 len++;
1677 }
1678 else
1679 in[2] = 0;
1680 /* encode */
1681 out[0] = cb64[ in[0] >> 2 ];
1682 out[1] = cb64[ ((in[0] & 0x03) << 4) | ((in[1] & 0xf0) >> 4) ];
1683 out[2] = (unsigned char) (len > 1 ? cb64[ ((in[1] & 0x0f) << 2) | ((in[2] & 0xc0) >> 6) ] : '=');
1684 out[3] = (unsigned char) (len > 2 ? cb64[ in[2] & 0x3f ] : '=');
1685 for ( k = 0; k < 4; k++ )
1686 {
1687 encoded_code[j++] = out[k];
1688 }
1689 }
1690 *encstrlen = j;
1691 *encstr = encoded_code;
1692 return 0;
1693 }
1694
1695 #if REQUIRE_BASE64_DECODE
eec_base64_decode(unsigned char * encstr,long encstrlen,unsigned char ** rawstr,long * rawstrlen)1696 static int eec_base64_decode( unsigned char *encstr, long encstrlen, unsigned char **rawstr, long *rawstrlen )
1697 {
1698 unsigned char *raw_code,v;
1699 unsigned char *encoded_code;
1700 unsigned char in[4], out[3];
1701 int i,j,k,len;
1702
1703 encoded_code = encstr;
1704 if ( (encstrlen % 4 ) != 0 )
1705 {
1706 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_INTERNAL, "rxstack websocket interface error: %s", "Length of encoded value must be a multiple of 4." );
1707 return 1;
1708 }
1709 /*
1710 * Decode encoded_code to produce raw
1711 * Result will always be smaller so allocate memory for encoded length
1712 */
1713 raw_code = (unsigned char *)malloc( encstrlen );
1714 if ( raw_code == NULL )
1715 {
1716 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
1717 return 1;
1718 }
1719 /*
1720 * Now decode encoded_code
1721 */
1722 for ( i = 0, j = 0; i < encstrlen; )
1723 {
1724 for ( k = 0, len = 0; k < 4; )
1725 {
1726 v = (unsigned char) encoded_code[i++];
1727 v = (unsigned char) ((v < 43 || v > 122) ? 0 : cd64[ v - 43 ]);
1728 if( v )
1729 {
1730 v = (unsigned char) ((v == '$') ? 0 : v - 61);
1731 if ( v )
1732 {
1733 in[k++] = (unsigned char) (v - 1);
1734 len++;
1735 }
1736 else
1737 break;
1738 }
1739 }
1740 /* decode */
1741 out[ 0 ] = (unsigned char ) (in[0] << 2 | in[1] >> 4);
1742 out[ 1 ] = (unsigned char ) (in[1] << 4 | in[2] >> 2);
1743 out[ 2 ] = (unsigned char ) (((in[2] << 6) & 0xc0) | in[3]);
1744 for ( k = 0; k < len - 1; k++ )
1745 {
1746 raw_code[j++] = out[k];
1747 }
1748 }
1749 raw_code[j] = '\0';
1750 *rawstrlen = j;
1751 *rawstr = raw_code;
1752 return 0;
1753 }
1754 #endif
1755
memcmpi(char * buf1,char * buf2,int len)1756 int memcmpi( char *buf1, char *buf2, int len )
1757 {
1758 register short i=0;
1759 char c1=0,c2=0;
1760 for(i=0;i<len;i++)
1761 {
1762 if (isupper(*buf1))
1763 c1 = tolower(*buf1);
1764 else
1765 c1 = *buf1;
1766 if (isupper(*buf2))
1767 c2 = tolower(*buf2);
1768 else
1769 c2 = *buf2;
1770 if (c1 != c2)
1771 return(c1-c2);
1772 ++buf1;
1773 ++buf2;
1774 }
1775 return(0);
1776 }
1777
countstr(char * str,char ch)1778 int countstr( char *str, char ch )
1779 {
1780 int i,count=0;
1781 int len = strlen( str );
1782
1783 for ( i = 0; i < len; i++ )
1784 {
1785 if ( str[i] == ch )
1786 count++;
1787 }
1788 return count;
1789 }
1790
determine_rxstack_command(char * str)1791 char determine_rxstack_command( char *str )
1792 {
1793 if ( memcmpi( "/fifo:", str, ACTION_FIFO_LEN ) == 0
1794 && countstr( str, ':' ) >= 2 )
1795 return RXSTACK_QUEUE_FIFO;
1796 else if ( memcmpi( "/lifo:", str, ACTION_FIFO_LEN ) == 0
1797 && countstr( str, ':' ) >= 2 )
1798 return RXSTACK_QUEUE_LIFO;
1799 else if ( memcmpi( "/queued:", str, ACTION_QUEUED_LEN ) == 0
1800 && countstr( str, ':' ) == 1 )
1801 return RXSTACK_NUMBER_IN_QUEUE;
1802 else if ( memcmpi( "/clear:", str, ACTION_CLEAR_LEN ) == 0
1803 && countstr( str, ':' ) == 1 )
1804 return RXSTACK_EMPTY_QUEUE;
1805 else if ( memcmpi( "/pull:", str, ACTION_PULL_LEN ) == 0
1806 && countstr( str, ':' ) == 1 )
1807 return RXSTACK_PULL;
1808 else if ( memcmpi( "/create:", str, ACTION_CREATE_LEN ) == 0
1809 && countstr( str, ':' ) == 1 )
1810 return RXSTACK_CREATE_QUEUE;
1811 else if ( memcmpi( "/delete:", str, ACTION_DELETE_LEN ) == 0
1812 && countstr( str, ':' ) == 1 )
1813 return RXSTACK_DELETE_QUEUE;
1814 /* wrong action or incorrect number of colons - error */
1815 return 0;
1816 }
1817
1818 #define BAD_REQUEST "HTTP/1.1 400 Bad Request\r\n\r\n"
bad_request_from_websocket(Client * client)1819 int bad_request_from_websocket( Client *client )
1820 {
1821 int rc;
1822 int len = strlen( BAD_REQUEST );
1823
1824 rc = send( client->socket, BAD_REQUEST, len, 0 );
1825 if ( rc != len )
1826 {
1827 DEBUGDUMP(printf("Send failed: rc> %d != len> %d errno = %d\n", rc, len, os_errno ););
1828 return 0;
1829 }
1830 return 1;
1831 }
1832
rxstack_process_websockets_headers(Client * client)1833 int rxstack_process_websockets_headers( Client *client )
1834 {
1835 int rc = 1; /* by default successful return */
1836 char ws_headers[4096];
1837 char key[1024];
1838 char protocol[1024];
1839 char response[4096];
1840 char *start, *end;
1841 SHA1_HASH hash[SHA1_HASH_SIZE];
1842 Sha1Context context;
1843 unsigned char *strptr;
1844 int strlength;
1845 int len;
1846
1847 memset( ws_headers, 0, sizeof(ws_headers) );
1848 rc = recv( client->socket, ws_headers, 4096, 0 );
1849 if ( rc < 0 )
1850 {
1851 if ( os_errno != ECONNRESET )
1852 {
1853 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_READING_SOCKET, ERR_RXSTACK_READING_SOCKET_TMPL, errno_str( os_errno ) );
1854 }
1855 /*
1856 * Assume client has been lost
1857 */
1858 rxstack_delete_client( client );
1859 return 0 ;
1860 }
1861 if ( rc == 0 )
1862 {
1863 DEBUGDUMP(printf("read empty WS header\n"););
1864 /*
1865 * Assume client has been lost
1866 */
1867 rxstack_delete_client( client );
1868 return 0 ;
1869 }
1870 DEBUGDUMP(printf("header: [%s]\nLength: %d\n",ws_headers,rc););
1871 /*
1872 * Parse the headers
1873 */
1874 start = ws_headers+4;
1875 /*
1876 * must specify "root" as location: ws://localhost:8888 or ws://localhost:8888/ only.
1877 * no path allowed after optional port
1878 */
1879 end = strstr( start, "/ HTTP/1.1" );
1880 if ( !end )
1881 {
1882 bad_request_from_websocket( client );
1883 return 0;
1884 }
1885 /* validate and get protocol header */
1886 start = strstr( ws_headers, "\r\nSec-WebSocket-Protocol: ");
1887 if (!start)
1888 {
1889 DEBUGDUMP(printf("No header: \"Sec-WebSocket-Protocol:\"\n"););
1890 bad_request_from_websocket( client );
1891 return 0;
1892 }
1893 /* protocol must be "rxstack" */
1894 start += 26;
1895 end = strstr( start, "\r\n");
1896 strncpy( protocol, start, end-start );
1897 protocol[end-start] = '\0';
1898 if ( strcmp( protocol, "rxstack" ) != 0 )
1899 {
1900 DEBUGDUMP(printf("Invalid Protocol: %s; should be \"rxstack\"\n",protocol););
1901 bad_request_from_websocket( client );
1902 return 0;
1903 }
1904 /* validate and get key header */
1905 start = strstr(ws_headers, "\r\nSec-WebSocket-Key: ");
1906 if (!start)
1907 {
1908 bad_request_from_websocket( client );
1909 return 0;
1910 }
1911 start += 21;
1912 end = strstr(start, "\r\n");
1913 strncpy( key, start, end-start );
1914 key[end-start] = '\0';
1915
1916 start = strstr(ws_headers, "\r\n\r\n");
1917 DEBUGDUMP(printf("end of headers at %x: [%s]\n",(unsigned int) start,start ););
1918
1919 /*
1920 * Generate our response key
1921 */
1922 Sha1Initialise( &context );
1923 Sha1Update( &context, key, strlen(key) );
1924 Sha1Update( &context, WS_RESPONSE_MAGIC, 36 );
1925 Sha1Finalise( &context, hash );
1926 if ( eec_base64_encode( (unsigned char *)hash, SHA1_HASH_SIZE, (unsigned char **)&strptr, &strlength ) != 0 )
1927 {
1928 DEBUGDUMP(printf("base64 encoding failed:\n" ););
1929 return 0;
1930 }
1931 DEBUGDUMP(printf("Base64 key: <%.*s>\n", strlength, strptr););
1932 /*
1933 * Respond with our headers
1934 */
1935 len = sprintf( response, WS_RESPONSE_HEADER, strlength, strptr );
1936 DEBUGDUMP(printf("Sending to %d Result:\n[%.*s]\n", client->socket, len, response););
1937 rc = send( client->socket, response, len, 0 );
1938 if ( rc != len )
1939 {
1940 DEBUGDUMP(printf("Send failed: rc> %d != len> %d errno = %d\n", rc, len, os_errno ););
1941 return 0;
1942 }
1943 free( strptr );
1944 /*
1945 * Indicate we are a Websocket client
1946 */
1947 client->isWebsocket = 1;
1948 return rc;
1949 }
1950
decode_ws_payload(unsigned char * src,size_t srclength,unsigned char * target,size_t targsize,unsigned int * opcode,unsigned int * left)1951 int decode_ws_payload( unsigned char *src, size_t srclength, unsigned char *target, size_t targsize, unsigned int *opcode, unsigned int *left )
1952 {
1953 unsigned char *frame, *mask, *payload, save_char;
1954 int masked = 0;
1955 int i = 0, framecount = 0;
1956 size_t remaining;
1957 unsigned int target_offset = 0, hdr_length = 0, payload_length = 0, decoded_length;
1958
1959 *left = srclength;
1960 frame = src;
1961
1962 DEBUGDUMP(printf("Decode new frame\n"););
1963 while (1)
1964 {
1965 // Need at least two bytes of the header
1966 // Find beginning of next frame. First time hdr_length, masked and
1967 // payload_length are zero
1968 frame += hdr_length + 4*masked + payload_length;
1969 DEBUGDUMP(printf("frame[0..3]: 0x%x 0x%x 0x%x 0x%x (tot: %d)\n",(unsigned char) frame[0],(unsigned char) frame[1],(unsigned char) frame[2],(unsigned char) frame[3], (int) srclength););
1970
1971 if (frame > src + srclength)
1972 {
1973 DEBUGDUMP(printf("Truncated frame from client, need %d more bytes\n", (int) (frame - (src + srclength))););
1974 break;
1975 }
1976 remaining = (src + srclength) - frame;
1977 if (remaining < 2)
1978 {
1979 DEBUGDUMP(printf("Truncated frame header from client\n"););
1980 break;
1981 }
1982 framecount ++;
1983
1984 *opcode = frame[0] & 0x0f;
1985 masked = (frame[1] & 0x80) >> 7;
1986
1987 if (*opcode == 0x8)
1988 {
1989 // client sent orderly close frame
1990 break;
1991 }
1992
1993 payload_length = frame[1] & 0x7f;
1994 if (payload_length < 126)
1995 {
1996 hdr_length = 2;
1997 //frame += 2 * sizeof(char);
1998 }
1999 else if (payload_length == 126)
2000 {
2001 payload_length = (frame[2] << 8) + frame[3];
2002 hdr_length = 4;
2003 }
2004 else
2005 {
2006 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_INTERNAL, "rxstack websocket interface error: %s", "Receiving frames larger than 65535 bytes not supported" );
2007 return -1;
2008 }
2009 if ((hdr_length + 4*masked + payload_length) > remaining)
2010 {
2011 continue;
2012 }
2013 DEBUGDUMP(printf(" payload_length: %u, raw remaining: %u, , hdr_length: %d, opcode %d\n",payload_length, (int) remaining,hdr_length, *opcode););
2014 payload = frame + hdr_length + 4*masked;
2015
2016 if (*opcode != 1 && *opcode != 2)
2017 {
2018 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_INTERNAL, "rxstack websocket interface error: Ignoring non-data frame, opcode 0x%x", *opcode );
2019 continue;
2020 }
2021
2022 if (payload_length == 0)
2023 {
2024 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_INTERNAL, "rxstack websocket interface error: %s", "Ignoring empty frame" );
2025 continue;
2026 }
2027
2028 if ((payload_length > 0) && (!masked))
2029 {
2030 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_INTERNAL, "rxstack websocket interface error: %s", "Received unmasked payload from client" );
2031 return -1;
2032 }
2033
2034 // Terminate with a null for base64 decode
2035 save_char = payload[payload_length];
2036 payload[payload_length] = '\0';
2037 if ( debug )
2038 {
2039 DEBUGDUMP(printf("payload before masking: [%s]\nLength: %d\n",payload,payload_length););
2040 for ( i = 0; i < payload_length; i++ )
2041 DEBUGDUMP(printf("%x ",payload[i]););
2042 DEBUGDUMP(printf("\n"););
2043 }
2044 // unmask the data
2045 mask = payload - 4;
2046 for (i = 0; i < payload_length; i++)
2047 {
2048 payload[i] ^= mask[i%4];
2049 }
2050 if ( debug )
2051 {
2052 DEBUGDUMP(printf("payload after masking: [%s]\nLength: %d\n",payload,payload_length););
2053 for ( i = 0; i < payload_length; i++ )
2054 DEBUGDUMP(printf("%x ",payload[i]););
2055 DEBUGDUMP(printf("\n"););
2056 }
2057 #ifdef REQUIRE_BASE64_DECODE
2058 // base64 decode the data - not needed for text-only data ?
2059 if ( eec_base64_decode( (unsigned char *)payload, payload_length, &decoded_chunk, &decoded_length ) != 0 )
2060 {
2061 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_INTERNAL, "rxstack websocket interface error: Base64 decode error code %d", len );
2062 return -1;
2063 }
2064 memcpy( target+target_offset, decoded_chunk, decoded_length );
2065 free( decoded_chunk );
2066 #else
2067 memcpy( target+target_offset, payload, payload_length+1 ); /* +1 ensures nul terminated string */
2068 decoded_length = payload_length;
2069 #endif
2070
2071 // Restore the first character of the next frame
2072 payload[payload_length] = save_char;
2073 target_offset += decoded_length;
2074 }
2075
2076 if (framecount > 1)
2077 {
2078 DEBUGDUMP(printf("framecount: %d\n",framecount););
2079 }
2080
2081 *left = remaining;
2082 return target_offset;
2083 }
2084
split_ws_payload(char * ws_payload,int offset,streng ** queue,streng ** buffer)2085 int split_ws_payload( char *ws_payload, int offset, streng **queue, streng **buffer )
2086 {
2087 int i,buffer_start=0;
2088 int len = strlen( ws_payload );
2089 char *q = ws_payload+offset;
2090
2091 if ( ws_payload[offset] == '\0'
2092 || ( ws_payload[offset] == ':' && len == offset + 1 ) )
2093 {
2094 /* queue and buffer both empty */
2095 /* caters for:
2096 * /clear:
2097 * /fifo::
2098 */
2099 *queue = *buffer = NULL;
2100 return 0;
2101 }
2102 for ( i = offset; i < len; i++ )
2103 {
2104 if ( ws_payload[i] == ':' )
2105 {
2106 ws_payload[i] = '\0';
2107 *queue = Str_buf( q, i - offset );
2108 buffer_start = i+1;
2109 break;
2110 }
2111 }
2112 /* if no ':' found, buffer is empty but we have a queue */
2113 if ( buffer_start == 0 )
2114 {
2115 *buffer = NULL;
2116 *queue = Str_buf( q, len - offset );
2117 }
2118 else
2119 {
2120 *buffer = Str_buf( ws_payload+buffer_start, len - buffer_start + 1 );
2121 }
2122 return 0;
2123 }
2124
send_response_to_client(Client * client,char action,streng * buffer)2125 int send_response_to_client( Client *client, char action, streng *buffer )
2126 {
2127 int rc,length;
2128 char rcode[2];
2129 RxQueue *q ;
2130 streng *result=NULL;
2131
2132 switch( action )
2133 {
2134 case RXSTACK_QUEUE_FIFO:
2135 case RXSTACK_QUEUE_LIFO:
2136 DEBUGDUMP(printf("--- Queue %s ---\n", action == RXSTACK_QUEUE_FIFO ? "FIFO" : "LIFO"););
2137 /*
2138 * fixes bug 700539
2139 */
2140 if ( buffer == NULL )
2141 buffer = Str_buf( "", 0 );
2142 if ( buffer == NULL )
2143 rc = 3;
2144 else
2145 rc = rxstack_queue_data( client, buffer, action );
2146 rcode[0] = (char)(rc+'0');
2147 rxstack_send_return( client, rcode, NULL, 0 );
2148 buffer = NULL ; /* consumed by rxstack_queue_data */
2149 break;
2150 case RXSTACK_EXIT:
2151 DEBUGDUMP(printf("--- Exit ---\n"););
2152 /*
2153 * Client has requested disconnect, so remove all
2154 * references to the client
2155 */
2156 rxstack_send_return( client, "0", NULL, 0 );
2157 rxstack_delete_client( client );
2158 if ( buffer != NULL )
2159 {
2160 DROPSTRENG( buffer );
2161 buffer = NULL;
2162 }
2163 break;
2164 case RXSTACK_KILL:
2165 DEBUGDUMP(printf("--- Kill ---\n"););
2166 /*
2167 * Client has requested server to stop
2168 */
2169 rxstack_send_return( client, "0", NULL, 0 );
2170 rxstack_delete_client( client );
2171 running = 0;
2172 return 0;
2173 case RXSTACK_SET_QUEUE:
2174 DEBUGDUMP(printf("--- Set Queue ---\n"););
2175 /*
2176 * Set the default queue for the client
2177 */
2178 if ( buffer == NULL )
2179 rxstack_send_return( client, "6", NULL, 0 );
2180 else
2181 {
2182 q = rxstack_set_default_queue( client, buffer );
2183 if ( q == NULL )
2184 rxstack_send_return( client, "3", NULL, 0 );
2185 else
2186 rxstack_send_return( client, "0", q->name->value, q->name->len );
2187 DROPSTRENG( buffer );
2188 buffer = NULL;
2189 }
2190 break;
2191 case RXSTACK_EMPTY_QUEUE:
2192 DEBUGDUMP(printf("--- Empty Queue ---\n"););
2193 /*
2194 * Use the current queue as the default queue.
2195 */
2196 if ( buffer == NULL )
2197 buffer = client->default_queue->name;
2198 rc = rxstack_empty_queue( client, buffer );
2199 rcode[0] = (char)(rc+'0');
2200 rxstack_send_return( client, rcode, NULL, 0 );
2201 if ( buffer != client->default_queue->name )
2202 DROPSTRENG( buffer );
2203 buffer = NULL ;
2204 break;
2205 case RXSTACK_NUMBER_IN_QUEUE:
2206 DEBUGDUMP(printf("--- Number in Queue ---\n"););
2207 length = rxstack_number_in_queue( client );
2208 rxstack_send_return( client, "0", NULL, length );
2209 if ( buffer != NULL )
2210 {
2211 DROPSTRENG( buffer );
2212 buffer = NULL;
2213 }
2214 break;
2215 case RXSTACK_SHOW_QUEUES:
2216 DEBUGDUMP(printf("--- Show ---\n"););
2217 rc = rxstack_get_queues( client, &result );
2218 switch( rc )
2219 {
2220 case 0: /* all OK */
2221 rxstack_send_return( client, "0", PSTRENGVAL( result ), PSTRENGLEN( result ) );
2222 DROPSTRENG( result );
2223 break;
2224 default: /* empty/error */
2225 rcode[0] = (char)(rc+'0');
2226 rxstack_send_return( client, rcode, NULL, 0 );
2227 break;
2228 }
2229 if ( buffer != NULL )
2230 {
2231 DROPSTRENG( buffer );
2232 buffer = NULL;
2233 }
2234 break;
2235 case RXSTACK_PULL:
2236 case RXSTACK_FETCH:
2237 DEBUGDUMP(printf("--- Pull ---\n"););
2238 rc = rxstack_pull_line_off_queue( client, &result, action == RXSTACK_FETCH );
2239 switch( rc )
2240 {
2241 case 0: /* all OK */
2242 rxstack_send_return( client, "0", PSTRENGVAL( result ), PSTRENGLEN( result ) );
2243 DROPSTRENG( result );
2244 break;
2245 case RXSTACK_WAITING: /* still waiting; don't return */
2246 break;
2247 default: /* empty/error */
2248 rcode[0] = (char)(rc+'0');
2249 rxstack_send_return( client, rcode, NULL, 0 );
2250 break;
2251 }
2252 if ( buffer != NULL )
2253 {
2254 DROPSTRENG( buffer );
2255 buffer = NULL;
2256 }
2257 break;
2258 case RXSTACK_GET_QUEUE:
2259 DEBUGDUMP(printf("--- Get Queue ---\n"););
2260 rxstack_send_return( client, "0", PSTRENGVAL(client->default_queue->name), PSTRENGLEN(client->default_queue->name) ) ;
2261 if ( buffer != NULL )
2262 {
2263 DROPSTRENG( buffer );
2264 buffer = NULL;
2265 }
2266 break;
2267 case RXSTACK_CREATE_QUEUE:
2268 DEBUGDUMP(printf("--- Create Queue ---\n"););
2269 /*
2270 * Create a new queue
2271 */
2272 rc = rxstack_create_queue( client, buffer, &result );
2273 rcode[0] = (char)(rc+'0');
2274 if ( ( rc != 1 ) && ( rc != 0 ) )
2275 rxstack_send_return( client, rcode, NULL, 0 );
2276 else
2277 rxstack_send_return( client, rcode, PSTRENGVAL(result), PSTRENGLEN(result) );
2278 buffer = NULL; /* consumed by rxstack_create_queue */
2279 break;
2280 case RXSTACK_DELETE_QUEUE:
2281 DEBUGDUMP(printf("--- Delete Queue ---\n"););
2282 /*
2283 * Delete the queue
2284 */
2285 if ( buffer == NULL )
2286 rc = 6;
2287 else
2288 rc = rxstack_delete_queue( client, buffer );
2289 rcode[0] = (char)(rc+'0');
2290 rxstack_send_return( client, rcode, NULL, 0 );
2291 if ( buffer != NULL )
2292 {
2293 DROPSTRENG( buffer );
2294 buffer = NULL;
2295 }
2296 break;
2297 case RXSTACK_TIMEOUT_QUEUE:
2298 DEBUGDUMP(printf("--- Timeout Queue ---\n"););
2299 /*
2300 * Set timeout for pull from queue
2301 */
2302 if ( buffer == NULL )
2303 rc = 6;
2304 else
2305 rc = rxstack_timeout_queue( client, buffer );
2306 rcode[0] = (char)(rc+'0');
2307 rxstack_send_return( client, rcode, NULL, 0 );
2308 if ( buffer != NULL )
2309 DROPSTRENG( buffer );
2310 buffer = NULL;
2311 break;
2312 case RXSTACK_UNKNOWN:
2313 /* do nothing */
2314 break;
2315 default:
2316 rxstack_send_return( client, "9", NULL, 0 );
2317 break;
2318 }
2319 assert( buffer == NULL ) ;
2320 if ( buffer != NULL )
2321 DROPSTRENG( buffer ) ;
2322 return 1;
2323 }
2324
rxstack_process_websockets_data(Client * client)2325 int rxstack_process_websockets_data( Client *client )
2326 {
2327 int rc = 1;
2328 char ws_headers[8192];
2329 char ws_payload[4096];
2330 char ws_response[4096];
2331 char response[4096];
2332 int response_length;
2333 int len,i,rcode;
2334 unsigned int opcode, left;
2335 streng *queue=NULL, *buffer=NULL, *result;
2336 streng *resp_streng, *rcode_streng;
2337 char action;
2338 int length;
2339 char srcode[3];
2340 RxQueue *q ;
2341
2342 srcode[1] = ':';
2343 srcode[2] = '\0';
2344 /* assumes only 4096 bytes can b sent by client and in one chunk */
2345 memset( ws_headers, 0, sizeof(ws_headers) );
2346 rc = recv( client->socket, ws_headers, 4096, 0 );
2347 if ( rc < 0 )
2348 {
2349 if ( os_errno != ECONNRESET )
2350 {
2351 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_READING_SOCKET, ERR_RXSTACK_READING_SOCKET_TMPL, errno_str( os_errno ) );
2352 }
2353 /*
2354 * Assume client has been lost
2355 */
2356 rxstack_delete_client( client );
2357 return 0 ;
2358 }
2359 if ( rc == 0 )
2360 {
2361 DEBUGDUMP(printf("read empty WS data\n"););
2362 /*
2363 * Assume client has been lost
2364 */
2365 rxstack_delete_client( client );
2366 return 0 ;
2367 }
2368 if ( debug )
2369 {
2370 DEBUGDUMP(printf("header: [%s]\nLength: %d\n",ws_headers,rc););
2371 for ( i = 0; i < rc; i++ )
2372 DEBUGDUMP(printf("%x ",ws_headers[i]););
2373 DEBUGDUMP(printf("\n"););
2374 }
2375 /*
2376 * Decode the websocket payload
2377 */
2378 rcode = decode_ws_payload( (unsigned char *)ws_headers, rc, (unsigned char *)ws_payload, sizeof(ws_payload), &opcode, &left );
2379 DEBUGDUMP(printf("after decode payload: rcode: %d remaining: %d payload:[%s]\n",rcode,left,ws_payload););
2380 if ( opcode == 0x8 )
2381 {
2382 /* client sent close request; send back a close opcode message with empty payload */
2383 len = encode_ws_payload( (unsigned char *)"", 0, ws_response, sizeof(ws_response), opcode);
2384 rc = send( client->socket, ws_response, len, 0 );
2385 if ( rc != len )
2386 {
2387 DEBUGDUMP(printf("Send failed: rc> %d != len> %d errno = %d\n", rc, len, os_errno ););
2388 return 0;
2389 }
2390 rxstack_delete_client( client );
2391 }
2392 else
2393 {
2394 DEBUGDUMP(printf("got valid command:[%s]\n",ws_payload););
2395 /* process command from client - TODO */
2396 action = determine_rxstack_command( ws_payload );
2397 if ( action == 0 )
2398 {
2399 DEBUGDUMP(printf("decoded invalid command: opcode %d\n",opcode););
2400 /* wrong action or syntax error */
2401 len = sprintf( ws_headers, "9:Incorrect rxstack command or invalid format of command: %s", ws_payload );
2402 rxstack_send_return( client, &action, ws_headers, len );
2403 }
2404 else
2405 {
2406 DEBUGDUMP(printf("decoded valid command:[%c]\n",action););
2407 /* valid action: process the request and send response */
2408 switch( action )
2409 {
2410 case RXSTACK_QUEUE_FIFO:
2411 case RXSTACK_QUEUE_LIFO:
2412 DEBUGDUMP(printf("WS: Queue %s ---\n", action == RXSTACK_QUEUE_FIFO ? "FIFO" : "LIFO"););
2413 split_ws_payload( ws_payload, ACTION_FIFO_LEN, &queue, &buffer );
2414 if ( queue
2415 && PSTRENGLEN( queue ) )
2416 {
2417 q = rxstack_set_default_queue( client, queue );
2418 if ( q == NULL )
2419 // errorrxstack_send_return( client, "3", NULL, 0 );
2420 break;
2421 }
2422 /*
2423 * fixes bug 700539
2424 */
2425 if ( buffer == NULL )
2426 buffer = Str_buf( "", 0 );
2427 if ( buffer == NULL )
2428 rc = 3;
2429 else
2430 rc = rxstack_queue_data( client, buffer, action );
2431
2432 response_length = sprintf( response, "%d:", rc );
2433 DEBUGDUMP(printf(" Response: [%s] Len: %d\n",response,response_length););
2434 rxstack_send_return( client, &action, response, response_length );
2435 buffer = NULL ; /* consumed by rxstack_queue_data */
2436 break;
2437 case RXSTACK_EMPTY_QUEUE:
2438 DEBUGDUMP(printf("WS: Empty Queue ---\n"););
2439 split_ws_payload( ws_payload, ACTION_CLEAR_LEN, &queue, &buffer );
2440 if ( !queue
2441 || PSTRENGLEN( queue ) == 0 )
2442 queue = client->default_queue->name;
2443 rc = rxstack_empty_queue( client, queue );
2444 srcode[0] = (char)(rc+'0');
2445 rxstack_send_return( client, &action, srcode, 2 );
2446 break;
2447 case RXSTACK_NUMBER_IN_QUEUE:
2448 DEBUGDUMP(printf("WS: Number in Queue ---\n"););
2449 split_ws_payload( ws_payload, ACTION_QUEUED_LEN, &queue, &buffer );
2450 if ( queue
2451 && PSTRENGLEN( queue ) )
2452 {
2453 q = rxstack_set_default_queue( client, queue );
2454 if ( q == NULL )
2455 // errorrxstack_send_return( client, "3", NULL, 0 );
2456 break;
2457 }
2458 length = rxstack_number_in_queue( client );
2459 response_length = sprintf( response, "0:%d", length );
2460 DEBUGDUMP(printf(" Response: [%s] Len: %d\n",response,response_length););
2461 rxstack_send_return( client, &action, response, response_length );
2462 break;
2463 case RXSTACK_PULL:
2464 DEBUGDUMP(printf("WS: Pull ---\n"););
2465 split_ws_payload( ws_payload, ACTION_PULL_LEN, &queue, &buffer );
2466 if ( queue
2467 && PSTRENGLEN( queue ) )
2468 {
2469 q = rxstack_set_default_queue( client, queue );
2470 if ( q == NULL )
2471 // errorrxstack_send_return( client, "3", NULL, 0 );
2472 break;
2473 }
2474 rc = rxstack_pull_line_off_queue( client, &result, action == RXSTACK_FETCH );
2475 srcode[0] = (char)(rc+'0');
2476 switch( rc )
2477 {
2478 case 0: /* all OK */
2479 rcode_streng = Str_buf( srcode, 2 );
2480 if ( rcode_streng == NULL )
2481 {
2482 rxstack_send_return( client, &action, "9:rxstack: out of memory", 23 );
2483 }
2484 else
2485 {
2486 resp_streng = Str_cat( rcode_streng, result );
2487 if ( resp_streng == NULL )
2488 {
2489 rxstack_send_return( client, &action, "9:rxstack: out of memory", 23 );
2490 DROPSTRENG( rcode_streng );
2491 }
2492 else
2493 {
2494 rxstack_send_return( client, &action, PSTRENGVAL( resp_streng ), PSTRENGLEN( resp_streng ) );
2495 DROPSTRENG( result );
2496 DROPSTRENG( resp_streng );
2497 DROPSTRENG( rcode_streng );
2498 }
2499 }
2500 break;
2501 case RXSTACK_WAITING: /* still waiting; don't return */
2502 break;
2503 default: /* empty/error */
2504 rxstack_send_return( client, &action, srcode, 2 );
2505 break;
2506 }
2507 break;
2508 case RXSTACK_CREATE_QUEUE:
2509 DEBUGDUMP(printf("WS: Create Queue ---\n"););
2510 /*
2511 * Create a new queue
2512 */
2513 split_ws_payload( ws_payload, ACTION_CREATE_LEN, &queue, &buffer );
2514 rc = rxstack_create_queue( client, queue, &result );
2515 srcode[0] = (char)(rc+'0');
2516 if ( ( rc != 1 ) && ( rc != 0 ) )
2517 rxstack_send_return( client, &action, srcode, 2 );
2518 else
2519 {
2520 rcode_streng = Str_buf( srcode, 2 );
2521 if ( rcode_streng == NULL )
2522 {
2523 rxstack_send_return( client, &action, "9:rxstack: out of memory", 23 );
2524 }
2525 else
2526 {
2527 resp_streng = Str_cat( rcode_streng, result );
2528 if ( resp_streng == NULL )
2529 {
2530 rxstack_send_return( client, &action, "9:rxstack: out of memory", 23 );
2531 DROPSTRENG( rcode_streng );
2532 }
2533 else
2534 {
2535 rxstack_send_return( client, &action, PSTRENGVAL( resp_streng ), PSTRENGLEN( resp_streng ) );
2536 DROPSTRENG( result );
2537 DROPSTRENG( resp_streng );
2538 DROPSTRENG( rcode_streng );
2539 }
2540 }
2541 }
2542 buffer = NULL; /* consumed by rxstack_create_queue */
2543 break;
2544 case RXSTACK_DELETE_QUEUE:
2545 DEBUGDUMP(printf("WS: Delete Queue ---\n"););
2546 /*
2547 * Delete the queue
2548 */
2549 split_ws_payload( ws_payload, ACTION_DELETE_LEN, &queue, &buffer );
2550 if ( !queue
2551 || PSTRENGLEN( queue ) == 0 )
2552 rc = 6;
2553 else
2554 rc = rxstack_delete_queue( client, queue );
2555 srcode[0] = (char)(rc+'0');
2556 rxstack_send_return( client, &action, srcode, 2 );
2557 break;
2558 default:
2559 break;
2560 }
2561 if ( buffer )
2562 DROPSTRENG( buffer ) ;
2563 if ( queue
2564 && queue != client->default_queue->name )
2565 DROPSTRENG( queue ) ;
2566 }
2567 }
2568
2569 DEBUGDUMP(printf("after valid response: rcode: %d\n",rcode););
2570
2571 return rc;
2572 }
2573
rxstack_process_traditional_command(Client * client)2574 int rxstack_process_traditional_command( Client * client )
2575 {
2576 char cheader[RXSTACK_HEADER_SIZE];
2577 streng *header;
2578 streng *buffer = NULL ;
2579 int rc,length;
2580 memset( cheader, 0, sizeof(cheader) );
2581 DEBUGDUMP(printf("\nreading from socket %d\n", client->socket););
2582 rc = recv( client->socket, cheader, RXSTACK_HEADER_SIZE, 0 );
2583 if ( rc < 0 )
2584 {
2585 if ( os_errno != ECONNRESET )
2586 {
2587 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_READING_SOCKET, ERR_RXSTACK_READING_SOCKET_TMPL, errno_str( os_errno ) );
2588 }
2589 /*
2590 * Assume client has been lost
2591 */
2592 rxstack_delete_client( client );
2593 return 0 ;
2594 }
2595 if ( rc == 0 )
2596 {
2597 DEBUGDUMP(printf("read empty header\n"););
2598 /*
2599 * Assume client has been lost
2600 */
2601 rxstack_delete_client( client );
2602 return 0 ;
2603 }
2604 else if ( rc != RXSTACK_HEADER_SIZE )
2605 {
2606 DEBUGDUMP(printf("read corrupted header\n"););
2607 /*
2608 * Assume client has been lost
2609 */
2610 rxstack_delete_client( client );
2611 return 0 ;
2612 }
2613 DEBUGDUMP(printf("header: %.*s\n",RXSTACK_HEADER_SIZE,cheader););
2614 header = MakeStreng( RXSTACK_HEADER_SIZE - 1 );
2615 if ( header == NULL )
2616 {
2617 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
2618 exit( ERR_STORAGE_EXHAUSTED );
2619 }
2620 memcpy( PSTRENGVAL(header), cheader+1, RXSTACK_HEADER_SIZE-1 );
2621 header->len = RXSTACK_HEADER_SIZE-1 ;
2622 buffer = NULL;
2623 /*
2624 * Convert the data length
2625 */
2626 length = REXX_X2D( header, &rc );
2627 if ( rc )
2628 {
2629 /*
2630 * Errorneous number. Kill the client.
2631 */
2632 DEBUGDUMP(printf("Invalid header: <%.*s>, client killed\n", header->len, header->value););
2633 rxstack_send_return( client, "9", NULL, 0 );
2634 rxstack_delete_client( client );
2635 return 1;
2636 }
2637
2638 DEBUGDUMP(printf("Header: <%.*s> length: %d\n", header->len, header->value, length););
2639 DROPSTRENG( header );
2640 if ( length > 0 )
2641 {
2642 /*
2643 * Allocate a streng big enough for the expected data
2644 * string, based on the length just read; even if the length
2645 * is zero
2646 */
2647 buffer = MAKESTRENG ( length );
2648 if ( buffer == NULL )
2649 {
2650 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
2651 DEBUGDUMP(printf("can't buffer input of client\n"););
2652 rxstack_delete_client( client );
2653 return 0;
2654 }
2655 rc = recv( client->socket, PSTRENGVAL(buffer), length, 0 );
2656 if ( rc < 0 )
2657 {
2658 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_READING_SOCKET, ERR_RXSTACK_READING_SOCKET_TMPL, errno_str( os_errno ) );
2659 }
2660 else if ( rc == 0 )
2661 {
2662 /*
2663 * All we can assume here is that the client has been lost
2664 */
2665 DEBUGDUMP(printf("read empty header\n"););
2666 rxstack_delete_client( client );
2667 DROPSTRENG( buffer ) ;
2668 return 0 ;
2669 }
2670 else
2671 buffer->len = length ;
2672 }
2673 send_response_to_client( client, cheader[0], buffer );
2674 return 1;
2675 }
2676
2677 /* rxstack_process_command reads a new command from the client and processes
2678 * it.
2679 * returns 0 if the client has been terminated, 1 if the client persists.
2680 */
rxstack_process_command(Client * client)2681 int rxstack_process_command( Client *client )
2682 {
2683 char pheader[RXSTACK_PEEK_HEADER_SIZE];
2684 int rc;
2685
2686 if ( client->deadline.milli != -1 )
2687 {
2688 /*
2689 * interrupted wait, assume the client don't want to wait for data any
2690 * longer
2691 */
2692 bad_news_for_waiter( client->default_queue, client ) ;
2693 }
2694 memset( pheader, 0, sizeof(pheader) );
2695 DEBUGDUMP(printf("\npeeking from socket %d\n", client->socket););
2696 rc = recv( client->socket, pheader, RXSTACK_PEEK_HEADER_SIZE, MSG_PEEK );
2697 if ( rc < 0 )
2698 {
2699 if ( os_errno != ECONNRESET )
2700 {
2701 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_READING_SOCKET, ERR_RXSTACK_READING_SOCKET_TMPL, errno_str( os_errno ) );
2702 }
2703 /*
2704 * Assume client has been lost
2705 */
2706 rxstack_delete_client( client );
2707 return 0 ;
2708 }
2709 if ( rc == 0 )
2710 {
2711 DEBUGDUMP(printf("read empty header\n"););
2712 /*
2713 * Assume client has been lost
2714 */
2715 rxstack_delete_client( client );
2716 return 0 ;
2717 }
2718 else if ( rc != RXSTACK_PEEK_HEADER_SIZE )
2719 {
2720 DEBUGDUMP(printf("read corrupted header\n"););
2721 /*
2722 * Assume client has been lost
2723 */
2724 rxstack_delete_client( client );
2725 return 0 ;
2726 }
2727 DEBUGDUMP(printf("peek at 1st two bytes of header: 0x%x%x\n", pheader[0],pheader[1]););
2728 /*
2729 * We have peeked at the first 2 bytes of the message. determine what type of connection it is:
2730 * GE - header for Websocket interface (start of GET)
2731 * first byte < x20 - Websocket data
2732 * anything else - assume traditional interface
2733 */
2734 if ( memcmp( pheader, "GE", 2 ) == 0 )
2735 {
2736 DEBUGDUMP(printf("have a WS header...\n"););
2737 rc = rxstack_process_websockets_headers( client );
2738 }
2739 else if ( client->isWebsocket )
2740 {
2741 DEBUGDUMP(printf("have WS data...\n"););
2742 rc = rxstack_process_websockets_data( client );
2743 }
2744 else
2745 {
2746 DEBUGDUMP(printf("have normal data...\n"););
2747 rc = rxstack_process_traditional_command( client );
2748 }
2749 return rc;
2750 }
2751
2752 /*
2753 * earlier returns the time stamp which is more early.
2754 */
earlier(RxTime t1,RxTime t2)2755 static RxTime earlier( RxTime t1, RxTime t2 )
2756 {
2757 if ( time_diff( t1, t2 ) == -2 )
2758 return t1 ;
2759 return t2 ;
2760 }
2761
2762 /* check_for_waiting checks a client for a pending IO request.
2763 * We currently only support PULL requests.
2764 * The function returns immediately if the client doesn't wait.
2765 * In the other case it checks whether the maximum wait time has been
2766 * expired.
2767 * If the client is still waiting and the deadline isn't reached, it
2768 * checks if the deadline is more early then next_timeout and sets this
2769 * value is necessary.
2770 *
2771 * The client's queue's deadline is set to the default deadline of queues in
2772 * all cases.
2773 */
check_for_waiting(Client * client,RxTime * next_timeout)2774 void check_for_waiting( Client *client, RxTime *next_timeout )
2775 {
2776 int diff ;
2777
2778 client->default_queue->deadline = queue_deadline ;
2779 /* Do we are a waiter? */
2780 if ( client->deadline.milli == -1 )
2781 return ;
2782
2783 /*
2784 * Check if there is anything in the queue...
2785 */
2786 diff = time_diff( client->deadline, now ) ;
2787 if ( ( diff != -2 ) && ( diff != 0 ) )
2788 {
2789 DEBUGDUMP(
2790 if ( diff == -1 )
2791 printf("Still waiting infinitely for %d\n", client->socket );
2792 else
2793 printf("Still waiting %d ms at max for %d\n", diff, client->socket );
2794 );
2795 *next_timeout = earlier( *next_timeout, client->deadline ) ;
2796 }
2797 else
2798 {
2799 bad_news_for_waiter( client->default_queue, client ) ;
2800 }
2801 }
2802
2803 /* check_queue checks a queue has reached its deadline and if no clients
2804 * have this queue as the default queue. The queue is deleted in this
2805 * case.
2806 * If the queuet is still valid and the deadline isn't reached, it
2807 * checks if the deadline is more early then next_timeout and sets this
2808 * value is necessary.
2809 */
check_queue(RxQueue * q,RxTime * next_timeout)2810 void check_queue( RxQueue *q, RxTime *next_timeout )
2811 {
2812 int diff ;
2813 Client *c ;
2814
2815 diff = time_diff( q->deadline, now ) ;
2816 if ( ( diff != -2 ) && ( diff != 0 ) )
2817 {
2818 *next_timeout = earlier( *next_timeout, q->deadline ) ;
2819 return ;
2820 }
2821
2822 for ( c = clients; c != NULL; c = c->next )
2823 {
2824 if ( c->default_queue == q )
2825 {
2826 q->deadline = queue_deadline ;
2827 *next_timeout = earlier( *next_timeout, q->deadline ) ;
2828 return ;
2829 }
2830 }
2831
2832 DEBUGDUMP( printf( "Purging unused queue <%.*s>\n", PSTRENGLEN( q->name ), PSTRENGVAL( q->name ) ) );
2833 q->deadline = queue_deadline ; /* needed at least for SESSION */
2834 *next_timeout = earlier( *next_timeout, q->deadline ) ;
2835 delete_a_queue( q ) ;
2836 }
2837
rxstack_doit()2838 int rxstack_doit( )
2839 {
2840 RxTime timeout ;
2841 int listen_sock,msgsock;
2842 struct sockaddr_in server,client;
2843 #ifdef HAVE_SOCKLEN_T
2844 socklen_t client_size ;
2845 #else
2846 int client_size ;
2847 #endif
2848 int rc;
2849 Client *c, *ch ;
2850 RxQueue *q, *qh ;
2851 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2852 # define POLL_INCR 16
2853 struct pollfd *pd = NULL ;
2854 unsigned poll_max = 0 ;
2855 unsigned poll_cnt = 0 ;
2856 #else
2857 int max_sock ;
2858 fd_set ready ;
2859 struct timeval to ;
2860 #endif
2861 #ifdef BUILD_NT_SERVICE
2862 char buf[30];
2863 #endif
2864 #if defined(SO_REUSEADDR) && defined(SOL_SOCKET)
2865 int on = 1;
2866 #endif
2867
2868 client_size = sizeof( struct sockaddr );
2869 #ifdef WIN32
2870 if ( init_external_queue( NULL ) )
2871 return 1;
2872 #endif
2873
2874 #ifdef BUILD_NT_SERVICE
2875 if ( IsItNT()
2876 && !report_service_pending_start() )
2877 goto notrunning;
2878 #endif
2879 /*
2880 * Set up signal handler
2881 */
2882 #ifdef SIGTERM
2883 signal( SIGTERM, rxstack_signal_handler );
2884 #endif
2885 #ifdef SIGINT
2886 signal( SIGINT, rxstack_signal_handler );
2887 #endif
2888 #ifdef SIGBREAK
2889 signal( SIGBREAK, rxstack_signal_handler );
2890 #endif
2891 #ifdef SIGPIPE
2892 signal( SIGPIPE, SIG_IGN );
2893 #endif
2894 clients = NULL ;
2895 queues = NULL ;
2896
2897 /*
2898 * Initialise default "SESSION" queue
2899 */
2900 if ( ( SESSION = get_new_queue( ) ) == NULL )
2901 {
2902 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL ) ;
2903 return ERR_STORAGE_EXHAUSTED ;
2904 }
2905 SESSION->name = Str_cre_or_exit( "SESSION", 7 ) ;
2906 SESSION->isReal = 1;
2907
2908 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2909 pd = (struct pollfd *)malloc( ( poll_max = POLL_INCR ) * sizeof( struct pollfd ) );
2910 if ( pd == NULL )
2911 {
2912 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
2913 exit( ERR_STORAGE_EXHAUSTED );
2914 }
2915 #endif
2916
2917 #ifdef BUILD_NT_SERVICE
2918 if ( IsItNT()
2919 && !report_service_pending_start() )
2920 goto notrunning;
2921 #endif
2922 /*
2923 * Create listener socket
2924 */
2925 listen_sock = socket(AF_INET, SOCK_STREAM, 0);
2926 if (listen_sock < 0)
2927 {
2928 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_GENERAL, ERR_RXSTACK_GENERAL_TMPL, "Listening on socket", errno_str( os_errno ) );
2929 rxstack_cleanup();
2930 exit(ERR_RXSTACK_GENERAL);
2931 }
2932 memset( &server, 0, sizeof(server) );
2933 server.sin_family = AF_INET;
2934 if ( world )
2935 server.sin_addr.s_addr = htonl(INADDR_ANY);
2936 else
2937 server.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
2938 if ( portno == 0 )
2939 portno = default_port_number();
2940 server.sin_port = htons((unsigned short) portno);
2941
2942 #ifdef BUILD_NT_SERVICE
2943 if ( IsItNT()
2944 && !report_service_pending_start() )
2945 goto notrunning;
2946 #endif
2947
2948 #if defined(SO_REUSEADDR) && defined(SOL_SOCKET)
2949 setsockopt( listen_sock, SOL_SOCKET, SO_REUSEADDR, (void *) &on, sizeof( on ) );
2950 #endif
2951 if ( bind(listen_sock, (struct sockaddr *)&server, sizeof(server)) < 0)
2952 {
2953 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_GENERAL, ERR_RXSTACK_GENERAL_TMPL, "Error binding socket", errno_str( os_errno ) );
2954 rxstack_cleanup();
2955 exit( ERR_RXSTACK_GENERAL );
2956 }
2957 #ifdef BUILD_NT_SERVICE
2958 sprintf(buf, "Listening on port: %ld", portno );
2959 if ( IsItNT() )
2960 {
2961 if ( !report_service_start() )
2962 goto notrunning;
2963 if ( debug == 1 )
2964 {
2965 printf( "%s\n", buf );
2966 fflush(stdout);
2967 }
2968 else
2969 {
2970 AddToMessageLog(TEXT(buf));
2971 }
2972 }
2973 else
2974 {
2975 printf( "%s\n", buf );
2976 fflush(stdout);
2977 }
2978 #else
2979 printf( "rxstack listening on port: %ld\n", portno );
2980 fflush(stdout);
2981 #endif
2982 /*
2983 * Start accepting connections
2984 */
2985 listen(listen_sock, 5);
2986 timeout = get_now( ) ;
2987 time_add( &timeout, DEFAULT_WAKEUP ) ;
2988 queue_deadline = now ;
2989 time_add( &queue_deadline, QUEUE_TIMEOUT ) ;
2990 while ( running )
2991 {
2992 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2993 poll_cnt = 0 ;
2994 pd[ poll_cnt ].events = POLLIN ;
2995 pd[ poll_cnt++ ].fd = listen_sock ;
2996 DEBUGDUMP(printf("****** poll((%d", listen_sock););
2997 for ( c = clients; c != NULL; c = c->next )
2998 {
2999 if ( c->socket == -1 )
3000 {
3001 continue ;
3002 }
3003
3004 if ( poll_cnt == poll_max )
3005 {
3006 pd = (struct pollfd *)realloc( pd, ( poll_max += POLL_INCR ) * sizeof( struct pollfd ) );
3007 if ( pd == NULL )
3008 {
3009 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
3010 exit( ERR_STORAGE_EXHAUSTED );
3011 }
3012 }
3013
3014 pd[ poll_cnt ].events = POLLIN ;
3015 pd[ poll_cnt++ ].fd = c->socket ;
3016 DEBUGDUMP(printf(", %d", c->socket););
3017 }
3018 #else
3019 FD_ZERO(&ready);
3020 FD_SET(listen_sock, &ready);
3021 DEBUGDUMP(printf("****** select((%d", listen_sock););
3022 max_sock = listen_sock;
3023 /*
3024 * For each connected client, allow its socket
3025 * to be triggered
3026 */
3027 for ( c = clients; c != NULL; c = c->next )
3028 {
3029 if ( c->socket != -1 )
3030 {
3031 DEBUGDUMP(printf(", %d", c->socket););
3032 FD_SET( c->socket, &ready );
3033 if ( c->socket > max_sock )
3034 max_sock = c->socket;
3035 }
3036 }
3037 #endif
3038 now = get_now( ) ;
3039 rc = time_diff( timeout, now ) ;
3040 if ( rc == -2 )
3041 rc = 0 ; /* already timed out */
3042 if ( ( rc == -1 ) || ( rc > DEFAULT_WAKEUP ) )
3043 rc = DEFAULT_WAKEUP ;
3044 DEBUGDUMP(printf("), to=%d) ms at %ld,%03d\n", rc, (long) now.seconds, now.milli ););
3045 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
3046 rc = poll( pd, poll_cnt, rc ) ;
3047 #else
3048 to.tv_usec = ( rc % 1000 ) * 1000 ; /* microseconds fraction */
3049 to.tv_sec = rc / 1000;
3050 rc = select( max_sock + 1, &ready, (fd_set *)0, (fd_set *)0, &to ) ;
3051 #endif
3052 now = get_now( ) ;
3053 DEBUGDUMP(printf("****** after waiting(), rc=%d at %ld,%03d\n", rc, (long) now.seconds, now.milli ););
3054 if ( rc < 0 )
3055 {
3056 if ( os_errno != EINTR ) /* Win32 doesn't know about it ? */
3057 {
3058 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_GENERAL, ERR_RXSTACK_GENERAL_TMPL, "Calling select", errno_str( os_errno ) );
3059 exit( ERR_RXSTACK_GENERAL );
3060 }
3061 continue ;
3062 }
3063 if ( rc )
3064 {
3065 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
3066 if ( pd[ 0 ].revents )
3067 #else
3068 if ( FD_ISSET(listen_sock, &ready ) )
3069 #endif
3070 {
3071 msgsock = accept(listen_sock, (struct sockaddr *)&client, &client_size);
3072 if (msgsock == -1)
3073 {
3074 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_GENERAL, ERR_RXSTACK_GENERAL_TMPL, "Calling listen", errno_str( os_errno ) );
3075 rxstack_cleanup();
3076 exit( ERR_RXSTACK_GENERAL );
3077 }
3078 else
3079 {
3080 /*
3081 * A client has connected, create a client entry
3082 * and set their default queue to SESSION
3083 */
3084 /*
3085 * Validate the client here...TBD
3086 * use details in client sockaddr struct
3087 */
3088 DEBUGDUMP(printf("Client connecting from %s has socket %d\n", inet_ntoa( client.sin_addr ), msgsock ););
3089 rxstack_create_client( msgsock );
3090 }
3091 }
3092 else
3093 {
3094 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
3095 for ( c = clients, poll_cnt = 1; c != NULL; poll_cnt++ )
3096 #else
3097 for ( c = clients; c != NULL; )
3098 #endif
3099 {
3100 /* c might be deleted by the following calls.
3101 * Assure we have everything perfect to use the
3102 * next element. An access to c after rxstack_process_command
3103 * is forbidden.
3104 */
3105 ch = c ;
3106 c = c->next ;
3107 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
3108 if ( pd[ poll_cnt ].revents )
3109 #else
3110 if ( FD_ISSET( ch->socket, &ready ) )
3111 #endif
3112 {
3113 DEBUGDUMP(printf("Client with socket %d ready for reading\n", ch->socket ););
3114 /*
3115 * Process the client's command...
3116 */
3117 rxstack_process_command( ch ) ;
3118 }
3119 }
3120 }
3121 }
3122 /*
3123 * If select() timed out or received input, check all connected clients who
3124 * may be waiting for input on one of the queues.
3125 *
3126 * now contains the time between the start of select() call and now
3127 * in milliseconds
3128 */
3129 now = get_now();
3130 timeout = get_now( ) ;
3131 time_add( &timeout, DEFAULT_WAKEUP ) ;
3132 queue_deadline = now ;
3133 time_add( &queue_deadline, QUEUE_TIMEOUT ) ;
3134 for ( c = clients; c != NULL; c = c->next )
3135 {
3136 check_for_waiting( c, &timeout );
3137 }
3138 for ( q = queues; q != NULL; )
3139 {
3140 qh = q ;
3141 q = q->next ;
3142 check_queue( qh, &timeout );
3143 }
3144 }
3145 #ifdef BUILD_NT_SERVICE
3146 notrunning:
3147 #endif
3148 return 0;
3149 }
3150
3151 /*
3152 * Gives a short usage description on stderr and returns 1
3153 */
usage(const char * argv0)3154 int usage( const char *argv0 )
3155 {
3156 fprintf( stdout, "\n%s: %s (%d bit). All rights reserved.\n", argv0, PARSE_VERSION_STRING, REGINA_BITS );
3157 fprintf( stdout,"Regina is distributed under the terms of the GNU Library Public License \n" );
3158 fprintf( stdout,"and comes with NO WARRANTY. See the file COPYING-LIB for details.\n" );
3159 fprintf( stdout,"\n%s [switches]\n", argv0 );
3160 fprintf( stdout,"where switches are:\n\n" );
3161 fprintf( stdout," --help, -h show this message\n" );
3162 fprintf( stdout," --version, -v display Regina version and exit\n" );
3163 fprintf( stdout," --debug, -D turn on debugging\n" );
3164 #if defined(HAVE_FORK)
3165 fprintf( stdout," --daemon, -d run %s as a daemon process\n", argv0 );
3166 #endif
3167 fprintf( stdout," --kill, -k kill the running %s process\n", argv0 );
3168 fprintf( stdout," --port=portno, -pportno listen on TCP port portno\n");
3169 fprintf( stdout," --world, -w allow connections from anywhere. By default only connections from localhost allowed\n");
3170 fflush( stdout );
3171 return 1 ;
3172 }
3173
checkDebug(void)3174 static void checkDebug(void)
3175 {
3176 if ( getenv( "RXDEBUG" ) != NULL )
3177 debug = 1 ;
3178 }
3179
runNormal(int argc,char ** argv)3180 int runNormal( int argc, char **argv )
3181 {
3182 int rc = 0 ;
3183 int c;
3184 const char *argv0 = argv[ 0 ] ;
3185 static struct my_getopt_option long_options[] =
3186 {
3187 {"help", no_argument, 0, 'h' },
3188 {"version", no_argument, 0, 'v' },
3189 {"debug", no_argument, 0, 'D' },
3190 {"kill", no_argument, 0, 'k' },
3191 {"world", no_argument, 0, 'w' },
3192 {"port", required_argument, 0, 'p' },
3193 {"daemon", no_argument, 0, 'd' },
3194 {0, 0, 0, 0 }
3195 };
3196
3197 checkDebug();
3198 while (1)
3199 {
3200 int option_index = 0;
3201
3202 c = my_getopt_long( argc, argv, "+hvDdkwp:", long_options, &option_index );
3203 if ( c == -1 )
3204 break;
3205
3206 switch(c)
3207 {
3208 case 'h':
3209 return usage( argv0 );
3210 break;
3211 case 'D':
3212 debug = 1;
3213 putenv( "RXDEBUG=1" ) ;
3214 break;
3215 case 'd':
3216 #ifndef BUILD_NT_SERVICE
3217 isdaemon = 1;
3218 #endif
3219 break;
3220 case 'k':
3221 tosuicide = 1;
3222 break;
3223 case 'w':
3224 world = 1;
3225 break;
3226 case 'p':
3227 portno = atol( optarg );
3228 if ( portno == 0
3229 || portno > 0xFFFF )
3230 {
3231 fprintf( stderr, "Option \"-p\" or \"--port\" requires integer between 1 and 65535.\n" ) ;
3232 return usage( argv0 );
3233 }
3234 break;
3235 case 'v':
3236 fprintf( stderr, "%s: %s (%d bit)\n", argv0, PARSE_VERSION_STRING, REGINA_BITS );
3237 return 0;
3238 default: /* unknown switch */
3239 return usage( argv0 );
3240 break;
3241 }
3242 }
3243 if ( argc > optind )
3244 {
3245 fprintf( stderr, "Extra, unknown command line argument(s).\n" ) ;
3246 return usage( argv0 );
3247 }
3248 if ( tosuicide )
3249 return suicide();
3250 #ifndef BUILD_NT_SERVICE
3251 if ( isdaemon )
3252 {
3253 #if defined(HAVE_FORK)
3254 if ( ( rc = fork() ) != 0 )
3255 exit(rc < 0);
3256 rc = rxstack_doit();
3257 #else
3258 fprintf( stderr, "Option \"-d\" or \"--daemon\" is invalid on this platform.\n" ) ;
3259 return usage( argv0 );
3260 #endif
3261 }
3262 else
3263 #endif
3264 {
3265 rc = rxstack_doit();
3266 }
3267 rxstack_cleanup();
3268 printf( "%s terminated.\n", argv0 );
3269 fflush(stdout);
3270 return rc;
3271 }
3272
3273 #ifdef BUILD_NT_SERVICE
ServiceStart(DWORD argc,LPTSTR * argv)3274 VOID ServiceStart(DWORD argc, LPTSTR *argv)
3275 #else
3276 int main(int argc, char *argv[])
3277 #endif
3278 {
3279 #ifdef BUILD_NT_SERVICE
3280 if ( IsItNT() )
3281 {
3282 if ( !nt_service_start() )
3283 {
3284 /*
3285 checkDebug();
3286 rxstack_doit();
3287 */
3288 runNormal(argc, argv);
3289 }
3290 rxstack_cleanup();
3291 return;
3292 }
3293 else
3294 {
3295 runNormal(argc, argv);
3296 }
3297 #else
3298 return runNormal( argc, argv );
3299 #endif
3300 }
3301