1// This file and its contents are licensed under the Apache License 2.0.
2// Please see the included NOTICE for copyright information and
3// LICENSE for a copy of the license.
4
5package ingestor
6
7import (
8	"context"
9	"fmt"
10
11	"github.com/timescale/promscale/pkg/log"
12	"github.com/timescale/promscale/pkg/pgmodel/cache"
13	pgErrors "github.com/timescale/promscale/pkg/pgmodel/common/errors"
14	"github.com/timescale/promscale/pkg/pgmodel/common/schema"
15	"github.com/timescale/promscale/pkg/pgmodel/model"
16	"github.com/timescale/promscale/pkg/pgxconn"
17)
18
19const getCreateMetricsTableWithNewSQL = "SELECT table_name, possibly_new FROM " + schema.Catalog + ".get_or_create_metric_table_name($1)"
20const createExemplarTable = "SELECT * FROM " + schema.Catalog + ".create_exemplar_table_if_not_exists($1)"
21
22func containsExemplars(data []model.Insertable) bool {
23	for _, row := range data {
24		if row.IsOfType(model.Exemplar) {
25			return true
26		}
27	}
28	return false
29}
30
31type readRequest struct {
32	copySender <-chan copyRequest
33}
34
35func metricTableName(conn pgxconn.PgxConn, metric string) (string, bool, error) {
36	res, err := conn.Query(
37		context.Background(),
38		getCreateMetricsTableWithNewSQL,
39		metric,
40	)
41
42	if err != nil {
43		return "", true, fmt.Errorf("failed to get the table name for metric %s: %w", metric, err)
44	}
45
46	var tableName string
47	var possiblyNew bool
48	defer res.Close()
49	if !res.Next() {
50		if err := res.Err(); err != nil {
51			return "", true, fmt.Errorf("failed to get the table name for metric %s: %w", metric, err)
52		}
53		return "", true, pgErrors.ErrMissingTableName
54	}
55
56	if err := res.Scan(&tableName, &possiblyNew); err != nil {
57		return "", true, fmt.Errorf("failed to get the table name for metric %s: %w", metric, err)
58	}
59
60	if err := res.Err(); err != nil {
61		return "", true, fmt.Errorf("failed to get the table name for metric %s: %w", metric, err)
62	}
63
64	if tableName == "" {
65		return "", true, fmt.Errorf("failed to get the table name for metric %s: empty table name returned", metric)
66	}
67
68	return tableName, possiblyNew, nil
69}
70
71// Create the metric table for the metric we handle, if it does not already
72// exist. This only does the most critical part of metric table creation, the
73// rest is handled by completeMetricTableCreation().
74func initializeMetricBatcher(conn pgxconn.PgxConn, metricName string, completeMetricCreationSignal chan struct{}, metricTableNames cache.MetricCache) (tableName string, err error) {
75	// Metric batchers are always initialized with metric names of samples and not of exemplars.
76	mInfo, err := metricTableNames.Get(schema.Data, metricName, false)
77	if err == nil && mInfo.TableName != "" {
78		return mInfo.TableName, nil
79	}
80
81	tableName, possiblyNew, err := metricTableName(conn, metricName)
82	if err != nil || tableName == "" {
83		return "", err
84	}
85
86	// We ignore error here since this is just an optimization.
87	//
88	// Always set metric name while initializing with exemplars as false, since
89	// the metric name set here is via fetching the metric name from metric table.
90	//
91	// Metric table is filled during start, but exemplar table is filled when we
92	// first see an exemplar. Hence, that's the place to sent isExemplar as true.
93	_ = metricTableNames.Set(
94		schema.Data,
95		metricName,
96		model.MetricInfo{
97			TableSchema: schema.Data, TableName: tableName,
98			SeriesTable: tableName, // Series table name is always the same for raw metrics.
99		},
100		false,
101	)
102
103	if possiblyNew {
104		//pass a signal if there is space
105		select {
106		case completeMetricCreationSignal <- struct{}{}:
107		default:
108		}
109	}
110	return tableName, err
111}
112
113// initilizeExemplars creates the necessary tables for exemplars. Called lazily only if exemplars are found
114func initializeExemplars(conn pgxconn.PgxConn, metricName string) error {
115	// We are seeing the exemplar belonging to this metric first time. It may be the
116	// first time of this exemplar in the database. So, let's attempt to create a table
117	// if it does not exists.
118	var created bool
119	err := conn.QueryRow(context.Background(), createExemplarTable, metricName).Scan(&created)
120	if err != nil {
121		return fmt.Errorf("error initializing exemplar tables for %s: %w", metricName, err)
122	}
123	return nil
124}
125
126func runMetricBatcher(conn pgxconn.PgxConn,
127	input chan *insertDataRequest,
128	metricName string,
129	completeMetricCreationSignal chan struct{},
130	metricTableNames cache.MetricCache,
131	copierReadRequestCh chan<- readRequest,
132	labelArrayOID uint32) {
133
134	var (
135		tableName            string
136		firstReq             *insertDataRequest
137		firstReqSet          = false
138		exemplarsInitialized = false
139	)
140
141	addReq := func(req *insertDataRequest, buf *pendingBuffer) {
142		if !exemplarsInitialized && containsExemplars(req.data) {
143			if err := initializeExemplars(conn, metricName); err != nil {
144				log.Error("msg", err)
145				req.reportResult(err)
146				return
147			}
148			exemplarsInitialized = true
149		}
150		buf.addReq(req)
151	}
152	//This channel in synchronous (no buffering). This provides backpressure
153	//to the batcher to keep batching until the copier is ready to read.
154	copySender := make(chan copyRequest)
155	defer close(copySender)
156	readRequest := readRequest{copySender: copySender}
157
158	for firstReq = range input {
159		var err error
160		tableName, err = initializeMetricBatcher(conn, metricName, completeMetricCreationSignal, metricTableNames)
161		if err != nil {
162			err := fmt.Errorf("initializing the insert routine for metric %v has failed with %w", metricName, err)
163			log.Error("msg", err)
164			firstReq.reportResult(err)
165		} else {
166			firstReqSet = true
167			break
168		}
169	}
170
171	//input channel was closed before getting a successful request
172	if !firstReqSet {
173		return
174	}
175
176	//the basic structure of communication from the batcher to the copier is as follows:
177	// 1. the batcher gets a request from the input channel
178	// 2. the batcher sends a readRequest to the copier on a channel shared by all the metric batchers
179	// 3. the batcher keeps on batching together requests from the input channel as long as the copier isn't ready to receive the batch
180	// 4. the copier catches up and gets the read request from the shared channel
181	// 5. the copier reads from the channel specified in the read request
182	// 6. the batcher is able to send it's batch to the copier
183
184	// Notice some properties:
185	// 1. The shared channel in step 2 acts as a queue between metric batchers where the priority is approximately the earliest arrival time of any
186	//     request in the batch (that's why we only do step 2 after step 1). Note this means we probably want a single copier reading a batch
187	//     of requests consecutively so as to minimize processing delays. That's what the mutex in the copier does.
188	// 2. There is an auto-adjusting adaptation loop in step 3. The longer the copier takes to catch up to the readRequest in the queue, the more things will be batched
189	// 3. The batcher has only a single read request out at a time.
190
191	pending := NewPendingBuffer()
192	addReq(firstReq, pending)
193	copierReadRequestCh <- readRequest
194
195	for {
196		if pending.IsEmpty() {
197			req, ok := <-input
198			if !ok {
199				return
200			}
201			addReq(req, pending)
202			copierReadRequestCh <- readRequest
203		}
204
205		recvCh := input
206		if pending.IsFull() {
207			recvCh = nil
208		}
209
210		numSeries := pending.batch.CountSeries()
211		select {
212		//try to send first, if not then keep batching
213		case copySender <- copyRequest{pending, tableName}:
214			MetricBatcherFlushSeries.Observe(float64(numSeries))
215			pending = NewPendingBuffer()
216		case req, ok := <-recvCh:
217			if !ok {
218				if !pending.IsEmpty() {
219					copySender <- copyRequest{pending, tableName}
220					MetricBatcherFlushSeries.Observe(float64(numSeries))
221				}
222				return
223			}
224			addReq(req, pending)
225		}
226	}
227}
228