1 /*
2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http.websocket;
27 
28 import jdk.internal.net.http.common.Demand;
29 import jdk.internal.net.http.common.Logger;
30 import jdk.internal.net.http.common.MinimalFuture;
31 import jdk.internal.net.http.common.SequentialScheduler;
32 import jdk.internal.net.http.common.SequentialScheduler.CompleteRestartableTask;
33 import jdk.internal.net.http.common.Utils;
34 
35 import java.io.IOException;
36 import java.lang.System.Logger.Level;
37 import java.nio.ByteBuffer;
38 import java.nio.CharBuffer;
39 import java.nio.channels.SelectionKey;
40 import java.util.concurrent.CompletableFuture;
41 import java.util.concurrent.atomic.AtomicLong;
42 import java.util.concurrent.atomic.AtomicReference;
43 import java.util.function.BiConsumer;
44 import java.util.function.Supplier;
45 
46 import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.AVAILABLE;
47 import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.CLOSED;
48 import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.UNREGISTERED;
49 import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.WAITING;
50 
51 public class TransportImpl implements Transport {
52 
53     // -- Debugging infrastructure --
54 
55     private static final Logger debug =
56             Utils.getWebSocketLogger("[Transport]"::toString, Utils.DEBUG_WS);
57 
58     /* Used for correlating enters to and exists from a method */
59     private final AtomicLong counter = new AtomicLong();
60 
61     private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask());
62 
63     private final MessageQueue queue;
64     private final MessageEncoder encoder = new MessageEncoder();
65     /* A reusable buffer for writing, initially with no remaining bytes */
66     private final ByteBuffer dst = createWriteBuffer().position(0).limit(0);
67     /* This array is created once for gathering writes accepted by RawChannel */
68     private final ByteBuffer[] dstArray = new ByteBuffer[]{dst};
69     private final MessageStreamConsumer messageConsumer;
70     private final MessageDecoder decoder;
71     private final Frame.Reader reader = new Frame.Reader();
72 
73     private final Demand demand = new Demand();
74     private final SequentialScheduler receiveScheduler;
75     private final RawChannel channel;
76     private final Object closeLock = new Object();
77     private final RawChannel.RawEvent writeEvent = new WriteEvent();
78     private final RawChannel.RawEvent readEvent = new ReadEvent();
79     private final AtomicReference<ChannelState> writeState
80             = new AtomicReference<>(UNREGISTERED);
81     private ByteBuffer data;
82     private volatile ChannelState readState = UNREGISTERED;
83     private boolean inputClosed;
84     private boolean outputClosed;
85 
TransportImpl(MessageQueue queue, MessageStreamConsumer consumer, RawChannel channel)86     public TransportImpl(MessageQueue queue, MessageStreamConsumer consumer,
87                          RawChannel channel) {
88         this.queue = queue;
89         this.messageConsumer = consumer;
90         this.channel = channel;
91         this.decoder = new MessageDecoder(this.messageConsumer);
92         this.data = channel.initialByteBuffer();
93         // To ensure the initial non-final `data` will be visible
94         // (happens-before) when `readEvent.handle()` invokes `receiveScheduler`
95         // the following assignment is done last:
96         receiveScheduler = new SequentialScheduler(new ReceiveTask());
97     }
98 
createWriteBuffer()99     private ByteBuffer createWriteBuffer() {
100         String name = "jdk.httpclient.websocket.writeBufferSize";
101         int capacity = Utils.getIntegerNetProperty(name, 16384);
102         if (debug.on()) {
103             debug.log("write buffer capacity %s", capacity);
104         }
105 
106         // TODO (optimization?): allocateDirect if SSL?
107         return ByteBuffer.allocate(capacity);
108     }
109 
write()110     private boolean write() throws IOException {
111         if (debug.on()) {
112             debug.log("writing to the channel");
113         }
114         long count = channel.write(dstArray, 0, dstArray.length);
115         if (debug.on()) {
116             debug.log("%s bytes written", count);
117         }
118         for (ByteBuffer b : dstArray) {
119             if (b.hasRemaining()) {
120                 return false;
121             }
122         }
123         return true;
124     }
125 
126     @Override
sendText(CharSequence message, boolean isLast, T attachment, BiConsumer<? super T, ? super Throwable> action)127     public <T> CompletableFuture<T> sendText(CharSequence message,
128                                              boolean isLast,
129                                              T attachment,
130                                              BiConsumer<? super T, ? super Throwable> action) {
131         long id = 0;
132         if (debug.on()) {
133             id = counter.incrementAndGet();
134             debug.log("enter send text %s message.length=%s last=%s",
135                               id, message.length(), isLast);
136         }
137         // TODO (optimization?):
138         // These sendXXX methods might be a good place to decide whether or not
139         // we can write straight ahead, possibly returning null instead of
140         // creating a CompletableFuture
141 
142         // Even if the text is already CharBuffer, the client will not be happy
143         // if they discover the position is changing. So, no instanceof
144         // cheating, wrap always.
145         CharBuffer text = CharBuffer.wrap(message);
146         MinimalFuture<T> f = new MinimalFuture<>();
147         try {
148             queue.addText(text, isLast, attachment, action, f);
149             sendScheduler.runOrSchedule();
150         } catch (IOException e) {
151             action.accept(null, e);
152             f.completeExceptionally(e);
153         }
154         if (debug.on()) {
155             debug.log("exit send text %s returned %s", id, f);
156         }
157         return f;
158     }
159 
160     @Override
sendBinary(ByteBuffer message, boolean isLast, T attachment, BiConsumer<? super T, ? super Throwable> action)161     public <T> CompletableFuture<T> sendBinary(ByteBuffer message,
162                                                boolean isLast,
163                                                T attachment,
164                                                BiConsumer<? super T, ? super Throwable> action) {
165         long id = 0;
166         if (debug.on()) {
167             id = counter.incrementAndGet();
168             debug.log("enter send binary %s message.remaining=%s last=%s",
169                               id, message.remaining(), isLast);
170         }
171         MinimalFuture<T> f = new MinimalFuture<>();
172         try {
173             queue.addBinary(message, isLast, attachment, action, f);
174             sendScheduler.runOrSchedule();
175         } catch (IOException e) {
176             action.accept(null, e);
177             f.completeExceptionally(e);
178         }
179         if (debug.on()) {
180             debug.log("exit send binary %s returned %s", id, f);
181         }
182         return f;
183     }
184 
185     @Override
sendPing(ByteBuffer message, T attachment, BiConsumer<? super T, ? super Throwable> action)186     public <T> CompletableFuture<T> sendPing(ByteBuffer message,
187                                              T attachment,
188                                              BiConsumer<? super T, ? super Throwable> action) {
189         long id = 0;
190         if (debug.on()) {
191             id = counter.incrementAndGet();
192             debug.log("enter send ping %s message.remaining=%s",
193                               id, message.remaining());
194         }
195         MinimalFuture<T> f = new MinimalFuture<>();
196         try {
197             queue.addPing(message, attachment, action, f);
198             sendScheduler.runOrSchedule();
199         } catch (IOException e) {
200             action.accept(null, e);
201             f.completeExceptionally(e);
202         }
203         if (debug.on()) {
204             debug.log("exit send ping %s returned %s", id, f);
205         }
206         return f;
207     }
208 
209     @Override
sendPong(ByteBuffer message, T attachment, BiConsumer<? super T, ? super Throwable> action)210     public <T> CompletableFuture<T> sendPong(ByteBuffer message,
211                                              T attachment,
212                                              BiConsumer<? super T, ? super Throwable> action) {
213         long id = 0;
214         if (debug.on()) {
215             id = counter.incrementAndGet();
216             debug.log("enter send pong %s message.remaining=%s",
217                               id, message.remaining());
218         }
219         MinimalFuture<T> f = new MinimalFuture<>();
220         try {
221             queue.addPong(message, attachment, action, f);
222             sendScheduler.runOrSchedule();
223         } catch (IOException e) {
224             action.accept(null, e);
225             f.completeExceptionally(e);
226         }
227         if (debug.on()) {
228             debug.log("exit send pong %s returned %s", id, f);
229         }
230         return f;
231     }
232 
233     @Override
sendPong(Supplier<? extends ByteBuffer> message, T attachment, BiConsumer<? super T, ? super Throwable> action)234     public <T> CompletableFuture<T> sendPong(Supplier<? extends ByteBuffer> message,
235                                              T attachment,
236                                              BiConsumer<? super T, ? super Throwable> action) {
237         long id = 0;
238         if (debug.on()) {
239             id = counter.incrementAndGet();
240             debug.log("enter send pong %s supplier=%s",
241                       id, message);
242         }
243         MinimalFuture<T> f = new MinimalFuture<>();
244         try {
245             queue.addPong(message, attachment, action, f);
246             sendScheduler.runOrSchedule();
247         } catch (IOException e) {
248             action.accept(null, e);
249             f.completeExceptionally(e);
250         }
251         if (debug.on()) {
252             debug.log("exit send pong %s returned %s", id, f);
253         }
254         return f;
255     }
256 
257     @Override
sendClose(int statusCode, String reason, T attachment, BiConsumer<? super T, ? super Throwable> action)258     public <T> CompletableFuture<T> sendClose(int statusCode,
259                                               String reason,
260                                               T attachment,
261                                               BiConsumer<? super T, ? super Throwable> action) {
262         long id = 0;
263         if (debug.on()) {
264             id = counter.incrementAndGet();
265             debug.log("enter send close %s statusCode=%s reason.length=%s",
266                               id, statusCode, reason.length());
267         }
268         MinimalFuture<T> f = new MinimalFuture<>();
269         try {
270             queue.addClose(statusCode, CharBuffer.wrap(reason), attachment, action, f);
271             sendScheduler.runOrSchedule();
272         } catch (IOException e) {
273             action.accept(null, e);
274             f.completeExceptionally(e);
275         }
276         if (debug.on()) {
277             debug.log("exit send close %s returned %s", id, f);
278         }
279         return f;
280     }
281 
282     @Override
request(long n)283     public void request(long n) {
284         if (debug.on()) {
285             debug.log("request %s", n);
286         }
287         if (demand.increase(n)) {
288             receiveScheduler.runOrSchedule();
289         }
290     }
291 
292     @Override
acknowledgeReception()293     public void acknowledgeReception() {
294         boolean decremented = demand.tryDecrement();
295         if (!decremented) {
296             throw new InternalError();
297         }
298     }
299 
300     @Override
closeOutput()301     public void closeOutput() throws IOException {
302         if (debug.on()) {
303             debug.log("closeOutput");
304         }
305         synchronized (closeLock) {
306             if (!outputClosed) {
307                 outputClosed = true;
308                 try {
309                     channel.shutdownOutput();
310                 } finally {
311                     if (inputClosed) {
312                         channel.close();
313                     }
314                 }
315             }
316         }
317         writeState.set(CLOSED);
318         sendScheduler.runOrSchedule();
319     }
320 
321     /*
322      * Permanently stops reading from the channel and delivering messages
323      * regardless of the current demand and data availability.
324      */
325     @Override
closeInput()326     public void closeInput() throws IOException {
327         if (debug.on()) {
328             debug.log("closeInput");
329         }
330         synchronized (closeLock) {
331             if (!inputClosed) {
332                 inputClosed = true;
333                 try {
334                     receiveScheduler.stop();
335                     channel.shutdownInput();
336                 } finally {
337                     if (outputClosed) {
338                         channel.close();
339                     }
340                 }
341             }
342         }
343     }
344 
345     /* Common states for send and receive tasks */
346     enum ChannelState {
347         UNREGISTERED,
348         AVAILABLE,
349         WAITING,
350         CLOSED,
351     }
352 
353     @SuppressWarnings({"rawtypes"})
354     private class SendTask extends CompleteRestartableTask {
355 
356         private final MessageQueue.QueueCallback<Boolean, IOException>
357                 encodingCallback = new MessageQueue.QueueCallback<>() {
358 
359             @Override
360             public <T> Boolean onText(CharBuffer message,
361                                       boolean isLast,
362                                       T attachment,
363                                       BiConsumer<? super T, ? super Throwable> action,
364                                       CompletableFuture<? super T> future) throws IOException
365             {
366                 return encoder.encodeText(message, isLast, dst);
367             }
368 
369             @Override
370             public <T> Boolean onBinary(ByteBuffer message,
371                                         boolean isLast,
372                                         T attachment,
373                                         BiConsumer<? super T, ? super Throwable> action,
374                                         CompletableFuture<? super T> future) throws IOException
375             {
376                 return encoder.encodeBinary(message, isLast, dst);
377             }
378 
379             @Override
380             public <T> Boolean onPing(ByteBuffer message,
381                                       T attachment,
382                                       BiConsumer<? super T, ? super Throwable> action,
383                                       CompletableFuture<? super T> future) throws IOException
384             {
385                 return encoder.encodePing(message, dst);
386             }
387 
388             @Override
389             public <T> Boolean onPong(ByteBuffer message,
390                                       T attachment,
391                                       BiConsumer<? super T, ? super Throwable> action,
392                                       CompletableFuture<? super T> future) throws IOException
393             {
394                 return encoder.encodePong(message, dst);
395             }
396 
397             @Override
398             public <T> Boolean onPong(Supplier<? extends ByteBuffer> message,
399                                       T attachment,
400                                       BiConsumer<? super T, ? super Throwable> action,
401                                       CompletableFuture<? super T> future) throws IOException {
402                 return encoder.encodePong(message.get(), dst);
403             }
404 
405             @Override
406             public <T> Boolean onClose(int statusCode,
407                                        CharBuffer reason,
408                                        T attachment,
409                                        BiConsumer<? super T, ? super Throwable> action,
410                                        CompletableFuture<? super T> future) throws IOException
411             {
412                 return encoder.encodeClose(statusCode, reason, dst);
413             }
414 
415             @Override
416             public Boolean onEmpty() {
417                 return false;
418             }
419         };
420 
421         /* Whether the task sees the current head message for first time */
422         private boolean firstPass = true;
423         /* Whether the message has been fully encoded */
424         private boolean encoded;
425 
426         // -- Current message completion communication fields --
427 
428         private Object attachment;
429         private BiConsumer action;
430         private CompletableFuture future;
431         private final MessageQueue.QueueCallback<Boolean, RuntimeException>
432                 /* If there is a message, loads its completion communication fields */
433                 loadCallback = new MessageQueue.QueueCallback<Boolean, RuntimeException>() {
434 
435             @Override
436             public <T> Boolean onText(CharBuffer message,
437                                       boolean isLast,
438                                       T attachment,
439                                       BiConsumer<? super T, ? super Throwable> action,
440                                       CompletableFuture<? super T> future)
441             {
442                 SendTask.this.attachment = attachment;
443                 SendTask.this.action = action;
444                 SendTask.this.future = future;
445                 return true;
446             }
447 
448             @Override
449             public <T> Boolean onBinary(ByteBuffer message,
450                                         boolean isLast,
451                                         T attachment,
452                                         BiConsumer<? super T, ? super Throwable> action,
453                                         CompletableFuture<? super T> future)
454             {
455                 SendTask.this.attachment = attachment;
456                 SendTask.this.action = action;
457                 SendTask.this.future = future;
458                 return true;
459             }
460 
461             @Override
462             public <T> Boolean onPing(ByteBuffer message,
463                                       T attachment,
464                                       BiConsumer<? super T, ? super Throwable> action,
465                                       CompletableFuture<? super T> future)
466             {
467                 SendTask.this.attachment = attachment;
468                 SendTask.this.action = action;
469                 SendTask.this.future = future;
470                 return true;
471             }
472 
473             @Override
474             public <T> Boolean onPong(ByteBuffer message,
475                                       T attachment,
476                                       BiConsumer<? super T, ? super Throwable> action,
477                                       CompletableFuture<? super T> future)
478             {
479                 SendTask.this.attachment = attachment;
480                 SendTask.this.action = action;
481                 SendTask.this.future = future;
482                 return true;
483             }
484 
485             @Override
486             public <T> Boolean onPong(Supplier<? extends ByteBuffer> message,
487                                       T attachment,
488                                       BiConsumer<? super T, ? super Throwable> action,
489                                       CompletableFuture<? super T> future)
490             {
491                 SendTask.this.attachment = attachment;
492                 SendTask.this.action = action;
493                 SendTask.this.future = future;
494                 return true;
495             }
496 
497             @Override
498             public <T> Boolean onClose(int statusCode,
499                                        CharBuffer reason,
500                                        T attachment,
501                                        BiConsumer<? super T, ? super Throwable> action,
502                                        CompletableFuture<? super T> future)
503             {
504                 SendTask.this.attachment = attachment;
505                 SendTask.this.action = action;
506                 SendTask.this.future = future;
507                 return true;
508             }
509 
510             @Override
511             public Boolean onEmpty() {
512                 return false;
513             }
514         };
515 
516         @Override
run()517         public void run() {
518             // Could have been only called in one of the following cases:
519             //   (a) A message has been added to the queue
520             //   (b) The channel is ready for writing
521             if (debug.on()) {
522                 debug.log("enter send task");
523             }
524             while (!queue.isEmpty()) {
525                 try {
526                     if (dst.hasRemaining()) {
527                         if (debug.on()) {
528                             debug.log("%s bytes remaining in buffer %s",
529                                       dst.remaining(), dst);
530                         }
531                         // The previous part of the binary representation of the
532                         // message hasn't been fully written
533                         if (!tryCompleteWrite()) {
534                             break;
535                         }
536                     } else if (!encoded) {
537                         if (firstPass) {
538                             firstPass = false;
539                             queue.peek(loadCallback);
540                             if (debug.on()) {
541                                 debug.log("load message");
542                             }
543                         }
544                         dst.clear();
545                         encoded = queue.peek(encodingCallback);
546                         dst.flip();
547                         if (!tryCompleteWrite()) {
548                             break;
549                         }
550                     } else {
551                         // All done, remove and complete
552                         encoder.reset();
553                         removeAndComplete(null);
554                     }
555                 } catch (Throwable t) {
556                     if (debug.on()) {
557                         debug.log("send task exception %s", (Object) t);
558                     }
559                     // buffer cleanup: if there is an exception, the buffer
560                     // should appear empty for the next write as there is
561                     // nothing to write
562                     dst.position(dst.limit());
563                     encoder.reset();
564                     removeAndComplete(t);
565                 }
566             }
567             if (debug.on()) {
568                 debug.log("exit send task");
569             }
570         }
571 
tryCompleteWrite()572         private boolean tryCompleteWrite() throws IOException {
573             if (debug.on()) {
574                 debug.log("enter writing");
575             }
576             boolean finished = false;
577             loop:
578             while (true) {
579                 final ChannelState ws = writeState.get();
580                 if (debug.on()) {
581                     debug.log("write state: %s", ws);
582                 }
583                 switch (ws) {
584                     case WAITING:
585                         break loop;
586                     case UNREGISTERED:
587                         if (debug.on()) {
588                             debug.log("registering write event");
589                         }
590                         channel.registerEvent(writeEvent);
591                         writeState.compareAndSet(UNREGISTERED, WAITING);
592                         if (debug.on()) {
593                             debug.log("registered write event");
594                         }
595                         break loop;
596                     case AVAILABLE:
597                         boolean written = write();
598                         if (written) {
599                             if (debug.on()) {
600                                 debug.log("finished writing to the channel");
601                             }
602                             finished = true;
603                             break loop;   // All done
604                         } else {
605                             writeState.compareAndSet(AVAILABLE, UNREGISTERED);
606                             continue loop; //  Effectively "goto UNREGISTERED"
607                         }
608                     case CLOSED:
609                         throw new IOException("Output closed");
610                     default:
611                         throw new InternalError(String.valueOf(ws));
612                 }
613             }
614             if (debug.on()) {
615                 debug.log("exit writing");
616             }
617             return finished;
618         }
619 
620         @SuppressWarnings("unchecked")
removeAndComplete(Throwable error)621         private void removeAndComplete(Throwable error) {
622             if (debug.on()) {
623                 debug.log("removeAndComplete error=%s", (Object) error);
624             }
625             queue.remove();
626             if (error != null) {
627                 try {
628                     action.accept(null, error);
629                 } finally {
630                     future.completeExceptionally(error);
631                 }
632             } else {
633                 try {
634                     action.accept(attachment, null);
635                 } finally {
636                     future.complete(attachment);
637                 }
638             }
639             encoded = false;
640             firstPass = true;
641             attachment = null;
642             action = null;
643             future = null;
644         }
645     }
646 
647     private class ReceiveTask extends CompleteRestartableTask {
648 
649         @Override
run()650         public void run() {
651             if (debug.on()) {
652                 debug.log("enter receive task");
653             }
654             loop:
655             while (!receiveScheduler.isStopped()) {
656                 ChannelState rs = readState;
657                 if (data.hasRemaining()) {
658                     if (debug.on()) {
659                         debug.log("remaining bytes received %s",
660                                   data.remaining());
661                     }
662                     if (!demand.isFulfilled()) {
663                         try {
664                             int oldPos = data.position();
665                             reader.readFrame(data, decoder);
666                             int newPos = data.position();
667                             // Reader always consumes bytes:
668                             assert oldPos != newPos : data;
669                         } catch (Throwable e) {
670                             receiveScheduler.stop();
671                             messageConsumer.onError(e);
672                         }
673                         if (!data.hasRemaining()) {
674                             rs = readState = UNREGISTERED;
675                         }
676                         continue;
677                     }
678                     break loop;
679                 }
680                 if (debug.on()) {
681                     debug.log("receive state: %s", rs);
682                 }
683                 switch (rs) {
684                     case WAITING:
685                         break loop;
686                     case UNREGISTERED:
687                         try {
688                             rs = readState = WAITING;
689                             channel.registerEvent(readEvent);
690                         } catch (Throwable e) {
691                             receiveScheduler.stop();
692                             messageConsumer.onError(e);
693                         }
694                         break loop;
695                     case AVAILABLE:
696                         try {
697                             data = channel.read();
698                         } catch (Throwable e) {
699                             receiveScheduler.stop();
700                             messageConsumer.onError(e);
701                             break loop;
702                         }
703                         if (data == null) { // EOF
704                             receiveScheduler.stop();
705                             messageConsumer.onComplete();
706                             break loop;
707                         } else if (!data.hasRemaining()) {
708                             // No data at the moment. Pretty much a "goto",
709                             // reusing the existing code path for registration
710                             rs = readState = UNREGISTERED;
711                         }
712                         continue loop;
713                     default:
714                         throw new InternalError(String.valueOf(rs));
715                 }
716             }
717             if (debug.on()) {
718                 debug.log("exit receive task");
719             }
720         }
721     }
722 
723     private class WriteEvent implements RawChannel.RawEvent {
724 
725         @Override
interestOps()726         public int interestOps() {
727             return SelectionKey.OP_WRITE;
728         }
729 
730         @Override
handle()731         public void handle() {
732             if (debug.on()) {
733                 debug.log("write event");
734             }
735             ChannelState s;
736             do {
737                 s = writeState.get();
738                 if (s == CLOSED) {
739                     if (debug.on()) {
740                         debug.log("write state %s", s);
741                     }
742                     break;
743                 }
744             } while (!writeState.compareAndSet(s, AVAILABLE));
745             sendScheduler.runOrSchedule();
746         }
747     }
748 
749     private class ReadEvent implements RawChannel.RawEvent {
750 
751         @Override
interestOps()752         public int interestOps() {
753             return SelectionKey.OP_READ;
754         }
755 
756         @Override
handle()757         public void handle() {
758             if (debug.on()) {
759                 debug.log("read event");
760             }
761             readState = AVAILABLE;
762             receiveScheduler.runOrSchedule();
763         }
764     }
765 }
766