1// This file and its contents are licensed under the Apache License 2.0. 2// Please see the included NOTICE for copyright information and 3// LICENSE for a copy of the license. 4 5package ingestor 6 7import ( 8 "fmt" 9 "testing" 10 "time" 11 12 "github.com/jackc/pgtype" 13 "github.com/pkg/errors" 14 "github.com/prometheus/prometheus/pkg/labels" 15 "github.com/stretchr/testify/require" 16 "github.com/timescale/promscale/pkg/pgmodel/cache" 17 pgmodelErrs "github.com/timescale/promscale/pkg/pgmodel/common/errors" 18 "github.com/timescale/promscale/pkg/pgmodel/model" 19 pgmodel "github.com/timescale/promscale/pkg/pgmodel/model" 20 "github.com/timescale/promscale/pkg/pgmodel/model/pgutf8str" 21 "github.com/timescale/promscale/pkg/prompb" 22 tput "github.com/timescale/promscale/pkg/util/throughput" 23) 24 25func getTestLabelArray(t *testing.T, l [][]int32) *pgtype.ArrayType { 26 model.SetLabelArrayOIDForTest(0) 27 labelArrayArray := model.GetCustomType(model.LabelArray) 28 err := labelArrayArray.Set(l) 29 require.NoError(t, err) 30 return labelArrayArray 31} 32 33func init() { 34 tput.InitWatcher(time.Second) 35} 36 37type sVisitor []model.Insertable 38 39func (c sVisitor) VisitSeries(cb func(s *pgmodel.Series) error) error { 40 for _, insertable := range c { 41 err := cb(insertable.Series()) 42 if err != nil { 43 return err 44 } 45 } 46 return nil 47} 48 49func TestPGXInserterInsertSeries(t *testing.T) { 50 testCases := []struct { 51 name string 52 series []labels.Labels 53 sqlQueries []model.SqlQuery 54 }{ 55 { 56 name: "Zero series", 57 }, 58 { 59 name: "One series", 60 series: []labels.Labels{ 61 { 62 {Name: "name_1", Value: "value_1"}, 63 {Name: "__name__", Value: "metric_1"}, 64 }, 65 }, 66 67 sqlQueries: []model.SqlQuery{ 68 {Sql: "BEGIN;"}, 69 { 70 Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", 71 Args: []interface{}(nil), 72 Results: model.RowResults{{int64(1)}}, 73 Err: error(nil), 74 }, 75 {Sql: "COMMIT;"}, 76 {Sql: "BEGIN;"}, 77 { 78 Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3)", 79 Args: []interface{}{ 80 "metric_1", 81 []string{"__name__", "name_1"}, 82 []string{"metric_1", "value_1"}, 83 }, 84 Results: model.RowResults{ 85 {[]int32{1, 2}, []int32{1, 2}, []string{"__name__", "name_1"}, []string{"metric_1", "value_1"}}, 86 }, 87 Err: error(nil), 88 }, 89 {Sql: "COMMIT;"}, 90 {Sql: "BEGIN;"}, 91 { 92 Sql: seriesInsertSQL, 93 Args: []interface{}{ 94 "metric_1", 95 getTestLabelArray(t, [][]int32{{1, 2}}), 96 }, 97 Results: model.RowResults{{int64(1), int64(1)}}, 98 Err: error(nil), 99 }, 100 {Sql: "COMMIT;"}, 101 }, 102 }, 103 { 104 name: "Two series", 105 series: []labels.Labels{ 106 { 107 {Name: "name_1", Value: "value_1"}, 108 {Name: "__name__", Value: "metric_1"}, 109 }, 110 { 111 {Name: "name_2", Value: "value_2"}, 112 {Name: "__name__", Value: "metric_1"}, 113 }, 114 }, 115 sqlQueries: []model.SqlQuery{ 116 {Sql: "BEGIN;"}, 117 { 118 Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", 119 Args: []interface{}(nil), 120 Results: model.RowResults{{int64(1)}}, 121 Err: error(nil), 122 }, 123 {Sql: "COMMIT;"}, 124 {Sql: "BEGIN;"}, 125 { 126 Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3)", 127 Args: []interface{}{ 128 "metric_1", 129 []string{"__name__", "name_1", "name_2"}, 130 []string{"metric_1", "value_1", "value_2"}, 131 }, 132 Results: model.RowResults{ 133 {[]int32{1, 2, 3}, []int32{1, 2, 3}, []string{"__name__", "name_1", "name_2"}, []string{"metric_1", "value_1", "value_2"}}, 134 }, 135 Err: error(nil), 136 }, 137 {Sql: "COMMIT;"}, 138 {Sql: "BEGIN;"}, 139 { 140 Sql: seriesInsertSQL, 141 Args: []interface{}{ 142 "metric_1", 143 getTestLabelArray(t, [][]int32{{1, 2}, {1, 0, 3}}), 144 }, 145 Results: model.RowResults{{int64(1), int64(1)}, {int64(2), int64(2)}}, 146 Err: error(nil), 147 }, 148 {Sql: "COMMIT;"}, 149 }, 150 }, 151 { 152 name: "Double series", 153 series: []labels.Labels{ 154 { 155 {Name: "name_1", Value: "value_1"}, 156 {Name: "__name__", Value: "metric_1"}}, 157 { 158 {Name: "name_2", Value: "value_2"}, 159 {Name: "__name__", Value: "metric_1"}}, 160 { 161 {Name: "name_1", Value: "value_1"}, 162 {Name: "__name__", Value: "metric_1"}, 163 }, 164 }, 165 sqlQueries: []model.SqlQuery{ 166 {Sql: "BEGIN;"}, 167 { 168 Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", 169 Args: []interface{}(nil), 170 Results: model.RowResults{{int64(1)}}, 171 Err: error(nil), 172 }, 173 {Sql: "COMMIT;"}, 174 {Sql: "BEGIN;"}, 175 { 176 Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3)", 177 Args: []interface{}{ 178 "metric_1", 179 []string{"__name__", "name_1", "name_2"}, 180 []string{"metric_1", "value_1", "value_2"}, 181 }, 182 Results: model.RowResults{ 183 {[]int32{1, 2, 3}, []int32{1, 2, 3}, []string{"__name__", "name_1", "name_2"}, []string{"metric_1", "value_1", "value_2"}}, 184 }, 185 Err: error(nil), 186 }, 187 {Sql: "COMMIT;"}, 188 {Sql: "BEGIN;"}, 189 { 190 Sql: seriesInsertSQL, 191 Args: []interface{}{ 192 "metric_1", 193 getTestLabelArray(t, [][]int32{{1, 2}, {1, 0, 3}, {1, 2}}), 194 }, 195 Results: model.RowResults{{int64(1), int64(1)}, {int64(2), int64(2)}, {int64(1), int64(1)}}, 196 Err: error(nil), 197 }, 198 {Sql: "COMMIT;"}, 199 }, 200 }, 201 { 202 name: "Query err", 203 series: []labels.Labels{ 204 { 205 {Name: "name_1", Value: "value_1"}, 206 {Name: "__name__", Value: "metric_1"}}, 207 { 208 {Name: "name_2", Value: "value_2"}, 209 {Name: "__name__", Value: "metric_1"}, 210 }, 211 }, 212 sqlQueries: []model.SqlQuery{ 213 {Sql: "BEGIN;"}, 214 { 215 Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", 216 Args: []interface{}(nil), 217 Results: model.RowResults{{int64(1)}}, 218 Err: error(nil), 219 }, 220 {Sql: "COMMIT;"}, 221 {Sql: "BEGIN;"}, 222 { 223 Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3)", 224 Args: []interface{}{ 225 "metric_1", 226 []string{"__name__", "name_1", "name_2"}, 227 []string{"metric_1", "value_1", "value_2"}, 228 }, 229 Results: model.RowResults{ 230 {int32(1), int32(1), "__name__", "metric_1"}, 231 {int32(1), int32(2), "__name__", "metric_2"}, 232 {int32(2), int32(3), "name_1", "value_1"}, 233 {int32(3), int32(4), "name_2", "value_2"}, 234 }, 235 Err: fmt.Errorf("some query error"), 236 }, 237 {Sql: "COMMIT;"}, 238 }, 239 }, 240 } 241 242 for _, c := range testCases { 243 t.Run(c.name, func(t *testing.T) { 244 for i := range c.sqlQueries { 245 for j := range c.sqlQueries[i].Args { 246 if _, ok := c.sqlQueries[i].Args[j].([]string); ok { 247 tmp := &pgutf8str.TextArray{} 248 err := tmp.Set(c.sqlQueries[i].Args[j]) 249 require.NoError(t, err) 250 c.sqlQueries[i].Args[j] = tmp 251 } 252 } 253 } 254 mock := model.NewSqlRecorder(c.sqlQueries, t) 255 scache := cache.NewSeriesCache(cache.DefaultConfig, nil) 256 scache.Reset() 257 258 sw := NewSeriesWriter(mock, 0) 259 260 lsi := make([]model.Insertable, 0) 261 for _, ser := range c.series { 262 ls, err := scache.GetSeriesFromLabels(ser) 263 if err != nil { 264 t.Errorf("invalid labels %+v, %v", ls, err) 265 } 266 lsi = append(lsi, model.NewPromExemplars(ls, nil)) 267 } 268 269 err := sw.WriteSeries(sVisitor(lsi)) 270 if err != nil { 271 foundErr := false 272 for _, q := range c.sqlQueries { 273 if q.Err != nil { 274 foundErr = true 275 if !errors.Is(err, q.Err) { 276 t.Errorf("unexpected query error:\ngot\n%s\nwanted\n%s", err, q.Err) 277 } 278 } 279 } 280 if !foundErr { 281 t.Errorf("unexpected error: %v", err) 282 } 283 } 284 285 if err == nil { 286 for _, si := range lsi { 287 si, se, err := si.Series().GetSeriesID() 288 require.NoError(t, err) 289 require.True(t, si > 0, "series id not set") 290 require.True(t, se > 0, "epoch not set") 291 } 292 } 293 }) 294 } 295} 296 297func TestPGXInserterCacheReset(t *testing.T) { 298 series := []labels.Labels{ 299 { 300 {Name: "__name__", Value: "metric_1"}, 301 {Name: "name_1", Value: "value_1"}, 302 }, 303 { 304 {Name: "name_1", Value: "value_2"}, 305 {Name: "__name__", Value: "metric_1"}, 306 }, 307 } 308 309 sqlQueries := []model.SqlQuery{ 310 311 // first series cache fetch 312 {Sql: "BEGIN;"}, 313 { 314 Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", 315 Args: []interface{}(nil), 316 Results: model.RowResults{{int64(1)}}, 317 Err: error(nil), 318 }, 319 {Sql: "COMMIT;"}, 320 {Sql: "BEGIN;"}, 321 { 322 Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3)", 323 Args: []interface{}{ 324 "metric_1", 325 []string{"__name__", "name_1", "name_1"}, 326 []string{"metric_1", "value_1", "value_2"}, 327 }, 328 Results: model.RowResults{ 329 {[]int32{1, 2, 2}, []int32{1, 2, 3}, []string{"__name__", "name_1", "name_1"}, []string{"metric_1", "value_1", "value_2"}}, 330 }, 331 Err: error(nil), 332 }, 333 {Sql: "COMMIT;"}, 334 {Sql: "BEGIN;"}, 335 { 336 Sql: seriesInsertSQL, 337 Args: []interface{}{ 338 "metric_1", 339 getTestLabelArray(t, [][]int32{{1, 2}, {1, 3}}), 340 }, 341 Results: model.RowResults{{int64(1), int64(1)}, {int64(2), int64(2)}}, 342 Err: error(nil), 343 }, 344 {Sql: "COMMIT;"}, 345 346 // first labels cache refresh, does not trash 347 { 348 Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", 349 Args: []interface{}(nil), 350 Results: model.RowResults{{int64(1)}}, 351 Err: error(nil), 352 }, 353 354 // second labels cache refresh, trash the cache 355 { 356 Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", 357 Args: []interface{}(nil), 358 Results: model.RowResults{{int64(2)}}, 359 Err: error(nil), 360 }, 361 {Sql: "BEGIN;"}, 362 363 // repopulate the cache 364 { 365 Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", 366 Args: []interface{}(nil), 367 Results: model.RowResults{{int64(2)}}, 368 Err: error(nil), 369 }, 370 {Sql: "COMMIT;"}, 371 {Sql: "BEGIN;"}, 372 { 373 Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3)", 374 Args: []interface{}{ 375 "metric_1", 376 []string{"__name__", "name_1", "name_1"}, 377 []string{"metric_1", "value_1", "value_2"}, 378 }, 379 Results: model.RowResults{ 380 {[]int32{1, 2, 2}, []int32{1, 2, 3}, []string{"__name__", "name_1", "name_1"}, []string{"metric_1", "value_1", "value_2"}}, 381 }, 382 Err: error(nil), 383 }, 384 {Sql: "COMMIT;"}, 385 {Sql: "BEGIN;"}, 386 { 387 Sql: seriesInsertSQL, 388 Args: []interface{}{ 389 "metric_1", 390 getTestLabelArray(t, [][]int32{{1, 2}, {1, 3}}), 391 }, 392 Results: model.RowResults{{int64(3), int64(1)}, {int64(4), int64(2)}}, 393 Err: error(nil), 394 }, 395 {Sql: "COMMIT;"}, 396 } 397 398 for i := range sqlQueries { 399 for j := range sqlQueries[i].Args { 400 if _, ok := sqlQueries[i].Args[j].([]string); ok { 401 tmp := &pgutf8str.TextArray{} 402 err := tmp.Set(sqlQueries[i].Args[j]) 403 require.NoError(t, err) 404 sqlQueries[i].Args[j] = tmp 405 } 406 } 407 } 408 409 mock := model.NewSqlRecorder(sqlQueries, t) 410 scache := cache.NewSeriesCache(cache.DefaultConfig, nil) 411 412 sw := NewSeriesWriter(mock, 0) 413 inserter := pgxDispatcher{ 414 conn: mock, 415 scache: scache, 416 } 417 418 makeSamples := func(series []labels.Labels) []model.Insertable { 419 lsi := make([]model.Insertable, 0) 420 for _, ser := range series { 421 ls, err := scache.GetSeriesFromLabels(ser) 422 if err != nil { 423 t.Errorf("invalid labels %+v, %v", ls, err) 424 } 425 lsi = append(lsi, model.NewPromSamples(ls, nil)) 426 } 427 return lsi 428 } 429 430 samples := makeSamples(series) 431 err := sw.WriteSeries(sVisitor(samples)) 432 if err != nil { 433 t.Fatal(err) 434 } 435 436 expectedIds := []model.SeriesID{ 437 model.SeriesID(1), 438 model.SeriesID(2), 439 } 440 441 for index, si := range samples { 442 _, _, ok := si.Series().NameValues() 443 require.False(t, ok) 444 expectedId := expectedIds[index] 445 gotId, _, err := si.Series().GetSeriesID() 446 require.NoError(t, err) 447 if gotId != expectedId { 448 t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId) 449 } 450 } 451 452 // refreshing during the same epoch givesthe same IDs without checking the DB 453 _, err = inserter.refreshSeriesEpoch(1) 454 require.NoError(t, err) 455 456 samples = makeSamples(series) 457 err = sw.WriteSeries(sVisitor(samples)) 458 if err != nil { 459 t.Fatal(err) 460 } 461 462 for index, si := range samples { 463 _, _, ok := si.Series().NameValues() 464 require.False(t, ok) 465 expectedId := expectedIds[index] 466 gotId, _, err := si.Series().GetSeriesID() 467 require.NoError(t, err) 468 if gotId != expectedId { 469 t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId) 470 } 471 } 472 473 // trash the cache 474 _, err = inserter.refreshSeriesEpoch(1) 475 require.NoError(t, err) 476 477 // retrying rechecks the DB and uses the new IDs 478 samples = makeSamples(series) 479 err = sw.WriteSeries(sVisitor(samples)) 480 if err != nil { 481 t.Fatal(err) 482 } 483 484 expectedIds = []model.SeriesID{ 485 model.SeriesID(3), 486 model.SeriesID(4), 487 } 488 489 for index, si := range samples { 490 _, _, ok := si.Series().NameValues() 491 require.False(t, ok) 492 expectedId := expectedIds[index] 493 gotId, _, err := si.Series().GetSeriesID() 494 require.NoError(t, err) 495 if gotId != expectedId { 496 t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId) 497 } 498 } 499} 500 501func TestPGXInserterInsertData(t *testing.T) { 502 makeLabel := func() *model.Series { 503 l := &model.Series{} 504 l.SetSeriesID(1, 1) 505 return l 506 } 507 508 testCases := []struct { 509 name string 510 rows map[string][]model.Insertable 511 sqlQueries []model.SqlQuery 512 metricsGetErr error 513 }{ 514 { 515 name: "Zero data", 516 sqlQueries: []model.SqlQuery{ 517 {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, 518 {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, 519 {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, 520 }, 521 }, 522 { 523 name: "One data", 524 rows: map[string][]model.Insertable{ 525 "metric_0": {model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1))}, 526 }, 527 sqlQueries: []model.SqlQuery{ 528 {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, 529 {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, 530 {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, 531 { 532 Sql: "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)", 533 Args: []interface{}{"metric_0"}, 534 Results: model.RowResults{{"metric_0", false}}, 535 Err: error(nil), 536 }, 537 { 538 Sql: "SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])", 539 Args: []interface{}{ 540 "metric_0", 541 []time.Time{time.Unix(0, 0)}, 542 []float64{0}, 543 []int64{1}, 544 }, 545 Results: model.RowResults{{int64(1)}}, 546 Err: error(nil), 547 }, 548 { 549 Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", 550 Args: []interface{}{int64(1)}, 551 Results: model.RowResults{{[]byte{}}}, 552 Err: error(nil), 553 }, 554 }, 555 }, 556 { 557 name: "Two data", 558 rows: map[string][]model.Insertable{ 559 "metric_0": { 560 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 561 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 562 }, 563 }, 564 sqlQueries: []model.SqlQuery{ 565 {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, 566 {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, 567 {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, 568 { 569 Sql: "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)", 570 Args: []interface{}{"metric_0"}, 571 Results: model.RowResults{{"metric_0", false}}, 572 Err: error(nil), 573 }, 574 575 { 576 Sql: "SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])", 577 Args: []interface{}{ 578 "metric_0", 579 []time.Time{time.Unix(0, 0), time.Unix(0, 0)}, 580 []float64{0, 0}, 581 []int64{1, 1}, 582 }, 583 Results: model.RowResults{{int64(1)}}, 584 Err: error(nil), 585 }, 586 { 587 Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", 588 Args: []interface{}{int64(1)}, 589 Results: model.RowResults{{[]byte{}}}, 590 Err: error(nil), 591 }, 592 }, 593 }, 594 { 595 name: "Create table error", 596 rows: map[string][]model.Insertable{ 597 "metric_0": { 598 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 599 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 600 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 601 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 602 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 603 }, 604 }, 605 sqlQueries: []model.SqlQuery{ 606 {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, 607 {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, 608 {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, 609 { 610 Sql: "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)", 611 Args: []interface{}{"metric_0"}, 612 Results: model.RowResults{{"metric_0", false}}, 613 Err: fmt.Errorf("create table error"), 614 }, 615 }, 616 }, 617 { 618 name: "Epoch Error", 619 rows: map[string][]model.Insertable{ 620 "metric_0": { 621 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 622 }, 623 }, 624 sqlQueries: []model.SqlQuery{ 625 {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, 626 {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, 627 {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, 628 { 629 Sql: "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)", 630 Args: []interface{}{"metric_0"}, 631 Results: model.RowResults{{"metric_0", false}}, 632 Err: error(nil), 633 }, 634 635 { 636 Sql: "SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])", 637 Args: []interface{}{ 638 "metric_0", 639 []time.Time{time.Unix(0, 0)}, 640 []float64{0}, 641 []int64{1}, 642 }, 643 Results: model.RowResults{{int64(1)}}, 644 Err: error(nil), 645 }, 646 { 647 //this is the attempt on the full batch 648 Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", 649 Args: []interface{}{int64(1)}, 650 Results: model.RowResults{{[]byte{}}}, 651 Err: fmt.Errorf("epoch error"), 652 }, 653 654 { 655 Sql: "SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])", 656 Args: []interface{}{ 657 "metric_0", 658 []time.Time{time.Unix(0, 0)}, 659 []float64{0}, 660 []int64{1}, 661 }, 662 Results: model.RowResults{{int64(1)}}, 663 Err: error(nil), 664 }, 665 { 666 //this is the attempt on the individual copyRequests 667 Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", 668 Args: []interface{}{int64(1)}, 669 Results: model.RowResults{{[]byte{}}}, 670 Err: fmt.Errorf("epoch error"), 671 }, 672 }, 673 }, 674 { 675 name: "Copy from error", 676 rows: map[string][]model.Insertable{ 677 "metric_0": { 678 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 679 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 680 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 681 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 682 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 683 }, 684 }, 685 686 sqlQueries: []model.SqlQuery{ 687 {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, 688 {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, 689 {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, 690 { 691 Sql: "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)", 692 Args: []interface{}{"metric_0"}, 693 Results: model.RowResults{{"metric_0", false}}, 694 Err: error(nil), 695 }, 696 697 { 698 Sql: "SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])", 699 Args: []interface{}{ 700 "metric_0", 701 []time.Time{time.Unix(0, 0), time.Unix(0, 0), time.Unix(0, 0), time.Unix(0, 0), time.Unix(0, 0)}, 702 make([]float64, 5), 703 []int64{1, 1, 1, 1, 1}, 704 }, 705 Results: model.RowResults{{int64(1)}}, 706 Err: fmt.Errorf("some INSERT error"), 707 }, 708 { 709 // this is the entire batch insert 710 Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", 711 Args: []interface{}{int64(1)}, 712 Results: model.RowResults{{[]byte{}}}, 713 Err: error(nil), 714 }, 715 716 { 717 Sql: "SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])", 718 Args: []interface{}{ 719 "metric_0", 720 []time.Time{time.Unix(0, 0), time.Unix(0, 0), time.Unix(0, 0), time.Unix(0, 0), time.Unix(0, 0)}, 721 make([]float64, 5), 722 []int64{1, 1, 1, 1, 1}, 723 }, 724 Results: model.RowResults{{int64(1)}}, 725 Err: fmt.Errorf("some INSERT error"), 726 }, 727 { 728 // this is the retry on individual copy requests 729 Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", 730 Args: []interface{}{int64(1)}, 731 Results: model.RowResults{{[]byte{}}}, 732 Err: error(nil), 733 }, 734 }, 735 }, 736 { 737 name: "Can't find/create table in DB", 738 rows: map[string][]model.Insertable{ 739 "metric_0": { 740 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 741 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 742 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 743 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 744 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 745 }, 746 }, 747 sqlQueries: []model.SqlQuery{ 748 {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, 749 {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, 750 {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, 751 { 752 Sql: "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)", 753 Args: []interface{}{"metric_0"}, 754 // no results is deliberate 755 Results: model.RowResults{}, 756 Err: error(nil), 757 }, 758 }, 759 }, 760 { 761 //cache errors get recovered from and the insert succeeds 762 name: "Metrics cache get error", 763 rows: map[string][]model.Insertable{ 764 "metric_0": { 765 model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), 766 }, 767 }, 768 metricsGetErr: fmt.Errorf("some metrics error"), 769 sqlQueries: []model.SqlQuery{ 770 {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, 771 {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, 772 {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, 773 { 774 Sql: "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)", 775 Args: []interface{}{"metric_0"}, 776 Results: model.RowResults{{"metric_0", true}}, 777 Err: error(nil), 778 }, 779 {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, 780 { 781 Sql: "SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])", 782 Args: []interface{}{ 783 "metric_0", 784 []time.Time{time.Unix(0, 0)}, 785 []float64{0}, 786 []int64{1}, 787 }, 788 Results: model.RowResults{{int64(1)}}, 789 Err: error(nil), 790 }, 791 { 792 Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", 793 Args: []interface{}{int64(1)}, 794 Results: model.RowResults{{[]byte{}}}, 795 Err: error(nil), 796 }, 797 }, 798 }, 799 } 800 801 for _, co := range testCases { 802 c := co 803 t.Run(c.name, func(t *testing.T) { 804 mock := model.NewSqlRecorder(c.sqlQueries, t) 805 scache := cache.NewSeriesCache(cache.DefaultConfig, nil) 806 807 mockMetrics := &model.MockMetricCache{ 808 MetricCache: make(map[string]model.MetricInfo), 809 GetMetricErr: c.metricsGetErr, 810 } 811 err := mockMetrics.Set( 812 "prom_data", 813 "metric_1", 814 model.MetricInfo{ 815 TableSchema: "prom_data", 816 TableName: "metricTableName_1", 817 SeriesTable: "metric_1", 818 }, false) 819 if err != nil { 820 t.Fatalf("error setting up mock cache: %s", err.Error()) 821 } 822 inserter, err := newPgxDispatcher(mock, mockMetrics, scache, nil, &Cfg{DisableEpochSync: true}) 823 if err != nil { 824 t.Fatal(err) 825 } 826 defer inserter.Close() 827 828 _, err = inserter.InsertTs(model.Data{Rows: c.rows}) 829 830 var expErr error 831 switch { 832 case c.metricsGetErr != nil: 833 //cache errors recover 834 expErr = nil 835 case c.name == "Can't find/create table in DB": 836 expErr = pgmodelErrs.ErrMissingTableName 837 default: 838 for _, q := range c.sqlQueries { 839 if q.Err != nil { 840 expErr = q.Err 841 } 842 } 843 } 844 845 if err != nil { 846 if !errors.Is(err, expErr) { 847 t.Errorf("unexpected error:\ngot\n%s\nwanted\n%s", err, expErr) 848 } 849 850 return 851 } 852 853 if expErr != nil { 854 t.Errorf("expected error:\ngot\nnil\nwanted\n%s", expErr) 855 } 856 857 if len(c.rows) == 0 { 858 return 859 } 860 }) 861 } 862} 863