1/*
2   Copyright The containerd Authors.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17package exchange
18
19import (
20	"context"
21	"reflect"
22	"sync"
23	"testing"
24	"time"
25
26	eventstypes "github.com/containerd/containerd/api/events"
27	"github.com/containerd/containerd/errdefs"
28	"github.com/containerd/containerd/events"
29	"github.com/containerd/containerd/namespaces"
30	"github.com/containerd/typeurl"
31	"github.com/pkg/errors"
32)
33
34func TestExchangeBasic(t *testing.T) {
35	ctx := namespaces.WithNamespace(context.Background(), t.Name())
36	testevents := []events.Event{
37		&eventstypes.ContainerCreate{ID: "asdf"},
38		&eventstypes.ContainerCreate{ID: "qwer"},
39		&eventstypes.ContainerCreate{ID: "zxcv"},
40	}
41	exchange := NewExchange()
42
43	t.Log("subscribe")
44	var cancel1, cancel2 func()
45
46	// Create two subscribers for same set of events and make sure they
47	// traverse the exchange.
48	ctx1, cancel1 := context.WithCancel(ctx)
49	eventq1, errq1 := exchange.Subscribe(ctx1)
50
51	ctx2, cancel2 := context.WithCancel(ctx)
52	eventq2, errq2 := exchange.Subscribe(ctx2)
53
54	t.Log("publish")
55	var wg sync.WaitGroup
56	wg.Add(1)
57	errChan := make(chan error)
58	go func() {
59		defer wg.Done()
60		defer close(errChan)
61		for _, event := range testevents {
62			if err := exchange.Publish(ctx, "/test", event); err != nil {
63				errChan <- err
64				return
65			}
66		}
67
68		t.Log("finished publishing")
69	}()
70
71	t.Log("waiting")
72	wg.Wait()
73	if err := <-errChan; err != nil {
74		t.Fatal(err)
75	}
76
77	for _, subscriber := range []struct {
78		eventq <-chan *events.Envelope
79		errq   <-chan error
80		cancel func()
81	}{
82		{
83			eventq: eventq1,
84			errq:   errq1,
85			cancel: cancel1,
86		},
87		{
88			eventq: eventq2,
89			errq:   errq2,
90			cancel: cancel2,
91		},
92	} {
93		var received []events.Event
94	subscribercheck:
95		for {
96			select {
97			case env := <-subscriber.eventq:
98				ev, err := typeurl.UnmarshalAny(env.Event)
99				if err != nil {
100					t.Fatal(err)
101				}
102				received = append(received, ev.(*eventstypes.ContainerCreate))
103			case err := <-subscriber.errq:
104				if err != nil {
105					t.Fatal(err)
106				}
107				break subscribercheck
108			}
109
110			if reflect.DeepEqual(received, testevents) {
111				// when we do this, we expect the errs channel to be closed and
112				// this will return.
113				subscriber.cancel()
114			}
115		}
116	}
117}
118
119func TestExchangeFilters(t *testing.T) {
120	var (
121		ctx      = namespaces.WithNamespace(context.Background(), t.Name())
122		exchange = NewExchange()
123
124		// config events, All events will be published
125		containerCreateEvents = []events.Event{
126			&eventstypes.ContainerCreate{ID: "asdf"},
127			&eventstypes.ContainerCreate{ID: "qwer"},
128			&eventstypes.ContainerCreate{ID: "zxcv"},
129		}
130		taskExitEvents = []events.Event{
131			&eventstypes.TaskExit{ContainerID: "abcdef"},
132		}
133		testEventSets = []struct {
134			topic  string
135			events []events.Event
136		}{
137			{
138				topic:  "/containers/create",
139				events: containerCreateEvents,
140			},
141			{
142				topic:  "/tasks/exit",
143				events: taskExitEvents,
144			},
145		}
146		allTestEvents = func(eventSets []struct {
147			topic  string
148			events []events.Event
149		}) (events []events.Event) {
150			for _, v := range eventSets {
151				events = append(events, v.events...)
152			}
153			return
154		}(testEventSets)
155
156		// config test cases
157		testCases = []struct {
158			testName string
159			filters  []string
160
161			// The fields as below are for store data. Don't config them.
162			expectEvents []events.Event
163			eventq       <-chan *events.Envelope
164			errq         <-chan error
165			cancel       func()
166		}{
167			{
168				testName:     "No Filter",
169				expectEvents: allTestEvents,
170			},
171			{
172				testName: "Filter events by one topic",
173				filters: []string{
174					`topic=="/containers/create"`,
175				},
176				expectEvents: containerCreateEvents,
177			},
178			{
179				testName: "Filter events by field",
180				filters: []string{
181					"event.id",
182				},
183				expectEvents: containerCreateEvents,
184			},
185			{
186				testName: "Filter events by field OR topic",
187				filters: []string{
188					`topic=="/containers/create"`,
189					"event.id",
190				},
191				expectEvents: containerCreateEvents,
192			},
193			{
194				testName: "Filter events by regex ",
195				filters: []string{
196					`topic~="/containers/*"`,
197				},
198				expectEvents: containerCreateEvents,
199			},
200			{
201				testName: "Filter events for by anyone of two topics",
202				filters: []string{
203					`topic=="/tasks/exit"`,
204					`topic=="/containers/create"`,
205				},
206				expectEvents: append(containerCreateEvents, taskExitEvents...),
207			},
208			{
209				testName: "Filter events for by one topic AND id",
210				filters: []string{
211					`topic=="/containers/create",event.id=="qwer"`,
212				},
213				expectEvents: []events.Event{
214					&eventstypes.ContainerCreate{ID: "qwer"},
215				},
216			},
217		}
218	)
219
220	t.Log("subscribe")
221	for i := range testCases {
222		var ctx1 context.Context
223		ctx1, testCases[i].cancel = context.WithCancel(ctx)
224		testCases[i].eventq, testCases[i].errq = exchange.Subscribe(ctx1, testCases[i].filters...)
225	}
226
227	t.Log("publish")
228	var wg sync.WaitGroup
229	wg.Add(1)
230	errChan := make(chan error)
231	go func() {
232		defer wg.Done()
233		defer close(errChan)
234		for _, es := range testEventSets {
235			for _, e := range es.events {
236				if err := exchange.Publish(ctx, es.topic, e); err != nil {
237					errChan <- err
238					return
239				}
240			}
241		}
242
243		t.Log("finished publishing")
244	}()
245
246	t.Log("waiting")
247	wg.Wait()
248	if err := <-errChan; err != nil {
249		t.Fatal(err)
250	}
251
252	t.Log("receive event")
253	for _, subscriber := range testCases {
254		t.Logf("test case: %q", subscriber.testName)
255		var received []events.Event
256	subscribercheck:
257		for {
258			select {
259			case env := <-subscriber.eventq:
260				ev, err := typeurl.UnmarshalAny(env.Event)
261				if err != nil {
262					t.Fatal(err)
263				}
264				received = append(received, ev)
265			case err := <-subscriber.errq:
266				if err != nil {
267					t.Fatal(err)
268				}
269				break subscribercheck
270			}
271
272			if reflect.DeepEqual(received, subscriber.expectEvents) {
273				// when we do this, we expect the errs channel to be closed and
274				// this will return.
275				subscriber.cancel()
276			}
277		}
278	}
279}
280
281func TestExchangeValidateTopic(t *testing.T) {
282	namespace := t.Name()
283	ctx := namespaces.WithNamespace(context.Background(), namespace)
284	exchange := NewExchange()
285
286	for _, testcase := range []struct {
287		input string
288		err   error
289	}{
290		{
291			input: "/test",
292		},
293		{
294			input: "/test/test",
295		},
296		{
297			input: "test",
298			err:   errdefs.ErrInvalidArgument,
299		},
300	} {
301		t.Run(testcase.input, func(t *testing.T) {
302			event := &eventstypes.ContainerCreate{ID: t.Name()}
303			if err := exchange.Publish(ctx, testcase.input, event); errors.Cause(err) != testcase.err {
304				if err == nil {
305					t.Fatalf("expected error %v, received nil", testcase.err)
306				} else {
307					t.Fatalf("expected error %v, received %v", testcase.err, err)
308				}
309			}
310
311			evany, err := typeurl.MarshalAny(event)
312			if err != nil {
313				t.Fatal(err)
314			}
315
316			envelope := events.Envelope{
317				Timestamp: time.Now().UTC(),
318				Namespace: namespace,
319				Topic:     testcase.input,
320				Event:     evany,
321			}
322
323			// make sure we get same errors with forward.
324			if err := exchange.Forward(ctx, &envelope); errors.Cause(err) != testcase.err {
325				if err == nil {
326					t.Fatalf("expected error %v, received nil", testcase.err)
327				} else {
328					t.Fatalf("expected error %v, received %v", testcase.err, err)
329				}
330			}
331
332		})
333	}
334}
335