1 /* ``Licensed under the Apache License, Version 2.0 (the "License");
2 * you may not use this file except in compliance with the License.
3 * You may obtain a copy of the License at
4 *
5 * http://www.apache.org/licenses/LICENSE-2.0
6 *
7 * Unless required by applicable law or agreed to in writing, software
8 * distributed under the License is distributed on an "AS IS" BASIS,
9 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 * See the License for the specific language governing permissions and
11 * limitations under the License.
12 *
13 * The Initial Developer of the Original Code is Ericsson Utvecklings AB.
14 * Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
15 * AB. All Rights Reserved.''
16 *
17 * $Id$
18 */
19
20 /*
21 * Purpose: Special purpouse Unix domain socket driver for distribution.
22 */
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <sys/socket.h>
32 #include <sys/un.h>
33 #include <fcntl.h>
34
35 #define HAVE_UIO_H
36 #include "erl_driver.h"
37
38 #define DEBUG
39 /*#define HARDDEBUG 1*/
40 /*
41 ** Some constants/macros
42 */
43
44 #ifdef HARDDEBUG
45 #define DEBUGF(P) debugf P
46 #include <stdarg.h>
debugf(char * str,...)47 static void debugf(char *str, ...)
48 {
49 va_list ap;
50 va_start(ap,str);
51 fprintf(stderr,"Uds_drv debug: ");
52 vfprintf(stderr,str, ap);
53 fprintf(stderr,"\r\n");
54 va_end(ap);
55 }
56 #ifndef DEBUG
57 #define DEBUG 1
58 #endif
59 #else
60 #define DEBUGF(P)
61 #endif
62
63
64 #ifdef DEBUG
65 #define ASSERT(X) \
66 do { \
67 if (!(X)) { \
68 fprintf(stderr,"Assertion (%s) failed at line %d file %s\r\n", #X, \
69 __LINE__, __FILE__); \
70 exit(1); \
71 } \
72 } while(0)
73 #define ASSERT_NONBLOCK(FD) ASSERT(fcntl((FD), F_GETFL, 0) & O_NONBLOCK)
74 #else
75 #define ASSERT(X)
76 #define ASSERT_NONBLOCK(FD)
77 #endif
78
79
80
81 #define SET_NONBLOCKING(FD) \
82 fcntl((FD), F_SETFL, \
83 fcntl((FD), F_GETFL, 0) | O_NONBLOCK)
84
85 #define ALLOC(X) my_malloc(X)
86 #define REALLOC(P,X) my_realloc(P,X)
87 #define FREE(X) driver_free(X)
88
89 #define CHUNK_SIZE 256
90
91 #define DIST_MAGIC_RECV_TAG 100
92
93 /*
94 ** The max length of an I/O vector seams to be impossible to find
95 ** out (?), so this is just a value known to work on solaris.
96 */
97 #define IO_VECTOR_MAX 16
98
99 #define SOCKET_PATH "/tmp/erlang"
100 #define LOCK_SUFFIX ".lock"
101
102 #define NORMAL_READ_FAILURE -1
103 #define SEVERE_READ_FAILURE -2
104 #define EOF_READ_FAILURE -3
105
106 /*
107 ** Internal structures
108 */
109
110 #define HEADER_LENGTH 4
111
112 typedef enum {
113 portTypeUnknown, /* An uninitialized port */
114 portTypeListener, /* A listening port/socket */
115 portTypeAcceptor, /* An intermediate stage when accepting
116 on a listen port */
117 portTypeConnector, /* An intermediate stage when connecting */
118 portTypeCommand, /* A connected open port in command mode */
119 portTypeIntermediate, /* A connected open port in special half
120 active mode */
121 portTypeData /* A connectec open port in data mode */
122 } PortType;
123
124 typedef unsigned char Byte;
125 typedef unsigned int Word;
126
127 typedef struct uds_data {
128 int fd; /* File descriptor */
129 ErlDrvPort port; /* The port identifier */
130 int lockfd; /* The file descriptor for a lock file in
131 case of listen sockets */
132 Byte creation; /* The creation serial derived from the
133 lockfile */
134 PortType type; /* Type of port */
135 char *name; /* Short name of socket for unlink */
136 Word sent; /* Messages sent */
137 Word received; /* Messages received */
138 struct uds_data *partner; /* The partner in an accept/listen pair */
139 struct uds_data *next; /* Next structure in list */
140
141 /* The input buffer and it's data */
142 int buffer_size; /* The allocated size of the input buffer */
143 int buffer_pos; /* Current position in input buffer */
144 int header_pos; /* Where the current header is in the
145 input buffer */
146 Byte *buffer; /* The actual input buffer */
147 } UdsData;
148
149 /*
150 ** Interface routines
151 */
152 static ErlDrvData uds_start(ErlDrvPort port, char *buff);
153 static void uds_stop(ErlDrvData handle);
154 static void uds_command(ErlDrvData handle, char *buff, int bufflen);
155 static void uds_input(ErlDrvData handle, ErlDrvEvent event);
156 static void uds_output(ErlDrvData handle, ErlDrvEvent event);
157 static void uds_finish(void);
158 static int uds_control(ErlDrvData handle, unsigned int command,
159 char* buf, int count, char** res, int res_size);
160 static void uds_stop_select(ErlDrvEvent event, void*);
161
162 /*
163 ** Local helpers forward declarations
164 */
165
166 static void uds_command_listen(UdsData *ud, char *buff, int bufflen);
167 static void uds_command_accept(UdsData *ud, char *buff, int bufflen);
168 static void uds_command_connect(UdsData *ud, char *buff, int bufflen);
169
170 static void do_stop(UdsData *ud, int shutting_down);
171 static void do_send(UdsData *ud, char *buff, int bufflen);
172 static void do_recv(UdsData *ud);
173
174 static int report_control_error(char **buffer, int buff_len,
175 char *error_message);
176 static int send_out_queue(UdsData *ud);
177 static int buffered_read_package(UdsData *ud, char **result);
178 static int read_at_least(UdsData *ud, int num);
179 static int get_packet_length(char *b);
180 static void put_packet_length(char *b, int len);
181 static void *my_malloc(size_t size);
182 static void *my_realloc(void *optr, size_t size);
183 static int try_lock(char *sockname, Byte *p_creation);
184 static int ensure_dir(char *path);
185 static void do_unlink(char *name);
186
187 /*
188 ** Global data
189 */
190
191 /* The driver entry */
192 ErlDrvEntry uds_driver_entry = {
193 NULL, /* init, N/A */
194 uds_start, /* start, called when port is opened */
195 uds_stop, /* stop, called when port is closed */
196 uds_command, /* output, called when erlang has sent */
197 uds_input, /* ready_input, called when input descriptor
198 ready */
199 uds_output, /* ready_output, called when output
200 descriptor ready */
201 "uds_drv", /* char *driver_name, the argument to open_port */
202 uds_finish, /* finish, called when unloaded */
203 NULL, /* void * that is not used (BC) */
204 uds_control, /* control, port_control callback */
205 NULL, /* timeout, called on timeouts */
206 NULL, /* outputv, vector output interface */
207 NULL, /* ready_async */
208 NULL, /* flush */
209 NULL, /* call */
210 NULL, /* event */
211 ERL_DRV_EXTENDED_MARKER,
212 ERL_DRV_EXTENDED_MAJOR_VERSION,
213 ERL_DRV_EXTENDED_MINOR_VERSION,
214 0, /* ERL_DRV_FLAGs */
215 NULL,
216 NULL, /* process_exit */
217 uds_stop_select
218 };
219
220 /* Beginning of linked list of ports */
221 static UdsData *first_data;
222
223 /*
224 **
225 ** Driver interface routines
226 **
227 */
228
229 /*
230 ** Driver initialization routine
231 */
DRIVER_INIT(uds_drv)232 DRIVER_INIT(uds_drv)
233 {
234 first_data = NULL;
235 return &uds_driver_entry;
236 }
237
238 /*
239 ** A port is opened, we need no information whatsoever about the socket
240 ** at this stage.
241 */
uds_start(ErlDrvPort port,char * buff)242 static ErlDrvData uds_start(ErlDrvPort port, char *buff)
243 {
244 UdsData *ud;
245
246 ud = ALLOC(sizeof(UdsData));
247 ud->fd = -1;
248 ud->lockfd = -1;
249 ud->creation = 0;
250 ud->port = port;
251 ud->type = portTypeUnknown;
252 ud->name = NULL;
253 ud->buffer_size = 0;
254 ud->buffer_pos = 0;
255 ud->header_pos = 0;
256 ud->buffer = NULL;
257 ud->sent = 0;
258 ud->received = 0;
259 ud->partner = NULL;
260 ud->next = first_data;
261 first_data = ud;
262
263 return((ErlDrvData) ud);
264 }
265
266 /*
267 ** Close the socket/port and free up
268 */
uds_stop(ErlDrvData handle)269 static void uds_stop(ErlDrvData handle)
270 {
271 do_stop((UdsData *) handle, 0);
272 }
273
274 /*
275 ** Command interface, operates in two modes, Command mode and data mode.
276 ** Mode is shifted with the port_control function.
277 ** Command mode protocol:
278 ** 'L'<socketname>: Lock and listen on socket.
279 ** 'A'<listennumber as 32 bit bigendian>: Accept from the port referenced by the
280 ** "listennumber"
281 ** 'C'<socketname>: Connect to the socket named <socketname>
282 ** 'S'<data>: Send the data <data>
283 ** 'R': Receive one packet of data
284 ** Data mode protocol:
285 ** Send anything that arrives (no opcodes/skip opcodes).
286 */
287
uds_command(ErlDrvData handle,char * buff,int bufflen)288 static void uds_command(ErlDrvData handle, char *buff, int bufflen)
289 {
290 UdsData *ud = (UdsData *) handle;
291
292 if (ud->type == portTypeData || ud->type == portTypeIntermediate) {
293 DEBUGF(("Passive do_send %d",bufflen));
294 do_send(ud, buff + 1, bufflen - 1); /* XXX */
295 return;
296 }
297 if (bufflen == 0) {
298 return;
299 }
300 switch (*buff) {
301 case 'L':
302 if (ud->type != portTypeUnknown) {
303 driver_failure_posix(ud->port, ENOTSUP);
304 return;
305 }
306 uds_command_listen(ud,buff,bufflen);
307 return;
308 case 'A':
309 if (ud->type != portTypeUnknown) {
310 driver_failure_posix(ud->port, ENOTSUP);
311 return;
312 }
313 uds_command_accept(ud,buff,bufflen);
314 return;
315 case 'C':
316 if (ud->type != portTypeUnknown) {
317 driver_failure_posix(ud->port, ENOTSUP);
318 return;
319 }
320 uds_command_connect(ud,buff,bufflen);
321 return;
322 case 'S':
323 if (ud->type != portTypeCommand) {
324 driver_failure_posix(ud->port, ENOTSUP);
325 return;
326 }
327 do_send(ud, buff + 1, bufflen - 1);
328 return;
329 case 'R':
330 if (ud->type != portTypeCommand) {
331 driver_failure_posix(ud->port, ENOTSUP);
332 return;
333 }
334 do_recv(ud);
335 return;
336 default:
337 ASSERT(0);
338 return;
339 }
340 }
341
uds_input(ErlDrvData handle,ErlDrvEvent event)342 static void uds_input(ErlDrvData handle, ErlDrvEvent event)
343 {
344 UdsData *ud = (UdsData *) handle;
345
346 DEBUGF(("In uds_input type = %d",ud->type));
347 if (ud->type == portTypeListener) {
348 UdsData *ad = ud->partner;
349 struct sockaddr_un peer;
350 int pl = sizeof(struct sockaddr_un);
351 int fd;
352
353 ASSERT(ad != NULL);
354 if ((fd = accept(ud->fd, (struct sockaddr *) &peer, &pl)) < 0) {
355 if (errno != EWOULDBLOCK) {
356 DEBUGF(("Accept failed."));
357 driver_failure_posix(ud->port, errno);
358 return;
359 }
360 DEBUGF(("Accept would block."));
361 return;
362 }
363 SET_NONBLOCKING(fd);
364 ad->fd = fd;
365 ad->partner = NULL;
366 ad->type = portTypeCommand;
367 ud->partner = NULL;
368 DEBUGF(("Accept successful."));
369 driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ, 0);
370 driver_output(ad->port, "Aok",3);
371 return;
372 }
373 /* OK, normal data or command port */
374 ASSERT(ud->type >= portTypeCommand);
375 #ifdef HARDDEBUG
376 if (ud->type == portTypeData)
377 DEBUGF(("Passive do_recv"));
378 #endif
379 do_recv(ud);
380 }
381
uds_output(ErlDrvData handle,ErlDrvEvent event)382 static void uds_output(ErlDrvData handle, ErlDrvEvent event)
383 {
384 UdsData *ud = (UdsData *) handle;
385 if (ud->type == portTypeConnector) {
386 ud->type = portTypeCommand;
387 driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_WRITE, 0);
388 driver_output(ud->port, "Cok",3);
389 return;
390 }
391 ASSERT(ud->type == portTypeCommand || ud->type == portTypeData);
392 send_out_queue(ud);
393 }
394
uds_finish(void)395 static void uds_finish(void)
396 {
397 while (first_data != NULL) {
398 do_stop(first_data, 1);
399 }
400 }
401
402 /*
403 ** Protocol to control:
404 ** 'C': Set port in command mode.
405 ** 'I': Set port in intermediate mode
406 ** 'D': Set port in data mode
407 ** 'N': Get identification number for listen port
408 ** 'S': Get statistics
409 ** 'T': Send a tick message
410 ** 'R': Get creation number of listen socket
411 ** Answer is one byte status (0 == ok, Other is followed by error as string)
412 ** followed by data if applicable
413 */
uds_control(ErlDrvData handle,unsigned int command,char * buf,int count,char ** res,int res_size)414 static int uds_control(ErlDrvData handle, unsigned int command,
415 char* buf, int count, char** res, int res_size)
416 {
417 /* Local macro to ensure large enough buffer. */
418 #define ENSURE(N) \
419 do { \
420 if (res_size < N) { \
421 *res = ALLOC(N); \
422 } \
423 } while(0)
424
425 UdsData *ud = (UdsData *) handle;
426
427 DEBUGF(("Control, type = %d, fd = %d, command = %c", ud->type, ud->fd,
428 (char) command));
429 switch (command) {
430 case 'S':
431 {
432 ENSURE(13);
433 **res = 0;
434 put_packet_length((*res) + 1, ud->received);
435 put_packet_length((*res) + 5, ud->sent);
436 put_packet_length((*res) + 9, driver_sizeq(ud->port));
437 return 13;
438 }
439 case 'C':
440 if (ud->type < portTypeCommand) {
441 return report_control_error(res, res_size, "einval");
442 }
443 ud->type = portTypeCommand;
444 driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ, 0);
445 ENSURE(1);
446 **res = 0;
447 return 1;
448 case 'I':
449 if (ud->type < portTypeCommand) {
450 return report_control_error(res, res_size, "einval");
451 }
452 ud->type = portTypeIntermediate;
453 driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ, 0);
454 ENSURE(1);
455 **res = 0;
456 return 1;
457 case 'D':
458 if (ud->type < portTypeCommand) {
459 return report_control_error(res, res_size, "einval");
460 }
461 ud->type = portTypeData;
462 do_recv(ud);
463 ENSURE(1);
464 **res = 0;
465 return 1;
466 case 'N':
467 if (ud->type != portTypeListener) {
468 return report_control_error(res, res_size, "einval");
469 }
470 ENSURE(5);
471 (*res)[0] = 0;
472 put_packet_length((*res) + 1, ud->fd);
473 return 5;
474 case 'T': /* tick */
475 if (ud->type != portTypeData) {
476 return report_control_error(res, res_size, "einval");
477 }
478 do_send(ud,"",0);
479 ENSURE(1);
480 **res = 0;
481 return 1;
482 case 'R':
483 if (ud->type != portTypeListener) {
484 return report_control_error(res, res_size, "einval");
485 }
486 ENSURE(2);
487 (*res)[0] = 0;
488 (*res)[1] = ud->creation;
489 return 2;
490 default:
491 return report_control_error(res, res_size, "einval");
492 }
493 #undef ENSURE
494 }
495
uds_stop_select(ErlDrvEvent event,void * _)496 static void uds_stop_select(ErlDrvEvent event, void* _)
497 {
498 close((int)(long)event);
499 }
500
501 /*
502 **
503 ** Local helpers
504 **
505 */
506
507 /*
508 ** Command implementations
509 */
uds_command_connect(UdsData * ud,char * buff,int bufflen)510 static void uds_command_connect(UdsData *ud, char *buff, int bufflen)
511 {
512 char *str;
513 int fd;
514 struct sockaddr_un s_un;
515 int length;
516 int res;
517
518 str = ALLOC(25);
519 sprintf(str, "erl%d", (int) getpid()); /* A temporary sufficiently
520 unique name */
521 do_unlink(str);
522 s_un.sun_family = AF_UNIX;
523 strcpy(s_un.sun_path, SOCKET_PATH "/");
524 strcat(s_un.sun_path, str);
525 DEBUGF(("Connect own filename: %s", s_un.sun_path));
526 length = sizeof(s_un.sun_family) + strlen(s_un.sun_path);
527 ud->name = str;
528 ud->type = portTypeCommand;
529 if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
530 DEBUGF(("socket call failed, errno = %d"));
531 driver_failure_posix(ud->port, errno);
532 return;
533 }
534 ud->fd = fd;
535 if ((res = bind(fd, (struct sockaddr *) &s_un, length)) < 0) {
536 DEBUGF(("bind call failed, errno = %d",errno));
537 driver_failure_posix(ud->port, errno);
538 return;
539 }
540 str = ALLOC(bufflen);
541 memcpy(str, buff + 1, bufflen - 1);
542 str[bufflen - 1] = '\0';
543 strcpy(s_un.sun_path, SOCKET_PATH "/");
544 strcat(s_un.sun_path, str);
545 length = sizeof(s_un.sun_family) + strlen(s_un.sun_path);
546 DEBUGF(("Connect peer filename: %s", s_un.sun_path));
547 SET_NONBLOCKING(fd);
548 if (connect(fd, (struct sockaddr *) &s_un, length) < 0) {
549 if (errno != EINPROGRESS) {
550 driver_failure_posix(ud->port, errno);
551 } else {
552 DEBUGF(("Connect pending"));
553 ud->type = portTypeConnector;
554 driver_select(ud->port, (ErlDrvEvent) ud->fd,
555 ERL_DRV_WRITE|ERL_DRV_USE, 1);
556 }
557 } else {
558 DEBUGF(("Connect done"));
559 driver_output(ud->port, "Cok", 3);
560 }
561 FREE(str);
562 }
563
uds_command_accept(UdsData * ud,char * buff,int bufflen)564 static void uds_command_accept(UdsData *ud, char *buff, int bufflen)
565 {
566 int listen_no;
567 UdsData *lp;
568
569 if (bufflen < 5) {
570 driver_failure_posix(ud->port, EINVAL);
571 return;
572 }
573
574 listen_no = get_packet_length(buff + 1); /* Same format as
575 packet headers */
576 DEBUGF(("Accept listen_no = %d",listen_no));
577 for (lp = first_data; lp != NULL && lp->fd != listen_no; lp = lp->next)
578 ;
579 if (lp == NULL) {
580 DEBUGF(("Could not find listen port"));
581 driver_failure_posix(ud->port, EINVAL);
582 return;
583 }
584 if (lp->partner != NULL) {
585 DEBUGF(("Listen port busy"));
586 driver_failure_posix(ud->port, EADDRINUSE);
587 return;
588 }
589 lp->partner = ud;
590 ud->partner = lp;
591 ud->type = portTypeAcceptor;
592 driver_select(lp->port,(ErlDrvEvent) lp->fd, ERL_DRV_READ|ERL_DRV_USE, 1);
593 /* Silent, answer will be sent in input routine */
594 }
595
uds_command_listen(UdsData * ud,char * buff,int bufflen)596 static void uds_command_listen(UdsData *ud, char *buff, int bufflen)
597 {
598 char *str;
599 int fd;
600 struct sockaddr_un s_un;
601 int length;
602 int res;
603 UdsData *tmp;
604 Byte creation;
605
606 str = ALLOC(bufflen);
607 memcpy(str, buff + 1,bufflen - 1);
608 str[bufflen - 1] = '\0';
609
610 /*
611 ** Before trying lockfiles etc, we need to assure that our own process is
612 ** not using the filename. Advisory locks can be recursive in one process.
613 */
614 for(tmp = first_data; tmp != NULL; tmp = tmp->next) {
615 if (tmp->name != NULL && strcmp(str, tmp->name) == 0) {
616 driver_failure_posix(ud->port, EADDRINUSE);
617 FREE(str);
618 return;
619 }
620 }
621
622 if ((fd = try_lock(str, &creation)) < 0) {
623 driver_failure_posix(ud->port, EADDRINUSE);
624 FREE(str);
625 return;
626 }
627 s_un.sun_family = AF_UNIX;
628 strcpy(s_un.sun_path, SOCKET_PATH "/");
629 strcat(s_un.sun_path, str);
630 length = sizeof(s_un.sun_family) + strlen(s_un.sun_path);
631 ud->name = str;
632 ud->type = portTypeListener;
633 ud->lockfd = fd;
634 ud->creation = creation;
635 if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
636 DEBUGF(("socket call failed, errno = %d"));
637 driver_failure_posix(ud->port, errno);
638 return;
639 }
640 SET_NONBLOCKING(fd);
641 ud->fd = fd;
642 do_unlink(str);
643 DEBUGF(("Listen filename: %s", s_un.sun_path));
644 if ((res = bind(fd, (struct sockaddr *) &s_un, length)) < 0) {
645 DEBUGF(("bind call failed, errno = %d",errno));
646 driver_failure_posix(ud->port, errno);
647 return;
648 }
649
650 if ((res = listen(fd, 5)) < 0) {
651 DEBUGF(("listen call failed, errno = %d"));
652 driver_failure_posix(ud->port, errno);
653 return;
654 }
655 driver_output(ud->port, "Lok", 3);
656 }
657
658 /*
659 ** Input/output/stop helpers
660 */
do_stop(UdsData * ud,int shutting_down)661 static void do_stop(UdsData *ud, int shutting_down)
662 {
663 UdsData **tmp;
664
665 DEBUGF(("Cleaning up, type = %d, fd = %d, lockfd = %d", ud->type,
666 ud->fd, ud->lockfd));
667 for (tmp = &first_data; *tmp != NULL && *tmp != ud; tmp = &((*tmp)->next))
668 ;
669 ASSERT(*tmp != NULL);
670 *tmp = (*tmp)->next;
671 if (ud->buffer != NULL) {
672 FREE(ud->buffer);
673 }
674 if (ud->fd >= 0) {
675 driver_select(ud->port, (ErlDrvEvent) ud->fd,
676 ERL_DRV_READ|ERL_DRV_WRITE|ERL_DRV_USE, 0);
677 }
678 if (ud->name) {
679 do_unlink(ud->name);
680 FREE(ud->name);
681 }
682 if (ud->lockfd >= 0) {
683 ASSERT(ud->type == portTypeListener);
684 close(ud->lockfd); /* the lock will be released */
685 /* But leave the file there for the creation counter... */
686 }
687 if (!shutting_down) { /* Dont bother if the driver is shutting down. */
688 if (ud->partner != NULL) {
689 if (ud->type == portTypeAcceptor) {
690 UdsData *listener = ud->partner;
691 listener->partner = NULL;
692 driver_select(listener->port, (ErlDrvEvent) listener->fd,
693 ERL_DRV_READ, 0);
694 } else {
695 UdsData *acceptor = ud->partner;
696 ASSERT(ud->type == portTypeListener);
697 acceptor->partner = NULL;
698 driver_failure_eof(acceptor->port);
699 }
700 }
701 }
702 FREE(ud);
703 }
704
705 /*
706 ** Actually send the data
707 */
do_send(UdsData * ud,char * buff,int bufflen)708 static void do_send(UdsData *ud, char *buff, int bufflen)
709 {
710 char header[4];
711 int written;
712 SysIOVec iov[2];
713 ErlIOVec eio;
714 ErlDrvBinary *binv[] = {NULL,NULL};
715
716 put_packet_length(header, bufflen);
717 DEBUGF(("Write packet header %u,%u,%u,%u.", (Word) header[0],
718 (Word) header[1], (Word) header[2],(Word) header[3]));
719 iov[0].iov_base = (char *) header;
720 iov[0].iov_len = 4;
721 iov[1].iov_base = buff;
722 iov[1].iov_len = bufflen;
723 eio.iov = iov;
724 eio.binv = binv;
725 eio.vsize = 2;
726 eio.size = bufflen + 4;
727 written = 0;
728 if (driver_sizeq(ud->port) == 0) {
729 if ((written = writev(ud->fd, iov, 2)) == eio.size) {
730 ud->sent += written;
731 if (ud->type == portTypeCommand) {
732 driver_output(ud->port, "Sok", 3);
733 }
734 DEBUGF(("Wrote all %d bytes immediately.",written));
735 return;
736 } else if (written < 0) {
737 if (errno != EWOULDBLOCK) {
738 driver_failure_eof(ud->port);
739 return;
740 } else {
741 written = 0;
742 }
743 } else {
744 ud->sent += written;
745 }
746 DEBUGF(("Wrote %d bytes immediately.",written));
747 /* Enqueue remaining */
748 }
749 driver_enqv(ud->port, &eio, written);
750 DEBUGF(("Sending output queue."));
751 send_out_queue(ud);
752 }
753
do_recv(UdsData * ud)754 static void do_recv(UdsData *ud)
755 {
756 int res;
757 char *ibuf;
758 ASSERT_NONBLOCK(ud->fd);
759 DEBUGF(("do_recv called, type = %d", ud->type));
760 for(;;) {
761 if ((res = buffered_read_package(ud,&ibuf)) < 0) {
762 if (res == NORMAL_READ_FAILURE) {
763 DEBUGF(("do_recv normal read failed"));
764 driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ|ERL_DRV_USE, 1);
765 } else {
766 DEBUGF(("do_recv fatal read failed (%d) (%d)",errno, res));
767 driver_failure_eof(ud->port);
768 }
769 return;
770 }
771 DEBUGF(("do_recv got package, port type = %d", ud->type));
772 /* Got a package */
773 if (ud->type == portTypeCommand) {
774 ibuf[-1] = 'R'; /* There is always room for a single byte opcode
775 before the actual buffer (where the packet
776 header was) */
777 driver_output(ud->port,ibuf - 1, res + 1);
778 driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ, 0);
779 return;
780 } else {
781 ibuf[-1] = DIST_MAGIC_RECV_TAG; /* XXX */
782 driver_output(ud->port,ibuf - 1, res + 1);
783 driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ|ERL_DRV_USE, 1);
784 }
785 }
786 }
787
788
789 /*
790 ** Report control error, helper for error messages from control
791 */
report_control_error(char ** buffer,int buff_len,char * error_message)792 static int report_control_error(char **buffer, int buff_len,
793 char *error_message)
794 {
795 int elen = strlen(error_message);
796 if (elen + 1 < buff_len) {
797 *buffer = ALLOC(elen + 1);
798 }
799 **buffer = 1;
800 memcpy((*buffer) + 1, error_message, elen);
801 return elen + 1;
802 }
803
804 /*
805 ** Lower level I/O helpers
806 */
send_out_queue(UdsData * ud)807 static int send_out_queue(UdsData *ud)
808 {
809 ASSERT_NONBLOCK(ud->fd);
810 for(;;) {
811 int vlen;
812 SysIOVec *tmp = driver_peekq(ud->port, &vlen);
813 int wrote;
814 if (tmp == NULL) {
815 DEBUGF(("Write queue empty."));
816 driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_WRITE, 0);
817 if (ud->type == portTypeCommand) {
818 driver_output(ud->port, "Sok", 3);
819 }
820 return 0;
821 }
822 if (vlen > IO_VECTOR_MAX) {
823 vlen = IO_VECTOR_MAX;
824 }
825 DEBUGF(("Trying to writev %d vectors", vlen));
826 #ifdef HARDDEBUG
827 {
828 int i;
829 for (i = 0; i < vlen; ++i) {
830 DEBUGF(("Buffer %d: length %d", i, tmp[i].iov_len));
831 }
832 }
833 #endif
834 if ((wrote = writev(ud->fd, tmp, vlen)) < 0) {
835 if (errno == EWOULDBLOCK) {
836 DEBUGF(("Write failed normal."));
837 driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_WRITE|ERL_DRV_USE, 1);
838 return 0;
839 } else {
840 DEBUGF(("Write failed fatal (%d).", errno));
841 driver_failure_eof(ud->port);
842 return -1;
843 }
844 }
845 driver_deq(ud->port, wrote);
846 ud->sent += wrote;
847 DEBUGF(("Wrote %d bytes of data.",wrote));
848 }
849 }
850
buffered_read_package(UdsData * ud,char ** result)851 static int buffered_read_package(UdsData *ud, char **result)
852 {
853 int res;
854 int data_size;
855
856 if (ud->buffer_pos < ud->header_pos + HEADER_LENGTH) {
857 /* The header is not read yet */
858 DEBUGF(("Header not read yet"));
859 if ((res = read_at_least(ud, ud->header_pos + HEADER_LENGTH -
860 ud->buffer_pos)) < 0) {
861 DEBUGF(("Header read failed"));
862 return res;
863 }
864 }
865 DEBUGF(("Header is read"));
866 /* We have at least the header read */
867 data_size = get_packet_length((char *) ud->buffer + ud->header_pos);
868 DEBUGF(("Input packet size = %d", data_size));
869 if (ud->buffer_pos < ud->header_pos + HEADER_LENGTH + data_size) {
870 /* We need to read more */
871 DEBUGF(("Need to read more (bufferpos %d, want %d)", ud->buffer_pos,
872 ud->header_pos + HEADER_LENGTH + data_size));
873 if ((res = read_at_least(ud,
874 ud->header_pos + HEADER_LENGTH +
875 data_size - ud->buffer_pos)) < 0) {
876 DEBUGF(("Data read failed"));
877 return res;
878 }
879 }
880 DEBUGF(("Data is completely read"));
881 *result = (char *) ud->buffer + ud->header_pos + HEADER_LENGTH;
882 ud->header_pos += HEADER_LENGTH + data_size;
883 return data_size;
884 }
885
read_at_least(UdsData * ud,int num)886 static int read_at_least(UdsData *ud, int num)
887 {
888 int got;
889 if (ud->buffer_pos + num > ud->buffer_size) {
890 /* No place in the buffer, try to pack it */
891 if (ud->header_pos > 0) {
892 int offset = ud->header_pos;
893 DEBUGF(("Packing buffer, buffer_pos was %d, buffer_size was %d "
894 "offset %d num %d header_pos %d.",
895 ud->buffer_pos, ud->buffer_size,
896 offset, num, ud->header_pos));
897 memmove(ud->buffer, ud->buffer + ud->header_pos,
898 ud->buffer_pos - ud->header_pos);
899 ud->buffer_pos -= offset;
900 ud->header_pos -= offset;
901 }
902 /* The buffer is packed, look for space again and reallocate if
903 needed */
904 if (ud->buffer_pos + num > ud->buffer_size) {
905 /* Let's grow in chunks of 256 */
906 ud->buffer_size = (((ud->buffer_pos + num) /
907 CHUNK_SIZE) + 1) * CHUNK_SIZE;
908 DEBUGF(("New buffer size %d.",ud->buffer_size));
909 /* We will always keep one extra byte before the buffer to
910 allow insertion of an opcode */
911 if (!ud->buffer) {
912 ud->buffer = ALLOC(ud->buffer_size);
913 } else {
914 ud->buffer = REALLOC(ud->buffer, ud->buffer_size);
915 }
916 }
917 }
918 /* OK, now we have a large enough buffer, try to read into it */
919 if ((got = read(ud->fd, ud->buffer + ud->buffer_pos,
920 ud->buffer_size - ud->buffer_pos)) < 0) {
921 /* It failed, the question is why... */
922 if (errno == EAGAIN) {
923 return NORMAL_READ_FAILURE;
924 }
925 return SEVERE_READ_FAILURE;
926 } else if (got == 0) {
927 return EOF_READ_FAILURE;
928 }
929 DEBUGF(("Got %d bytes.", got));
930 ud->received += got;
931 ud->buffer_pos += got;
932 /* So, we got some bytes, but enough ? */
933 if (got < num) {
934 return NORMAL_READ_FAILURE;
935 }
936 return 0;
937 }
938
get_packet_length(char * b)939 static int get_packet_length(char *b)
940 {
941 Byte *u = (Byte *) b;
942 int x = (((Word) u[0]) << 24) | (((Word) u[1]) << 16) |
943 (((Word) u[2]) << 8) | ((Word) u[3]);
944 DEBUGF(("Packet length %d.", x));
945 return x;
946 }
947
put_packet_length(char * b,int len)948 static void put_packet_length(char *b, int len)
949 {
950 Byte *p = (Byte *) b;
951 Word n = (Word) len;
952 p[0] = (n >> 24) & 0xFF;
953 p[1] = (n >> 16) & 0xFF;
954 p[2] = (n >> 8) & 0xFF;
955 p[3] = n & 0xFF;
956 }
957
958 /*
959 ** Malloc wrappers
960 */
my_malloc(size_t size)961 static void *my_malloc(size_t size)
962 {
963 void *ptr;
964
965 if ((ptr = driver_alloc(size)) == NULL) {
966 fprintf(stderr, "Could not allocate %lu bytes of memory",(unsigned long) size);
967 abort();
968 }
969 return ptr;
970 }
971
my_realloc(void * ptr,size_t size)972 static void *my_realloc(void *ptr, size_t size)
973 {
974 void *nptr;
975 if ((nptr = driver_realloc(ptr, size)) == NULL) {
976 fprintf(stderr, "Could not reallocate %lu bytes of memory",(unsigned long) size);
977 abort();
978 }
979 return nptr;
980 }
981
982
983 /*
984 ** Socket file handling helpers
985 */
986
987 /*
988 ** Check that directory exists, create if not (only works for one level)
989 */
ensure_dir(char * path)990 static int ensure_dir(char *path)
991 {
992 if (mkdir(path,0777) != 0 && errno != EEXIST) {
993 return -1;
994 }
995 return 0;
996 }
997
998 /*
999 ** Try to open a lock file and lock the first byte write-only (advisory)
1000 ** return the file descriptor if successful, otherwise -1 (<0).
1001 */
try_lock(char * sockname,Byte * p_creation)1002 static int try_lock(char *sockname, Byte *p_creation)
1003 {
1004 char *lockname;
1005 int lockfd;
1006 struct flock fl;
1007 Byte creation;
1008
1009 lockname = ALLOC(strlen(SOCKET_PATH)+1+strlen(sockname)+
1010 strlen(LOCK_SUFFIX)+1);
1011 sprintf(lockname,SOCKET_PATH "/%s" LOCK_SUFFIX, sockname);
1012 DEBUGF(("lockname = %s", lockname));
1013 if (ensure_dir(SOCKET_PATH) != 0) {
1014 DEBUGF(("ensure_dir failed, errno = %d", errno));
1015 FREE(lockname);
1016 return -1;
1017 }
1018 if ((lockfd = open(lockname, O_RDWR | O_CREAT, 0666)) < 0) {
1019 DEBUGF(("open failed, errno = %d", errno));
1020 FREE(lockname);
1021 return -1;
1022 }
1023 FREE(lockname);
1024 memset(&fl,0,sizeof(fl));
1025 fl.l_type = F_WRLCK;
1026 fl.l_whence = SEEK_SET;
1027 fl.l_start = 0;
1028 fl.l_len = 1;
1029 if (fcntl(lockfd, F_SETLK, &fl) < 0) {
1030 DEBUGF(("fcntl failed, errno = %d", errno));
1031 close(lockfd);
1032 return -1;
1033 }
1034 /* OK, check for creation and update */
1035 if (read(lockfd, &creation, 1) < 1) {
1036 creation = 0;
1037 } else {
1038 creation = (creation + 1) % 4;
1039 }
1040 lseek(lockfd, 0, SEEK_SET);
1041 write(lockfd, &creation, 1);
1042 fsync(lockfd); /* This could be concidered dangerous (blocking) */
1043 *p_creation = creation;
1044 return lockfd;
1045 }
1046
do_unlink(char * name)1047 static void do_unlink(char *name)
1048 {
1049 char buff[100];
1050 char *str = buff;
1051 int len = strlen(SOCKET_PATH) + 1 + strlen(name) + 1;
1052
1053 if (len > 100) {
1054 str = ALLOC(len);
1055 }
1056 sprintf(str,SOCKET_PATH "/%s",name);
1057 unlink(str);
1058 if (str != buff) {
1059 FREE(str);
1060 }
1061 }
1062
1063