1 /* 2 * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http.common; 27 28 import jdk.internal.net.http.common.SubscriberWrapper.SchedulingAction; 29 30 import javax.net.ssl.SSLEngine; 31 import javax.net.ssl.SSLEngineResult; 32 import javax.net.ssl.SSLEngineResult.HandshakeStatus; 33 import javax.net.ssl.SSLEngineResult.Status; 34 import javax.net.ssl.SSLException; 35 import javax.net.ssl.SSLHandshakeException; 36 import java.io.IOException; 37 import java.lang.ref.Reference; 38 import java.lang.ref.ReferenceQueue; 39 import java.lang.ref.WeakReference; 40 import java.nio.ByteBuffer; 41 import java.util.ArrayList; 42 import java.util.Collections; 43 import java.util.Iterator; 44 import java.util.LinkedList; 45 import java.util.List; 46 import java.util.concurrent.CompletableFuture; 47 import java.util.concurrent.ConcurrentLinkedQueue; 48 import java.util.concurrent.Executor; 49 import java.util.concurrent.Flow; 50 import java.util.concurrent.Flow.Subscriber; 51 import java.util.concurrent.atomic.AtomicInteger; 52 import java.util.function.Consumer; 53 import java.util.function.IntBinaryOperator; 54 55 /** 56 * Implements SSL using two SubscriberWrappers. 57 * 58 * <p> Constructor takes two Flow.Subscribers: one that receives the network 59 * data (after it has been encrypted by SSLFlowDelegate) data, and one that 60 * receives the application data (before it has been encrypted by SSLFlowDelegate). 61 * 62 * <p> Methods upstreamReader() and upstreamWriter() return the corresponding 63 * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data. 64 * See diagram below. 65 * 66 * <p> How Flow.Subscribers are used in this class, and where they come from: 67 * <pre> 68 * {@code 69 * 70 * 71 * 72 * ---------> data flow direction 73 * 74 * 75 * +------------------+ 76 * upstreamWriter | | downWriter 77 * ---------------> | | ------------> 78 * obtained from this | | supplied to constructor 79 * | SSLFlowDelegate | 80 * downReader | | upstreamReader 81 * <--------------- | | <-------------- 82 * supplied to constructor | | obtained from this 83 * +------------------+ 84 * 85 * Errors are reported to the downReader Flow.Subscriber 86 * 87 * } 88 * </pre> 89 */ 90 public class SSLFlowDelegate { 91 92 final Logger debug = 93 Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 94 95 private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; 96 private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0); 97 // When handshake is in progress trying to wrap may produce no bytes. 98 private static final ByteBuffer NOTHING = ByteBuffer.allocate(0); 99 private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate"); 100 private static final boolean isMonitored = 101 monProp != null && (monProp.isEmpty() || monProp.equalsIgnoreCase("true")); 102 103 final Executor exec; 104 final Reader reader; 105 final Writer writer; 106 final SSLEngine engine; 107 final String tubeName; // hack 108 final CompletableFuture<String> alpnCF; // completes on initial handshake 109 final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped 110 volatile boolean close_notify_received; 111 final CompletableFuture<Void> readerCF; 112 final CompletableFuture<Void> writerCF; 113 final CompletableFuture<Void> stopCF; 114 final Consumer<ByteBuffer> recycler; 115 static AtomicInteger scount = new AtomicInteger(1); 116 final int id; 117 118 /** 119 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each 120 * Flow.Subscriber requires an associated {@link CompletableFuture} 121 * for errors that need to be signaled from downstream to upstream. 122 */ SSLFlowDelegate(SSLEngine engine, Executor exec, Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)123 public SSLFlowDelegate(SSLEngine engine, 124 Executor exec, 125 Subscriber<? super List<ByteBuffer>> downReader, 126 Subscriber<? super List<ByteBuffer>> downWriter) 127 { 128 this(engine, exec, null, downReader, downWriter); 129 } 130 131 /** 132 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each 133 * Flow.Subscriber requires an associated {@link CompletableFuture} 134 * for errors that need to be signaled from downstream to upstream. 135 */ SSLFlowDelegate(SSLEngine engine, Executor exec, Consumer<ByteBuffer> recycler, Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)136 public SSLFlowDelegate(SSLEngine engine, 137 Executor exec, 138 Consumer<ByteBuffer> recycler, 139 Subscriber<? super List<ByteBuffer>> downReader, 140 Subscriber<? super List<ByteBuffer>> downWriter) 141 { 142 this.id = scount.getAndIncrement(); 143 this.tubeName = String.valueOf(downWriter); 144 this.recycler = recycler; 145 this.reader = new Reader(); 146 this.writer = new Writer(); 147 this.engine = engine; 148 this.exec = exec; 149 this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); 150 this.readerCF = reader.completion(); 151 this.writerCF = reader.completion(); 152 readerCF.exceptionally(this::stopOnError); 153 writerCF.exceptionally(this::stopOnError); 154 this.stopCF = CompletableFuture.allOf(reader.completion(), writer.completion()) 155 .thenRun(this::normalStop); 156 this.alpnCF = new MinimalFuture<>(); 157 158 // connect the Reader to the downReader and the 159 // Writer to the downWriter. 160 connect(downReader, downWriter); 161 162 if (isMonitored) Monitor.add(monitor); 163 } 164 165 /** 166 * Returns true if the SSLFlowDelegate has detected a TLS 167 * close_notify from the server. 168 * @return true, if a close_notify was detected. 169 */ closeNotifyReceived()170 public boolean closeNotifyReceived() { 171 return close_notify_received; 172 } 173 174 /** 175 * Connects the read sink (downReader) to the SSLFlowDelegate Reader, 176 * and the write sink (downWriter) to the SSLFlowDelegate Writer. 177 * Called from within the constructor. Overwritten by SSLTube. 178 * 179 * @param downReader The left hand side read sink (typically, the 180 * HttpConnection read subscriber). 181 * @param downWriter The right hand side write sink (typically 182 * the SocketTube write subscriber). 183 */ connect(Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)184 void connect(Subscriber<? super List<ByteBuffer>> downReader, 185 Subscriber<? super List<ByteBuffer>> downWriter) { 186 this.reader.subscribe(downReader); 187 this.writer.subscribe(downWriter); 188 } 189 190 /** 191 * Returns a CompletableFuture<String> which completes after 192 * the initial handshake completes, and which contains the negotiated 193 * alpn. 194 */ alpn()195 public CompletableFuture<String> alpn() { 196 return alpnCF; 197 } 198 setALPN()199 private void setALPN() { 200 // Handshake is finished. So, can retrieve the ALPN now 201 if (alpnCF.isDone()) 202 return; 203 String alpn = engine.getApplicationProtocol(); 204 if (debug.on()) debug.log("setALPN = %s", alpn); 205 alpnCF.complete(alpn); 206 } 207 monitor()208 public String monitor() { 209 StringBuilder sb = new StringBuilder(); 210 sb.append("SSL: id ").append(id); 211 sb.append(" ").append(dbgString()); 212 sb.append(" HS state: " + states(handshakeState)); 213 sb.append(" Engine state: " + engine.getHandshakeStatus().toString()); 214 if (stateList != null) { 215 sb.append(" LL : "); 216 for (String s : stateList) { 217 sb.append(s).append(" "); 218 } 219 } 220 sb.append("\r\n"); 221 sb.append("Reader:: ").append(reader.toString()); 222 sb.append("\r\n"); 223 sb.append("Writer:: ").append(writer.toString()); 224 sb.append("\r\n==================================="); 225 return sb.toString(); 226 } 227 enterReadScheduling()228 protected SchedulingAction enterReadScheduling() { 229 return SchedulingAction.CONTINUE; 230 } 231 232 233 /** 234 * Processing function for incoming data. Pass it thru SSLEngine.unwrap(). 235 * Any decrypted buffers returned to be passed downstream. 236 * Status codes: 237 * NEED_UNWRAP: do nothing. Following incoming data will contain 238 * any required handshake data 239 * NEED_WRAP: call writer.addData() with empty buffer 240 * NEED_TASK: delegate task to executor 241 * BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap 242 * BUFFER_UNDERFLOW: keep buffer and wait for more data 243 * OK: return generated buffers. 244 * 245 * Upstream subscription strategy is to try and keep no more than 246 * TARGET_BUFSIZE bytes in readBuf 247 */ 248 final class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber { 249 // Maximum record size is 16k. 250 // Because SocketTube can feeds us up to 3 16K buffers, 251 // then setting this size to 16K means that the readBuf 252 // can store up to 64K-1 (16K-1 + 3*16K) 253 static final int TARGET_BUFSIZE = 16 * 1024; 254 255 final SequentialScheduler scheduler; 256 volatile ByteBuffer readBuf; 257 volatile boolean completing; 258 final Object readBufferLock = new Object(); 259 final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 260 261 private final class ReaderDownstreamPusher implements Runnable { 262 @Override run()263 public void run() { 264 processData(); 265 } 266 } 267 Reader()268 Reader() { 269 super(); 270 scheduler = SequentialScheduler.synchronizedScheduler( 271 new ReaderDownstreamPusher()); 272 this.readBuf = ByteBuffer.allocate(1024); 273 readBuf.limit(0); // keep in read mode 274 } 275 276 @Override supportsRecycling()277 public boolean supportsRecycling() { 278 return recycler != null; 279 } 280 enterScheduling()281 protected SchedulingAction enterScheduling() { 282 return enterReadScheduling(); 283 } 284 dbgString()285 public final String dbgString() { 286 return "SSL Reader(" + tubeName + ")"; 287 } 288 289 /** 290 * entry point for buffers delivered from upstream Subscriber 291 */ 292 @Override incoming(List<ByteBuffer> buffers, boolean complete)293 public void incoming(List<ByteBuffer> buffers, boolean complete) { 294 if (debugr.on()) 295 debugr.log("Adding %d bytes to read buffer", 296 Utils.remaining(buffers)); 297 addToReadBuf(buffers, complete); 298 scheduler.runOrSchedule(exec); 299 } 300 301 @Override toString()302 public String toString() { 303 return "READER: " + super.toString() + ", readBuf: " + readBuf.toString() 304 + ", count: " + count.toString() + ", scheduler: " 305 + (scheduler.isStopped() ? "stopped" : "running") 306 + ", status: " + lastUnwrapStatus 307 + ", handshakeState: " + handshakeState.get() 308 + ", engine: " + engine.getHandshakeStatus(); 309 } 310 reallocReadBuf()311 private void reallocReadBuf() { 312 int sz = readBuf.capacity(); 313 ByteBuffer newb = ByteBuffer.allocate(sz * 2); 314 readBuf.flip(); 315 Utils.copy(readBuf, newb); 316 readBuf = newb; 317 } 318 319 @Override upstreamWindowUpdate(long currentWindow, long downstreamQsize)320 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { 321 if (needsMoreData()) { 322 // run the scheduler to see if more data should be requested 323 if (debugr.on()) { 324 int remaining = readBuf.remaining(); 325 if (remaining > TARGET_BUFSIZE) { 326 // just some logging to check how much we have in the read buffer 327 debugr.log("readBuf has more than TARGET_BUFSIZE: %d", 328 remaining); 329 } 330 } 331 scheduler.runOrSchedule(); 332 } 333 return 0; // we will request more from the scheduler loop (processData). 334 } 335 336 // readBuf is kept ready for reading outside of this method addToReadBuf(List<ByteBuffer> buffers, boolean complete)337 private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) { 338 assert Utils.remaining(buffers) > 0 || buffers.isEmpty(); 339 synchronized (readBufferLock) { 340 for (ByteBuffer buf : buffers) { 341 readBuf.compact(); 342 while (readBuf.remaining() < buf.remaining()) 343 reallocReadBuf(); 344 readBuf.put(buf); 345 readBuf.flip(); 346 // should be safe to call inside lock 347 // since the only implementation 348 // offers the buffer to an unbounded queue. 349 // WARNING: do not touch buf after this point! 350 if (recycler != null) recycler.accept(buf); 351 } 352 if (complete) { 353 this.completing = complete; 354 minBytesRequired = 0; 355 } 356 } 357 } 358 schedule()359 void schedule() { 360 scheduler.runOrSchedule(exec); 361 } 362 stop()363 void stop() { 364 if (debugr.on()) debugr.log("stop"); 365 scheduler.stop(); 366 } 367 368 AtomicInteger count = new AtomicInteger(0); 369 370 // minimum number of bytes required to call unwrap. 371 // Usually this is 0, unless there was a buffer underflow. 372 // In this case we need to wait for more bytes than what 373 // we had before calling unwrap() again. 374 volatile int minBytesRequired; 375 376 // We might need to request more data if: 377 // - we have a subscription from upstream 378 // - and we don't have enough data to decrypt in the read buffer 379 // - *and* - either we're handshaking, and more data is required (NEED_UNWRAP), 380 // - or we have demand from downstream, but we have nothing decrypted 381 // to forward downstream. needsMoreData()382 boolean needsMoreData() { 383 if (upstreamSubscription != null && readBuf.remaining() <= minBytesRequired && 384 (engine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP 385 || !downstreamSubscription.demand.isFulfilled() && hasNoOutputData())) { 386 return true; 387 } 388 return false; 389 } 390 391 // If the readBuf has not enough data, and we either need to 392 // unwrap (handshaking) or we have demand from downstream, 393 // then request more data requestMoreDataIfNeeded()394 void requestMoreDataIfNeeded() { 395 if (needsMoreData()) { 396 // request more will only request more if our 397 // demand from upstream is fulfilled 398 requestMore(); 399 } 400 } 401 402 // work function where it all happens processData()403 final void processData() { 404 try { 405 if (debugr.on()) 406 debugr.log("processData:" 407 + " readBuf remaining:" + readBuf.remaining() 408 + ", state:" + states(handshakeState) 409 + ", engine handshake status:" + engine.getHandshakeStatus()); 410 int len; 411 boolean complete = false; 412 while (readBuf.remaining() > (len = minBytesRequired)) { 413 boolean handshaking = false; 414 try { 415 EngineResult result; 416 synchronized (readBufferLock) { 417 complete = this.completing; 418 if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining()); 419 // Unless there is a BUFFER_UNDERFLOW, we should try to 420 // unwrap any number of bytes. Set minBytesRequired to 0: 421 // we only need to do that if minBytesRequired is not already 0. 422 len = len > 0 ? minBytesRequired = 0 : len; 423 result = unwrapBuffer(readBuf); 424 len = readBuf.remaining(); 425 if (debugr.on()) { 426 debugr.log("Unwrapped: result: %s", result.result); 427 debugr.log("Unwrapped: consumed: %s", result.bytesConsumed()); 428 } 429 } 430 if (result.bytesProduced() > 0) { 431 if (debugr.on()) 432 debugr.log("sending %d", result.bytesProduced()); 433 count.addAndGet(result.bytesProduced()); 434 outgoing(result.destBuffer, false); 435 } 436 if (result.status() == Status.BUFFER_UNDERFLOW) { 437 if (debugr.on()) debugr.log("BUFFER_UNDERFLOW"); 438 // not enough data in the read buffer... 439 // no need to try to unwrap again unless we get more bytes 440 // than minBytesRequired = len in the read buffer. 441 synchronized (readBufferLock) { 442 minBytesRequired = len; 443 // more bytes could already have been added... 444 assert readBuf.remaining() >= len; 445 // check if we have received some data, and if so 446 // we can just re-spin the loop 447 if (readBuf.remaining() > len) continue; 448 else if (this.completing) { 449 if (debug.on()) { 450 debugr.log("BUFFER_UNDERFLOW with EOF," + 451 " %d bytes non decrypted.", len); 452 } 453 // The channel won't send us any more data, and 454 // we are in underflow: we need to fail. 455 throw new IOException("BUFFER_UNDERFLOW with EOF, " 456 + len + " bytes non decrypted."); 457 } 458 } 459 // request more data and return. 460 requestMore(); 461 return; 462 } 463 if (complete && result.status() == Status.CLOSED) { 464 if (debugr.on()) debugr.log("Closed: completing"); 465 outgoing(Utils.EMPTY_BB_LIST, true); 466 // complete ALPN if not yet completed 467 setALPN(); 468 requestMoreDataIfNeeded(); 469 return; 470 } 471 if (result.handshaking()) { 472 handshaking = true; 473 if (debugr.on()) debugr.log("handshaking"); 474 if (doHandshake(result, READER)) continue; // need unwrap 475 else break; // doHandshake will have triggered the write scheduler if necessary 476 } else { 477 if (trySetALPN()) { 478 resumeActivity(); 479 } 480 } 481 } catch (IOException ex) { 482 errorCommon(ex); 483 handleError(ex); 484 return; 485 } 486 if (handshaking && !complete) { 487 requestMoreDataIfNeeded(); 488 return; 489 } 490 } 491 if (!complete) { 492 synchronized (readBufferLock) { 493 complete = this.completing && !readBuf.hasRemaining(); 494 } 495 } 496 if (complete) { 497 if (debugr.on()) debugr.log("completing"); 498 // Complete the alpnCF, if not already complete, regardless of 499 // whether or not the ALPN is available, there will be no more 500 // activity. 501 setALPN(); 502 outgoing(Utils.EMPTY_BB_LIST, true); 503 } else { 504 requestMoreDataIfNeeded(); 505 } 506 } catch (Throwable ex) { 507 errorCommon(ex); 508 handleError(ex); 509 } 510 } 511 512 private volatile Status lastUnwrapStatus; unwrapBuffer(ByteBuffer src)513 EngineResult unwrapBuffer(ByteBuffer src) throws IOException { 514 ByteBuffer dst = getAppBuffer(); 515 int len = src.remaining(); 516 while (true) { 517 SSLEngineResult sslResult = engine.unwrap(src, dst); 518 switch (lastUnwrapStatus = sslResult.getStatus()) { 519 case BUFFER_OVERFLOW: 520 // may happen if app size buffer was changed, or if 521 // our 'adaptiveBufferSize' guess was too small for 522 // the current payload. In that case, update the 523 // value of applicationBufferSize, and allocate a 524 // buffer of that size, which we are sure will be 525 // big enough to decode whatever needs to be 526 // decoded. We will later update adaptiveBufferSize 527 // in OK: below. 528 int appSize = applicationBufferSize = 529 engine.getSession().getApplicationBufferSize(); 530 ByteBuffer b = ByteBuffer.allocate(appSize + dst.position()); 531 dst.flip(); 532 b.put(dst); 533 dst = b; 534 break; 535 case CLOSED: 536 assert dst.position() == 0; 537 return doClosure(new EngineResult(sslResult)); 538 case BUFFER_UNDERFLOW: 539 // handled implicitly by compaction/reallocation of readBuf 540 assert dst.position() == 0; 541 return new EngineResult(sslResult); 542 case OK: 543 int size = dst.position(); 544 if (debug.on()) { 545 debugr.log("Decoded " + size + " bytes out of " + len 546 + " into buffer of " + dst.capacity() 547 + " remaining to decode: " + src.remaining()); 548 } 549 // if the record payload was bigger than what was originally 550 // allocated, then sets the adaptiveAppBufferSize to size 551 // and we will use that new size as a guess for the next app 552 // buffer. 553 if (size > adaptiveAppBufferSize) { 554 adaptiveAppBufferSize = ((size + 7) >>> 3) << 3; 555 } 556 dst.flip(); 557 return new EngineResult(sslResult, dst); 558 } 559 } 560 } 561 } 562 563 public interface Monitorable { getInfo()564 public String getInfo(); 565 } 566 567 public static class Monitor extends Thread { 568 final List<WeakReference<Monitorable>> list; 569 final List<FinalMonitorable> finalList; 570 final ReferenceQueue<Monitorable> queue = new ReferenceQueue<>(); 571 static Monitor themon; 572 573 static { 574 themon = new Monitor(); themon.start()575 themon.start(); // uncomment to enable Monitor 576 } 577 578 // An instance used to temporarily store the 579 // last observable state of a monitorable object. 580 // When Monitor.remove(o) is called, we replace 581 // 'o' with a FinalMonitorable whose reference 582 // will be enqueued after the last observable state 583 // has been printed. 584 final class FinalMonitorable implements Monitorable { 585 final String finalState; FinalMonitorable(Monitorable o)586 FinalMonitorable(Monitorable o) { 587 finalState = o.getInfo(); 588 finalList.add(this); 589 } 590 @Override getInfo()591 public String getInfo() { 592 finalList.remove(this); 593 return finalState; 594 } 595 } 596 Monitor()597 Monitor() { 598 super("Monitor"); 599 setDaemon(true); 600 list = Collections.synchronizedList(new LinkedList<>()); 601 finalList = new ArrayList<>(); // access is synchronized on list above 602 } 603 addTarget(Monitorable o)604 void addTarget(Monitorable o) { 605 list.add(new WeakReference<>(o, queue)); 606 } removeTarget(Monitorable o)607 void removeTarget(Monitorable o) { 608 // It can take a long time for GC to clean up references. 609 // Calling Monitor.remove() early helps removing noise from the 610 // logs/ 611 synchronized (list) { 612 Iterator<WeakReference<Monitorable>> it = list.iterator(); 613 while (it.hasNext()) { 614 Monitorable m = it.next().get(); 615 if (m == null) it.remove(); 616 if (o == m) { 617 it.remove(); 618 break; 619 } 620 } 621 FinalMonitorable m = new FinalMonitorable(o); 622 addTarget(m); 623 Reference.reachabilityFence(m); 624 } 625 } 626 add(Monitorable o)627 public static void add(Monitorable o) { 628 themon.addTarget(o); 629 } remove(Monitorable o)630 public static void remove(Monitorable o) { 631 themon.removeTarget(o); 632 } 633 634 @Override run()635 public void run() { 636 System.out.println("Monitor starting"); 637 try { 638 while (true) { 639 Thread.sleep(20 * 1000); 640 synchronized (list) { 641 Reference<? extends Monitorable> expired; 642 while ((expired = queue.poll()) != null) list.remove(expired); 643 for (WeakReference<Monitorable> ref : list) { 644 Monitorable o = ref.get(); 645 if (o == null) continue; 646 if (o instanceof FinalMonitorable) { 647 ref.enqueue(); 648 } 649 System.out.println(o.getInfo()); 650 System.out.println("-------------------------"); 651 } 652 } 653 System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-"); 654 } 655 } catch (InterruptedException e) { 656 System.out.println("Monitor exiting with " + e); 657 } 658 } 659 } 660 661 /** 662 * Processing function for outgoing data. Pass it thru SSLEngine.wrap() 663 * Any encrypted buffers generated are passed downstream to be written. 664 * Status codes: 665 * NEED_UNWRAP: call reader.addData() with empty buffer 666 * NEED_WRAP: call addData() with empty buffer 667 * NEED_TASK: delegate task to executor 668 * BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap 669 * BUFFER_UNDERFLOW: shouldn't happen on writing side 670 * OK: return generated buffers 671 */ 672 class Writer extends SubscriberWrapper { 673 final SequentialScheduler scheduler; 674 // queues of buffers received from upstream waiting 675 // to be processed by the SSLEngine 676 final List<ByteBuffer> writeList; 677 final Logger debugw = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 678 volatile boolean completing; 679 boolean completed; // only accessed in processData 680 681 class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask { run()682 @Override public void run() { processData(); } 683 } 684 Writer()685 Writer() { 686 super(); 687 writeList = Collections.synchronizedList(new LinkedList<>()); 688 scheduler = new SequentialScheduler(new WriterDownstreamPusher()); 689 } 690 691 @Override incoming(List<ByteBuffer> buffers, boolean complete)692 protected void incoming(List<ByteBuffer> buffers, boolean complete) { 693 assert complete ? buffers == Utils.EMPTY_BB_LIST : true; 694 assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true; 695 if (complete) { 696 if (debugw.on()) debugw.log("adding SENTINEL"); 697 completing = true; 698 writeList.add(SENTINEL); 699 } else { 700 writeList.addAll(buffers); 701 } 702 if (debugw.on()) 703 debugw.log("added " + buffers.size() 704 + " (" + Utils.remaining(buffers) 705 + " bytes) to the writeList"); 706 scheduler.runOrSchedule(); 707 } 708 dbgString()709 public final String dbgString() { 710 return "SSL Writer(" + tubeName + ")"; 711 } 712 onSubscribe()713 protected void onSubscribe() { 714 if (debugw.on()) debugw.log("onSubscribe initiating handshaking"); 715 addData(HS_TRIGGER); // initiates handshaking 716 } 717 schedule()718 void schedule() { 719 scheduler.runOrSchedule(); 720 } 721 stop()722 void stop() { 723 if (debugw.on()) debugw.log("stop"); 724 scheduler.stop(); 725 } 726 727 @Override closing()728 public boolean closing() { 729 return closeNotifyReceived(); 730 } 731 isCompleting()732 private boolean isCompleting() { 733 return completing; 734 } 735 736 @Override upstreamWindowUpdate(long currentWindow, long downstreamQsize)737 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { 738 if (writeList.size() > 10) 739 return 0; 740 else 741 return super.upstreamWindowUpdate(currentWindow, downstreamQsize); 742 } 743 hsTriggered()744 private boolean hsTriggered() { 745 synchronized(writeList) { 746 for (ByteBuffer b : writeList) 747 if (b == HS_TRIGGER) 748 return true; 749 return false; 750 } 751 } 752 triggerWrite()753 void triggerWrite() { 754 synchronized (writeList) { 755 if (writeList.isEmpty()) { 756 writeList.add(HS_TRIGGER); 757 } 758 } 759 scheduler.runOrSchedule(); 760 } 761 processData()762 private void processData() { 763 boolean completing = isCompleting(); 764 765 try { 766 if (debugw.on()) 767 debugw.log("processData, writeList remaining:" 768 + Utils.remaining(writeList) + ", hsTriggered:" 769 + hsTriggered() + ", needWrap:" + needWrap()); 770 771 while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) { 772 ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY); 773 EngineResult result = wrapBuffers(outbufs); 774 if (debugw.on()) 775 debugw.log("wrapBuffer returned %s", result.result); 776 777 if (result.status() == Status.CLOSED) { 778 if (!upstreamCompleted) { 779 upstreamCompleted = true; 780 upstreamSubscription.cancel(); 781 // complete ALPN if not yet completed 782 setALPN(); 783 } 784 if (result.bytesProduced() <= 0) 785 return; 786 787 if (!completing && !completed) { 788 completing = this.completing = true; 789 // There could still be some outgoing data in outbufs. 790 writeList.add(SENTINEL); 791 } 792 } 793 794 boolean handshaking = false; 795 if (result.handshaking()) { 796 if (debugw.on()) debugw.log("handshaking"); 797 doHandshake(result, WRITER); // ok to ignore return 798 handshaking = true; 799 } else { 800 if (trySetALPN()) { 801 resumeActivity(); 802 } 803 } 804 cleanList(writeList); // tidy up the source list 805 sendResultBytes(result); 806 if (handshaking) { 807 if (!completing && needWrap()) { 808 continue; 809 } else { 810 return; 811 } 812 } 813 } 814 if (completing && Utils.remaining(writeList) == 0) { 815 if (!completed) { 816 completed = true; 817 writeList.clear(); 818 outgoing(Utils.EMPTY_BB_LIST, true); 819 } 820 return; 821 } 822 if (writeList.isEmpty() && needWrap()) { 823 writer.addData(HS_TRIGGER); 824 } 825 } catch (Throwable ex) { 826 errorCommon(ex); 827 handleError(ex); 828 } 829 } 830 831 // The SSLEngine insists on being given a buffer that is at least 832 // SSLSession.getPacketBufferSize() long (usually 16K). If given 833 // a smaller buffer it will go in BUFFER_OVERFLOW, even if it only 834 // has 6 bytes to wrap. Typical usage shows that for GET we 835 // usually produce an average of ~ 100 bytes. 836 // To avoid wasting space, and because allocating and zeroing 837 // 16K buffers for encoding 6 bytes is costly, we are reusing the 838 // same writeBuffer to interact with SSLEngine.wrap(). 839 // If the SSLEngine produces less than writeBuffer.capacity() / 2, 840 // then we copy off the bytes to a smaller buffer that we send 841 // downstream. Otherwise, we send the writeBuffer downstream 842 // and will allocate a new one next time. 843 volatile ByteBuffer writeBuffer; 844 private volatile Status lastWrappedStatus; 845 @SuppressWarnings("fallthrough") wrapBuffers(ByteBuffer[] src)846 EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException { 847 long len = Utils.remaining(src); 848 if (debugw.on()) 849 debugw.log("wrapping " + len + " bytes"); 850 851 ByteBuffer dst = writeBuffer; 852 if (dst == null) dst = writeBuffer = getNetBuffer(); 853 assert dst.position() == 0 : "buffer position is " + dst.position(); 854 assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity(); 855 856 while (true) { 857 SSLEngineResult sslResult = engine.wrap(src, dst); 858 if (debugw.on()) debugw.log("SSLResult: " + sslResult); 859 switch (lastWrappedStatus = sslResult.getStatus()) { 860 case BUFFER_OVERFLOW: 861 // Shouldn't happen. We allocated buffer with packet size 862 // get it again if net buffer size was changed 863 if (debugw.on()) debugw.log("BUFFER_OVERFLOW"); 864 int netSize = packetBufferSize 865 = engine.getSession().getPacketBufferSize(); 866 ByteBuffer b = writeBuffer = ByteBuffer.allocate(netSize + dst.position()); 867 dst.flip(); 868 b.put(dst); 869 dst = b; 870 break; // try again 871 case CLOSED: 872 if (debugw.on()) debugw.log("CLOSED"); 873 // fallthrough. There could be some remaining data in dst. 874 // CLOSED will be handled by the caller. 875 case OK: 876 final ByteBuffer dest; 877 if (dst.position() == 0) { 878 dest = NOTHING; // can happen if handshake is in progress 879 } else if (dst.position() < dst.capacity() / 2) { 880 // less than half the buffer was used. 881 // copy off the bytes to a smaller buffer, and keep 882 // the writeBuffer for next time. 883 dst.flip(); 884 dest = Utils.copyAligned(dst); 885 dst.clear(); 886 } else { 887 // more than half the buffer was used. 888 // just send that buffer downstream, and we will 889 // get a new writeBuffer next time it is needed. 890 dst.flip(); 891 dest = dst; 892 writeBuffer = null; 893 } 894 if (debugw.on()) 895 debugw.log("OK => produced: %d bytes into %d, not wrapped: %d", 896 dest.remaining(), dest.capacity(), Utils.remaining(src)); 897 return new EngineResult(sslResult, dest); 898 case BUFFER_UNDERFLOW: 899 // Shouldn't happen. Doesn't returns when wrap() 900 // underflow handled externally 901 // assert false : "Buffer Underflow"; 902 if (debug.on()) debug.log("BUFFER_UNDERFLOW"); 903 return new EngineResult(sslResult); 904 default: 905 if (debugw.on()) 906 debugw.log("result: %s", sslResult.getStatus()); 907 assert false : "result:" + sslResult.getStatus(); 908 } 909 } 910 } 911 needWrap()912 private boolean needWrap() { 913 return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP; 914 } 915 sendResultBytes(EngineResult result)916 private void sendResultBytes(EngineResult result) { 917 if (result.bytesProduced() > 0) { 918 if (debugw.on()) 919 debugw.log("Sending %d bytes downstream", 920 result.bytesProduced()); 921 outgoing(result.destBuffer, false); 922 } 923 } 924 925 @Override toString()926 public String toString() { 927 return "WRITER: " + super.toString() 928 + ", writeList size: " + Integer.toString(writeList.size()) 929 + ", scheduler: " + (scheduler.isStopped() ? "stopped" : "running") 930 + ", status: " + lastWrappedStatus; 931 //" writeList: " + writeList.toString(); 932 } 933 } 934 handleError(Throwable t)935 private void handleError(Throwable t) { 936 if (debug.on()) debug.log("handleError", t); 937 readerCF.completeExceptionally(t); 938 writerCF.completeExceptionally(t); 939 // no-op if already completed 940 alpnCF.completeExceptionally(t); 941 reader.stop(); 942 writer.stop(); 943 } 944 945 boolean stopped; 946 normalStop()947 private synchronized void normalStop() { 948 if (stopped) 949 return; 950 stopped = true; 951 reader.stop(); 952 writer.stop(); 953 // make sure the alpnCF is completed. 954 if (!alpnCF.isDone()) { 955 Throwable alpn = new SSLHandshakeException( 956 "Connection closed before successful ALPN negotiation"); 957 alpnCF.completeExceptionally(alpn); 958 } 959 if (isMonitored) Monitor.remove(monitor); 960 } 961 stopOnError(Throwable error)962 private Void stopOnError(Throwable error) { 963 // maybe log, etc 964 // ensure the ALPN is completed 965 // We could also do this in SSLTube.SSLSubscriberWrapper 966 // onError/onComplete - with the caveat that the ALP CF 967 // would get completed externally. Doing it here keeps 968 // it all inside SSLFlowDelegate. 969 if (!alpnCF.isDone()) { 970 alpnCF.completeExceptionally(error); 971 } 972 normalStop(); 973 return null; 974 } 975 cleanList(List<ByteBuffer> l)976 private void cleanList(List<ByteBuffer> l) { 977 synchronized (l) { 978 Iterator<ByteBuffer> iter = l.iterator(); 979 while (iter.hasNext()) { 980 ByteBuffer b = iter.next(); 981 if (!b.hasRemaining() && b != SENTINEL) { 982 iter.remove(); 983 } 984 } 985 } 986 } 987 988 /** 989 * States for handshake. We avoid races when accessing/updating the AtomicInt 990 * because updates always schedule an additional call to both the read() 991 * and write() functions. 992 */ 993 private static final int NOT_HANDSHAKING = 0; 994 private static final int HANDSHAKING = 1; 995 996 // Bit flags 997 // a thread is currently executing tasks 998 private static final int DOING_TASKS = 4; 999 // a thread wants to execute tasks, while another thread is executing 1000 private static final int REQUESTING_TASKS = 8; 1001 private static final int TASK_BITS = 12; // Both bits 1002 1003 private static final int READER = 1; 1004 private static final int WRITER = 2; 1005 states(AtomicInteger state)1006 private static String states(AtomicInteger state) { 1007 int s = state.get(); 1008 StringBuilder sb = new StringBuilder(); 1009 int x = s & ~TASK_BITS; 1010 switch (x) { 1011 case NOT_HANDSHAKING: 1012 sb.append(" NOT_HANDSHAKING "); 1013 break; 1014 case HANDSHAKING: 1015 sb.append(" HANDSHAKING "); 1016 break; 1017 default: 1018 throw new InternalError(); 1019 } 1020 if ((s & DOING_TASKS) > 0) 1021 sb.append("|DOING_TASKS"); 1022 if ((s & REQUESTING_TASKS) > 0) 1023 sb.append("|REQUESTING_TASKS"); 1024 return sb.toString(); 1025 } 1026 resumeActivity()1027 private void resumeActivity() { 1028 reader.schedule(); 1029 writer.schedule(); 1030 } 1031 1032 final AtomicInteger handshakeState; 1033 final ConcurrentLinkedQueue<String> stateList = 1034 debug.on() ? new ConcurrentLinkedQueue<>() : null; 1035 1036 // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS 1037 // depending on previous value 1038 private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> { 1039 if ((current & DOING_TASKS) == 0) 1040 return DOING_TASKS | (current & HANDSHAKING); 1041 else 1042 return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING); 1043 }; 1044 1045 // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set 1046 // clears bits if not. 1047 private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> { 1048 if ((current & REQUESTING_TASKS) != 0) 1049 return DOING_TASKS | (current & HANDSHAKING); 1050 // clear both bits 1051 return (current & HANDSHAKING); 1052 }; 1053 doHandshake(EngineResult r, int caller)1054 private boolean doHandshake(EngineResult r, int caller) { 1055 // unconditionally sets the HANDSHAKING bit, while preserving task bits 1056 handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS)); 1057 if (stateList != null && debug.on()) { 1058 stateList.add(r.handshakeStatus().toString()); 1059 stateList.add(Integer.toString(caller)); 1060 } 1061 switch (r.handshakeStatus()) { 1062 case NEED_TASK: 1063 int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS); 1064 if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks 1065 return false; 1066 } 1067 1068 if (debug.on()) debug.log("obtaining and initiating task execution"); 1069 List<Runnable> tasks = obtainTasks(); 1070 executeTasks(tasks); 1071 return false; // executeTasks will resume activity 1072 case NEED_WRAP: 1073 if (caller == READER) { 1074 writer.triggerWrite(); 1075 return false; 1076 } 1077 break; 1078 case NEED_UNWRAP: 1079 case NEED_UNWRAP_AGAIN: 1080 // do nothing else 1081 // receiving-side data will trigger unwrap 1082 if (caller == WRITER) { 1083 reader.schedule(); 1084 return false; 1085 } 1086 break; 1087 default: 1088 throw new InternalError("Unexpected handshake status:" 1089 + r.handshakeStatus()); 1090 } 1091 return true; 1092 } 1093 obtainTasks()1094 private List<Runnable> obtainTasks() { 1095 List<Runnable> l = new ArrayList<>(); 1096 Runnable r; 1097 while ((r = engine.getDelegatedTask()) != null) { 1098 l.add(r); 1099 } 1100 return l; 1101 } 1102 executeTasks(List<Runnable> tasks)1103 private void executeTasks(List<Runnable> tasks) { 1104 exec.execute(() -> { 1105 try { 1106 List<Runnable> nextTasks = tasks; 1107 if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size())); 1108 do { 1109 nextTasks.forEach(Runnable::run); 1110 if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { 1111 nextTasks = obtainTasks(); 1112 } else { 1113 int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS); 1114 if ((s & DOING_TASKS) != 0) { 1115 if (debug.on()) debug.log("re-running tasks (B)"); 1116 nextTasks = obtainTasks(); 1117 continue; 1118 } 1119 break; 1120 } 1121 } while (true); 1122 if (debug.on()) debug.log("finished task execution"); 1123 HandshakeStatus hs = engine.getHandshakeStatus(); 1124 if (hs == HandshakeStatus.FINISHED || hs == HandshakeStatus.NOT_HANDSHAKING) { 1125 // We're no longer handshaking, try setting ALPN 1126 trySetALPN(); 1127 } 1128 resumeActivity(); 1129 } catch (Throwable t) { 1130 handleError(t); 1131 } 1132 }); 1133 } 1134 trySetALPN()1135 boolean trySetALPN() { 1136 // complete ALPN CF if needed. 1137 if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { 1138 applicationBufferSize = engine.getSession().getApplicationBufferSize(); 1139 packetBufferSize = engine.getSession().getPacketBufferSize(); 1140 setALPN(); 1141 return true; 1142 } 1143 return false; 1144 } 1145 1146 // FIXME: acknowledge a received CLOSE request from peer doClosure(EngineResult r)1147 EngineResult doClosure(EngineResult r) throws IOException { 1148 if (debug.on()) 1149 debug.log("doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]", 1150 r.result, engine.getHandshakeStatus(), 1151 engine.isOutboundDone(), engine.isInboundDone()); 1152 if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) { 1153 // we have received TLS close_notify and need to send 1154 // an acknowledgement back. We're calling doHandshake 1155 // to finish the close handshake. 1156 if (engine.isInboundDone() && !engine.isOutboundDone()) { 1157 if (debug.on()) debug.log("doClosure: close_notify received"); 1158 close_notify_received = true; 1159 if (!writer.scheduler.isStopped()) { 1160 doHandshake(r, READER); 1161 } else { 1162 // We have received closed notify, but we 1163 // won't be able to send the acknowledgement. 1164 // Nothing more will come from the socket either, 1165 // so mark the reader as completed. 1166 synchronized (reader.readBufferLock) { 1167 reader.completing = true; 1168 } 1169 } 1170 } 1171 } 1172 return r; 1173 } 1174 1175 /** 1176 * Returns the upstream Flow.Subscriber of the reading (incoming) side. 1177 * This flow must be given the encrypted data read from upstream (eg socket) 1178 * before it is decrypted. 1179 */ upstreamReader()1180 public Flow.Subscriber<List<ByteBuffer>> upstreamReader() { 1181 return reader; 1182 } 1183 1184 /** 1185 * Returns the upstream Flow.Subscriber of the writing (outgoing) side. 1186 * This flow contains the plaintext data before it is encrypted. 1187 */ upstreamWriter()1188 public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() { 1189 return writer; 1190 } 1191 resumeReader()1192 public boolean resumeReader() { 1193 return reader.signalScheduling(); 1194 } 1195 resetReaderDemand()1196 public void resetReaderDemand() { 1197 reader.resetDownstreamDemand(); 1198 } 1199 1200 static class EngineResult { 1201 final SSLEngineResult result; 1202 final ByteBuffer destBuffer; 1203 1204 // normal result EngineResult(SSLEngineResult result)1205 EngineResult(SSLEngineResult result) { 1206 this(result, null); 1207 } 1208 EngineResult(SSLEngineResult result, ByteBuffer destBuffer)1209 EngineResult(SSLEngineResult result, ByteBuffer destBuffer) { 1210 this.result = result; 1211 this.destBuffer = destBuffer; 1212 } 1213 handshaking()1214 boolean handshaking() { 1215 HandshakeStatus s = result.getHandshakeStatus(); 1216 return s != HandshakeStatus.FINISHED 1217 && s != HandshakeStatus.NOT_HANDSHAKING 1218 && result.getStatus() != Status.CLOSED; 1219 } 1220 needUnwrap()1221 boolean needUnwrap() { 1222 HandshakeStatus s = result.getHandshakeStatus(); 1223 return s == HandshakeStatus.NEED_UNWRAP; 1224 } 1225 1226 bytesConsumed()1227 int bytesConsumed() { 1228 return result.bytesConsumed(); 1229 } 1230 bytesProduced()1231 int bytesProduced() { 1232 return result.bytesProduced(); 1233 } 1234 handshakeStatus()1235 SSLEngineResult.HandshakeStatus handshakeStatus() { 1236 return result.getHandshakeStatus(); 1237 } 1238 status()1239 SSLEngineResult.Status status() { 1240 return result.getStatus(); 1241 } 1242 } 1243 1244 // The maximum network buffer size negotiated during 1245 // the handshake. Usually 16K. 1246 volatile int packetBufferSize; getNetBuffer()1247 final ByteBuffer getNetBuffer() { 1248 int netSize = packetBufferSize; 1249 if (netSize <= 0) { 1250 packetBufferSize = netSize = engine.getSession().getPacketBufferSize(); 1251 } 1252 return ByteBuffer.allocate(netSize); 1253 } 1254 1255 // The maximum application buffer size negotiated during 1256 // the handshake. Usually close to 16K. 1257 volatile int applicationBufferSize; 1258 // Despite of the maximum applicationBufferSize negotiated 1259 // above, TLS records usually have a much smaller payload. 1260 // The adaptativeAppBufferSize records the max payload 1261 // ever decoded, and we use that as a guess for how big 1262 // a buffer we will need for the next payload. 1263 // This avoids allocating and zeroing a 16K buffer for 1264 // nothing... 1265 volatile int adaptiveAppBufferSize; getAppBuffer()1266 final ByteBuffer getAppBuffer() { 1267 int appSize = applicationBufferSize; 1268 if (appSize <= 0) { 1269 applicationBufferSize = appSize 1270 = engine.getSession().getApplicationBufferSize(); 1271 } 1272 int size = adaptiveAppBufferSize; 1273 if (size <= 0) { 1274 size = 512; // start with 512 this is usually enough for handshaking / headers 1275 } else if (size > appSize) { 1276 size = appSize; 1277 } 1278 // will cause a BUFFER_OVERFLOW if not big enough, but 1279 // that's OK. 1280 return ByteBuffer.allocate(size); 1281 } 1282 dbgString()1283 final String dbgString() { 1284 return "SSLFlowDelegate(" + tubeName + ")"; 1285 } 1286 } 1287