1 //-< W32SOCK.CXX >---------------------------------------------------*--------*
2 // FastDB Version 1.0 (c) 1997 GARRET * ? *
3 // (Main Memory Database Management System) * /\| *
4 // * / \ *
5 // Created: 8-May-97 K.A. Knizhnik * / [] \ *
6 // Last update: 19-May-97 K.A. Knizhnik * GARRET *
7 //-------------------------------------------------------------------*--------*
8 // Windows sockets
9 //-------------------------------------------------------------------*--------*
10
11 #define INSIDE_FASTDB
12
13 #include "stdtp.h"
14 #include "w32sock.h"
15 #include "sync.h"
16
17 BEGIN_FASTDB_NAMESPACE
18
19 #define MAX_HOST_NAME 256
20 #define MILLISECOND 1000
21
22 static HANDLE WatchDogMutex;
23
24 #if (defined(_MSC_VER) && _MSC_VER <= 1310) || defined(__GNUC__) || defined(__BORLANDC__)
25 #define MEMORY_BARRIER_NOT_DEFINED true
26 #endif
27
28 #ifdef MEMORY_BARRIER_NOT_DEFINED
29
30 #ifdef __GNUC__
31
32 #if (__GNUC__ > 4) || (__GNUC__ == 4 && __GNUC_MINOR__ >= 1)
33 #define MemoryBarrier() __sync_synchronize()
34 #else
MemoryBarrier()35 inline void MemoryBarrier() {
36 asm volatile("mfence":::"memory");
37 }
38 #endif
39
40 #else
41
42 #if defined(_M_AMD64)
43 #define MemoryBarrier __faststorefence
44 #elif defined(_M_IA64)
45 #define MemoryBarrier __mf
46 #elif (defined(_M_IX86) || defined(_M_X64)) && !defined(__BORLANDC__)
MemoryBarrier()47 inline void MemoryBarrier() {
48 LONG Barrier;
49 __asm {
50 xchg Barrier, eax
51 }
52 }
53 #else
54 static dbMutex barrierMutex;
MemoryBarrier()55 inline void MemoryBarrier() { dbCriticalSection cs(barrierMutex); }
56 #endif
57 #endif
58 #endif
59
60 class win_socket_library {
61 public:
62 SYSTEM_INFO sinfo;
63
win_socket_library()64 win_socket_library() {
65 WSADATA wsa;
66 if (WSAStartup(MAKEWORD(1, 1), &wsa) != 0) {
67 fprintf(stderr,"Failed to initialize windows sockets: %d\n",
68 WSAGetLastError());
69 }
70 //
71 // This mutex is used to recognize process termination
72 //
73 WatchDogMutex = CreateMutex(FASTDB_SECURITY_ATTRIBUTES, TRUE, NULL);
74
75 #ifdef PHAR_LAP
76 sinfo.wProcessorLevel = 5;
77 #else
78 GetSystemInfo(&sinfo);
79 #endif
80 }
~win_socket_library()81 ~win_socket_library() {
82 // WSACleanup();
83 }
84 };
85
86 static win_socket_library ws32_lib;
87
open(int listen_queue_size)88 bool win_socket::open(int listen_queue_size)
89 {
90 unsigned short port;
91 char* p;
92 char hostname[MAX_HOST_NAME];
93
94 assert(address != NULL);
95
96 if ((p = strchr(address, ':')) == NULL
97 || sscanf(p+1, "%hu", &port) != 1)
98 {
99 errcode = bad_address;
100 TRACE_MSG(("Invalid address: %s\n", address));
101 return false;
102 }
103 memcpy(hostname, address, p - address);
104 hostname[p - address] = '\0';
105
106 if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
107 errcode = WSAGetLastError();
108 TRACE_MSG(("Socket create is failed: %d\n", errcode));
109 return false;
110 }
111 struct sockaddr_in insock;
112 insock.sin_family = AF_INET;
113 if (*hostname && stricmp(hostname, "localhost") != 0) {
114 struct hostent* hp; // entry in hosts table
115 if ((hp = gethostbyname(hostname)) == NULL
116 || hp->h_addrtype != AF_INET)
117 {
118 TRACE_MSG(("Failed to get host by name: %s\n", errno));
119 errcode = bad_address;
120 return false;
121 }
122 memcpy(&insock.sin_addr, hp->h_addr, sizeof insock.sin_addr);
123 } else {
124 insock.sin_addr.s_addr = htonl(INADDR_ANY);
125 }
126 insock.sin_port = htons(port);
127
128 int on = 1;
129 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof on);
130
131 if (bind(s, (sockaddr*)&insock, sizeof(insock)) != 0) {
132 errcode = WSAGetLastError();
133 TRACE_MSG(("Socket bind is failed: %d\n", errcode));
134 closesocket(s);
135 return false;
136 }
137 if (listen(s, listen_queue_size) != 0) {
138 errcode = WSAGetLastError();
139 TRACE_MSG(("Socket listen is failed: %d\n", errcode));
140 closesocket(s);
141 return false;
142 }
143 errcode = ok;
144 state = ss_open;
145 return true;
146 }
147
is_ok()148 bool win_socket::is_ok()
149 {
150 return errcode == ok;
151 }
152
get_handle()153 socket_handle_t win_socket::get_handle()
154 {
155 return s;
156 }
157
get_error_text(char * buf,size_t buf_size)158 void win_socket::get_error_text(char* buf, size_t buf_size)
159 {
160 char* msg;
161 char msgbuf[64];
162
163 switch(errcode) {
164 case ok:
165 msg = "ok";
166 break;
167 case not_opened:
168 msg = "socket not opened";
169 break;
170 case bad_address:
171 msg = "bad address";
172 break;
173 case connection_failed:
174 msg = "exceed limit of attempts of connection to server";
175 break;
176 case broken_pipe:
177 msg = "connection is broken";
178 break;
179 case invalid_access_mode:
180 msg = "invalid access mode";
181 break;
182 default:
183 #ifndef PHAR_LAP
184 {
185 int len;
186 #if defined(_WINCE) || defined(UNICODE)
187 wchar_t cnvBuf[CNV_BUF_SIZE];
188 FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
189 NULL,
190 errcode,
191 0,
192 cnvBuf,
193 CNV_BUF_SIZE-1,
194 NULL);
195 cnvBuf[CNV_BUF_SIZE-1] = '\0';
196 len = wcstombs(buf, cnvBuf, buf_size);
197 #else
198 len = FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
199 NULL,
200 errcode,
201 0,
202 buf,
203 (DWORD)buf_size-1,
204 NULL);
205 #endif
206 if (len == 0) {
207 sprintf(msgbuf, "unknown error code %u", errcode);
208 msg = msgbuf;
209 } else {
210 return;
211 }
212 }
213 #else
214 sprintf(msgbuf, "System error code: %u", errcode);
215 msg = msgbuf;
216 #endif
217 }
218 strncpy(buf, msg, buf_size-1);
219 buf[buf_size-1] = '\0';
220 }
221
accept()222 socket_t* win_socket::accept()
223 {
224 if (state != ss_open) {
225 errcode = not_opened;
226 TRACE_MSG(("Socket not openned\n"));
227 return NULL;
228 }
229
230 SOCKET new_sock = ::accept(s, NULL, NULL );
231
232 if (new_sock == INVALID_SOCKET) {
233 errcode = WSAGetLastError();
234 TRACE_MSG(("Socket accept failed: %d\n", errcode));
235 return NULL;
236 } else {
237 #if SOCK_LINGER
238 static struct linger l = {1, LINGER_TIME};
239 if (setsockopt(new_sock, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof l) != 0) {
240 errcode = WSAGetLastError();
241 TRACE_MSG(("Failed to set socket options: %d\n", errcode));
242 closesocket(new_sock);
243 return NULL;
244 }
245 #endif
246 #if SOCK_NO_DELAY
247 int enabled = 1;
248 if (setsockopt(new_sock, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled,
249 sizeof enabled) != 0)
250 {
251 errcode = WSAGetLastError();
252 TRACE_MSG(("Failed to set socket options: %d\n", errcode));
253 closesocket(new_sock);
254 return NULL;
255 }
256 #endif
257 #if SOCK_SNDBUF_SIZE
258 int size = SOCK_SNDBUF_SIZE;
259 setsockopt(s, SOL_SOCKET, SO_SNDBUF, (char*)&size, sizeof size);
260 #endif
261 errcode = ok;
262 return new win_socket(new_sock);
263 }
264 }
265
cancel_accept()266 bool win_socket::cancel_accept()
267 {
268 bool result = close();
269 // Wakeup listener
270 delete socket_t::connect(address, sock_global_domain, 1, 0);
271 return result;
272 }
273
274
connect(int max_attempts,time_t timeout)275 bool win_socket::connect(int max_attempts, time_t timeout)
276 {
277 char hostname[MAX_HOST_NAME];
278 char *p;
279 unsigned short port;
280
281 assert(address != NULL);
282
283 if ((p = strchr(address, ':')) == NULL
284 || (size_t)(p - address) >= sizeof(hostname)
285 || sscanf(p+1, "%hu", &port) != 1)
286 {
287 errcode = bad_address;
288 TRACE_MSG(("Invalid address: %s\n", address));
289 return false;
290 }
291 memcpy(hostname, address, p - address);
292 hostname[p - address] = '\0';
293
294 struct sockaddr_in insock; // inet socket address
295 struct hostent* hp; // entry in hosts table
296
297 if ((hp = gethostbyname(hostname)) == NULL || hp->h_addrtype != AF_INET) {
298 TRACE_MSG(("Host name can not be resolved: %d\n", WSAGetLastError()));
299 errcode = bad_address;
300 return false;
301 }
302 insock.sin_family = AF_INET;
303 insock.sin_port = htons(port);
304
305 while (true) {
306 for (int i = 0; hp->h_addr_list[i] != NULL; i++) {
307 memcpy(&insock.sin_addr, hp->h_addr_list[i],
308 sizeof insock.sin_addr);
309 if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
310 errcode = WSAGetLastError();
311 TRACE_MSG(("Failed to create socket: %d\n", errcode));
312 return false;
313 }
314 if (::connect(s, (sockaddr*)&insock, sizeof insock) != 0) {
315 errcode = WSAGetLastError();
316 closesocket(s);
317 if (errcode != WSAECONNREFUSED) {
318 TRACE_MSG(("Failed to establish connection: %d\n", errcode));
319 return false;
320 }
321 } else {
322 #if SOCK_NO_DELAY
323 int enabled = 1;
324 if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled,
325 sizeof enabled) != 0)
326 {
327 errcode = WSAGetLastError();
328 TRACE_MSG(("Failed to set socket options: %d\n", errcode));
329 closesocket(s);
330 return false;
331 }
332 #endif
333 #if SOCK_LINGER
334 static struct linger l = {1, LINGER_TIME};
335 if (setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof l) != 0) {
336 errcode = WSAGetLastError();
337 TRACE_MSG(("Failed to set socket options: %d\n", errcode));
338 closesocket(s);
339 return NULL;
340 }
341 #endif
342 errcode = ok;
343 state = ss_open;
344 return true;
345 }
346 }
347 if (--max_attempts > 0) {
348 Sleep((DWORD)(timeout*MILLISECOND));
349 } else {
350 errcode = connection_failed;
351 TRACE_MSG(("All attempts to establish connection are failed\n"));
352 return false;
353 }
354 }
355 }
356
read(void * buf,size_t min_size,size_t max_size,time_t timeout)357 int win_socket::read(void* buf, size_t min_size, size_t max_size,
358 time_t timeout)
359 {
360 size_t size = 0;
361 time_t start = 0;
362 if (state != ss_open) {
363 errcode = not_opened;
364 TRACE_MSG(("Socket is not openned\n"));
365 return -1;
366 }
367 if (timeout != WAIT_FOREVER) {
368 start = time(NULL);
369 }
370
371 do {
372 int rc;
373 if (timeout != WAIT_FOREVER) {
374 fd_set events;
375 struct timeval tm;
376 FD_ZERO(&events);
377 FD_SET(s, &events);
378 tm.tv_sec = (long)timeout;
379 tm.tv_usec = 0;
380 rc = select((int)s+1, &events, NULL, NULL, &tm);
381 if (rc < 0) {
382 errcode = WSAGetLastError();
383 TRACE_MSG(("Socket select is failed: %d\n", errcode));
384 return -1;
385 }
386 if (rc == 0) {
387 return (int)size;
388 }
389 time_t now = time(NULL);
390 timeout = start + timeout >= now ? timeout + start - now : 0;
391 }
392 rc = recv(s, (char*)buf + size, (int)(max_size - size), 0);
393 if (rc < 0) {
394 errcode = WSAGetLastError();
395 TRACE_MSG(("Socket read is failed: %d\n", errcode));
396 return -1;
397 } else if (rc == 0) {
398 errcode = broken_pipe;
399 TRACE_MSG(("Socket is disconnected\n"));
400 return -1;
401 } else {
402 size += rc;
403 }
404 } while (size < min_size);
405
406 return (int)size;
407 }
408
write(void const * buf,size_t size,time_t timeout)409 bool win_socket::write(void const* buf, size_t size, time_t timeout)
410 {
411 time_t start = 0;
412 if (state != ss_open) {
413 errcode = not_opened;
414 TRACE_MSG(("Socket is not openned\n"));
415 return false;
416 }
417 if (timeout != WAIT_FOREVER) {
418 start = time(NULL);
419 }
420
421 do {
422 int rc;
423 if (timeout != WAIT_FOREVER) {
424 fd_set events;
425 struct timeval tm;
426 FD_ZERO(&events);
427 FD_SET(s, &events);
428 tm.tv_sec = (long)timeout;
429 tm.tv_usec = 0;
430 rc = select((int)s+1, NULL, &events, NULL, &tm);
431 if (rc <= 0) {
432 errcode = WSAGetLastError();
433 TRACE_MSG(("Socket select is failed: %d\n", errcode));
434 return false;
435 }
436 time_t now = time(NULL);
437 timeout = start + timeout >= now ? timeout + start - now : 0;
438 }
439 rc = send(s, (char*)buf, (int)size, 0);
440 if (rc < 0) {
441 errcode = WSAGetLastError();
442 TRACE_MSG(("Socket write is failed: %d\n", errcode));
443 return false;
444 } else if (rc == 0) {
445 errcode = broken_pipe;
446 TRACE_MSG(("Socket is disconnected\n"));
447 return false;
448 } else {
449 buf = (char*)buf + rc;
450 size -= rc;
451 }
452 } while (size != 0);
453
454 return true;
455 }
456
shutdown()457 bool win_socket::shutdown()
458 {
459 if (state == ss_open) {
460 state = ss_shutdown;
461 int rc = ::shutdown(s, 2);
462 if (rc != 0) {
463 errcode = WSAGetLastError();
464 TRACE_MSG(("Socket shutdown is failed: %d\n", errcode));
465 return false;
466 }
467 }
468 errcode = ok;
469 return true;
470 }
471
472
close()473 bool win_socket::close()
474 {
475 if (state != ss_close) {
476 state = ss_close;
477 if (closesocket(s) == 0) {
478 errcode = ok;
479 return true;
480 } else {
481 errcode = WSAGetLastError();
482 TRACE_MSG(("Socket close is failed: %d\n", errcode));
483 return false;
484 }
485 }
486 return true;
487 }
488
get_peer_name()489 char* win_socket::get_peer_name()
490 {
491 if (state != ss_open) {
492 errcode = not_opened;
493 return NULL;
494 }
495 struct sockaddr_in insock;
496 int len = sizeof(insock);
497 if (getpeername(s, (struct sockaddr*)&insock, &len) != 0) {
498 errcode = WSAGetLastError();
499 return NULL;
500 }
501 char* addr = inet_ntoa(insock.sin_addr);
502 if (addr == NULL) {
503 errcode = WSAGetLastError();
504 return NULL;
505 }
506 char* addr_copy = new char[strlen(addr)+1];
507 strcpy(addr_copy, addr);
508 errcode = ok;
509 return addr_copy;
510 }
511
~win_socket()512 win_socket::~win_socket()
513 {
514 close();
515 delete[] address;
516 }
517
win_socket(const char * addr)518 win_socket::win_socket(const char* addr)
519 {
520 address = new char[strlen(addr)+1];
521 strcpy(address, addr);
522 errcode = ok;
523 s = INVALID_SOCKET;
524 }
525
win_socket(SOCKET new_sock)526 win_socket::win_socket(SOCKET new_sock)
527 {
528 s = new_sock;
529 address = NULL;
530 state = ss_open;
531 errcode = ok;
532 }
533
create_local(char const * address,int listen_queue_size)534 socket_t* socket_t::create_local(char const* address, int listen_queue_size)
535 {
536 #ifdef _WINCE
537 return NULL;
538 #else
539 local_win_socket* sock = new local_win_socket(address);
540 sock->open(listen_queue_size);
541 return sock;
542 #endif
543 }
544
create_global(char const * address,int listen_queue_size)545 socket_t* socket_t::create_global(char const* address, int listen_queue_size)
546 {
547 win_socket* sock = new win_socket(address);
548 sock->open(listen_queue_size);
549 return sock;
550 }
551
connect(char const * address,socket_domain domain,int max_attempts,time_t timeout)552 socket_t* socket_t::connect(char const* address,
553 socket_domain domain,
554 int max_attempts,
555 time_t timeout)
556 {
557 #ifndef _WINCE
558 char hostname[MAX_HOST_NAME];
559 size_t hostname_len;
560 char const* port;
561
562 if (domain == sock_local_domain
563 || (domain == sock_any_domain
564 && ((port = strchr(address, ':')) == NULL
565 || ((hostname_len = port - address) == 9
566 && strncmp(address, "localhost", hostname_len) == 0)
567 || (gethostname(hostname, sizeof hostname) == 0
568 && strlen(hostname) == hostname_len
569 && strncmp(address, hostname, hostname_len) == 0))))
570 {
571 local_win_socket* s = new local_win_socket(address);
572 s->connect(max_attempts, timeout);
573 return s;
574 }
575 else
576 #endif
577 {
578 win_socket* s = new win_socket(address);
579 s->connect(max_attempts, timeout);
580 return s;
581 }
582 }
583
584 #ifndef _WINCE
585
586 //
587 // Local windows sockets
588 //
589
read(void * buf,size_t min_size,size_t max_size,time_t timeout)590 int local_win_socket::read(void* buf, size_t min_size, size_t max_size,
591 time_t timeout)
592 {
593 time_t start = 0;
594 char* dst = (char*)buf;
595 size_t size = 0;
596 Error = ok;
597 if (timeout != WAIT_FOREVER) {
598 start = time(NULL);
599 timeout *= 1000; // convert seconds to miliseconds
600 }
601 while (size < min_size && state == ss_open) {
602 RcvBuf->RcvWaitFlag = true;
603 MemoryBarrier();
604 size_t begin = RcvBuf->DataBeg;
605 size_t end = RcvBuf->DataEnd;
606 size_t rcv_size = (begin <= end)
607 ? end - begin : sizeof(RcvBuf->Data) - begin;
608 if (rcv_size > 0) {
609 RcvBuf->RcvWaitFlag = false;
610 if (rcv_size >= max_size) {
611 memcpy(dst, &RcvBuf->Data[begin], max_size);
612 begin += max_size;
613 size += max_size;
614 } else {
615 memcpy(dst, &RcvBuf->Data[begin], rcv_size);
616 begin += rcv_size;
617 dst += rcv_size;
618 size += rcv_size;
619 max_size -= rcv_size;
620 }
621 RcvBuf->DataBeg = (begin == sizeof(RcvBuf->Data)) ? 0 : (int)begin;
622 MemoryBarrier();
623 if (RcvBuf->SndWaitFlag) {
624 SetEvent(Signal[RTR]);
625 }
626 } else {
627 HANDLE h[2];
628 h[0] = Signal[RD];
629 h[1] = Mutex;
630 int rc = WaitForMultipleObjects(2, h, false, (DWORD)timeout);
631 RcvBuf->RcvWaitFlag = false;
632 if (rc != WAIT_OBJECT_0) {
633 if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) {
634 Error = broken_pipe;
635 ReleaseMutex(Mutex);
636 } else if (rc == WAIT_TIMEOUT) {
637 return (int)size;
638 } else {
639 Error = GetLastError();
640 }
641 return -1;
642 }
643 if (timeout != WAIT_FOREVER) {
644 time_t now = time(NULL);
645 timeout = timeout >= (now - start)*1000
646 ? timeout - (now - start)*1000 : 0;
647 }
648 }
649 }
650 return size < min_size ? -1 : (int)size;
651 }
652
653
write(const void * buf,size_t size,time_t timeout)654 bool local_win_socket::write(const void* buf, size_t size, time_t timeout)
655 {
656 char* src = (char*)buf;
657 Error = ok;
658 while (size > 0 && state == ss_open) {
659 SndBuf->SndWaitFlag = true;
660 MemoryBarrier();
661 size_t begin = SndBuf->DataBeg;
662 size_t end = SndBuf->DataEnd;
663 size_t snd_size = (begin <= end)
664 ? sizeof(SndBuf->Data) - end - (begin == 0)
665 : begin - end - 1;
666 if (snd_size > 0) {
667 SndBuf->SndWaitFlag = false;
668 if (snd_size >= size) {
669 memcpy(&SndBuf->Data[end], src, size);
670 end += size;
671 size = 0;
672 } else {
673 memcpy(&SndBuf->Data[end], src, snd_size);
674 end += snd_size;
675 src += snd_size;
676 size -= snd_size;
677 }
678 SndBuf->DataEnd = (end == sizeof(SndBuf->Data)) ? 0 : (int)end;
679 MemoryBarrier();
680 if (SndBuf->RcvWaitFlag) {
681 SetEvent(Signal[TD]);
682 }
683 } else {
684 HANDLE h[2];
685 h[0] = Signal[RTT];
686 h[1] = Mutex;
687 int rc = WaitForMultipleObjects(2, h, false, (DWORD)timeout);
688 SndBuf->SndWaitFlag = false;
689 if (rc != WAIT_OBJECT_0) {
690 if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) {
691 Error = broken_pipe;
692 ReleaseMutex(Mutex);
693 } else {
694 Error = GetLastError();
695 }
696 return false;
697 }
698 }
699 }
700 return size == 0;
701 }
702
703 #define MAX_ADDRESS_LEN 64
704
local_win_socket(const char * address)705 local_win_socket::local_win_socket(const char* address)
706 {
707 Name = new char[strlen(address)+1];
708 strcpy(Name, address);
709 Error = not_opened;
710 Mutex = NULL;
711 }
712
open(int)713 bool local_win_socket::open(int)
714 {
715 char buf[MAX_ADDRESS_LEN];
716 int i;
717
718 for (i = RD; i <= RTT; i++) {
719 sprintf(buf, "%s.%c", Name, i + '0');
720 Signal[i] = CreateEvent(FASTDB_SECURITY_ATTRIBUTES, false, false, WC_STRING(buf));
721 if (GetLastError() == ERROR_ALREADY_EXISTS) {
722 WaitForSingleObject(Signal[i], 0);
723 }
724 if (!Signal[i]) {
725 Error = GetLastError();
726 while (--i >= 0) {
727 CloseHandle(Signal[i]);
728 }
729 return false;
730 }
731 }
732 sprintf(buf, "%s.shr", Name);
733 BufHnd = CreateFileMapping(INVALID_HANDLE_VALUE, FASTDB_SECURITY_ATTRIBUTES, PAGE_READWRITE,
734 0, sizeof(socket_buf)*2, WC_STRING(buf));
735 if (!BufHnd) {
736 Error = GetLastError();
737 for (i = RD; i <= RTT; i++) {
738 CloseHandle(Signal[i]);
739 }
740 return false;
741 }
742 RcvBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);
743 if (!RcvBuf) {
744 Error = GetLastError();
745 CloseHandle(BufHnd);
746 for (i = RD; i <= RTT; i++) {
747 CloseHandle(Signal[i]);
748 }
749 return false;
750 }
751 SndBuf = RcvBuf+1;
752 RcvBuf->DataBeg = RcvBuf->DataEnd = 0;
753 SndBuf->DataBeg = SndBuf->DataEnd = 0;
754 Error = ok;
755 state = ss_open;
756 return true;
757 }
758
local_win_socket()759 local_win_socket::local_win_socket()
760 {
761 int i;
762 BufHnd = NULL;
763 Mutex = NULL;
764 Name = NULL;
765
766 for (i = RD; i <= RTT; i++) {
767 Signal[i] = CreateEvent(FASTDB_SECURITY_ATTRIBUTES, false, false, NULL);
768 if (!Signal[i]) {
769 Error = GetLastError();
770 while (--i >= 0) {
771 CloseHandle(Signal[i]);
772 }
773 return;
774 }
775 }
776 // create anonymous shared memory section
777 BufHnd = CreateFileMapping(INVALID_HANDLE_VALUE, FASTDB_SECURITY_ATTRIBUTES, PAGE_READWRITE,
778 0, sizeof(socket_buf)*2, NULL);
779 if (!BufHnd) {
780 Error = GetLastError();
781 for (i = RD; i <= RTT; i++) {
782 CloseHandle(Signal[i]);
783 }
784 return;
785 }
786 RcvBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);
787 if (!RcvBuf) {
788 Error = GetLastError();
789 CloseHandle(BufHnd);
790 for (i = RD; i <= RTT; i++) {
791 CloseHandle(Signal[i]);
792 }
793 BufHnd = NULL;
794 return;
795 }
796 SndBuf = RcvBuf+1;
797 RcvBuf->DataBeg = RcvBuf->DataEnd = 0;
798 SndBuf->DataBeg = SndBuf->DataEnd = 0;
799 Error = ok;
800 state = ss_open;
801 }
802
~local_win_socket()803 local_win_socket::~local_win_socket()
804 {
805 close();
806 delete[] Name;
807 }
808
accept()809 socket_t* local_win_socket::accept()
810 {
811 HANDLE h[2];
812
813 if (state != ss_open) {
814 return NULL;
815 }
816
817 connect_data* cdp = (connect_data*)SndBuf->Data;
818 cdp->Pid = GetCurrentProcessId();
819 cdp->Mutex = WatchDogMutex;
820 while (true) {
821 SetEvent(Signal[RTR]);
822 int rc = WaitForSingleObject(Signal[RD], ACCEPT_TIMEOUT);
823 if (rc == WAIT_OBJECT_0) {
824 if (state != ss_open) {
825 Error = not_opened;
826 return NULL;
827 }
828 Error = ok;
829 break;
830 } else if (rc != WAIT_TIMEOUT) {
831 Error = GetLastError();
832 return NULL;
833 }
834 }
835 local_win_socket* sock = new local_win_socket();
836 sock->Mutex = ((connect_data*)RcvBuf->Data)->Mutex;
837 accept_data* adp = (accept_data*)SndBuf->Data;
838 adp->BufHnd = sock->BufHnd;
839 for (int i = RD; i <= RTT; i++) {
840 adp->Signal[(i + TD - RD) & RTT] = sock->Signal[i];
841 }
842 SetEvent(Signal[TD]);
843 h[0] = Signal[RD];
844 h[1] = sock->Mutex;
845 int rc = WaitForMultipleObjects(2, h, false, INFINITE);
846 if (rc != WAIT_OBJECT_0) {
847 if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) {
848 Error = broken_pipe;
849 ReleaseMutex(Mutex);
850 } else {
851 Error = GetLastError();
852 }
853 delete sock;
854 return NULL;
855 }
856 return sock;
857 }
858
cancel_accept()859 bool local_win_socket::cancel_accept()
860 {
861 state = ss_shutdown;
862 SetEvent(Signal[RD]);
863 SetEvent(Signal[RTT]);
864 return true;
865 }
866
get_peer_name()867 char* local_win_socket::get_peer_name()
868 {
869 if (state != ss_open) {
870 Error = not_opened;
871 return NULL;
872 }
873 char* addr = "127.0.0.1";
874 char* addr_copy = new char[strlen(addr)+1];
875 strcpy(addr_copy, addr);
876 Error = ok;
877 return addr_copy;
878 }
879
is_ok()880 bool local_win_socket::is_ok()
881 {
882 return !Error;
883 }
884
close()885 bool local_win_socket::close()
886 {
887 if (state != ss_close) {
888 state = ss_close;
889 if (Mutex) {
890 CloseHandle(Mutex);
891 }
892 for (int i = RD; i <= RTT; i++) {
893 CloseHandle(Signal[i]);
894 }
895 UnmapViewOfFile(RcvBuf < SndBuf ? RcvBuf : SndBuf);
896 CloseHandle(BufHnd);
897 Error = not_opened;
898 }
899 return true;
900 }
901
get_error_text(char * buf,size_t buf_size)902 void local_win_socket::get_error_text(char* buf, size_t buf_size)
903 {
904 switch (Error) {
905 case ok:
906 strncpy(buf, "ok", buf_size-1);
907 break;
908 case not_opened:
909 strncpy(buf, "socket not opened", buf_size-1);
910 break;
911 case broken_pipe:
912 strncpy(buf, "connection is broken", buf_size-1);
913 break;
914 case timeout_expired:
915 strncpy(buf, "connection timeout expired", buf_size-1);
916 break;
917 default:
918 #ifndef PHAR_LAP
919 FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
920 NULL,
921 Error,
922 0,
923 WC_STRING(buf),
924 (DWORD)buf_size-1,
925 NULL);
926 #else
927 strncpy(buf, "Unknown socket error", buf_size-1);
928 #endif
929 }
930 buf[buf_size-1] = '\0';
931 }
932
933
shutdown()934 bool local_win_socket::shutdown()
935 {
936 if (state == ss_open) {
937 state = ss_shutdown;
938 SetEvent(Signal[RD]);
939 SetEvent(Signal[RTT]);
940 }
941 return true;
942 }
943
connect(int max_attempts,time_t timeout)944 bool local_win_socket::connect(int max_attempts, time_t timeout)
945 {
946 char buf[MAX_ADDRESS_LEN];
947 int rc, i, error_code;
948 HANDLE h[2];
949
950 for (i = RD; i <= RTT; i++) {
951 sprintf(buf, "%s.%c", Name, ((i + TD - RD) & RTT) + '0');
952 Signal[i] = CreateEvent(FASTDB_SECURITY_ATTRIBUTES, false, false, WC_STRING(buf));
953 if (!Signal[i]) {
954 Error = GetLastError();
955 while (--i >= 0) {
956 CloseHandle(Signal[i]);
957 }
958 return false;
959 }
960 }
961 sprintf(buf, "%s.shr", Name);
962 BufHnd = CreateFileMapping(INVALID_HANDLE_VALUE, FASTDB_SECURITY_ATTRIBUTES, PAGE_READWRITE,
963 0, sizeof(socket_buf)*2, WC_STRING(buf));
964 if (!BufHnd) {
965 Error = GetLastError();
966 for (i = RD; i <= RTT; i++) {
967 CloseHandle(Signal[i]);
968 }
969 return false;
970 }
971 SndBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);
972 if (!SndBuf) {
973 Error = GetLastError();
974 for (i = RD; i <= RTT; i++) {
975 CloseHandle(Signal[i]);
976 }
977 CloseHandle(BufHnd);
978 return false;
979 }
980 RcvBuf = SndBuf+1;
981 state = ss_shutdown;
982 Mutex = NULL;
983
984 rc = WaitForSingleObject(Signal[RTT], (DWORD)(timeout*max_attempts*MILLISECOND));
985 if (rc != WAIT_OBJECT_0) {
986 error_code = rc == WAIT_TIMEOUT ? timeout_expired : GetLastError();
987 close();
988 Error = error_code;
989 return false;
990 }
991 connect_data* cdp = (connect_data*)RcvBuf->Data;
992 HANDLE hServer = OpenProcess(STANDARD_RIGHTS_REQUIRED|PROCESS_DUP_HANDLE,
993 false, cdp->Pid);
994 if (!hServer) {
995 error_code = GetLastError();
996 close();
997 Error = error_code;
998 return false;
999 }
1000 HANDLE hSelf = GetCurrentProcess();
1001 if (!DuplicateHandle(hServer, cdp->Mutex, hSelf, &Mutex,
1002 0, false, DUPLICATE_SAME_ACCESS) ||
1003 !DuplicateHandle(hSelf, WatchDogMutex, hServer,
1004 &((connect_data*)SndBuf->Data)->Mutex,
1005 0, false, DUPLICATE_SAME_ACCESS))
1006 {
1007 error_code = GetLastError();
1008 CloseHandle(hServer);
1009 close();
1010 Error = error_code;
1011 return false;
1012 }
1013 SetEvent(Signal[TD]);
1014 h[0] = Signal[RD];
1015 h[1] = Mutex;
1016 rc = WaitForMultipleObjects(2, h, false, INFINITE);
1017
1018 if (rc != WAIT_OBJECT_0) {
1019 if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) {
1020 error_code = broken_pipe;
1021 ReleaseMutex(Mutex);
1022 } else {
1023 error_code = GetLastError();
1024 }
1025 CloseHandle(hServer);
1026 close();
1027 Error = error_code;
1028 return false;
1029 }
1030 accept_data ad = *(accept_data*)RcvBuf->Data;
1031
1032 SetEvent(Signal[TD]);
1033 for (i = RD; i <= RTT; i++) {
1034 CloseHandle(Signal[i]);
1035 }
1036 UnmapViewOfFile(SndBuf);
1037 CloseHandle(BufHnd);
1038 BufHnd = NULL;
1039
1040 if (!DuplicateHandle(hServer, ad.BufHnd, hSelf, &BufHnd,
1041 0, false, DUPLICATE_SAME_ACCESS))
1042 {
1043 Error = GetLastError();
1044 CloseHandle(hServer);
1045 CloseHandle(Mutex);
1046 return false;
1047 } else {
1048 for (i = RD; i <= RTT; i++) {
1049 if (!DuplicateHandle(hServer, ad.Signal[i],
1050 hSelf, &Signal[i],
1051 0, false, DUPLICATE_SAME_ACCESS))
1052 {
1053 Error = GetLastError();
1054 CloseHandle(hServer);
1055 CloseHandle(BufHnd);
1056 CloseHandle(Mutex);
1057 while (--i >= 0) CloseHandle(Signal[1]);
1058 return false;
1059 }
1060 }
1061 }
1062 CloseHandle(hServer);
1063
1064 SndBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);
1065 if (!SndBuf) {
1066 Error = GetLastError();
1067 CloseHandle(BufHnd);
1068 CloseHandle(Mutex);
1069 for (i = RD; i <= RTT; i++) {
1070 CloseHandle(Signal[i]);
1071 }
1072 return false;
1073 }
1074 RcvBuf = SndBuf+1;
1075 Error = ok;
1076 state = ss_open;
1077 return true;
1078 }
1079
get_handle()1080 socket_handle_t local_win_socket::get_handle()
1081 {
1082 return (socket_handle_t)-1;
1083 }
1084
1085 #endif
1086
1087 END_FASTDB_NAMESPACE
1088