1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/utils/port/SocketFd.h"
8 
9 #include "td/utils/common.h"
10 #include "td/utils/format.h"
11 #include "td/utils/logging.h"
12 #include "td/utils/misc.h"
13 #include "td/utils/port/detail/skip_eintr.h"
14 #include "td/utils/port/PollFlags.h"
15 #include "td/utils/SliceBuilder.h"
16 
17 #if TD_PORT_WINDOWS
18 #include "td/utils/buffer.h"
19 #include "td/utils/port/detail/Iocp.h"
20 #include "td/utils/SpinLock.h"
21 #include "td/utils/VectorQueue.h"
22 
23 #include <limits>
24 #endif
25 
26 #if TD_PORT_POSIX
27 #include <cerrno>
28 
29 #include <arpa/inet.h>
30 #include <fcntl.h>
31 #include <netinet/in.h>
32 #include <netinet/tcp.h>
33 #include <sys/socket.h>
34 #include <sys/types.h>
35 #include <unistd.h>
36 #endif
37 
38 #include <atomic>
39 #include <cstring>
40 
41 namespace td {
42 namespace detail {
43 #if TD_PORT_WINDOWS
44 class SocketFdImpl final : private Iocp::Callback {
45  public:
SocketFdImpl(NativeFd native_fd)46   explicit SocketFdImpl(NativeFd native_fd) : info(std::move(native_fd)) {
47     VLOG(fd) << get_native_fd() << " create from native_fd";
48     get_poll_info().add_flags(PollFlags::Write());
49     Iocp::get()->subscribe(get_native_fd(), this);
50     is_read_active_ = true;
51     notify_iocp_connected();
52   }
53 
SocketFdImpl(NativeFd native_fd,const IPAddress & addr)54   SocketFdImpl(NativeFd native_fd, const IPAddress &addr) : info(std::move(native_fd)) {
55     VLOG(fd) << get_native_fd() << " create from native_fd and connect";
56     get_poll_info().add_flags(PollFlags::Write());
57     Iocp::get()->subscribe(get_native_fd(), this);
58     LPFN_CONNECTEX ConnectExPtr = nullptr;
59     GUID guid = WSAID_CONNECTEX;
60     DWORD numBytes;
61     auto error =
62         ::WSAIoctl(get_native_fd().socket(), SIO_GET_EXTENSION_FUNCTION_POINTER, static_cast<void *>(&guid),
63                    sizeof(guid), static_cast<void *>(&ConnectExPtr), sizeof(ConnectExPtr), &numBytes, nullptr, nullptr);
64     if (error) {
65       on_error(OS_SOCKET_ERROR("WSAIoctl failed"));
66       return;
67     }
68     std::memset(&read_overlapped_, 0, sizeof(read_overlapped_));
69     inc_refcnt();
70     is_read_active_ = true;
71     auto status = ConnectExPtr(get_native_fd().socket(), addr.get_sockaddr(), narrow_cast<int>(addr.get_sockaddr_len()),
72                                nullptr, 0, nullptr, &read_overlapped_);
73 
74     if (status == TRUE || !check_status("Failed to connect")) {
75       is_read_active_ = false;
76       dec_refcnt();
77     }
78   }
79 
close()80   void close() {
81     if (!is_write_waiting_) {
82       VLOG(fd) << get_native_fd() << " will close after ongoing write";
83       auto lock = lock_.lock();
84       need_close_after_write_ = true;
85       return;
86     }
87     notify_iocp_close();
88   }
89 
get_poll_info()90   PollableFdInfo &get_poll_info() {
91     return info;
92   }
get_poll_info() const93   const PollableFdInfo &get_poll_info() const {
94     return info;
95   }
96 
get_native_fd() const97   const NativeFd &get_native_fd() const {
98     return info.native_fd();
99   }
100 
write(Slice data)101   Result<size_t> write(Slice data) {
102     // LOG(ERROR) << "Write: " << format::as_hex_dump<0>(data);
103     output_writer_.append(data);
104     return write_finish(data.size());
105   }
106 
writev(Span<IoSlice> slices)107   Result<size_t> writev(Span<IoSlice> slices) {
108     size_t total_size = 0;
109     for (auto io_slice : slices) {
110       auto size = as_slice(io_slice).size();
111       CHECK(size <= std::numeric_limits<size_t>::max() - total_size);
112       total_size += size;
113     }
114 
115     auto left_size = total_size;
116     for (auto io_slice : slices) {
117       auto slice = as_slice(io_slice);
118       output_writer_.append(slice, left_size);
119       left_size -= slice.size();
120     }
121 
122     return write_finish(total_size);
123   }
124 
write_finish(size_t total_size)125   Result<size_t> write_finish(size_t total_size) {
126     if (is_write_waiting_) {
127       auto lock = lock_.lock();
128       is_write_waiting_ = false;
129       lock.reset();
130       notify_iocp_write();
131     }
132     return total_size;
133   }
134 
read(MutableSlice slice)135   Result<size_t> read(MutableSlice slice) {
136     if (get_poll_info().get_flags_local().has_pending_error()) {
137       TRY_STATUS(get_pending_error());
138     }
139     input_reader_.sync_with_writer();
140     auto res = input_reader_.advance(td::min(slice.size(), input_reader_.size()), slice);
141     if (res == 0) {
142       get_poll_info().clear_flags(PollFlags::Read());
143     } else {
144       // LOG(ERROR) << "Read: " << format::as_hex_dump<0>(Slice(slice.substr(0, res)));
145     }
146     return res;
147   }
148 
get_pending_error()149   Status get_pending_error() {
150     Status res;
151     {
152       auto lock = lock_.lock();
153       if (!pending_errors_.empty()) {
154         res = pending_errors_.pop();
155       }
156       if (res.is_ok()) {
157         get_poll_info().clear_flags(PollFlags::Error());
158       }
159     }
160     return res;
161   }
162 
163  private:
164   PollableFdInfo info;
165   SpinLock lock_;
166 
167   std::atomic<int> refcnt_{1};
168   bool close_flag_{false};
169   bool need_close_after_write_{false};
170 
171   bool is_connected_{false};
172   bool is_read_active_{false};
173   ChainBufferWriter input_writer_;
174   ChainBufferReader input_reader_ = input_writer_.extract_reader();
175   WSAOVERLAPPED read_overlapped_;
176   VectorQueue<Status> pending_errors_;
177 
178   bool is_write_active_{false};
179   std::atomic<bool> is_write_waiting_{false};
180   ChainBufferWriter output_writer_;
181   ChainBufferReader output_reader_ = output_writer_.extract_reader();
182   WSAOVERLAPPED write_overlapped_;
183 
184   char close_overlapped_;
185 
check_status(Slice message)186   bool check_status(Slice message) {
187     auto last_error = WSAGetLastError();
188     if (last_error == ERROR_IO_PENDING) {
189       return true;
190     }
191     on_error(OS_SOCKET_ERROR(message));
192     return false;
193   }
194 
loop_read()195   void loop_read() {
196     CHECK(is_connected_);
197     CHECK(!is_read_active_);
198     if (close_flag_ || need_close_after_write_) {
199       return;
200     }
201     std::memset(&read_overlapped_, 0, sizeof(read_overlapped_));
202     auto dest = input_writer_.prepare_append();
203     WSABUF buf;
204     buf.len = narrow_cast<ULONG>(dest.size());
205     buf.buf = dest.data();
206     DWORD flags = 0;
207     int status = WSARecv(get_native_fd().socket(), &buf, 1, nullptr, &flags, &read_overlapped_, nullptr);
208     if (status == 0 || check_status("Failed to read from connection")) {
209       inc_refcnt();
210       is_read_active_ = true;
211     }
212   }
213 
loop_write()214   void loop_write() {
215     CHECK(is_connected_);
216     CHECK(!is_write_active_);
217 
218     output_reader_.sync_with_writer();
219     auto to_write = output_reader_.prepare_read();
220     if (to_write.empty()) {
221       auto lock = lock_.lock();
222       output_reader_.sync_with_writer();
223       to_write = output_reader_.prepare_read();
224       if (to_write.empty()) {
225         is_write_waiting_ = true;
226         if (need_close_after_write_) {
227           notify_iocp_close();
228         }
229         return;
230       }
231     }
232     if (to_write.empty()) {
233       return;
234     }
235     std::memset(&write_overlapped_, 0, sizeof(write_overlapped_));
236     constexpr size_t BUF_SIZE = 20;
237     WSABUF buf[BUF_SIZE];
238     auto it = output_reader_.clone();
239     size_t buf_i;
240     for (buf_i = 0; buf_i < BUF_SIZE; buf_i++) {
241       auto src = it.prepare_read();
242       if (src.empty()) {
243         break;
244       }
245       buf[buf_i].len = narrow_cast<ULONG>(src.size());
246       buf[buf_i].buf = const_cast<CHAR *>(src.data());
247       it.confirm_read(src.size());
248     }
249     int status =
250         WSASend(get_native_fd().socket(), buf, narrow_cast<DWORD>(buf_i), nullptr, 0, &write_overlapped_, nullptr);
251     if (status == 0 || check_status("Failed to write to connection")) {
252       inc_refcnt();
253       is_write_active_ = true;
254     }
255   }
256 
on_iocp(Result<size_t> r_size,WSAOVERLAPPED * overlapped)257   void on_iocp(Result<size_t> r_size, WSAOVERLAPPED *overlapped) final {
258     // called from other thread
259     if (dec_refcnt() || close_flag_) {
260       VLOG(fd) << "Ignore IOCP (socket is closing)";
261       return;
262     }
263     if (r_size.is_error()) {
264       return on_error(get_socket_pending_error(get_native_fd(), overlapped, r_size.move_as_error()));
265     }
266 
267     if (!is_connected_ && overlapped == &read_overlapped_) {
268       return on_connected();
269     }
270 
271     auto size = r_size.move_as_ok();
272     if (overlapped == &write_overlapped_) {
273       return on_write(size);
274     }
275     if (overlapped == nullptr) {
276       CHECK(size == 0);
277       return on_write(size);
278     }
279 
280     if (overlapped == &read_overlapped_) {
281       return on_read(size);
282     }
283     if (overlapped == reinterpret_cast<WSAOVERLAPPED *>(&close_overlapped_)) {
284       return on_close();
285     }
286     UNREACHABLE();
287   }
288 
on_error(Status status)289   void on_error(Status status) {
290     VLOG(fd) << get_native_fd() << " on error " << status;
291     {
292       auto lock = lock_.lock();
293       pending_errors_.push(std::move(status));
294     }
295     get_poll_info().add_flags_from_poll(PollFlags::Error());
296   }
297 
on_connected()298   void on_connected() {
299     VLOG(fd) << get_native_fd() << " on connected";
300     CHECK(!is_connected_);
301     CHECK(is_read_active_);
302     is_connected_ = true;
303     is_read_active_ = false;
304     loop_read();
305     loop_write();
306   }
307 
on_read(size_t size)308   void on_read(size_t size) {
309     VLOG(fd) << get_native_fd() << " on read " << size;
310     CHECK(is_read_active_);
311     is_read_active_ = false;
312     if (size == 0) {
313       get_poll_info().add_flags_from_poll(PollFlags::Close());
314       return;
315     }
316     input_writer_.confirm_append(size);
317     get_poll_info().add_flags_from_poll(PollFlags::Read());
318     loop_read();
319   }
320 
on_write(size_t size)321   void on_write(size_t size) {
322     VLOG(fd) << get_native_fd() << " on write " << size;
323     if (size == 0) {
324       if (is_write_active_) {
325         return;
326       }
327       is_write_active_ = true;
328     }
329     CHECK(is_write_active_);
330     is_write_active_ = false;
331     output_reader_.advance(size);
332     loop_write();
333   }
334 
on_close()335   void on_close() {
336     VLOG(fd) << get_native_fd() << " on close";
337     close_flag_ = true;
338     info.set_native_fd({});
339   }
dec_refcnt()340   bool dec_refcnt() {
341     VLOG(fd) << get_native_fd() << " dec_refcnt from " << refcnt_;
342     if (--refcnt_ == 0) {
343       delete this;
344       return true;
345     }
346     return false;
347   }
inc_refcnt()348   void inc_refcnt() {
349     CHECK(refcnt_ != 0);
350     refcnt_++;
351     VLOG(fd) << get_native_fd() << " inc_refcnt to " << refcnt_;
352   }
353 
notify_iocp_write()354   void notify_iocp_write() {
355     inc_refcnt();
356     Iocp::get()->post(0, this, nullptr);
357   }
notify_iocp_close()358   void notify_iocp_close() {
359     Iocp::get()->post(0, this, reinterpret_cast<WSAOVERLAPPED *>(&close_overlapped_));
360   }
notify_iocp_connected()361   void notify_iocp_connected() {
362     inc_refcnt();
363     Iocp::get()->post(0, this, &read_overlapped_);
364   }
365 };
366 
operator ()(SocketFdImpl * impl)367 void SocketFdImplDeleter::operator()(SocketFdImpl *impl) {
368   impl->close();
369 }
370 
371 class InitWSA {
372  public:
InitWSA()373   InitWSA() {
374     /* Use the MAKEWORD(lowbyte, highbyte) macro declared in Windef.h */
375     WORD wVersionRequested = MAKEWORD(2, 2);
376     WSADATA wsaData;
377     if (WSAStartup(wVersionRequested, &wsaData) != 0) {
378       auto error = OS_SOCKET_ERROR("Failed to init WSA");
379       LOG(FATAL) << error;
380     }
381   }
382 };
383 
384 static InitWSA init_wsa;
385 
386 #else
387 class SocketFdImpl {
388  public:
389   PollableFdInfo info;
390   explicit SocketFdImpl(NativeFd fd) : info(std::move(fd)) {
391   }
392   PollableFdInfo &get_poll_info() {
393     return info;
394   }
395   const PollableFdInfo &get_poll_info() const {
396     return info;
397   }
398 
399   const NativeFd &get_native_fd() const {
400     return info.native_fd();
401   }
402 
403   Result<size_t> writev(Span<IoSlice> slices) {
404     int native_fd = get_native_fd().socket();
405     TRY_RESULT(slices_size, narrow_cast_safe<int>(slices.size()));
406     auto write_res = detail::skip_eintr([&] {
407     // sendmsg can erroneously return 2^32 - 1 on Android 5.1 and Android 6.0, so it must not be used there
408 #if defined(MSG_NOSIGNAL) && !TD_ANDROID
409       msghdr msg;
410       std::memset(&msg, 0, sizeof(msg));
411       msg.msg_iov = const_cast<iovec *>(slices.begin());
412       msg.msg_iovlen = slices_size;
413       return sendmsg(native_fd, &msg, MSG_NOSIGNAL);
414 #else
415       return ::writev(native_fd, slices.begin(), slices_size);
416 #endif
417     });
418     if (write_res >= 0) {
419       auto result = narrow_cast<size_t>(write_res);
420       auto left = result;
421       for (const auto &slice : slices) {
422         if (left <= slice.iov_len) {
423           return result;
424         }
425         left -= slice.iov_len;
426       }
427       LOG(FATAL) << "Receive " << write_res << " as writev response, but tried to write only " << result - left
428                  << " bytes";
429     }
430     return write_finish();
431   }
432 
433   Result<size_t> write(Slice slice) {
434     int native_fd = get_native_fd().socket();
435     auto write_res = detail::skip_eintr([&] {
436       return
437 #ifdef MSG_NOSIGNAL
438           send(native_fd, slice.begin(), slice.size(), MSG_NOSIGNAL);
439 #else
440           ::write(native_fd, slice.begin(), slice.size());
441 #endif
442     });
443     if (write_res >= 0) {
444       auto result = narrow_cast<size_t>(write_res);
445       LOG_CHECK(result <= slice.size()) << "Receive " << write_res << " as write response, but tried to write only "
446                                         << slice.size() << " bytes";
447       return result;
448     }
449     return write_finish();
450   }
451 
452   Result<size_t> write_finish() {
453     auto write_errno = errno;
454     if (write_errno == EAGAIN
455 #if EAGAIN != EWOULDBLOCK
456         || write_errno == EWOULDBLOCK
457 #endif
458     ) {
459       get_poll_info().clear_flags(PollFlags::Write());
460       return 0;
461     }
462 
463     auto error = Status::PosixError(write_errno, PSLICE() << "Write to " << get_native_fd() << " has failed");
464     switch (write_errno) {
465       case EBADF:
466       case ENXIO:
467       case EFAULT:
468       case EINVAL:
469         LOG(FATAL) << error;
470         UNREACHABLE();
471       default:
472         LOG(WARNING) << error;
473       // fallthrough
474       case ECONNRESET:
475       case EDQUOT:
476       case EFBIG:
477       case EIO:
478       case ENETDOWN:
479       case ENETUNREACH:
480       case ENOSPC:
481       case EPIPE:
482         get_poll_info().clear_flags(PollFlags::Write());
483         get_poll_info().add_flags(PollFlags::Close());
484         return std::move(error);
485     }
486   }
487   Result<size_t> read(MutableSlice slice) {
488     if (get_poll_info().get_flags_local().has_pending_error()) {
489       TRY_STATUS(get_pending_error());
490     }
491     int native_fd = get_native_fd().socket();
492     CHECK(!slice.empty());
493     auto read_res = detail::skip_eintr([&] { return ::read(native_fd, slice.begin(), slice.size()); });
494     auto read_errno = errno;
495     if (read_res >= 0) {
496       if (read_res == 0) {
497         errno = 0;
498         get_poll_info().clear_flags(PollFlags::Read());
499         get_poll_info().add_flags(PollFlags::Close());
500       }
501       auto result = narrow_cast<size_t>(read_res);
502       CHECK(result <= slice.size());
503       return result;
504     }
505     if (read_errno == EAGAIN
506 #if EAGAIN != EWOULDBLOCK
507         || read_errno == EWOULDBLOCK
508 #endif
509     ) {
510       get_poll_info().clear_flags(PollFlags::Read());
511       return 0;
512     }
513     auto error = Status::PosixError(read_errno, PSLICE() << "Read from " << get_native_fd() << " has failed");
514     switch (read_errno) {
515       case EISDIR:
516       case EBADF:
517       case ENXIO:
518       case EFAULT:
519       case EINVAL:
520         LOG(FATAL) << error;
521         UNREACHABLE();
522       default:
523         LOG(WARNING) << error;
524       // fallthrough
525       case ENOTCONN:
526       case EIO:
527       case ENOBUFS:
528       case ENOMEM:
529       case ECONNRESET:
530       case ETIMEDOUT:
531         get_poll_info().clear_flags(PollFlags::Read());
532         get_poll_info().add_flags(PollFlags::Close());
533         return std::move(error);
534     }
535   }
536   Status get_pending_error() {
537     if (!get_poll_info().get_flags_local().has_pending_error()) {
538       return Status::OK();
539     }
540     TRY_STATUS(detail::get_socket_pending_error(get_native_fd()));
541     get_poll_info().clear_flags(PollFlags::Error());
542     return Status::OK();
543   }
544 };
545 
546 void SocketFdImplDeleter::operator()(SocketFdImpl *impl) {
547   delete impl;
548 }
549 
550 #endif
551 
552 #if TD_PORT_POSIX
get_socket_pending_error(const NativeFd & fd)553 Status get_socket_pending_error(const NativeFd &fd) {
554   int error = 0;
555   socklen_t errlen = sizeof(error);
556   if (getsockopt(fd.socket(), SOL_SOCKET, SO_ERROR, static_cast<void *>(&error), &errlen) == 0) {
557     if (error == 0) {
558       return Status::OK();
559     }
560     return Status::PosixError(error, PSLICE() << "Error on " << fd);
561   }
562   auto status = OS_SOCKET_ERROR(PSLICE() << "Can't load error on socket " << fd);
563   LOG(INFO) << "Can't load pending socket error: " << status;
564   return status;
565 }
566 #elif TD_PORT_WINDOWS
get_socket_pending_error(const NativeFd & fd,WSAOVERLAPPED * overlapped,Status iocp_error)567 Status get_socket_pending_error(const NativeFd &fd, WSAOVERLAPPED *overlapped, Status iocp_error) {
568   // We need to call WSAGetOverlappedResult() just so WSAGetLastError() will return the correct error. See
569   // https://stackoverflow.com/questions/28925003/calling-wsagetlasterror-from-an-iocp-thread-return-incorrect-result
570   DWORD num_bytes = 0;
571   DWORD flags = 0;
572   BOOL success = WSAGetOverlappedResult(fd.socket(), overlapped, &num_bytes, false, &flags);
573   if (success) {
574     LOG(ERROR) << "WSAGetOverlappedResult succeded after " << iocp_error;
575     return iocp_error;
576   }
577   return OS_SOCKET_ERROR(PSLICE() << "Error on " << fd);
578 }
579 #endif
580 
init_socket_options(NativeFd & native_fd)581 Status init_socket_options(NativeFd &native_fd) {
582   TRY_STATUS(native_fd.set_is_blocking_unsafe(false));
583 
584   auto sock = native_fd.socket();
585 #if TD_PORT_POSIX
586   int flags = 1;
587 #elif TD_PORT_WINDOWS
588   BOOL flags = TRUE;
589 #endif
590   setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char *>(&flags), sizeof(flags));
591   setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char *>(&flags), sizeof(flags));
592   setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<const char *>(&flags), sizeof(flags));
593 #if TD_PORT_POSIX
594 #ifndef MSG_NOSIGNAL  // Darwin
595 
596 #ifdef SO_NOSIGPIPE
597   setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, reinterpret_cast<const char *>(&flags), sizeof(flags));
598 #else
599 #warning "Failed to suppress SIGPIPE signals. Use signal(SIGPIPE, SIG_IGN) to suppress them."
600 #endif
601 
602 #endif
603 #endif
604   // TODO: SO_REUSEADDR, SO_KEEPALIVE, TCP_NODELAY, SO_SNDBUF, SO_RCVBUF, TCP_QUICKACK, SO_LINGER
605 
606   return Status::OK();
607 }
608 
609 }  // namespace detail
610 
611 SocketFd::SocketFd() = default;
612 SocketFd::SocketFd(SocketFd &&) noexcept = default;
613 SocketFd &SocketFd::operator=(SocketFd &&) noexcept = default;
614 SocketFd::~SocketFd() = default;
615 
SocketFd(unique_ptr<detail::SocketFdImpl> impl)616 SocketFd::SocketFd(unique_ptr<detail::SocketFdImpl> impl) : impl_(impl.release()) {
617 }
618 
from_native_fd(NativeFd fd)619 Result<SocketFd> SocketFd::from_native_fd(NativeFd fd) {
620   TRY_STATUS(detail::init_socket_options(fd));
621   return SocketFd(make_unique<detail::SocketFdImpl>(std::move(fd)));
622 }
623 
open(const IPAddress & address)624 Result<SocketFd> SocketFd::open(const IPAddress &address) {
625 #if TD_DARWIN_WATCH_OS
626   return SocketFd{};
627 #endif
628 
629   NativeFd native_fd{socket(address.get_address_family(), SOCK_STREAM, IPPROTO_TCP)};
630   if (!native_fd) {
631     return OS_SOCKET_ERROR("Failed to create a socket");
632   }
633   TRY_STATUS(detail::init_socket_options(native_fd));
634 
635 #if TD_PORT_POSIX
636   int e_connect =
637       connect(native_fd.socket(), address.get_sockaddr(), narrow_cast<socklen_t>(address.get_sockaddr_len()));
638   if (e_connect == -1) {
639     auto connect_errno = errno;
640     if (connect_errno != EINPROGRESS) {
641       return Status::PosixError(connect_errno, PSLICE() << "Failed to connect to " << address);
642     }
643   }
644   return SocketFd(make_unique<detail::SocketFdImpl>(std::move(native_fd)));
645 #elif TD_PORT_WINDOWS
646   auto bind_addr = address.get_any_addr();
647   auto e_bind = bind(native_fd.socket(), bind_addr.get_sockaddr(), narrow_cast<int>(bind_addr.get_sockaddr_len()));
648   if (e_bind != 0) {
649     return OS_SOCKET_ERROR("Failed to bind a socket");
650   }
651   return SocketFd(make_unique<detail::SocketFdImpl>(std::move(native_fd), address));
652 #endif
653 }
654 
close()655 void SocketFd::close() {
656   impl_.reset();
657 }
658 
empty() const659 bool SocketFd::empty() const {
660   return !impl_;
661 }
662 
get_poll_info()663 PollableFdInfo &SocketFd::get_poll_info() {
664   CHECK(!empty());
665   return impl_->get_poll_info();
666 }
get_poll_info() const667 const PollableFdInfo &SocketFd::get_poll_info() const {
668   CHECK(!empty());
669   return impl_->get_poll_info();
670 }
671 
get_native_fd() const672 const NativeFd &SocketFd::get_native_fd() const {
673   CHECK(!empty());
674   return impl_->get_native_fd();
675 }
676 
get_pending_error()677 Status SocketFd::get_pending_error() {
678   CHECK(!empty());
679   return impl_->get_pending_error();
680 }
681 
write(Slice slice)682 Result<size_t> SocketFd::write(Slice slice) {
683   CHECK(!empty());
684   return impl_->write(slice);
685 }
686 
writev(Span<IoSlice> slices)687 Result<size_t> SocketFd::writev(Span<IoSlice> slices) {
688   CHECK(!empty());
689   return impl_->writev(slices);
690 }
691 
read(MutableSlice slice)692 Result<size_t> SocketFd::read(MutableSlice slice) {
693   CHECK(!empty());
694   return impl_->read(slice);
695 }
696 
697 }  // namespace td
698