1// This file and its contents are licensed under the Apache License 2.0.
2// Please see the included NOTICE for copyright information and
3// LICENSE for a copy of the license.
4
5package delete
6
import (
	"context"
	"fmt"
	"sort"
	"time"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/timescale/promscale/pkg/pgmodel/common/schema"
	"github.com/timescale/promscale/pkg/pgmodel/model"
	"github.com/timescale/promscale/pkg/pgmodel/querier"
	"github.com/timescale/promscale/pkg/pgxconn"
)
18
// queryDeleteSeries calls the _prom_catalog.delete_series_from_metric database
// function. As used by DeleteSeries, $1 is bound to the metric name and $2 to
// the series IDs (as int64 values), and the single result column is scanned
// into an int that is added to the running total of deleted rows.
const queryDeleteSeries = "SELECT _prom_catalog.delete_series_from_metric($1, $2)"
20
// PgDelete deletes the series based on matchers.
type PgDelete struct {
	// Conn is the database connection used both to resolve matchers into
	// metric names and series IDs and to run the per-metric delete query.
	Conn pgxconn.PgxConn
}
25
26// DeleteSeries deletes the series that matches the provided label_matchers.
27func (pgDel *PgDelete) DeleteSeries(matchers []*labels.Matcher, _, _ time.Time) ([]string, []model.SeriesID, int, error) {
28	var (
29		deletedSeriesIDs []model.SeriesID
30		totalRowsDeleted int
31		err              error
32		metricsTouched   = make(map[string]struct{})
33	)
34	metricNames, seriesIDMatrix, err := getMetricNameSeriesIDFromMatchers(pgDel.Conn, matchers)
35	if err != nil {
36		return nil, nil, -1, fmt.Errorf("delete-series: %w", err)
37	}
38	for metricIndex, metricName := range metricNames {
39		seriesIDs := seriesIDMatrix[metricIndex]
40		var rowsDeleted int
41		if err = pgDel.Conn.QueryRow(
42			context.Background(),
43			queryDeleteSeries,
44			metricName,
45			convertSeriesIDsToInt64s(seriesIDs),
46		).Scan(&rowsDeleted); err != nil {
47			return getKeys(metricsTouched), deletedSeriesIDs, totalRowsDeleted, fmt.Errorf("deleting series with metric_name=%s and series_ids=%v : %w", metricName, seriesIDs, err)
48		}
49		if _, ok := metricsTouched[metricName]; !ok {
50			metricsTouched[metricName] = struct{}{}
51		}
52		deletedSeriesIDs = append(deletedSeriesIDs, seriesIDs...)
53		totalRowsDeleted += rowsDeleted
54	}
55	return getKeys(metricsTouched), deletedSeriesIDs, totalRowsDeleted, nil
56}
57
58// getMetricNameSeriesIDFromMatchers returns the metric name list and the corresponding series ID array
59// as a matrix.
60func getMetricNameSeriesIDFromMatchers(conn pgxconn.PgxConn, matchers []*labels.Matcher) ([]string, [][]model.SeriesID, error) {
61	cb, err := querier.BuildSubQueries(matchers)
62	if err != nil {
63		return nil, nil, fmt.Errorf("delete series build subqueries: %w", err)
64	}
65	clauses, values, err := cb.Build(true)
66	if err != nil {
67		return nil, nil, fmt.Errorf("delete series build clauses: %w", err)
68	}
69	metrics, schemas, correspondingSeriesIDs, err := querier.GetMetricNameSeriesIds(conn, querier.GetMetadata(clauses, values))
70	if err != nil {
71		return nil, nil, fmt.Errorf("get metric-name series-ids: %w", err)
72	}
73	metrics, correspondingSeriesIDs = filterMetricNameSeriesIds(metrics, schemas, correspondingSeriesIDs)
74	return metrics, correspondingSeriesIDs, nil
75}
76
77// filterMetricNameSeriesIds returns the metrics, schemas and corresonding series-ids that are associated with
78// actual metric hypertables only.
79func filterMetricNameSeriesIds(metrics, schemas []string, seriesIds [][]model.SeriesID) (filteredMetrics []string, filteredSeriesIds [][]model.SeriesID) {
80	for i := range metrics {
81		metricSchema := schemas[i]
82		if metricSchema != schema.Data {
83			continue
84		}
85		metric := metrics[i]
86		correspondingSeriesIds := seriesIds[i]
87		filteredMetrics = append(filteredMetrics, metric)
88		filteredSeriesIds = append(filteredSeriesIds, correspondingSeriesIds)
89	}
90	return
91}
92
93func convertSeriesIDsToInt64s(s []model.SeriesID) []int64 {
94	temp := make([]int64, len(s))
95	for i := range s {
96		temp[i] = int64(s[i])
97	}
98	return temp
99}
100
// getKeys returns the keys of mapStr sorted in ascending order.
//
// A nil map yields a nil slice; an empty, non-nil map yields an empty,
// non-nil slice.
func getKeys(mapStr map[string]struct{}) (keys []string) {
	if mapStr == nil {
		return nil
	}
	keys = make([]string, 0, len(mapStr))
	for k := range mapStr {
		keys = append(keys, k)
	}
	// Go randomizes map iteration order; sort so that repeated calls on the
	// same set produce the same, reproducible result.
	sort.Strings(keys)
	return keys
}
111