1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 #include <grpc/support/port_platform.h>
19
20 #include "src/core/lib/iomgr/port.h"
21 #if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
22
23 #include <ares.h>
24
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/log_windows.h>
28 #include <grpc/support/string_util.h>
29 #include <grpc/support/time.h>
30 #include <string.h>
31 #include "src/core/lib/gpr/string.h"
32 #include "src/core/lib/gprpp/memory.h"
33 #include "src/core/lib/iomgr/combiner.h"
34 #include "src/core/lib/iomgr/iocp_windows.h"
35 #include "src/core/lib/iomgr/sockaddr_utils.h"
36 #include "src/core/lib/iomgr/sockaddr_windows.h"
37 #include "src/core/lib/iomgr/socket_windows.h"
38 #include "src/core/lib/iomgr/tcp_windows.h"
39 #include "src/core/lib/slice/slice_internal.h"
40
41 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
42 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
43
/* TODO(apolcyn): remove this hack after fixing upstream.
 * The grpc/c-ares code on Windows relies on the ares_set_socket_functions
 * API, whose callbacks use the "struct iovec" type. On Windows that type is
 * only defined inside a non-public c-ares header, so it is redeclared here.
 * See https://github.com/c-ares/c-ares/issues/206. */
struct iovec {
  void* iov_base;  // start of the buffer
  size_t iov_len;  // number of bytes in the buffer
};
53
54 namespace grpc_core {
55
56 /* c-ares reads and takes action on the error codes of the
57 * "virtual socket operations" in this file, via the WSAGetLastError
58 * APIs. If code in this file wants to set a specific WSA error that
59 * c-ares should read, it must do so by calling SetWSAError() on the
60 * WSAErrorContext instance passed to it. A WSAErrorContext must only be
61 * instantiated at the top of the virtual socket function callstack. */
62 class WSAErrorContext {
63 public:
WSAErrorContext()64 explicit WSAErrorContext(){};
65
~WSAErrorContext()66 ~WSAErrorContext() {
67 if (error_ != 0) {
68 WSASetLastError(error_);
69 }
70 }
71
72 /* Disallow copy and assignment operators */
73 WSAErrorContext(const WSAErrorContext&) = delete;
74 WSAErrorContext& operator=(const WSAErrorContext&) = delete;
75
SetWSAError(int error)76 void SetWSAError(int error) { error_ = error; }
77
78 private:
79 int error_ = 0;
80 };
81
82 /* c-ares creates its own sockets and is meant to read them when readable and
83 * write them when writeable. To fit this socket usage model into the grpc
84 * windows poller (which gives notifications when attempted reads and writes are
85 * actually fulfilled rather than possible), this GrpcPolledFdWindows class
86 * takes advantage of the ares_set_socket_functions API and acts as a virtual
87 * socket. It holds its own read and write buffers which are written to and read
88 * from c-ares and are used with the grpc windows poller, and it, e.g.,
89 * manufactures virtual socket error codes when it e.g. needs to tell the c-ares
90 * library to wait for an async read. */
91 class GrpcPolledFdWindows : public GrpcPolledFd {
92 public:
93 enum WriteState {
94 WRITE_IDLE,
95 WRITE_REQUESTED,
96 WRITE_PENDING,
97 WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY,
98 };
99
GrpcPolledFdWindows(ares_socket_t as,grpc_combiner * combiner,int address_family,int socket_type)100 GrpcPolledFdWindows(ares_socket_t as, grpc_combiner* combiner,
101 int address_family, int socket_type)
102 : read_buf_(grpc_empty_slice()),
103 write_buf_(grpc_empty_slice()),
104 tcp_write_state_(WRITE_IDLE),
105 gotten_into_driver_list_(false),
106 address_family_(address_family),
107 socket_type_(socket_type) {
108 gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as);
109 winsocket_ = grpc_winsocket_create(as, name_);
110 combiner_ = GRPC_COMBINER_REF(combiner, name_);
111 GRPC_CLOSURE_INIT(&outer_read_closure_,
112 &GrpcPolledFdWindows::OnIocpReadable, this,
113 grpc_combiner_scheduler(combiner_));
114 GRPC_CLOSURE_INIT(&outer_write_closure_,
115 &GrpcPolledFdWindows::OnIocpWriteable, this,
116 grpc_combiner_scheduler(combiner_));
117 GRPC_CLOSURE_INIT(&on_tcp_connect_locked_,
118 &GrpcPolledFdWindows::OnTcpConnectLocked, this,
119 grpc_combiner_scheduler(combiner_));
120 GRPC_CLOSURE_INIT(&continue_register_for_on_readable_locked_,
121 &GrpcPolledFdWindows::ContinueRegisterForOnReadableLocked,
122 this, grpc_combiner_scheduler(combiner_));
123 GRPC_CLOSURE_INIT(
124 &continue_register_for_on_writeable_locked_,
125 &GrpcPolledFdWindows::ContinueRegisterForOnWriteableLocked, this,
126 grpc_combiner_scheduler(combiner_));
127 }
128
~GrpcPolledFdWindows()129 ~GrpcPolledFdWindows() {
130 GRPC_COMBINER_UNREF(combiner_, name_);
131 grpc_slice_unref_internal(read_buf_);
132 grpc_slice_unref_internal(write_buf_);
133 GPR_ASSERT(read_closure_ == nullptr);
134 GPR_ASSERT(write_closure_ == nullptr);
135 grpc_winsocket_destroy(winsocket_);
136 gpr_free(name_);
137 }
138
ScheduleAndNullReadClosure(grpc_error * error)139 void ScheduleAndNullReadClosure(grpc_error* error) {
140 GRPC_CLOSURE_SCHED(read_closure_, error);
141 read_closure_ = nullptr;
142 }
143
ScheduleAndNullWriteClosure(grpc_error * error)144 void ScheduleAndNullWriteClosure(grpc_error* error) {
145 GRPC_CLOSURE_SCHED(write_closure_, error);
146 write_closure_ = nullptr;
147 }
148
RegisterForOnReadableLocked(grpc_closure * read_closure)149 void RegisterForOnReadableLocked(grpc_closure* read_closure) override {
150 GPR_ASSERT(read_closure_ == nullptr);
151 read_closure_ = read_closure;
152 GPR_ASSERT(GRPC_SLICE_LENGTH(read_buf_) == 0);
153 grpc_slice_unref_internal(read_buf_);
154 GPR_ASSERT(!read_buf_has_data_);
155 read_buf_ = GRPC_SLICE_MALLOC(4192);
156 if (connect_done_) {
157 GRPC_CLOSURE_SCHED(&continue_register_for_on_readable_locked_,
158 GRPC_ERROR_NONE);
159 } else {
160 GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == nullptr);
161 pending_continue_register_for_on_readable_locked_ =
162 &continue_register_for_on_readable_locked_;
163 }
164 }
165
ContinueRegisterForOnReadableLocked(void * arg,grpc_error * unused_error)166 static void ContinueRegisterForOnReadableLocked(void* arg,
167 grpc_error* unused_error) {
168 GrpcPolledFdWindows* grpc_polled_fd =
169 static_cast<GrpcPolledFdWindows*>(arg);
170 grpc_polled_fd->InnerContinueRegisterForOnReadableLocked(GRPC_ERROR_NONE);
171 }
172
InnerContinueRegisterForOnReadableLocked(grpc_error * unused_error)173 void InnerContinueRegisterForOnReadableLocked(grpc_error* unused_error) {
174 GRPC_CARES_TRACE_LOG(
175 "fd:|%s| InnerContinueRegisterForOnReadableLocked "
176 "wsa_connect_error_:%d",
177 GetName(), wsa_connect_error_);
178 GPR_ASSERT(connect_done_);
179 if (wsa_connect_error_ != 0) {
180 ScheduleAndNullReadClosure(GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
181 return;
182 }
183 WSABUF buffer;
184 buffer.buf = (char*)GRPC_SLICE_START_PTR(read_buf_);
185 buffer.len = GRPC_SLICE_LENGTH(read_buf_);
186 memset(&winsocket_->read_info.overlapped, 0, sizeof(OVERLAPPED));
187 recv_from_source_addr_len_ = sizeof(recv_from_source_addr_);
188 DWORD flags = 0;
189 if (WSARecvFrom(grpc_winsocket_wrapped_socket(winsocket_), &buffer, 1,
190 nullptr, &flags, (sockaddr*)recv_from_source_addr_,
191 &recv_from_source_addr_len_,
192 &winsocket_->read_info.overlapped, nullptr)) {
193 int wsa_last_error = WSAGetLastError();
194 char* msg = gpr_format_message(wsa_last_error);
195 GRPC_CARES_TRACE_LOG(
196 "fd:|%s| RegisterForOnReadableLocked WSARecvFrom error code:|%d| "
197 "msg:|%s|",
198 GetName(), wsa_last_error, msg);
199 gpr_free(msg);
200 if (wsa_last_error != WSA_IO_PENDING) {
201 ScheduleAndNullReadClosure(
202 GRPC_WSA_ERROR(wsa_last_error, "WSARecvFrom"));
203 return;
204 }
205 }
206 grpc_socket_notify_on_read(winsocket_, &outer_read_closure_);
207 }
208
RegisterForOnWriteableLocked(grpc_closure * write_closure)209 void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
210 if (socket_type_ == SOCK_DGRAM) {
211 GRPC_CARES_TRACE_LOG("fd:|%s| RegisterForOnWriteableLocked called",
212 GetName());
213 } else {
214 GPR_ASSERT(socket_type_ == SOCK_STREAM);
215 GRPC_CARES_TRACE_LOG(
216 "fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d",
217 GetName(), tcp_write_state_);
218 }
219 GPR_ASSERT(write_closure_ == nullptr);
220 write_closure_ = write_closure;
221 if (connect_done_) {
222 GRPC_CLOSURE_SCHED(&continue_register_for_on_writeable_locked_,
223 GRPC_ERROR_NONE);
224 } else {
225 GPR_ASSERT(pending_continue_register_for_on_writeable_locked_ == nullptr);
226 pending_continue_register_for_on_writeable_locked_ =
227 &continue_register_for_on_writeable_locked_;
228 }
229 }
230
ContinueRegisterForOnWriteableLocked(void * arg,grpc_error * unused_error)231 static void ContinueRegisterForOnWriteableLocked(void* arg,
232 grpc_error* unused_error) {
233 GrpcPolledFdWindows* grpc_polled_fd =
234 static_cast<GrpcPolledFdWindows*>(arg);
235 grpc_polled_fd->InnerContinueRegisterForOnWriteableLocked(GRPC_ERROR_NONE);
236 }
237
InnerContinueRegisterForOnWriteableLocked(grpc_error * unused_error)238 void InnerContinueRegisterForOnWriteableLocked(grpc_error* unused_error) {
239 GRPC_CARES_TRACE_LOG(
240 "fd:|%s| InnerContinueRegisterForOnWriteableLocked "
241 "wsa_connect_error_:%d",
242 GetName(), wsa_connect_error_);
243 GPR_ASSERT(connect_done_);
244 if (wsa_connect_error_ != 0) {
245 ScheduleAndNullWriteClosure(
246 GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
247 return;
248 }
249 if (socket_type_ == SOCK_DGRAM) {
250 ScheduleAndNullWriteClosure(GRPC_ERROR_NONE);
251 } else {
252 GPR_ASSERT(socket_type_ == SOCK_STREAM);
253 int wsa_error_code = 0;
254 switch (tcp_write_state_) {
255 case WRITE_IDLE:
256 ScheduleAndNullWriteClosure(GRPC_ERROR_NONE);
257 break;
258 case WRITE_REQUESTED:
259 tcp_write_state_ = WRITE_PENDING;
260 if (SendWriteBuf(nullptr, &winsocket_->write_info.overlapped,
261 &wsa_error_code) != 0) {
262 ScheduleAndNullWriteClosure(
263 GRPC_WSA_ERROR(wsa_error_code, "WSASend (overlapped)"));
264 } else {
265 grpc_socket_notify_on_write(winsocket_, &outer_write_closure_);
266 }
267 break;
268 case WRITE_PENDING:
269 case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
270 abort();
271 }
272 }
273 }
274
IsFdStillReadableLocked()275 bool IsFdStillReadableLocked() override {
276 return GRPC_SLICE_LENGTH(read_buf_) > 0;
277 }
278
ShutdownLocked(grpc_error * error)279 void ShutdownLocked(grpc_error* error) override {
280 grpc_winsocket_shutdown(winsocket_);
281 }
282
GetWrappedAresSocketLocked()283 ares_socket_t GetWrappedAresSocketLocked() override {
284 return grpc_winsocket_wrapped_socket(winsocket_);
285 }
286
GetName()287 const char* GetName() override { return name_; }
288
RecvFrom(WSAErrorContext * wsa_error_ctx,void * data,ares_socket_t data_len,int flags,struct sockaddr * from,ares_socklen_t * from_len)289 ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data,
290 ares_socket_t data_len, int flags,
291 struct sockaddr* from, ares_socklen_t* from_len) {
292 GRPC_CARES_TRACE_LOG(
293 "fd:|%s| RecvFrom called read_buf_has_data:%d Current read buf "
294 "length:|%d|",
295 GetName(), read_buf_has_data_, GRPC_SLICE_LENGTH(read_buf_));
296 if (!read_buf_has_data_) {
297 wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
298 return -1;
299 }
300 ares_ssize_t bytes_read = 0;
301 for (size_t i = 0; i < GRPC_SLICE_LENGTH(read_buf_) && i < data_len; i++) {
302 ((char*)data)[i] = GRPC_SLICE_START_PTR(read_buf_)[i];
303 bytes_read++;
304 }
305 read_buf_ = grpc_slice_sub_no_ref(read_buf_, bytes_read,
306 GRPC_SLICE_LENGTH(read_buf_));
307 if (GRPC_SLICE_LENGTH(read_buf_) == 0) {
308 read_buf_has_data_ = false;
309 }
310 /* c-ares overloads this recv_from virtual socket function to receive
311 * data on both UDP and TCP sockets, and from is nullptr for TCP. */
312 if (from != nullptr) {
313 GPR_ASSERT(*from_len <= recv_from_source_addr_len_);
314 memcpy(from, &recv_from_source_addr_, recv_from_source_addr_len_);
315 *from_len = recv_from_source_addr_len_;
316 }
317 return bytes_read;
318 }
319
FlattenIovec(const struct iovec * iov,int iov_count)320 grpc_slice FlattenIovec(const struct iovec* iov, int iov_count) {
321 int total = 0;
322 for (int i = 0; i < iov_count; i++) {
323 total += iov[i].iov_len;
324 }
325 grpc_slice out = GRPC_SLICE_MALLOC(total);
326 size_t cur = 0;
327 for (int i = 0; i < iov_count; i++) {
328 for (int k = 0; k < iov[i].iov_len; k++) {
329 GRPC_SLICE_START_PTR(out)[cur++] = ((char*)iov[i].iov_base)[k];
330 }
331 }
332 return out;
333 }
334
SendWriteBuf(LPDWORD bytes_sent_ptr,LPWSAOVERLAPPED overlapped,int * wsa_error_code)335 int SendWriteBuf(LPDWORD bytes_sent_ptr, LPWSAOVERLAPPED overlapped,
336 int* wsa_error_code) {
337 WSABUF buf;
338 buf.len = GRPC_SLICE_LENGTH(write_buf_);
339 buf.buf = (char*)GRPC_SLICE_START_PTR(write_buf_);
340 DWORD flags = 0;
341 int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &buf, 1,
342 bytes_sent_ptr, flags, overlapped, nullptr);
343 *wsa_error_code = WSAGetLastError();
344 GRPC_CARES_TRACE_LOG(
345 "fd:|%s| SendWriteBuf WSASend buf.len:%d *bytes_sent_ptr:%d "
346 "overlapped:%p "
347 "return:%d *wsa_error_code:%d",
348 GetName(), buf.len, bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0,
349 overlapped, out, *wsa_error_code);
350 return out;
351 }
352
SendV(WSAErrorContext * wsa_error_ctx,const struct iovec * iov,int iov_count)353 ares_ssize_t SendV(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
354 int iov_count) {
355 GRPC_CARES_TRACE_LOG(
356 "fd:|%s| SendV called connect_done_:%d wsa_connect_error_:%d",
357 GetName(), connect_done_, wsa_connect_error_);
358 if (!connect_done_) {
359 wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
360 return -1;
361 }
362 if (wsa_connect_error_ != 0) {
363 wsa_error_ctx->SetWSAError(wsa_connect_error_);
364 return -1;
365 }
366 switch (socket_type_) {
367 case SOCK_DGRAM:
368 return SendVUDP(wsa_error_ctx, iov, iov_count);
369 case SOCK_STREAM:
370 return SendVTCP(wsa_error_ctx, iov, iov_count);
371 default:
372 abort();
373 }
374 }
375
SendVUDP(WSAErrorContext * wsa_error_ctx,const struct iovec * iov,int iov_count)376 ares_ssize_t SendVUDP(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
377 int iov_count) {
378 // c-ares doesn't handle retryable errors on writes of UDP sockets.
379 // Therefore, the sendv handler for UDP sockets must only attempt
380 // to write everything inline.
381 GRPC_CARES_TRACE_LOG("fd:|%s| SendVUDP called", GetName());
382 GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0);
383 grpc_slice_unref_internal(write_buf_);
384 write_buf_ = FlattenIovec(iov, iov_count);
385 DWORD bytes_sent = 0;
386 int wsa_error_code = 0;
387 if (SendWriteBuf(&bytes_sent, nullptr, &wsa_error_code) != 0) {
388 wsa_error_ctx->SetWSAError(wsa_error_code);
389 char* msg = gpr_format_message(wsa_error_code);
390 GRPC_CARES_TRACE_LOG(
391 "fd:|%s| SendVUDP SendWriteBuf error code:%d msg:|%s|", GetName(),
392 wsa_error_code, msg);
393 gpr_free(msg);
394 return -1;
395 }
396 write_buf_ = grpc_slice_sub_no_ref(write_buf_, bytes_sent,
397 GRPC_SLICE_LENGTH(write_buf_));
398 return bytes_sent;
399 }
400
SendVTCP(WSAErrorContext * wsa_error_ctx,const struct iovec * iov,int iov_count)401 ares_ssize_t SendVTCP(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
402 int iov_count) {
403 // The "sendv" handler on TCP sockets buffers up write
404 // requests and returns an artifical WSAEWOULDBLOCK. Writing that buffer out
405 // in the background, and making further send progress in general, will
406 // happen as long as c-ares continues to show interest in writeability on
407 // this fd.
408 GRPC_CARES_TRACE_LOG("fd:|%s| SendVTCP called tcp_write_state_:%d",
409 GetName(), tcp_write_state_);
410 switch (tcp_write_state_) {
411 case WRITE_IDLE:
412 tcp_write_state_ = WRITE_REQUESTED;
413 GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0);
414 grpc_slice_unref_internal(write_buf_);
415 write_buf_ = FlattenIovec(iov, iov_count);
416 wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
417 return -1;
418 case WRITE_REQUESTED:
419 case WRITE_PENDING:
420 wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
421 return -1;
422 case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
423 // c-ares is retrying a send on data that we previously returned
424 // WSAEWOULDBLOCK for, but then subsequently wrote out in the
425 // background. Right now, we assume that c-ares is retrying the same
426 // send again. If c-ares still needs to send even more data, we'll get
427 // to it eventually.
428 grpc_slice currently_attempted = FlattenIovec(iov, iov_count);
429 GPR_ASSERT(GRPC_SLICE_LENGTH(currently_attempted) >=
430 GRPC_SLICE_LENGTH(write_buf_));
431 ares_ssize_t total_sent = 0;
432 for (size_t i = 0; i < GRPC_SLICE_LENGTH(write_buf_); i++) {
433 GPR_ASSERT(GRPC_SLICE_START_PTR(currently_attempted)[i] ==
434 GRPC_SLICE_START_PTR(write_buf_)[i]);
435 total_sent++;
436 }
437 grpc_slice_unref_internal(currently_attempted);
438 tcp_write_state_ = WRITE_IDLE;
439 return total_sent;
440 }
441 abort();
442 }
443
OnTcpConnectLocked(void * arg,grpc_error * error)444 static void OnTcpConnectLocked(void* arg, grpc_error* error) {
445 GrpcPolledFdWindows* grpc_polled_fd =
446 static_cast<GrpcPolledFdWindows*>(arg);
447 grpc_polled_fd->InnerOnTcpConnectLocked(error);
448 }
449
InnerOnTcpConnectLocked(grpc_error * error)450 void InnerOnTcpConnectLocked(grpc_error* error) {
451 GRPC_CARES_TRACE_LOG(
452 "fd:%s InnerOnTcpConnectLocked error:|%s| "
453 "pending_register_for_readable:%" PRIdPTR
454 " pending_register_for_writeable:%" PRIdPTR,
455 GetName(), grpc_error_string(error),
456 pending_continue_register_for_on_readable_locked_,
457 pending_continue_register_for_on_writeable_locked_);
458 GPR_ASSERT(!connect_done_);
459 connect_done_ = true;
460 GPR_ASSERT(wsa_connect_error_ == 0);
461 if (error == GRPC_ERROR_NONE) {
462 DWORD transferred_bytes = 0;
463 DWORD flags;
464 BOOL wsa_success =
465 WSAGetOverlappedResult(grpc_winsocket_wrapped_socket(winsocket_),
466 &winsocket_->write_info.overlapped,
467 &transferred_bytes, FALSE, &flags);
468 GPR_ASSERT(transferred_bytes == 0);
469 if (!wsa_success) {
470 wsa_connect_error_ = WSAGetLastError();
471 char* msg = gpr_format_message(wsa_connect_error_);
472 GRPC_CARES_TRACE_LOG(
473 "fd:%s InnerOnTcpConnectLocked WSA overlapped result code:%d "
474 "msg:|%s|",
475 GetName(), wsa_connect_error_, msg);
476 gpr_free(msg);
477 }
478 } else {
479 // Spoof up an error code that will cause any future c-ares operations on
480 // this fd to abort.
481 wsa_connect_error_ = WSA_OPERATION_ABORTED;
482 }
483 if (pending_continue_register_for_on_readable_locked_ != nullptr) {
484 GRPC_CLOSURE_SCHED(pending_continue_register_for_on_readable_locked_,
485 GRPC_ERROR_NONE);
486 }
487 if (pending_continue_register_for_on_writeable_locked_ != nullptr) {
488 GRPC_CLOSURE_SCHED(pending_continue_register_for_on_writeable_locked_,
489 GRPC_ERROR_NONE);
490 }
491 }
492
Connect(WSAErrorContext * wsa_error_ctx,const struct sockaddr * target,ares_socklen_t target_len)493 int Connect(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
494 ares_socklen_t target_len) {
495 switch (socket_type_) {
496 case SOCK_DGRAM:
497 return ConnectUDP(wsa_error_ctx, target, target_len);
498 case SOCK_STREAM:
499 return ConnectTCP(wsa_error_ctx, target, target_len);
500 default:
501 abort();
502 }
503 }
504
ConnectUDP(WSAErrorContext * wsa_error_ctx,const struct sockaddr * target,ares_socklen_t target_len)505 int ConnectUDP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
506 ares_socklen_t target_len) {
507 GRPC_CARES_TRACE_LOG("fd:%s ConnectUDP", GetName());
508 GPR_ASSERT(!connect_done_);
509 GPR_ASSERT(wsa_connect_error_ == 0);
510 SOCKET s = grpc_winsocket_wrapped_socket(winsocket_);
511 int out =
512 WSAConnect(s, target, target_len, nullptr, nullptr, nullptr, nullptr);
513 wsa_connect_error_ = WSAGetLastError();
514 wsa_error_ctx->SetWSAError(wsa_connect_error_);
515 connect_done_ = true;
516 char* msg = gpr_format_message(wsa_connect_error_);
517 GRPC_CARES_TRACE_LOG("fd:%s WSAConnect error code:|%d| msg:|%s|", GetName(),
518 wsa_connect_error_, msg);
519 gpr_free(msg);
520 // c-ares expects a posix-style connect API
521 return out == 0 ? 0 : -1;
522 }
523
ConnectTCP(WSAErrorContext * wsa_error_ctx,const struct sockaddr * target,ares_socklen_t target_len)524 int ConnectTCP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
525 ares_socklen_t target_len) {
526 GRPC_CARES_TRACE_LOG("fd:%s ConnectTCP", GetName());
527 LPFN_CONNECTEX ConnectEx;
528 GUID guid = WSAID_CONNECTEX;
529 DWORD ioctl_num_bytes;
530 SOCKET s = grpc_winsocket_wrapped_socket(winsocket_);
531 if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
532 &ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, nullptr,
533 nullptr) != 0) {
534 int wsa_last_error = WSAGetLastError();
535 wsa_error_ctx->SetWSAError(wsa_last_error);
536 char* msg = gpr_format_message(wsa_last_error);
537 GRPC_CARES_TRACE_LOG(
538 "fd:%s WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:%d "
539 "msg:|%s|",
540 GetName(), wsa_last_error, msg);
541 gpr_free(msg);
542 connect_done_ = true;
543 wsa_connect_error_ = wsa_last_error;
544 return -1;
545 }
546 grpc_resolved_address wildcard4_addr;
547 grpc_resolved_address wildcard6_addr;
548 grpc_sockaddr_make_wildcards(0, &wildcard4_addr, &wildcard6_addr);
549 grpc_resolved_address* local_address = nullptr;
550 if (address_family_ == AF_INET) {
551 local_address = &wildcard4_addr;
552 } else {
553 local_address = &wildcard6_addr;
554 }
555 if (bind(s, (struct sockaddr*)local_address->addr,
556 (int)local_address->len) != 0) {
557 int wsa_last_error = WSAGetLastError();
558 wsa_error_ctx->SetWSAError(wsa_last_error);
559 char* msg = gpr_format_message(wsa_last_error);
560 GRPC_CARES_TRACE_LOG("fd:%s bind error code:%d msg:|%s|", GetName(),
561 wsa_last_error, msg);
562 gpr_free(msg);
563 connect_done_ = true;
564 wsa_connect_error_ = wsa_last_error;
565 return -1;
566 }
567 int out = 0;
568 if (ConnectEx(s, target, target_len, nullptr, 0, nullptr,
569 &winsocket_->write_info.overlapped) == 0) {
570 out = -1;
571 int wsa_last_error = WSAGetLastError();
572 wsa_error_ctx->SetWSAError(wsa_last_error);
573 char* msg = gpr_format_message(wsa_last_error);
574 GRPC_CARES_TRACE_LOG("fd:%s ConnectEx error code:%d msg:|%s|", GetName(),
575 wsa_last_error, msg);
576 gpr_free(msg);
577 if (wsa_last_error == WSA_IO_PENDING) {
578 // c-ares only understands WSAEINPROGRESS and EWOULDBLOCK error codes on
579 // connect, but an async connect on IOCP socket will give
580 // WSA_IO_PENDING, so we need to convert.
581 wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
582 } else {
583 // By returning a non-retryable error to c-ares at this point,
584 // we're aborting the possibility of any future operations on this fd.
585 connect_done_ = true;
586 wsa_connect_error_ = wsa_last_error;
587 return -1;
588 }
589 }
590 grpc_socket_notify_on_write(winsocket_, &on_tcp_connect_locked_);
591 return out;
592 }
593
OnIocpReadable(void * arg,grpc_error * error)594 static void OnIocpReadable(void* arg, grpc_error* error) {
595 GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
596 polled_fd->OnIocpReadableInner(error);
597 }
598
599 // TODO(apolcyn): improve this error handling to be less conversative.
600 // An e.g. ECONNRESET error here should result in errors when
601 // c-ares reads from this socket later, but it shouldn't necessarily cancel
602 // the entire resolution attempt. Doing so will allow the "inject broken
603 // nameserver list" test to pass on Windows.
OnIocpReadableInner(grpc_error * error)604 void OnIocpReadableInner(grpc_error* error) {
605 if (error == GRPC_ERROR_NONE) {
606 if (winsocket_->read_info.wsa_error != 0) {
607 /* WSAEMSGSIZE would be due to receiving more data
608 * than our read buffer's fixed capacity. Assume that
609 * the connection is TCP and read the leftovers
610 * in subsequent c-ares reads. */
611 if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) {
612 GRPC_ERROR_UNREF(error);
613 error = GRPC_WSA_ERROR(winsocket_->read_info.wsa_error,
614 "OnIocpReadableInner");
615 GRPC_CARES_TRACE_LOG(
616 "fd:|%s| OnIocpReadableInner winsocket_->read_info.wsa_error "
617 "code:|%d| msg:|%s|",
618 GetName(), winsocket_->read_info.wsa_error,
619 grpc_error_string(error));
620 }
621 }
622 }
623 if (error == GRPC_ERROR_NONE) {
624 read_buf_ = grpc_slice_sub_no_ref(
625 read_buf_, 0, winsocket_->read_info.bytes_transferred);
626 read_buf_has_data_ = true;
627 } else {
628 grpc_slice_unref_internal(read_buf_);
629 read_buf_ = grpc_empty_slice();
630 }
631 GRPC_CARES_TRACE_LOG(
632 "fd:|%s| OnIocpReadable finishing. read buf length now:|%d|", GetName(),
633 GRPC_SLICE_LENGTH(read_buf_));
634 ScheduleAndNullReadClosure(error);
635 }
636
OnIocpWriteable(void * arg,grpc_error * error)637 static void OnIocpWriteable(void* arg, grpc_error* error) {
638 GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
639 polled_fd->OnIocpWriteableInner(error);
640 }
641
OnIocpWriteableInner(grpc_error * error)642 void OnIocpWriteableInner(grpc_error* error) {
643 GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName());
644 GPR_ASSERT(socket_type_ == SOCK_STREAM);
645 if (error == GRPC_ERROR_NONE) {
646 if (winsocket_->write_info.wsa_error != 0) {
647 GRPC_ERROR_UNREF(error);
648 error = GRPC_WSA_ERROR(winsocket_->write_info.wsa_error,
649 "OnIocpWriteableInner");
650 GRPC_CARES_TRACE_LOG(
651 "fd:|%s| OnIocpWriteableInner. winsocket_->write_info.wsa_error "
652 "code:|%d| msg:|%s|",
653 GetName(), winsocket_->write_info.wsa_error,
654 grpc_error_string(error));
655 }
656 }
657 GPR_ASSERT(tcp_write_state_ == WRITE_PENDING);
658 if (error == GRPC_ERROR_NONE) {
659 tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY;
660 write_buf_ = grpc_slice_sub_no_ref(
661 write_buf_, 0, winsocket_->write_info.bytes_transferred);
662 GRPC_CARES_TRACE_LOG("fd:|%s| OnIocpWriteableInner. bytes transferred:%d",
663 GetName(), winsocket_->write_info.bytes_transferred);
664 } else {
665 grpc_slice_unref_internal(write_buf_);
666 write_buf_ = grpc_empty_slice();
667 }
668 ScheduleAndNullWriteClosure(error);
669 }
670
gotten_into_driver_list() const671 bool gotten_into_driver_list() const { return gotten_into_driver_list_; }
set_gotten_into_driver_list()672 void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; }
673
674 grpc_combiner* combiner_;
675 char recv_from_source_addr_[200];
676 ares_socklen_t recv_from_source_addr_len_;
677 grpc_slice read_buf_;
678 bool read_buf_has_data_ = false;
679 grpc_slice write_buf_;
680 grpc_closure* read_closure_ = nullptr;
681 grpc_closure* write_closure_ = nullptr;
682 grpc_closure outer_read_closure_;
683 grpc_closure outer_write_closure_;
684 grpc_winsocket* winsocket_;
685 // tcp_write_state_ is only used on TCP GrpcPolledFds
686 WriteState tcp_write_state_;
687 char* name_ = nullptr;
688 bool gotten_into_driver_list_;
689 int address_family_;
690 int socket_type_;
691 grpc_closure on_tcp_connect_locked_;
692 bool connect_done_ = false;
693 int wsa_connect_error_ = 0;
694 // We don't run register_for_{readable,writeable} logic until
695 // a socket is connected. In the interim, we queue readable/writeable
696 // registrations with the following state.
697 grpc_closure continue_register_for_on_readable_locked_;
698 grpc_closure continue_register_for_on_writeable_locked_;
699 grpc_closure* pending_continue_register_for_on_readable_locked_ = nullptr;
700 grpc_closure* pending_continue_register_for_on_writeable_locked_ = nullptr;
701 };
702
703 struct SockToPolledFdEntry {
SockToPolledFdEntrygrpc_core::SockToPolledFdEntry704 SockToPolledFdEntry(SOCKET s, GrpcPolledFdWindows* fd)
705 : socket(s), polled_fd(fd) {}
706 SOCKET socket;
707 GrpcPolledFdWindows* polled_fd;
708 SockToPolledFdEntry* next = nullptr;
709 };
710
711 /* A SockToPolledFdMap can make ares_socket_t types (SOCKET's on windows)
712 * to GrpcPolledFdWindow's, and is used to find the appropriate
713 * GrpcPolledFdWindows to handle a virtual socket call when c-ares makes that
714 * socket call on the ares_socket_t type. Instances are owned by and one-to-one
715 * with a GrpcPolledFdWindows factory and event driver */
716 class SockToPolledFdMap {
717 public:
SockToPolledFdMap(grpc_combiner * combiner)718 SockToPolledFdMap(grpc_combiner* combiner) {
719 combiner_ = GRPC_COMBINER_REF(combiner, "sock to polled fd map");
720 }
721
~SockToPolledFdMap()722 ~SockToPolledFdMap() {
723 GPR_ASSERT(head_ == nullptr);
724 GRPC_COMBINER_UNREF(combiner_, "sock to polled fd map");
725 }
726
AddNewSocket(SOCKET s,GrpcPolledFdWindows * polled_fd)727 void AddNewSocket(SOCKET s, GrpcPolledFdWindows* polled_fd) {
728 SockToPolledFdEntry* new_node = New<SockToPolledFdEntry>(s, polled_fd);
729 new_node->next = head_;
730 head_ = new_node;
731 }
732
LookupPolledFd(SOCKET s)733 GrpcPolledFdWindows* LookupPolledFd(SOCKET s) {
734 for (SockToPolledFdEntry* node = head_; node != nullptr;
735 node = node->next) {
736 if (node->socket == s) {
737 GPR_ASSERT(node->polled_fd != nullptr);
738 return node->polled_fd;
739 }
740 }
741 abort();
742 }
743
RemoveEntry(SOCKET s)744 void RemoveEntry(SOCKET s) {
745 GPR_ASSERT(head_ != nullptr);
746 SockToPolledFdEntry** prev = &head_;
747 for (SockToPolledFdEntry* node = head_; node != nullptr;
748 node = node->next) {
749 if (node->socket == s) {
750 *prev = node->next;
751 Delete(node);
752 return;
753 }
754 prev = &node->next;
755 }
756 abort();
757 }
758
759 /* These virtual socket functions are called from within the c-ares
760 * library. These methods generally dispatch those socket calls to the
761 * appropriate methods. The virtual "socket" and "close" methods are
762 * special and instead create/add and remove/destroy GrpcPolledFdWindows
763 * objects.
764 */
Socket(int af,int type,int protocol,void * user_data)765 static ares_socket_t Socket(int af, int type, int protocol, void* user_data) {
766 if (type != SOCK_DGRAM && type != SOCK_STREAM) {
767 GRPC_CARES_TRACE_LOG("Socket called with invalid socket type:%d", type);
768 return INVALID_SOCKET;
769 }
770 SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
771 SOCKET s = WSASocket(af, type, protocol, nullptr, 0,
772 grpc_get_default_wsa_socket_flags());
773 if (s == INVALID_SOCKET) {
774 GRPC_CARES_TRACE_LOG(
775 "WSASocket failed with params af:%d type:%d protocol:%d", af, type,
776 protocol);
777 return s;
778 }
779 grpc_tcp_set_non_block(s);
780 GrpcPolledFdWindows* polled_fd =
781 New<GrpcPolledFdWindows>(s, map->combiner_, af, type);
782 GRPC_CARES_TRACE_LOG(
783 "fd:|%s| created with params af:%d type:%d protocol:%d",
784 polled_fd->GetName(), af, type, protocol);
785 map->AddNewSocket(s, polled_fd);
786 return s;
787 }
788
Connect(ares_socket_t as,const struct sockaddr * target,ares_socklen_t target_len,void * user_data)789 static int Connect(ares_socket_t as, const struct sockaddr* target,
790 ares_socklen_t target_len, void* user_data) {
791 WSAErrorContext wsa_error_ctx;
792 SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
793 GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
794 return polled_fd->Connect(&wsa_error_ctx, target, target_len);
795 }
796
SendV(ares_socket_t as,const struct iovec * iov,int iovec_count,void * user_data)797 static ares_ssize_t SendV(ares_socket_t as, const struct iovec* iov,
798 int iovec_count, void* user_data) {
799 WSAErrorContext wsa_error_ctx;
800 SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
801 GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
802 return polled_fd->SendV(&wsa_error_ctx, iov, iovec_count);
803 }
804
RecvFrom(ares_socket_t as,void * data,size_t data_len,int flags,struct sockaddr * from,ares_socklen_t * from_len,void * user_data)805 static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len,
806 int flags, struct sockaddr* from,
807 ares_socklen_t* from_len, void* user_data) {
808 WSAErrorContext wsa_error_ctx;
809 SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
810 GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
811 return polled_fd->RecvFrom(&wsa_error_ctx, data, data_len, flags, from,
812 from_len);
813 }
814
CloseSocket(SOCKET s,void * user_data)815 static int CloseSocket(SOCKET s, void* user_data) {
816 SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
817 GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(s);
818 map->RemoveEntry(s);
819 // If a gRPC polled fd has not made it in to the driver's list yet, then
820 // the driver has not and will never see this socket.
821 if (!polled_fd->gotten_into_driver_list()) {
822 polled_fd->ShutdownLocked(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
823 "Shut down c-ares fd before without it ever having made it into the "
824 "driver's list"));
825 return 0;
826 }
827 return 0;
828 }
829
830 private:
831 SockToPolledFdEntry* head_ = nullptr;
832 grpc_combiner* combiner_;
833 };
834
835 const struct ares_socket_functions custom_ares_sock_funcs = {
836 &SockToPolledFdMap::Socket /* socket */,
837 &SockToPolledFdMap::CloseSocket /* close */,
838 &SockToPolledFdMap::Connect /* connect */,
839 &SockToPolledFdMap::RecvFrom /* recvfrom */,
840 &SockToPolledFdMap::SendV /* sendv */,
841 };
842
843 class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
844 public:
GrpcPolledFdFactoryWindows(grpc_combiner * combiner)845 GrpcPolledFdFactoryWindows(grpc_combiner* combiner)
846 : sock_to_polled_fd_map_(combiner) {}
847
NewGrpcPolledFdLocked(ares_socket_t as,grpc_pollset_set * driver_pollset_set,grpc_combiner * combiner)848 GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
849 grpc_pollset_set* driver_pollset_set,
850 grpc_combiner* combiner) override {
851 GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as);
852 // Set a flag so that the virtual socket "close" method knows it
853 // doesn't need to call ShutdownLocked, since now the driver will.
854 polled_fd->set_gotten_into_driver_list();
855 return polled_fd;
856 }
857
ConfigureAresChannelLocked(ares_channel channel)858 void ConfigureAresChannelLocked(ares_channel channel) override {
859 ares_set_socket_functions(channel, &custom_ares_sock_funcs,
860 &sock_to_polled_fd_map_);
861 }
862
863 private:
864 SockToPolledFdMap sock_to_polled_fd_map_;
865 };
866
NewGrpcPolledFdFactory(grpc_combiner * combiner)867 UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(grpc_combiner* combiner) {
868 return UniquePtr<GrpcPolledFdFactory>(
869 New<GrpcPolledFdFactoryWindows>(combiner));
870 }
871
872 } // namespace grpc_core
873
#endif /* GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER) */
875