1 /* $NetBSD: bufferevent_async.c,v 1.1.1.1 2013/04/11 16:43:28 christos Exp $ */ 2 /* 3 * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson 4 * 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. The name of the author may not be used to endorse or promote products 16 * derived from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 */ 29 30 #include "event2/event-config.h" 31 #include <sys/cdefs.h> 32 __RCSID("$NetBSD: bufferevent_async.c,v 1.1.1.1 2013/04/11 16:43:28 christos Exp $"); 33 34 #ifdef _EVENT_HAVE_SYS_TIME_H 35 #include <sys/time.h> 36 #endif 37 38 #include <errno.h> 39 #include <stdio.h> 40 #include <stdlib.h> 41 #include <string.h> 42 #ifdef _EVENT_HAVE_STDARG_H 43 #include <stdarg.h> 44 #endif 45 #ifdef _EVENT_HAVE_UNISTD_H 46 #include <unistd.h> 47 #endif 48 49 #ifdef WIN32 50 #include <winsock2.h> 51 #include <ws2tcpip.h> 52 #endif 53 54 #include <sys/queue.h> 55 56 #include "event2/util.h" 57 #include "event2/bufferevent.h" 58 #include "event2/buffer.h" 59 #include "event2/bufferevent_struct.h" 60 #include "event2/event.h" 61 #include "event2/util.h" 62 #include "event-internal.h" 63 #include "log-internal.h" 64 #include "mm-internal.h" 65 #include "bufferevent-internal.h" 66 #include "util-internal.h" 67 #include "iocp-internal.h" 68 69 #ifndef SO_UPDATE_CONNECT_CONTEXT 70 /* Mingw is sometimes missing this */ 71 #define SO_UPDATE_CONNECT_CONTEXT 0x7010 72 #endif 73 74 /* prototypes */ 75 static int be_async_enable(struct bufferevent *, short); 76 static int be_async_disable(struct bufferevent *, short); 77 static void be_async_destruct(struct bufferevent *); 78 static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode); 79 static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 80 81 struct bufferevent_async { 82 struct bufferevent_private bev; 83 struct event_overlapped connect_overlapped; 84 struct event_overlapped read_overlapped; 85 struct event_overlapped write_overlapped; 86 size_t read_in_progress; 87 size_t write_in_progress; 88 unsigned ok : 1; 89 unsigned read_added : 1; 90 unsigned write_added : 1; 91 }; 92 93 const struct bufferevent_ops bufferevent_ops_async = { 94 "socket_async", 95 evutil_offsetof(struct bufferevent_async, bev.bev), 96 be_async_enable, 97 be_async_disable, 98 be_async_destruct, 99 _bufferevent_generic_adj_timeouts, 100 be_async_flush, 101 be_async_ctrl, 102 }; 103 104 static inline struct bufferevent_async * 105 upcast(struct bufferevent *bev) 106 { 107 struct bufferevent_async *bev_a; 108 if (bev->be_ops != &bufferevent_ops_async) 109 return NULL; 110 bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); 111 return bev_a; 112 } 113 114 static inline struct bufferevent_async * 115 upcast_connect(struct event_overlapped *eo) 116 { 117 struct bufferevent_async *bev_a; 118 bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped); 119 EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 120 return bev_a; 121 } 122 123 static inline struct bufferevent_async * 124 upcast_read(struct event_overlapped *eo) 125 { 126 struct bufferevent_async *bev_a; 127 bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped); 128 EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 129 return bev_a; 130 } 131 132 static inline struct bufferevent_async * 133 upcast_write(struct event_overlapped *eo) 134 { 135 struct bufferevent_async *bev_a; 136 bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped); 137 EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 138 return bev_a; 139 } 140 141 static void 142 bev_async_del_write(struct bufferevent_async *beva) 143 { 144 struct bufferevent *bev = &beva->bev.bev; 145 146 if (beva->write_added) { 147 beva->write_added = 0; 148 event_base_del_virtual(bev->ev_base); 149 } 150 } 151 152 static void 153 bev_async_del_read(struct bufferevent_async *beva) 154 { 155 struct bufferevent *bev = &beva->bev.bev; 156 157 if (beva->read_added) { 158 beva->read_added = 0; 159 event_base_del_virtual(bev->ev_base); 160 } 161 } 162 163 static void 164 bev_async_add_write(struct bufferevent_async *beva) 165 { 166 struct bufferevent *bev = &beva->bev.bev; 167 168 if (!beva->write_added) { 169 beva->write_added = 1; 170 event_base_add_virtual(bev->ev_base); 171 } 172 } 173 174 static void 175 bev_async_add_read(struct bufferevent_async *beva) 176 { 177 struct bufferevent *bev = &beva->bev.bev; 178 179 if (!beva->read_added) { 180 beva->read_added = 1; 181 event_base_add_virtual(bev->ev_base); 182 } 183 } 184 185 static void 186 bev_async_consider_writing(struct bufferevent_async *beva) 187 { 188 size_t at_most; 189 int limit; 190 struct bufferevent *bev = &beva->bev.bev; 191 192 /* Don't write if there's a write in progress, or we do not 193 * want to write, or when there's nothing left to write. */ 194 if (beva->write_in_progress || beva->bev.connecting) 195 return; 196 if (!beva->ok || !(bev->enabled&EV_WRITE) || 197 !evbuffer_get_length(bev->output)) { 198 bev_async_del_write(beva); 199 return; 200 } 201 202 at_most = evbuffer_get_length(bev->output); 203 204 /* This is safe so long as bufferevent_get_write_max never returns 205 * more than INT_MAX. That's true for now. XXXX */ 206 limit = (int)_bufferevent_get_write_max(&beva->bev); 207 if (at_most >= (size_t)limit && limit >= 0) 208 at_most = limit; 209 210 if (beva->bev.write_suspended) { 211 bev_async_del_write(beva); 212 return; 213 } 214 215 /* XXXX doesn't respect low-water mark very well. */ 216 bufferevent_incref(bev); 217 if (evbuffer_launch_write(bev->output, at_most, 218 &beva->write_overlapped)) { 219 bufferevent_decref(bev); 220 beva->ok = 0; 221 _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); 222 } else { 223 beva->write_in_progress = at_most; 224 _bufferevent_decrement_write_buckets(&beva->bev, at_most); 225 bev_async_add_write(beva); 226 } 227 } 228 229 static void 230 bev_async_consider_reading(struct bufferevent_async *beva) 231 { 232 size_t cur_size; 233 size_t read_high; 234 size_t at_most; 235 int limit; 236 struct bufferevent *bev = &beva->bev.bev; 237 238 /* Don't read if there is a read in progress, or we do not 239 * want to read. */ 240 if (beva->read_in_progress || beva->bev.connecting) 241 return; 242 if (!beva->ok || !(bev->enabled&EV_READ)) { 243 bev_async_del_read(beva); 244 return; 245 } 246 247 /* Don't read if we're full */ 248 cur_size = evbuffer_get_length(bev->input); 249 read_high = bev->wm_read.high; 250 if (read_high) { 251 if (cur_size >= read_high) { 252 bev_async_del_read(beva); 253 return; 254 } 255 at_most = read_high - cur_size; 256 } else { 257 at_most = 16384; /* FIXME totally magic. */ 258 } 259 260 /* XXXX This over-commits. */ 261 /* XXXX see also not above on cast on _bufferevent_get_write_max() */ 262 limit = (int)_bufferevent_get_read_max(&beva->bev); 263 if (at_most >= (size_t)limit && limit >= 0) 264 at_most = limit; 265 266 if (beva->bev.read_suspended) { 267 bev_async_del_read(beva); 268 return; 269 } 270 271 bufferevent_incref(bev); 272 if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) { 273 beva->ok = 0; 274 _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); 275 bufferevent_decref(bev); 276 } else { 277 beva->read_in_progress = at_most; 278 _bufferevent_decrement_read_buckets(&beva->bev, at_most); 279 bev_async_add_read(beva); 280 } 281 282 return; 283 } 284 285 static void 286 be_async_outbuf_callback(struct evbuffer *buf, 287 const struct evbuffer_cb_info *cbinfo, 288 void *arg) 289 { 290 struct bufferevent *bev = arg; 291 struct bufferevent_async *bev_async = upcast(bev); 292 293 /* If we added data to the outbuf and were not writing before, 294 * we may want to write now. */ 295 296 _bufferevent_incref_and_lock(bev); 297 298 if (cbinfo->n_added) 299 bev_async_consider_writing(bev_async); 300 301 _bufferevent_decref_and_unlock(bev); 302 } 303 304 static void 305 be_async_inbuf_callback(struct evbuffer *buf, 306 const struct evbuffer_cb_info *cbinfo, 307 void *arg) 308 { 309 struct bufferevent *bev = arg; 310 struct bufferevent_async *bev_async = upcast(bev); 311 312 /* If we drained data from the inbuf and were not reading before, 313 * we may want to read now */ 314 315 _bufferevent_incref_and_lock(bev); 316 317 if (cbinfo->n_deleted) 318 bev_async_consider_reading(bev_async); 319 320 _bufferevent_decref_and_unlock(bev); 321 } 322 323 static int 324 be_async_enable(struct bufferevent *buf, short what) 325 { 326 struct bufferevent_async *bev_async = upcast(buf); 327 328 if (!bev_async->ok) 329 return -1; 330 331 if (bev_async->bev.connecting) { 332 /* Don't launch anything during connection attempts. */ 333 return 0; 334 } 335 336 if (what & EV_READ) 337 BEV_RESET_GENERIC_READ_TIMEOUT(buf); 338 if (what & EV_WRITE) 339 BEV_RESET_GENERIC_WRITE_TIMEOUT(buf); 340 341 /* If we newly enable reading or writing, and we aren't reading or 342 writing already, consider launching a new read or write. */ 343 344 if (what & EV_READ) 345 bev_async_consider_reading(bev_async); 346 if (what & EV_WRITE) 347 bev_async_consider_writing(bev_async); 348 return 0; 349 } 350 351 static int 352 be_async_disable(struct bufferevent *bev, short what) 353 { 354 struct bufferevent_async *bev_async = upcast(bev); 355 /* XXXX If we disable reading or writing, we may want to consider 356 * canceling any in-progress read or write operation, though it might 357 * not work. */ 358 359 if (what & EV_READ) { 360 BEV_DEL_GENERIC_READ_TIMEOUT(bev); 361 bev_async_del_read(bev_async); 362 } 363 if (what & EV_WRITE) { 364 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 365 bev_async_del_write(bev_async); 366 } 367 368 return 0; 369 } 370 371 static void 372 be_async_destruct(struct bufferevent *bev) 373 { 374 struct bufferevent_async *bev_async = upcast(bev); 375 struct bufferevent_private *bev_p = BEV_UPCAST(bev); 376 evutil_socket_t fd; 377 378 EVUTIL_ASSERT(!upcast(bev)->write_in_progress && 379 !upcast(bev)->read_in_progress); 380 381 bev_async_del_read(bev_async); 382 bev_async_del_write(bev_async); 383 384 fd = _evbuffer_overlapped_get_fd(bev->input); 385 if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) { 386 /* XXXX possible double-close */ 387 evutil_closesocket(fd); 388 } 389 /* delete this in case non-blocking connect was used */ 390 if (event_initialized(&bev->ev_write)) { 391 event_del(&bev->ev_write); 392 _bufferevent_del_generic_timeout_cbs(bev); 393 } 394 } 395 396 /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so 397 * we use WSAGetOverlappedResult to translate. */ 398 static void 399 bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo) 400 { 401 DWORD bytes, flags; 402 evutil_socket_t fd; 403 404 fd = _evbuffer_overlapped_get_fd(bev->input); 405 WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags); 406 } 407 408 static int 409 be_async_flush(struct bufferevent *bev, short what, 410 enum bufferevent_flush_mode mode) 411 { 412 return 0; 413 } 414 415 static void 416 connect_complete(struct event_overlapped *eo, ev_uintptr_t key, 417 ev_ssize_t nbytes, int ok) 418 { 419 struct bufferevent_async *bev_a = upcast_connect(eo); 420 struct bufferevent *bev = &bev_a->bev.bev; 421 evutil_socket_t sock; 422 423 BEV_LOCK(bev); 424 425 EVUTIL_ASSERT(bev_a->bev.connecting); 426 bev_a->bev.connecting = 0; 427 sock = _evbuffer_overlapped_get_fd(bev_a->bev.bev.input); 428 /* XXXX Handle error? */ 429 setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); 430 431 if (ok) 432 bufferevent_async_set_connected(bev); 433 else 434 bev_async_set_wsa_error(bev, eo); 435 436 _bufferevent_run_eventcb(bev, 437 ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR); 438 439 event_base_del_virtual(bev->ev_base); 440 441 _bufferevent_decref_and_unlock(bev); 442 } 443 444 static void 445 read_complete(struct event_overlapped *eo, ev_uintptr_t key, 446 ev_ssize_t nbytes, int ok) 447 { 448 struct bufferevent_async *bev_a = upcast_read(eo); 449 struct bufferevent *bev = &bev_a->bev.bev; 450 short what = BEV_EVENT_READING; 451 ev_ssize_t amount_unread; 452 BEV_LOCK(bev); 453 EVUTIL_ASSERT(bev_a->read_in_progress); 454 455 amount_unread = bev_a->read_in_progress - nbytes; 456 evbuffer_commit_read(bev->input, nbytes); 457 bev_a->read_in_progress = 0; 458 if (amount_unread) 459 _bufferevent_decrement_read_buckets(&bev_a->bev, -amount_unread); 460 461 if (!ok) 462 bev_async_set_wsa_error(bev, eo); 463 464 if (bev_a->ok) { 465 if (ok && nbytes) { 466 BEV_RESET_GENERIC_READ_TIMEOUT(bev); 467 if (evbuffer_get_length(bev->input) >= bev->wm_read.low) 468 _bufferevent_run_readcb(bev); 469 bev_async_consider_reading(bev_a); 470 } else if (!ok) { 471 what |= BEV_EVENT_ERROR; 472 bev_a->ok = 0; 473 _bufferevent_run_eventcb(bev, what); 474 } else if (!nbytes) { 475 what |= BEV_EVENT_EOF; 476 bev_a->ok = 0; 477 _bufferevent_run_eventcb(bev, what); 478 } 479 } 480 481 _bufferevent_decref_and_unlock(bev); 482 } 483 484 static void 485 write_complete(struct event_overlapped *eo, ev_uintptr_t key, 486 ev_ssize_t nbytes, int ok) 487 { 488 struct bufferevent_async *bev_a = upcast_write(eo); 489 struct bufferevent *bev = &bev_a->bev.bev; 490 short what = BEV_EVENT_WRITING; 491 ev_ssize_t amount_unwritten; 492 493 BEV_LOCK(bev); 494 EVUTIL_ASSERT(bev_a->write_in_progress); 495 496 amount_unwritten = bev_a->write_in_progress - nbytes; 497 evbuffer_commit_write(bev->output, nbytes); 498 bev_a->write_in_progress = 0; 499 500 if (amount_unwritten) 501 _bufferevent_decrement_write_buckets(&bev_a->bev, 502 -amount_unwritten); 503 504 505 if (!ok) 506 bev_async_set_wsa_error(bev, eo); 507 508 if (bev_a->ok) { 509 if (ok && nbytes) { 510 BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); 511 if (evbuffer_get_length(bev->output) <= 512 bev->wm_write.low) 513 _bufferevent_run_writecb(bev); 514 bev_async_consider_writing(bev_a); 515 } else if (!ok) { 516 what |= BEV_EVENT_ERROR; 517 bev_a->ok = 0; 518 _bufferevent_run_eventcb(bev, what); 519 } else if (!nbytes) { 520 what |= BEV_EVENT_EOF; 521 bev_a->ok = 0; 522 _bufferevent_run_eventcb(bev, what); 523 } 524 } 525 526 _bufferevent_decref_and_unlock(bev); 527 } 528 529 struct bufferevent * 530 bufferevent_async_new(struct event_base *base, 531 evutil_socket_t fd, int options) 532 { 533 struct bufferevent_async *bev_a; 534 struct bufferevent *bev; 535 struct event_iocp_port *iocp; 536 537 options |= BEV_OPT_THREADSAFE; 538 539 if (!(iocp = event_base_get_iocp(base))) 540 return NULL; 541 542 if (fd >= 0 && event_iocp_port_associate(iocp, fd, 1)<0) { 543 int err = GetLastError(); 544 /* We may have alrady associated this fd with a port. 545 * Let's hope it's this port, and that the error code 546 * for doing this neer changes. */ 547 if (err != ERROR_INVALID_PARAMETER) 548 return NULL; 549 } 550 551 if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async)))) 552 return NULL; 553 554 bev = &bev_a->bev.bev; 555 if (!(bev->input = evbuffer_overlapped_new(fd))) { 556 mm_free(bev_a); 557 return NULL; 558 } 559 if (!(bev->output = evbuffer_overlapped_new(fd))) { 560 evbuffer_free(bev->input); 561 mm_free(bev_a); 562 return NULL; 563 } 564 565 if (bufferevent_init_common(&bev_a->bev, base, &bufferevent_ops_async, 566 options)<0) 567 goto err; 568 569 evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); 570 evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev); 571 572 event_overlapped_init(&bev_a->connect_overlapped, connect_complete); 573 event_overlapped_init(&bev_a->read_overlapped, read_complete); 574 event_overlapped_init(&bev_a->write_overlapped, write_complete); 575 576 bev_a->ok = fd >= 0; 577 if (bev_a->ok) 578 _bufferevent_init_generic_timeout_cbs(bev); 579 580 return bev; 581 err: 582 bufferevent_free(&bev_a->bev.bev); 583 return NULL; 584 } 585 586 void 587 bufferevent_async_set_connected(struct bufferevent *bev) 588 { 589 struct bufferevent_async *bev_async = upcast(bev); 590 bev_async->ok = 1; 591 _bufferevent_init_generic_timeout_cbs(bev); 592 /* Now's a good time to consider reading/writing */ 593 be_async_enable(bev, bev->enabled); 594 } 595 596 int 597 bufferevent_async_can_connect(struct bufferevent *bev) 598 { 599 const struct win32_extension_fns *ext = 600 event_get_win32_extension_fns(); 601 602 if (BEV_IS_ASYNC(bev) && 603 event_base_get_iocp(bev->ev_base) && 604 ext && ext->ConnectEx) 605 return 1; 606 607 return 0; 608 } 609 610 int 611 bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd, 612 const struct sockaddr *sa, int socklen) 613 { 614 BOOL rc; 615 struct bufferevent_async *bev_async = upcast(bev); 616 struct sockaddr_storage ss; 617 const struct win32_extension_fns *ext = 618 event_get_win32_extension_fns(); 619 620 EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL); 621 622 /* ConnectEx() requires that the socket be bound to an address 623 * with bind() before using, otherwise it will fail. We attempt 624 * to issue a bind() here, taking into account that the error 625 * code is set to WSAEINVAL when the socket is already bound. */ 626 memset(&ss, 0, sizeof(ss)); 627 if (sa->sa_family == AF_INET) { 628 struct sockaddr_in *sin = (struct sockaddr_in *)&ss; 629 sin->sin_family = AF_INET; 630 sin->sin_addr.s_addr = INADDR_ANY; 631 } else if (sa->sa_family == AF_INET6) { 632 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss; 633 sin6->sin6_family = AF_INET6; 634 sin6->sin6_addr = in6addr_any; 635 } else { 636 /* Well, the user will have to bind() */ 637 return -1; 638 } 639 if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 && 640 WSAGetLastError() != WSAEINVAL) 641 return -1; 642 643 event_base_add_virtual(bev->ev_base); 644 bufferevent_incref(bev); 645 rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL, 646 &bev_async->connect_overlapped.overlapped); 647 if (rc || WSAGetLastError() == ERROR_IO_PENDING) 648 return 0; 649 650 event_base_del_virtual(bev->ev_base); 651 bufferevent_decref(bev); 652 653 return -1; 654 } 655 656 static int 657 be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 658 union bufferevent_ctrl_data *data) 659 { 660 switch (op) { 661 case BEV_CTRL_GET_FD: 662 data->fd = _evbuffer_overlapped_get_fd(bev->input); 663 return 0; 664 case BEV_CTRL_SET_FD: { 665 struct event_iocp_port *iocp; 666 667 if (data->fd == _evbuffer_overlapped_get_fd(bev->input)) 668 return 0; 669 if (!(iocp = event_base_get_iocp(bev->ev_base))) 670 return -1; 671 if (event_iocp_port_associate(iocp, data->fd, 1) < 0) 672 return -1; 673 _evbuffer_overlapped_set_fd(bev->input, data->fd); 674 _evbuffer_overlapped_set_fd(bev->output, data->fd); 675 return 0; 676 } 677 case BEV_CTRL_CANCEL_ALL: { 678 struct bufferevent_async *bev_a = upcast(bev); 679 evutil_socket_t fd = _evbuffer_overlapped_get_fd(bev->input); 680 if (fd != (evutil_socket_t)INVALID_SOCKET && 681 (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) { 682 closesocket(fd); 683 } 684 bev_a->ok = 0; 685 return 0; 686 } 687 case BEV_CTRL_GET_UNDERLYING: 688 default: 689 return -1; 690 } 691 } 692 693 694