1// This file and its contents are licensed under the Apache License 2.0. 2// Please see the included NOTICE for copyright information and 3// LICENSE for a copy of the license. 4 5package ingestor 6 7import ( 8 "fmt" 9 "time" 10 11 "github.com/timescale/promscale/pkg/pgmodel/cache" 12 "github.com/timescale/promscale/pkg/pgmodel/common/errors" 13 "github.com/timescale/promscale/pkg/pgmodel/model" 14 "github.com/timescale/promscale/pkg/pgxconn" 15 "github.com/timescale/promscale/pkg/prompb" 16) 17 18type Cfg struct { 19 AsyncAcks bool 20 ReportInterval int 21 NumCopiers int 22 DisableEpochSync bool 23 IgnoreCompressedChunks bool 24} 25 26// DBIngestor ingest the TimeSeries data into Timescale database. 27type DBIngestor struct { 28 sCache cache.SeriesCache 29 dispatcher model.Dispatcher 30} 31 32// NewPgxIngestor returns a new Ingestor that uses connection pool and a metrics cache 33// for caching metric table names. 34func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*DBIngestor, error) { 35 dispatcher, err := newPgxDispatcher(conn, cache, sCache, eCache, cfg) 36 if err != nil { 37 return nil, err 38 } 39 return &DBIngestor{ 40 sCache: sCache, 41 dispatcher: dispatcher, 42 }, nil 43} 44 45// NewPgxIngestorForTests returns a new Ingestor that write to PostgreSQL using PGX 46// with an empty config, a new default size metrics cache and a non-ha-aware data parser 47func NewPgxIngestorForTests(conn pgxconn.PgxConn, cfg *Cfg) (*DBIngestor, error) { 48 if cfg == nil { 49 cfg = &Cfg{} 50 } 51 cacheConfig := cache.DefaultConfig 52 c := cache.NewMetricCache(cacheConfig) 53 s := cache.NewSeriesCache(cacheConfig, nil) 54 e := cache.NewExemplarLabelsPosCache(cacheConfig) 55 return NewPgxIngestor(conn, c, s, e, cfg) 56} 57 58const ( 59 meta = iota 60 series 61) 62 63// result contains insert stats and is used when we ingest samples and metadata concurrently. 64type result struct { 65 id int8 66 numRows uint64 67 err error 68} 69 70// Ingest transforms and ingests the timeseries data into Timescale database. 71// input: 72// tts the []Timeseries to insert 73// req the WriteRequest backing tts. It will be added to our WriteRequest 74// pool when it is no longer needed. 75func (ingestor *DBIngestor) Ingest(r *prompb.WriteRequest) (numInsertablesIngested uint64, numMetadataIngested uint64, err error) { 76 activeWriteRequests.Inc() 77 defer activeWriteRequests.Dec() // Dec() is defered otherwise it will lead to loosing a decrement if some error occurs. 78 var ( 79 timeseries = r.Timeseries 80 metadata = r.Metadata 81 ) 82 release := func() { FinishWriteRequest(r) } 83 switch numTs, numMeta := len(timeseries), len(metadata); { 84 case numTs > 0 && numMeta == 0: 85 // Write request contains only time-series. 86 n, err := ingestor.ingestTimeseries(timeseries, release) 87 return n, 0, err 88 case numTs == 0 && numMeta == 0: 89 release() 90 return 0, 0, nil 91 case numMeta > 0 && numTs == 0: 92 // Write request contains only metadata. 93 n, err := ingestor.ingestMetadata(metadata, release) 94 return 0, n, err 95 default: 96 } 97 release = func() { 98 // We do not want to re-initialize the write-request when ingesting concurrently. 99 // A concurrent ingestion may not have read the write-request and we initialized it 100 // leading to data loss. 101 FinishWriteRequest(nil) 102 } 103 res := make(chan result, 2) 104 defer close(res) 105 106 go func() { 107 n, err := ingestor.ingestTimeseries(timeseries, release) 108 res <- result{series, n, err} 109 }() 110 go func() { 111 n, err := ingestor.ingestMetadata(metadata, release) 112 res <- result{meta, n, err} 113 }() 114 115 mergeErr := func(prevErr, err error, message string) error { 116 if prevErr != nil { 117 err = fmt.Errorf("%s: %s: %w", prevErr.Error(), message, err) 118 } 119 return err 120 } 121 122 for i := 0; i < 2; i++ { 123 response := <-res 124 switch response.id { 125 case series: 126 numInsertablesIngested = response.numRows 127 err = mergeErr(err, response.err, "ingesting timeseries") 128 case meta: 129 numMetadataIngested = response.numRows 130 err = mergeErr(err, response.err, "ingesting metadata") 131 } 132 } 133 // WriteRequests can contain pointers into the original buffer we deserialized 134 // them out of, and can be quite large in and of themselves. In order to prevent 135 // memory blowup, and to allow faster deserializing, we recycle the WriteRequest 136 // here, allowing it to be either garbage collected or reused for a new request. 137 // In order for this to work correctly, any data we wish to keep using (e.g. 138 // samples) must no longer be reachable from req. 139 FinishWriteRequest(r) 140 return numInsertablesIngested, numMetadataIngested, err 141} 142 143func (ingestor *DBIngestor) ingestTimeseries(timeseries []prompb.TimeSeries, releaseMem func()) (uint64, error) { 144 var ( 145 totalRowsExpected uint64 146 147 insertables = make(map[string][]model.Insertable) 148 ) 149 150 for i := range timeseries { 151 var ( 152 err error 153 series *model.Series 154 metricName string 155 156 ts = ×eries[i] 157 ) 158 if len(ts.Labels) == 0 { 159 continue 160 } 161 // Normalize and canonicalize t.Labels. 162 // After this point t.Labels should never be used again. 163 series, metricName, err = ingestor.sCache.GetSeriesFromProtos(ts.Labels) 164 if err != nil { 165 return 0, err 166 } 167 if metricName == "" { 168 return 0, errors.ErrNoMetricName 169 } 170 171 if len(ts.Samples) > 0 { 172 samples, count, err := ingestor.samples(series, ts) 173 if err != nil { 174 return 0, fmt.Errorf("samples: %w", err) 175 } 176 totalRowsExpected += uint64(count) 177 insertables[metricName] = append(insertables[metricName], samples) 178 } 179 if len(ts.Exemplars) > 0 { 180 exemplars, count, err := ingestor.exemplars(series, ts) 181 if err != nil { 182 return 0, fmt.Errorf("exemplars: %w", err) 183 } 184 totalRowsExpected += uint64(count) 185 insertables[metricName] = append(insertables[metricName], exemplars) 186 } 187 // we're going to free req after this, but we still need the samples, 188 // so nil the field 189 ts.Samples = nil 190 ts.Exemplars = nil 191 } 192 releaseMem() 193 194 numInsertablesIngested, errSamples := ingestor.dispatcher.InsertTs(model.Data{Rows: insertables, ReceivedTime: time.Now()}) 195 if errSamples == nil && numInsertablesIngested != totalRowsExpected { 196 return numInsertablesIngested, fmt.Errorf("failed to insert all the data! Expected: %d, Got: %d", totalRowsExpected, numInsertablesIngested) 197 } 198 return numInsertablesIngested, errSamples 199} 200 201func (ingestor *DBIngestor) samples(l *model.Series, ts *prompb.TimeSeries) (model.Insertable, int, error) { 202 return model.NewPromSamples(l, ts.Samples), len(ts.Samples), nil 203} 204 205func (ingestor *DBIngestor) exemplars(l *model.Series, ts *prompb.TimeSeries) (model.Insertable, int, error) { 206 return model.NewPromExemplars(l, ts.Exemplars), len(ts.Exemplars), nil 207} 208 209// ingestMetadata ingests metric metadata received from Prometheus. It runs as a secondary routine, independent from 210// the main dataflow (i.e., samples ingestion) since metadata ingestion is not as frequent as that of samples. 211func (ingestor *DBIngestor) ingestMetadata(metadata []prompb.MetricMetadata, releaseMem func()) (uint64, error) { 212 num := len(metadata) 213 data := make([]model.Metadata, num) 214 for i := 0; i < num; i++ { 215 tmp := metadata[i] 216 data[i] = model.Metadata{ 217 MetricFamily: tmp.MetricFamilyName, 218 Unit: tmp.Unit, 219 Type: tmp.Type.String(), 220 Help: tmp.Help, 221 } 222 } 223 releaseMem() 224 numMetadataIngested, errMetadata := ingestor.dispatcher.InsertMetadata(data) 225 if errMetadata != nil { 226 return 0, errMetadata 227 } 228 return numMetadataIngested, nil 229} 230 231// Parts of metric creation not needed to insert data 232func (ingestor *DBIngestor) CompleteMetricCreation() error { 233 return ingestor.dispatcher.CompleteMetricCreation() 234} 235 236// Close closes the ingestor 237func (ingestor *DBIngestor) Close() { 238 ingestor.dispatcher.Close() 239} 240