1 /*
2  * Copyright (c) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 /* @test
25  * @bug 4460583 4470470 4840199 6419424 6710579 6596323 6824135 6395224 7142919
26  *      8151582 8068693 8153209
27  * @run main/othervm AsyncCloseAndInterrupt
28  * @key intermittent
29  * @summary Comprehensive test of asynchronous closing and interruption
30  * @author Mark Reinhold
31  */
32 
33 import java.io.*;
34 import java.net.*;
35 import java.nio.channels.*;
36 import java.nio.ByteBuffer;
37 import java.util.ArrayList;
38 import java.util.List;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.Executors;
41 import java.util.concurrent.ThreadFactory;
42 import java.util.concurrent.Callable;
43 import java.util.concurrent.Future;
44 import java.util.concurrent.TimeUnit;
45 
46 public class AsyncCloseAndInterrupt {
47 
48     static PrintStream log = System.err;
49 
sleep(int ms)50     static void sleep(int ms) {
51         try {
52             Thread.sleep(ms);
53         } catch (InterruptedException x) { }
54     }
55 
56     // Wildcard address localized to this machine -- Windoze doesn't allow
57     // connecting to a server socket that was previously bound to a true
58     // wildcard, namely new InetSocketAddress((InetAddress)null, 0).
59     //
60     private static InetSocketAddress wildcardAddress;
61 
62 
63     // Server socket that blindly accepts all connections
64 
65     static ServerSocketChannel acceptor;
66 
initAcceptor()67     private static void initAcceptor() throws IOException {
68         acceptor = ServerSocketChannel.open();
69         acceptor.socket().bind(wildcardAddress);
70 
71         Thread th = new Thread("Acceptor") {
72                 public void run() {
73                     try {
74                         for (;;) {
75                             SocketChannel sc = acceptor.accept();
76                         }
77                     } catch (IOException x) {
78                         x.printStackTrace();
79                     }
80                 }
81             };
82 
83         th.setDaemon(true);
84         th.start();
85     }
86 
87 
88     // Server socket that refuses all connections
89 
90     static ServerSocketChannel refuser;
91 
initRefuser()92     private static void initRefuser() throws IOException {
93         refuser = ServerSocketChannel.open();
94         refuser.bind(wildcardAddress, 1);      // use minimum backlog
95     }
96 
97     // Dead pipe source and sink
98 
99     static Pipe.SourceChannel deadSource;
100     static Pipe.SinkChannel deadSink;
101 
initPipes()102     private static void initPipes() throws IOException {
103         if (deadSource != null)
104             deadSource.close();
105         deadSource = Pipe.open().source();
106         if (deadSink != null)
107             deadSink.close();
108         deadSink = Pipe.open().sink();
109     }
110 
111 
112     // Files
113 
114     private static File fifoFile = null; // File that blocks on reads and writes
115     private static File diskFile = null; // Disk file
116 
initFile()117     private static void initFile() throws Exception {
118 
119         diskFile = File.createTempFile("aci", ".tmp");
120         diskFile.deleteOnExit();
121         FileChannel fc = new FileOutputStream(diskFile).getChannel();
122         buffer.clear();
123         if (fc.write(buffer) != buffer.capacity())
124             throw new RuntimeException("Cannot create disk file");
125         fc.close();
126 
127         if (TestUtil.onWindows()) {
128             log.println("WARNING: Cannot completely test FileChannels on Windows");
129             return;
130         }
131         fifoFile = new File("x.fifo");
132         if (fifoFile.exists()) {
133             if (!fifoFile.delete())
134                 throw new IOException("Cannot delete existing fifo " + fifoFile);
135         }
136         Process p = Runtime.getRuntime().exec("mkfifo " + fifoFile);
137         if (p.waitFor() != 0)
138             throw new IOException("Error creating fifo");
139         new RandomAccessFile(fifoFile, "rw").close();
140 
141     }
142 
143 
144     // Channel factories
145 
146     static abstract class ChannelFactory {
147         private final String name;
ChannelFactory(String name)148         ChannelFactory(String name) {
149             this.name = name;
150         }
toString()151         public String toString() {
152             return name;
153         }
create()154         abstract InterruptibleChannel create() throws IOException;
155     }
156 
157     static ChannelFactory socketChannelFactory
158         = new ChannelFactory("SocketChannel") {
159                 InterruptibleChannel create() throws IOException {
160                     return SocketChannel.open();
161                 }
162             };
163 
164     static ChannelFactory connectedSocketChannelFactory
165         = new ChannelFactory("SocketChannel") {
166                 InterruptibleChannel create() throws IOException {
167                     SocketAddress sa = acceptor.socket().getLocalSocketAddress();
168                     return SocketChannel.open(sa);
169                 }
170             };
171 
172     static ChannelFactory serverSocketChannelFactory
173         = new ChannelFactory("ServerSocketChannel") {
174                 InterruptibleChannel create() throws IOException {
175                     ServerSocketChannel ssc = ServerSocketChannel.open();
176                     ssc.socket().bind(wildcardAddress);
177                     return ssc;
178                 }
179             };
180 
181     static ChannelFactory datagramChannelFactory
182         = new ChannelFactory("DatagramChannel") {
183                 InterruptibleChannel create() throws IOException {
184                     DatagramChannel dc = DatagramChannel.open();
185                     InetAddress lb = InetAddress.getByName("127.0.0.1");
186                     dc.bind(new InetSocketAddress(lb, 0));
187                     dc.connect(new InetSocketAddress(lb, 80));
188                     return dc;
189                 }
190             };
191 
192     static ChannelFactory pipeSourceChannelFactory
193         = new ChannelFactory("Pipe.SourceChannel") {
194                 InterruptibleChannel create() throws IOException {
195                     // ## arrange to close sink
196                     return Pipe.open().source();
197                 }
198             };
199 
200     static ChannelFactory pipeSinkChannelFactory
201         = new ChannelFactory("Pipe.SinkChannel") {
202                 InterruptibleChannel create() throws IOException {
203                     // ## arrange to close source
204                     return Pipe.open().sink();
205                 }
206             };
207 
208     static ChannelFactory fifoFileChannelFactory
209         = new ChannelFactory("FileChannel") {
210                 InterruptibleChannel create() throws IOException {
211                     return new RandomAccessFile(fifoFile, "rw").getChannel();
212                 }
213             };
214 
215     static ChannelFactory diskFileChannelFactory
216         = new ChannelFactory("FileChannel") {
217                 InterruptibleChannel create() throws IOException {
218                     return new RandomAccessFile(diskFile, "rw").getChannel();
219                 }
220             };
221 
222 
223     // I/O operations
224 
225     static abstract class Op {
226         private final String name;
Op(String name)227         protected Op(String name) {
228             this.name = name;
229         }
doIO(InterruptibleChannel ich)230         abstract void doIO(InterruptibleChannel ich) throws IOException;
setup()231         void setup() throws IOException { }
toString()232         public String toString() { return name; }
233     }
234 
235     static ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20);
236 
237     static ByteBuffer[] buffers = new ByteBuffer[] {
238         ByteBuffer.allocateDirect(1 << 19),
239         ByteBuffer.allocateDirect(1 << 19)
240     };
241 
clearBuffers()242     static void clearBuffers() {
243         buffers[0].clear();
244         buffers[1].clear();
245     }
246 
show(Channel ch)247     static void show(Channel ch) {
248         log.print("Channel " + (ch.isOpen() ? "open" : "closed"));
249         if (ch.isOpen() && (ch instanceof SocketChannel)) {
250             SocketChannel sc = (SocketChannel)ch;
251             if (sc.socket().isInputShutdown())
252                 log.print(", input shutdown");
253             if (sc.socket().isOutputShutdown())
254                 log.print(", output shutdown");
255         }
256         log.println();
257     }
258 
259     static final Op READ = new Op("read") {
260             void doIO(InterruptibleChannel ich) throws IOException {
261                 ReadableByteChannel rbc = (ReadableByteChannel)ich;
262                 buffer.clear();
263                 int n = rbc.read(buffer);
264                 log.println("Read returned " + n);
265                 show(rbc);
266                 if     (rbc.isOpen()
267                         && (n == -1)
268                         && (rbc instanceof SocketChannel)
269                         && ((SocketChannel)rbc).socket().isInputShutdown()) {
270                     return;
271                 }
272                 throw new RuntimeException("Read succeeded");
273             }
274         };
275 
276     static final Op READV = new Op("readv") {
277             void doIO(InterruptibleChannel ich) throws IOException {
278                 ScatteringByteChannel sbc = (ScatteringByteChannel)ich;
279                 clearBuffers();
280                 int n = (int)sbc.read(buffers);
281                 log.println("Read returned " + n);
282                 show(sbc);
283                 if     (sbc.isOpen()
284                         && (n == -1)
285                         && (sbc instanceof SocketChannel)
286                         && ((SocketChannel)sbc).socket().isInputShutdown()) {
287                     return;
288                 }
289                 throw new RuntimeException("Read succeeded");
290             }
291         };
292 
293     static final Op RECEIVE = new Op("receive") {
294             void doIO(InterruptibleChannel ich) throws IOException {
295                 DatagramChannel dc = (DatagramChannel)ich;
296                 buffer.clear();
297                 dc.receive(buffer);
298                 show(dc);
299                 throw new RuntimeException("Read succeeded");
300             }
301         };
302 
303     static final Op WRITE = new Op("write") {
304             void doIO(InterruptibleChannel ich) throws IOException {
305 
306                 WritableByteChannel wbc = (WritableByteChannel)ich;
307 
308                 SocketChannel sc = null;
309                 if (wbc instanceof SocketChannel)
310                     sc = (SocketChannel)wbc;
311 
312                 int n = 0;
313                 for (;;) {
314                     buffer.clear();
315                     int d = wbc.write(buffer);
316                     n += d;
317                     if (!wbc.isOpen())
318                         break;
319                     if ((sc != null) && sc.socket().isOutputShutdown())
320                         break;
321                 }
322                 log.println("Wrote " + n + " bytes");
323                 show(wbc);
324             }
325         };
326 
327     static final Op WRITEV = new Op("writev") {
328             void doIO(InterruptibleChannel ich) throws IOException {
329 
330                 GatheringByteChannel gbc = (GatheringByteChannel)ich;
331 
332                 SocketChannel sc = null;
333                 if (gbc instanceof SocketChannel)
334                     sc = (SocketChannel)gbc;
335 
336                 int n = 0;
337                 for (;;) {
338                     clearBuffers();
339                     int d = (int)gbc.write(buffers);
340                     n += d;
341                     if (!gbc.isOpen())
342                         break;
343                     if ((sc != null) && sc.socket().isOutputShutdown())
344                         break;
345                 }
346                 log.println("Wrote " + n + " bytes");
347                 show(gbc);
348 
349             }
350         };
351 
352     static final Op CONNECT = new Op("connect") {
353             void setup() {
354                 waitPump("connect waiting for pumping refuser ...");
355             }
356             void doIO(InterruptibleChannel ich) throws IOException {
357                 SocketChannel sc = (SocketChannel)ich;
358                 if (sc.connect(refuser.socket().getLocalSocketAddress()))
359                     throw new RuntimeException("Connection succeeded");
360                 throw new RuntimeException("Connection did not block");
361             }
362         };
363 
364     static final Op FINISH_CONNECT = new Op("finishConnect") {
365             void setup() {
366                 waitPump("finishConnect waiting for pumping refuser ...");
367             }
368             void doIO(InterruptibleChannel ich) throws IOException {
369                 SocketChannel sc = (SocketChannel)ich;
370                 sc.configureBlocking(false);
371                 SocketAddress sa = refuser.socket().getLocalSocketAddress();
372                 if (sc.connect(sa))
373                     throw new RuntimeException("Connection succeeded");
374                 sc.configureBlocking(true);
375                 if (sc.finishConnect())
376                     throw new RuntimeException("Connection succeeded");
377                 throw new RuntimeException("Connection did not block");
378             }
379         };
380 
381     static final Op ACCEPT = new Op("accept") {
382             void doIO(InterruptibleChannel ich) throws IOException {
383                 ServerSocketChannel ssc = (ServerSocketChannel)ich;
384                 ssc.accept();
385                 throw new RuntimeException("Accept succeeded");
386             }
387         };
388 
389     // Use only with diskFileChannelFactory
390     static final Op TRANSFER_TO = new Op("transferTo") {
391             void doIO(InterruptibleChannel ich) throws IOException {
392                 FileChannel fc = (FileChannel)ich;
393                 long n = fc.transferTo(0, fc.size(), deadSink);
394                 log.println("Transferred " + n + " bytes");
395                 show(fc);
396             }
397         };
398 
399     // Use only with diskFileChannelFactory
400     static final Op TRANSFER_FROM = new Op("transferFrom") {
401             void doIO(InterruptibleChannel ich) throws IOException {
402                 FileChannel fc = (FileChannel)ich;
403                 long n = fc.transferFrom(deadSource, 0, 1 << 20);
404                 log.println("Transferred " + n + " bytes");
405                 show(fc);
406             }
407         };
408 
409 
410 
411     // Test modes
412 
413     static final int TEST_PREINTR = 0;  // Interrupt thread before I/O
414     static final int TEST_INTR = 1;     // Interrupt thread during I/O
415     static final int TEST_CLOSE = 2;    // Close channel during I/O
416     static final int TEST_SHUTI = 3;    // Shutdown input during I/O
417     static final int TEST_SHUTO = 4;    // Shutdown output during I/O
418 
419     static final String[] testName = new String[] {
420         "pre-interrupt", "interrupt", "close",
421         "shutdown-input", "shutdown-output"
422     };
423 
424 
425     static class Tester extends TestThread {
426 
427         private InterruptibleChannel ch;
428         private Op op;
429         private int test;
430         volatile boolean ready = false;
431 
Tester(ChannelFactory cf, InterruptibleChannel ch, Op op, int test)432         protected Tester(ChannelFactory cf, InterruptibleChannel ch,
433                          Op op, int test)
434         {
435             super(cf + "/" + op + "/" + testName[test]);
436             this.ch = ch;
437             this.op = op;
438             this.test = test;
439         }
440 
441         @SuppressWarnings("fallthrough")
caught(Channel ch, IOException x)442         private void caught(Channel ch, IOException x) {
443             String xn = x.getClass().getName();
444             switch (test) {
445 
446             case TEST_PREINTR:
447             case TEST_INTR:
448                 if (!xn.equals("java.nio.channels.ClosedByInterruptException"))
449                     throw new RuntimeException("Wrong exception thrown: " + x);
450                 break;
451 
452             case TEST_CLOSE:
453             case TEST_SHUTO:
454                 if (!xn.equals("java.nio.channels.AsynchronousCloseException"))
455                     throw new RuntimeException("Wrong exception thrown: " + x);
456                 break;
457 
458             case TEST_SHUTI:
459                 if (TestUtil.onWindows())
460                     break;
461                 // FALL THROUGH
462 
463             default:
464                 throw new Error(x);
465             }
466 
467             if (ch.isOpen()) {
468                 if (test == TEST_SHUTO) {
469                     SocketChannel sc = (SocketChannel)ch;
470                     if (!sc.socket().isOutputShutdown())
471                         throw new RuntimeException("Output not shutdown");
472                 } else if ((test == TEST_INTR) && (op == TRANSFER_FROM)) {
473                     // Let this case pass -- CBIE applies to other channel
474                 } else {
475                     throw new RuntimeException("Channel still open");
476                 }
477             }
478 
479             log.println("Thrown as expected: " + x);
480         }
481 
go()482         final void go() throws Exception {
483             if (test == TEST_PREINTR)
484                 Thread.currentThread().interrupt();
485             ready = true;
486             try {
487                 op.doIO(ch);
488             } catch (ClosedByInterruptException x) {
489                 caught(ch, x);
490             } catch (AsynchronousCloseException x) {
491                 caught(ch, x);
492             } finally {
493                 ch.close();
494             }
495         }
496 
497     }
498 
499     private static volatile boolean pumpDone = false;
500     private static volatile boolean pumpReady = false;
501 
waitPump(String msg)502     private static void waitPump(String msg){
503         log.println(msg);
504         while (!pumpReady){
505             sleep(200);
506         }
507         log.println(msg + " done");
508     }
509 
510     // Create a pump thread dedicated to saturate refuser's connection backlog
pumpRefuser(ExecutorService pumperExecutor)511     private static Future<Integer> pumpRefuser(ExecutorService pumperExecutor) {
512 
513         Callable<Integer> pumpTask = new Callable<Integer>() {
514 
515             @Override
516             public Integer call() throws IOException {
517                 // Can't reliably saturate connection backlog on Windows Server editions
518                 assert !TestUtil.onWindows();
519                 log.println("Start pumping refuser ...");
520                 List<SocketChannel> refuserClients = new ArrayList<>();
521 
522                 // Saturate the refuser's connection backlog so that further connection
523                 // attempts will be blocked
524                 pumpReady = false;
525                 while (!pumpDone) {
526                     SocketChannel sc = SocketChannel.open();
527                     sc.configureBlocking(false);
528                     boolean connected = sc.connect(refuser.socket().getLocalSocketAddress());
529 
530                     // Assume that the connection backlog is saturated if a
531                     // client cannot connect to the refuser within 50 milliseconds
532                     long start = System.currentTimeMillis();
533                     while (!pumpReady && !connected
534                             && (System.currentTimeMillis() - start < 50)) {
535                         connected = sc.finishConnect();
536                     }
537 
538                     if (connected) {
539                         // Retain so that finalizer doesn't close
540                         refuserClients.add(sc);
541                     } else {
542                         sc.close();
543                         pumpReady = true;
544                     }
545                 }
546 
547                 for (SocketChannel sc : refuserClients) {
548                     sc.close();
549                 }
550                 refuser.close();
551 
552                 log.println("Stop pumping refuser ...");
553                 return refuserClients.size();
554             }
555         };
556 
557         return pumperExecutor.submit(pumpTask);
558     }
559 
560     // Test
test(ChannelFactory cf, Op op, int test)561     static void test(ChannelFactory cf, Op op, int test) throws Exception {
562         test(cf, op, test, true);
563     }
564 
test(ChannelFactory cf, Op op, int test, boolean extraSleep)565     static void test(ChannelFactory cf, Op op, int test, boolean extraSleep)
566         throws Exception
567     {
568         log.println();
569         initPipes();
570         InterruptibleChannel ch = cf.create();
571         Tester t = new Tester(cf, ch, op, test);
572         log.println(t);
573         op.setup();
574         t.start();
575         do {
576             sleep(50);
577         } while (!t.ready);
578 
579         if (extraSleep) {
580             sleep(100);
581         }
582 
583         switch (test) {
584 
585         case TEST_INTR:
586             t.interrupt();
587             break;
588 
589         case TEST_CLOSE:
590             ch.close();
591             break;
592 
593         case TEST_SHUTI:
594             if (TestUtil.onWindows()) {
595                 log.println("WARNING: Asynchronous shutdown not working on Windows");
596                 ch.close();
597             } else {
598                 ((SocketChannel)ch).socket().shutdownInput();
599             }
600             break;
601 
602         case TEST_SHUTO:
603             if (TestUtil.onWindows()) {
604                 log.println("WARNING: Asynchronous shutdown not working on Windows");
605                 ch.close();
606             } else {
607                 ((SocketChannel)ch).socket().shutdownOutput();
608             }
609             break;
610 
611         default:
612             break;
613         }
614 
615         t.finishAndThrow(10000);
616     }
617 
test(ChannelFactory cf, Op op)618     static void test(ChannelFactory cf, Op op) throws Exception {
619         test(cf, op, true);
620     }
621 
test(ChannelFactory cf, Op op, boolean extraSleep)622     static void test(ChannelFactory cf, Op op, boolean extraSleep) throws Exception {
623         // Test INTR cases before PREINTER cases since sometimes
624         // interrupted threads can't load classes
625         test(cf, op, TEST_INTR, extraSleep);
626         test(cf, op, TEST_PREINTR, extraSleep);
627 
628         // Bugs, see FileChannelImpl for details
629         if (op == TRANSFER_FROM) {
630             log.println("WARNING: transferFrom/close not tested");
631             return;
632         }
633         if ((op == TRANSFER_TO) && !TestUtil.onWindows()) {
634             log.println("WARNING: transferTo/close not tested");
635             return;
636         }
637 
638         test(cf, op, TEST_CLOSE, extraSleep);
639     }
640 
test(ChannelFactory cf)641     static void test(ChannelFactory cf)
642         throws Exception
643     {
644         InterruptibleChannel ch = cf.create(); // Sample channel
645         ch.close();
646 
647         if (ch instanceof ReadableByteChannel) {
648             test(cf, READ);
649             if (ch instanceof SocketChannel)
650                 test(cf, READ, TEST_SHUTI);
651         }
652 
653         if (ch instanceof ScatteringByteChannel) {
654             test(cf, READV);
655             if (ch instanceof SocketChannel)
656                 test(cf, READV, TEST_SHUTI);
657         }
658 
659         if (ch instanceof DatagramChannel) {
660             test(cf, RECEIVE);
661 
662             // Return here: We can't effectively test writes since, if they
663             // block, they do so only for a fleeting moment unless the network
664             // interface is overloaded.
665             return;
666 
667         }
668 
669         if (ch instanceof WritableByteChannel) {
670             test(cf, WRITE);
671             if (ch instanceof SocketChannel)
672                 test(cf, WRITE, TEST_SHUTO);
673         }
674 
675         if (ch instanceof GatheringByteChannel) {
676             test(cf, WRITEV);
677             if (ch instanceof SocketChannel)
678                 test(cf, WRITEV, TEST_SHUTO);
679         }
680 
681     }
682 
main(String[] args)683     public static void main(String[] args) throws Exception {
684 
685         wildcardAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0);
686         initAcceptor();
687         if (!TestUtil.onWindows())
688             initRefuser();
689         initPipes();
690         initFile();
691 
692         if (TestUtil.onWindows()) {
693             log.println("WARNING: Cannot test FileChannel transfer operations"
694                         + " on Windows");
695         } else {
696             test(diskFileChannelFactory, TRANSFER_TO);
697             test(diskFileChannelFactory, TRANSFER_FROM);
698         }
699         if (fifoFile != null)
700             test(fifoFileChannelFactory);
701 
702         // Testing positional file reads and writes is impractical: It requires
703         // access to a large file soft-mounted via NFS, and even then isn't
704         // completely guaranteed to work.
705         //
706         // Testing map is impractical and arguably unnecessary: It's
707         // unclear under what conditions mmap(2) will actually block.
708 
709         test(connectedSocketChannelFactory);
710 
711         if (TestUtil.onWindows() || TestUtil.onSolaris() || TestUtil.onBSD()) {
712             log.println("WARNING Cannot reliably test connect/finishConnect"
713                 + " operations on this platform");
714         } else {
715             // Only the following tests need refuser's connection backlog
716             // to be saturated
717             ExecutorService pumperExecutor =
718                     Executors.newSingleThreadExecutor(
719                     new ThreadFactory() {
720 
721                         @Override
722                         public Thread newThread(Runnable r) {
723                             Thread t = new Thread(r);
724                             t.setDaemon(true);
725                             t.setName("Pumper");
726                             return t;
727                         }
728                     });
729 
730             pumpDone = false;
731             try {
732                 Future<Integer> pumpFuture = pumpRefuser(pumperExecutor);
733                 waitPump("\nWait for initial Pump");
734 
735                 test(socketChannelFactory, CONNECT, false);
736                 test(socketChannelFactory, FINISH_CONNECT, false);
737 
738                 pumpDone = true;
739                 Integer newConn = pumpFuture.get(30, TimeUnit.SECONDS);
740                 log.println("Pump " + newConn + " connections.");
741             } finally {
742                 pumperExecutor.shutdown();
743             }
744         }
745 
746         test(serverSocketChannelFactory, ACCEPT);
747         test(datagramChannelFactory);
748         test(pipeSourceChannelFactory);
749         test(pipeSinkChannelFactory);
750     }
751 }
752