1// Copyright 2018 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package adapter
15
16// NOTE: you do not need to edit this file when implementing a custom sd.
17import (
18	"context"
19	"encoding/json"
20	"fmt"
21	"io/ioutil"
22	"os"
23	"path/filepath"
24	"reflect"
25	"sort"
26
27	"github.com/go-kit/log"
28	"github.com/go-kit/log/level"
29	"github.com/prometheus/common/model"
30
31	"github.com/prometheus/prometheus/discovery"
32	"github.com/prometheus/prometheus/discovery/targetgroup"
33)
34
35type customSD struct {
36	Targets []string          `json:"targets"`
37	Labels  map[string]string `json:"labels"`
38}
39
40func fingerprint(group *targetgroup.Group) model.Fingerprint {
41	groupFingerprint := model.LabelSet{}.Fingerprint()
42	for _, targets := range group.Targets {
43		groupFingerprint ^= targets.Fingerprint()
44	}
45	groupFingerprint ^= group.Labels.Fingerprint()
46	return groupFingerprint
47}
48
49// Adapter runs an unknown service discovery implementation and converts its target groups
50// to JSON and writes to a file for file_sd.
51type Adapter struct {
52	ctx     context.Context
53	disc    discovery.Discoverer
54	groups  map[string]*customSD
55	manager *discovery.Manager
56	output  string
57	name    string
58	logger  log.Logger
59}
60
61func mapToArray(m map[string]*customSD) []customSD {
62	arr := make([]customSD, 0, len(m))
63	for _, v := range m {
64		arr = append(arr, *v)
65	}
66	return arr
67}
68
69func generateTargetGroups(allTargetGroups map[string][]*targetgroup.Group) map[string]*customSD {
70	groups := make(map[string]*customSD)
71	for k, sdTargetGroups := range allTargetGroups {
72		for _, group := range sdTargetGroups {
73			newTargets := make([]string, 0)
74			newLabels := make(map[string]string)
75
76			for _, targets := range group.Targets {
77				for _, target := range targets {
78					newTargets = append(newTargets, string(target))
79				}
80			}
81			sort.Strings(newTargets)
82			for name, value := range group.Labels {
83				newLabels[string(name)] = string(value)
84			}
85
86			sdGroup := customSD{
87
88				Targets: newTargets,
89				Labels:  newLabels,
90			}
91			// Make a unique key, including group's fingerprint, in case the sd_type (map key) and group.Source is not unique.
92			groupFingerprint := fingerprint(group)
93			key := fmt.Sprintf("%s:%s:%s", k, group.Source, groupFingerprint.String())
94			groups[key] = &sdGroup
95		}
96	}
97
98	return groups
99}
100
101// Parses incoming target groups updates. If the update contains changes to the target groups
102// Adapter already knows about, or new target groups, we Marshal to JSON and write to file.
103func (a *Adapter) refreshTargetGroups(allTargetGroups map[string][]*targetgroup.Group) {
104	tempGroups := generateTargetGroups(allTargetGroups)
105
106	if !reflect.DeepEqual(a.groups, tempGroups) {
107		a.groups = tempGroups
108		err := a.writeOutput()
109		if err != nil {
110			level.Error(log.With(a.logger, "component", "sd-adapter")).Log("err", err)
111		}
112	}
113}
114
115// Writes JSON formatted targets to output file.
116func (a *Adapter) writeOutput() error {
117	arr := mapToArray(a.groups)
118	b, _ := json.MarshalIndent(arr, "", "    ")
119
120	dir, _ := filepath.Split(a.output)
121	tmpfile, err := ioutil.TempFile(dir, "sd-adapter")
122	if err != nil {
123		return err
124	}
125	defer tmpfile.Close()
126
127	_, err = tmpfile.Write(b)
128	if err != nil {
129		return err
130	}
131
132	// Close the file immediately for platforms (eg. Windows) that cannot move
133	// a file while a process is holding a file handle.
134	tmpfile.Close()
135	err = os.Rename(tmpfile.Name(), a.output)
136	if err != nil {
137		return err
138	}
139	return nil
140}
141
142func (a *Adapter) runCustomSD(ctx context.Context) {
143	updates := a.manager.SyncCh()
144	for {
145		select {
146		case <-ctx.Done():
147		case allTargetGroups, ok := <-updates:
148			// Handle the case that a target provider exits and closes the channel
149			// before the context is done.
150			if !ok {
151				return
152			}
153			a.refreshTargetGroups(allTargetGroups)
154		}
155	}
156}
157
158// Run starts a Discovery Manager and the custom service discovery implementation.
159func (a *Adapter) Run() {
160	go a.manager.Run()
161	a.manager.StartCustomProvider(a.ctx, a.name, a.disc)
162	go a.runCustomSD(a.ctx)
163}
164
165// NewAdapter creates a new instance of Adapter.
166func NewAdapter(ctx context.Context, file string, name string, d discovery.Discoverer, logger log.Logger) *Adapter {
167	return &Adapter{
168		ctx:     ctx,
169		disc:    d,
170		groups:  make(map[string]*customSD),
171		manager: discovery.NewManager(ctx, logger),
172		output:  file,
173		name:    name,
174		logger:  logger,
175	}
176}
177