1package notifications
2
3import (
4	"context"
5	"net/http"
6
7	"github.com/docker/distribution"
8
9	dcontext "github.com/docker/distribution/context"
10	"github.com/docker/distribution/reference"
11	"github.com/opencontainers/go-digest"
12)
13
14// ManifestListener describes a set of methods for listening to events related to manifests.
15type ManifestListener interface {
16	ManifestPushed(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error
17	ManifestPulled(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error
18	ManifestDeleted(repo reference.Named, dgst digest.Digest) error
19}
20
21// BlobListener describes a listener that can respond to layer related events.
22type BlobListener interface {
23	BlobPushed(repo reference.Named, desc distribution.Descriptor) error
24	BlobPulled(repo reference.Named, desc distribution.Descriptor) error
25	BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error
26	BlobDeleted(repo reference.Named, desc digest.Digest) error
27}
28
29// RepoListener provides repository methods that respond to repository lifecycle
30type RepoListener interface {
31	TagDeleted(repo reference.Named, tag string) error
32	RepoDeleted(repo reference.Named) error
33}
34
35// Listener combines all repository events into a single interface.
36type Listener interface {
37	ManifestListener
38	BlobListener
39	RepoListener
40}
41
42type repositoryListener struct {
43	distribution.Repository
44	listener Listener
45}
46
47type removerListener struct {
48	distribution.RepositoryRemover
49	listener Listener
50}
51
52// Listen dispatches events on the repository to the listener.
53func Listen(repo distribution.Repository, remover distribution.RepositoryRemover, listener Listener) (distribution.Repository, distribution.RepositoryRemover) {
54	return &repositoryListener{
55			Repository: repo,
56			listener:   listener,
57		}, &removerListener{
58			RepositoryRemover: remover,
59			listener:          listener,
60		}
61}
62
63func (nl *removerListener) Remove(ctx context.Context, name reference.Named) error {
64	err := nl.RepositoryRemover.Remove(ctx, name)
65	if err != nil {
66		return err
67	}
68	return nl.listener.RepoDeleted(name)
69}
70
71func (rl *repositoryListener) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
72	manifests, err := rl.Repository.Manifests(ctx, options...)
73	if err != nil {
74		return nil, err
75	}
76	return &manifestServiceListener{
77		ManifestService: manifests,
78		parent:          rl,
79	}, nil
80}
81
82func (rl *repositoryListener) Blobs(ctx context.Context) distribution.BlobStore {
83	return &blobServiceListener{
84		BlobStore: rl.Repository.Blobs(ctx),
85		parent:    rl,
86	}
87}
88
89type manifestServiceListener struct {
90	distribution.ManifestService
91	parent *repositoryListener
92}
93
94func (msl *manifestServiceListener) Delete(ctx context.Context, dgst digest.Digest) error {
95	err := msl.ManifestService.Delete(ctx, dgst)
96	if err == nil {
97		if err := msl.parent.listener.ManifestDeleted(msl.parent.Repository.Named(), dgst); err != nil {
98			dcontext.GetLogger(ctx).Errorf("error dispatching manifest delete to listener: %v", err)
99		}
100	}
101
102	return err
103}
104
105func (msl *manifestServiceListener) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
106	sm, err := msl.ManifestService.Get(ctx, dgst, options...)
107	if err == nil {
108		if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Named(), sm, options...); err != nil {
109			dcontext.GetLogger(ctx).Errorf("error dispatching manifest pull to listener: %v", err)
110		}
111	}
112
113	return sm, err
114}
115
116func (msl *manifestServiceListener) Put(ctx context.Context, sm distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) {
117	dgst, err := msl.ManifestService.Put(ctx, sm, options...)
118
119	if err == nil {
120		if err := msl.parent.listener.ManifestPushed(msl.parent.Repository.Named(), sm, options...); err != nil {
121			dcontext.GetLogger(ctx).Errorf("error dispatching manifest push to listener: %v", err)
122		}
123	}
124
125	return dgst, err
126}
127
128type blobServiceListener struct {
129	distribution.BlobStore
130	parent *repositoryListener
131}
132
133var _ distribution.BlobStore = &blobServiceListener{}
134
135func (bsl *blobServiceListener) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
136	p, err := bsl.BlobStore.Get(ctx, dgst)
137	if err == nil {
138		if desc, err := bsl.Stat(ctx, dgst); err != nil {
139			dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
140		} else {
141			if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
142				dcontext.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
143			}
144		}
145	}
146
147	return p, err
148}
149
150func (bsl *blobServiceListener) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
151	rc, err := bsl.BlobStore.Open(ctx, dgst)
152	if err == nil {
153		if desc, err := bsl.Stat(ctx, dgst); err != nil {
154			dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
155		} else {
156			if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
157				dcontext.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
158			}
159		}
160	}
161
162	return rc, err
163}
164
165func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
166	err := bsl.BlobStore.ServeBlob(ctx, w, r, dgst)
167	if err == nil {
168		if desc, err := bsl.Stat(ctx, dgst); err != nil {
169			dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
170		} else {
171			if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
172				dcontext.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
173			}
174		}
175	}
176
177	return err
178}
179
180func (bsl *blobServiceListener) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
181	desc, err := bsl.BlobStore.Put(ctx, mediaType, p)
182	if err == nil {
183		if err := bsl.parent.listener.BlobPushed(bsl.parent.Repository.Named(), desc); err != nil {
184			dcontext.GetLogger(ctx).Errorf("error dispatching layer push to listener: %v", err)
185		}
186	}
187
188	return desc, err
189}
190
191func (bsl *blobServiceListener) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
192	wr, err := bsl.BlobStore.Create(ctx, options...)
193	switch err := err.(type) {
194	case distribution.ErrBlobMounted:
195		if err := bsl.parent.listener.BlobMounted(bsl.parent.Repository.Named(), err.Descriptor, err.From); err != nil {
196			dcontext.GetLogger(ctx).Errorf("error dispatching blob mount to listener: %v", err)
197		}
198		return nil, err
199	}
200	return bsl.decorateWriter(wr), err
201}
202
203func (bsl *blobServiceListener) Delete(ctx context.Context, dgst digest.Digest) error {
204	err := bsl.BlobStore.Delete(ctx, dgst)
205	if err == nil {
206		if err := bsl.parent.listener.BlobDeleted(bsl.parent.Repository.Named(), dgst); err != nil {
207			dcontext.GetLogger(ctx).Errorf("error dispatching layer delete to listener: %v", err)
208		}
209	}
210
211	return err
212}
213
214func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
215	wr, err := bsl.BlobStore.Resume(ctx, id)
216	return bsl.decorateWriter(wr), err
217}
218
219func (bsl *blobServiceListener) decorateWriter(wr distribution.BlobWriter) distribution.BlobWriter {
220	return &blobWriterListener{
221		BlobWriter: wr,
222		parent:     bsl,
223	}
224}
225
226type blobWriterListener struct {
227	distribution.BlobWriter
228	parent *blobServiceListener
229}
230
231func (bwl *blobWriterListener) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
232	committed, err := bwl.BlobWriter.Commit(ctx, desc)
233	if err == nil {
234		if err := bwl.parent.parent.listener.BlobPushed(bwl.parent.parent.Repository.Named(), committed); err != nil {
235			dcontext.GetLogger(ctx).Errorf("error dispatching blob push to listener: %v", err)
236		}
237	}
238
239	return committed, err
240}
241
242type tagServiceListener struct {
243	distribution.TagService
244	parent *repositoryListener
245}
246
247func (rl *repositoryListener) Tags(ctx context.Context) distribution.TagService {
248	return &tagServiceListener{
249		TagService: rl.Repository.Tags(ctx),
250		parent:     rl,
251	}
252}
253
254func (tagSL *tagServiceListener) Untag(ctx context.Context, tag string) error {
255	if err := tagSL.TagService.Untag(ctx, tag); err != nil {
256		return err
257	}
258	if err := tagSL.parent.listener.TagDeleted(tagSL.parent.Repository.Named(), tag); err != nil {
259		dcontext.GetLogger(ctx).Errorf("error dispatching tag deleted to listener: %v", err)
260		return err
261	}
262	return nil
263}
264