1package proxy 2 3import ( 4 "context" 5 "io" 6 "net/http" 7 "strconv" 8 "sync" 9 "time" 10 11 "github.com/docker/distribution" 12 dcontext "github.com/docker/distribution/context" 13 "github.com/docker/distribution/reference" 14 "github.com/docker/distribution/registry/proxy/scheduler" 15 "github.com/opencontainers/go-digest" 16) 17 18// todo(richardscothern): from cache control header or config file 19const blobTTL = 24 * 7 * time.Hour 20 21type proxyBlobStore struct { 22 localStore distribution.BlobStore 23 remoteStore distribution.BlobService 24 scheduler *scheduler.TTLExpirationScheduler 25 repositoryName reference.Named 26 authChallenger authChallenger 27} 28 29var _ distribution.BlobStore = &proxyBlobStore{} 30 31// inflight tracks currently downloading blobs 32var inflight = make(map[digest.Digest]struct{}) 33 34// mu protects inflight 35var mu sync.Mutex 36 37func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) { 38 w.Header().Set("Content-Length", strconv.FormatInt(length, 10)) 39 w.Header().Set("Content-Type", mediaType) 40 w.Header().Set("Docker-Content-Digest", digest.String()) 41 w.Header().Set("Etag", digest.String()) 42} 43 44func (pbs *proxyBlobStore) copyContent(ctx context.Context, dgst digest.Digest, writer io.Writer) (distribution.Descriptor, error) { 45 desc, err := pbs.remoteStore.Stat(ctx, dgst) 46 if err != nil { 47 return distribution.Descriptor{}, err 48 } 49 50 if w, ok := writer.(http.ResponseWriter); ok { 51 setResponseHeaders(w, desc.Size, desc.MediaType, dgst) 52 } 53 54 remoteReader, err := pbs.remoteStore.Open(ctx, dgst) 55 if err != nil { 56 return distribution.Descriptor{}, err 57 } 58 59 defer remoteReader.Close() 60 61 _, err = io.CopyN(writer, remoteReader, desc.Size) 62 if err != nil { 63 return distribution.Descriptor{}, err 64 } 65 66 proxyMetrics.BlobPush(uint64(desc.Size)) 67 68 return desc, nil 69} 70 71func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) (bool, error) { 72 localDesc, err := pbs.localStore.Stat(ctx, dgst) 73 if err != nil { 74 // Stat can report a zero sized file here if it's checked between creation 75 // and population. Return nil error, and continue 76 return false, nil 77 } 78 79 if err == nil { 80 proxyMetrics.BlobPush(uint64(localDesc.Size)) 81 return true, pbs.localStore.ServeBlob(ctx, w, r, dgst) 82 } 83 84 return false, nil 85 86} 87 88func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error { 89 defer func() { 90 mu.Lock() 91 delete(inflight, dgst) 92 mu.Unlock() 93 }() 94 95 var desc distribution.Descriptor 96 var err error 97 var bw distribution.BlobWriter 98 99 bw, err = pbs.localStore.Create(ctx) 100 if err != nil { 101 return err 102 } 103 104 desc, err = pbs.copyContent(ctx, dgst, bw) 105 if err != nil { 106 return err 107 } 108 109 _, err = bw.Commit(ctx, desc) 110 if err != nil { 111 return err 112 } 113 114 return nil 115} 116 117func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { 118 served, err := pbs.serveLocal(ctx, w, r, dgst) 119 if err != nil { 120 dcontext.GetLogger(ctx).Errorf("Error serving blob from local storage: %s", err.Error()) 121 return err 122 } 123 124 if served { 125 return nil 126 } 127 128 if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil { 129 return err 130 } 131 132 mu.Lock() 133 _, ok := inflight[dgst] 134 if ok { 135 mu.Unlock() 136 _, err := pbs.copyContent(ctx, dgst, w) 137 return err 138 } 139 inflight[dgst] = struct{}{} 140 mu.Unlock() 141 142 go func(dgst digest.Digest) { 143 if err := pbs.storeLocal(ctx, dgst); err != nil { 144 dcontext.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error()) 145 } 146 147 blobRef, err := reference.WithDigest(pbs.repositoryName, dgst) 148 if err != nil { 149 dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err) 150 return 151 } 152 153 pbs.scheduler.AddBlob(blobRef, repositoryTTL) 154 }(dgst) 155 156 _, err = pbs.copyContent(ctx, dgst, w) 157 if err != nil { 158 return err 159 } 160 return nil 161} 162 163func (pbs *proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { 164 desc, err := pbs.localStore.Stat(ctx, dgst) 165 if err == nil { 166 return desc, err 167 } 168 169 if err != distribution.ErrBlobUnknown { 170 return distribution.Descriptor{}, err 171 } 172 173 if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil { 174 return distribution.Descriptor{}, err 175 } 176 177 return pbs.remoteStore.Stat(ctx, dgst) 178} 179 180func (pbs *proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { 181 blob, err := pbs.localStore.Get(ctx, dgst) 182 if err == nil { 183 return blob, nil 184 } 185 186 if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil { 187 return []byte{}, err 188 } 189 190 blob, err = pbs.remoteStore.Get(ctx, dgst) 191 if err != nil { 192 return []byte{}, err 193 } 194 195 _, err = pbs.localStore.Put(ctx, "", blob) 196 if err != nil { 197 return []byte{}, err 198 } 199 return blob, nil 200} 201 202// Unsupported functions 203func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { 204 return distribution.Descriptor{}, distribution.ErrUnsupported 205} 206 207func (pbs *proxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { 208 return nil, distribution.ErrUnsupported 209} 210 211func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { 212 return nil, distribution.ErrUnsupported 213} 214 215func (pbs *proxyBlobStore) Mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest) (distribution.Descriptor, error) { 216 return distribution.Descriptor{}, distribution.ErrUnsupported 217} 218 219func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { 220 return nil, distribution.ErrUnsupported 221} 222 223func (pbs *proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { 224 return distribution.ErrUnsupported 225} 226