1// Copyright 2018 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package adapter 15 16// NOTE: you do not need to edit this file when implementing a custom sd. 17import ( 18 "context" 19 "encoding/json" 20 "fmt" 21 "io/ioutil" 22 "os" 23 "path/filepath" 24 "reflect" 25 "sort" 26 27 "github.com/go-kit/log" 28 "github.com/go-kit/log/level" 29 "github.com/prometheus/common/model" 30 31 "github.com/prometheus/prometheus/discovery" 32 "github.com/prometheus/prometheus/discovery/targetgroup" 33) 34 35type customSD struct { 36 Targets []string `json:"targets"` 37 Labels map[string]string `json:"labels"` 38} 39 40func fingerprint(group *targetgroup.Group) model.Fingerprint { 41 groupFingerprint := model.LabelSet{}.Fingerprint() 42 for _, targets := range group.Targets { 43 groupFingerprint ^= targets.Fingerprint() 44 } 45 groupFingerprint ^= group.Labels.Fingerprint() 46 return groupFingerprint 47} 48 49// Adapter runs an unknown service discovery implementation and converts its target groups 50// to JSON and writes to a file for file_sd. 51type Adapter struct { 52 ctx context.Context 53 disc discovery.Discoverer 54 groups map[string]*customSD 55 manager *discovery.Manager 56 output string 57 name string 58 logger log.Logger 59} 60 61func mapToArray(m map[string]*customSD) []customSD { 62 arr := make([]customSD, 0, len(m)) 63 for _, v := range m { 64 arr = append(arr, *v) 65 } 66 return arr 67} 68 69func generateTargetGroups(allTargetGroups map[string][]*targetgroup.Group) map[string]*customSD { 70 groups := make(map[string]*customSD) 71 for k, sdTargetGroups := range allTargetGroups { 72 for _, group := range sdTargetGroups { 73 newTargets := make([]string, 0) 74 newLabels := make(map[string]string) 75 76 for _, targets := range group.Targets { 77 for _, target := range targets { 78 newTargets = append(newTargets, string(target)) 79 } 80 } 81 sort.Strings(newTargets) 82 for name, value := range group.Labels { 83 newLabels[string(name)] = string(value) 84 } 85 86 sdGroup := customSD{ 87 88 Targets: newTargets, 89 Labels: newLabels, 90 } 91 // Make a unique key, including group's fingerprint, in case the sd_type (map key) and group.Source is not unique. 92 groupFingerprint := fingerprint(group) 93 key := fmt.Sprintf("%s:%s:%s", k, group.Source, groupFingerprint.String()) 94 groups[key] = &sdGroup 95 } 96 } 97 98 return groups 99} 100 101// Parses incoming target groups updates. If the update contains changes to the target groups 102// Adapter already knows about, or new target groups, we Marshal to JSON and write to file. 103func (a *Adapter) refreshTargetGroups(allTargetGroups map[string][]*targetgroup.Group) { 104 tempGroups := generateTargetGroups(allTargetGroups) 105 106 if !reflect.DeepEqual(a.groups, tempGroups) { 107 a.groups = tempGroups 108 err := a.writeOutput() 109 if err != nil { 110 level.Error(log.With(a.logger, "component", "sd-adapter")).Log("err", err) 111 } 112 } 113} 114 115// Writes JSON formatted targets to output file. 116func (a *Adapter) writeOutput() error { 117 arr := mapToArray(a.groups) 118 b, _ := json.MarshalIndent(arr, "", " ") 119 120 dir, _ := filepath.Split(a.output) 121 tmpfile, err := ioutil.TempFile(dir, "sd-adapter") 122 if err != nil { 123 return err 124 } 125 defer tmpfile.Close() 126 127 _, err = tmpfile.Write(b) 128 if err != nil { 129 return err 130 } 131 132 // Close the file immediately for platforms (eg. Windows) that cannot move 133 // a file while a process is holding a file handle. 134 tmpfile.Close() 135 err = os.Rename(tmpfile.Name(), a.output) 136 if err != nil { 137 return err 138 } 139 return nil 140} 141 142func (a *Adapter) runCustomSD(ctx context.Context) { 143 updates := a.manager.SyncCh() 144 for { 145 select { 146 case <-ctx.Done(): 147 case allTargetGroups, ok := <-updates: 148 // Handle the case that a target provider exits and closes the channel 149 // before the context is done. 150 if !ok { 151 return 152 } 153 a.refreshTargetGroups(allTargetGroups) 154 } 155 } 156} 157 158// Run starts a Discovery Manager and the custom service discovery implementation. 159func (a *Adapter) Run() { 160 go a.manager.Run() 161 a.manager.StartCustomProvider(a.ctx, a.name, a.disc) 162 go a.runCustomSD(a.ctx) 163} 164 165// NewAdapter creates a new instance of Adapter. 166func NewAdapter(ctx context.Context, file string, name string, d discovery.Discoverer, logger log.Logger) *Adapter { 167 return &Adapter{ 168 ctx: ctx, 169 disc: d, 170 groups: make(map[string]*customSD), 171 manager: discovery.NewManager(ctx, logger), 172 output: file, 173 name: name, 174 logger: logger, 175 } 176} 177