1 //-< UNISOCK.CPP >---------------------------------------------------*--------*
2 // FastDB Version 1.0 (c) 1999 GARRET * ? *
3 // (Main Memory Database Management System) * /\| *
4 // * / \ *
5 // Created: 8-Feb-97 K.A. Knizhnik * / [] \ *
6 // Last update: 18-May-97 K.A. Knizhnik * GARRET *
7 //-------------------------------------------------------------------*--------*
8 // Unix sockets
9 //-------------------------------------------------------------------*--------*
10
11 #include "unisock.h"
12 #undef BYTE_ORDER
13
14 #ifdef VXWORKS
15 #include "fastdbShim.h"
16 #else
17 #include <sys/ioctl.h>
18 #include <fcntl.h>
19 #include <sys/time.h>
20 #include <sys/errno.h>
21 #include <sys/types.h>
22 #include <sys/socket.h>
23 #include <sys/utsname.h>
24 #ifndef HPUX11
25 #include <sys/select.h>
26 #endif
27 #include <netinet/in.h>
28 #include <netinet/tcp.h>
29 #include <arpa/inet.h>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <unistd.h>
33 #include <string.h>
34 #if defined(_AIX)
35 #include <strings.h>
36 #endif /* _AIX */
37 #include <stddef.h>
38 #include <assert.h>
39 #include <errno.h>
40
41 extern "C" {
42 #include <netdb.h>
43 }
44
45 #include <signal.h>
46 #endif // VXWORKS
47
48 BEGIN_FASTDB_NAMESPACE
49
50 const int MAX_HOST_NAME = 256;
51 const int GETHOSTBYNAME_BUF_SIZE = 1024;
52
53 #ifdef VXWORKS
54 char* unix_socket::unix_socket_dir = "/comp/socket";
55 #else
56 char* unix_socket::unix_socket_dir = "/tmp/";
57 #endif // VXWORKS
58
59 class unix_socket_library {
60 public:
unix_socket_library()61 unix_socket_library() {
62 static struct sigaction sigpipe_ignore;
63 sigpipe_ignore.sa_handler = SIG_IGN;
64 sigaction(SIGPIPE, &sigpipe_ignore, NULL);
65 }
66 };
67
68 static unix_socket_library unisock_lib;
69
open(int listen_queue_size)70 bool unix_socket::open(int listen_queue_size)
71 {
72 char hostname[MAX_HOST_NAME];
73 unsigned short port;
74 char* p;
75 #ifdef VXWORKS
76 int proto = SOCK_STREAM;
77 #endif // VXWORKS
78
79 assert(address != NULL);
80
81 if ((p = strchr(address, ':')) == NULL
82 || unsigned(p - address) >= sizeof(hostname)
83 || sscanf(p+1, "%hu", &port) != 1)
84 {
85 TRACE_IMSG(("Invalid address: %s\n", address));
86 errcode = bad_address;
87 return false;
88 }
89 memcpy(hostname, address, p - address);
90 hostname[p - address] = '\0';
91
92 create_file = false;
93 union {
94 sockaddr sock;
95 sockaddr_in sock_inet;
96 #ifdef VXWORKS
97 struct sockaddr_un usock;
98 #endif
99 char name[MAX_HOST_NAME];
100 } u;
101 int len;
102
103 if (domain == sock_local_domain) {
104 #ifdef VXWORKS
105 memset(&u.usock, 0, sizeof(struct sockaddr_un));
106 u.usock.sun_family = AF_UNIX;
107 proto = SOCK_SEQPACKET;
108 u.usock.sun_len = len = sizeof (struct sockaddr_un);
109 sprintf(u.usock.sun_path, "%s/0x%x", unix_socket_dir, port);
110 TRACE_IMSG(("Sock %s %d\n", u.usock.sun_path,u.usock.sun_len));
111 unlink(u.usock.sun_path); // remove file if existed
112 create_file = true;
113 #else
114 u.sock.sa_family = AF_UNIX;
115
116 assert(strlen(unix_socket_dir) + strlen(address)
117 < MAX_HOST_NAME - offsetof(sockaddr,sa_data));
118
119 len = offsetof(sockaddr,sa_data) +
120 sprintf(u.sock.sa_data, "%s%s.%u", unix_socket_dir, hostname, port);
121
122 unlink(u.sock.sa_data); // remove file if existed
123 create_file = true;
124 #endif // VXWORKS
125 } else {
126 u.sock_inet.sin_family = AF_INET;
127 if (*hostname && strcmp(hostname, "localhost") != 0) {
128 struct hostent* hp;
129 #if defined(HAVE_GETHOSTBYNAME_R) && !defined(NO_PTHREADS)
130 struct hostent ent; // entry in hosts table
131 char buf[GETHOSTBYNAME_BUF_SIZE];
132 int h_err;
133 #if defined(__sun)
134 if ((hp = gethostbyname_r(hostname, &ent, buf, sizeof buf, &h_err)) == NULL
135 #else
136 if (gethostbyname_r(hostname, &ent, buf, sizeof buf, &hp, &h_err) != 0
137 || hp == NULL
138 #endif
139 || hp->h_addrtype != AF_INET)
140 #else
141 if ((hp = gethostbyname(hostname)) == NULL || hp->h_addrtype != AF_INET)
142 #endif
143 {
144 TRACE_IMSG(("Failed to get host by name: %s\n", errno));
145 errcode = bad_address;
146 return false;
147 }
148 memcpy(&u.sock_inet.sin_addr, hp->h_addr,
149 sizeof u.sock_inet.sin_addr);
150 } else {
151 u.sock_inet.sin_addr.s_addr = htonl(INADDR_ANY);
152 }
153 u.sock_inet.sin_port = htons(port);
154 len = sizeof(sockaddr_in);
155 }
156
157 #ifdef VXWORKS
158 if ((fd = socket(u.sock.sa_family, proto, 0)) < 0) {
159 errcode = errno;
160 TRACE_IMSG(("Socket create is failed: %d", errcode));
161 return false;
162 }
163 #else
164 if ((fd = socket(u.sock.sa_family, SOCK_STREAM, 0)) < 0) {
165 errcode = errno;
166 TRACE_IMSG(("Socket create is failed: %d\n", errcode));
167 return false;
168 }
169 #endif // VXWORKS
170 int on = 1;
171 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof on);
172 #ifdef VXWORKS
173 if (bind(fd, reinterpret_cast<struct sockaddr*>(&u.usock), len) < 0) {
174 errcode = errno;
175 TRACE_IMSG(("Socket bind is failed: %d", errcode));
176 ::close(fd);
177 return false;
178 }
179 #else
180 if (bind(fd, &u.sock, len) < 0) {
181 errcode = errno;
182 TRACE_IMSG(("Socket bind is failed: %d\n", errcode));
183 ::close(fd);
184 return false;
185 }
186 #endif // VXWORKS
187 if (listen(fd, listen_queue_size) < 0) {
188 errcode = errno;
189 TRACE_IMSG(("Socket listen is failed: %d\n", errcode));
190 ::close(fd);
191 return false;
192 }
193 errcode = ok;
194 state = ss_open;
195 return true;
196 }
197
get_peer_name()198 char* unix_socket::get_peer_name()
199 {
200 if (state != ss_open) {
201 errcode = not_opened;
202 return NULL;
203 }
204 struct sockaddr_in insock;
205 #if defined(__linux__) || (defined(__FreeBSD__) && __FreeBSD__ > 3) || defined(_AIX43) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(HPUX11) || defined(_SOCKLEN_T)
206 socklen_t len = sizeof(insock);
207 #elif defined(_AIX41)
208 size_t len = sizeof(insock);
209 #else
210 int len = sizeof(insock);
211 #endif
212 if (getpeername(fd, (struct sockaddr*)&insock, &len) != 0) {
213 errcode = errno;
214 return NULL;
215 }
216 char* addr = inet_ntoa(insock.sin_addr);
217 if (addr == NULL) {
218 errcode = errno;
219 return NULL;
220 }
221 char* addr_copy = new char[strlen(addr)+1];
222 strcpy(addr_copy, addr);
223 errcode = ok;
224 return addr_copy;
225 }
226
is_ok()227 bool unix_socket::is_ok()
228 {
229 return errcode == ok;
230 }
231
get_error_text(char * buf,size_t buf_size)232 void unix_socket::get_error_text(char* buf, size_t buf_size)
233 {
234 char* msg;
235 switch(errcode) {
236 case ok:
237 msg = "ok";
238 break;
239 case not_opened:
240 msg = "socket not opened";
241 break;
242 case bad_address:
243 msg = "bad address";
244 break;
245 case connection_failed:
246 msg = "exceed limit of attempts of connection to server";
247 break;
248 case broken_pipe:
249 msg = "connection is broken";
250 break;
251 case invalid_access_mode:
252 msg = "invalid access mode";
253 break;
254 default:
255 msg = strerror(errcode);
256 }
257 strncpy(buf, msg, buf_size-1);
258 buf[buf_size-1] = '\0';
259 }
260
accept()261 socket_t* unix_socket::accept()
262 {
263 int s;
264
265 if (state != ss_open) {
266 errcode = not_opened;
267 TRACE_IMSG(("Socket not openned\n"));
268 return NULL;
269 }
270
271 while((s = ::accept(fd, NULL, NULL )) < 0 && errno == EINTR);
272
273 if (s < 0) {
274 errcode = errno;
275 TRACE_IMSG(("Socket accept failed: %d\n", errcode));
276 return NULL;
277 } else if (state != ss_open) {
278 errcode = not_opened;
279 TRACE_IMSG(("Socket not openned\n"));
280 return NULL;
281 } else {
282 #if SOCK_NO_DELAY
283 if (domain == sock_global_domain) {
284 int enabled = 1;
285 if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled,
286 sizeof enabled) != 0)
287 {
288 errcode = errno;
289 TRACE_IMSG(("Failed to set socket options: %d\n", errcode));
290 ::close(s);
291 return NULL;
292 }
293 }
294 #endif
295 #if SOCK_LINGER
296 static struct linger l = {1, LINGER_TIME};
297 if (setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof l) != 0) {
298 TRACE_IMSG(("Failed to set socket options: %d\n", errno));
299 errcode = invalid_access_mode;
300 ::close(s);
301 return NULL;
302 }
303 #endif
304 #if SOCK_SNDBUF_SIZE
305 int size = SOCK_SNDBUF_SIZE;
306 setsockopt(s, SOL_SOCKET, SO_SNDBUF, (char*)&size, sizeof size);
307 #endif
308 errcode = ok;
309 return new unix_socket(s);
310 }
311 }
312
cancel_accept()313 bool unix_socket::cancel_accept()
314 {
315 bool result = close();
316 // Wakeup listener
317 delete socket_t::connect(address, domain, 1, 0);
318 return result;
319 }
320
321
connect(int max_attempts,time_t timeout)322 bool unix_socket::connect(int max_attempts, time_t timeout)
323 {
324 int rc;
325 char* p;
326 struct utsname local_host;
327 char hostname[MAX_HOST_NAME];
328 unsigned short port;
329 #ifdef VXWORKS
330 int proto = SOCK_STREAM;
331 #endif // VXWORKS
332
333 assert(address != NULL);
334
335 if ((p = strchr(address, ':')) == NULL
336 || unsigned(p - address) >= sizeof(hostname)
337 || sscanf(p+1, "%hu", &port) != 1)
338 {
339 errcode = bad_address;
340 TRACE_IMSG(("Invalid address: %s\n", address));
341 return false;
342 }
343 memcpy(hostname, address, p - address);
344 hostname[p - address] = '\0';
345
346 create_file = false;
347 uname(&local_host);
348
349 if (domain == sock_local_domain || (domain == sock_any_domain &&
350 (strcmp(hostname, local_host.nodename) == 0
351 || strcmp(hostname, "localhost") == 0)))
352 {
353 // connect UNIX socket
354 union {
355 sockaddr sock;
356 char name[MAX_HOST_NAME];
357 } u;
358 u.sock.sa_family = AF_UNIX;
359
360 #ifdef VXWORKS
361 if ( domain == sock_local_domain )
362 proto = SOCK_SEQPACKET;
363 #endif
364 assert(strlen(unix_socket_dir) + strlen(address)
365 < MAX_HOST_NAME - offsetof(sockaddr,sa_data));
366
367 #ifdef VXWORKS
368 int len = offsetof(sockaddr,sa_data) +
369 sprintf(u.sock.sa_data, "%s/0x%x", unix_socket_dir, port);
370 #else
371 int len = offsetof(sockaddr,sa_data) +
372 sprintf(u.sock.sa_data, "%s%s.%u", unix_socket_dir, hostname, port);
373 #endif // VXWORKS
374
375 while (true) {
376 #ifdef VXWORKS
377 if ((fd = socket(u.sock.sa_family, proto, 0)) < 0) {
378 errcode = errno;
379 TRACE_IMSG(("Failed to create socket: %d", errcode));
380 return false;
381 }
382 #else
383 if ((fd = socket(u.sock.sa_family, SOCK_STREAM, 0)) < 0) {
384 errcode = errno;
385 TRACE_IMSG(("Failed to create socket: %d\n", errcode));
386 return false;
387 }
388 #endif // VXWORKS
389 do {
390 rc = ::connect(fd, &u.sock, len);
391 } while (rc < 0 && errno == EINTR);
392
393 if (rc < 0) {
394 errcode = errno;
395 ::close(fd);
396 if (errcode == ENOENT || errcode == ECONNREFUSED) {
397 if (--max_attempts > 0) {
398 sleep(timeout);
399 } else {
400 TRACE_IMSG(("All attempts to establish connection are failed\n"));
401 break;
402 }
403 } else {
404 TRACE_IMSG(("Failed to establish connection: %d\n", errcode));
405 return false;
406 }
407 } else {
408 errcode = ok;
409 state = ss_open;
410 return true;
411 }
412 }
413 } else {
414 sockaddr_in sock_inet;
415 struct hostent* hp;
416 #if defined(HAVE_GETHOSTBYNAME_R) && !defined(NO_PTHREADS)
417 struct hostent ent; // entry in hosts table
418 char buf[GETHOSTBYNAME_BUF_SIZE];
419 int h_err;
420 #if defined(__sun)
421 if ((hp = gethostbyname_r(hostname, &ent, buf, sizeof buf, &h_err)) == NULL
422 #else
423 if (gethostbyname_r(hostname, &ent, buf, sizeof buf, &hp, &h_err) != 0
424 || hp == NULL
425 #endif
426 || hp->h_addrtype != AF_INET)
427 #else
428 if ((hp = gethostbyname(hostname)) == NULL || hp->h_addrtype != AF_INET)
429 #endif
430 {
431 TRACE_IMSG(("Host name can not be resolved: %d\n", errno));
432 errcode = bad_address;
433 return false;
434 }
435 sock_inet.sin_family = AF_INET;
436 sock_inet.sin_port = htons(port);
437 //fprintf(stderr, "Try to connect to '%s' port %d\n", hostname, port);
438
439 while (true) {
440 for (int i = 0; hp->h_addr_list[i] != NULL; i++) {
441 memcpy(&sock_inet.sin_addr, hp->h_addr_list[i],
442 sizeof sock_inet.sin_addr);
443 if ((fd = socket(sock_inet.sin_family, SOCK_STREAM, 0)) < 0) {
444 errcode = errno;
445 TRACE_IMSG(("Failed to create socket: %d\n", errcode));
446 return false;
447 }
448 do {
449 rc = ::connect(fd,(sockaddr*)&sock_inet,sizeof(sock_inet));
450 } while (rc < 0 && errno == EINTR);
451
452 if (rc < 0) {
453 errcode = errno;
454 ::close(fd);
455 if (errcode != ENOENT && errcode != ECONNREFUSED) {
456 TRACE_IMSG(("Failed to establish connection: %d\n", errcode));
457 return false;
458 }
459 } else {
460 #if SOCK_NO_DELAY
461 int enabled = 1;
462 if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
463 (char*)&enabled, sizeof enabled) != 0)
464 {
465 errcode = errno;
466 TRACE_IMSG(("Failed to set socket option TCP_NODELAY: %d\n", errcode));
467 ::close(fd);
468 return false;
469 }
470 #endif
471 #if SOCK_LINGER
472 static struct linger l = {1, LINGER_TIME};
473 if (setsockopt(fd, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof l) != 0) {
474 errcode = errno;
475 TRACE_IMSG(("Failed to set socket option SOL_SOCKET: %d\n", errcode));
476 ::close(fd);
477 return NULL;
478 }
479 #endif
480 errcode = ok;
481 state = ss_open;
482 return true;
483 }
484 }
485 if (--max_attempts > 0) {
486 sleep(timeout);
487 } else {
488 TRACE_IMSG(("All attempts to establish connection are failed\n"));
489 break;
490 }
491 }
492 }
493 errcode = connection_failed;
494 return false;
495 }
496
read(void * buf,size_t min_size,size_t max_size,time_t timeout)497 int unix_socket::read(void* buf, size_t min_size, size_t max_size,
498 time_t timeout)
499 {
500 size_t size = 0;
501 time_t start = 0;
502 if (state != ss_open) {
503 errcode = not_opened;
504 TRACE_IMSG(("Socket is not openned\n"));
505 return -1;
506 }
507 if (timeout != WAIT_FOREVER) {
508 start = time(NULL);
509 }
510 do {
511 ssize_t rc;
512 if (timeout != WAIT_FOREVER) {
513 fd_set events;
514 struct timeval tm;
515 FD_ZERO(&events);
516 FD_SET(fd, &events);
517 tm.tv_sec = timeout;
518 tm.tv_usec = 0;
519 while ((rc = select(fd+1, &events, NULL, NULL, &tm)) < 0
520 && errno == EINTR);
521 if (rc < 0) {
522 errcode = errno;
523 TRACE_IMSG(("Socket select is failed: %d\n", errcode));
524 return -1;
525 }
526 if (rc == 0) {
527 return size;
528 }
529 time_t now = time(NULL);
530 timeout = start + timeout >= now ? timeout + start - now : 0;
531 }
532 while ((rc = ::read(fd, (char*)buf + size, max_size - size)) < 0
533 && errno == EINTR);
534 if (rc < 0) {
535 errcode = errno;
536 TRACE_IMSG(("Socket read is failed: %d\n", errcode));
537 return -1;
538 } else if (rc == 0) {
539 errcode = broken_pipe;
540 TRACE_IMSG(("Socket is disconnected\n"));
541 return -1;
542 } else {
543 size += rc;
544 }
545 } while (size < min_size);
546
547 return (int)size;
548 }
549
550
write(void const * buf,size_t size,time_t timeout)551 bool unix_socket::write(void const* buf, size_t size, time_t timeout)
552 {
553 time_t start = 0;
554 if (state != ss_open) {
555 errcode = not_opened;
556 TRACE_IMSG(("Socket is not openned\n"));
557 return -1;
558 }
559 if (timeout != WAIT_FOREVER) {
560 start = time(NULL);
561 }
562
563 do {
564 ssize_t rc;
565 if (timeout != WAIT_FOREVER) {
566 fd_set events;
567 struct timeval tm;
568 FD_ZERO(&events);
569 FD_SET(fd, &events);
570 tm.tv_sec = timeout;
571 tm.tv_usec = 0;
572 while ((rc = select(fd+1, NULL, &events, NULL, &tm)) < 0
573 && errno == EINTR);
574 if (rc <= 0) {
575 errcode = errno;
576 TRACE_IMSG(("Socket select is failed: %d\n", errcode));
577 return false;
578 }
579 time_t now = time(NULL);
580 timeout = start + timeout >= now ? timeout + start - now : 0;
581 }
582 while ((rc = ::write(fd, (char*)buf, size)) < 0 && errno == EINTR);
583 if (rc < 0) {
584 errcode = errno;
585 TRACE_IMSG(("Socket write is failed: %d\n", errcode));
586 return false;
587 } else if (rc == 0) {
588 errcode = broken_pipe;
589 TRACE_IMSG(("Socket is disconnected\n"));
590 return false;
591 } else {
592 buf = (char*)buf + rc;
593 size -= rc;
594 }
595 } while (size != 0);
596
597 //
598 // errcode is not assigned 'ok' value beacuse write function
599 // can be called in parallel with other socket operations, so
600 // we want to preserve old error code here.
601 //
602 return true;
603 }
604
close()605 bool unix_socket::close()
606 {
607 if (state != ss_close) {
608 state = ss_close;
609 if (::close(fd) == 0) {
610 errcode = ok;
611 return true;
612 } else {
613 errcode = errno;
614 TRACE_IMSG(("Socket close is failed: %d\n", errcode));
615 return false;
616 }
617 }
618 errcode = ok;
619 return true;
620 }
621
shutdown()622 bool unix_socket::shutdown()
623 {
624 if (state == ss_open) {
625 state = ss_shutdown;
626 int rc = ::shutdown(fd, 2);
627 if (rc != 0) {
628 errcode = errno;
629 TRACE_IMSG(("Socket shutdown is failed: %d\n", errcode));
630 return false;
631 }
632 }
633 return true;
634 }
635
~unix_socket()636 unix_socket::~unix_socket()
637 {
638 close();
639 if (create_file) {
640 char name[MAX_HOST_NAME];
641 char* p = strrchr(address, ':');
642 sprintf(name, "%s%.*s.%s", unix_socket_dir, (int)(p - address), address, p+1);
643 unlink(name);
644 }
645 delete[] address;
646 }
647
unix_socket(const char * addr,socket_domain domain)648 unix_socket::unix_socket(const char* addr, socket_domain domain)
649 {
650 address = new char[strlen(addr)+1];
651 strcpy(address, addr);
652 this->domain = domain;
653 create_file = false;
654 errcode = ok;
655 }
656
unix_socket(int new_fd)657 unix_socket::unix_socket(int new_fd)
658 {
659 fd = new_fd;
660 address = NULL;
661 create_file = false;
662 state = ss_open;
663 errcode = ok;
664 }
665
create_local(char const * address,int listen_queue_size)666 socket_t* socket_t::create_local(char const* address, int listen_queue_size)
667 {
668 unix_socket* sock = new unix_socket(address, sock_local_domain);
669 sock->open(listen_queue_size);
670 return sock;
671 }
672
create_global(char const * address,int listen_queue_size)673 socket_t* socket_t::create_global(char const* address, int listen_queue_size)
674 {
675 unix_socket* sock = new unix_socket(address, sock_global_domain);
676 sock->open(listen_queue_size);
677 return sock;
678 }
679
connect(char const * address,socket_domain domain,int max_attempts,time_t timeout)680 socket_t* socket_t::connect(char const* address,
681 socket_domain domain,
682 int max_attempts,
683 time_t timeout)
684 {
685 unix_socket* sock = new unix_socket(address, domain);
686 sock->connect(max_attempts, timeout);
687 return sock;
688 }
689
get_handle()690 socket_handle_t unix_socket::get_handle()
691 {
692 return fd;
693 }
694
695 END_FASTDB_NAMESPACE
696