1 /*
2  * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http.common;
27 
28 import jdk.internal.net.http.common.SubscriberWrapper.SchedulingAction;
29 
30 import javax.net.ssl.SSLEngine;
31 import javax.net.ssl.SSLEngineResult;
32 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
33 import javax.net.ssl.SSLEngineResult.Status;
34 import javax.net.ssl.SSLException;
35 import javax.net.ssl.SSLHandshakeException;
36 import java.io.IOException;
37 import java.lang.ref.Reference;
38 import java.lang.ref.ReferenceQueue;
39 import java.lang.ref.WeakReference;
40 import java.nio.ByteBuffer;
41 import java.util.ArrayList;
42 import java.util.Collections;
43 import java.util.Iterator;
44 import java.util.LinkedList;
45 import java.util.List;
46 import java.util.concurrent.CompletableFuture;
47 import java.util.concurrent.ConcurrentLinkedQueue;
48 import java.util.concurrent.Executor;
49 import java.util.concurrent.Flow;
50 import java.util.concurrent.Flow.Subscriber;
51 import java.util.concurrent.atomic.AtomicInteger;
52 import java.util.function.Consumer;
53 import java.util.function.IntBinaryOperator;
54 
55 /**
56  * Implements SSL using two SubscriberWrappers.
57  *
58  * <p> Constructor takes two Flow.Subscribers: one that receives the network
59  * data (after it has been encrypted by SSLFlowDelegate) data, and one that
60  * receives the application data (before it has been encrypted by SSLFlowDelegate).
61  *
62  * <p> Methods upstreamReader() and upstreamWriter() return the corresponding
63  * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data.
64  * See diagram below.
65  *
66  * <p> How Flow.Subscribers are used in this class, and where they come from:
67  * <pre>
68  * {@code
69  *
70  *
71  *
72  * --------->  data flow direction
73  *
74  *
75  *                         +------------------+
76  *        upstreamWriter   |                  | downWriter
77  *        ---------------> |                  | ------------>
78  *  obtained from this     |                  | supplied to constructor
79  *                         | SSLFlowDelegate  |
80  *        downReader       |                  | upstreamReader
81  *        <--------------- |                  | <--------------
82  * supplied to constructor |                  | obtained from this
83  *                         +------------------+
84  *
85  * Errors are reported to the downReader Flow.Subscriber
86  *
87  * }
88  * </pre>
89  */
90 public class SSLFlowDelegate {
91 
92     final Logger debug =
93             Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
94 
95     private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
96     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
97     // When handshake is in progress trying to wrap may produce no bytes.
98     private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
99     private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
100     private static final boolean isMonitored =
101             monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true"));
102 
103     final Executor exec;
104     final Reader reader;
105     final Writer writer;
106     final SSLEngine engine;
107     final String tubeName; // hack
108     final CompletableFuture<String> alpnCF; // completes on initial handshake
109     final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped
110     volatile boolean close_notify_received;
111     final CompletableFuture<Void> readerCF;
112     final CompletableFuture<Void> writerCF;
113     final CompletableFuture<Void> stopCF;
114     final Consumer<ByteBuffer> recycler;
115     static AtomicInteger scount = new AtomicInteger(1);
116     final int id;
117 
118     /**
119      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
120      * Flow.Subscriber requires an associated {@link CompletableFuture}
121      * for errors that need to be signaled from downstream to upstream.
122      */
SSLFlowDelegate(SSLEngine engine, Executor exec, Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)123     public SSLFlowDelegate(SSLEngine engine,
124                            Executor exec,
125                            Subscriber<? super List<ByteBuffer>> downReader,
126                            Subscriber<? super List<ByteBuffer>> downWriter)
127     {
128         this(engine, exec, null, downReader, downWriter);
129     }
130 
131     /**
132      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
133      * Flow.Subscriber requires an associated {@link CompletableFuture}
134      * for errors that need to be signaled from downstream to upstream.
135      */
SSLFlowDelegate(SSLEngine engine, Executor exec, Consumer<ByteBuffer> recycler, Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)136     public SSLFlowDelegate(SSLEngine engine,
137             Executor exec,
138             Consumer<ByteBuffer> recycler,
139             Subscriber<? super List<ByteBuffer>> downReader,
140             Subscriber<? super List<ByteBuffer>> downWriter)
141         {
142         this.id = scount.getAndIncrement();
143         this.tubeName = String.valueOf(downWriter);
144         this.recycler = recycler;
145         this.reader = new Reader();
146         this.writer = new Writer();
147         this.engine = engine;
148         this.exec = exec;
149         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
150         this.readerCF = reader.completion();
151         this.writerCF = reader.completion();
152         readerCF.exceptionally(this::stopOnError);
153         writerCF.exceptionally(this::stopOnError);
154         this.stopCF = CompletableFuture.allOf(reader.completion(), writer.completion())
155             .thenRun(this::normalStop);
156         this.alpnCF = new MinimalFuture<>();
157 
158         // connect the Reader to the downReader and the
159         // Writer to the downWriter.
160         connect(downReader, downWriter);
161 
162         if (isMonitored) Monitor.add(monitor);
163     }
164 
165     /**
166      * Returns true if the SSLFlowDelegate has detected a TLS
167      * close_notify from the server.
168      * @return true, if a close_notify was detected.
169      */
closeNotifyReceived()170     public boolean closeNotifyReceived() {
171         return close_notify_received;
172     }
173 
174     /**
175      * Connects the read sink (downReader) to the SSLFlowDelegate Reader,
176      * and the write sink (downWriter) to the SSLFlowDelegate Writer.
177      * Called from within the constructor. Overwritten by SSLTube.
178      *
179      * @param downReader  The left hand side read sink (typically, the
180      *                    HttpConnection read subscriber).
181      * @param downWriter  The right hand side write sink (typically
182      *                    the SocketTube write subscriber).
183      */
connect(Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)184     void connect(Subscriber<? super List<ByteBuffer>> downReader,
185                  Subscriber<? super List<ByteBuffer>> downWriter) {
186         this.reader.subscribe(downReader);
187         this.writer.subscribe(downWriter);
188     }
189 
190    /**
191     * Returns a CompletableFuture<String> which completes after
192     * the initial handshake completes, and which contains the negotiated
193     * alpn.
194     */
alpn()195     public CompletableFuture<String> alpn() {
196         return alpnCF;
197     }
198 
setALPN()199     private void setALPN() {
200         // Handshake is finished. So, can retrieve the ALPN now
201         if (alpnCF.isDone())
202             return;
203         String alpn = engine.getApplicationProtocol();
204         if (debug.on()) debug.log("setALPN = %s", alpn);
205         alpnCF.complete(alpn);
206     }
207 
monitor()208     public String monitor() {
209         StringBuilder sb = new StringBuilder();
210         sb.append("SSL: id ").append(id);
211         sb.append(" ").append(dbgString());
212         sb.append(" HS state: " + states(handshakeState));
213         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
214         if (stateList != null) {
215             sb.append(" LL : ");
216             for (String s : stateList) {
217                 sb.append(s).append(" ");
218             }
219         }
220         sb.append("\r\n");
221         sb.append("Reader:: ").append(reader.toString());
222         sb.append("\r\n");
223         sb.append("Writer:: ").append(writer.toString());
224         sb.append("\r\n===================================");
225         return sb.toString();
226     }
227 
enterReadScheduling()228     protected SchedulingAction enterReadScheduling() {
229         return SchedulingAction.CONTINUE;
230     }
231 
232 
233     /**
234      * Processing function for incoming data. Pass it thru SSLEngine.unwrap().
235      * Any decrypted buffers returned to be passed downstream.
236      * Status codes:
237      *     NEED_UNWRAP: do nothing. Following incoming data will contain
238      *                  any required handshake data
239      *     NEED_WRAP: call writer.addData() with empty buffer
240      *     NEED_TASK: delegate task to executor
241      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap
242      *     BUFFER_UNDERFLOW: keep buffer and wait for more data
243      *     OK: return generated buffers.
244      *
245      * Upstream subscription strategy is to try and keep no more than
246      * TARGET_BUFSIZE bytes in readBuf
247      */
248     final class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber {
249         // Maximum record size is 16k.
250         // Because SocketTube can feeds us up to 3 16K buffers,
251         // then setting this size to 16K means that the readBuf
252         // can store up to 64K-1 (16K-1 + 3*16K)
253         static final int TARGET_BUFSIZE = 16 * 1024;
254 
255         final SequentialScheduler scheduler;
256         volatile ByteBuffer readBuf;
257         volatile boolean completing;
258         final Object readBufferLock = new Object();
259         final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
260 
261         private final class ReaderDownstreamPusher implements Runnable {
262             @Override
run()263             public void run() {
264                 processData();
265             }
266         }
267 
Reader()268         Reader() {
269             super();
270             scheduler = SequentialScheduler.synchronizedScheduler(
271                     new ReaderDownstreamPusher());
272             this.readBuf = ByteBuffer.allocate(1024);
273             readBuf.limit(0); // keep in read mode
274         }
275 
276         @Override
supportsRecycling()277         public boolean supportsRecycling() {
278             return recycler != null;
279         }
280 
enterScheduling()281         protected SchedulingAction enterScheduling() {
282             return enterReadScheduling();
283         }
284 
dbgString()285         public final String dbgString() {
286             return "SSL Reader(" + tubeName + ")";
287         }
288 
289         /**
290          * entry point for buffers delivered from upstream Subscriber
291          */
292         @Override
incoming(List<ByteBuffer> buffers, boolean complete)293         public void incoming(List<ByteBuffer> buffers, boolean complete) {
294             if (debugr.on())
295                 debugr.log("Adding %d bytes to read buffer",
296                         Utils.remaining(buffers));
297             addToReadBuf(buffers, complete);
298             scheduler.runOrSchedule(exec);
299         }
300 
301         @Override
toString()302         public String toString() {
303             return "READER: " + super.toString() + ", readBuf: " + readBuf.toString()
304                     + ", count: " + count.toString() + ", scheduler: "
305                     + (scheduler.isStopped() ? "stopped" : "running")
306                     + ", status: " + lastUnwrapStatus
307                     + ", handshakeState: " + handshakeState.get()
308                     + ", engine: " + engine.getHandshakeStatus();
309         }
310 
reallocReadBuf()311         private void reallocReadBuf() {
312             int sz = readBuf.capacity();
313             ByteBuffer newb = ByteBuffer.allocate(sz * 2);
314             readBuf.flip();
315             Utils.copy(readBuf, newb);
316             readBuf = newb;
317         }
318 
319         @Override
upstreamWindowUpdate(long currentWindow, long downstreamQsize)320         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
321             if (readBuf.remaining() > TARGET_BUFSIZE) {
322                 if (debugr.on())
323                     debugr.log("readBuf has more than TARGET_BUFSIZE: %d",
324                             readBuf.remaining());
325                 return 0;
326             } else {
327                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
328             }
329         }
330 
331         // readBuf is kept ready for reading outside of this method
addToReadBuf(List<ByteBuffer> buffers, boolean complete)332         private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
333             assert Utils.remaining(buffers) > 0 || buffers.isEmpty();
334             synchronized (readBufferLock) {
335                 for (ByteBuffer buf : buffers) {
336                     readBuf.compact();
337                     while (readBuf.remaining() < buf.remaining())
338                         reallocReadBuf();
339                     readBuf.put(buf);
340                     readBuf.flip();
341                     // should be safe to call inside lock
342                     // since the only implementation
343                     // offers the buffer to an unbounded queue.
344                     // WARNING: do not touch buf after this point!
345                     if (recycler != null) recycler.accept(buf);
346                 }
347                 if (complete) {
348                     this.completing = complete;
349                     minBytesRequired = 0;
350                 }
351             }
352         }
353 
schedule()354         void schedule() {
355             scheduler.runOrSchedule(exec);
356         }
357 
stop()358         void stop() {
359             if (debugr.on()) debugr.log("stop");
360             scheduler.stop();
361         }
362 
363         AtomicInteger count = new AtomicInteger(0);
364 
365         // minimum number of bytes required to call unwrap.
366         // Usually this is 0, unless there was a buffer underflow.
367         // In this case we need to wait for more bytes than what
368         // we had before calling unwrap() again.
369         volatile int minBytesRequired;
370 
371         // work function where it all happens
processData()372         final void processData() {
373             try {
374                 if (debugr.on())
375                     debugr.log("processData:"
376                             + " readBuf remaining:" + readBuf.remaining()
377                             + ", state:" + states(handshakeState)
378                             + ", engine handshake status:" + engine.getHandshakeStatus());
379                 int len;
380                 boolean complete = false;
381                 while (readBuf.remaining() > (len = minBytesRequired)) {
382                     boolean handshaking = false;
383                     try {
384                         EngineResult result;
385                         synchronized (readBufferLock) {
386                             complete = this.completing;
387                             if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining());
388                             // Unless there is a BUFFER_UNDERFLOW, we should try to
389                             // unwrap any number of bytes. Set minBytesRequired to 0:
390                             // we only need to do that if minBytesRequired is not already 0.
391                             len = len > 0 ? minBytesRequired = 0 : len;
392                             result = unwrapBuffer(readBuf);
393                             len = readBuf.remaining();
394                             if (debugr.on()) {
395                                 debugr.log("Unwrapped: result: %s", result.result);
396                                 debugr.log("Unwrapped: consumed: %s", result.bytesConsumed());
397                             }
398                         }
399                         if (result.bytesProduced() > 0) {
400                             if (debugr.on())
401                                 debugr.log("sending %d", result.bytesProduced());
402                             count.addAndGet(result.bytesProduced());
403                             outgoing(result.destBuffer, false);
404                         }
405                         if (result.status() == Status.BUFFER_UNDERFLOW) {
406                             if (debugr.on()) debugr.log("BUFFER_UNDERFLOW");
407                             // not enough data in the read buffer...
408                             // no need to try to unwrap again unless we get more bytes
409                             // than minBytesRequired = len in the read buffer.
410                             synchronized (readBufferLock) {
411                                 minBytesRequired = len;
412                                 // more bytes could already have been added...
413                                 assert readBuf.remaining() >= len;
414                                 // check if we have received some data, and if so
415                                 // we can just re-spin the loop
416                                 if (readBuf.remaining() > len) continue;
417                                 else if (this.completing) {
418                                     if (debug.on()) {
419                                         debugr.log("BUFFER_UNDERFLOW with EOF," +
420                                                 " %d bytes non decrypted.", len);
421                                     }
422                                     // The channel won't send us any more data, and
423                                     // we are in underflow: we need to fail.
424                                     throw new IOException("BUFFER_UNDERFLOW with EOF, "
425                                             + len + " bytes non decrypted.");
426                                 }
427                             }
428                             // request more data and return.
429                             requestMore();
430                             return;
431                         }
432                         if (complete && result.status() == Status.CLOSED) {
433                             if (debugr.on()) debugr.log("Closed: completing");
434                             outgoing(Utils.EMPTY_BB_LIST, true);
435                             // complete ALPN if not yet completed
436                             setALPN();
437                             return;
438                         }
439                         if (result.handshaking()) {
440                             handshaking = true;
441                             if (debugr.on()) debugr.log("handshaking");
442                             if (doHandshake(result, READER)) continue; // need unwrap
443                             else break; // doHandshake will have triggered the write scheduler if necessary
444                         } else {
445                             if (trySetALPN()) {
446                                 resumeActivity();
447                             }
448                         }
449                     } catch (IOException ex) {
450                         errorCommon(ex);
451                         handleError(ex);
452                         return;
453                     }
454                     if (handshaking && !complete)
455                         return;
456                 }
457                 if (!complete) {
458                     synchronized (readBufferLock) {
459                         complete = this.completing && !readBuf.hasRemaining();
460                     }
461                 }
462                 if (complete) {
463                     if (debugr.on()) debugr.log("completing");
464                     // Complete the alpnCF, if not already complete, regardless of
465                     // whether or not the ALPN is available, there will be no more
466                     // activity.
467                     setALPN();
468                     outgoing(Utils.EMPTY_BB_LIST, true);
469                 }
470             } catch (Throwable ex) {
471                 errorCommon(ex);
472                 handleError(ex);
473             }
474         }
475 
476         private volatile Status lastUnwrapStatus;
unwrapBuffer(ByteBuffer src)477         EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
478             ByteBuffer dst = getAppBuffer();
479             int len = src.remaining();
480             while (true) {
481                 SSLEngineResult sslResult = engine.unwrap(src, dst);
482                 switch (lastUnwrapStatus = sslResult.getStatus()) {
483                     case BUFFER_OVERFLOW:
484                         // may happen if app size buffer was changed, or if
485                         // our 'adaptiveBufferSize' guess was too small for
486                         // the current payload. In that case, update the
487                         // value of applicationBufferSize, and allocate a
488                         // buffer of that size, which we are sure will be
489                         // big enough to decode whatever needs to be
490                         // decoded. We will later update adaptiveBufferSize
491                         // in OK: below.
492                         int appSize = applicationBufferSize =
493                                 engine.getSession().getApplicationBufferSize();
494                         ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
495                         dst.flip();
496                         b.put(dst);
497                         dst = b;
498                         break;
499                     case CLOSED:
500                         assert dst.position() == 0;
501                         return doClosure(new EngineResult(sslResult));
502                     case BUFFER_UNDERFLOW:
503                         // handled implicitly by compaction/reallocation of readBuf
504                         assert dst.position() == 0;
505                         return new EngineResult(sslResult);
506                     case OK:
507                         int size = dst.position();
508                         if (debug.on()) {
509                             debugr.log("Decoded " + size + " bytes out of " + len
510                                     + " into buffer of " + dst.capacity()
511                                     + " remaining to decode: " + src.remaining());
512                         }
513                         // if the record payload was bigger than what was originally
514                         // allocated, then sets the adaptiveAppBufferSize to size
515                         // and we will use that new size as a guess for the next app
516                         // buffer.
517                         if (size > adaptiveAppBufferSize) {
518                             adaptiveAppBufferSize = ((size + 7) >>> 3) << 3;
519                         }
520                         dst.flip();
521                         return new EngineResult(sslResult, dst);
522                 }
523             }
524         }
525     }
526 
527     public interface Monitorable {
getInfo()528         public String getInfo();
529     }
530 
531     public static class Monitor extends Thread {
532         final List<WeakReference<Monitorable>> list;
533         final List<FinalMonitorable> finalList;
534         final ReferenceQueue<Monitorable> queue = new ReferenceQueue<>();
535         static Monitor themon;
536 
537         static {
538             themon = new Monitor();
themon.start()539             themon.start(); // uncomment to enable Monitor
540         }
541 
542         // An instance used to temporarily store the
543         // last observable state of a monitorable object.
544         // When Monitor.remove(o) is called, we replace
545         // 'o' with a FinalMonitorable whose reference
546         // will be enqueued after the last observable state
547         // has been printed.
548         final class FinalMonitorable implements Monitorable {
549             final String finalState;
FinalMonitorable(Monitorable o)550             FinalMonitorable(Monitorable o) {
551                 finalState = o.getInfo();
552                 finalList.add(this);
553             }
554             @Override
getInfo()555             public String getInfo() {
556                 finalList.remove(this);
557                 return finalState;
558             }
559         }
560 
Monitor()561         Monitor() {
562             super("Monitor");
563             setDaemon(true);
564             list = Collections.synchronizedList(new LinkedList<>());
565             finalList = new ArrayList<>(); // access is synchronized on list above
566         }
567 
addTarget(Monitorable o)568         void addTarget(Monitorable o) {
569             list.add(new WeakReference<>(o, queue));
570         }
removeTarget(Monitorable o)571         void removeTarget(Monitorable o) {
572             // It can take a long time for GC to clean up references.
573             // Calling Monitor.remove() early helps removing noise from the
574             // logs/
575             synchronized (list) {
576                 Iterator<WeakReference<Monitorable>> it = list.iterator();
577                 while (it.hasNext()) {
578                     Monitorable m = it.next().get();
579                     if (m == null) it.remove();
580                     if (o == m) {
581                         it.remove();
582                         break;
583                     }
584                 }
585                 FinalMonitorable m = new FinalMonitorable(o);
586                 addTarget(m);
587                 Reference.reachabilityFence(m);
588             }
589         }
590 
add(Monitorable o)591         public static void add(Monitorable o) {
592             themon.addTarget(o);
593         }
remove(Monitorable o)594         public static void remove(Monitorable o) {
595             themon.removeTarget(o);
596         }
597 
598         @Override
run()599         public void run() {
600             System.out.println("Monitor starting");
601             try {
602                 while (true) {
603                     Thread.sleep(20 * 1000);
604                     synchronized (list) {
605                         Reference<? extends Monitorable> expired;
606                         while ((expired = queue.poll()) != null) list.remove(expired);
607                         for (WeakReference<Monitorable> ref : list) {
608                             Monitorable o = ref.get();
609                             if (o == null) continue;
610                             if (o instanceof FinalMonitorable) {
611                                 ref.enqueue();
612                             }
613                             System.out.println(o.getInfo());
614                             System.out.println("-------------------------");
615                         }
616                     }
617                     System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-");
618                 }
619             } catch (InterruptedException e) {
620                 System.out.println("Monitor exiting with " + e);
621             }
622         }
623     }
624 
625     /**
626      * Processing function for outgoing data. Pass it thru SSLEngine.wrap()
627      * Any encrypted buffers generated are passed downstream to be written.
628      * Status codes:
629      *     NEED_UNWRAP: call reader.addData() with empty buffer
630      *     NEED_WRAP: call addData() with empty buffer
631      *     NEED_TASK: delegate task to executor
632      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap
633      *     BUFFER_UNDERFLOW: shouldn't happen on writing side
634      *     OK: return generated buffers
635      */
636     class Writer extends SubscriberWrapper {
637         final SequentialScheduler scheduler;
638         // queues of buffers received from upstream waiting
639         // to be processed by the SSLEngine
640         final List<ByteBuffer> writeList;
641         final Logger debugw =  Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
642         volatile boolean completing;
643         boolean completed; // only accessed in processData
644 
645         class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
run()646             @Override public void run() { processData(); }
647         }
648 
Writer()649         Writer() {
650             super();
651             writeList = Collections.synchronizedList(new LinkedList<>());
652             scheduler = new SequentialScheduler(new WriterDownstreamPusher());
653         }
654 
655         @Override
incoming(List<ByteBuffer> buffers, boolean complete)656         protected void incoming(List<ByteBuffer> buffers, boolean complete) {
657             assert complete ? buffers == Utils.EMPTY_BB_LIST : true;
658             assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
659             if (complete) {
660                 if (debugw.on()) debugw.log("adding SENTINEL");
661                 completing = true;
662                 writeList.add(SENTINEL);
663             } else {
664                 writeList.addAll(buffers);
665             }
666             if (debugw.on())
667                 debugw.log("added " + buffers.size()
668                            + " (" + Utils.remaining(buffers)
669                            + " bytes) to the writeList");
670             scheduler.runOrSchedule();
671         }
672 
dbgString()673         public final String dbgString() {
674             return "SSL Writer(" + tubeName + ")";
675         }
676 
onSubscribe()677         protected void onSubscribe() {
678             if (debugw.on()) debugw.log("onSubscribe initiating handshaking");
679             addData(HS_TRIGGER);  // initiates handshaking
680         }
681 
schedule()682         void schedule() {
683             scheduler.runOrSchedule();
684         }
685 
stop()686         void stop() {
687             if (debugw.on()) debugw.log("stop");
688             scheduler.stop();
689         }
690 
691         @Override
closing()692         public boolean closing() {
693             return closeNotifyReceived();
694         }
695 
isCompleting()696         private boolean isCompleting() {
697             return completing;
698         }
699 
700         @Override
upstreamWindowUpdate(long currentWindow, long downstreamQsize)701         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
702             if (writeList.size() > 10)
703                 return 0;
704             else
705                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
706         }
707 
hsTriggered()708         private boolean hsTriggered() {
709             synchronized(writeList) {
710                 for (ByteBuffer b : writeList)
711                     if (b == HS_TRIGGER)
712                         return true;
713                 return false;
714             }
715         }
716 
triggerWrite()717         void triggerWrite() {
718             synchronized (writeList) {
719                 if (writeList.isEmpty()) {
720                     writeList.add(HS_TRIGGER);
721                 }
722             }
723             scheduler.runOrSchedule();
724         }
725 
processData()726         private void processData() {
727             boolean completing = isCompleting();
728 
729             try {
730                 if (debugw.on())
731                     debugw.log("processData, writeList remaining:"
732                                 + Utils.remaining(writeList) + ", hsTriggered:"
733                                 + hsTriggered() + ", needWrap:" + needWrap());
734 
735                 while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) {
736                     ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
737                     EngineResult result = wrapBuffers(outbufs);
738                     if (debugw.on())
739                         debugw.log("wrapBuffer returned %s", result.result);
740 
741                     if (result.status() == Status.CLOSED) {
742                         if (!upstreamCompleted) {
743                             upstreamCompleted = true;
744                             upstreamSubscription.cancel();
745                             // complete ALPN if not yet completed
746                             setALPN();
747                         }
748                         if (result.bytesProduced() <= 0)
749                             return;
750 
751                         if (!completing && !completed) {
752                             completing = this.completing = true;
753                             // There could still be some outgoing data in outbufs.
754                             writeList.add(SENTINEL);
755                         }
756                     }
757 
758                     boolean handshaking = false;
759                     if (result.handshaking()) {
760                         if (debugw.on()) debugw.log("handshaking");
761                         doHandshake(result, WRITER);  // ok to ignore return
762                         handshaking = true;
763                     } else {
764                         if (trySetALPN()) {
765                             resumeActivity();
766                         }
767                     }
768                     cleanList(writeList); // tidy up the source list
769                     sendResultBytes(result);
770                     if (handshaking) {
771                         if (!completing && needWrap()) {
772                             continue;
773                         } else {
774                             return;
775                         }
776                     }
777                 }
778                 if (completing && Utils.remaining(writeList) == 0) {
779                     if (!completed) {
780                         completed = true;
781                         writeList.clear();
782                         outgoing(Utils.EMPTY_BB_LIST, true);
783                     }
784                     return;
785                 }
786                 if (writeList.isEmpty() && needWrap()) {
787                     writer.addData(HS_TRIGGER);
788                 }
789             } catch (Throwable ex) {
790                 errorCommon(ex);
791                 handleError(ex);
792             }
793         }
794 
795         // The SSLEngine insists on being given a buffer that is at least
796         // SSLSession.getPacketBufferSize() long (usually 16K). If given
797         // a smaller buffer it will go in BUFFER_OVERFLOW, even if it only
798         // has 6 bytes to wrap. Typical usage shows that for GET we
799         // usually produce an average of ~ 100 bytes.
800         // To avoid wasting space, and because allocating and zeroing
801         // 16K buffers for encoding 6 bytes is costly, we are reusing the
802         // same writeBuffer to interact with SSLEngine.wrap().
803         // If the SSLEngine produces less than writeBuffer.capacity() / 2,
804         // then we copy off the bytes to a smaller buffer that we send
805         // downstream. Otherwise, we send the writeBuffer downstream
806         // and will allocate a new one next time.
807         volatile ByteBuffer writeBuffer;
808         private volatile Status lastWrappedStatus;
809         @SuppressWarnings("fallthrough")
wrapBuffers(ByteBuffer[] src)810         EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
811             long len = Utils.remaining(src);
812             if (debugw.on())
813                 debugw.log("wrapping " + len + " bytes");
814 
815             ByteBuffer dst = writeBuffer;
816             if (dst == null) dst = writeBuffer = getNetBuffer();
817             assert dst.position() == 0 : "buffer position is " + dst.position();
818             assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity();
819 
820             while (true) {
821                 SSLEngineResult sslResult = engine.wrap(src, dst);
822                 if (debugw.on()) debugw.log("SSLResult: " + sslResult);
823                 switch (lastWrappedStatus = sslResult.getStatus()) {
824                     case BUFFER_OVERFLOW:
825                         // Shouldn't happen. We allocated buffer with packet size
826                         // get it again if net buffer size was changed
827                         if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
828                         int netSize = packetBufferSize
829                                 = engine.getSession().getPacketBufferSize();
830                         ByteBuffer b = writeBuffer = ByteBuffer.allocate(netSize + dst.position());
831                         dst.flip();
832                         b.put(dst);
833                         dst = b;
834                         break; // try again
835                     case CLOSED:
836                         if (debugw.on()) debugw.log("CLOSED");
837                         // fallthrough. There could be some remaining data in dst.
838                         // CLOSED will be handled by the caller.
839                     case OK:
840                         final ByteBuffer dest;
841                         if (dst.position() == 0) {
842                             dest = NOTHING; // can happen if handshake is in progress
843                         } else if (dst.position() < dst.capacity() / 2) {
844                             // less than half the buffer was used.
845                             // copy off the bytes to a smaller buffer, and keep
846                             // the writeBuffer for next time.
847                             dst.flip();
848                             dest = Utils.copyAligned(dst);
849                             dst.clear();
850                         } else {
851                             // more than half the buffer was used.
852                             // just send that buffer downstream, and we will
853                             // get a new writeBuffer next time it is needed.
854                             dst.flip();
855                             dest = dst;
856                             writeBuffer = null;
857                         }
858                         if (debugw.on())
859                             debugw.log("OK => produced: %d bytes into %d, not wrapped: %d",
860                                        dest.remaining(),  dest.capacity(), Utils.remaining(src));
861                         return new EngineResult(sslResult, dest);
862                     case BUFFER_UNDERFLOW:
863                         // Shouldn't happen.  Doesn't returns when wrap()
864                         // underflow handled externally
865                         // assert false : "Buffer Underflow";
866                         if (debug.on()) debug.log("BUFFER_UNDERFLOW");
867                         return new EngineResult(sslResult);
868                     default:
869                         if (debugw.on())
870                             debugw.log("result: %s", sslResult.getStatus());
871                         assert false : "result:" + sslResult.getStatus();
872                 }
873             }
874         }
875 
needWrap()876         private boolean needWrap() {
877             return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP;
878         }
879 
sendResultBytes(EngineResult result)880         private void sendResultBytes(EngineResult result) {
881             if (result.bytesProduced() > 0) {
882                 if (debugw.on())
883                     debugw.log("Sending %d bytes downstream",
884                                result.bytesProduced());
885                 outgoing(result.destBuffer, false);
886             }
887         }
888 
889         @Override
toString()890         public String toString() {
891             return "WRITER: " + super.toString()
892                     + ", writeList size: " + Integer.toString(writeList.size())
893                     + ", scheduler: " + (scheduler.isStopped() ? "stopped" : "running")
894                     + ", status: " + lastWrappedStatus;
895                     //" writeList: " + writeList.toString();
896         }
897     }
898 
handleError(Throwable t)899     private void handleError(Throwable t) {
900         if (debug.on()) debug.log("handleError", t);
901         readerCF.completeExceptionally(t);
902         writerCF.completeExceptionally(t);
903         // no-op if already completed
904         alpnCF.completeExceptionally(t);
905         reader.stop();
906         writer.stop();
907     }
908 
909     boolean stopped;
910 
normalStop()911     private synchronized void normalStop() {
912         if (stopped)
913             return;
914         stopped = true;
915         reader.stop();
916         writer.stop();
917         // make sure the alpnCF is completed.
918         if (!alpnCF.isDone()) {
919             Throwable alpn = new SSLHandshakeException(
920                     "Connection closed before successful ALPN negotiation");
921             alpnCF.completeExceptionally(alpn);
922         }
923         if (isMonitored) Monitor.remove(monitor);
924     }
925 
stopOnError(Throwable error)926     private Void stopOnError(Throwable error) {
927         // maybe log, etc
928         // ensure the ALPN is completed
929         // We could also do this in SSLTube.SSLSubscriberWrapper
930         // onError/onComplete - with the caveat that the ALP CF
931         // would get completed externally. Doing it here keeps
932         // it all inside SSLFlowDelegate.
933         if (!alpnCF.isDone()) {
934             alpnCF.completeExceptionally(error);
935         }
936         normalStop();
937         return null;
938     }
939 
cleanList(List<ByteBuffer> l)940     private void cleanList(List<ByteBuffer> l) {
941         synchronized (l) {
942             Iterator<ByteBuffer> iter = l.iterator();
943             while (iter.hasNext()) {
944                 ByteBuffer b = iter.next();
945                 if (!b.hasRemaining() && b != SENTINEL) {
946                     iter.remove();
947                 }
948             }
949         }
950     }
951 
952     /**
953      * States for handshake. We avoid races when accessing/updating the AtomicInt
954      * because updates always schedule an additional call to both the read()
955      * and write() functions.
956      */
957     private static final int NOT_HANDSHAKING = 0;
958     private static final int HANDSHAKING = 1;
959 
960     // Bit flags
961     // a thread is currently executing tasks
962     private static final int DOING_TASKS = 4;
963     // a thread wants to execute tasks, while another thread is executing
964     private static final int REQUESTING_TASKS = 8;
965     private static final int TASK_BITS = 12; // Both bits
966 
967     private static final int READER = 1;
968     private static final int WRITER = 2;
969 
states(AtomicInteger state)970     private static String states(AtomicInteger state) {
971         int s = state.get();
972         StringBuilder sb = new StringBuilder();
973         int x = s & ~TASK_BITS;
974         switch (x) {
975             case NOT_HANDSHAKING:
976                 sb.append(" NOT_HANDSHAKING ");
977                 break;
978             case HANDSHAKING:
979                 sb.append(" HANDSHAKING ");
980                 break;
981             default:
982                 throw new InternalError();
983         }
984         if ((s & DOING_TASKS) > 0)
985             sb.append("|DOING_TASKS");
986         if ((s & REQUESTING_TASKS) > 0)
987             sb.append("|REQUESTING_TASKS");
988         return sb.toString();
989     }
990 
resumeActivity()991     private void resumeActivity() {
992         reader.schedule();
993         writer.schedule();
994     }
995 
996     final AtomicInteger handshakeState;
997     final ConcurrentLinkedQueue<String> stateList =
998             debug.on() ? new ConcurrentLinkedQueue<>() : null;
999 
1000     // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS
1001     // depending on previous value
1002     private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> {
1003         if ((current & DOING_TASKS) == 0)
1004             return DOING_TASKS | (current & HANDSHAKING);
1005         else
1006             return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING);
1007     };
1008 
1009     // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set
1010     // clears bits if not.
1011     private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> {
1012         if ((current & REQUESTING_TASKS) != 0)
1013             return DOING_TASKS | (current & HANDSHAKING);
1014         // clear both bits
1015         return (current & HANDSHAKING);
1016     };
1017 
doHandshake(EngineResult r, int caller)1018     private boolean doHandshake(EngineResult r, int caller) {
1019         // unconditionally sets the HANDSHAKING bit, while preserving task bits
1020         handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS));
1021         if (stateList != null && debug.on()) {
1022             stateList.add(r.handshakeStatus().toString());
1023             stateList.add(Integer.toString(caller));
1024         }
1025         switch (r.handshakeStatus()) {
1026             case NEED_TASK:
1027                 int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS);
1028                 if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks
1029                     return false;
1030                 }
1031 
1032                 if (debug.on()) debug.log("obtaining and initiating task execution");
1033                 List<Runnable> tasks = obtainTasks();
1034                 executeTasks(tasks);
1035                 return false;  // executeTasks will resume activity
1036             case NEED_WRAP:
1037                 if (caller == READER) {
1038                     writer.triggerWrite();
1039                     return false;
1040                 }
1041                 break;
1042             case NEED_UNWRAP:
1043             case NEED_UNWRAP_AGAIN:
1044                 // do nothing else
1045                 // receiving-side data will trigger unwrap
1046                 if (caller == WRITER) {
1047                     reader.schedule();
1048                     return false;
1049                 }
1050                 break;
1051             default:
1052                 throw new InternalError("Unexpected handshake status:"
1053                                         + r.handshakeStatus());
1054         }
1055         return true;
1056     }
1057 
obtainTasks()1058     private List<Runnable> obtainTasks() {
1059         List<Runnable> l = new ArrayList<>();
1060         Runnable r;
1061         while ((r = engine.getDelegatedTask()) != null) {
1062             l.add(r);
1063         }
1064         return l;
1065     }
1066 
executeTasks(List<Runnable> tasks)1067     private void executeTasks(List<Runnable> tasks) {
1068         exec.execute(() -> {
1069             try {
1070                 List<Runnable> nextTasks = tasks;
1071                 if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size()));
1072                 do {
1073                     nextTasks.forEach(Runnable::run);
1074                     if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
1075                         nextTasks = obtainTasks();
1076                     } else {
1077                         int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS);
1078                         if ((s & DOING_TASKS) != 0) {
1079                             if (debug.on()) debug.log("re-running tasks (B)");
1080                             nextTasks = obtainTasks();
1081                             continue;
1082                         }
1083                         break;
1084                     }
1085                 } while (true);
1086                 if (debug.on()) debug.log("finished task execution");
1087                 HandshakeStatus hs = engine.getHandshakeStatus();
1088                 if (hs == HandshakeStatus.FINISHED || hs == HandshakeStatus.NOT_HANDSHAKING) {
1089                     // We're no longer handshaking, try setting ALPN
1090                     trySetALPN();
1091                 }
1092                 resumeActivity();
1093             } catch (Throwable t) {
1094                 handleError(t);
1095             }
1096         });
1097     }
1098 
trySetALPN()1099     boolean trySetALPN() {
1100         // complete ALPN CF if needed.
1101         if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
1102             applicationBufferSize = engine.getSession().getApplicationBufferSize();
1103             packetBufferSize = engine.getSession().getPacketBufferSize();
1104             setALPN();
1105             return true;
1106         }
1107         return false;
1108     }
1109 
1110     // FIXME: acknowledge a received CLOSE request from peer
doClosure(EngineResult r)1111     EngineResult doClosure(EngineResult r) throws IOException {
1112         if (debug.on())
1113             debug.log("doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]",
1114                       r.result, engine.getHandshakeStatus(),
1115                       engine.isOutboundDone(), engine.isInboundDone());
1116         if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
1117             // we have received TLS close_notify and need to send
1118             // an acknowledgement back. We're calling doHandshake
1119             // to finish the close handshake.
1120             if (engine.isInboundDone() && !engine.isOutboundDone()) {
1121                 if (debug.on()) debug.log("doClosure: close_notify received");
1122                 close_notify_received = true;
1123                 if (!writer.scheduler.isStopped()) {
1124                     doHandshake(r, READER);
1125                 } else {
1126                     // We have received closed notify, but we
1127                     // won't be able to send the acknowledgement.
1128                     // Nothing more will come from the socket either,
1129                     // so mark the reader as completed.
1130                     synchronized (reader.readBufferLock) {
1131                         reader.completing = true;
1132                     }
1133                 }
1134             }
1135         }
1136         return r;
1137     }
1138 
1139     /**
1140      * Returns the upstream Flow.Subscriber of the reading (incoming) side.
1141      * This flow must be given the encrypted data read from upstream (eg socket)
1142      * before it is decrypted.
1143      */
upstreamReader()1144     public Flow.Subscriber<List<ByteBuffer>> upstreamReader() {
1145         return reader;
1146     }
1147 
1148     /**
1149      * Returns the upstream Flow.Subscriber of the writing (outgoing) side.
1150      * This flow contains the plaintext data before it is encrypted.
1151      */
upstreamWriter()1152     public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() {
1153         return writer;
1154     }
1155 
resumeReader()1156     public boolean resumeReader() {
1157         return reader.signalScheduling();
1158     }
1159 
resetReaderDemand()1160     public void resetReaderDemand() {
1161         reader.resetDownstreamDemand();
1162     }
1163 
1164     static class EngineResult {
1165         final SSLEngineResult result;
1166         final ByteBuffer destBuffer;
1167 
1168         // normal result
EngineResult(SSLEngineResult result)1169         EngineResult(SSLEngineResult result) {
1170             this(result, null);
1171         }
1172 
EngineResult(SSLEngineResult result, ByteBuffer destBuffer)1173         EngineResult(SSLEngineResult result, ByteBuffer destBuffer) {
1174             this.result = result;
1175             this.destBuffer = destBuffer;
1176         }
1177 
handshaking()1178         boolean handshaking() {
1179             HandshakeStatus s = result.getHandshakeStatus();
1180             return s != HandshakeStatus.FINISHED
1181                    && s != HandshakeStatus.NOT_HANDSHAKING
1182                    && result.getStatus() != Status.CLOSED;
1183         }
1184 
needUnwrap()1185         boolean needUnwrap() {
1186             HandshakeStatus s = result.getHandshakeStatus();
1187             return s == HandshakeStatus.NEED_UNWRAP;
1188         }
1189 
1190 
bytesConsumed()1191         int bytesConsumed() {
1192             return result.bytesConsumed();
1193         }
1194 
bytesProduced()1195         int bytesProduced() {
1196             return result.bytesProduced();
1197         }
1198 
handshakeStatus()1199         SSLEngineResult.HandshakeStatus handshakeStatus() {
1200             return result.getHandshakeStatus();
1201         }
1202 
status()1203         SSLEngineResult.Status status() {
1204             return result.getStatus();
1205         }
1206     }
1207 
1208     // The maximum network buffer size negotiated during
1209     // the handshake. Usually 16K.
1210     volatile int packetBufferSize;
getNetBuffer()1211     final ByteBuffer getNetBuffer() {
1212         int netSize = packetBufferSize;
1213         if (netSize <= 0) {
1214             packetBufferSize = netSize = engine.getSession().getPacketBufferSize();
1215         }
1216         return ByteBuffer.allocate(netSize);
1217     }
1218 
1219     // The maximum application buffer size negotiated during
1220     // the handshake. Usually close to 16K.
1221     volatile int applicationBufferSize;
1222     // Despite of the maximum applicationBufferSize negotiated
1223     // above, TLS records usually have a much smaller payload.
1224     // The adaptativeAppBufferSize records the max payload
1225     // ever decoded, and we use that as a guess for how big
1226     // a buffer we will need for the next payload.
1227     // This avoids allocating and zeroing a 16K buffer for
1228     // nothing...
1229     volatile int adaptiveAppBufferSize;
getAppBuffer()1230     final ByteBuffer getAppBuffer() {
1231         int appSize = applicationBufferSize;
1232         if (appSize <= 0) {
1233             applicationBufferSize = appSize
1234                     = engine.getSession().getApplicationBufferSize();
1235         }
1236         int size = adaptiveAppBufferSize;
1237         if (size <= 0) {
1238             size = 512; // start with 512 this is usually enough for handshaking / headers
1239         } else if (size > appSize) {
1240             size = appSize;
1241         }
1242         // will cause a BUFFER_OVERFLOW if not big enough, but
1243         // that's OK.
1244         return ByteBuffer.allocate(size);
1245     }
1246 
dbgString()1247     final String dbgString() {
1248         return "SSLFlowDelegate(" + tubeName + ")";
1249     }
1250 }
1251