1 //-< W32SOCK.CXX >---------------------------------------------------*--------*
2 // FastDB                    Version 1.0         (c) 1997  GARRET    *     ?  *
3 // (Main Memory Database Management System)                          *   /\|  *
4 //                                                                   *  /  \  *
5 //                          Created:      8-May-97    K.A. Knizhnik  * / [] \ *
6 //                          Last update: 19-May-97    K.A. Knizhnik  * GARRET *
7 //-------------------------------------------------------------------*--------*
8 // Windows sockets
9 //-------------------------------------------------------------------*--------*
10 
11 #define INSIDE_FASTDB
12 
13 #include "stdtp.h"
14 #include "w32sock.h"
15 #include "sync.h"
16 
17 BEGIN_FASTDB_NAMESPACE
18 
// Size of the local buffers used to hold the host part of a socket address
#define MAX_HOST_NAME         256
// Number of milliseconds in one second: timeouts given in seconds are
// multiplied by this factor before being passed to Win32 wait functions
#define MILLISECOND           1000

// Mutex created (and initially owned) by the constructor of
// win_socket_library; its handle is exchanged with the peer
// when a local socket connection is established
static HANDLE WatchDogMutex;

// Compilers for which this file has to provide its own MemoryBarrier()
#if (defined(_MSC_VER) && _MSC_VER <= 1310) || defined(__GNUC__) || defined(__BORLANDC__)
#define MEMORY_BARRIER_NOT_DEFINED true
#endif

#ifdef MEMORY_BARRIER_NOT_DEFINED

#ifdef __GNUC__

// GCC 4.1 and later: use the compiler builtin
#if (__GNUC__ > 4) || (__GNUC__ == 4 && __GNUC_MINOR__ >= 1)
#define MemoryBarrier() __sync_synchronize()
#else
// Older GCC: issue the mfence instruction directly
inline void MemoryBarrier() {
    asm volatile("mfence":::"memory");
}
#endif

#else

#if defined(_M_AMD64)
#define MemoryBarrier __faststorefence
#elif defined(_M_IA64)
#define MemoryBarrier __mf
#elif (defined(_M_IX86) || defined(_M_X64)) && !defined(__BORLANDC__)
// Inline assembler variant: exchange of a local variable with eax
inline void MemoryBarrier() {
    LONG Barrier;
    __asm {
        xchg Barrier, eax
    }
}
#else
// Fallback: enter and leave a critical section guarded by a file-local mutex
static dbMutex barrierMutex;
inline void MemoryBarrier() { dbCriticalSection cs(barrierMutex); }
#endif
#endif
#endif
59 
60 class win_socket_library {
61   public:
62     SYSTEM_INFO sinfo;
63 
win_socket_library()64     win_socket_library() {
65         WSADATA wsa;
66         if (WSAStartup(MAKEWORD(1, 1), &wsa) != 0) {
67             fprintf(stderr,"Failed to initialize windows sockets: %d\n",
68                     WSAGetLastError());
69         }
70         //
71         // This mutex is used to recognize process termination
72         //
73         WatchDogMutex = CreateMutex(FASTDB_SECURITY_ATTRIBUTES, TRUE, NULL);
74 
75 #ifdef PHAR_LAP
76         sinfo.wProcessorLevel = 5;
77 #else
78         GetSystemInfo(&sinfo);
79 #endif
80     }
~win_socket_library()81     ~win_socket_library() {
82         //      WSACleanup();
83     }
84 };
85 
86 static win_socket_library ws32_lib;
87 
open(int listen_queue_size)88 bool win_socket::open(int listen_queue_size)
89 {
90     unsigned short port;
91     char* p;
92     char hostname[MAX_HOST_NAME];
93 
94     assert(address != NULL);
95 
96     if ((p = strchr(address, ':')) == NULL
97         || sscanf(p+1, "%hu", &port) != 1)
98     {
99         errcode = bad_address;
100         TRACE_MSG(("Invalid address: %s\n", address));
101         return false;
102     }
103     memcpy(hostname, address, p - address);
104     hostname[p - address] = '\0';
105 
106     if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
107         errcode = WSAGetLastError();
108         TRACE_MSG(("Socket create is failed: %d\n", errcode));
109         return false;
110     }
111     struct sockaddr_in insock;
112     insock.sin_family = AF_INET;
113     if (*hostname && stricmp(hostname, "localhost") != 0) {
114         struct hostent* hp;  // entry in hosts table
115         if ((hp = gethostbyname(hostname)) == NULL
116             || hp->h_addrtype != AF_INET)
117         {
118             TRACE_MSG(("Failed to get host by name: %s\n", errno));
119             errcode = bad_address;
120             return false;
121         }
122         memcpy(&insock.sin_addr, hp->h_addr, sizeof insock.sin_addr);
123     } else {
124         insock.sin_addr.s_addr = htonl(INADDR_ANY);
125     }
126     insock.sin_port = htons(port);
127 
128     int on = 1;
129     setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof on);
130 
131     if (bind(s, (sockaddr*)&insock, sizeof(insock)) != 0) {
132         errcode = WSAGetLastError();
133         TRACE_MSG(("Socket bind is failed: %d\n", errcode));
134         closesocket(s);
135         return false;
136     }
137     if (listen(s, listen_queue_size) != 0) {
138         errcode = WSAGetLastError();
139         TRACE_MSG(("Socket listen is failed: %d\n", errcode));
140         closesocket(s);
141         return false;
142     }
143     errcode = ok;
144     state = ss_open;
145     return true;
146 }
147 
is_ok()148 bool win_socket::is_ok()
149 {
150     return errcode == ok;
151 }
152 
get_handle()153 socket_handle_t win_socket::get_handle()
154 {
155     return s;
156 }
157 
get_error_text(char * buf,size_t buf_size)158 void win_socket::get_error_text(char* buf, size_t buf_size)
159 {
160     char* msg;
161     char  msgbuf[64];
162 
163     switch(errcode) {
164       case ok:
165         msg = "ok";
166         break;
167       case not_opened:
168         msg = "socket not opened";
169         break;
170       case bad_address:
171         msg = "bad address";
172         break;
173       case connection_failed:
174         msg = "exceed limit of attempts of connection to server";
175         break;
176       case broken_pipe:
177         msg = "connection is broken";
178         break;
179       case invalid_access_mode:
180         msg = "invalid access mode";
181         break;
182       default:
183 #ifndef PHAR_LAP
184         {
185           int   len;
186 #if defined(_WINCE) || defined(UNICODE)
187           wchar_t cnvBuf[CNV_BUF_SIZE];
188           FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
189                         NULL,
190                         errcode,
191                         0,
192                         cnvBuf,
193                         CNV_BUF_SIZE-1,
194                         NULL);
195           cnvBuf[CNV_BUF_SIZE-1] = '\0';
196           len = wcstombs(buf, cnvBuf, buf_size);
197 #else
198           len = FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
199                               NULL,
200                               errcode,
201                               0,
202                               buf,
203                               (DWORD)buf_size-1,
204                               NULL);
205 #endif
206           if (len == 0) {
207               sprintf(msgbuf, "unknown error code %u", errcode);
208               msg = msgbuf;
209           } else {
210               return;
211           }
212         }
213 #else
214         sprintf(msgbuf, "System error code: %u", errcode);
215         msg = msgbuf;
216 #endif
217     }
218     strncpy(buf, msg, buf_size-1);
219     buf[buf_size-1] = '\0';
220 }
221 
accept()222 socket_t* win_socket::accept()
223 {
224     if (state != ss_open) {
225         errcode = not_opened;
226         TRACE_MSG(("Socket not openned\n"));
227         return NULL;
228     }
229 
230     SOCKET new_sock = ::accept(s, NULL, NULL );
231 
232     if (new_sock == INVALID_SOCKET) {
233         errcode = WSAGetLastError();
234         TRACE_MSG(("Socket accept failed: %d\n", errcode));
235         return NULL;
236     } else {
237 #if SOCK_LINGER
238         static struct linger l = {1, LINGER_TIME};
239         if (setsockopt(new_sock, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof l) != 0) {
240             errcode = WSAGetLastError();
241             TRACE_MSG(("Failed to set socket options: %d\n", errcode));
242             closesocket(new_sock);
243             return NULL;
244         }
245 #endif
246 #if SOCK_NO_DELAY
247         int enabled = 1;
248         if (setsockopt(new_sock, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled,
249                        sizeof enabled) != 0)
250         {
251             errcode = WSAGetLastError();
252             TRACE_MSG(("Failed to set socket options: %d\n", errcode));
253             closesocket(new_sock);
254             return NULL;
255         }
256 #endif
257 #if SOCK_SNDBUF_SIZE
258         int size = SOCK_SNDBUF_SIZE;
259         setsockopt(s, SOL_SOCKET, SO_SNDBUF, (char*)&size, sizeof size);
260 #endif
261         errcode = ok;
262         return new win_socket(new_sock);
263     }
264 }
265 
cancel_accept()266 bool win_socket::cancel_accept()
267 {
268     bool result = close();
269     // Wakeup listener
270     delete socket_t::connect(address, sock_global_domain, 1, 0);
271     return result;
272 }
273 
274 
//
// Establish connection with the server at the "host:port" address of
// this object.
//   max_attempts - number of rounds over all addresses of the host
//   timeout      - delay in seconds between two rounds
// Only refused connections are retried, any other error terminates
// the method. Returns true on success, otherwise false with the reason
// stored in errcode.
//
bool win_socket::connect(int max_attempts, time_t timeout)
{
    char hostname[MAX_HOST_NAME];
    char *p;
    unsigned short port;

    assert(address != NULL);

    if ((p = strchr(address, ':')) == NULL
        || (size_t)(p - address) >= sizeof(hostname)
        || sscanf(p+1, "%hu", &port) != 1)
    {
        errcode = bad_address;
        TRACE_MSG(("Invalid address: %s\n", address));
        return false;
    }
    memcpy(hostname, address, p - address);
    hostname[p - address] = '\0';

    struct sockaddr_in insock;  // inet socket address
    struct hostent*    hp;      // entry in hosts table

    if ((hp = gethostbyname(hostname)) == NULL || hp->h_addrtype != AF_INET) {
        TRACE_MSG(("Host name can not be resolved: %d\n", WSAGetLastError()));
        errcode = bad_address;
        return false;
    }
    memset(&insock, 0, sizeof insock);
    insock.sin_family = AF_INET;
    insock.sin_port = htons(port);

    while (true) {
        for (int i = 0; hp->h_addr_list[i] != NULL; i++) {
            memcpy(&insock.sin_addr, hp->h_addr_list[i],
                   sizeof insock.sin_addr);
            if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
                errcode = WSAGetLastError();
                TRACE_MSG(("Failed to create socket: %d\n", errcode));
                return false;
            }
            if (::connect(s, (sockaddr*)&insock, sizeof insock) != 0) {
                errcode = WSAGetLastError();
                closesocket(s);
                if (errcode != WSAECONNREFUSED) {
                    TRACE_MSG(("Failed to establish connection: %d\n", errcode));
                    return false;
                }
            } else {
#if SOCK_NO_DELAY
                int enabled = 1;
                if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled,
                               sizeof enabled) != 0)
                {
                    errcode = WSAGetLastError();
                    TRACE_MSG(("Failed to set socket options: %d\n", errcode));
                    closesocket(s);
                    return false;
                }
#endif
#if SOCK_LINGER
                static struct linger l = {1, LINGER_TIME};
                if (setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof l) != 0) {
                    errcode = WSAGetLastError();
                    TRACE_MSG(("Failed to set socket options: %d\n", errcode));
                    closesocket(s);
                    return false;
                }
#endif
                errcode = ok;
                state = ss_open;
                return true;
            }
        }
        // all addresses have refused the connection
        if (--max_attempts > 0) {
            Sleep((DWORD)(timeout*MILLISECOND));
        } else {
            errcode = connection_failed;
            TRACE_MSG(("All attempts to establish connection are failed\n"));
            return false;
        }
    }
}
356 
read(void * buf,size_t min_size,size_t max_size,time_t timeout)357 int win_socket::read(void* buf, size_t min_size, size_t max_size,
358                      time_t timeout)
359 {
360     size_t size = 0;
361     time_t start = 0;
362     if (state != ss_open) {
363         errcode = not_opened;
364         TRACE_MSG(("Socket is not openned\n"));
365         return -1;
366     }
367     if (timeout != WAIT_FOREVER) {
368         start = time(NULL);
369     }
370 
371     do {
372         int rc;
373         if (timeout != WAIT_FOREVER) {
374             fd_set events;
375             struct timeval tm;
376             FD_ZERO(&events);
377             FD_SET(s, &events);
378             tm.tv_sec = (long)timeout;
379             tm.tv_usec = 0;
380             rc = select((int)s+1, &events, NULL, NULL, &tm);
381             if (rc < 0) {
382                 errcode = WSAGetLastError();
383                 TRACE_MSG(("Socket select is failed: %d\n", errcode));
384                 return -1;
385             }
386             if (rc == 0) {
387                 return (int)size;
388             }
389             time_t now = time(NULL);
390             timeout = start + timeout >= now ? timeout + start - now : 0;
391         }
392         rc = recv(s, (char*)buf + size, (int)(max_size - size), 0);
393         if (rc < 0) {
394             errcode = WSAGetLastError();
395             TRACE_MSG(("Socket read is failed: %d\n", errcode));
396             return -1;
397         } else if (rc == 0) {
398             errcode = broken_pipe;
399             TRACE_MSG(("Socket is disconnected\n"));
400             return -1;
401         } else {
402             size += rc;
403         }
404     } while (size < min_size);
405 
406     return (int)size;
407 }
408 
write(void const * buf,size_t size,time_t timeout)409 bool win_socket::write(void const* buf, size_t size, time_t timeout)
410 {
411     time_t start = 0;
412     if (state != ss_open) {
413         errcode = not_opened;
414         TRACE_MSG(("Socket is not openned\n"));
415         return false;
416     }
417     if (timeout != WAIT_FOREVER) {
418         start = time(NULL);
419     }
420 
421     do {
422         int rc;
423         if (timeout != WAIT_FOREVER) {
424             fd_set events;
425             struct timeval tm;
426             FD_ZERO(&events);
427             FD_SET(s, &events);
428             tm.tv_sec = (long)timeout;
429             tm.tv_usec = 0;
430             rc = select((int)s+1, NULL, &events, NULL, &tm);
431             if (rc <= 0) {
432                 errcode = WSAGetLastError();
433                 TRACE_MSG(("Socket select is failed: %d\n", errcode));
434                 return false;
435             }
436             time_t now = time(NULL);
437             timeout = start + timeout >= now ? timeout + start - now : 0;
438         }
439         rc = send(s, (char*)buf, (int)size, 0);
440         if (rc < 0) {
441             errcode = WSAGetLastError();
442             TRACE_MSG(("Socket write is failed: %d\n", errcode));
443             return false;
444         } else if (rc == 0) {
445             errcode = broken_pipe;
446             TRACE_MSG(("Socket is disconnected\n"));
447             return false;
448         } else {
449             buf = (char*)buf + rc;
450             size -= rc;
451         }
452     } while (size != 0);
453 
454     return true;
455 }
456 
shutdown()457 bool win_socket::shutdown()
458 {
459     if (state == ss_open) {
460         state = ss_shutdown;
461         int rc = ::shutdown(s, 2);
462         if (rc != 0) {
463             errcode = WSAGetLastError();
464             TRACE_MSG(("Socket shutdown is failed: %d\n", errcode));
465             return false;
466         }
467     }
468     errcode = ok;
469     return true;
470 }
471 
472 
close()473 bool win_socket::close()
474 {
475     if (state != ss_close) {
476         state = ss_close;
477         if (closesocket(s) == 0) {
478             errcode = ok;
479             return true;
480         } else {
481             errcode = WSAGetLastError();
482             TRACE_MSG(("Socket close is failed: %d\n", errcode));
483             return false;
484         }
485     }
486     return true;
487 }
488 
get_peer_name()489 char* win_socket::get_peer_name()
490 {
491     if (state != ss_open) {
492         errcode = not_opened;
493         return NULL;
494     }
495     struct sockaddr_in insock;
496     int len = sizeof(insock);
497     if (getpeername(s, (struct sockaddr*)&insock, &len) != 0) {
498         errcode = WSAGetLastError();
499         return NULL;
500     }
501     char* addr = inet_ntoa(insock.sin_addr);
502     if (addr == NULL) {
503         errcode = WSAGetLastError();
504         return NULL;
505     }
506     char* addr_copy = new char[strlen(addr)+1];
507     strcpy(addr_copy, addr);
508     errcode = ok;
509     return addr_copy;
510 }
511 
~win_socket()512 win_socket::~win_socket()
513 {
514     close();
515     delete[] address;
516 }
517 
win_socket(const char * addr)518 win_socket::win_socket(const char* addr)
519 {
520     address = new char[strlen(addr)+1];
521     strcpy(address, addr);
522     errcode = ok;
523     s = INVALID_SOCKET;
524 }
525 
win_socket(SOCKET new_sock)526 win_socket::win_socket(SOCKET new_sock)
527 {
528     s = new_sock;
529     address = NULL;
530     state = ss_open;
531     errcode = ok;
532 }
533 
create_local(char const * address,int listen_queue_size)534 socket_t* socket_t::create_local(char const* address, int listen_queue_size)
535 {
536 #ifdef _WINCE
537     return NULL;
538 #else
539     local_win_socket* sock = new local_win_socket(address);
540     sock->open(listen_queue_size);
541     return sock;
542 #endif
543 }
544 
create_global(char const * address,int listen_queue_size)545 socket_t* socket_t::create_global(char const* address, int listen_queue_size)
546 {
547     win_socket* sock = new win_socket(address);
548     sock->open(listen_queue_size);
549     return sock;
550 }
551 
connect(char const * address,socket_domain domain,int max_attempts,time_t timeout)552 socket_t* socket_t::connect(char const* address,
553                             socket_domain domain,
554                             int max_attempts,
555                             time_t timeout)
556 {
557 #ifndef _WINCE
558     char   hostname[MAX_HOST_NAME];
559     size_t hostname_len;
560     char const* port;
561 
562     if (domain == sock_local_domain
563         || (domain == sock_any_domain
564             && ((port = strchr(address, ':')) == NULL
565                 || ((hostname_len = port - address) == 9
566                     && strncmp(address, "localhost", hostname_len) == 0)
567                 || (gethostname(hostname, sizeof hostname) == 0
568                     && strlen(hostname) == hostname_len
569                     && strncmp(address, hostname, hostname_len) == 0))))
570     {
571         local_win_socket* s = new local_win_socket(address);
572         s->connect(max_attempts, timeout);
573         return s;
574     }
575     else
576 #endif
577     {
578         win_socket* s = new win_socket(address);
579         s->connect(max_attempts, timeout);
580         return s;
581     }
582 }
583 
584 #ifndef _WINCE
585 
586 //
587 // Local windows sockets
588 //
589 
read(void * buf,size_t min_size,size_t max_size,time_t timeout)590 int local_win_socket::read(void* buf, size_t min_size, size_t max_size,
591                            time_t timeout)
592 {
593     time_t start = 0;
594     char* dst = (char*)buf;
595     size_t size = 0;
596     Error = ok;
597     if (timeout != WAIT_FOREVER) {
598         start = time(NULL);
599         timeout *= 1000; // convert seconds to miliseconds
600     }
601     while (size < min_size && state == ss_open) {
602         RcvBuf->RcvWaitFlag = true;
603         MemoryBarrier();
604         size_t begin = RcvBuf->DataBeg;
605         size_t end = RcvBuf->DataEnd;
606         size_t rcv_size = (begin <= end)
607             ? end - begin : sizeof(RcvBuf->Data) - begin;
608         if (rcv_size > 0) {
609             RcvBuf->RcvWaitFlag = false;
610             if (rcv_size >= max_size) {
611                 memcpy(dst, &RcvBuf->Data[begin], max_size);
612                 begin += max_size;
613                 size += max_size;
614             } else {
615                 memcpy(dst, &RcvBuf->Data[begin], rcv_size);
616                 begin += rcv_size;
617                 dst += rcv_size;
618                 size += rcv_size;
619                 max_size -= rcv_size;
620             }
621             RcvBuf->DataBeg = (begin == sizeof(RcvBuf->Data)) ? 0 : (int)begin;
622             MemoryBarrier();
623             if (RcvBuf->SndWaitFlag) {
624                 SetEvent(Signal[RTR]);
625             }
626         } else {
627             HANDLE h[2];
628             h[0] = Signal[RD];
629             h[1] = Mutex;
630             int rc = WaitForMultipleObjects(2, h, false, (DWORD)timeout);
631             RcvBuf->RcvWaitFlag = false;
632             if (rc != WAIT_OBJECT_0) {
633                 if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) {
634                     Error = broken_pipe;
635                     ReleaseMutex(Mutex);
636                 } else if (rc == WAIT_TIMEOUT) {
637                     return (int)size;
638                 } else {
639                     Error = GetLastError();
640                 }
641                 return -1;
642             }
643             if (timeout != WAIT_FOREVER) {
644                 time_t now = time(NULL);
645                 timeout = timeout >= (now - start)*1000
646                     ? timeout - (now - start)*1000 : 0;
647             }
648         }
649     }
650     return size < min_size ? -1 : (int)size;
651 }
652 
653 
write(const void * buf,size_t size,time_t timeout)654 bool local_win_socket::write(const void* buf, size_t size, time_t timeout)
655 {
656     char* src = (char*)buf;
657     Error = ok;
658     while (size > 0 && state == ss_open) {
659         SndBuf->SndWaitFlag = true;
660         MemoryBarrier();
661         size_t begin = SndBuf->DataBeg;
662         size_t end = SndBuf->DataEnd;
663         size_t snd_size = (begin <= end)
664             ? sizeof(SndBuf->Data) - end - (begin == 0)
665             : begin - end - 1;
666         if (snd_size > 0) {
667             SndBuf->SndWaitFlag = false;
668             if (snd_size >= size) {
669                 memcpy(&SndBuf->Data[end], src, size);
670                 end += size;
671                 size = 0;
672             } else {
673                 memcpy(&SndBuf->Data[end], src, snd_size);
674                 end += snd_size;
675                 src += snd_size;
676                 size -= snd_size;
677             }
678             SndBuf->DataEnd = (end == sizeof(SndBuf->Data)) ? 0 : (int)end;
679             MemoryBarrier();
680             if (SndBuf->RcvWaitFlag) {
681                 SetEvent(Signal[TD]);
682             }
683         } else {
684             HANDLE h[2];
685             h[0] = Signal[RTT];
686             h[1] = Mutex;
687             int rc = WaitForMultipleObjects(2, h, false, (DWORD)timeout);
688             SndBuf->SndWaitFlag = false;
689             if (rc != WAIT_OBJECT_0) {
690                 if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) {
691                     Error = broken_pipe;
692                     ReleaseMutex(Mutex);
693                 } else {
694                     Error = GetLastError();
695                 }
696                 return false;
697             }
698         }
699     }
700     return size == 0;
701 }
702 
703 #define MAX_ADDRESS_LEN 64
704 
local_win_socket(const char * address)705 local_win_socket::local_win_socket(const char* address)
706 {
707     Name = new char[strlen(address)+1];
708     strcpy(Name, address);
709     Error = not_opened;
710     Mutex = NULL;
711 }
712 
open(int)713 bool local_win_socket::open(int)
714 {
715     char buf[MAX_ADDRESS_LEN];
716     int  i;
717 
718     for (i = RD; i <= RTT; i++) {
719         sprintf(buf, "%s.%c", Name, i + '0');
720         Signal[i] = CreateEvent(FASTDB_SECURITY_ATTRIBUTES, false, false, WC_STRING(buf));
721         if (GetLastError() == ERROR_ALREADY_EXISTS) {
722             WaitForSingleObject(Signal[i], 0);
723         }
724         if (!Signal[i]) {
725             Error = GetLastError();
726             while (--i >= 0) {
727                 CloseHandle(Signal[i]);
728             }
729             return false;
730         }
731     }
732     sprintf(buf, "%s.shr", Name);
733     BufHnd = CreateFileMapping(INVALID_HANDLE_VALUE, FASTDB_SECURITY_ATTRIBUTES, PAGE_READWRITE,
734                                0, sizeof(socket_buf)*2, WC_STRING(buf));
735     if (!BufHnd) {
736         Error = GetLastError();
737         for (i = RD; i <= RTT; i++) {
738             CloseHandle(Signal[i]);
739         }
740         return false;
741     }
742     RcvBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);
743     if (!RcvBuf) {
744         Error = GetLastError();
745         CloseHandle(BufHnd);
746         for (i = RD; i <= RTT; i++) {
747             CloseHandle(Signal[i]);
748         }
749         return false;
750     }
751     SndBuf = RcvBuf+1;
752     RcvBuf->DataBeg = RcvBuf->DataEnd = 0;
753     SndBuf->DataBeg = SndBuf->DataEnd = 0;
754     Error = ok;
755     state = ss_open;
756     return true;
757 }
758 
local_win_socket()759 local_win_socket::local_win_socket()
760 {
761     int i;
762     BufHnd = NULL;
763     Mutex = NULL;
764     Name = NULL;
765 
766     for (i = RD; i <= RTT; i++) {
767         Signal[i] = CreateEvent(FASTDB_SECURITY_ATTRIBUTES, false, false, NULL);
768         if (!Signal[i]) {
769             Error = GetLastError();
770             while (--i >= 0) {
771                 CloseHandle(Signal[i]);
772             }
773             return;
774         }
775     }
776     // create anonymous shared memory section
777     BufHnd = CreateFileMapping(INVALID_HANDLE_VALUE, FASTDB_SECURITY_ATTRIBUTES, PAGE_READWRITE,
778                                0, sizeof(socket_buf)*2, NULL);
779     if (!BufHnd) {
780         Error = GetLastError();
781         for (i = RD; i <= RTT; i++) {
782             CloseHandle(Signal[i]);
783         }
784         return;
785     }
786     RcvBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);
787     if (!RcvBuf) {
788         Error = GetLastError();
789         CloseHandle(BufHnd);
790         for (i = RD; i <= RTT; i++) {
791             CloseHandle(Signal[i]);
792         }
793         BufHnd = NULL;
794         return;
795     }
796     SndBuf = RcvBuf+1;
797     RcvBuf->DataBeg = RcvBuf->DataEnd = 0;
798     SndBuf->DataBeg = SndBuf->DataEnd = 0;
799     Error = ok;
800     state = ss_open;
801 }
802 
~local_win_socket()803 local_win_socket::~local_win_socket()
804 {
805     close();
806     delete[] Name;
807 }
808 
accept()809 socket_t* local_win_socket::accept()
810 {
811     HANDLE h[2];
812 
813     if (state != ss_open) {
814         return NULL;
815     }
816 
817     connect_data* cdp = (connect_data*)SndBuf->Data;
818     cdp->Pid = GetCurrentProcessId();
819     cdp->Mutex = WatchDogMutex;
820     while (true) {
821         SetEvent(Signal[RTR]);
822         int rc = WaitForSingleObject(Signal[RD], ACCEPT_TIMEOUT);
823         if (rc == WAIT_OBJECT_0) {
824             if (state != ss_open) {
825                 Error = not_opened;
826                 return NULL;
827             }
828             Error = ok;
829             break;
830         } else if (rc != WAIT_TIMEOUT) {
831             Error = GetLastError();
832             return NULL;
833         }
834     }
835     local_win_socket* sock = new local_win_socket();
836     sock->Mutex = ((connect_data*)RcvBuf->Data)->Mutex;
837     accept_data* adp = (accept_data*)SndBuf->Data;
838     adp->BufHnd = sock->BufHnd;
839     for (int i = RD; i <= RTT; i++) {
840         adp->Signal[(i + TD - RD) & RTT] = sock->Signal[i];
841     }
842     SetEvent(Signal[TD]);
843     h[0] = Signal[RD];
844     h[1] = sock->Mutex;
845     int rc = WaitForMultipleObjects(2, h, false, INFINITE);
846     if (rc != WAIT_OBJECT_0) {
847         if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) {
848             Error = broken_pipe;
849             ReleaseMutex(Mutex);
850         } else {
851             Error = GetLastError();
852         }
853         delete sock;
854         return NULL;
855     }
856     return sock;
857 }
858 
cancel_accept()859 bool local_win_socket::cancel_accept()
860 {
861     state = ss_shutdown;
862     SetEvent(Signal[RD]);
863     SetEvent(Signal[RTT]);
864     return true;
865 }
866 
get_peer_name()867 char* local_win_socket::get_peer_name()
868 {
869     if (state != ss_open) {
870         Error = not_opened;
871         return NULL;
872     }
873     char* addr = "127.0.0.1";
874     char* addr_copy = new char[strlen(addr)+1];
875     strcpy(addr_copy, addr);
876     Error = ok;
877     return addr_copy;
878 }
879 
is_ok()880 bool local_win_socket::is_ok()
881 {
882     return !Error;
883 }
884 
close()885 bool local_win_socket::close()
886 {
887     if (state != ss_close) {
888         state = ss_close;
889         if (Mutex) {
890             CloseHandle(Mutex);
891         }
892         for (int i = RD; i <= RTT; i++) {
893             CloseHandle(Signal[i]);
894         }
895         UnmapViewOfFile(RcvBuf < SndBuf ? RcvBuf : SndBuf);
896         CloseHandle(BufHnd);
897         Error = not_opened;
898     }
899     return true;
900 }
901 
get_error_text(char * buf,size_t buf_size)902 void local_win_socket::get_error_text(char* buf, size_t buf_size)
903 {
904     switch (Error) {
905       case ok:
906         strncpy(buf, "ok", buf_size-1);
907         break;
908       case not_opened:
909         strncpy(buf, "socket not opened", buf_size-1);
910         break;
911       case broken_pipe:
912         strncpy(buf, "connection is broken", buf_size-1);
913         break;
914       case timeout_expired:
915         strncpy(buf, "connection timeout expired", buf_size-1);
916         break;
917       default:
918 #ifndef PHAR_LAP
919         FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
920                       NULL,
921                       Error,
922                       0,
923                       WC_STRING(buf),
924                       (DWORD)buf_size-1,
925                       NULL);
926 #else
927         strncpy(buf, "Unknown socket error", buf_size-1);
928 #endif
929     }
930     buf[buf_size-1] = '\0';
931 }
932 
933 
shutdown()934 bool local_win_socket::shutdown()
935 {
936     if (state == ss_open) {
937         state = ss_shutdown;
938         SetEvent(Signal[RD]);
939         SetEvent(Signal[RTT]);
940     }
941     return true;
942 }
943 
connect(int max_attempts,time_t timeout)944 bool local_win_socket::connect(int max_attempts, time_t timeout)
945 {
946     char buf[MAX_ADDRESS_LEN];
947     int  rc, i, error_code;
948     HANDLE h[2];
949 
950     for (i = RD; i <= RTT; i++) {
951         sprintf(buf, "%s.%c", Name, ((i + TD - RD) & RTT) + '0');
952         Signal[i] = CreateEvent(FASTDB_SECURITY_ATTRIBUTES, false, false, WC_STRING(buf));
953         if (!Signal[i]) {
954             Error = GetLastError();
955             while (--i >= 0) {
956                 CloseHandle(Signal[i]);
957             }
958             return false;
959         }
960     }
961     sprintf(buf, "%s.shr", Name);
962     BufHnd = CreateFileMapping(INVALID_HANDLE_VALUE, FASTDB_SECURITY_ATTRIBUTES, PAGE_READWRITE,
963                                0, sizeof(socket_buf)*2, WC_STRING(buf));
964     if (!BufHnd) {
965         Error = GetLastError();
966         for (i = RD; i <= RTT; i++) {
967             CloseHandle(Signal[i]);
968         }
969         return false;
970     }
971     SndBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);
972     if (!SndBuf) {
973         Error = GetLastError();
974         for (i = RD; i <= RTT; i++) {
975             CloseHandle(Signal[i]);
976         }
977         CloseHandle(BufHnd);
978         return false;
979     }
980     RcvBuf = SndBuf+1;
981     state = ss_shutdown;
982     Mutex = NULL;
983 
984     rc = WaitForSingleObject(Signal[RTT], (DWORD)(timeout*max_attempts*MILLISECOND));
985     if (rc != WAIT_OBJECT_0) {
986         error_code = rc == WAIT_TIMEOUT ? timeout_expired : GetLastError();
987         close();
988         Error = error_code;
989         return false;
990     }
991     connect_data* cdp = (connect_data*)RcvBuf->Data;
992     HANDLE hServer = OpenProcess(STANDARD_RIGHTS_REQUIRED|PROCESS_DUP_HANDLE,
993                                  false, cdp->Pid);
994     if (!hServer) {
995         error_code = GetLastError();
996         close();
997         Error = error_code;
998         return false;
999     }
1000     HANDLE hSelf = GetCurrentProcess();
1001     if (!DuplicateHandle(hServer, cdp->Mutex, hSelf, &Mutex,
1002                          0, false, DUPLICATE_SAME_ACCESS) ||
1003         !DuplicateHandle(hSelf, WatchDogMutex, hServer,
1004                          &((connect_data*)SndBuf->Data)->Mutex,
1005                          0, false, DUPLICATE_SAME_ACCESS))
1006     {
1007         error_code = GetLastError();
1008         CloseHandle(hServer);
1009         close();
1010         Error = error_code;
1011         return false;
1012     }
1013     SetEvent(Signal[TD]);
1014     h[0] = Signal[RD];
1015     h[1] = Mutex;
1016     rc = WaitForMultipleObjects(2, h, false, INFINITE);
1017 
1018     if (rc != WAIT_OBJECT_0) {
1019         if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) {
1020             error_code = broken_pipe;
1021             ReleaseMutex(Mutex);
1022         } else {
1023             error_code = GetLastError();
1024         }
1025         CloseHandle(hServer);
1026         close();
1027         Error = error_code;
1028         return false;
1029     }
1030     accept_data ad = *(accept_data*)RcvBuf->Data;
1031 
1032     SetEvent(Signal[TD]);
1033     for (i = RD; i <= RTT; i++) {
1034         CloseHandle(Signal[i]);
1035     }
1036     UnmapViewOfFile(SndBuf);
1037     CloseHandle(BufHnd);
1038     BufHnd = NULL;
1039 
1040     if (!DuplicateHandle(hServer, ad.BufHnd, hSelf, &BufHnd,
1041                          0, false, DUPLICATE_SAME_ACCESS))
1042     {
1043         Error = GetLastError();
1044         CloseHandle(hServer);
1045         CloseHandle(Mutex);
1046         return false;
1047     } else {
1048         for (i = RD; i <= RTT; i++) {
1049             if (!DuplicateHandle(hServer, ad.Signal[i],
1050                                  hSelf, &Signal[i],
1051                                  0, false, DUPLICATE_SAME_ACCESS))
1052             {
1053                 Error = GetLastError();
1054                 CloseHandle(hServer);
1055                 CloseHandle(BufHnd);
1056                 CloseHandle(Mutex);
1057                 while (--i >= 0) CloseHandle(Signal[1]);
1058                 return false;
1059             }
1060         }
1061     }
1062     CloseHandle(hServer);
1063 
1064     SndBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);
1065     if (!SndBuf) {
1066         Error = GetLastError();
1067         CloseHandle(BufHnd);
1068         CloseHandle(Mutex);
1069         for (i = RD; i <= RTT; i++) {
1070             CloseHandle(Signal[i]);
1071         }
1072         return false;
1073     }
1074     RcvBuf = SndBuf+1;
1075     Error = ok;
1076     state = ss_open;
1077     return true;
1078 }
1079 
get_handle()1080 socket_handle_t local_win_socket::get_handle()
1081 {
1082     return (socket_handle_t)-1;
1083 }
1084 
1085 #endif
1086 
1087 END_FASTDB_NAMESPACE
1088