1// Copyright 2017 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package remote
15
16import (
17	"sync"
18
19	"github.com/prometheus/common/model"
20
21	"github.com/prometheus/prometheus/config"
22)
23
24// Writer allows queueing samples for remote writes.
25type Writer struct {
26	mtx    sync.RWMutex
27	queues []*QueueManager
28}
29
30// ApplyConfig updates the state as the new config requires.
31func (w *Writer) ApplyConfig(conf *config.Config) error {
32	w.mtx.Lock()
33	defer w.mtx.Unlock()
34
35	newQueues := []*QueueManager{}
36	// TODO: we should only stop & recreate queues which have changes,
37	// as this can be quite disruptive.
38	for i, rwConf := range conf.RemoteWriteConfigs {
39		c, err := NewClient(i, &ClientConfig{
40			URL:              rwConf.URL,
41			Timeout:          rwConf.RemoteTimeout,
42			HTTPClientConfig: rwConf.HTTPClientConfig,
43		})
44		if err != nil {
45			return err
46		}
47		newQueues = append(newQueues, NewQueueManager(
48			rwConf.QueueConfig,
49			conf.GlobalConfig.ExternalLabels,
50			rwConf.WriteRelabelConfigs,
51			c,
52		))
53	}
54
55	for _, q := range w.queues {
56		q.Stop()
57	}
58
59	w.queues = newQueues
60	for _, q := range w.queues {
61		q.Start()
62	}
63	return nil
64}
65
66// Stop the background processing of the storage queues.
67func (w *Writer) Stop() {
68	for _, q := range w.queues {
69		q.Stop()
70	}
71}
72
73// Append implements storage.SampleAppender. Always returns nil.
74func (w *Writer) Append(smpl *model.Sample) error {
75	w.mtx.RLock()
76	defer w.mtx.RUnlock()
77
78	for _, q := range w.queues {
79		q.Append(smpl)
80	}
81	return nil
82}
83
84// NeedsThrottling implements storage.SampleAppender. It will always return
85// false as a remote storage drops samples on the floor if backlogging instead
86// of asking for throttling.
87func (w *Writer) NeedsThrottling() bool {
88	return false
89}
90