1 /* 2 * Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 26 package jdk.internal.net.http.common; 27 28 import jdk.internal.net.http.common.SubscriberWrapper.SchedulingAction; 29 30 import javax.net.ssl.SSLEngine; 31 import javax.net.ssl.SSLEngineResult; 32 import javax.net.ssl.SSLEngineResult.HandshakeStatus; 33 import javax.net.ssl.SSLEngineResult.Status; 34 import javax.net.ssl.SSLException; 35 import javax.net.ssl.SSLHandshakeException; 36 import java.io.IOException; 37 import java.lang.ref.Reference; 38 import java.lang.ref.ReferenceQueue; 39 import java.lang.ref.WeakReference; 40 import java.nio.ByteBuffer; 41 import java.util.ArrayList; 42 import java.util.Collections; 43 import java.util.Iterator; 44 import java.util.LinkedList; 45 import java.util.List; 46 import java.util.concurrent.CompletableFuture; 47 import java.util.concurrent.ConcurrentLinkedQueue; 48 import java.util.concurrent.Executor; 49 import java.util.concurrent.Flow; 50 import java.util.concurrent.Flow.Subscriber; 51 import java.util.concurrent.atomic.AtomicInteger; 52 import java.util.function.Consumer; 53 import java.util.function.IntBinaryOperator; 54 55 /** 56 * Implements SSL using two SubscriberWrappers. 57 * 58 * <p> Constructor takes two Flow.Subscribers: one that receives the network 59 * data (after it has been encrypted by SSLFlowDelegate) data, and one that 60 * receives the application data (before it has been encrypted by SSLFlowDelegate). 61 * 62 * <p> Methods upstreamReader() and upstreamWriter() return the corresponding 63 * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data. 64 * See diagram below. 
65 * 66 * <p> How Flow.Subscribers are used in this class, and where they come from: 67 * <pre> 68 * {@code 69 * 70 * 71 * 72 * ---------> data flow direction 73 * 74 * 75 * +------------------+ 76 * upstreamWriter | | downWriter 77 * ---------------> | | ------------> 78 * obtained from this | | supplied to constructor 79 * | SSLFlowDelegate | 80 * downReader | | upstreamReader 81 * <--------------- | | <-------------- 82 * supplied to constructor | | obtained from this 83 * +------------------+ 84 * 85 * Errors are reported to the downReader Flow.Subscriber 86 * 87 * } 88 * </pre> 89 */ 90 public class SSLFlowDelegate { 91 92 final Logger debug = 93 Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 94 95 private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; 96 private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0); 97 // When handshake is in progress trying to wrap may produce no bytes. 98 private static final ByteBuffer NOTHING = ByteBuffer.allocate(0); 99 private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate"); 100 private static final boolean isMonitored = 101 monProp != null && (monProp.isEmpty() || monProp.equalsIgnoreCase("true")); 102 103 final Executor exec; 104 final Reader reader; 105 final Writer writer; 106 final SSLEngine engine; 107 final String tubeName; // hack 108 final CompletableFuture<String> alpnCF; // completes on initial handshake 109 final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped 110 volatile boolean close_notify_received; 111 final CompletableFuture<Void> readerCF; 112 final CompletableFuture<Void> writerCF; 113 final CompletableFuture<Void> stopCF; 114 final Consumer<ByteBuffer> recycler; 115 static AtomicInteger scount = new AtomicInteger(1); 116 final int id; 117 118 /** 119 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. 
Each 120 * Flow.Subscriber requires an associated {@link CompletableFuture} 121 * for errors that need to be signaled from downstream to upstream. 122 */ SSLFlowDelegate(SSLEngine engine, Executor exec, Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)123 public SSLFlowDelegate(SSLEngine engine, 124 Executor exec, 125 Subscriber<? super List<ByteBuffer>> downReader, 126 Subscriber<? super List<ByteBuffer>> downWriter) 127 { 128 this(engine, exec, null, downReader, downWriter); 129 } 130 131 /** 132 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each 133 * Flow.Subscriber requires an associated {@link CompletableFuture} 134 * for errors that need to be signaled from downstream to upstream. 135 */ SSLFlowDelegate(SSLEngine engine, Executor exec, Consumer<ByteBuffer> recycler, Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)136 public SSLFlowDelegate(SSLEngine engine, 137 Executor exec, 138 Consumer<ByteBuffer> recycler, 139 Subscriber<? super List<ByteBuffer>> downReader, 140 Subscriber<? super List<ByteBuffer>> downWriter) 141 { 142 this.id = scount.getAndIncrement(); 143 this.tubeName = String.valueOf(downWriter); 144 this.recycler = recycler; 145 this.reader = new Reader(); 146 this.writer = new Writer(); 147 this.engine = engine; 148 this.exec = exec; 149 this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); 150 this.readerCF = reader.completion(); 151 this.writerCF = reader.completion(); 152 readerCF.exceptionally(this::stopOnError); 153 writerCF.exceptionally(this::stopOnError); 154 this.stopCF = CompletableFuture.allOf(reader.completion(), writer.completion()) 155 .thenRun(this::normalStop); 156 this.alpnCF = new MinimalFuture<>(); 157 158 // connect the Reader to the downReader and the 159 // Writer to the downWriter. 
160 connect(downReader, downWriter); 161 162 if (isMonitored) Monitor.add(monitor); 163 } 164 165 /** 166 * Returns true if the SSLFlowDelegate has detected a TLS 167 * close_notify from the server. 168 * @return true, if a close_notify was detected. 169 */ closeNotifyReceived()170 public boolean closeNotifyReceived() { 171 return close_notify_received; 172 } 173 174 /** 175 * Connects the read sink (downReader) to the SSLFlowDelegate Reader, 176 * and the write sink (downWriter) to the SSLFlowDelegate Writer. 177 * Called from within the constructor. Overwritten by SSLTube. 178 * 179 * @param downReader The left hand side read sink (typically, the 180 * HttpConnection read subscriber). 181 * @param downWriter The right hand side write sink (typically 182 * the SocketTube write subscriber). 183 */ connect(Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)184 void connect(Subscriber<? super List<ByteBuffer>> downReader, 185 Subscriber<? super List<ByteBuffer>> downWriter) { 186 this.reader.subscribe(downReader); 187 this.writer.subscribe(downWriter); 188 } 189 190 /** 191 * Returns a CompletableFuture<String> which completes after 192 * the initial handshake completes, and which contains the negotiated 193 * alpn. 194 */ alpn()195 public CompletableFuture<String> alpn() { 196 return alpnCF; 197 } 198 setALPN()199 private void setALPN() { 200 // Handshake is finished. 
So, can retrieve the ALPN now 201 if (alpnCF.isDone()) 202 return; 203 String alpn = engine.getApplicationProtocol(); 204 if (debug.on()) debug.log("setALPN = %s", alpn); 205 alpnCF.complete(alpn); 206 } 207 monitor()208 public String monitor() { 209 StringBuilder sb = new StringBuilder(); 210 sb.append("SSL: id ").append(id); 211 sb.append(" ").append(dbgString()); 212 sb.append(" HS state: " + states(handshakeState)); 213 sb.append(" Engine state: " + engine.getHandshakeStatus().toString()); 214 if (stateList != null) { 215 sb.append(" LL : "); 216 for (String s : stateList) { 217 sb.append(s).append(" "); 218 } 219 } 220 sb.append("\r\n"); 221 sb.append("Reader:: ").append(reader.toString()); 222 sb.append("\r\n"); 223 sb.append("Writer:: ").append(writer.toString()); 224 sb.append("\r\n==================================="); 225 return sb.toString(); 226 } 227 enterReadScheduling()228 protected SchedulingAction enterReadScheduling() { 229 return SchedulingAction.CONTINUE; 230 } 231 checkForHandshake(Throwable t)232 protected Throwable checkForHandshake(Throwable t) { 233 return t; 234 } 235 236 237 /** 238 * Processing function for incoming data. Pass it thru SSLEngine.unwrap(). 239 * Any decrypted buffers returned to be passed downstream. 240 * Status codes: 241 * NEED_UNWRAP: do nothing. Following incoming data will contain 242 * any required handshake data 243 * NEED_WRAP: call writer.addData() with empty buffer 244 * NEED_TASK: delegate task to executor 245 * BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap 246 * BUFFER_UNDERFLOW: keep buffer and wait for more data 247 * OK: return generated buffers. 248 * 249 * Upstream subscription strategy is to try and keep no more than 250 * TARGET_BUFSIZE bytes in readBuf 251 */ 252 final class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber { 253 // Maximum record size is 16k. 
254 // Because SocketTube can feeds us up to 3 16K buffers, 255 // then setting this size to 16K means that the readBuf 256 // can store up to 64K-1 (16K-1 + 3*16K) 257 static final int TARGET_BUFSIZE = 16 * 1024; 258 259 final SequentialScheduler scheduler; 260 volatile ByteBuffer readBuf; 261 volatile boolean completing; 262 final Object readBufferLock = new Object(); 263 final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 264 265 private final class ReaderDownstreamPusher implements Runnable { 266 @Override run()267 public void run() { 268 processData(); 269 } 270 } 271 Reader()272 Reader() { 273 super(); 274 scheduler = SequentialScheduler.synchronizedScheduler( 275 new ReaderDownstreamPusher()); 276 this.readBuf = ByteBuffer.allocate(1024); 277 readBuf.limit(0); // keep in read mode 278 } 279 280 @Override supportsRecycling()281 public boolean supportsRecycling() { 282 return recycler != null; 283 } 284 enterScheduling()285 protected SchedulingAction enterScheduling() { 286 return enterReadScheduling(); 287 } 288 dbgString()289 public final String dbgString() { 290 return "SSL Reader(" + tubeName + ")"; 291 } 292 293 /** 294 * entry point for buffers delivered from upstream Subscriber 295 */ 296 @Override incoming(List<ByteBuffer> buffers, boolean complete)297 public void incoming(List<ByteBuffer> buffers, boolean complete) { 298 if (debugr.on()) 299 debugr.log("Adding %d bytes to read buffer", 300 Utils.remaining(buffers)); 301 addToReadBuf(buffers, complete); 302 scheduler.runOrSchedule(exec); 303 } 304 305 @Override toString()306 public String toString() { 307 return "READER: " + super.toString() + ", readBuf: " + readBuf.toString() 308 + ", count: " + count.toString() + ", scheduler: " 309 + (scheduler.isStopped() ? 
"stopped" : "running") 310 + ", status: " + lastUnwrapStatus 311 + ", handshakeState: " + handshakeState.get() 312 + ", engine: " + engine.getHandshakeStatus(); 313 } 314 reallocReadBuf()315 private void reallocReadBuf() { 316 int sz = readBuf.capacity(); 317 ByteBuffer newb = ByteBuffer.allocate(sz * 2); 318 readBuf.flip(); 319 Utils.copy(readBuf, newb); 320 readBuf = newb; 321 } 322 323 @Override upstreamWindowUpdate(long currentWindow, long downstreamQsize)324 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { 325 if (needsMoreData()) { 326 // run the scheduler to see if more data should be requested 327 if (debugr.on()) { 328 int remaining = readBuf.remaining(); 329 if (remaining > TARGET_BUFSIZE) { 330 // just some logging to check how much we have in the read buffer 331 debugr.log("readBuf has more than TARGET_BUFSIZE: %d", 332 remaining); 333 } 334 } 335 scheduler.runOrSchedule(); 336 } 337 return 0; // we will request more from the scheduler loop (processData). 338 } 339 340 // readBuf is kept ready for reading outside of this method addToReadBuf(List<ByteBuffer> buffers, boolean complete)341 private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) { 342 assert Utils.remaining(buffers) > 0 || buffers.isEmpty(); 343 synchronized (readBufferLock) { 344 for (ByteBuffer buf : buffers) { 345 readBuf.compact(); 346 while (readBuf.remaining() < buf.remaining()) 347 reallocReadBuf(); 348 readBuf.put(buf); 349 readBuf.flip(); 350 // should be safe to call inside lock 351 // since the only implementation 352 // offers the buffer to an unbounded queue. 353 // WARNING: do not touch buf after this point! 
354 if (recycler != null) recycler.accept(buf); 355 } 356 if (complete) { 357 this.completing = complete; 358 minBytesRequired = 0; 359 } 360 } 361 } 362 363 @Override errorCommon(Throwable throwable)364 protected boolean errorCommon(Throwable throwable) { 365 throwable = SSLFlowDelegate.this.checkForHandshake(throwable); 366 return super.errorCommon(throwable); 367 } 368 schedule()369 void schedule() { 370 scheduler.runOrSchedule(exec); 371 } 372 stop()373 void stop() { 374 if (debugr.on()) debugr.log("stop"); 375 scheduler.stop(); 376 } 377 378 AtomicInteger count = new AtomicInteger(); 379 380 // minimum number of bytes required to call unwrap. 381 // Usually this is 0, unless there was a buffer underflow. 382 // In this case we need to wait for more bytes than what 383 // we had before calling unwrap() again. 384 volatile int minBytesRequired; 385 386 // We might need to request more data if: 387 // - we have a subscription from upstream 388 // - and we don't have enough data to decrypt in the read buffer 389 // - *and* - either we're handshaking, and more data is required (NEED_UNWRAP), 390 // - or we have demand from downstream, but we have nothing decrypted 391 // to forward downstream. 
needsMoreData()392 boolean needsMoreData() { 393 if (upstreamSubscription != null && readBuf.remaining() <= minBytesRequired && 394 (engine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP 395 || !downstreamSubscription.demand.isFulfilled() && hasNoOutputData())) { 396 return true; 397 } 398 return false; 399 } 400 401 // If the readBuf has not enough data, and we either need to 402 // unwrap (handshaking) or we have demand from downstream, 403 // then request more data requestMoreDataIfNeeded()404 void requestMoreDataIfNeeded() { 405 if (needsMoreData()) { 406 // request more will only request more if our 407 // demand from upstream is fulfilled 408 requestMore(); 409 } 410 } 411 412 // work function where it all happens processData()413 final void processData() { 414 try { 415 if (debugr.on()) 416 debugr.log("processData:" 417 + " readBuf remaining:" + readBuf.remaining() 418 + ", state:" + states(handshakeState) 419 + ", engine handshake status:" + engine.getHandshakeStatus()); 420 int len; 421 boolean complete = false; 422 while (readBuf.remaining() > (len = minBytesRequired)) { 423 boolean handshaking = false; 424 try { 425 EngineResult result; 426 synchronized (readBufferLock) { 427 complete = this.completing; 428 if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining()); 429 // Unless there is a BUFFER_UNDERFLOW, we should try to 430 // unwrap any number of bytes. Set minBytesRequired to 0: 431 // we only need to do that if minBytesRequired is not already 0. 432 len = len > 0 ? 
minBytesRequired = 0 : len; 433 result = unwrapBuffer(readBuf); 434 len = readBuf.remaining(); 435 if (debugr.on()) { 436 debugr.log("Unwrapped: result: %s", result.result); 437 debugr.log("Unwrapped: consumed: %s", result.bytesConsumed()); 438 } 439 } 440 if (result.bytesProduced() > 0) { 441 if (debugr.on()) 442 debugr.log("sending %d", result.bytesProduced()); 443 count.addAndGet(result.bytesProduced()); 444 outgoing(result.destBuffer, false); 445 } 446 if (result.status() == Status.BUFFER_UNDERFLOW) { 447 if (debugr.on()) debugr.log("BUFFER_UNDERFLOW"); 448 // not enough data in the read buffer... 449 // no need to try to unwrap again unless we get more bytes 450 // than minBytesRequired = len in the read buffer. 451 synchronized (readBufferLock) { 452 minBytesRequired = len; 453 // more bytes could already have been added... 454 assert readBuf.remaining() >= len; 455 // check if we have received some data, and if so 456 // we can just re-spin the loop 457 if (readBuf.remaining() > len) continue; 458 else if (this.completing) { 459 if (debug.on()) { 460 debugr.log("BUFFER_UNDERFLOW with EOF," + 461 " %d bytes non decrypted.", len); 462 } 463 // The channel won't send us any more data, and 464 // we are in underflow: we need to fail. 465 throw new IOException("BUFFER_UNDERFLOW with EOF, " 466 + len + " bytes non decrypted."); 467 } 468 } 469 // request more data and return. 
470 requestMore(); 471 return; 472 } 473 if (complete && result.status() == Status.CLOSED) { 474 if (debugr.on()) debugr.log("Closed: completing"); 475 outgoing(Utils.EMPTY_BB_LIST, true); 476 // complete ALPN if not yet completed 477 setALPN(); 478 requestMoreDataIfNeeded(); 479 return; 480 } 481 if (result.handshaking()) { 482 handshaking = true; 483 if (debugr.on()) debugr.log("handshaking"); 484 if (doHandshake(result, READER)) continue; // need unwrap 485 else break; // doHandshake will have triggered the write scheduler if necessary 486 } else { 487 if (trySetALPN()) { 488 resumeActivity(); 489 } 490 } 491 } catch (IOException ex) { 492 Throwable cause = checkForHandshake(ex); 493 errorCommon(cause); 494 handleError(cause); 495 return; 496 } 497 if (handshaking && !complete) { 498 requestMoreDataIfNeeded(); 499 return; 500 } 501 } 502 if (!complete) { 503 synchronized (readBufferLock) { 504 complete = this.completing && !readBuf.hasRemaining(); 505 } 506 } 507 if (complete) { 508 if (debugr.on()) debugr.log("completing"); 509 // Complete the alpnCF, if not already complete, regardless of 510 // whether or not the ALPN is available, there will be no more 511 // activity. 512 setALPN(); 513 outgoing(Utils.EMPTY_BB_LIST, true); 514 } else { 515 requestMoreDataIfNeeded(); 516 } 517 } catch (Throwable ex) { 518 ex = checkForHandshake(ex); 519 errorCommon(ex); 520 handleError(ex); 521 } 522 } 523 524 private volatile Status lastUnwrapStatus; unwrapBuffer(ByteBuffer src)525 EngineResult unwrapBuffer(ByteBuffer src) throws IOException { 526 ByteBuffer dst = getAppBuffer(); 527 int len = src.remaining(); 528 while (true) { 529 SSLEngineResult sslResult = engine.unwrap(src, dst); 530 switch (lastUnwrapStatus = sslResult.getStatus()) { 531 case BUFFER_OVERFLOW: 532 // may happen if app size buffer was changed, or if 533 // our 'adaptiveBufferSize' guess was too small for 534 // the current payload. 
In that case, update the 535 // value of applicationBufferSize, and allocate a 536 // buffer of that size, which we are sure will be 537 // big enough to decode whatever needs to be 538 // decoded. We will later update adaptiveBufferSize 539 // in OK: below. 540 int appSize = applicationBufferSize = 541 engine.getSession().getApplicationBufferSize(); 542 ByteBuffer b = ByteBuffer.allocate(appSize + dst.position()); 543 dst.flip(); 544 b.put(dst); 545 dst = b; 546 break; 547 case CLOSED: 548 assert dst.position() == 0; 549 return doClosure(new EngineResult(sslResult)); 550 case BUFFER_UNDERFLOW: 551 // handled implicitly by compaction/reallocation of readBuf 552 assert dst.position() == 0; 553 return new EngineResult(sslResult); 554 case OK: 555 int size = dst.position(); 556 if (debug.on()) { 557 debugr.log("Decoded " + size + " bytes out of " + len 558 + " into buffer of " + dst.capacity() 559 + " remaining to decode: " + src.remaining()); 560 } 561 // if the record payload was bigger than what was originally 562 // allocated, then sets the adaptiveAppBufferSize to size 563 // and we will use that new size as a guess for the next app 564 // buffer. 565 if (size > adaptiveAppBufferSize) { 566 adaptiveAppBufferSize = ((size + 7) >>> 3) << 3; 567 } 568 dst.flip(); 569 return new EngineResult(sslResult, dst); 570 } 571 } 572 } 573 } 574 575 public interface Monitorable { getInfo()576 public String getInfo(); 577 } 578 579 public static class Monitor extends Thread { 580 final List<WeakReference<Monitorable>> list; 581 final List<FinalMonitorable> finalList; 582 final ReferenceQueue<Monitorable> queue = new ReferenceQueue<>(); 583 static Monitor themon; 584 585 static { 586 themon = new Monitor(); themon.start()587 themon.start(); // uncomment to enable Monitor 588 } 589 590 // An instance used to temporarily store the 591 // last observable state of a monitorable object. 
592 // When Monitor.remove(o) is called, we replace 593 // 'o' with a FinalMonitorable whose reference 594 // will be enqueued after the last observable state 595 // has been printed. 596 final class FinalMonitorable implements Monitorable { 597 final String finalState; FinalMonitorable(Monitorable o)598 FinalMonitorable(Monitorable o) { 599 finalState = o.getInfo(); 600 finalList.add(this); 601 } 602 @Override getInfo()603 public String getInfo() { 604 finalList.remove(this); 605 return finalState; 606 } 607 } 608 Monitor()609 Monitor() { 610 super("Monitor"); 611 setDaemon(true); 612 list = Collections.synchronizedList(new LinkedList<>()); 613 finalList = new ArrayList<>(); // access is synchronized on list above 614 } 615 addTarget(Monitorable o)616 void addTarget(Monitorable o) { 617 list.add(new WeakReference<>(o, queue)); 618 } removeTarget(Monitorable o)619 void removeTarget(Monitorable o) { 620 // It can take a long time for GC to clean up references. 621 // Calling Monitor.remove() early helps removing noise from the 622 // logs/ 623 synchronized (list) { 624 Iterator<WeakReference<Monitorable>> it = list.iterator(); 625 while (it.hasNext()) { 626 Monitorable m = it.next().get(); 627 if (m == null) it.remove(); 628 if (o == m) { 629 it.remove(); 630 break; 631 } 632 } 633 FinalMonitorable m = new FinalMonitorable(o); 634 addTarget(m); 635 Reference.reachabilityFence(m); 636 } 637 } 638 add(Monitorable o)639 public static void add(Monitorable o) { 640 themon.addTarget(o); 641 } remove(Monitorable o)642 public static void remove(Monitorable o) { 643 themon.removeTarget(o); 644 } 645 646 @Override run()647 public void run() { 648 System.out.println("Monitor starting"); 649 try { 650 while (true) { 651 Thread.sleep(20 * 1000); 652 synchronized (list) { 653 Reference<? 
extends Monitorable> expired; 654 while ((expired = queue.poll()) != null) list.remove(expired); 655 for (WeakReference<Monitorable> ref : list) { 656 Monitorable o = ref.get(); 657 if (o == null) continue; 658 if (o instanceof FinalMonitorable) { 659 ref.enqueue(); 660 } 661 System.out.println(o.getInfo()); 662 System.out.println("-------------------------"); 663 } 664 } 665 System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-"); 666 } 667 } catch (InterruptedException e) { 668 System.out.println("Monitor exiting with " + e); 669 } 670 } 671 } 672 673 /** 674 * Processing function for outgoing data. Pass it thru SSLEngine.wrap() 675 * Any encrypted buffers generated are passed downstream to be written. 676 * Status codes: 677 * NEED_UNWRAP: call reader.addData() with empty buffer 678 * NEED_WRAP: call addData() with empty buffer 679 * NEED_TASK: delegate task to executor 680 * BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap 681 * BUFFER_UNDERFLOW: shouldn't happen on writing side 682 * OK: return generated buffers 683 */ 684 class Writer extends SubscriberWrapper { 685 final SequentialScheduler scheduler; 686 // queues of buffers received from upstream waiting 687 // to be processed by the SSLEngine 688 final List<ByteBuffer> writeList; 689 final Logger debugw = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 690 volatile boolean completing; 691 boolean completed; // only accessed in processData 692 693 class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask { run()694 @Override public void run() { processData(); } 695 } 696 Writer()697 Writer() { 698 super(); 699 writeList = Collections.synchronizedList(new LinkedList<>()); 700 scheduler = new SequentialScheduler(new WriterDownstreamPusher()); 701 } 702 703 @Override incoming(List<ByteBuffer> buffers, boolean complete)704 protected void incoming(List<ByteBuffer> buffers, boolean complete) { 705 assert complete ? 
buffers == Utils.EMPTY_BB_LIST : true; 706 assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true; 707 if (complete) { 708 if (debugw.on()) debugw.log("adding SENTINEL"); 709 completing = true; 710 writeList.add(SENTINEL); 711 } else { 712 writeList.addAll(buffers); 713 } 714 if (debugw.on()) 715 debugw.log("added " + buffers.size() 716 + " (" + Utils.remaining(buffers) 717 + " bytes) to the writeList"); 718 scheduler.runOrSchedule(); 719 } 720 dbgString()721 public final String dbgString() { 722 return "SSL Writer(" + tubeName + ")"; 723 } 724 onSubscribe()725 protected void onSubscribe() { 726 if (debugw.on()) debugw.log("onSubscribe initiating handshaking"); 727 addData(HS_TRIGGER); // initiates handshaking 728 } 729 schedule()730 void schedule() { 731 scheduler.runOrSchedule(); 732 } 733 stop()734 void stop() { 735 if (debugw.on()) debugw.log("stop"); 736 scheduler.stop(); 737 } 738 739 @Override closing()740 public boolean closing() { 741 return closeNotifyReceived(); 742 } 743 isCompleting()744 private boolean isCompleting() { 745 return completing; 746 } 747 748 @Override upstreamWindowUpdate(long currentWindow, long downstreamQsize)749 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { 750 if (writeList.size() > 10) 751 return 0; 752 else 753 return super.upstreamWindowUpdate(currentWindow, downstreamQsize); 754 } 755 hsTriggered()756 private boolean hsTriggered() { 757 synchronized(writeList) { 758 for (ByteBuffer b : writeList) 759 if (b == HS_TRIGGER) 760 return true; 761 return false; 762 } 763 } 764 triggerWrite()765 void triggerWrite() { 766 synchronized (writeList) { 767 if (writeList.isEmpty()) { 768 writeList.add(HS_TRIGGER); 769 } 770 } 771 scheduler.runOrSchedule(); 772 } 773 processData()774 private void processData() { 775 boolean completing = isCompleting(); 776 777 try { 778 if (debugw.on()) 779 debugw.log("processData, writeList remaining:" 780 + Utils.remaining(writeList) + ", hsTriggered:" 781 + 
hsTriggered() + ", needWrap:" + needWrap()); 782 783 while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) { 784 ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY); 785 EngineResult result = wrapBuffers(outbufs); 786 if (debugw.on()) 787 debugw.log("wrapBuffer returned %s", result.result); 788 789 if (result.status() == Status.CLOSED) { 790 if (!upstreamCompleted) { 791 upstreamCompleted = true; 792 upstreamSubscription.cancel(); 793 // complete ALPN if not yet completed 794 setALPN(); 795 } 796 if (result.bytesProduced() <= 0) 797 return; 798 799 if (!completing && !completed) { 800 completing = this.completing = true; 801 // There could still be some outgoing data in outbufs. 802 writeList.add(SENTINEL); 803 } 804 } 805 806 boolean handshaking = false; 807 if (result.handshaking()) { 808 if (debugw.on()) debugw.log("handshaking"); 809 doHandshake(result, WRITER); // ok to ignore return 810 handshaking = true; 811 } else { 812 if (trySetALPN()) { 813 resumeActivity(); 814 } 815 } 816 cleanList(writeList); // tidy up the source list 817 sendResultBytes(result); 818 if (handshaking) { 819 if (!completing && needWrap()) { 820 continue; 821 } else { 822 return; 823 } 824 } 825 } 826 if (completing && Utils.remaining(writeList) == 0) { 827 if (!completed) { 828 completed = true; 829 writeList.clear(); 830 outgoing(Utils.EMPTY_BB_LIST, true); 831 } 832 return; 833 } 834 if (writeList.isEmpty() && needWrap()) { 835 writer.addData(HS_TRIGGER); 836 } 837 } catch (Throwable ex) { 838 ex = checkForHandshake(ex); 839 errorCommon(ex); 840 handleError(ex); 841 } 842 } 843 844 // The SSLEngine insists on being given a buffer that is at least 845 // SSLSession.getPacketBufferSize() long (usually 16K). If given 846 // a smaller buffer it will go in BUFFER_OVERFLOW, even if it only 847 // has 6 bytes to wrap. Typical usage shows that for GET we 848 // usually produce an average of ~ 100 bytes. 
// To avoid wasting space, and because allocating and zeroing
        // 16K buffers for encoding 6 bytes is costly, we are reusing the
        // same writeBuffer to interact with SSLEngine.wrap().
        // If the SSLEngine produces less than writeBuffer.capacity() / 2,
        // then we copy off the bytes to a smaller buffer that we send
        // downstream. Otherwise, we send the writeBuffer downstream
        // and will allocate a new one next time.
        volatile ByteBuffer writeBuffer;
        // Status of the most recent engine.wrap() call; reported by toString().
        private volatile Status lastWrappedStatus;

        /**
         * Wraps (encrypts) the given source buffers into the shared
         * {@code writeBuffer}, allocating it on first use.
         *
         * <p> The call loops until the engine reports something other than
         * {@code BUFFER_OVERFLOW}:
         * <ul>
         *   <li>{@code BUFFER_OVERFLOW}: the packet size is re-read from the
         *       session, a larger buffer is allocated, bytes already produced
         *       are carried over, and wrap is retried;</li>
         *   <li>{@code OK} / {@code CLOSED}: the produced bytes are returned,
         *       either as a right-sized copy (less than half the buffer used)
         *       or as the write buffer itself (which is then given up);</li>
         *   <li>{@code BUFFER_UNDERFLOW}: a result without a destination
         *       buffer is returned.</li>
         * </ul>
         *
         * @param src the plaintext buffers to wrap
         * @return the engine result, together with the produced bytes if any
         * @throws SSLException if {@code SSLEngine.wrap} fails
         */
        @SuppressWarnings("fallthrough")
        EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
            long len = Utils.remaining(src);
            if (debugw.on())
                debugw.log("wrapping " + len + " bytes");

            ByteBuffer dst = writeBuffer;
            if (dst == null) dst = writeBuffer = getNetBuffer();
            assert dst.position() == 0 : "buffer position is " + dst.position();
            assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity();

            while (true) {
                SSLEngineResult sslResult = engine.wrap(src, dst);
                if (debugw.on()) debugw.log("SSLResult: " + sslResult);
                switch (lastWrappedStatus = sslResult.getStatus()) {
                    case BUFFER_OVERFLOW:
                        // Shouldn't happen: the buffer was allocated with the
                        // packet size. Fetch the size again in case the net
                        // buffer size was changed.
                        if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
                        int netSize = packetBufferSize
                                = engine.getSession().getPacketBufferSize();
                        // Leave room for what has already been produced.
                        ByteBuffer b = writeBuffer = ByteBuffer.allocate(netSize + dst.position());
                        dst.flip();
                        b.put(dst);
                        dst = b;
                        break; // try again
                    case CLOSED:
                        if (debugw.on()) debugw.log("CLOSED");
                        // fallthrough. There could be some remaining data in dst.
                        // CLOSED will be handled by the caller.
                    case OK:
                        final ByteBuffer dest;
                        if (dst.position() == 0) {
                            dest = NOTHING; // can happen if handshake is in progress
                        } else if (dst.position() < dst.capacity() / 2) {
                            // less than half the buffer was used.
                            // copy off the bytes to a smaller buffer, and keep
                            // the writeBuffer for next time.
                            dst.flip();
                            dest = Utils.copyAligned(dst);
                            dst.clear();
                        } else {
                            // more than half the buffer was used.
                            // just send that buffer downstream, and we will
                            // get a new writeBuffer next time it is needed.
                            dst.flip();
                            dest = dst;
                            writeBuffer = null;
                        }
                        if (debugw.on())
                            debugw.log("OK => produced: %d bytes into %d, not wrapped: %d",
                                       dest.remaining(), dest.capacity(), Utils.remaining(src));
                        return new EngineResult(sslResult, dest);
                    case BUFFER_UNDERFLOW:
                        // Shouldn't happen: wrap() is not expected to
                        // underflow. If it does, the result is handed back
                        // and handled externally.
                        if (debugw.on()) debugw.log("BUFFER_UNDERFLOW");
                        return new EngineResult(sslResult);
                    default:
                        // SSLEngineResult.Status has no other constant than
                        // the four handled above.
                        if (debugw.on())
                            debugw.log("result: %s", sslResult.getStatus());
                        assert false : "result:" + sslResult.getStatus();
                }
            }
        }

        /** Tells whether the engine currently requires a wrap to make progress. */
        private boolean needWrap() {
            return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP;
        }

        /** Pushes the bytes produced by a wrap downstream, if there are any. */
        private void sendResultBytes(EngineResult result) {
            if (result.bytesProduced() > 0) {
                if (debugw.on())
                    debugw.log("Sending %d bytes downstream",
                               result.bytesProduced());
                outgoing(result.destBuffer, false);
            }
        }

        @Override
        public String toString() {
            return "WRITER: " + super.toString()
                    + ", writeList size: " + writeList.size()
                    + ", scheduler: " + (scheduler.isStopped() ? "stopped" : "running")
                    + ", status: " + lastWrappedStatus;
        }
    }

    /**
     * Fails the reader, writer and ALPN futures with the given error and
     * stops both the reader and the writer.
     */
    private void handleError(Throwable t) {
        if (debug.on()) debug.log("handleError", t);
        readerCF.completeExceptionally(t);
        writerCF.completeExceptionally(t);
        // no-op if already completed
        alpnCF.completeExceptionally(t);
        reader.stop();
        writer.stop();
    }

    // Set by normalStop() under this object's monitor.
    boolean stopped;

    /**
     * Stops the reader and the writer. Only the first call has any effect;
     * later calls return immediately.
     */
    private synchronized void normalStop() {
        if (stopped)
            return;
        stopped = true;
        reader.stop();
        writer.stop();
        // make sure the alpnCF is completed.
        if (!alpnCF.isDone()) {
            Throwable alpn = new SSLHandshakeException(
                    "Connection closed before successful ALPN negotiation");
            alpnCF.completeExceptionally(alpn);
        }
        if (isMonitored) Monitor.remove(monitor);
    }

    /**
     * Completes the ALPN future with the given error if it is still pending,
     * then performs a normal stop.
     *
     * @return always {@code null}
     */
    private Void stopOnError(Throwable error) {
        // maybe log, etc
        // ensure the ALPN is completed
        // We could also do this in SSLTube.SSLSubscriberWrapper
        // onError/onComplete - with the caveat that the ALPN CF
        // would get completed externally. Doing it here keeps
        // it all inside SSLFlowDelegate.
        if (!alpnCF.isDone()) {
            alpnCF.completeExceptionally(error);
        }
        normalStop();
        return null;
    }

    /**
     * Removes every fully consumed buffer from the list, except the
     * SENTINEL marker. The list itself is used as the lock.
     */
    private void cleanList(List<ByteBuffer> l) {
        synchronized (l) {
            l.removeIf(b -> !b.hasRemaining() && b != SENTINEL);
        }
    }

    /**
     * States for handshake. We avoid races when accessing/updating the AtomicInt
     * because updates always schedule an additional call to both the read()
     * and write() functions.
     */
    private static final int NOT_HANDSHAKING = 0;
    private static final int HANDSHAKING = 1;

    // Bit flags
    // a thread is currently executing tasks
    private static final int DOING_TASKS = 4;
    // a thread wants to execute tasks, while another thread is executing
    private static final int REQUESTING_TASKS = 8;
    private static final int TASK_BITS = 12; // Both bits

    // Identify which side is calling doHandshake()
    private static final int READER = 1;
    private static final int WRITER = 2;

    /**
     * Renders the given handshake state as text, for debugging.
     *
     * @throws InternalError if the non-task bits are neither
     *         NOT_HANDSHAKING nor HANDSHAKING
     */
    private static String states(AtomicInteger state) {
        int s = state.get();
        StringBuilder sb = new StringBuilder();
        int x = s & ~TASK_BITS; // handshake part, task bits masked out
        switch (x) {
            case NOT_HANDSHAKING -> sb.append(" NOT_HANDSHAKING ");
            case HANDSHAKING -> sb.append(" HANDSHAKING ");

            default -> throw new InternalError();
        }
        if ((s & DOING_TASKS) > 0)
            sb.append("|DOING_TASKS");
        if ((s & REQUESTING_TASKS) > 0)
            sb.append("|REQUESTING_TASKS");
        return sb.toString();
    }

    /** Schedules both the reader and the writer to run again. */
    private void resumeActivity() {
        reader.schedule();
        writer.schedule();
    }

    final AtomicInteger handshakeState;
    // Trace of handshake statuses and callers; only allocated when debugging.
    final ConcurrentLinkedQueue<String> stateList =
            debug.on() ? new ConcurrentLinkedQueue<>() : null;

    // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS
    // depending on previous value
    private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> {
        if ((current & DOING_TASKS) == 0)
            return DOING_TASKS | (current & HANDSHAKING);
        else
            return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING);
    };

    // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set
    // clears bits if not.
1056 private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> { 1057 if ((current & REQUESTING_TASKS) != 0) 1058 return DOING_TASKS | (current & HANDSHAKING); 1059 // clear both bits 1060 return (current & HANDSHAKING); 1061 }; 1062 doHandshake(EngineResult r, int caller)1063 private boolean doHandshake(EngineResult r, int caller) { 1064 // unconditionally sets the HANDSHAKING bit, while preserving task bits 1065 handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS)); 1066 if (stateList != null && debug.on()) { 1067 stateList.add(r.handshakeStatus().toString()); 1068 stateList.add(Integer.toString(caller)); 1069 } 1070 switch (r.handshakeStatus()) { 1071 case NEED_TASK: 1072 int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS); 1073 if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks 1074 return false; 1075 } 1076 1077 if (debug.on()) debug.log("obtaining and initiating task execution"); 1078 List<Runnable> tasks = obtainTasks(); 1079 executeTasks(tasks); 1080 return false; // executeTasks will resume activity 1081 case NEED_WRAP: 1082 if (caller == READER) { 1083 writer.triggerWrite(); 1084 return false; 1085 } 1086 break; 1087 case NEED_UNWRAP: 1088 case NEED_UNWRAP_AGAIN: 1089 // do nothing else 1090 // receiving-side data will trigger unwrap 1091 if (caller == WRITER) { 1092 reader.schedule(); 1093 return false; 1094 } 1095 break; 1096 default: 1097 throw new InternalError("Unexpected handshake status:" 1098 + r.handshakeStatus()); 1099 } 1100 return true; 1101 } 1102 obtainTasks()1103 private List<Runnable> obtainTasks() { 1104 List<Runnable> l = new ArrayList<>(); 1105 Runnable r; 1106 while ((r = engine.getDelegatedTask()) != null) { 1107 l.add(r); 1108 } 1109 return l; 1110 } 1111 executeTasks(List<Runnable> tasks)1112 private void executeTasks(List<Runnable> tasks) { 1113 exec.execute(() -> { 1114 try { 1115 List<Runnable> nextTasks = tasks; 1116 if (debug.on()) 
debug.log("#tasks to execute: " + Integer.toString(nextTasks.size())); 1117 do { 1118 nextTasks.forEach(Runnable::run); 1119 if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { 1120 nextTasks = obtainTasks(); 1121 } else { 1122 int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS); 1123 if ((s & DOING_TASKS) != 0) { 1124 if (debug.on()) debug.log("re-running tasks (B)"); 1125 nextTasks = obtainTasks(); 1126 continue; 1127 } 1128 break; 1129 } 1130 } while (true); 1131 if (debug.on()) debug.log("finished task execution"); 1132 HandshakeStatus hs = engine.getHandshakeStatus(); 1133 if (hs == HandshakeStatus.FINISHED || hs == HandshakeStatus.NOT_HANDSHAKING) { 1134 // We're no longer handshaking, try setting ALPN 1135 trySetALPN(); 1136 } 1137 resumeActivity(); 1138 } catch (Throwable t) { 1139 handleError(checkForHandshake(t)); 1140 } 1141 }); 1142 } 1143 trySetALPN()1144 boolean trySetALPN() { 1145 // complete ALPN CF if needed. 1146 if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { 1147 applicationBufferSize = engine.getSession().getApplicationBufferSize(); 1148 packetBufferSize = engine.getSession().getPacketBufferSize(); 1149 setALPN(); 1150 return true; 1151 } 1152 return false; 1153 } 1154 1155 // FIXME: acknowledge a received CLOSE request from peer doClosure(EngineResult r)1156 EngineResult doClosure(EngineResult r) throws IOException { 1157 if (debug.on()) 1158 debug.log("doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]", 1159 r.result, engine.getHandshakeStatus(), 1160 engine.isOutboundDone(), engine.isInboundDone()); 1161 if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) { 1162 // we have received TLS close_notify and need to send 1163 // an acknowledgement back. We're calling doHandshake 1164 // to finish the close handshake. 
1165 if (engine.isInboundDone() && !engine.isOutboundDone()) { 1166 if (debug.on()) debug.log("doClosure: close_notify received"); 1167 close_notify_received = true; 1168 if (!writer.scheduler.isStopped()) { 1169 doHandshake(r, READER); 1170 } else { 1171 // We have received closed notify, but we 1172 // won't be able to send the acknowledgement. 1173 // Nothing more will come from the socket either, 1174 // so mark the reader as completed. 1175 synchronized (reader.readBufferLock) { 1176 reader.completing = true; 1177 } 1178 } 1179 } 1180 } 1181 return r; 1182 } 1183 1184 /** 1185 * Returns the upstream Flow.Subscriber of the reading (incoming) side. 1186 * This flow must be given the encrypted data read from upstream (eg socket) 1187 * before it is decrypted. 1188 */ upstreamReader()1189 public Flow.Subscriber<List<ByteBuffer>> upstreamReader() { 1190 return reader; 1191 } 1192 1193 /** 1194 * Returns the upstream Flow.Subscriber of the writing (outgoing) side. 1195 * This flow contains the plaintext data before it is encrypted. 
1196 */ upstreamWriter()1197 public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() { 1198 return writer; 1199 } 1200 resumeReader()1201 public boolean resumeReader() { 1202 return reader.signalScheduling(); 1203 } 1204 resetReaderDemand()1205 public void resetReaderDemand() { 1206 reader.resetDownstreamDemand(); 1207 } 1208 1209 static class EngineResult { 1210 final SSLEngineResult result; 1211 final ByteBuffer destBuffer; 1212 1213 // normal result EngineResult(SSLEngineResult result)1214 EngineResult(SSLEngineResult result) { 1215 this(result, null); 1216 } 1217 EngineResult(SSLEngineResult result, ByteBuffer destBuffer)1218 EngineResult(SSLEngineResult result, ByteBuffer destBuffer) { 1219 this.result = result; 1220 this.destBuffer = destBuffer; 1221 } 1222 handshaking()1223 boolean handshaking() { 1224 HandshakeStatus s = result.getHandshakeStatus(); 1225 return s != HandshakeStatus.FINISHED 1226 && s != HandshakeStatus.NOT_HANDSHAKING 1227 && result.getStatus() != Status.CLOSED; 1228 } 1229 needUnwrap()1230 boolean needUnwrap() { 1231 HandshakeStatus s = result.getHandshakeStatus(); 1232 return s == HandshakeStatus.NEED_UNWRAP; 1233 } 1234 1235 bytesConsumed()1236 int bytesConsumed() { 1237 return result.bytesConsumed(); 1238 } 1239 bytesProduced()1240 int bytesProduced() { 1241 return result.bytesProduced(); 1242 } 1243 handshakeStatus()1244 SSLEngineResult.HandshakeStatus handshakeStatus() { 1245 return result.getHandshakeStatus(); 1246 } 1247 status()1248 SSLEngineResult.Status status() { 1249 return result.getStatus(); 1250 } 1251 } 1252 1253 // The maximum network buffer size negotiated during 1254 // the handshake. Usually 16K. 
1255 volatile int packetBufferSize; getNetBuffer()1256 final ByteBuffer getNetBuffer() { 1257 int netSize = packetBufferSize; 1258 if (netSize <= 0) { 1259 packetBufferSize = netSize = engine.getSession().getPacketBufferSize(); 1260 } 1261 return ByteBuffer.allocate(netSize); 1262 } 1263 1264 // The maximum application buffer size negotiated during 1265 // the handshake. Usually close to 16K. 1266 volatile int applicationBufferSize; 1267 // Despite of the maximum applicationBufferSize negotiated 1268 // above, TLS records usually have a much smaller payload. 1269 // The adaptativeAppBufferSize records the max payload 1270 // ever decoded, and we use that as a guess for how big 1271 // a buffer we will need for the next payload. 1272 // This avoids allocating and zeroing a 16K buffer for 1273 // nothing... 1274 volatile int adaptiveAppBufferSize; getAppBuffer()1275 final ByteBuffer getAppBuffer() { 1276 int appSize = applicationBufferSize; 1277 if (appSize <= 0) { 1278 applicationBufferSize = appSize 1279 = engine.getSession().getApplicationBufferSize(); 1280 } 1281 int size = adaptiveAppBufferSize; 1282 if (size <= 0) { 1283 size = 512; // start with 512 this is usually enough for handshaking / headers 1284 } else if (size > appSize) { 1285 size = appSize; 1286 } 1287 // will cause a BUFFER_OVERFLOW if not big enough, but 1288 // that's OK. 1289 return ByteBuffer.allocate(size); 1290 } 1291 dbgString()1292 final String dbgString() { 1293 return "SSLFlowDelegate(" + tubeName + ")"; 1294 } 1295 } 1296