1package resumable // import "github.com/docker/docker/registry/resumable"
2
3import (
4	"fmt"
5	"io"
6	"net/http"
7	"time"
8
9	"github.com/sirupsen/logrus"
10)
11
12type requestReader struct {
13	client          *http.Client
14	request         *http.Request
15	lastRange       int64
16	totalSize       int64
17	currentResponse *http.Response
18	failures        uint32
19	maxFailures     uint32
20	waitDuration    time.Duration
21}
22
23// NewRequestReader makes it possible to resume reading a request's body transparently
24// maxfail is the number of times we retry to make requests again (not resumes)
25// totalsize is the total length of the body; auto detect if not provided
26func NewRequestReader(c *http.Client, r *http.Request, maxfail uint32, totalsize int64) io.ReadCloser {
27	return &requestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, waitDuration: 5 * time.Second}
28}
29
30// NewRequestReaderWithInitialResponse makes it possible to resume
31// reading the body of an already initiated request.
32func NewRequestReaderWithInitialResponse(c *http.Client, r *http.Request, maxfail uint32, totalsize int64, initialResponse *http.Response) io.ReadCloser {
33	return &requestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, currentResponse: initialResponse, waitDuration: 5 * time.Second}
34}
35
36func (r *requestReader) Read(p []byte) (n int, err error) {
37	if r.client == nil || r.request == nil {
38		return 0, fmt.Errorf("client and request can't be nil")
39	}
40	isFreshRequest := false
41	if r.lastRange != 0 && r.currentResponse == nil {
42		readRange := fmt.Sprintf("bytes=%d-%d", r.lastRange, r.totalSize)
43		r.request.Header.Set("Range", readRange)
44		time.Sleep(r.waitDuration)
45	}
46	if r.currentResponse == nil {
47		r.currentResponse, err = r.client.Do(r.request)
48		isFreshRequest = true
49	}
50	if err != nil && r.failures+1 != r.maxFailures {
51		r.cleanUpResponse()
52		r.failures++
53		time.Sleep(time.Duration(r.failures) * r.waitDuration)
54		return 0, nil
55	} else if err != nil {
56		r.cleanUpResponse()
57		return 0, err
58	}
59	if r.currentResponse.StatusCode == http.StatusRequestedRangeNotSatisfiable && r.lastRange == r.totalSize && r.currentResponse.ContentLength == 0 {
60		r.cleanUpResponse()
61		return 0, io.EOF
62	} else if r.currentResponse.StatusCode != http.StatusPartialContent && r.lastRange != 0 && isFreshRequest {
63		r.cleanUpResponse()
64		return 0, fmt.Errorf("the server doesn't support byte ranges")
65	}
66	if r.totalSize == 0 {
67		r.totalSize = r.currentResponse.ContentLength
68	} else if r.totalSize <= 0 {
69		r.cleanUpResponse()
70		return 0, fmt.Errorf("failed to auto detect content length")
71	}
72	n, err = r.currentResponse.Body.Read(p)
73	r.lastRange += int64(n)
74	if err != nil {
75		r.cleanUpResponse()
76	}
77	if err != nil && err != io.EOF {
78		logrus.Infof("encountered error during pull and clearing it before resume: %s", err)
79		err = nil
80	}
81	return n, err
82}
83
84func (r *requestReader) Close() error {
85	r.cleanUpResponse()
86	r.client = nil
87	r.request = nil
88	return nil
89}
90
91func (r *requestReader) cleanUpResponse() {
92	if r.currentResponse != nil {
93		r.currentResponse.Body.Close()
94		r.currentResponse = nil
95	}
96}
97