1package health
2
3import (
4	"context"
5	"errors"
6	"fmt"
7	"strings"
8	"testing"
9	"time"
10
11	"github.com/google/go-cmp/cmp"
12	"github.com/hashicorp/go-hclog"
13	"github.com/hashicorp/go-uuid"
14	"github.com/stretchr/testify/require"
15	"google.golang.org/grpc/codes"
16	"google.golang.org/grpc/status"
17
18	"github.com/hashicorp/consul/agent/structs"
19	"github.com/hashicorp/consul/agent/submatview"
20	"github.com/hashicorp/consul/proto/pbcommon"
21	"github.com/hashicorp/consul/proto/pbservice"
22	"github.com/hashicorp/consul/proto/pbsubscribe"
23	"github.com/hashicorp/consul/types"
24)
25
26func TestSortCheckServiceNodes_OrderIsConsistentWithRPCResponse(t *testing.T) {
27	index := uint64(42)
28	buildTestNode := func(nodeName string, serviceID string) structs.CheckServiceNode {
29		newID, err := uuid.GenerateUUID()
30		require.NoError(t, err)
31		return structs.CheckServiceNode{
32			Node: &structs.Node{
33				ID:         types.NodeID(strings.ToUpper(newID)),
34				Node:       nodeName,
35				Address:    nodeName,
36				Datacenter: "dc1",
37				RaftIndex: structs.RaftIndex{
38					CreateIndex: index,
39					ModifyIndex: index,
40				},
41			},
42			Service: &structs.NodeService{
43				ID:      serviceID,
44				Service: "testService",
45				Port:    8080,
46				Weights: &structs.Weights{
47					Passing: 1,
48					Warning: 1,
49				},
50				RaftIndex: structs.RaftIndex{
51					CreateIndex: index,
52					ModifyIndex: index,
53				},
54				EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
55			},
56			Checks: []*structs.HealthCheck{},
57		}
58	}
59	zero := buildTestNode("a-zero-node", "testService:1")
60	one := buildTestNode("node1", "testService:1")
61	two := buildTestNode("node1", "testService:2")
62	three := buildTestNode("node2", "testService")
63	result := structs.IndexedCheckServiceNodes{
64		Nodes:     structs.CheckServiceNodes{three, two, zero, one},
65		QueryMeta: structs.QueryMeta{Index: index},
66	}
67	sortCheckServiceNodes(&result)
68	expected := structs.CheckServiceNodes{zero, one, two, three}
69	require.Equal(t, expected, result.Nodes)
70}
71
72func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
73	if testing.Short() {
74		t.Skip("too slow for testing.Short")
75	}
76
77	namespace := getNamespace(pbcommon.DefaultEnterpriseMeta.Namespace)
78	streamClient := newStreamClient(validateNamespace(namespace))
79
80	ctx, cancel := context.WithCancel(context.Background())
81	defer cancel()
82
83	store := submatview.NewStore(hclog.New(nil))
84	go store.Run(ctx)
85
86	// Initially there are no services registered. Server should send an
87	// EndOfSnapshot message immediately with index of 1.
88	streamClient.QueueEvents(newEndOfSnapshotEvent(1))
89
90	req := serviceRequestStub{
91		serviceRequest: serviceRequest{
92			ServiceSpecificRequest: structs.ServiceSpecificRequest{
93				Datacenter:     "dc1",
94				ServiceName:    "web",
95				EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
96				QueryOptions:   structs.QueryOptions{MaxQueryTime: time.Second},
97			},
98		},
99		streamClient: streamClient,
100	}
101	empty := &structs.IndexedCheckServiceNodes{
102		Nodes: structs.CheckServiceNodes{},
103		QueryMeta: structs.QueryMeta{
104			Index:   1,
105			Backend: structs.QueryBackendStreaming,
106		},
107	}
108
109	runStep(t, "empty snapshot returned", func(t *testing.T) {
110		result, err := store.Get(ctx, req)
111		require.NoError(t, err)
112
113		require.Equal(t, uint64(1), result.Index)
114		require.Equal(t, empty, result.Value)
115
116		req.QueryOptions.MinQueryIndex = result.Index
117	})
118
119	runStep(t, "blocks for timeout", func(t *testing.T) {
120		// Subsequent fetch should block for the timeout
121		start := time.Now()
122		req.QueryOptions.MaxQueryTime = 200 * time.Millisecond
123		result, err := store.Get(ctx, req)
124		require.NoError(t, err)
125		elapsed := time.Since(start)
126		require.True(t, elapsed >= 200*time.Millisecond,
127			"Fetch should have blocked until timeout")
128
129		require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed")
130		require.Equal(t, empty, result.Value, "result value should not have changed")
131
132		req.QueryOptions.MinQueryIndex = result.Index
133	})
134
135	var lastResultValue structs.CheckServiceNodes
136
137	runStep(t, "blocks until update", func(t *testing.T) {
138		// Make another blocking query with a longer timeout and trigger an update
139		// event part way through.
140		start := time.Now()
141		go func() {
142			time.Sleep(200 * time.Millisecond)
143			streamClient.QueueEvents(newEventServiceHealthRegister(4, 1, "web"))
144		}()
145
146		req.QueryOptions.MaxQueryTime = time.Second
147		result, err := store.Get(ctx, req)
148		require.NoError(t, err)
149		elapsed := time.Since(start)
150		require.True(t, elapsed >= 200*time.Millisecond,
151			"Fetch should have blocked until the event was delivered")
152		require.True(t, elapsed < time.Second,
153			"Fetch should have returned before the timeout")
154
155		require.Equal(t, uint64(4), result.Index, "result index should not have changed")
156		lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
157		require.Len(t, lastResultValue, 1,
158			"result value should contain the new registration")
159
160		req.QueryOptions.MinQueryIndex = result.Index
161	})
162
163	runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) {
164		streamClient.QueueErr(tempError("broken pipe"))
165
166		// Next fetch will continue to block until timeout and receive the same
167		// result.
168		start := time.Now()
169		req.QueryOptions.MaxQueryTime = 200 * time.Millisecond
170		result, err := store.Get(ctx, req)
171		require.NoError(t, err)
172		elapsed := time.Since(start)
173		require.True(t, elapsed >= 200*time.Millisecond,
174			"Fetch should have blocked until timeout")
175
176		require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index,
177			"result index should not have changed")
178		require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes,
179			"result value should not have changed")
180
181		req.QueryOptions.MinQueryIndex = result.Index
182
183		// But an update should still be noticed due to reconnection
184		streamClient.QueueEvents(newEventServiceHealthRegister(10, 2, "web"))
185
186		start = time.Now()
187		req.QueryOptions.MaxQueryTime = time.Second
188		result, err = store.Get(ctx, req)
189		require.NoError(t, err)
190		elapsed = time.Since(start)
191		require.True(t, elapsed < time.Second,
192			"Fetch should have returned before the timeout")
193
194		require.Equal(t, uint64(10), result.Index, "result index should not have changed")
195		lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
196		require.Len(t, lastResultValue, 2,
197			"result value should contain the new registration")
198
199		req.QueryOptions.MinQueryIndex = result.Index
200	})
201
202	runStep(t, "returns non-temporary error to watchers", func(t *testing.T) {
203		// Wait and send the error while fetcher is waiting
204		go func() {
205			time.Sleep(200 * time.Millisecond)
206			streamClient.QueueErr(errors.New("invalid request"))
207		}()
208
209		// Next fetch should return the error
210		start := time.Now()
211		req.QueryOptions.MaxQueryTime = time.Second
212		result, err := store.Get(ctx, req)
213		require.Error(t, err)
214		elapsed := time.Since(start)
215		require.True(t, elapsed >= 200*time.Millisecond,
216			"Fetch should have blocked until error was sent")
217		require.True(t, elapsed < time.Second,
218			"Fetch should have returned before the timeout")
219
220		require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed")
221		require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes)
222
223		req.QueryOptions.MinQueryIndex = result.Index
224
225		// But an update should still be noticed due to reconnection
226		streamClient.QueueEvents(newEventServiceHealthRegister(req.QueryOptions.MinQueryIndex+5, 3, "web"))
227
228		req.QueryOptions.MaxQueryTime = time.Second
229		result, err = store.Get(ctx, req)
230		require.NoError(t, err)
231		elapsed = time.Since(start)
232		require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout")
233
234		require.Equal(t, req.QueryOptions.MinQueryIndex+5, result.Index, "result index should not have changed")
235		require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3,
236			"result value should contain the new registration")
237
238		req.QueryOptions.MinQueryIndex = result.Index
239	})
240}
241
242type tempError string
243
244func (e tempError) Error() string {
245	return string(e)
246}
247
248func (e tempError) Temporary() bool {
249	return true
250}
251
252func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
253	if testing.Short() {
254		t.Skip("too slow for testing.Short")
255	}
256
257	namespace := getNamespace("ns2")
258	client := newStreamClient(validateNamespace(namespace))
259
260	ctx, cancel := context.WithCancel(context.Background())
261	defer cancel()
262
263	store := submatview.NewStore(hclog.New(nil))
264
265	// Create an initial snapshot of 3 instances on different nodes
266	registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event {
267		return newEventServiceHealthRegister(index, nodeNum, "web")
268	}
269	client.QueueEvents(
270		registerServiceWeb(5, 1),
271		registerServiceWeb(5, 2),
272		registerServiceWeb(5, 3),
273		newEndOfSnapshotEvent(5))
274
275	req := serviceRequestStub{
276		serviceRequest: serviceRequest{
277			ServiceSpecificRequest: structs.ServiceSpecificRequest{
278				Datacenter:     "dc1",
279				ServiceName:    "web",
280				EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
281				QueryOptions:   structs.QueryOptions{MaxQueryTime: time.Second},
282			},
283		},
284		streamClient: client,
285	}
286
287	runStep(t, "full snapshot returned", func(t *testing.T) {
288		result, err := store.Get(ctx, req)
289		require.NoError(t, err)
290
291		require.Equal(t, uint64(5), result.Index)
292		expected := newExpectedNodes("node1", "node2", "node3")
293		expected.Index = 5
294		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
295
296		req.QueryOptions.MinQueryIndex = result.Index
297	})
298
299	runStep(t, "blocks until deregistration", func(t *testing.T) {
300		// Make another blocking query with a longer timeout and trigger an update
301		// event part way through.
302		start := time.Now()
303		go func() {
304			time.Sleep(200 * time.Millisecond)
305
306			// Deregister instance on node1
307			client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web"))
308		}()
309
310		req.QueryOptions.MaxQueryTime = time.Second
311		result, err := store.Get(ctx, req)
312		require.NoError(t, err)
313		elapsed := time.Since(start)
314		require.True(t, elapsed >= 200*time.Millisecond,
315			"Fetch should have blocked until the event was delivered")
316		require.True(t, elapsed < time.Second,
317			"Fetch should have returned before the timeout")
318
319		require.Equal(t, uint64(20), result.Index)
320		expected := newExpectedNodes("node2", "node3")
321		expected.Index = 20
322		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
323
324		req.QueryOptions.MinQueryIndex = result.Index
325	})
326
327	runStep(t, "server reload is respected", func(t *testing.T) {
328		// Simulates the server noticing the request's ACL token privs changing. To
329		// detect this we'll queue up the new snapshot as a different set of nodes
330		// to the first.
331		client.QueueErr(status.Error(codes.Aborted, "reset by server"))
332
333		client.QueueEvents(
334			registerServiceWeb(50, 3), // overlap existing node
335			registerServiceWeb(50, 4),
336			registerServiceWeb(50, 5),
337			newEndOfSnapshotEvent(50))
338
339		// Make another blocking query with THE SAME index. It should immediately
340		// return the new snapshot.
341		start := time.Now()
342		req.QueryOptions.MaxQueryTime = time.Second
343		result, err := store.Get(ctx, req)
344		require.NoError(t, err)
345		elapsed := time.Since(start)
346		require.True(t, elapsed < time.Second,
347			"Fetch should have returned before the timeout")
348
349		require.Equal(t, uint64(50), result.Index)
350		expected := newExpectedNodes("node3", "node4", "node5")
351		expected.Index = 50
352		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
353
354		req.QueryOptions.MinQueryIndex = result.Index
355	})
356
357	runStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) {
358		client.QueueErr(tempError("temporary connection error"))
359
360		client.QueueEvents(
361			newNewSnapshotToFollowEvent(),
362			registerServiceWeb(50, 3), // overlap existing node
363			registerServiceWeb(50, 4),
364			registerServiceWeb(50, 5),
365			newEndOfSnapshotEvent(50))
366
367		start := time.Now()
368		req.QueryOptions.MinQueryIndex = 49
369		req.QueryOptions.MaxQueryTime = time.Second
370		result, err := store.Get(ctx, req)
371		require.NoError(t, err)
372		elapsed := time.Since(start)
373		require.True(t, elapsed < time.Second,
374			"Fetch should have returned before the timeout")
375
376		require.Equal(t, uint64(50), result.Index)
377		expected := newExpectedNodes("node3", "node4", "node5")
378		expected.Index = 50
379		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
380	})
381}
382
383func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes {
384	result := &structs.IndexedCheckServiceNodes{}
385	result.QueryMeta.Backend = structs.QueryBackendStreaming
386	for _, node := range nodes {
387		result.Nodes = append(result.Nodes, structs.CheckServiceNode{
388			Node: &structs.Node{Node: node},
389		})
390	}
391	return result
392}
393
394// cmpCheckServiceNodeNames does a shallow comparison of structs.CheckServiceNode
395// by Node name.
396var cmpCheckServiceNodeNames = cmp.Options{
397	cmp.Comparer(func(x, y structs.CheckServiceNode) bool {
398		return x.Node.Node == y.Node.Node
399	}),
400}
401
402func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
403	t.Helper()
404	if diff := cmp.Diff(x, y, opts...); diff != "" {
405		t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)
406	}
407}
408
409func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
410	namespace := getNamespace("ns3")
411	client := newStreamClient(validateNamespace(namespace))
412
413	ctx, cancel := context.WithCancel(context.Background())
414	defer cancel()
415
416	store := submatview.NewStore(hclog.New(nil))
417
418	// Create an initial snapshot of 3 instances but in a single event batch
419	batchEv := newEventBatchWithEvents(
420		newEventServiceHealthRegister(5, 1, "web"),
421		newEventServiceHealthRegister(5, 2, "web"),
422		newEventServiceHealthRegister(5, 3, "web"))
423	client.QueueEvents(
424		batchEv,
425		newEndOfSnapshotEvent(5))
426
427	req := serviceRequestStub{
428		serviceRequest: serviceRequest{
429			ServiceSpecificRequest: structs.ServiceSpecificRequest{
430				Datacenter:     "dc1",
431				ServiceName:    "web",
432				EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
433				QueryOptions:   structs.QueryOptions{MaxQueryTime: time.Second},
434			},
435		},
436		streamClient: client,
437	}
438
439	runStep(t, "full snapshot returned", func(t *testing.T) {
440		result, err := store.Get(ctx, req)
441		require.NoError(t, err)
442
443		require.Equal(t, uint64(5), result.Index)
444
445		expected := newExpectedNodes("node1", "node2", "node3")
446		expected.Index = 5
447		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
448		req.QueryOptions.MinQueryIndex = result.Index
449	})
450
451	runStep(t, "batched updates work too", func(t *testing.T) {
452		// Simulate multiple registrations happening in one Txn (so all have same
453		// index)
454		batchEv := newEventBatchWithEvents(
455			// Deregister an existing node
456			newEventServiceHealthDeregister(20, 1, "web"),
457			// Register another
458			newEventServiceHealthRegister(20, 4, "web"),
459		)
460		client.QueueEvents(batchEv)
461		req.QueryOptions.MaxQueryTime = time.Second
462		result, err := store.Get(ctx, req)
463		require.NoError(t, err)
464
465		require.Equal(t, uint64(20), result.Index)
466		expected := newExpectedNodes("node2", "node3", "node4")
467		expected.Index = 20
468		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
469
470		req.QueryOptions.MinQueryIndex = result.Index
471	})
472}
473
474func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) {
475	namespace := getNamespace("ns3")
476	streamClient := newStreamClient(validateNamespace(namespace))
477
478	ctx, cancel := context.WithCancel(context.Background())
479	defer cancel()
480
481	store := submatview.NewStore(hclog.New(nil))
482	go store.Run(ctx)
483
484	req := serviceRequestStub{
485		serviceRequest: serviceRequest{
486			ServiceSpecificRequest: structs.ServiceSpecificRequest{
487				Datacenter:     "dc1",
488				ServiceName:    "web",
489				EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
490				QueryOptions: structs.QueryOptions{
491					Filter:       `Node.Node == "node2"`,
492					MaxQueryTime: time.Second,
493				},
494			},
495		},
496		streamClient: streamClient,
497	}
498
499	// Create an initial snapshot of 3 instances but in a single event batch
500	batchEv := newEventBatchWithEvents(
501		newEventServiceHealthRegister(5, 1, "web"),
502		newEventServiceHealthRegister(5, 2, "web"),
503		newEventServiceHealthRegister(5, 3, "web"))
504	streamClient.QueueEvents(
505		batchEv,
506		newEndOfSnapshotEvent(5))
507
508	runStep(t, "filtered snapshot returned", func(t *testing.T) {
509		result, err := store.Get(ctx, req)
510		require.NoError(t, err)
511
512		require.Equal(t, uint64(5), result.Index)
513		expected := newExpectedNodes("node2")
514		expected.Index = 5
515		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
516
517		req.QueryOptions.MinQueryIndex = result.Index
518	})
519
520	runStep(t, "filtered updates work too", func(t *testing.T) {
521		// Simulate multiple registrations happening in one Txn (all have same index)
522		batchEv := newEventBatchWithEvents(
523			// Deregister an existing node
524			newEventServiceHealthDeregister(20, 1, "web"),
525			// Register another
526			newEventServiceHealthRegister(20, 4, "web"),
527		)
528		streamClient.QueueEvents(batchEv)
529		result, err := store.Get(ctx, req)
530		require.NoError(t, err)
531
532		require.Equal(t, uint64(20), result.Index)
533		expected := newExpectedNodes("node2")
534		expected.Index = 20
535		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
536	})
537}
538
539// serviceRequestStub overrides NewMaterializer so that test can use a fake
540// StreamClient.
541type serviceRequestStub struct {
542	serviceRequest
543	streamClient submatview.StreamClient
544}
545
546func (r serviceRequestStub) NewMaterializer() (*submatview.Materializer, error) {
547	view, err := newHealthView(r.ServiceSpecificRequest)
548	if err != nil {
549		return nil, err
550	}
551	return submatview.NewMaterializer(submatview.Deps{
552		View:    view,
553		Client:  r.streamClient,
554		Logger:  hclog.New(nil),
555		Request: newMaterializerRequest(r.ServiceSpecificRequest),
556	}), nil
557}
558
559func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
560	node := fmt.Sprintf("node%d", nodeNum)
561	nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
562	addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
563
564	return &pbsubscribe.Event{
565		Index: index,
566		Payload: &pbsubscribe.Event_ServiceHealth{
567			ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
568				Op: pbsubscribe.CatalogOp_Register,
569				CheckServiceNode: &pbservice.CheckServiceNode{
570					Node: &pbservice.Node{
571						ID:         nodeID,
572						Node:       node,
573						Address:    addr,
574						Datacenter: "dc1",
575						RaftIndex: pbcommon.RaftIndex{
576							CreateIndex: index,
577							ModifyIndex: index,
578						},
579					},
580					Service: &pbservice.NodeService{
581						ID:      svc,
582						Service: svc,
583						Port:    8080,
584						RaftIndex: pbcommon.RaftIndex{
585							CreateIndex: index,
586							ModifyIndex: index,
587						},
588					},
589				},
590			},
591		},
592	}
593}
594
595func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
596	node := fmt.Sprintf("node%d", nodeNum)
597
598	return &pbsubscribe.Event{
599		Index: index,
600		Payload: &pbsubscribe.Event_ServiceHealth{
601			ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
602				Op: pbsubscribe.CatalogOp_Deregister,
603				CheckServiceNode: &pbservice.CheckServiceNode{
604					Node: &pbservice.Node{
605						Node: node,
606					},
607					Service: &pbservice.NodeService{
608						ID:      svc,
609						Service: svc,
610						Port:    8080,
611						Weights: &pbservice.Weights{
612							Passing: 1,
613							Warning: 1,
614						},
615						RaftIndex: pbcommon.RaftIndex{
616							// The original insertion index since a delete doesn't update
617							// this. This magic value came from state store tests where we
618							// setup at index 10 and then mutate at index 100. It can be
619							// modified by the caller later and makes it easier than having
620							// yet another argument in the common case.
621							CreateIndex: 10,
622							ModifyIndex: 10,
623						},
624					},
625				},
626			},
627		},
628	}
629}
630
631func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
632	events := make([]*pbsubscribe.Event, len(evs)+1)
633	events[0] = first
634	for i := range evs {
635		events[i+1] = evs[i]
636	}
637	return &pbsubscribe.Event{
638		Index: first.Index,
639		Payload: &pbsubscribe.Event_EventBatch{
640			EventBatch: &pbsubscribe.EventBatch{Events: events},
641		},
642	}
643}
644
645func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
646	return &pbsubscribe.Event{
647		Index:   index,
648		Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
649	}
650}
651
652func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
653	return &pbsubscribe.Event{
654		Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
655	}
656}
657
658// getNamespace returns a namespace if namespace support exists, otherwise
659// returns the empty string. It allows the same tests to work in both oss and ent
660// without duplicating the tests.
661func getNamespace(ns string) string {
662	meta := structs.NewEnterpriseMeta(ns)
663	return meta.NamespaceOrEmpty()
664}
665
666func validateNamespace(ns string) func(request *pbsubscribe.SubscribeRequest) error {
667	return func(request *pbsubscribe.SubscribeRequest) error {
668		if request.Namespace != ns {
669			return fmt.Errorf("expected request.Namespace %v, got %v", ns, request.Namespace)
670		}
671		return nil
672	}
673}
674
675func runStep(t *testing.T, name string, fn func(t *testing.T)) {
676	t.Helper()
677	if !t.Run(name, fn) {
678		t.FailNow()
679	}
680}
681
682func TestNewFilterEvaluator(t *testing.T) {
683	type testCase struct {
684		name     string
685		req      structs.ServiceSpecificRequest
686		data     structs.CheckServiceNode
687		expected bool
688	}
689
690	fn := func(t *testing.T, tc testCase) {
691		e, err := newFilterEvaluator(tc.req)
692		require.NoError(t, err)
693		actual, err := e.Evaluate(tc.data)
694		require.NoError(t, err)
695		require.Equal(t, tc.expected, actual)
696	}
697
698	var testCases = []testCase{
699		{
700			name: "single ServiceTags match",
701			req: structs.ServiceSpecificRequest{
702				ServiceTags: []string{"match"},
703				TagFilter:   true,
704			},
705			data: structs.CheckServiceNode{
706				Service: &structs.NodeService{
707					Tags: []string{"extra", "match"},
708				},
709			},
710			expected: true,
711		},
712		{
713			name: "single deprecated ServiceTag match",
714			req: structs.ServiceSpecificRequest{
715				ServiceTag: "match",
716				TagFilter:  true,
717			},
718			data: structs.CheckServiceNode{
719				Service: &structs.NodeService{
720					Tags: []string{"extra", "match"},
721				},
722			},
723			expected: true,
724		},
725		{
726			name: "single ServiceTags mismatch",
727			req: structs.ServiceSpecificRequest{
728				ServiceTags: []string{"other"},
729				TagFilter:   true,
730			},
731			data: structs.CheckServiceNode{
732				Service: &structs.NodeService{
733					Tags: []string{"extra", "match"},
734				},
735			},
736			expected: false,
737		},
738		{
739			name: "multiple ServiceTags match",
740			req: structs.ServiceSpecificRequest{
741				ServiceTags: []string{"match", "second"},
742				TagFilter:   true,
743			},
744			data: structs.CheckServiceNode{
745				Service: &structs.NodeService{
746					Tags: []string{"extra", "match", "second"},
747				},
748			},
749			expected: true,
750		},
751		{
752			name: "multiple ServiceTags mismatch",
753			req: structs.ServiceSpecificRequest{
754				ServiceTags: []string{"match", "not"},
755				TagFilter:   true,
756			},
757			data: structs.CheckServiceNode{
758				Service: &structs.NodeService{
759					Tags: []string{"extra", "match"},
760				},
761			},
762			expected: false,
763		},
764		{
765			name: "single NodeMetaFilter match",
766			req: structs.ServiceSpecificRequest{
767				NodeMetaFilters: map[string]string{"meta1": "match"},
768			},
769			data: structs.CheckServiceNode{
770				Node: &structs.Node{
771					Meta: map[string]string{
772						"meta1": "match",
773						"extra": "some",
774					},
775				},
776			},
777			expected: true,
778		},
779		{
780			name: "single NodeMetaFilter mismatch",
781			req: structs.ServiceSpecificRequest{
782				NodeMetaFilters: map[string]string{
783					"meta1": "match",
784				},
785			},
786			data: structs.CheckServiceNode{
787				Node: &structs.Node{
788					Meta: map[string]string{
789						"meta1": "other",
790						"extra": "some",
791					},
792				},
793			},
794			expected: false,
795		},
796		{
797			name: "multiple NodeMetaFilter match",
798			req: structs.ServiceSpecificRequest{
799				NodeMetaFilters: map[string]string{"meta1": "match", "meta2": "a"},
800			},
801			data: structs.CheckServiceNode{
802				Node: &structs.Node{
803					Meta: map[string]string{
804						"meta1": "match",
805						"meta2": "a",
806						"extra": "some",
807					},
808				},
809			},
810			expected: true,
811		},
812		{
813			name: "multiple NodeMetaFilter mismatch",
814			req: structs.ServiceSpecificRequest{
815				NodeMetaFilters: map[string]string{
816					"meta1": "match",
817					"meta2": "beta",
818				},
819			},
820			data: structs.CheckServiceNode{
821				Node: &structs.Node{
822					Meta: map[string]string{
823						"meta1": "other",
824						"meta2": "gamma",
825					},
826				},
827			},
828			expected: false,
829		},
830		{
831			name: "QueryOptions.Filter match",
832			req: structs.ServiceSpecificRequest{
833				QueryOptions: structs.QueryOptions{
834					Filter: `Node.Node == "node3"`,
835				},
836			},
837			data: structs.CheckServiceNode{
838				Node: &structs.Node{Node: "node3"},
839			},
840			expected: true,
841		},
842		{
843			name: "QueryOptions.Filter mismatch",
844			req: structs.ServiceSpecificRequest{
845				QueryOptions: structs.QueryOptions{
846					Filter: `Node.Node == "node2"`,
847				},
848			},
849			data: structs.CheckServiceNode{
850				Node: &structs.Node{Node: "node3"},
851			},
852			expected: false,
853		},
854		{
855			name: "all match",
856			req: structs.ServiceSpecificRequest{
857				QueryOptions: structs.QueryOptions{
858					Filter: `Node.Node == "node3"`,
859				},
860				ServiceTags: []string{"tag1", "tag2"},
861				NodeMetaFilters: map[string]string{
862					"meta1": "match1",
863					"meta2": "match2",
864				},
865			},
866			data: structs.CheckServiceNode{
867				Node: &structs.Node{
868					Node: "node3",
869					Meta: map[string]string{
870						"meta1": "match1",
871						"meta2": "match2",
872						"extra": "other",
873					},
874				},
875				Service: &structs.NodeService{
876					Tags: []string{"tag1", "tag2", "extra"},
877				},
878			},
879			expected: true,
880		},
881	}
882
883	for _, tc := range testCases {
884		t.Run(tc.name, func(t *testing.T) {
885			fn(t, tc)
886		})
887	}
888}
889