1package retention
2
3import (
4	"bytes"
5	"context"
6	"errors"
7	"fmt"
8	"time"
9
10	util_log "github.com/cortexproject/cortex/pkg/util/log"
11	"github.com/go-kit/kit/log/level"
12	"github.com/prometheus/client_golang/prometheus"
13	"github.com/prometheus/common/model"
14	"go.etcd.io/bbolt"
15
16	"github.com/grafana/loki/pkg/chunkenc"
17	"github.com/grafana/loki/pkg/storage"
18	"github.com/grafana/loki/pkg/storage/chunk"
19)
20
// bucketName is the name of the bbolt bucket that markTable opens to reach a
// table's index entries.
var bucketName = []byte("index")

// chunkBucket holds the bucket name "chunks".
// NOTE(review): not referenced in this section; presumably used elsewhere in
// the package.
var chunkBucket = []byte("chunks")

// logMetricName is the metric name ("logs") used when computing the index
// write entries of log chunks.
const logMetricName = "logs"

// markersFolder is presumably the sub-folder name for marker files.
// NOTE(review): not referenced in this section — confirm against its users.
const markersFolder = "markers"

// separator joins an index entry's hash value and range value when building
// its bbolt key.
const separator = "\000"
31
32type TableMarker interface {
33	// MarkForDelete marks chunks to delete for a given table and returns if it's empty and how many marks were created.
34	MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error)
35}
36
// Marker implements MarkForDelete for a single index table: it removes expired
// chunk entries from the table's index and records the IDs of chunks to delete
// through a marker storage writer.
type Marker struct {
	// workingDirectory is passed to NewMarkerStorageWriter for every table processed.
	workingDirectory string
	// config is validated by NewMarker and used to find the period config of a table.
	config           storage.SchemaConfig
	// expiration decides whether a chunk is expired or can be dropped from an index table.
	expiration       ExpirationChecker
	// markerMetrics records per-table durations, actions and mark counts.
	markerMetrics    *markerMetrics
	// chunkClient is handed to the chunk rewriter to read and re-upload partially deleted chunks.
	chunkClient      chunk.Client
}
44
45func NewMarker(workingDirectory string, config storage.SchemaConfig, expiration ExpirationChecker, chunkClient chunk.Client, r prometheus.Registerer) (*Marker, error) {
46	if err := validatePeriods(config); err != nil {
47		return nil, err
48	}
49	metrics := newMarkerMetrics(r)
50	return &Marker{
51		workingDirectory: workingDirectory,
52		config:           config,
53		expiration:       expiration,
54		markerMetrics:    metrics,
55		chunkClient:      chunkClient,
56	}, nil
57}
58
59// MarkForDelete marks all chunks expired for a given table.
60func (t *Marker) MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) {
61	start := time.Now()
62	status := statusSuccess
63	defer func() {
64		t.markerMetrics.tableProcessedDurationSeconds.WithLabelValues(tableName, status).Observe(time.Since(start).Seconds())
65		level.Debug(util_log.Logger).Log("msg", "finished to process table", "table", tableName, "duration", time.Since(start))
66	}()
67	level.Debug(util_log.Logger).Log("msg", "starting to process table", "table", tableName)
68
69	empty, markCount, err := t.markTable(ctx, tableName, db)
70	if err != nil {
71		status = statusFailure
72		return false, 0, err
73	}
74	return empty, markCount, nil
75}
76
77func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) {
78	schemaCfg, ok := schemaPeriodForTable(t.config, tableName)
79	if !ok {
80		return false, 0, fmt.Errorf("could not find schema for table: %s", tableName)
81	}
82
83	markerWriter, err := NewMarkerStorageWriter(t.workingDirectory)
84	if err != nil {
85		return false, 0, fmt.Errorf("failed to create marker writer: %w", err)
86	}
87
88	var empty bool
89	err = db.Update(func(tx *bbolt.Tx) error {
90		bucket := tx.Bucket(bucketName)
91		if bucket == nil {
92			return nil
93		}
94
95		chunkIt, err := newChunkIndexIterator(bucket, schemaCfg)
96		if err != nil {
97			return fmt.Errorf("failed to create chunk index iterator: %w", err)
98		}
99		if ctx.Err() != nil {
100			return ctx.Err()
101		}
102		chunkRewriter, err := newChunkRewriter(t.chunkClient, schemaCfg, tableName, bucket)
103		if err != nil {
104			return err
105		}
106
107		empty, err = markforDelete(ctx, tableName, markerWriter, chunkIt, newSeriesCleaner(bucket, schemaCfg, tableName), t.expiration, chunkRewriter)
108		if err != nil {
109			return err
110		}
111		t.markerMetrics.tableMarksCreatedTotal.WithLabelValues(tableName).Add(float64(markerWriter.Count()))
112		if err := markerWriter.Close(); err != nil {
113			return fmt.Errorf("failed to close marker writer: %w", err)
114		}
115		return nil
116	})
117	if err != nil {
118		return false, 0, err
119	}
120	if empty {
121		t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionDeleted).Inc()
122		return empty, markerWriter.Count(), nil
123	}
124	if markerWriter.Count() == 0 {
125		t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionNone).Inc()
126		return empty, markerWriter.Count(), nil
127	}
128	t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionModified).Inc()
129	return empty, markerWriter.Count(), nil
130}
131
132func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWriter, chunkIt ChunkEntryIterator, seriesCleaner SeriesCleaner, expiration ExpirationChecker, chunkRewriter *chunkRewriter) (bool, error) {
133	seriesMap := newUserSeriesMap()
134	// tableInterval holds the interval for which the table is expected to have the chunks indexed
135	tableInterval := ExtractIntervalFromTableName(tableName)
136	empty := true
137	now := model.Now()
138
139	for chunkIt.Next() {
140		if chunkIt.Err() != nil {
141			return false, chunkIt.Err()
142		}
143		c := chunkIt.Entry()
144		seriesMap.Add(c.SeriesID, c.UserID, c.Labels)
145
146		// see if the chunk is deleted completely or partially
147		if expired, nonDeletedIntervals := expiration.Expired(c, now); expired {
148			if len(nonDeletedIntervals) > 0 {
149				wroteChunks, err := chunkRewriter.rewriteChunk(ctx, c, nonDeletedIntervals)
150				if err != nil {
151					return false, err
152				}
153
154				if wroteChunks {
155					// we have re-written chunk to the storage so the table won't be empty and the series are still being referred.
156					empty = false
157					seriesMap.MarkSeriesNotDeleted(c.SeriesID, c.UserID)
158				}
159			}
160
161			if err := chunkIt.Delete(); err != nil {
162				return false, err
163			}
164
165			// Mark the chunk for deletion only if it is completely deleted, or this is the last table that the chunk is index in.
166			// For a partially deleted chunk, if we delete the source chunk before all the tables which index it are processed then
167			// the retention would fail because it would fail to find it in the storage.
168			if len(nonDeletedIntervals) == 0 || c.Through <= tableInterval.End {
169				if err := marker.Put(c.ChunkID); err != nil {
170					return false, err
171				}
172			}
173			continue
174		}
175
176		// The chunk is not deleted, now see if we can drop its index entry based on end time from tableInterval.
177		// If chunk end time is after the end time of tableInterval, it means the chunk would also be indexed in the next table.
178		// We would now check if the end time of the tableInterval is out of retention period so that
179		// we can drop the chunk entry from this table without removing the chunk from the store.
180		if c.Through.After(tableInterval.End) {
181			if expiration.DropFromIndex(c, tableInterval.End, now) {
182				if err := chunkIt.Delete(); err != nil {
183					return false, err
184				}
185				continue
186			}
187		}
188
189		empty = false
190		seriesMap.MarkSeriesNotDeleted(c.SeriesID, c.UserID)
191	}
192	if empty {
193		return true, nil
194	}
195	if ctx.Err() != nil {
196		return false, ctx.Err()
197	}
198
199	return false, seriesMap.ForEach(func(info userSeriesInfo) error {
200		if !info.isDeleted {
201			return nil
202		}
203
204		return seriesCleaner.Cleanup(info.UserID(), info.lbls)
205	})
206}
207
// ChunkClient is the narrow set of chunk-store operations the Sweeper relies on.
type ChunkClient interface {
	// DeleteChunk deletes the chunk chunkID belonging to userID.
	DeleteChunk(ctx context.Context, userID string, chunkID string) error
	// IsChunkNotFoundErr reports whether err indicates the chunk did not exist;
	// the Sweeper treats such deletions as already done.
	IsChunkNotFoundErr(err error) bool
}
212
// Sweeper deletes the chunks recorded as markers: its marker processor hands
// each chunk ID to the callback installed by Start, which deletes the chunk
// through chunkClient.
type Sweeper struct {
	// markerProcessor drives the deletion callback; see Start and Stop.
	markerProcessor MarkerProcessor
	// chunkClient performs the chunk deletions.
	chunkClient     ChunkClient
	// sweeperMetrics records the duration and status of each chunk deletion.
	sweeperMetrics  *sweeperMetrics
}
218
219func NewSweeper(workingDir string, deleteClient ChunkClient, deleteWorkerCount int, minAgeDelete time.Duration, r prometheus.Registerer) (*Sweeper, error) {
220	m := newSweeperMetrics(r)
221	p, err := newMarkerStorageReader(workingDir, deleteWorkerCount, minAgeDelete, m)
222	if err != nil {
223		return nil, err
224	}
225	return &Sweeper{
226		markerProcessor: p,
227		chunkClient:     deleteClient,
228		sweeperMetrics:  m,
229	}, nil
230}
231
232func (s *Sweeper) Start() {
233	s.markerProcessor.Start(func(ctx context.Context, chunkId []byte) error {
234		status := statusSuccess
235		start := time.Now()
236		defer func() {
237			s.sweeperMetrics.deleteChunkDurationSeconds.WithLabelValues(status).Observe(time.Since(start).Seconds())
238		}()
239		chunkIDString := unsafeGetString(chunkId)
240		userID, err := getUserIDFromChunkID(chunkId)
241		if err != nil {
242			return err
243		}
244
245		err = s.chunkClient.DeleteChunk(ctx, unsafeGetString(userID), chunkIDString)
246		if s.chunkClient.IsChunkNotFoundErr(err) {
247			status = statusNotFound
248			level.Debug(util_log.Logger).Log("msg", "delete on not found chunk", "chunkID", chunkIDString)
249			return nil
250		}
251		if err != nil {
252			level.Error(util_log.Logger).Log("msg", "error deleting chunk", "chunkID", chunkIDString, "err", err)
253			status = statusFailure
254		}
255		return err
256	})
257}
258
// getUserIDFromChunkID returns the prefix of chunkID up to (not including) the
// first '/'. The returned slice aliases chunkID. An ID with no '/' or with an
// empty prefix is rejected.
func getUserIDFromChunkID(chunkID []byte) ([]byte, error) {
	slash := bytes.IndexByte(chunkID, '/')
	if slash > 0 {
		return chunkID[:slash], nil
	}
	return nil, fmt.Errorf("invalid chunk ID %q", chunkID)
}
267
268func (s *Sweeper) Stop() {
269	s.markerProcessor.Stop()
270}
271
// chunkRewriter re-creates the retained parts of partially deleted chunks and
// writes their index entries into one table's bbolt bucket.
type chunkRewriter struct {
	// chunkClient fetches the source chunk and uploads the rewritten ones.
	chunkClient chunk.Client
	// tableName is the table whose index entries are written to bucket; entries
	// computed for other tables are skipped.
	tableName   string
	// bucket receives the index entries of rewritten chunks.
	bucket      *bbolt.Bucket

	// seriesStoreSchema computes the index write entries for each rewritten chunk.
	seriesStoreSchema chunk.SeriesStoreSchema
}
279
280func newChunkRewriter(chunkClient chunk.Client, schemaCfg chunk.PeriodConfig,
281	tableName string, bucket *bbolt.Bucket) (*chunkRewriter, error) {
282	schema, err := schemaCfg.CreateSchema()
283	if err != nil {
284		return nil, err
285	}
286
287	seriesStoreSchema, ok := schema.(chunk.SeriesStoreSchema)
288	if !ok {
289		return nil, errors.New("invalid schema")
290	}
291
292	return &chunkRewriter{
293		chunkClient:       chunkClient,
294		tableName:         tableName,
295		bucket:            bucket,
296		seriesStoreSchema: seriesStoreSchema,
297	}, nil
298}
299
300func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, intervals []model.Interval) (bool, error) {
301	userID := unsafeGetString(ce.UserID)
302	chunkID := unsafeGetString(ce.ChunkID)
303
304	chk, err := chunk.ParseExternalKey(userID, chunkID)
305	if err != nil {
306		return false, err
307	}
308
309	chks, err := c.chunkClient.GetChunks(ctx, []chunk.Chunk{chk})
310	if err != nil {
311		return false, err
312	}
313
314	if len(chks) != 1 {
315		return false, fmt.Errorf("expected 1 entry for chunk %s but found %d in storage", chunkID, len(chks))
316	}
317
318	wroteChunks := false
319
320	for _, interval := range intervals {
321		newChunkData, err := chks[0].Data.Rebound(interval.Start, interval.End)
322		if err != nil {
323			return false, err
324		}
325
326		facade, ok := newChunkData.(*chunkenc.Facade)
327		if !ok {
328			return false, errors.New("invalid chunk type")
329		}
330
331		newChunk := chunk.NewChunk(
332			userID, chks[0].Fingerprint, chks[0].Metric,
333			facade,
334			interval.Start,
335			interval.End,
336		)
337
338		err = newChunk.Encode()
339		if err != nil {
340			return false, err
341		}
342
343		entries, err := c.seriesStoreSchema.GetChunkWriteEntries(interval.Start, interval.End, userID, "logs", newChunk.Metric, newChunk.ExternalKey())
344		if err != nil {
345			return false, err
346		}
347
348		uploadChunk := false
349
350		for _, entry := range entries {
351			// write an entry only if it belongs to this table
352			if entry.TableName == c.tableName {
353				key := entry.HashValue + separator + string(entry.RangeValue)
354				if err := c.bucket.Put([]byte(key), nil); err != nil {
355					return false, err
356				}
357				uploadChunk = true
358			}
359		}
360
361		// upload chunk only if an entry was written
362		if uploadChunk {
363			err = c.chunkClient.PutChunks(ctx, []chunk.Chunk{newChunk})
364			if err != nil {
365				return false, err
366			}
367			wroteChunks = true
368		}
369	}
370
371	return wroteChunks, nil
372}
373