1// This file and its contents are licensed under the Apache License 2.0. 2// Please see the included NOTICE for copyright information and 3// LICENSE for a copy of the license. 4 5package delete 6 7import ( 8 "context" 9 "fmt" 10 "time" 11 12 "github.com/prometheus/prometheus/pkg/labels" 13 "github.com/timescale/promscale/pkg/pgmodel/common/schema" 14 "github.com/timescale/promscale/pkg/pgmodel/model" 15 "github.com/timescale/promscale/pkg/pgmodel/querier" 16 "github.com/timescale/promscale/pkg/pgxconn" 17) 18 19const queryDeleteSeries = "SELECT _prom_catalog.delete_series_from_metric($1, $2)" 20 21// PgDelete deletes the series based on matchers. 22type PgDelete struct { 23 Conn pgxconn.PgxConn 24} 25 26// DeleteSeries deletes the series that matches the provided label_matchers. 27func (pgDel *PgDelete) DeleteSeries(matchers []*labels.Matcher, _, _ time.Time) ([]string, []model.SeriesID, int, error) { 28 var ( 29 deletedSeriesIDs []model.SeriesID 30 totalRowsDeleted int 31 err error 32 metricsTouched = make(map[string]struct{}) 33 ) 34 metricNames, seriesIDMatrix, err := getMetricNameSeriesIDFromMatchers(pgDel.Conn, matchers) 35 if err != nil { 36 return nil, nil, -1, fmt.Errorf("delete-series: %w", err) 37 } 38 for metricIndex, metricName := range metricNames { 39 seriesIDs := seriesIDMatrix[metricIndex] 40 var rowsDeleted int 41 if err = pgDel.Conn.QueryRow( 42 context.Background(), 43 queryDeleteSeries, 44 metricName, 45 convertSeriesIDsToInt64s(seriesIDs), 46 ).Scan(&rowsDeleted); err != nil { 47 return getKeys(metricsTouched), deletedSeriesIDs, totalRowsDeleted, fmt.Errorf("deleting series with metric_name=%s and series_ids=%v : %w", metricName, seriesIDs, err) 48 } 49 if _, ok := metricsTouched[metricName]; !ok { 50 metricsTouched[metricName] = struct{}{} 51 } 52 deletedSeriesIDs = append(deletedSeriesIDs, seriesIDs...) 53 totalRowsDeleted += rowsDeleted 54 } 55 return getKeys(metricsTouched), deletedSeriesIDs, totalRowsDeleted, nil 56} 57 58// getMetricNameSeriesIDFromMatchers returns the metric name list and the corresponding series ID array 59// as a matrix. 60func getMetricNameSeriesIDFromMatchers(conn pgxconn.PgxConn, matchers []*labels.Matcher) ([]string, [][]model.SeriesID, error) { 61 cb, err := querier.BuildSubQueries(matchers) 62 if err != nil { 63 return nil, nil, fmt.Errorf("delete series build subqueries: %w", err) 64 } 65 clauses, values, err := cb.Build(true) 66 if err != nil { 67 return nil, nil, fmt.Errorf("delete series build clauses: %w", err) 68 } 69 metrics, schemas, correspondingSeriesIDs, err := querier.GetMetricNameSeriesIds(conn, querier.GetMetadata(clauses, values)) 70 if err != nil { 71 return nil, nil, fmt.Errorf("get metric-name series-ids: %w", err) 72 } 73 metrics, correspondingSeriesIDs = filterMetricNameSeriesIds(metrics, schemas, correspondingSeriesIDs) 74 return metrics, correspondingSeriesIDs, nil 75} 76 77// filterMetricNameSeriesIds returns the metrics, schemas and corresonding series-ids that are associated with 78// actual metric hypertables only. 79func filterMetricNameSeriesIds(metrics, schemas []string, seriesIds [][]model.SeriesID) (filteredMetrics []string, filteredSeriesIds [][]model.SeriesID) { 80 for i := range metrics { 81 metricSchema := schemas[i] 82 if metricSchema != schema.Data { 83 continue 84 } 85 metric := metrics[i] 86 correspondingSeriesIds := seriesIds[i] 87 filteredMetrics = append(filteredMetrics, metric) 88 filteredSeriesIds = append(filteredSeriesIds, correspondingSeriesIds) 89 } 90 return 91} 92 93func convertSeriesIDsToInt64s(s []model.SeriesID) []int64 { 94 temp := make([]int64, len(s)) 95 for i := range s { 96 temp[i] = int64(s[i]) 97 } 98 return temp 99} 100 101func getKeys(mapStr map[string]struct{}) (keys []string) { 102 if mapStr == nil { 103 return nil 104 } 105 keys = make([]string, 0, len(mapStr)) 106 for k := range mapStr { 107 keys = append(keys, k) 108 } 109 return 110} 111