1 /* 2 * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http.common; 27 28 import jdk.internal.net.http.common.SubscriberWrapper.SchedulingAction; 29 30 import javax.net.ssl.SSLEngine; 31 import javax.net.ssl.SSLEngineResult; 32 import javax.net.ssl.SSLEngineResult.HandshakeStatus; 33 import javax.net.ssl.SSLEngineResult.Status; 34 import javax.net.ssl.SSLException; 35 import javax.net.ssl.SSLHandshakeException; 36 import java.io.IOException; 37 import java.lang.ref.Reference; 38 import java.lang.ref.ReferenceQueue; 39 import java.lang.ref.WeakReference; 40 import java.nio.ByteBuffer; 41 import java.util.ArrayList; 42 import java.util.Collections; 43 import java.util.Iterator; 44 import java.util.LinkedList; 45 import java.util.List; 46 import java.util.concurrent.CompletableFuture; 47 import java.util.concurrent.ConcurrentLinkedQueue; 48 import java.util.concurrent.Executor; 49 import java.util.concurrent.Flow; 50 import java.util.concurrent.Flow.Subscriber; 51 import java.util.concurrent.atomic.AtomicInteger; 52 import java.util.function.Consumer; 53 import java.util.function.IntBinaryOperator; 54 55 /** 56 * Implements SSL using two SubscriberWrappers. 57 * 58 * <p> Constructor takes two Flow.Subscribers: one that receives the network 59 * data (after it has been encrypted by SSLFlowDelegate) data, and one that 60 * receives the application data (before it has been encrypted by SSLFlowDelegate). 61 * 62 * <p> Methods upstreamReader() and upstreamWriter() return the corresponding 63 * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data. 64 * See diagram below. 65 * 66 * <p> How Flow.Subscribers are used in this class, and where they come from: 67 * <pre> 68 * {@code 69 * 70 * 71 * 72 * ---------> data flow direction 73 * 74 * 75 * +------------------+ 76 * upstreamWriter | | downWriter 77 * ---------------> | | ------------> 78 * obtained from this | | supplied to constructor 79 * | SSLFlowDelegate | 80 * downReader | | upstreamReader 81 * <--------------- | | <-------------- 82 * supplied to constructor | | obtained from this 83 * +------------------+ 84 * 85 * Errors are reported to the downReader Flow.Subscriber 86 * 87 * } 88 * </pre> 89 */ 90 public class SSLFlowDelegate { 91 92 final Logger debug = 93 Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 94 95 private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; 96 private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0); 97 // When handshake is in progress trying to wrap may produce no bytes. 98 private static final ByteBuffer NOTHING = ByteBuffer.allocate(0); 99 private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate"); 100 private static final boolean isMonitored = 101 monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true")); 102 103 final Executor exec; 104 final Reader reader; 105 final Writer writer; 106 final SSLEngine engine; 107 final String tubeName; // hack 108 final CompletableFuture<String> alpnCF; // completes on initial handshake 109 final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped 110 volatile boolean close_notify_received; 111 final CompletableFuture<Void> readerCF; 112 final CompletableFuture<Void> writerCF; 113 final CompletableFuture<Void> stopCF; 114 final Consumer<ByteBuffer> recycler; 115 static AtomicInteger scount = new AtomicInteger(1); 116 final int id; 117 118 /** 119 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each 120 * Flow.Subscriber requires an associated {@link CompletableFuture} 121 * for errors that need to be signaled from downstream to upstream. 122 */ SSLFlowDelegate(SSLEngine engine, Executor exec, Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)123 public SSLFlowDelegate(SSLEngine engine, 124 Executor exec, 125 Subscriber<? super List<ByteBuffer>> downReader, 126 Subscriber<? super List<ByteBuffer>> downWriter) 127 { 128 this(engine, exec, null, downReader, downWriter); 129 } 130 131 /** 132 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each 133 * Flow.Subscriber requires an associated {@link CompletableFuture} 134 * for errors that need to be signaled from downstream to upstream. 135 */ SSLFlowDelegate(SSLEngine engine, Executor exec, Consumer<ByteBuffer> recycler, Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)136 public SSLFlowDelegate(SSLEngine engine, 137 Executor exec, 138 Consumer<ByteBuffer> recycler, 139 Subscriber<? super List<ByteBuffer>> downReader, 140 Subscriber<? super List<ByteBuffer>> downWriter) 141 { 142 this.id = scount.getAndIncrement(); 143 this.tubeName = String.valueOf(downWriter); 144 this.recycler = recycler; 145 this.reader = new Reader(); 146 this.writer = new Writer(); 147 this.engine = engine; 148 this.exec = exec; 149 this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); 150 this.readerCF = reader.completion(); 151 this.writerCF = reader.completion(); 152 readerCF.exceptionally(this::stopOnError); 153 writerCF.exceptionally(this::stopOnError); 154 this.stopCF = CompletableFuture.allOf(reader.completion(), writer.completion()) 155 .thenRun(this::normalStop); 156 this.alpnCF = new MinimalFuture<>(); 157 158 // connect the Reader to the downReader and the 159 // Writer to the downWriter. 160 connect(downReader, downWriter); 161 162 if (isMonitored) Monitor.add(monitor); 163 } 164 165 /** 166 * Returns true if the SSLFlowDelegate has detected a TLS 167 * close_notify from the server. 168 * @return true, if a close_notify was detected. 169 */ closeNotifyReceived()170 public boolean closeNotifyReceived() { 171 return close_notify_received; 172 } 173 174 /** 175 * Connects the read sink (downReader) to the SSLFlowDelegate Reader, 176 * and the write sink (downWriter) to the SSLFlowDelegate Writer. 177 * Called from within the constructor. Overwritten by SSLTube. 178 * 179 * @param downReader The left hand side read sink (typically, the 180 * HttpConnection read subscriber). 181 * @param downWriter The right hand side write sink (typically 182 * the SocketTube write subscriber). 183 */ connect(Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)184 void connect(Subscriber<? super List<ByteBuffer>> downReader, 185 Subscriber<? super List<ByteBuffer>> downWriter) { 186 this.reader.subscribe(downReader); 187 this.writer.subscribe(downWriter); 188 } 189 190 /** 191 * Returns a CompletableFuture<String> which completes after 192 * the initial handshake completes, and which contains the negotiated 193 * alpn. 194 */ alpn()195 public CompletableFuture<String> alpn() { 196 return alpnCF; 197 } 198 setALPN()199 private void setALPN() { 200 // Handshake is finished. So, can retrieve the ALPN now 201 if (alpnCF.isDone()) 202 return; 203 String alpn = engine.getApplicationProtocol(); 204 if (debug.on()) debug.log("setALPN = %s", alpn); 205 alpnCF.complete(alpn); 206 } 207 monitor()208 public String monitor() { 209 StringBuilder sb = new StringBuilder(); 210 sb.append("SSL: id ").append(id); 211 sb.append(" ").append(dbgString()); 212 sb.append(" HS state: " + states(handshakeState)); 213 sb.append(" Engine state: " + engine.getHandshakeStatus().toString()); 214 if (stateList != null) { 215 sb.append(" LL : "); 216 for (String s : stateList) { 217 sb.append(s).append(" "); 218 } 219 } 220 sb.append("\r\n"); 221 sb.append("Reader:: ").append(reader.toString()); 222 sb.append("\r\n"); 223 sb.append("Writer:: ").append(writer.toString()); 224 sb.append("\r\n==================================="); 225 return sb.toString(); 226 } 227 enterReadScheduling()228 protected SchedulingAction enterReadScheduling() { 229 return SchedulingAction.CONTINUE; 230 } 231 232 233 /** 234 * Processing function for incoming data. Pass it thru SSLEngine.unwrap(). 235 * Any decrypted buffers returned to be passed downstream. 236 * Status codes: 237 * NEED_UNWRAP: do nothing. Following incoming data will contain 238 * any required handshake data 239 * NEED_WRAP: call writer.addData() with empty buffer 240 * NEED_TASK: delegate task to executor 241 * BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap 242 * BUFFER_UNDERFLOW: keep buffer and wait for more data 243 * OK: return generated buffers. 244 * 245 * Upstream subscription strategy is to try and keep no more than 246 * TARGET_BUFSIZE bytes in readBuf 247 */ 248 final class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber { 249 // Maximum record size is 16k. 250 // Because SocketTube can feeds us up to 3 16K buffers, 251 // then setting this size to 16K means that the readBuf 252 // can store up to 64K-1 (16K-1 + 3*16K) 253 static final int TARGET_BUFSIZE = 16 * 1024; 254 255 final SequentialScheduler scheduler; 256 volatile ByteBuffer readBuf; 257 volatile boolean completing; 258 final Object readBufferLock = new Object(); 259 final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 260 261 private final class ReaderDownstreamPusher implements Runnable { 262 @Override run()263 public void run() { 264 processData(); 265 } 266 } 267 Reader()268 Reader() { 269 super(); 270 scheduler = SequentialScheduler.synchronizedScheduler( 271 new ReaderDownstreamPusher()); 272 this.readBuf = ByteBuffer.allocate(1024); 273 readBuf.limit(0); // keep in read mode 274 } 275 276 @Override supportsRecycling()277 public boolean supportsRecycling() { 278 return recycler != null; 279 } 280 enterScheduling()281 protected SchedulingAction enterScheduling() { 282 return enterReadScheduling(); 283 } 284 dbgString()285 public final String dbgString() { 286 return "SSL Reader(" + tubeName + ")"; 287 } 288 289 /** 290 * entry point for buffers delivered from upstream Subscriber 291 */ 292 @Override incoming(List<ByteBuffer> buffers, boolean complete)293 public void incoming(List<ByteBuffer> buffers, boolean complete) { 294 if (debugr.on()) 295 debugr.log("Adding %d bytes to read buffer", 296 Utils.remaining(buffers)); 297 addToReadBuf(buffers, complete); 298 scheduler.runOrSchedule(exec); 299 } 300 301 @Override toString()302 public String toString() { 303 return "READER: " + super.toString() + ", readBuf: " + readBuf.toString() 304 + ", count: " + count.toString() + ", scheduler: " 305 + (scheduler.isStopped() ? "stopped" : "running") 306 + ", status: " + lastUnwrapStatus 307 + ", handshakeState: " + handshakeState.get() 308 + ", engine: " + engine.getHandshakeStatus(); 309 } 310 reallocReadBuf()311 private void reallocReadBuf() { 312 int sz = readBuf.capacity(); 313 ByteBuffer newb = ByteBuffer.allocate(sz * 2); 314 readBuf.flip(); 315 Utils.copy(readBuf, newb); 316 readBuf = newb; 317 } 318 319 @Override upstreamWindowUpdate(long currentWindow, long downstreamQsize)320 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { 321 if (readBuf.remaining() > TARGET_BUFSIZE) { 322 if (debugr.on()) 323 debugr.log("readBuf has more than TARGET_BUFSIZE: %d", 324 readBuf.remaining()); 325 return 0; 326 } else { 327 return super.upstreamWindowUpdate(currentWindow, downstreamQsize); 328 } 329 } 330 331 // readBuf is kept ready for reading outside of this method addToReadBuf(List<ByteBuffer> buffers, boolean complete)332 private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) { 333 assert Utils.remaining(buffers) > 0 || buffers.isEmpty(); 334 synchronized (readBufferLock) { 335 for (ByteBuffer buf : buffers) { 336 readBuf.compact(); 337 while (readBuf.remaining() < buf.remaining()) 338 reallocReadBuf(); 339 readBuf.put(buf); 340 readBuf.flip(); 341 // should be safe to call inside lock 342 // since the only implementation 343 // offers the buffer to an unbounded queue. 344 // WARNING: do not touch buf after this point! 345 if (recycler != null) recycler.accept(buf); 346 } 347 if (complete) { 348 this.completing = complete; 349 minBytesRequired = 0; 350 } 351 } 352 } 353 schedule()354 void schedule() { 355 scheduler.runOrSchedule(exec); 356 } 357 stop()358 void stop() { 359 if (debugr.on()) debugr.log("stop"); 360 scheduler.stop(); 361 } 362 363 AtomicInteger count = new AtomicInteger(0); 364 365 // minimum number of bytes required to call unwrap. 366 // Usually this is 0, unless there was a buffer underflow. 367 // In this case we need to wait for more bytes than what 368 // we had before calling unwrap() again. 369 volatile int minBytesRequired; 370 371 // work function where it all happens processData()372 final void processData() { 373 try { 374 if (debugr.on()) 375 debugr.log("processData:" 376 + " readBuf remaining:" + readBuf.remaining() 377 + ", state:" + states(handshakeState) 378 + ", engine handshake status:" + engine.getHandshakeStatus()); 379 int len; 380 boolean complete = false; 381 while (readBuf.remaining() > (len = minBytesRequired)) { 382 boolean handshaking = false; 383 try { 384 EngineResult result; 385 synchronized (readBufferLock) { 386 complete = this.completing; 387 if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining()); 388 // Unless there is a BUFFER_UNDERFLOW, we should try to 389 // unwrap any number of bytes. Set minBytesRequired to 0: 390 // we only need to do that if minBytesRequired is not already 0. 391 len = len > 0 ? minBytesRequired = 0 : len; 392 result = unwrapBuffer(readBuf); 393 len = readBuf.remaining(); 394 if (debugr.on()) { 395 debugr.log("Unwrapped: result: %s", result.result); 396 debugr.log("Unwrapped: consumed: %s", result.bytesConsumed()); 397 } 398 } 399 if (result.bytesProduced() > 0) { 400 if (debugr.on()) 401 debugr.log("sending %d", result.bytesProduced()); 402 count.addAndGet(result.bytesProduced()); 403 outgoing(result.destBuffer, false); 404 } 405 if (result.status() == Status.BUFFER_UNDERFLOW) { 406 if (debugr.on()) debugr.log("BUFFER_UNDERFLOW"); 407 // not enough data in the read buffer... 408 // no need to try to unwrap again unless we get more bytes 409 // than minBytesRequired = len in the read buffer. 410 synchronized (readBufferLock) { 411 minBytesRequired = len; 412 // more bytes could already have been added... 413 assert readBuf.remaining() >= len; 414 // check if we have received some data, and if so 415 // we can just re-spin the loop 416 if (readBuf.remaining() > len) continue; 417 else if (this.completing) { 418 if (debug.on()) { 419 debugr.log("BUFFER_UNDERFLOW with EOF," + 420 " %d bytes non decrypted.", len); 421 } 422 // The channel won't send us any more data, and 423 // we are in underflow: we need to fail. 424 throw new IOException("BUFFER_UNDERFLOW with EOF, " 425 + len + " bytes non decrypted."); 426 } 427 } 428 // request more data and return. 429 requestMore(); 430 return; 431 } 432 if (complete && result.status() == Status.CLOSED) { 433 if (debugr.on()) debugr.log("Closed: completing"); 434 outgoing(Utils.EMPTY_BB_LIST, true); 435 // complete ALPN if not yet completed 436 setALPN(); 437 return; 438 } 439 if (result.handshaking()) { 440 handshaking = true; 441 if (debugr.on()) debugr.log("handshaking"); 442 if (doHandshake(result, READER)) continue; // need unwrap 443 else break; // doHandshake will have triggered the write scheduler if necessary 444 } else { 445 if (trySetALPN()) { 446 resumeActivity(); 447 } 448 } 449 } catch (IOException ex) { 450 errorCommon(ex); 451 handleError(ex); 452 return; 453 } 454 if (handshaking && !complete) 455 return; 456 } 457 if (!complete) { 458 synchronized (readBufferLock) { 459 complete = this.completing && !readBuf.hasRemaining(); 460 } 461 } 462 if (complete) { 463 if (debugr.on()) debugr.log("completing"); 464 // Complete the alpnCF, if not already complete, regardless of 465 // whether or not the ALPN is available, there will be no more 466 // activity. 467 setALPN(); 468 outgoing(Utils.EMPTY_BB_LIST, true); 469 } 470 } catch (Throwable ex) { 471 errorCommon(ex); 472 handleError(ex); 473 } 474 } 475 476 private volatile Status lastUnwrapStatus; unwrapBuffer(ByteBuffer src)477 EngineResult unwrapBuffer(ByteBuffer src) throws IOException { 478 ByteBuffer dst = getAppBuffer(); 479 int len = src.remaining(); 480 while (true) { 481 SSLEngineResult sslResult = engine.unwrap(src, dst); 482 switch (lastUnwrapStatus = sslResult.getStatus()) { 483 case BUFFER_OVERFLOW: 484 // may happen if app size buffer was changed, or if 485 // our 'adaptiveBufferSize' guess was too small for 486 // the current payload. In that case, update the 487 // value of applicationBufferSize, and allocate a 488 // buffer of that size, which we are sure will be 489 // big enough to decode whatever needs to be 490 // decoded. We will later update adaptiveBufferSize 491 // in OK: below. 492 int appSize = applicationBufferSize = 493 engine.getSession().getApplicationBufferSize(); 494 ByteBuffer b = ByteBuffer.allocate(appSize + dst.position()); 495 dst.flip(); 496 b.put(dst); 497 dst = b; 498 break; 499 case CLOSED: 500 assert dst.position() == 0; 501 return doClosure(new EngineResult(sslResult)); 502 case BUFFER_UNDERFLOW: 503 // handled implicitly by compaction/reallocation of readBuf 504 assert dst.position() == 0; 505 return new EngineResult(sslResult); 506 case OK: 507 int size = dst.position(); 508 if (debug.on()) { 509 debugr.log("Decoded " + size + " bytes out of " + len 510 + " into buffer of " + dst.capacity() 511 + " remaining to decode: " + src.remaining()); 512 } 513 // if the record payload was bigger than what was originally 514 // allocated, then sets the adaptiveAppBufferSize to size 515 // and we will use that new size as a guess for the next app 516 // buffer. 517 if (size > adaptiveAppBufferSize) { 518 adaptiveAppBufferSize = ((size + 7) >>> 3) << 3; 519 } 520 dst.flip(); 521 return new EngineResult(sslResult, dst); 522 } 523 } 524 } 525 } 526 527 public interface Monitorable { getInfo()528 public String getInfo(); 529 } 530 531 public static class Monitor extends Thread { 532 final List<WeakReference<Monitorable>> list; 533 final List<FinalMonitorable> finalList; 534 final ReferenceQueue<Monitorable> queue = new ReferenceQueue<>(); 535 static Monitor themon; 536 537 static { 538 themon = new Monitor(); themon.start()539 themon.start(); // uncomment to enable Monitor 540 } 541 542 // An instance used to temporarily store the 543 // last observable state of a monitorable object. 544 // When Monitor.remove(o) is called, we replace 545 // 'o' with a FinalMonitorable whose reference 546 // will be enqueued after the last observable state 547 // has been printed. 548 final class FinalMonitorable implements Monitorable { 549 final String finalState; FinalMonitorable(Monitorable o)550 FinalMonitorable(Monitorable o) { 551 finalState = o.getInfo(); 552 finalList.add(this); 553 } 554 @Override getInfo()555 public String getInfo() { 556 finalList.remove(this); 557 return finalState; 558 } 559 } 560 Monitor()561 Monitor() { 562 super("Monitor"); 563 setDaemon(true); 564 list = Collections.synchronizedList(new LinkedList<>()); 565 finalList = new ArrayList<>(); // access is synchronized on list above 566 } 567 addTarget(Monitorable o)568 void addTarget(Monitorable o) { 569 list.add(new WeakReference<>(o, queue)); 570 } removeTarget(Monitorable o)571 void removeTarget(Monitorable o) { 572 // It can take a long time for GC to clean up references. 573 // Calling Monitor.remove() early helps removing noise from the 574 // logs/ 575 synchronized (list) { 576 Iterator<WeakReference<Monitorable>> it = list.iterator(); 577 while (it.hasNext()) { 578 Monitorable m = it.next().get(); 579 if (m == null) it.remove(); 580 if (o == m) { 581 it.remove(); 582 break; 583 } 584 } 585 FinalMonitorable m = new FinalMonitorable(o); 586 addTarget(m); 587 Reference.reachabilityFence(m); 588 } 589 } 590 add(Monitorable o)591 public static void add(Monitorable o) { 592 themon.addTarget(o); 593 } remove(Monitorable o)594 public static void remove(Monitorable o) { 595 themon.removeTarget(o); 596 } 597 598 @Override run()599 public void run() { 600 System.out.println("Monitor starting"); 601 try { 602 while (true) { 603 Thread.sleep(20 * 1000); 604 synchronized (list) { 605 Reference<? extends Monitorable> expired; 606 while ((expired = queue.poll()) != null) list.remove(expired); 607 for (WeakReference<Monitorable> ref : list) { 608 Monitorable o = ref.get(); 609 if (o == null) continue; 610 if (o instanceof FinalMonitorable) { 611 ref.enqueue(); 612 } 613 System.out.println(o.getInfo()); 614 System.out.println("-------------------------"); 615 } 616 } 617 System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-"); 618 } 619 } catch (InterruptedException e) { 620 System.out.println("Monitor exiting with " + e); 621 } 622 } 623 } 624 625 /** 626 * Processing function for outgoing data. Pass it thru SSLEngine.wrap() 627 * Any encrypted buffers generated are passed downstream to be written. 628 * Status codes: 629 * NEED_UNWRAP: call reader.addData() with empty buffer 630 * NEED_WRAP: call addData() with empty buffer 631 * NEED_TASK: delegate task to executor 632 * BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap 633 * BUFFER_UNDERFLOW: shouldn't happen on writing side 634 * OK: return generated buffers 635 */ 636 class Writer extends SubscriberWrapper { 637 final SequentialScheduler scheduler; 638 // queues of buffers received from upstream waiting 639 // to be processed by the SSLEngine 640 final List<ByteBuffer> writeList; 641 final Logger debugw = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 642 volatile boolean completing; 643 boolean completed; // only accessed in processData 644 645 class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask { run()646 @Override public void run() { processData(); } 647 } 648 Writer()649 Writer() { 650 super(); 651 writeList = Collections.synchronizedList(new LinkedList<>()); 652 scheduler = new SequentialScheduler(new WriterDownstreamPusher()); 653 } 654 655 @Override incoming(List<ByteBuffer> buffers, boolean complete)656 protected void incoming(List<ByteBuffer> buffers, boolean complete) { 657 assert complete ? buffers == Utils.EMPTY_BB_LIST : true; 658 assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true; 659 if (complete) { 660 if (debugw.on()) debugw.log("adding SENTINEL"); 661 completing = true; 662 writeList.add(SENTINEL); 663 } else { 664 writeList.addAll(buffers); 665 } 666 if (debugw.on()) 667 debugw.log("added " + buffers.size() 668 + " (" + Utils.remaining(buffers) 669 + " bytes) to the writeList"); 670 scheduler.runOrSchedule(); 671 } 672 dbgString()673 public final String dbgString() { 674 return "SSL Writer(" + tubeName + ")"; 675 } 676 onSubscribe()677 protected void onSubscribe() { 678 if (debugw.on()) debugw.log("onSubscribe initiating handshaking"); 679 addData(HS_TRIGGER); // initiates handshaking 680 } 681 schedule()682 void schedule() { 683 scheduler.runOrSchedule(); 684 } 685 stop()686 void stop() { 687 if (debugw.on()) debugw.log("stop"); 688 scheduler.stop(); 689 } 690 691 @Override closing()692 public boolean closing() { 693 return closeNotifyReceived(); 694 } 695 isCompleting()696 private boolean isCompleting() { 697 return completing; 698 } 699 700 @Override upstreamWindowUpdate(long currentWindow, long downstreamQsize)701 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { 702 if (writeList.size() > 10) 703 return 0; 704 else 705 return super.upstreamWindowUpdate(currentWindow, downstreamQsize); 706 } 707 hsTriggered()708 private boolean hsTriggered() { 709 synchronized(writeList) { 710 for (ByteBuffer b : writeList) 711 if (b == HS_TRIGGER) 712 return true; 713 return false; 714 } 715 } 716 triggerWrite()717 void triggerWrite() { 718 synchronized (writeList) { 719 if (writeList.isEmpty()) { 720 writeList.add(HS_TRIGGER); 721 } 722 } 723 scheduler.runOrSchedule(); 724 } 725 processData()726 private void processData() { 727 boolean completing = isCompleting(); 728 729 try { 730 if (debugw.on()) 731 debugw.log("processData, writeList remaining:" 732 + Utils.remaining(writeList) + ", hsTriggered:" 733 + hsTriggered() + ", needWrap:" + needWrap()); 734 735 while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) { 736 ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY); 737 EngineResult result = wrapBuffers(outbufs); 738 if (debugw.on()) 739 debugw.log("wrapBuffer returned %s", result.result); 740 741 if (result.status() == Status.CLOSED) { 742 if (!upstreamCompleted) { 743 upstreamCompleted = true; 744 upstreamSubscription.cancel(); 745 // complete ALPN if not yet completed 746 setALPN(); 747 } 748 if (result.bytesProduced() <= 0) 749 return; 750 751 if (!completing && !completed) { 752 completing = this.completing = true; 753 // There could still be some outgoing data in outbufs. 754 writeList.add(SENTINEL); 755 } 756 } 757 758 boolean handshaking = false; 759 if (result.handshaking()) { 760 if (debugw.on()) debugw.log("handshaking"); 761 doHandshake(result, WRITER); // ok to ignore return 762 handshaking = true; 763 } else { 764 if (trySetALPN()) { 765 resumeActivity(); 766 } 767 } 768 cleanList(writeList); // tidy up the source list 769 sendResultBytes(result); 770 if (handshaking) { 771 if (!completing && needWrap()) { 772 continue; 773 } else { 774 return; 775 } 776 } 777 } 778 if (completing && Utils.remaining(writeList) == 0) { 779 if (!completed) { 780 completed = true; 781 writeList.clear(); 782 outgoing(Utils.EMPTY_BB_LIST, true); 783 } 784 return; 785 } 786 if (writeList.isEmpty() && needWrap()) { 787 writer.addData(HS_TRIGGER); 788 } 789 } catch (Throwable ex) { 790 errorCommon(ex); 791 handleError(ex); 792 } 793 } 794 795 // The SSLEngine insists on being given a buffer that is at least 796 // SSLSession.getPacketBufferSize() long (usually 16K). If given 797 // a smaller buffer it will go in BUFFER_OVERFLOW, even if it only 798 // has 6 bytes to wrap. Typical usage shows that for GET we 799 // usually produce an average of ~ 100 bytes. 800 // To avoid wasting space, and because allocating and zeroing 801 // 16K buffers for encoding 6 bytes is costly, we are reusing the 802 // same writeBuffer to interact with SSLEngine.wrap(). 803 // If the SSLEngine produces less than writeBuffer.capacity() / 2, 804 // then we copy off the bytes to a smaller buffer that we send 805 // downstream. Otherwise, we send the writeBuffer downstream 806 // and will allocate a new one next time. 807 volatile ByteBuffer writeBuffer; 808 private volatile Status lastWrappedStatus; 809 @SuppressWarnings("fallthrough") wrapBuffers(ByteBuffer[] src)810 EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException { 811 long len = Utils.remaining(src); 812 if (debugw.on()) 813 debugw.log("wrapping " + len + " bytes"); 814 815 ByteBuffer dst = writeBuffer; 816 if (dst == null) dst = writeBuffer = getNetBuffer(); 817 assert dst.position() == 0 : "buffer position is " + dst.position(); 818 assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity(); 819 820 while (true) { 821 SSLEngineResult sslResult = engine.wrap(src, dst); 822 if (debugw.on()) debugw.log("SSLResult: " + sslResult); 823 switch (lastWrappedStatus = sslResult.getStatus()) { 824 case BUFFER_OVERFLOW: 825 // Shouldn't happen. We allocated buffer with packet size 826 // get it again if net buffer size was changed 827 if (debugw.on()) debugw.log("BUFFER_OVERFLOW"); 828 int netSize = packetBufferSize 829 = engine.getSession().getPacketBufferSize(); 830 ByteBuffer b = writeBuffer = ByteBuffer.allocate(netSize + dst.position()); 831 dst.flip(); 832 b.put(dst); 833 dst = b; 834 break; // try again 835 case CLOSED: 836 if (debugw.on()) debugw.log("CLOSED"); 837 // fallthrough. There could be some remaining data in dst. 838 // CLOSED will be handled by the caller. 839 case OK: 840 final ByteBuffer dest; 841 if (dst.position() == 0) { 842 dest = NOTHING; // can happen if handshake is in progress 843 } else if (dst.position() < dst.capacity() / 2) { 844 // less than half the buffer was used. 845 // copy off the bytes to a smaller buffer, and keep 846 // the writeBuffer for next time. 847 dst.flip(); 848 dest = Utils.copyAligned(dst); 849 dst.clear(); 850 } else { 851 // more than half the buffer was used. 852 // just send that buffer downstream, and we will 853 // get a new writeBuffer next time it is needed. 854 dst.flip(); 855 dest = dst; 856 writeBuffer = null; 857 } 858 if (debugw.on()) 859 debugw.log("OK => produced: %d bytes into %d, not wrapped: %d", 860 dest.remaining(), dest.capacity(), Utils.remaining(src)); 861 return new EngineResult(sslResult, dest); 862 case BUFFER_UNDERFLOW: 863 // Shouldn't happen. Doesn't returns when wrap() 864 // underflow handled externally 865 // assert false : "Buffer Underflow"; 866 if (debug.on()) debug.log("BUFFER_UNDERFLOW"); 867 return new EngineResult(sslResult); 868 default: 869 if (debugw.on()) 870 debugw.log("result: %s", sslResult.getStatus()); 871 assert false : "result:" + sslResult.getStatus(); 872 } 873 } 874 } 875 needWrap()876 private boolean needWrap() { 877 return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP; 878 } 879 sendResultBytes(EngineResult result)880 private void sendResultBytes(EngineResult result) { 881 if (result.bytesProduced() > 0) { 882 if (debugw.on()) 883 debugw.log("Sending %d bytes downstream", 884 result.bytesProduced()); 885 outgoing(result.destBuffer, false); 886 } 887 } 888 889 @Override toString()890 public String toString() { 891 return "WRITER: " + super.toString() 892 + ", writeList size: " + Integer.toString(writeList.size()) 893 + ", scheduler: " + (scheduler.isStopped() ? "stopped" : "running") 894 + ", status: " + lastWrappedStatus; 895 //" writeList: " + writeList.toString(); 896 } 897 } 898 handleError(Throwable t)899 private void handleError(Throwable t) { 900 if (debug.on()) debug.log("handleError", t); 901 readerCF.completeExceptionally(t); 902 writerCF.completeExceptionally(t); 903 // no-op if already completed 904 alpnCF.completeExceptionally(t); 905 reader.stop(); 906 writer.stop(); 907 } 908 909 boolean stopped; 910 normalStop()911 private synchronized void normalStop() { 912 if (stopped) 913 return; 914 stopped = true; 915 reader.stop(); 916 writer.stop(); 917 // make sure the alpnCF is completed. 918 if (!alpnCF.isDone()) { 919 Throwable alpn = new SSLHandshakeException( 920 "Connection closed before successful ALPN negotiation"); 921 alpnCF.completeExceptionally(alpn); 922 } 923 if (isMonitored) Monitor.remove(monitor); 924 } 925 stopOnError(Throwable error)926 private Void stopOnError(Throwable error) { 927 // maybe log, etc 928 // ensure the ALPN is completed 929 // We could also do this in SSLTube.SSLSubscriberWrapper 930 // onError/onComplete - with the caveat that the ALP CF 931 // would get completed externally. Doing it here keeps 932 // it all inside SSLFlowDelegate. 933 if (!alpnCF.isDone()) { 934 alpnCF.completeExceptionally(error); 935 } 936 normalStop(); 937 return null; 938 } 939 cleanList(List<ByteBuffer> l)940 private void cleanList(List<ByteBuffer> l) { 941 synchronized (l) { 942 Iterator<ByteBuffer> iter = l.iterator(); 943 while (iter.hasNext()) { 944 ByteBuffer b = iter.next(); 945 if (!b.hasRemaining() && b != SENTINEL) { 946 iter.remove(); 947 } 948 } 949 } 950 } 951 952 /** 953 * States for handshake. We avoid races when accessing/updating the AtomicInt 954 * because updates always schedule an additional call to both the read() 955 * and write() functions. 956 */ 957 private static final int NOT_HANDSHAKING = 0; 958 private static final int HANDSHAKING = 1; 959 960 // Bit flags 961 // a thread is currently executing tasks 962 private static final int DOING_TASKS = 4; 963 // a thread wants to execute tasks, while another thread is executing 964 private static final int REQUESTING_TASKS = 8; 965 private static final int TASK_BITS = 12; // Both bits 966 967 private static final int READER = 1; 968 private static final int WRITER = 2; 969 states(AtomicInteger state)970 private static String states(AtomicInteger state) { 971 int s = state.get(); 972 StringBuilder sb = new StringBuilder(); 973 int x = s & ~TASK_BITS; 974 switch (x) { 975 case NOT_HANDSHAKING: 976 sb.append(" NOT_HANDSHAKING "); 977 break; 978 case HANDSHAKING: 979 sb.append(" HANDSHAKING "); 980 break; 981 default: 982 throw new InternalError(); 983 } 984 if ((s & DOING_TASKS) > 0) 985 sb.append("|DOING_TASKS"); 986 if ((s & REQUESTING_TASKS) > 0) 987 sb.append("|REQUESTING_TASKS"); 988 return sb.toString(); 989 } 990 resumeActivity()991 private void resumeActivity() { 992 reader.schedule(); 993 writer.schedule(); 994 } 995 996 final AtomicInteger handshakeState; 997 final ConcurrentLinkedQueue<String> stateList = 998 debug.on() ? new ConcurrentLinkedQueue<>() : null; 999 1000 // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS 1001 // depending on previous value 1002 private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> { 1003 if ((current & DOING_TASKS) == 0) 1004 return DOING_TASKS | (current & HANDSHAKING); 1005 else 1006 return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING); 1007 }; 1008 1009 // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set 1010 // clears bits if not. 1011 private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> { 1012 if ((current & REQUESTING_TASKS) != 0) 1013 return DOING_TASKS | (current & HANDSHAKING); 1014 // clear both bits 1015 return (current & HANDSHAKING); 1016 }; 1017 doHandshake(EngineResult r, int caller)1018 private boolean doHandshake(EngineResult r, int caller) { 1019 // unconditionally sets the HANDSHAKING bit, while preserving task bits 1020 handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS)); 1021 if (stateList != null && debug.on()) { 1022 stateList.add(r.handshakeStatus().toString()); 1023 stateList.add(Integer.toString(caller)); 1024 } 1025 switch (r.handshakeStatus()) { 1026 case NEED_TASK: 1027 int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS); 1028 if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks 1029 return false; 1030 } 1031 1032 if (debug.on()) debug.log("obtaining and initiating task execution"); 1033 List<Runnable> tasks = obtainTasks(); 1034 executeTasks(tasks); 1035 return false; // executeTasks will resume activity 1036 case NEED_WRAP: 1037 if (caller == READER) { 1038 writer.triggerWrite(); 1039 return false; 1040 } 1041 break; 1042 case NEED_UNWRAP: 1043 case NEED_UNWRAP_AGAIN: 1044 // do nothing else 1045 // receiving-side data will trigger unwrap 1046 if (caller == WRITER) { 1047 reader.schedule(); 1048 return false; 1049 } 1050 break; 1051 default: 1052 throw new InternalError("Unexpected handshake status:" 1053 + r.handshakeStatus()); 1054 } 1055 return true; 1056 } 1057 obtainTasks()1058 private List<Runnable> obtainTasks() { 1059 List<Runnable> l = new ArrayList<>(); 1060 Runnable r; 1061 while ((r = engine.getDelegatedTask()) != null) { 1062 l.add(r); 1063 } 1064 return l; 1065 } 1066 executeTasks(List<Runnable> tasks)1067 private void executeTasks(List<Runnable> tasks) { 1068 exec.execute(() -> { 1069 try { 1070 List<Runnable> nextTasks = tasks; 1071 if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size())); 1072 do { 1073 nextTasks.forEach(Runnable::run); 1074 if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { 1075 nextTasks = obtainTasks(); 1076 } else { 1077 int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS); 1078 if ((s & DOING_TASKS) != 0) { 1079 if (debug.on()) debug.log("re-running tasks (B)"); 1080 nextTasks = obtainTasks(); 1081 continue; 1082 } 1083 break; 1084 } 1085 } while (true); 1086 if (debug.on()) debug.log("finished task execution"); 1087 HandshakeStatus hs = engine.getHandshakeStatus(); 1088 if (hs == HandshakeStatus.FINISHED || hs == HandshakeStatus.NOT_HANDSHAKING) { 1089 // We're no longer handshaking, try setting ALPN 1090 trySetALPN(); 1091 } 1092 resumeActivity(); 1093 } catch (Throwable t) { 1094 handleError(t); 1095 } 1096 }); 1097 } 1098 trySetALPN()1099 boolean trySetALPN() { 1100 // complete ALPN CF if needed. 1101 if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { 1102 applicationBufferSize = engine.getSession().getApplicationBufferSize(); 1103 packetBufferSize = engine.getSession().getPacketBufferSize(); 1104 setALPN(); 1105 return true; 1106 } 1107 return false; 1108 } 1109 1110 // FIXME: acknowledge a received CLOSE request from peer doClosure(EngineResult r)1111 EngineResult doClosure(EngineResult r) throws IOException { 1112 if (debug.on()) 1113 debug.log("doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]", 1114 r.result, engine.getHandshakeStatus(), 1115 engine.isOutboundDone(), engine.isInboundDone()); 1116 if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) { 1117 // we have received TLS close_notify and need to send 1118 // an acknowledgement back. We're calling doHandshake 1119 // to finish the close handshake. 1120 if (engine.isInboundDone() && !engine.isOutboundDone()) { 1121 if (debug.on()) debug.log("doClosure: close_notify received"); 1122 close_notify_received = true; 1123 if (!writer.scheduler.isStopped()) { 1124 doHandshake(r, READER); 1125 } else { 1126 // We have received closed notify, but we 1127 // won't be able to send the acknowledgement. 1128 // Nothing more will come from the socket either, 1129 // so mark the reader as completed. 1130 synchronized (reader.readBufferLock) { 1131 reader.completing = true; 1132 } 1133 } 1134 } 1135 } 1136 return r; 1137 } 1138 1139 /** 1140 * Returns the upstream Flow.Subscriber of the reading (incoming) side. 1141 * This flow must be given the encrypted data read from upstream (eg socket) 1142 * before it is decrypted. 1143 */ upstreamReader()1144 public Flow.Subscriber<List<ByteBuffer>> upstreamReader() { 1145 return reader; 1146 } 1147 1148 /** 1149 * Returns the upstream Flow.Subscriber of the writing (outgoing) side. 1150 * This flow contains the plaintext data before it is encrypted. 1151 */ upstreamWriter()1152 public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() { 1153 return writer; 1154 } 1155 resumeReader()1156 public boolean resumeReader() { 1157 return reader.signalScheduling(); 1158 } 1159 resetReaderDemand()1160 public void resetReaderDemand() { 1161 reader.resetDownstreamDemand(); 1162 } 1163 1164 static class EngineResult { 1165 final SSLEngineResult result; 1166 final ByteBuffer destBuffer; 1167 1168 // normal result EngineResult(SSLEngineResult result)1169 EngineResult(SSLEngineResult result) { 1170 this(result, null); 1171 } 1172 EngineResult(SSLEngineResult result, ByteBuffer destBuffer)1173 EngineResult(SSLEngineResult result, ByteBuffer destBuffer) { 1174 this.result = result; 1175 this.destBuffer = destBuffer; 1176 } 1177 handshaking()1178 boolean handshaking() { 1179 HandshakeStatus s = result.getHandshakeStatus(); 1180 return s != HandshakeStatus.FINISHED 1181 && s != HandshakeStatus.NOT_HANDSHAKING 1182 && result.getStatus() != Status.CLOSED; 1183 } 1184 needUnwrap()1185 boolean needUnwrap() { 1186 HandshakeStatus s = result.getHandshakeStatus(); 1187 return s == HandshakeStatus.NEED_UNWRAP; 1188 } 1189 1190 bytesConsumed()1191 int bytesConsumed() { 1192 return result.bytesConsumed(); 1193 } 1194 bytesProduced()1195 int bytesProduced() { 1196 return result.bytesProduced(); 1197 } 1198 handshakeStatus()1199 SSLEngineResult.HandshakeStatus handshakeStatus() { 1200 return result.getHandshakeStatus(); 1201 } 1202 status()1203 SSLEngineResult.Status status() { 1204 return result.getStatus(); 1205 } 1206 } 1207 1208 // The maximum network buffer size negotiated during 1209 // the handshake. Usually 16K. 1210 volatile int packetBufferSize; getNetBuffer()1211 final ByteBuffer getNetBuffer() { 1212 int netSize = packetBufferSize; 1213 if (netSize <= 0) { 1214 packetBufferSize = netSize = engine.getSession().getPacketBufferSize(); 1215 } 1216 return ByteBuffer.allocate(netSize); 1217 } 1218 1219 // The maximum application buffer size negotiated during 1220 // the handshake. Usually close to 16K. 1221 volatile int applicationBufferSize; 1222 // Despite of the maximum applicationBufferSize negotiated 1223 // above, TLS records usually have a much smaller payload. 1224 // The adaptativeAppBufferSize records the max payload 1225 // ever decoded, and we use that as a guess for how big 1226 // a buffer we will need for the next payload. 1227 // This avoids allocating and zeroing a 16K buffer for 1228 // nothing... 1229 volatile int adaptiveAppBufferSize; getAppBuffer()1230 final ByteBuffer getAppBuffer() { 1231 int appSize = applicationBufferSize; 1232 if (appSize <= 0) { 1233 applicationBufferSize = appSize 1234 = engine.getSession().getApplicationBufferSize(); 1235 } 1236 int size = adaptiveAppBufferSize; 1237 if (size <= 0) { 1238 size = 512; // start with 512 this is usually enough for handshaking / headers 1239 } else if (size > appSize) { 1240 size = appSize; 1241 } 1242 // will cause a BUFFER_OVERFLOW if not big enough, but 1243 // that's OK. 1244 return ByteBuffer.allocate(size); 1245 } 1246 dbgString()1247 final String dbgString() { 1248 return "SSLFlowDelegate(" + tubeName + ")"; 1249 } 1250 } 1251