1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.net;
20 
21 import java.io.IOException;
22 import org.apache.hadoop.classification.InterfaceAudience;
23 import java.io.InputStream;
24 import java.net.Socket;
25 import java.net.SocketTimeoutException;
26 import java.nio.ByteBuffer;
27 import java.nio.channels.FileChannel;
28 import java.nio.channels.ReadableByteChannel;
29 import java.nio.channels.SelectableChannel;
30 import java.nio.channels.SelectionKey;
31 
/**
 * This implements an input stream that can have a timeout while reading.
 * This sets the non-blocking flag on the socket channel.
 * So after creating this object, read() on
 * {@link Socket#getInputStream()} and write() on
 * {@link Socket#getOutputStream()} for the associated socket will throw
 * IllegalBlockingModeException.
 * Please use {@link SocketOutputStream} for writing.
 */
41 @InterfaceAudience.LimitedPrivate("HDFS")
42 public class SocketInputStream extends InputStream
43                                implements ReadableByteChannel {
44 
45   private Reader reader;
46 
47   private static class Reader extends SocketIOWithTimeout {
48     ReadableByteChannel channel;
49 
Reader(ReadableByteChannel channel, long timeout)50     Reader(ReadableByteChannel channel, long timeout) throws IOException {
51       super((SelectableChannel)channel, timeout);
52       this.channel = channel;
53     }
54 
55     @Override
performIO(ByteBuffer buf)56     int performIO(ByteBuffer buf) throws IOException {
57       return channel.read(buf);
58     }
59   }
60 
61   /**
62    * Create a new input stream with the given timeout. If the timeout
63    * is zero, it will be treated as infinite timeout. The socket's
64    * channel will be configured to be non-blocking.
65    *
66    * @param channel
67    *        Channel for reading, should also be a {@link SelectableChannel}.
68    *        The channel will be configured to be non-blocking.
69    * @param timeout timeout in milliseconds. must not be negative.
70    * @throws IOException
71    */
SocketInputStream(ReadableByteChannel channel, long timeout)72   public SocketInputStream(ReadableByteChannel channel, long timeout)
73                                                         throws IOException {
74     SocketIOWithTimeout.checkChannelValidity(channel);
75     reader = new Reader(channel, timeout);
76   }
77 
78   /**
79    * Same as SocketInputStream(socket.getChannel(), timeout): <br><br>
80    *
81    * Create a new input stream with the given timeout. If the timeout
82    * is zero, it will be treated as infinite timeout. The socket's
83    * channel will be configured to be non-blocking.
84    *
85    * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
86    *
87    * @param socket should have a channel associated with it.
88    * @param timeout timeout timeout in milliseconds. must not be negative.
89    * @throws IOException
90    */
SocketInputStream(Socket socket, long timeout)91   public SocketInputStream(Socket socket, long timeout)
92                                          throws IOException {
93     this(socket.getChannel(), timeout);
94   }
95 
96   /**
97    * Same as SocketInputStream(socket.getChannel(), socket.getSoTimeout())
98    * :<br><br>
99    *
100    * Create a new input stream with the given timeout. If the timeout
101    * is zero, it will be treated as infinite timeout. The socket's
102    * channel will be configured to be non-blocking.
103    * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
104    *
105    * @param socket should have a channel associated with it.
106    * @throws IOException
107    */
SocketInputStream(Socket socket)108   public SocketInputStream(Socket socket) throws IOException {
109     this(socket.getChannel(), socket.getSoTimeout());
110   }
111 
112   @Override
read()113   public int read() throws IOException {
114     /* Allocation can be removed if required.
115      * probably no need to optimize or encourage single byte read.
116      */
117     byte[] buf = new byte[1];
118     int ret = read(buf, 0, 1);
119     if (ret > 0) {
120       return (int)(buf[0] & 0xff);
121     }
122     if (ret != -1) {
123       // unexpected
124       throw new IOException("Could not read from stream");
125     }
126     return ret;
127   }
128 
129   @Override
read(byte[] b, int off, int len)130   public int read(byte[] b, int off, int len) throws IOException {
131     return read(ByteBuffer.wrap(b, off, len));
132   }
133 
134   @Override
close()135   public synchronized void close() throws IOException {
136     /* close the channel since Socket.getInputStream().close()
137      * closes the socket.
138      */
139     reader.channel.close();
140     reader.close();
141   }
142 
143   /**
144    * Returns underlying channel used by inputstream.
145    * This is useful in certain cases like channel for
146    * {@link FileChannel#transferFrom(ReadableByteChannel, long, long)}.
147    */
getChannel()148   public ReadableByteChannel getChannel() {
149     return reader.channel;
150   }
151 
152   //ReadableByteChannel interface
153 
154   @Override
isOpen()155   public boolean isOpen() {
156     return reader.isOpen();
157   }
158 
159   @Override
read(ByteBuffer dst)160   public int read(ByteBuffer dst) throws IOException {
161     return reader.doIO(dst, SelectionKey.OP_READ);
162   }
163 
164   /**
165    * waits for the underlying channel to be ready for reading.
166    * The timeout specified for this stream applies to this wait.
167    *
168    * @throws SocketTimeoutException
169    *         if select on the channel times out.
170    * @throws IOException
171    *         if any other I/O error occurs.
172    */
waitForReadable()173   public void waitForReadable() throws IOException {
174     reader.waitForIO(SelectionKey.OP_READ);
175   }
176 
setTimeout(long timeoutMs)177   public void setTimeout(long timeoutMs) {
178     reader.setTimeout(timeoutMs);
179   }
180 }
181