1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /*
19   Copyright (c) 2007 Alexander Eremin <netwhistler@gmail.com>
20 All rights reserved.
21 
22 Redistribution and use in source and binary forms, with or without
23 modification, are permitted provided that the following conditions are met:
24 
25 1. Redistributions of source code must retain the above copyright notice, this
26    list of conditions and the following disclaimer.
27 2. Redistributions in binary form must reproduce the above copyright notice,
28    this list of conditions and the following disclaimer in the documentation
29    and/or other materials provided with the distribution.
30 
31 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
32 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
33 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
34 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
35 ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
36 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
38 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
39 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
40 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 
42 The views and conclusions contained in the software and documentation are those
43 of the authors and should not be interpreted as representing official policies,
44 either expressed or implied, of the FreeBSD Project.
45 */
46 
47 #include "mcsconfig.h"
48 
49 #include <cstdio>
50 #include <cerrno>
51 #ifdef _MSC_VER
52 #define WIN32_LEAN_AND_MEAN
53 #define NOMINMAX
54 #include <windows.h>
55 #include <winsock2.h>
56 #include <ws2tcpip.h>
57 #include <iphlpapi.h>
58 #include <icmpapi.h>
59 #include <stdio.h>
60 #else
61 #if __FreeBSD__
62 #include <sys/types.h>
63 #include <netinet/in.h>
64 #include <netinet/ip.h>
65 #endif
66 #include <sys/socket.h>
67 #include <poll.h>
68 #include <netinet/in.h>
69 #include <netinet/ip_icmp.h>
70 #include <arpa/inet.h>
71 #include <netinet/tcp.h>
72 #include <fcntl.h>
73 #include <sys/ioctl.h>
74 #endif
75 #include <sys/types.h>
76 #include <sys/time.h>
77 #include <cstring>
78 
79 #include <stdexcept>
80 #include <string>
81 #include <sstream>
82 using namespace std;
83 
84 #include <boost/scoped_array.hpp>
85 using boost::scoped_array;
86 
87 #define INETSTREAMSOCKET_DLLEXPORT
88 #include "inetstreamsocket.h"
89 #undef INETSTREAMSOCKET_DLLEXPORT
90 #include "bytestream.h"
91 #include "iosocket.h"
92 #include "socketparms.h"
93 #include "socketclosed.h"
94 #include "logger.h"
95 #include "loggingid.h"
96 #include "idbcompress.h"
97 
98 // some static functions
99 namespace
100 {
101 using messageqcpp::ByteStream;
102 
// @bug 2441 - Retry after 512 read() error.
// ERESTARTSYS (512) is a kernel I/O errno that is similar to EINTR, except
// that it is not supposed to "leak" out into user space.  But we are
// sometimes seeing "unknown error 512" error messages in response to calls to
// read(), so we retry after ERESTARTSYS the same way we do for EINTR.
108 //const int KERR_ERESTARTSYS = 512;
109 
110 #ifdef _MSC_VER
111 const int MaxSendPacketSize = 64 * 1024;
112 #endif
113 
// Computes a 16-bit checksum over `sz` bytes starting at `buf`.
//
// The buffer is summed as a sequence of native-order 16-bit words; a trailing
// odd byte is placed in the first byte (in memory) of an otherwise zero word
// and added as well. The carries above bit 15 are folded back into the low
// 16 bits and the bitwise complement of the folded sum is returned
// (always in the range 0..65535).
int in_cksum(unsigned short* buf, int sz)
{
    int acc = 0;
    const unsigned short* word = buf;
    int remaining = sz;

    // Whole 16-bit words.
    for (; remaining > 1; remaining -= 2)
        acc += *word++;

    unsigned short result = 0;

    // Left-over single byte, if the length was odd.
    if (remaining == 1)
    {
        *reinterpret_cast<unsigned char*>(&result) = *reinterpret_cast<const unsigned char*>(word);
        acc += result;
    }

    // Fold the high 16 bits into the low 16 bits, twice, to absorb the carry.
    acc = (acc >> 16) + (acc & 0xFFFF);
    acc += (acc >> 16);

    result = ~acc;
    return result;
}
138 
139 } //namespace anon
140 
141 namespace messageqcpp
142 {
143 
InetStreamSocket(size_t blocksize)144 InetStreamSocket::InetStreamSocket(size_t blocksize) :
145     fSocketParms(PF_INET, SOCK_STREAM, IPPROTO_TCP),
146     fBlocksize(blocksize),
147     fSyncProto(true),
148     fMagicBuffer(0)
149 {
150     memset(&fSa, 0, sizeof(fSa));
151     fConnectionTimeout.tv_sec = 20;
152     fConnectionTimeout.tv_nsec = 0;
153 }
154 
// Destructor. The body is empty: nothing in this function calls close() or
// otherwise touches the descriptor held in fSocketParms.
// NOTE(review): presumably the owner is expected to call close() explicitly - confirm.
InetStreamSocket::~InetStreamSocket()
{
}
158 
/* Creates the socket descriptor described by fSocketParms and stores it.
 *
 * Throws logic_error if the socket is already open, and runtime_error (with the
 * strerror_r text for the captured errno appended when available) if socket()
 * fails. On Windows builds, a WSANOTINITIALISED failure triggers one
 * WSAStartup(2.2) attempt followed by a single retry of socket().
 *
 * After creation TCP_NODELAY and SO_REUSEADDR are both set to 1. If either
 * setsockopt() call fails the function prints via perror() and terminates the
 * process with exit(1).
 */
void InetStreamSocket::open()
{
    int bufferSize;
    int ret;
    socklen_t bufferSizeSize;

    if (isOpen())
        throw logic_error("InetStreamSocket::open: socket is already open");

    int sd;
    sd = ::socket(fSocketParms.domain(), fSocketParms.type(), fSocketParms.protocol());
    // Capture errno right away so later calls cannot overwrite it.
    int e = errno;

    if (sd < 0)
    {
#ifdef _MSC_VER
        int wsaError = WSAGetLastError();

        // Winsock has not been initialised yet: initialise it and retry once.
        if (wsaError == WSANOTINITIALISED)
        {
            WSAData wsadata;
            const WORD minVersion = MAKEWORD(2, 2);

            if (WSAStartup(minVersion, &wsadata) == 0)
            {
                if (wsadata.wVersion == minVersion)
                {
                    sd = ::socket(fSocketParms.domain(), fSocketParms.type(), fSocketParms.protocol());
                    e = errno;

                    // Retry succeeded: skip the error report and go configure the socket.
                    if (sd >= 0) goto setopts;
                }

                //Didn't get the required min version, error out
            }

            //WSAStartup failed, continue to report error
        }

#endif
        string msg = "InetStreamSocket::open: socket() error: ";
        scoped_array<char> buf(new char[80]);
        // STRERROR_R_CHAR_P selects between the strerror_r variant that returns
        // a char* and the variant that returns an int status.
#if STRERROR_R_CHAR_P
        const char* p;

        if ((p = strerror_r(e, buf.get(), 80)) != 0)
            msg += p;

#else
        int p;

        if ((p = strerror_r(e, buf.get(), 80)) == 0)
            msg += buf.get();

#endif
        throw runtime_error(msg);
    }

#ifdef _MSC_VER
setopts:
#endif

    /*  XXXPAT:  If we have latency problems again, try these...
    	bufferSizeSize = 4;
    	bufferSize = 512000;
    	setsockopt(sd, SOL_SOCKET, SO_SNDBUF, &bufferSize, bufferSizeSize);
    	bufferSize = 512000;
    	setsockopt(sd, SOL_SOCKET, SO_RCVBUF, &bufferSize, bufferSizeSize);
    	bufferSize = 1;
    	setsockopt(sd, SOL_SOCKET, SO_RCVLOWAT, &bufferSize, bufferSizeSize);
    	setsockopt(sd, SOL_SOCKET, SO_SNDLOWAT, &bufferSize, bufferSizeSize);
    */
    // bufferSize doubles as the option value (1 == enable).
    // NOTE(review): the option length is hard-coded to 4, which assumes sizeof(int) == 4.
    bufferSize = 1;
    bufferSizeSize = 4;
    ret = setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (const char*)&bufferSize, bufferSizeSize);

    if (ret < 0)
    {
        // NOTE(review): sd is not closed on this path; the process exits instead.
        perror("setsockopt");
        exit(1);
    }

    bufferSize = 1;
    ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (const char*)&bufferSize, bufferSizeSize);

    if (ret < 0)
    {
        perror("setsockopt");
        exit(1);
    }

    // Only record the descriptor once it has been fully configured.
    fSocketParms.sd(sd);
}
252 
close()253 void InetStreamSocket::close()
254 {
255     if (isOpen())
256     {
257         ::shutdown(fSocketParms.sd(), SHUT_RDWR);
258 #ifdef _MSC_VER
259         ::closesocket(fSocketParms.sd());
260 #else
261         ::close(fSocketParms.sd());
262 #endif
263         fSocketParms.sd(-1);
264     }
265 }
266 
267 // needs to be in sync with clone()
doCopy(const InetStreamSocket & rhs)268 void InetStreamSocket::doCopy(const InetStreamSocket& rhs)
269 {
270     fBlocksize = rhs.fBlocksize;
271     fSocketParms = rhs.fSocketParms;
272     fSa = rhs.fSa;
273     fConnectionTimeout = rhs.fConnectionTimeout;
274     fSyncProto = rhs.fSyncProto;
275 }
276 
277 // needs to be in sync with doCopy()
clone() const278 Socket* InetStreamSocket::clone() const
279 {
280     InetStreamSocket* iss = new InetStreamSocket(fBlocksize);
281     iss->fSocketParms = fSocketParms;
282     iss->fSa = fSa;
283     iss->fConnectionTimeout = fConnectionTimeout;
284     iss->fSyncProto = fSyncProto;
285     return iss;
286 }
287 
// Copy constructor: copies the state handled by doCopy() from rhs.
// fMagicBuffer is not part of that state, so it is explicitly zeroed here
// (matching the primary constructor) instead of being left uninitialized.
InetStreamSocket::InetStreamSocket(const InetStreamSocket& rhs) :
    fMagicBuffer(0)
{
    doCopy(rhs);
}
292 
// Copy assignment: self-assignment is a no-op, otherwise the state handled by
// doCopy() is taken from rhs. Returns *this.
InetStreamSocket& InetStreamSocket::operator=(const InetStreamSocket& rhs)
{
    if (this == &rhs)
        return *this;

    doCopy(rhs);
    return *this;
}
300 
301 /* The caller needs to know when/if the remote closes the connection or sends data.
302  * Returns 0 on timeout, 1 if there is data to read, 2 if the connection was dropped.
303  */
pollConnection(int connectionNum,long msecs)304 int InetStreamSocket::pollConnection(int connectionNum, long msecs)
305 {
306     struct pollfd pfd[1];
307     int err;
308 
309 retry:
310     memset(&pfd, 0, sizeof(struct pollfd));
311     pfd[0].fd = connectionNum;
312     pfd[0].events = POLLIN;
313     err = poll(pfd, 1, msecs);
314 
315     if (err < 0)
316     {
317         int e = errno;
318 
319         if (e == EINTR || e == KERR_ERESTARTSYS)
320             goto retry;
321     }
322 
323     // Linux doesn't set POLLHUP, need add'l check for data or EOF
324     if (pfd[0].revents & POLLIN)
325     {
326         char buf;
327         err = ::recv(connectionNum, &buf, 1, MSG_PEEK);
328 
329         if (err == 0)
330             return 2;
331         else if (err == 1)	// there is in fact data to read
332             return 1;
333         else
334             return 3;
335     }
336 
337     if (err == 0)	// timeout
338         return 0;
339 
340     return 3;       // catch-all error code
341 }
342 
/* Consumes bytes from the socket, one at a time, until the last four bytes
 * read form BYTESTREAM_MAGIC or COMPRESSED_BYTESTREAM_MAGIC.
 *
 * msecs     - if >= 0, poll() is called with this timeout before every byte;
 *             if < 0, no polling is done and the read blocks.
 * isTimeOut - if non-null, set to true when a poll() times out.
 * stats     - if non-null, dataRecvd(1) is called for every byte consumed.
 *
 * Returns true when the next thing in the stream is the beginning of a new
 * ByteStream object. Returns false on a poll timeout, or on EOF when
 * msecs < 0. Throws SocketClosed on EOF when msecs >= 0 and runtime_error on
 * poll/read failures other than EINTR and KERR_ERESTARTSYS, which are retried.
 */
bool InetStreamSocket::readToMagic(long msecs, bool* isTimeOut, Stats* stats) const
{
    int err;
    struct pollfd pfd[1];
    uint8_t* magicBuffer8;

    // fMagicBuffer works as a sliding window over the most recent bytes;
    // magicBuffer8 is a byte-wise view of it.
    fMagicBuffer = 0;
    magicBuffer8 = reinterpret_cast<uint8_t*>(&fMagicBuffer);
    pfd[0].fd = fSocketParms.sd();
    pfd[0].events = POLLIN;

    while ((fMagicBuffer != BYTESTREAM_MAGIC) && (fMagicBuffer != COMPRESSED_BYTESTREAM_MAGIC))
    {

        if (msecs >= 0)
        {
            pfd[0].revents = 0;

            err = poll(pfd, 1, msecs);

            if (err < 0)
            {
                int e = errno;

                // Interrupted: go back to the loop condition and poll again.
                if (e == EINTR)
                {
                    continue;
                }

                // Same as EINTR, but leave a warning in the log.
                if (e == KERR_ERESTARTSYS)
                {
                    logIoError("InetStreamSocket::readToMagic(): I/O error1", e);
                    continue;
                }

                ostringstream oss;
                oss << "InetStreamSocket::readToMagic(): I/O error1: " <<
                    strerror(e);
                throw runtime_error(oss.str());
            }

            // poll() itself succeeded but reported an error condition on the descriptor.
            if (pfd[0].revents & (POLLHUP | POLLNVAL | POLLERR))
            {
                ostringstream oss;
                oss << "InetStreamSocket::readToMagic(): I/O error1: rc-" <<
                    err << "; poll signal interrupt ( ";

                if (pfd[0].revents & POLLHUP)
                    oss << "POLLHUP ";

                if (pfd[0].revents & POLLNVAL)
                    oss << "POLLNVAL ";

                if (pfd[0].revents & POLLERR)
                    oss << "POLLERR ";

                oss << ")";
                throw runtime_error(oss.str());
            }

            if (err == 0) // timeout
            {
                if (isTimeOut)
                    *isTimeOut = true;

                return false;
            }
        }

        // Drop the oldest byte from the window and read the next one into byte index 3.
        // NOTE(review): shifting right while writing index 3 presumably assumes a
        // 4-byte fMagicBuffer on a little-endian host - confirm.
        fMagicBuffer = fMagicBuffer >> 8;
retry:
#ifdef _MSC_VER
        err = ::recv(fSocketParms.sd(), (char*)&magicBuffer8[3], 1, 0);
#else
        err = ::read(fSocketParms.sd(), &magicBuffer8[3], 1);
#endif

        if (err < 0)
        {
            int e = errno;
#ifdef _MSC_VER

            // A reset connection is treated the same way as EOF below.
            if (WSAGetLastError() == WSAECONNRESET)
            {
                //throw runtime_error("connection reset by peer");
                if (msecs < 0) return false;
                else throw SocketClosed("InetStreamSocket::readToMagic: Remote is closed");
            }

#endif

            // Retry only the read; the window has already been shifted for this byte.
            if (e == EINTR)
            {
                goto retry;
            }

            if (e == KERR_ERESTARTSYS)
            {
                logIoError("InetStreamSocket::readToMagic(): I/O error2.0", e);
                goto retry;
            }

            ostringstream oss;
            oss << "InetStreamSocket::readToMagic(): I/O error2.1: " <<
                "err = " << err << " e = " << e <<
#ifdef _MSC_VER
                " WSA error = " << WSAGetLastError() <<
#endif
                ": " << strerror(e);
            throw runtime_error(oss.str());
        }

        // EOF. If no timeout was specified, return false so that read() hands an
        // empty ByteStream back to its caller. If one was, throw SocketClosed.
        if (err == 0)
        {
            if (msecs < 0)
                return false;
            else
                throw SocketClosed("InetStreamSocket::readToMagic: Remote is closed");
        }

        if (stats)
            stats->dataRecvd(1);
    }

    return true;
}
473 
read(const struct::timespec * timeout,bool * isTimeOut,Stats * stats) const474 const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeOut, Stats* stats) const
475 {
476     long msecs = -1;
477 
478     struct pollfd pfd[1];
479     pfd[0].fd = fSocketParms.sd();
480     pfd[0].events = POLLIN;
481 
482     if (timeout != 0)
483         msecs = timeout->tv_sec * 1000 + timeout->tv_nsec / 1000000;
484 
485     // we need to read the 4-byte message length first.
486     uint32_t msglen;
487     uint8_t* msglenp = reinterpret_cast<uint8_t*>(&msglen);
488     size_t mlread = 0;
489 
490     if (readToMagic(msecs, isTimeOut, stats) == false)	//indicates a timeout or EOF
491     {
492         // MCOL-480 The connector calls with timeout in a loop so that
493         // it can check a killed flag. This means that for a long running query,
494         // the following fills the warning log.
495 //		if (isTimeOut && *isTimeOut)
496 //		{
497 //			logIoError("InetStreamSocket::read: timeout during readToMagic", 0);
498 //		}
499         return SBS(new ByteStream(0));
500     }
501 
502     //FIXME: This seems like a lot of work to read 4 bytes...
503     while (mlread < sizeof(msglen))
504     {
505         ssize_t t;
506 
507         if (timeout != NULL)
508         {
509             int err;
510 
511             pfd[0].revents = 0;
512             err = poll(pfd, 1, msecs);
513 
514             if (err < 0 || pfd[0].revents & (POLLERR | POLLHUP | POLLNVAL))
515             {
516                 ostringstream oss;
517                 oss << "InetStreamSocket::read: I/O error1: " <<
518                     strerror(errno);
519                 throw runtime_error(oss.str());
520             }
521 
522             if (err == 0)  // timeout
523             {
524                 if (isTimeOut)
525                     *isTimeOut = true;
526 
527                 logIoError("InetStreamSocket::read: timeout during first poll", 0);
528                 return SBS(new ByteStream(0));
529             }
530         }
531 
532 #ifdef _MSC_VER
533         t = ::recv(fSocketParms.sd(), (char*)(msglenp + mlread), sizeof(msglen) - mlread, 0);
534 #else
535         t = ::read(fSocketParms.sd(), msglenp + mlread, sizeof(msglen) - mlread);
536 #endif
537 
538         if (t == 0)
539         {
540             if (timeout == NULL)
541             {
542                 logIoError("InetStreamSocket::read: timeout during first read", 0);
543                 return SBS(new ByteStream(0));  // don't return an incomplete message
544             }
545             else
546                 throw SocketClosed("InetStreamSocket::read: Remote is closed");
547         }
548 
549         if (t < 0)
550         {
551             int e = errno;
552 
553             if (e == EINTR)
554             {
555                 continue;
556             }
557 
558             if (e == KERR_ERESTARTSYS)
559             {
560                 logIoError("InetStreamSocket::read: I/O error2", e);
561                 continue;
562             }
563 
564             ostringstream oss;
565             oss << "InetStreamSocket::read: I/O error2: " <<
566                 strerror(e);
567             throw runtime_error(oss.str());
568         }
569 
570         mlread += t;
571     }
572 
573     if (stats)
574         stats->dataRecvd(sizeof(msglen));
575 
576     SBS res(new ByteStream(msglen));
577     uint8_t* bufp = res->getInputPtr();
578 
579     size_t nread = 0;
580 
581     //Finally read the actual message...
582     while (nread < msglen)
583     {
584         ssize_t t;
585 
586         if (timeout != NULL)
587         {
588             int err;
589 
590             pfd[0].revents = 0;
591             err = poll(pfd, 1, msecs);
592 
593             if (err < 0 || pfd[0].revents & (POLLERR | POLLHUP | POLLNVAL))
594             {
595                 ostringstream oss;
596                 oss << "InetStreamSocket::read: I/O error3: " <<
597                     strerror(errno);
598                 throw runtime_error(oss.str());
599             }
600 
601             if (err == 0)  // timeout
602             {
603                 if (isTimeOut)
604                 {
605                     logIoError("InetStreamSocket::read: timeout during second poll", 0);
606                     *isTimeOut = true;
607                 }
608 
609                 if (stats)
610                     stats->dataRecvd(nread);
611 
612                 return SBS(new ByteStream(0));
613             }
614         }
615 
616 #ifdef _MSC_VER
617         int readAmount = std::min((int)msglen - (int)nread, MaxSendPacketSize);
618         t = ::recv(fSocketParms.sd(), (char*)(bufp + nread), readAmount, 0);
619 #else
620         t = ::read(fSocketParms.sd(), bufp + nread, msglen - nread);
621 #endif
622 
623         if (t == 0)
624         {
625             if (stats)
626                 stats->dataRecvd(nread);
627 
628             if (timeout == NULL)
629                 return SBS(new ByteStream(0));	// don't return an incomplete message
630             else
631             {
632                 logIoError("InetStreamSocket::read: timeout during second read", 0);
633                 throw SocketClosed("InetStreamSocket::read: Remote is closed");
634             }
635         }
636 
637         if (t < 0)
638         {
639             ostringstream oss;
640 #ifdef _MSC_VER
641             int e = WSAGetLastError();
642             oss << "InetStreamSocket::read: I/O error4: WSA: " << e;
643 #else
644             int e = errno;
645 
646             if (e == EINTR)
647             {
648                 continue;
649             }
650 
651             if (e == KERR_ERESTARTSYS)
652             {
653                 logIoError("InetStreamSocket::read: I/O error4", e);
654                 continue;
655             }
656 
657             oss << "InetStreamSocket::read: I/O error4: " <<
658                 strerror(e);
659 #endif
660 
661             if (stats)
662                 stats->dataRecvd(nread);
663 
664             throw runtime_error(oss.str());
665         }
666 
667         nread += t;
668     }
669 
670     if (stats)
671         stats->dataRecvd(msglen);
672 
673     res->advanceInputPtr(msglen);
674     return res;
675 }
676 
/*
* The protocol here is that we write the magic word first, then the length of the
* ByteStream, then the bytes. The read side consumes them in that same order.
*/
681 
write(SBS msg,Stats * stats)682 void InetStreamSocket::write(SBS msg, Stats* stats)
683 {
684     write(*msg, stats);
685 }
686 
do_write(const ByteStream & msg,uint32_t whichMagic,Stats * stats) const687 void InetStreamSocket::do_write(const ByteStream& msg, uint32_t whichMagic, Stats* stats) const
688 {
689     uint32_t msglen = msg.length();
690     uint32_t magic = whichMagic;
691     uint32_t* realBuf;
692 
693     if (msglen == 0) return;
694 
695     /* buf.fCurOutPtr points to the data to send; ByteStream guarantees that there
696        are at least 8 bytes before that for the magic & length fields */
697     realBuf = (uint32_t*)msg.buf();
698     realBuf -= 2;
699     realBuf[0] = magic;
700     realBuf[1] = msglen;
701 
702     try
703     {
704         written(fSocketParms.sd(), (const uint8_t*)realBuf, msglen + sizeof(msglen) + sizeof(magic));
705     }
706     catch (std::exception& ex)
707     {
708         string errorMsg(ex.what());
709         errorMsg += " -- write from " + toString();
710         throw runtime_error(errorMsg);
711     }
712 
713     if (stats)
714         stats->dataSent(msglen + sizeof(msglen) + sizeof(magic));
715 }
716 
write(const ByteStream & msg,Stats * stats)717 void InetStreamSocket::write(const ByteStream& msg, Stats* stats)
718 {
719     do_write(msg, BYTESTREAM_MAGIC, stats);
720 }
721 
write_raw(const ByteStream & msg,Stats * stats) const722 void InetStreamSocket::write_raw(const ByteStream& msg, Stats* stats) const
723 {
724     uint32_t msglen = msg.length();
725 
726     if (msglen == 0) return;
727 
728     try
729     {
730         written(fSocketParms.sd(), msg.buf(), msglen);
731     }
732     catch (std::exception& ex)
733     {
734         string errorMsg(ex.what());
735         errorMsg += " -- write_raw from " + toString();
736         throw runtime_error(errorMsg);
737     }
738 
739     if (stats)
740         stats->dataSent(msglen);
741 }
742 
bind(const sockaddr * serv_addr)743 void InetStreamSocket::bind(const sockaddr* serv_addr)
744 {
745     memcpy(&fSa, serv_addr, sizeof(sockaddr_in));
746 
747     if (::bind(fSocketParms.sd(), serv_addr, sizeof(sockaddr_in)) != 0)
748     {
749         int e = errno;
750         string msg = "InetStreamSocket::bind: bind() error: ";
751         scoped_array<char> buf(new char[80]);
752 #if STRERROR_R_CHAR_P
753         const char* p;
754 
755         if ((p = strerror_r(e, buf.get(), 80)) != 0)
756             msg += p;
757 
758 #else
759         int p;
760 
761         if ((p = strerror_r(e, buf.get(), 80)) == 0)
762             msg += buf.get();
763 
764 #endif
765         throw runtime_error(msg);
766     }
767 
768 }
769 
listen(int backlog)770 void InetStreamSocket::listen(int backlog)
771 {
772 #ifndef _MSC_VER
773     fcntl(socketParms().sd(), F_SETFD, fcntl(socketParms().sd(), F_GETFD) | FD_CLOEXEC);
774 #endif
775 
776     if (::listen(socketParms().sd(), backlog) != 0)
777     {
778         int e = errno;
779         string msg = "InetStreamSocket::listen: listen() error: ";
780         scoped_array<char> buf(new char[80]);
781 #if STRERROR_R_CHAR_P
782         const char* p;
783 
784         if ((p = strerror_r(e, buf.get(), 80)) != 0)
785             msg += p;
786 
787 #else
788         int p;
789 
790         if ((p = strerror_r(e, buf.get(), 80)) == 0)
791             msg += buf.get();
792 
793 #endif
794         throw runtime_error(msg);
795     }
796 
797 }
798 
/* Accepts one incoming connection on this (listening) socket.
 *
 * timeout - if non-null, poll() waits at most this long for the listening
 *           descriptor to become readable; if it does not report POLLIN (or
 *           reports POLLERR/POLLHUP/POLLNVAL) the IOSocket is returned as it
 *           was constructed, without a client descriptor having been set.
 *           If null, ::accept() is called directly.
 *
 * When fSyncProto is set, a single byte 'A' is sent to the new client to
 * synchronize with connect() on the remote side.
 *
 * Returns an IOSocket wrapping a new InetStreamSocket that holds the client
 * descriptor and peer address. Throws runtime_error if ::accept() fails with
 * an error that is not retried, or if sending the sync byte fails (the client
 * descriptor is closed first in that case).
 */
const IOSocket InetStreamSocket::accept(const struct timespec* timeout)
{
    int clientfd;
    long msecs = 0;

    IOSocket ios(new InetStreamSocket(fBlocksize));

    struct pollfd pfd[1];
    pfd[0].fd = socketParms().sd();
    pfd[0].events = POLLIN;

    if (timeout != 0)
    {
        msecs = timeout->tv_sec * 1000 + timeout->tv_nsec / 1000000;

        // Anything other than "exactly one descriptor, readable, no error" gives up here.
        if (poll(pfd, 1, msecs) != 1 || (pfd[0].revents & POLLIN) == 0 ||
                pfd[0].revents & (POLLERR | POLLHUP | POLLNVAL))
            return ios;
    }

    struct sockaddr sa;

    socklen_t sl = sizeof(sa);

    int e;

    // Retry ::accept() while it fails with EINTR, and with ERESTART /
    // ECONNABORTED where the platform defines them.
    do
    {
        clientfd = ::accept(socketParms().sd(), &sa, &sl);
        e = errno;
    }
    while (clientfd < 0 && (e == EINTR ||
#ifdef ERESTART
                            e == ERESTART ||
#endif
#ifdef ECONNABORTED
                            e == ECONNABORTED ||
#endif
                            false));

    if (clientfd < 0)
    {
        string msg = "InetStreamSocket::accept: accept() error: ";
        scoped_array<char> buf(new char[80]);
        // STRERROR_R_CHAR_P selects between the strerror_r variant that returns
        // a char* and the variant that returns an int status.
#if STRERROR_R_CHAR_P
        const char* p;

        if ((p = strerror_r(e, buf.get(), 80)) != 0)
            msg += p;

#else
        int p;

        if ((p = strerror_r(e, buf.get(), 80)) == 0)
            msg += buf.get();

#endif
        throw runtime_error(msg);
    }

    if (fSyncProto)
    {
        /* send a byte to artificially synchronize with connect() on the remote */
        char b = 'A';
        int ret;

        ret = ::send(clientfd, &b, 1, 0);
        e = errno;

        if (ret < 0)
        {
            ostringstream  os;
            char blah[80];
#if STRERROR_R_CHAR_P
            const char* p;

            if ((p = strerror_r(e, blah, 80)) != 0)
                os << "InetStreamSocket::accept sync: " << p;

#else
            int p;

            if ((p = strerror_r(e, blah, 80)) == 0)
                os << "InetStreamSocket::accept sync: " << blah;

#endif
            // Do not leak the accepted descriptor when reporting the failure.
            ::close(clientfd);
            throw runtime_error(os.str());
        }
        else if (ret == 0)
        {
            ::close(clientfd);
            throw runtime_error("InetStreamSocket::accept sync: got unexpected error code");
        }
    }

    // Hand the client descriptor and peer address over to the returned IOSocket.
    SocketParms sp;
    sp = ios.socketParms();
    sp.sd(clientfd);
    ios.socketParms(sp);
    ios.sa(&sa);
    return ios;

}
903 
/* Connects the socket to serv_addr (copied and used as a sockaddr_in).
 *
 * Throws runtime_error, with the error text and " to: <toString()>" appended,
 * if ::connect() fails.
 *
 * When fSyncProto is set, the function then waits up to fConnectionTimeout for
 * the descriptor to become readable and consumes one byte, to synchronize with
 * accept() on the remote side. It throws runtime_error if that wait times out
 * or poll() fails.
 */
void InetStreamSocket::connect(const sockaddr* serv_addr)
{
    memcpy(&fSa, serv_addr, sizeof(sockaddr_in));

    if (::connect(socketParms().sd(), serv_addr, sizeof(sockaddr_in)))
    {
        int e = errno;
        string msg = "InetStreamSocket::connect: connect() error: ";
#ifdef _MSC_VER
        // Windows builds report the Winsock error code instead of strerror text.
        char m[80];
        int x = WSAGetLastError();

        if (x == WSAECONNREFUSED)
            strcpy(m, "connection refused");
        else
            sprintf(m, "%d 0x%x", x, x);

        msg += m;
#else
        scoped_array<char> buf(new char[80]);
#if STRERROR_R_CHAR_P
        const char* p;

        if ((p = strerror_r(e, buf.get(), 80)) != 0)
            msg += p;

#else
        int p;

        if ((p = strerror_r(e, buf.get(), 80)) == 0)
            msg += buf.get();

#endif
#endif
        msg += " to: " + toString();
        throw runtime_error(msg);
    }

    if (!fSyncProto)
        return;

    /* read a byte to artificially synchronize with accept() on the remote */
    int ret = -1;
    int e = EBADF;
    struct pollfd pfd;

    long msecs = fConnectionTimeout.tv_sec * 1000 + fConnectionTimeout.tv_nsec / 1000000;

    // Wait for the sync byte, polling again if the wait is interrupted by a signal.
    do
    {
        pfd.fd = socketParms().sd();
        pfd.revents = 0;
        pfd.events = POLLIN;
        ret = poll(&pfd, 1, msecs);
        e = errno;
    }
    while (ret == -1 && e == EINTR && !(pfd.revents & (POLLERR | POLLHUP | POLLNVAL)));

    // success
    if (ret == 1)
    {
        // Consume the sync byte; the result of the read is deliberately ignored.
        // NOTE(review): ret == 1 does not check revents, so an error/hang-up
        // event is also treated as success here - confirm this is intended.
#ifdef _MSC_VER
        char buf = '\0';
        (void)::recv(socketParms().sd(), &buf, 1, 0);
#else
#if defined(__GNUC__) && __GNUC__ >= 5
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-result"
        char buf = '\0';
        ::read(socketParms().sd(), &buf, 1);   // we know 1 byte is in the recv buffer
#pragma GCC diagnostic pop
#else
        char buf = '\0';
        ::read(socketParms().sd(), &buf, 1);   // we know 1 byte is in the recv buffer
#endif // pragma
#endif
        return;
    }

    /* handle the various errors */
    if (ret == 0)
        throw runtime_error("InetStreamSocket::connect: connection timed out");
    else if (ret == -1 && e != EINTR)
    {
        ostringstream  os;
        char blah[80];
#if STRERROR_R_CHAR_P
        const char* p;

        if ((p = strerror_r(e, blah, 80)) != 0)
            os << "InetStreamSocket::connect: " << p;

#else
        int p;

        if ((p = strerror_r(e, blah, 80)) == 0)
            os << "InetStreamSocket::connect: " << blah;

#endif
        throw runtime_error(os.str());
    }
    else
        throw runtime_error("InetStreamSocket::connect: unknown connection error");
}
1008 
toString() const1009 const string InetStreamSocket::toString() const
1010 {
1011     ostringstream oss;
1012     char buf[INET_ADDRSTRLEN];
1013     const SocketParms& sp = fSocketParms;
1014     oss << "InetStreamSocket: sd: " << sp.sd() <<
1015 #ifndef _MSC_VER
1016         " inet: " << inet_ntop(AF_INET, &fSa.sin_addr, buf, INET_ADDRSTRLEN) <<
1017 #endif
1018         " port: " << ntohs(fSa.sin_port);
1019     return oss.str();
1020 }
1021 
1022 //
1023 //  Log a Warning msg pertaining to an I/O error; Currently used to log a
1024 //  ERESTARTSYS (errno 512) condition, but could be used to log any other
1025 //  I/O error that will retried.
1026 //
logIoError(const char * errMsg,int errNum) const1027 void InetStreamSocket::logIoError(const char* errMsg, int errNum) const
1028 {
1029     logging::Logger        logger(31);
1030     logging::Message::Args args;
1031     logging::LoggingID     li(31);
1032     args.add(errMsg);
1033     args.add(strerror(errNum));
1034     args.add(toString());
1035 
1036     logging::MsgMap msgMap;
1037     msgMap[logging::M0071] = logging::Message( logging::M0071 );
1038     logger.msgMap(msgMap);
1039 
1040     logger.logMessage(logging::LOG_TYPE_WARNING, logging::M0071, args, li);
1041 }
1042 
written(int fd,const uint8_t * ptr,size_t nbytes) const1043 ssize_t InetStreamSocket::written(int fd, const uint8_t* ptr, size_t nbytes) const
1044 {
1045     size_t nleft;
1046     ssize_t nwritten;
1047     const char* bufp;
1048 
1049     nleft = nbytes;
1050     bufp = reinterpret_cast<const char*>(ptr);
1051 
1052     while (nleft > 0)
1053     {
1054         // the O_NONBLOCK flag is not set, this is a blocking I/O.
1055 #ifdef _MSC_VER
1056         int writeAmount = std::min((int)nleft, MaxSendPacketSize);
1057 
1058         if ((nwritten = ::send(fd, bufp, writeAmount, 0)) < 0)
1059 #else
1060         if ((nwritten = ::write(fd, bufp, nleft)) < 0)
1061 #endif
1062         {
1063             if (errno == EINTR)
1064                 nwritten = 0;
1065             else
1066             {
1067                 // save the error no first
1068                 int e = errno;
1069                 string errorMsg = "InetStreamSocket::write error: ";
1070                 scoped_array<char> buf(new char[80]);
1071 #if STRERROR_R_CHAR_P
1072                 const char* p;
1073 
1074                 if ((p = strerror_r(e, buf.get(), 80)) != 0)
1075                     errorMsg += p;
1076 
1077 #else
1078                 int p;
1079 
1080                 if ((p = strerror_r(e, buf.get(), 80)) == 0)
1081                     errorMsg += buf.get();
1082 
1083 #endif
1084                 throw runtime_error(errorMsg);
1085             }
1086         }
1087 
1088         nleft -= nwritten;
1089         bufp += nwritten;
1090     }
1091 
1092     return nbytes;
1093 }
1094 
addr2String() const1095 const string InetStreamSocket::addr2String() const
1096 {
1097     string s;
1098 #ifdef _MSC_VER
1099     //This is documented to be thread-safe in Windows
1100     s = inet_ntoa(fSa.sin_addr);
1101 #else
1102     char dst[INET_ADDRSTRLEN];
1103     s = inet_ntop(AF_INET, &fSa.sin_addr, dst, INET_ADDRSTRLEN);
1104 #endif
1105     return s;
1106 }
1107 
isSameAddr(const Socket * rhs) const1108 bool InetStreamSocket::isSameAddr(const Socket* rhs) const
1109 {
1110     const InetStreamSocket* issp = dynamic_cast<const InetStreamSocket*>(rhs);
1111 
1112     if (!issp) return false;
1113 
1114     return (fSa.sin_addr.s_addr == issp->fSa.sin_addr.s_addr);
1115 }
1116 
1117 /*static*/
ping(const std::string & ipaddr,const struct timespec * timeout)1118 int InetStreamSocket::ping(const std::string& ipaddr, const struct timespec* timeout)
1119 {
1120     sockaddr_in pingaddr;
1121     memset(&pingaddr, 0, sizeof(pingaddr));
1122 
1123     if (inet_aton(ipaddr.c_str(), &pingaddr.sin_addr) == 0)
1124         return -1;
1125 
1126     long msecs = 30 * 1000;
1127 
1128     if (timeout)
1129         msecs = timeout->tv_sec * 1000 + timeout->tv_nsec / 1000000;
1130 
1131 #ifndef _MSC_VER
1132     int pingsock;
1133     pingsock = ::socket(PF_INET, SOCK_RAW, IPPROTO_ICMP);
1134 
1135     if (pingsock < 0)
1136         return -1;
1137 
1138     ssize_t len = 0;
1139     size_t pktlen = 0;
1140     const size_t PktSize = 1024;
1141     char pkt[PktSize];
1142     memset(pkt, 0, PktSize);
1143     struct icmp* pingPktPtr = reinterpret_cast<struct icmp*>(pkt);
1144 
1145     pingPktPtr->icmp_type = ICMP_ECHO;
1146     pingPktPtr->icmp_cksum = in_cksum(reinterpret_cast<unsigned short*>(pkt), PktSize);
1147 
1148     pktlen = 56 + ICMP_MINLEN;
1149     len = ::sendto(pingsock, pkt, pktlen, 0, reinterpret_cast<const struct sockaddr*>(&pingaddr),
1150                    sizeof(pingaddr));
1151 
1152     if (len < 0 || static_cast<size_t>(len) != pktlen)
1153     {
1154         ::close(pingsock);
1155         return -1;
1156     }
1157 
1158     memset(pkt, 0, PktSize);
1159     pktlen = PktSize;
1160 
1161     int pollrc = 0;
1162     pollrc = pollConnection(pingsock, msecs);
1163 
1164     if (pollrc != 1)
1165     {
1166         ::close(pingsock);
1167         return -1;
1168     }
1169 
1170     len = ::recvfrom(pingsock, pkt, pktlen, 0, 0, 0);
1171 
1172     if (len < 76)
1173     {
1174         ::close(pingsock);
1175         return -1;
1176     }
1177 
1178     struct ip* iphdr = reinterpret_cast<struct ip*>(pkt);
1179 
1180     pingPktPtr = reinterpret_cast<struct icmp*>(pkt + (iphdr->ip_hl << 2));
1181 
1182     if (pingPktPtr->icmp_type != ICMP_ECHOREPLY)
1183     {
1184         ::close(pingsock);
1185         return -1;
1186     }
1187 
1188     ::close(pingsock);
1189 
1190 #else //Windows version
1191     HANDLE icmpFile;
1192     icmpFile = IcmpCreateFile();
1193 
1194     if (icmpFile == INVALID_HANDLE_VALUE)
1195         return -1;
1196 
1197     DWORD ret;
1198     const size_t PingPktSize = 1024;
1199     char rqd[PingPktSize];
1200     WORD rqs = PingPktSize;
1201     char rpd[PingPktSize];
1202     DWORD rps = PingPktSize;
1203 
1204     ZeroMemory(rqd, PingPktSize);
1205     ZeroMemory(rpd, PingPktSize);
1206 
1207     rqs = 64;
1208 
1209     ret = IcmpSendEcho(icmpFile, pingaddr.sin_addr.s_addr, rqd, rqs, 0, rpd, rps, msecs);
1210 
1211     if (ret <= 0)
1212     {
1213         IcmpCloseHandle(icmpFile);
1214         return -1;
1215     }
1216 
1217     PICMP_ECHO_REPLY echoReply = (PICMP_ECHO_REPLY)rpd;
1218 
1219     if (echoReply->Status != IP_SUCCESS)
1220     {
1221         IcmpCloseHandle(icmpFile);
1222         return -1;
1223     }
1224 
1225     IcmpCloseHandle(icmpFile);
1226 #endif
1227 
1228     return 0;
1229 }
1230 
isConnected() const1231 bool InetStreamSocket::isConnected() const
1232 {
1233     int error = 0;
1234     socklen_t len = sizeof(error);
1235     int retval = getsockopt(fSocketParms.sd(), SOL_SOCKET, SO_ERROR, &error, &len);
1236 
1237     if (error || retval)
1238         return false;
1239 
1240     struct pollfd pfd[1];
1241     pfd[0].fd = fSocketParms.sd();
1242     pfd[0].events = POLLIN;
1243     pfd[0].revents = 0;
1244 
1245     error = poll(pfd, 1, 0);
1246 
1247     if ((error < 0) || (pfd[0].revents & (POLLHUP | POLLNVAL | POLLERR)))
1248     {
1249         return false;
1250     }
1251 
1252     return true;
1253 }
1254 
hasData() const1255 bool InetStreamSocket::hasData() const
1256 {
1257     int count;
1258     char buf[1];
1259     ssize_t retval;
1260     ioctl(fSocketParms.sd(), FIONREAD, &count);
1261 
1262     if (count)
1263         return true;
1264 
1265     // EAGAIN | EWOULDBLOCK means the socket is clear. Anything else is data or error
1266     retval = recv(fSocketParms.sd(), buf, 1, MSG_DONTWAIT);
1267 
1268     if (retval & (EAGAIN | EWOULDBLOCK))
1269         return false;
1270 
1271     return true;
1272 }
1273 
1274 } //namespace messageqcpp
1275 
1276