1package purger
2
3import (
4	"context"
5	"encoding/binary"
6	"encoding/hex"
7	"errors"
8	"flag"
9	"fmt"
10	"hash/fnv"
11	"strconv"
12	"strings"
13	"time"
14
15	"github.com/grafana/loki/pkg/storage/chunk"
16
17	"github.com/prometheus/common/model"
18	"github.com/prometheus/prometheus/pkg/labels"
19)
20
// DeleteRequestStatus is the lifecycle state of a delete request. It is stored as the
// value of the request's status entry.
type DeleteRequestStatus string

// CacheKind names the cache that a cache generation number applies to.
type CacheKind string

// indexType is the first component of the hash keys used in the delete requests table;
// it tells apart the different kinds of entries stored there.
type indexType string

// States a delete request moves through.
const (
	StatusReceived     DeleteRequestStatus = "received"
	StatusBuildingPlan DeleteRequestStatus = "buildingPlan"
	StatusDeleting     DeleteRequestStatus = "deleting"
	StatusProcessed    DeleteRequestStatus = "processed"
)

// separator joins the series selectors of a delete request into one stored value.
const separator = "\000"

const (
	// CacheKindStore marks the generation number of the store cache.
	CacheKindStore CacheKind = "store"
	// CacheKindResults marks the generation number of the query results cache.
	CacheKindResults CacheKind = "results"
)

// Hash keys (or hash key prefixes) of the entry kinds kept in the delete requests table.
const (
	deleteRequestID      indexType = "1" // status entries: hash key "1", range key "userID:requestID"
	deleteRequestDetails indexType = "2" // details entries: hash key "2:userID:requestID"
	cacheGenNum          indexType = "3" // cache generation numbers: hash key "3:userID:kind"
)

// pendingDeleteRequestStatuses lists every status of a request that is not processed yet.
var pendingDeleteRequestStatuses = []DeleteRequestStatus{StatusReceived, StatusBuildingPlan, StatusDeleting}

// ErrDeleteRequestNotFound is returned when no delete request matches a lookup.
var ErrDeleteRequestNotFound = errors.New("could not find matching delete request")
50
// DeleteRequest holds all the details about a delete request.
//
// Fields, as filled in by this store:
//   - RequestID: 8 hex characters generated by generateUniqueID when the request is added.
//   - UserID: the owning user; excluded from JSON (tag "-").
//   - StartTime, EndTime, CreatedAt: decoded from the hex range key of the details entry.
//   - Selectors: series selectors, stored joined by separator in the details entry's value.
//   - Status: the value of the request's status entry.
//   - Matchers: excluded from JSON and never set by code in this file; presumably the
//     parsed form of Selectors, filled in by callers (TODO confirm).
//
// Field order is also the order of fields in the JSON encoding.
type DeleteRequest struct {
	RequestID string              `json:"request_id"`
	UserID    string              `json:"-"`
	StartTime model.Time          `json:"start_time"`
	EndTime   model.Time          `json:"end_time"`
	Selectors []string            `json:"selectors"`
	Status    DeleteRequestStatus `json:"status"`
	Matchers  [][]*labels.Matcher `json:"-"`
	CreatedAt model.Time          `json:"created_at"`
}
62
// cacheGenNumbers carries a user's two cache generation numbers: one for the store
// cache and one for the query results cache. Both are the opaque strings read from the
// index, and are empty when no generation number has been written yet.
type cacheGenNumbers struct {
	store   string
	results string
}
67
// DeleteStore provides all the methods required to manage lifecycle of delete request and things related to it.
//
// Every entry it reads or writes (request status, request details and per-user cache
// generation numbers) lives in the table cfg.RequestsTableName and is accessed through
// indexClient.
type DeleteStore struct {
	cfg         DeleteStoreConfig
	indexClient chunk.IndexClient
}
73
// DeleteStoreConfig holds configuration for delete store.
//
// Store is set by -deletes.store, and RequestsTableName by -deletes.requests-table-name
// (default "delete_requests"). ProvisionConfig registers its own flags and is given
// "deletes.table" (presumably as the flag prefix — see TableProvisioningConfig). All of
// these are registered by RegisterFlags.
type DeleteStoreConfig struct {
	Store             string                  `yaml:"store"`
	RequestsTableName string                  `yaml:"requests_table_name"`
	ProvisionConfig   TableProvisioningConfig `yaml:"table_provisioning"`
}
80
81// RegisterFlags adds the flags required to configure this flag set.
82func (cfg *DeleteStoreConfig) RegisterFlags(f *flag.FlagSet) {
83	cfg.ProvisionConfig.RegisterFlags("deletes.table", f)
84	f.StringVar(&cfg.Store, "deletes.store", "", "Store for keeping delete request")
85	f.StringVar(&cfg.RequestsTableName, "deletes.requests-table-name", "delete_requests", "Name of the table which stores delete requests")
86}
87
88// NewDeleteStore creates a store for managing delete requests.
89func NewDeleteStore(cfg DeleteStoreConfig, indexClient chunk.IndexClient) (*DeleteStore, error) {
90	ds := DeleteStore{
91		cfg:         cfg,
92		indexClient: indexClient,
93	}
94
95	return &ds, nil
96}
97
98// Add creates entries for a new delete request.
99func (ds *DeleteStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, selectors []string) error {
100	return ds.addDeleteRequest(ctx, userID, model.Now(), startTime, endTime, selectors)
101}
102
103// addDeleteRequest is also used for tests to create delete requests with different createdAt time.
104func (ds *DeleteStore) addDeleteRequest(ctx context.Context, userID string, createdAt, startTime, endTime model.Time, selectors []string) error {
105	requestID := generateUniqueID(userID, selectors)
106
107	for {
108		_, err := ds.GetDeleteRequest(ctx, userID, string(requestID))
109		if err != nil {
110			if err == ErrDeleteRequestNotFound {
111				break
112			}
113			return err
114		}
115
116		// we have a collision here, lets recreate a new requestID and check for collision
117		time.Sleep(time.Millisecond)
118		requestID = generateUniqueID(userID, selectors)
119	}
120
121	// userID, requestID
122	userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)
123
124	// Add an entry with userID, requestID as range key and status as value to make it easy to manage and lookup status
125	// We don't want to set anything in hash key here since we would want to find delete requests by just status
126	writeBatch := ds.indexClient.NewWriteBatch()
127	writeBatch.Add(ds.cfg.RequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(StatusReceived))
128
129	// Add another entry with additional details like creation time, time range of delete request and selectors in value
130	rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime))
131	writeBatch.Add(ds.cfg.RequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID),
132		[]byte(rangeValue), []byte(strings.Join(selectors, separator)))
133
134	// we update only cache gen number because only query responses are changing at this stage.
135	// we still have to query data from store for doing query time filtering and we don't want to invalidate its results now.
136	writeBatch.Add(ds.cfg.RequestsTableName, fmt.Sprintf("%s:%s:%s", cacheGenNum, userID, CacheKindResults),
137		[]byte{}, []byte(strconv.FormatInt(time.Now().Unix(), 10)))
138
139	return ds.indexClient.BatchWrite(ctx, writeBatch)
140}
141
142// GetDeleteRequestsByStatus returns all delete requests for given status.
143func (ds *DeleteStore) GetDeleteRequestsByStatus(ctx context.Context, status DeleteRequestStatus) ([]DeleteRequest, error) {
144	return ds.queryDeleteRequests(ctx, chunk.IndexQuery{
145		TableName:  ds.cfg.RequestsTableName,
146		HashValue:  string(deleteRequestID),
147		ValueEqual: []byte(status),
148	})
149}
150
151// GetDeleteRequestsForUserByStatus returns all delete requests for a user with given status.
152func (ds *DeleteStore) GetDeleteRequestsForUserByStatus(ctx context.Context, userID string, status DeleteRequestStatus) ([]DeleteRequest, error) {
153	return ds.queryDeleteRequests(ctx, chunk.IndexQuery{
154		TableName:        ds.cfg.RequestsTableName,
155		HashValue:        string(deleteRequestID),
156		RangeValuePrefix: []byte(userID),
157		ValueEqual:       []byte(status),
158	})
159}
160
161// GetAllDeleteRequestsForUser returns all delete requests for a user.
162func (ds *DeleteStore) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
163	return ds.queryDeleteRequests(ctx, chunk.IndexQuery{
164		TableName:        ds.cfg.RequestsTableName,
165		HashValue:        string(deleteRequestID),
166		RangeValuePrefix: []byte(userID),
167	})
168}
169
170// UpdateStatus updates status of a delete request.
171func (ds *DeleteStore) UpdateStatus(ctx context.Context, userID, requestID string, newStatus DeleteRequestStatus) error {
172	userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)
173
174	writeBatch := ds.indexClient.NewWriteBatch()
175	writeBatch.Add(ds.cfg.RequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(newStatus))
176
177	if newStatus == StatusProcessed {
178		// we have deleted data from store so invalidate cache only for store since we don't have to do runtime filtering anymore.
179		// we don't have to change cache gen number because we were anyways doing runtime filtering
180		writeBatch.Add(ds.cfg.RequestsTableName, fmt.Sprintf("%s:%s:%s", cacheGenNum, userID, CacheKindStore), []byte{}, []byte(strconv.FormatInt(time.Now().Unix(), 10)))
181	}
182
183	return ds.indexClient.BatchWrite(ctx, writeBatch)
184}
185
186// GetDeleteRequest returns delete request with given requestID.
187func (ds *DeleteStore) GetDeleteRequest(ctx context.Context, userID, requestID string) (*DeleteRequest, error) {
188	userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)
189
190	deleteRequests, err := ds.queryDeleteRequests(ctx, chunk.IndexQuery{
191		TableName:        ds.cfg.RequestsTableName,
192		HashValue:        string(deleteRequestID),
193		RangeValuePrefix: []byte(userIDAndRequestID),
194	})
195	if err != nil {
196		return nil, err
197	}
198
199	if len(deleteRequests) == 0 {
200		return nil, ErrDeleteRequestNotFound
201	}
202
203	return &deleteRequests[0], nil
204}
205
206// GetPendingDeleteRequestsForUser returns all delete requests for a user which are not processed.
207func (ds *DeleteStore) GetPendingDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
208	pendingDeleteRequests := []DeleteRequest{}
209	for _, status := range pendingDeleteRequestStatuses {
210		deleteRequests, err := ds.GetDeleteRequestsForUserByStatus(ctx, userID, status)
211		if err != nil {
212			return nil, err
213		}
214
215		pendingDeleteRequests = append(pendingDeleteRequests, deleteRequests...)
216	}
217
218	return pendingDeleteRequests, nil
219}
220
221func (ds *DeleteStore) queryDeleteRequests(ctx context.Context, deleteQuery chunk.IndexQuery) ([]DeleteRequest, error) {
222	deleteRequests := []DeleteRequest{}
223	// No need to lock inside the callback since we run a single index query.
224	err := ds.indexClient.QueryPages(ctx, []chunk.IndexQuery{deleteQuery}, func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) {
225		itr := batch.Iterator()
226		for itr.Next() {
227			userID, requestID := splitUserIDAndRequestID(string(itr.RangeValue()))
228
229			deleteRequests = append(deleteRequests, DeleteRequest{
230				UserID:    userID,
231				RequestID: requestID,
232				Status:    DeleteRequestStatus(itr.Value()),
233			})
234		}
235		return true
236	})
237	if err != nil {
238		return nil, err
239	}
240
241	for i, deleteRequest := range deleteRequests {
242		deleteRequestQuery := []chunk.IndexQuery{
243			{
244				TableName: ds.cfg.RequestsTableName,
245				HashValue: fmt.Sprintf("%s:%s:%s", deleteRequestDetails, deleteRequest.UserID, deleteRequest.RequestID),
246			},
247		}
248
249		var parseError error
250		err := ds.indexClient.QueryPages(ctx, deleteRequestQuery, func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) {
251			itr := batch.Iterator()
252			itr.Next()
253
254			deleteRequest, err = parseDeleteRequestTimestamps(itr.RangeValue(), deleteRequest)
255			if err != nil {
256				parseError = err
257				return false
258			}
259
260			deleteRequest.Selectors = strings.Split(string(itr.Value()), separator)
261			deleteRequests[i] = deleteRequest
262
263			return true
264		})
265		if err != nil {
266			return nil, err
267		}
268
269		if parseError != nil {
270			return nil, parseError
271		}
272	}
273
274	return deleteRequests, nil
275}
276
277// getCacheGenerationNumbers returns cache gen numbers for a user.
278func (ds *DeleteStore) getCacheGenerationNumbers(ctx context.Context, userID string) (*cacheGenNumbers, error) {
279	storeCacheGen, err := ds.queryCacheGenerationNumber(ctx, userID, CacheKindStore)
280	if err != nil {
281		return nil, err
282	}
283
284	resultsCacheGen, err := ds.queryCacheGenerationNumber(ctx, userID, CacheKindResults)
285	if err != nil {
286		return nil, err
287	}
288
289	return &cacheGenNumbers{storeCacheGen, resultsCacheGen}, nil
290}
291
292func (ds *DeleteStore) queryCacheGenerationNumber(ctx context.Context, userID string, kind CacheKind) (string, error) {
293	query := chunk.IndexQuery{TableName: ds.cfg.RequestsTableName, HashValue: fmt.Sprintf("%s:%s:%s", cacheGenNum, userID, kind)}
294
295	genNumber := ""
296	err := ds.indexClient.QueryPages(ctx, []chunk.IndexQuery{query}, func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) {
297		itr := batch.Iterator()
298		for itr.Next() {
299			genNumber = string(itr.Value())
300			break
301		}
302		return false
303	})
304	if err != nil {
305		return "", err
306	}
307
308	return genNumber, nil
309}
310
311// RemoveDeleteRequest removes a delete request and increments cache gen number
312func (ds *DeleteStore) RemoveDeleteRequest(ctx context.Context, userID, requestID string, createdAt, startTime, endTime model.Time) error {
313	userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)
314
315	writeBatch := ds.indexClient.NewWriteBatch()
316	writeBatch.Delete(ds.cfg.RequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID))
317
318	// Add another entry with additional details like creation time, time range of delete request and selectors in value
319	rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime))
320	writeBatch.Delete(ds.cfg.RequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID),
321		[]byte(rangeValue))
322
323	// we need to invalidate results cache since removal of delete request would cause query results to change
324	writeBatch.Add(ds.cfg.RequestsTableName, fmt.Sprintf("%s:%s:%s", cacheGenNum, userID, CacheKindResults),
325		[]byte{}, []byte(strconv.FormatInt(time.Now().Unix(), 10)))
326
327	return ds.indexClient.BatchWrite(ctx, writeBatch)
328}
329
330func parseDeleteRequestTimestamps(rangeValue []byte, deleteRequest DeleteRequest) (DeleteRequest, error) {
331	hexParts := strings.Split(string(rangeValue), ":")
332	if len(hexParts) != 3 {
333		return deleteRequest, errors.New("invalid key in parsing delete request lookup response")
334	}
335
336	createdAt, err := strconv.ParseInt(hexParts[0], 16, 64)
337	if err != nil {
338		return deleteRequest, err
339	}
340
341	from, err := strconv.ParseInt(hexParts[1], 16, 64)
342	if err != nil {
343		return deleteRequest, err
344	}
345	through, err := strconv.ParseInt(hexParts[2], 16, 64)
346	if err != nil {
347		return deleteRequest, err
348	}
349
350	deleteRequest.CreatedAt = model.Time(createdAt)
351	deleteRequest.StartTime = model.Time(from)
352	deleteRequest.EndTime = model.Time(through)
353
354	return deleteRequest, nil
355}
356
357// An id is useful in managing delete requests
358func generateUniqueID(orgID string, selectors []string) []byte {
359	uniqueID := fnv.New32()
360	_, _ = uniqueID.Write([]byte(orgID))
361
362	timeNow := make([]byte, 8)
363	binary.LittleEndian.PutUint64(timeNow, uint64(time.Now().UnixNano()))
364	_, _ = uniqueID.Write(timeNow)
365
366	for _, selector := range selectors {
367		_, _ = uniqueID.Write([]byte(selector))
368	}
369
370	return encodeUniqueID(uniqueID.Sum32())
371}
372
// encodeUniqueID renders t as 8 lowercase hexadecimal ASCII bytes, most significant
// byte first.
func encodeUniqueID(t uint32) []byte {
	var raw [4]byte
	binary.BigEndian.PutUint32(raw[:], t)

	encoded := make([]byte, hex.EncodedLen(len(raw)))
	hex.Encode(encoded, raw[:])
	return encoded
}
380
// splitUserIDAndRequestID splits a status-entry range value of the form
// "userID:requestID" at its last ':'. The last separator is used because request IDs are
// hex strings (never containing ':'), while user IDs are not known to exclude it.
//
// A malformed value with no ':' at all is returned whole as userID with an empty
// requestID. Previously such a value panicked on a negative slice index.
func splitUserIDAndRequestID(rangeValue string) (userID, requestID string) {
	lastIndex := strings.LastIndex(rangeValue, ":")
	if lastIndex < 0 {
		return rangeValue, ""
	}

	return rangeValue[:lastIndex], rangeValue[lastIndex+1:]
}
389