1// This file and its contents are licensed under the Apache License 2.0.
2// Please see the included NOTICE for copyright information and
3// LICENSE for a copy of the license.
4
5package ingestor
6
7import (
8	"fmt"
9	"time"
10
11	"github.com/timescale/promscale/pkg/pgmodel/cache"
12	"github.com/timescale/promscale/pkg/pgmodel/common/errors"
13	"github.com/timescale/promscale/pkg/pgmodel/model"
14	"github.com/timescale/promscale/pkg/pgxconn"
15	"github.com/timescale/promscale/pkg/prompb"
16)
17
// Cfg bundles the options handed to the pgx dispatcher when an ingestor is
// constructed (see NewPgxIngestor). NewPgxIngestorForTests falls back to the
// zero value when it is given a nil config.
// NOTE(review): the meaning of each field (e.g. the unit of ReportInterval) is
// defined by the dispatcher, not in this file — confirm there before relying on it.
type Cfg struct {
	AsyncAcks                                bool
	ReportInterval, NumCopiers               int
	DisableEpochSync, IgnoreCompressedChunks bool
}
25
26// DBIngestor ingest the TimeSeries data into Timescale database.
27type DBIngestor struct {
28	sCache     cache.SeriesCache
29	dispatcher model.Dispatcher
30}
31
32// NewPgxIngestor returns a new Ingestor that uses connection pool and a metrics cache
33// for caching metric table names.
34func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*DBIngestor, error) {
35	dispatcher, err := newPgxDispatcher(conn, cache, sCache, eCache, cfg)
36	if err != nil {
37		return nil, err
38	}
39	return &DBIngestor{
40		sCache:     sCache,
41		dispatcher: dispatcher,
42	}, nil
43}
44
45// NewPgxIngestorForTests returns a new Ingestor that write to PostgreSQL using PGX
46// with an empty config, a new default size metrics cache and a non-ha-aware data parser
47func NewPgxIngestorForTests(conn pgxconn.PgxConn, cfg *Cfg) (*DBIngestor, error) {
48	if cfg == nil {
49		cfg = &Cfg{}
50	}
51	cacheConfig := cache.DefaultConfig
52	c := cache.NewMetricCache(cacheConfig)
53	s := cache.NewSeriesCache(cacheConfig, nil)
54	e := cache.NewExemplarLabelsPosCache(cacheConfig)
55	return NewPgxIngestor(conn, c, s, e, cfg)
56}
57
// Tags identifying which concurrent ingestion a result came from.
const (
	meta   = 0 // the result is from metadata ingestion
	series = 1 // the result is from time-series ingestion
)
62
// result reports the outcome of one of the two ingestions (samples or
// metadata) that run concurrently, back to the goroutine collecting them.
type result struct {
	// id tags which ingestion produced the result.
	id int8
	// numRows is how many rows that ingestion reports as inserted.
	numRows uint64
	// err is the ingestion's error, or nil.
	err error
}
69
70// Ingest transforms and ingests the timeseries data into Timescale database.
71// input:
72//     tts the []Timeseries to insert
73//     req the WriteRequest backing tts. It will be added to our WriteRequest
74//         pool when it is no longer needed.
75func (ingestor *DBIngestor) Ingest(r *prompb.WriteRequest) (numInsertablesIngested uint64, numMetadataIngested uint64, err error) {
76	activeWriteRequests.Inc()
77	defer activeWriteRequests.Dec() // Dec() is defered otherwise it will lead to loosing a decrement if some error occurs.
78	var (
79		timeseries = r.Timeseries
80		metadata   = r.Metadata
81	)
82	release := func() { FinishWriteRequest(r) }
83	switch numTs, numMeta := len(timeseries), len(metadata); {
84	case numTs > 0 && numMeta == 0:
85		// Write request contains only time-series.
86		n, err := ingestor.ingestTimeseries(timeseries, release)
87		return n, 0, err
88	case numTs == 0 && numMeta == 0:
89		release()
90		return 0, 0, nil
91	case numMeta > 0 && numTs == 0:
92		// Write request contains only metadata.
93		n, err := ingestor.ingestMetadata(metadata, release)
94		return 0, n, err
95	default:
96	}
97	release = func() {
98		// We do not want to re-initialize the write-request when ingesting concurrently.
99		// A concurrent ingestion may not have read the write-request and we initialized it
100		// leading to data loss.
101		FinishWriteRequest(nil)
102	}
103	res := make(chan result, 2)
104	defer close(res)
105
106	go func() {
107		n, err := ingestor.ingestTimeseries(timeseries, release)
108		res <- result{series, n, err}
109	}()
110	go func() {
111		n, err := ingestor.ingestMetadata(metadata, release)
112		res <- result{meta, n, err}
113	}()
114
115	mergeErr := func(prevErr, err error, message string) error {
116		if prevErr != nil {
117			err = fmt.Errorf("%s: %s: %w", prevErr.Error(), message, err)
118		}
119		return err
120	}
121
122	for i := 0; i < 2; i++ {
123		response := <-res
124		switch response.id {
125		case series:
126			numInsertablesIngested = response.numRows
127			err = mergeErr(err, response.err, "ingesting timeseries")
128		case meta:
129			numMetadataIngested = response.numRows
130			err = mergeErr(err, response.err, "ingesting metadata")
131		}
132	}
133	// WriteRequests can contain pointers into the original buffer we deserialized
134	// them out of, and can be quite large in and of themselves. In order to prevent
135	// memory blowup, and to allow faster deserializing, we recycle the WriteRequest
136	// here, allowing it to be either garbage collected or reused for a new request.
137	// In order for this to work correctly, any data we wish to keep using (e.g.
138	// samples) must no longer be reachable from req.
139	FinishWriteRequest(r)
140	return numInsertablesIngested, numMetadataIngested, err
141}
142
143func (ingestor *DBIngestor) ingestTimeseries(timeseries []prompb.TimeSeries, releaseMem func()) (uint64, error) {
144	var (
145		totalRowsExpected uint64
146
147		insertables = make(map[string][]model.Insertable)
148	)
149
150	for i := range timeseries {
151		var (
152			err        error
153			series     *model.Series
154			metricName string
155
156			ts = &timeseries[i]
157		)
158		if len(ts.Labels) == 0 {
159			continue
160		}
161		// Normalize and canonicalize t.Labels.
162		// After this point t.Labels should never be used again.
163		series, metricName, err = ingestor.sCache.GetSeriesFromProtos(ts.Labels)
164		if err != nil {
165			return 0, err
166		}
167		if metricName == "" {
168			return 0, errors.ErrNoMetricName
169		}
170
171		if len(ts.Samples) > 0 {
172			samples, count, err := ingestor.samples(series, ts)
173			if err != nil {
174				return 0, fmt.Errorf("samples: %w", err)
175			}
176			totalRowsExpected += uint64(count)
177			insertables[metricName] = append(insertables[metricName], samples)
178		}
179		if len(ts.Exemplars) > 0 {
180			exemplars, count, err := ingestor.exemplars(series, ts)
181			if err != nil {
182				return 0, fmt.Errorf("exemplars: %w", err)
183			}
184			totalRowsExpected += uint64(count)
185			insertables[metricName] = append(insertables[metricName], exemplars)
186		}
187		// we're going to free req after this, but we still need the samples,
188		// so nil the field
189		ts.Samples = nil
190		ts.Exemplars = nil
191	}
192	releaseMem()
193
194	numInsertablesIngested, errSamples := ingestor.dispatcher.InsertTs(model.Data{Rows: insertables, ReceivedTime: time.Now()})
195	if errSamples == nil && numInsertablesIngested != totalRowsExpected {
196		return numInsertablesIngested, fmt.Errorf("failed to insert all the data! Expected: %d, Got: %d", totalRowsExpected, numInsertablesIngested)
197	}
198	return numInsertablesIngested, errSamples
199}
200
201func (ingestor *DBIngestor) samples(l *model.Series, ts *prompb.TimeSeries) (model.Insertable, int, error) {
202	return model.NewPromSamples(l, ts.Samples), len(ts.Samples), nil
203}
204
205func (ingestor *DBIngestor) exemplars(l *model.Series, ts *prompb.TimeSeries) (model.Insertable, int, error) {
206	return model.NewPromExemplars(l, ts.Exemplars), len(ts.Exemplars), nil
207}
208
209// ingestMetadata ingests metric metadata received from Prometheus. It runs as a secondary routine, independent from
210// the main dataflow (i.e., samples ingestion) since metadata ingestion is not as frequent as that of samples.
211func (ingestor *DBIngestor) ingestMetadata(metadata []prompb.MetricMetadata, releaseMem func()) (uint64, error) {
212	num := len(metadata)
213	data := make([]model.Metadata, num)
214	for i := 0; i < num; i++ {
215		tmp := metadata[i]
216		data[i] = model.Metadata{
217			MetricFamily: tmp.MetricFamilyName,
218			Unit:         tmp.Unit,
219			Type:         tmp.Type.String(),
220			Help:         tmp.Help,
221		}
222	}
223	releaseMem()
224	numMetadataIngested, errMetadata := ingestor.dispatcher.InsertMetadata(data)
225	if errMetadata != nil {
226		return 0, errMetadata
227	}
228	return numMetadataIngested, nil
229}
230
231// Parts of metric creation not needed to insert data
232func (ingestor *DBIngestor) CompleteMetricCreation() error {
233	return ingestor.dispatcher.CompleteMetricCreation()
234}
235
236// Close closes the ingestor
237func (ingestor *DBIngestor) Close() {
238	ingestor.dispatcher.Close()
239}
240