1 /* ``Licensed under the Apache License, Version 2.0 (the "License");
2  * you may not use this file except in compliance with the License.
3  * You may obtain a copy of the License at
4  *
5  *     http://www.apache.org/licenses/LICENSE-2.0
6  *
7  * Unless required by applicable law or agreed to in writing, software
8  * distributed under the License is distributed on an "AS IS" BASIS,
9  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10  * See the License for the specific language governing permissions and
11  * limitations under the License.
12  *
13  * The Initial Developer of the Original Code is Ericsson Utvecklings AB.
14  * Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
15  * AB. All Rights Reserved.''
16  *
17  *     $Id$
18  */
19 
/*
 * Purpose: Special-purpose Unix domain socket driver for distribution.
 */
23 
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <sys/socket.h>
32 #include <sys/un.h>
33 #include <fcntl.h>
34 
35 #define HAVE_UIO_H
36 #include "erl_driver.h"
37 
38 #define DEBUG
39 /*#define HARDDEBUG 1*/
40 /*
41 ** Some constants/macros
42 */
43 
44 #ifdef HARDDEBUG
45 #define DEBUGF(P) debugf P
46 #include <stdarg.h>
/*
** Emit one printf-style trace line on stderr: the fixed "Uds_drv debug: "
** prefix, the formatted message, then CR LF. Only compiled when HARDDEBUG
** is defined; reached through the DEBUGF macro.
*/
static void debugf(char *str, ...)
{
    va_list args;

    fputs("Uds_drv debug: ", stderr);
    va_start(args, str);
    vfprintf(stderr, str, args);
    va_end(args);
    fputs("\r\n", stderr);
}
56 #ifndef DEBUG
57 #define DEBUG 1
58 #endif
59 #else
60 #define DEBUGF(P)
61 #endif
62 
63 
/*
** Debug assertions.
** With DEBUG defined, ASSERT(X) prints the failed expression together with
** line and file on stderr and terminates the process with exit(1);
** ASSERT_NONBLOCK(FD) asserts that O_NONBLOCK is set in the flags that
** fcntl(F_GETFL) reports for FD.
** Without DEBUG both macros expand to nothing, so their arguments are not
** evaluated.
*/
#ifdef DEBUG
#define ASSERT(X) 							\
do {									\
    if (!(X)) {								\
	fprintf(stderr,"Assertion (%s) failed at line %d file %s\r\n", #X, \
		__LINE__, __FILE__); 					\
	exit(1);							\
    } 									\
} while(0)
#define ASSERT_NONBLOCK(FD) ASSERT(fcntl((FD), F_GETFL, 0) & O_NONBLOCK)
#else
#define ASSERT(X)
#define ASSERT_NONBLOCK(FD)
#endif
78 
79 
80 
/*
** Add O_NONBLOCK to the current file status flags of FD.
** Expands to the F_SETFL fcntl() call, so the expression's value is that
** call's result. FD is evaluated twice, and a failing F_GETFL is not
** detected by the macro.
*/
#define SET_NONBLOCKING(FD)			\
     fcntl((FD), F_SETFL, 			\
	   fcntl((FD), F_GETFL, 0) | O_NONBLOCK)
84 
85 #define ALLOC(X) my_malloc(X)
86 #define REALLOC(P,X) my_realloc(P,X)
87 #define FREE(X) driver_free(X)
88 
89 #define CHUNK_SIZE 256
90 
91 #define DIST_MAGIC_RECV_TAG 100
92 
/*
** The max length of an I/O vector seems to be impossible to find
** out (?), so this is just a value known to work on Solaris.
*/
97 #define IO_VECTOR_MAX 16
98 
99 #define SOCKET_PATH "/tmp/erlang"
100 #define LOCK_SUFFIX ".lock"
101 
102 #define NORMAL_READ_FAILURE -1
103 #define SEVERE_READ_FAILURE -2
104 #define EOF_READ_FAILURE    -3
105 
106 /*
107 ** Internal structures
108 */
109 
110 #define HEADER_LENGTH 4
111 
/*
** The state of a port.
** The order of the enumerators matters: the driver tests
** "type >= portTypeCommand" / "type < portTypeCommand" to tell connected
** ports (command, intermediate, data) from all the others, so the three
** connected states must stay last.
*/
typedef enum {
    portTypeUnknown,      /* An uninitialized port */
    portTypeListener,     /* A listening port/socket */
    portTypeAcceptor,     /* An intermediate stage when accepting
			     on a listen port */
    portTypeConnector,    /* An intermediate stage when connecting */
    portTypeCommand,      /* A connected open port in command mode */
    portTypeIntermediate, /* A connected open port in special half
			     active mode */
    portTypeData          /* A connected open port in data mode */
} PortType;
123 
124 typedef unsigned char Byte;
125 typedef unsigned int Word;
126 
/*
** Per-port state. One instance is allocated in uds_start(), linked into the
** list headed by first_data, and released again in do_stop().
*/
typedef struct uds_data {
    int fd;                   /* File descriptor, -1 until a socket exists */
    ErlDrvPort port;          /* The port identifier */
    int lockfd;               /* The file descriptor for a lock file in
				 case of listen sockets, otherwise -1 */
    Byte creation;            /* The creation serial derived from the
				 lockfile */
    PortType type;            /* Type of port */
    char *name;               /* Short name of socket for unlink */
    Word sent;                /* Number of bytes written to the socket */
    Word received;            /* Number of bytes read from the socket */
    struct uds_data *partner; /* The partner in an accept/listen pair */
    struct uds_data *next;    /* Next structure in list */

    /* The input buffer and its data */
    int buffer_size;          /* The allocated size of the input buffer */
    int buffer_pos;           /* Current position in input buffer, i.e. the
				 number of valid bytes in it */
    int header_pos;           /* Where the current header is in the
				 input buffer */
    Byte *buffer;            /* The actual input buffer */
} UdsData;
148 
149 /*
150 ** Interface routines
151 */
152 static ErlDrvData uds_start(ErlDrvPort port, char *buff);
153 static void uds_stop(ErlDrvData handle);
154 static void uds_command(ErlDrvData handle, char *buff, int bufflen);
155 static void uds_input(ErlDrvData handle, ErlDrvEvent event);
156 static void uds_output(ErlDrvData handle, ErlDrvEvent event);
157 static void uds_finish(void);
158 static int uds_control(ErlDrvData handle, unsigned int command,
159 		       char* buf, int count, char** res, int res_size);
160 static void uds_stop_select(ErlDrvEvent event, void*);
161 
162 /*
163 ** Local helpers forward declarations
164 */
165 
166 static void uds_command_listen(UdsData *ud, char *buff, int bufflen);
167 static void uds_command_accept(UdsData *ud, char *buff, int bufflen);
168 static void uds_command_connect(UdsData *ud, char *buff, int bufflen);
169 
170 static void do_stop(UdsData *ud, int shutting_down);
171 static void do_send(UdsData *ud, char *buff, int bufflen);
172 static void do_recv(UdsData *ud);
173 
174 static int report_control_error(char **buffer, int buff_len,
175 				char *error_message);
176 static int  send_out_queue(UdsData *ud);
177 static int buffered_read_package(UdsData *ud, char **result);
178 static int read_at_least(UdsData *ud, int num);
179 static int get_packet_length(char *b);
180 static void put_packet_length(char *b, int len);
181 static void *my_malloc(size_t size);
182 static void *my_realloc(void *optr, size_t size);
183 static int try_lock(char *sockname, Byte *p_creation);
184 static int ensure_dir(char *path);
185 static void do_unlink(char *name);
186 
187 /*
188 ** Global data
189 */
190 
191 /* The driver entry */
/*
** Callback table for this driver; DRIVER_INIT(uds_drv) returns its address.
** Slots set to NULL are callbacks this driver does not provide.
*/
ErlDrvEntry uds_driver_entry = {
    NULL,		   /* init, N/A */
    uds_start,             /* start, called when port is opened */
    uds_stop,              /* stop, called when port is closed */
    uds_command,           /* output, called when erlang has sent */
    uds_input,             /* ready_input, called when input descriptor
			      ready */
    uds_output,            /* ready_output, called when output
			      descriptor ready */
    "uds_drv",             /* char *driver_name, the argument to open_port */
    uds_finish,            /* finish, called when unloaded */
    NULL,                  /* void * that is not used (BC) */
    uds_control,           /* control, port_control callback */
    NULL,                  /* timeout, called on timeouts */
    NULL,                  /* outputv, vector output interface */
    NULL,                  /* ready_async */
    NULL,                  /* flush */
    NULL,                  /* call */
    NULL,                  /* event */
    ERL_DRV_EXTENDED_MARKER,
    ERL_DRV_EXTENDED_MAJOR_VERSION,
    ERL_DRV_EXTENDED_MINOR_VERSION,
    0,	/* ERL_DRV_FLAGs */
    NULL,                  /* NOTE(review): unlabeled slot - confirm its
			      meaning against erl_driver.h */
    NULL,                  /* process_exit */
    uds_stop_select        /* stop_select, closes the descriptor (see
			      uds_stop_select below) */
};
219 
220 /* Beginning of linked list of ports */
221 static UdsData *first_data;
222 
223 /*
224 **
225 ** Driver interface routines
226 **
227 */
228 
229 /*
230 ** Driver initialization routine
231 */
DRIVER_INIT(uds_drv)232 DRIVER_INIT(uds_drv)
233 {
234     first_data = NULL;
235     return &uds_driver_entry;
236 }
237 
238 /*
239 ** A port is opened, we need no information whatsoever about the socket
240 ** at this stage.
241 */
uds_start(ErlDrvPort port,char * buff)242 static ErlDrvData uds_start(ErlDrvPort port, char *buff)
243 {
244     UdsData *ud;
245 
246     ud = ALLOC(sizeof(UdsData));
247     ud->fd = -1;
248     ud->lockfd = -1;
249     ud->creation = 0;
250     ud->port = port;
251     ud->type = portTypeUnknown;
252     ud->name = NULL;
253     ud->buffer_size = 0;
254     ud->buffer_pos = 0;
255     ud->header_pos = 0;
256     ud->buffer = NULL;
257     ud->sent = 0;
258     ud->received = 0;
259     ud->partner = NULL;
260     ud->next = first_data;
261     first_data = ud;
262 
263     return((ErlDrvData) ud);
264 }
265 
266 /*
267 ** Close the socket/port and free up
268 */
/*
** The port is closed: tear down this single port through do_stop(), with
** shutting_down = 0 so that a partner port, if any, is taken care of.
*/
static void uds_stop(ErlDrvData handle)
{
    UdsData *ud = (UdsData *) handle;

    do_stop(ud, 0);
}
273 
274 /*
275 ** Command interface, operates in two modes, Command mode and data mode.
276 ** Mode is shifted with the port_control function.
277 ** Command mode protocol:
278 ** 'L'<socketname>: Lock and listen on socket.
279 ** 'A'<listennumber as 32 bit bigendian>: Accept from the port referenced by the
280 **                                        "listennumber"
281 ** 'C'<socketname>: Connect to the socket named <socketname>
282 ** 'S'<data>: Send the data <data>
283 ** 'R': Receive one packet of data
284 ** Data mode protocol:
285 ** Send anything that arrives (no opcodes/skip opcodes).
286 */
287 
/*
** Output callback: data written to the port from Erlang.
**
** In data and intermediate mode the first byte of the buffer is skipped and
** the rest is sent as one packet. In every other mode the first byte is an
** opcode ('L', 'A', 'C', 'S' or 'R', see the protocol description above)
** which is only accepted when the port is in the matching state; otherwise
** the port is failed with ENOTSUP.
**
** An empty buffer is ignored in all modes. The check has to come first:
** without it a zero length write in data mode would reach do_send() with a
** length of -1.
*/
static void uds_command(ErlDrvData handle, char *buff, int bufflen)
{
    UdsData *ud = (UdsData *) handle;

    if (bufflen <= 0) {
	return;
    }
    if (ud->type == portTypeData || ud->type == portTypeIntermediate) {
	DEBUGF(("Passive do_send %d",bufflen));
	do_send(ud, buff + 1, bufflen - 1); /* XXX */
	return;
    }
    switch (*buff) {
    case 'L':
	if (ud->type != portTypeUnknown) {
	    driver_failure_posix(ud->port, ENOTSUP);
	    return;
	}
	uds_command_listen(ud,buff,bufflen);
	return;
    case 'A':
	if (ud->type != portTypeUnknown) {
	    driver_failure_posix(ud->port, ENOTSUP);
	    return;
	}
	uds_command_accept(ud,buff,bufflen);
	return;
    case 'C':
	if (ud->type != portTypeUnknown) {
	    driver_failure_posix(ud->port, ENOTSUP);
	    return;
	}
	uds_command_connect(ud,buff,bufflen);
	return;
    case 'S':
	if (ud->type != portTypeCommand) {
	    driver_failure_posix(ud->port, ENOTSUP);
	    return;
	}
	do_send(ud, buff + 1, bufflen - 1);
	return;
    case 'R':
	if (ud->type != portTypeCommand) {
	    driver_failure_posix(ud->port, ENOTSUP);
	    return;
	}
	do_recv(ud);
	return;
    default:
	/* Unknown opcode: fatal in DEBUG builds, ignored otherwise */
	ASSERT(0);
	return;
    }
}
341 
/*
** Ready-input callback.
**
** On a listener the pending connection is accepted on behalf of the
** partner (acceptor) port: the new descriptor is made non-blocking and
** stored in the acceptor, the acceptor becomes a command mode port, the
** pairing is dissolved, read selection on the listen socket is turned off
** and "Aok" is sent from the acceptor port. If accept() would block nothing
** happens; any other accept() error fails the listener port.
**
** On every other port the input is handed to do_recv().
*/
static void uds_input(ErlDrvData handle, ErlDrvEvent event)
{
    UdsData *ud = (UdsData *) handle;

    DEBUGF(("In uds_input type = %d",ud->type));
    if (ud->type == portTypeListener) {
	UdsData *ad = ud->partner;
	struct sockaddr_un peer;
	/* accept() takes a socklen_t *, which need not be an int * */
	socklen_t pl = sizeof(struct sockaddr_un);
	int fd;

	ASSERT(ad != NULL);
	if ((fd = accept(ud->fd, (struct sockaddr *) &peer, &pl)) < 0) {
	    if (errno != EWOULDBLOCK) {
		DEBUGF(("Accept failed."));
		driver_failure_posix(ud->port, errno);
		return;
	    }
	    DEBUGF(("Accept would block."));
	    return;
	}
	SET_NONBLOCKING(fd);
	ad->fd = fd;
	ad->partner = NULL;
	ad->type = portTypeCommand;
	ud->partner = NULL;
	DEBUGF(("Accept successful."));
	driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ, 0);
	driver_output(ad->port, "Aok",3);
	return;
    }
    /* OK, normal data or command port */
    ASSERT(ud->type >= portTypeCommand);
#ifdef HARDDEBUG
    if (ud->type == portTypeData)
	DEBUGF(("Passive do_recv"));
#endif
    do_recv(ud);
}
381 
/*
** Ready-output callback.
**
** On a connector the pending connect() has finished. Its outcome is read
** with getsockopt(SO_ERROR): on failure the port is failed with that error,
** on success the port becomes a command mode port, write selection is
** turned off and "Cok" is sent to the port owner.
**
** On a connected port the queued output is flushed. Intermediate mode
** ports are allowed here as well, since uds_command() lets them send and
** the send path may select for write.
*/
static void uds_output(ErlDrvData handle, ErlDrvEvent event)
{
   UdsData *ud = (UdsData *) handle;

   if (ud->type == portTypeConnector) {
       int err = 0;
       socklen_t errlen = sizeof(err);

       if (getsockopt(ud->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
	   err = errno;
       }
       if (err != 0) {
	   DEBUGF(("Connect failed, errno = %d", err));
	   driver_failure_posix(ud->port, err);
	   return;
       }
       ud->type = portTypeCommand;
       driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_WRITE, 0);
       driver_output(ud->port, "Cok",3);
       return;
   }
   ASSERT(ud->type >= portTypeCommand);
   send_out_queue(ud);
}
394 
uds_finish(void)395 static void uds_finish(void)
396 {
397     while (first_data != NULL) {
398 	do_stop(first_data, 1);
399     }
400 }
401 
402 /*
403 ** Protocol to control:
404 ** 'C': Set port in command mode.
405 ** 'I': Set port in intermediate mode
406 ** 'D': Set port in data mode
407 ** 'N': Get identification number for listen port
408 ** 'S': Get statistics
409 ** 'T': Send a tick message
410 ** 'R': Get creation number of listen socket
411 ** Answer is one byte status (0 == ok, Other is followed by error as string)
412 ** followed by data if applicable
413 */
/*
** port_control callback, see the protocol description above.
**
** The reply is written to *res; when the supplied buffer (res_size bytes)
** is too small for the reply a new one is allocated and stored in *res.
** Returns the length of the reply. A command that is unknown, or not
** allowed in the current port state, is answered with the error "einval".
** 'buf' and 'count' are not used.
*/
static int uds_control(ErlDrvData handle, unsigned int command,
		       char* buf, int count, char** res, int res_size)
{
/* Local macro: replace the reply buffer if it is smaller than WANTED. */
#define REPLY_ROOM(WANTED) 			\
   do {						\
       if (res_size < WANTED) {			\
	   *res = ALLOC(WANTED);		\
       }					\
   } while(0)

   UdsData *ud = (UdsData *) handle;
   char *reply;

   DEBUGF(("Control, type = %d, fd = %d, command = %c", ud->type, ud->fd,
	   (char) command));
   switch (command) {
   case 'S': /* statistics: received, sent, bytes in output queue */
       REPLY_ROOM(13);
       reply = *res;
       reply[0] = 0;
       put_packet_length(reply + 1, ud->received);
       put_packet_length(reply + 5, ud->sent);
       put_packet_length(reply + 9, driver_sizeq(ud->port));
       return 13;
   case 'C': /* command mode */
   case 'I': /* intermediate mode */
       if (ud->type < portTypeCommand) {
	   goto invalid;
       }
       ud->type = (command == 'C') ? portTypeCommand : portTypeIntermediate;
       driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ, 0);
       goto acknowledge;
   case 'D': /* data mode, start receiving at once */
       if (ud->type < portTypeCommand) {
	   goto invalid;
       }
       ud->type = portTypeData;
       do_recv(ud);
       goto acknowledge;
   case 'N': /* identification number of a listen port: its descriptor */
       if (ud->type != portTypeListener) {
	   goto invalid;
       }
       REPLY_ROOM(5);
       reply = *res;
       reply[0] = 0;
       put_packet_length(reply + 1, ud->fd);
       return 5;
   case 'T': /* tick: an empty packet */
       if (ud->type != portTypeData) {
	   goto invalid;
       }
       do_send(ud,"",0);
       goto acknowledge;
   case 'R': /* creation number of a listen socket */
       if (ud->type != portTypeListener) {
	   goto invalid;
       }
       REPLY_ROOM(2);
       reply = *res;
       reply[0] = 0;
       reply[1] = ud->creation;
       return 2;
   default:
       goto invalid;
   }

acknowledge:
   /* Plain "ok" answer: a single zero status byte */
   REPLY_ROOM(1);
   **res = 0;
   return 1;

invalid:
   return report_control_error(res, res_size, "einval");
#undef REPLY_ROOM
}
495 
/*
** stop_select callback: the event is a file descriptor in disguise, close
** it. The second argument is not used.
*/
static void uds_stop_select(ErlDrvEvent event, void* _)
{
    int fd = (int)(long) event;

    close(fd);
}
500 
501 /*
502 **
503 ** Local helpers
504 **
505 */
506 
507 /*
508 ** Command implementations
509 */
/*
** 'C'<socketname>: connect to the socket SOCKET_PATH/<socketname>.
**
** The local end is first bound to the temporary name SOCKET_PATH/erl<pid>,
** which is remembered in ud->name so that it is unlinked when the port is
** stopped. The socket is then made non-blocking and connected:
**   - immediate success: "Cok" is sent to the port owner,
**   - EINPROGRESS: the port becomes a connector and is selected for write,
**     the answer is sent from uds_output(),
**   - anything else: the port is failed with the errno value.
** A peer name that does not fit in sun_path fails the port with
** ENAMETOOLONG instead of overrunning the address structure.
*/
static void uds_command_connect(UdsData *ud, char *buff, int bufflen)
{
    char *str;
    int fd;
    struct sockaddr_un s_un;
    int length;
    int res;

    str = ALLOC(25);
    sprintf(str, "erl%d", (int) getpid()); /* A temporary sufficiently
					      unique name */
    do_unlink(str);
    memset(&s_un, 0, sizeof(s_un));
    s_un.sun_family = AF_UNIX;
    strcpy(s_un.sun_path, SOCKET_PATH "/");
    strcat(s_un.sun_path, str);
    DEBUGF(("Connect own filename: %s", s_un.sun_path));
    length = sizeof(s_un.sun_family) + strlen(s_un.sun_path);
    ud->name = str;
    ud->type = portTypeCommand;
    if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
	DEBUGF(("socket call failed, errno = %d",errno));
	driver_failure_posix(ud->port, errno);
	return;
    }
    ud->fd = fd;
    if ((res = bind(fd, (struct sockaddr *) &s_un, length)) < 0) {
	DEBUGF(("bind call failed, errno = %d",errno));
	driver_failure_posix(ud->port, errno);
	return;
    }

    /* Build the peer address from the name that follows the opcode */
    str = ALLOC(bufflen);
    memcpy(str, buff + 1, bufflen - 1);
    str[bufflen - 1] = '\0';
    if (strlen(SOCKET_PATH "/") + strlen(str) >= sizeof(s_un.sun_path)) {
	DEBUGF(("Connect peer name too long: %s", str));
	FREE(str);
	driver_failure_posix(ud->port, ENAMETOOLONG);
	return;
    }
    strcpy(s_un.sun_path, SOCKET_PATH "/");
    strcat(s_un.sun_path, str);
    FREE(str); /* only needed to build the path */
    length = sizeof(s_un.sun_family) + strlen(s_un.sun_path);
    DEBUGF(("Connect peer filename: %s", s_un.sun_path));
    SET_NONBLOCKING(fd);
    if (connect(fd, (struct sockaddr *) &s_un, length) < 0) {
	if (errno != EINPROGRESS) {
	    driver_failure_posix(ud->port, errno);
	} else {
	    DEBUGF(("Connect pending"));
	    ud->type = portTypeConnector;
	    driver_select(ud->port, (ErlDrvEvent) ud->fd,
			  ERL_DRV_WRITE|ERL_DRV_USE, 1);
	}
    } else {
	DEBUGF(("Connect done"));
	driver_output(ud->port, "Cok", 3);
    }
}
563 
/*
** 'A'<listennumber as 32 bit bigendian>: accept a connection on the listen
** port identified by <listennumber> (the listen socket's descriptor, as
** returned by the 'N' control command).
**
** The port is paired with the listener, becomes an acceptor, and the listen
** socket is selected for read; the answer is sent from uds_input() once a
** connection has been accepted.
**
** Fails the port with EINVAL when the command is too short or when no
** listener has that number, and with EADDRINUSE when the listener already
** has an acceptor waiting. Only ports that really are listeners are
** considered: matching on the descriptor alone would let the number -1
** select any port that has no socket yet, this one included.
*/
static void uds_command_accept(UdsData *ud, char *buff, int bufflen)
{
    int listen_no;
    UdsData *lp;

    if (bufflen < 5) {
	driver_failure_posix(ud->port, EINVAL);
	return;
    }

    listen_no = get_packet_length(buff + 1); /* Same format as
						packet headers */
    DEBUGF(("Accept listen_no = %d",listen_no));
    for (lp = first_data; lp != NULL; lp = lp->next) {
	if (lp->type == portTypeListener && lp->fd == listen_no) {
	    break;
	}
    }
    if (lp == NULL) {
	DEBUGF(("Could not find listen port"));
	driver_failure_posix(ud->port, EINVAL);
	return;
    }
    if (lp->partner != NULL) {
	DEBUGF(("Listen port busy"));
	driver_failure_posix(ud->port, EADDRINUSE);
	return;
    }
    lp->partner = ud;
    ud->partner = lp;
    ud->type = portTypeAcceptor;
    driver_select(lp->port,(ErlDrvEvent) lp->fd, ERL_DRV_READ|ERL_DRV_USE, 1);
    /* Silent, answer will be sent in input routine */
}
595 
/*
** 'L'<socketname>: lock and listen on the socket SOCKET_PATH/<socketname>.
**
** Steps: make sure no port in this process already uses the name, take the
** lock file (which also yields the creation number), create a non-blocking
** socket, remove any stale socket file, bind and listen. On success "Lok"
** is sent to the port owner.
**
** Fails the port with ENAMETOOLONG when the name does not fit in sun_path,
** with EADDRINUSE when the name is in use or the lock cannot be taken, and
** with the errno value when socket(), bind() or listen() fails. Once the
** name and the lock descriptor are stored in the port structure they are
** released by do_stop().
*/
static void uds_command_listen(UdsData *ud, char *buff, int bufflen)
{
    char *str;
    int fd;
    struct sockaddr_un s_un;
    int length;
    int res;
    UdsData *tmp;
    Byte creation;

    str = ALLOC(bufflen);
    memcpy(str, buff + 1,bufflen - 1);
    str[bufflen - 1] = '\0';

    /* The full path has to fit in the address, terminator included */
    if (strlen(SOCKET_PATH "/") + strlen(str) >= sizeof(s_un.sun_path)) {
	DEBUGF(("Listen name too long: %s", str));
	driver_failure_posix(ud->port, ENAMETOOLONG);
	FREE(str);
	return;
    }

    /*
    ** Before trying lockfiles etc, we need to assure that our own process is
    ** not using the filename. Advisory locks can be recursive in one process.
    */
    for(tmp = first_data; tmp != NULL; tmp = tmp->next) {
	if (tmp->name != NULL && strcmp(str, tmp->name) == 0) {
	    driver_failure_posix(ud->port, EADDRINUSE);
	    FREE(str);
	    return;
	}
    }

    if ((fd = try_lock(str, &creation)) < 0) {
	driver_failure_posix(ud->port, EADDRINUSE);
	FREE(str);
	return;
    }
    memset(&s_un, 0, sizeof(s_un));
    s_un.sun_family = AF_UNIX;
    strcpy(s_un.sun_path, SOCKET_PATH "/");
    strcat(s_un.sun_path, str);
    length = sizeof(s_un.sun_family) + strlen(s_un.sun_path);
    ud->name = str;
    ud->type = portTypeListener;
    ud->lockfd = fd;
    ud->creation = creation;
    if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
	DEBUGF(("socket call failed, errno = %d",errno));
	driver_failure_posix(ud->port, errno);
	return;
    }
    SET_NONBLOCKING(fd);
    ud->fd = fd;
    do_unlink(str);
    DEBUGF(("Listen filename: %s", s_un.sun_path));
    if ((res = bind(fd, (struct sockaddr *) &s_un, length)) < 0) {
	DEBUGF(("bind call failed, errno = %d",errno));
	driver_failure_posix(ud->port, errno);
	return;
    }

    if ((res = listen(fd, 5)) < 0) {
	DEBUGF(("listen call failed, errno = %d",errno));
	driver_failure_posix(ud->port, errno);
	return;
    }
    driver_output(ud->port, "Lok", 3);
}
657 
658 /*
659 ** Input/output/stop helpers
660 */
/*
** Tear down one port: take it out of the port list, free its input buffer,
** deselect its socket, unlink and free its socket name, close its lock
** file and finally free the structure itself.
**
** Unless the whole driver is shutting down, a partner port is handled too:
** a stopped acceptor releases its listener (read selection on the listen
** socket is turned off), a stopped listener fails its waiting acceptor
** with eof.
*/
static void do_stop(UdsData *ud, int shutting_down)
{
    UdsData **link = &first_data;

    DEBUGF(("Cleaning up, type = %d, fd = %d, lockfd = %d", ud->type,
	    ud->fd, ud->lockfd));

    /* Find the pointer that refers to ud and make it skip ud */
    while (*link != NULL && *link != ud) {
	link = &((*link)->next);
    }
    ASSERT(*link != NULL);
    *link = (*link)->next;

    if (ud->buffer != NULL) {
	FREE(ud->buffer);
    }
    if (ud->fd >= 0) {
	driver_select(ud->port, (ErlDrvEvent) ud->fd,
		      ERL_DRV_READ|ERL_DRV_WRITE|ERL_DRV_USE, 0);
    }
    if (ud->name) {
	do_unlink(ud->name);
	FREE(ud->name);
    }
    if (ud->lockfd >= 0) {
	ASSERT(ud->type == portTypeListener);
	close(ud->lockfd); /* the lock will be released */
	/* But leave the file there for the creation counter... */
    }

    /* Dont bother with the partner if the driver is shutting down. */
    if (!shutting_down && ud->partner != NULL) {
	UdsData *other = ud->partner;

	if (ud->type == portTypeAcceptor) {
	    /* other is our listener */
	    other->partner = NULL;
	    driver_select(other->port, (ErlDrvEvent) other->fd,
			  ERL_DRV_READ, 0);
	} else {
	    /* other is the acceptor waiting on us */
	    ASSERT(ud->type == portTypeListener);
	    other->partner = NULL;
	    driver_failure_eof(other->port);
	}
    }
    FREE(ud);
}
704 
705 /*
706 ** Actually send the data
707 */
/*
** Send one packet: a 4 byte big-endian length header followed by bufflen
** bytes from buff.
**
** When nothing is queued for the port the packet is written directly. A
** complete write is acknowledged with "Sok" on command mode ports. Whatever
** could not be written (all of it if the output queue was not empty or the
** write would block) is put on the driver queue and send_out_queue() is
** asked to continue. A write error other than EWOULDBLOCK fails the port
** with eof.
*/
static void do_send(UdsData *ud, char *buff, int bufflen)
{
    char header[4];
    SysIOVec iov[2];
    ErlDrvBinary *binv[] = {NULL,NULL};
    ErlIOVec eio;
    int written = 0;

    put_packet_length(header, bufflen);
    DEBUGF(("Write packet header %u,%u,%u,%u.", (Word) header[0],
	   (Word) header[1], (Word) header[2],(Word) header[3]));

    /* Header and payload as a two element I/O vector */
    iov[0].iov_base = (char *) header;
    iov[0].iov_len = 4;
    iov[1].iov_base = buff;
    iov[1].iov_len = bufflen;
    eio.iov = iov;
    eio.binv = binv;
    eio.vsize = 2;
    eio.size = bufflen + 4;

    if (driver_sizeq(ud->port) == 0) {
	written = writev(ud->fd, iov, 2);
	if (written == eio.size) {
	    ud->sent += written;
	    if (ud->type == portTypeCommand) {
		driver_output(ud->port, "Sok", 3);
	    }
	    DEBUGF(("Wrote all %d bytes immediately.",written));
	    return;
	}
	if (written >= 0) {
	    ud->sent += written;
	} else if (errno == EWOULDBLOCK) {
	    written = 0;
	} else {
	    driver_failure_eof(ud->port);
	    return;
	}
	DEBUGF(("Wrote %d bytes immediately.",written));
    }

    /* Enqueue remaining, skipping what is already on the wire */
    driver_enqv(ud->port, &eio, written);
    DEBUGF(("Sending output queue."));
    send_out_queue(ud);
}
753 
/*
** Deliver received packets to the port owner.
**
** Each complete packet is sent with a one byte tag in front of it; the tag
** is stored in the byte just before the payload, which belongs to the
** packet header. In command mode the tag is 'R', exactly one packet is
** delivered and read selection is turned off. In the other modes the tag
** is DIST_MAGIC_RECV_TAG and packets are delivered, with read selection
** turned on, until no complete packet is available.
**
** When a packet is incomplete the socket is selected for read; on any other
** read failure the port is failed with eof.
*/
static void do_recv(UdsData *ud)
{
    char *payload;
    int len;

    ASSERT_NONBLOCK(ud->fd);
    DEBUGF(("do_recv called, type = %d", ud->type));
    while ((len = buffered_read_package(ud, &payload)) >= 0) {
	DEBUGF(("do_recv got package, port type = %d", ud->type));
	if (ud->type == portTypeCommand) {
	    payload[-1] = 'R';
	    driver_output(ud->port, payload - 1, len + 1);
	    driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ, 0);
	    return;
	}
	payload[-1] = DIST_MAGIC_RECV_TAG; /* XXX */
	driver_output(ud->port, payload - 1, len + 1);
	driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ|ERL_DRV_USE, 1);
    }

    /* No complete packet; find out why */
    if (len == NORMAL_READ_FAILURE) {
	DEBUGF(("do_recv normal read failed"));
	driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ|ERL_DRV_USE, 1);
    } else {
	DEBUGF(("do_recv fatal read failed (%d) (%d)",errno, len));
	driver_failure_eof(ud->port);
    }
}
787 
788 
789 /*
790 ** Report control error, helper for error messages from control
791 */
/*
** Report control error, helper for error messages from control.
**
** Builds the reply <1><error_message> (no terminating NUL) in *buffer and
** returns its length, strlen(error_message) + 1. When the supplied buffer
** of buff_len bytes cannot hold the reply, a new buffer is allocated and
** stored in *buffer.
*/
static int report_control_error(char **buffer, int buff_len,
				char *error_message)
{
    int elen = strlen(error_message);

    /* Allocate only when the caller's buffer is too small */
    if (buff_len < elen + 1) {
	*buffer = ALLOC(elen + 1);
    }
    **buffer = 1;
    memcpy((*buffer) + 1, error_message, elen);
    return elen + 1;
}
803 
804 /*
805 ** Lower level I/O helpers
806 */
/*
** Write as much of the driver output queue as the socket takes, at most
** IO_VECTOR_MAX vector elements per writev() call.
**
** Returns 0 when the queue has been emptied (write selection is turned off
** and command mode ports get "Sok") or when the socket would block (write
** selection is turned on). Returns -1 after failing the port with eof on
** any other write error.
*/
static int send_out_queue(UdsData *ud)
{
    SysIOVec *vec;
    int count;
    int wrote;

    ASSERT_NONBLOCK(ud->fd);
    while ((vec = driver_peekq(ud->port, &count)) != NULL) {
	if (count > IO_VECTOR_MAX) {
	    count = IO_VECTOR_MAX;
	}
	DEBUGF(("Trying to writev %d vectors", count));
#ifdef HARDDEBUG
	{
	    int i;
	    for (i = 0; i < count; ++i) {
		DEBUGF(("Buffer %d: length %d", i, vec[i].iov_len));
	    }
	}
#endif
	wrote = writev(ud->fd, vec, count);
	if (wrote < 0) {
	    if (errno != EWOULDBLOCK) {
		DEBUGF(("Write failed fatal (%d).", errno));
		driver_failure_eof(ud->port);
		return -1;
	    }
	    DEBUGF(("Write failed normal."));
	    driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_WRITE|ERL_DRV_USE, 1);
	    return 0;
	}
	driver_deq(ud->port, wrote);
	ud->sent += wrote;
	DEBUGF(("Wrote %d bytes of data.",wrote));
    }

    DEBUGF(("Write queue empty."));
    driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_WRITE, 0);
    if (ud->type == portTypeCommand) {
	driver_output(ud->port, "Sok", 3);
    }
    return 0;
}
850 
/*
** Get the next complete packet from the input buffer, reading from the
** socket when header or data is still missing.
**
** On success *result points at the packet data inside the input buffer
** (directly after the HEADER_LENGTH byte header), the buffer's header
** position is advanced past the packet, and the data length (>= 0) is
** returned.
**
** On failure one of the negative *_READ_FAILURE codes is returned:
** whatever read_at_least() reported, or SEVERE_READ_FAILURE when the header
** announces a negative length. Such a length must never be returned as it
** is, since the caller would take it for a failure code, and it must not be
** used to advance the header position.
*/
static int buffered_read_package(UdsData *ud, char **result)
{
    int res;
    int data_size;

    if (ud->buffer_pos < ud->header_pos + HEADER_LENGTH) {
	/* The header is not read yet */
	DEBUGF(("Header not read yet"));
	if ((res = read_at_least(ud, ud->header_pos + HEADER_LENGTH -
				 ud->buffer_pos)) < 0) {
	    DEBUGF(("Header read failed"));
	    return res;
	}
    }
    DEBUGF(("Header is read"));
    /* We have at least the header read */
    data_size = get_packet_length((char *) ud->buffer + ud->header_pos);
    DEBUGF(("Input packet size = %d", data_size));
    if (data_size < 0) {
	/* Length with the top bit set, the stream cannot be trusted */
	DEBUGF(("Invalid input packet size"));
	return SEVERE_READ_FAILURE;
    }
    if (ud->buffer_pos < ud->header_pos + HEADER_LENGTH + data_size) {
	/* We need to read more */
	DEBUGF(("Need to read more (bufferpos %d, want %d)", ud->buffer_pos,
		ud->header_pos + HEADER_LENGTH + data_size));
	if ((res = read_at_least(ud,
				 ud->header_pos + HEADER_LENGTH +
				 data_size - ud->buffer_pos)) < 0) {
	    DEBUGF(("Data read failed"));
	    return res;
	}
    }
    DEBUGF(("Data is completely read"));
    *result = (char *) ud->buffer + ud->header_pos + HEADER_LENGTH;
    ud->header_pos += HEADER_LENGTH + data_size;
    return data_size;
}
885 
/*
** Make room for at least num more bytes in the input buffer and do a
** single read() into the free part of it.
**
** Room is made by first moving the unconsumed data (everything from the
** current header on) to the start of the buffer and then, if that is not
** enough, growing the buffer to the next multiple of CHUNK_SIZE above the
** needed size.
**
** Returns 0 when at least num bytes were read. Returns NORMAL_READ_FAILURE
** when fewer bytes arrived or the read failed with EAGAIN,
** EOF_READ_FAILURE when read() returned 0 and SEVERE_READ_FAILURE on any
** other read error. Bytes that did arrive are kept in the buffer and
** counted in ud->received.
*/
static int read_at_least(UdsData *ud, int num)
{
    int got;

    if (ud->buffer_pos + num > ud->buffer_size) {
	/* No place in the buffer, try to pack it */
	int shift = ud->header_pos;

	if (shift > 0) {
	    DEBUGF(("Packing buffer, buffer_pos was %d, buffer_size was %d "
		    "offset %d num %d header_pos %d.",
		    ud->buffer_pos, ud->buffer_size,
		    shift, num, ud->header_pos));
	    memmove(ud->buffer, ud->buffer + shift, ud->buffer_pos - shift);
	    ud->buffer_pos -= shift;
	    ud->header_pos -= shift;
	}
	/* Still too small after packing? Then grow in chunks */
	if (ud->buffer_pos + num > ud->buffer_size) {
	    int wanted = ud->buffer_pos + num;

	    ud->buffer_size = ((wanted / CHUNK_SIZE) + 1) * CHUNK_SIZE;
	    DEBUGF(("New buffer size %d.",ud->buffer_size));
	    if (ud->buffer == NULL) {
		ud->buffer = ALLOC(ud->buffer_size);
	    } else {
		ud->buffer = REALLOC(ud->buffer, ud->buffer_size);
	    }
	}
    }

    /* OK, now we have a large enough buffer, try to read into it */
    got = read(ud->fd, ud->buffer + ud->buffer_pos,
	       ud->buffer_size - ud->buffer_pos);
    if (got < 0) {
	/* It failed, the question is why... */
	return (errno == EAGAIN) ? NORMAL_READ_FAILURE : SEVERE_READ_FAILURE;
    }
    if (got == 0) {
	return EOF_READ_FAILURE;
    }
    DEBUGF(("Got %d bytes.", got));
    ud->received += got;
    ud->buffer_pos += got;

    /* So, we got some bytes, but enough ? */
    return (got < num) ? NORMAL_READ_FAILURE : 0;
}
938 
get_packet_length(char * b)939 static int get_packet_length(char *b)
940 {
941     Byte *u = (Byte *) b;
942     int x = (((Word) u[0]) << 24) | (((Word) u[1]) << 16) |
943 	(((Word) u[2]) << 8) | ((Word) u[3]);
944     DEBUGF(("Packet length %d.", x));
945     return x;
946 }
947 
put_packet_length(char * b,int len)948 static void put_packet_length(char *b, int len)
949 {
950     Byte *p = (Byte *) b;
951     Word n = (Word) len;
952     p[0] = (n >> 24) & 0xFF;
953     p[1] = (n >> 16) & 0xFF;
954     p[2] = (n >> 8) & 0xFF;
955     p[3] = n & 0xFF;
956 }
957 
958 /*
959 ** Malloc wrappers
960 */
/*
** driver_alloc() that never returns NULL: when the allocation fails a
** message is printed on stderr and the process is aborted.
*/
static void *my_malloc(size_t size)
{
    void *ptr = driver_alloc(size);

    if (ptr != NULL) {
	return ptr;
    }
    fprintf(stderr, "Could not allocate %lu bytes of memory",(unsigned long) size);
    abort();
}
971 
/*
** driver_realloc() that never returns NULL: when the reallocation fails a
** message is printed on stderr and the process is aborted.
*/
static void *my_realloc(void *ptr, size_t size)
{
    void *nptr = driver_realloc(ptr, size);

    if (nptr != NULL) {
	return nptr;
    }
    fprintf(stderr, "Could not reallocate %lu bytes of memory",(unsigned long) size);
    abort();
}
981 
982 
983 /*
984 ** Socket file handling helpers
985 */
986 
987 /*
988 ** Check that directory exists, create if not (only works for one level)
989 */
/*
** Check that directory exists, create if not (only works for one level).
**
** Returns 0 when path is a directory after the call, either newly created
** with mode 0777 (subject to the umask) or already present. Returns -1
** when it could not be created, or when something that is not a directory
** already has that name; in the latter case errno is set to ENOTDIR.
*/
static int ensure_dir(char *path)
{
    struct stat st;

    if (mkdir(path, 0777) == 0) {
	return 0;
    }
    if (errno != EEXIST) {
	return -1;
    }
    /* The name is taken, but EEXIST does not say by what */
    if (stat(path, &st) != 0) {
	return -1;
    }
    if (!S_ISDIR(st.st_mode)) {
	errno = ENOTDIR;
	return -1;
    }
    return 0;
}
997 
998 /*
999 ** Try to open a lock file and lock the first byte write-only (advisory)
1000 ** return the file descriptor if successful, otherwise -1 (<0).
1001 */
/*
** Try to open the lock file SOCKET_PATH/<sockname>LOCK_SUFFIX (creating
** SOCKET_PATH and the file when needed) and to take a non-waiting advisory
** write lock on its first byte.
**
** On success the creation number kept in the first byte of the file is
** advanced (0 when the file holds no byte yet, otherwise the old value
** plus one, modulo 4), written back, stored in *p_creation, and the file
** descriptor is returned. The lock lasts for as long as the descriptor is
** open. Returns -1 when the directory, the file or the lock could not be
** had.
*/
static int try_lock(char *sockname, Byte *p_creation)
{
    struct flock lock;
    Byte serial;
    char *path;
    int fd = -1;

    path = ALLOC(strlen(SOCKET_PATH)+1+strlen(sockname)+
		 strlen(LOCK_SUFFIX)+1);
    sprintf(path,SOCKET_PATH "/%s" LOCK_SUFFIX, sockname);
    DEBUGF(("lockname = %s", path));
    if (ensure_dir(SOCKET_PATH) != 0) {
	DEBUGF(("ensure_dir failed, errno = %d", errno));
    } else if ((fd = open(path, O_RDWR | O_CREAT, 0666)) < 0) {
	DEBUGF(("open failed, errno = %d", errno));
    }
    FREE(path); /* the name is not needed once the file is open */
    if (fd < 0) {
	return -1;
    }

    /* Write lock on the first byte only, fail at once if it is taken */
    memset(&lock,0,sizeof(lock));
    lock.l_type = F_WRLCK;
    lock.l_whence = SEEK_SET;
    lock.l_start = 0;
    lock.l_len = 1;
    if (fcntl(fd, F_SETLK, &lock) < 0) {
	DEBUGF(("fcntl failed, errno = %d", errno));
	close(fd);
	return -1;
    }

    /* OK, check for creation and update */
    if (read(fd, &serial, 1) >= 1) {
	serial = (serial + 1) % 4;
    } else {
	serial = 0;
    }
    lseek(fd, 0, SEEK_SET);
    write(fd, &serial, 1);
    fsync(fd); /* This could be considered dangerous (blocking) */
    *p_creation = serial;
    return fd;
}
1046 
do_unlink(char * name)1047 static void do_unlink(char *name)
1048 {
1049     char buff[100];
1050     char *str = buff;
1051     int len = strlen(SOCKET_PATH) + 1 + strlen(name) + 1;
1052 
1053     if (len > 100) {
1054 	str = ALLOC(len);
1055     }
1056     sprintf(str,SOCKET_PATH "/%s",name);
1057     unlink(str);
1058     if (str != buff) {
1059 	FREE(str);
1060     }
1061 }
1062 
1063