1 /*
2  * Copyright (C) 2012-2020 Codership Oy <info@codership.com>
3  */
4 
5 #include "asio_tcp.hpp"
6 #include "gcomm/util.hpp"
7 #include "gcomm/common.hpp"
8 
9 #define FAILED_HANDLER(_e) failed_handler(_e, __FUNCTION__, __LINE__)
10 
11 // Helpers to set socket buffer sizes for both connecting
12 // and listening sockets.
13 
14 static bool asio_recv_buf_warned(false);
15 template <class Socket>
set_recv_buf_size_helper(const gu::Config & conf,Socket & socket)16 void set_recv_buf_size_helper(const gu::Config& conf, Socket& socket)
17 {
18     if (conf.get(gcomm::Conf::SocketRecvBufSize) != GCOMM_ASIO_AUTO_BUF_SIZE)
19     {
20         size_t const recv_buf_size
21             (conf.get<size_t>(gcomm::Conf::SocketRecvBufSize));
22         // this should have been checked already
23         assert(ssize_t(recv_buf_size) >= 0);
24 
25         socket->set_receive_buffer_size(recv_buf_size);
26         auto cur_value(socket->get_receive_buffer_size());
27         log_debug << "socket recv buf size " << cur_value;
28         if (cur_value < recv_buf_size && not asio_recv_buf_warned)
29         {
30             log_warn << "Receive buffer size " << cur_value
31                      << " less than requested " << recv_buf_size
32                      << ", this may affect performance in high latency/high "
33                      << "throughput networks.";
34             asio_recv_buf_warned = true;
35         }
36     }
37 }
38 
39 static bool asio_send_buf_warned(false);
40 template <class Socket>
set_send_buf_size_helper(const gu::Config & conf,Socket & socket)41 void set_send_buf_size_helper(const gu::Config& conf, Socket& socket)
42 {
43     if (conf.get(gcomm::Conf::SocketSendBufSize) != GCOMM_ASIO_AUTO_BUF_SIZE)
44     {
45         size_t const send_buf_size
46             (conf.get<size_t>(gcomm::Conf::SocketSendBufSize));
47         // this should have been checked already
48         assert(ssize_t(send_buf_size) >= 0);
49 
50         socket->set_send_buffer_size(send_buf_size);
51         auto cur_value(socket->get_send_buffer_size());
52         log_debug << "socket send buf size " << cur_value;
53         if (cur_value < send_buf_size && not asio_send_buf_warned)
54         {
55             log_warn << "Send buffer size " << cur_value
56                      << " less than requested " << send_buf_size
57                      << ", this may affect performance in high latency/high "
58                      << "throughput networks.";
59             asio_send_buf_warned = true;
60         }
61     }
62 }
63 
AsioTcpSocket(AsioProtonet & net,const gu::URI & uri)64 gcomm::AsioTcpSocket::AsioTcpSocket(AsioProtonet& net, const gu::URI& uri)
65     :
66     Socket       (uri),
67     net_         (net),
68     socket_      (net.io_service_.make_socket(uri)),
69     send_q_      (),
70     last_queued_tstamp_(),
71     recv_buf_    (net_.mtu() + NetHeader::serial_size_),
72     recv_offset_ (0),
73     last_delivered_tstamp_(),
74     state_       (S_CLOSED),
75     deferred_close_timer_()
76 {
77     log_debug << "ctor for " << id();
78 }
79 
AsioTcpSocket(AsioProtonet & net,const gu::URI & uri,const std::shared_ptr<gu::AsioSocket> & socket)80 gcomm::AsioTcpSocket::AsioTcpSocket(AsioProtonet& net,
81                                     const gu::URI& uri,
82                                     const std::shared_ptr<gu::AsioSocket>& socket)
83     :
84     Socket       (uri),
85     net_         (net),
86     socket_      (socket),
87     send_q_      (),
88     last_queued_tstamp_(),
89     recv_buf_    (net_.mtu() + NetHeader::serial_size_),
90     recv_offset_ (0),
91     last_delivered_tstamp_(),
92     state_       (S_CLOSED),
93     deferred_close_timer_()
94 {
95     log_debug << "ctor for " << id();
96 }
97 
~AsioTcpSocket()98 gcomm::AsioTcpSocket::~AsioTcpSocket()
99 {
100     log_debug << "dtor for " << id() << " state " << state_
101              << " send q size " << send_q_.size();
102     if (state_ != S_CLOSED)
103     {
104         socket_->close();
105     }
106 }
107 
failed_handler(const gu::AsioErrorCode & ec,const std::string & func,int line)108 void gcomm::AsioTcpSocket::failed_handler(const gu::AsioErrorCode& ec,
109                                           const std::string& func,
110                                           int line)
111 {
112     log_debug << "failed handler from " << func << ":" << line
113               << " socket " << id()
114               << " error " << ec
115               << " " << socket_->is_open() << " state " << state();
116 
117     try
118     {
119         log_debug << "local endpoint " << local_addr()
120                   << " remote endpoint " << remote_addr();
121     } catch (...) { }
122 
123     const State prev_state(state());
124 
125     if (state() != S_CLOSED)
126     {
127         state_ = S_FAILED;
128     }
129 
130     if (prev_state != S_FAILED && prev_state != S_CLOSED)
131     {
132         net_.dispatch(id(), Datagram(), ProtoUpMeta(ec.value()));
133     }
134 }
135 
connect_handler(gu::AsioSocket & socket,const gu::AsioErrorCode & ec)136 void gcomm::AsioTcpSocket::connect_handler(gu::AsioSocket& socket,
137                                            const gu::AsioErrorCode& ec)
138 {
139     Critical<AsioProtonet> crit(net_);
140 
141     try
142     {
143         if (ec)
144         {
145             FAILED_HANDLER(ec);
146             return;
147         }
148         else
149         {
150             state_ = S_CONNECTED;
151             init_tstamps();
152             net_.dispatch(id(), Datagram(), ProtoUpMeta(ec.value()));
153             async_receive();
154         }
155     }
156     catch (const gu::Exception& e)
157     {
158         FAILED_HANDLER(gu::AsioErrorCode(e.get_errno()));
159     }
160 }
161 
connect(const gu::URI & uri)162 void gcomm::AsioTcpSocket::connect(const gu::URI& uri)
163 {
164     try
165     {
166         Critical<AsioProtonet> crit(net_);
167 
168         socket_->open(uri);
169 
170 
171         set_buf_sizes(); // Must be done before connect
172         const std::string bind_ip(uri.get_option(gcomm::Socket::OptIfAddr, ""));
173         if (not bind_ip.empty())
174         {
175             socket_->bind(gu::make_address(bind_ip));
176         }
177 
178         socket_->async_connect(uri, shared_from_this());
179         state_ = S_CONNECTING;
180     }
181     catch (const gu::Exception& e)
182     {
183         std::ostringstream msg;
184         msg << "error while connecting to remote host "
185             << uri.to_string()
186             << "', asio error '" << e.what() << "'";
187         log_warn << msg.str();
188         gu_throw_error(e.get_errno()) << msg.str();
189     }
190 }
191 
192 #include "gu_disable_non_virtual_dtor.hpp"
193 
194 // Helper class to keep the socket open for writing remaining messages
195 // after gcomm::AsioTcpSocket::close() has been called.
196 // The socket is kept open until all queued messages have been written
197 // or timeout occurs. This is achieved by storing shared pointer
198 // of the socket into timer object.
199 class gcomm::AsioTcpSocket::DeferredCloseTimer
200     : public gu::AsioSteadyTimerHandler
201     , public std::enable_shared_from_this<DeferredCloseTimer>
202 {
203 public:
DeferredCloseTimer(gu::AsioIoService & io_service,const std::shared_ptr<AsioTcpSocket> & socket)204     DeferredCloseTimer(gu::AsioIoService& io_service,
205                        const std::shared_ptr<AsioTcpSocket>& socket)
206         : socket_(socket)
207         , io_service_(io_service)
208         , timer_(io_service_)
209     {
210     }
211 
~DeferredCloseTimer()212     ~DeferredCloseTimer()
213     {
214         log_info << "Deferred close timer destruct";
215     }
216 
start()217     void start()
218     {
219         timer_.expires_from_now(std::chrono::seconds(5));
220         timer_.async_wait(shared_from_this());
221         log_info << "Deferred close timer started for socket with "
222                  << "remote endpoint: " << socket_->remote_addr();
223     }
224 
cancel()225     void cancel()
226     {
227         log_debug << "Deferred close timer cancel " << socket_->socket_;
228         timer_.cancel();
229     }
230 
handle_wait(const gu::AsioErrorCode & ec)231     virtual void handle_wait(const gu::AsioErrorCode& ec) GALERA_OVERRIDE
232     {
233         log_info << "Deferred close timer handle_wait "
234                  << ec << " for " << socket_->socket_;
235         socket_->close();
236         socket_.reset();
237     }
238 
239 private:
240     std::shared_ptr<AsioTcpSocket> socket_;
241     gu::AsioIoService& io_service_;
242     gu::AsioSteadyTimer timer_;
243 };
244 
245 #include "gu_enable_non_virtual_dtor.hpp"
246 
247 
close()248 void gcomm::AsioTcpSocket::close()
249 {
250     Critical<AsioProtonet> crit(net_);
251 
252     if (state() == S_CLOSED || state() == S_CLOSING) return;
253 
254     log_debug << "closing " << id()
255               << " socket " << socket_
256               << " state " << state()
257               << " send_q size " << send_q_.size();
258 
259     if (send_q_.empty() == true || state() != S_CONNECTED)
260     {
261         socket_->close();
262         state_ = S_CLOSED;
263     }
264     else
265     {
266         state_ = S_CLOSING;
267         auto timer(std::make_shared<DeferredCloseTimer>(
268                        net_.io_service_, shared_from_this()));
269         deferred_close_timer_ = timer;
270         timer->start();
271     }
272 }
273 
274 // Enable to introduce random errors for write handler
275 // #define GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
276 
write_handler(gu::AsioSocket & socket,const gu::AsioErrorCode & ec,size_t bytes_transferred)277 void gcomm::AsioTcpSocket::write_handler(gu::AsioSocket& socket,
278                                          const gu::AsioErrorCode& ec,
279                                          size_t bytes_transferred)
280 {
281 #ifdef GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
282     static const long empty_rate(10000);
283     static const long bytes_transferred_less_than_rate(10000);
284     static const long bytes_transferred_not_zero_rate(10000);
285 #endif // GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
286 
287     Critical<AsioProtonet> crit(net_);
288 
289     if (state() != S_CONNECTED && state() != S_CLOSING)
290     {
291         log_debug << "write handler for " << id()
292                   << " state " << state();
293         if (not gu::is_verbose_error(ec))
294         {
295             log_warn << "write_handler(): " << ec.message()
296                      << " (" << gu::extra_error_info(ec) << ")";
297         }
298         return;
299     }
300 
301     log_debug << "gcomm::AsioTcpSocket::write_handler() ec " << ec << " socket "
302               << socket_ << " send_q " << send_q_.size();
303     if (!ec)
304     {
305         if (send_q_.empty() == true
306 #ifdef GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
307             || ::rand() % empty_rate == 0
308 #endif // GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
309             )
310         {
311             log_warn << "write_handler() called with empty send_q_. "
312                      << "Transport may not be reliable, closing the socket";
313             FAILED_HANDLER(gu::AsioErrorCode(EPROTO));
314         }
315         else if (send_q_.front().len() < bytes_transferred
316 #ifdef GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
317                  || ::rand() % bytes_transferred_less_than_rate == 0
318 #endif // GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
319             )
320         {
321             log_warn << "write_handler() bytes_transferred "
322                      << bytes_transferred
323                      << " less than sent "
324                      << send_q_.front().len()
325                      << ". Transport may not be reliable, closing the socket";
326             FAILED_HANDLER(gu::AsioErrorCode(EPROTO));
327         }
328         else
329         {
330             while (send_q_.empty() == false &&
331                    bytes_transferred >= send_q_.front().len())
332             {
333                 const Datagram& dg(send_q_.front());
334                 bytes_transferred -= dg.len();
335                 send_q_.pop_front();
336             }
337             log_debug << "AsioTcpSocket::write_handler() after queue purge "
338                       << socket_
339                       << " send_q " << send_q_.size();
340             if (bytes_transferred != 0
341 #ifdef GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
342                 || ::rand() % bytes_transferred_not_zero_rate == 0
343 #endif // GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
344                 )
345             {
346                 log_warn << "write_handler() bytes_transferred "
347                          << bytes_transferred
348                          << " after processing the send_q_. "
349                          << "Transport may not be reliable, closing the socket";
350                 FAILED_HANDLER(gu::AsioErrorCode(EPROTO));
351             }
352             else if (send_q_.empty() == false)
353             {
354                 const Datagram& dg(send_q_.front());
355                 std::array<gu::AsioConstBuffer, 2> cbs;
356                 cbs[0] = gu::AsioConstBuffer(dg.header()
357                                              + dg.header_offset(),
358                                              dg.header_len());
359                 cbs[1] = gu::AsioConstBuffer(dg.payload().data(),
360                                              dg.payload().size());
361                 socket_->async_write(cbs, shared_from_this());
362             }
363             else if (state_ == S_CLOSING)
364             {
365                 log_debug << "deferred close of " << id();
366                 socket_->close();
367                 // deferred_close_timer_->cancel();
368                 cancel_deferred_close_timer();
369                 state_ = S_CLOSED;
370             }
371         }
372     }
373     else if (state_ == S_CLOSING)
374     {
375         log_debug << "deferred close of " << id() << " error " << ec;
376         socket_->close();
377         // deferred_close_timer_->cancel();
378         cancel_deferred_close_timer();
379         state_ = S_CLOSED;
380     }
381     else
382     {
383         FAILED_HANDLER(ec);
384     }
385 }
386 
set_option(const std::string & key,const std::string & val)387 void gcomm::AsioTcpSocket::set_option(const std::string& key,
388                                       const std::string& val)
389 {
390     // Currently adjustable socket.recv_buf_size and socket.send_buf_size
391     // bust be set before the connection is established, so the runtime
392     // setting will not be effective.
393     log_warn << "Setting " << key << " in run time does not have effect, "
394              << "please set the configuration in provider options "
395              << "and restart";
396 }
397 
398 namespace gcomm
399 {
400     class AsioPostForSendHandler
401     {
402     public:
AsioPostForSendHandler(const std::shared_ptr<AsioTcpSocket> & socket)403         AsioPostForSendHandler(const std::shared_ptr<AsioTcpSocket>& socket)
404             :
405             socket_(socket)
406         { }
operator ()()407         void operator()()
408         {
409             log_debug << "AsioPostForSendHandler " << socket_->socket_;
410             Critical<AsioProtonet> crit(socket_->net_);
411             // Send queue is processed also in closing state
412             // in order to deliver as many messages as possible,
413             // even if the socket has been discarded by
414             // upper layers.
415             if ((socket_->state() == gcomm::Socket::S_CONNECTED ||
416                  socket_->state() == gcomm::Socket::S_CLOSING) &&
417                 socket_->send_q_.empty() == false)
418             {
419                 const gcomm::Datagram& dg(socket_->send_q_.front());
420                 std::array<gu::AsioConstBuffer, 2> cbs;
421                 cbs[0] = gu::AsioConstBuffer(dg.header()
422                                              + dg.header_offset(),
423                                              dg.header_len());
424                 cbs[1] = gu::AsioConstBuffer(dg.payload().data(),
425                                              dg.payload().size());
426                 socket_->socket_->async_write(cbs, socket_);
427             }
428         }
429     private:
430         std::shared_ptr<AsioTcpSocket> socket_;
431     };
432 }
433 
send(int segment,const Datagram & dg)434 int gcomm::AsioTcpSocket::send(int segment, const Datagram& dg)
435 {
436     Critical<AsioProtonet> crit(net_);
437 
438     log_debug << "AsioTcpSocket::send() socket "
439               << socket_ << " state " << state_ << " send_q " << send_q_.size();
440     if (state() != S_CONNECTED)
441     {
442         return ENOTCONN;
443     }
444 
445     if (send_q_.size() >= max_send_q_bytes)
446     {
447         return ENOBUFS;
448     }
449 
450     NetHeader hdr(static_cast<uint32_t>(dg.len()), net_.version_);
451 
452     if (net_.checksum_ != NetHeader::CS_NONE)
453     {
454         hdr.set_crc32(crc32(net_.checksum_, dg), net_.checksum_);
455     }
456 
457     last_queued_tstamp_ = gu::datetime::Date::monotonic();
458     // Make copy of datagram to be able to adjust the header
459     Datagram priv_dg(dg);
460     priv_dg.set_header_offset(priv_dg.header_offset() -
461                               NetHeader::serial_size_);
462     serialize(hdr,
463               priv_dg.header(),
464               priv_dg.header_size(),
465               priv_dg.header_offset());
466     send_q_.push_back(segment, priv_dg);
467     if (send_q_.size() == 1)
468     {
469         net_.io_service_.post(AsioPostForSendHandler(shared_from_this()));
470     }
471     return 0;
472 }
473 
474 
read_handler(gu::AsioSocket & socket,const gu::AsioErrorCode & ec,const size_t bytes_transferred)475 void gcomm::AsioTcpSocket::read_handler(gu::AsioSocket& socket,
476                                         const gu::AsioErrorCode& ec,
477                                         const size_t bytes_transferred)
478 {
479     Critical<AsioProtonet> crit(net_);
480 
481     if (ec)
482     {
483         if (not gu::is_verbose_error(ec))
484         {
485             log_warn << "read_handler(): " << ec.message() << " ("
486                      << gu::extra_error_info(ec) << ")";
487         }
488         FAILED_HANDLER(ec);
489         return;
490     }
491 
492     if (state() != S_CONNECTED && state() != S_CLOSING)
493     {
494         log_debug << "read handler for " << id()
495                   << " state " << state();
496         return;
497     }
498 
499     recv_offset_ += bytes_transferred;
500 
501     while (recv_offset_ >= NetHeader::serial_size_)
502     {
503         NetHeader hdr;
504         try
505         {
506             unserialize(&recv_buf_[0], recv_buf_.size(), 0, hdr);
507         }
508         catch (gu::Exception& e)
509         {
510             FAILED_HANDLER(gu::AsioErrorCode(e.get_errno()));
511             return;
512         }
513         if (recv_offset_ >= hdr.len() + NetHeader::serial_size_)
514         {
515             Datagram dg(
516                 gu::SharedBuffer(
517                     new gu::Buffer(&recv_buf_[0] + NetHeader::serial_size_,
518                                    &recv_buf_[0] + NetHeader::serial_size_
519                                    + hdr.len())));
520             if (net_.checksum_ != NetHeader::CS_NONE)
521             {
522 #ifdef TEST_NET_CHECKSUM_ERROR
523                 long rnd(rand());
524                 if (rnd % 10000 == 0)
525                 {
526                     hdr.set_crc32(net_.checksum_, static_cast<uint32_t>(rnd));
527                 }
528 #endif /* TEST_NET_CHECKSUM_ERROR */
529 
530                 if (check_cs (hdr, dg))
531                 {
532                     log_warn << "checksum failed, hdr: len=" << hdr.len()
533                              << " has_crc32="  << hdr.has_crc32()
534                              << " has_crc32c=" << hdr.has_crc32c()
535                              << " crc32=" << hdr.crc32();
536                     FAILED_HANDLER(gu::AsioErrorCode(EPROTO));
537                     return;
538                 }
539             }
540             ProtoUpMeta um;
541             last_delivered_tstamp_ = gu::datetime::Date::monotonic();
542             net_.dispatch(id(), dg, um);
543             recv_offset_ -= NetHeader::serial_size_ + hdr.len();
544 
545             if (recv_offset_ > 0)
546             {
547                 memmove(&recv_buf_[0],
548                         &recv_buf_[0] + NetHeader::serial_size_ + hdr.len(),
549                         recv_offset_);
550             }
551         }
552         else
553         {
554             break;
555         }
556     }
557 
558     if (socket_->is_open())
559     {
560         socket_->async_read(gu::AsioMutableBuffer(
561                                 &recv_buf_[0] + recv_offset_,
562                                 recv_buf_.size() - recv_offset_),
563                             shared_from_this());
564     }
565 }
566 
read_completion_condition(gu::AsioSocket &,const gu::AsioErrorCode & ec,const size_t bytes_transferred)567 size_t gcomm::AsioTcpSocket::read_completion_condition(
568     gu::AsioSocket&,
569     const gu::AsioErrorCode& ec,
570     const size_t bytes_transferred)
571 {
572     Critical<AsioProtonet> crit(net_);
573     if (ec)
574     {
575         if (not gu::is_verbose_error(ec))
576         {
577             log_warn << "read_completion_condition(): "
578                      << ec.message() << " ("
579                      << gu::extra_error_info(ec) << ")";
580         }
581         FAILED_HANDLER(ec);
582         return 0;
583     }
584 
585     if (state() != S_CONNECTED && state() != S_CLOSING)
586     {
587         log_debug << "read completion condition for " << id()
588                   << " state " << state();
589         return 0;
590     }
591 
592     if (recv_offset_ + bytes_transferred >= NetHeader::serial_size_)
593     {
594         NetHeader hdr;
595         try
596         {
597             unserialize(&recv_buf_[0], NetHeader::serial_size_, 0, hdr);
598         }
599         catch (const gu::Exception& e)
600         {
601             log_warn << "unserialize error " << e.what();
602             FAILED_HANDLER(gu::AsioErrorCode(e.get_errno()));
603             return 0;
604         }
605         if (recv_offset_ + bytes_transferred >= NetHeader::serial_size_ + hdr.len())
606         {
607             return 0;
608         }
609     }
610 
611     return (recv_buf_.size() - recv_offset_);
612 }
613 
614 
async_receive()615 void gcomm::AsioTcpSocket::async_receive()
616 {
617     Critical<AsioProtonet> crit(net_);
618 
619     gcomm_assert(state() == S_CONNECTED);
620 
621     socket_->async_read(gu::AsioMutableBuffer(&recv_buf_[0], recv_buf_.size()),
622                         shared_from_this());
623 }
624 
mtu() const625 size_t gcomm::AsioTcpSocket::mtu() const
626 {
627     return net_.mtu();
628 }
629 
630 
631 
local_addr() const632 std::string gcomm::AsioTcpSocket::local_addr() const
633 {
634     return socket_->local_addr();
635 }
636 
remote_addr() const637 std::string gcomm::AsioTcpSocket::remote_addr() const
638 {
639     return socket_->remote_addr();
640 }
641 
set_buf_sizes()642 void gcomm::AsioTcpSocket::set_buf_sizes()
643 {
644     set_recv_buf_size_helper(net_.conf(), socket_);
645     set_send_buf_size_helper(net_.conf(), socket_);
646 }
647 
cancel_deferred_close_timer()648 void gcomm::AsioTcpSocket::cancel_deferred_close_timer()
649 {
650     auto timer(deferred_close_timer_.lock());
651     if (timer) timer->cancel();
652 }
653 
654 #ifndef __DragonFly__
stats() const655 gcomm::SocketStats gcomm::AsioTcpSocket::stats() const
656 {
657     SocketStats ret;
658 #ifndef __DragonFly__
659 /* XXX emmmm??? */
660     try
661     {
662         auto tcpi(socket_->get_tcp_info());
663         ret.rtt            = tcpi.tcpi_rtt;
664         ret.rttvar         = tcpi.tcpi_rttvar;
665         ret.rto            = tcpi.tcpi_rto;
666 #if defined(__linux__)
667         ret.lost           = tcpi.tcpi_lost;
668 #else
669         ret.lost           = 0;
670 #endif /* __linux__ */
671         ret.last_data_recv = tcpi.tcpi_last_data_recv;
672         ret.cwnd           = tcpi.tcpi_snd_cwnd;
673         gu::datetime::Date now(gu::datetime::Date::monotonic());
674         Critical<AsioProtonet> crit(net_);
675         ret.last_queued_since = (now - last_queued_tstamp_).get_nsecs();
676         ret.last_delivered_since = (now - last_delivered_tstamp_).get_nsecs();
677         ret.send_queue_length = send_q_.size();
678         ret.send_queue_bytes = send_q_.queued_bytes();
679         ret.send_queue_segments = send_q_.segments();
680     }
681     catch (...)
682     { }
683 #endif
684     return ret;
685 }
686 #endif
687 
AsioTcpAcceptor(AsioProtonet & net,const gu::URI & uri)688 gcomm::AsioTcpAcceptor::AsioTcpAcceptor(AsioProtonet& net, const gu::URI& uri)
689     :
690     Acceptor        (uri),
691     net_            (net),
692     acceptor_       (net_.io_service_.make_acceptor(uri)),
693     accepted_socket_()
694 { }
695 
~AsioTcpAcceptor()696 gcomm::AsioTcpAcceptor::~AsioTcpAcceptor()
697 {
698     close();
699 }
700 
701 
accept_handler(gu::AsioAcceptor &,const std::shared_ptr<gu::AsioSocket> & accepted_socket,const gu::AsioErrorCode & error)702 void gcomm::AsioTcpAcceptor::accept_handler(
703     gu::AsioAcceptor&,
704     const std::shared_ptr<gu::AsioSocket>& accepted_socket,
705     const gu::AsioErrorCode& error)
706 {
707     if (!error)
708     {
709         auto socket(std::make_shared<AsioTcpSocket>(net_, uri_, accepted_socket));
710         socket->state_ = Socket::S_CONNECTED;
711         accepted_socket_ = socket;
712         log_debug << "accepted socket " << socket->id();
713         net_.dispatch(id(), Datagram(), ProtoUpMeta(error.value()));
714         acceptor_->async_accept(shared_from_this());
715     }
716 }
717 
set_buf_sizes()718 void gcomm::AsioTcpAcceptor::set_buf_sizes()
719 {
720     set_recv_buf_size_helper(net_.conf(), acceptor_);
721     set_send_buf_size_helper(net_.conf(), acceptor_);
722 }
723 
724 
listen(const gu::URI & uri)725 void gcomm::AsioTcpAcceptor::listen(const gu::URI& uri)
726 {
727     acceptor_->open(uri);
728     set_buf_sizes(); // Must be done before listen
729     acceptor_->listen(uri);
730     acceptor_->async_accept(shared_from_this());
731 }
732 
listen_addr() const733 std::string gcomm::AsioTcpAcceptor::listen_addr() const
734 {
735     return acceptor_->listen_addr();
736 }
737 
close()738 void gcomm::AsioTcpAcceptor::close()
739 {
740     acceptor_->close();
741 }
742 
743 
accept()744 gcomm::SocketPtr gcomm::AsioTcpAcceptor::accept()
745 {
746     if (accepted_socket_->state() == Socket::S_CONNECTED)
747     {
748         accepted_socket_->async_receive();
749     }
750     return accepted_socket_;
751 }
752