1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /*
19 Copyright (c) 2007 Alexander Eremin <netwhistler@gmail.com>
20 All rights reserved.
21
22 Redistribution and use in source and binary forms, with or without
23 modification, are permitted provided that the following conditions are met:
24
25 1. Redistributions of source code must retain the above copyright notice, this
26 list of conditions and the following disclaimer.
27 2. Redistributions in binary form must reproduce the above copyright notice,
28 this list of conditions and the following disclaimer in the documentation
29 and/or other materials provided with the distribution.
30
31 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
32 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
33 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
34 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
35 ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
36 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
38 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
39 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
40 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41
42 The views and conclusions contained in the software and documentation are those
43 of the authors and should not be interpreted as representing official policies,
44 either expressed or implied, of the FreeBSD Project.
45 */
46
47 #include "mcsconfig.h"
48
49 #include <cstdio>
50 #include <cerrno>
51 #ifdef _MSC_VER
52 #define WIN32_LEAN_AND_MEAN
53 #define NOMINMAX
54 #include <windows.h>
55 #include <winsock2.h>
56 #include <ws2tcpip.h>
57 #include <iphlpapi.h>
58 #include <icmpapi.h>
59 #include <stdio.h>
60 #else
61 #if __FreeBSD__
62 #include <sys/types.h>
63 #include <netinet/in.h>
64 #include <netinet/ip.h>
65 #endif
66 #include <sys/socket.h>
67 #include <poll.h>
68 #include <netinet/in.h>
69 #include <netinet/ip_icmp.h>
70 #include <arpa/inet.h>
71 #include <netinet/tcp.h>
72 #include <fcntl.h>
73 #include <sys/ioctl.h>
74 #endif
75 #include <sys/types.h>
76 #include <sys/time.h>
77 #include <cstring>
78
79 #include <stdexcept>
80 #include <string>
81 #include <sstream>
82 using namespace std;
83
84 #include <boost/scoped_array.hpp>
85 using boost::scoped_array;
86
87 #define INETSTREAMSOCKET_DLLEXPORT
88 #include "inetstreamsocket.h"
89 #undef INETSTREAMSOCKET_DLLEXPORT
90 #include "bytestream.h"
91 #include "iosocket.h"
92 #include "socketparms.h"
93 #include "socketclosed.h"
94 #include "logger.h"
95 #include "loggingid.h"
96 #include "idbcompress.h"
97
98 // some static functions
99 namespace
100 {
101 using messageqcpp::ByteStream;
102
103 // @bug 2441 - Retry after 512 read() error.
// ERESTARTSYS (512) is a kernel I/O errno that is similar to an EINTR, except
105 // that it is not supposed to "leak" out into the user space. But we are
106 // sometimes seeing "unknown error 512" error msgs in response to calls to
107 // read(), so adding logic to retry after ERESTARTSYS the way we do for EINTR.
108 //const int KERR_ERESTARTSYS = 512;
109
110 #ifdef _MSC_VER
111 const int MaxSendPacketSize = 64 * 1024;
112 #endif
113
/**
 * Compute the 16-bit one's-complement Internet checksum (the form used for
 * ICMP headers) over a buffer.
 *
 * @param buf  start of the data; it is read as native-endian 16-bit words
 * @param sz   number of bytes to checksum; values <= 0 checksum nothing
 * @return the complemented 16-bit sum, always in the range 0..0xFFFF
 */
int in_cksum(unsigned short* buf, int sz)
{
    int nleft = sz;
    // Unsigned accumulator: a signed int here overflows (undefined behaviour)
    // once the running total of 16-bit words exceeds INT_MAX.
    unsigned int sum = 0;
    const unsigned short* w = buf;
    unsigned short ans = 0;

    while (nleft > 1)
    {
        sum += *w++;
        nleft -= 2;

        // Fold the carries back in before the accumulator can wrap, so no
        // carry is ever lost regardless of the buffer length.
        if (sum & 0x80000000u)
            sum = (sum >> 16) + (sum & 0xFFFF);
    }

    if (nleft == 1)
    {
        // Trailing odd byte: it occupies the first byte of an otherwise
        // zeroed 16-bit word.
        *reinterpret_cast<unsigned char*>(&ans) = *reinterpret_cast<const unsigned char*>(w);
        sum += ans;
    }

    // End-around carry: keep folding until the sum fits in 16 bits.
    while (sum >> 16)
        sum = (sum >> 16) + (sum & 0xFFFF);

    ans = static_cast<unsigned short>(~sum);
    return ans;
}
138
139 } //namespace anon
140
141 namespace messageqcpp
142 {
143
InetStreamSocket(size_t blocksize)144 InetStreamSocket::InetStreamSocket(size_t blocksize) :
145 fSocketParms(PF_INET, SOCK_STREAM, IPPROTO_TCP),
146 fBlocksize(blocksize),
147 fSyncProto(true),
148 fMagicBuffer(0)
149 {
150 memset(&fSa, 0, sizeof(fSa));
151 fConnectionTimeout.tv_sec = 20;
152 fConnectionTimeout.tv_nsec = 0;
153 }
154
~InetStreamSocket()155 InetStreamSocket::~InetStreamSocket()
156 {
157 }
158
open()159 void InetStreamSocket::open()
160 {
161 int bufferSize;
162 int ret;
163 socklen_t bufferSizeSize;
164
165 if (isOpen())
166 throw logic_error("InetStreamSocket::open: socket is already open");
167
168 int sd;
169 sd = ::socket(fSocketParms.domain(), fSocketParms.type(), fSocketParms.protocol());
170 int e = errno;
171
172 if (sd < 0)
173 {
174 #ifdef _MSC_VER
175 int wsaError = WSAGetLastError();
176
177 if (wsaError == WSANOTINITIALISED)
178 {
179 WSAData wsadata;
180 const WORD minVersion = MAKEWORD(2, 2);
181
182 if (WSAStartup(minVersion, &wsadata) == 0)
183 {
184 if (wsadata.wVersion == minVersion)
185 {
186 sd = ::socket(fSocketParms.domain(), fSocketParms.type(), fSocketParms.protocol());
187 e = errno;
188
189 if (sd >= 0) goto setopts;
190 }
191
192 //Didn't get the required min version, error out
193 }
194
195 //WSAStartup failed, continue to report error
196 }
197
198 #endif
199 string msg = "InetStreamSocket::open: socket() error: ";
200 scoped_array<char> buf(new char[80]);
201 #if STRERROR_R_CHAR_P
202 const char* p;
203
204 if ((p = strerror_r(e, buf.get(), 80)) != 0)
205 msg += p;
206
207 #else
208 int p;
209
210 if ((p = strerror_r(e, buf.get(), 80)) == 0)
211 msg += buf.get();
212
213 #endif
214 throw runtime_error(msg);
215 }
216
217 #ifdef _MSC_VER
218 setopts:
219 #endif
220
221 /* XXXPAT: If we have latency problems again, try these...
222 bufferSizeSize = 4;
223 bufferSize = 512000;
224 setsockopt(sd, SOL_SOCKET, SO_SNDBUF, &bufferSize, bufferSizeSize);
225 bufferSize = 512000;
226 setsockopt(sd, SOL_SOCKET, SO_RCVBUF, &bufferSize, bufferSizeSize);
227 bufferSize = 1;
228 setsockopt(sd, SOL_SOCKET, SO_RCVLOWAT, &bufferSize, bufferSizeSize);
229 setsockopt(sd, SOL_SOCKET, SO_SNDLOWAT, &bufferSize, bufferSizeSize);
230 */
231 bufferSize = 1;
232 bufferSizeSize = 4;
233 ret = setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (const char*)&bufferSize, bufferSizeSize);
234
235 if (ret < 0)
236 {
237 perror("setsockopt");
238 exit(1);
239 }
240
241 bufferSize = 1;
242 ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (const char*)&bufferSize, bufferSizeSize);
243
244 if (ret < 0)
245 {
246 perror("setsockopt");
247 exit(1);
248 }
249
250 fSocketParms.sd(sd);
251 }
252
close()253 void InetStreamSocket::close()
254 {
255 if (isOpen())
256 {
257 ::shutdown(fSocketParms.sd(), SHUT_RDWR);
258 #ifdef _MSC_VER
259 ::closesocket(fSocketParms.sd());
260 #else
261 ::close(fSocketParms.sd());
262 #endif
263 fSocketParms.sd(-1);
264 }
265 }
266
267 // needs to be in sync with clone()
doCopy(const InetStreamSocket & rhs)268 void InetStreamSocket::doCopy(const InetStreamSocket& rhs)
269 {
270 fBlocksize = rhs.fBlocksize;
271 fSocketParms = rhs.fSocketParms;
272 fSa = rhs.fSa;
273 fConnectionTimeout = rhs.fConnectionTimeout;
274 fSyncProto = rhs.fSyncProto;
275 }
276
277 // needs to be in sync with doCopy()
clone() const278 Socket* InetStreamSocket::clone() const
279 {
280 InetStreamSocket* iss = new InetStreamSocket(fBlocksize);
281 iss->fSocketParms = fSocketParms;
282 iss->fSa = fSa;
283 iss->fConnectionTimeout = fConnectionTimeout;
284 iss->fSyncProto = fSyncProto;
285 return iss;
286 }
287
/**
 * Copy constructor: takes over rhs's configuration through doCopy().
 *
 * fMagicBuffer is not part of what doCopy() transfers, so it is explicitly
 * zeroed here instead of being left uninitialized.
 */
InetStreamSocket::InetStreamSocket(const InetStreamSocket& rhs) :
    fMagicBuffer(0)
{
    doCopy(rhs);
}
292
/**
 * Assignment: copies rhs's configuration through doCopy(). Self-assignment
 * is a no-op.
 *
 * @return *this
 */
InetStreamSocket& InetStreamSocket::operator=(const InetStreamSocket& rhs)
{
    if (this == &rhs)
        return *this;

    doCopy(rhs);
    return *this;
}
300
301 /* The caller needs to know when/if the remote closes the connection or sends data.
302 * Returns 0 on timeout, 1 if there is data to read, 2 if the connection was dropped.
303 */
pollConnection(int connectionNum,long msecs)304 int InetStreamSocket::pollConnection(int connectionNum, long msecs)
305 {
306 struct pollfd pfd[1];
307 int err;
308
309 retry:
310 memset(&pfd, 0, sizeof(struct pollfd));
311 pfd[0].fd = connectionNum;
312 pfd[0].events = POLLIN;
313 err = poll(pfd, 1, msecs);
314
315 if (err < 0)
316 {
317 int e = errno;
318
319 if (e == EINTR || e == KERR_ERESTARTSYS)
320 goto retry;
321 }
322
323 // Linux doesn't set POLLHUP, need add'l check for data or EOF
324 if (pfd[0].revents & POLLIN)
325 {
326 char buf;
327 err = ::recv(connectionNum, &buf, 1, MSG_PEEK);
328
329 if (err == 0)
330 return 2;
331 else if (err == 1) // there is in fact data to read
332 return 1;
333 else
334 return 3;
335 }
336
337 if (err == 0) // timeout
338 return 0;
339
340 return 3; // catch-all error code
341 }
342
343 /* returns true when the next thing in the stream is the beginning of a new
344 ByteStream object. */
readToMagic(long msecs,bool * isTimeOut,Stats * stats) const345 bool InetStreamSocket::readToMagic(long msecs, bool* isTimeOut, Stats* stats) const
346 {
347 int err;
348 struct pollfd pfd[1];
349 uint8_t* magicBuffer8;
350
351 fMagicBuffer = 0;
352 magicBuffer8 = reinterpret_cast<uint8_t*>(&fMagicBuffer);
353 pfd[0].fd = fSocketParms.sd();
354 pfd[0].events = POLLIN;
355
356 while ((fMagicBuffer != BYTESTREAM_MAGIC) && (fMagicBuffer != COMPRESSED_BYTESTREAM_MAGIC))
357 {
358
359 if (msecs >= 0)
360 {
361 pfd[0].revents = 0;
362
363 err = poll(pfd, 1, msecs);
364
365 if (err < 0)
366 {
367 int e = errno;
368
369 if (e == EINTR)
370 {
371 continue;
372 }
373
374 if (e == KERR_ERESTARTSYS)
375 {
376 logIoError("InetStreamSocket::readToMagic(): I/O error1", e);
377 continue;
378 }
379
380 ostringstream oss;
381 oss << "InetStreamSocket::readToMagic(): I/O error1: " <<
382 strerror(e);
383 throw runtime_error(oss.str());
384 }
385
386 if (pfd[0].revents & (POLLHUP | POLLNVAL | POLLERR))
387 {
388 ostringstream oss;
389 oss << "InetStreamSocket::readToMagic(): I/O error1: rc-" <<
390 err << "; poll signal interrupt ( ";
391
392 if (pfd[0].revents & POLLHUP)
393 oss << "POLLHUP ";
394
395 if (pfd[0].revents & POLLNVAL)
396 oss << "POLLNVAL ";
397
398 if (pfd[0].revents & POLLERR)
399 oss << "POLLERR ";
400
401 oss << ")";
402 throw runtime_error(oss.str());
403 }
404
405 if (err == 0) // timeout
406 {
407 if (isTimeOut)
408 *isTimeOut = true;
409
410 return false;
411 }
412 }
413
414 fMagicBuffer = fMagicBuffer >> 8;
415 retry:
416 #ifdef _MSC_VER
417 err = ::recv(fSocketParms.sd(), (char*)&magicBuffer8[3], 1, 0);
418 #else
419 err = ::read(fSocketParms.sd(), &magicBuffer8[3], 1);
420 #endif
421
422 if (err < 0)
423 {
424 int e = errno;
425 #ifdef _MSC_VER
426
427 if (WSAGetLastError() == WSAECONNRESET)
428 {
429 //throw runtime_error("connection reset by peer");
430 if (msecs < 0) return false;
431 else throw SocketClosed("InetStreamSocket::readToMagic: Remote is closed");
432 }
433
434 #endif
435
436 if (e == EINTR)
437 {
438 goto retry;
439 }
440
441 if (e == KERR_ERESTARTSYS)
442 {
443 logIoError("InetStreamSocket::readToMagic(): I/O error2.0", e);
444 goto retry;
445 }
446
447 ostringstream oss;
448 oss << "InetStreamSocket::readToMagic(): I/O error2.1: " <<
449 "err = " << err << " e = " << e <<
450 #ifdef _MSC_VER
451 " WSA error = " << WSAGetLastError() <<
452 #endif
453 ": " << strerror(e);
454 throw runtime_error(oss.str());
455 }
456
457 // EOF. If no timeout was specified, ByteStream() gets returned to the caller.
458 // If one was, throw SocketClosed.
459 if (err == 0) // EOF. if a timeout was specified, ByteStream()
460 {
461 if (msecs < 0)
462 return false;
463 else
464 throw SocketClosed("InetStreamSocket::readToMagic: Remote is closed");
465 }
466
467 if (stats)
468 stats->dataRecvd(1);
469 }
470
471 return true;
472 }
473
read(const struct::timespec * timeout,bool * isTimeOut,Stats * stats) const474 const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeOut, Stats* stats) const
475 {
476 long msecs = -1;
477
478 struct pollfd pfd[1];
479 pfd[0].fd = fSocketParms.sd();
480 pfd[0].events = POLLIN;
481
482 if (timeout != 0)
483 msecs = timeout->tv_sec * 1000 + timeout->tv_nsec / 1000000;
484
485 // we need to read the 4-byte message length first.
486 uint32_t msglen;
487 uint8_t* msglenp = reinterpret_cast<uint8_t*>(&msglen);
488 size_t mlread = 0;
489
490 if (readToMagic(msecs, isTimeOut, stats) == false) //indicates a timeout or EOF
491 {
492 // MCOL-480 The connector calls with timeout in a loop so that
493 // it can check a killed flag. This means that for a long running query,
494 // the following fills the warning log.
495 // if (isTimeOut && *isTimeOut)
496 // {
497 // logIoError("InetStreamSocket::read: timeout during readToMagic", 0);
498 // }
499 return SBS(new ByteStream(0));
500 }
501
502 //FIXME: This seems like a lot of work to read 4 bytes...
503 while (mlread < sizeof(msglen))
504 {
505 ssize_t t;
506
507 if (timeout != NULL)
508 {
509 int err;
510
511 pfd[0].revents = 0;
512 err = poll(pfd, 1, msecs);
513
514 if (err < 0 || pfd[0].revents & (POLLERR | POLLHUP | POLLNVAL))
515 {
516 ostringstream oss;
517 oss << "InetStreamSocket::read: I/O error1: " <<
518 strerror(errno);
519 throw runtime_error(oss.str());
520 }
521
522 if (err == 0) // timeout
523 {
524 if (isTimeOut)
525 *isTimeOut = true;
526
527 logIoError("InetStreamSocket::read: timeout during first poll", 0);
528 return SBS(new ByteStream(0));
529 }
530 }
531
532 #ifdef _MSC_VER
533 t = ::recv(fSocketParms.sd(), (char*)(msglenp + mlread), sizeof(msglen) - mlread, 0);
534 #else
535 t = ::read(fSocketParms.sd(), msglenp + mlread, sizeof(msglen) - mlread);
536 #endif
537
538 if (t == 0)
539 {
540 if (timeout == NULL)
541 {
542 logIoError("InetStreamSocket::read: timeout during first read", 0);
543 return SBS(new ByteStream(0)); // don't return an incomplete message
544 }
545 else
546 throw SocketClosed("InetStreamSocket::read: Remote is closed");
547 }
548
549 if (t < 0)
550 {
551 int e = errno;
552
553 if (e == EINTR)
554 {
555 continue;
556 }
557
558 if (e == KERR_ERESTARTSYS)
559 {
560 logIoError("InetStreamSocket::read: I/O error2", e);
561 continue;
562 }
563
564 ostringstream oss;
565 oss << "InetStreamSocket::read: I/O error2: " <<
566 strerror(e);
567 throw runtime_error(oss.str());
568 }
569
570 mlread += t;
571 }
572
573 if (stats)
574 stats->dataRecvd(sizeof(msglen));
575
576 SBS res(new ByteStream(msglen));
577 uint8_t* bufp = res->getInputPtr();
578
579 size_t nread = 0;
580
581 //Finally read the actual message...
582 while (nread < msglen)
583 {
584 ssize_t t;
585
586 if (timeout != NULL)
587 {
588 int err;
589
590 pfd[0].revents = 0;
591 err = poll(pfd, 1, msecs);
592
593 if (err < 0 || pfd[0].revents & (POLLERR | POLLHUP | POLLNVAL))
594 {
595 ostringstream oss;
596 oss << "InetStreamSocket::read: I/O error3: " <<
597 strerror(errno);
598 throw runtime_error(oss.str());
599 }
600
601 if (err == 0) // timeout
602 {
603 if (isTimeOut)
604 {
605 logIoError("InetStreamSocket::read: timeout during second poll", 0);
606 *isTimeOut = true;
607 }
608
609 if (stats)
610 stats->dataRecvd(nread);
611
612 return SBS(new ByteStream(0));
613 }
614 }
615
616 #ifdef _MSC_VER
617 int readAmount = std::min((int)msglen - (int)nread, MaxSendPacketSize);
618 t = ::recv(fSocketParms.sd(), (char*)(bufp + nread), readAmount, 0);
619 #else
620 t = ::read(fSocketParms.sd(), bufp + nread, msglen - nread);
621 #endif
622
623 if (t == 0)
624 {
625 if (stats)
626 stats->dataRecvd(nread);
627
628 if (timeout == NULL)
629 return SBS(new ByteStream(0)); // don't return an incomplete message
630 else
631 {
632 logIoError("InetStreamSocket::read: timeout during second read", 0);
633 throw SocketClosed("InetStreamSocket::read: Remote is closed");
634 }
635 }
636
637 if (t < 0)
638 {
639 ostringstream oss;
640 #ifdef _MSC_VER
641 int e = WSAGetLastError();
642 oss << "InetStreamSocket::read: I/O error4: WSA: " << e;
643 #else
644 int e = errno;
645
646 if (e == EINTR)
647 {
648 continue;
649 }
650
651 if (e == KERR_ERESTARTSYS)
652 {
653 logIoError("InetStreamSocket::read: I/O error4", e);
654 continue;
655 }
656
657 oss << "InetStreamSocket::read: I/O error4: " <<
658 strerror(e);
659 #endif
660
661 if (stats)
662 stats->dataRecvd(nread);
663
664 throw runtime_error(oss.str());
665 }
666
667 nread += t;
668 }
669
670 if (stats)
671 stats->dataRecvd(msglen);
672
673 res->advanceInputPtr(msglen);
674 return res;
675 }
676
677 /*
678 * The protocol here is that we write the length of the ByteStream first, then the bytes. On the
679 * read side, we reverse it.
680 */
681
write(SBS msg,Stats * stats)682 void InetStreamSocket::write(SBS msg, Stats* stats)
683 {
684 write(*msg, stats);
685 }
686
do_write(const ByteStream & msg,uint32_t whichMagic,Stats * stats) const687 void InetStreamSocket::do_write(const ByteStream& msg, uint32_t whichMagic, Stats* stats) const
688 {
689 uint32_t msglen = msg.length();
690 uint32_t magic = whichMagic;
691 uint32_t* realBuf;
692
693 if (msglen == 0) return;
694
695 /* buf.fCurOutPtr points to the data to send; ByteStream guarantees that there
696 are at least 8 bytes before that for the magic & length fields */
697 realBuf = (uint32_t*)msg.buf();
698 realBuf -= 2;
699 realBuf[0] = magic;
700 realBuf[1] = msglen;
701
702 try
703 {
704 written(fSocketParms.sd(), (const uint8_t*)realBuf, msglen + sizeof(msglen) + sizeof(magic));
705 }
706 catch (std::exception& ex)
707 {
708 string errorMsg(ex.what());
709 errorMsg += " -- write from " + toString();
710 throw runtime_error(errorMsg);
711 }
712
713 if (stats)
714 stats->dataSent(msglen + sizeof(msglen) + sizeof(magic));
715 }
716
write(const ByteStream & msg,Stats * stats)717 void InetStreamSocket::write(const ByteStream& msg, Stats* stats)
718 {
719 do_write(msg, BYTESTREAM_MAGIC, stats);
720 }
721
write_raw(const ByteStream & msg,Stats * stats) const722 void InetStreamSocket::write_raw(const ByteStream& msg, Stats* stats) const
723 {
724 uint32_t msglen = msg.length();
725
726 if (msglen == 0) return;
727
728 try
729 {
730 written(fSocketParms.sd(), msg.buf(), msglen);
731 }
732 catch (std::exception& ex)
733 {
734 string errorMsg(ex.what());
735 errorMsg += " -- write_raw from " + toString();
736 throw runtime_error(errorMsg);
737 }
738
739 if (stats)
740 stats->dataSent(msglen);
741 }
742
bind(const sockaddr * serv_addr)743 void InetStreamSocket::bind(const sockaddr* serv_addr)
744 {
745 memcpy(&fSa, serv_addr, sizeof(sockaddr_in));
746
747 if (::bind(fSocketParms.sd(), serv_addr, sizeof(sockaddr_in)) != 0)
748 {
749 int e = errno;
750 string msg = "InetStreamSocket::bind: bind() error: ";
751 scoped_array<char> buf(new char[80]);
752 #if STRERROR_R_CHAR_P
753 const char* p;
754
755 if ((p = strerror_r(e, buf.get(), 80)) != 0)
756 msg += p;
757
758 #else
759 int p;
760
761 if ((p = strerror_r(e, buf.get(), 80)) == 0)
762 msg += buf.get();
763
764 #endif
765 throw runtime_error(msg);
766 }
767
768 }
769
listen(int backlog)770 void InetStreamSocket::listen(int backlog)
771 {
772 #ifndef _MSC_VER
773 fcntl(socketParms().sd(), F_SETFD, fcntl(socketParms().sd(), F_GETFD) | FD_CLOEXEC);
774 #endif
775
776 if (::listen(socketParms().sd(), backlog) != 0)
777 {
778 int e = errno;
779 string msg = "InetStreamSocket::listen: listen() error: ";
780 scoped_array<char> buf(new char[80]);
781 #if STRERROR_R_CHAR_P
782 const char* p;
783
784 if ((p = strerror_r(e, buf.get(), 80)) != 0)
785 msg += p;
786
787 #else
788 int p;
789
790 if ((p = strerror_r(e, buf.get(), 80)) == 0)
791 msg += buf.get();
792
793 #endif
794 throw runtime_error(msg);
795 }
796
797 }
798
accept(const struct timespec * timeout)799 const IOSocket InetStreamSocket::accept(const struct timespec* timeout)
800 {
801 int clientfd;
802 long msecs = 0;
803
804 IOSocket ios(new InetStreamSocket(fBlocksize));
805
806 struct pollfd pfd[1];
807 pfd[0].fd = socketParms().sd();
808 pfd[0].events = POLLIN;
809
810 if (timeout != 0)
811 {
812 msecs = timeout->tv_sec * 1000 + timeout->tv_nsec / 1000000;
813
814 if (poll(pfd, 1, msecs) != 1 || (pfd[0].revents & POLLIN) == 0 ||
815 pfd[0].revents & (POLLERR | POLLHUP | POLLNVAL))
816 return ios;
817 }
818
819 struct sockaddr sa;
820
821 socklen_t sl = sizeof(sa);
822
823 int e;
824
825 do
826 {
827 clientfd = ::accept(socketParms().sd(), &sa, &sl);
828 e = errno;
829 }
830 while (clientfd < 0 && (e == EINTR ||
831 #ifdef ERESTART
832 e == ERESTART ||
833 #endif
834 #ifdef ECONNABORTED
835 e == ECONNABORTED ||
836 #endif
837 false));
838
839 if (clientfd < 0)
840 {
841 string msg = "InetStreamSocket::accept: accept() error: ";
842 scoped_array<char> buf(new char[80]);
843 #if STRERROR_R_CHAR_P
844 const char* p;
845
846 if ((p = strerror_r(e, buf.get(), 80)) != 0)
847 msg += p;
848
849 #else
850 int p;
851
852 if ((p = strerror_r(e, buf.get(), 80)) == 0)
853 msg += buf.get();
854
855 #endif
856 throw runtime_error(msg);
857 }
858
859 if (fSyncProto)
860 {
861 /* send a byte to artificially synchronize with connect() on the remote */
862 char b = 'A';
863 int ret;
864
865 ret = ::send(clientfd, &b, 1, 0);
866 e = errno;
867
868 if (ret < 0)
869 {
870 ostringstream os;
871 char blah[80];
872 #if STRERROR_R_CHAR_P
873 const char* p;
874
875 if ((p = strerror_r(e, blah, 80)) != 0)
876 os << "InetStreamSocket::accept sync: " << p;
877
878 #else
879 int p;
880
881 if ((p = strerror_r(e, blah, 80)) == 0)
882 os << "InetStreamSocket::accept sync: " << blah;
883
884 #endif
885 ::close(clientfd);
886 throw runtime_error(os.str());
887 }
888 else if (ret == 0)
889 {
890 ::close(clientfd);
891 throw runtime_error("InetStreamSocket::accept sync: got unexpected error code");
892 }
893 }
894
895 SocketParms sp;
896 sp = ios.socketParms();
897 sp.sd(clientfd);
898 ios.socketParms(sp);
899 ios.sa(&sa);
900 return ios;
901
902 }
903
connect(const sockaddr * serv_addr)904 void InetStreamSocket::connect(const sockaddr* serv_addr)
905 {
906 memcpy(&fSa, serv_addr, sizeof(sockaddr_in));
907
908 if (::connect(socketParms().sd(), serv_addr, sizeof(sockaddr_in)))
909 {
910 int e = errno;
911 string msg = "InetStreamSocket::connect: connect() error: ";
912 #ifdef _MSC_VER
913 char m[80];
914 int x = WSAGetLastError();
915
916 if (x == WSAECONNREFUSED)
917 strcpy(m, "connection refused");
918 else
919 sprintf(m, "%d 0x%x", x, x);
920
921 msg += m;
922 #else
923 scoped_array<char> buf(new char[80]);
924 #if STRERROR_R_CHAR_P
925 const char* p;
926
927 if ((p = strerror_r(e, buf.get(), 80)) != 0)
928 msg += p;
929
930 #else
931 int p;
932
933 if ((p = strerror_r(e, buf.get(), 80)) == 0)
934 msg += buf.get();
935
936 #endif
937 #endif
938 msg += " to: " + toString();
939 throw runtime_error(msg);
940 }
941
942 if (!fSyncProto)
943 return;
944
945 /* read a byte to artificially synchronize with accept() on the remote */
946 int ret = -1;
947 int e = EBADF;
948 struct pollfd pfd;
949
950 long msecs = fConnectionTimeout.tv_sec * 1000 + fConnectionTimeout.tv_nsec / 1000000;
951
952 do
953 {
954 pfd.fd = socketParms().sd();
955 pfd.revents = 0;
956 pfd.events = POLLIN;
957 ret = poll(&pfd, 1, msecs);
958 e = errno;
959 }
960 while (ret == -1 && e == EINTR && !(pfd.revents & (POLLERR | POLLHUP | POLLNVAL)));
961
962 // success
963 if (ret == 1)
964 {
965 #ifdef _MSC_VER
966 char buf = '\0';
967 (void)::recv(socketParms().sd(), &buf, 1, 0);
968 #else
969 #if defined(__GNUC__) && __GNUC__ >= 5
970 #pragma GCC diagnostic push
971 #pragma GCC diagnostic ignored "-Wunused-result"
972 char buf = '\0';
973 ::read(socketParms().sd(), &buf, 1); // we know 1 byte is in the recv buffer
974 #pragma GCC diagnostic pop
975 #else
976 char buf = '\0';
977 ::read(socketParms().sd(), &buf, 1); // we know 1 byte is in the recv buffer
978 #endif // pragma
979 #endif
980 return;
981 }
982
983 /* handle the various errors */
984 if (ret == 0)
985 throw runtime_error("InetStreamSocket::connect: connection timed out");
986 else if (ret == -1 && e != EINTR)
987 {
988 ostringstream os;
989 char blah[80];
990 #if STRERROR_R_CHAR_P
991 const char* p;
992
993 if ((p = strerror_r(e, blah, 80)) != 0)
994 os << "InetStreamSocket::connect: " << p;
995
996 #else
997 int p;
998
999 if ((p = strerror_r(e, blah, 80)) == 0)
1000 os << "InetStreamSocket::connect: " << blah;
1001
1002 #endif
1003 throw runtime_error(os.str());
1004 }
1005 else
1006 throw runtime_error("InetStreamSocket::connect: unknown connection error");
1007 }
1008
toString() const1009 const string InetStreamSocket::toString() const
1010 {
1011 ostringstream oss;
1012 char buf[INET_ADDRSTRLEN];
1013 const SocketParms& sp = fSocketParms;
1014 oss << "InetStreamSocket: sd: " << sp.sd() <<
1015 #ifndef _MSC_VER
1016 " inet: " << inet_ntop(AF_INET, &fSa.sin_addr, buf, INET_ADDRSTRLEN) <<
1017 #endif
1018 " port: " << ntohs(fSa.sin_port);
1019 return oss.str();
1020 }
1021
1022 //
1023 // Log a Warning msg pertaining to an I/O error; Currently used to log a
1024 // ERESTARTSYS (errno 512) condition, but could be used to log any other
1025 // I/O error that will retried.
1026 //
logIoError(const char * errMsg,int errNum) const1027 void InetStreamSocket::logIoError(const char* errMsg, int errNum) const
1028 {
1029 logging::Logger logger(31);
1030 logging::Message::Args args;
1031 logging::LoggingID li(31);
1032 args.add(errMsg);
1033 args.add(strerror(errNum));
1034 args.add(toString());
1035
1036 logging::MsgMap msgMap;
1037 msgMap[logging::M0071] = logging::Message( logging::M0071 );
1038 logger.msgMap(msgMap);
1039
1040 logger.logMessage(logging::LOG_TYPE_WARNING, logging::M0071, args, li);
1041 }
1042
written(int fd,const uint8_t * ptr,size_t nbytes) const1043 ssize_t InetStreamSocket::written(int fd, const uint8_t* ptr, size_t nbytes) const
1044 {
1045 size_t nleft;
1046 ssize_t nwritten;
1047 const char* bufp;
1048
1049 nleft = nbytes;
1050 bufp = reinterpret_cast<const char*>(ptr);
1051
1052 while (nleft > 0)
1053 {
1054 // the O_NONBLOCK flag is not set, this is a blocking I/O.
1055 #ifdef _MSC_VER
1056 int writeAmount = std::min((int)nleft, MaxSendPacketSize);
1057
1058 if ((nwritten = ::send(fd, bufp, writeAmount, 0)) < 0)
1059 #else
1060 if ((nwritten = ::write(fd, bufp, nleft)) < 0)
1061 #endif
1062 {
1063 if (errno == EINTR)
1064 nwritten = 0;
1065 else
1066 {
1067 // save the error no first
1068 int e = errno;
1069 string errorMsg = "InetStreamSocket::write error: ";
1070 scoped_array<char> buf(new char[80]);
1071 #if STRERROR_R_CHAR_P
1072 const char* p;
1073
1074 if ((p = strerror_r(e, buf.get(), 80)) != 0)
1075 errorMsg += p;
1076
1077 #else
1078 int p;
1079
1080 if ((p = strerror_r(e, buf.get(), 80)) == 0)
1081 errorMsg += buf.get();
1082
1083 #endif
1084 throw runtime_error(errorMsg);
1085 }
1086 }
1087
1088 nleft -= nwritten;
1089 bufp += nwritten;
1090 }
1091
1092 return nbytes;
1093 }
1094
addr2String() const1095 const string InetStreamSocket::addr2String() const
1096 {
1097 string s;
1098 #ifdef _MSC_VER
1099 //This is documented to be thread-safe in Windows
1100 s = inet_ntoa(fSa.sin_addr);
1101 #else
1102 char dst[INET_ADDRSTRLEN];
1103 s = inet_ntop(AF_INET, &fSa.sin_addr, dst, INET_ADDRSTRLEN);
1104 #endif
1105 return s;
1106 }
1107
isSameAddr(const Socket * rhs) const1108 bool InetStreamSocket::isSameAddr(const Socket* rhs) const
1109 {
1110 const InetStreamSocket* issp = dynamic_cast<const InetStreamSocket*>(rhs);
1111
1112 if (!issp) return false;
1113
1114 return (fSa.sin_addr.s_addr == issp->fSa.sin_addr.s_addr);
1115 }
1116
1117 /*static*/
ping(const std::string & ipaddr,const struct timespec * timeout)1118 int InetStreamSocket::ping(const std::string& ipaddr, const struct timespec* timeout)
1119 {
1120 sockaddr_in pingaddr;
1121 memset(&pingaddr, 0, sizeof(pingaddr));
1122
1123 if (inet_aton(ipaddr.c_str(), &pingaddr.sin_addr) == 0)
1124 return -1;
1125
1126 long msecs = 30 * 1000;
1127
1128 if (timeout)
1129 msecs = timeout->tv_sec * 1000 + timeout->tv_nsec / 1000000;
1130
1131 #ifndef _MSC_VER
1132 int pingsock;
1133 pingsock = ::socket(PF_INET, SOCK_RAW, IPPROTO_ICMP);
1134
1135 if (pingsock < 0)
1136 return -1;
1137
1138 ssize_t len = 0;
1139 size_t pktlen = 0;
1140 const size_t PktSize = 1024;
1141 char pkt[PktSize];
1142 memset(pkt, 0, PktSize);
1143 struct icmp* pingPktPtr = reinterpret_cast<struct icmp*>(pkt);
1144
1145 pingPktPtr->icmp_type = ICMP_ECHO;
1146 pingPktPtr->icmp_cksum = in_cksum(reinterpret_cast<unsigned short*>(pkt), PktSize);
1147
1148 pktlen = 56 + ICMP_MINLEN;
1149 len = ::sendto(pingsock, pkt, pktlen, 0, reinterpret_cast<const struct sockaddr*>(&pingaddr),
1150 sizeof(pingaddr));
1151
1152 if (len < 0 || static_cast<size_t>(len) != pktlen)
1153 {
1154 ::close(pingsock);
1155 return -1;
1156 }
1157
1158 memset(pkt, 0, PktSize);
1159 pktlen = PktSize;
1160
1161 int pollrc = 0;
1162 pollrc = pollConnection(pingsock, msecs);
1163
1164 if (pollrc != 1)
1165 {
1166 ::close(pingsock);
1167 return -1;
1168 }
1169
1170 len = ::recvfrom(pingsock, pkt, pktlen, 0, 0, 0);
1171
1172 if (len < 76)
1173 {
1174 ::close(pingsock);
1175 return -1;
1176 }
1177
1178 struct ip* iphdr = reinterpret_cast<struct ip*>(pkt);
1179
1180 pingPktPtr = reinterpret_cast<struct icmp*>(pkt + (iphdr->ip_hl << 2));
1181
1182 if (pingPktPtr->icmp_type != ICMP_ECHOREPLY)
1183 {
1184 ::close(pingsock);
1185 return -1;
1186 }
1187
1188 ::close(pingsock);
1189
1190 #else //Windows version
1191 HANDLE icmpFile;
1192 icmpFile = IcmpCreateFile();
1193
1194 if (icmpFile == INVALID_HANDLE_VALUE)
1195 return -1;
1196
1197 DWORD ret;
1198 const size_t PingPktSize = 1024;
1199 char rqd[PingPktSize];
1200 WORD rqs = PingPktSize;
1201 char rpd[PingPktSize];
1202 DWORD rps = PingPktSize;
1203
1204 ZeroMemory(rqd, PingPktSize);
1205 ZeroMemory(rpd, PingPktSize);
1206
1207 rqs = 64;
1208
1209 ret = IcmpSendEcho(icmpFile, pingaddr.sin_addr.s_addr, rqd, rqs, 0, rpd, rps, msecs);
1210
1211 if (ret <= 0)
1212 {
1213 IcmpCloseHandle(icmpFile);
1214 return -1;
1215 }
1216
1217 PICMP_ECHO_REPLY echoReply = (PICMP_ECHO_REPLY)rpd;
1218
1219 if (echoReply->Status != IP_SUCCESS)
1220 {
1221 IcmpCloseHandle(icmpFile);
1222 return -1;
1223 }
1224
1225 IcmpCloseHandle(icmpFile);
1226 #endif
1227
1228 return 0;
1229 }
1230
isConnected() const1231 bool InetStreamSocket::isConnected() const
1232 {
1233 int error = 0;
1234 socklen_t len = sizeof(error);
1235 int retval = getsockopt(fSocketParms.sd(), SOL_SOCKET, SO_ERROR, &error, &len);
1236
1237 if (error || retval)
1238 return false;
1239
1240 struct pollfd pfd[1];
1241 pfd[0].fd = fSocketParms.sd();
1242 pfd[0].events = POLLIN;
1243 pfd[0].revents = 0;
1244
1245 error = poll(pfd, 1, 0);
1246
1247 if ((error < 0) || (pfd[0].revents & (POLLHUP | POLLNVAL | POLLERR)))
1248 {
1249 return false;
1250 }
1251
1252 return true;
1253 }
1254
hasData() const1255 bool InetStreamSocket::hasData() const
1256 {
1257 int count;
1258 char buf[1];
1259 ssize_t retval;
1260 ioctl(fSocketParms.sd(), FIONREAD, &count);
1261
1262 if (count)
1263 return true;
1264
1265 // EAGAIN | EWOULDBLOCK means the socket is clear. Anything else is data or error
1266 retval = recv(fSocketParms.sd(), buf, 1, MSG_DONTWAIT);
1267
1268 if (retval & (EAGAIN | EWOULDBLOCK))
1269 return false;
1270
1271 return true;
1272 }
1273
1274 } //namespace messageqcpp
1275
1276