1package purger 2 3import ( 4 "context" 5 "encoding/binary" 6 "encoding/hex" 7 "errors" 8 "flag" 9 "fmt" 10 "hash/fnv" 11 "strconv" 12 "strings" 13 "time" 14 15 "github.com/grafana/loki/pkg/storage/chunk" 16 17 "github.com/prometheus/common/model" 18 "github.com/prometheus/prometheus/pkg/labels" 19) 20 21type ( 22 DeleteRequestStatus string 23 CacheKind string 24 indexType string 25) 26 27const ( 28 StatusReceived DeleteRequestStatus = "received" 29 StatusBuildingPlan DeleteRequestStatus = "buildingPlan" 30 StatusDeleting DeleteRequestStatus = "deleting" 31 StatusProcessed DeleteRequestStatus = "processed" 32 33 separator = "\000" // separator for series selectors in delete requests 34 35 // CacheKindStore is for cache gen number for store cache 36 CacheKindStore CacheKind = "store" 37 // CacheKindResults is for cache gen number for results cache 38 CacheKindResults CacheKind = "results" 39 40 deleteRequestID indexType = "1" 41 deleteRequestDetails indexType = "2" 42 cacheGenNum indexType = "3" 43) 44 45var ( 46 pendingDeleteRequestStatuses = []DeleteRequestStatus{StatusReceived, StatusBuildingPlan, StatusDeleting} 47 48 ErrDeleteRequestNotFound = errors.New("could not find matching delete request") 49) 50 51// DeleteRequest holds all the details about a delete request. 52type DeleteRequest struct { 53 RequestID string `json:"request_id"` 54 UserID string `json:"-"` 55 StartTime model.Time `json:"start_time"` 56 EndTime model.Time `json:"end_time"` 57 Selectors []string `json:"selectors"` 58 Status DeleteRequestStatus `json:"status"` 59 Matchers [][]*labels.Matcher `json:"-"` 60 CreatedAt model.Time `json:"created_at"` 61} 62 63// cacheGenNumbers holds store and results cache gen numbers for a user. 64type cacheGenNumbers struct { 65 store, results string 66} 67 68// DeleteStore provides all the methods required to manage lifecycle of delete request and things related to it. 69type DeleteStore struct { 70 cfg DeleteStoreConfig 71 indexClient chunk.IndexClient 72} 73 74// DeleteStoreConfig holds configuration for delete store. 75type DeleteStoreConfig struct { 76 Store string `yaml:"store"` 77 RequestsTableName string `yaml:"requests_table_name"` 78 ProvisionConfig TableProvisioningConfig `yaml:"table_provisioning"` 79} 80 81// RegisterFlags adds the flags required to configure this flag set. 82func (cfg *DeleteStoreConfig) RegisterFlags(f *flag.FlagSet) { 83 cfg.ProvisionConfig.RegisterFlags("deletes.table", f) 84 f.StringVar(&cfg.Store, "deletes.store", "", "Store for keeping delete request") 85 f.StringVar(&cfg.RequestsTableName, "deletes.requests-table-name", "delete_requests", "Name of the table which stores delete requests") 86} 87 88// NewDeleteStore creates a store for managing delete requests. 89func NewDeleteStore(cfg DeleteStoreConfig, indexClient chunk.IndexClient) (*DeleteStore, error) { 90 ds := DeleteStore{ 91 cfg: cfg, 92 indexClient: indexClient, 93 } 94 95 return &ds, nil 96} 97 98// Add creates entries for a new delete request. 99func (ds *DeleteStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, selectors []string) error { 100 return ds.addDeleteRequest(ctx, userID, model.Now(), startTime, endTime, selectors) 101} 102 103// addDeleteRequest is also used for tests to create delete requests with different createdAt time. 104func (ds *DeleteStore) addDeleteRequest(ctx context.Context, userID string, createdAt, startTime, endTime model.Time, selectors []string) error { 105 requestID := generateUniqueID(userID, selectors) 106 107 for { 108 _, err := ds.GetDeleteRequest(ctx, userID, string(requestID)) 109 if err != nil { 110 if err == ErrDeleteRequestNotFound { 111 break 112 } 113 return err 114 } 115 116 // we have a collision here, lets recreate a new requestID and check for collision 117 time.Sleep(time.Millisecond) 118 requestID = generateUniqueID(userID, selectors) 119 } 120 121 // userID, requestID 122 userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID) 123 124 // Add an entry with userID, requestID as range key and status as value to make it easy to manage and lookup status 125 // We don't want to set anything in hash key here since we would want to find delete requests by just status 126 writeBatch := ds.indexClient.NewWriteBatch() 127 writeBatch.Add(ds.cfg.RequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(StatusReceived)) 128 129 // Add another entry with additional details like creation time, time range of delete request and selectors in value 130 rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime)) 131 writeBatch.Add(ds.cfg.RequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), 132 []byte(rangeValue), []byte(strings.Join(selectors, separator))) 133 134 // we update only cache gen number because only query responses are changing at this stage. 135 // we still have to query data from store for doing query time filtering and we don't want to invalidate its results now. 136 writeBatch.Add(ds.cfg.RequestsTableName, fmt.Sprintf("%s:%s:%s", cacheGenNum, userID, CacheKindResults), 137 []byte{}, []byte(strconv.FormatInt(time.Now().Unix(), 10))) 138 139 return ds.indexClient.BatchWrite(ctx, writeBatch) 140} 141 142// GetDeleteRequestsByStatus returns all delete requests for given status. 143func (ds *DeleteStore) GetDeleteRequestsByStatus(ctx context.Context, status DeleteRequestStatus) ([]DeleteRequest, error) { 144 return ds.queryDeleteRequests(ctx, chunk.IndexQuery{ 145 TableName: ds.cfg.RequestsTableName, 146 HashValue: string(deleteRequestID), 147 ValueEqual: []byte(status), 148 }) 149} 150 151// GetDeleteRequestsForUserByStatus returns all delete requests for a user with given status. 152func (ds *DeleteStore) GetDeleteRequestsForUserByStatus(ctx context.Context, userID string, status DeleteRequestStatus) ([]DeleteRequest, error) { 153 return ds.queryDeleteRequests(ctx, chunk.IndexQuery{ 154 TableName: ds.cfg.RequestsTableName, 155 HashValue: string(deleteRequestID), 156 RangeValuePrefix: []byte(userID), 157 ValueEqual: []byte(status), 158 }) 159} 160 161// GetAllDeleteRequestsForUser returns all delete requests for a user. 162func (ds *DeleteStore) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) { 163 return ds.queryDeleteRequests(ctx, chunk.IndexQuery{ 164 TableName: ds.cfg.RequestsTableName, 165 HashValue: string(deleteRequestID), 166 RangeValuePrefix: []byte(userID), 167 }) 168} 169 170// UpdateStatus updates status of a delete request. 171func (ds *DeleteStore) UpdateStatus(ctx context.Context, userID, requestID string, newStatus DeleteRequestStatus) error { 172 userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID) 173 174 writeBatch := ds.indexClient.NewWriteBatch() 175 writeBatch.Add(ds.cfg.RequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(newStatus)) 176 177 if newStatus == StatusProcessed { 178 // we have deleted data from store so invalidate cache only for store since we don't have to do runtime filtering anymore. 179 // we don't have to change cache gen number because we were anyways doing runtime filtering 180 writeBatch.Add(ds.cfg.RequestsTableName, fmt.Sprintf("%s:%s:%s", cacheGenNum, userID, CacheKindStore), []byte{}, []byte(strconv.FormatInt(time.Now().Unix(), 10))) 181 } 182 183 return ds.indexClient.BatchWrite(ctx, writeBatch) 184} 185 186// GetDeleteRequest returns delete request with given requestID. 187func (ds *DeleteStore) GetDeleteRequest(ctx context.Context, userID, requestID string) (*DeleteRequest, error) { 188 userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID) 189 190 deleteRequests, err := ds.queryDeleteRequests(ctx, chunk.IndexQuery{ 191 TableName: ds.cfg.RequestsTableName, 192 HashValue: string(deleteRequestID), 193 RangeValuePrefix: []byte(userIDAndRequestID), 194 }) 195 if err != nil { 196 return nil, err 197 } 198 199 if len(deleteRequests) == 0 { 200 return nil, ErrDeleteRequestNotFound 201 } 202 203 return &deleteRequests[0], nil 204} 205 206// GetPendingDeleteRequestsForUser returns all delete requests for a user which are not processed. 207func (ds *DeleteStore) GetPendingDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) { 208 pendingDeleteRequests := []DeleteRequest{} 209 for _, status := range pendingDeleteRequestStatuses { 210 deleteRequests, err := ds.GetDeleteRequestsForUserByStatus(ctx, userID, status) 211 if err != nil { 212 return nil, err 213 } 214 215 pendingDeleteRequests = append(pendingDeleteRequests, deleteRequests...) 216 } 217 218 return pendingDeleteRequests, nil 219} 220 221func (ds *DeleteStore) queryDeleteRequests(ctx context.Context, deleteQuery chunk.IndexQuery) ([]DeleteRequest, error) { 222 deleteRequests := []DeleteRequest{} 223 // No need to lock inside the callback since we run a single index query. 224 err := ds.indexClient.QueryPages(ctx, []chunk.IndexQuery{deleteQuery}, func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) { 225 itr := batch.Iterator() 226 for itr.Next() { 227 userID, requestID := splitUserIDAndRequestID(string(itr.RangeValue())) 228 229 deleteRequests = append(deleteRequests, DeleteRequest{ 230 UserID: userID, 231 RequestID: requestID, 232 Status: DeleteRequestStatus(itr.Value()), 233 }) 234 } 235 return true 236 }) 237 if err != nil { 238 return nil, err 239 } 240 241 for i, deleteRequest := range deleteRequests { 242 deleteRequestQuery := []chunk.IndexQuery{ 243 { 244 TableName: ds.cfg.RequestsTableName, 245 HashValue: fmt.Sprintf("%s:%s:%s", deleteRequestDetails, deleteRequest.UserID, deleteRequest.RequestID), 246 }, 247 } 248 249 var parseError error 250 err := ds.indexClient.QueryPages(ctx, deleteRequestQuery, func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) { 251 itr := batch.Iterator() 252 itr.Next() 253 254 deleteRequest, err = parseDeleteRequestTimestamps(itr.RangeValue(), deleteRequest) 255 if err != nil { 256 parseError = err 257 return false 258 } 259 260 deleteRequest.Selectors = strings.Split(string(itr.Value()), separator) 261 deleteRequests[i] = deleteRequest 262 263 return true 264 }) 265 if err != nil { 266 return nil, err 267 } 268 269 if parseError != nil { 270 return nil, parseError 271 } 272 } 273 274 return deleteRequests, nil 275} 276 277// getCacheGenerationNumbers returns cache gen numbers for a user. 278func (ds *DeleteStore) getCacheGenerationNumbers(ctx context.Context, userID string) (*cacheGenNumbers, error) { 279 storeCacheGen, err := ds.queryCacheGenerationNumber(ctx, userID, CacheKindStore) 280 if err != nil { 281 return nil, err 282 } 283 284 resultsCacheGen, err := ds.queryCacheGenerationNumber(ctx, userID, CacheKindResults) 285 if err != nil { 286 return nil, err 287 } 288 289 return &cacheGenNumbers{storeCacheGen, resultsCacheGen}, nil 290} 291 292func (ds *DeleteStore) queryCacheGenerationNumber(ctx context.Context, userID string, kind CacheKind) (string, error) { 293 query := chunk.IndexQuery{TableName: ds.cfg.RequestsTableName, HashValue: fmt.Sprintf("%s:%s:%s", cacheGenNum, userID, kind)} 294 295 genNumber := "" 296 err := ds.indexClient.QueryPages(ctx, []chunk.IndexQuery{query}, func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) { 297 itr := batch.Iterator() 298 for itr.Next() { 299 genNumber = string(itr.Value()) 300 break 301 } 302 return false 303 }) 304 if err != nil { 305 return "", err 306 } 307 308 return genNumber, nil 309} 310 311// RemoveDeleteRequest removes a delete request and increments cache gen number 312func (ds *DeleteStore) RemoveDeleteRequest(ctx context.Context, userID, requestID string, createdAt, startTime, endTime model.Time) error { 313 userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID) 314 315 writeBatch := ds.indexClient.NewWriteBatch() 316 writeBatch.Delete(ds.cfg.RequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID)) 317 318 // Add another entry with additional details like creation time, time range of delete request and selectors in value 319 rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime)) 320 writeBatch.Delete(ds.cfg.RequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), 321 []byte(rangeValue)) 322 323 // we need to invalidate results cache since removal of delete request would cause query results to change 324 writeBatch.Add(ds.cfg.RequestsTableName, fmt.Sprintf("%s:%s:%s", cacheGenNum, userID, CacheKindResults), 325 []byte{}, []byte(strconv.FormatInt(time.Now().Unix(), 10))) 326 327 return ds.indexClient.BatchWrite(ctx, writeBatch) 328} 329 330func parseDeleteRequestTimestamps(rangeValue []byte, deleteRequest DeleteRequest) (DeleteRequest, error) { 331 hexParts := strings.Split(string(rangeValue), ":") 332 if len(hexParts) != 3 { 333 return deleteRequest, errors.New("invalid key in parsing delete request lookup response") 334 } 335 336 createdAt, err := strconv.ParseInt(hexParts[0], 16, 64) 337 if err != nil { 338 return deleteRequest, err 339 } 340 341 from, err := strconv.ParseInt(hexParts[1], 16, 64) 342 if err != nil { 343 return deleteRequest, err 344 } 345 through, err := strconv.ParseInt(hexParts[2], 16, 64) 346 if err != nil { 347 return deleteRequest, err 348 } 349 350 deleteRequest.CreatedAt = model.Time(createdAt) 351 deleteRequest.StartTime = model.Time(from) 352 deleteRequest.EndTime = model.Time(through) 353 354 return deleteRequest, nil 355} 356 357// An id is useful in managing delete requests 358func generateUniqueID(orgID string, selectors []string) []byte { 359 uniqueID := fnv.New32() 360 _, _ = uniqueID.Write([]byte(orgID)) 361 362 timeNow := make([]byte, 8) 363 binary.LittleEndian.PutUint64(timeNow, uint64(time.Now().UnixNano())) 364 _, _ = uniqueID.Write(timeNow) 365 366 for _, selector := range selectors { 367 _, _ = uniqueID.Write([]byte(selector)) 368 } 369 370 return encodeUniqueID(uniqueID.Sum32()) 371} 372 373func encodeUniqueID(t uint32) []byte { 374 throughBytes := make([]byte, 4) 375 binary.BigEndian.PutUint32(throughBytes, t) 376 encodedThroughBytes := make([]byte, 8) 377 hex.Encode(encodedThroughBytes, throughBytes) 378 return encodedThroughBytes 379} 380 381func splitUserIDAndRequestID(rangeValue string) (userID, requestID string) { 382 lastIndex := strings.LastIndex(rangeValue, ":") 383 384 userID = rangeValue[:lastIndex] 385 requestID = rangeValue[lastIndex+1:] 386 387 return 388} 389