1 /* 2 * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http.websocket; 27 28 import jdk.internal.net.http.common.Demand; 29 import jdk.internal.net.http.common.Logger; 30 import jdk.internal.net.http.common.MinimalFuture; 31 import jdk.internal.net.http.common.SequentialScheduler; 32 import jdk.internal.net.http.common.SequentialScheduler.CompleteRestartableTask; 33 import jdk.internal.net.http.common.Utils; 34 35 import java.io.IOException; 36 import java.lang.System.Logger.Level; 37 import java.nio.ByteBuffer; 38 import java.nio.CharBuffer; 39 import java.nio.channels.SelectionKey; 40 import java.util.concurrent.CompletableFuture; 41 import java.util.concurrent.atomic.AtomicLong; 42 import java.util.concurrent.atomic.AtomicReference; 43 import java.util.function.BiConsumer; 44 import java.util.function.Supplier; 45 46 import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.AVAILABLE; 47 import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.CLOSED; 48 import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.UNREGISTERED; 49 import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.WAITING; 50 51 public class TransportImpl implements Transport { 52 53 // -- Debugging infrastructure -- 54 55 private static final Logger debug = 56 Utils.getWebSocketLogger("[Transport]"::toString, Utils.DEBUG_WS); 57 58 /* Used for correlating enters to and exists from a method */ 59 private final AtomicLong counter = new AtomicLong(); 60 61 private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask()); 62 63 private final MessageQueue queue; 64 private final MessageEncoder encoder = new MessageEncoder(); 65 /* A reusable buffer for writing, initially with no remaining bytes */ 66 private final ByteBuffer dst = createWriteBuffer().position(0).limit(0); 67 /* This array is created once for gathering writes accepted by RawChannel */ 68 private final ByteBuffer[] dstArray = new ByteBuffer[]{dst}; 69 private final MessageStreamConsumer messageConsumer; 70 private final MessageDecoder decoder; 71 private final Frame.Reader reader = new Frame.Reader(); 72 73 private final Demand demand = new Demand(); 74 private final SequentialScheduler receiveScheduler; 75 private final RawChannel channel; 76 private final Object closeLock = new Object(); 77 private final RawChannel.RawEvent writeEvent = new WriteEvent(); 78 private final RawChannel.RawEvent readEvent = new ReadEvent(); 79 private final AtomicReference<ChannelState> writeState 80 = new AtomicReference<>(UNREGISTERED); 81 private ByteBuffer data; 82 private volatile ChannelState readState = UNREGISTERED; 83 private boolean inputClosed; 84 private boolean outputClosed; 85 TransportImpl(MessageQueue queue, MessageStreamConsumer consumer, RawChannel channel)86 public TransportImpl(MessageQueue queue, MessageStreamConsumer consumer, 87 RawChannel channel) { 88 this.queue = queue; 89 this.messageConsumer = consumer; 90 this.channel = channel; 91 this.decoder = new MessageDecoder(this.messageConsumer); 92 this.data = channel.initialByteBuffer(); 93 // To ensure the initial non-final `data` will be visible 94 // (happens-before) when `readEvent.handle()` invokes `receiveScheduler` 95 // the following assignment is done last: 96 receiveScheduler = new SequentialScheduler(new ReceiveTask()); 97 } 98 createWriteBuffer()99 private ByteBuffer createWriteBuffer() { 100 String name = "jdk.httpclient.websocket.writeBufferSize"; 101 int capacity = Utils.getIntegerNetProperty(name, 16384); 102 if (debug.on()) { 103 debug.log("write buffer capacity %s", capacity); 104 } 105 106 // TODO (optimization?): allocateDirect if SSL? 107 return ByteBuffer.allocate(capacity); 108 } 109 write()110 private boolean write() throws IOException { 111 if (debug.on()) { 112 debug.log("writing to the channel"); 113 } 114 long count = channel.write(dstArray, 0, dstArray.length); 115 if (debug.on()) { 116 debug.log("%s bytes written", count); 117 } 118 for (ByteBuffer b : dstArray) { 119 if (b.hasRemaining()) { 120 return false; 121 } 122 } 123 return true; 124 } 125 126 @Override sendText(CharSequence message, boolean isLast, T attachment, BiConsumer<? super T, ? super Throwable> action)127 public <T> CompletableFuture<T> sendText(CharSequence message, 128 boolean isLast, 129 T attachment, 130 BiConsumer<? super T, ? super Throwable> action) { 131 long id = 0; 132 if (debug.on()) { 133 id = counter.incrementAndGet(); 134 debug.log("enter send text %s message.length=%s last=%s", 135 id, message.length(), isLast); 136 } 137 // TODO (optimization?): 138 // These sendXXX methods might be a good place to decide whether or not 139 // we can write straight ahead, possibly returning null instead of 140 // creating a CompletableFuture 141 142 // Even if the text is already CharBuffer, the client will not be happy 143 // if they discover the position is changing. So, no instanceof 144 // cheating, wrap always. 145 CharBuffer text = CharBuffer.wrap(message); 146 MinimalFuture<T> f = new MinimalFuture<>(); 147 try { 148 queue.addText(text, isLast, attachment, action, f); 149 sendScheduler.runOrSchedule(); 150 } catch (IOException e) { 151 action.accept(null, e); 152 f.completeExceptionally(e); 153 } 154 if (debug.on()) { 155 debug.log("exit send text %s returned %s", id, f); 156 } 157 return f; 158 } 159 160 @Override sendBinary(ByteBuffer message, boolean isLast, T attachment, BiConsumer<? super T, ? super Throwable> action)161 public <T> CompletableFuture<T> sendBinary(ByteBuffer message, 162 boolean isLast, 163 T attachment, 164 BiConsumer<? super T, ? super Throwable> action) { 165 long id = 0; 166 if (debug.on()) { 167 id = counter.incrementAndGet(); 168 debug.log("enter send binary %s message.remaining=%s last=%s", 169 id, message.remaining(), isLast); 170 } 171 MinimalFuture<T> f = new MinimalFuture<>(); 172 try { 173 queue.addBinary(message, isLast, attachment, action, f); 174 sendScheduler.runOrSchedule(); 175 } catch (IOException e) { 176 action.accept(null, e); 177 f.completeExceptionally(e); 178 } 179 if (debug.on()) { 180 debug.log("exit send binary %s returned %s", id, f); 181 } 182 return f; 183 } 184 185 @Override sendPing(ByteBuffer message, T attachment, BiConsumer<? super T, ? super Throwable> action)186 public <T> CompletableFuture<T> sendPing(ByteBuffer message, 187 T attachment, 188 BiConsumer<? super T, ? super Throwable> action) { 189 long id = 0; 190 if (debug.on()) { 191 id = counter.incrementAndGet(); 192 debug.log("enter send ping %s message.remaining=%s", 193 id, message.remaining()); 194 } 195 MinimalFuture<T> f = new MinimalFuture<>(); 196 try { 197 queue.addPing(message, attachment, action, f); 198 sendScheduler.runOrSchedule(); 199 } catch (IOException e) { 200 action.accept(null, e); 201 f.completeExceptionally(e); 202 } 203 if (debug.on()) { 204 debug.log("exit send ping %s returned %s", id, f); 205 } 206 return f; 207 } 208 209 @Override sendPong(ByteBuffer message, T attachment, BiConsumer<? super T, ? super Throwable> action)210 public <T> CompletableFuture<T> sendPong(ByteBuffer message, 211 T attachment, 212 BiConsumer<? super T, ? super Throwable> action) { 213 long id = 0; 214 if (debug.on()) { 215 id = counter.incrementAndGet(); 216 debug.log("enter send pong %s message.remaining=%s", 217 id, message.remaining()); 218 } 219 MinimalFuture<T> f = new MinimalFuture<>(); 220 try { 221 queue.addPong(message, attachment, action, f); 222 sendScheduler.runOrSchedule(); 223 } catch (IOException e) { 224 action.accept(null, e); 225 f.completeExceptionally(e); 226 } 227 if (debug.on()) { 228 debug.log("exit send pong %s returned %s", id, f); 229 } 230 return f; 231 } 232 233 @Override sendPong(Supplier<? extends ByteBuffer> message, T attachment, BiConsumer<? super T, ? super Throwable> action)234 public <T> CompletableFuture<T> sendPong(Supplier<? extends ByteBuffer> message, 235 T attachment, 236 BiConsumer<? super T, ? super Throwable> action) { 237 long id = 0; 238 if (debug.on()) { 239 id = counter.incrementAndGet(); 240 debug.log("enter send pong %s supplier=%s", 241 id, message); 242 } 243 MinimalFuture<T> f = new MinimalFuture<>(); 244 try { 245 queue.addPong(message, attachment, action, f); 246 sendScheduler.runOrSchedule(); 247 } catch (IOException e) { 248 action.accept(null, e); 249 f.completeExceptionally(e); 250 } 251 if (debug.on()) { 252 debug.log("exit send pong %s returned %s", id, f); 253 } 254 return f; 255 } 256 257 @Override sendClose(int statusCode, String reason, T attachment, BiConsumer<? super T, ? super Throwable> action)258 public <T> CompletableFuture<T> sendClose(int statusCode, 259 String reason, 260 T attachment, 261 BiConsumer<? super T, ? super Throwable> action) { 262 long id = 0; 263 if (debug.on()) { 264 id = counter.incrementAndGet(); 265 debug.log("enter send close %s statusCode=%s reason.length=%s", 266 id, statusCode, reason.length()); 267 } 268 MinimalFuture<T> f = new MinimalFuture<>(); 269 try { 270 queue.addClose(statusCode, CharBuffer.wrap(reason), attachment, action, f); 271 sendScheduler.runOrSchedule(); 272 } catch (IOException e) { 273 action.accept(null, e); 274 f.completeExceptionally(e); 275 } 276 if (debug.on()) { 277 debug.log("exit send close %s returned %s", id, f); 278 } 279 return f; 280 } 281 282 @Override request(long n)283 public void request(long n) { 284 if (debug.on()) { 285 debug.log("request %s", n); 286 } 287 if (demand.increase(n)) { 288 receiveScheduler.runOrSchedule(); 289 } 290 } 291 292 @Override acknowledgeReception()293 public void acknowledgeReception() { 294 boolean decremented = demand.tryDecrement(); 295 if (!decremented) { 296 throw new InternalError(); 297 } 298 } 299 300 @Override closeOutput()301 public void closeOutput() throws IOException { 302 if (debug.on()) { 303 debug.log("closeOutput"); 304 } 305 synchronized (closeLock) { 306 if (!outputClosed) { 307 outputClosed = true; 308 try { 309 channel.shutdownOutput(); 310 } finally { 311 if (inputClosed) { 312 channel.close(); 313 } 314 } 315 } 316 } 317 writeState.set(CLOSED); 318 sendScheduler.runOrSchedule(); 319 } 320 321 /* 322 * Permanently stops reading from the channel and delivering messages 323 * regardless of the current demand and data availability. 324 */ 325 @Override closeInput()326 public void closeInput() throws IOException { 327 if (debug.on()) { 328 debug.log("closeInput"); 329 } 330 synchronized (closeLock) { 331 if (!inputClosed) { 332 inputClosed = true; 333 try { 334 receiveScheduler.stop(); 335 channel.shutdownInput(); 336 } finally { 337 if (outputClosed) { 338 channel.close(); 339 } 340 } 341 } 342 } 343 } 344 345 /* Common states for send and receive tasks */ 346 enum ChannelState { 347 UNREGISTERED, 348 AVAILABLE, 349 WAITING, 350 CLOSED, 351 } 352 353 @SuppressWarnings({"rawtypes"}) 354 private class SendTask extends CompleteRestartableTask { 355 356 private final MessageQueue.QueueCallback<Boolean, IOException> 357 encodingCallback = new MessageQueue.QueueCallback<>() { 358 359 @Override 360 public <T> Boolean onText(CharBuffer message, 361 boolean isLast, 362 T attachment, 363 BiConsumer<? super T, ? super Throwable> action, 364 CompletableFuture<? super T> future) throws IOException 365 { 366 return encoder.encodeText(message, isLast, dst); 367 } 368 369 @Override 370 public <T> Boolean onBinary(ByteBuffer message, 371 boolean isLast, 372 T attachment, 373 BiConsumer<? super T, ? super Throwable> action, 374 CompletableFuture<? super T> future) throws IOException 375 { 376 return encoder.encodeBinary(message, isLast, dst); 377 } 378 379 @Override 380 public <T> Boolean onPing(ByteBuffer message, 381 T attachment, 382 BiConsumer<? super T, ? super Throwable> action, 383 CompletableFuture<? super T> future) throws IOException 384 { 385 return encoder.encodePing(message, dst); 386 } 387 388 @Override 389 public <T> Boolean onPong(ByteBuffer message, 390 T attachment, 391 BiConsumer<? super T, ? super Throwable> action, 392 CompletableFuture<? super T> future) throws IOException 393 { 394 return encoder.encodePong(message, dst); 395 } 396 397 @Override 398 public <T> Boolean onPong(Supplier<? extends ByteBuffer> message, 399 T attachment, 400 BiConsumer<? super T, ? super Throwable> action, 401 CompletableFuture<? super T> future) throws IOException { 402 return encoder.encodePong(message.get(), dst); 403 } 404 405 @Override 406 public <T> Boolean onClose(int statusCode, 407 CharBuffer reason, 408 T attachment, 409 BiConsumer<? super T, ? super Throwable> action, 410 CompletableFuture<? super T> future) throws IOException 411 { 412 return encoder.encodeClose(statusCode, reason, dst); 413 } 414 415 @Override 416 public Boolean onEmpty() { 417 return false; 418 } 419 }; 420 421 /* Whether the task sees the current head message for first time */ 422 private boolean firstPass = true; 423 /* Whether the message has been fully encoded */ 424 private boolean encoded; 425 426 // -- Current message completion communication fields -- 427 428 private Object attachment; 429 private BiConsumer action; 430 private CompletableFuture future; 431 private final MessageQueue.QueueCallback<Boolean, RuntimeException> 432 /* If there is a message, loads its completion communication fields */ 433 loadCallback = new MessageQueue.QueueCallback<Boolean, RuntimeException>() { 434 435 @Override 436 public <T> Boolean onText(CharBuffer message, 437 boolean isLast, 438 T attachment, 439 BiConsumer<? super T, ? super Throwable> action, 440 CompletableFuture<? super T> future) 441 { 442 SendTask.this.attachment = attachment; 443 SendTask.this.action = action; 444 SendTask.this.future = future; 445 return true; 446 } 447 448 @Override 449 public <T> Boolean onBinary(ByteBuffer message, 450 boolean isLast, 451 T attachment, 452 BiConsumer<? super T, ? super Throwable> action, 453 CompletableFuture<? super T> future) 454 { 455 SendTask.this.attachment = attachment; 456 SendTask.this.action = action; 457 SendTask.this.future = future; 458 return true; 459 } 460 461 @Override 462 public <T> Boolean onPing(ByteBuffer message, 463 T attachment, 464 BiConsumer<? super T, ? super Throwable> action, 465 CompletableFuture<? super T> future) 466 { 467 SendTask.this.attachment = attachment; 468 SendTask.this.action = action; 469 SendTask.this.future = future; 470 return true; 471 } 472 473 @Override 474 public <T> Boolean onPong(ByteBuffer message, 475 T attachment, 476 BiConsumer<? super T, ? super Throwable> action, 477 CompletableFuture<? super T> future) 478 { 479 SendTask.this.attachment = attachment; 480 SendTask.this.action = action; 481 SendTask.this.future = future; 482 return true; 483 } 484 485 @Override 486 public <T> Boolean onPong(Supplier<? extends ByteBuffer> message, 487 T attachment, 488 BiConsumer<? super T, ? super Throwable> action, 489 CompletableFuture<? super T> future) 490 { 491 SendTask.this.attachment = attachment; 492 SendTask.this.action = action; 493 SendTask.this.future = future; 494 return true; 495 } 496 497 @Override 498 public <T> Boolean onClose(int statusCode, 499 CharBuffer reason, 500 T attachment, 501 BiConsumer<? super T, ? super Throwable> action, 502 CompletableFuture<? super T> future) 503 { 504 SendTask.this.attachment = attachment; 505 SendTask.this.action = action; 506 SendTask.this.future = future; 507 return true; 508 } 509 510 @Override 511 public Boolean onEmpty() { 512 return false; 513 } 514 }; 515 516 @Override run()517 public void run() { 518 // Could have been only called in one of the following cases: 519 // (a) A message has been added to the queue 520 // (b) The channel is ready for writing 521 if (debug.on()) { 522 debug.log("enter send task"); 523 } 524 while (!queue.isEmpty()) { 525 try { 526 if (dst.hasRemaining()) { 527 if (debug.on()) { 528 debug.log("%s bytes remaining in buffer %s", 529 dst.remaining(), dst); 530 } 531 // The previous part of the binary representation of the 532 // message hasn't been fully written 533 if (!tryCompleteWrite()) { 534 break; 535 } 536 } else if (!encoded) { 537 if (firstPass) { 538 firstPass = false; 539 queue.peek(loadCallback); 540 if (debug.on()) { 541 debug.log("load message"); 542 } 543 } 544 dst.clear(); 545 encoded = queue.peek(encodingCallback); 546 dst.flip(); 547 if (!tryCompleteWrite()) { 548 break; 549 } 550 } else { 551 // All done, remove and complete 552 encoder.reset(); 553 removeAndComplete(null); 554 } 555 } catch (Throwable t) { 556 if (debug.on()) { 557 debug.log("send task exception %s", (Object) t); 558 } 559 // buffer cleanup: if there is an exception, the buffer 560 // should appear empty for the next write as there is 561 // nothing to write 562 dst.position(dst.limit()); 563 encoder.reset(); 564 removeAndComplete(t); 565 } 566 } 567 if (debug.on()) { 568 debug.log("exit send task"); 569 } 570 } 571 tryCompleteWrite()572 private boolean tryCompleteWrite() throws IOException { 573 if (debug.on()) { 574 debug.log("enter writing"); 575 } 576 boolean finished = false; 577 loop: 578 while (true) { 579 final ChannelState ws = writeState.get(); 580 if (debug.on()) { 581 debug.log("write state: %s", ws); 582 } 583 switch (ws) { 584 case WAITING: 585 break loop; 586 case UNREGISTERED: 587 if (debug.on()) { 588 debug.log("registering write event"); 589 } 590 channel.registerEvent(writeEvent); 591 writeState.compareAndSet(UNREGISTERED, WAITING); 592 if (debug.on()) { 593 debug.log("registered write event"); 594 } 595 break loop; 596 case AVAILABLE: 597 boolean written = write(); 598 if (written) { 599 if (debug.on()) { 600 debug.log("finished writing to the channel"); 601 } 602 finished = true; 603 break loop; // All done 604 } else { 605 writeState.compareAndSet(AVAILABLE, UNREGISTERED); 606 continue loop; // Effectively "goto UNREGISTERED" 607 } 608 case CLOSED: 609 throw new IOException("Output closed"); 610 default: 611 throw new InternalError(String.valueOf(ws)); 612 } 613 } 614 if (debug.on()) { 615 debug.log("exit writing"); 616 } 617 return finished; 618 } 619 620 @SuppressWarnings("unchecked") removeAndComplete(Throwable error)621 private void removeAndComplete(Throwable error) { 622 if (debug.on()) { 623 debug.log("removeAndComplete error=%s", (Object) error); 624 } 625 queue.remove(); 626 if (error != null) { 627 try { 628 action.accept(null, error); 629 } finally { 630 future.completeExceptionally(error); 631 } 632 } else { 633 try { 634 action.accept(attachment, null); 635 } finally { 636 future.complete(attachment); 637 } 638 } 639 encoded = false; 640 firstPass = true; 641 attachment = null; 642 action = null; 643 future = null; 644 } 645 } 646 647 private class ReceiveTask extends CompleteRestartableTask { 648 649 @Override run()650 public void run() { 651 if (debug.on()) { 652 debug.log("enter receive task"); 653 } 654 loop: 655 while (!receiveScheduler.isStopped()) { 656 ChannelState rs = readState; 657 if (data.hasRemaining()) { 658 if (debug.on()) { 659 debug.log("remaining bytes received %s", 660 data.remaining()); 661 } 662 if (!demand.isFulfilled()) { 663 try { 664 int oldPos = data.position(); 665 reader.readFrame(data, decoder); 666 int newPos = data.position(); 667 // Reader always consumes bytes: 668 assert oldPos != newPos : data; 669 } catch (Throwable e) { 670 receiveScheduler.stop(); 671 messageConsumer.onError(e); 672 } 673 if (!data.hasRemaining()) { 674 rs = readState = UNREGISTERED; 675 } 676 continue; 677 } 678 break loop; 679 } 680 if (debug.on()) { 681 debug.log("receive state: %s", rs); 682 } 683 switch (rs) { 684 case WAITING: 685 break loop; 686 case UNREGISTERED: 687 try { 688 rs = readState = WAITING; 689 channel.registerEvent(readEvent); 690 } catch (Throwable e) { 691 receiveScheduler.stop(); 692 messageConsumer.onError(e); 693 } 694 break loop; 695 case AVAILABLE: 696 try { 697 data = channel.read(); 698 } catch (Throwable e) { 699 receiveScheduler.stop(); 700 messageConsumer.onError(e); 701 break loop; 702 } 703 if (data == null) { // EOF 704 receiveScheduler.stop(); 705 messageConsumer.onComplete(); 706 break loop; 707 } else if (!data.hasRemaining()) { 708 // No data at the moment. Pretty much a "goto", 709 // reusing the existing code path for registration 710 rs = readState = UNREGISTERED; 711 } 712 continue loop; 713 default: 714 throw new InternalError(String.valueOf(rs)); 715 } 716 } 717 if (debug.on()) { 718 debug.log("exit receive task"); 719 } 720 } 721 } 722 723 private class WriteEvent implements RawChannel.RawEvent { 724 725 @Override interestOps()726 public int interestOps() { 727 return SelectionKey.OP_WRITE; 728 } 729 730 @Override handle()731 public void handle() { 732 if (debug.on()) { 733 debug.log("write event"); 734 } 735 ChannelState s; 736 do { 737 s = writeState.get(); 738 if (s == CLOSED) { 739 if (debug.on()) { 740 debug.log("write state %s", s); 741 } 742 break; 743 } 744 } while (!writeState.compareAndSet(s, AVAILABLE)); 745 sendScheduler.runOrSchedule(); 746 } 747 } 748 749 private class ReadEvent implements RawChannel.RawEvent { 750 751 @Override interestOps()752 public int interestOps() { 753 return SelectionKey.OP_READ; 754 } 755 756 @Override handle()757 public void handle() { 758 if (debug.on()) { 759 debug.log("read event"); 760 } 761 readState = AVAILABLE; 762 receiveScheduler.runOrSchedule(); 763 } 764 } 765 } 766