1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.hdfs.web;
20 
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.http.HttpStatus;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.HttpHeaders;
35 
36 /**
37  * To support HTTP byte streams, a new connection to an HTTP server needs to be
38  * created each time. This class hides the complexity of those multiple
39  * connections from the client. Whenever seek() is called, a new connection
40  * is made on the successive read(). The normal input stream functions are
41  * connected to the currently active input stream.
42  */
43 public abstract class ByteRangeInputStream extends FSInputStream {
44 
45   /**
46    * This class wraps a URL and provides method to open connection.
47    * It can be overridden to change how a connection is opened.
48    */
49   public static abstract class URLOpener {
50     protected URL url;
51 
URLOpener(URL u)52     public URLOpener(URL u) {
53       url = u;
54     }
55 
setURL(URL u)56     public void setURL(URL u) {
57       url = u;
58     }
59 
getURL()60     public URL getURL() {
61       return url;
62     }
63 
64     /** Connect to server with a data offset. */
connect(final long offset, final boolean resolved)65     protected abstract HttpURLConnection connect(final long offset,
66         final boolean resolved) throws IOException;
67   }
68 
69   enum StreamStatus {
70     NORMAL, SEEK, CLOSED
71   }
72   protected InputStream in;
73   protected final URLOpener originalURL;
74   protected final URLOpener resolvedURL;
75   protected long startPos = 0;
76   protected long currentPos = 0;
77   protected Long fileLength = null;
78 
79   StreamStatus status = StreamStatus.SEEK;
80 
81   /**
82    * Create with the specified URLOpeners. Original url is used to open the
83    * stream for the first time. Resolved url is used in subsequent requests.
84    * @param o Original url
85    * @param r Resolved url
86    */
ByteRangeInputStream(URLOpener o, URLOpener r)87   public ByteRangeInputStream(URLOpener o, URLOpener r) throws IOException {
88     this.originalURL = o;
89     this.resolvedURL = r;
90     getInputStream();
91   }
92 
getResolvedUrl(final HttpURLConnection connection )93   protected abstract URL getResolvedUrl(final HttpURLConnection connection
94       ) throws IOException;
95 
96   @VisibleForTesting
getInputStream()97   protected InputStream getInputStream() throws IOException {
98     switch (status) {
99       case NORMAL:
100         break;
101       case SEEK:
102         if (in != null) {
103           in.close();
104         }
105         in = openInputStream();
106         status = StreamStatus.NORMAL;
107         break;
108       case CLOSED:
109         throw new IOException("Stream closed");
110     }
111     return in;
112   }
113 
114   @VisibleForTesting
openInputStream()115   protected InputStream openInputStream() throws IOException {
116     // Use the original url if no resolved url exists, eg. if
117     // it's the first time a request is made.
118     final boolean resolved = resolvedURL.getURL() != null;
119     final URLOpener opener = resolved? resolvedURL: originalURL;
120 
121     final HttpURLConnection connection = opener.connect(startPos, resolved);
122     resolvedURL.setURL(getResolvedUrl(connection));
123 
124     InputStream in = connection.getInputStream();
125     final Map<String, List<String>> headers = connection.getHeaderFields();
126     if (isChunkedTransferEncoding(headers)) {
127       // file length is not known
128       fileLength = null;
129     } else {
130       // for non-chunked transfer-encoding, get content-length
131       long streamlength = getStreamLength(connection, headers);
132       fileLength = startPos + streamlength;
133 
134       // Java has a bug with >2GB request streams.  It won't bounds check
135       // the reads so the transfer blocks until the server times out
136       in = new BoundedInputStream(in, streamlength);
137     }
138 
139     return in;
140   }
141 
getStreamLength(HttpURLConnection connection, Map<String, List<String>> headers)142   private static long getStreamLength(HttpURLConnection connection,
143       Map<String, List<String>> headers) throws IOException {
144     String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
145     if (cl == null) {
146       // Try to get the content length by parsing the content range
147       // because HftpFileSystem does not return the content length
148       // if the content is partial.
149       if (connection.getResponseCode() == HttpStatus.SC_PARTIAL_CONTENT) {
150         cl = connection.getHeaderField(HttpHeaders.CONTENT_RANGE);
151         return getLengthFromRange(cl);
152       } else {
153         throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: "
154             + headers);
155       }
156     }
157     return Long.parseLong(cl);
158   }
159 
getLengthFromRange(String cl)160   private static long getLengthFromRange(String cl) throws IOException {
161     try {
162 
163       String[] str = cl.substring(6).split("[-/]");
164       return Long.parseLong(str[1]) - Long.parseLong(str[0]) + 1;
165     } catch (Exception e) {
166       throw new IOException(
167           "failed to get content length by parsing the content range: " + cl
168               + " " + e.getMessage());
169     }
170   }
171 
isChunkedTransferEncoding( final Map<String, List<String>> headers)172   private static boolean isChunkedTransferEncoding(
173       final Map<String, List<String>> headers) {
174     return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
175         || contains(headers, HttpHeaders.TE, "chunked");
176   }
177 
178   /** Does the HTTP header map contain the given key, value pair? */
contains(final Map<String, List<String>> headers, final String key, final String value)179   private static boolean contains(final Map<String, List<String>> headers,
180       final String key, final String value) {
181     final List<String> values = headers.get(key);
182     if (values != null) {
183       for(String v : values) {
184         for(final StringTokenizer t = new StringTokenizer(v, ",");
185             t.hasMoreTokens(); ) {
186           if (value.equalsIgnoreCase(t.nextToken())) {
187             return true;
188           }
189         }
190       }
191     }
192     return false;
193   }
194 
update(final int n)195   private int update(final int n) throws IOException {
196     if (n != -1) {
197       currentPos += n;
198     } else if (fileLength != null && currentPos < fileLength) {
199       throw new IOException("Got EOF but currentPos = " + currentPos
200           + " < filelength = " + fileLength);
201     }
202     return n;
203   }
204 
205   @Override
read()206   public int read() throws IOException {
207     final int b = getInputStream().read();
208     update((b == -1) ? -1 : 1);
209     return b;
210   }
211 
212   @Override
read(byte b[], int off, int len)213   public int read(byte b[], int off, int len) throws IOException {
214     return update(getInputStream().read(b, off, len));
215   }
216 
217   /**
218    * Seek to the given offset from the start of the file.
219    * The next read() will be from that location.  Can't
220    * seek past the end of the file.
221    */
222   @Override
seek(long pos)223   public void seek(long pos) throws IOException {
224     if (pos != currentPos) {
225       startPos = pos;
226       currentPos = pos;
227       if (status != StreamStatus.CLOSED) {
228         status = StreamStatus.SEEK;
229       }
230     }
231   }
232 
233   /**
234    * Return the current offset from the start of the file
235    */
236   @Override
getPos()237   public long getPos() throws IOException {
238     return currentPos;
239   }
240 
241   /**
242    * Seeks a different copy of the data.  Returns true if
243    * found a new source, false otherwise.
244    */
245   @Override
seekToNewSource(long targetPos)246   public boolean seekToNewSource(long targetPos) throws IOException {
247     return false;
248   }
249 
250   @Override
close()251   public void close() throws IOException {
252     if (in != null) {
253       in.close();
254       in = null;
255     }
256     status = StreamStatus.CLOSED;
257   }
258 }
259