1 /* 2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http.websocket; 27 28 import jdk.internal.net.http.common.Demand; 29 import jdk.internal.net.http.common.Log; 30 import jdk.internal.net.http.common.Logger; 31 import jdk.internal.net.http.common.MinimalFuture; 32 import jdk.internal.net.http.common.SequentialScheduler; 33 import jdk.internal.net.http.common.Utils; 34 import jdk.internal.net.http.websocket.OpeningHandshake.Result; 35 36 import java.io.IOException; 37 import java.lang.ref.Reference; 38 import java.net.ProtocolException; 39 import java.net.URI; 40 import java.net.http.WebSocket; 41 import java.nio.ByteBuffer; 42 import java.nio.CharBuffer; 43 import java.nio.charset.CharacterCodingException; 44 import java.nio.charset.CharsetEncoder; 45 import java.nio.charset.CodingErrorAction; 46 import java.nio.charset.StandardCharsets; 47 import java.util.Objects; 48 import java.util.concurrent.CompletableFuture; 49 import java.util.concurrent.CompletionStage; 50 import java.util.concurrent.atomic.AtomicBoolean; 51 import java.util.concurrent.atomic.AtomicLong; 52 import java.util.concurrent.atomic.AtomicReference; 53 import java.util.function.BiConsumer; 54 import java.util.function.Function; 55 56 import static java.util.Objects.requireNonNull; 57 import static jdk.internal.net.http.common.MinimalFuture.failedFuture; 58 import static jdk.internal.net.http.websocket.StatusCodes.CLOSED_ABNORMALLY; 59 import static jdk.internal.net.http.websocket.StatusCodes.NO_STATUS_CODE; 60 import static jdk.internal.net.http.websocket.StatusCodes.isLegalToSendFromClient; 61 import static jdk.internal.net.http.websocket.WebSocketImpl.State.BINARY; 62 import static jdk.internal.net.http.websocket.WebSocketImpl.State.CLOSE; 63 import static jdk.internal.net.http.websocket.WebSocketImpl.State.ERROR; 64 import static jdk.internal.net.http.websocket.WebSocketImpl.State.IDLE; 65 import static jdk.internal.net.http.websocket.WebSocketImpl.State.OPEN; 66 import static jdk.internal.net.http.websocket.WebSocketImpl.State.PING; 67 import static jdk.internal.net.http.websocket.WebSocketImpl.State.PONG; 68 import static jdk.internal.net.http.websocket.WebSocketImpl.State.TEXT; 69 import static jdk.internal.net.http.websocket.WebSocketImpl.State.WAITING; 70 71 /* 72 * A WebSocket client. 73 */ 74 public final class WebSocketImpl implements WebSocket { 75 76 private static final Logger debug = 77 Utils.getWebSocketLogger("[WebSocket]"::toString, Utils.DEBUG_WS); 78 private final AtomicLong sendCounter = new AtomicLong(); 79 private final AtomicLong receiveCounter = new AtomicLong(); 80 81 enum State { 82 OPEN, 83 IDLE, 84 WAITING, 85 TEXT, 86 BINARY, 87 PING, 88 PONG, 89 CLOSE, 90 ERROR 91 } 92 93 private final AtomicReference<ByteBuffer> lastAutomaticPong = new AtomicReference<>(); 94 private final MinimalFuture<WebSocket> DONE = MinimalFuture.completedFuture(this); 95 private volatile boolean inputClosed; 96 private final AtomicBoolean outputClosed = new AtomicBoolean(); 97 98 private final AtomicReference<State> state = new AtomicReference<>(OPEN); 99 100 /* Components of calls to Listener's methods */ 101 private boolean last; 102 private ByteBuffer binaryData; 103 private CharSequence text; 104 private int statusCode; 105 private String reason; 106 private final AtomicReference<Throwable> error = new AtomicReference<>(); 107 108 private final URI uri; 109 private final String subprotocol; 110 private final Listener listener; 111 112 private final AtomicBoolean pendingTextOrBinary = new AtomicBoolean(); 113 private final AtomicBoolean pendingPingOrPong = new AtomicBoolean(); 114 private final Transport transport; 115 private final SequentialScheduler receiveScheduler 116 = new SequentialScheduler(new ReceiveTask()); 117 private final Demand demand = new Demand(); 118 newInstanceAsync(BuilderImpl b)119 public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) { 120 Function<Result, WebSocket> newWebSocket = r -> { 121 WebSocket ws = newInstance(b.getUri(), 122 r.subprotocol, 123 b.getListener(), 124 r.transport); 125 // Make sure we don't release the builder until this lambda 126 // has been executed. The builder has a strong reference to 127 // the HttpClientFacade, and we want to keep that live until 128 // after the raw channel is created and passed to WebSocketImpl. 129 Reference.reachabilityFence(b); 130 return ws; 131 }; 132 OpeningHandshake h; 133 try { 134 h = new OpeningHandshake(b); 135 } catch (Throwable e) { 136 return failedFuture(e); 137 } 138 return h.send().thenApply(newWebSocket); 139 } 140 141 /* Exposed for testing purposes */ newInstance(URI uri, String subprotocol, Listener listener, TransportFactory transport)142 static WebSocketImpl newInstance(URI uri, 143 String subprotocol, 144 Listener listener, 145 TransportFactory transport) { 146 WebSocketImpl ws = new WebSocketImpl(uri, subprotocol, listener, transport); 147 // This initialisation is outside of the constructor for the sake of 148 // safe publication of WebSocketImpl.this 149 ws.signalOpen(); 150 return ws; 151 } 152 WebSocketImpl(URI uri, String subprotocol, Listener listener, TransportFactory transportFactory)153 private WebSocketImpl(URI uri, 154 String subprotocol, 155 Listener listener, 156 TransportFactory transportFactory) { 157 this.uri = requireNonNull(uri); 158 this.subprotocol = requireNonNull(subprotocol); 159 this.listener = requireNonNull(listener); 160 // Why 6? 1 sendPing/sendPong + 1 sendText/sendBinary + 1 Close + 161 // 2 automatic Ping replies + 1 automatic Close = 6 messages 162 // Why 2 automatic Pong replies? One is being sent, but the byte buffer 163 // has been set to null, another just has been added. 164 this.transport = transportFactory.createTransport(new MessageQueue(6), 165 new SignallingMessageConsumer()); 166 } 167 168 // FIXME: add to action handling of errors -> signalError() 169 170 @Override sendText(CharSequence message, boolean last)171 public CompletableFuture<WebSocket> sendText(CharSequence message, 172 boolean last) { 173 Objects.requireNonNull(message); 174 long id = 0; 175 if (debug.on()) { 176 id = sendCounter.incrementAndGet(); 177 debug.log("enter send text %s payload length=%s last=%s", 178 id, message.length(), last); 179 } 180 CompletableFuture<WebSocket> result; 181 if (!setPendingTextOrBinary()) { 182 result = failedFuture(new IllegalStateException("Send pending")); 183 } else { 184 result = transport.sendText(message, last, this, 185 (r, e) -> clearPendingTextOrBinary()); 186 } 187 if (debug.on()) { 188 debug.log("exit send text %s returned %s", id, result); 189 } 190 191 return replaceNull(result); 192 } 193 194 @Override sendBinary(ByteBuffer message, boolean last)195 public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, 196 boolean last) { 197 Objects.requireNonNull(message); 198 long id = 0; 199 if (debug.on()) { 200 id = sendCounter.incrementAndGet(); 201 debug.log("enter send binary %s payload=%s last=%s", 202 id, message, last); 203 } 204 CompletableFuture<WebSocket> result; 205 if (!setPendingTextOrBinary()) { 206 result = failedFuture(new IllegalStateException("Send pending")); 207 } else { 208 result = transport.sendBinary(message, last, this, 209 (r, e) -> clearPendingTextOrBinary()); 210 } 211 if (debug.on()) { 212 debug.log("exit send binary %s returned %s", id, result); 213 } 214 return replaceNull(result); 215 } 216 clearPendingTextOrBinary()217 private void clearPendingTextOrBinary() { 218 pendingTextOrBinary.set(false); 219 } 220 setPendingTextOrBinary()221 private boolean setPendingTextOrBinary() { 222 return pendingTextOrBinary.compareAndSet(false, true); 223 } 224 replaceNull( CompletableFuture<WebSocket> cf)225 private CompletableFuture<WebSocket> replaceNull( 226 CompletableFuture<WebSocket> cf) 227 { 228 if (cf == null) { 229 return DONE; 230 } else { 231 return cf; 232 } 233 } 234 235 @Override sendPing(ByteBuffer message)236 public CompletableFuture<WebSocket> sendPing(ByteBuffer message) { 237 Objects.requireNonNull(message); 238 long id = 0; 239 if (debug.on()) { 240 id = sendCounter.incrementAndGet(); 241 debug.log("enter send ping %s payload=%s", id, message); 242 } 243 CompletableFuture<WebSocket> result; 244 if (!setPendingPingOrPong()) { 245 result = failedFuture(new IllegalStateException("Send pending")); 246 } else { 247 result = transport.sendPing(message, this, 248 (r, e) -> clearPendingPingOrPong()); 249 } 250 if (debug.on()) { 251 debug.log("exit send ping %s returned %s", id, result); 252 } 253 return replaceNull(result); 254 } 255 256 @Override sendPong(ByteBuffer message)257 public CompletableFuture<WebSocket> sendPong(ByteBuffer message) { 258 Objects.requireNonNull(message); 259 long id = 0; 260 if (debug.on()) { 261 id = sendCounter.incrementAndGet(); 262 debug.log("enter send pong %s payload=%s", id, message); 263 } 264 CompletableFuture<WebSocket> result; 265 if (!setPendingPingOrPong()) { 266 result = failedFuture(new IllegalStateException("Send pending")); 267 } else { 268 result = transport.sendPong(message, this, 269 (r, e) -> clearPendingPingOrPong()); 270 } 271 if (debug.on()) { 272 debug.log("exit send pong %s returned %s", id, result); 273 } 274 return replaceNull(result); 275 } 276 setPendingPingOrPong()277 private boolean setPendingPingOrPong() { 278 return pendingPingOrPong.compareAndSet(false, true); 279 } 280 clearPendingPingOrPong()281 private void clearPendingPingOrPong() { 282 pendingPingOrPong.set(false); 283 } 284 285 @Override sendClose(int statusCode, String reason)286 public CompletableFuture<WebSocket> sendClose(int statusCode, 287 String reason) { 288 Objects.requireNonNull(reason); 289 long id = 0; 290 if (debug.on()) { 291 id = sendCounter.incrementAndGet(); 292 debug.log("enter send close %s statusCode=%s reason.length=%s", 293 id, statusCode, reason.length()); 294 } 295 CompletableFuture<WebSocket> result; 296 // Close message is the only type of message whose validity is checked 297 // in the corresponding send method. This is made in order to close the 298 // output in place. Otherwise the number of Close messages in queue 299 // would not be bounded. 300 if (!isLegalToSendFromClient(statusCode)) { 301 result = failedFuture(new IllegalArgumentException("statusCode")); 302 } else if (!isLegalReason(reason)) { 303 result = failedFuture(new IllegalArgumentException("reason")); 304 } else if (!outputClosed.compareAndSet(false, true)){ 305 result = failedFuture(new IOException("Output closed")); 306 } else { 307 result = sendClose0(statusCode, reason); 308 } 309 if (debug.on()) { 310 debug.log("exit send close %s returned %s", id, result); 311 } 312 return replaceNull(result); 313 } 314 isLegalReason(String reason)315 private static boolean isLegalReason(String reason) { 316 if (reason.length() > 123) { // quick check 317 return false; 318 } 319 CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder() 320 .onMalformedInput(CodingErrorAction.REPORT) 321 .onUnmappableCharacter(CodingErrorAction.REPORT); 322 ByteBuffer bytes; 323 try { 324 bytes = encoder.encode(CharBuffer.wrap(reason)); 325 } catch (CharacterCodingException ignored) { 326 return false; 327 } 328 return bytes.remaining() <= 123; 329 } 330 331 /* 332 * The implementation uses this method internally to send Close messages 333 * with codes that are not allowed to be sent through the API. 334 */ sendClose0(int statusCode, String reason)335 private CompletableFuture<WebSocket> sendClose0(int statusCode, 336 String reason) { 337 return transport.sendClose(statusCode, reason, this, 338 (r, e) -> processCloseError(e)); 339 } 340 processCloseError(Throwable e)341 private void processCloseError(Throwable e) { 342 if (e == null) { 343 debug.log("send close completed successfully"); 344 } else { 345 debug.log("send close completed with error", e); 346 } 347 outputClosed.set(true); 348 try { 349 transport.closeOutput(); 350 } catch (IOException ignored) { } 351 } 352 353 @Override request(long n)354 public void request(long n) { 355 if (debug.on()) { 356 debug.log("request %s", n); 357 } 358 if (demand.increase(n)) { 359 receiveScheduler.runOrSchedule(); 360 } 361 } 362 363 @Override getSubprotocol()364 public String getSubprotocol() { 365 return subprotocol; 366 } 367 368 @Override isOutputClosed()369 public boolean isOutputClosed() { 370 return outputClosed.get(); 371 } 372 373 @Override isInputClosed()374 public boolean isInputClosed() { 375 return inputClosed; 376 } 377 378 @Override abort()379 public void abort() { 380 if (debug.on()) { 381 debug.log("abort"); 382 } 383 inputClosed = true; 384 outputClosed.set(true); 385 receiveScheduler.stop(); 386 close(); 387 } 388 389 @Override toString()390 public String toString() { 391 return super.toString() 392 + "[uri=" + uri 393 + (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "") 394 + "]"; 395 } 396 397 /* 398 * The assumptions about order is as follows: 399 * 400 * - state is never changed more than twice inside the `run` method: 401 * x --(1)--> IDLE --(2)--> y (otherwise we're loosing events, or 402 * overwriting parts of messages creating a mess since there's no 403 * queueing) 404 * - OPEN is always the first state 405 * - no messages are requested/delivered before onOpen is called (this 406 * is implemented by making WebSocket instance accessible first in 407 * onOpen) 408 * - after the state has been observed as CLOSE/ERROR, the scheduler 409 * is stopped 410 */ 411 private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask { 412 413 // Transport only asked here and nowhere else because we must make sure 414 // onOpen is invoked first and no messages become pending before onOpen 415 // finishes 416 417 @Override run()418 public void run() { 419 if (debug.on()) { 420 debug.log("enter receive task"); 421 } 422 loop: 423 while (!receiveScheduler.isStopped()) { 424 State s = state.get(); 425 if (debug.on()) { 426 debug.log("receive state: %s", s); 427 } 428 try { 429 switch (s) { 430 case OPEN: 431 processOpen(); 432 tryChangeState(OPEN, IDLE); 433 break; 434 case TEXT: 435 processText(); 436 tryChangeState(TEXT, IDLE); 437 break; 438 case BINARY: 439 processBinary(); 440 tryChangeState(BINARY, IDLE); 441 break; 442 case PING: 443 processPing(); 444 tryChangeState(PING, IDLE); 445 break; 446 case PONG: 447 processPong(); 448 tryChangeState(PONG, IDLE); 449 break; 450 case CLOSE: 451 processClose(); 452 break loop; 453 case ERROR: 454 processError(); 455 break loop; 456 case IDLE: 457 if (demand.tryDecrement() 458 && tryChangeState(IDLE, WAITING)) { 459 transport.request(1); 460 } 461 break loop; 462 case WAITING: 463 // For debugging spurious signalling: when there was 464 // a signal, but apparently nothing has changed 465 break loop; 466 default: 467 throw new InternalError(String.valueOf(s)); 468 } 469 } catch (Throwable t) { 470 signalError(t); 471 } 472 } 473 if (debug.on()) { 474 debug.log("exit receive task"); 475 } 476 } 477 processError()478 private void processError() throws IOException { 479 if (debug.on()) { 480 debug.log("processError"); 481 } 482 transport.closeInput(); 483 receiveScheduler.stop(); 484 Throwable err = error.get(); 485 if (err instanceof FailWebSocketException) { 486 int code1 = ((FailWebSocketException) err).getStatusCode(); 487 err = new ProtocolException().initCause(err); 488 if (debug.on()) { 489 debug.log("failing %s with error=%s statusCode=%s", 490 WebSocketImpl.this, err, code1); 491 } 492 sendCloseSilently(code1); 493 } 494 long id = 0; 495 if (debug.on()) { 496 id = receiveCounter.incrementAndGet(); 497 debug.log("enter onError %s error=%s", id, err); 498 } 499 try { 500 listener.onError(WebSocketImpl.this, err); 501 } finally { 502 if (debug.on()) { 503 debug.log("exit onError %s", id); 504 } 505 } 506 } 507 processClose()508 private void processClose() throws IOException { 509 debug.log("processClose"); 510 transport.closeInput(); 511 receiveScheduler.stop(); 512 CompletionStage<?> cs = null; // when the listener is ready to close 513 long id = 0; 514 if (debug.on()) { 515 id = receiveCounter.incrementAndGet(); 516 debug.log("enter onClose %s statusCode=%s reason.length=%s", 517 id, statusCode, reason.length()); 518 } 519 try { 520 cs = listener.onClose(WebSocketImpl.this, statusCode, reason); 521 } finally { 522 debug.log("exit onClose %s returned %s", id, cs); 523 } 524 if (cs == null) { 525 cs = DONE; 526 } 527 int code; 528 if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) { 529 code = NORMAL_CLOSURE; 530 debug.log("using statusCode %s instead of %s", 531 statusCode, code); 532 533 } else { 534 code = statusCode; 535 } 536 cs.whenComplete((r, e) -> { 537 if (debug.on()) { 538 debug.log("CompletionStage returned by onClose completed result=%s error=%s", 539 r, e); 540 } 541 sendCloseSilently(code); 542 }); 543 } 544 processPong()545 private void processPong() { 546 long id = 0; 547 if (debug.on()) { 548 id = receiveCounter.incrementAndGet(); 549 debug.log("enter onPong %s payload=%s", 550 id, binaryData); 551 } 552 CompletionStage<?> cs = null; 553 try { 554 cs = listener.onPong(WebSocketImpl.this, binaryData); 555 } finally { 556 if (debug.on()) { 557 debug.log("exit onPong %s returned %s", id, cs); 558 } 559 } 560 } 561 processPing()562 private void processPing() { 563 if (debug.on()) { 564 debug.log("processPing"); 565 } 566 // A full copy of this (small) data is made. This way sending a 567 // replying Pong could be done in parallel with the listener 568 // handling this Ping. 569 ByteBuffer slice = binaryData.slice(); 570 if (!outputClosed.get()) { 571 ByteBuffer copy = ByteBuffer.allocate(binaryData.remaining()) 572 .put(binaryData) 573 .flip(); 574 if (!trySwapAutomaticPong(copy)) { 575 // Non-exclusive send; 576 BiConsumer<WebSocketImpl, Throwable> reporter = (r, e) -> { 577 if (e != null) { // TODO: better error handing. What if already closed? 578 signalError(Utils.getCompletionCause(e)); 579 } 580 }; 581 transport.sendPong(WebSocketImpl.this::clearAutomaticPong, 582 WebSocketImpl.this, 583 reporter); 584 } 585 } 586 long id = 0; 587 if (debug.on()) { 588 id = receiveCounter.incrementAndGet(); 589 debug.log("enter onPing %s payload=%s", id, slice); 590 } 591 CompletionStage<?> cs = null; 592 try { 593 cs = listener.onPing(WebSocketImpl.this, slice); 594 } finally { 595 if (debug.on()) { 596 debug.log("exit onPing %s returned %s", id, cs); 597 } 598 } 599 } 600 processBinary()601 private void processBinary() { 602 long id = 0; 603 if (debug.on()) { 604 id = receiveCounter.incrementAndGet(); 605 debug.log("enter onBinary %s payload=%s last=%s", 606 id, binaryData, last); 607 } 608 CompletionStage<?> cs = null; 609 try { 610 cs = listener.onBinary(WebSocketImpl.this, binaryData, last); 611 } finally { 612 if (debug.on()) { 613 debug.log("exit onBinary %s returned %s", id, cs); 614 } 615 } 616 } 617 processText()618 private void processText() { 619 long id = 0; 620 if (debug.on()) { 621 id = receiveCounter.incrementAndGet(); 622 debug.log("enter onText %s payload.length=%s last=%s", 623 id, text.length(), last); 624 } 625 CompletionStage<?> cs = null; 626 try { 627 cs = listener.onText(WebSocketImpl.this, text, last); 628 } finally { 629 if (debug.on()) { 630 debug.log("exit onText %s returned %s", id, cs); 631 } 632 } 633 } 634 processOpen()635 private void processOpen() { 636 long id = 0; 637 if (debug.on()) { 638 id = receiveCounter.incrementAndGet(); 639 debug.log("enter onOpen %s", id); 640 } 641 try { 642 listener.onOpen(WebSocketImpl.this); 643 } finally { 644 if (debug.on()) { 645 debug.log("exit onOpen %s", id); 646 } 647 } 648 } 649 } 650 sendCloseSilently(int statusCode)651 private void sendCloseSilently(int statusCode) { 652 sendClose0(statusCode, "").whenComplete((r, e) -> { 653 if (e != null) { 654 if (debug.on()) { 655 debug.log("automatic closure completed with error", 656 (Object) e); 657 } 658 } 659 }); 660 } 661 clearAutomaticPong()662 private ByteBuffer clearAutomaticPong() { 663 ByteBuffer data; 664 do { 665 data = lastAutomaticPong.get(); 666 if (data == null) { 667 // This method must never be called unless a message that is 668 // using it has been added previously 669 throw new InternalError(); 670 } 671 } while (!lastAutomaticPong.compareAndSet(data, null)); 672 return data; 673 } 674 675 // bound pings trySwapAutomaticPong(ByteBuffer copy)676 private boolean trySwapAutomaticPong(ByteBuffer copy) { 677 ByteBuffer message; 678 boolean swapped; 679 while (true) { 680 message = lastAutomaticPong.get(); 681 if (message == null) { 682 if (!lastAutomaticPong.compareAndSet(null, copy)) { 683 // It's only this method that can change null to ByteBuffer, 684 // and this method is invoked at most by one thread at a 685 // time. Thus no failure in the atomic operation above is 686 // expected. 687 throw new InternalError(); 688 } 689 swapped = false; 690 break; 691 } else if (lastAutomaticPong.compareAndSet(message, copy)) { 692 swapped = true; 693 break; 694 } 695 } 696 if (debug.on()) { 697 debug.log("swapped automatic pong from %s to %s", 698 message, copy); 699 } 700 return swapped; 701 } 702 signalOpen()703 private void signalOpen() { 704 debug.log("signalOpen"); 705 receiveScheduler.runOrSchedule(); 706 } 707 signalError(Throwable error)708 private void signalError(Throwable error) { 709 if (debug.on()) { 710 debug.log("signalError %s", (Object) error); 711 } 712 inputClosed = true; 713 outputClosed.set(true); 714 if (!this.error.compareAndSet(null, error) || !trySetState(ERROR)) { 715 if (debug.on()) { 716 debug.log("signalError", error); 717 } 718 Log.logError(error); 719 } else { 720 close(); 721 } 722 } 723 close()724 private void close() { 725 if (debug.on()) { 726 debug.log("close"); 727 } 728 Throwable first = null; 729 try { 730 transport.closeInput(); 731 } catch (Throwable t1) { 732 first = t1; 733 } finally { 734 Throwable second = null; 735 try { 736 transport.closeOutput(); 737 } catch (Throwable t2) { 738 second = t2; 739 } finally { 740 Throwable e = null; 741 if (first != null && second != null) { 742 first.addSuppressed(second); 743 e = first; 744 } else if (first != null) { 745 e = first; 746 } else if (second != null) { 747 e = second; 748 } 749 if (e != null) { 750 if (debug.on()) { 751 debug.log("exception in close", e); 752 } 753 } 754 } 755 } 756 } 757 signalClose(int statusCode, String reason)758 private void signalClose(int statusCode, String reason) { 759 // FIXME: make sure no race reason & close are not intermixed 760 inputClosed = true; 761 this.statusCode = statusCode; 762 this.reason = reason; 763 boolean managed = trySetState(CLOSE); 764 if (debug.on()) { 765 debug.log("signalClose statusCode=%s reason.length=%s: %s", 766 statusCode, reason.length(), managed); 767 } 768 if (managed) { 769 try { 770 transport.closeInput(); 771 } catch (Throwable t) { 772 if (debug.on()) { 773 debug.log("exception closing input", (Object) t); 774 } 775 } 776 } 777 } 778 779 private class SignallingMessageConsumer implements MessageStreamConsumer { 780 781 @Override onText(CharSequence data, boolean last)782 public void onText(CharSequence data, boolean last) { 783 transport.acknowledgeReception(); 784 text = data; 785 WebSocketImpl.this.last = last; 786 tryChangeState(WAITING, TEXT); 787 } 788 789 @Override onBinary(ByteBuffer data, boolean last)790 public void onBinary(ByteBuffer data, boolean last) { 791 transport.acknowledgeReception(); 792 binaryData = data; 793 WebSocketImpl.this.last = last; 794 tryChangeState(WAITING, BINARY); 795 } 796 797 @Override onPing(ByteBuffer data)798 public void onPing(ByteBuffer data) { 799 transport.acknowledgeReception(); 800 binaryData = data; 801 tryChangeState(WAITING, PING); 802 } 803 804 @Override onPong(ByteBuffer data)805 public void onPong(ByteBuffer data) { 806 transport.acknowledgeReception(); 807 binaryData = data; 808 tryChangeState(WAITING, PONG); 809 } 810 811 @Override onClose(int statusCode, CharSequence reason)812 public void onClose(int statusCode, CharSequence reason) { 813 transport.acknowledgeReception(); 814 signalClose(statusCode, reason.toString()); 815 } 816 817 @Override onComplete()818 public void onComplete() { 819 transport.acknowledgeReception(); 820 signalClose(CLOSED_ABNORMALLY, ""); 821 } 822 823 @Override onError(Throwable error)824 public void onError(Throwable error) { 825 signalError(error); 826 } 827 } 828 trySetState(State newState)829 private boolean trySetState(State newState) { 830 State currentState; 831 boolean success = false; 832 while (true) { 833 currentState = state.get(); 834 if (currentState == ERROR || currentState == CLOSE) { 835 break; 836 } else if (state.compareAndSet(currentState, newState)) { 837 receiveScheduler.runOrSchedule(); 838 success = true; 839 break; 840 } 841 } 842 if (debug.on()) { 843 debug.log("set state %s (previous %s) %s", 844 newState, currentState, success); 845 } 846 return success; 847 } 848 tryChangeState(State expectedState, State newState)849 private boolean tryChangeState(State expectedState, State newState) { 850 State witness = state.compareAndExchange(expectedState, newState); 851 boolean success = false; 852 if (witness == expectedState) { 853 receiveScheduler.runOrSchedule(); 854 success = true; 855 } else if (witness != ERROR && witness != CLOSE) { 856 // This should be the only reason for inability to change the state 857 // from IDLE to WAITING: the state has changed to terminal 858 throw new InternalError(); 859 } 860 if (debug.on()) { 861 debug.log("change state from %s to %s %s", 862 expectedState, newState, success); 863 } 864 return success; 865 } 866 867 /* Exposed for testing purposes */ transport()868 protected Transport transport() { 869 return transport; 870 } 871 } 872