1// Copyright (c) The Thanos Authors. 2// Licensed under the Apache License 2.0. 3 4package targets 5 6import ( 7 "context" 8 "sort" 9 10 "github.com/pkg/errors" 11 "github.com/prometheus/prometheus/storage" 12 "github.com/thanos-io/thanos/pkg/store/storepb" 13 "github.com/thanos-io/thanos/pkg/targets/targetspb" 14) 15 16var _ UnaryClient = &GRPCClient{} 17 18// UnaryClient is gRPC targetspb.Targets client which expands streaming targets API. Useful for consumers that does not 19// support streaming. 20type UnaryClient interface { 21 Targets(ctx context.Context, req *targetspb.TargetsRequest) (*targetspb.TargetDiscovery, storage.Warnings, error) 22} 23 24// GRPCClient allows to retrieve targets from local gRPC streaming server implementation. 25// TODO(bwplotka): Switch to native gRPC transparent client->server adapter once available. 26type GRPCClient struct { 27 proxy targetspb.TargetsServer 28 29 replicaLabels map[string]struct{} 30} 31 32func NewGRPCClient(ts targetspb.TargetsServer) *GRPCClient { 33 return NewGRPCClientWithDedup(ts, nil) 34} 35 36func NewGRPCClientWithDedup(ts targetspb.TargetsServer, replicaLabels []string) *GRPCClient { 37 c := &GRPCClient{ 38 proxy: ts, 39 replicaLabels: map[string]struct{}{}, 40 } 41 42 for _, label := range replicaLabels { 43 c.replicaLabels[label] = struct{}{} 44 } 45 return c 46} 47 48func (rr *GRPCClient) Targets(ctx context.Context, req *targetspb.TargetsRequest) (*targetspb.TargetDiscovery, storage.Warnings, error) { 49 resp := &targetsServer{ctx: ctx, targets: &targetspb.TargetDiscovery{ 50 ActiveTargets: make([]*targetspb.ActiveTarget, 0), 51 DroppedTargets: make([]*targetspb.DroppedTarget, 0), 52 }} 53 54 if err := rr.proxy.Targets(req, resp); err != nil { 55 return nil, nil, errors.Wrap(err, "proxy Targets") 56 } 57 58 resp.targets = dedupTargets(resp.targets, rr.replicaLabels) 59 60 return resp.targets, resp.warnings, nil 61} 62 63// dedupTargets re-sorts the set so that the same target with different replica 64// labels are coming right after each other. 65func dedupTargets(targets *targetspb.TargetDiscovery, replicaLabels map[string]struct{}) *targetspb.TargetDiscovery { 66 if targets == nil { 67 return nil 68 } 69 70 targets.ActiveTargets = dedupActiveTargets(targets.ActiveTargets, replicaLabels) 71 targets.DroppedTargets = dedupDroppedTargets(targets.DroppedTargets, replicaLabels) 72 73 return targets 74} 75 76func dedupDroppedTargets(droppedTargets []*targetspb.DroppedTarget, replicaLabels map[string]struct{}) []*targetspb.DroppedTarget { 77 if len(droppedTargets) == 0 { 78 return droppedTargets 79 } 80 81 // Sort each target's label names such that they are comparable. 82 for _, t := range droppedTargets { 83 sort.Slice(t.DiscoveredLabels.Labels, func(i, j int) bool { 84 return t.DiscoveredLabels.Labels[i].Name < t.DiscoveredLabels.Labels[j].Name 85 }) 86 } 87 88 // Sort targets globally based on synthesized deduplication labels, also considering replica labels and their values. 89 sort.Slice(droppedTargets, func(i, j int) bool { 90 return droppedTargets[i].Compare(droppedTargets[j]) < 0 91 }) 92 93 // Remove targets based on synthesized deduplication labels, this time ignoring replica labels 94 i := 0 95 droppedTargets[i].DiscoveredLabels.Labels = removeReplicaLabels( 96 droppedTargets[i].DiscoveredLabels.Labels, 97 replicaLabels, 98 ) 99 for j := 1; j < len(droppedTargets); j++ { 100 droppedTargets[j].DiscoveredLabels.Labels = removeReplicaLabels( 101 droppedTargets[j].DiscoveredLabels.Labels, 102 replicaLabels, 103 ) 104 if droppedTargets[i].Compare(droppedTargets[j]) != 0 { 105 // Effectively retain targets[j] in the resulting slice. 106 i++ 107 droppedTargets[i] = droppedTargets[j] 108 continue 109 } 110 } 111 112 return droppedTargets[:i+1] 113} 114 115func dedupActiveTargets(activeTargets []*targetspb.ActiveTarget, replicaLabels map[string]struct{}) []*targetspb.ActiveTarget { 116 if len(activeTargets) == 0 { 117 return activeTargets 118 } 119 120 // Sort each target's label names such that they are comparable. 121 for _, t := range activeTargets { 122 sort.Slice(t.DiscoveredLabels.Labels, func(i, j int) bool { 123 return t.DiscoveredLabels.Labels[i].Name < t.DiscoveredLabels.Labels[j].Name 124 }) 125 } 126 127 // Sort targets globally based on synthesized deduplication labels, also considering replica labels and their values. 128 sort.Slice(activeTargets, func(i, j int) bool { 129 return activeTargets[i].Compare(activeTargets[j]) < 0 130 }) 131 132 // Remove targets based on synthesized deduplication labels, this time ignoring replica labels and last scrape. 133 i := 0 134 activeTargets[i].DiscoveredLabels.Labels = removeReplicaLabels( 135 activeTargets[i].DiscoveredLabels.Labels, 136 replicaLabels, 137 ) 138 activeTargets[i].Labels.Labels = removeReplicaLabels( 139 activeTargets[i].Labels.Labels, 140 replicaLabels, 141 ) 142 for j := 1; j < len(activeTargets); j++ { 143 activeTargets[j].DiscoveredLabels.Labels = removeReplicaLabels( 144 activeTargets[j].DiscoveredLabels.Labels, 145 replicaLabels, 146 ) 147 activeTargets[j].Labels.Labels = removeReplicaLabels( 148 activeTargets[j].Labels.Labels, 149 replicaLabels, 150 ) 151 152 if activeTargets[i].Compare(activeTargets[j]) != 0 { 153 // Effectively retain targets[j] in the resulting slice. 154 i++ 155 activeTargets[i] = activeTargets[j] 156 continue 157 } 158 159 if activeTargets[i].CompareState(activeTargets[j]) <= 0 { 160 continue 161 } 162 163 // Swap if we found a younger target. 164 activeTargets[i] = activeTargets[j] 165 } 166 167 return activeTargets[:i+1] 168} 169 170func removeReplicaLabels(labels []storepb.Label, replicaLabels map[string]struct{}) []storepb.Label { 171 newLabels := make([]storepb.Label, 0, len(labels)) 172 for _, l := range labels { 173 if _, ok := replicaLabels[l.Name]; !ok { 174 newLabels = append(newLabels, l) 175 } 176 } 177 178 return newLabels 179} 180 181type targetsServer struct { 182 // This field just exist to pseudo-implement the unused methods of the interface. 183 targetspb.Targets_TargetsServer 184 ctx context.Context 185 186 warnings []error 187 targets *targetspb.TargetDiscovery 188} 189 190func (srv *targetsServer) Send(res *targetspb.TargetsResponse) error { 191 if res.GetWarning() != "" { 192 srv.warnings = append(srv.warnings, errors.New(res.GetWarning())) 193 return nil 194 } 195 196 if res.GetTargets() == nil { 197 return errors.New("no targets") 198 } 199 srv.targets.ActiveTargets = append(srv.targets.ActiveTargets, res.GetTargets().ActiveTargets...) 200 srv.targets.DroppedTargets = append(srv.targets.DroppedTargets, res.GetTargets().DroppedTargets...) 201 202 return nil 203} 204 205func (srv *targetsServer) Context() context.Context { 206 return srv.ctx 207} 208