1// This file and its contents are licensed under the Apache License 2.0. 2// Please see the included NOTICE for copyright information and 3// LICENSE for a copy of the license. 4 5package ingestor 6 7import ( 8 "fmt" 9 "sync" 10 "sync/atomic" 11 "time" 12 13 "github.com/timescale/promscale/pkg/ewma" 14 "github.com/timescale/promscale/pkg/log" 15) 16 17const ( 18 // reportThreshold is a value above which the warn would be logged. This value is 19 // the value of the ratio between incoming batches vs outgoing batches. 20 reportRatioThreshold = 3 21 checkRatioInterval = time.Minute 22 23 // Duration warnings constants. 24 reportDurationThreshold = time.Minute 25) 26 27// throughtputWatcher is a light weight samples batch watching type, that serves as a watching 28// routine and keeps the track of incoming write batches. It keeps a ratio of 29// incoming samples vs outgoing samples and warn about low throughput to the user 30// which might be due to external causes like network latency, resources allocated, etc. 31type batchWatcher struct { 32 // Warn based on ratio. 33 incomingBatches *ewma.Rate 34 outgoingBatches *ewma.Rate 35 36 // Warn based on ingestion duration. 37 shouldReportLastDuration atomic.Value 38 lastIngestionDuration atomic.Value 39 40 stop chan struct{} 41} 42 43var ( 44 watcher *batchWatcher 45 tWatcher = new(sync.Once) 46) 47 48func runBatchWatcher(stop chan struct{}) { 49 tWatcher.Do(func() { 50 watcher := newThroughputWatcher(stop) 51 go watcher.watch() 52 }) 53} 54 55func newThroughputWatcher(stop chan struct{}) *batchWatcher { 56 watcher = &batchWatcher{ 57 incomingBatches: ewma.NewEWMARate(1, checkRatioInterval), 58 outgoingBatches: ewma.NewEWMARate(1, checkRatioInterval), 59 stop: stop, 60 } 61 watcher.shouldReportLastDuration.Store(false) 62 return watcher 63} 64 65// watchBatches watches the ratio between incomingBatches and outgoingBatches every checkRatioInterval. 66func (w *batchWatcher) watch() { 67 t := time.NewTicker(checkRatioInterval) 68 defer t.Stop() 69 for { 70 select { 71 case <-t.C: 72 case <-w.stop: 73 return 74 } 75 w.outgoingBatches.Tick() 76 w.incomingBatches.Tick() 77 outgoingRate := w.outgoingBatches.Rate() 78 incomingRate := w.incomingBatches.Rate() 79 if outgoingRate > 0 { 80 r := incomingRate / outgoingRate 81 if r > reportRatioThreshold { 82 warnHighRatio(r, incomingRate, outgoingRate) 83 } 84 } 85 if w.shouldReportLastDuration.Load().(bool) { 86 warnSlowIngestion(w.lastIngestionDuration.Load().(time.Duration)) 87 w.shouldReportLastDuration.Store(false) 88 } 89 } 90} 91 92func warnHighRatio(ratio float64, inRate, outRate float64) { 93 log.Warn("msg", "[WARNING] Incoming samples rate much higher than the rate of samples being saved. "+ 94 "This may happen due to poor network latency, not enough resources allocated to Promscale, or some other performance-related reason. "+ 95 "Please tune your system or reach out to the Promscale team.", 96 "incoming-rate", inRate, "outgoing-rate", outRate, 97 "throughput-ratio", fmt.Sprintf("%.2f", ratio), "threshold", reportRatioThreshold) 98} 99 100func warnSlowIngestion(duration time.Duration) { 101 log.Warn("msg", "[WARNING] Ingestion is a very long time", "duration", 102 duration.String(), "threshold", reportDurationThreshold.String()) 103} 104 105func reportIncomingBatch(size uint64) { 106 watcher.incomingBatches.Incr(int64(size)) 107} 108 109func reportOutgoingBatch(size uint64) { 110 watcher.outgoingBatches.Incr(int64(size)) 111} 112 113func reportBatchProcessingTime(inTime time.Time) { 114 d := time.Since(inTime) 115 if d.Seconds() > reportDurationThreshold.Seconds() { 116 watcher.shouldReportLastDuration.Store(true) 117 watcher.lastIngestionDuration.Store(d) 118 } 119} 120