1// This file and its contents are licensed under the Apache License 2.0. 2// Please see the included NOTICE for copyright information and 3// LICENSE for a copy of the license. 4 5package ingestor 6 7import ( 8 "context" 9 "fmt" 10 "math" 11 "sort" 12 "strings" 13 "sync" 14 "time" 15 16 "github.com/jackc/pgconn" 17 "github.com/prometheus/common/model" 18 "github.com/timescale/promscale/pkg/log" 19 "github.com/timescale/promscale/pkg/pgmodel/common/schema" 20 "github.com/timescale/promscale/pkg/pgmodel/metrics" 21 pgmodel "github.com/timescale/promscale/pkg/pgmodel/model" 22 "github.com/timescale/promscale/pkg/pgxconn" 23) 24 25const maxInsertStmtPerTxn = 100 26 27type copyRequest struct { 28 data *pendingBuffer 29 table string 30} 31 32var ( 33 getBatchMutex = &sync.Mutex{} 34 handleDecompression = retryAfterDecompression 35) 36 37type copyBatch []copyRequest 38 39func (reqs copyBatch) VisitSeries(callBack func(s *pgmodel.Series) error) error { 40 for _, req := range reqs { 41 insertables := req.data.batch.Data() 42 for i := range insertables { 43 if err := callBack(insertables[i].Series()); err != nil { 44 return err 45 } 46 } 47 } 48 return nil 49} 50 51func (reqs copyBatch) VisitExemplar(callBack func(s *pgmodel.PromExemplars) error) error { 52 for _, req := range reqs { 53 insertables := req.data.batch.Data() 54 for i := range insertables { 55 exemplar, ok := insertables[i].(*pgmodel.PromExemplars) 56 if ok { 57 if err := callBack(exemplar); err != nil { 58 return err 59 } 60 } 61 } 62 } 63 return nil 64} 65 66// Handles actual insertion into the DB. 67// We have one of these per connection reserved for insertion. 
68func runCopier(conn pgxconn.PgxConn, in chan readRequest, sw *seriesWriter, elf *ExemplarLabelFormatter) { 69 requestBatch := make([]readRequest, 0, maxInsertStmtPerTxn) 70 insertBatch := make([]copyRequest, 0, cap(requestBatch)) 71 for { 72 var ok bool 73 74 //fetch the batch of request upfront to make sure all of the 75 //requests fetched are for unique metrics. This is insured 76 //by the fact that the batcher only has one outstanding readRequest 77 //at a time and the fact that we fetch the entire batch before 78 //executing any of the reads. This guarantees that we never 79 //need to batch the same metrics together in the copier 80 requestBatch, ok = copierGetBatch(requestBatch, in) 81 if !ok { 82 return 83 } 84 85 for i := range requestBatch { 86 copyRequest, ok := <-requestBatch[i].copySender 87 if !ok { 88 continue 89 } 90 insertBatch = append(insertBatch, copyRequest) 91 } 92 93 // sort to prevent deadlocks on table locks 94 sort.Slice(insertBatch, func(i, j int) bool { 95 return insertBatch[i].table < insertBatch[j].table 96 }) 97 98 err := persistBatch(conn, sw, elf, insertBatch) 99 if err != nil { 100 for i := range insertBatch { 101 insertBatch[i].data.reportResults(err) 102 insertBatch[i].data.release() 103 } 104 } 105 for i := range requestBatch { 106 requestBatch[i] = readRequest{} 107 } 108 for i := range insertBatch { 109 insertBatch[i] = copyRequest{} 110 } 111 insertBatch = insertBatch[:0] 112 requestBatch = requestBatch[:0] 113 } 114} 115 116func persistBatch(conn pgxconn.PgxConn, sw *seriesWriter, elf *ExemplarLabelFormatter, insertBatch []copyRequest) error { 117 batch := copyBatch(insertBatch) 118 err := sw.WriteSeries(batch) 119 if err != nil { 120 return fmt.Errorf("copier: writing series: %w", err) 121 } 122 err = elf.orderExemplarLabelValues(batch) 123 if err != nil { 124 return fmt.Errorf("copier: formatting exemplar label values: %w", err) 125 } 126 127 doInsertOrFallback(conn, insertBatch...) 
128 return nil 129} 130 131func copierGetBatch(batch []readRequest, in <-chan readRequest) ([]readRequest, bool) { 132 //This mutex is not for safety, but rather for better batching. 133 //It guarantees that only one copier is reading from the channel at one time 134 //This ensures bigger batches as well as less spread of a 135 //single http request to multiple copiers, decreasing latency via less sync 136 //overhead especially on low-pressure systems. 137 getBatchMutex.Lock() 138 defer getBatchMutex.Unlock() 139 req, ok := <-in 140 if !ok { 141 return batch, false 142 } 143 batch = append(batch, req) 144 145 //we use a small timeout to prevent low-pressure systems from using up too many 146 //txns and putting pressure on system 147 timeout := time.After(20 * time.Millisecond) 148hot_gather: 149 for len(batch) < cap(batch) { 150 select { 151 case r2 := <-in: 152 batch = append(batch, r2) 153 case <-timeout: 154 break hot_gather 155 } 156 } 157 return batch, true 158} 159 160func doInsertOrFallback(conn pgxconn.PgxConn, reqs ...copyRequest) { 161 err, _ := insertSeries(conn, reqs...) 162 if err != nil { 163 insertBatchErrorFallback(conn, reqs...) 164 return 165 } 166 167 for i := range reqs { 168 reqs[i].data.reportResults(nil) 169 reqs[i].data.release() 170 } 171} 172 173func insertBatchErrorFallback(conn pgxconn.PgxConn, reqs ...copyRequest) { 174 for i := range reqs { 175 err, minTime := insertSeries(conn, reqs[i]) 176 if err != nil { 177 err = tryRecovery(conn, err, reqs[i], minTime) 178 } 179 180 reqs[i].data.reportResults(err) 181 reqs[i].data.release() 182 } 183} 184 185// we can currently recover from one error: 186// If we inserted into a compressed chunk, we decompress the chunk and try again. 187// Since a single batch can have both errors, we need to remember the insert method 188// we're using, so that we deduplicate if needed. 
189func tryRecovery(conn pgxconn.PgxConn, err error, req copyRequest, minTime int64) error { 190 // we only recover from postgres errors right now 191 pgErr, ok := err.(*pgconn.PgError) 192 if !ok { 193 errMsg := err.Error() 194 log.Warn("msg", fmt.Sprintf("unexpected error while inserting to %s", req.table), "err", errMsg) 195 return err 196 } 197 198 if pgErr.Code == "0A000" || strings.Contains(pgErr.Message, "compressed") || strings.Contains(pgErr.Message, "insert/update/delete not permitted") { 199 // If the error was that the table is already compressed, decompress and try again. 200 return handleDecompression(conn, req, minTime) 201 } 202 203 log.Warn("msg", fmt.Sprintf("unexpected postgres error while inserting to %s", req.table), "err", pgErr.Error()) 204 return pgErr 205} 206 207func skipDecompression(_ pgxconn.PgxConn, _ copyRequest, _ int64) error { 208 log.WarnRateLimited("msg", "Rejecting samples falling on compressed chunks as decompression is disabled") 209 return nil 210} 211 212// In the event we filling in old data and the chunk we want to INSERT into has 213// already been compressed, we decompress the chunk and try again. When we do 214// this we delay the recompression to give us time to insert additional data. 215func retryAfterDecompression(conn pgxconn.PgxConn, req copyRequest, minTimeInt int64) error { 216 var ( 217 table = req.table 218 minTime = model.Time(minTimeInt).Time() 219 ) 220 //how much faster are we at ingestion than wall-clock time? 
221 ingestSpeedup := 2 222 //delay the next compression job proportional to the duration between now and the data time + a constant safety 223 delayBy := (time.Since(minTime) / time.Duration(ingestSpeedup)) + 60*time.Minute 224 maxDelayBy := time.Hour * 24 225 if delayBy > maxDelayBy { 226 delayBy = maxDelayBy 227 } 228 log.Warn("msg", fmt.Sprintf("Table %s was compressed, decompressing", table), "table", table, "min-time", minTime, "age", time.Since(minTime), "delay-job-by", delayBy) 229 230 _, rescheduleErr := conn.Exec(context.Background(), "SELECT "+schema.Catalog+".delay_compression_job($1, $2)", 231 table, time.Now().Add(delayBy)) 232 if rescheduleErr != nil { 233 log.Error("msg", rescheduleErr, "context", "Rescheduling compression") 234 return rescheduleErr 235 } 236 237 _, decompressErr := conn.Exec(context.Background(), "CALL "+schema.Catalog+".decompress_chunks_after($1, $2);", table, minTime) 238 if decompressErr != nil { 239 log.Error("msg", decompressErr, "context", "Decompressing chunks") 240 return decompressErr 241 } 242 243 metrics.DecompressCalls.Inc() 244 metrics.DecompressEarliest.WithLabelValues(table).Set(float64(minTime.UnixNano()) / 1e9) 245 err, _ := insertSeries(conn, req) // Attempt an insert again. 246 return err 247} 248 249/* 250Useful output for debugging batching issues 251func debugInsert() { 252 m := &dto.Metric{} 253 dbBatchInsertDuration.Write(m) 254 durs := time.Duration(*m.Histogram.SampleSum * float64(time.Second)) 255 cnt := *m.Histogram.SampleCount 256 numRowsPerBatch.Write(m) 257 rows := *m.Histogram.SampleSum 258 numInsertsPerBatch.Write(m) 259 inserts := *m.Histogram.SampleSum 260 261 fmt.Println("avg: duration/row", durs/time.Duration(rows), "rows/batch", uint64(rows)/cnt, "inserts/batch", uint64(inserts)/cnt, "rows/insert", rows/inserts) 262} 263*/ 264 265// insertSeries performs the insertion of time-series into the DB. 
// For every request it queues one insert_metric_row call when the request has
// samples and one insert_exemplar_row call when it has exemplars, then queues
// a final epoch check and sends everything as a single batch. The row count
// returned by each insert is compared against the number of rows sent; any
// shortfall is passed to registerDuplicates.
//
// Note the unusual result order: the error comes first. The second result is
// the smallest visitor.MinTime() seen across the requests visited so far; it
// is returned on error paths as well, and callers in this file pass it on to
// tryRecovery.
func insertSeries(conn pgxconn.PgxConn, reqs ...copyRequest) (error, int64) {
	batch := conn.NewBatch()

	// numRowsPerInsert holds the expected row count of each queued insert, in
	// queue order, so that results can be matched up after SendBatch.
	numRowsPerInsert := make([]int, 0, len(reqs))
	numRowsTotal := 0
	totalSamples := 0
	totalExemplars := 0
	lowestEpoch := pgmodel.SeriesEpoch(math.MaxInt64)
	lowestMinTime := int64(math.MaxInt64)
	for r := range reqs {
		req := &reqs[r]
		numSamples, numExemplars := req.data.batch.Count()
		NumRowsPerInsert.Observe(float64(numSamples + numExemplars))

		// flatten the various series into arrays.
		// there are four main bottlenecks for insertion:
		//   1. The round trip time.
		//   2. The number of requests sent.
		//   3. The number of individual INSERT statements.
		//   4. The amount of data sent.
		// While the first two of these can be handled by batching, for the latter
		// two we need to actually reduce the work done. It turns out that simply
		// collecting all the data into a postgres array and performing a single
		// INSERT using that overcomes most of the performance issues for sending
		// multiple data, and brings INSERT nearly on par with CopyFrom. In the
		// future we may wish to send compressed data instead.
		var (
			hasSamples   bool
			hasExemplars bool

			timeSamples   []time.Time
			timeExemplars []time.Time

			valSamples   []float64
			valExemplars []float64

			seriesIdSamples   []int64
			seriesIdExemplars []int64

			exemplarLbls [][]string
		)

		// Pre-size the arrays from the counts reported by the batch.
		if numSamples > 0 {
			timeSamples = make([]time.Time, 0, numSamples)
			valSamples = make([]float64, 0, numSamples)
			seriesIdSamples = make([]int64, 0, numSamples)
		}
		if numExemplars > 0 {
			timeExemplars = make([]time.Time, 0, numExemplars)
			valExemplars = make([]float64, 0, numExemplars)
			seriesIdExemplars = make([]int64, 0, numExemplars)
			exemplarLbls = make([][]string, 0, numExemplars)
		}

		// The first callback collects samples, the second collects exemplars
		// together with their label values.
		visitor := req.data.batch.Visitor()
		err := visitor.Visit(
			func(t time.Time, v float64, seriesId int64) {
				hasSamples = true
				timeSamples = append(timeSamples, t)
				valSamples = append(valSamples, v)
				seriesIdSamples = append(seriesIdSamples, seriesId)
			},
			func(t time.Time, v float64, seriesId int64, lvalues []string) {
				hasExemplars = true
				timeExemplars = append(timeExemplars, t)
				valExemplars = append(valExemplars, v)
				seriesIdExemplars = append(seriesIdExemplars, seriesId)
				exemplarLbls = append(exemplarLbls, lvalues)
			},
		)
		if err != nil {
			return err, lowestMinTime
		}
		// Track the lowest epoch and the lowest min time across all requests.
		epoch := visitor.LowestEpoch()
		if epoch < lowestEpoch {
			lowestEpoch = epoch
		}
		minTime := visitor.MinTime()
		if minTime < lowestMinTime {
			lowestMinTime = minTime
		}

		numRowsTotal += numSamples + numExemplars
		totalSamples += numSamples
		totalExemplars += numExemplars
		// Each Queue call below must be paired with exactly one append to
		// numRowsPerInsert, in the same order: the result loop further down
		// reads one result per entry.
		if hasSamples {
			numRowsPerInsert = append(numRowsPerInsert, numSamples)
			batch.Queue("SELECT "+schema.Catalog+".insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])", req.table, timeSamples, valSamples, seriesIdSamples)
		}
		if hasExemplars {
			// We cannot send 2-D [][]TEXT to postgres via the pgx.encoder. For this and easier querying reasons, we create a
			// new type in postgres by the name SCHEMA_PROM.label_value_array and use that type as array (which forms a 2D array of TEXT)
			// which is then used to push using the unnest method approach.
			labelValues := pgmodel.GetCustomType(pgmodel.LabelValueArray)
			if err := labelValues.Set(exemplarLbls); err != nil {
				return fmt.Errorf("setting prom_api.label_value_array[] value: %w", err), lowestMinTime
			}
			numRowsPerInsert = append(numRowsPerInsert, numExemplars)
			batch.Queue("SELECT "+schema.Catalog+".insert_exemplar_row($1::NAME, $2::TIMESTAMPTZ[], $3::BIGINT[], $4::"+schema.Prom+".label_value_array[], $5::DOUBLE PRECISION[])", req.table, timeExemplars, seriesIdExemplars, labelValues, valExemplars)
		}
	}

	//note the epoch increment takes an access exclusive on the table before incrementing.
	//thus we don't need row locking here. Note by doing this check at the end we can
	//have some wasted work for the inserts before this fails but this is rare.
	//avoiding an additional loop or memoization to find the lowest epoch ahead of time seems worth it.
	epochCheck := fmt.Sprintf("SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN %s.epoch_abort($1) END FROM %s.ids_epoch LIMIT 1", schema.Catalog, schema.Catalog)
	batch.Queue(epochCheck, int64(lowestEpoch))

	NumRowsPerBatch.Observe(float64(numRowsTotal))
	NumInsertsPerBatch.Observe(float64(len(reqs)))
	start := time.Now()
	results, err := conn.SendBatch(context.Background(), batch)
	if err != nil {
		return err, lowestMinTime
	}
	defer results.Close()

	// Read one result per queued insert, in queue order. affectedMetrics
	// counts the inserts whose returned row count differs from what was sent.
	var affectedMetrics uint64
	for _, numRows := range numRowsPerInsert {
		var insertedRows int64
		err := results.QueryRow().Scan(&insertedRows)
		if err != nil {
			return err, lowestMinTime
		}
		numRowsExpected := int64(numRows)
		if numRowsExpected != insertedRows {
			affectedMetrics++
			registerDuplicates(numRowsExpected - insertedRows)
		}
	}
	numSamplesInserted.Add(float64(totalSamples))
	numExemplarsInserted.Add(float64(totalExemplars))

	// The last queued statement is the epoch check; its value is scanned only
	// to surface an error from it.
	var val []byte
	row := results.QueryRow()
	err = row.Scan(&val)
	if err != nil {
		return err, lowestMinTime
	}
	reportDuplicates(affectedMetrics)
	DbBatchInsertDuration.Observe(time.Since(start).Seconds())
	return nil, lowestMinTime
}

// insertMetadata sends all metadata entries to the database in a single
// insert_metric_metadatas call, stamping every entry with the same timestamp
// taken at the start of the call. It returns the row count scanned from the
// query result, or 0 and a wrapped error if the scan fails.
func insertMetadata(conn pgxconn.PgxConn, reqs []pgmodel.Metadata) (insertedRows uint64, err error) {
	numRows := len(reqs)
	// Split the entries into one parallel array per column.
	timeSlice := make([]time.Time, numRows)
	metricFamilies := make([]string, numRows)
	units := make([]string, numRows)
	types := make([]string, numRows)
	helps := make([]string, numRows)
	n := time.Now()
	for i := range reqs {
		timeSlice[i] = n
		metricFamilies[i] = reqs[i].MetricFamily
		units[i] = reqs[i].Unit
		types[i] = reqs[i].Type
		helps[i] = reqs[i].Help
	}
	start := time.Now()
	// NOTE(review): types is passed before units here; presumably this matches
	// the parameter order of insert_metric_metadatas — confirm against the SQL.
	row := conn.QueryRow(context.Background(), "SELECT "+schema.Catalog+".insert_metric_metadatas($1::TIMESTAMPTZ[], $2::TEXT[], $3::TEXT[], $4::TEXT[], $5::TEXT[])",
		timeSlice, metricFamilies, types, units, helps)
	if err := row.Scan(&insertedRows); err != nil {
		return 0, fmt.Errorf("send metadata batch: %w", err)
	}
	MetadataBatchInsertDuration.Observe(time.Since(start).Seconds())
	return insertedRows, nil
}