1// Copyright 2018 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package adapter 15 16// NOTE: you do not need to edit this file when implementing a custom sd. 17import ( 18 "context" 19 "encoding/json" 20 "fmt" 21 "io/ioutil" 22 "os" 23 "path/filepath" 24 "reflect" 25 "sort" 26 27 "github.com/go-kit/kit/log" 28 "github.com/go-kit/kit/log/level" 29 "github.com/prometheus/common/model" 30 "github.com/prometheus/prometheus/discovery" 31 "github.com/prometheus/prometheus/discovery/targetgroup" 32) 33 34type customSD struct { 35 Targets []string `json:"targets"` 36 Labels map[string]string `json:"labels"` 37} 38 39func fingerprint(group *targetgroup.Group) model.Fingerprint { 40 groupFingerprint := model.LabelSet{}.Fingerprint() 41 for _, targets := range group.Targets { 42 groupFingerprint ^= targets.Fingerprint() 43 } 44 groupFingerprint ^= group.Labels.Fingerprint() 45 return groupFingerprint 46} 47 48// Adapter runs an unknown service discovery implementation and converts its target groups 49// to JSON and writes to a file for file_sd. 50type Adapter struct { 51 ctx context.Context 52 disc discovery.Discoverer 53 groups map[string]*customSD 54 manager *discovery.Manager 55 output string 56 name string 57 logger log.Logger 58} 59 60func mapToArray(m map[string]*customSD) []customSD { 61 arr := make([]customSD, 0, len(m)) 62 for _, v := range m { 63 arr = append(arr, *v) 64 } 65 return arr 66} 67 68func generateTargetGroups(allTargetGroups map[string][]*targetgroup.Group) map[string]*customSD { 69 groups := make(map[string]*customSD) 70 for k, sdTargetGroups := range allTargetGroups { 71 for _, group := range sdTargetGroups { 72 newTargets := make([]string, 0) 73 newLabels := make(map[string]string) 74 75 for _, targets := range group.Targets { 76 for _, target := range targets { 77 newTargets = append(newTargets, string(target)) 78 } 79 } 80 sort.Strings(newTargets) 81 for name, value := range group.Labels { 82 newLabels[string(name)] = string(value) 83 } 84 85 sdGroup := customSD{ 86 87 Targets: newTargets, 88 Labels: newLabels, 89 } 90 // Make a unique key, including group's fingerprint, in case the sd_type (map key) and group.Source is not unique. 91 groupFingerprint := fingerprint(group) 92 key := fmt.Sprintf("%s:%s:%s", k, group.Source, groupFingerprint.String()) 93 groups[key] = &sdGroup 94 } 95 } 96 97 return groups 98} 99 100// Parses incoming target groups updates. If the update contains changes to the target groups 101// Adapter already knows about, or new target groups, we Marshal to JSON and write to file. 102func (a *Adapter) refreshTargetGroups(allTargetGroups map[string][]*targetgroup.Group) { 103 tempGroups := generateTargetGroups(allTargetGroups) 104 105 if !reflect.DeepEqual(a.groups, tempGroups) { 106 a.groups = tempGroups 107 err := a.writeOutput() 108 if err != nil { 109 level.Error(log.With(a.logger, "component", "sd-adapter")).Log("err", err) 110 } 111 } 112} 113 114// Writes JSON formatted targets to output file. 115func (a *Adapter) writeOutput() error { 116 arr := mapToArray(a.groups) 117 b, _ := json.MarshalIndent(arr, "", " ") 118 119 dir, _ := filepath.Split(a.output) 120 tmpfile, err := ioutil.TempFile(dir, "sd-adapter") 121 if err != nil { 122 return err 123 } 124 defer tmpfile.Close() 125 126 _, err = tmpfile.Write(b) 127 if err != nil { 128 return err 129 } 130 131 // Close the file immediately for platforms (eg. Windows) that cannot move 132 // a file while a process is holding a file handle. 133 tmpfile.Close() 134 err = os.Rename(tmpfile.Name(), a.output) 135 if err != nil { 136 return err 137 } 138 return nil 139} 140 141func (a *Adapter) runCustomSD(ctx context.Context) { 142 updates := a.manager.SyncCh() 143 for { 144 select { 145 case <-ctx.Done(): 146 case allTargetGroups, ok := <-updates: 147 // Handle the case that a target provider exits and closes the channel 148 // before the context is done. 149 if !ok { 150 return 151 } 152 a.refreshTargetGroups(allTargetGroups) 153 } 154 } 155} 156 157// Run starts a Discovery Manager and the custom service discovery implementation. 158func (a *Adapter) Run() { 159 go a.manager.Run() 160 a.manager.StartCustomProvider(a.ctx, a.name, a.disc) 161 go a.runCustomSD(a.ctx) 162} 163 164// NewAdapter creates a new instance of Adapter. 165func NewAdapter(ctx context.Context, file string, name string, d discovery.Discoverer, logger log.Logger) *Adapter { 166 return &Adapter{ 167 ctx: ctx, 168 disc: d, 169 groups: make(map[string]*customSD), 170 manager: discovery.NewManager(ctx, logger), 171 output: file, 172 name: name, 173 logger: logger, 174 } 175} 176