1// Copyright 2017 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package remote 15 16import ( 17 "sync" 18 19 "github.com/prometheus/common/model" 20 21 "github.com/prometheus/prometheus/config" 22) 23 24// Writer allows queueing samples for remote writes. 25type Writer struct { 26 mtx sync.RWMutex 27 queues []*QueueManager 28} 29 30// ApplyConfig updates the state as the new config requires. 31func (w *Writer) ApplyConfig(conf *config.Config) error { 32 w.mtx.Lock() 33 defer w.mtx.Unlock() 34 35 newQueues := []*QueueManager{} 36 // TODO: we should only stop & recreate queues which have changes, 37 // as this can be quite disruptive. 38 for i, rwConf := range conf.RemoteWriteConfigs { 39 c, err := NewClient(i, &ClientConfig{ 40 URL: rwConf.URL, 41 Timeout: rwConf.RemoteTimeout, 42 HTTPClientConfig: rwConf.HTTPClientConfig, 43 }) 44 if err != nil { 45 return err 46 } 47 newQueues = append(newQueues, NewQueueManager( 48 rwConf.QueueConfig, 49 conf.GlobalConfig.ExternalLabels, 50 rwConf.WriteRelabelConfigs, 51 c, 52 )) 53 } 54 55 for _, q := range w.queues { 56 q.Stop() 57 } 58 59 w.queues = newQueues 60 for _, q := range w.queues { 61 q.Start() 62 } 63 return nil 64} 65 66// Stop the background processing of the storage queues. 67func (w *Writer) Stop() { 68 for _, q := range w.queues { 69 q.Stop() 70 } 71} 72 73// Append implements storage.SampleAppender. Always returns nil. 74func (w *Writer) Append(smpl *model.Sample) error { 75 w.mtx.RLock() 76 defer w.mtx.RUnlock() 77 78 for _, q := range w.queues { 79 q.Append(smpl) 80 } 81 return nil 82} 83 84// NeedsThrottling implements storage.SampleAppender. It will always return 85// false as a remote storage drops samples on the floor if backlogging instead 86// of asking for throttling. 87func (w *Writer) NeedsThrottling() bool { 88 return false 89} 90