1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/utils/port/SocketFd.h"
8
9 #include "td/utils/common.h"
10 #include "td/utils/format.h"
11 #include "td/utils/logging.h"
12 #include "td/utils/misc.h"
13 #include "td/utils/port/detail/skip_eintr.h"
14 #include "td/utils/port/PollFlags.h"
15 #include "td/utils/SliceBuilder.h"
16
17 #if TD_PORT_WINDOWS
18 #include "td/utils/buffer.h"
19 #include "td/utils/port/detail/Iocp.h"
20 #include "td/utils/SpinLock.h"
21 #include "td/utils/VectorQueue.h"
22
23 #include <limits>
24 #endif
25
26 #if TD_PORT_POSIX
27 #include <cerrno>
28
29 #include <arpa/inet.h>
30 #include <fcntl.h>
31 #include <netinet/in.h>
32 #include <netinet/tcp.h>
33 #include <sys/socket.h>
34 #include <sys/types.h>
35 #include <unistd.h>
36 #endif
37
38 #include <atomic>
39 #include <cstring>
40
41 namespace td {
42 namespace detail {
43 #if TD_PORT_WINDOWS
44 class SocketFdImpl final : private Iocp::Callback {
45 public:
SocketFdImpl(NativeFd native_fd)46 explicit SocketFdImpl(NativeFd native_fd) : info(std::move(native_fd)) {
47 VLOG(fd) << get_native_fd() << " create from native_fd";
48 get_poll_info().add_flags(PollFlags::Write());
49 Iocp::get()->subscribe(get_native_fd(), this);
50 is_read_active_ = true;
51 notify_iocp_connected();
52 }
53
SocketFdImpl(NativeFd native_fd,const IPAddress & addr)54 SocketFdImpl(NativeFd native_fd, const IPAddress &addr) : info(std::move(native_fd)) {
55 VLOG(fd) << get_native_fd() << " create from native_fd and connect";
56 get_poll_info().add_flags(PollFlags::Write());
57 Iocp::get()->subscribe(get_native_fd(), this);
58 LPFN_CONNECTEX ConnectExPtr = nullptr;
59 GUID guid = WSAID_CONNECTEX;
60 DWORD numBytes;
61 auto error =
62 ::WSAIoctl(get_native_fd().socket(), SIO_GET_EXTENSION_FUNCTION_POINTER, static_cast<void *>(&guid),
63 sizeof(guid), static_cast<void *>(&ConnectExPtr), sizeof(ConnectExPtr), &numBytes, nullptr, nullptr);
64 if (error) {
65 on_error(OS_SOCKET_ERROR("WSAIoctl failed"));
66 return;
67 }
68 std::memset(&read_overlapped_, 0, sizeof(read_overlapped_));
69 inc_refcnt();
70 is_read_active_ = true;
71 auto status = ConnectExPtr(get_native_fd().socket(), addr.get_sockaddr(), narrow_cast<int>(addr.get_sockaddr_len()),
72 nullptr, 0, nullptr, &read_overlapped_);
73
74 if (status == TRUE || !check_status("Failed to connect")) {
75 is_read_active_ = false;
76 dec_refcnt();
77 }
78 }
79
close()80 void close() {
81 if (!is_write_waiting_) {
82 VLOG(fd) << get_native_fd() << " will close after ongoing write";
83 auto lock = lock_.lock();
84 need_close_after_write_ = true;
85 return;
86 }
87 notify_iocp_close();
88 }
89
get_poll_info()90 PollableFdInfo &get_poll_info() {
91 return info;
92 }
get_poll_info() const93 const PollableFdInfo &get_poll_info() const {
94 return info;
95 }
96
get_native_fd() const97 const NativeFd &get_native_fd() const {
98 return info.native_fd();
99 }
100
write(Slice data)101 Result<size_t> write(Slice data) {
102 // LOG(ERROR) << "Write: " << format::as_hex_dump<0>(data);
103 output_writer_.append(data);
104 return write_finish(data.size());
105 }
106
writev(Span<IoSlice> slices)107 Result<size_t> writev(Span<IoSlice> slices) {
108 size_t total_size = 0;
109 for (auto io_slice : slices) {
110 auto size = as_slice(io_slice).size();
111 CHECK(size <= std::numeric_limits<size_t>::max() - total_size);
112 total_size += size;
113 }
114
115 auto left_size = total_size;
116 for (auto io_slice : slices) {
117 auto slice = as_slice(io_slice);
118 output_writer_.append(slice, left_size);
119 left_size -= slice.size();
120 }
121
122 return write_finish(total_size);
123 }
124
write_finish(size_t total_size)125 Result<size_t> write_finish(size_t total_size) {
126 if (is_write_waiting_) {
127 auto lock = lock_.lock();
128 is_write_waiting_ = false;
129 lock.reset();
130 notify_iocp_write();
131 }
132 return total_size;
133 }
134
read(MutableSlice slice)135 Result<size_t> read(MutableSlice slice) {
136 if (get_poll_info().get_flags_local().has_pending_error()) {
137 TRY_STATUS(get_pending_error());
138 }
139 input_reader_.sync_with_writer();
140 auto res = input_reader_.advance(td::min(slice.size(), input_reader_.size()), slice);
141 if (res == 0) {
142 get_poll_info().clear_flags(PollFlags::Read());
143 } else {
144 // LOG(ERROR) << "Read: " << format::as_hex_dump<0>(Slice(slice.substr(0, res)));
145 }
146 return res;
147 }
148
get_pending_error()149 Status get_pending_error() {
150 Status res;
151 {
152 auto lock = lock_.lock();
153 if (!pending_errors_.empty()) {
154 res = pending_errors_.pop();
155 }
156 if (res.is_ok()) {
157 get_poll_info().clear_flags(PollFlags::Error());
158 }
159 }
160 return res;
161 }
162
163 private:
164 PollableFdInfo info;
165 SpinLock lock_;
166
167 std::atomic<int> refcnt_{1};
168 bool close_flag_{false};
169 bool need_close_after_write_{false};
170
171 bool is_connected_{false};
172 bool is_read_active_{false};
173 ChainBufferWriter input_writer_;
174 ChainBufferReader input_reader_ = input_writer_.extract_reader();
175 WSAOVERLAPPED read_overlapped_;
176 VectorQueue<Status> pending_errors_;
177
178 bool is_write_active_{false};
179 std::atomic<bool> is_write_waiting_{false};
180 ChainBufferWriter output_writer_;
181 ChainBufferReader output_reader_ = output_writer_.extract_reader();
182 WSAOVERLAPPED write_overlapped_;
183
184 char close_overlapped_;
185
check_status(Slice message)186 bool check_status(Slice message) {
187 auto last_error = WSAGetLastError();
188 if (last_error == ERROR_IO_PENDING) {
189 return true;
190 }
191 on_error(OS_SOCKET_ERROR(message));
192 return false;
193 }
194
loop_read()195 void loop_read() {
196 CHECK(is_connected_);
197 CHECK(!is_read_active_);
198 if (close_flag_ || need_close_after_write_) {
199 return;
200 }
201 std::memset(&read_overlapped_, 0, sizeof(read_overlapped_));
202 auto dest = input_writer_.prepare_append();
203 WSABUF buf;
204 buf.len = narrow_cast<ULONG>(dest.size());
205 buf.buf = dest.data();
206 DWORD flags = 0;
207 int status = WSARecv(get_native_fd().socket(), &buf, 1, nullptr, &flags, &read_overlapped_, nullptr);
208 if (status == 0 || check_status("Failed to read from connection")) {
209 inc_refcnt();
210 is_read_active_ = true;
211 }
212 }
213
loop_write()214 void loop_write() {
215 CHECK(is_connected_);
216 CHECK(!is_write_active_);
217
218 output_reader_.sync_with_writer();
219 auto to_write = output_reader_.prepare_read();
220 if (to_write.empty()) {
221 auto lock = lock_.lock();
222 output_reader_.sync_with_writer();
223 to_write = output_reader_.prepare_read();
224 if (to_write.empty()) {
225 is_write_waiting_ = true;
226 if (need_close_after_write_) {
227 notify_iocp_close();
228 }
229 return;
230 }
231 }
232 if (to_write.empty()) {
233 return;
234 }
235 std::memset(&write_overlapped_, 0, sizeof(write_overlapped_));
236 constexpr size_t BUF_SIZE = 20;
237 WSABUF buf[BUF_SIZE];
238 auto it = output_reader_.clone();
239 size_t buf_i;
240 for (buf_i = 0; buf_i < BUF_SIZE; buf_i++) {
241 auto src = it.prepare_read();
242 if (src.empty()) {
243 break;
244 }
245 buf[buf_i].len = narrow_cast<ULONG>(src.size());
246 buf[buf_i].buf = const_cast<CHAR *>(src.data());
247 it.confirm_read(src.size());
248 }
249 int status =
250 WSASend(get_native_fd().socket(), buf, narrow_cast<DWORD>(buf_i), nullptr, 0, &write_overlapped_, nullptr);
251 if (status == 0 || check_status("Failed to write to connection")) {
252 inc_refcnt();
253 is_write_active_ = true;
254 }
255 }
256
on_iocp(Result<size_t> r_size,WSAOVERLAPPED * overlapped)257 void on_iocp(Result<size_t> r_size, WSAOVERLAPPED *overlapped) final {
258 // called from other thread
259 if (dec_refcnt() || close_flag_) {
260 VLOG(fd) << "Ignore IOCP (socket is closing)";
261 return;
262 }
263 if (r_size.is_error()) {
264 return on_error(get_socket_pending_error(get_native_fd(), overlapped, r_size.move_as_error()));
265 }
266
267 if (!is_connected_ && overlapped == &read_overlapped_) {
268 return on_connected();
269 }
270
271 auto size = r_size.move_as_ok();
272 if (overlapped == &write_overlapped_) {
273 return on_write(size);
274 }
275 if (overlapped == nullptr) {
276 CHECK(size == 0);
277 return on_write(size);
278 }
279
280 if (overlapped == &read_overlapped_) {
281 return on_read(size);
282 }
283 if (overlapped == reinterpret_cast<WSAOVERLAPPED *>(&close_overlapped_)) {
284 return on_close();
285 }
286 UNREACHABLE();
287 }
288
on_error(Status status)289 void on_error(Status status) {
290 VLOG(fd) << get_native_fd() << " on error " << status;
291 {
292 auto lock = lock_.lock();
293 pending_errors_.push(std::move(status));
294 }
295 get_poll_info().add_flags_from_poll(PollFlags::Error());
296 }
297
on_connected()298 void on_connected() {
299 VLOG(fd) << get_native_fd() << " on connected";
300 CHECK(!is_connected_);
301 CHECK(is_read_active_);
302 is_connected_ = true;
303 is_read_active_ = false;
304 loop_read();
305 loop_write();
306 }
307
on_read(size_t size)308 void on_read(size_t size) {
309 VLOG(fd) << get_native_fd() << " on read " << size;
310 CHECK(is_read_active_);
311 is_read_active_ = false;
312 if (size == 0) {
313 get_poll_info().add_flags_from_poll(PollFlags::Close());
314 return;
315 }
316 input_writer_.confirm_append(size);
317 get_poll_info().add_flags_from_poll(PollFlags::Read());
318 loop_read();
319 }
320
on_write(size_t size)321 void on_write(size_t size) {
322 VLOG(fd) << get_native_fd() << " on write " << size;
323 if (size == 0) {
324 if (is_write_active_) {
325 return;
326 }
327 is_write_active_ = true;
328 }
329 CHECK(is_write_active_);
330 is_write_active_ = false;
331 output_reader_.advance(size);
332 loop_write();
333 }
334
on_close()335 void on_close() {
336 VLOG(fd) << get_native_fd() << " on close";
337 close_flag_ = true;
338 info.set_native_fd({});
339 }
dec_refcnt()340 bool dec_refcnt() {
341 VLOG(fd) << get_native_fd() << " dec_refcnt from " << refcnt_;
342 if (--refcnt_ == 0) {
343 delete this;
344 return true;
345 }
346 return false;
347 }
inc_refcnt()348 void inc_refcnt() {
349 CHECK(refcnt_ != 0);
350 refcnt_++;
351 VLOG(fd) << get_native_fd() << " inc_refcnt to " << refcnt_;
352 }
353
notify_iocp_write()354 void notify_iocp_write() {
355 inc_refcnt();
356 Iocp::get()->post(0, this, nullptr);
357 }
notify_iocp_close()358 void notify_iocp_close() {
359 Iocp::get()->post(0, this, reinterpret_cast<WSAOVERLAPPED *>(&close_overlapped_));
360 }
notify_iocp_connected()361 void notify_iocp_connected() {
362 inc_refcnt();
363 Iocp::get()->post(0, this, &read_overlapped_);
364 }
365 };
366
operator ()(SocketFdImpl * impl)367 void SocketFdImplDeleter::operator()(SocketFdImpl *impl) {
368 impl->close();
369 }
370
371 class InitWSA {
372 public:
InitWSA()373 InitWSA() {
374 /* Use the MAKEWORD(lowbyte, highbyte) macro declared in Windef.h */
375 WORD wVersionRequested = MAKEWORD(2, 2);
376 WSADATA wsaData;
377 if (WSAStartup(wVersionRequested, &wsaData) != 0) {
378 auto error = OS_SOCKET_ERROR("Failed to init WSA");
379 LOG(FATAL) << error;
380 }
381 }
382 };
383
384 static InitWSA init_wsa;
385
386 #else
387 class SocketFdImpl {
388 public:
389 PollableFdInfo info;
390 explicit SocketFdImpl(NativeFd fd) : info(std::move(fd)) {
391 }
392 PollableFdInfo &get_poll_info() {
393 return info;
394 }
395 const PollableFdInfo &get_poll_info() const {
396 return info;
397 }
398
399 const NativeFd &get_native_fd() const {
400 return info.native_fd();
401 }
402
403 Result<size_t> writev(Span<IoSlice> slices) {
404 int native_fd = get_native_fd().socket();
405 TRY_RESULT(slices_size, narrow_cast_safe<int>(slices.size()));
406 auto write_res = detail::skip_eintr([&] {
407 // sendmsg can erroneously return 2^32 - 1 on Android 5.1 and Android 6.0, so it must not be used there
408 #if defined(MSG_NOSIGNAL) && !TD_ANDROID
409 msghdr msg;
410 std::memset(&msg, 0, sizeof(msg));
411 msg.msg_iov = const_cast<iovec *>(slices.begin());
412 msg.msg_iovlen = slices_size;
413 return sendmsg(native_fd, &msg, MSG_NOSIGNAL);
414 #else
415 return ::writev(native_fd, slices.begin(), slices_size);
416 #endif
417 });
418 if (write_res >= 0) {
419 auto result = narrow_cast<size_t>(write_res);
420 auto left = result;
421 for (const auto &slice : slices) {
422 if (left <= slice.iov_len) {
423 return result;
424 }
425 left -= slice.iov_len;
426 }
427 LOG(FATAL) << "Receive " << write_res << " as writev response, but tried to write only " << result - left
428 << " bytes";
429 }
430 return write_finish();
431 }
432
433 Result<size_t> write(Slice slice) {
434 int native_fd = get_native_fd().socket();
435 auto write_res = detail::skip_eintr([&] {
436 return
437 #ifdef MSG_NOSIGNAL
438 send(native_fd, slice.begin(), slice.size(), MSG_NOSIGNAL);
439 #else
440 ::write(native_fd, slice.begin(), slice.size());
441 #endif
442 });
443 if (write_res >= 0) {
444 auto result = narrow_cast<size_t>(write_res);
445 LOG_CHECK(result <= slice.size()) << "Receive " << write_res << " as write response, but tried to write only "
446 << slice.size() << " bytes";
447 return result;
448 }
449 return write_finish();
450 }
451
452 Result<size_t> write_finish() {
453 auto write_errno = errno;
454 if (write_errno == EAGAIN
455 #if EAGAIN != EWOULDBLOCK
456 || write_errno == EWOULDBLOCK
457 #endif
458 ) {
459 get_poll_info().clear_flags(PollFlags::Write());
460 return 0;
461 }
462
463 auto error = Status::PosixError(write_errno, PSLICE() << "Write to " << get_native_fd() << " has failed");
464 switch (write_errno) {
465 case EBADF:
466 case ENXIO:
467 case EFAULT:
468 case EINVAL:
469 LOG(FATAL) << error;
470 UNREACHABLE();
471 default:
472 LOG(WARNING) << error;
473 // fallthrough
474 case ECONNRESET:
475 case EDQUOT:
476 case EFBIG:
477 case EIO:
478 case ENETDOWN:
479 case ENETUNREACH:
480 case ENOSPC:
481 case EPIPE:
482 get_poll_info().clear_flags(PollFlags::Write());
483 get_poll_info().add_flags(PollFlags::Close());
484 return std::move(error);
485 }
486 }
487 Result<size_t> read(MutableSlice slice) {
488 if (get_poll_info().get_flags_local().has_pending_error()) {
489 TRY_STATUS(get_pending_error());
490 }
491 int native_fd = get_native_fd().socket();
492 CHECK(!slice.empty());
493 auto read_res = detail::skip_eintr([&] { return ::read(native_fd, slice.begin(), slice.size()); });
494 auto read_errno = errno;
495 if (read_res >= 0) {
496 if (read_res == 0) {
497 errno = 0;
498 get_poll_info().clear_flags(PollFlags::Read());
499 get_poll_info().add_flags(PollFlags::Close());
500 }
501 auto result = narrow_cast<size_t>(read_res);
502 CHECK(result <= slice.size());
503 return result;
504 }
505 if (read_errno == EAGAIN
506 #if EAGAIN != EWOULDBLOCK
507 || read_errno == EWOULDBLOCK
508 #endif
509 ) {
510 get_poll_info().clear_flags(PollFlags::Read());
511 return 0;
512 }
513 auto error = Status::PosixError(read_errno, PSLICE() << "Read from " << get_native_fd() << " has failed");
514 switch (read_errno) {
515 case EISDIR:
516 case EBADF:
517 case ENXIO:
518 case EFAULT:
519 case EINVAL:
520 LOG(FATAL) << error;
521 UNREACHABLE();
522 default:
523 LOG(WARNING) << error;
524 // fallthrough
525 case ENOTCONN:
526 case EIO:
527 case ENOBUFS:
528 case ENOMEM:
529 case ECONNRESET:
530 case ETIMEDOUT:
531 get_poll_info().clear_flags(PollFlags::Read());
532 get_poll_info().add_flags(PollFlags::Close());
533 return std::move(error);
534 }
535 }
536 Status get_pending_error() {
537 if (!get_poll_info().get_flags_local().has_pending_error()) {
538 return Status::OK();
539 }
540 TRY_STATUS(detail::get_socket_pending_error(get_native_fd()));
541 get_poll_info().clear_flags(PollFlags::Error());
542 return Status::OK();
543 }
544 };
545
546 void SocketFdImplDeleter::operator()(SocketFdImpl *impl) {
547 delete impl;
548 }
549
550 #endif
551
552 #if TD_PORT_POSIX
get_socket_pending_error(const NativeFd & fd)553 Status get_socket_pending_error(const NativeFd &fd) {
554 int error = 0;
555 socklen_t errlen = sizeof(error);
556 if (getsockopt(fd.socket(), SOL_SOCKET, SO_ERROR, static_cast<void *>(&error), &errlen) == 0) {
557 if (error == 0) {
558 return Status::OK();
559 }
560 return Status::PosixError(error, PSLICE() << "Error on " << fd);
561 }
562 auto status = OS_SOCKET_ERROR(PSLICE() << "Can't load error on socket " << fd);
563 LOG(INFO) << "Can't load pending socket error: " << status;
564 return status;
565 }
566 #elif TD_PORT_WINDOWS
get_socket_pending_error(const NativeFd & fd,WSAOVERLAPPED * overlapped,Status iocp_error)567 Status get_socket_pending_error(const NativeFd &fd, WSAOVERLAPPED *overlapped, Status iocp_error) {
568 // We need to call WSAGetOverlappedResult() just so WSAGetLastError() will return the correct error. See
569 // https://stackoverflow.com/questions/28925003/calling-wsagetlasterror-from-an-iocp-thread-return-incorrect-result
570 DWORD num_bytes = 0;
571 DWORD flags = 0;
572 BOOL success = WSAGetOverlappedResult(fd.socket(), overlapped, &num_bytes, false, &flags);
573 if (success) {
574 LOG(ERROR) << "WSAGetOverlappedResult succeded after " << iocp_error;
575 return iocp_error;
576 }
577 return OS_SOCKET_ERROR(PSLICE() << "Error on " << fd);
578 }
579 #endif
580
init_socket_options(NativeFd & native_fd)581 Status init_socket_options(NativeFd &native_fd) {
582 TRY_STATUS(native_fd.set_is_blocking_unsafe(false));
583
584 auto sock = native_fd.socket();
585 #if TD_PORT_POSIX
586 int flags = 1;
587 #elif TD_PORT_WINDOWS
588 BOOL flags = TRUE;
589 #endif
590 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char *>(&flags), sizeof(flags));
591 setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char *>(&flags), sizeof(flags));
592 setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<const char *>(&flags), sizeof(flags));
593 #if TD_PORT_POSIX
594 #ifndef MSG_NOSIGNAL // Darwin
595
596 #ifdef SO_NOSIGPIPE
597 setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, reinterpret_cast<const char *>(&flags), sizeof(flags));
598 #else
599 #warning "Failed to suppress SIGPIPE signals. Use signal(SIGPIPE, SIG_IGN) to suppress them."
600 #endif
601
602 #endif
603 #endif
604 // TODO: SO_REUSEADDR, SO_KEEPALIVE, TCP_NODELAY, SO_SNDBUF, SO_RCVBUF, TCP_QUICKACK, SO_LINGER
605
606 return Status::OK();
607 }
608
609 } // namespace detail
610
611 SocketFd::SocketFd() = default;
612 SocketFd::SocketFd(SocketFd &&) noexcept = default;
613 SocketFd &SocketFd::operator=(SocketFd &&) noexcept = default;
614 SocketFd::~SocketFd() = default;
615
SocketFd(unique_ptr<detail::SocketFdImpl> impl)616 SocketFd::SocketFd(unique_ptr<detail::SocketFdImpl> impl) : impl_(impl.release()) {
617 }
618
from_native_fd(NativeFd fd)619 Result<SocketFd> SocketFd::from_native_fd(NativeFd fd) {
620 TRY_STATUS(detail::init_socket_options(fd));
621 return SocketFd(make_unique<detail::SocketFdImpl>(std::move(fd)));
622 }
623
open(const IPAddress & address)624 Result<SocketFd> SocketFd::open(const IPAddress &address) {
625 #if TD_DARWIN_WATCH_OS
626 return SocketFd{};
627 #endif
628
629 NativeFd native_fd{socket(address.get_address_family(), SOCK_STREAM, IPPROTO_TCP)};
630 if (!native_fd) {
631 return OS_SOCKET_ERROR("Failed to create a socket");
632 }
633 TRY_STATUS(detail::init_socket_options(native_fd));
634
635 #if TD_PORT_POSIX
636 int e_connect =
637 connect(native_fd.socket(), address.get_sockaddr(), narrow_cast<socklen_t>(address.get_sockaddr_len()));
638 if (e_connect == -1) {
639 auto connect_errno = errno;
640 if (connect_errno != EINPROGRESS) {
641 return Status::PosixError(connect_errno, PSLICE() << "Failed to connect to " << address);
642 }
643 }
644 return SocketFd(make_unique<detail::SocketFdImpl>(std::move(native_fd)));
645 #elif TD_PORT_WINDOWS
646 auto bind_addr = address.get_any_addr();
647 auto e_bind = bind(native_fd.socket(), bind_addr.get_sockaddr(), narrow_cast<int>(bind_addr.get_sockaddr_len()));
648 if (e_bind != 0) {
649 return OS_SOCKET_ERROR("Failed to bind a socket");
650 }
651 return SocketFd(make_unique<detail::SocketFdImpl>(std::move(native_fd), address));
652 #endif
653 }
654
close()655 void SocketFd::close() {
656 impl_.reset();
657 }
658
empty() const659 bool SocketFd::empty() const {
660 return !impl_;
661 }
662
get_poll_info()663 PollableFdInfo &SocketFd::get_poll_info() {
664 CHECK(!empty());
665 return impl_->get_poll_info();
666 }
get_poll_info() const667 const PollableFdInfo &SocketFd::get_poll_info() const {
668 CHECK(!empty());
669 return impl_->get_poll_info();
670 }
671
get_native_fd() const672 const NativeFd &SocketFd::get_native_fd() const {
673 CHECK(!empty());
674 return impl_->get_native_fd();
675 }
676
get_pending_error()677 Status SocketFd::get_pending_error() {
678 CHECK(!empty());
679 return impl_->get_pending_error();
680 }
681
write(Slice slice)682 Result<size_t> SocketFd::write(Slice slice) {
683 CHECK(!empty());
684 return impl_->write(slice);
685 }
686
writev(Span<IoSlice> slices)687 Result<size_t> SocketFd::writev(Span<IoSlice> slices) {
688 CHECK(!empty());
689 return impl_->writev(slices);
690 }
691
read(MutableSlice slice)692 Result<size_t> SocketFd::read(MutableSlice slice) {
693 CHECK(!empty());
694 return impl_->read(slice);
695 }
696
697 } // namespace td
698