1package proxy
2
3import (
4	"context"
5	"io"
6	"net/http"
7	"strconv"
8	"sync"
9	"time"
10
11	"github.com/docker/distribution"
12	dcontext "github.com/docker/distribution/context"
13	"github.com/docker/distribution/reference"
14	"github.com/docker/distribution/registry/proxy/scheduler"
15	"github.com/opencontainers/go-digest"
16)
17
// blobTTL is a fixed duration of one week; presumably the lifetime of a
// proxied blob in the local cache (it is not referenced by the code visible
// in this file).
// todo(richardscothern): from cache control header or config file
const blobTTL = 7 * 24 * time.Hour
20
21type proxyBlobStore struct {
22	localStore     distribution.BlobStore
23	remoteStore    distribution.BlobService
24	scheduler      *scheduler.TTLExpirationScheduler
25	repositoryName reference.Named
26	authChallenger authChallenger
27}
28
29var _ distribution.BlobStore = &proxyBlobStore{}
30
31// inflight tracks currently downloading blobs
32var inflight = make(map[digest.Digest]struct{})
33
34// mu protects inflight
35var mu sync.Mutex
36
37func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) {
38	w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
39	w.Header().Set("Content-Type", mediaType)
40	w.Header().Set("Docker-Content-Digest", digest.String())
41	w.Header().Set("Etag", digest.String())
42}
43
44func (pbs *proxyBlobStore) copyContent(ctx context.Context, dgst digest.Digest, writer io.Writer) (distribution.Descriptor, error) {
45	desc, err := pbs.remoteStore.Stat(ctx, dgst)
46	if err != nil {
47		return distribution.Descriptor{}, err
48	}
49
50	if w, ok := writer.(http.ResponseWriter); ok {
51		setResponseHeaders(w, desc.Size, desc.MediaType, dgst)
52	}
53
54	remoteReader, err := pbs.remoteStore.Open(ctx, dgst)
55	if err != nil {
56		return distribution.Descriptor{}, err
57	}
58
59	defer remoteReader.Close()
60
61	_, err = io.CopyN(writer, remoteReader, desc.Size)
62	if err != nil {
63		return distribution.Descriptor{}, err
64	}
65
66	proxyMetrics.BlobPush(uint64(desc.Size))
67
68	return desc, nil
69}
70
71func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) (bool, error) {
72	localDesc, err := pbs.localStore.Stat(ctx, dgst)
73	if err != nil {
74		// Stat can report a zero sized file here if it's checked between creation
75		// and population.  Return nil error, and continue
76		return false, nil
77	}
78
79	if err == nil {
80		proxyMetrics.BlobPush(uint64(localDesc.Size))
81		return true, pbs.localStore.ServeBlob(ctx, w, r, dgst)
82	}
83
84	return false, nil
85
86}
87
88func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error {
89	defer func() {
90		mu.Lock()
91		delete(inflight, dgst)
92		mu.Unlock()
93	}()
94
95	var desc distribution.Descriptor
96	var err error
97	var bw distribution.BlobWriter
98
99	bw, err = pbs.localStore.Create(ctx)
100	if err != nil {
101		return err
102	}
103
104	desc, err = pbs.copyContent(ctx, dgst, bw)
105	if err != nil {
106		return err
107	}
108
109	_, err = bw.Commit(ctx, desc)
110	if err != nil {
111		return err
112	}
113
114	return nil
115}
116
117func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
118	served, err := pbs.serveLocal(ctx, w, r, dgst)
119	if err != nil {
120		dcontext.GetLogger(ctx).Errorf("Error serving blob from local storage: %s", err.Error())
121		return err
122	}
123
124	if served {
125		return nil
126	}
127
128	if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
129		return err
130	}
131
132	mu.Lock()
133	_, ok := inflight[dgst]
134	if ok {
135		mu.Unlock()
136		_, err := pbs.copyContent(ctx, dgst, w)
137		return err
138	}
139	inflight[dgst] = struct{}{}
140	mu.Unlock()
141
142	go func(dgst digest.Digest) {
143		if err := pbs.storeLocal(ctx, dgst); err != nil {
144			dcontext.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error())
145		}
146
147		blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
148		if err != nil {
149			dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
150			return
151		}
152
153		pbs.scheduler.AddBlob(blobRef, repositoryTTL)
154	}(dgst)
155
156	_, err = pbs.copyContent(ctx, dgst, w)
157	if err != nil {
158		return err
159	}
160	return nil
161}
162
163func (pbs *proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
164	desc, err := pbs.localStore.Stat(ctx, dgst)
165	if err == nil {
166		return desc, err
167	}
168
169	if err != distribution.ErrBlobUnknown {
170		return distribution.Descriptor{}, err
171	}
172
173	if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
174		return distribution.Descriptor{}, err
175	}
176
177	return pbs.remoteStore.Stat(ctx, dgst)
178}
179
180func (pbs *proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
181	blob, err := pbs.localStore.Get(ctx, dgst)
182	if err == nil {
183		return blob, nil
184	}
185
186	if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
187		return []byte{}, err
188	}
189
190	blob, err = pbs.remoteStore.Get(ctx, dgst)
191	if err != nil {
192		return []byte{}, err
193	}
194
195	_, err = pbs.localStore.Put(ctx, "", blob)
196	if err != nil {
197		return []byte{}, err
198	}
199	return blob, nil
200}
201
202// Unsupported functions
203func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
204	return distribution.Descriptor{}, distribution.ErrUnsupported
205}
206
207func (pbs *proxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
208	return nil, distribution.ErrUnsupported
209}
210
211func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
212	return nil, distribution.ErrUnsupported
213}
214
215func (pbs *proxyBlobStore) Mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest) (distribution.Descriptor, error) {
216	return distribution.Descriptor{}, distribution.ErrUnsupported
217}
218
219func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
220	return nil, distribution.ErrUnsupported
221}
222
223func (pbs *proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
224	return distribution.ErrUnsupported
225}
226