1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.hdfs.web; 20 21 import java.io.IOException; 22 import java.io.InputStream; 23 import java.net.HttpURLConnection; 24 import java.net.URL; 25 import java.util.List; 26 import java.util.Map; 27 import java.util.StringTokenizer; 28 29 import org.apache.commons.io.input.BoundedInputStream; 30 import org.apache.hadoop.fs.FSInputStream; 31 import org.apache.http.HttpStatus; 32 33 import com.google.common.annotations.VisibleForTesting; 34 import com.google.common.net.HttpHeaders; 35 36 /** 37 * To support HTTP byte streams, a new connection to an HTTP server needs to be 38 * created each time. This class hides the complexity of those multiple 39 * connections from the client. Whenever seek() is called, a new connection 40 * is made on the successive read(). The normal input stream functions are 41 * connected to the currently active input stream. 42 */ 43 public abstract class ByteRangeInputStream extends FSInputStream { 44 45 /** 46 * This class wraps a URL and provides method to open connection. 47 * It can be overridden to change how a connection is opened. 48 */ 49 public static abstract class URLOpener { 50 protected URL url; 51 URLOpener(URL u)52 public URLOpener(URL u) { 53 url = u; 54 } 55 setURL(URL u)56 public void setURL(URL u) { 57 url = u; 58 } 59 getURL()60 public URL getURL() { 61 return url; 62 } 63 64 /** Connect to server with a data offset. */ connect(final long offset, final boolean resolved)65 protected abstract HttpURLConnection connect(final long offset, 66 final boolean resolved) throws IOException; 67 } 68 69 enum StreamStatus { 70 NORMAL, SEEK, CLOSED 71 } 72 protected InputStream in; 73 protected final URLOpener originalURL; 74 protected final URLOpener resolvedURL; 75 protected long startPos = 0; 76 protected long currentPos = 0; 77 protected Long fileLength = null; 78 79 StreamStatus status = StreamStatus.SEEK; 80 81 /** 82 * Create with the specified URLOpeners. Original url is used to open the 83 * stream for the first time. Resolved url is used in subsequent requests. 84 * @param o Original url 85 * @param r Resolved url 86 */ ByteRangeInputStream(URLOpener o, URLOpener r)87 public ByteRangeInputStream(URLOpener o, URLOpener r) throws IOException { 88 this.originalURL = o; 89 this.resolvedURL = r; 90 getInputStream(); 91 } 92 getResolvedUrl(final HttpURLConnection connection )93 protected abstract URL getResolvedUrl(final HttpURLConnection connection 94 ) throws IOException; 95 96 @VisibleForTesting getInputStream()97 protected InputStream getInputStream() throws IOException { 98 switch (status) { 99 case NORMAL: 100 break; 101 case SEEK: 102 if (in != null) { 103 in.close(); 104 } 105 in = openInputStream(); 106 status = StreamStatus.NORMAL; 107 break; 108 case CLOSED: 109 throw new IOException("Stream closed"); 110 } 111 return in; 112 } 113 114 @VisibleForTesting openInputStream()115 protected InputStream openInputStream() throws IOException { 116 // Use the original url if no resolved url exists, eg. if 117 // it's the first time a request is made. 118 final boolean resolved = resolvedURL.getURL() != null; 119 final URLOpener opener = resolved? resolvedURL: originalURL; 120 121 final HttpURLConnection connection = opener.connect(startPos, resolved); 122 resolvedURL.setURL(getResolvedUrl(connection)); 123 124 InputStream in = connection.getInputStream(); 125 final Map<String, List<String>> headers = connection.getHeaderFields(); 126 if (isChunkedTransferEncoding(headers)) { 127 // file length is not known 128 fileLength = null; 129 } else { 130 // for non-chunked transfer-encoding, get content-length 131 long streamlength = getStreamLength(connection, headers); 132 fileLength = startPos + streamlength; 133 134 // Java has a bug with >2GB request streams. It won't bounds check 135 // the reads so the transfer blocks until the server times out 136 in = new BoundedInputStream(in, streamlength); 137 } 138 139 return in; 140 } 141 getStreamLength(HttpURLConnection connection, Map<String, List<String>> headers)142 private static long getStreamLength(HttpURLConnection connection, 143 Map<String, List<String>> headers) throws IOException { 144 String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH); 145 if (cl == null) { 146 // Try to get the content length by parsing the content range 147 // because HftpFileSystem does not return the content length 148 // if the content is partial. 149 if (connection.getResponseCode() == HttpStatus.SC_PARTIAL_CONTENT) { 150 cl = connection.getHeaderField(HttpHeaders.CONTENT_RANGE); 151 return getLengthFromRange(cl); 152 } else { 153 throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: " 154 + headers); 155 } 156 } 157 return Long.parseLong(cl); 158 } 159 getLengthFromRange(String cl)160 private static long getLengthFromRange(String cl) throws IOException { 161 try { 162 163 String[] str = cl.substring(6).split("[-/]"); 164 return Long.parseLong(str[1]) - Long.parseLong(str[0]) + 1; 165 } catch (Exception e) { 166 throw new IOException( 167 "failed to get content length by parsing the content range: " + cl 168 + " " + e.getMessage()); 169 } 170 } 171 isChunkedTransferEncoding( final Map<String, List<String>> headers)172 private static boolean isChunkedTransferEncoding( 173 final Map<String, List<String>> headers) { 174 return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked") 175 || contains(headers, HttpHeaders.TE, "chunked"); 176 } 177 178 /** Does the HTTP header map contain the given key, value pair? */ contains(final Map<String, List<String>> headers, final String key, final String value)179 private static boolean contains(final Map<String, List<String>> headers, 180 final String key, final String value) { 181 final List<String> values = headers.get(key); 182 if (values != null) { 183 for(String v : values) { 184 for(final StringTokenizer t = new StringTokenizer(v, ","); 185 t.hasMoreTokens(); ) { 186 if (value.equalsIgnoreCase(t.nextToken())) { 187 return true; 188 } 189 } 190 } 191 } 192 return false; 193 } 194 update(final int n)195 private int update(final int n) throws IOException { 196 if (n != -1) { 197 currentPos += n; 198 } else if (fileLength != null && currentPos < fileLength) { 199 throw new IOException("Got EOF but currentPos = " + currentPos 200 + " < filelength = " + fileLength); 201 } 202 return n; 203 } 204 205 @Override read()206 public int read() throws IOException { 207 final int b = getInputStream().read(); 208 update((b == -1) ? -1 : 1); 209 return b; 210 } 211 212 @Override read(byte b[], int off, int len)213 public int read(byte b[], int off, int len) throws IOException { 214 return update(getInputStream().read(b, off, len)); 215 } 216 217 /** 218 * Seek to the given offset from the start of the file. 219 * The next read() will be from that location. Can't 220 * seek past the end of the file. 221 */ 222 @Override seek(long pos)223 public void seek(long pos) throws IOException { 224 if (pos != currentPos) { 225 startPos = pos; 226 currentPos = pos; 227 if (status != StreamStatus.CLOSED) { 228 status = StreamStatus.SEEK; 229 } 230 } 231 } 232 233 /** 234 * Return the current offset from the start of the file 235 */ 236 @Override getPos()237 public long getPos() throws IOException { 238 return currentPos; 239 } 240 241 /** 242 * Seeks a different copy of the data. Returns true if 243 * found a new source, false otherwise. 244 */ 245 @Override seekToNewSource(long targetPos)246 public boolean seekToNewSource(long targetPos) throws IOException { 247 return false; 248 } 249 250 @Override close()251 public void close() throws IOException { 252 if (in != null) { 253 in.close(); 254 in = null; 255 } 256 status = StreamStatus.CLOSED; 257 } 258 } 259