1// This file and its contents are licensed under the Apache License 2.0.
2// Please see the included NOTICE for copyright information and
3// LICENSE for a copy of the license.
4
5package ingestor
6
7import (
8	"fmt"
9	"sync"
10	"sync/atomic"
11	"time"
12
13	"github.com/timescale/promscale/pkg/ewma"
14	"github.com/timescale/promscale/pkg/log"
15)
16
// Settings for the ratio-based warning.
const (
	// reportRatioThreshold is the incoming-rate / outgoing-rate ratio above which
	// a warning is logged.
	reportRatioThreshold = 3
	// checkRatioInterval is how often the ratio is evaluated; it is also the
	// interval handed to the EWMA rates when they are created.
	checkRatioInterval = time.Minute
)

// reportDurationThreshold is the batch processing time above which a slow
// ingestion warning is scheduled.
const reportDurationThreshold = time.Minute
26
27// throughtputWatcher is a light weight samples batch watching type, that serves as a watching
28// routine and keeps the track of incoming write batches. It keeps a ratio of
29// incoming samples vs outgoing samples and warn about low throughput to the user
30// which might be due to external causes like network latency, resources allocated, etc.
31type batchWatcher struct {
32	// Warn based on ratio.
33	incomingBatches *ewma.Rate
34	outgoingBatches *ewma.Rate
35
36	// Warn based on ingestion duration.
37	shouldReportLastDuration atomic.Value
38	lastIngestionDuration    atomic.Value
39
40	stop chan struct{}
41}
42
43var (
44	watcher  *batchWatcher
45	tWatcher = new(sync.Once)
46)
47
48func runBatchWatcher(stop chan struct{}) {
49	tWatcher.Do(func() {
50		watcher := newThroughputWatcher(stop)
51		go watcher.watch()
52	})
53}
54
55func newThroughputWatcher(stop chan struct{}) *batchWatcher {
56	watcher = &batchWatcher{
57		incomingBatches: ewma.NewEWMARate(1, checkRatioInterval),
58		outgoingBatches: ewma.NewEWMARate(1, checkRatioInterval),
59		stop:            stop,
60	}
61	watcher.shouldReportLastDuration.Store(false)
62	return watcher
63}
64
65// watchBatches watches the ratio between incomingBatches and outgoingBatches every checkRatioInterval.
66func (w *batchWatcher) watch() {
67	t := time.NewTicker(checkRatioInterval)
68	defer t.Stop()
69	for {
70		select {
71		case <-t.C:
72		case <-w.stop:
73			return
74		}
75		w.outgoingBatches.Tick()
76		w.incomingBatches.Tick()
77		outgoingRate := w.outgoingBatches.Rate()
78		incomingRate := w.incomingBatches.Rate()
79		if outgoingRate > 0 {
80			r := incomingRate / outgoingRate
81			if r > reportRatioThreshold {
82				warnHighRatio(r, incomingRate, outgoingRate)
83			}
84		}
85		if w.shouldReportLastDuration.Load().(bool) {
86			warnSlowIngestion(w.lastIngestionDuration.Load().(time.Duration))
87			w.shouldReportLastDuration.Store(false)
88		}
89	}
90}
91
92func warnHighRatio(ratio float64, inRate, outRate float64) {
93	log.Warn("msg", "[WARNING] Incoming samples rate much higher than the rate of samples being saved. "+
94		"This may happen due to poor network latency, not enough resources allocated to Promscale, or some other performance-related reason. "+
95		"Please tune your system or reach out to the Promscale team.",
96		"incoming-rate", inRate, "outgoing-rate", outRate,
97		"throughput-ratio", fmt.Sprintf("%.2f", ratio), "threshold", reportRatioThreshold)
98}
99
100func warnSlowIngestion(duration time.Duration) {
101	log.Warn("msg", "[WARNING] Ingestion is a very long time", "duration",
102		duration.String(), "threshold", reportDurationThreshold.String())
103}
104
105func reportIncomingBatch(size uint64) {
106	watcher.incomingBatches.Incr(int64(size))
107}
108
109func reportOutgoingBatch(size uint64) {
110	watcher.outgoingBatches.Incr(int64(size))
111}
112
113func reportBatchProcessingTime(inTime time.Time) {
114	d := time.Since(inTime)
115	if d.Seconds() > reportDurationThreshold.Seconds() {
116		watcher.shouldReportLastDuration.Store(true)
117		watcher.lastIngestionDuration.Store(d)
118	}
119}
120