1 /*
2 * Copyright (C) 2012-2020 Codership Oy <info@codership.com>
3 */
4
5 #include "asio_tcp.hpp"
6 #include "gcomm/util.hpp"
7 #include "gcomm/common.hpp"
8
9 #define FAILED_HANDLER(_e) failed_handler(_e, __FUNCTION__, __LINE__)
10
11 // Helpers to set socket buffer sizes for both connecting
12 // and listening sockets.
13
14 static bool asio_recv_buf_warned(false);
15 template <class Socket>
set_recv_buf_size_helper(const gu::Config & conf,Socket & socket)16 void set_recv_buf_size_helper(const gu::Config& conf, Socket& socket)
17 {
18 if (conf.get(gcomm::Conf::SocketRecvBufSize) != GCOMM_ASIO_AUTO_BUF_SIZE)
19 {
20 size_t const recv_buf_size
21 (conf.get<size_t>(gcomm::Conf::SocketRecvBufSize));
22 // this should have been checked already
23 assert(ssize_t(recv_buf_size) >= 0);
24
25 socket->set_receive_buffer_size(recv_buf_size);
26 auto cur_value(socket->get_receive_buffer_size());
27 log_debug << "socket recv buf size " << cur_value;
28 if (cur_value < recv_buf_size && not asio_recv_buf_warned)
29 {
30 log_warn << "Receive buffer size " << cur_value
31 << " less than requested " << recv_buf_size
32 << ", this may affect performance in high latency/high "
33 << "throughput networks.";
34 asio_recv_buf_warned = true;
35 }
36 }
37 }
38
39 static bool asio_send_buf_warned(false);
40 template <class Socket>
set_send_buf_size_helper(const gu::Config & conf,Socket & socket)41 void set_send_buf_size_helper(const gu::Config& conf, Socket& socket)
42 {
43 if (conf.get(gcomm::Conf::SocketSendBufSize) != GCOMM_ASIO_AUTO_BUF_SIZE)
44 {
45 size_t const send_buf_size
46 (conf.get<size_t>(gcomm::Conf::SocketSendBufSize));
47 // this should have been checked already
48 assert(ssize_t(send_buf_size) >= 0);
49
50 socket->set_send_buffer_size(send_buf_size);
51 auto cur_value(socket->get_send_buffer_size());
52 log_debug << "socket send buf size " << cur_value;
53 if (cur_value < send_buf_size && not asio_send_buf_warned)
54 {
55 log_warn << "Send buffer size " << cur_value
56 << " less than requested " << send_buf_size
57 << ", this may affect performance in high latency/high "
58 << "throughput networks.";
59 asio_send_buf_warned = true;
60 }
61 }
62 }
63
AsioTcpSocket(AsioProtonet & net,const gu::URI & uri)64 gcomm::AsioTcpSocket::AsioTcpSocket(AsioProtonet& net, const gu::URI& uri)
65 :
66 Socket (uri),
67 net_ (net),
68 socket_ (net.io_service_.make_socket(uri)),
69 send_q_ (),
70 last_queued_tstamp_(),
71 recv_buf_ (net_.mtu() + NetHeader::serial_size_),
72 recv_offset_ (0),
73 last_delivered_tstamp_(),
74 state_ (S_CLOSED),
75 deferred_close_timer_()
76 {
77 log_debug << "ctor for " << id();
78 }
79
AsioTcpSocket(AsioProtonet & net,const gu::URI & uri,const std::shared_ptr<gu::AsioSocket> & socket)80 gcomm::AsioTcpSocket::AsioTcpSocket(AsioProtonet& net,
81 const gu::URI& uri,
82 const std::shared_ptr<gu::AsioSocket>& socket)
83 :
84 Socket (uri),
85 net_ (net),
86 socket_ (socket),
87 send_q_ (),
88 last_queued_tstamp_(),
89 recv_buf_ (net_.mtu() + NetHeader::serial_size_),
90 recv_offset_ (0),
91 last_delivered_tstamp_(),
92 state_ (S_CLOSED),
93 deferred_close_timer_()
94 {
95 log_debug << "ctor for " << id();
96 }
97
~AsioTcpSocket()98 gcomm::AsioTcpSocket::~AsioTcpSocket()
99 {
100 log_debug << "dtor for " << id() << " state " << state_
101 << " send q size " << send_q_.size();
102 if (state_ != S_CLOSED)
103 {
104 socket_->close();
105 }
106 }
107
failed_handler(const gu::AsioErrorCode & ec,const std::string & func,int line)108 void gcomm::AsioTcpSocket::failed_handler(const gu::AsioErrorCode& ec,
109 const std::string& func,
110 int line)
111 {
112 log_debug << "failed handler from " << func << ":" << line
113 << " socket " << id()
114 << " error " << ec
115 << " " << socket_->is_open() << " state " << state();
116
117 try
118 {
119 log_debug << "local endpoint " << local_addr()
120 << " remote endpoint " << remote_addr();
121 } catch (...) { }
122
123 const State prev_state(state());
124
125 if (state() != S_CLOSED)
126 {
127 state_ = S_FAILED;
128 }
129
130 if (prev_state != S_FAILED && prev_state != S_CLOSED)
131 {
132 net_.dispatch(id(), Datagram(), ProtoUpMeta(ec.value()));
133 }
134 }
135
connect_handler(gu::AsioSocket & socket,const gu::AsioErrorCode & ec)136 void gcomm::AsioTcpSocket::connect_handler(gu::AsioSocket& socket,
137 const gu::AsioErrorCode& ec)
138 {
139 Critical<AsioProtonet> crit(net_);
140
141 try
142 {
143 if (ec)
144 {
145 FAILED_HANDLER(ec);
146 return;
147 }
148 else
149 {
150 state_ = S_CONNECTED;
151 init_tstamps();
152 net_.dispatch(id(), Datagram(), ProtoUpMeta(ec.value()));
153 async_receive();
154 }
155 }
156 catch (const gu::Exception& e)
157 {
158 FAILED_HANDLER(gu::AsioErrorCode(e.get_errno()));
159 }
160 }
161
connect(const gu::URI & uri)162 void gcomm::AsioTcpSocket::connect(const gu::URI& uri)
163 {
164 try
165 {
166 Critical<AsioProtonet> crit(net_);
167
168 socket_->open(uri);
169
170
171 set_buf_sizes(); // Must be done before connect
172 const std::string bind_ip(uri.get_option(gcomm::Socket::OptIfAddr, ""));
173 if (not bind_ip.empty())
174 {
175 socket_->bind(gu::make_address(bind_ip));
176 }
177
178 socket_->async_connect(uri, shared_from_this());
179 state_ = S_CONNECTING;
180 }
181 catch (const gu::Exception& e)
182 {
183 std::ostringstream msg;
184 msg << "error while connecting to remote host "
185 << uri.to_string()
186 << "', asio error '" << e.what() << "'";
187 log_warn << msg.str();
188 gu_throw_error(e.get_errno()) << msg.str();
189 }
190 }
191
192 #include "gu_disable_non_virtual_dtor.hpp"
193
194 // Helper class to keep the socket open for writing remaining messages
195 // after gcomm::AsioTcpSocket::close() has been called.
196 // The socket is kept open until all queued messages have been written
197 // or timeout occurs. This is achieved by storing shared pointer
198 // of the socket into timer object.
199 class gcomm::AsioTcpSocket::DeferredCloseTimer
200 : public gu::AsioSteadyTimerHandler
201 , public std::enable_shared_from_this<DeferredCloseTimer>
202 {
203 public:
DeferredCloseTimer(gu::AsioIoService & io_service,const std::shared_ptr<AsioTcpSocket> & socket)204 DeferredCloseTimer(gu::AsioIoService& io_service,
205 const std::shared_ptr<AsioTcpSocket>& socket)
206 : socket_(socket)
207 , io_service_(io_service)
208 , timer_(io_service_)
209 {
210 }
211
~DeferredCloseTimer()212 ~DeferredCloseTimer()
213 {
214 log_info << "Deferred close timer destruct";
215 }
216
start()217 void start()
218 {
219 timer_.expires_from_now(std::chrono::seconds(5));
220 timer_.async_wait(shared_from_this());
221 log_info << "Deferred close timer started for socket with "
222 << "remote endpoint: " << socket_->remote_addr();
223 }
224
cancel()225 void cancel()
226 {
227 log_debug << "Deferred close timer cancel " << socket_->socket_;
228 timer_.cancel();
229 }
230
handle_wait(const gu::AsioErrorCode & ec)231 virtual void handle_wait(const gu::AsioErrorCode& ec) GALERA_OVERRIDE
232 {
233 log_info << "Deferred close timer handle_wait "
234 << ec << " for " << socket_->socket_;
235 socket_->close();
236 socket_.reset();
237 }
238
239 private:
240 std::shared_ptr<AsioTcpSocket> socket_;
241 gu::AsioIoService& io_service_;
242 gu::AsioSteadyTimer timer_;
243 };
244
245 #include "gu_enable_non_virtual_dtor.hpp"
246
247
close()248 void gcomm::AsioTcpSocket::close()
249 {
250 Critical<AsioProtonet> crit(net_);
251
252 if (state() == S_CLOSED || state() == S_CLOSING) return;
253
254 log_debug << "closing " << id()
255 << " socket " << socket_
256 << " state " << state()
257 << " send_q size " << send_q_.size();
258
259 if (send_q_.empty() == true || state() != S_CONNECTED)
260 {
261 socket_->close();
262 state_ = S_CLOSED;
263 }
264 else
265 {
266 state_ = S_CLOSING;
267 auto timer(std::make_shared<DeferredCloseTimer>(
268 net_.io_service_, shared_from_this()));
269 deferred_close_timer_ = timer;
270 timer->start();
271 }
272 }
273
274 // Enable to introduce random errors for write handler
275 // #define GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
276
write_handler(gu::AsioSocket & socket,const gu::AsioErrorCode & ec,size_t bytes_transferred)277 void gcomm::AsioTcpSocket::write_handler(gu::AsioSocket& socket,
278 const gu::AsioErrorCode& ec,
279 size_t bytes_transferred)
280 {
281 #ifdef GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
282 static const long empty_rate(10000);
283 static const long bytes_transferred_less_than_rate(10000);
284 static const long bytes_transferred_not_zero_rate(10000);
285 #endif // GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
286
287 Critical<AsioProtonet> crit(net_);
288
289 if (state() != S_CONNECTED && state() != S_CLOSING)
290 {
291 log_debug << "write handler for " << id()
292 << " state " << state();
293 if (not gu::is_verbose_error(ec))
294 {
295 log_warn << "write_handler(): " << ec.message()
296 << " (" << gu::extra_error_info(ec) << ")";
297 }
298 return;
299 }
300
301 log_debug << "gcomm::AsioTcpSocket::write_handler() ec " << ec << " socket "
302 << socket_ << " send_q " << send_q_.size();
303 if (!ec)
304 {
305 if (send_q_.empty() == true
306 #ifdef GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
307 || ::rand() % empty_rate == 0
308 #endif // GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
309 )
310 {
311 log_warn << "write_handler() called with empty send_q_. "
312 << "Transport may not be reliable, closing the socket";
313 FAILED_HANDLER(gu::AsioErrorCode(EPROTO));
314 }
315 else if (send_q_.front().len() < bytes_transferred
316 #ifdef GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
317 || ::rand() % bytes_transferred_less_than_rate == 0
318 #endif // GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
319 )
320 {
321 log_warn << "write_handler() bytes_transferred "
322 << bytes_transferred
323 << " less than sent "
324 << send_q_.front().len()
325 << ". Transport may not be reliable, closing the socket";
326 FAILED_HANDLER(gu::AsioErrorCode(EPROTO));
327 }
328 else
329 {
330 while (send_q_.empty() == false &&
331 bytes_transferred >= send_q_.front().len())
332 {
333 const Datagram& dg(send_q_.front());
334 bytes_transferred -= dg.len();
335 send_q_.pop_front();
336 }
337 log_debug << "AsioTcpSocket::write_handler() after queue purge "
338 << socket_
339 << " send_q " << send_q_.size();
340 if (bytes_transferred != 0
341 #ifdef GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
342 || ::rand() % bytes_transferred_not_zero_rate == 0
343 #endif // GCOMM_ASIO_TCP_SIMULATE_WRITE_HANDLER_ERROR
344 )
345 {
346 log_warn << "write_handler() bytes_transferred "
347 << bytes_transferred
348 << " after processing the send_q_. "
349 << "Transport may not be reliable, closing the socket";
350 FAILED_HANDLER(gu::AsioErrorCode(EPROTO));
351 }
352 else if (send_q_.empty() == false)
353 {
354 const Datagram& dg(send_q_.front());
355 std::array<gu::AsioConstBuffer, 2> cbs;
356 cbs[0] = gu::AsioConstBuffer(dg.header()
357 + dg.header_offset(),
358 dg.header_len());
359 cbs[1] = gu::AsioConstBuffer(dg.payload().data(),
360 dg.payload().size());
361 socket_->async_write(cbs, shared_from_this());
362 }
363 else if (state_ == S_CLOSING)
364 {
365 log_debug << "deferred close of " << id();
366 socket_->close();
367 // deferred_close_timer_->cancel();
368 cancel_deferred_close_timer();
369 state_ = S_CLOSED;
370 }
371 }
372 }
373 else if (state_ == S_CLOSING)
374 {
375 log_debug << "deferred close of " << id() << " error " << ec;
376 socket_->close();
377 // deferred_close_timer_->cancel();
378 cancel_deferred_close_timer();
379 state_ = S_CLOSED;
380 }
381 else
382 {
383 FAILED_HANDLER(ec);
384 }
385 }
386
set_option(const std::string & key,const std::string & val)387 void gcomm::AsioTcpSocket::set_option(const std::string& key,
388 const std::string& val)
389 {
390 // Currently adjustable socket.recv_buf_size and socket.send_buf_size
391 // bust be set before the connection is established, so the runtime
392 // setting will not be effective.
393 log_warn << "Setting " << key << " in run time does not have effect, "
394 << "please set the configuration in provider options "
395 << "and restart";
396 }
397
398 namespace gcomm
399 {
400 class AsioPostForSendHandler
401 {
402 public:
AsioPostForSendHandler(const std::shared_ptr<AsioTcpSocket> & socket)403 AsioPostForSendHandler(const std::shared_ptr<AsioTcpSocket>& socket)
404 :
405 socket_(socket)
406 { }
operator ()()407 void operator()()
408 {
409 log_debug << "AsioPostForSendHandler " << socket_->socket_;
410 Critical<AsioProtonet> crit(socket_->net_);
411 // Send queue is processed also in closing state
412 // in order to deliver as many messages as possible,
413 // even if the socket has been discarded by
414 // upper layers.
415 if ((socket_->state() == gcomm::Socket::S_CONNECTED ||
416 socket_->state() == gcomm::Socket::S_CLOSING) &&
417 socket_->send_q_.empty() == false)
418 {
419 const gcomm::Datagram& dg(socket_->send_q_.front());
420 std::array<gu::AsioConstBuffer, 2> cbs;
421 cbs[0] = gu::AsioConstBuffer(dg.header()
422 + dg.header_offset(),
423 dg.header_len());
424 cbs[1] = gu::AsioConstBuffer(dg.payload().data(),
425 dg.payload().size());
426 socket_->socket_->async_write(cbs, socket_);
427 }
428 }
429 private:
430 std::shared_ptr<AsioTcpSocket> socket_;
431 };
432 }
433
send(int segment,const Datagram & dg)434 int gcomm::AsioTcpSocket::send(int segment, const Datagram& dg)
435 {
436 Critical<AsioProtonet> crit(net_);
437
438 log_debug << "AsioTcpSocket::send() socket "
439 << socket_ << " state " << state_ << " send_q " << send_q_.size();
440 if (state() != S_CONNECTED)
441 {
442 return ENOTCONN;
443 }
444
445 if (send_q_.size() >= max_send_q_bytes)
446 {
447 return ENOBUFS;
448 }
449
450 NetHeader hdr(static_cast<uint32_t>(dg.len()), net_.version_);
451
452 if (net_.checksum_ != NetHeader::CS_NONE)
453 {
454 hdr.set_crc32(crc32(net_.checksum_, dg), net_.checksum_);
455 }
456
457 last_queued_tstamp_ = gu::datetime::Date::monotonic();
458 // Make copy of datagram to be able to adjust the header
459 Datagram priv_dg(dg);
460 priv_dg.set_header_offset(priv_dg.header_offset() -
461 NetHeader::serial_size_);
462 serialize(hdr,
463 priv_dg.header(),
464 priv_dg.header_size(),
465 priv_dg.header_offset());
466 send_q_.push_back(segment, priv_dg);
467 if (send_q_.size() == 1)
468 {
469 net_.io_service_.post(AsioPostForSendHandler(shared_from_this()));
470 }
471 return 0;
472 }
473
474
// Completion handler for asynchronous read.
//
// Appends the received bytes to the content of recv_buf_ and delivers
// every complete message found in the buffer to the upper layer.
// A message consists of a NetHeader followed by hdr.len() bytes of
// payload. If checksumming is enabled, the checksum is verified before
// the delivery. Finally a new read is started if the underlying socket
// is still open.
//
// Read errors, header parse errors and checksum failures are passed to
// failed_handler() and terminate the processing.
//
// @param socket            underlying socket (unused)
// @param ec                result of the read operation
// @param bytes_transferred number of bytes read into recv_buf_
void gcomm::AsioTcpSocket::read_handler(gu::AsioSocket& socket,
                                        const gu::AsioErrorCode& ec,
                                        const size_t bytes_transferred)
{
    Critical<AsioProtonet> crit(net_);

    if (ec)
    {
        if (not gu::is_verbose_error(ec))
        {
            log_warn << "read_handler(): " << ec.message() << " ("
                     << gu::extra_error_info(ec) << ")";
        }
        FAILED_HANDLER(ec);
        return;
    }

    if (state() != S_CONNECTED && state() != S_CLOSING)
    {
        log_debug << "read handler for " << id()
                  << " state " << state();
        return;
    }

    // recv_offset_ is the number of valid bytes in the beginning
    // of recv_buf_.
    recv_offset_ += bytes_transferred;

    // Process messages as long as there is at least a complete header
    // in the buffer.
    while (recv_offset_ >= NetHeader::serial_size_)
    {
        NetHeader hdr;
        try
        {
            unserialize(&recv_buf_[0], recv_buf_.size(), 0, hdr);
        }
        catch (gu::Exception& e)
        {
            FAILED_HANDLER(gu::AsioErrorCode(e.get_errno()));
            return;
        }
        if (recv_offset_ >= hdr.len() + NetHeader::serial_size_)
        {
            // Complete message available: copy the payload into
            // a buffer owned by the datagram.
            Datagram dg(
                gu::SharedBuffer(
                    new gu::Buffer(&recv_buf_[0] + NetHeader::serial_size_,
                                   &recv_buf_[0] + NetHeader::serial_size_
                                   + hdr.len())));
            if (net_.checksum_ != NetHeader::CS_NONE)
            {
#ifdef TEST_NET_CHECKSUM_ERROR
                // Test hook: corrupt the checksum randomly.
                long rnd(rand());
                if (rnd % 10000 == 0)
                {
                    hdr.set_crc32(net_.checksum_, static_cast<uint32_t>(rnd));
                }
#endif /* TEST_NET_CHECKSUM_ERROR */

                // Non-zero return value is treated as checksum failure.
                if (check_cs (hdr, dg))
                {
                    log_warn << "checksum failed, hdr: len=" << hdr.len()
                             << " has_crc32=" << hdr.has_crc32()
                             << " has_crc32c=" << hdr.has_crc32c()
                             << " crc32=" << hdr.crc32();
                    FAILED_HANDLER(gu::AsioErrorCode(EPROTO));
                    return;
                }
            }
            ProtoUpMeta um;
            last_delivered_tstamp_ = gu::datetime::Date::monotonic();
            net_.dispatch(id(), dg, um);
            // The message has been consumed, move the remaining bytes
            // (if any) to the beginning of the buffer.
            recv_offset_ -= NetHeader::serial_size_ + hdr.len();

            if (recv_offset_ > 0)
            {
                memmove(&recv_buf_[0],
                        &recv_buf_[0] + NetHeader::serial_size_ + hdr.len(),
                        recv_offset_);
            }
        }
        else
        {
            // Message is not complete yet, wait for more data.
            break;
        }
    }

    // Continue reading into the unused tail of the buffer.
    if (socket_->is_open())
    {
        socket_->async_read(gu::AsioMutableBuffer(
                                &recv_buf_[0] + recv_offset_,
                                recv_buf_.size() - recv_offset_),
                            shared_from_this());
    }
}
566
read_completion_condition(gu::AsioSocket &,const gu::AsioErrorCode & ec,const size_t bytes_transferred)567 size_t gcomm::AsioTcpSocket::read_completion_condition(
568 gu::AsioSocket&,
569 const gu::AsioErrorCode& ec,
570 const size_t bytes_transferred)
571 {
572 Critical<AsioProtonet> crit(net_);
573 if (ec)
574 {
575 if (not gu::is_verbose_error(ec))
576 {
577 log_warn << "read_completion_condition(): "
578 << ec.message() << " ("
579 << gu::extra_error_info(ec) << ")";
580 }
581 FAILED_HANDLER(ec);
582 return 0;
583 }
584
585 if (state() != S_CONNECTED && state() != S_CLOSING)
586 {
587 log_debug << "read completion condition for " << id()
588 << " state " << state();
589 return 0;
590 }
591
592 if (recv_offset_ + bytes_transferred >= NetHeader::serial_size_)
593 {
594 NetHeader hdr;
595 try
596 {
597 unserialize(&recv_buf_[0], NetHeader::serial_size_, 0, hdr);
598 }
599 catch (const gu::Exception& e)
600 {
601 log_warn << "unserialize error " << e.what();
602 FAILED_HANDLER(gu::AsioErrorCode(e.get_errno()));
603 return 0;
604 }
605 if (recv_offset_ + bytes_transferred >= NetHeader::serial_size_ + hdr.len())
606 {
607 return 0;
608 }
609 }
610
611 return (recv_buf_.size() - recv_offset_);
612 }
613
614
async_receive()615 void gcomm::AsioTcpSocket::async_receive()
616 {
617 Critical<AsioProtonet> crit(net_);
618
619 gcomm_assert(state() == S_CONNECTED);
620
621 socket_->async_read(gu::AsioMutableBuffer(&recv_buf_[0], recv_buf_.size()),
622 shared_from_this());
623 }
624
mtu() const625 size_t gcomm::AsioTcpSocket::mtu() const
626 {
627 return net_.mtu();
628 }
629
630
631
local_addr() const632 std::string gcomm::AsioTcpSocket::local_addr() const
633 {
634 return socket_->local_addr();
635 }
636
remote_addr() const637 std::string gcomm::AsioTcpSocket::remote_addr() const
638 {
639 return socket_->remote_addr();
640 }
641
set_buf_sizes()642 void gcomm::AsioTcpSocket::set_buf_sizes()
643 {
644 set_recv_buf_size_helper(net_.conf(), socket_);
645 set_send_buf_size_helper(net_.conf(), socket_);
646 }
647
cancel_deferred_close_timer()648 void gcomm::AsioTcpSocket::cancel_deferred_close_timer()
649 {
650 auto timer(deferred_close_timer_.lock());
651 if (timer) timer->cancel();
652 }
653
// NOTE(review): the whole definition is excluded on DragonFly by this
// guard, which makes the identical guard inside the function body
// redundant. If the declaration is not guarded the same way, the outer
// guard leaves stats() undefined on that platform - confirm which of the
// two guards is intended.
#ifndef __DragonFly__
// Collects statistics for the socket: TCP level information queried
// from the underlying socket and the state of the send queue.
//
// Any exception thrown while collecting the statistics is swallowed,
// in which case the fields which were not yet assigned keep the values
// set by the SocketStats default constructor.
gcomm::SocketStats gcomm::AsioTcpSocket::stats() const
{
    SocketStats ret;
#ifndef __DragonFly__
    /* No statistics are collected on DragonFly. */
    try
    {
        auto tcpi(socket_->get_tcp_info());
        ret.rtt = tcpi.tcpi_rtt;
        ret.rttvar = tcpi.tcpi_rttvar;
        ret.rto = tcpi.tcpi_rto;
#if defined(__linux__)
        ret.lost = tcpi.tcpi_lost;
#else
        // Lost packet count is read only on Linux.
        ret.lost = 0;
#endif /* __linux__ */
        ret.last_data_recv = tcpi.tcpi_last_data_recv;
        ret.cwnd = tcpi.tcpi_snd_cwnd;
        gu::datetime::Date now(gu::datetime::Date::monotonic());
        // The lock is taken only for reading the timestamps and
        // the send queue state.
        Critical<AsioProtonet> crit(net_);
        ret.last_queued_since = (now - last_queued_tstamp_).get_nsecs();
        ret.last_delivered_since = (now - last_delivered_tstamp_).get_nsecs();
        ret.send_queue_length = send_q_.size();
        ret.send_queue_bytes = send_q_.queued_bytes();
        ret.send_queue_segments = send_q_.segments();
    }
    catch (...)
    { }
#endif
    return ret;
}
#endif
687
AsioTcpAcceptor(AsioProtonet & net,const gu::URI & uri)688 gcomm::AsioTcpAcceptor::AsioTcpAcceptor(AsioProtonet& net, const gu::URI& uri)
689 :
690 Acceptor (uri),
691 net_ (net),
692 acceptor_ (net_.io_service_.make_acceptor(uri)),
693 accepted_socket_()
694 { }
695
~AsioTcpAcceptor()696 gcomm::AsioTcpAcceptor::~AsioTcpAcceptor()
697 {
698 close();
699 }
700
701
accept_handler(gu::AsioAcceptor &,const std::shared_ptr<gu::AsioSocket> & accepted_socket,const gu::AsioErrorCode & error)702 void gcomm::AsioTcpAcceptor::accept_handler(
703 gu::AsioAcceptor&,
704 const std::shared_ptr<gu::AsioSocket>& accepted_socket,
705 const gu::AsioErrorCode& error)
706 {
707 if (!error)
708 {
709 auto socket(std::make_shared<AsioTcpSocket>(net_, uri_, accepted_socket));
710 socket->state_ = Socket::S_CONNECTED;
711 accepted_socket_ = socket;
712 log_debug << "accepted socket " << socket->id();
713 net_.dispatch(id(), Datagram(), ProtoUpMeta(error.value()));
714 acceptor_->async_accept(shared_from_this());
715 }
716 }
717
set_buf_sizes()718 void gcomm::AsioTcpAcceptor::set_buf_sizes()
719 {
720 set_recv_buf_size_helper(net_.conf(), acceptor_);
721 set_send_buf_size_helper(net_.conf(), acceptor_);
722 }
723
724
listen(const gu::URI & uri)725 void gcomm::AsioTcpAcceptor::listen(const gu::URI& uri)
726 {
727 acceptor_->open(uri);
728 set_buf_sizes(); // Must be done before listen
729 acceptor_->listen(uri);
730 acceptor_->async_accept(shared_from_this());
731 }
732
listen_addr() const733 std::string gcomm::AsioTcpAcceptor::listen_addr() const
734 {
735 return acceptor_->listen_addr();
736 }
737
close()738 void gcomm::AsioTcpAcceptor::close()
739 {
740 acceptor_->close();
741 }
742
743
accept()744 gcomm::SocketPtr gcomm::AsioTcpAcceptor::accept()
745 {
746 if (accepted_socket_->state() == Socket::S_CONNECTED)
747 {
748 accepted_socket_->async_receive();
749 }
750 return accepted_socket_;
751 }
752