1// This file and its contents are licensed under the Apache License 2.0.
2// Please see the included NOTICE for copyright information and
3// LICENSE for a copy of the license.
4
5package ingestor
6
7import (
8	"context"
9	"fmt"
10	"sync"
11	"sync/atomic"
12	"time"
13
14	"github.com/timescale/promscale/pkg/log"
15	"github.com/timescale/promscale/pkg/pgmodel/cache"
16	"github.com/timescale/promscale/pkg/pgmodel/common/schema"
17	"github.com/timescale/promscale/pkg/pgmodel/metrics"
18	"github.com/timescale/promscale/pkg/pgmodel/model"
19	"github.com/timescale/promscale/pkg/pgxconn"
20	tput "github.com/timescale/promscale/pkg/util/throughput"
21)
22
23const (
24	MetricBatcherChannelCap = 1000
25	finalizeMetricCreation  = "CALL " + schema.Catalog + ".finalize_metric_creation()"
26	getEpochSQL             = "SELECT current_epoch FROM " + schema.Catalog + ".ids_epoch LIMIT 1"
27)
28
// pgxDispatcher redirects incoming samples to the appropriate metricBatcher
// corresponding to the metric in the sample.
type pgxDispatcher struct {
	// conn is the database connection used for catalog queries and handed to
	// every metric batcher.
	conn pgxconn.PgxConn
	// metricTableNames is the metric cache passed to each metric batcher.
	metricTableNames cache.MetricCache
	// scache is the series cache; it is reset whenever the server's series
	// epoch changes or cannot be read.
	scache cache.SeriesCache
	// exemplarKeyPosCache is the exemplar key-position cache.
	exemplarKeyPosCache cache.PositionCache
	// batchers maps a metric name (string) to that metric's
	// chan *insertDataRequest.
	batchers sync.Map
	// completeMetricCreation is a signal channel: each value received causes
	// CompleteMetricCreation to run. It is also handed to every batcher.
	completeMetricCreation chan struct{}
	// asyncAcks makes InsertTs return without waiting for inserts to finish.
	asyncAcks bool
	// copierReadRequestCh is the send side of the queue consumed by the copier
	// goroutines.
	copierReadRequestCh chan<- readRequest
	// seriesEpochRefresh drives the periodic series-epoch check in
	// runSeriesEpochSync.
	seriesEpochRefresh *time.Ticker
	// doneChannel is closed by Close to stop background loops.
	doneChannel chan struct{}
	// doneWG tracks the epoch-sync goroutine so Close can wait for it.
	doneWG sync.WaitGroup
	// labelArrayOID is the OID of the custom label-array type, passed to each
	// metric batcher started by getMetricBatcher.
	labelArrayOID uint32
}
45
46func newPgxDispatcher(conn pgxconn.PgxConn, cache cache.MetricCache, scache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*pgxDispatcher, error) {
47	numCopiers := cfg.NumCopiers
48	if numCopiers < 1 {
49		log.Warn("msg", "num copiers less than 1, setting to 1")
50		numCopiers = 1
51	}
52
53	//the copier read request channel keep the queue order
54	//between metrucs
55	maxMetrics := 10000
56	copierReadRequestCh := make(chan readRequest, maxMetrics)
57	setCopierChannelToMonitor(copierReadRequestCh)
58
59	if cfg.IgnoreCompressedChunks {
60		// Handle decompression to not decompress anything.
61		handleDecompression = skipDecompression
62	}
63
64	if err := model.RegisterCustomPgTypes(conn); err != nil {
65		return nil, fmt.Errorf("registering custom pg types: %w", err)
66	}
67
68	labelArrayOID := model.GetCustomTypeOID(model.LabelArray)
69	sw := NewSeriesWriter(conn, labelArrayOID)
70	elf := NewExamplarLabelFormatter(conn, eCache)
71
72	for i := 0; i < numCopiers; i++ {
73		go runCopier(conn, copierReadRequestCh, sw, elf)
74	}
75
76	inserter := &pgxDispatcher{
77		conn:                   conn,
78		metricTableNames:       cache,
79		scache:                 scache,
80		exemplarKeyPosCache:    eCache,
81		completeMetricCreation: make(chan struct{}, 1),
82		asyncAcks:              cfg.AsyncAcks,
83		copierReadRequestCh:    copierReadRequestCh,
84		// set to run at half our deletion interval
85		seriesEpochRefresh: time.NewTicker(30 * time.Minute),
86		doneChannel:        make(chan struct{}),
87	}
88	runBatchWatcher(inserter.doneChannel)
89
90	//on startup run a completeMetricCreation to recover any potentially
91	//incomplete metric
92	if err := inserter.CompleteMetricCreation(); err != nil {
93		return nil, err
94	}
95
96	go inserter.runCompleteMetricCreationWorker()
97
98	if !cfg.DisableEpochSync {
99		inserter.doneWG.Add(1)
100		go func() {
101			defer inserter.doneWG.Done()
102			inserter.runSeriesEpochSync()
103		}()
104	}
105	return inserter, nil
106}
107
108func (p *pgxDispatcher) runCompleteMetricCreationWorker() {
109	for range p.completeMetricCreation {
110		err := p.CompleteMetricCreation()
111		if err != nil {
112			log.Warn("msg", "Got an error finalizing metric", "err", err)
113		}
114	}
115}
116
117func (p *pgxDispatcher) runSeriesEpochSync() {
118	epoch, err := p.refreshSeriesEpoch(model.InvalidSeriesEpoch)
119	// we don't have any great place to report errors, and if the
120	// connection recovers we can still make progress, so we'll just log it
121	// and continue execution
122	if err != nil {
123		log.Error("msg", "error refreshing the series cache", "err", err)
124	}
125	for {
126		select {
127		case <-p.seriesEpochRefresh.C:
128			epoch, err = p.refreshSeriesEpoch(epoch)
129			if err != nil {
130				log.Error("msg", "error refreshing the series cache", "err", err)
131			}
132		case <-p.doneChannel:
133			return
134		}
135	}
136}
137
138func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch model.SeriesEpoch) (model.SeriesEpoch, error) {
139	dbEpoch, err := p.getServerEpoch()
140	if err != nil {
141		// Trash the cache just in case an epoch change occurred, seems safer
142		p.scache.Reset()
143		return model.InvalidSeriesEpoch, err
144	}
145	if existingEpoch == model.InvalidSeriesEpoch || dbEpoch != existingEpoch {
146		p.scache.Reset()
147	}
148	return dbEpoch, nil
149}
150
151func (p *pgxDispatcher) getServerEpoch() (model.SeriesEpoch, error) {
152	var newEpoch int64
153	row := p.conn.QueryRow(context.Background(), getEpochSQL)
154	err := row.Scan(&newEpoch)
155	if err != nil {
156		return -1, err
157	}
158
159	return model.SeriesEpoch(newEpoch), nil
160}
161
162func (p *pgxDispatcher) CompleteMetricCreation() error {
163	_, err := p.conn.Exec(
164		context.Background(),
165		finalizeMetricCreation,
166	)
167	return err
168}
169
170func (p *pgxDispatcher) Close() {
171	close(p.completeMetricCreation)
172	p.batchers.Range(func(key, value interface{}) bool {
173		close(value.(chan *insertDataRequest))
174		return true
175	})
176	close(p.copierReadRequestCh)
177	close(p.doneChannel)
178	p.doneWG.Wait()
179}
180
181// InsertTs inserts a batch of data into the database.
182// The data should be grouped by metric name.
183// returns the number of rows we intended to insert (_not_ how many were
184// actually inserted) and any error.
185// Though we may insert data to multiple tables concurrently, if asyncAcks is
186// unset this function will wait until _all_ the insert attempts have completed.
187func (p *pgxDispatcher) InsertTs(dataTS model.Data) (uint64, error) {
188	var (
189		numRows      uint64
190		maxt         int64
191		rows         = dataTS.Rows
192		workFinished = new(sync.WaitGroup)
193	)
194	workFinished.Add(len(rows))
195	// we only allocate enough space for a single error message here as we only
196	// report one error back upstream. The inserter should not block on this
197	// channel, but only insert if it's empty, anything else can deadlock.
198	errChan := make(chan error, 1)
199	for metricName, data := range rows {
200		for _, insertable := range data {
201			numRows += uint64(insertable.Count())
202			ts := insertable.MaxTs()
203			if maxt < ts {
204				maxt = ts
205			}
206		}
207		// the following is usually non-blocking, just a channel insert
208		p.getMetricBatcher(metricName) <- &insertDataRequest{metric: metricName, data: data, finished: workFinished, errChan: errChan}
209	}
210	reportIncomingBatch(numRows)
211	reportOutgoing := func() {
212		reportOutgoingBatch(numRows)
213		reportBatchProcessingTime(dataTS.ReceivedTime)
214	}
215
216	var err error
217	if !p.asyncAcks {
218		workFinished.Wait()
219		reportOutgoing()
220		select {
221		case err = <-errChan:
222		default:
223		}
224		postIngestTasks(maxt, numRows, 0)
225		close(errChan)
226	} else {
227		go func() {
228			workFinished.Wait()
229			reportOutgoing()
230			select {
231			case err = <-errChan:
232			default:
233			}
234			close(errChan)
235			if err != nil {
236				log.Error("msg", fmt.Sprintf("error on async send, dropping %d datapoints", numRows), "err", err)
237			}
238			postIngestTasks(maxt, numRows, 0)
239		}()
240	}
241
242	return numRows, err
243}
244
245func (p *pgxDispatcher) InsertMetadata(metadata []model.Metadata) (uint64, error) {
246	totalRows := uint64(len(metadata))
247	insertedRows, err := insertMetadata(p.conn, metadata)
248	if err != nil {
249		return insertedRows, err
250	}
251	postIngestTasks(0, 0, insertedRows)
252	if totalRows != insertedRows {
253		return insertedRows, fmt.Errorf("failed to insert all metadata: inserted %d rows out of %d rows in total", insertedRows, totalRows)
254	}
255	return insertedRows, nil
256}
257
258// postIngestTasks performs a set of tasks that are due after ingesting series data.
259func postIngestTasks(maxTs int64, numSamples, numMetadata uint64) {
260	tput.ReportDataProcessed(maxTs, numSamples, numMetadata)
261
262	// Max_sent_timestamp stats.
263	if maxTs < atomic.LoadInt64(&MaxSentTimestamp) {
264		return
265	}
266	atomic.StoreInt64(&MaxSentTimestamp, maxTs)
267	metrics.MaxSentTimestamp.Set(float64(maxTs))
268}
269
270// Get the handler for a given metric name, creating a new one if none exists
271func (p *pgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataRequest {
272	batcher, ok := p.batchers.Load(metric)
273	if !ok {
274		// The ordering is important here: we need to ensure that every call
275		// to getMetricInserter() returns the same inserter. Therefore, we can
276		// only start up the inserter routine if we know that we won the race
277		// to create the inserter, anything else will leave a zombie inserter
278		// lying around.
279		c := make(chan *insertDataRequest, MetricBatcherChannelCap)
280		actual, old := p.batchers.LoadOrStore(metric, c)
281		batcher = actual
282		if !old {
283			go runMetricBatcher(p.conn, c, metric, p.completeMetricCreation, p.metricTableNames, p.copierReadRequestCh, p.labelArrayOID)
284		}
285	}
286	ch := batcher.(chan *insertDataRequest)
287	MetricBatcherChLen.Observe(float64(len(ch)))
288	return ch
289}
290
// insertDataRequest carries one metric's insertables from InsertTs to that
// metric's batcher, together with the channels used to report completion.
type insertDataRequest struct {
	// metric is the metric name the data belongs to.
	metric string
	// finished is shared by all requests of one InsertTs call; reportResult
	// calls Done on it exactly once per request.
	finished *sync.WaitGroup
	// data holds the insertables for this metric.
	data []model.Insertable
	// errChan is shared by all requests of one InsertTs call (created with
	// capacity 1 there); reportResult writes to it without blocking.
	errChan chan error
}
297
298func (idr *insertDataRequest) reportResult(err error) {
299	if err != nil {
300		select {
301		case idr.errChan <- err:
302		default:
303		}
304	}
305	idr.finished.Done()
306}
307