1 /*****************************************************************************
2 Major portions of this software are copyrighted by the Medical College
3 of Wisconsin, 1994-2000, and are released under the Gnu General Public
4 License, Version 2. See the file README.Copyright for details.
5 ******************************************************************************/
6
7 #include "thd_iochan.h"
8 #include "Amalloc.h"
9 #include <errno.h>
10 #include "niml.h"
11
static char *error_string=NULL ;  /* 21 Nov 2001 */

/*! Return the message most recently stored in error_string by the
    routines in this file (NULL if nothing is currently stored).
    The pointer returned is not a copy, so it should not be free-d. */

char *iochan_error_string(void)
{
   char *msg = error_string ;
   return msg ;
}
15
#ifndef DONT_USE_SHM
/* Set on the first call to iochan_init(): nonzero if the environment
   variable IOCHAN_DELAY_RMID starts with 'Y' or 'y'.  It is consulted
   in iochan_close() when a shmem IOCHAN is being shut down. */
static int shm_RMID_delay = 0 ;  /* 12 Dec 2002 */
#endif
19
20 /*****************************************************************
21 Routines to manipulate IOCHANs, something RWCox invented as
22 an abstraction of socket or shmem inter-process communication.
23
24 The function iochan_init creates an IOCHAN, and returns a
25 pointer to it.
26 The function iochan_goodcheck() checks if an IOCHAN is ready
27 (connected at both ends); if not, it will wait until the
28 connection is made, or until a given amount of time has passed.
29 The functions iochan_readcheck() and iochan_writecheck() will
30 check if an IOCHAN is good for I/O, and is ready to accept
31 reads and writes, respectively.
32 The functions iochan_send() and iochan_recv() will transmit
33 and receive messages, respectively. There are also
34 the routines iochan_sendall() and iochan_recvall() that
35 will block until the specified number of bytes is
36 transmitted or received, respectively.
37 The function iochan_ctl() allows control of internal IOCHAN
38 parameters.
39
40 Socket and shmem IOCHANs are not fully symmetric:
41 * Sockets are bidirectional, but shmem IOCHANs can only
42 communicate in one direction.
43 * Sending data to a socket will result in all the data being
44 transmitted, even if the process must block. Sending
45 data to a shmem may result in only part of the data
46 being transmitted (if the buffer space available
47 isn't big enough to hold the whole message).
48
49 Shmem IOCHANs are implemented as a circular buffer with two
50 int indices "bstart" and "bend"; the data between the
51 bstart-th and bend-th bytes of the buffer (inclusive) are
52 ready to be read. Writing to the buffer involves increasing
53 bend, and is done only by the "writing" process. Reading from
54 the buffer involves increasing bstart, and is done only by the
55 "reading" process. That is, the two processes do not write
56 into the same control variables. (Unless they both try to
57 read AND write into the same IOCHAN.)
58
59 June 1997:
60 Shmem IOCHANs now can be bidirectional. This is ordered
61 by using the new "size1+size2" specification.
62 ******************************************************************/
63
64 /*---------------------------------------------------------------*/
static int pron = 1 ;  /* 22 Nov 2002 */

/*! Set the flag that gates the error messages printed by this file:
    q != 0 allows them, q == 0 silences them. */

void iochan_enable_perror( int q )
{
   pron = q ;
   return ;
}
67
/* DEBUG is forced off here, so the second set of definitions is used. */
#undef DEBUG
#ifdef DEBUG
   /* debugging: every PERROR is printed, and STATUS messages are traced */
#  define PERROR(x) perror(x)
#  define STATUS(x) fprintf(stderr,"%s\n",x)
#else
   /* PERROR(x) calls perror(x) only if x is non-NULL and pron is nonzero.
      A message identical to the last one printed is skipped when less than
      3.333 units of COX_clock_time() (presumably seconds) have elapsed
      since that printing, so a repeating failure does not flood stderr.
        pqlast = strdup-ed copy of the last message printed
        pqtim  = COX_clock_time() value when it was printed
      Note that x is evaluated more than once.
      STATUS(x) expands to nothing.                                      */
static char *pqlast = NULL ;
static double pqtim = -6.66 ;
#  define PERROR(x)                                          \
   do{ if( (x) != NULL && pron ){                            \
         double qtim = COX_clock_time() ;                    \
         int skip = ( qtim-pqtim < 3.333 &&                  \
                      pqlast != NULL && strcmp(pqlast,x) == 0 ); \
         if( !skip ){                                        \
           perror(x); pqtim = qtim;                          \
           if( pqlast != NULL ) free(pqlast) ;               \
           pqlast = strdup(x) ;                              \
         }                                                   \
   }} while(0)
#  define STATUS(x) /* nada */
#endif
88 /*---------------------------------------------------------------*/
89
90 #include <signal.h>
static int nosigpipe = 0 ;  /* 20 Apr 1997: turn off SIGPIPE signals */

/* CLOSEDOWN(ss) disposes of socket descriptor ss.  With USE_SHUTDOWN
   defined (as it is just below), shutdown(ss,2) is called before close(ss);
   2 is presumably SHUT_RDWR (stop both directions) -- verify on the
   target platform.  The value of the macro is the result of close(). */

#define USE_SHUTDOWN
#ifdef USE_SHUTDOWN
#  define CLOSEDOWN(ss) ( shutdown((ss),2) , close((ss)) )
#else
#  define CLOSEDOWN(ss) close((ss))
#endif

/** this is used to set the send/receive buffer size for sockets **/
/** (passed to setsockopt for SO_SNDBUF and SO_RCVBUF; value is 31 KB) **/

#define SOCKET_BUFSIZE (31*1024)
103
104 /********************************************************************
105 Routines to manipulate TCP/IP stream sockets.
106 *********************************************************************/
107
108 /*-------------------------------------------------------------------
109 See if the given socket (sd) is ready to read.
110 msec is the number of milliseconds to wait:
111 zero ==> no waiting
112 < 0 ==> wait until something happens
113
114 Return values are
     -1 = some error occurred (socket closed at other end?)
116 0 = socket is not ready to read
117 1 = socket has data
118 ---------------------------------------------------------------------*/
119
/*! Use select() to see if descriptor sd is ready for reading.
      msec >= 0 ==> wait at most msec milliseconds (0 = just poll)
      msec <  0 ==> wait until something happens
    Returns the result of select(): 1 = ready, 0 = not ready,
    -1 = error.  -1 is also returned, without calling select(), if sd
    is negative or is too large to be stored in an fd_set.           */

int tcp_readcheck( int sd , int msec )
{
   int ii ;
   fd_set rfds ;
   struct timeval tv , * tvp ;

   if( sd < 0 ){ STATUS("tcp_readcheck: illegal sd") ; return -1 ; } /* bad socket id */

   /* FD_SET() has undefined behavior for descriptors >= FD_SETSIZE */

   if( sd >= FD_SETSIZE ){ STATUS("tcp_readcheck: sd >= FD_SETSIZE") ; return -1 ; }

   FD_ZERO(&rfds) ; FD_SET(sd, &rfds) ;         /* check only sd */

   if( msec >= 0 ){                             /* set timer */
      tv.tv_sec  = msec/1000 ;
      tv.tv_usec = (msec%1000)*1000 ;
      tvp        = &tv ;
   } else {
      tvp        = NULL ;                       /* forever */
   }

   /** STATUS("tcp_readcheck: call select") ; **/

   ii = select(sd+1, &rfds, NULL, NULL, tvp) ;  /* check it */
   if( ii == -1 ) PERROR( "Socket gone bad? tcp_readcheck[select]" ) ;
   return ii ;
}
144
/*! Use select() to see if descriptor sd is ready for writing.
      msec >= 0 ==> wait at most msec milliseconds (0 = just poll)
      msec <  0 ==> wait until something happens
    Returns the result of select(): 1 = ready, 0 = not ready,
    -1 = error.  -1 is also returned, without calling select(), if sd
    is negative or is too large to be stored in an fd_set.           */

int tcp_writecheck( int sd , int msec )
{
   int ii ;
   fd_set wfds ;
   struct timeval tv , * tvp ;

   if( sd < 0 ){ STATUS("tcp_writecheck: illegal sd") ; return -1 ; } /* bad socket id */

   /* FD_SET() has undefined behavior for descriptors >= FD_SETSIZE */

   if( sd >= FD_SETSIZE ){ STATUS("tcp_writecheck: sd >= FD_SETSIZE") ; return -1 ; }

   FD_ZERO(&wfds) ; FD_SET(sd, &wfds) ;         /* check only sd */

   if( msec >= 0 ){                             /* set timer */
      tv.tv_sec  = msec/1000 ;
      tv.tv_usec = (msec%1000)*1000 ;
      tvp        = &tv ;
   } else {
      tvp        = NULL ;                       /* forever */
   }

   /** STATUS("tcp_writecheck: call select") ; **/

   ii = select(sd+1, NULL , &wfds, NULL, tvp) ; /* check it */
   if( ii == -1 ) PERROR( "Socket gone bad? tcp_writecheck[select]" ) ;
   return ii ;
}
169
170 /*------------------------------------------------------------------------
171 Version of recv() that will try again if interrupted by a signal
172 before any data arrives.
173 --------------------------------------------------------------------------*/
#ifdef USE_TCP_RECV
/*! Wrapper for recv() that repeats the call for as long as it fails
    with errno == EINTR.  Any other outcome (bytes received, zero,
    or a different error) is returned to the caller unchanged.      */

int tcp_recv( int s , void * buf , int len , unsigned int flags )
{
   int nrecv ;

   do{
      nrecv = recv( s , buf , len , flags ) ;
   } while( nrecv < 0 && errno == EINTR ) ;   /* interrupted? go again */

   return nrecv ;
}
#endif
187
188 /*------------------------------------------------------------------------
189 Set a socket so that it will cutoff quickly when it is closed.
190 --------------------------------------------------------------------------*/
191
/*! Apply two socket options to sd, each only if the platform defines it:
      SO_LINGER    with lingering enabled and a linger time of 0
      SO_REUSEADDR switched on
    The results of setsockopt() are not examined.                      */

void tcp_set_cutoff( int sd )
{
#ifdef SO_LINGER
   { struct linger ling ;
     ling.l_linger = 0 ;
     ling.l_onoff  = 1 ;
     setsockopt( sd , SOL_SOCKET , SO_LINGER ,
                 (void *)&ling , sizeof(ling) ) ;
   }
#endif

#ifdef SO_REUSEADDR
   { int reuse = 1 ;
     setsockopt( sd , SOL_SOCKET , SO_REUSEADDR ,
                 (char *)&reuse , sizeof(int) ) ;
   }
#endif
}
208
209 /*------------------------------------------------------------------------
210 Check if an already active socket is still alive.
211 If it is dead, then readcheck will say we can read, but we
212 won't actually get any bytes when we try (using peek mode).
213 Returns 1 if things are OK, 0 if not.
214 --------------------------------------------------------------------------*/
215
/*! Test an already-connected socket: returns 1 if it looks alive,
    0 if not.  A socket with nothing to read counts as alive; a socket
    that claims to be readable but yields no byte in peek mode is dead. */

int tcp_alivecheck( int sd )
{
   char peekbuf[4] ;
   int  nready , ngot ;

   nready = tcp_readcheck(sd,0) ;               /* poll, no waiting */
   if( nready <  0 ) return 0 ;                 /* error ==> dead   */
   if( nready == 0 ) return 1 ;                 /* nothing pending  */

   errno = 0 ;
   ngot  = tcp_recv( sd , peekbuf , 1 , MSG_PEEK ) ;  /* peek at one byte */
   if( ngot != 1 ){
      if( errno ) PERROR("Socket gone bad? tcp_alivecheck[tcp_recv]") ;
      return 0 ;                                /* no data ==> death! */
   }
   return 1 ;
}
230
231 /*------------------------------------------------------------------------
232 Open a socket to the given host, to the given TCP port.
   Returns socket id; if -1, some error occurred (e.g., nobody listening).
234 --------------------------------------------------------------------------*/
235
tcp_connect(char * host,int port)236 int tcp_connect( char * host , int port )
237 {
238 int sd , l ;
239 struct sockaddr_in sin ;
240 struct hostent * hostp ;
241
242 if( host == NULL || port < 1 ){ STATUS("tcp_connect: illegal inputs") ; return -1 ; }
243
244 /** open a socket **/
245
246 sd = socket( AF_INET , SOCK_STREAM , 0 ) ;
247 if( sd == -1 ){ PERROR("Can't create? tcp_connect[socket]"); return -1; }
248
249 /** set socket options (no delays, large buffers) **/
250
251 #if 0
252 l = 1;
253 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (void *)&l, sizeof(int)) ;
254 #endif
255 l = SOCKET_BUFSIZE ;
256 setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (void *)&l, sizeof(int)) ;
257 setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void *)&l, sizeof(int)) ;
258
259 /** set port on remote computer **/
260
261 memset( &sin , 0 , sizeof(sin) ) ;
262 sin.sin_family = AF_INET ;
263 sin.sin_port = htons(port) ;
264
265 /** set remote computer **/
266
267 hostp = gethostbyname(host) ;
268 if( hostp == NULL ){
269 PERROR("Can't lookup? tcp_connect[gethostbyname]"); CLOSEDOWN(sd); return -1;
270 }
271 sin.sin_addr.s_addr = ((struct in_addr *)(hostp->h_addr))->s_addr ;
272
273 if( connect(sd , (struct sockaddr *)&sin , sizeof(sin)) == -1 ){
274 PERROR("Can't connect? tcp_connect[connect]") ; CLOSEDOWN(sd); return -1;
275 }
276
277 return sd ;
278 }
279
static int tcp_listen_mute = 0;

/*! Store v in the flag that tcp_listen() tests before printing
    its bind/listen failure messages (nonzero = keep quiet). */

void set_tcp_listen_mute(int v)
{
   tcp_listen_mute = v ;
   return ;
}

/*! Return the current value of the tcp_listen() mute flag. */

int get_tcp_listen_mute(void)
{
   int cur = tcp_listen_mute ;
   return cur ;
}
289
290
291 /*--------------------------------------------------------------------------
292 Set up to listen for a connection on a given port. This does not
293 actually form the connection. That must be done separately.
294 Whether someone is trying to connect can be checked for with the routine
295 "tcp_readcheck" and then accepted with "tcp_accept".
296
297 The return value is the descriptor for the listening socket.
298 ----------------------------------------------------------------------------*/
299
tcp_listen(int port)300 int tcp_listen( int port )
301 {
302 static int nobindmsg;
303 int sd , l ;
304 struct sockaddr_in sin ;
305 char serr[128]={""}; /*ZSS June 2011*/
306
307 if( port < 1 ){ STATUS("tcp_listen: illegal port") ; return -1 ; }
308
309 /** open a socket **/
310
311 sd = socket( AF_INET , SOCK_STREAM , 0 ) ;
312 if( sd == -1 ){
313 sprintf(serr,"Can't create? (socket): (Name %s, Port %d)",
314 get_port_numbered(port), port);
315 PERROR(serr); return -1;
316 }
317
318 /** set socket options (no delays, large buffers) **/
319
320 #if 0
321 l = 1;
322 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (void *)&l, sizeof(int)) ;
323 #endif
324 l = SOCKET_BUFSIZE ;
325 setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (void *)&l, sizeof(int)) ;
326 setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void *)&l, sizeof(int)) ;
327
328 /** set port on remote computer **/
329
330 memset( &sin , 0 , sizeof(sin) ) ;
331 sin.sin_family = AF_INET ;
332 sin.sin_port = htons(port) ;
333 sin.sin_addr.s_addr = INADDR_ANY ; /* reader reads from anybody */
334
335 if( bind(sd , (struct sockaddr *)&sin , sizeof(sin)) == -1 ){
336 if (!tcp_listen_mute &&
337 !(nobindmsg % 10000)) { /* slow message printing down! ZSS */
338 sprintf(serr,"\nCan't bind? tcp_listen[bind] (Name %s, Port %d, sd %d)",
339 get_port_numbered(port), port,sd);
340 PERROR(serr);
341 nobindmsg = 0;
342 }
343 ++nobindmsg;
344 CLOSEDOWN(sd); return -1;
345 }
346
347 if( listen(sd,1) == -1 ){
348 if (!tcp_listen_mute) {
349 sprintf(serr,"Can't listen? tcp_listen[listen] (Name %s, Port %d)",
350 get_port_numbered(port), port);
351 }
352 PERROR(serr); CLOSEDOWN(sd); return -1;
353 }
354
355 return sd ;
356 }
357
358 /*--------------------------------------------------------------------------
359 Accept incoming connection on a socket. Return value is the attached
360 socket (which is not the original socket!). If -1 is returned, some
   error occurred.  If the accept works, then the original socket is
362 still open and listening for further attachments.
363
364 If hostname is not NULL, then the char * it points to will be filled
365 with a pointer to the official name of the host that connected.
366
367 If hostaddr is not NULL, then the char * it points to will be filled
368 with a pointer to the Internet address (in 'dot' form) of the host that
369 connected.
370
371 Both the char * pointers returned are from malloc, and should be free-d
372 when no longer needed. If they aren't needed at all, just pass in NULL
373 for these arguments.
374
375 Note that this routine will block until somebody connects. You can
376 use tcp_readcheck(sd,0) to see if anyone is waiting to connect before
377 calling this routine.
378 ----------------------------------------------------------------------------*/
379
/*! Accept a connection pending on listening socket sd (blocks until
    somebody connects).

      hostname = if not NULL, *hostname is set to a malloc-ed copy of the
                 name of the connecting host ("UNKNOWN" if the lookup fails)
      hostaddr = if not NULL, *hostaddr is set to a malloc-ed copy of the
                 connecting host's address in 'dot' form

    Returns the new connected descriptor; the caller must free() the
    strings.  Returns -1 if accept() fails or if a string cannot be
    allocated; in that case no new descriptor is left open and neither
    *hostname nor *hostaddr is modified.                                 */

int tcp_accept( int sd , char ** hostname , char ** hostaddr )
{
   struct sockaddr_in pin ;
   int sd_new ; socklen_t addrlen ;
   struct hostent * hostp ;
   char * name_out = NULL , * addr_out = NULL ;
   const char * str ;

   /** accept the connection **/

   /** STATUS("tcp_accept: about to call accept") ; **/

   addrlen = sizeof(pin) ;
   sd_new = accept( sd , (struct sockaddr *)&pin , &addrlen ) ;
   if( sd_new == -1 ){ PERROR("Can't accept? tcp_accept[accept]"); return -1; }

   /** get name of connector **/

   if( hostname != NULL ){
      hostp = gethostbyaddr( (char *) (&pin.sin_addr) ,
                             sizeof(struct in_addr) , AF_INET ) ;
      str = ( hostp != NULL ) ? hostp->h_name : "UNKNOWN" ;
      name_out = (char *) malloc( strlen(str)+1 ) ;
      if( name_out == NULL ) goto NoMemory ;
      strcpy(name_out,str) ;
   }

   /** get address of connector **/

   if( hostaddr != NULL ){
      str = inet_ntoa( pin.sin_addr ) ;
      addr_out = (char *) malloc( strlen(str)+1 ) ;
      if( addr_out == NULL ) goto NoMemory ;
      strcpy(addr_out,str) ;
   }

   /** all is well, so hand the results back **/

   if( hostname != NULL ) *hostname = name_out ;
   if( hostaddr != NULL ) *hostaddr = addr_out ;
   return sd_new ;

   /** allocation failed: undo everything done so far **/

NoMemory:
   PERROR("Can't malloc? tcp_accept[malloc]") ;
   free(name_out) ;                       /* addr_out must be NULL here */
   shutdown(sd_new,2) ; close(sd_new) ;   /* same steps as CLOSEDOWN    */
   return -1 ;
}
421
422 /****************************************************************
423 Routines to manipulate IPC shared memory segments
424 *****************************************************************/
425
426 /*---------------------------------------------------------------
427 Convert a string to a key, for IPC operations.
428 Augment sum by port offset value (-np option)
429 -----------------------------------------------------------------*/
430
/*! Convert a string to a key for IPC operations.

    The key is get_user_np() + 666, plus each character of key_string
    shifted left by 0, 8 or 16 bits (cycling with the character's
    position).  A negative result is negated and a zero result is
    replaced by 314159265.  If key_string is NULL, get_user_np()+666 is
    returned as is.

    The accumulation is carried out in unsigned arithmetic, so that
    characters with negative values, sums that exceed the range of int,
    and the negation step are all free of undefined behavior.  For every
    input where the signed computation was well defined the key is the
    same, so processes still agree on the key for a given string.       */

key_t string_to_key( char * key_string )
{
   int ii , sum = get_user_np();
   unsigned int usum ;

   sum += 666;
   if( key_string == NULL ) return (key_t) sum ;

   usum = (unsigned int) sum ;
   for( ii=0 ; key_string[ii] != '\0' ; ii++ )
      usum += ((unsigned int)(int)key_string[ii]) << ((ii%3)*8) ;

   /* (~0u >> 1) is the largest value that is nonnegative as an int */

   if( usum > (~0u >> 1) ) usum = 0u - usum ;       /* "sum < 0"  ==> negate */
   else if( usum == 0u   ) usum = 314159265u ;      /* "sum == 0" ==> pi-ish */

   sum = (int) usum ;
   return (key_t) sum ;
}
446
447 #ifndef DONT_USE_SHM
448 /*---------------------------------------------------------------
449 Get a pre-existing shmem segment.
450 Returns the shmid >= 0 if successful; returns -1 if failure.
451 -----------------------------------------------------------------*/
452
/*! Look up the shmem segment whose key is derived from key_string.
    shmget() is called with size 0 and without IPC_CREAT.
    Returns whatever shmget() returns: shmid >= 0, or -1 on failure.  */

int shm_accept( char * key_string )
{
   return shmget( string_to_key(key_string) , 0 , 0777 ) ;
}
462
463 /*---------------------------------------------------------------
464 Connect to, or create if needed, a shmem segment.
465 Returns the shmid >= 0 if successful; returns -1 if failure.
466 -----------------------------------------------------------------*/
467
shm_create(char * key_string,int size)468 int shm_create( char * key_string , int size )
469 {
470 key_t key ;
471 int shmid ;
472
473 key = string_to_key( key_string ) ;
474 shmid = shmget( key , size , 0777 | IPC_CREAT ) ;
475 if( shmid < 0 ){
476 PERROR("Can't create? shm_create[shmget]") ;
477 if( pron ) fprintf(stderr,"key_string=%s key=%d size=%d\n",
478 key_string , (int)key , size ) ;
479 }
480 return shmid ;
481 }
482
483 /*---------------------------------------------------------------
484 Actually attach to the shmem segment.
485 Returns the pointer to the segment start.
486 NULL is returned if an error occurs.
487 -----------------------------------------------------------------*/
488
/*! Attach segment shmid to this process via shmat(), at an address
    chosen by the system.  Returns the address of the start of the
    segment, or NULL (after reporting via PERROR) if shmat() fails.  */

char * shm_attach( int shmid )
{
   void * seg = shmat( shmid , NULL , 0 ) ;

   if( seg != (void *) -1 ) return (char *) seg ;   /* attached OK */

   PERROR("Can't attach? shm_attach[shmat]") ;
   return NULL ;
}
498
499 /*---------------------------------------------------------------
500 Find the size of a shmem segment.
501 Returns -1 if an error occurs.
502 -----------------------------------------------------------------*/
503
/*! Return the shm_segsz value that shmctl(IPC_STAT) reports for
    segment shmid, converted to int.  Returns -1 if shmid is negative
    or if shmctl() fails.                                            */

int shm_size( int shmid )
{
   struct shmid_ds info ;

   if( shmid < 0 ) return -1 ;

   if( shmctl( shmid , IPC_STAT , &info ) < 0 ){
      PERROR("Can't check? shm_size[shmctl]") ;
      return -1 ;
   }

   return info.shm_segsz ;
}
514
515 /*----------------------------------------------------------------
516 Find the number of attaches to a shmem segment.
517 Returns -1 if an error occurs (like the segment was destroyed).
518 ------------------------------------------------------------------*/
519
/*! Return the shm_nattch value that shmctl(IPC_STAT) reports for
    segment shmid.  Returns -1 if shmid is negative or if shmctl()
    fails.  errno is cleared before shmctl() is called.            */

int shm_nattach( int shmid )
{
   struct shmid_ds info ;
   int rc ;

   if( shmid < 0 ){ STATUS("shm_nattach: illegal shmid") ; return -1 ; }

   errno = 0 ;
   rc    = shmctl( shmid , IPC_STAT , &info ) ;
   if( rc >= 0 ) return info.shm_nattch ;

   PERROR("Has shared memory buffer gone bad? shm_nattach[shmctl]") ;
   return -1 ;
}
534
535 #else /** dummy functions, if SysV IPC shared memory isn't available */
536
/* Stand-ins that always report failure, ignoring their arguments */

int shm_nattach( int shmid )
{
   (void) shmid ;
   return -1 ;
}

int shm_size( int shmid )
{
   (void) shmid ;
   return -1 ;
}

char * shm_attach( int shmid )
{
   (void) shmid ;
   return NULL ;
}

int shm_create( char * key_string , int size )
{
   (void) key_string ; (void) size ;
   return -1 ;
}

int shm_accept( char * key_string )
{
   (void) key_string ;
   return -1 ;
}
542
543 #endif /* DONT_USE_SHM */
544
545 /****************************************************************
546 The IOCHAN routines, which call the tcp_ or shm_ routines,
547 as needed.
548 *****************************************************************/
549
550 /*---------------------------------------------------------------
551 Create an IOCHAN struct, and return a pointer to it. NULL is
552 returned if an error occurs.
553
554 name = "tcp:host:port" to connect a socket to system "host"
555 on the given port.
556 = "shm:name:size" to connect a shared memory segment
557 with the given name and size in bytes (size can
558 end with K or M to denote kilobytes or megabytes).
559 With this mode, the creator and acceptor processes
560 cannot both read and write -- one of them must
561 be the writing process (normally the creator),
562 and one must be the reading process (normally
563 the acceptor).
564 = "shm:name:size1+size2" to connect a shared memory
565 segment with buffers of length size1 and size2.
566 The creator process will write to the size1 buffer
567 and read from the size2 buffer. The acceptor
568 process will reverse this.
569
570 mode = "create" to open a new channel
571 * tcp: host must be specified
572 * shm: size must be > 0
573 (if in the form size1+size2, both must be > 0)
574 = "accept" to log into a channel created by someone else
575 * tcp: host is ignored (but must be present)
576 * shm: size is ignored (but must be present)
577 (similarly, size1+size2 must be present, but
578 their actual values are ignored)
579
580 The inputs "host" (for tcp:) and "name" (for shm:) are limited
581 to a maximum of 127 bytes.
582
  After a tcp: accept IOCHAN is good, the string ioc->name
584 contains the IP address of the connecting host, in "dot" form
585 (e.g., "201.201.201.201"); here, "ioc" is the IOCHAN * returned
586 by this routine.
587 -----------------------------------------------------------------*/
588
iochan_init(char * name,char * mode)589 IOCHAN * iochan_init( char * name , char * mode )
590 {
591 IOCHAN * ioc ;
592 int do_create , do_accept ;
593
594 /** 12 Dec 2002: check if shm_RMID_delay needs to be set **/
595
596 #ifndef DONT_USE_SHM
597 { static int first=1 ;
598 if( first ){
599 char *eee = getenv("IOCHAN_DELAY_RMID") ;
600 shm_RMID_delay = ( eee != NULL && (*eee=='Y' || *eee=='y') ) ;
601 first = 0 ;
602 }
603 }
604 #endif
605
606 /** check if inputs are reasonable **/
607
608 error_string = NULL ;
609
610 if( name == NULL || strlen(name) < 6 || strlen(name) > 127 ){
611 error_string = "iochan_init: bad name" ; return NULL ;
612 }
613
614 if( mode == NULL ){
615 error_string = "iochan_init: bad mode" ; return NULL ;
616 }
617
618 do_create = (strcmp(mode,"create") == 0 || strcmp(mode,"w") == 0) ;
619 do_accept = (strcmp(mode,"accept") == 0 || strcmp(mode,"r") == 0) ;
620
621 if( !do_create && !do_accept ){
622 error_string = "iochan_init: bad mode" ; return NULL ;
623 }
624
625 #ifdef DEBUG
626 fprintf(stderr,"iochan_init: name=%s mode=%s\n",name,mode) ;
627 #endif
628
629 /***** deal with TCP/IP sockets *****/
630
631 if( strncmp(name,"tcp:",4) == 0 ){
632 char host[128] , * hend ;
633 int port=-1 , ii , jj ;
634
635 /** find "host" substring **/
636
637 hend = strstr( name+4 , ":" ) ;
638 if( hend == NULL || hend-name > 128 ){
639 error_string = "iochan_init: bad name" ; return NULL ;
640 }
641 for( ii=4 ; name[ii] != ':' ; ii++ ) host[ii-4] = name[ii] ;
642 host[ii-4] = '\0' ;
643
644 /** get "port" number **/
645
646 port = strtol( name+ii+1 , NULL , 10 ) ;
647 if( port <= 0 ){
648 error_string = "iochan_init: bad port" ; return NULL ;
649 }
650
651 /** initialize IOCHAN **/
652
653 /* from malloc 12 Feb 2009 [lesstif patrol] */
654 ioc = (IOCHAN *) calloc( 1, sizeof(IOCHAN) ) ;
655
656 ioc->type = TCP_IOCHAN ; /* what kind is this? */
657 ioc->port = port ; /* save the port # */
658 ioc->bufsize = 0 ; /* TCP has no buffer */
659 ioc->buf = NULL ;
660 ioc->sendsize = 0 ; /* no upper limit */
661 ioc->ioc2 = NULL ; /* TCP has no second channel */
662
663 /** attach to incoming call **/
664
665 if( do_accept ){
666 ioc->whoami = ACCEPTOR ; /* 24 June 1997 */
667 ioc->id = tcp_listen( port ) ; /* set up to listen */
668 if( ioc->id < 0 ){ /* error? must die! */
669 error_string = "iochan_init: tcp_listen fails" ;
670 free(ioc) ; return NULL ;
671 }
672 ioc->bad = TCP_WAIT_ACCEPT ; /* not connected yet */
673 ii = tcp_readcheck(ioc->id,1) ; /* see if ready */
674 if( ii > 0 ){ /* if socket ready */
675 jj = tcp_accept( ioc->id , NULL,&hend ) ; /* accept connection */
676 if( jj >= 0 ){ /* if accept worked */
677 CLOSEDOWN( ioc->id ) ; /* close old socket */
678 strcpy( ioc->name , hend ) ; /* put IP into name */
679 free(hend) ; ioc->bad = 0 ; ioc->id = jj ; /* and ready to go! */
680 }
681 }
682 return ioc ;
683 }
684
685 /** place an outgoing call **/
686
687 if( do_create ){
688 struct hostent * hostp ;
689 ioc->whoami = CREATOR ; /* 24 June 1997 */
690 hostp = gethostbyname(host) ; /* lookup host on net */
691 if( hostp == NULL ){ /* fails? must die! */
692 error_string = "iochan_init: gethostbyname fails" ;
693 free(ioc) ; return NULL ;
694 }
695 ioc->id = tcp_connect( host , port ) ; /* connect to host */
696 ioc->bad = (ioc->id < 0) ? TCP_WAIT_CONNECT : 0 ; /* fails? must wait */
697 strcpy( ioc->name , host ) ; /* save the host name */
698 return ioc ;
699 }
700 return NULL ; /* should never be reached */
701 }
702
703 /***** deal with shared memory segments *****/
704
705 if( strncmp(name,"shm:",4) == 0 ){
706 char key[128] , * kend , shm2[256] ;
707 int size=-1 , ii , jj , size2=-1 ;
708
709 #ifdef DONT_USE_SHM
710 return NULL ; /* 18 Dec 2002 */
711 #endif
712
713 /** get keystring **/
714
715 kend = strstr( name+4 , ":" ) ;
716 if( kend == NULL || kend-name > 128 ){
717 error_string = "iochan_init: bad name" ; return NULL ;
718 }
719 for( ii=4 ; name[ii] != ':' ; ii++ ) key[ii-4] = name[ii] ;
720 key[ii-4] = '\0' ;
721
722
723 /** get size **/
724
725 size = strtol( name+ii+1 , &kend , 10 ) ;
726 if( size < 0 || (size == 0 && do_create) ){
727 error_string = "iochan_init: bad size" ; return NULL ;
728 }
729 if( *kend == 'K' || *kend == 'k' ){ size *= 1024 ; kend++ ; }
730 else if( *kend == 'M' || *kend == 'm' ){ size *= 1024*1024 ; kend++ ; }
731
732 /** 24 June 1997: get second size **/
733
734 if( *kend == '+' ){
735 size2 = strtol( kend+1 , &kend , 10 ) ;
736 if( size2 < 0 || (size2 == 0 && do_create) ){
737 error_string = "iochan_init: bad size2" ; return NULL ;
738 }
739 if( *kend == 'K' || *kend == 'k' ){ size2 *= 1024 ; kend++ ; }
740 else if( *kend == 'M' || *kend == 'm' ){ size2 *= 1024*1024 ; kend++ ; }
741
742 sprintf(shm2,"shm:%s++:%d",key,size2) ; /* second channel spec */
743 } else {
744 shm2[0] = '\0' ; /* no second channel */
745 }
746
747 /** initialize IOCHAN **/
748
749 /* from malloc 12 Feb 2009 [lesstif patrol] */
750 ioc = (IOCHAN *) calloc( 1, sizeof(IOCHAN) ) ;
751
752 ioc->type = SHM_IOCHAN ; /* what type is this? */
753 strcpy( ioc->name , key ) ; /* save the key name */
754 ioc->ioc2 = NULL ; /* maybe reset below? */
755
756 /** open the second channel, if any **/
757
758 if( shm2[0] != '\0' ){
759 ioc->ioc2 = iochan_init( shm2 , mode ) ;
760 if( ioc->ioc2 == NULL ){
761 error_string = "iochan_init: can't open shm2" ;
762 free(ioc) ; return NULL ;
763 }
764 #ifdef DEBUG
765 fprintf(stderr,"iochan_init: input=%s shm2=%s\n",name,shm2) ;
766 #endif
767 }
768
769 /** attach to existing shmem segment **/
770
771 if( do_accept ){
772 ioc->whoami = ACCEPTOR ; /* 24 June 1997 */
773 for( ii=0 ; ii < 2 ; ii++ ){ /* try to find segment */
774 ioc->id = shm_accept( key ) ; /* several times */
775 if( ioc->id >= 0 ) break ; /* works? break out */
776 iochan_sleep(1) ; /* wait 1 millisecond */
777 }
778 if( ioc->id < 0 ) ioc->id = shm_accept( key ) ; /* 1 last try? */
779
780 if( ioc->id < 0 ){ /* failed to find segment? */
781 ioc->bad = SHM_WAIT_CREATE ; /* mark for waiting */
782
783 } else { /* found it? */
784 char * bbb ;
785 bbb = shm_attach( ioc->id ) ; /* attach it */
786 if( bbb == NULL ){ /* can't? quit */
787 error_string = "iochan_init: shm_attach fails" ;
788 iochan_close(ioc) ; return NULL ;
789 }
790 ioc->bstart = (int *) bbb ; /* set start, */
791 ioc->bend = (int *) (bbb + sizeof(int)) ; /* end markers */
792 ioc->buf = bbb + 2*sizeof(int) ; /* after markers */
793 ioc->bufsize = shm_size(ioc->id) - 2*sizeof(int) ; /* get its size */
794 if( ioc->bufsize <= 0 ){ /* can't? quit */
795 error_string = "iochan_init: bufsize < 0" ;
796 iochan_close(ioc) ; return NULL ;
797 }
798 ioc->bad = 0 ; /* mark ready */
799 }
800 return ioc ;
801 }
802
803 /** make a new shmem segment **/
804
805 if( do_create ){
806 char * bbb ;
807 ioc->whoami = CREATOR ; /* 24 June 1997*/
808 size = size + 1 ; /* extra byte */
809 ioc->id = shm_create( key , size+2*sizeof(int) ) ; /* create it */
810 if( ioc->id < 0 ){ /* can't? quit */
811 error_string = "iochan_init: shm_create fails" ;
812 iochan_close(ioc->ioc2) ; free(ioc) ; return NULL ;
813 }
814 bbb = shm_attach( ioc->id ) ; /* attach it */
815 if( bbb == NULL ){ /* can't? quit */
816 error_string = "iochan_init: shm_attach fails" ;
817 iochan_close(ioc) ; free(ioc) ; return NULL ;
818 }
819 ioc->bstart = (int *) bbb ; /* init start, */
820 ioc->bend = (int *) (bbb + sizeof(int)) ; /* end markers */
821 *(ioc->bstart) = 0 ;
822 *(ioc->bend) = size-1 ;
823 ioc->buf = bbb + 2*sizeof(int) ; /* I/O buffer */
824 ioc->bufsize = size ; /* buffer size */
825 ioc->bad = (shm_nattach(ioc->id) < 2) /* ready if */
826 ? SHM_WAIT_ACCEPT /* both are */
827 : 0 ; /* attached */
828 return ioc ;
829 }
830 return NULL ; /* should never be reached */
831 }
832
833 return NULL ; /* should never be reached */
834 }
835
836 /*-------------------------------------------------------------------------
837 Check if the shmem segment is alive (has 2 attached processes).
838 Returns 0 if not alive, 1 if life is happy.
839 ---------------------------------------------------------------------------*/
840
/*! Returns 1 if at least 2 processes are attached to segment shmid
    (as counted by shm_nattach), and 0 otherwise -- including when
    shmid is negative.                                             */

int shm_alivecheck( int shmid )
{
   int nproc ;

   if( shmid < 0 ) return 0 ;

   nproc = shm_nattach(shmid) ;
   return ( nproc >= 2 ) ? 1 : 0 ;
}
846
847 /*-------------------------------------------------------------------------
848 Check if the given IOCHAN is ready for I/O. If not, wait up to
849 msec milliseconds to establish the connection to the other end;
850 if msec < 0, will wait indefinitely. Returns 1 if ready; 0 if not;
851 -1 if an error occurs. Possible errors are:
852 + ioc was connected, and now has become disconnected
853 + ioc is passed in as NULL
854 ---------------------------------------------------------------------------*/
855
/*! Check if ioc is ready for I/O, trying for up to msec milliseconds
    (msec < 0 means a very long time) to complete a connection that is
    still pending.

    Returns  1 if the IOCHAN is ready,
             0 if it is not ready (yet),
            -1 if an error occurs: ioc is NULL, a channel that was good
               is no longer alive, or a shmem segment that has just been
               found cannot be attached.  iochan_error_string() then
               describes the problem.                                    */

int iochan_goodcheck( IOCHAN * ioc , int msec )
{
   int ii , jj ;
   char * bbb ;

   /** check inputs for OK-osity **/

   error_string = NULL ;

   if( ioc == NULL ){
      error_string = "iochan_goodcheck: bad input" ; return -1 ;
   }

   /** if it was good before, then check if it is still good **/

   if( IOC_BAD(ioc) == 0 ){
      int ich = 1 ;

      if( ioc->type == TCP_IOCHAN ){
         ich = tcp_alivecheck(ioc->id) ;
      } else if( ioc->type == SHM_IOCHAN ){
         ich = shm_alivecheck(ioc->id) ;
         if( ich && ioc->ioc2 != NULL )
            ich = shm_alivecheck(ioc->ioc2->id) ;
      }

      if( ich == 0 ){
         error_string = "iochan_goodcheck: no longer alive" ; return -1 ;
      }
      else
         return 1 ;
   }

   /** wasn't good before, so check if that condition has changed **/

   /** TCP/IP waiting to accept call from another host **/

   if( ioc->bad == TCP_WAIT_ACCEPT ){
      ii = tcp_readcheck(ioc->id,msec) ;               /* see if ready      */
      if( ii > 0 ){                                    /* if socket ready:  */
         STATUS("iochan_goodcheck: try to accept tcp");
         jj = tcp_accept( ioc->id , NULL,&bbb ) ;      /* accept connection */
         if( jj >= 0 ){                                /* if accept worked  */
            STATUS("iochan_goodcheck: accept worked!") ;
            CLOSEDOWN( ioc->id ) ;                     /* close old socket  */
            strcpy( ioc->name , bbb ) ;                /* put IP into name  */
            free(bbb) ; ioc->bad = 0 ; ioc->id = jj ;  /* and ready to go!  */
         } else {
           STATUS("iochan_goodcheck: accept failed!") ;
         }
      }
   }

   /** TCP/IP waiting to connect call to another host **/

   else if( ioc->bad == TCP_WAIT_CONNECT ){
      int dms=0 , ms ;

      if( msec < 0 ) msec = 999999999 ;        /* a long time (11+ days) */
      for( ms=0 ; ms < msec ; ms += dms ){
         ioc->id = tcp_connect( ioc->name , ioc->port ) ; /* try to connect to host */
         if( ioc->id >= 0 ) break ;                       /* worked? break out      */
         dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; iochan_sleep(dms) ;
      }
      if( ioc->id < 0 )                                   /* one last try? */
         ioc->id  = tcp_connect( ioc->name , ioc->port ) ;

      if( ioc->id >= 0 ) ioc->bad = 0 ;                   /* succeeded?    */
   }

   /** shmem segment waiting for creation (by someone else) **/

   else if( ioc->bad == SHM_WAIT_CREATE ){
      int dms=0 , ms ;

      if( msec < 0 ) msec = 999999999 ;        /* a long time (11+ days) */
      for( ms=0 ; ms < msec ; ms += dms ){
         ioc->id = shm_accept( ioc->name ) ;   /* try to attach to shmem segment */
         if( ioc->id >= 0 ) break ;            /* works? break out               */
         dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; iochan_sleep(dms) ;
      }
      if( ioc->id < 0 )                        /* one last try? */
         ioc->id = shm_accept( ioc->name ) ;

      if( ioc->id >= 0 ){                                /* found it?     */
         char * seg ;
         seg = shm_attach( ioc->id ) ;                   /* attach it     */
         if( seg == NULL ){                              /* can't? error! */
            /* leave ioc->bad as it is: marking the channel ready with
               pointers built from NULL would crash the first reader   */
            error_string = "iochan_goodcheck: shm_attach fails" ;
            return -1 ;
         }
         ioc->bstart  = (int *) seg ;                    /* set start,    */
         ioc->bend    = (int *) (seg + sizeof(int)) ;    /* end markers   */
         ioc->buf     = seg + 2*sizeof(int) ;            /* after markers */
         ioc->bufsize = shm_size(ioc->id) - 2*sizeof(int) ; /* get its size */
         ioc->bad     = 0 ;                              /* mark ready    */
      }
   }

   /** shmem segment we created waiting for someone else to attach **/

   else if( ioc->bad == SHM_WAIT_ACCEPT ){
      int dms=0 , ms ;

      if( msec < 0 ) msec = 999999999 ;        /* a long time (11+ days) */
      for( ms=0 ; ms < msec ; ms += dms ){
         if( shm_nattach(ioc->id) > 1 ){ ioc->bad = 0 ; break ; }
         dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; iochan_sleep(dms) ;
      }
      if( ioc->bad && shm_nattach(ioc->id) > 1 ) ioc->bad = 0 ;
   }

   /** if there is a second channel, check it too **/

   if( ioc->ioc2 != NULL && ioc->ioc2->bad != 0 )
      iochan_goodcheck( ioc->ioc2 , msec ) ;

   return ( IOC_BAD(ioc) == 0 ) ;
}
971
972 /*-----------------------------------------------------------------------
973 Close an IOCHAN. Note that this will free what ioc points to.
974 Use the IOCHAN_CLOSE macro to also set the pointer "ioc" to NULL.
975 -------------------------------------------------------------------------*/
976
iochan_close(IOCHAN * ioc)977 void iochan_close( IOCHAN * ioc )
978 {
979 if( ioc == NULL ) return ;
980
981 if( ioc->ioc2 != NULL ) iochan_close(ioc->ioc2) ;
982
983 if( ioc->type == TCP_IOCHAN ){
984 if( ioc->id >= 0 ) CLOSEDOWN(ioc->id) ;
985 }
986
987 else if( ioc->type == SHM_IOCHAN ){
988 #ifndef DONT_USE_SHM
989 if( ioc->id >= 0 ){
990 shmdt( (char *) ioc->bstart ) ; /* detach */
991 /* then kill */
992 if( !shm_RMID_delay || shm_nattach(ioc->id) < 1 )
993 shmctl( ioc->id , IPC_RMID , NULL ) ;
994 }
995 #endif
996 }
997
998 free( ioc ) ; return ;
999 }
1000
/*-----------------------------------------------------------------------
   Call tcp_set_cutoff() on the socket of a TCP IOCHAN.  Nothing is done
   for a NULL input, for a non-TCP channel, or when the channel has no
   valid (non-negative) socket id.
-------------------------------------------------------------------------*/

void iochan_set_cutoff( IOCHAN * ioc )
{
   if( ioc == NULL             ) return ;
   if( ioc->type != TCP_IOCHAN ) return ;
   if( ioc->id < 0             ) return ;

   tcp_set_cutoff( ioc->id ) ;
   return ;
}
1008
1009 /*---------------------------------------------------------------------------
1010 Check if the IOCHAN is ready to have data read out of it.
1011 If not, the routine will wait up to msec milliseconds for data to be
1012 available. If msec < 0, this routine will wait indefinitely.
1013 For sockets, the return value is 1 if data is ready, 0 if not
1014 (for sockets, no indication of how much can be read is available).
1015 For shmem segments, the return value is how many bytes can be
1016 read (0 if none are available).
1017 -1 will be returned if some unrecoverable error is detected.
1018 -----------------------------------------------------------------------------*/
1019
iochan_readcheck(IOCHAN * ioc,int msec)1020 int iochan_readcheck( IOCHAN * ioc , int msec )
1021 {
1022 int ii ;
1023
1024 /** check if the IOCHAN is good **/
1025
1026 error_string = NULL ;
1027
1028 ii = iochan_goodcheck(ioc,0) ;
1029 if( ii == -1 ) return -1 ; /* some error */
1030 if( ii == 0 ){ /* not good yet */
1031 ii = iochan_goodcheck(ioc,msec) ; /* so wait for it to get good */
1032 if( ii != 1 ) return 0 ; /* if still not good, exit */
1033 }
1034
1035 /** tcp: ==> just use the Unix "select" mechanism **/
1036
1037 if( ioc->type == TCP_IOCHAN ){
1038 ii = tcp_alivecheck( ioc->id ) ; if( !ii ) return -1 ;
1039 ii = tcp_readcheck( ioc->id , msec ) ;
1040 if( ii < 0 ) error_string = "iochan_readcheck: socket is bad" ;
1041 return ii ;
1042 }
1043
1044 /** shm: ==> must loop and wait ourselves **/
1045
1046 if( ioc->type == SHM_IOCHAN ){
1047 int nread , dms=0 , ms ;
1048
1049 if( msec < 0 ) msec = 999999999 ; /* a long time (11+ days) */
1050
1051 /** Compute the number of readable bytes into nread. This routine
1052 should be called by the "reading" process. It will then
1053 be waiting until the "writing" process increments ioc->bend. **/
1054
1055 ioc = SHMIOC_READ(ioc) ; /* 24 June 1997 */
1056
1057 for( ms=0 ; ms < msec ; ms += dms ){
1058 nread = (*(ioc->bend) - *(ioc->bstart) + ioc->bufsize + 1) % (ioc->bufsize) ;
1059 if( nread > 0 ) return nread ;
1060 dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; iochan_sleep(dms) ;
1061 ii = iochan_goodcheck(ioc,0) ; if( ii == -1 ) return -1 ;
1062 }
1063 nread = (*(ioc->bend) - *(ioc->bstart) + ioc->bufsize + 1) % (ioc->bufsize) ;
1064 if( nread > 0 ) return nread ;
1065 return 0 ;
1066 }
1067
1068 return -1 ; /* should never be reached */
1069 }
1070
1071 /*---------------------------------------------------------------------------
1072 Read and discard data from the IOCHAN until it is clear.
1073 Returns -1 if an error occurs, otherwise returns the number
1074 of bytes discarded (which may be 0).
1075 -----------------------------------------------------------------------------*/
1076
1077 #define QBUF 1024
1078
iochan_force_clear(IOCHAN * ioc)1079 int iochan_force_clear( IOCHAN * ioc )
1080 {
1081 int ii , ntot = 0 ;
1082 char qbuf[QBUF] ;
1083
1084 do{
1085 ii = iochan_readcheck(ioc,0) ;
1086 if( ii == -1 ) return -1 ;
1087 if( ii == 0 ) return ntot ;
1088
1089 ii = iochan_recv( ioc , qbuf , QBUF ) ;
1090 if( ii == -1 ) return -1 ;
1091 ntot += ii ;
1092
1093 } while( 1 ) ; /** loop until readcheck says no data available **/
1094
1095 return -1 ; /* should not be reached */
1096 }
1097
1098 /*---------------------------------------------------------------------------
1099 Check if the IOCHAN is clear -- that is, if the data sent to it
1100 has already been read. This is logically impossible for a TCP
1101 channel, so will normally only be used for shmem segments. It
1102 would be called by the writer process to see if the reader process
1103 has cleared the data all out.
1104 Will return 1 if clear, 0 if not clear, and -1 if some error occurs.
1105 -----------------------------------------------------------------------------*/
1106
iochan_clearcheck(IOCHAN * ioc,int msec)1107 int iochan_clearcheck( IOCHAN * ioc , int msec )
1108 {
1109 int ii ;
1110
1111 /** check if the IOCHAN is good **/
1112
1113 error_string = NULL ;
1114
1115 ii = iochan_goodcheck(ioc,0) ;
1116 if( ii == -1 ) return -1 ; /* some error */
1117 if( ii == 0 ) return 1 ; /* not good yet, so can be no data */
1118
1119 /** tcp: ==> use the Unix "select" mechanism **/
1120
1121 if( ioc->type == TCP_IOCHAN ) return ( tcp_readcheck(ioc->id,msec) == 0 ) ;
1122
1123 /** shm: ==> must loop and wait ourselves **/
1124
1125 if( ioc->type == SHM_IOCHAN ){
1126 int nread , dms=0 , ms ;
1127
1128 if( msec < 0 ) msec = 999999999 ; /* a long time (11+ days) */
1129
1130 ioc = SHMIOC_WRITE(ioc) ; /* 24 June 1997 */
1131
1132 for( ms=0 ; ms < msec ; ms += dms ){
1133 nread = (*(ioc->bend) - *(ioc->bstart) + ioc->bufsize + 1) % (ioc->bufsize) ;
1134 if( nread == 0 ) return 1 ;
1135 dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; iochan_sleep(dms) ;
1136 ii = iochan_goodcheck(ioc,0) ; if( ii == -1 ) return -1 ;
1137 }
1138 nread = (*(ioc->bend) - *(ioc->bstart) + ioc->bufsize + 1) % (ioc->bufsize) ;
1139 return (nread == 0) ;
1140 }
1141
1142 return -1 ; /* should never be reached */
1143 }
1144
1145 /*---------------------------------------------------------------------------
1146 Check if the IOCHAN is ready to have data written into it.
1147 If not, the routine will wait up to msec milliseconds for writing to
1148 be allowable. If msec < 0, this routine will wait indefinitely.
1149 For sockets, the return value is 1 if data can be sent, 0 if not
1150 (for sockets, no indication is given of how much data can be sent).
1151 For shmem segments, the return value is the number of bytes that
1152 can be sent (0 if none, positive if some).
1153 -1 will be returned if some unrecoverable error is detected.
1154 -----------------------------------------------------------------------------*/
1155
iochan_writecheck(IOCHAN * ioc,int msec)1156 int iochan_writecheck( IOCHAN * ioc , int msec )
1157 {
1158 int ii ;
1159
1160 /** check if the IOCHAN is good **/
1161
1162 error_string = NULL ;
1163
1164 ii = iochan_goodcheck(ioc,0) ;
1165 if( ii == -1 ) return -1 ; /* some error */
1166 if( ii == 0 ){ /* not good yet */
1167 ii = iochan_goodcheck(ioc,msec) ; /* so wait for it to get good */
1168 if( ii != 1 ) return ii ; /* if still not good, exit */
1169 }
1170
1171 /** tcp: ==> just use the Unix "select" mechanism **/
1172
1173 if( ioc->type == TCP_IOCHAN ){
1174 ii = tcp_writecheck( ioc->id , msec ) ;
1175 if( ii == -1 ) error_string = "iochan_writecheck: socket not ready" ;
1176 return ii ;
1177 }
1178
1179 /** shm: ==> must loop and wait ourselves **/
1180
1181 if( ioc->type == SHM_IOCHAN ){
1182 int nread , dms=0 , ms , nwrite ;
1183
1184 if( msec < 0 ) msec = 999999999 ; /* a long time (11+ days) */
1185
1186 ioc = SHMIOC_WRITE(ioc) ; /* 24 June 1997 */
1187
1188 /** This routine is called by the "writing" process. It will
1189 wait until the reading process increments ioc->bstart. **/
1190
1191 for( ms=0 ; ms < msec ; ms += dms ){
1192 nread = (*(ioc->bend) - *(ioc->bstart) + ioc->bufsize + 1) % (ioc->bufsize) ;
1193 nwrite = ioc->bufsize - 1 - nread ;
1194 if( nwrite > 0 ) return nwrite ;
1195 dms = NEXTDMS(dms) ; dms = MIN(dms,msec-ms) ; iochan_sleep(dms) ;
1196 ii = iochan_goodcheck(ioc,0) ; if( ii == -1 ) return -1 ;
1197 }
1198 nread = (*(ioc->bend) - *(ioc->bstart) + ioc->bufsize + 1) % (ioc->bufsize) ;
1199 nwrite = ioc->bufsize - 1 - nread ;
1200 if( nwrite > 0 ) return nwrite ;
1201 return 0 ;
1202 }
1203
1204 return -1 ; /* should never be reached */
1205 }
1206
1207 /*----------------------------------------------------------------------------
1208 Send (at most) nbytes of data from buffer down the IOCHAN. Return value is
1209 the number of bytes actually sent, or is -1 if some error occurs.
1210 (Zero bytes might be sent under some circumstances.)
1211
1212 Tcp: IOCHANs use blocking sends, so that all the data should be
1213 sent properly unless the connection to the other end fails for some
1214 reason.
1215
1216 Shm: IOCHANs may not be able to send all the data, if the buffer is
1217 nearly full. This function will send what it can, and return
1218 immediately (will not wait for space to clear up). If no space is
1219 available in the buffer, nothing will be sent (0 will be returned).
1220 ------------------------------------------------------------------------------*/
1221
iochan_send(IOCHAN * ioc,char * buffer,int nbytes)1222 int iochan_send( IOCHAN * ioc , char * buffer , int nbytes )
1223 {
1224 int ii ;
1225
1226 /** check for reasonable inputs **/
1227
1228 error_string = NULL ;
1229
1230 if( ioc == NULL || IOC_BAD(ioc) != 0 ||
1231 buffer == NULL || nbytes < 0 ){
1232
1233 error_string = "iochan_send: bad inputs" ; return -1 ;
1234 }
1235
1236 if( nbytes == 0 ) return 0 ;
1237
1238 ii = iochan_goodcheck(ioc,0) ;
1239 if( ii != 1 ){
1240 if( error_string == NULL )
1241 error_string = "iochan_send: iochan_goodcheck fails" ;
1242 return ii ;
1243 }
1244
1245 ii = iochan_writecheck(ioc,1) ;
1246 if( ii <= 0 ){
1247 if( error_string == NULL )
1248 error_string = "iochan_send: iochan_writecheck fails" ;
1249 return ii ;
1250 }
1251
1252 /** tcp: ==> just use send **/
1253
1254 if( ioc->type == TCP_IOCHAN ){
1255 if( !nosigpipe ){ signal( SIGPIPE , SIG_IGN ) ; nosigpipe = 1 ; }
1256
1257 if( ioc->sendsize <= 0 || nbytes <= ioc->sendsize ){
1258 int nsent = send( ioc->id , buffer , nbytes , 0 ) ;
1259 if( nsent == -1 ) PERROR("Can't use socket? tcp[send]") ;
1260 if( nsent < 0 ) error_string = "iochan_send: tcp send fails" ;
1261 return nsent ;
1262 } else {
1263 int nsent , ntosend , ntot = 0 ;
1264 do{
1265 while( tcp_writecheck(ioc->id,1) == 0 ) ; /* spin */
1266 ntosend = MIN( ioc->sendsize , nbytes-ntot ) ;
1267 nsent = send( ioc->id , buffer+ntot , ntosend , 0 ) ;
1268 if( nsent == -1 ) PERROR("Can't use socket? tcp[send]") ;
1269 if( nsent <= 0 ){
1270 error_string = "iochan_send: tcp send fails" ;
1271 return ((ntot>0) ? ntot : nsent) ;
1272 }
1273 ntot += nsent ;
1274 } while( ntot < nbytes ) ;
1275 return ntot ;
1276 }
1277 }
1278
1279 /** shm: ==> write into the circular buffer, past "bend" **/
1280
1281 if( ioc->type == SHM_IOCHAN ){
1282 int nread,nwrite , bend,bstart , ebot,etop , size ;
1283
1284 ioc = SHMIOC_WRITE(ioc) ; /* 24 June 1997 */
1285
1286 bend = *(ioc->bend) ; bstart = *(ioc->bstart) ; size = ioc->bufsize ;
1287 nread = ( bend - bstart + size + 1 ) % size ; /* amount readable */
1288 nwrite = size - 1 - nread ; /* amount writeable */
1289 if( nwrite <= 0 ) return 0 ; /* can't write! */
1290
1291 if( nwrite > nbytes ) nwrite = nbytes ; /* how much to write */
1292
1293 ebot = bend+1 ; if( ebot >= size ) ebot = 0 ; /* start at ebot */
1294 etop = ebot+nwrite-1 ; /* end at etop */
1295
1296 if( etop < size ){ /* 1 piece to copy */
1297 BCOPY( ioc->buf + ebot, buffer, nwrite ) ; /* copy data */
1298 *(ioc->bend) = etop ; /* change bend */
1299 #ifdef DEBUG
1300 fprintf(stderr,"iochan_send: shm 1 piece: %d to %d\n",ebot,etop) ;
1301 #endif
1302
1303 } else { /* 2 pieces to copy */
1304 int nn = size - ebot ; /* size of piece 1 */
1305 BCOPY( ioc->buf + ebot, buffer , nn ) ; /* copy piece 1 */
1306 BCOPY( ioc->buf , buffer+nn, nwrite-nn ) ; /* copy piece 2 */
1307 *(ioc->bend) = nwrite-nn-1 ; /* change bend */
1308 #ifdef DEBUG
1309 fprintf(stderr,"iochan_send: shm 2 pieces: %d to %d AND %d to %d\n",
1310 ebot,ebot+nn-1,0,nwrite-nn-1) ;
1311 #endif
1312
1313 }
1314 return nwrite ;
1315 }
1316
1317 return -1 ; /* should not be reached */
1318 }
1319
1320 /*----------------------------------------------------------------------------
1321 Send (exactly) nbytes of data from the buffer down the IOCHAN. The only
1322 difference between this and iochan_send is that this function will not
1323 return until all the data is sent, even if it takes forever.
1324 Under these circumstances, it would be good if the reader process is
1325 still working.
1326 ------------------------------------------------------------------------------*/
1327
iochan_sendall(IOCHAN * ioc,char * buffer,int nbytes)1328 int iochan_sendall( IOCHAN * ioc , char * buffer , int nbytes )
1329 {
1330 int ii , ntot=0 , dms=0 ;
1331
1332 error_string = NULL ;
1333
1334 /** check for reasonable inputs **/
1335
1336 if( ioc == NULL || IOC_BAD(ioc) != 0 ||
1337 buffer == NULL || nbytes < 0 ){
1338
1339 error_string = "iochan_sendall: bad inputs" ; return -1 ;
1340 }
1341
1342 if( nbytes == 0 ) return 0 ;
1343
1344 while(1){
1345 ii = iochan_send( ioc , buffer+ntot , nbytes-ntot ); /* send what's left */
1346 if( ii == -1 ){ /* an error!? */
1347 if( error_string == NULL )
1348 error_string = "iochan_sendall: iochan_send fails" ;
1349 return -1 ;
1350 }
1351 ntot += ii ; /* total sent so far */
1352 if( ntot == nbytes ) return nbytes ; /* all done!? */
1353 dms = NEXTDMS(dms) ; iochan_sleep(dms) ; /* wait a while */
1354 }
1355 return -1 ; /* should never be reached */
1356 }
1357
1358 /*----------------------------------------------------------------------------
1359 Read up to nbytes of data from the IOCHAN, into buffer. Returns the
1360 number of bytes actually read. For both the case of sockets and
1361 shmem segments, this may be less than nbytes (may even be 0). If an
1362 error occurs, -1 is returned.
1363 ------------------------------------------------------------------------------*/
1364
iochan_recv(IOCHAN * ioc,char * buffer,int nbytes)1365 int iochan_recv( IOCHAN * ioc , char * buffer , int nbytes )
1366 {
1367 /** check for reasonable inputs **/
1368
1369 error_string = NULL ;
1370
1371 if( ioc == NULL || IOC_BAD(ioc) != 0 ||
1372 buffer == NULL || nbytes < 0 ){
1373
1374 error_string = "iochan_recv: bad inputs" ; return -1 ;
1375 }
1376
1377 if( nbytes == 0 ) return 0 ;
1378 if( iochan_goodcheck(ioc,0) != 1 ) return -1 ;
1379
1380 /** tcp: just use recv **/
1381
1382 if( ioc->type == TCP_IOCHAN ){
1383 int ii = tcp_recv( ioc->id , buffer , nbytes , 0 ) ;
1384 if( ii == -1 ){
1385 PERROR("Can't read from socket? tcp[recv]") ;
1386 error_string = "iochan_recv: tcp recv fails" ;
1387 }
1388 return ii ;
1389 }
1390
1391 /** shm: read from the circular buffer, starting at bstart **/
1392
1393 if( ioc->type == SHM_IOCHAN ){
1394 int nread, bend,bstart , size , sbot,stop ;
1395
1396 ioc = SHMIOC_READ(ioc) ; /* 24 June 1997 */
1397
1398 bend = *(ioc->bend) ; bstart = *(ioc->bstart) ; size = ioc->bufsize ;
1399 nread = ( bend - bstart + size + 1 ) % size ; /* readable amount */
1400 if( nread <= 0 ) return 0 ; /* nothing!? */
1401 if( nread > nbytes ) nread = nbytes ; /* amount to read */
1402
1403 sbot = bstart ; stop = sbot + nread-1 ; /* from sbot to stop */
1404
1405 if( stop < size ){ /* 1 piece to copy */
1406 BCOPY( buffer, ioc->buf + sbot, nread ) ; /* copy the data */
1407 *(ioc->bstart) = (stop+1) % size ; /* move bstart up */
1408 #ifdef DEBUG
1409 fprintf(stderr,"iochan_recv: get 1 piece: %d to %d\n",sbot,stop) ;
1410 #endif
1411
1412 } else { /* 2 pieces to copy */
1413 int nn = size - sbot ; /* size of piece 1 */
1414 BCOPY( buffer , ioc->buf + sbot, nn ) ; /* copy piece 1 */
1415 BCOPY( buffer+nn, ioc->buf , nread-nn ) ; /* copy piece 2 */
1416 *(ioc->bstart) = nread-nn ; /* move bstart up */
1417 #ifdef DEBUG
1418 fprintf(stderr,"iochan_recv: get 2 pieces: %d to %d AND %d to %d\n",
1419 sbot,sbot+nn-1,0,nread-nn-1) ;
1420 #endif
1421
1422 }
1423 return nread ;
1424 }
1425
1426 return -1 ; /* should not be reached */
1427 }
1428
1429 /*----------------------------------------------------------------------------
1430 Read as much data as possible from the iochan, looping until nothing
1431 is left -- 22 May 2001 -- RWCox.
1432 ------------------------------------------------------------------------------*/
1433
iochan_recvloop(IOCHAN * ioc,char * buffer,int nbytes)1434 int iochan_recvloop( IOCHAN * ioc , char * buffer , int nbytes )
1435 {
1436 int jj , nbuf=0 ;
1437
1438 error_string = NULL ;
1439
1440 /** check for reasonable inputs **/
1441
1442 if( ioc == NULL || IOC_BAD(ioc) != 0 ||
1443 buffer == NULL || nbytes < 0 ){
1444
1445 error_string = "iochan_recvloop: bad inputs" ; return -1 ;
1446 }
1447
1448 if( iochan_goodcheck(ioc,0) != 1 ) return -1 ;
1449
1450 if( nbytes == 0 ) return 0 ;
1451
1452 while(1){
1453 jj = iochan_recv( ioc , buffer+nbuf , nbytes-nbuf ) ;
1454 if( jj < 1 ) break ; /* stop if nothing more comes in */
1455 nbuf += jj ;
1456 if( nbuf >= nbytes ) break ; /* stop if overflow */
1457 iochan_sleep(1) ;
1458 }
1459
1460 return nbuf ;
1461 }
1462
1463 /*----------------------------------------------------------------------------
1464 Receive (exactly) nbytes of data from the buffer down the IOCHAN. The only
1465 difference between this and iochan_recv is that this function will not
1466 return until all the data is received, even if it takes forever.
1467 Under these circumstances, it would be good if the writer process is
1468 still working.
1469 ------------------------------------------------------------------------------*/
1470
iochan_recvall(IOCHAN * ioc,char * buffer,int nbytes)1471 int iochan_recvall( IOCHAN * ioc , char * buffer , int nbytes )
1472 {
1473 int ii , ntot=0 , dms=0 ;
1474
1475 /** check for reasonable inputs **/
1476
1477 error_string = NULL ;
1478
1479 if( ioc == NULL || IOC_BAD(ioc) != 0 ||
1480 buffer == NULL || nbytes < 0 ){
1481
1482 error_string = "iochan_recvall: bad inputs" ; return -1 ;
1483 }
1484
1485 if( nbytes == 0 ) return 0 ;
1486
1487 while(1){
1488 ii = iochan_recv( ioc , buffer+ntot , nbytes-ntot ) ; /* get what's left */
1489 if( ii == -1 ) return -1 ; /* an error!? */
1490 ntot += ii ; /* total so far */
1491 if( ntot == nbytes ) return nbytes ; /* all done!? */
1492
1493 /* Only slow down if we are receiving little bits, otherwise any large
1494 buffer that is broken into small pieces could take a very long time,
1495 even such as a single anatomical volume. 17 Jul 2015 [rickr] */
1496 /* Note that the limit is likely SOCKET_BUFSIZE (31*1024) bytes. */
1497 if( ii < 4096 ) dms = NEXTDMS(dms) ;
1498
1499 iochan_sleep(dms) ; /* wait a while */
1500 }
1501 return -1 ; /* should never be reached */
1502 }
1503
/*-----------------------------------------------------------------
  Pause for msec milliseconds, using select() with no file
  descriptors as the timer.  Returns at once if msec <= 0.
------------------------------------------------------------------*/

void iochan_sleep( int msec )
{
   if( msec > 0 ){
      struct timeval delay ;

      delay.tv_sec  =  msec / 1000 ;            /* whole seconds        */
      delay.tv_usec = (msec % 1000) * 1000 ;    /* leftover ms, as usec */

      select( 1 , NULL,NULL,NULL , &delay ) ;
   }
   return ;
}
1517
1518 /*-----------------------------------------------------------------
1519 Send some control command to the IOCHAN. At present the
1520 only command available is to limit the size of TCP/IP sends.
1521 ------------------------------------------------------------------*/
1522
iochan_ctl(IOCHAN * ioc,int cmd,int arg)1523 int iochan_ctl( IOCHAN * ioc , int cmd , int arg )
1524 {
1525 if( ioc == NULL ) return -1 ;
1526
1527 switch( cmd ){
1528
1529 case IOC_TCP_SENDSIZE:
1530 if( arg >= 0 ){ ioc->sendsize = arg ; return 0 ; }
1531 else return -1 ;
1532 break ;
1533
1534 }
1535 return -1 ;
1536 }
1537
1538 /*-----------------------------------------------------------------
1539 Relay data from one IOCHAN to another in a fork()-ed process:
1540 name_in = name of input IOCHAN; will be read from only
1541 - should be opened with "create" in the caller before
1542 this call, and then the child process will open with
1543 "accept".
1544 name_out = name of output IOCHAN; will be written to only
1545 - will be opened with "create" in the child and then
1546 child will wait for someone to "accept"
1547 Return value is the pid of the forked process; (pid_t)-1 if
1548 something bad happened at startup. If something bad happens
1549 later, the child will _exit(1), and it will also close both
1550 IOCHANs it opened. You can detect the child exit with waitpid(),
1551 and can detect the closing of the name_in IOCHAN using
1552 iochan_writecheck(), as in this fragment:
1553
1554 pid_t qpid , ppid ;
1555 IOCHAN *ioc ;
1556 char * data ;
1557 int ndata ;
1558
1559 ioc = iochan_init( "shm:Elvis:1M" , "create" ) ;
1560 if( ioc == NULL ){
1561 ... do something here if can't open for output
1562 }
1563 ppid = iochan_fork_relay( "shm:Elvis:1M" , "tcp:Fabian:1234" ) ;
1564 if( ppid == (pid_t)-1 ){
1565 IOCHAN_CLOSE(ioc) ;
1566 ... do something here if fork failed
1567 }
1568
1569 ... later: check if child process died
1570
1571 qpid = waitpid( ppid , NULL , WNOHANG ) ;
1572 if( qpid == ppid ){
1573 IOCHAN_CLOSE(ioc) ;
        ... probably do something else here too, to mark end of output
1575 }
1576
1577 ... check if output IOCHAN is ready for data
1578
1579 if( iochan_writecheck(ioc,0) >= ndata ){
1580 iochan_sendall( ioc , data , ndata ) ;
1581 } else {
1582 IOCHAN_CLOSE(ioc) ;
        ... probably do something else here too, to mark end of output
1584 }
1585
1586 As in the example, the usual way to use this would have name_in
1587 be a shm IOCHAN, and name_out be a tcp IOCHAN; this would then
1588 let a main program relay socket data through a separate process,
1589 so if the socket freezes up, then the main program can merrily
1590 continue (by using iochan_writecheck to see if the shm IOCHAN
1591 is available for output).
1592
1593 -- 23 May 2001 -- RWCox
1594 -------------------------------------------------------------------*/
1595
1596 static IOCHAN *ioc_kill_1 = NULL ;
1597 static IOCHAN *ioc_kill_2 = NULL ;
1598
iochan_fork_sigfunc(int sig)1599 static void iochan_fork_sigfunc(int sig) /* for when the child dies */
1600 {
1601 switch( sig ){
1602 case SIGTERM:
1603 if( ioc_kill_1 != NULL ) iochan_close(ioc_kill_1) ;
1604 if( ioc_kill_2 != NULL ) iochan_close(ioc_kill_2) ;
1605 fprintf(stderr,"\n*** iochan_fork received SIGTERM signal\n");
1606 fflush(stderr) ;
1607 _exit(1) ;
1608 case SIGSEGV:
1609 if( ioc_kill_1 != NULL ) iochan_close(ioc_kill_1) ;
1610 if( ioc_kill_2 != NULL ) iochan_close(ioc_kill_2) ;
1611 fprintf(stderr,"\n*** iochan_fork received SIGSEGV signal\n");
1612 fflush(stderr) ;
1613 _exit(1) ;
1614 }
1615 }
1616
1617 /*-----------------------------------------------------------------*/
1618
iochan_fork_relay(char * name_in,char * name_out)1619 pid_t iochan_fork_relay( char * name_in , char * name_out )
1620 {
1621 pid_t ppid = (pid_t)(-1) ;
1622 int jj , kk , nbuf ;
1623 #define MBUF 1048576 /* 1 Megabyte */
1624 char * buf , *sss ;
1625 IOCHAN *ioc_in, *ioc_out ;
1626
1627 if( name_in == NULL || name_out == NULL ) return ppid ;
1628
1629 /*-- fork into two processes --*/
1630
1631 ppid = fork() ;
1632 if( ppid == (pid_t)(-1) ){
1633 perror("iochan_fork failed") ;
1634 return ppid ;
1635 }
1636
1637 if( ppid != 0 ){ /* the parent process */
1638 pid_t qpid ;
1639 iochan_sleep(5) ; /* wait a little bit */
1640 qpid = waitpid( ppid , NULL , WNOHANG ) ; /* see if child died */
1641 if( qpid == ppid ) ppid = (pid_t)(-1) ; /* if it did, return error */
1642 return ppid ;
1643 }
1644
1645 /*--- from here on is the child process, which never returns ---*/
1646
1647 ioc_in = iochan_init( name_in , "accept" ) ; /* open input */
1648 if( ioc_in == NULL ) _exit(1) ; /* failed? */
1649
1650 ioc_out = iochan_init( name_out , "create" ) ; /* open output */
1651 if( ioc_out == NULL ){ /* failed? */
1652 iochan_close(ioc_in) ; _exit(1) ;
1653 }
1654
1655 /* set signal handler to deal with sudden death situations */
1656
1657 ioc_kill_1 = ioc_in ;
1658 ioc_kill_2 = ioc_out ;
1659 signal( SIGTERM , iochan_fork_sigfunc ) ;
1660 signal( SIGSEGV , iochan_fork_sigfunc ) ;
1661
1662 fprintf(stderr,"forked process for shm->tcp started\n") ;
1663
1664 do{ /* loop until both iochans are ready */
1665
1666 jj = iochan_goodcheck(ioc_in ,1) ;
1667 kk = iochan_goodcheck(ioc_out,1) ;
1668 if( jj < 0 || kk < 0 ){
1669 iochan_close(ioc_in) ; iochan_close(ioc_out) ; _exit(1) ;
1670 }
1671
1672 } while( jj == 0 || kk == 0 ) ;
1673
1674 fprintf(stderr,"forked process fully connected\n") ;
1675
1676 buf = AFMALL(char, MBUF) ; /* workspace for transfers */
1677 if( buf == NULL ){
1678 fprintf(stderr,"forked process can't malloc I/O buffer") ;
1679 iochan_close(ioc_in) ; iochan_close(ioc_out) ; _exit(1) ;
1680 }
1681
1682 while(1){ /* loop, waiting for data */
1683
1684 errno = 0 ;
1685 jj = iochan_readcheck( ioc_in , 20 ) ; /* any input? */
1686 if( jj < 0 ){ /* bad news? */
1687 if( errno ) perror( "forked readcheck" ) ;
1688 else fprintf(stderr,"forked readcheck abort: jj=%d!\n",jj) ;
1689 sss = iochan_error_string() ;
1690 if( sss != NULL ) fprintf(stderr," ** %s\n",sss) ;
1691 break ;
1692 }
1693 if( jj == 0 ) continue ; /* no news */
1694
1695 nbuf = iochan_recvloop( ioc_in , buf , MBUF ) ; /* get input! */
1696 if( nbuf <= 0 ) continue ; /* too weird! */
1697
1698 #if 0
1699 fprintf(stderr,"forked process read %d bytes\n",nbuf) ;
1700 #endif
1701
1702 errno = 0 ;
1703 kk = iochan_writecheck( ioc_out , 1 ) ; /* check */
1704 if( kk == 0 ){
1705 int qq ;
1706 fprintf(stderr,"forked writecheck repeat:") ;
1707 for( qq=0 ; qq < 1000 ; qq++ ){
1708 if( qq%50 == 0 ) fprintf(stderr," %d",qq+1) ;
1709 kk = iochan_writecheck( ioc_out , 2 ) ;
1710 if( kk != 0 ) break ;
1711 }
1712 fprintf(stderr,"\n") ;
1713 }
1714 if( kk <= 0 ){
1715 if( errno ) perror( "forked writecheck" ) ;
1716 else fprintf(stderr,"forked writecheck abort: kk=%d!\n",kk) ;
1717 sss = iochan_error_string() ;
1718 if( sss != NULL ) fprintf(stderr," ** %s\n",sss) ;
1719 break ;
1720 }
1721 kk = iochan_sendall( ioc_out , buf , nbuf ) ; /* send data! */
1722 if( kk < 0 ){ /* bad news? */
1723 if( errno ) perror( "forked sendall" ) ;
1724 else fprintf(stderr,"forked sendall abort: kk=%d!\n",kk) ;
1725 sss = iochan_error_string() ;
1726 if( sss != NULL ) fprintf(stderr," ** %s\n",sss) ;
1727 break ;
1728 }
1729
1730 #if 0
1731 fprintf(stderr,"forked process wrote %d bytes\n",nbuf) ;
1732 #endif
1733 }
1734
1735 /* bad news ==> shut down child operations */
1736
1737 fprintf(stderr,"forked process fails!\n") ;
1738
1739 iochan_close(ioc_in) ; iochan_close(ioc_out) ; _exit(1) ;
1740 }
1741
/*-----------------------------------------------------------------
   Return the wall-clock time elapsed, in seconds, since the first
   call to this routine.  The first call records the starting time
   and returns 0.0.  If the time of day cannot be read, 0.0 is
   returned and nothing is recorded.
-------------------------------------------------------------------*/

#include <time.h>

double COX_clock_time(void) /* in seconds */
{
   static struct timeval start_tval ;   /* time of the first call */
   static int            first = 1 ;
   struct timeval        now_tval ;

   /* the timezone argument of gettimeofday() is obsolete: pass NULL */

   if( gettimeofday( &now_tval , NULL ) != 0 ) return 0.0 ;

   if( first ){
      start_tval = now_tval ;
      first      = 0 ;
      return 0.0 ;
   }

   /* the microsecond difference may be negative; in floating point
      that just subtracts from the seconds, so no borrow is needed   */

   return   (double)( now_tval.tv_sec  - start_tval.tv_sec  )
          + (double)( now_tval.tv_usec - start_tval.tv_usec ) * 1.0e-6 ;
}
1771
/*-----------------------------------------------------------------
   Return the user-level CPU time used by this process, in seconds,
   as reported by times() and scaled by CLK_TCK.  (System time is
   not included.)  If CLK_TCK is not defined at compile time, this
   function always returns 0.0.
-------------------------------------------------------------------*/

double COX_cpu_time(void)  /* in seconds */
{
#ifdef CLK_TCK
   struct tms usage ;
   double     user_ticks ;

   (void) times( &usage ) ;
   user_ticks = (double) (usage.tms_utime /* + usage.tms_stime */ ) ;
   return ( user_ticks / (double) CLK_TCK ) ;
#else
   return 0.0 ;
#endif
}
1788