1 /*
2  * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http.common;
27 
28 import jdk.internal.net.http.common.SubscriberWrapper.SchedulingAction;
29 
30 import javax.net.ssl.SSLEngine;
31 import javax.net.ssl.SSLEngineResult;
32 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
33 import javax.net.ssl.SSLEngineResult.Status;
34 import javax.net.ssl.SSLException;
35 import javax.net.ssl.SSLHandshakeException;
36 import java.io.IOException;
37 import java.lang.ref.Reference;
38 import java.lang.ref.ReferenceQueue;
39 import java.lang.ref.WeakReference;
40 import java.nio.ByteBuffer;
41 import java.util.ArrayList;
42 import java.util.Collections;
43 import java.util.Iterator;
44 import java.util.LinkedList;
45 import java.util.List;
46 import java.util.concurrent.CompletableFuture;
47 import java.util.concurrent.ConcurrentLinkedQueue;
48 import java.util.concurrent.Executor;
49 import java.util.concurrent.Flow;
50 import java.util.concurrent.Flow.Subscriber;
51 import java.util.concurrent.atomic.AtomicInteger;
52 import java.util.function.Consumer;
53 import java.util.function.IntBinaryOperator;
54 
55 /**
56  * Implements SSL using two SubscriberWrappers.
57  *
58  * <p> Constructor takes two Flow.Subscribers: one that receives the network
59  * data (after it has been encrypted by SSLFlowDelegate) data, and one that
60  * receives the application data (before it has been encrypted by SSLFlowDelegate).
61  *
62  * <p> Methods upstreamReader() and upstreamWriter() return the corresponding
63  * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data.
64  * See diagram below.
65  *
66  * <p> How Flow.Subscribers are used in this class, and where they come from:
67  * <pre>
68  * {@code
69  *
70  *
71  *
72  * --------->  data flow direction
73  *
74  *
75  *                         +------------------+
76  *        upstreamWriter   |                  | downWriter
77  *        ---------------> |                  | ------------>
78  *  obtained from this     |                  | supplied to constructor
79  *                         | SSLFlowDelegate  |
80  *        downReader       |                  | upstreamReader
81  *        <--------------- |                  | <--------------
82  * supplied to constructor |                  | obtained from this
83  *                         +------------------+
84  *
85  * Errors are reported to the downReader Flow.Subscriber
86  *
87  * }
88  * </pre>
89  */
90 public class SSLFlowDelegate {
91 
92     final Logger debug =
93             Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
94 
95     private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
96     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
97     // When handshake is in progress trying to wrap may produce no bytes.
98     private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
99     private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
100     private static final boolean isMonitored =
101             monProp != null && (monProp.isEmpty() || monProp.equalsIgnoreCase("true"));
102 
103     final Executor exec;
104     final Reader reader;
105     final Writer writer;
106     final SSLEngine engine;
107     final String tubeName; // hack
108     final CompletableFuture<String> alpnCF; // completes on initial handshake
109     final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped
110     volatile boolean close_notify_received;
111     final CompletableFuture<Void> readerCF;
112     final CompletableFuture<Void> writerCF;
113     final CompletableFuture<Void> stopCF;
114     final Consumer<ByteBuffer> recycler;
115     static AtomicInteger scount = new AtomicInteger(1);
116     final int id;
117 
118     /**
119      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
120      * Flow.Subscriber requires an associated {@link CompletableFuture}
121      * for errors that need to be signaled from downstream to upstream.
122      */
SSLFlowDelegate(SSLEngine engine, Executor exec, Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)123     public SSLFlowDelegate(SSLEngine engine,
124                            Executor exec,
125                            Subscriber<? super List<ByteBuffer>> downReader,
126                            Subscriber<? super List<ByteBuffer>> downWriter)
127     {
128         this(engine, exec, null, downReader, downWriter);
129     }
130 
131     /**
132      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
133      * Flow.Subscriber requires an associated {@link CompletableFuture}
134      * for errors that need to be signaled from downstream to upstream.
135      */
SSLFlowDelegate(SSLEngine engine, Executor exec, Consumer<ByteBuffer> recycler, Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)136     public SSLFlowDelegate(SSLEngine engine,
137             Executor exec,
138             Consumer<ByteBuffer> recycler,
139             Subscriber<? super List<ByteBuffer>> downReader,
140             Subscriber<? super List<ByteBuffer>> downWriter)
141         {
142         this.id = scount.getAndIncrement();
143         this.tubeName = String.valueOf(downWriter);
144         this.recycler = recycler;
145         this.reader = new Reader();
146         this.writer = new Writer();
147         this.engine = engine;
148         this.exec = exec;
149         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
150         this.readerCF = reader.completion();
151         this.writerCF = reader.completion();
152         readerCF.exceptionally(this::stopOnError);
153         writerCF.exceptionally(this::stopOnError);
154         this.stopCF = CompletableFuture.allOf(reader.completion(), writer.completion())
155             .thenRun(this::normalStop);
156         this.alpnCF = new MinimalFuture<>();
157 
158         // connect the Reader to the downReader and the
159         // Writer to the downWriter.
160         connect(downReader, downWriter);
161 
162         if (isMonitored) Monitor.add(monitor);
163     }
164 
165     /**
166      * Returns true if the SSLFlowDelegate has detected a TLS
167      * close_notify from the server.
168      * @return true, if a close_notify was detected.
169      */
closeNotifyReceived()170     public boolean closeNotifyReceived() {
171         return close_notify_received;
172     }
173 
174     /**
175      * Connects the read sink (downReader) to the SSLFlowDelegate Reader,
176      * and the write sink (downWriter) to the SSLFlowDelegate Writer.
177      * Called from within the constructor. Overwritten by SSLTube.
178      *
179      * @param downReader  The left hand side read sink (typically, the
180      *                    HttpConnection read subscriber).
181      * @param downWriter  The right hand side write sink (typically
182      *                    the SocketTube write subscriber).
183      */
connect(Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter)184     void connect(Subscriber<? super List<ByteBuffer>> downReader,
185                  Subscriber<? super List<ByteBuffer>> downWriter) {
186         this.reader.subscribe(downReader);
187         this.writer.subscribe(downWriter);
188     }
189 
190    /**
191     * Returns a CompletableFuture<String> which completes after
192     * the initial handshake completes, and which contains the negotiated
193     * alpn.
194     */
alpn()195     public CompletableFuture<String> alpn() {
196         return alpnCF;
197     }
198 
setALPN()199     private void setALPN() {
200         // Handshake is finished. So, can retrieve the ALPN now
201         if (alpnCF.isDone())
202             return;
203         String alpn = engine.getApplicationProtocol();
204         if (debug.on()) debug.log("setALPN = %s", alpn);
205         alpnCF.complete(alpn);
206     }
207 
monitor()208     public String monitor() {
209         StringBuilder sb = new StringBuilder();
210         sb.append("SSL: id ").append(id);
211         sb.append(" ").append(dbgString());
212         sb.append(" HS state: " + states(handshakeState));
213         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
214         if (stateList != null) {
215             sb.append(" LL : ");
216             for (String s : stateList) {
217                 sb.append(s).append(" ");
218             }
219         }
220         sb.append("\r\n");
221         sb.append("Reader:: ").append(reader.toString());
222         sb.append("\r\n");
223         sb.append("Writer:: ").append(writer.toString());
224         sb.append("\r\n===================================");
225         return sb.toString();
226     }
227 
enterReadScheduling()228     protected SchedulingAction enterReadScheduling() {
229         return SchedulingAction.CONTINUE;
230     }
231 
232 
233     /**
234      * Processing function for incoming data. Pass it thru SSLEngine.unwrap().
235      * Any decrypted buffers returned to be passed downstream.
236      * Status codes:
237      *     NEED_UNWRAP: do nothing. Following incoming data will contain
238      *                  any required handshake data
239      *     NEED_WRAP: call writer.addData() with empty buffer
240      *     NEED_TASK: delegate task to executor
241      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap
242      *     BUFFER_UNDERFLOW: keep buffer and wait for more data
243      *     OK: return generated buffers.
244      *
245      * Upstream subscription strategy is to try and keep no more than
246      * TARGET_BUFSIZE bytes in readBuf
247      */
248     final class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber {
249         // Maximum record size is 16k.
250         // Because SocketTube can feeds us up to 3 16K buffers,
251         // then setting this size to 16K means that the readBuf
252         // can store up to 64K-1 (16K-1 + 3*16K)
253         static final int TARGET_BUFSIZE = 16 * 1024;
254 
255         final SequentialScheduler scheduler;
256         volatile ByteBuffer readBuf;
257         volatile boolean completing;
258         final Object readBufferLock = new Object();
259         final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
260 
261         private final class ReaderDownstreamPusher implements Runnable {
262             @Override
run()263             public void run() {
264                 processData();
265             }
266         }
267 
Reader()268         Reader() {
269             super();
270             scheduler = SequentialScheduler.synchronizedScheduler(
271                     new ReaderDownstreamPusher());
272             this.readBuf = ByteBuffer.allocate(1024);
273             readBuf.limit(0); // keep in read mode
274         }
275 
276         @Override
supportsRecycling()277         public boolean supportsRecycling() {
278             return recycler != null;
279         }
280 
enterScheduling()281         protected SchedulingAction enterScheduling() {
282             return enterReadScheduling();
283         }
284 
dbgString()285         public final String dbgString() {
286             return "SSL Reader(" + tubeName + ")";
287         }
288 
289         /**
290          * entry point for buffers delivered from upstream Subscriber
291          */
292         @Override
incoming(List<ByteBuffer> buffers, boolean complete)293         public void incoming(List<ByteBuffer> buffers, boolean complete) {
294             if (debugr.on())
295                 debugr.log("Adding %d bytes to read buffer",
296                         Utils.remaining(buffers));
297             addToReadBuf(buffers, complete);
298             scheduler.runOrSchedule(exec);
299         }
300 
301         @Override
toString()302         public String toString() {
303             return "READER: " + super.toString() + ", readBuf: " + readBuf.toString()
304                     + ", count: " + count.toString() + ", scheduler: "
305                     + (scheduler.isStopped() ? "stopped" : "running")
306                     + ", status: " + lastUnwrapStatus
307                     + ", handshakeState: " + handshakeState.get()
308                     + ", engine: " + engine.getHandshakeStatus();
309         }
310 
reallocReadBuf()311         private void reallocReadBuf() {
312             int sz = readBuf.capacity();
313             ByteBuffer newb = ByteBuffer.allocate(sz * 2);
314             readBuf.flip();
315             Utils.copy(readBuf, newb);
316             readBuf = newb;
317         }
318 
319         @Override
upstreamWindowUpdate(long currentWindow, long downstreamQsize)320         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
321             if (needsMoreData()) {
322                 // run the scheduler to see if more data should be requested
323                 if (debugr.on()) {
324                     int remaining = readBuf.remaining();
325                     if (remaining > TARGET_BUFSIZE) {
326                         // just some logging to check how much we have in the read buffer
327                         debugr.log("readBuf has more than TARGET_BUFSIZE: %d",
328                                 remaining);
329                     }
330                 }
331                 scheduler.runOrSchedule();
332             }
333             return 0; // we will request more from the scheduler loop (processData).
334         }
335 
336         // readBuf is kept ready for reading outside of this method
addToReadBuf(List<ByteBuffer> buffers, boolean complete)337         private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
338             assert Utils.remaining(buffers) > 0 || buffers.isEmpty();
339             synchronized (readBufferLock) {
340                 for (ByteBuffer buf : buffers) {
341                     readBuf.compact();
342                     while (readBuf.remaining() < buf.remaining())
343                         reallocReadBuf();
344                     readBuf.put(buf);
345                     readBuf.flip();
346                     // should be safe to call inside lock
347                     // since the only implementation
348                     // offers the buffer to an unbounded queue.
349                     // WARNING: do not touch buf after this point!
350                     if (recycler != null) recycler.accept(buf);
351                 }
352                 if (complete) {
353                     this.completing = complete;
354                     minBytesRequired = 0;
355                 }
356             }
357         }
358 
schedule()359         void schedule() {
360             scheduler.runOrSchedule(exec);
361         }
362 
stop()363         void stop() {
364             if (debugr.on()) debugr.log("stop");
365             scheduler.stop();
366         }
367 
368         AtomicInteger count = new AtomicInteger(0);
369 
370         // minimum number of bytes required to call unwrap.
371         // Usually this is 0, unless there was a buffer underflow.
372         // In this case we need to wait for more bytes than what
373         // we had before calling unwrap() again.
374         volatile int minBytesRequired;
375 
376         // We might need to request more data if:
377         //  - we have a subscription from upstream
378         //  - and we don't have enough data to decrypt in the read buffer
379         //  - *and* - either we're handshaking, and more data is required (NEED_UNWRAP),
380         //          - or we have demand from downstream, but we have nothing decrypted
381         //            to forward downstream.
needsMoreData()382         boolean needsMoreData() {
383             if (upstreamSubscription != null && readBuf.remaining() <= minBytesRequired &&
384                     (engine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP
385                             || !downstreamSubscription.demand.isFulfilled() && hasNoOutputData())) {
386                 return true;
387             }
388             return false;
389         }
390 
391         // If the readBuf has not enough data, and we either need to
392         // unwrap (handshaking) or we have demand from downstream,
393         // then request more data
requestMoreDataIfNeeded()394         void requestMoreDataIfNeeded() {
395             if (needsMoreData()) {
396                 // request more will only request more if our
397                 // demand from upstream is fulfilled
398                 requestMore();
399             }
400         }
401 
402         // work function where it all happens
processData()403         final void processData() {
404             try {
405                 if (debugr.on())
406                     debugr.log("processData:"
407                             + " readBuf remaining:" + readBuf.remaining()
408                             + ", state:" + states(handshakeState)
409                             + ", engine handshake status:" + engine.getHandshakeStatus());
410                 int len;
411                 boolean complete = false;
412                 while (readBuf.remaining() > (len = minBytesRequired)) {
413                     boolean handshaking = false;
414                     try {
415                         EngineResult result;
416                         synchronized (readBufferLock) {
417                             complete = this.completing;
418                             if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining());
419                             // Unless there is a BUFFER_UNDERFLOW, we should try to
420                             // unwrap any number of bytes. Set minBytesRequired to 0:
421                             // we only need to do that if minBytesRequired is not already 0.
422                             len = len > 0 ? minBytesRequired = 0 : len;
423                             result = unwrapBuffer(readBuf);
424                             len = readBuf.remaining();
425                             if (debugr.on()) {
426                                 debugr.log("Unwrapped: result: %s", result.result);
427                                 debugr.log("Unwrapped: consumed: %s", result.bytesConsumed());
428                             }
429                         }
430                         if (result.bytesProduced() > 0) {
431                             if (debugr.on())
432                                 debugr.log("sending %d", result.bytesProduced());
433                             count.addAndGet(result.bytesProduced());
434                             outgoing(result.destBuffer, false);
435                         }
436                         if (result.status() == Status.BUFFER_UNDERFLOW) {
437                             if (debugr.on()) debugr.log("BUFFER_UNDERFLOW");
438                             // not enough data in the read buffer...
439                             // no need to try to unwrap again unless we get more bytes
440                             // than minBytesRequired = len in the read buffer.
441                             synchronized (readBufferLock) {
442                                 minBytesRequired = len;
443                                 // more bytes could already have been added...
444                                 assert readBuf.remaining() >= len;
445                                 // check if we have received some data, and if so
446                                 // we can just re-spin the loop
447                                 if (readBuf.remaining() > len) continue;
448                                 else if (this.completing) {
449                                     if (debug.on()) {
450                                         debugr.log("BUFFER_UNDERFLOW with EOF," +
451                                                 " %d bytes non decrypted.", len);
452                                     }
453                                     // The channel won't send us any more data, and
454                                     // we are in underflow: we need to fail.
455                                     throw new IOException("BUFFER_UNDERFLOW with EOF, "
456                                             + len + " bytes non decrypted.");
457                                 }
458                             }
459                             // request more data and return.
460                             requestMore();
461                             return;
462                         }
463                         if (complete && result.status() == Status.CLOSED) {
464                             if (debugr.on()) debugr.log("Closed: completing");
465                             outgoing(Utils.EMPTY_BB_LIST, true);
466                             // complete ALPN if not yet completed
467                             setALPN();
468                             requestMoreDataIfNeeded();
469                             return;
470                         }
471                         if (result.handshaking()) {
472                             handshaking = true;
473                             if (debugr.on()) debugr.log("handshaking");
474                             if (doHandshake(result, READER)) continue; // need unwrap
475                             else break; // doHandshake will have triggered the write scheduler if necessary
476                         } else {
477                             if (trySetALPN()) {
478                                 resumeActivity();
479                             }
480                         }
481                     } catch (IOException ex) {
482                         errorCommon(ex);
483                         handleError(ex);
484                         return;
485                     }
486                     if (handshaking && !complete) {
487                         requestMoreDataIfNeeded();
488                         return;
489                     }
490                 }
491                 if (!complete) {
492                     synchronized (readBufferLock) {
493                         complete = this.completing && !readBuf.hasRemaining();
494                     }
495                 }
496                 if (complete) {
497                     if (debugr.on()) debugr.log("completing");
498                     // Complete the alpnCF, if not already complete, regardless of
499                     // whether or not the ALPN is available, there will be no more
500                     // activity.
501                     setALPN();
502                     outgoing(Utils.EMPTY_BB_LIST, true);
503                 } else {
504                     requestMoreDataIfNeeded();
505                 }
506             } catch (Throwable ex) {
507                 errorCommon(ex);
508                 handleError(ex);
509             }
510         }
511 
512         private volatile Status lastUnwrapStatus;
unwrapBuffer(ByteBuffer src)513         EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
514             ByteBuffer dst = getAppBuffer();
515             int len = src.remaining();
516             while (true) {
517                 SSLEngineResult sslResult = engine.unwrap(src, dst);
518                 switch (lastUnwrapStatus = sslResult.getStatus()) {
519                     case BUFFER_OVERFLOW:
520                         // may happen if app size buffer was changed, or if
521                         // our 'adaptiveBufferSize' guess was too small for
522                         // the current payload. In that case, update the
523                         // value of applicationBufferSize, and allocate a
524                         // buffer of that size, which we are sure will be
525                         // big enough to decode whatever needs to be
526                         // decoded. We will later update adaptiveBufferSize
527                         // in OK: below.
528                         int appSize = applicationBufferSize =
529                                 engine.getSession().getApplicationBufferSize();
530                         ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
531                         dst.flip();
532                         b.put(dst);
533                         dst = b;
534                         break;
535                     case CLOSED:
536                         assert dst.position() == 0;
537                         return doClosure(new EngineResult(sslResult));
538                     case BUFFER_UNDERFLOW:
539                         // handled implicitly by compaction/reallocation of readBuf
540                         assert dst.position() == 0;
541                         return new EngineResult(sslResult);
542                     case OK:
543                         int size = dst.position();
544                         if (debug.on()) {
545                             debugr.log("Decoded " + size + " bytes out of " + len
546                                     + " into buffer of " + dst.capacity()
547                                     + " remaining to decode: " + src.remaining());
548                         }
549                         // if the record payload was bigger than what was originally
550                         // allocated, then sets the adaptiveAppBufferSize to size
551                         // and we will use that new size as a guess for the next app
552                         // buffer.
553                         if (size > adaptiveAppBufferSize) {
554                             adaptiveAppBufferSize = ((size + 7) >>> 3) << 3;
555                         }
556                         dst.flip();
557                         return new EngineResult(sslResult, dst);
558                 }
559             }
560         }
561     }
562 
563     public interface Monitorable {
getInfo()564         public String getInfo();
565     }
566 
567     public static class Monitor extends Thread {
568         final List<WeakReference<Monitorable>> list;
569         final List<FinalMonitorable> finalList;
570         final ReferenceQueue<Monitorable> queue = new ReferenceQueue<>();
571         static Monitor themon;
572 
573         static {
574             themon = new Monitor();
themon.start()575             themon.start(); // uncomment to enable Monitor
576         }
577 
578         // An instance used to temporarily store the
579         // last observable state of a monitorable object.
580         // When Monitor.remove(o) is called, we replace
581         // 'o' with a FinalMonitorable whose reference
582         // will be enqueued after the last observable state
583         // has been printed.
584         final class FinalMonitorable implements Monitorable {
585             final String finalState;
FinalMonitorable(Monitorable o)586             FinalMonitorable(Monitorable o) {
587                 finalState = o.getInfo();
588                 finalList.add(this);
589             }
590             @Override
getInfo()591             public String getInfo() {
592                 finalList.remove(this);
593                 return finalState;
594             }
595         }
596 
Monitor()597         Monitor() {
598             super("Monitor");
599             setDaemon(true);
600             list = Collections.synchronizedList(new LinkedList<>());
601             finalList = new ArrayList<>(); // access is synchronized on list above
602         }
603 
addTarget(Monitorable o)604         void addTarget(Monitorable o) {
605             list.add(new WeakReference<>(o, queue));
606         }
removeTarget(Monitorable o)607         void removeTarget(Monitorable o) {
608             // It can take a long time for GC to clean up references.
609             // Calling Monitor.remove() early helps removing noise from the
610             // logs/
611             synchronized (list) {
612                 Iterator<WeakReference<Monitorable>> it = list.iterator();
613                 while (it.hasNext()) {
614                     Monitorable m = it.next().get();
615                     if (m == null) it.remove();
616                     if (o == m) {
617                         it.remove();
618                         break;
619                     }
620                 }
621                 FinalMonitorable m = new FinalMonitorable(o);
622                 addTarget(m);
623                 Reference.reachabilityFence(m);
624             }
625         }
626 
add(Monitorable o)627         public static void add(Monitorable o) {
628             themon.addTarget(o);
629         }
remove(Monitorable o)630         public static void remove(Monitorable o) {
631             themon.removeTarget(o);
632         }
633 
634         @Override
run()635         public void run() {
636             System.out.println("Monitor starting");
637             try {
638                 while (true) {
639                     Thread.sleep(20 * 1000);
640                     synchronized (list) {
641                         Reference<? extends Monitorable> expired;
642                         while ((expired = queue.poll()) != null) list.remove(expired);
643                         for (WeakReference<Monitorable> ref : list) {
644                             Monitorable o = ref.get();
645                             if (o == null) continue;
646                             if (o instanceof FinalMonitorable) {
647                                 ref.enqueue();
648                             }
649                             System.out.println(o.getInfo());
650                             System.out.println("-------------------------");
651                         }
652                     }
653                     System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-");
654                 }
655             } catch (InterruptedException e) {
656                 System.out.println("Monitor exiting with " + e);
657             }
658         }
659     }
660 
661     /**
662      * Processing function for outgoing data. Pass it thru SSLEngine.wrap()
663      * Any encrypted buffers generated are passed downstream to be written.
664      * Status codes:
665      *     NEED_UNWRAP: call reader.addData() with empty buffer
666      *     NEED_WRAP: call addData() with empty buffer
667      *     NEED_TASK: delegate task to executor
668      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap
669      *     BUFFER_UNDERFLOW: shouldn't happen on writing side
670      *     OK: return generated buffers
671      */
672     class Writer extends SubscriberWrapper {
673         final SequentialScheduler scheduler;
674         // queues of buffers received from upstream waiting
675         // to be processed by the SSLEngine
676         final List<ByteBuffer> writeList;
677         final Logger debugw =  Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
678         volatile boolean completing;
679         boolean completed; // only accessed in processData
680 
681         class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
run()682             @Override public void run() { processData(); }
683         }
684 
Writer()685         Writer() {
686             super();
687             writeList = Collections.synchronizedList(new LinkedList<>());
688             scheduler = new SequentialScheduler(new WriterDownstreamPusher());
689         }
690 
691         @Override
incoming(List<ByteBuffer> buffers, boolean complete)692         protected void incoming(List<ByteBuffer> buffers, boolean complete) {
693             assert complete ? buffers == Utils.EMPTY_BB_LIST : true;
694             assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
695             if (complete) {
696                 if (debugw.on()) debugw.log("adding SENTINEL");
697                 completing = true;
698                 writeList.add(SENTINEL);
699             } else {
700                 writeList.addAll(buffers);
701             }
702             if (debugw.on())
703                 debugw.log("added " + buffers.size()
704                            + " (" + Utils.remaining(buffers)
705                            + " bytes) to the writeList");
706             scheduler.runOrSchedule();
707         }
708 
dbgString()709         public final String dbgString() {
710             return "SSL Writer(" + tubeName + ")";
711         }
712 
onSubscribe()713         protected void onSubscribe() {
714             if (debugw.on()) debugw.log("onSubscribe initiating handshaking");
715             addData(HS_TRIGGER);  // initiates handshaking
716         }
717 
schedule()718         void schedule() {
719             scheduler.runOrSchedule();
720         }
721 
stop()722         void stop() {
723             if (debugw.on()) debugw.log("stop");
724             scheduler.stop();
725         }
726 
727         @Override
closing()728         public boolean closing() {
729             return closeNotifyReceived();
730         }
731 
isCompleting()732         private boolean isCompleting() {
733             return completing;
734         }
735 
736         @Override
upstreamWindowUpdate(long currentWindow, long downstreamQsize)737         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
738             if (writeList.size() > 10)
739                 return 0;
740             else
741                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
742         }
743 
hsTriggered()744         private boolean hsTriggered() {
745             synchronized(writeList) {
746                 for (ByteBuffer b : writeList)
747                     if (b == HS_TRIGGER)
748                         return true;
749                 return false;
750             }
751         }
752 
triggerWrite()753         void triggerWrite() {
754             synchronized (writeList) {
755                 if (writeList.isEmpty()) {
756                     writeList.add(HS_TRIGGER);
757                 }
758             }
759             scheduler.runOrSchedule();
760         }
761 
processData()762         private void processData() {
763             boolean completing = isCompleting();
764 
765             try {
766                 if (debugw.on())
767                     debugw.log("processData, writeList remaining:"
768                                 + Utils.remaining(writeList) + ", hsTriggered:"
769                                 + hsTriggered() + ", needWrap:" + needWrap());
770 
771                 while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) {
772                     ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
773                     EngineResult result = wrapBuffers(outbufs);
774                     if (debugw.on())
775                         debugw.log("wrapBuffer returned %s", result.result);
776 
777                     if (result.status() == Status.CLOSED) {
778                         if (!upstreamCompleted) {
779                             upstreamCompleted = true;
780                             upstreamSubscription.cancel();
781                             // complete ALPN if not yet completed
782                             setALPN();
783                         }
784                         if (result.bytesProduced() <= 0)
785                             return;
786 
787                         if (!completing && !completed) {
788                             completing = this.completing = true;
789                             // There could still be some outgoing data in outbufs.
790                             writeList.add(SENTINEL);
791                         }
792                     }
793 
794                     boolean handshaking = false;
795                     if (result.handshaking()) {
796                         if (debugw.on()) debugw.log("handshaking");
797                         doHandshake(result, WRITER);  // ok to ignore return
798                         handshaking = true;
799                     } else {
800                         if (trySetALPN()) {
801                             resumeActivity();
802                         }
803                     }
804                     cleanList(writeList); // tidy up the source list
805                     sendResultBytes(result);
806                     if (handshaking) {
807                         if (!completing && needWrap()) {
808                             continue;
809                         } else {
810                             return;
811                         }
812                     }
813                 }
814                 if (completing && Utils.remaining(writeList) == 0) {
815                     if (!completed) {
816                         completed = true;
817                         writeList.clear();
818                         outgoing(Utils.EMPTY_BB_LIST, true);
819                     }
820                     return;
821                 }
822                 if (writeList.isEmpty() && needWrap()) {
823                     writer.addData(HS_TRIGGER);
824                 }
825             } catch (Throwable ex) {
826                 errorCommon(ex);
827                 handleError(ex);
828             }
829         }
830 
831         // The SSLEngine insists on being given a buffer that is at least
832         // SSLSession.getPacketBufferSize() long (usually 16K). If given
833         // a smaller buffer it will go in BUFFER_OVERFLOW, even if it only
834         // has 6 bytes to wrap. Typical usage shows that for GET we
835         // usually produce an average of ~ 100 bytes.
836         // To avoid wasting space, and because allocating and zeroing
837         // 16K buffers for encoding 6 bytes is costly, we are reusing the
838         // same writeBuffer to interact with SSLEngine.wrap().
839         // If the SSLEngine produces less than writeBuffer.capacity() / 2,
840         // then we copy off the bytes to a smaller buffer that we send
841         // downstream. Otherwise, we send the writeBuffer downstream
842         // and will allocate a new one next time.
843         volatile ByteBuffer writeBuffer;
844         private volatile Status lastWrappedStatus;
845         @SuppressWarnings("fallthrough")
wrapBuffers(ByteBuffer[] src)846         EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
847             long len = Utils.remaining(src);
848             if (debugw.on())
849                 debugw.log("wrapping " + len + " bytes");
850 
851             ByteBuffer dst = writeBuffer;
852             if (dst == null) dst = writeBuffer = getNetBuffer();
853             assert dst.position() == 0 : "buffer position is " + dst.position();
854             assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity();
855 
856             while (true) {
857                 SSLEngineResult sslResult = engine.wrap(src, dst);
858                 if (debugw.on()) debugw.log("SSLResult: " + sslResult);
859                 switch (lastWrappedStatus = sslResult.getStatus()) {
860                     case BUFFER_OVERFLOW:
861                         // Shouldn't happen. We allocated buffer with packet size
862                         // get it again if net buffer size was changed
863                         if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
864                         int netSize = packetBufferSize
865                                 = engine.getSession().getPacketBufferSize();
866                         ByteBuffer b = writeBuffer = ByteBuffer.allocate(netSize + dst.position());
867                         dst.flip();
868                         b.put(dst);
869                         dst = b;
870                         break; // try again
871                     case CLOSED:
872                         if (debugw.on()) debugw.log("CLOSED");
873                         // fallthrough. There could be some remaining data in dst.
874                         // CLOSED will be handled by the caller.
875                     case OK:
876                         final ByteBuffer dest;
877                         if (dst.position() == 0) {
878                             dest = NOTHING; // can happen if handshake is in progress
879                         } else if (dst.position() < dst.capacity() / 2) {
880                             // less than half the buffer was used.
881                             // copy off the bytes to a smaller buffer, and keep
882                             // the writeBuffer for next time.
883                             dst.flip();
884                             dest = Utils.copyAligned(dst);
885                             dst.clear();
886                         } else {
887                             // more than half the buffer was used.
888                             // just send that buffer downstream, and we will
889                             // get a new writeBuffer next time it is needed.
890                             dst.flip();
891                             dest = dst;
892                             writeBuffer = null;
893                         }
894                         if (debugw.on())
895                             debugw.log("OK => produced: %d bytes into %d, not wrapped: %d",
896                                        dest.remaining(),  dest.capacity(), Utils.remaining(src));
897                         return new EngineResult(sslResult, dest);
898                     case BUFFER_UNDERFLOW:
899                         // Shouldn't happen.  Doesn't returns when wrap()
900                         // underflow handled externally
901                         // assert false : "Buffer Underflow";
902                         if (debug.on()) debug.log("BUFFER_UNDERFLOW");
903                         return new EngineResult(sslResult);
904                     default:
905                         if (debugw.on())
906                             debugw.log("result: %s", sslResult.getStatus());
907                         assert false : "result:" + sslResult.getStatus();
908                 }
909             }
910         }
911 
needWrap()912         private boolean needWrap() {
913             return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP;
914         }
915 
sendResultBytes(EngineResult result)916         private void sendResultBytes(EngineResult result) {
917             if (result.bytesProduced() > 0) {
918                 if (debugw.on())
919                     debugw.log("Sending %d bytes downstream",
920                                result.bytesProduced());
921                 outgoing(result.destBuffer, false);
922             }
923         }
924 
925         @Override
toString()926         public String toString() {
927             return "WRITER: " + super.toString()
928                     + ", writeList size: " + Integer.toString(writeList.size())
929                     + ", scheduler: " + (scheduler.isStopped() ? "stopped" : "running")
930                     + ", status: " + lastWrappedStatus;
931                     //" writeList: " + writeList.toString();
932         }
933     }
934 
handleError(Throwable t)935     private void handleError(Throwable t) {
936         if (debug.on()) debug.log("handleError", t);
937         readerCF.completeExceptionally(t);
938         writerCF.completeExceptionally(t);
939         // no-op if already completed
940         alpnCF.completeExceptionally(t);
941         reader.stop();
942         writer.stop();
943     }
944 
945     boolean stopped;
946 
normalStop()947     private synchronized void normalStop() {
948         if (stopped)
949             return;
950         stopped = true;
951         reader.stop();
952         writer.stop();
953         // make sure the alpnCF is completed.
954         if (!alpnCF.isDone()) {
955             Throwable alpn = new SSLHandshakeException(
956                     "Connection closed before successful ALPN negotiation");
957             alpnCF.completeExceptionally(alpn);
958         }
959         if (isMonitored) Monitor.remove(monitor);
960     }
961 
stopOnError(Throwable error)962     private Void stopOnError(Throwable error) {
963         // maybe log, etc
964         // ensure the ALPN is completed
965         // We could also do this in SSLTube.SSLSubscriberWrapper
966         // onError/onComplete - with the caveat that the ALP CF
967         // would get completed externally. Doing it here keeps
968         // it all inside SSLFlowDelegate.
969         if (!alpnCF.isDone()) {
970             alpnCF.completeExceptionally(error);
971         }
972         normalStop();
973         return null;
974     }
975 
cleanList(List<ByteBuffer> l)976     private void cleanList(List<ByteBuffer> l) {
977         synchronized (l) {
978             Iterator<ByteBuffer> iter = l.iterator();
979             while (iter.hasNext()) {
980                 ByteBuffer b = iter.next();
981                 if (!b.hasRemaining() && b != SENTINEL) {
982                     iter.remove();
983                 }
984             }
985         }
986     }
987 
988     /**
989      * States for handshake. We avoid races when accessing/updating the AtomicInt
990      * because updates always schedule an additional call to both the read()
991      * and write() functions.
992      */
993     private static final int NOT_HANDSHAKING = 0;
994     private static final int HANDSHAKING = 1;
995 
996     // Bit flags
997     // a thread is currently executing tasks
998     private static final int DOING_TASKS = 4;
999     // a thread wants to execute tasks, while another thread is executing
1000     private static final int REQUESTING_TASKS = 8;
1001     private static final int TASK_BITS = 12; // Both bits
1002 
1003     private static final int READER = 1;
1004     private static final int WRITER = 2;
1005 
states(AtomicInteger state)1006     private static String states(AtomicInteger state) {
1007         int s = state.get();
1008         StringBuilder sb = new StringBuilder();
1009         int x = s & ~TASK_BITS;
1010         switch (x) {
1011             case NOT_HANDSHAKING:
1012                 sb.append(" NOT_HANDSHAKING ");
1013                 break;
1014             case HANDSHAKING:
1015                 sb.append(" HANDSHAKING ");
1016                 break;
1017             default:
1018                 throw new InternalError();
1019         }
1020         if ((s & DOING_TASKS) > 0)
1021             sb.append("|DOING_TASKS");
1022         if ((s & REQUESTING_TASKS) > 0)
1023             sb.append("|REQUESTING_TASKS");
1024         return sb.toString();
1025     }
1026 
resumeActivity()1027     private void resumeActivity() {
1028         reader.schedule();
1029         writer.schedule();
1030     }
1031 
1032     final AtomicInteger handshakeState;
1033     final ConcurrentLinkedQueue<String> stateList =
1034             debug.on() ? new ConcurrentLinkedQueue<>() : null;
1035 
1036     // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS
1037     // depending on previous value
1038     private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> {
1039         if ((current & DOING_TASKS) == 0)
1040             return DOING_TASKS | (current & HANDSHAKING);
1041         else
1042             return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING);
1043     };
1044 
1045     // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set
1046     // clears bits if not.
1047     private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> {
1048         if ((current & REQUESTING_TASKS) != 0)
1049             return DOING_TASKS | (current & HANDSHAKING);
1050         // clear both bits
1051         return (current & HANDSHAKING);
1052     };
1053 
doHandshake(EngineResult r, int caller)1054     private boolean doHandshake(EngineResult r, int caller) {
1055         // unconditionally sets the HANDSHAKING bit, while preserving task bits
1056         handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS));
1057         if (stateList != null && debug.on()) {
1058             stateList.add(r.handshakeStatus().toString());
1059             stateList.add(Integer.toString(caller));
1060         }
1061         switch (r.handshakeStatus()) {
1062             case NEED_TASK:
1063                 int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS);
1064                 if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks
1065                     return false;
1066                 }
1067 
1068                 if (debug.on()) debug.log("obtaining and initiating task execution");
1069                 List<Runnable> tasks = obtainTasks();
1070                 executeTasks(tasks);
1071                 return false;  // executeTasks will resume activity
1072             case NEED_WRAP:
1073                 if (caller == READER) {
1074                     writer.triggerWrite();
1075                     return false;
1076                 }
1077                 break;
1078             case NEED_UNWRAP:
1079             case NEED_UNWRAP_AGAIN:
1080                 // do nothing else
1081                 // receiving-side data will trigger unwrap
1082                 if (caller == WRITER) {
1083                     reader.schedule();
1084                     return false;
1085                 }
1086                 break;
1087             default:
1088                 throw new InternalError("Unexpected handshake status:"
1089                                         + r.handshakeStatus());
1090         }
1091         return true;
1092     }
1093 
obtainTasks()1094     private List<Runnable> obtainTasks() {
1095         List<Runnable> l = new ArrayList<>();
1096         Runnable r;
1097         while ((r = engine.getDelegatedTask()) != null) {
1098             l.add(r);
1099         }
1100         return l;
1101     }
1102 
executeTasks(List<Runnable> tasks)1103     private void executeTasks(List<Runnable> tasks) {
1104         exec.execute(() -> {
1105             try {
1106                 List<Runnable> nextTasks = tasks;
1107                 if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size()));
1108                 do {
1109                     nextTasks.forEach(Runnable::run);
1110                     if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
1111                         nextTasks = obtainTasks();
1112                     } else {
1113                         int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS);
1114                         if ((s & DOING_TASKS) != 0) {
1115                             if (debug.on()) debug.log("re-running tasks (B)");
1116                             nextTasks = obtainTasks();
1117                             continue;
1118                         }
1119                         break;
1120                     }
1121                 } while (true);
1122                 if (debug.on()) debug.log("finished task execution");
1123                 HandshakeStatus hs = engine.getHandshakeStatus();
1124                 if (hs == HandshakeStatus.FINISHED || hs == HandshakeStatus.NOT_HANDSHAKING) {
1125                     // We're no longer handshaking, try setting ALPN
1126                     trySetALPN();
1127                 }
1128                 resumeActivity();
1129             } catch (Throwable t) {
1130                 handleError(t);
1131             }
1132         });
1133     }
1134 
trySetALPN()1135     boolean trySetALPN() {
1136         // complete ALPN CF if needed.
1137         if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
1138             applicationBufferSize = engine.getSession().getApplicationBufferSize();
1139             packetBufferSize = engine.getSession().getPacketBufferSize();
1140             setALPN();
1141             return true;
1142         }
1143         return false;
1144     }
1145 
1146     // FIXME: acknowledge a received CLOSE request from peer
doClosure(EngineResult r)1147     EngineResult doClosure(EngineResult r) throws IOException {
1148         if (debug.on())
1149             debug.log("doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]",
1150                       r.result, engine.getHandshakeStatus(),
1151                       engine.isOutboundDone(), engine.isInboundDone());
1152         if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
1153             // we have received TLS close_notify and need to send
1154             // an acknowledgement back. We're calling doHandshake
1155             // to finish the close handshake.
1156             if (engine.isInboundDone() && !engine.isOutboundDone()) {
1157                 if (debug.on()) debug.log("doClosure: close_notify received");
1158                 close_notify_received = true;
1159                 if (!writer.scheduler.isStopped()) {
1160                     doHandshake(r, READER);
1161                 } else {
1162                     // We have received closed notify, but we
1163                     // won't be able to send the acknowledgement.
1164                     // Nothing more will come from the socket either,
1165                     // so mark the reader as completed.
1166                     synchronized (reader.readBufferLock) {
1167                         reader.completing = true;
1168                     }
1169                 }
1170             }
1171         }
1172         return r;
1173     }
1174 
1175     /**
1176      * Returns the upstream Flow.Subscriber of the reading (incoming) side.
1177      * This flow must be given the encrypted data read from upstream (eg socket)
1178      * before it is decrypted.
1179      */
upstreamReader()1180     public Flow.Subscriber<List<ByteBuffer>> upstreamReader() {
1181         return reader;
1182     }
1183 
1184     /**
1185      * Returns the upstream Flow.Subscriber of the writing (outgoing) side.
1186      * This flow contains the plaintext data before it is encrypted.
1187      */
upstreamWriter()1188     public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() {
1189         return writer;
1190     }
1191 
resumeReader()1192     public boolean resumeReader() {
1193         return reader.signalScheduling();
1194     }
1195 
resetReaderDemand()1196     public void resetReaderDemand() {
1197         reader.resetDownstreamDemand();
1198     }
1199 
1200     static class EngineResult {
1201         final SSLEngineResult result;
1202         final ByteBuffer destBuffer;
1203 
1204         // normal result
EngineResult(SSLEngineResult result)1205         EngineResult(SSLEngineResult result) {
1206             this(result, null);
1207         }
1208 
EngineResult(SSLEngineResult result, ByteBuffer destBuffer)1209         EngineResult(SSLEngineResult result, ByteBuffer destBuffer) {
1210             this.result = result;
1211             this.destBuffer = destBuffer;
1212         }
1213 
handshaking()1214         boolean handshaking() {
1215             HandshakeStatus s = result.getHandshakeStatus();
1216             return s != HandshakeStatus.FINISHED
1217                    && s != HandshakeStatus.NOT_HANDSHAKING
1218                    && result.getStatus() != Status.CLOSED;
1219         }
1220 
needUnwrap()1221         boolean needUnwrap() {
1222             HandshakeStatus s = result.getHandshakeStatus();
1223             return s == HandshakeStatus.NEED_UNWRAP;
1224         }
1225 
1226 
bytesConsumed()1227         int bytesConsumed() {
1228             return result.bytesConsumed();
1229         }
1230 
bytesProduced()1231         int bytesProduced() {
1232             return result.bytesProduced();
1233         }
1234 
handshakeStatus()1235         SSLEngineResult.HandshakeStatus handshakeStatus() {
1236             return result.getHandshakeStatus();
1237         }
1238 
status()1239         SSLEngineResult.Status status() {
1240             return result.getStatus();
1241         }
1242     }
1243 
1244     // The maximum network buffer size negotiated during
1245     // the handshake. Usually 16K.
1246     volatile int packetBufferSize;
getNetBuffer()1247     final ByteBuffer getNetBuffer() {
1248         int netSize = packetBufferSize;
1249         if (netSize <= 0) {
1250             packetBufferSize = netSize = engine.getSession().getPacketBufferSize();
1251         }
1252         return ByteBuffer.allocate(netSize);
1253     }
1254 
1255     // The maximum application buffer size negotiated during
1256     // the handshake. Usually close to 16K.
1257     volatile int applicationBufferSize;
1258     // Despite of the maximum applicationBufferSize negotiated
1259     // above, TLS records usually have a much smaller payload.
1260     // The adaptativeAppBufferSize records the max payload
1261     // ever decoded, and we use that as a guess for how big
1262     // a buffer we will need for the next payload.
1263     // This avoids allocating and zeroing a 16K buffer for
1264     // nothing...
1265     volatile int adaptiveAppBufferSize;
getAppBuffer()1266     final ByteBuffer getAppBuffer() {
1267         int appSize = applicationBufferSize;
1268         if (appSize <= 0) {
1269             applicationBufferSize = appSize
1270                     = engine.getSession().getApplicationBufferSize();
1271         }
1272         int size = adaptiveAppBufferSize;
1273         if (size <= 0) {
1274             size = 512; // start with 512 this is usually enough for handshaking / headers
1275         } else if (size > appSize) {
1276             size = appSize;
1277         }
1278         // will cause a BUFFER_OVERFLOW if not big enough, but
1279         // that's OK.
1280         return ByteBuffer.allocate(size);
1281     }
1282 
dbgString()1283     final String dbgString() {
1284         return "SSLFlowDelegate(" + tubeName + ")";
1285     }
1286 }
1287