1 /*
2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http.websocket;
27 
28 import jdk.internal.net.http.common.Demand;
29 import jdk.internal.net.http.common.Log;
30 import jdk.internal.net.http.common.Logger;
31 import jdk.internal.net.http.common.MinimalFuture;
32 import jdk.internal.net.http.common.SequentialScheduler;
33 import jdk.internal.net.http.common.Utils;
34 import jdk.internal.net.http.websocket.OpeningHandshake.Result;
35 
36 import java.io.IOException;
37 import java.lang.ref.Reference;
38 import java.net.ProtocolException;
39 import java.net.URI;
40 import java.net.http.WebSocket;
41 import java.nio.ByteBuffer;
42 import java.nio.CharBuffer;
43 import java.nio.charset.CharacterCodingException;
44 import java.nio.charset.CharsetEncoder;
45 import java.nio.charset.CodingErrorAction;
46 import java.nio.charset.StandardCharsets;
47 import java.util.Objects;
48 import java.util.concurrent.CompletableFuture;
49 import java.util.concurrent.CompletionStage;
50 import java.util.concurrent.atomic.AtomicBoolean;
51 import java.util.concurrent.atomic.AtomicLong;
52 import java.util.concurrent.atomic.AtomicReference;
53 import java.util.function.BiConsumer;
54 import java.util.function.Function;
55 
56 import static java.util.Objects.requireNonNull;
57 import static jdk.internal.net.http.common.MinimalFuture.failedFuture;
58 import static jdk.internal.net.http.websocket.StatusCodes.CLOSED_ABNORMALLY;
59 import static jdk.internal.net.http.websocket.StatusCodes.NO_STATUS_CODE;
60 import static jdk.internal.net.http.websocket.StatusCodes.isLegalToSendFromClient;
61 import static jdk.internal.net.http.websocket.WebSocketImpl.State.BINARY;
62 import static jdk.internal.net.http.websocket.WebSocketImpl.State.CLOSE;
63 import static jdk.internal.net.http.websocket.WebSocketImpl.State.ERROR;
64 import static jdk.internal.net.http.websocket.WebSocketImpl.State.IDLE;
65 import static jdk.internal.net.http.websocket.WebSocketImpl.State.OPEN;
66 import static jdk.internal.net.http.websocket.WebSocketImpl.State.PING;
67 import static jdk.internal.net.http.websocket.WebSocketImpl.State.PONG;
68 import static jdk.internal.net.http.websocket.WebSocketImpl.State.TEXT;
69 import static jdk.internal.net.http.websocket.WebSocketImpl.State.WAITING;
70 
71 /*
72  * A WebSocket client.
73  */
74 public final class WebSocketImpl implements WebSocket {
75 
    // Debug logger of this class; created with the "[WebSocket]" tag and
    // Utils.DEBUG_WS.
    private static final Logger debug =
            Utils.getWebSocketLogger("[WebSocket]"::toString, Utils.DEBUG_WS);
    // Supplies the ids that pair the "enter"/"exit" debug records of the
    // send* methods. Only incremented while debug logging is on.
    private final AtomicLong sendCounter = new AtomicLong();
    // Supplies the ids that pair the "enter"/"exit" debug records of Listener
    // invocations. Only incremented while debug logging is on.
    private final AtomicLong receiveCounter = new AtomicLong();
80 
    /*
     * States of the receiving side. The current value is held in the `state`
     * field and is dispatched on by ReceiveTask.run().
     */
    enum State {
        OPEN,    // initial state: onOpen has not been delivered yet
        IDLE,    // nothing to deliver; a message may be requested if there is demand
        WAITING, // one message has been requested from the transport
        TEXT,    // a Text message is ready to be passed to onText
        BINARY,  // a Binary message is ready to be passed to onBinary
        PING,    // a Ping message is ready to be passed to onPing
        PONG,    // a Pong message is ready to be passed to onPong
        CLOSE,   // set by signalClose; onClose is to be invoked, then the task ends
        ERROR    // set by signalError; onError is to be invoked, then the task ends
    }
92 
    // Payload of the automatic Pong that is waiting to be picked up through
    // clearAutomaticPong(), or null if there is none. A newer Ping replaces
    // the payload in place (see trySwapAutomaticPong).
    private final AtomicReference<ByteBuffer> lastAutomaticPong = new AtomicReference<>();
    // An already completed future of this WebSocket; used wherever a null
    // future/stage has to be substituted (replaceNull, processClose).
    private final MinimalFuture<WebSocket> DONE = MinimalFuture.completedFuture(this);
    // Set by abort(), signalError() and signalClose(); reported by
    // isInputClosed().
    private volatile boolean inputClosed;
    // Set by sendClose(), abort(), signalError() and processCloseError();
    // reported by isOutputClosed().
    private final AtomicBoolean outputClosed = new AtomicBoolean();

    // Current state of the receiving side; starts as OPEN.
    private final AtomicReference<State> state = new AtomicReference<>(OPEN);

    /* Components of calls to Listener's methods */
    // These plain fields are written by SignallingMessageConsumer (and
    // signalClose) before the state is changed, and are read by ReceiveTask
    // after it has observed the corresponding state.
    private boolean last;
    private ByteBuffer binaryData;
    private CharSequence text;
    private int statusCode;
    private String reason;
    // The first error passed to signalError(); later errors are only logged.
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    private final URI uri;
    private final String subprotocol;
    private final Listener listener;

    // true while a sendText/sendBinary is outstanding; cleared by the action
    // handed to the transport together with the message.
    private final AtomicBoolean pendingTextOrBinary = new AtomicBoolean();
    // true while a sendPing/sendPong is outstanding; cleared by the action
    // handed to the transport together with the message.
    private final AtomicBoolean pendingPingOrPong = new AtomicBoolean();
    private final Transport transport;
    // Runs ReceiveTask; signalled by signalOpen() and request().
    private final SequentialScheduler receiveScheduler
            = new SequentialScheduler(new ReceiveTask());
    // Number of messages requested through request(long) and not yet asked
    // from the transport.
    private final Demand demand = new Demand();
118 
newInstanceAsync(BuilderImpl b)119     public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
120         Function<Result, WebSocket> newWebSocket = r -> {
121             WebSocket ws = newInstance(b.getUri(),
122                                        r.subprotocol,
123                                        b.getListener(),
124                                        r.transport);
125             // Make sure we don't release the builder until this lambda
126             // has been executed. The builder has a strong reference to
127             // the HttpClientFacade, and we want to keep that live until
128             // after the raw channel is created and passed to WebSocketImpl.
129             Reference.reachabilityFence(b);
130             return ws;
131         };
132         OpeningHandshake h;
133         try {
134             h = new OpeningHandshake(b);
135         } catch (Throwable e) {
136             return failedFuture(e);
137         }
138         return h.send().thenApply(newWebSocket);
139     }
140 
141     /* Exposed for testing purposes */
newInstance(URI uri, String subprotocol, Listener listener, TransportFactory transport)142     static WebSocketImpl newInstance(URI uri,
143                                      String subprotocol,
144                                      Listener listener,
145                                      TransportFactory transport) {
146         WebSocketImpl ws = new WebSocketImpl(uri, subprotocol, listener, transport);
147         // This initialisation is outside of the constructor for the sake of
148         // safe publication of WebSocketImpl.this
149         ws.signalOpen();
150         return ws;
151     }
152 
WebSocketImpl(URI uri, String subprotocol, Listener listener, TransportFactory transportFactory)153     private WebSocketImpl(URI uri,
154                           String subprotocol,
155                           Listener listener,
156                           TransportFactory transportFactory) {
157         this.uri = requireNonNull(uri);
158         this.subprotocol = requireNonNull(subprotocol);
159         this.listener = requireNonNull(listener);
160         // Why 6? 1 sendPing/sendPong + 1 sendText/sendBinary + 1 Close +
161         // 2 automatic Ping replies + 1 automatic Close = 6 messages
162         // Why 2 automatic Pong replies? One is being sent, but the byte buffer
163         // has been set to null, another just has been added.
164         this.transport = transportFactory.createTransport(new MessageQueue(6),
165                 new SignallingMessageConsumer());
166     }
167 
168     // FIXME: add to action handling of errors -> signalError()
169 
170     @Override
sendText(CharSequence message, boolean last)171     public CompletableFuture<WebSocket> sendText(CharSequence message,
172                                                  boolean last) {
173         Objects.requireNonNull(message);
174         long id = 0;
175         if (debug.on()) {
176             id = sendCounter.incrementAndGet();
177             debug.log("enter send text %s payload length=%s last=%s",
178                       id, message.length(), last);
179         }
180         CompletableFuture<WebSocket> result;
181         if (!setPendingTextOrBinary()) {
182             result = failedFuture(new IllegalStateException("Send pending"));
183         } else {
184             result = transport.sendText(message, last, this,
185                                         (r, e) -> clearPendingTextOrBinary());
186         }
187         if (debug.on()) {
188             debug.log("exit send text %s returned %s", id, result);
189         }
190 
191         return replaceNull(result);
192     }
193 
194     @Override
sendBinary(ByteBuffer message, boolean last)195     public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
196                                                    boolean last) {
197         Objects.requireNonNull(message);
198         long id = 0;
199         if (debug.on()) {
200             id = sendCounter.incrementAndGet();
201             debug.log("enter send binary %s payload=%s last=%s",
202                       id, message, last);
203         }
204         CompletableFuture<WebSocket> result;
205         if (!setPendingTextOrBinary()) {
206             result = failedFuture(new IllegalStateException("Send pending"));
207         } else {
208             result = transport.sendBinary(message, last, this,
209                                           (r, e) -> clearPendingTextOrBinary());
210         }
211         if (debug.on()) {
212             debug.log("exit send binary %s returned %s", id, result);
213         }
214         return replaceNull(result);
215     }
216 
clearPendingTextOrBinary()217     private void clearPendingTextOrBinary() {
218         pendingTextOrBinary.set(false);
219     }
220 
setPendingTextOrBinary()221     private boolean setPendingTextOrBinary() {
222         return pendingTextOrBinary.compareAndSet(false, true);
223     }
224 
replaceNull( CompletableFuture<WebSocket> cf)225     private CompletableFuture<WebSocket> replaceNull(
226             CompletableFuture<WebSocket> cf)
227     {
228         if (cf == null) {
229             return DONE;
230         } else {
231             return cf;
232         }
233     }
234 
235     @Override
sendPing(ByteBuffer message)236     public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
237         Objects.requireNonNull(message);
238         long id = 0;
239         if (debug.on()) {
240             id = sendCounter.incrementAndGet();
241             debug.log("enter send ping %s payload=%s", id, message);
242         }
243         CompletableFuture<WebSocket> result;
244         if (!setPendingPingOrPong()) {
245             result = failedFuture(new IllegalStateException("Send pending"));
246         } else {
247             result = transport.sendPing(message, this,
248                                         (r, e) -> clearPendingPingOrPong());
249         }
250         if (debug.on()) {
251             debug.log("exit send ping %s returned %s", id, result);
252         }
253         return replaceNull(result);
254     }
255 
256     @Override
sendPong(ByteBuffer message)257     public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
258         Objects.requireNonNull(message);
259         long id = 0;
260         if (debug.on()) {
261             id = sendCounter.incrementAndGet();
262             debug.log("enter send pong %s payload=%s", id, message);
263         }
264         CompletableFuture<WebSocket> result;
265         if (!setPendingPingOrPong()) {
266             result = failedFuture(new IllegalStateException("Send pending"));
267         } else {
268             result =  transport.sendPong(message, this,
269                                          (r, e) -> clearPendingPingOrPong());
270         }
271         if (debug.on()) {
272             debug.log("exit send pong %s returned %s", id, result);
273         }
274         return replaceNull(result);
275     }
276 
setPendingPingOrPong()277     private boolean setPendingPingOrPong() {
278         return pendingPingOrPong.compareAndSet(false, true);
279     }
280 
clearPendingPingOrPong()281     private void clearPendingPingOrPong() {
282         pendingPingOrPong.set(false);
283     }
284 
285     @Override
sendClose(int statusCode, String reason)286     public CompletableFuture<WebSocket> sendClose(int statusCode,
287                                                   String reason) {
288         Objects.requireNonNull(reason);
289         long id = 0;
290         if (debug.on()) {
291             id = sendCounter.incrementAndGet();
292             debug.log("enter send close %s statusCode=%s reason.length=%s",
293                       id, statusCode, reason.length());
294         }
295         CompletableFuture<WebSocket> result;
296         // Close message is the only type of message whose validity is checked
297         // in the corresponding send method. This is made in order to close the
298         // output in place. Otherwise the number of Close messages in queue
299         // would not be bounded.
300         if (!isLegalToSendFromClient(statusCode)) {
301             result = failedFuture(new IllegalArgumentException("statusCode"));
302         } else if (!isLegalReason(reason)) {
303             result = failedFuture(new IllegalArgumentException("reason"));
304         } else if (!outputClosed.compareAndSet(false, true)){
305             result = failedFuture(new IOException("Output closed"));
306         } else {
307             result = sendClose0(statusCode, reason);
308         }
309         if (debug.on()) {
310             debug.log("exit send close %s returned %s", id, result);
311         }
312         return replaceNull(result);
313     }
314 
isLegalReason(String reason)315     private static boolean isLegalReason(String reason) {
316         if (reason.length() > 123) { // quick check
317             return false;
318         }
319         CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder()
320                         .onMalformedInput(CodingErrorAction.REPORT)
321                         .onUnmappableCharacter(CodingErrorAction.REPORT);
322         ByteBuffer bytes;
323         try {
324             bytes = encoder.encode(CharBuffer.wrap(reason));
325         } catch (CharacterCodingException ignored) {
326             return false;
327         }
328         return bytes.remaining() <= 123;
329     }
330 
331     /*
332      * The implementation uses this method internally to send Close messages
333      * with codes that are not allowed to be sent through the API.
334      */
sendClose0(int statusCode, String reason)335     private CompletableFuture<WebSocket> sendClose0(int statusCode,
336                                                     String reason) {
337         return transport.sendClose(statusCode, reason, this,
338                                    (r, e) -> processCloseError(e));
339     }
340 
processCloseError(Throwable e)341     private void processCloseError(Throwable e) {
342         if (e == null) {
343             debug.log("send close completed successfully");
344         } else {
345             debug.log("send close completed with error", e);
346         }
347         outputClosed.set(true);
348         try {
349             transport.closeOutput();
350         } catch (IOException ignored) { }
351     }
352 
353     @Override
request(long n)354     public void request(long n) {
355         if (debug.on()) {
356             debug.log("request %s", n);
357         }
358         if (demand.increase(n)) {
359             receiveScheduler.runOrSchedule();
360         }
361     }
362 
363     @Override
getSubprotocol()364     public String getSubprotocol() {
365         return subprotocol;
366     }
367 
368     @Override
isOutputClosed()369     public boolean isOutputClosed() {
370         return outputClosed.get();
371     }
372 
373     @Override
isInputClosed()374     public boolean isInputClosed() {
375         return inputClosed;
376     }
377 
378     @Override
abort()379     public void abort() {
380         if (debug.on()) {
381             debug.log("abort");
382         }
383         inputClosed = true;
384         outputClosed.set(true);
385         receiveScheduler.stop();
386         close();
387     }
388 
389     @Override
toString()390     public String toString() {
391         return super.toString()
392                 + "[uri=" + uri
393                 + (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "")
394                 + "]";
395     }
396 
397     /*
     * The assumptions about order are as follows:
     *
     *     - state is never changed more than twice inside the `run` method:
     *       x --(1)--> IDLE --(2)--> y (otherwise we're losing events, or
     *       overwriting parts of messages creating a mess since there's no
     *       queueing)
404      *     - OPEN is always the first state
405      *     - no messages are requested/delivered before onOpen is called (this
406      *       is implemented by making WebSocket instance accessible first in
407      *       onOpen)
408      *     - after the state has been observed as CLOSE/ERROR, the scheduler
409      *       is stopped
410      */
411     private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask {
412 
413         // Transport only asked here and nowhere else because we must make sure
414         // onOpen is invoked first and no messages become pending before onOpen
415         // finishes
416 
        /*
         * Processes receive states until the scheduler is stopped or there is
         * nothing more to do in this run.
         *
         * Each iteration reads the current state and dispatches on it. Any
         * Throwable thrown while processing a state (by the listener or by
         * the transport) is passed to signalError, after which the loop goes
         * on and re-reads the state.
         */
        @Override
        public void run() {
            if (debug.on()) {
                debug.log("enter receive task");
            }
            loop:
            while (!receiveScheduler.isStopped()) {
                State s = state.get();
                if (debug.on()) {
                    debug.log("receive state: %s", s);
                }
                try {
                    switch (s) {
                        // OPEN and the message states: invoke the listener,
                        // then try to go back to IDLE and loop again. The
                        // result of tryChangeState is not checked here; the
                        // next iteration acts on whatever state it reads.
                        case OPEN:
                            processOpen();
                            tryChangeState(OPEN, IDLE);
                            break;
                        case TEXT:
                            processText();
                            tryChangeState(TEXT, IDLE);
                            break;
                        case BINARY:
                            processBinary();
                            tryChangeState(BINARY, IDLE);
                            break;
                        case PING:
                            processPing();
                            tryChangeState(PING, IDLE);
                            break;
                        case PONG:
                            processPong();
                            tryChangeState(PONG, IDLE);
                            break;
                        // CLOSE and ERROR: processClose/processError stop the
                        // scheduler; nothing is processed afterwards
                        case CLOSE:
                            processClose();
                            break loop;
                        case ERROR:
                            processError();
                            break loop;
                        // IDLE: if there is demand and the state could be
                        // moved to WAITING, ask the transport for exactly one
                        // message. Either way this run is over.
                        case IDLE:
                            if (demand.tryDecrement()
                                    && tryChangeState(IDLE, WAITING)) {
                                transport.request(1);
                            }
                            break loop;
                        case WAITING:
                            // For debugging spurious signalling: when there was
                            // a signal, but apparently nothing has changed
                            break loop;
                        default:
                            throw new InternalError(String.valueOf(s));
                    }
                } catch (Throwable t) {
                    signalError(t);
                }
            }
            if (debug.on()) {
                debug.log("exit receive task");
            }
        }
477 
processError()478         private void processError() throws IOException {
479             if (debug.on()) {
480                 debug.log("processError");
481             }
482             transport.closeInput();
483             receiveScheduler.stop();
484             Throwable err = error.get();
485             if (err instanceof FailWebSocketException) {
486                 int code1 = ((FailWebSocketException) err).getStatusCode();
487                 err = new ProtocolException().initCause(err);
488                 if (debug.on()) {
489                     debug.log("failing %s with error=%s statusCode=%s",
490                               WebSocketImpl.this, err, code1);
491                 }
492                 sendCloseSilently(code1);
493             }
494             long id = 0;
495             if (debug.on()) {
496                 id = receiveCounter.incrementAndGet();
497                 debug.log("enter onError %s error=%s", id, err);
498             }
499             try {
500                 listener.onError(WebSocketImpl.this, err);
501             } finally {
502                 if (debug.on()) {
503                     debug.log("exit onError %s", id);
504                 }
505             }
506         }
507 
processClose()508         private void processClose() throws IOException {
509             debug.log("processClose");
510             transport.closeInput();
511             receiveScheduler.stop();
512             CompletionStage<?> cs = null; // when the listener is ready to close
513             long id = 0;
514             if (debug.on()) {
515                 id = receiveCounter.incrementAndGet();
516                 debug.log("enter onClose %s statusCode=%s reason.length=%s",
517                           id, statusCode, reason.length());
518             }
519             try {
520                 cs = listener.onClose(WebSocketImpl.this, statusCode, reason);
521             } finally {
522                 debug.log("exit onClose %s returned %s", id, cs);
523             }
524             if (cs == null) {
525                 cs = DONE;
526             }
527             int code;
528             if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
529                 code = NORMAL_CLOSURE;
530                 debug.log("using statusCode %s instead of %s",
531                           statusCode, code);
532 
533             } else {
534                 code = statusCode;
535             }
536             cs.whenComplete((r, e) -> {
537                 if (debug.on()) {
538                     debug.log("CompletionStage returned by onClose completed result=%s error=%s",
539                               r, e);
540                 }
541                 sendCloseSilently(code);
542             });
543         }
544 
processPong()545         private void processPong() {
546             long id = 0;
547             if (debug.on()) {
548                 id = receiveCounter.incrementAndGet();
549                 debug.log("enter onPong %s payload=%s",
550                           id, binaryData);
551             }
552             CompletionStage<?> cs = null;
553             try {
554                 cs = listener.onPong(WebSocketImpl.this, binaryData);
555             } finally {
556                 if (debug.on()) {
557                     debug.log("exit onPong %s returned %s", id, cs);
558                 }
559             }
560         }
561 
        /*
         * Replies to the received Ping with an automatic Pong (unless the
         * output is closed) and passes the Ping payload to the listener's
         * onPing. The CompletionStage returned by onPing is only logged.
         */
        private void processPing() {
            if (debug.on()) {
                debug.log("processPing");
            }
            // A full copy of this (small) data is made. This way sending a
            // replying Pong could be done in parallel with the listener
            // handling this Ping.
            //
            // The slice for the listener has to be taken first: put(binaryData)
            // below advances binaryData's position to its limit, whereas the
            // slice keeps a position and a limit of its own.
            ByteBuffer slice = binaryData.slice();
            if (!outputClosed.get()) {
                ByteBuffer copy = ByteBuffer.allocate(binaryData.remaining())
                        .put(binaryData)
                        .flip();
                // If an automatic Pong is still pending, its payload has just
                // been replaced and nothing more is to be done. Otherwise a
                // new Pong message is added; the transport is given
                // clearAutomaticPong to obtain the payload from.
                if (!trySwapAutomaticPong(copy)) {
                    // Non-exclusive send;
                    BiConsumer<WebSocketImpl, Throwable> reporter = (r, e) -> {
                        if (e != null) { // TODO: better error handling. What if already closed?
                            signalError(Utils.getCompletionCause(e));
                        }
                    };
                    transport.sendPong(WebSocketImpl.this::clearAutomaticPong,
                                       WebSocketImpl.this,
                                       reporter);
                }
            }
            long id = 0;
            if (debug.on()) {
                id = receiveCounter.incrementAndGet();
                debug.log("enter onPing %s payload=%s", id, slice);
            }
            CompletionStage<?> cs = null;
            try {
                cs = listener.onPing(WebSocketImpl.this, slice);
            } finally {
                if (debug.on()) {
                    debug.log("exit onPing %s returned %s", id, cs);
                }
            }
        }
600 
processBinary()601         private void processBinary() {
602             long id = 0;
603             if (debug.on()) {
604                 id = receiveCounter.incrementAndGet();
605                 debug.log("enter onBinary %s payload=%s last=%s",
606                           id, binaryData, last);
607             }
608             CompletionStage<?> cs = null;
609             try {
610                 cs = listener.onBinary(WebSocketImpl.this, binaryData, last);
611             } finally {
612                 if (debug.on()) {
613                     debug.log("exit onBinary %s returned %s", id, cs);
614                 }
615             }
616         }
617 
processText()618         private void processText() {
619             long id = 0;
620             if (debug.on()) {
621                 id = receiveCounter.incrementAndGet();
622                 debug.log("enter onText %s payload.length=%s last=%s",
623                           id, text.length(), last);
624             }
625             CompletionStage<?> cs = null;
626             try {
627                 cs = listener.onText(WebSocketImpl.this, text, last);
628             } finally {
629                 if (debug.on()) {
630                     debug.log("exit onText %s returned %s", id, cs);
631                 }
632             }
633         }
634 
processOpen()635         private void processOpen() {
636             long id = 0;
637             if (debug.on()) {
638                 id = receiveCounter.incrementAndGet();
639                 debug.log("enter onOpen %s", id);
640             }
641             try {
642                 listener.onOpen(WebSocketImpl.this);
643             } finally {
644                 if (debug.on()) {
645                     debug.log("exit onOpen %s", id);
646                 }
647             }
648         }
649     }
650 
sendCloseSilently(int statusCode)651     private void sendCloseSilently(int statusCode) {
652         sendClose0(statusCode, "").whenComplete((r, e) -> {
653             if (e != null) {
654                 if (debug.on()) {
655                     debug.log("automatic closure completed with error",
656                               (Object) e);
657                 }
658             }
659         });
660     }
661 
clearAutomaticPong()662     private ByteBuffer clearAutomaticPong() {
663         ByteBuffer data;
664         do {
665             data = lastAutomaticPong.get();
666             if (data == null) {
667                 // This method must never be called unless a message that is
668                 // using it has been added previously
669                 throw new InternalError();
670             }
671         } while (!lastAutomaticPong.compareAndSet(data, null));
672         return data;
673     }
674 
675     // bound pings
trySwapAutomaticPong(ByteBuffer copy)676     private boolean trySwapAutomaticPong(ByteBuffer copy) {
677         ByteBuffer message;
678         boolean swapped;
679         while (true) {
680             message = lastAutomaticPong.get();
681             if (message == null) {
682                 if (!lastAutomaticPong.compareAndSet(null, copy)) {
683                     // It's only this method that can change null to ByteBuffer,
684                     // and this method is invoked at most by one thread at a
685                     // time. Thus no failure in the atomic operation above is
686                     // expected.
687                     throw new InternalError();
688                 }
689                 swapped = false;
690                 break;
691             } else if (lastAutomaticPong.compareAndSet(message, copy)) {
692                 swapped = true;
693                 break;
694             }
695         }
696         if (debug.on()) {
697             debug.log("swapped automatic pong from %s to %s",
698                       message, copy);
699         }
700         return swapped;
701     }
702 
signalOpen()703     private void signalOpen() {
704         debug.log("signalOpen");
705         receiveScheduler.runOrSchedule();
706     }
707 
signalError(Throwable error)708     private void signalError(Throwable error) {
709         if (debug.on()) {
710             debug.log("signalError %s", (Object) error);
711         }
712         inputClosed = true;
713         outputClosed.set(true);
714         if (!this.error.compareAndSet(null, error) || !trySetState(ERROR)) {
715             if (debug.on()) {
716                 debug.log("signalError", error);
717             }
718             Log.logError(error);
719         } else {
720             close();
721         }
722     }
723 
close()724     private void close() {
725         if (debug.on()) {
726             debug.log("close");
727         }
728         Throwable first = null;
729         try {
730             transport.closeInput();
731         } catch (Throwable t1) {
732             first = t1;
733         } finally {
734             Throwable second = null;
735             try {
736                 transport.closeOutput();
737             } catch (Throwable t2) {
738                 second = t2;
739             } finally {
740                 Throwable e = null;
741                 if (first != null && second != null) {
742                     first.addSuppressed(second);
743                     e = first;
744                 } else if (first != null) {
745                     e = first;
746                 } else if (second != null) {
747                     e = second;
748                 }
749                 if (e != null) {
750                     if (debug.on()) {
751                         debug.log("exception in close", e);
752                     }
753                 }
754             }
755         }
756     }
757 
signalClose(int statusCode, String reason)758     private void signalClose(int statusCode, String reason) {
759         // FIXME: make sure no race reason & close are not intermixed
760         inputClosed = true;
761         this.statusCode = statusCode;
762         this.reason = reason;
763         boolean managed = trySetState(CLOSE);
764         if (debug.on()) {
765             debug.log("signalClose statusCode=%s reason.length=%s: %s",
766                       statusCode, reason.length(), managed);
767         }
768         if (managed) {
769             try {
770                 transport.closeInput();
771             } catch (Throwable t) {
772                 if (debug.on()) {
773                     debug.log("exception closing input", (Object) t);
774                 }
775             }
776         }
777     }
778 
779     private class SignallingMessageConsumer implements MessageStreamConsumer {
780 
781         @Override
onText(CharSequence data, boolean last)782         public void onText(CharSequence data, boolean last) {
783             transport.acknowledgeReception();
784             text = data;
785             WebSocketImpl.this.last = last;
786             tryChangeState(WAITING, TEXT);
787         }
788 
789         @Override
onBinary(ByteBuffer data, boolean last)790         public void onBinary(ByteBuffer data, boolean last) {
791             transport.acknowledgeReception();
792             binaryData = data;
793             WebSocketImpl.this.last = last;
794             tryChangeState(WAITING, BINARY);
795         }
796 
797         @Override
onPing(ByteBuffer data)798         public void onPing(ByteBuffer data) {
799             transport.acknowledgeReception();
800             binaryData = data;
801             tryChangeState(WAITING, PING);
802         }
803 
804         @Override
onPong(ByteBuffer data)805         public void onPong(ByteBuffer data) {
806             transport.acknowledgeReception();
807             binaryData = data;
808             tryChangeState(WAITING, PONG);
809         }
810 
811         @Override
onClose(int statusCode, CharSequence reason)812         public void onClose(int statusCode, CharSequence reason) {
813             transport.acknowledgeReception();
814             signalClose(statusCode, reason.toString());
815         }
816 
817         @Override
onComplete()818         public void onComplete() {
819             transport.acknowledgeReception();
820             signalClose(CLOSED_ABNORMALLY, "");
821         }
822 
823         @Override
onError(Throwable error)824         public void onError(Throwable error) {
825             signalError(error);
826         }
827     }
828 
trySetState(State newState)829     private boolean trySetState(State newState) {
830         State currentState;
831         boolean success = false;
832         while (true) {
833             currentState = state.get();
834             if (currentState == ERROR || currentState == CLOSE) {
835                 break;
836             } else if (state.compareAndSet(currentState, newState)) {
837                 receiveScheduler.runOrSchedule();
838                 success = true;
839                 break;
840             }
841         }
842         if (debug.on()) {
843             debug.log("set state %s (previous %s) %s",
844                       newState, currentState, success);
845         }
846         return success;
847     }
848 
tryChangeState(State expectedState, State newState)849     private boolean tryChangeState(State expectedState, State newState) {
850         State witness = state.compareAndExchange(expectedState, newState);
851         boolean success = false;
852         if (witness == expectedState) {
853             receiveScheduler.runOrSchedule();
854             success = true;
855         } else if (witness != ERROR && witness != CLOSE) {
856             // This should be the only reason for inability to change the state
857             // from IDLE to WAITING: the state has changed to terminal
858             throw new InternalError();
859         }
860         if (debug.on()) {
861             debug.log("change state from %s to %s %s",
862                       expectedState, newState, success);
863         }
864         return success;
865     }
866 
867     /* Exposed for testing purposes */
transport()868     protected Transport transport() {
869         return transport;
870     }
871 }
872