1 //-< W32SOCK.CXX >---------------------------------------------------*--------*
2 // GigaBASE                  Version 1.0         (c) 1999  GARRET    *     ?  *
3 // (Post Relational Database Management System)                      *   /\|  *
4 //                                                                   *  /  \  *
5 //                          Created:      8-May-97    K.A. Knizhnik  * / [] \ *
6 //                          Last update: 19-May-97    K.A. Knizhnik  * GARRET *
7 //-------------------------------------------------------------------*--------*
8 // Windows sockets
9 //-------------------------------------------------------------------*--------*
10 
11 #if defined(_WIN32) || defined(__MINGW32__)
12 
13 #define INSIDE_GIGABASE
14 
15 #include "stdtp.h"
16 #include "w32sock.h"
17 #include "sync.h"
18 
19 BEGIN_GIGABASE_NAMESPACE
20 
21 #define MAX_HOST_NAME         256
22 #define MILLISECOND           1000
23 
24 static HANDLE WatchDogMutex;
25 #ifdef L64
26 #endif
27 
28 #if (defined(_MSC_VER) && _MSC_VER <= 1310) || defined(__GNUC__) || defined(__BORLANDC__)
29 #define MEMORY_BARRIER_NOT_DEFINED true
30 #endif
31 
32 #ifdef MEMORY_BARRIER_NOT_DEFINED
33 
34 #ifdef __GNUC__
35 
36 #if (__GNUC__ > 4) || (__GNUC__ == 4 && __GNUC_MINOR__ >= 1)
37 #define MemoryBarrier() __sync_synchronize()
38 #else
MemoryBarrier()39 inline void MemoryBarrier() {
40     asm volatile("mfence":::"memory");
41 }
42 #endif
43 
44 #else
45 
46 #if defined(_M_AMD64)
47 #define MemoryBarrier __faststorefence
48 #elif defined(_M_IA64)
49 #define MemoryBarrier __mf
50 #elif (defined(_M_IX86) || defined(_M_X64)) && !defined(__BORLANDC__)
MemoryBarrier()51 inline void MemoryBarrier() {
52     LONG Barrier;
53     __asm {
54         xchg Barrier, eax
55     }
56 }
57 #else
58 static dbMutex barrierMutex;
MemoryBarrier()59 inline void MemoryBarrier() { dbCriticalSection cs(barrierMutex); }
60 #endif
61 #endif
62 #endif
63 
64 
65 class win_socket_library {
66   public:
67     SYSTEM_INFO sinfo;
68 
win_socket_library()69     win_socket_library() {
70         WSADATA wsa;
71         if (WSAStartup(MAKEWORD(1, 1), &wsa) != 0) {
72             fprintf(stderr,"Failed to initialize windows sockets: %d\n",
73                     WSAGetLastError());
74         }
75         //
76         // This mutex is used to recognize process termination
77         //
78         WatchDogMutex = CreateMutex(NULL, TRUE, NULL);
79 
80         GetSystemInfo(&sinfo);
81     }
~win_socket_library()82     ~win_socket_library() {
83         // WSACleanup();
84     }
85 };
86 
87 #ifdef SET_NULL_DACL
88 class dbNullSecurityDesciptor {
89   public:
90     SECURITY_DESCRIPTOR sd;
91     SECURITY_ATTRIBUTES sa;
92 
dbNullSecurityDesciptor()93     dbNullSecurityDesciptor() {
94         InitializeSecurityDescriptor(&sd, SECURITY_DESCRIPTOR_REVISION);
95         SetSecurityDescriptorDacl(&sd, TRUE, NULL, FALSE);
96         sa.nLength = sizeof(sa);
97         sa.bInheritHandle = TRUE;
98         sa.lpSecurityDescriptor = &sd;
99     }
100 
101     static dbNullSecurityDesciptor instance;
102 };
103 dbNullSecurityDesciptor dbNullSecurityDesciptor::instance;
104 #define GB_SECURITY_ATTRIBUTES &dbNullSecurityDesciptor::instance.sa
105 #else
106 #define GB_SECURITY_ATTRIBUTES NULL
107 #endif
108 
109 
110 static win_socket_library ws32_lib;
111 
open(int listen_queue_size)112 bool win_socket::open(int listen_queue_size)
113 {
114     unsigned short port;
115     char* p;
116     char hostname[MAX_HOST_NAME];
117 
118     assert(address != NULL);
119 
120     if ((p = strchr(address, ':')) == NULL
121         || sscanf(p+1, "%hu", &port) != 1)
122     {
123         errcode = bad_address;
124         return false;
125     }
126     memcpy(hostname, address, p - address);
127     hostname[p - address] = '\0';
128 
129     if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
130         errcode = WSAGetLastError();
131         return false;
132     }
133     struct sockaddr_in insock;
134     insock.sin_family = AF_INET;
135     if (*hostname && _stricmp(hostname, "localhost") != 0) {
136         struct hostent* hp;  // entry in hosts table
137         if ((hp = gethostbyname(hostname)) == NULL
138             || hp->h_addrtype != AF_INET)
139         {
140             errcode = bad_address;
141             return false;
142         }
143         memcpy(&insock.sin_addr, hp->h_addr, sizeof insock.sin_addr);
144     } else {
145         insock.sin_addr.s_addr = htonl(INADDR_ANY);
146     }
147     insock.sin_port = htons(port);
148 
149     int on = 1;
150     setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof on);
151 
152     if (bind(s, (sockaddr*)&insock, sizeof(insock)) != 0) {
153         errcode = WSAGetLastError();
154         closesocket(s);
155         return false;
156     }
157     if (listen(s, listen_queue_size) != 0) {
158         errcode = WSAGetLastError();
159         closesocket(s);
160         return false;
161     }
162     errcode = ok;
163     state = ss_open;
164     return true;
165 }
166 
is_ok()167 bool win_socket::is_ok()
168 {
169     return errcode == ok;
170 }
171 
get_error_text(char_t * buf,size_t buf_size)172 void win_socket::get_error_text(char_t* buf, size_t buf_size)
173 {
174     int     len;
175     char_t* msg;
176     char_t  msgbuf[64];
177 
178     switch(errcode) {
179       case ok:
180         msg = _T("ok");
181         break;
182       case not_opened:
183         msg = _T("socket not opened");
184         break;
185       case bad_address:
186         msg = _T("bad address");
187         break;
188       case connection_failed:
189         msg = _T("exceed limit of attempts of connection to server");
190         break;
191       case broken_pipe:
192         msg = _T("connection is broken");
193         break;
194       case invalid_access_mode:
195         msg = _T("invalid access mode");
196         break;
197       default:
198         len = FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
199                             NULL,
200                             errcode,
201                             0,
202                             buf,
203                             (DWORD)(buf_size-1),
204                             NULL);
205         if (len == 0) {
206             SPRINTF(SPRINTF_BUFFER(msgbuf), _T("unknown error code %u"), errcode);
207             msg = msgbuf;
208         } else {
209             return;
210         }
211     }
212     STRNCPY(buf, msg, buf_size-1);
213     buf[buf_size-1] = '\0';
214 }
215 
accept()216 socket_t* win_socket::accept()
217 {
218     if (state != ss_open) {
219         errcode = not_opened;
220         return NULL;
221     }
222 
223     SOCKET new_sock = ::accept(s, NULL, NULL );
224 
225     if (new_sock == INVALID_SOCKET) {
226         errcode = WSAGetLastError();
227         return NULL;
228     } else {
229 #ifdef USE_SO_LINGER
230         static struct linger l = {1, LINGER_TIME};
231         if (setsockopt(new_sock, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof l) != 0) {
232             errcode = invalid_access_mode;
233             closesocket(new_sock);
234             return NULL;
235         }
236 #endif
237         int enabled = 1;
238         if (setsockopt(new_sock, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled,
239                        sizeof enabled) != 0)
240         {
241             errcode = WSAGetLastError();
242             closesocket(new_sock);
243             return NULL;
244         }
245         errcode = ok;
246         return new win_socket(new_sock);
247     }
248 }
249 
cancel_accept()250 bool win_socket::cancel_accept()
251 {
252     bool result = close();
253     // Wakeup listener
254     delete socket_t::connect(address, sock_global_domain, 1, 0);
255     return result;
256 }
257 
258 
connect(int max_attempts,time_t timeout)259 bool win_socket::connect(int max_attempts, time_t timeout)
260 {
261     char hostname[MAX_HOST_NAME];
262     char *p;
263     unsigned short port;
264 
265     assert(address != NULL);
266 
267     if ((p = strchr(address, ':')) == NULL
268         || (size_t)(p - address) >= sizeof(hostname)
269         || sscanf(p+1, "%hu", &port) != 1)
270     {
271         errcode = bad_address;
272         return false;
273     }
274     memcpy(hostname, address, p - address);
275     hostname[p - address] = '\0';
276 
277     struct sockaddr_in insock;  // inet socket address
278     struct hostent*    hp;      // entry in hosts table
279 
280     if ((hp = gethostbyname(hostname)) == NULL || hp->h_addrtype != AF_INET) {
281         errcode = bad_address;
282         return false;
283     }
284     insock.sin_family = AF_INET;
285     insock.sin_port = htons(port);
286 
287     while (true) {
288         for (int i = 0; hp->h_addr_list[i] != NULL; i++) {
289             memcpy(&insock.sin_addr, hp->h_addr_list[i],
290                    sizeof insock.sin_addr);
291             if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
292                 errcode = WSAGetLastError();
293                 return false;
294             }
295             if (::connect(s, (sockaddr*)&insock, sizeof insock) != 0) {
296                 errcode = WSAGetLastError();
297                 closesocket(s);
298                 if (errcode != WSAECONNREFUSED) {
299                     return false;
300                 }
301             } else {
302                 int enabled = 1;
303                 if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled,
304                                sizeof enabled) != 0)
305                 {
306                     errcode = WSAGetLastError();
307                     closesocket(s);
308                     return false;
309                 }
310 #ifdef USE_SO_LINGER
311                 static struct linger l = {1, LINGER_TIME};
312                 if (setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof l) != 0) {
313                     errcode = invalid_access_mode;
314                     closesocket(s);
315                     return NULL;
316                 }
317 #endif
318                 errcode = ok;
319                 state = ss_open;
320                 return true;
321             }
322         }
323         if (--max_attempts > 0) {
324             Sleep((DWORD)(timeout*MILLISECOND));
325         } else {
326             errcode = connection_failed;
327             return false;
328         }
329     }
330 }
331 
read(void * buf,size_t min_size,size_t max_size,time_t timeout)332 int win_socket::read(void* buf, size_t min_size, size_t max_size,
333                      time_t timeout)
334 {
335     size_t size = 0;
336     time_t start = 0;
337     if (state != ss_open) {
338         errcode = not_opened;
339         return -1;
340     }
341     if (timeout != WAIT_FOREVER) {
342         start = time(NULL);
343     }
344 
345     do {
346         int rc;
347         if (timeout != WAIT_FOREVER) {
348             fd_set events;
349             struct timeval tm;
350             FD_ZERO(&events);
351             FD_SET(s, &events);
352             tm.tv_sec = (long)timeout;
353             tm.tv_usec = 0;
354             rc = select((int)s+1, &events, NULL, NULL, &tm);
355             if (rc < 0) {
356                 errcode = WSAGetLastError();
357                 return -1;
358             }
359             if (rc == 0) {
360                 return (int)size;
361             }
362             time_t now = time(NULL);
363             timeout = start + timeout >= now ? timeout + start - now : 0;
364         }
365         rc = recv(s, (char*)buf + size, (int)(max_size - size), 0);
366         if (rc < 0) {
367             errcode = WSAGetLastError();
368             return -1;
369         } else if (rc == 0) {
370             errcode = broken_pipe;
371             return -1;
372         } else {
373             size += rc;
374         }
375     } while (size < min_size);
376 
377     return (int)size;
378 }
379 
write(void const * buf,size_t size)380 bool win_socket::write(void const* buf, size_t size)
381 {
382     if (state != ss_open) {
383         errcode = not_opened;
384         return false;
385     }
386 
387     do {
388         int rc = send(s, (char*)buf, (int)size, 0);
389         if (rc < 0) {
390             errcode = WSAGetLastError();
391             return false;
392         } else if (rc == 0) {
393             errcode = broken_pipe;
394             return false;
395         } else {
396             buf = (char*)buf + rc;
397             size -= rc;
398         }
399     } while (size != 0);
400 
401     return true;
402 }
403 
shutdown()404 bool win_socket::shutdown()
405 {
406     if (state == ss_open) {
407         state = ss_shutdown;
408         int rc = ::shutdown(s, 2);
409         if (rc != 0) {
410             errcode = WSAGetLastError();
411             return false;
412         }
413     }
414     errcode = ok;
415     return true;
416 }
417 
418 
close()419 bool win_socket::close()
420 {
421     if (state != ss_close) {
422         state = ss_close;
423         if (closesocket(s) == 0) {
424             errcode = ok;
425             return true;
426         } else {
427             errcode = WSAGetLastError();
428             return false;
429         }
430     }
431     return true;
432 }
433 
get_peer_name()434 char* win_socket::get_peer_name()
435 {
436     if (state != ss_open) {
437         errcode = not_opened;
438         return NULL;
439     }
440     struct sockaddr_in insock;
441     int len = sizeof(insock);
442     if (getpeername(s, (struct sockaddr*)&insock, &len) != 0) {
443         errcode = WSAGetLastError();
444         return NULL;
445     }
446     char* addr = inet_ntoa(insock.sin_addr);
447     if (addr == NULL) {
448         errcode = WSAGetLastError();
449         return NULL;
450     }
451     char* addr_copy = new char[strlen(addr)+1];
452     strcpy(addr_copy, addr);
453     errcode = ok;
454     return addr_copy;
455 }
456 
~win_socket()457 win_socket::~win_socket()
458 {
459     close();
460     delete[] address;
461 }
462 
win_socket(const char * addr)463 win_socket::win_socket(const char* addr)
464 {
465     address = new char[strlen(addr)+1];
466     strcpy(address, addr);
467     errcode = ok;
468     s = INVALID_SOCKET;
469 }
470 
win_socket(SOCKET new_sock)471 win_socket::win_socket(SOCKET new_sock)
472 {
473     s = new_sock;
474     address = NULL;
475     state = ss_open;
476     errcode = ok;
477 }
478 
create_local(char const * address,int listen_queue_size)479 socket_t* socket_t::create_local(char const* address, int listen_queue_size)
480 {
481 #ifdef _WINCE
482     return NULL;
483 #else
484     local_win_socket* sock = new local_win_socket(address);
485     sock->open(listen_queue_size);
486     return sock;
487 #endif
488 }
489 
create_global(char const * address,int listen_queue_size)490 socket_t* socket_t::create_global(char const* address, int listen_queue_size)
491 {
492     win_socket* sock = new win_socket(address);
493     sock->open(listen_queue_size);
494     return sock;
495 }
496 
connect(char const * address,socket_domain domain,int max_attempts,time_t timeout)497 socket_t* socket_t::connect(char const* address,
498                             socket_domain domain,
499                             int max_attempts,
500                             time_t timeout)
501 {
502     char   hostname[MAX_HOST_NAME];
503     size_t hostname_len;
504     char const* port;
505 
506 #ifndef _WINCE
507     if (domain == sock_local_domain
508         || (domain == sock_any_domain
509             && ((port = strchr(address, ':')) == NULL
510                 || ((hostname_len = port - address) == 9
511                     && strncmp(address, "localhost", hostname_len) == 0)
512                 || (gethostname(hostname, sizeof hostname) == 0
513                     && strlen(hostname) == hostname_len
514                     && strncmp(address, hostname, hostname_len) == 0))))
515      {
516         local_win_socket* s = new local_win_socket(address);
517         s->connect(max_attempts, timeout);
518         return s;
519      }
520      else
521 #endif
522      {
523         win_socket* s = new win_socket(address);
524         s->connect(max_attempts, timeout);
525         return s;
526     }
527 }
528 
529 #ifndef _WINCE
530 
531 //
532 // Local windows sockets
533 //
534 
read(void * buf,size_t min_size,size_t max_size,time_t timeout)535 int local_win_socket::read(void* buf, size_t min_size, size_t max_size,
536                            time_t timeout)
537 {
538     time_t start = 0;
539     char* dst = (char*)buf;
540     size_t size = 0;
541     Error = ok;
542     if (timeout != WAIT_FOREVER) {
543         start = time(NULL);
544         timeout *= 1000; // convert seconds to miliseconds
545     }
546     while (size < min_size && state == ss_open) {
547         RcvBuf->RcvWaitFlag = true;
548         MemoryBarrier();
549         size_t begin = RcvBuf->DataBeg;
550         size_t end = RcvBuf->DataEnd;
551         size_t rcv_size = (begin <= end)
552             ? end - begin : sizeof(RcvBuf->Data) - begin;
553         if (rcv_size > 0) {
554             RcvBuf->RcvWaitFlag = false;
555             if (rcv_size >= max_size) {
556                 memcpy(dst, &RcvBuf->Data[begin], max_size);
557                 begin += max_size;
558                 size += max_size;
559             } else {
560                 memcpy(dst, &RcvBuf->Data[begin], rcv_size);
561                 begin += rcv_size;
562                 dst += rcv_size;
563                 size += rcv_size;
564                 max_size -= rcv_size;
565             }
566             RcvBuf->DataBeg = (begin == sizeof(RcvBuf->Data)) ? 0 : (int)begin;
567             MemoryBarrier();
568             if (RcvBuf->SndWaitFlag) {
569                 SetEvent(Signal[RTR]);
570             }
571         } else {
572             HANDLE h[2];
573             h[0] = Signal[RD];
574             h[1] = Mutex;
575             int rc = WaitForMultipleObjects(2, h, false, (DWORD)timeout);
576             RcvBuf->RcvWaitFlag = false;
577             if (rc != WAIT_OBJECT_0) {
578                 if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) {
579                     Error = broken_pipe;
580                     ReleaseMutex(Mutex);
581                 } else if (rc == WAIT_TIMEOUT) {
582                     return (int)size;
583                 } else {
584                     Error = GetLastError();
585                 }
586                 return -1;
587             }
588             if (timeout != WAIT_FOREVER) {
589                 time_t now = time(NULL);
590                 timeout = timeout >= (now - start)*1000
591                     ? timeout - (now - start)*1000 : 0;
592             }
593         }
594     }
595     return size < min_size ? -1 : (int)size;
596 }
597 
598 
write(const void * buf,size_t size)599 bool local_win_socket::write(const void* buf, size_t size)
600 {
601     char* src = (char*)buf;
602     Error = ok;
603     while (size > 0 && state == ss_open) {
604         SndBuf->SndWaitFlag = true;
605         MemoryBarrier();
606         size_t begin = SndBuf->DataBeg;
607         size_t end = SndBuf->DataEnd;
608         size_t snd_size = (begin <= end)
609             ? sizeof(SndBuf->Data) - end - (begin == 0)
610             : begin - end - 1;
611         if (snd_size > 0) {
612             SndBuf->SndWaitFlag = false;
613             if (snd_size >= size) {
614                 memcpy(&SndBuf->Data[end], src, size);
615                 end += size;
616                 size = 0;
617             } else {
618                 memcpy(&SndBuf->Data[end], src, snd_size);
619                 end += snd_size;
620                 src += snd_size;
621                 size -= snd_size;
622             }
623             SndBuf->DataEnd = (end == sizeof(SndBuf->Data)) ? 0 : (int)end;
624             MemoryBarrier();
625             if (SndBuf->RcvWaitFlag) {
626                 SetEvent(Signal[TD]);
627             }
628         } else {
629             HANDLE h[2];
630             h[0] = Signal[RTT];
631             h[1] = Mutex;
632             int rc = WaitForMultipleObjects(2, h, false, INFINITE);
633             SndBuf->SndWaitFlag = false;
634             if (rc != WAIT_OBJECT_0) {
635                 if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) {
636                     Error = broken_pipe;
637                     ReleaseMutex(Mutex);
638                 } else {
639                     Error = GetLastError();
640                 }
641                 return false;
642             }
643         }
644     }
645     return size == 0;
646 }
647 
648 #define MAX_ADDRESS_LEN 64
649 
local_win_socket(const char * address)650 local_win_socket::local_win_socket(const char* address)
651 {
652     Name = new char[strlen(address)+1];
653     strcpy(Name, address);
654     Error = not_opened;
655     Mutex = NULL;
656 }
657 
open(int)658 bool local_win_socket::open(int)
659 {
660     char buf[MAX_ADDRESS_LEN];
661     int  i;
662 
663     for (i = RD; i <= RTT; i++) {
664         sprintf(buf, "%s.%c", Name, i + '0');
665         Signal[i] = CreateEventA(GB_SECURITY_ATTRIBUTES, false, false, buf);
666         if (GetLastError() == ERROR_ALREADY_EXISTS) {
667             WaitForSingleObject(Signal[i], 0);
668         }
669         if (!Signal[i]) {
670             Error = GetLastError();
671             while (--i >= 0) {
672                 CloseHandle(Signal[i]);
673             }
674             return false;
675         }
676     }
677     sprintf(buf, "%s.shr", Name);
678     BufHnd = CreateFileMappingA(INVALID_HANDLE_VALUE, GB_SECURITY_ATTRIBUTES, PAGE_READWRITE,
679                                 0, sizeof(socket_buf)*2, buf);
680     if (!BufHnd) {
681         Error = GetLastError();
682         for (i = RD; i <= RTT; i++) {
683             CloseHandle(Signal[i]);
684         }
685         return false;
686     }
687     RcvBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);
688     if (!RcvBuf) {
689         Error = GetLastError();
690         CloseHandle(BufHnd);
691         for (i = RD; i <= RTT; i++) {
692             CloseHandle(Signal[i]);
693         }
694         return false;
695     }
696     SndBuf = RcvBuf+1;
697     RcvBuf->DataBeg = RcvBuf->DataEnd = 0;
698     SndBuf->DataBeg = SndBuf->DataEnd = 0;
699     Error = ok;
700     state = ss_open;
701     return true;
702 }
703 
local_win_socket()704 local_win_socket::local_win_socket()
705 {
706     int i;
707     BufHnd = NULL;
708     Mutex = NULL;
709     Name = NULL;
710 
711     for (i = RD; i <= RTT; i++) {
712         Signal[i] = CreateEvent(GB_SECURITY_ATTRIBUTES, false, false, NULL);
713         if (!Signal[i]) {
714             Error = GetLastError();
715             while (--i >= 0) {
716                 CloseHandle(Signal[i]);
717             }
718             return;
719         }
720     }
721     // create anonymous shared memory section
722     BufHnd = CreateFileMapping(INVALID_HANDLE_VALUE, GB_SECURITY_ATTRIBUTES, PAGE_READWRITE,
723                                0, sizeof(socket_buf)*2, NULL);
724     if (!BufHnd) {
725         Error = GetLastError();
726         for (i = RD; i <= RTT; i++) {
727             CloseHandle(Signal[i]);
728         }
729         return;
730     }
731     RcvBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);
732     if (!RcvBuf) {
733         Error = GetLastError();
734         CloseHandle(BufHnd);
735         for (i = RD; i <= RTT; i++) {
736             CloseHandle(Signal[i]);
737         }
738         BufHnd = NULL;
739         return;
740     }
741     SndBuf = RcvBuf+1;
742     RcvBuf->DataBeg = RcvBuf->DataEnd = 0;
743     SndBuf->DataBeg = SndBuf->DataEnd = 0;
744     Error = ok;
745     state = ss_open;
746 }
747 
~local_win_socket()748 local_win_socket::~local_win_socket()
749 {
750     close();
751     delete[] Name;
752 }
753 
accept()754 socket_t* local_win_socket::accept()
755 {
756     HANDLE h[2];
757 
758     if (state != ss_open) {
759         return NULL;
760     }
761 
762     connect_data* cdp = (connect_data*)SndBuf->Data;
763     cdp->Pid = GetCurrentProcessId();
764     cdp->Mutex = WatchDogMutex;
765     while (true) {
766         SetEvent(Signal[RTR]);
767         int rc = WaitForSingleObject(Signal[RD], ACCEPT_TIMEOUT);
768         if (rc == WAIT_OBJECT_0) {
769             if (state != ss_open) {
770                 Error = not_opened;
771                 return NULL;
772             }
773             Error = ok;
774             break;
775         } else if (rc != WAIT_TIMEOUT) {
776             Error = GetLastError();
777             return NULL;
778         }
779     }
780     local_win_socket* sock = new local_win_socket();
781     sock->Mutex = ((connect_data*)RcvBuf->Data)->Mutex;
782     accept_data* adp = (accept_data*)SndBuf->Data;
783     adp->BufHnd = sock->BufHnd;
784     for (int i = RD; i <= RTT; i++) {
785         adp->Signal[(i + TD - RD) & RTT] = sock->Signal[i];
786     }
787     SetEvent(Signal[TD]);
788     h[0] = Signal[RD];
789     h[1] = sock->Mutex;
790     int rc = WaitForMultipleObjects(2, h, false, INFINITE);
791     if (rc != WAIT_OBJECT_0) {
792         if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) {
793             Error = broken_pipe;
794             ReleaseMutex(Mutex);
795         } else {
796             Error = GetLastError();
797         }
798         delete sock;
799         return NULL;
800     }
801     return sock;
802 }
803 
cancel_accept()804 bool local_win_socket::cancel_accept()
805 {
806     state = ss_shutdown;
807     SetEvent(Signal[RD]);
808     SetEvent(Signal[RTT]);
809     return true;
810 }
811 
get_peer_name()812 char* local_win_socket::get_peer_name()
813 {
814     if (state != ss_open) {
815         Error = not_opened;
816         return NULL;
817     }
818     char* addr = "127.0.0.1";
819     char* addr_copy = new char[strlen(addr)+1];
820     strcpy(addr_copy, addr);
821     Error = ok;
822     return addr_copy;
823 }
824 
is_ok()825 bool local_win_socket::is_ok()
826 {
827     return !Error;
828 }
829 
close()830 bool local_win_socket::close()
831 {
832     if (state != ss_close) {
833         state = ss_close;
834         if (Mutex) {
835             CloseHandle(Mutex);
836         }
837         for (int i = RD; i <= RTT; i++) {
838             CloseHandle(Signal[i]);
839         }
840         UnmapViewOfFile(RcvBuf < SndBuf ? RcvBuf : SndBuf);
841         CloseHandle(BufHnd);
842         Error = not_opened;
843     }
844     return true;
845 }
846 
get_error_text(char_t * buf,size_t buf_size)847 void local_win_socket::get_error_text(char_t* buf, size_t buf_size)
848 {
849     switch (Error) {
850       case ok:
851         STRNCPY(buf, _T("ok"), buf_size-1);
852         break;
853       case not_opened:
854         STRNCPY(buf, _T("socket not opened"), buf_size-1);
855         break;
856       case broken_pipe:
857         STRNCPY(buf, _T("connection is broken"), buf_size-1);
858         break;
859       case timeout_expired:
860         STRNCPY(buf, _T("connection timeout expired"), buf_size-1);
861         break;
862       default:
863         FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
864                       NULL,
865                       Error,
866                       0,
867                       buf,
868                       (DWORD)(buf_size-1),
869                       NULL);
870     }
871     buf[buf_size-1] = '\0';
872 }
873 
874 
shutdown()875 bool local_win_socket::shutdown()
876 {
877     if (state == ss_open) {
878         state = ss_shutdown;
879         SetEvent(Signal[RD]);
880         SetEvent(Signal[RTT]);
881     }
882     return true;
883 }
884 
connect(int max_attempts,time_t timeout)885 bool local_win_socket::connect(int max_attempts, time_t timeout)
886 {
887     char buf[MAX_ADDRESS_LEN];
888     int  rc, i, error_code;
889     HANDLE h[2];
890 
891     for (i = RD; i <= RTT; i++) {
892         sprintf(buf, "%s.%c", Name, ((i + TD - RD) & RTT) + '0');
893         Signal[i] = CreateEventA(GB_SECURITY_ATTRIBUTES, false, false, buf);
894         if (!Signal[i]) {
895             Error = GetLastError();
896             while (--i >= 0) {
897                 CloseHandle(Signal[i]);
898             }
899             return false;
900         }
901     }
902     sprintf(buf, "%s.shr", Name);
903     BufHnd = CreateFileMappingA(INVALID_HANDLE_VALUE, GB_SECURITY_ATTRIBUTES, PAGE_READWRITE,
904                                 0, sizeof(socket_buf)*2, buf);
905     if (!BufHnd) {
906         Error = GetLastError();
907         for (i = RD; i <= RTT; i++) {
908             CloseHandle(Signal[i]);
909         }
910         return false;
911     }
912     SndBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);
913     if (!SndBuf) {
914         Error = GetLastError();
915         for (i = RD; i <= RTT; i++) {
916             CloseHandle(Signal[i]);
917         }
918         CloseHandle(BufHnd);
919         return false;
920     }
921     RcvBuf = SndBuf+1;
922     state = ss_shutdown;
923     Mutex = NULL;
924 
925     rc = WaitForSingleObject(Signal[RTT], (DWORD)(timeout*max_attempts*MILLISECOND));
926     if (rc != WAIT_OBJECT_0) {
927         error_code = (rc == WAIT_TIMEOUT) ? (int)timeout_expired : (int)GetLastError();
928         close();
929         Error = error_code;
930         return false;
931     }
932     connect_data* cdp = (connect_data*)RcvBuf->Data;
933     HANDLE hServer = OpenProcess(STANDARD_RIGHTS_REQUIRED|PROCESS_DUP_HANDLE,
934                                  false, cdp->Pid);
935     if (!hServer) {
936         error_code = GetLastError();
937         close();
938         Error = error_code;
939         return false;
940     }
941     HANDLE hSelf = GetCurrentProcess();
942     if (!DuplicateHandle(hServer, cdp->Mutex, hSelf, &Mutex,
943                          0, false, DUPLICATE_SAME_ACCESS) ||
944         !DuplicateHandle(hSelf, WatchDogMutex, hServer,
945                          &((connect_data*)SndBuf->Data)->Mutex,
946                          0, false, DUPLICATE_SAME_ACCESS))
947     {
948         error_code = GetLastError();
949         CloseHandle(hServer);
950         close();
951         Error = error_code;
952         return false;
953     }
954     SetEvent(Signal[TD]);
955     h[0] = Signal[RD];
956     h[1] = Mutex;
957     rc = WaitForMultipleObjects(2, h, false, INFINITE);
958 
959     if (rc != WAIT_OBJECT_0) {
960         if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1) {
961             error_code = broken_pipe;
962             ReleaseMutex(Mutex);
963         } else {
964             error_code = GetLastError();
965         }
966         CloseHandle(hServer);
967         close();
968         Error = error_code;
969         return false;
970     }
971     accept_data ad = *(accept_data*)RcvBuf->Data;
972 
973     SetEvent(Signal[TD]);
974     for (i = RD; i <= RTT; i++) {
975         CloseHandle(Signal[i]);
976     }
977     UnmapViewOfFile(SndBuf);
978     CloseHandle(BufHnd);
979     BufHnd = NULL;
980 
981     if (!DuplicateHandle(hServer, ad.BufHnd, hSelf, &BufHnd,
982                          0, false, DUPLICATE_SAME_ACCESS))
983     {
984         Error = GetLastError();
985         CloseHandle(hServer);
986         CloseHandle(Mutex);
987         return false;
988     } else {
989         for (i = RD; i <= RTT; i++) {
990             if (!DuplicateHandle(hServer, ad.Signal[i],
991                                  hSelf, &Signal[i],
992                                  0, false, DUPLICATE_SAME_ACCESS))
993             {
994                 Error = GetLastError();
995                 CloseHandle(hServer);
996                 CloseHandle(BufHnd);
997                 CloseHandle(Mutex);
998                 while (--i >= 0) CloseHandle(Signal[1]);
999                 return false;
1000             }
1001         }
1002     }
1003     CloseHandle(hServer);
1004 
1005     SndBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);
1006     if (!SndBuf) {
1007         Error = GetLastError();
1008         CloseHandle(BufHnd);
1009         CloseHandle(Mutex);
1010         for (i = RD; i <= RTT; i++) {
1011             CloseHandle(Signal[i]);
1012         }
1013         return false;
1014     }
1015     RcvBuf = SndBuf+1;
1016     Error = ok;
1017     state = ss_open;
1018     return true;
1019 }
1020 
1021 
1022 #endif
1023 
1024 END_GIGABASE_NAMESPACE
1025 
1026 #endif
1027