1// This file and its contents are licensed under the Apache License 2.0.
2// Please see the included NOTICE for copyright information and
3// LICENSE for a copy of the license.
4
5package ingestor
6
7import (
8	"sync"
9
10	"github.com/prometheus/client_golang/prometheus"
11	"github.com/timescale/promscale/pkg/util"
12)
13
14var (
15	// MaxSentTimestamp is the max timestamp sent to the database.
16	MaxSentTimestamp   = int64(0)
17	MetricBatcherChCap = prometheus.NewGauge(
18		prometheus.GaugeOpts{
19			Namespace: util.PromNamespace,
20			Name:      "metric_batcher_channel_cap",
21			Help:      "Capacity of metric batcher channel",
22		},
23	)
24	MetricBatcherChLen = prometheus.NewHistogram(
25		prometheus.HistogramOpts{
26			Namespace: util.PromNamespace,
27			Name:      "metric_batcher_channel_len",
28			Help:      "Length of metric batcher channels",
29			Buckets:   util.HistogramBucketsSaturating(0, 2, MetricBatcherChannelCap),
30		},
31	)
32	MetricBatcherFlushSeries = prometheus.NewHistogram(
33		prometheus.HistogramOpts{
34			Namespace: util.PromNamespace,
35			Name:      "metric_batcher_flush_series",
36			Help:      "Number of series batched by the batcher",
37			Buckets:   util.HistogramBucketsSaturating(1, 2, flushSize),
38		},
39	)
40
41	numSamplesInserted = prometheus.NewCounter(
42		prometheus.CounterOpts{
43			Namespace: util.PromNamespace,
44			Name:      "inserted_samples_total",
45			Help:      "Total samples inserted by copiers into the database.",
46		},
47	)
48
49	numExemplarsInserted = prometheus.NewCounter(
50		prometheus.CounterOpts{
51			Namespace: util.PromNamespace,
52			Name:      "inserted_exemplars_total",
53			Help:      "Total exemplars inserted by copiers into the database.",
54		},
55	)
56
57	NumInsertsPerBatch = prometheus.NewHistogram(
58		prometheus.HistogramOpts{
59			Namespace: util.PromNamespace,
60			Name:      "copier_inserts_per_batch",
61			Help:      "number of INSERTs in a single transaction",
62			Buckets:   util.HistogramBucketsSaturating(1, 2, maxInsertStmtPerTxn),
63		},
64	)
65
66	NumRowsPerBatch = prometheus.NewHistogram(
67		prometheus.HistogramOpts{
68			Namespace: util.PromNamespace,
69			Name:      "copier_rows_per_batch",
70			Help:      "number of rows inserted in a single transaction",
71			Buckets:   prometheus.ExponentialBuckets(1, 2, 15),
72		},
73	)
74
75	NumRowsPerInsert = prometheus.NewHistogram(
76		prometheus.HistogramOpts{
77			Namespace: util.PromNamespace,
78			Name:      "copier_rows_per_insert",
79			Help:      "number of rows inserted in a single insert statement",
80			Buckets:   prometheus.ExponentialBuckets(1, 2, 13),
81		},
82	)
83
84	DbBatchInsertDuration = prometheus.NewHistogram(
85		prometheus.HistogramOpts{
86			Namespace: util.PromNamespace,
87			Name:      "copier_insert_duration_seconds",
88			Help:      "Duration of sample batch insert calls to the DB.",
89			Buckets:   append(prometheus.DefBuckets, []float64{60, 120, 300}...),
90		},
91	)
92
93	MetadataBatchInsertDuration = prometheus.NewHistogram(
94		prometheus.HistogramOpts{
95			Namespace: util.PromNamespace,
96			Name:      "metadata_insert_duration_seconds",
97			Help:      "Duration of a single metadata batch to insert into the DB.",
98			Buckets:   append(prometheus.DefBuckets, []float64{60, 120, 300}...),
99		})
100
101	SamplesCopierChCap = prometheus.NewGauge(
102		prometheus.GaugeOpts{
103			Namespace: util.PromNamespace,
104			Name:      "samples_copier_channel_cap",
105			Help:      "Capacity of samples copier channel",
106		},
107	)
108
109	copierChannelMutex sync.Mutex
110
111	SamplesCopierChannelToMonitor chan readRequest
112	SamplesCopierChLen            = prometheus.NewGaugeFunc(
113		prometheus.GaugeOpts{
114			Namespace: util.PromNamespace,
115			Name:      "samples_copier_channel_len",
116			Help:      "Length of samples copier channel",
117		},
118		func() float64 { return float64(len(SamplesCopierChannelToMonitor)) },
119	)
120	activeWriteRequests = prometheus.NewGauge(
121		prometheus.GaugeOpts{
122			Namespace: util.PromNamespace,
123			Name:      "write_requests_processing",
124			Help:      "Number of active ingestion occurring in Promscale at the moment.",
125		},
126	)
127)
128
129func setCopierChannelToMonitor(toSamplesCopiers chan readRequest) {
130	copierChannelMutex.Lock()
131	defer copierChannelMutex.Unlock()
132
133	SamplesCopierChCap.Set(float64(cap(toSamplesCopiers)))
134	SamplesCopierChannelToMonitor = toSamplesCopiers
135}
136
137func init() {
138	prometheus.MustRegister(
139		MetricBatcherChCap,
140		MetricBatcherChLen,
141		MetricBatcherFlushSeries,
142		numSamplesInserted,
143		numExemplarsInserted,
144		NumInsertsPerBatch,
145		NumRowsPerBatch,
146		NumRowsPerInsert,
147		DbBatchInsertDuration,
148		MetadataBatchInsertDuration,
149		SamplesCopierChCap,
150		SamplesCopierChLen,
151		activeWriteRequests,
152	)
153
154	MetricBatcherChCap.Set(MetricBatcherChannelCap)
155}
156