1/*
2   Copyright The containerd Authors.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17package shim
18
19import (
20	"context"
21	"sync"
22	"time"
23
24	v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
25	"github.com/containerd/containerd/events"
26	"github.com/containerd/containerd/namespaces"
27	"github.com/containerd/containerd/pkg/ttrpcutil"
28	"github.com/containerd/ttrpc"
29	"github.com/containerd/typeurl"
30	"github.com/sirupsen/logrus"
31)
32
// Tuning knobs for the retry queue used by RemoteEventsPublisher.
const (
	// queueSize is the buffer capacity of the requeue channel.
	queueSize = 2048
	// maxRequeue is the highest retry count an item may carry and still be
	// forwarded; items whose count exceeds it are evicted by processQueue.
	maxRequeue = 5
)
37
38type item struct {
39	ev    *v1.Envelope
40	ctx   context.Context
41	count int
42}
43
44func NewPublisher(address string) (*RemoteEventsPublisher, error) {
45	client, err := ttrpcutil.NewClient(address)
46	if err != nil {
47		return nil, err
48	}
49
50	l := &RemoteEventsPublisher{
51		client:  client,
52		closed:  make(chan struct{}),
53		requeue: make(chan *item, queueSize),
54	}
55
56	go l.processQueue()
57	return l, nil
58}
59
60type RemoteEventsPublisher struct {
61	client  *ttrpcutil.Client
62	closed  chan struct{}
63	closer  sync.Once
64	requeue chan *item
65}
66
67func (l *RemoteEventsPublisher) Done() <-chan struct{} {
68	return l.closed
69}
70
71func (l *RemoteEventsPublisher) Close() (err error) {
72	err = l.client.Close()
73	l.closer.Do(func() {
74		close(l.closed)
75	})
76	return err
77}
78
79func (l *RemoteEventsPublisher) processQueue() {
80	for i := range l.requeue {
81		if i.count > maxRequeue {
82			logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
83			// drop the event
84			continue
85		}
86
87		if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
88			logrus.WithError(err).Error("forward event")
89			l.queue(i)
90		}
91	}
92}
93
94func (l *RemoteEventsPublisher) queue(i *item) {
95	go func() {
96		i.count++
97		// re-queue after a short delay
98		time.Sleep(time.Duration(1*i.count) * time.Second)
99		l.requeue <- i
100	}()
101}
102
103func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
104	ns, err := namespaces.NamespaceRequired(ctx)
105	if err != nil {
106		return err
107	}
108	any, err := typeurl.MarshalAny(event)
109	if err != nil {
110		return err
111	}
112	i := &item{
113		ev: &v1.Envelope{
114			Timestamp: time.Now(),
115			Namespace: ns,
116			Topic:     topic,
117			Event:     any,
118		},
119		ctx: ctx,
120	}
121
122	if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
123		l.queue(i)
124		return err
125	}
126
127	return nil
128}
129
130func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
131	service, err := l.client.EventsService()
132	if err == nil {
133		fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
134		_, err = service.Forward(fCtx, req)
135		cancel()
136		if err == nil {
137			return nil
138		}
139	}
140
141	if err != ttrpc.ErrClosed {
142		return err
143	}
144
145	// Reconnect and retry request
146	if err = l.client.Reconnect(); err != nil {
147		return err
148	}
149
150	service, err = l.client.EventsService()
151	if err != nil {
152		return err
153	}
154
155	// try again with a fresh context, otherwise we may get a context timeout unexpectedly.
156	fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
157	_, err = service.Forward(fCtx, req)
158	cancel()
159	if err != nil {
160		return err
161	}
162
163	return nil
164}
165