1package purger 2 3import ( 4 "encoding/json" 5 "fmt" 6 "net/http" 7 "time" 8 9 "github.com/go-kit/log/level" 10 11 "github.com/prometheus/client_golang/prometheus" 12 "github.com/prometheus/client_golang/prometheus/promauto" 13 "github.com/prometheus/common/model" 14 "github.com/prometheus/prometheus/promql/parser" 15 16 "github.com/cortexproject/cortex/pkg/tenant" 17 "github.com/cortexproject/cortex/pkg/util" 18 util_log "github.com/cortexproject/cortex/pkg/util/log" 19) 20 21type deleteRequestHandlerMetrics struct { 22 deleteRequestsReceivedTotal *prometheus.CounterVec 23} 24 25func newDeleteRequestHandlerMetrics(r prometheus.Registerer) *deleteRequestHandlerMetrics { 26 m := deleteRequestHandlerMetrics{} 27 28 m.deleteRequestsReceivedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ 29 Namespace: "cortex", 30 Name: "purger_delete_requests_received_total", 31 Help: "Number of delete requests received per user", 32 }, []string{"user"}) 33 34 return &m 35} 36 37// DeleteRequestHandler provides handlers for delete requests 38type DeleteRequestHandler struct { 39 deleteStore *DeleteStore 40 metrics *deleteRequestHandlerMetrics 41 deleteRequestCancelPeriod time.Duration 42} 43 44// NewDeleteRequestHandler creates a DeleteRequestHandler 45func NewDeleteRequestHandler(deleteStore *DeleteStore, deleteRequestCancelPeriod time.Duration, registerer prometheus.Registerer) *DeleteRequestHandler { 46 deleteMgr := DeleteRequestHandler{ 47 deleteStore: deleteStore, 48 deleteRequestCancelPeriod: deleteRequestCancelPeriod, 49 metrics: newDeleteRequestHandlerMetrics(registerer), 50 } 51 52 return &deleteMgr 53} 54 55// AddDeleteRequestHandler handles addition of new delete request 56func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r *http.Request) { 57 ctx := r.Context() 58 userID, err := tenant.TenantID(ctx) 59 if err != nil { 60 http.Error(w, err.Error(), http.StatusBadRequest) 61 return 62 } 63 64 params := r.URL.Query() 65 match := params["match[]"] 66 if len(match) == 0 { 67 http.Error(w, "selectors not set", http.StatusBadRequest) 68 return 69 } 70 71 for i := range match { 72 _, err := parser.ParseMetricSelector(match[i]) 73 if err != nil { 74 http.Error(w, err.Error(), http.StatusBadRequest) 75 return 76 } 77 } 78 79 startParam := params.Get("start") 80 startTime := int64(0) 81 if startParam != "" { 82 startTime, err = util.ParseTime(startParam) 83 if err != nil { 84 http.Error(w, err.Error(), http.StatusBadRequest) 85 return 86 } 87 } 88 89 endParam := params.Get("end") 90 endTime := int64(model.Now()) 91 92 if endParam != "" { 93 endTime, err = util.ParseTime(endParam) 94 if err != nil { 95 http.Error(w, err.Error(), http.StatusBadRequest) 96 return 97 } 98 99 if endTime > int64(model.Now()) { 100 http.Error(w, "deletes in future not allowed", http.StatusBadRequest) 101 return 102 } 103 } 104 105 if startTime > endTime { 106 http.Error(w, "start time can't be greater than end time", http.StatusBadRequest) 107 return 108 } 109 110 if err := dm.deleteStore.AddDeleteRequest(ctx, userID, model.Time(startTime), model.Time(endTime), match); err != nil { 111 level.Error(util_log.Logger).Log("msg", "error adding delete request to the store", "err", err) 112 http.Error(w, err.Error(), http.StatusInternalServerError) 113 return 114 } 115 116 dm.metrics.deleteRequestsReceivedTotal.WithLabelValues(userID).Inc() 117 w.WriteHeader(http.StatusNoContent) 118} 119 120// GetAllDeleteRequestsHandler handles get all delete requests 121func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) { 122 ctx := r.Context() 123 userID, err := tenant.TenantID(ctx) 124 if err != nil { 125 http.Error(w, err.Error(), http.StatusBadRequest) 126 return 127 } 128 129 deleteRequests, err := dm.deleteStore.GetAllDeleteRequestsForUser(ctx, userID) 130 if err != nil { 131 level.Error(util_log.Logger).Log("msg", "error getting delete requests from the store", "err", err) 132 http.Error(w, err.Error(), http.StatusInternalServerError) 133 return 134 } 135 136 if err := json.NewEncoder(w).Encode(deleteRequests); err != nil { 137 level.Error(util_log.Logger).Log("msg", "error marshalling response", "err", err) 138 http.Error(w, fmt.Sprintf("Error marshalling response: %v", err), http.StatusInternalServerError) 139 } 140} 141 142// CancelDeleteRequestHandler handles delete request cancellation 143func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) { 144 ctx := r.Context() 145 userID, err := tenant.TenantID(ctx) 146 if err != nil { 147 http.Error(w, err.Error(), http.StatusBadRequest) 148 return 149 } 150 151 params := r.URL.Query() 152 requestID := params.Get("request_id") 153 154 deleteRequest, err := dm.deleteStore.GetDeleteRequest(ctx, userID, requestID) 155 if err != nil { 156 level.Error(util_log.Logger).Log("msg", "error getting delete request from the store", "err", err) 157 http.Error(w, err.Error(), http.StatusInternalServerError) 158 return 159 } 160 161 if deleteRequest == nil { 162 http.Error(w, "could not find delete request with given id", http.StatusBadRequest) 163 return 164 } 165 166 if deleteRequest.Status != StatusReceived { 167 http.Error(w, "deletion of request which is in process or already processed is not allowed", http.StatusBadRequest) 168 return 169 } 170 171 if deleteRequest.CreatedAt.Add(dm.deleteRequestCancelPeriod).Before(model.Now()) { 172 http.Error(w, fmt.Sprintf("deletion of request past the deadline of %s since its creation is not allowed", dm.deleteRequestCancelPeriod.String()), http.StatusBadRequest) 173 return 174 } 175 176 if err := dm.deleteStore.RemoveDeleteRequest(ctx, userID, requestID, deleteRequest.CreatedAt, deleteRequest.StartTime, deleteRequest.EndTime); err != nil { 177 level.Error(util_log.Logger).Log("msg", "error cancelling the delete request", "err", err) 178 http.Error(w, err.Error(), http.StatusInternalServerError) 179 return 180 } 181 182 w.WriteHeader(http.StatusNoContent) 183} 184