1package transport 2 3import ( 4 "errors" 5 "fmt" 6 "io" 7 "net/http" 8 "regexp" 9 "strconv" 10) 11 12var ( 13 contentRangeRegexp = regexp.MustCompile(`bytes ([0-9]+)-([0-9]+)/([0-9]+|\\*)`) 14 15 // ErrWrongCodeForByteRange is returned if the client sends a request 16 // with a Range header but the server returns a 2xx or 3xx code other 17 // than 206 Partial Content. 18 ErrWrongCodeForByteRange = errors.New("expected HTTP 206 from byte range request") 19) 20 21// ReadSeekCloser combines io.ReadSeeker with io.Closer. 22type ReadSeekCloser interface { 23 io.ReadSeeker 24 io.Closer 25} 26 27// NewHTTPReadSeeker handles reading from an HTTP endpoint using a GET 28// request. When seeking and starting a read from a non-zero offset 29// the a "Range" header will be added which sets the offset. 30// TODO(dmcgowan): Move this into a separate utility package 31func NewHTTPReadSeeker(client *http.Client, url string, errorHandler func(*http.Response) error) ReadSeekCloser { 32 return &httpReadSeeker{ 33 client: client, 34 url: url, 35 errorHandler: errorHandler, 36 } 37} 38 39type httpReadSeeker struct { 40 client *http.Client 41 url string 42 43 // errorHandler creates an error from an unsuccessful HTTP response. 44 // This allows the error to be created with the HTTP response body 45 // without leaking the body through a returned error. 46 errorHandler func(*http.Response) error 47 48 size int64 49 50 // rc is the remote read closer. 51 rc io.ReadCloser 52 // readerOffset tracks the offset as of the last read. 53 readerOffset int64 54 // seekOffset allows Seek to override the offset. Seek changes 55 // seekOffset instead of changing readOffset directly so that 56 // connection resets can be delayed and possibly avoided if the 57 // seek is undone (i.e. seeking to the end and then back to the 58 // beginning). 59 seekOffset int64 60 err error 61} 62 63func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) { 64 if hrs.err != nil { 65 return 0, hrs.err 66 } 67 68 // If we sought to a different position, we need to reset the 69 // connection. This logic is here instead of Seek so that if 70 // a seek is undone before the next read, the connection doesn't 71 // need to be closed and reopened. A common example of this is 72 // seeking to the end to determine the length, and then seeking 73 // back to the original position. 74 if hrs.readerOffset != hrs.seekOffset { 75 hrs.reset() 76 } 77 78 hrs.readerOffset = hrs.seekOffset 79 80 rd, err := hrs.reader() 81 if err != nil { 82 return 0, err 83 } 84 85 n, err = rd.Read(p) 86 hrs.seekOffset += int64(n) 87 hrs.readerOffset += int64(n) 88 89 return n, err 90} 91 92func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) { 93 if hrs.err != nil { 94 return 0, hrs.err 95 } 96 97 lastReaderOffset := hrs.readerOffset 98 99 if whence == io.SeekStart && hrs.rc == nil { 100 // If no request has been made yet, and we are seeking to an 101 // absolute position, set the read offset as well to avoid an 102 // unnecessary request. 103 hrs.readerOffset = offset 104 } 105 106 _, err := hrs.reader() 107 if err != nil { 108 hrs.readerOffset = lastReaderOffset 109 return 0, err 110 } 111 112 newOffset := hrs.seekOffset 113 114 switch whence { 115 case io.SeekCurrent: 116 newOffset += offset 117 case io.SeekEnd: 118 if hrs.size < 0 { 119 return 0, errors.New("content length not known") 120 } 121 newOffset = hrs.size + offset 122 case io.SeekStart: 123 newOffset = offset 124 } 125 126 if newOffset < 0 { 127 err = errors.New("cannot seek to negative position") 128 } else { 129 hrs.seekOffset = newOffset 130 } 131 132 return hrs.seekOffset, err 133} 134 135func (hrs *httpReadSeeker) Close() error { 136 if hrs.err != nil { 137 return hrs.err 138 } 139 140 // close and release reader chain 141 if hrs.rc != nil { 142 hrs.rc.Close() 143 } 144 145 hrs.rc = nil 146 147 hrs.err = errors.New("httpLayer: closed") 148 149 return nil 150} 151 152func (hrs *httpReadSeeker) reset() { 153 if hrs.err != nil { 154 return 155 } 156 if hrs.rc != nil { 157 hrs.rc.Close() 158 hrs.rc = nil 159 } 160} 161 162func (hrs *httpReadSeeker) reader() (io.Reader, error) { 163 if hrs.err != nil { 164 return nil, hrs.err 165 } 166 167 if hrs.rc != nil { 168 return hrs.rc, nil 169 } 170 171 req, err := http.NewRequest("GET", hrs.url, nil) 172 if err != nil { 173 return nil, err 174 } 175 176 if hrs.readerOffset > 0 { 177 // If we are at different offset, issue a range request from there. 178 req.Header.Add("Range", fmt.Sprintf("bytes=%d-", hrs.readerOffset)) 179 // TODO: get context in here 180 // context.GetLogger(hrs.context).Infof("Range: %s", req.Header.Get("Range")) 181 } 182 183 req.Header.Add("Accept-Encoding", "identity") 184 resp, err := hrs.client.Do(req) 185 if err != nil { 186 return nil, err 187 } 188 189 // Normally would use client.SuccessStatus, but that would be a cyclic 190 // import 191 if resp.StatusCode >= 200 && resp.StatusCode <= 399 { 192 if hrs.readerOffset > 0 { 193 if resp.StatusCode != http.StatusPartialContent { 194 return nil, ErrWrongCodeForByteRange 195 } 196 197 contentRange := resp.Header.Get("Content-Range") 198 if contentRange == "" { 199 return nil, errors.New("no Content-Range header found in HTTP 206 response") 200 } 201 202 submatches := contentRangeRegexp.FindStringSubmatch(contentRange) 203 if len(submatches) < 4 { 204 return nil, fmt.Errorf("could not parse Content-Range header: %s", contentRange) 205 } 206 207 startByte, err := strconv.ParseUint(submatches[1], 10, 64) 208 if err != nil { 209 return nil, fmt.Errorf("could not parse start of range in Content-Range header: %s", contentRange) 210 } 211 212 if startByte != uint64(hrs.readerOffset) { 213 return nil, fmt.Errorf("received Content-Range starting at offset %d instead of requested %d", startByte, hrs.readerOffset) 214 } 215 216 endByte, err := strconv.ParseUint(submatches[2], 10, 64) 217 if err != nil { 218 return nil, fmt.Errorf("could not parse end of range in Content-Range header: %s", contentRange) 219 } 220 221 if submatches[3] == "*" { 222 hrs.size = -1 223 } else { 224 size, err := strconv.ParseUint(submatches[3], 10, 64) 225 if err != nil { 226 return nil, fmt.Errorf("could not parse total size in Content-Range header: %s", contentRange) 227 } 228 229 if endByte+1 != size { 230 return nil, fmt.Errorf("range in Content-Range stops before the end of the content: %s", contentRange) 231 } 232 233 hrs.size = int64(size) 234 } 235 } else if resp.StatusCode == http.StatusOK { 236 hrs.size = resp.ContentLength 237 } else { 238 hrs.size = -1 239 } 240 hrs.rc = resp.Body 241 } else { 242 defer resp.Body.Close() 243 if hrs.errorHandler != nil { 244 return nil, hrs.errorHandler(resp) 245 } 246 return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status) 247 } 248 249 return hrs.rc, nil 250} 251