1 //-< UNISOCK.CPP >---------------------------------------------------*--------*
2 // FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *
3 // (Main Memory Database Management System)                          *   /\|  *
4 //                                                                   *  /  \  *
5 //                          Created:      8-Feb-97    K.A. Knizhnik  * / [] \ *
6 //                          Last update: 18-May-97    K.A. Knizhnik  * GARRET *
7 //-------------------------------------------------------------------*--------*
8 // Unix sockets
9 //-------------------------------------------------------------------*--------*
10 
11 #include "unisock.h"
12 #undef BYTE_ORDER
13 
14 #ifdef VXWORKS
15 #include "fastdbShim.h"
16 #else
17 #include <sys/ioctl.h>
18 #include <fcntl.h>
19 #include <sys/time.h>
20 #include <sys/errno.h>
21 #include <sys/types.h>
22 #include <sys/socket.h>
23 #include <sys/utsname.h>
24 #ifndef HPUX11
25 #include <sys/select.h>
26 #endif
27 #include <netinet/in.h>
28 #include <netinet/tcp.h>
29 #include <arpa/inet.h>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <unistd.h>
33 #include <string.h>
34 #if defined(_AIX)
35 #include <strings.h>
36 #endif  /* _AIX */
37 #include <stddef.h>
38 #include <assert.h>
39 #include <errno.h>
40 
41 extern "C" {
42 #include <netdb.h>
43 }
44 
45 #include <signal.h>
46 #endif // VXWORKS
47 
48 BEGIN_FASTDB_NAMESPACE
49 
50 const int MAX_HOST_NAME = 256;
51 const int GETHOSTBYNAME_BUF_SIZE = 1024;
52 
53 #ifdef VXWORKS
54 char* unix_socket::unix_socket_dir = "/comp/socket";
55 #else
56 char* unix_socket::unix_socket_dir = "/tmp/";
57 #endif // VXWORKS
58 
59 class unix_socket_library {
60   public:
unix_socket_library()61     unix_socket_library() {
62         static struct sigaction sigpipe_ignore;
63         sigpipe_ignore.sa_handler = SIG_IGN;
64         sigaction(SIGPIPE, &sigpipe_ignore, NULL);
65     }
66 };
67 
68 static unix_socket_library unisock_lib;
69 
open(int listen_queue_size)70 bool unix_socket::open(int listen_queue_size)
71 {
72     char hostname[MAX_HOST_NAME];
73     unsigned short port;
74     char* p;
75 #ifdef VXWORKS
76     int proto = SOCK_STREAM;
77 #endif // VXWORKS
78 
79     assert(address != NULL);
80 
81     if ((p = strchr(address, ':')) == NULL
82         || unsigned(p - address) >= sizeof(hostname)
83         || sscanf(p+1, "%hu", &port) != 1)
84     {
85         TRACE_IMSG(("Invalid address: %s\n", address));
86         errcode = bad_address;
87         return false;
88     }
89     memcpy(hostname, address, p - address);
90     hostname[p - address] = '\0';
91 
92     create_file = false;
93     union {
94         sockaddr    sock;
95         sockaddr_in sock_inet;
96 #ifdef VXWORKS
97         struct sockaddr_un usock;
98 #endif
99         char        name[MAX_HOST_NAME];
100     } u;
101     int len;
102 
103     if (domain == sock_local_domain) {
104 #ifdef VXWORKS
105         memset(&u.usock, 0, sizeof(struct sockaddr_un));
106         u.usock.sun_family = AF_UNIX;
107         proto = SOCK_SEQPACKET;
108         u.usock.sun_len = len = sizeof (struct sockaddr_un);
109         sprintf(u.usock.sun_path, "%s/0x%x", unix_socket_dir, port);
110         TRACE_IMSG(("Sock %s %d\n", u.usock.sun_path,u.usock.sun_len));
111         unlink(u.usock.sun_path); // remove file if existed
112         create_file = true;
113 #else
114         u.sock.sa_family = AF_UNIX;
115 
116         assert(strlen(unix_socket_dir) + strlen(address)
117                < MAX_HOST_NAME - offsetof(sockaddr,sa_data));
118 
119         len = offsetof(sockaddr,sa_data) +
120             sprintf(u.sock.sa_data, "%s%s.%u", unix_socket_dir, hostname, port);
121 
122         unlink(u.sock.sa_data); // remove file if existed
123         create_file = true;
124 #endif // VXWORKS
125     } else {
126         u.sock_inet.sin_family = AF_INET;
127         if (*hostname && strcmp(hostname, "localhost") != 0) {
128             struct hostent* hp;
129 #if defined(HAVE_GETHOSTBYNAME_R) && !defined(NO_PTHREADS)
130             struct hostent ent;  // entry in hosts table
131             char buf[GETHOSTBYNAME_BUF_SIZE];
132             int h_err;
133 #if defined(__sun)
134             if ((hp = gethostbyname_r(hostname, &ent, buf, sizeof buf, &h_err)) == NULL
135 #else
136             if (gethostbyname_r(hostname, &ent, buf, sizeof buf, &hp, &h_err) != 0
137                 || hp == NULL
138 #endif
139                 || hp->h_addrtype != AF_INET)
140 #else
141             if ((hp = gethostbyname(hostname)) == NULL || hp->h_addrtype != AF_INET)
142 #endif
143             {
144                 TRACE_IMSG(("Failed to get host by name: %s\n", errno));
145                 errcode = bad_address;
146                 return false;
147             }
148             memcpy(&u.sock_inet.sin_addr, hp->h_addr,
149                    sizeof u.sock_inet.sin_addr);
150         } else {
151             u.sock_inet.sin_addr.s_addr = htonl(INADDR_ANY);
152         }
153         u.sock_inet.sin_port = htons(port);
154         len = sizeof(sockaddr_in);
155     }
156 
157 #ifdef VXWORKS
158     if ((fd = socket(u.sock.sa_family, proto, 0)) < 0) {
159         errcode = errno;
160         TRACE_IMSG(("Socket create is failed: %d", errcode));
161         return false;
162     }
163 #else
164     if ((fd = socket(u.sock.sa_family, SOCK_STREAM, 0)) < 0) {
165         errcode = errno;
166         TRACE_IMSG(("Socket create is failed: %d\n", errcode));
167         return false;
168     }
169 #endif // VXWORKS
170     int on = 1;
171     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof on);
172 #ifdef VXWORKS
173     if (bind(fd, reinterpret_cast<struct sockaddr*>(&u.usock), len) < 0) {
174         errcode = errno;
175         TRACE_IMSG(("Socket bind is failed: %d", errcode));
176         ::close(fd);
177         return false;
178     }
179 #else
180     if (bind(fd, &u.sock, len) < 0) {
181         errcode = errno;
182         TRACE_IMSG(("Socket bind is failed: %d\n", errcode));
183         ::close(fd);
184         return false;
185     }
186 #endif // VXWORKS
187     if (listen(fd, listen_queue_size) < 0) {
188         errcode = errno;
189         TRACE_IMSG(("Socket listen is failed: %d\n", errcode));
190         ::close(fd);
191         return false;
192     }
193     errcode = ok;
194     state = ss_open;
195     return true;
196 }
197 
get_peer_name()198 char* unix_socket::get_peer_name()
199 {
200     if (state != ss_open) {
201         errcode = not_opened;
202         return NULL;
203     }
204     struct sockaddr_in insock;
205     #if defined(__linux__) || (defined(__FreeBSD__) && __FreeBSD__ > 3) || defined(_AIX43) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(HPUX11) || defined(_SOCKLEN_T)
206     socklen_t len = sizeof(insock);
207 #elif defined(_AIX41)
208     size_t len = sizeof(insock);
209 #else
210     int len = sizeof(insock);
211 #endif
212     if (getpeername(fd, (struct sockaddr*)&insock, &len) != 0) {
213         errcode = errno;
214         return NULL;
215     }
216     char* addr = inet_ntoa(insock.sin_addr);
217     if (addr == NULL) {
218         errcode = errno;
219         return NULL;
220     }
221     char* addr_copy = new char[strlen(addr)+1];
222     strcpy(addr_copy, addr);
223     errcode = ok;
224     return addr_copy;
225 }
226 
is_ok()227 bool  unix_socket::is_ok()
228 {
229     return errcode == ok;
230 }
231 
get_error_text(char * buf,size_t buf_size)232 void unix_socket::get_error_text(char* buf, size_t buf_size)
233 {
234     char* msg;
235     switch(errcode) {
236       case ok:
237         msg = "ok";
238         break;
239       case not_opened:
240         msg = "socket not opened";
241         break;
242       case bad_address:
243         msg = "bad address";
244         break;
245       case connection_failed:
246         msg = "exceed limit of attempts of connection to server";
247         break;
248       case broken_pipe:
249         msg = "connection is broken";
250         break;
251       case invalid_access_mode:
252         msg = "invalid access mode";
253         break;
254       default:
255         msg = strerror(errcode);
256     }
257     strncpy(buf, msg, buf_size-1);
258     buf[buf_size-1] = '\0';
259 }
260 
accept()261 socket_t* unix_socket::accept()
262 {
263     int s;
264 
265     if (state != ss_open) {
266         errcode = not_opened;
267         TRACE_IMSG(("Socket not openned\n"));
268         return NULL;
269     }
270 
271     while((s = ::accept(fd, NULL, NULL )) < 0 && errno == EINTR);
272 
273     if (s < 0) {
274         errcode = errno;
275         TRACE_IMSG(("Socket accept failed: %d\n", errcode));
276         return NULL;
277     } else if (state != ss_open) {
278         errcode = not_opened;
279         TRACE_IMSG(("Socket not openned\n"));
280         return NULL;
281     } else {
282 #if SOCK_NO_DELAY
283         if (domain == sock_global_domain) {
284             int enabled = 1;
285             if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled,
286                            sizeof enabled) != 0)
287             {
288                 errcode = errno;
289                 TRACE_IMSG(("Failed to set socket options: %d\n", errcode));
290                 ::close(s);
291                 return NULL;
292             }
293         }
294 #endif
295 #if SOCK_LINGER
296         static struct linger l = {1, LINGER_TIME};
297         if (setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof l) != 0) {
298             TRACE_IMSG(("Failed to set socket options: %d\n", errno));
299             errcode = invalid_access_mode;
300             ::close(s);
301             return NULL;
302         }
303 #endif
304 #if SOCK_SNDBUF_SIZE
305         int size = SOCK_SNDBUF_SIZE;
306         setsockopt(s, SOL_SOCKET, SO_SNDBUF, (char*)&size, sizeof size);
307 #endif
308         errcode = ok;
309         return new unix_socket(s);
310     }
311 }
312 
cancel_accept()313 bool unix_socket::cancel_accept()
314 {
315     bool result = close();
316     // Wakeup listener
317     delete socket_t::connect(address, domain, 1, 0);
318     return result;
319 }
320 
321 
connect(int max_attempts,time_t timeout)322 bool unix_socket::connect(int max_attempts, time_t timeout)
323 {
324     int   rc;
325     char* p;
326     struct utsname local_host;
327     char hostname[MAX_HOST_NAME];
328     unsigned short port;
329 #ifdef VXWORKS
330     int proto = SOCK_STREAM;
331 #endif // VXWORKS
332 
333     assert(address != NULL);
334 
335     if ((p = strchr(address, ':')) == NULL
336         || unsigned(p - address) >= sizeof(hostname)
337         || sscanf(p+1, "%hu", &port) != 1)
338     {
339         errcode = bad_address;
340         TRACE_IMSG(("Invalid address: %s\n", address));
341         return false;
342     }
343     memcpy(hostname, address, p - address);
344     hostname[p - address] = '\0';
345 
346     create_file = false;
347     uname(&local_host);
348 
349     if (domain == sock_local_domain || (domain == sock_any_domain &&
350         (strcmp(hostname, local_host.nodename) == 0
351          || strcmp(hostname, "localhost") == 0)))
352     {
353         // connect UNIX socket
354         union {
355             sockaddr sock;
356             char     name[MAX_HOST_NAME];
357         } u;
358         u.sock.sa_family = AF_UNIX;
359 
360 #ifdef VXWORKS
361         if ( domain == sock_local_domain )
362             proto = SOCK_SEQPACKET;
363 #endif
364         assert(strlen(unix_socket_dir) + strlen(address)
365                < MAX_HOST_NAME - offsetof(sockaddr,sa_data));
366 
367 #ifdef VXWORKS
368         int len = offsetof(sockaddr,sa_data) +
369             sprintf(u.sock.sa_data, "%s/0x%x", unix_socket_dir, port);
370 #else
371         int len = offsetof(sockaddr,sa_data) +
372             sprintf(u.sock.sa_data, "%s%s.%u", unix_socket_dir, hostname, port);
373 #endif // VXWORKS
374 
375         while (true) {
376 #ifdef VXWORKS
377             if ((fd = socket(u.sock.sa_family, proto, 0)) < 0) {
378                 errcode = errno;
379                TRACE_IMSG(("Failed to create socket: %d", errcode));
380                return false;
381             }
382 #else
383             if ((fd = socket(u.sock.sa_family, SOCK_STREAM, 0)) < 0) {
384                 errcode = errno;
385                 TRACE_IMSG(("Failed to create socket: %d\n", errcode));
386                 return false;
387             }
388 #endif // VXWORKS
389             do {
390                 rc = ::connect(fd, &u.sock, len);
391             } while (rc < 0 && errno == EINTR);
392 
393             if (rc < 0) {
394                 errcode = errno;
395                 ::close(fd);
396                 if (errcode == ENOENT || errcode == ECONNREFUSED) {
397                     if (--max_attempts > 0) {
398                         sleep(timeout);
399                     } else {
400                         TRACE_IMSG(("All attempts to establish connection are failed\n"));
401                         break;
402                     }
403                 } else {
404                     TRACE_IMSG(("Failed to establish connection: %d\n", errcode));
405                     return false;
406                 }
407             } else {
408                 errcode = ok;
409                 state = ss_open;
410                 return true;
411             }
412         }
413     } else {
414         sockaddr_in sock_inet;
415         struct hostent* hp;
416 #if defined(HAVE_GETHOSTBYNAME_R) && !defined(NO_PTHREADS)
417         struct hostent ent;  // entry in hosts table
418         char buf[GETHOSTBYNAME_BUF_SIZE];
419         int h_err;
420 #if defined(__sun)
421         if ((hp = gethostbyname_r(hostname, &ent, buf, sizeof buf, &h_err)) == NULL
422 #else
423         if (gethostbyname_r(hostname, &ent, buf, sizeof buf, &hp, &h_err) != 0
424             || hp == NULL
425 #endif
426             || hp->h_addrtype != AF_INET)
427 #else
428         if ((hp = gethostbyname(hostname)) == NULL || hp->h_addrtype != AF_INET)
429 #endif
430         {
431             TRACE_IMSG(("Host name can not be resolved: %d\n", errno));
432             errcode = bad_address;
433             return false;
434         }
435         sock_inet.sin_family = AF_INET;
436         sock_inet.sin_port = htons(port);
437         //fprintf(stderr, "Try to connect to '%s' port %d\n", hostname, port);
438 
439         while (true) {
440             for (int i = 0; hp->h_addr_list[i] != NULL; i++) {
441                 memcpy(&sock_inet.sin_addr, hp->h_addr_list[i],
442                        sizeof sock_inet.sin_addr);
443                 if ((fd = socket(sock_inet.sin_family, SOCK_STREAM, 0)) < 0) {
444                     errcode = errno;
445                     TRACE_IMSG(("Failed to create socket: %d\n", errcode));
446                     return false;
447                 }
448                 do {
449                     rc = ::connect(fd,(sockaddr*)&sock_inet,sizeof(sock_inet));
450                 } while (rc < 0 && errno == EINTR);
451 
452                 if (rc < 0) {
453                     errcode = errno;
454                     ::close(fd);
455                     if (errcode != ENOENT && errcode != ECONNREFUSED) {
456                         TRACE_IMSG(("Failed to establish connection: %d\n", errcode));
457                         return false;
458                     }
459                 } else {
460 #if SOCK_NO_DELAY
461                     int enabled = 1;
462                     if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
463                                    (char*)&enabled, sizeof enabled) != 0)
464                     {
465                         errcode = errno;
466                         TRACE_IMSG(("Failed to set socket option TCP_NODELAY: %d\n", errcode));
467                         ::close(fd);
468                         return false;
469                     }
470 #endif
471 #if SOCK_LINGER
472                     static struct linger l = {1, LINGER_TIME};
473                     if (setsockopt(fd, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof l) != 0) {
474                         errcode = errno;
475                         TRACE_IMSG(("Failed to set socket option SOL_SOCKET: %d\n", errcode));
476                         ::close(fd);
477                         return NULL;
478                     }
479 #endif
480                     errcode = ok;
481                     state = ss_open;
482                     return true;
483                 }
484             }
485             if (--max_attempts > 0) {
486                 sleep(timeout);
487             } else {
488                 TRACE_IMSG(("All attempts to establish connection are failed\n"));
489                 break;
490             }
491         }
492     }
493     errcode = connection_failed;
494     return false;
495 }
496 
read(void * buf,size_t min_size,size_t max_size,time_t timeout)497 int unix_socket::read(void* buf, size_t min_size, size_t max_size,
498                       time_t timeout)
499 {
500     size_t size = 0;
501     time_t start = 0;
502     if (state != ss_open) {
503         errcode = not_opened;
504         TRACE_IMSG(("Socket is not openned\n"));
505         return -1;
506     }
507     if (timeout != WAIT_FOREVER) {
508         start = time(NULL);
509     }
510     do {
511         ssize_t rc;
512         if (timeout != WAIT_FOREVER) {
513             fd_set events;
514             struct timeval tm;
515             FD_ZERO(&events);
516             FD_SET(fd, &events);
517             tm.tv_sec = timeout;
518             tm.tv_usec = 0;
519             while ((rc = select(fd+1, &events, NULL, NULL, &tm)) < 0
520                    && errno == EINTR);
521             if (rc < 0) {
522                 errcode = errno;
523                 TRACE_IMSG(("Socket select is failed: %d\n", errcode));
524                 return -1;
525             }
526             if (rc == 0) {
527                 return size;
528             }
529             time_t now = time(NULL);
530             timeout = start + timeout >= now ? timeout + start - now : 0;
531         }
532         while ((rc = ::read(fd, (char*)buf + size, max_size - size)) < 0
533                && errno == EINTR);
534         if (rc < 0) {
535             errcode = errno;
536             TRACE_IMSG(("Socket read is failed: %d\n", errcode));
537             return -1;
538         } else if (rc == 0) {
539             errcode = broken_pipe;
540             TRACE_IMSG(("Socket is disconnected\n"));
541             return -1;
542         } else {
543             size += rc;
544         }
545     } while (size < min_size);
546 
547     return (int)size;
548 }
549 
550 
write(void const * buf,size_t size,time_t timeout)551 bool unix_socket::write(void const* buf, size_t size, time_t timeout)
552 {
553     time_t start = 0;
554     if (state != ss_open) {
555         errcode = not_opened;
556         TRACE_IMSG(("Socket is not openned\n"));
557         return -1;
558     }
559     if (timeout != WAIT_FOREVER) {
560         start = time(NULL);
561     }
562 
563     do {
564         ssize_t rc;
565         if (timeout != WAIT_FOREVER) {
566             fd_set events;
567             struct timeval tm;
568             FD_ZERO(&events);
569             FD_SET(fd, &events);
570             tm.tv_sec = timeout;
571             tm.tv_usec = 0;
572             while ((rc = select(fd+1, NULL, &events, NULL, &tm)) < 0
573                    && errno == EINTR);
574             if (rc <= 0) {
575                 errcode = errno;
576                 TRACE_IMSG(("Socket select is failed: %d\n", errcode));
577                 return false;
578             }
579             time_t now = time(NULL);
580             timeout = start + timeout >= now ? timeout + start - now : 0;
581         }
582         while ((rc = ::write(fd, (char*)buf, size)) < 0 && errno == EINTR);
583         if (rc < 0) {
584             errcode = errno;
585             TRACE_IMSG(("Socket write is failed: %d\n", errcode));
586             return false;
587         } else if (rc == 0) {
588             errcode = broken_pipe;
589             TRACE_IMSG(("Socket is disconnected\n"));
590             return false;
591         } else {
592             buf = (char*)buf + rc;
593             size -= rc;
594         }
595     } while (size != 0);
596 
597     //
598     // errcode is not assigned 'ok' value beacuse write function
599     // can be called in parallel with other socket operations, so
600     // we want to preserve old error code here.
601     //
602     return true;
603 }
604 
close()605 bool unix_socket::close()
606 {
607     if (state != ss_close) {
608         state = ss_close;
609         if (::close(fd) == 0) {
610             errcode = ok;
611             return true;
612         } else {
613             errcode = errno;
614             TRACE_IMSG(("Socket close is failed: %d\n", errcode));
615             return false;
616         }
617     }
618     errcode = ok;
619     return true;
620 }
621 
shutdown()622 bool unix_socket::shutdown()
623 {
624     if (state == ss_open) {
625         state = ss_shutdown;
626         int rc = ::shutdown(fd, 2);
627         if (rc != 0) {
628             errcode = errno;
629             TRACE_IMSG(("Socket shutdown is failed: %d\n", errcode));
630             return false;
631         }
632     }
633     return true;
634 }
635 
~unix_socket()636 unix_socket::~unix_socket()
637 {
638     close();
639     if (create_file) {
640         char name[MAX_HOST_NAME];
641         char* p = strrchr(address, ':');
642         sprintf(name, "%s%.*s.%s", unix_socket_dir, (int)(p - address), address, p+1);
643         unlink(name);
644     }
645     delete[] address;
646 }
647 
unix_socket(const char * addr,socket_domain domain)648 unix_socket::unix_socket(const char* addr, socket_domain domain)
649 {
650     address = new char[strlen(addr)+1];
651     strcpy(address, addr);
652     this->domain = domain;
653     create_file = false;
654     errcode = ok;
655 }
656 
unix_socket(int new_fd)657 unix_socket::unix_socket(int new_fd)
658 {
659     fd = new_fd;
660     address = NULL;
661     create_file = false;
662     state = ss_open;
663     errcode = ok;
664 }
665 
create_local(char const * address,int listen_queue_size)666 socket_t* socket_t::create_local(char const* address, int listen_queue_size)
667 {
668     unix_socket* sock = new unix_socket(address, sock_local_domain);
669     sock->open(listen_queue_size);
670     return sock;
671 }
672 
create_global(char const * address,int listen_queue_size)673 socket_t* socket_t::create_global(char const* address, int listen_queue_size)
674 {
675     unix_socket* sock = new unix_socket(address, sock_global_domain);
676     sock->open(listen_queue_size);
677     return sock;
678 }
679 
connect(char const * address,socket_domain domain,int max_attempts,time_t timeout)680 socket_t* socket_t::connect(char const* address,
681                             socket_domain domain,
682                             int max_attempts,
683                             time_t timeout)
684 {
685     unix_socket* sock = new unix_socket(address, domain);
686     sock->connect(max_attempts, timeout);
687     return sock;
688 }
689 
get_handle()690 socket_handle_t unix_socket::get_handle()
691 {
692     return fd;
693 }
694 
695 END_FASTDB_NAMESPACE
696