1// Copyright 2018 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package adapter
15
16// NOTE: you do not need to edit this file when implementing a custom sd.
17import (
18	"context"
19	"encoding/json"
20	"fmt"
21	"io/ioutil"
22	"os"
23	"path/filepath"
24	"reflect"
25	"sort"
26
27	"github.com/go-kit/kit/log"
28	"github.com/go-kit/kit/log/level"
29	"github.com/prometheus/common/model"
30	"github.com/prometheus/prometheus/discovery"
31	"github.com/prometheus/prometheus/discovery/targetgroup"
32)
33
// customSD is one entry of the JSON array the adapter writes to its output
// file: a list of targets together with the labels shared by all of them.
type customSD struct {
	// Targets is serialized under the "targets" key.
	Targets []string `json:"targets"`
	// Labels is serialized under the "labels" key.
	Labels map[string]string `json:"labels"`
}
38
39func fingerprint(group *targetgroup.Group) model.Fingerprint {
40	groupFingerprint := model.LabelSet{}.Fingerprint()
41	for _, targets := range group.Targets {
42		groupFingerprint ^= targets.Fingerprint()
43	}
44	groupFingerprint ^= group.Labels.Fingerprint()
45	return groupFingerprint
46}
47
48// Adapter runs an unknown service discovery implementation and converts its target groups
49// to JSON and writes to a file for file_sd.
50type Adapter struct {
51	ctx     context.Context
52	disc    discovery.Discoverer
53	groups  map[string]*customSD
54	manager *discovery.Manager
55	output  string
56	name    string
57	logger  log.Logger
58}
59
60func mapToArray(m map[string]*customSD) []customSD {
61	arr := make([]customSD, 0, len(m))
62	for _, v := range m {
63		arr = append(arr, *v)
64	}
65	return arr
66}
67
68func generateTargetGroups(allTargetGroups map[string][]*targetgroup.Group) map[string]*customSD {
69	groups := make(map[string]*customSD)
70	for k, sdTargetGroups := range allTargetGroups {
71		for _, group := range sdTargetGroups {
72			newTargets := make([]string, 0)
73			newLabels := make(map[string]string)
74
75			for _, targets := range group.Targets {
76				for _, target := range targets {
77					newTargets = append(newTargets, string(target))
78				}
79			}
80			sort.Strings(newTargets)
81			for name, value := range group.Labels {
82				newLabels[string(name)] = string(value)
83			}
84
85			sdGroup := customSD{
86
87				Targets: newTargets,
88				Labels:  newLabels,
89			}
90			// Make a unique key, including group's fingerprint, in case the sd_type (map key) and group.Source is not unique.
91			groupFingerprint := fingerprint(group)
92			key := fmt.Sprintf("%s:%s:%s", k, group.Source, groupFingerprint.String())
93			groups[key] = &sdGroup
94		}
95	}
96
97	return groups
98}
99
100// Parses incoming target groups updates. If the update contains changes to the target groups
101// Adapter already knows about, or new target groups, we Marshal to JSON and write to file.
102func (a *Adapter) refreshTargetGroups(allTargetGroups map[string][]*targetgroup.Group) {
103	tempGroups := generateTargetGroups(allTargetGroups)
104
105	if !reflect.DeepEqual(a.groups, tempGroups) {
106		a.groups = tempGroups
107		err := a.writeOutput()
108		if err != nil {
109			level.Error(log.With(a.logger, "component", "sd-adapter")).Log("err", err)
110		}
111	}
112}
113
114// Writes JSON formatted targets to output file.
115func (a *Adapter) writeOutput() error {
116	arr := mapToArray(a.groups)
117	b, _ := json.MarshalIndent(arr, "", "    ")
118
119	dir, _ := filepath.Split(a.output)
120	tmpfile, err := ioutil.TempFile(dir, "sd-adapter")
121	if err != nil {
122		return err
123	}
124	defer tmpfile.Close()
125
126	_, err = tmpfile.Write(b)
127	if err != nil {
128		return err
129	}
130
131	// Close the file immediately for platforms (eg. Windows) that cannot move
132	// a file while a process is holding a file handle.
133	tmpfile.Close()
134	err = os.Rename(tmpfile.Name(), a.output)
135	if err != nil {
136		return err
137	}
138	return nil
139}
140
141func (a *Adapter) runCustomSD(ctx context.Context) {
142	updates := a.manager.SyncCh()
143	for {
144		select {
145		case <-ctx.Done():
146		case allTargetGroups, ok := <-updates:
147			// Handle the case that a target provider exits and closes the channel
148			// before the context is done.
149			if !ok {
150				return
151			}
152			a.refreshTargetGroups(allTargetGroups)
153		}
154	}
155}
156
157// Run starts a Discovery Manager and the custom service discovery implementation.
158func (a *Adapter) Run() {
159	go a.manager.Run()
160	a.manager.StartCustomProvider(a.ctx, a.name, a.disc)
161	go a.runCustomSD(a.ctx)
162}
163
164// NewAdapter creates a new instance of Adapter.
165func NewAdapter(ctx context.Context, file string, name string, d discovery.Discoverer, logger log.Logger) *Adapter {
166	return &Adapter{
167		ctx:     ctx,
168		disc:    d,
169		groups:  make(map[string]*customSD),
170		manager: discovery.NewManager(ctx, logger),
171		output:  file,
172		name:    name,
173		logger:  logger,
174	}
175}
176