1 /*
2  * Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http.common;
27 
28 import jdk.internal.net.http.common.SubscriberWrapper.SchedulingAction;
29 
30 import javax.net.ssl.SSLEngine;
31 import javax.net.ssl.SSLEngineResult;
32 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
33 import javax.net.ssl.SSLEngineResult.Status;
34 import javax.net.ssl.SSLException;
35 import javax.net.ssl.SSLHandshakeException;
36 import java.io.IOException;
37 import java.lang.ref.Reference;
38 import java.lang.ref.ReferenceQueue;
39 import java.lang.ref.WeakReference;
40 import java.nio.ByteBuffer;
41 import java.util.ArrayList;
42 import java.util.Collections;
43 import java.util.Iterator;
44 import java.util.LinkedList;
45 import java.util.List;
46 import java.util.concurrent.CompletableFuture;
47 import java.util.concurrent.ConcurrentLinkedQueue;
48 import java.util.concurrent.Executor;
49 import java.util.concurrent.Flow;
50 import java.util.concurrent.Flow.Subscriber;
51 import java.util.concurrent.atomic.AtomicInteger;
52 import java.util.function.Consumer;
53 import java.util.function.IntBinaryOperator;
54 
55 /**
56  * Implements SSL using two SubscriberWrappers.
57  *
58  * <p> Constructor takes two Flow.Subscribers: one that receives the network
59  * data (after it has been encrypted by SSLFlowDelegate), and one that
60  * receives the application data (after it has been decrypted by SSLFlowDelegate).
61  *
62  * <p> Methods upstreamReader() and upstreamWriter() return the corresponding
63  * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data.
64  * See diagram below.
65  *
66  * <p> How Flow.Subscribers are used in this class, and where they come from:
67  * <pre>
68  * {@code
69  *
70  *
71  *
72  * --------->  data flow direction
73  *
74  *
75  *                         +------------------+
76  *        upstreamWriter   |                  | downWriter
77  *        ---------------> |                  | ------------>
78  *  obtained from this     |                  | supplied to constructor
79  *                         | SSLFlowDelegate  |
80  *        downReader       |                  | upstreamReader
81  *        <--------------- |                  | <--------------
82  * supplied to constructor |                  | obtained from this
83  *                         +------------------+
84  *
85  * Errors are reported to the downReader Flow.Subscriber
86  *
87  * }
88  * </pre>
89  */
90 public class SSLFlowDelegate {
91 
92     final Logger debug =
93             Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
94 
95     private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
96     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
97     // When handshake is in progress trying to wrap may produce no bytes.
98     private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
99     private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
100     private static final boolean isMonitored =
101             monProp != null && (monProp.isEmpty() || monProp.equalsIgnoreCase("true"));
102 
103     final Executor exec;
104     final Reader reader;
105     final Writer writer;
106     final SSLEngine engine;
107     final String tubeName; // hack
108     final CompletableFuture<String> alpnCF; // completes on initial handshake
109     final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped
110     volatile boolean close_notify_received;
111     final CompletableFuture<Void> readerCF;
112     final CompletableFuture<Void> writerCF;
113     final CompletableFuture<Void> stopCF;
114     final Consumer<ByteBuffer> recycler;
115     static AtomicInteger scount = new AtomicInteger(1);
116     final int id;
117 
/**
 * Creates an SSLFlowDelegate that does not recycle buffers. This is a
 * convenience overload: it delegates to the five-argument constructor,
 * passing {@code null} as the recycler.
 *
 * @param engine     the SSLEngine used to wrap and unwrap data
 * @param exec       the executor handed to the delegate
 * @param downReader the subscriber connected to the Reader
 * @param downWriter the subscriber connected to the Writer
 */
public SSLFlowDelegate(SSLEngine engine, Executor exec,
                       Subscriber<? super List<ByteBuffer>> downReader,
                       Subscriber<? super List<ByteBuffer>> downWriter) {
    this(engine, exec, null, downReader, downWriter);
}
130 
/**
 * Creates an SSLFlowDelegate fed from two Flow.Subscribers.
 *
 * <p> The constructor creates the internal Reader and Writer, wires their
 * completion futures so that an exceptional completion of either one
 * invokes {@code stopOnError} and a normal completion of both invokes
 * {@code normalStop}, and finally calls
 * {@link #connect(Subscriber, Subscriber)} to attach the two downstream
 * subscribers. If monitoring is enabled, the delegate also registers
 * itself with the {@link Monitor}.
 *
 * @param engine     the SSLEngine used to wrap and unwrap data
 * @param exec       the executor used to run the read scheduler
 * @param recycler   a consumer to which incoming buffers are handed back
 *                   once copied into the read buffer, or {@code null} if
 *                   recycling is not supported
 * @param downReader the subscriber connected to the Reader
 * @param downWriter the subscriber connected to the Writer; its string
 *                   form is also used as the tube name in debug output
 */
public SSLFlowDelegate(SSLEngine engine,
                       Executor exec,
                       Consumer<ByteBuffer> recycler,
                       Subscriber<? super List<ByteBuffer>> downReader,
                       Subscriber<? super List<ByteBuffer>> downWriter)
{
    this.id = scount.getAndIncrement();
    this.tubeName = String.valueOf(downWriter);
    this.recycler = recycler;
    this.reader = new Reader();
    this.writer = new Writer();
    this.engine = engine;
    this.exec = exec;
    this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
    this.readerCF = reader.completion();
    // The writer's own completion must be tracked here: using the reader's
    // completion for both fields would leave writer failures without a
    // stopOnError handler.
    this.writerCF = writer.completion();
    readerCF.exceptionally(this::stopOnError);
    writerCF.exceptionally(this::stopOnError);
    this.stopCF = CompletableFuture.allOf(reader.completion(), writer.completion())
        .thenRun(this::normalStop);
    this.alpnCF = new MinimalFuture<>();

    // connect the Reader to the downReader and the
    // Writer to the downWriter.
    connect(downReader, downWriter);

    if (isMonitored) Monitor.add(monitor);
}
164 
165     /**
166      * Returns true if the SSLFlowDelegate has detected a TLS
167      * close_notify from the server.
168      * @return true, if a close_notify was detected.
169      */
closeNotifyReceived()170     public boolean closeNotifyReceived() {
171         return close_notify_received;
172     }
173 
174     /**
175      * Connects the read sink (downReader) to the SSLFlowDelegate Reader,
176      * and the write sink (downWriter) to the SSLFlowDelegate Writer.
177      * Called from within the constructor. Overwritten by SSLTube.
178      *
179      * @param downReader  The left hand side read sink (typically, the
180      *                    HttpConnection read subscriber).
181      * @param downWriter  The right hand side write sink (typically
182      *                    the SocketTube write subscriber).
183      */
connect(Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)184     void connect(Subscriber<? super List<ByteBuffer>> downReader,
185                  Subscriber<? super List<ByteBuffer>> downWriter) {
186         this.reader.subscribe(downReader);
187         this.writer.subscribe(downWriter);
188     }
189 
190    /**
191     * Returns a CompletableFuture<String> which completes after
192     * the initial handshake completes, and which contains the negotiated
193     * alpn.
194     */
alpn()195     public CompletableFuture<String> alpn() {
196         return alpnCF;
197     }
198 
setALPN()199     private void setALPN() {
200         // Handshake is finished. So, can retrieve the ALPN now
201         if (alpnCF.isDone())
202             return;
203         String alpn = engine.getApplicationProtocol();
204         if (debug.on()) debug.log("setALPN = %s", alpn);
205         alpnCF.complete(alpn);
206     }
207 
monitor()208     public String monitor() {
209         StringBuilder sb = new StringBuilder();
210         sb.append("SSL: id ").append(id);
211         sb.append(" ").append(dbgString());
212         sb.append(" HS state: " + states(handshakeState));
213         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
214         if (stateList != null) {
215             sb.append(" LL : ");
216             for (String s : stateList) {
217                 sb.append(s).append(" ");
218             }
219         }
220         sb.append("\r\n");
221         sb.append("Reader:: ").append(reader.toString());
222         sb.append("\r\n");
223         sb.append("Writer:: ").append(writer.toString());
224         sb.append("\r\n===================================");
225         return sb.toString();
226     }
227 
enterReadScheduling()228     protected SchedulingAction enterReadScheduling() {
229         return SchedulingAction.CONTINUE;
230     }
231 
checkForHandshake(Throwable t)232     protected Throwable checkForHandshake(Throwable t) {
233         return t;
234     }
235 
236 
237     /**
238      * Processing function for incoming data. Pass it thru SSLEngine.unwrap().
239      * Any decrypted buffers returned to be passed downstream.
240      * Status codes:
241      *     NEED_UNWRAP: do nothing. Following incoming data will contain
242      *                  any required handshake data
243      *     NEED_WRAP: call writer.addData() with empty buffer
244      *     NEED_TASK: delegate task to executor
245      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap
246      *     BUFFER_UNDERFLOW: keep buffer and wait for more data
247      *     OK: return generated buffers.
248      *
249      * Upstream subscription strategy is to try and keep no more than
250      * TARGET_BUFSIZE bytes in readBuf
251      */
252     final class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber {
253         // Maximum record size is 16k.
254         // Because SocketTube can feeds us up to 3 16K buffers,
255         // then setting this size to 16K means that the readBuf
256         // can store up to 64K-1 (16K-1 + 3*16K)
257         static final int TARGET_BUFSIZE = 16 * 1024;
258 
259         final SequentialScheduler scheduler;
260         volatile ByteBuffer readBuf;
261         volatile boolean completing;
262         final Object readBufferLock = new Object();
263         final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
264 
265         private final class ReaderDownstreamPusher implements Runnable {
266             @Override
run()267             public void run() {
268                 processData();
269             }
270         }
271 
Reader()272         Reader() {
273             super();
274             scheduler = SequentialScheduler.synchronizedScheduler(
275                     new ReaderDownstreamPusher());
276             this.readBuf = ByteBuffer.allocate(1024);
277             readBuf.limit(0); // keep in read mode
278         }
279 
280         @Override
supportsRecycling()281         public boolean supportsRecycling() {
282             return recycler != null;
283         }
284 
enterScheduling()285         protected SchedulingAction enterScheduling() {
286             return enterReadScheduling();
287         }
288 
dbgString()289         public final String dbgString() {
290             return "SSL Reader(" + tubeName + ")";
291         }
292 
293         /**
294          * entry point for buffers delivered from upstream Subscriber
295          */
296         @Override
incoming(List<ByteBuffer> buffers, boolean complete)297         public void incoming(List<ByteBuffer> buffers, boolean complete) {
298             if (debugr.on())
299                 debugr.log("Adding %d bytes to read buffer",
300                         Utils.remaining(buffers));
301             addToReadBuf(buffers, complete);
302             scheduler.runOrSchedule(exec);
303         }
304 
305         @Override
toString()306         public String toString() {
307             return "READER: " + super.toString() + ", readBuf: " + readBuf.toString()
308                     + ", count: " + count.toString() + ", scheduler: "
309                     + (scheduler.isStopped() ? "stopped" : "running")
310                     + ", status: " + lastUnwrapStatus
311                     + ", handshakeState: " + handshakeState.get()
312                     + ", engine: " + engine.getHandshakeStatus();
313         }
314 
reallocReadBuf()315         private void reallocReadBuf() {
316             int sz = readBuf.capacity();
317             ByteBuffer newb = ByteBuffer.allocate(sz * 2);
318             readBuf.flip();
319             Utils.copy(readBuf, newb);
320             readBuf = newb;
321         }
322 
323         @Override
upstreamWindowUpdate(long currentWindow, long downstreamQsize)324         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
325             if (needsMoreData()) {
326                 // run the scheduler to see if more data should be requested
327                 if (debugr.on()) {
328                     int remaining = readBuf.remaining();
329                     if (remaining > TARGET_BUFSIZE) {
330                         // just some logging to check how much we have in the read buffer
331                         debugr.log("readBuf has more than TARGET_BUFSIZE: %d",
332                                 remaining);
333                     }
334                 }
335                 scheduler.runOrSchedule();
336             }
337             return 0; // we will request more from the scheduler loop (processData).
338         }
339 
340         // readBuf is kept ready for reading outside of this method
addToReadBuf(List<ByteBuffer> buffers, boolean complete)341         private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
342             assert Utils.remaining(buffers) > 0 || buffers.isEmpty();
343             synchronized (readBufferLock) {
344                 for (ByteBuffer buf : buffers) {
345                     readBuf.compact();
346                     while (readBuf.remaining() < buf.remaining())
347                         reallocReadBuf();
348                     readBuf.put(buf);
349                     readBuf.flip();
350                     // should be safe to call inside lock
351                     // since the only implementation
352                     // offers the buffer to an unbounded queue.
353                     // WARNING: do not touch buf after this point!
354                     if (recycler != null) recycler.accept(buf);
355                 }
356                 if (complete) {
357                     this.completing = complete;
358                     minBytesRequired = 0;
359                 }
360             }
361         }
362 
363         @Override
errorCommon(Throwable throwable)364         protected boolean errorCommon(Throwable throwable) {
365             throwable = SSLFlowDelegate.this.checkForHandshake(throwable);
366             return super.errorCommon(throwable);
367         }
368 
schedule()369         void schedule() {
370             scheduler.runOrSchedule(exec);
371         }
372 
stop()373         void stop() {
374             if (debugr.on()) debugr.log("stop");
375             scheduler.stop();
376         }
377 
378         AtomicInteger count = new AtomicInteger();
379 
380         // minimum number of bytes required to call unwrap.
381         // Usually this is 0, unless there was a buffer underflow.
382         // In this case we need to wait for more bytes than what
383         // we had before calling unwrap() again.
384         volatile int minBytesRequired;
385 
386         // We might need to request more data if:
387         //  - we have a subscription from upstream
388         //  - and we don't have enough data to decrypt in the read buffer
389         //  - *and* - either we're handshaking, and more data is required (NEED_UNWRAP),
390         //          - or we have demand from downstream, but we have nothing decrypted
391         //            to forward downstream.
needsMoreData()392         boolean needsMoreData() {
393             if (upstreamSubscription != null && readBuf.remaining() <= minBytesRequired &&
394                     (engine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP
395                             || !downstreamSubscription.demand.isFulfilled() && hasNoOutputData())) {
396                 return true;
397             }
398             return false;
399         }
400 
401         // If the readBuf has not enough data, and we either need to
402         // unwrap (handshaking) or we have demand from downstream,
403         // then request more data
requestMoreDataIfNeeded()404         void requestMoreDataIfNeeded() {
405             if (needsMoreData()) {
406                 // request more will only request more if our
407                 // demand from upstream is fulfilled
408                 requestMore();
409             }
410         }
411 
412         // work function where it all happens
processData()413         final void processData() {
414             try {
415                 if (debugr.on())
416                     debugr.log("processData:"
417                             + " readBuf remaining:" + readBuf.remaining()
418                             + ", state:" + states(handshakeState)
419                             + ", engine handshake status:" + engine.getHandshakeStatus());
420                 int len;
421                 boolean complete = false;
422                 while (readBuf.remaining() > (len = minBytesRequired)) {
423                     boolean handshaking = false;
424                     try {
425                         EngineResult result;
426                         synchronized (readBufferLock) {
427                             complete = this.completing;
428                             if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining());
429                             // Unless there is a BUFFER_UNDERFLOW, we should try to
430                             // unwrap any number of bytes. Set minBytesRequired to 0:
431                             // we only need to do that if minBytesRequired is not already 0.
432                             len = len > 0 ? minBytesRequired = 0 : len;
433                             result = unwrapBuffer(readBuf);
434                             len = readBuf.remaining();
435                             if (debugr.on()) {
436                                 debugr.log("Unwrapped: result: %s", result.result);
437                                 debugr.log("Unwrapped: consumed: %s", result.bytesConsumed());
438                             }
439                         }
440                         if (result.bytesProduced() > 0) {
441                             if (debugr.on())
442                                 debugr.log("sending %d", result.bytesProduced());
443                             count.addAndGet(result.bytesProduced());
444                             outgoing(result.destBuffer, false);
445                         }
446                         if (result.status() == Status.BUFFER_UNDERFLOW) {
447                             if (debugr.on()) debugr.log("BUFFER_UNDERFLOW");
448                             // not enough data in the read buffer...
449                             // no need to try to unwrap again unless we get more bytes
450                             // than minBytesRequired = len in the read buffer.
451                             synchronized (readBufferLock) {
452                                 minBytesRequired = len;
453                                 // more bytes could already have been added...
454                                 assert readBuf.remaining() >= len;
455                                 // check if we have received some data, and if so
456                                 // we can just re-spin the loop
457                                 if (readBuf.remaining() > len) continue;
458                                 else if (this.completing) {
459                                     if (debug.on()) {
460                                         debugr.log("BUFFER_UNDERFLOW with EOF," +
461                                                 " %d bytes non decrypted.", len);
462                                     }
463                                     // The channel won't send us any more data, and
464                                     // we are in underflow: we need to fail.
465                                     throw new IOException("BUFFER_UNDERFLOW with EOF, "
466                                             + len + " bytes non decrypted.");
467                                 }
468                             }
469                             // request more data and return.
470                             requestMore();
471                             return;
472                         }
473                         if (complete && result.status() == Status.CLOSED) {
474                             if (debugr.on()) debugr.log("Closed: completing");
475                             outgoing(Utils.EMPTY_BB_LIST, true);
476                             // complete ALPN if not yet completed
477                             setALPN();
478                             requestMoreDataIfNeeded();
479                             return;
480                         }
481                         if (result.handshaking()) {
482                             handshaking = true;
483                             if (debugr.on()) debugr.log("handshaking");
484                             if (doHandshake(result, READER)) continue; // need unwrap
485                             else break; // doHandshake will have triggered the write scheduler if necessary
486                         } else {
487                             if (trySetALPN()) {
488                                 resumeActivity();
489                             }
490                         }
491                     } catch (IOException ex) {
492                         Throwable cause = checkForHandshake(ex);
493                         errorCommon(cause);
494                         handleError(cause);
495                         return;
496                     }
497                     if (handshaking && !complete) {
498                         requestMoreDataIfNeeded();
499                         return;
500                     }
501                 }
502                 if (!complete) {
503                     synchronized (readBufferLock) {
504                         complete = this.completing && !readBuf.hasRemaining();
505                     }
506                 }
507                 if (complete) {
508                     if (debugr.on()) debugr.log("completing");
509                     // Complete the alpnCF, if not already complete, regardless of
510                     // whether or not the ALPN is available, there will be no more
511                     // activity.
512                     setALPN();
513                     outgoing(Utils.EMPTY_BB_LIST, true);
514                 } else {
515                     requestMoreDataIfNeeded();
516                 }
517             } catch (Throwable ex) {
518                 ex = checkForHandshake(ex);
519                 errorCommon(ex);
520                 handleError(ex);
521             }
522         }
523 
524         private volatile Status lastUnwrapStatus;
unwrapBuffer(ByteBuffer src)525         EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
526             ByteBuffer dst = getAppBuffer();
527             int len = src.remaining();
528             while (true) {
529                 SSLEngineResult sslResult = engine.unwrap(src, dst);
530                 switch (lastUnwrapStatus = sslResult.getStatus()) {
531                     case BUFFER_OVERFLOW:
532                         // may happen if app size buffer was changed, or if
533                         // our 'adaptiveBufferSize' guess was too small for
534                         // the current payload. In that case, update the
535                         // value of applicationBufferSize, and allocate a
536                         // buffer of that size, which we are sure will be
537                         // big enough to decode whatever needs to be
538                         // decoded. We will later update adaptiveBufferSize
539                         // in OK: below.
540                         int appSize = applicationBufferSize =
541                                 engine.getSession().getApplicationBufferSize();
542                         ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
543                         dst.flip();
544                         b.put(dst);
545                         dst = b;
546                         break;
547                     case CLOSED:
548                         assert dst.position() == 0;
549                         return doClosure(new EngineResult(sslResult));
550                     case BUFFER_UNDERFLOW:
551                         // handled implicitly by compaction/reallocation of readBuf
552                         assert dst.position() == 0;
553                         return new EngineResult(sslResult);
554                     case OK:
555                         int size = dst.position();
556                         if (debug.on()) {
557                             debugr.log("Decoded " + size + " bytes out of " + len
558                                     + " into buffer of " + dst.capacity()
559                                     + " remaining to decode: " + src.remaining());
560                         }
561                         // if the record payload was bigger than what was originally
562                         // allocated, then sets the adaptiveAppBufferSize to size
563                         // and we will use that new size as a guess for the next app
564                         // buffer.
565                         if (size > adaptiveAppBufferSize) {
566                             adaptiveAppBufferSize = ((size + 7) >>> 3) << 3;
567                         }
568                         dst.flip();
569                         return new EngineResult(sslResult, dst);
570                 }
571             }
572         }
573     }
574 
/**
 * An object whose current state can be rendered as text for the
 * {@code Monitor} thread to print.
 */
public interface Monitorable {
    /**
     * @return a textual description of this object's current state
     */
    String getInfo();
}
578 
579     public static class Monitor extends Thread {
580         final List<WeakReference<Monitorable>> list;
581         final List<FinalMonitorable> finalList;
582         final ReferenceQueue<Monitorable> queue = new ReferenceQueue<>();
583         static Monitor themon;
584 
585         static {
586             themon = new Monitor();
themon.start()587             themon.start(); // uncomment to enable Monitor
588         }
589 
590         // An instance used to temporarily store the
591         // last observable state of a monitorable object.
592         // When Monitor.remove(o) is called, we replace
593         // 'o' with a FinalMonitorable whose reference
594         // will be enqueued after the last observable state
595         // has been printed.
596         final class FinalMonitorable implements Monitorable {
597             final String finalState;
FinalMonitorable(Monitorable o)598             FinalMonitorable(Monitorable o) {
599                 finalState = o.getInfo();
600                 finalList.add(this);
601             }
602             @Override
getInfo()603             public String getInfo() {
604                 finalList.remove(this);
605                 return finalState;
606             }
607         }
608 
Monitor()609         Monitor() {
610             super("Monitor");
611             setDaemon(true);
612             list = Collections.synchronizedList(new LinkedList<>());
613             finalList = new ArrayList<>(); // access is synchronized on list above
614         }
615 
addTarget(Monitorable o)616         void addTarget(Monitorable o) {
617             list.add(new WeakReference<>(o, queue));
618         }
removeTarget(Monitorable o)619         void removeTarget(Monitorable o) {
620             // It can take a long time for GC to clean up references.
621             // Calling Monitor.remove() early helps removing noise from the
622             // logs/
623             synchronized (list) {
624                 Iterator<WeakReference<Monitorable>> it = list.iterator();
625                 while (it.hasNext()) {
626                     Monitorable m = it.next().get();
627                     if (m == null) it.remove();
628                     if (o == m) {
629                         it.remove();
630                         break;
631                     }
632                 }
633                 FinalMonitorable m = new FinalMonitorable(o);
634                 addTarget(m);
635                 Reference.reachabilityFence(m);
636             }
637         }
638 
add(Monitorable o)639         public static void add(Monitorable o) {
640             themon.addTarget(o);
641         }
remove(Monitorable o)642         public static void remove(Monitorable o) {
643             themon.removeTarget(o);
644         }
645 
646         @Override
run()647         public void run() {
648             System.out.println("Monitor starting");
649             try {
650                 while (true) {
651                     Thread.sleep(20 * 1000);
652                     synchronized (list) {
653                         Reference<? extends Monitorable> expired;
654                         while ((expired = queue.poll()) != null) list.remove(expired);
655                         for (WeakReference<Monitorable> ref : list) {
656                             Monitorable o = ref.get();
657                             if (o == null) continue;
658                             if (o instanceof FinalMonitorable) {
659                                 ref.enqueue();
660                             }
661                             System.out.println(o.getInfo());
662                             System.out.println("-------------------------");
663                         }
664                     }
665                     System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-");
666                 }
667             } catch (InterruptedException e) {
668                 System.out.println("Monitor exiting with " + e);
669             }
670         }
671     }
672 
673     /**
674      * Processing function for outgoing data. Pass it thru SSLEngine.wrap()
675      * Any encrypted buffers generated are passed downstream to be written.
676      * Status codes:
677      *     NEED_UNWRAP: call reader.addData() with empty buffer
678      *     NEED_WRAP: call addData() with empty buffer
679      *     NEED_TASK: delegate task to executor
680      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap
681      *     BUFFER_UNDERFLOW: shouldn't happen on writing side
682      *     OK: return generated buffers
683      */
684     class Writer extends SubscriberWrapper {
685         final SequentialScheduler scheduler;
686         // queues of buffers received from upstream waiting
687         // to be processed by the SSLEngine
688         final List<ByteBuffer> writeList;
689         final Logger debugw =  Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
690         volatile boolean completing;
691         boolean completed; // only accessed in processData
692 
        /**
         * Task handed to the writer's {@code SequentialScheduler}; each run
         * invokes {@code processData()}.
         */
        class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
            @Override public void run() { processData(); }
        }
696 
Writer()697         Writer() {
698             super();
699             writeList = Collections.synchronizedList(new LinkedList<>());
700             scheduler = new SequentialScheduler(new WriterDownstreamPusher());
701         }
702 
703         @Override
incoming(List<ByteBuffer> buffers, boolean complete)704         protected void incoming(List<ByteBuffer> buffers, boolean complete) {
705             assert complete ? buffers == Utils.EMPTY_BB_LIST : true;
706             assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
707             if (complete) {
708                 if (debugw.on()) debugw.log("adding SENTINEL");
709                 completing = true;
710                 writeList.add(SENTINEL);
711             } else {
712                 writeList.addAll(buffers);
713             }
714             if (debugw.on())
715                 debugw.log("added " + buffers.size()
716                            + " (" + Utils.remaining(buffers)
717                            + " bytes) to the writeList");
718             scheduler.runOrSchedule();
719         }
720 
        /**
         * Returns the label used for this writer's debug output, built
         * from {@code tubeName}.
         */
        public final String dbgString() {
            return "SSL Writer(" + tubeName + ")";
        }
724 
        /**
         * Feeds {@code HS_TRIGGER} to {@code addData} so that handshaking
         * gets initiated.
         */
        protected void onSubscribe() {
            if (debugw.on()) debugw.log("onSubscribe initiating handshaking");
            addData(HS_TRIGGER);  // initiates handshaking
        }
729 
        /**
         * Asks the writer's scheduler to run (or schedule a run of) its task.
         */
        void schedule() {
            scheduler.runOrSchedule();
        }
733 
        /**
         * Stops the writer's scheduler, logging the call when debug is on.
         */
        void stop() {
            if (debugw.on()) debugw.log("stop");
            scheduler.stop();
        }
738 
        /**
         * Reports whether this writer is closing.
         *
         * @return the value of {@code closeNotifyReceived()}
         */
        @Override
        public boolean closing() {
            return closeNotifyReceived();
        }
743 
        /**
         * Returns the current value of the volatile {@code completing} flag.
         */
        private boolean isCompleting() {
            return completing;
        }
747 
748         @Override
upstreamWindowUpdate(long currentWindow, long downstreamQsize)749         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
750             if (writeList.size() > 10)
751                 return 0;
752             else
753                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
754         }
755 
hsTriggered()756         private boolean hsTriggered() {
757             synchronized(writeList) {
758                 for (ByteBuffer b : writeList)
759                     if (b == HS_TRIGGER)
760                         return true;
761                 return false;
762             }
763         }
764 
triggerWrite()765         void triggerWrite() {
766             synchronized (writeList) {
767                 if (writeList.isEmpty()) {
768                     writeList.add(HS_TRIGGER);
769                 }
770             }
771             scheduler.runOrSchedule();
772         }
773 
        /**
         * Wraps the buffers pending in {@code writeList} and sends the
         * produced bytes downstream. Invoked by
         * {@code WriterDownstreamPusher.run()}.
         *
         * The loop runs while the list has bytes remaining, an
         * {@code HS_TRIGGER} is queued, or the engine reports NEED_WRAP.
         * Any {@code Throwable} is passed through {@code checkForHandshake}
         * and then handed to {@code errorCommon} and {@code handleError}.
         */
        private void processData() {
            // local copy of the flag; also updated below if the engine
            // reports CLOSED after producing bytes
            boolean completing = isCompleting();

            try {
                if (debugw.on())
                    debugw.log("processData, writeList remaining:"
                                + Utils.remaining(writeList) + ", hsTriggered:"
                                + hsTriggered() + ", needWrap:" + needWrap());

                while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) {
                    ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
                    EngineResult result = wrapBuffers(outbufs);
                    if (debugw.on())
                        debugw.log("wrapBuffer returned %s", result.result);

                    if (result.status() == Status.CLOSED) {
                        // cancel upstream once, the first time CLOSED is seen
                        if (!upstreamCompleted) {
                            upstreamCompleted = true;
                            upstreamSubscription.cancel();
                            // complete ALPN if not yet completed
                            setALPN();
                        }
                        // closed and nothing was produced: nothing to send
                        if (result.bytesProduced() <= 0)
                            return;

                        if (!completing && !completed) {
                            completing = this.completing = true;
                            // There could still be some outgoing data in outbufs.
                            writeList.add(SENTINEL);
                        }
                    }

                    boolean handshaking = false;
                    if (result.handshaking()) {
                        if (debugw.on()) debugw.log("handshaking");
                        doHandshake(result, WRITER);  // ok to ignore return
                        handshaking = true;
                    } else {
                        if (trySetALPN()) {
                            resumeActivity();
                        }
                    }
                    cleanList(writeList); // tidy up the source list
                    sendResultBytes(result);
                    if (handshaking) {
                        // keep wrapping only if the engine still needs to
                        // wrap and we are not completing
                        if (!completing && needWrap()) {
                            continue;
                        } else {
                            return;
                        }
                    }
                }
                if (completing && Utils.remaining(writeList) == 0) {
                    // signal completion downstream exactly once
                    if (!completed) {
                        completed = true;
                        writeList.clear();
                        outgoing(Utils.EMPTY_BB_LIST, true);
                    }
                    return;
                }
                if (writeList.isEmpty() && needWrap()) {
                    writer.addData(HS_TRIGGER);
                }
            } catch (Throwable ex) {
                ex = checkForHandshake(ex);
                errorCommon(ex);
                handleError(ex);
            }
        }
843 
844         // The SSLEngine insists on being given a buffer that is at least
845         // SSLSession.getPacketBufferSize() long (usually 16K). If given
846         // a smaller buffer it will go in BUFFER_OVERFLOW, even if it only
847         // has 6 bytes to wrap. Typical usage shows that for GET we
848         // usually produce an average of ~ 100 bytes.
849         // To avoid wasting space, and because allocating and zeroing
850         // 16K buffers for encoding 6 bytes is costly, we are reusing the
851         // same writeBuffer to interact with SSLEngine.wrap().
852         // If the SSLEngine produces less than writeBuffer.capacity() / 2,
853         // then we copy off the bytes to a smaller buffer that we send
854         // downstream. Otherwise, we send the writeBuffer downstream
855         // and will allocate a new one next time.
856         volatile ByteBuffer writeBuffer;
857         private volatile Status lastWrappedStatus;
858         @SuppressWarnings("fallthrough")
wrapBuffers(ByteBuffer[] src)859         EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
860             long len = Utils.remaining(src);
861             if (debugw.on())
862                 debugw.log("wrapping " + len + " bytes");
863 
864             ByteBuffer dst = writeBuffer;
865             if (dst == null) dst = writeBuffer = getNetBuffer();
866             assert dst.position() == 0 : "buffer position is " + dst.position();
867             assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity();
868 
869             while (true) {
870                 SSLEngineResult sslResult = engine.wrap(src, dst);
871                 if (debugw.on()) debugw.log("SSLResult: " + sslResult);
872                 switch (lastWrappedStatus = sslResult.getStatus()) {
873                     case BUFFER_OVERFLOW:
874                         // Shouldn't happen. We allocated buffer with packet size
875                         // get it again if net buffer size was changed
876                         if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
877                         int netSize = packetBufferSize
878                                 = engine.getSession().getPacketBufferSize();
879                         ByteBuffer b = writeBuffer = ByteBuffer.allocate(netSize + dst.position());
880                         dst.flip();
881                         b.put(dst);
882                         dst = b;
883                         break; // try again
884                     case CLOSED:
885                         if (debugw.on()) debugw.log("CLOSED");
886                         // fallthrough. There could be some remaining data in dst.
887                         // CLOSED will be handled by the caller.
888                     case OK:
889                         final ByteBuffer dest;
890                         if (dst.position() == 0) {
891                             dest = NOTHING; // can happen if handshake is in progress
892                         } else if (dst.position() < dst.capacity() / 2) {
893                             // less than half the buffer was used.
894                             // copy off the bytes to a smaller buffer, and keep
895                             // the writeBuffer for next time.
896                             dst.flip();
897                             dest = Utils.copyAligned(dst);
898                             dst.clear();
899                         } else {
900                             // more than half the buffer was used.
901                             // just send that buffer downstream, and we will
902                             // get a new writeBuffer next time it is needed.
903                             dst.flip();
904                             dest = dst;
905                             writeBuffer = null;
906                         }
907                         if (debugw.on())
908                             debugw.log("OK => produced: %d bytes into %d, not wrapped: %d",
909                                        dest.remaining(),  dest.capacity(), Utils.remaining(src));
910                         return new EngineResult(sslResult, dest);
911                     case BUFFER_UNDERFLOW:
912                         // Shouldn't happen.  Doesn't returns when wrap()
913                         // underflow handled externally
914                         // assert false : "Buffer Underflow";
915                         if (debug.on()) debug.log("BUFFER_UNDERFLOW");
916                         return new EngineResult(sslResult);
917                     default:
918                         if (debugw.on())
919                             debugw.log("result: %s", sslResult.getStatus());
920                         assert false : "result:" + sslResult.getStatus();
921                 }
922             }
923         }
924 
        /**
         * Tells whether the engine's current handshake status is NEED_WRAP.
         */
        private boolean needWrap() {
            return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP;
        }
928 
sendResultBytes(EngineResult result)929         private void sendResultBytes(EngineResult result) {
930             if (result.bytesProduced() > 0) {
931                 if (debugw.on())
932                     debugw.log("Sending %d bytes downstream",
933                                result.bytesProduced());
934                 outgoing(result.destBuffer, false);
935             }
936         }
937 
938         @Override
toString()939         public String toString() {
940             return "WRITER: " + super.toString()
941                     + ", writeList size: " + Integer.toString(writeList.size())
942                     + ", scheduler: " + (scheduler.isStopped() ? "stopped" : "running")
943                     + ", status: " + lastWrappedStatus;
944                     //" writeList: " + writeList.toString();
945         }
946     }
947 
    /**
     * Completes {@code readerCF}, {@code writerCF} and {@code alpnCF}
     * exceptionally with {@code t}, then stops the reader and the writer.
     *
     * @param t the error to propagate
     */
    private void handleError(Throwable t) {
        if (debug.on()) debug.log("handleError", t);
        readerCF.completeExceptionally(t);
        writerCF.completeExceptionally(t);
        // no-op if already completed
        alpnCF.completeExceptionally(t);
        reader.stop();
        writer.stop();
    }
957 
958     boolean stopped;
959 
normalStop()960     private synchronized void normalStop() {
961         if (stopped)
962             return;
963         stopped = true;
964         reader.stop();
965         writer.stop();
966         // make sure the alpnCF is completed.
967         if (!alpnCF.isDone()) {
968             Throwable alpn = new SSLHandshakeException(
969                     "Connection closed before successful ALPN negotiation");
970             alpnCF.completeExceptionally(alpn);
971         }
972         if (isMonitored) Monitor.remove(monitor);
973     }
974 
    /**
     * Completes {@code alpnCF} exceptionally with {@code error} if it is
     * not done yet, then performs a {@code normalStop()}.
     *
     * @param error the error that caused the stop
     * @return always {@code null}
     */
    private Void stopOnError(Throwable error) {
        // maybe log, etc
        // ensure the ALPN is completed
        // We could also do this in SSLTube.SSLSubscriberWrapper
        // onError/onComplete - with the caveat that the ALPN CF
        // would get completed externally. Doing it here keeps
        // it all inside SSLFlowDelegate.
        if (!alpnCF.isDone()) {
            alpnCF.completeExceptionally(error);
        }
        normalStop();
        return null;
    }
988 
cleanList(List<ByteBuffer> l)989     private void cleanList(List<ByteBuffer> l) {
990         synchronized (l) {
991             Iterator<ByteBuffer> iter = l.iterator();
992             while (iter.hasNext()) {
993                 ByteBuffer b = iter.next();
994                 if (!b.hasRemaining() && b != SENTINEL) {
995                     iter.remove();
996                 }
997             }
998         }
999     }
1000 
1001     /**
1002      * States for handshake. We avoid races when accessing/updating the AtomicInt
1003      * because updates always schedule an additional call to both the read()
1004      * and write() functions.
1005      */
1006     private static final int NOT_HANDSHAKING = 0;
1007     private static final int HANDSHAKING = 1;
1008 
1009     // Bit flags
1010     // a thread is currently executing tasks
1011     private static final int DOING_TASKS = 4;
1012     // a thread wants to execute tasks, while another thread is executing
1013     private static final int REQUESTING_TASKS = 8;
1014     private static final int TASK_BITS = 12; // Both bits
1015 
1016     private static final int READER = 1;
1017     private static final int WRITER = 2;
1018 
states(AtomicInteger state)1019     private static String states(AtomicInteger state) {
1020         int s = state.get();
1021         StringBuilder sb = new StringBuilder();
1022         int x = s & ~TASK_BITS;
1023         switch (x) {
1024             case NOT_HANDSHAKING    -> sb.append(" NOT_HANDSHAKING ");
1025             case HANDSHAKING        -> sb.append(" HANDSHAKING ");
1026 
1027             default -> throw new InternalError();
1028         }
1029         if ((s & DOING_TASKS) > 0)
1030             sb.append("|DOING_TASKS");
1031         if ((s & REQUESTING_TASKS) > 0)
1032             sb.append("|REQUESTING_TASKS");
1033         return sb.toString();
1034     }
1035 
    /**
     * Schedules both the reader and the writer.
     */
    private void resumeActivity() {
        reader.schedule();
        writer.schedule();
    }
1040 
1041     final AtomicInteger handshakeState;
1042     final ConcurrentLinkedQueue<String> stateList =
1043             debug.on() ? new ConcurrentLinkedQueue<>() : null;
1044 
1045     // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS
1046     // depending on previous value
1047     private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> {
1048         if ((current & DOING_TASKS) == 0)
1049             return DOING_TASKS | (current & HANDSHAKING);
1050         else
1051             return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING);
1052     };
1053 
1054     // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set
1055     // clears bits if not.
1056     private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> {
1057         if ((current & REQUESTING_TASKS) != 0)
1058             return DOING_TASKS | (current & HANDSHAKING);
1059         // clear both bits
1060         return (current & HANDSHAKING);
1061     };
1062 
    /**
     * Marks the handshake as in progress and acts on the handshake status
     * carried by {@code r}:
     * <ul>
     *   <li>NEED_TASK: updates the task bits; if REQUESTING_TASKS ends up
     *       set, returns false right away, otherwise obtains the
     *       delegated tasks, submits them for execution and returns
     *       false.</li>
     *   <li>NEED_WRAP: when called by the reader, triggers a write and
     *       returns false.</li>
     *   <li>NEED_UNWRAP / NEED_UNWRAP_AGAIN: when called by the writer,
     *       schedules the reader and returns false.</li>
     * </ul>
     * When debug is on, the handshake status and the caller are appended
     * to {@code stateList}.
     *
     * @param r      the engine result whose handshake status is examined
     * @param caller {@code READER} or {@code WRITER}
     * @return {@code false} in the cases listed above, {@code true}
     *         otherwise
     * @throws InternalError for any other handshake status
     */
    private boolean doHandshake(EngineResult r, int caller) {
        // unconditionally sets the HANDSHAKING bit, while preserving task bits
        handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS));
        if (stateList != null && debug.on()) {
            stateList.add(r.handshakeStatus().toString());
            stateList.add(Integer.toString(caller));
        }
        switch (r.handshakeStatus()) {
            case NEED_TASK:
                int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS);
                if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks
                    return false;
                }

                if (debug.on()) debug.log("obtaining and initiating task execution");
                List<Runnable> tasks = obtainTasks();
                executeTasks(tasks);
                return false;  // executeTasks will resume activity
            case NEED_WRAP:
                if (caller == READER) {
                    writer.triggerWrite();
                    return false;
                }
                break;
            case NEED_UNWRAP:
            case NEED_UNWRAP_AGAIN:
                // do nothing else
                // receiving-side data will trigger unwrap
                if (caller == WRITER) {
                    reader.schedule();
                    return false;
                }
                break;
            default:
                throw new InternalError("Unexpected handshake status:"
                                        + r.handshakeStatus());
        }
        return true;
    }
1102 
obtainTasks()1103     private List<Runnable> obtainTasks() {
1104         List<Runnable> l = new ArrayList<>();
1105         Runnable r;
1106         while ((r = engine.getDelegatedTask()) != null) {
1107             l.add(r);
1108         }
1109         return l;
1110     }
1111 
    /**
     * Submits to {@code exec} a job that runs the given delegated tasks.
     * After each batch the job obtains and runs more tasks for as long as
     * the engine reports NEED_TASK, or the DOING_TASKS bit is still set
     * after applying {@code FINISH_OR_DO_TASKS}. Once done it tries to
     * set ALPN if the engine is no longer handshaking, then resumes the
     * reader and the writer. Any {@code Throwable} is passed through
     * {@code checkForHandshake} and handed to {@code handleError}.
     *
     * @param tasks the first batch of tasks to run
     */
    private void executeTasks(List<Runnable> tasks) {
        exec.execute(() -> {
            try {
                List<Runnable> nextTasks = tasks;
                if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size()));
                do {
                    nextTasks.forEach(Runnable::run);
                    if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
                        nextTasks = obtainTasks();
                    } else {
                        // DOING_TASKS remains set only if REQUESTING_TASKS
                        // was set in the meantime
                        int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS);
                        if ((s & DOING_TASKS) != 0) {
                            if (debug.on()) debug.log("re-running tasks (B)");
                            nextTasks = obtainTasks();
                            continue;
                        }
                        break;
                    }
                } while (true);
                if (debug.on()) debug.log("finished task execution");
                HandshakeStatus hs = engine.getHandshakeStatus();
                if (hs == HandshakeStatus.FINISHED || hs == HandshakeStatus.NOT_HANDSHAKING) {
                    // We're no longer handshaking, try setting ALPN
                    trySetALPN();
                }
                resumeActivity();
            } catch (Throwable t) {
                handleError(checkForHandshake(t));
            }
        });
    }
1143 
trySetALPN()1144     boolean trySetALPN() {
1145         // complete ALPN CF if needed.
1146         if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
1147             applicationBufferSize = engine.getSession().getApplicationBufferSize();
1148             packetBufferSize = engine.getSession().getPacketBufferSize();
1149             setALPN();
1150             return true;
1151         }
1152         return false;
1153     }
1154 
1155     // FIXME: acknowledge a received CLOSE request from peer
doClosure(EngineResult r)1156     EngineResult doClosure(EngineResult r) throws IOException {
1157         if (debug.on())
1158             debug.log("doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]",
1159                       r.result, engine.getHandshakeStatus(),
1160                       engine.isOutboundDone(), engine.isInboundDone());
1161         if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
1162             // we have received TLS close_notify and need to send
1163             // an acknowledgement back. We're calling doHandshake
1164             // to finish the close handshake.
1165             if (engine.isInboundDone() && !engine.isOutboundDone()) {
1166                 if (debug.on()) debug.log("doClosure: close_notify received");
1167                 close_notify_received = true;
1168                 if (!writer.scheduler.isStopped()) {
1169                     doHandshake(r, READER);
1170                 } else {
1171                     // We have received closed notify, but we
1172                     // won't be able to send the acknowledgement.
1173                     // Nothing more will come from the socket either,
1174                     // so mark the reader as completed.
1175                     synchronized (reader.readBufferLock) {
1176                         reader.completing = true;
1177                     }
1178                 }
1179             }
1180         }
1181         return r;
1182     }
1183 
    /**
     * Returns the upstream Flow.Subscriber of the reading (incoming) side.
     * This flow must be given the encrypted data read from upstream (eg socket)
     * before it is decrypted.
     *
     * @return the {@code reader} of this delegate
     */
    public Flow.Subscriber<List<ByteBuffer>> upstreamReader() {
        return reader;
    }
1192 
    /**
     * Returns the upstream Flow.Subscriber of the writing (outgoing) side.
     * This flow contains the plaintext data before it is encrypted.
     *
     * @return the {@code writer} of this delegate
     */
    public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() {
        return writer;
    }
1200 
    /**
     * Signals the reader to schedule itself.
     *
     * @return the value returned by {@code reader.signalScheduling()}
     */
    public boolean resumeReader() {
        return reader.signalScheduling();
    }
1204 
    /**
     * Resets the reader's downstream demand by delegating to
     * {@code reader.resetDownstreamDemand()}.
     */
    public void resetReaderDemand() {
        reader.resetDownstreamDemand();
    }
1208 
1209     static class EngineResult {
1210         final SSLEngineResult result;
1211         final ByteBuffer destBuffer;
1212 
1213         // normal result
EngineResult(SSLEngineResult result)1214         EngineResult(SSLEngineResult result) {
1215             this(result, null);
1216         }
1217 
EngineResult(SSLEngineResult result, ByteBuffer destBuffer)1218         EngineResult(SSLEngineResult result, ByteBuffer destBuffer) {
1219             this.result = result;
1220             this.destBuffer = destBuffer;
1221         }
1222 
handshaking()1223         boolean handshaking() {
1224             HandshakeStatus s = result.getHandshakeStatus();
1225             return s != HandshakeStatus.FINISHED
1226                    && s != HandshakeStatus.NOT_HANDSHAKING
1227                    && result.getStatus() != Status.CLOSED;
1228         }
1229 
needUnwrap()1230         boolean needUnwrap() {
1231             HandshakeStatus s = result.getHandshakeStatus();
1232             return s == HandshakeStatus.NEED_UNWRAP;
1233         }
1234 
1235 
bytesConsumed()1236         int bytesConsumed() {
1237             return result.bytesConsumed();
1238         }
1239 
bytesProduced()1240         int bytesProduced() {
1241             return result.bytesProduced();
1242         }
1243 
handshakeStatus()1244         SSLEngineResult.HandshakeStatus handshakeStatus() {
1245             return result.getHandshakeStatus();
1246         }
1247 
status()1248         SSLEngineResult.Status status() {
1249             return result.getStatus();
1250         }
1251     }
1252 
1253     // The maximum network buffer size negotiated during
1254     // the handshake. Usually 16K.
1255     volatile int packetBufferSize;
getNetBuffer()1256     final ByteBuffer getNetBuffer() {
1257         int netSize = packetBufferSize;
1258         if (netSize <= 0) {
1259             packetBufferSize = netSize = engine.getSession().getPacketBufferSize();
1260         }
1261         return ByteBuffer.allocate(netSize);
1262     }
1263 
1264     // The maximum application buffer size negotiated during
1265     // the handshake. Usually close to 16K.
1266     volatile int applicationBufferSize;
    // Despite the maximum applicationBufferSize negotiated
    // above, TLS records usually have a much smaller payload.
    // The adaptiveAppBufferSize records the max payload
    // ever decoded, and we use that as a guess for how big
    // a buffer we will need for the next payload.
    // This avoids allocating and zeroing a 16K buffer for
    // nothing...
1274     volatile int adaptiveAppBufferSize;
getAppBuffer()1275     final ByteBuffer getAppBuffer() {
1276         int appSize = applicationBufferSize;
1277         if (appSize <= 0) {
1278             applicationBufferSize = appSize
1279                     = engine.getSession().getApplicationBufferSize();
1280         }
1281         int size = adaptiveAppBufferSize;
1282         if (size <= 0) {
1283             size = 512; // start with 512 this is usually enough for handshaking / headers
1284         } else if (size > appSize) {
1285             size = appSize;
1286         }
1287         // will cause a BUFFER_OVERFLOW if not big enough, but
1288         // that's OK.
1289         return ByteBuffer.allocate(size);
1290     }
1291 
    /**
     * Returns the label identifying this delegate, built from
     * {@code tubeName}.
     */
    final String dbgString() {
        return "SSLFlowDelegate(" + tubeName + ")";
    }
1295 }
1296