1package lokipush 2 3import ( 4 "errors" 5 "fmt" 6 "strings" 7 8 "github.com/go-kit/kit/log" 9 "github.com/go-kit/kit/log/level" 10 "github.com/prometheus/client_golang/prometheus" 11 12 "github.com/grafana/loki/clients/pkg/logentry/stages" 13 "github.com/grafana/loki/clients/pkg/promtail/api" 14 "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" 15 "github.com/grafana/loki/clients/pkg/promtail/targets/target" 16) 17 18// PushTargetManager manages a series of PushTargets. 19type PushTargetManager struct { 20 logger log.Logger 21 targets map[string]*PushTarget 22} 23 24// NewPushTargetManager creates a new PushTargetManager. 25func NewPushTargetManager( 26 reg prometheus.Registerer, 27 logger log.Logger, 28 client api.EntryHandler, 29 scrapeConfigs []scrapeconfig.Config, 30) (*PushTargetManager, error) { 31 32 tm := &PushTargetManager{ 33 logger: logger, 34 targets: make(map[string]*PushTarget), 35 } 36 37 if err := validateJobName(scrapeConfigs); err != nil { 38 return nil, err 39 } 40 41 for _, cfg := range scrapeConfigs { 42 pipeline, err := stages.NewPipeline(log.With(logger, "component", "push_pipeline_"+cfg.JobName), cfg.PipelineStages, &cfg.JobName, reg) 43 if err != nil { 44 return nil, err 45 } 46 47 t, err := NewPushTarget(logger, pipeline.Wrap(client), cfg.RelabelConfigs, cfg.JobName, cfg.PushConfig) 48 if err != nil { 49 return nil, err 50 } 51 52 tm.targets[cfg.JobName] = t 53 } 54 55 return tm, nil 56} 57 58func validateJobName(scrapeConfigs []scrapeconfig.Config) error { 59 jobNames := map[string]struct{}{} 60 for i, cfg := range scrapeConfigs { 61 if cfg.JobName == "" { 62 return errors.New("`job_name` must be defined for the `push` scrape_config with a " + 63 "unique name to properly register metrics, " + 64 "at least one `push` scrape_config has no `job_name` defined") 65 } 66 if _, ok := jobNames[cfg.JobName]; ok { 67 return fmt.Errorf("`job_name` must be unique for each `push` scrape_config, "+ 68 "a duplicate `job_name` of %s was found", cfg.JobName) 69 } 70 jobNames[cfg.JobName] = struct{}{} 71 72 scrapeConfigs[i].JobName = strings.Replace(cfg.JobName, " ", "_", -1) 73 } 74 return nil 75} 76 77// Ready returns true if at least one PushTarget is also ready. 78func (tm *PushTargetManager) Ready() bool { 79 for _, t := range tm.targets { 80 if t.Ready() { 81 return true 82 } 83 } 84 return false 85} 86 87// Stop stops the PushTargetManager and all of its PushTargets. 88func (tm *PushTargetManager) Stop() { 89 for _, t := range tm.targets { 90 if err := t.Stop(); err != nil { 91 level.Error(t.logger).Log("msg", "error stopping PushTarget", "err", err.Error()) 92 } 93 } 94} 95 96// ActiveTargets returns the list of PushTargets where Push data 97// is being read. ActiveTargets is an alias to AllTargets as 98// PushTargets cannot be deactivated, only stopped. 99func (tm *PushTargetManager) ActiveTargets() map[string][]target.Target { 100 return tm.AllTargets() 101} 102 103// AllTargets returns the list of all targets where Push data 104// is currently being read. 105func (tm *PushTargetManager) AllTargets() map[string][]target.Target { 106 result := make(map[string][]target.Target, len(tm.targets)) 107 for k, v := range tm.targets { 108 result[k] = []target.Target{v} 109 } 110 return result 111} 112