1package lokipush
2
3import (
4	"errors"
5	"fmt"
6	"strings"
7
8	"github.com/go-kit/kit/log"
9	"github.com/go-kit/kit/log/level"
10	"github.com/prometheus/client_golang/prometheus"
11
12	"github.com/grafana/loki/clients/pkg/logentry/stages"
13	"github.com/grafana/loki/clients/pkg/promtail/api"
14	"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
15	"github.com/grafana/loki/clients/pkg/promtail/targets/target"
16)
17
// PushTargetManager manages a series of PushTargets, one per `push`
// scrape config handed to NewPushTargetManager.
type PushTargetManager struct {
	// logger is the logger supplied to NewPushTargetManager.
	logger  log.Logger
	// targets holds the managed PushTargets keyed by the job name of the
	// scrape config each one was created from (after validateJobName has
	// replaced spaces in the name with underscores).
	targets map[string]*PushTarget
}
23
24// NewPushTargetManager creates a new PushTargetManager.
25func NewPushTargetManager(
26	reg prometheus.Registerer,
27	logger log.Logger,
28	client api.EntryHandler,
29	scrapeConfigs []scrapeconfig.Config,
30) (*PushTargetManager, error) {
31
32	tm := &PushTargetManager{
33		logger:  logger,
34		targets: make(map[string]*PushTarget),
35	}
36
37	if err := validateJobName(scrapeConfigs); err != nil {
38		return nil, err
39	}
40
41	for _, cfg := range scrapeConfigs {
42		pipeline, err := stages.NewPipeline(log.With(logger, "component", "push_pipeline_"+cfg.JobName), cfg.PipelineStages, &cfg.JobName, reg)
43		if err != nil {
44			return nil, err
45		}
46
47		t, err := NewPushTarget(logger, pipeline.Wrap(client), cfg.RelabelConfigs, cfg.JobName, cfg.PushConfig)
48		if err != nil {
49			return nil, err
50		}
51
52		tm.targets[cfg.JobName] = t
53	}
54
55	return tm, nil
56}
57
58func validateJobName(scrapeConfigs []scrapeconfig.Config) error {
59	jobNames := map[string]struct{}{}
60	for i, cfg := range scrapeConfigs {
61		if cfg.JobName == "" {
62			return errors.New("`job_name` must be defined for the `push` scrape_config with a " +
63				"unique name to properly register metrics, " +
64				"at least one `push` scrape_config has no `job_name` defined")
65		}
66		if _, ok := jobNames[cfg.JobName]; ok {
67			return fmt.Errorf("`job_name` must be unique for each `push` scrape_config, "+
68				"a duplicate `job_name` of %s was found", cfg.JobName)
69		}
70		jobNames[cfg.JobName] = struct{}{}
71
72		scrapeConfigs[i].JobName = strings.Replace(cfg.JobName, " ", "_", -1)
73	}
74	return nil
75}
76
77// Ready returns true if at least one PushTarget is also ready.
78func (tm *PushTargetManager) Ready() bool {
79	for _, t := range tm.targets {
80		if t.Ready() {
81			return true
82		}
83	}
84	return false
85}
86
87// Stop stops the PushTargetManager and all of its PushTargets.
88func (tm *PushTargetManager) Stop() {
89	for _, t := range tm.targets {
90		if err := t.Stop(); err != nil {
91			level.Error(t.logger).Log("msg", "error stopping PushTarget", "err", err.Error())
92		}
93	}
94}
95
96// ActiveTargets returns the list of PushTargets where Push data
97// is being read. ActiveTargets is an alias to AllTargets as
98// PushTargets cannot be deactivated, only stopped.
99func (tm *PushTargetManager) ActiveTargets() map[string][]target.Target {
100	return tm.AllTargets()
101}
102
103// AllTargets returns the list of all targets where Push data
104// is currently being read.
105func (tm *PushTargetManager) AllTargets() map[string][]target.Target {
106	result := make(map[string][]target.Target, len(tm.targets))
107	for k, v := range tm.targets {
108		result[k] = []target.Target{v}
109	}
110	return result
111}
112