1package health
3import (
4	"context"
5	"errors"
6	"fmt"
7	"strings"
8	"testing"
9	"time"
11	"github.com/google/go-cmp/cmp"
12	"github.com/hashicorp/go-hclog"
13	"github.com/hashicorp/go-uuid"
14	"github.com/stretchr/testify/require"
15	"google.golang.org/grpc/codes"
16	"google.golang.org/grpc/status"
18	"github.com/hashicorp/consul/agent/structs"
19	"github.com/hashicorp/consul/agent/submatview"
20	"github.com/hashicorp/consul/proto/pbcommon"
21	"github.com/hashicorp/consul/proto/pbservice"
22	"github.com/hashicorp/consul/proto/pbsubscribe"
23	"github.com/hashicorp/consul/types"
26func TestSortCheckServiceNodes_OrderIsConsistentWithRPCResponse(t *testing.T) {
27	index := uint64(42)
28	buildTestNode := func(nodeName string, serviceID string) structs.CheckServiceNode {
29		newID, err := uuid.GenerateUUID()
30		require.NoError(t, err)
31		return structs.CheckServiceNode{
32			Node: &structs.Node{
33				ID:         types.NodeID(strings.ToUpper(newID)),
34				Node:       nodeName,
35				Address:    nodeName,
36				Datacenter: "dc1",
37				RaftIndex: structs.RaftIndex{
38					CreateIndex: index,
39					ModifyIndex: index,
40				},
41			},
42			Service: &structs.NodeService{
43				ID:      serviceID,
44				Service: "testService",
45				Port:    8080,
46				Weights: &structs.Weights{
47					Passing: 1,
48					Warning: 1,
49				},
50				RaftIndex: structs.RaftIndex{
51					CreateIndex: index,
52					ModifyIndex: index,
53				},
54				EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
55			},
56			Checks: []*structs.HealthCheck{},
57		}
58	}
59	zero := buildTestNode("a-zero-node", "testService:1")
60	one := buildTestNode("node1", "testService:1")
61	two := buildTestNode("node1", "testService:2")
62	three := buildTestNode("node2", "testService")
63	result := structs.IndexedCheckServiceNodes{
64		Nodes:     structs.CheckServiceNodes{three, two, zero, one},
65		QueryMeta: structs.QueryMeta{Index: index},
66	}
67	sortCheckServiceNodes(&result)
68	expected := structs.CheckServiceNodes{zero, one, two, three}
69	require.Equal(t, expected, result.Nodes)
72func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
73	if testing.Short() {
74		t.Skip("too slow for testing.Short")
75	}
77	namespace := getNamespace(pbcommon.DefaultEnterpriseMeta.Namespace)
78	streamClient := newStreamClient(validateNamespace(namespace))
80	ctx, cancel := context.WithCancel(context.Background())
81	defer cancel()
83	store := submatview.NewStore(hclog.New(nil))
84	go store.Run(ctx)
86	// Initially there are no services registered. Server should send an
87	// EndOfSnapshot message immediately with index of 1.
88	streamClient.QueueEvents(newEndOfSnapshotEvent(1))
90	req := serviceRequestStub{
91		serviceRequest: serviceRequest{
92			ServiceSpecificRequest: structs.ServiceSpecificRequest{
93				Datacenter:     "dc1",
94				ServiceName:    "web",
95				EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
96				QueryOptions:   structs.QueryOptions{MaxQueryTime: time.Second},
97			},
98		},
99		streamClient: streamClient,
100	}
101	empty := &structs.IndexedCheckServiceNodes{
102		Nodes: structs.CheckServiceNodes{},
103		QueryMeta: structs.QueryMeta{
104			Index:   1,
105			Backend: structs.QueryBackendStreaming,
106		},
107	}
109	runStep(t, "empty snapshot returned", func(t *testing.T) {
110		result, err := store.Get(ctx, req)
111		require.NoError(t, err)
113		require.Equal(t, uint64(1), result.Index)
114		require.Equal(t, empty, result.Value)
116		req.QueryOptions.MinQueryIndex = result.Index
117	})
119	runStep(t, "blocks for timeout", func(t *testing.T) {
120		// Subsequent fetch should block for the timeout
121		start := time.Now()
122		req.QueryOptions.MaxQueryTime = 200 * time.Millisecond
123		result, err := store.Get(ctx, req)
124		require.NoError(t, err)
125		elapsed := time.Since(start)
126		require.True(t, elapsed >= 200*time.Millisecond,
127			"Fetch should have blocked until timeout")
129		require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed")
130		require.Equal(t, empty, result.Value, "result value should not have changed")
132		req.QueryOptions.MinQueryIndex = result.Index
133	})
135	var lastResultValue structs.CheckServiceNodes
137	runStep(t, "blocks until update", func(t *testing.T) {
138		// Make another blocking query with a longer timeout and trigger an update
139		// event part way through.
140		start := time.Now()
141		go func() {
142			time.Sleep(200 * time.Millisecond)
143			streamClient.QueueEvents(newEventServiceHealthRegister(4, 1, "web"))
144		}()
146		req.QueryOptions.MaxQueryTime = time.Second
147		result, err := store.Get(ctx, req)
148		require.NoError(t, err)
149		elapsed := time.Since(start)
150		require.True(t, elapsed >= 200*time.Millisecond,
151			"Fetch should have blocked until the event was delivered")
152		require.True(t, elapsed < time.Second,
153			"Fetch should have returned before the timeout")
155		require.Equal(t, uint64(4), result.Index, "result index should not have changed")
156		lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
157		require.Len(t, lastResultValue, 1,
158			"result value should contain the new registration")
160		req.QueryOptions.MinQueryIndex = result.Index
161	})
163	runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) {
164		streamClient.QueueErr(tempError("broken pipe"))
166		// Next fetch will continue to block until timeout and receive the same
167		// result.
168		start := time.Now()
169		req.QueryOptions.MaxQueryTime = 200 * time.Millisecond
170		result, err := store.Get(ctx, req)
171		require.NoError(t, err)
172		elapsed := time.Since(start)
173		require.True(t, elapsed >= 200*time.Millisecond,
174			"Fetch should have blocked until timeout")
176		require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index,
177			"result index should not have changed")
178		require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes,
179			"result value should not have changed")
181		req.QueryOptions.MinQueryIndex = result.Index
183		// But an update should still be noticed due to reconnection
184		streamClient.QueueEvents(newEventServiceHealthRegister(10, 2, "web"))
186		start = time.Now()
187		req.QueryOptions.MaxQueryTime = time.Second
188		result, err = store.Get(ctx, req)
189		require.NoError(t, err)
190		elapsed = time.Since(start)
191		require.True(t, elapsed < time.Second,
192			"Fetch should have returned before the timeout")
194		require.Equal(t, uint64(10), result.Index, "result index should not have changed")
195		lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
196		require.Len(t, lastResultValue, 2,
197			"result value should contain the new registration")
199		req.QueryOptions.MinQueryIndex = result.Index
200	})
202	runStep(t, "returns non-temporary error to watchers", func(t *testing.T) {
203		// Wait and send the error while fetcher is waiting
204		go func() {
205			time.Sleep(200 * time.Millisecond)
206			streamClient.QueueErr(errors.New("invalid request"))
207		}()
209		// Next fetch should return the error
210		start := time.Now()
211		req.QueryOptions.MaxQueryTime = time.Second
212		result, err := store.Get(ctx, req)
213		require.Error(t, err)
214		elapsed := time.Since(start)
215		require.True(t, elapsed >= 200*time.Millisecond,
216			"Fetch should have blocked until error was sent")
217		require.True(t, elapsed < time.Second,
218			"Fetch should have returned before the timeout")
220		require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed")
221		require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes)
223		req.QueryOptions.MinQueryIndex = result.Index
225		// But an update should still be noticed due to reconnection
226		streamClient.QueueEvents(newEventServiceHealthRegister(req.QueryOptions.MinQueryIndex+5, 3, "web"))
228		req.QueryOptions.MaxQueryTime = time.Second
229		result, err = store.Get(ctx, req)
230		require.NoError(t, err)
231		elapsed = time.Since(start)
232		require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout")
234		require.Equal(t, req.QueryOptions.MinQueryIndex+5, result.Index, "result index should not have changed")
235		require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3,
236			"result value should contain the new registration")
238		req.QueryOptions.MinQueryIndex = result.Index
239	})
// tempError is a string-backed error that also exposes a Temporary method
// which always reports true. The tests in this file queue it on the fake
// stream client in the steps that expect the stream to be resumed.
type tempError string

// Error implements the error interface by returning the underlying string.
func (te tempError) Error() string { return string(te) }

// Temporary reports true for every tempError value.
func (tempError) Temporary() bool { return true }
252func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
253	if testing.Short() {
254		t.Skip("too slow for testing.Short")
255	}
257	namespace := getNamespace("ns2")
258	client := newStreamClient(validateNamespace(namespace))
260	ctx, cancel := context.WithCancel(context.Background())
261	defer cancel()
263	store := submatview.NewStore(hclog.New(nil))
265	// Create an initial snapshot of 3 instances on different nodes
266	registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event {
267		return newEventServiceHealthRegister(index, nodeNum, "web")
268	}
269	client.QueueEvents(
270		registerServiceWeb(5, 1),
271		registerServiceWeb(5, 2),
272		registerServiceWeb(5, 3),
273		newEndOfSnapshotEvent(5))
275	req := serviceRequestStub{
276		serviceRequest: serviceRequest{
277			ServiceSpecificRequest: structs.ServiceSpecificRequest{
278				Datacenter:     "dc1",
279				ServiceName:    "web",
280				EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
281				QueryOptions:   structs.QueryOptions{MaxQueryTime: time.Second},
282			},
283		},
284		streamClient: client,
285	}
287	runStep(t, "full snapshot returned", func(t *testing.T) {
288		result, err := store.Get(ctx, req)
289		require.NoError(t, err)
291		require.Equal(t, uint64(5), result.Index)
292		expected := newExpectedNodes("node1", "node2", "node3")
293		expected.Index = 5
294		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
296		req.QueryOptions.MinQueryIndex = result.Index
297	})
299	runStep(t, "blocks until deregistration", func(t *testing.T) {
300		// Make another blocking query with a longer timeout and trigger an update
301		// event part way through.
302		start := time.Now()
303		go func() {
304			time.Sleep(200 * time.Millisecond)
306			// Deregister instance on node1
307			client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web"))
308		}()
310		req.QueryOptions.MaxQueryTime = time.Second
311		result, err := store.Get(ctx, req)
312		require.NoError(t, err)
313		elapsed := time.Since(start)
314		require.True(t, elapsed >= 200*time.Millisecond,
315			"Fetch should have blocked until the event was delivered")
316		require.True(t, elapsed < time.Second,
317			"Fetch should have returned before the timeout")
319		require.Equal(t, uint64(20), result.Index)
320		expected := newExpectedNodes("node2", "node3")
321		expected.Index = 20
322		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
324		req.QueryOptions.MinQueryIndex = result.Index
325	})
327	runStep(t, "server reload is respected", func(t *testing.T) {
328		// Simulates the server noticing the request's ACL token privs changing. To
329		// detect this we'll queue up the new snapshot as a different set of nodes
330		// to the first.
331		client.QueueErr(status.Error(codes.Aborted, "reset by server"))
333		client.QueueEvents(
334			registerServiceWeb(50, 3), // overlap existing node
335			registerServiceWeb(50, 4),
336			registerServiceWeb(50, 5),
337			newEndOfSnapshotEvent(50))
339		// Make another blocking query with THE SAME index. It should immediately
340		// return the new snapshot.
341		start := time.Now()
342		req.QueryOptions.MaxQueryTime = time.Second
343		result, err := store.Get(ctx, req)
344		require.NoError(t, err)
345		elapsed := time.Since(start)
346		require.True(t, elapsed < time.Second,
347			"Fetch should have returned before the timeout")
349		require.Equal(t, uint64(50), result.Index)
350		expected := newExpectedNodes("node3", "node4", "node5")
351		expected.Index = 50
352		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
354		req.QueryOptions.MinQueryIndex = result.Index
355	})
357	runStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) {
358		client.QueueErr(tempError("temporary connection error"))
360		client.QueueEvents(
361			newNewSnapshotToFollowEvent(),
362			registerServiceWeb(50, 3), // overlap existing node
363			registerServiceWeb(50, 4),
364			registerServiceWeb(50, 5),
365			newEndOfSnapshotEvent(50))
367		start := time.Now()
368		req.QueryOptions.MinQueryIndex = 49
369		req.QueryOptions.MaxQueryTime = time.Second
370		result, err := store.Get(ctx, req)
371		require.NoError(t, err)
372		elapsed := time.Since(start)
373		require.True(t, elapsed < time.Second,
374			"Fetch should have returned before the timeout")
376		require.Equal(t, uint64(50), result.Index)
377		expected := newExpectedNodes("node3", "node4", "node5")
378		expected.Index = 50
379		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
380	})
383func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes {
384	result := &structs.IndexedCheckServiceNodes{}
385	result.QueryMeta.Backend = structs.QueryBackendStreaming
386	for _, node := range nodes {
387		result.Nodes = append(result.Nodes, structs.CheckServiceNode{
388			Node: &structs.Node{Node: node},
389		})
390	}
391	return result
394// cmpCheckServiceNodeNames does a shallow comparison of structs.CheckServiceNode
395// by Node name.
396var cmpCheckServiceNodeNames = cmp.Options{
397	cmp.Comparer(func(x, y structs.CheckServiceNode) bool {
398		return x.Node.Node == y.Node.Node
399	}),
402func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
403	t.Helper()
404	if diff := cmp.Diff(x, y, opts...); diff != "" {
405		t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)
406	}
409func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
410	namespace := getNamespace("ns3")
411	client := newStreamClient(validateNamespace(namespace))
413	ctx, cancel := context.WithCancel(context.Background())
414	defer cancel()
416	store := submatview.NewStore(hclog.New(nil))
418	// Create an initial snapshot of 3 instances but in a single event batch
419	batchEv := newEventBatchWithEvents(
420		newEventServiceHealthRegister(5, 1, "web"),
421		newEventServiceHealthRegister(5, 2, "web"),
422		newEventServiceHealthRegister(5, 3, "web"))
423	client.QueueEvents(
424		batchEv,
425		newEndOfSnapshotEvent(5))
427	req := serviceRequestStub{
428		serviceRequest: serviceRequest{
429			ServiceSpecificRequest: structs.ServiceSpecificRequest{
430				Datacenter:     "dc1",
431				ServiceName:    "web",
432				EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
433				QueryOptions:   structs.QueryOptions{MaxQueryTime: time.Second},
434			},
435		},
436		streamClient: client,
437	}
439	runStep(t, "full snapshot returned", func(t *testing.T) {
440		result, err := store.Get(ctx, req)
441		require.NoError(t, err)
443		require.Equal(t, uint64(5), result.Index)
445		expected := newExpectedNodes("node1", "node2", "node3")
446		expected.Index = 5
447		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
448		req.QueryOptions.MinQueryIndex = result.Index
449	})
451	runStep(t, "batched updates work too", func(t *testing.T) {
452		// Simulate multiple registrations happening in one Txn (so all have same
453		// index)
454		batchEv := newEventBatchWithEvents(
455			// Deregister an existing node
456			newEventServiceHealthDeregister(20, 1, "web"),
457			// Register another
458			newEventServiceHealthRegister(20, 4, "web"),
459		)
460		client.QueueEvents(batchEv)
461		req.QueryOptions.MaxQueryTime = time.Second
462		result, err := store.Get(ctx, req)
463		require.NoError(t, err)
465		require.Equal(t, uint64(20), result.Index)
466		expected := newExpectedNodes("node2", "node3", "node4")
467		expected.Index = 20
468		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
470		req.QueryOptions.MinQueryIndex = result.Index
471	})
474func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) {
475	namespace := getNamespace("ns3")
476	streamClient := newStreamClient(validateNamespace(namespace))
478	ctx, cancel := context.WithCancel(context.Background())
479	defer cancel()
481	store := submatview.NewStore(hclog.New(nil))
482	go store.Run(ctx)
484	req := serviceRequestStub{
485		serviceRequest: serviceRequest{
486			ServiceSpecificRequest: structs.ServiceSpecificRequest{
487				Datacenter:     "dc1",
488				ServiceName:    "web",
489				EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
490				QueryOptions: structs.QueryOptions{
491					Filter:       `Node.Node == "node2"`,
492					MaxQueryTime: time.Second,
493				},
494			},
495		},
496		streamClient: streamClient,
497	}
499	// Create an initial snapshot of 3 instances but in a single event batch
500	batchEv := newEventBatchWithEvents(
501		newEventServiceHealthRegister(5, 1, "web"),
502		newEventServiceHealthRegister(5, 2, "web"),
503		newEventServiceHealthRegister(5, 3, "web"))
504	streamClient.QueueEvents(
505		batchEv,
506		newEndOfSnapshotEvent(5))
508	runStep(t, "filtered snapshot returned", func(t *testing.T) {
509		result, err := store.Get(ctx, req)
510		require.NoError(t, err)
512		require.Equal(t, uint64(5), result.Index)
513		expected := newExpectedNodes("node2")
514		expected.Index = 5
515		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
517		req.QueryOptions.MinQueryIndex = result.Index
518	})
520	runStep(t, "filtered updates work too", func(t *testing.T) {
521		// Simulate multiple registrations happening in one Txn (all have same index)
522		batchEv := newEventBatchWithEvents(
523			// Deregister an existing node
524			newEventServiceHealthDeregister(20, 1, "web"),
525			// Register another
526			newEventServiceHealthRegister(20, 4, "web"),
527		)
528		streamClient.QueueEvents(batchEv)
529		result, err := store.Get(ctx, req)
530		require.NoError(t, err)
532		require.Equal(t, uint64(20), result.Index)
533		expected := newExpectedNodes("node2")
534		expected.Index = 20
535		assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
536	})
539// serviceRequestStub overrides NewMaterializer so that test can use a fake
540// StreamClient.
541type serviceRequestStub struct {
542	serviceRequest
543	streamClient submatview.StreamClient
546func (r serviceRequestStub) NewMaterializer() (*submatview.Materializer, error) {
547	view, err := newHealthView(r.ServiceSpecificRequest)
548	if err != nil {
549		return nil, err
550	}
551	return submatview.NewMaterializer(submatview.Deps{
552		View:    view,
553		Client:  r.streamClient,
554		Logger:  hclog.New(nil),
555		Request: newMaterializerRequest(r.ServiceSpecificRequest),
556	}), nil
559func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
560	node := fmt.Sprintf("node%d", nodeNum)
561	nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
562	addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
564	return &pbsubscribe.Event{
565		Index: index,
566		Payload: &pbsubscribe.Event_ServiceHealth{
567			ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
568				Op: pbsubscribe.CatalogOp_Register,
569				CheckServiceNode: &pbservice.CheckServiceNode{
570					Node: &pbservice.Node{
571						ID:         nodeID,
572						Node:       node,
573						Address:    addr,
574						Datacenter: "dc1",
575						RaftIndex: pbcommon.RaftIndex{
576							CreateIndex: index,
577							ModifyIndex: index,
578						},
579					},
580					Service: &pbservice.NodeService{
581						ID:      svc,
582						Service: svc,
583						Port:    8080,
584						RaftIndex: pbcommon.RaftIndex{
585							CreateIndex: index,
586							ModifyIndex: index,
587						},
588					},
589				},
590			},
591		},
592	}
595func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
596	node := fmt.Sprintf("node%d", nodeNum)
598	return &pbsubscribe.Event{
599		Index: index,
600		Payload: &pbsubscribe.Event_ServiceHealth{
601			ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
602				Op: pbsubscribe.CatalogOp_Deregister,
603				CheckServiceNode: &pbservice.CheckServiceNode{
604					Node: &pbservice.Node{
605						Node: node,
606					},
607					Service: &pbservice.NodeService{
608						ID:      svc,
609						Service: svc,
610						Port:    8080,
611						Weights: &pbservice.Weights{
612							Passing: 1,
613							Warning: 1,
614						},
615						RaftIndex: pbcommon.RaftIndex{
616							// The original insertion index since a delete doesn't update
617							// this. This magic value came from state store tests where we
618							// setup at index 10 and then mutate at index 100. It can be
619							// modified by the caller later and makes it easier than having
620							// yet another argument in the common case.
621							CreateIndex: 10,
622							ModifyIndex: 10,
623						},
624					},
625				},
626			},
627		},
628	}
631func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
632	events := make([]*pbsubscribe.Event, len(evs)+1)
633	events[0] = first
634	for i := range evs {
635		events[i+1] = evs[i]
636	}
637	return &pbsubscribe.Event{
638		Index: first.Index,
639		Payload: &pbsubscribe.Event_EventBatch{
640			EventBatch: &pbsubscribe.EventBatch{Events: events},
641		},
642	}
645func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
646	return &pbsubscribe.Event{
647		Index:   index,
648		Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
649	}
652func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
653	return &pbsubscribe.Event{
654		Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
655	}
658// getNamespace returns a namespace if namespace support exists, otherwise
659// returns the empty string. It allows the same tests to work in both oss and ent
660// without duplicating the tests.
661func getNamespace(ns string) string {
662	meta := structs.NewEnterpriseMeta(ns)
663	return meta.NamespaceOrEmpty()
666func validateNamespace(ns string) func(request *pbsubscribe.SubscribeRequest) error {
667	return func(request *pbsubscribe.SubscribeRequest) error {
668		if request.Namespace != ns {
669			return fmt.Errorf("expected request.Namespace %v, got %v", ns, request.Namespace)
670		}
671		return nil
672	}
// runStep runs fn as a subtest called name and stops the parent test at once
// if that subtest fails. Use it for steps that build on one another, where
// continuing after a failure would only produce follow-on noise.
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
	t.Helper()
	if passed := t.Run(name, fn); passed {
		return
	}
	t.FailNow()
}
682func TestNewFilterEvaluator(t *testing.T) {
683	type testCase struct {
684		name     string
685		req      structs.ServiceSpecificRequest
686		data     structs.CheckServiceNode
687		expected bool
688	}
690	fn := func(t *testing.T, tc testCase) {
691		e, err := newFilterEvaluator(tc.req)
692		require.NoError(t, err)
693		actual, err := e.Evaluate(tc.data)
694		require.NoError(t, err)
695		require.Equal(t, tc.expected, actual)
696	}
698	var testCases = []testCase{
699		{
700			name: "single ServiceTags match",
701			req: structs.ServiceSpecificRequest{
702				ServiceTags: []string{"match"},
703				TagFilter:   true,
704			},
705			data: structs.CheckServiceNode{
706				Service: &structs.NodeService{
707					Tags: []string{"extra", "match"},
708				},
709			},
710			expected: true,
711		},
712		{
713			name: "single deprecated ServiceTag match",
714			req: structs.ServiceSpecificRequest{
715				ServiceTag: "match",
716				TagFilter:  true,
717			},
718			data: structs.CheckServiceNode{
719				Service: &structs.NodeService{
720					Tags: []string{"extra", "match"},
721				},
722			},
723			expected: true,
724		},
725		{
726			name: "single ServiceTags mismatch",
727			req: structs.ServiceSpecificRequest{
728				ServiceTags: []string{"other"},
729				TagFilter:   true,
730			},
731			data: structs.CheckServiceNode{
732				Service: &structs.NodeService{
733					Tags: []string{"extra", "match"},
734				},
735			},
736			expected: false,
737		},
738		{
739			name: "multiple ServiceTags match",
740			req: structs.ServiceSpecificRequest{
741				ServiceTags: []string{"match", "second"},
742				TagFilter:   true,
743			},
744			data: structs.CheckServiceNode{
745				Service: &structs.NodeService{
746					Tags: []string{"extra", "match", "second"},
747				},
748			},
749			expected: true,
750		},
751		{
752			name: "multiple ServiceTags mismatch",
753			req: structs.ServiceSpecificRequest{
754				ServiceTags: []string{"match", "not"},
755				TagFilter:   true,
756			},
757			data: structs.CheckServiceNode{
758				Service: &structs.NodeService{
759					Tags: []string{"extra", "match"},
760				},
761			},
762			expected: false,
763		},
764		{
765			name: "single NodeMetaFilter match",
766			req: structs.ServiceSpecificRequest{
767				NodeMetaFilters: map[string]string{"meta1": "match"},
768			},
769			data: structs.CheckServiceNode{
770				Node: &structs.Node{
771					Meta: map[string]string{
772						"meta1": "match",
773						"extra": "some",
774					},
775				},
776			},
777			expected: true,
778		},
779		{
780			name: "single NodeMetaFilter mismatch",
781			req: structs.ServiceSpecificRequest{
782				NodeMetaFilters: map[string]string{
783					"meta1": "match",
784				},
785			},
786			data: structs.CheckServiceNode{
787				Node: &structs.Node{
788					Meta: map[string]string{
789						"meta1": "other",
790						"extra": "some",
791					},
792				},
793			},
794			expected: false,
795		},
796		{
797			name: "multiple NodeMetaFilter match",
798			req: structs.ServiceSpecificRequest{
799				NodeMetaFilters: map[string]string{"meta1": "match", "meta2": "a"},
800			},
801			data: structs.CheckServiceNode{
802				Node: &structs.Node{
803					Meta: map[string]string{
804						"meta1": "match",
805						"meta2": "a",
806						"extra": "some",
807					},
808				},
809			},
810			expected: true,
811		},
812		{
813			name: "multiple NodeMetaFilter mismatch",
814			req: structs.ServiceSpecificRequest{
815				NodeMetaFilters: map[string]string{
816					"meta1": "match",
817					"meta2": "beta",
818				},
819			},
820			data: structs.CheckServiceNode{
821				Node: &structs.Node{
822					Meta: map[string]string{
823						"meta1": "other",
824						"meta2": "gamma",
825					},
826				},
827			},
828			expected: false,
829		},
830		{
831			name: "QueryOptions.Filter match",
832			req: structs.ServiceSpecificRequest{
833				QueryOptions: structs.QueryOptions{
834					Filter: `Node.Node == "node3"`,
835				},
836			},
837			data: structs.CheckServiceNode{
838				Node: &structs.Node{Node: "node3"},
839			},
840			expected: true,
841		},
842		{
843			name: "QueryOptions.Filter mismatch",
844			req: structs.ServiceSpecificRequest{
845				QueryOptions: structs.QueryOptions{
846					Filter: `Node.Node == "node2"`,
847				},
848			},
849			data: structs.CheckServiceNode{
850				Node: &structs.Node{Node: "node3"},
851			},
852			expected: false,
853		},
854		{
855			name: "all match",
856			req: structs.ServiceSpecificRequest{
857				QueryOptions: structs.QueryOptions{
858					Filter: `Node.Node == "node3"`,
859				},
860				ServiceTags: []string{"tag1", "tag2"},
861				NodeMetaFilters: map[string]string{
862					"meta1": "match1",
863					"meta2": "match2",
864				},
865			},
866			data: structs.CheckServiceNode{
867				Node: &structs.Node{
868					Node: "node3",
869					Meta: map[string]string{
870						"meta1": "match1",
871						"meta2": "match2",
872						"extra": "other",
873					},
874				},
875				Service: &structs.NodeService{
876					Tags: []string{"tag1", "tag2", "extra"},
877				},
878			},
879			expected: true,
880		},
881	}
883	for _, tc := range testCases {
884		t.Run(tc.name, func(t *testing.T) {
885			fn(t, tc)
886		})
887	}