1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package targets
5
6import (
7	"context"
8	"sort"
9
10	"github.com/pkg/errors"
11	"github.com/prometheus/prometheus/storage"
12	"github.com/thanos-io/thanos/pkg/store/storepb"
13	"github.com/thanos-io/thanos/pkg/targets/targetspb"
14)
15
16var _ UnaryClient = &GRPCClient{}
17
// UnaryClient is a gRPC targetspb.Targets client which expands the streaming targets API into a
// single call. Useful for consumers that do not support streaming.
type UnaryClient interface {
	// Targets returns the target discovery result for req, together with any
	// warnings gathered along the way, or an error.
	Targets(ctx context.Context, req *targetspb.TargetsRequest) (*targetspb.TargetDiscovery, storage.Warnings, error)
}
23
// GRPCClient allows to retrieve targets from local gRPC streaming server implementation.
// TODO(bwplotka): Switch to native gRPC transparent client->server adapter once available.
type GRPCClient struct {
	// proxy is the streaming server whose Targets method is invoked in-process
	// by GRPCClient.Targets.
	proxy targetspb.TargetsServer

	// replicaLabels is the set of label names that are removed from targets
	// before duplicates are collapsed. It is filled by NewGRPCClientWithDedup.
	replicaLabels map[string]struct{}
}
31
32func NewGRPCClient(ts targetspb.TargetsServer) *GRPCClient {
33	return NewGRPCClientWithDedup(ts, nil)
34}
35
36func NewGRPCClientWithDedup(ts targetspb.TargetsServer, replicaLabels []string) *GRPCClient {
37	c := &GRPCClient{
38		proxy:         ts,
39		replicaLabels: map[string]struct{}{},
40	}
41
42	for _, label := range replicaLabels {
43		c.replicaLabels[label] = struct{}{}
44	}
45	return c
46}
47
48func (rr *GRPCClient) Targets(ctx context.Context, req *targetspb.TargetsRequest) (*targetspb.TargetDiscovery, storage.Warnings, error) {
49	resp := &targetsServer{ctx: ctx, targets: &targetspb.TargetDiscovery{
50		ActiveTargets:  make([]*targetspb.ActiveTarget, 0),
51		DroppedTargets: make([]*targetspb.DroppedTarget, 0),
52	}}
53
54	if err := rr.proxy.Targets(req, resp); err != nil {
55		return nil, nil, errors.Wrap(err, "proxy Targets")
56	}
57
58	resp.targets = dedupTargets(resp.targets, rr.replicaLabels)
59
60	return resp.targets, resp.warnings, nil
61}
62
63// dedupTargets re-sorts the set so that the same target with different replica
64// labels are coming right after each other.
65func dedupTargets(targets *targetspb.TargetDiscovery, replicaLabels map[string]struct{}) *targetspb.TargetDiscovery {
66	if targets == nil {
67		return nil
68	}
69
70	targets.ActiveTargets = dedupActiveTargets(targets.ActiveTargets, replicaLabels)
71	targets.DroppedTargets = dedupDroppedTargets(targets.DroppedTargets, replicaLabels)
72
73	return targets
74}
75
76func dedupDroppedTargets(droppedTargets []*targetspb.DroppedTarget, replicaLabels map[string]struct{}) []*targetspb.DroppedTarget {
77	if len(droppedTargets) == 0 {
78		return droppedTargets
79	}
80
81	// Sort each target's label names such that they are comparable.
82	for _, t := range droppedTargets {
83		sort.Slice(t.DiscoveredLabels.Labels, func(i, j int) bool {
84			return t.DiscoveredLabels.Labels[i].Name < t.DiscoveredLabels.Labels[j].Name
85		})
86	}
87
88	// Sort targets globally based on synthesized deduplication labels, also considering replica labels and their values.
89	sort.Slice(droppedTargets, func(i, j int) bool {
90		return droppedTargets[i].Compare(droppedTargets[j]) < 0
91	})
92
93	// Remove targets based on synthesized deduplication labels, this time ignoring replica labels
94	i := 0
95	droppedTargets[i].DiscoveredLabels.Labels = removeReplicaLabels(
96		droppedTargets[i].DiscoveredLabels.Labels,
97		replicaLabels,
98	)
99	for j := 1; j < len(droppedTargets); j++ {
100		droppedTargets[j].DiscoveredLabels.Labels = removeReplicaLabels(
101			droppedTargets[j].DiscoveredLabels.Labels,
102			replicaLabels,
103		)
104		if droppedTargets[i].Compare(droppedTargets[j]) != 0 {
105			// Effectively retain targets[j] in the resulting slice.
106			i++
107			droppedTargets[i] = droppedTargets[j]
108			continue
109		}
110	}
111
112	return droppedTargets[:i+1]
113}
114
115func dedupActiveTargets(activeTargets []*targetspb.ActiveTarget, replicaLabels map[string]struct{}) []*targetspb.ActiveTarget {
116	if len(activeTargets) == 0 {
117		return activeTargets
118	}
119
120	// Sort each target's label names such that they are comparable.
121	for _, t := range activeTargets {
122		sort.Slice(t.DiscoveredLabels.Labels, func(i, j int) bool {
123			return t.DiscoveredLabels.Labels[i].Name < t.DiscoveredLabels.Labels[j].Name
124		})
125	}
126
127	// Sort targets globally based on synthesized deduplication labels, also considering replica labels and their values.
128	sort.Slice(activeTargets, func(i, j int) bool {
129		return activeTargets[i].Compare(activeTargets[j]) < 0
130	})
131
132	// Remove targets based on synthesized deduplication labels, this time ignoring replica labels and last scrape.
133	i := 0
134	activeTargets[i].DiscoveredLabels.Labels = removeReplicaLabels(
135		activeTargets[i].DiscoveredLabels.Labels,
136		replicaLabels,
137	)
138	activeTargets[i].Labels.Labels = removeReplicaLabels(
139		activeTargets[i].Labels.Labels,
140		replicaLabels,
141	)
142	for j := 1; j < len(activeTargets); j++ {
143		activeTargets[j].DiscoveredLabels.Labels = removeReplicaLabels(
144			activeTargets[j].DiscoveredLabels.Labels,
145			replicaLabels,
146		)
147		activeTargets[j].Labels.Labels = removeReplicaLabels(
148			activeTargets[j].Labels.Labels,
149			replicaLabels,
150		)
151
152		if activeTargets[i].Compare(activeTargets[j]) != 0 {
153			// Effectively retain targets[j] in the resulting slice.
154			i++
155			activeTargets[i] = activeTargets[j]
156			continue
157		}
158
159		if activeTargets[i].CompareState(activeTargets[j]) <= 0 {
160			continue
161		}
162
163		// Swap if we found a younger target.
164		activeTargets[i] = activeTargets[j]
165	}
166
167	return activeTargets[:i+1]
168}
169
170func removeReplicaLabels(labels []storepb.Label, replicaLabels map[string]struct{}) []storepb.Label {
171	newLabels := make([]storepb.Label, 0, len(labels))
172	for _, l := range labels {
173		if _, ok := replicaLabels[l.Name]; !ok {
174			newLabels = append(newLabels, l)
175		}
176	}
177
178	return newLabels
179}
180
// targetsServer is an in-memory stand-in for a targetspb.Targets_TargetsServer
// stream: Send accumulates responses in the struct and Context returns ctx.
type targetsServer struct {
	// This field exists only to pseudo-implement the unused methods of the interface.
	targetspb.Targets_TargetsServer
	// ctx is the context handed out by Context.
	ctx context.Context

	// warnings collects the warning responses passed to Send, in arrival order.
	// targets accumulates the active and dropped targets passed to Send; it must
	// be non-nil before Send receives a targets response.
	warnings []error
	targets  *targetspb.TargetDiscovery
}
189
190func (srv *targetsServer) Send(res *targetspb.TargetsResponse) error {
191	if res.GetWarning() != "" {
192		srv.warnings = append(srv.warnings, errors.New(res.GetWarning()))
193		return nil
194	}
195
196	if res.GetTargets() == nil {
197		return errors.New("no targets")
198	}
199	srv.targets.ActiveTargets = append(srv.targets.ActiveTargets, res.GetTargets().ActiveTargets...)
200	srv.targets.DroppedTargets = append(srv.targets.DroppedTargets, res.GetTargets().DroppedTargets...)
201
202	return nil
203}
204
// Context returns the context stored in srv.ctx.
func (srv *targetsServer) Context() context.Context {
	return srv.ctx
}
208