1package notifications 2 3import ( 4 "context" 5 "net/http" 6 7 "github.com/docker/distribution" 8 9 dcontext "github.com/docker/distribution/context" 10 "github.com/docker/distribution/reference" 11 "github.com/opencontainers/go-digest" 12) 13 14// ManifestListener describes a set of methods for listening to events related to manifests. 15type ManifestListener interface { 16 ManifestPushed(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error 17 ManifestPulled(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error 18 ManifestDeleted(repo reference.Named, dgst digest.Digest) error 19} 20 21// BlobListener describes a listener that can respond to layer related events. 22type BlobListener interface { 23 BlobPushed(repo reference.Named, desc distribution.Descriptor) error 24 BlobPulled(repo reference.Named, desc distribution.Descriptor) error 25 BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error 26 BlobDeleted(repo reference.Named, desc digest.Digest) error 27} 28 29// RepoListener provides repository methods that respond to repository lifecycle 30type RepoListener interface { 31 TagDeleted(repo reference.Named, tag string) error 32 RepoDeleted(repo reference.Named) error 33} 34 35// Listener combines all repository events into a single interface. 36type Listener interface { 37 ManifestListener 38 BlobListener 39 RepoListener 40} 41 42type repositoryListener struct { 43 distribution.Repository 44 listener Listener 45} 46 47type removerListener struct { 48 distribution.RepositoryRemover 49 listener Listener 50} 51 52// Listen dispatches events on the repository to the listener. 53func Listen(repo distribution.Repository, remover distribution.RepositoryRemover, listener Listener) (distribution.Repository, distribution.RepositoryRemover) { 54 return &repositoryListener{ 55 Repository: repo, 56 listener: listener, 57 }, &removerListener{ 58 RepositoryRemover: remover, 59 listener: listener, 60 } 61} 62 63func (nl *removerListener) Remove(ctx context.Context, name reference.Named) error { 64 err := nl.RepositoryRemover.Remove(ctx, name) 65 if err != nil { 66 return err 67 } 68 return nl.listener.RepoDeleted(name) 69} 70 71func (rl *repositoryListener) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) { 72 manifests, err := rl.Repository.Manifests(ctx, options...) 73 if err != nil { 74 return nil, err 75 } 76 return &manifestServiceListener{ 77 ManifestService: manifests, 78 parent: rl, 79 }, nil 80} 81 82func (rl *repositoryListener) Blobs(ctx context.Context) distribution.BlobStore { 83 return &blobServiceListener{ 84 BlobStore: rl.Repository.Blobs(ctx), 85 parent: rl, 86 } 87} 88 89type manifestServiceListener struct { 90 distribution.ManifestService 91 parent *repositoryListener 92} 93 94func (msl *manifestServiceListener) Delete(ctx context.Context, dgst digest.Digest) error { 95 err := msl.ManifestService.Delete(ctx, dgst) 96 if err == nil { 97 if err := msl.parent.listener.ManifestDeleted(msl.parent.Repository.Named(), dgst); err != nil { 98 dcontext.GetLogger(ctx).Errorf("error dispatching manifest delete to listener: %v", err) 99 } 100 } 101 102 return err 103} 104 105func (msl *manifestServiceListener) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) { 106 sm, err := msl.ManifestService.Get(ctx, dgst, options...) 107 if err == nil { 108 if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Named(), sm, options...); err != nil { 109 dcontext.GetLogger(ctx).Errorf("error dispatching manifest pull to listener: %v", err) 110 } 111 } 112 113 return sm, err 114} 115 116func (msl *manifestServiceListener) Put(ctx context.Context, sm distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) { 117 dgst, err := msl.ManifestService.Put(ctx, sm, options...) 118 119 if err == nil { 120 if err := msl.parent.listener.ManifestPushed(msl.parent.Repository.Named(), sm, options...); err != nil { 121 dcontext.GetLogger(ctx).Errorf("error dispatching manifest push to listener: %v", err) 122 } 123 } 124 125 return dgst, err 126} 127 128type blobServiceListener struct { 129 distribution.BlobStore 130 parent *repositoryListener 131} 132 133var _ distribution.BlobStore = &blobServiceListener{} 134 135func (bsl *blobServiceListener) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { 136 p, err := bsl.BlobStore.Get(ctx, dgst) 137 if err == nil { 138 if desc, err := bsl.Stat(ctx, dgst); err != nil { 139 dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) 140 } else { 141 if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil { 142 dcontext.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) 143 } 144 } 145 } 146 147 return p, err 148} 149 150func (bsl *blobServiceListener) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { 151 rc, err := bsl.BlobStore.Open(ctx, dgst) 152 if err == nil { 153 if desc, err := bsl.Stat(ctx, dgst); err != nil { 154 dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) 155 } else { 156 if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil { 157 dcontext.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) 158 } 159 } 160 } 161 162 return rc, err 163} 164 165func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { 166 err := bsl.BlobStore.ServeBlob(ctx, w, r, dgst) 167 if err == nil { 168 if desc, err := bsl.Stat(ctx, dgst); err != nil { 169 dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) 170 } else { 171 if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil { 172 dcontext.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) 173 } 174 } 175 } 176 177 return err 178} 179 180func (bsl *blobServiceListener) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { 181 desc, err := bsl.BlobStore.Put(ctx, mediaType, p) 182 if err == nil { 183 if err := bsl.parent.listener.BlobPushed(bsl.parent.Repository.Named(), desc); err != nil { 184 dcontext.GetLogger(ctx).Errorf("error dispatching layer push to listener: %v", err) 185 } 186 } 187 188 return desc, err 189} 190 191func (bsl *blobServiceListener) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { 192 wr, err := bsl.BlobStore.Create(ctx, options...) 193 switch err := err.(type) { 194 case distribution.ErrBlobMounted: 195 if err := bsl.parent.listener.BlobMounted(bsl.parent.Repository.Named(), err.Descriptor, err.From); err != nil { 196 dcontext.GetLogger(ctx).Errorf("error dispatching blob mount to listener: %v", err) 197 } 198 return nil, err 199 } 200 return bsl.decorateWriter(wr), err 201} 202 203func (bsl *blobServiceListener) Delete(ctx context.Context, dgst digest.Digest) error { 204 err := bsl.BlobStore.Delete(ctx, dgst) 205 if err == nil { 206 if err := bsl.parent.listener.BlobDeleted(bsl.parent.Repository.Named(), dgst); err != nil { 207 dcontext.GetLogger(ctx).Errorf("error dispatching layer delete to listener: %v", err) 208 } 209 } 210 211 return err 212} 213 214func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { 215 wr, err := bsl.BlobStore.Resume(ctx, id) 216 return bsl.decorateWriter(wr), err 217} 218 219func (bsl *blobServiceListener) decorateWriter(wr distribution.BlobWriter) distribution.BlobWriter { 220 return &blobWriterListener{ 221 BlobWriter: wr, 222 parent: bsl, 223 } 224} 225 226type blobWriterListener struct { 227 distribution.BlobWriter 228 parent *blobServiceListener 229} 230 231func (bwl *blobWriterListener) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) { 232 committed, err := bwl.BlobWriter.Commit(ctx, desc) 233 if err == nil { 234 if err := bwl.parent.parent.listener.BlobPushed(bwl.parent.parent.Repository.Named(), committed); err != nil { 235 dcontext.GetLogger(ctx).Errorf("error dispatching blob push to listener: %v", err) 236 } 237 } 238 239 return committed, err 240} 241 242type tagServiceListener struct { 243 distribution.TagService 244 parent *repositoryListener 245} 246 247func (rl *repositoryListener) Tags(ctx context.Context) distribution.TagService { 248 return &tagServiceListener{ 249 TagService: rl.Repository.Tags(ctx), 250 parent: rl, 251 } 252} 253 254func (tagSL *tagServiceListener) Untag(ctx context.Context, tag string) error { 255 if err := tagSL.TagService.Untag(ctx, tag); err != nil { 256 return err 257 } 258 if err := tagSL.parent.listener.TagDeleted(tagSL.parent.Repository.Named(), tag); err != nil { 259 dcontext.GetLogger(ctx).Errorf("error dispatching tag deleted to listener: %v", err) 260 return err 261 } 262 return nil 263} 264