1// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. 2// See LICENSE.txt for license information. 3 4package jobs 5 6import ( 7 "context" 8 "errors" 9 "net/http" 10 "time" 11 12 "github.com/mattermost/mattermost-server/v6/model" 13 "github.com/mattermost/mattermost-server/v6/shared/mlog" 14 "github.com/mattermost/mattermost-server/v6/store" 15) 16 17const ( 18 CancelWatcherPollingInterval = 5000 19) 20 21func (srv *JobServer) CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) { 22 job := model.Job{ 23 Id: model.NewId(), 24 Type: jobType, 25 CreateAt: model.GetMillis(), 26 Status: model.JobStatusPending, 27 Data: jobData, 28 } 29 30 if err := job.IsValid(); err != nil { 31 return nil, err 32 } 33 34 if _, err := srv.Store.Job().Save(&job); err != nil { 35 return nil, model.NewAppError("CreateJob", "app.job.save.app_error", nil, err.Error(), http.StatusInternalServerError) 36 } 37 38 return &job, nil 39} 40 41func (srv *JobServer) GetJob(id string) (*model.Job, *model.AppError) { 42 job, err := srv.Store.Job().Get(id) 43 if err != nil { 44 var nfErr *store.ErrNotFound 45 switch { 46 case errors.As(err, &nfErr): 47 return nil, model.NewAppError("GetJob", "app.job.get.app_error", nil, nfErr.Error(), http.StatusNotFound) 48 default: 49 return nil, model.NewAppError("GetJob", "app.job.get.app_error", nil, err.Error(), http.StatusInternalServerError) 50 } 51 } 52 53 return job, nil 54} 55 56func (srv *JobServer) ClaimJob(job *model.Job) (bool, *model.AppError) { 57 updated, err := srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JobStatusPending, model.JobStatusInProgress) 58 if err != nil { 59 return false, model.NewAppError("ClaimJob", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError) 60 } 61 62 if updated && srv.metrics != nil { 63 srv.metrics.IncrementJobActive(job.Type) 64 } 65 66 return updated, nil 67} 68 69func (srv *JobServer) SetJobProgress(job *model.Job, progress int64) *model.AppError { 70 job.Status = model.JobStatusInProgress 71 job.Progress = progress 72 73 if _, err := srv.Store.Job().UpdateOptimistically(job, model.JobStatusInProgress); err != nil { 74 return model.NewAppError("SetJobProgress", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError) 75 } 76 return nil 77} 78 79func (srv *JobServer) SetJobWarning(job *model.Job) *model.AppError { 80 if _, err := srv.Store.Job().UpdateStatus(job.Id, model.JobStatusWarning); err != nil { 81 return model.NewAppError("SetJobWarning", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError) 82 } 83 return nil 84} 85 86func (srv *JobServer) SetJobSuccess(job *model.Job) *model.AppError { 87 if _, err := srv.Store.Job().UpdateStatus(job.Id, model.JobStatusSuccess); err != nil { 88 return model.NewAppError("SetJobSuccess", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError) 89 } 90 91 if srv.metrics != nil { 92 srv.metrics.DecrementJobActive(job.Type) 93 } 94 95 return nil 96} 97 98func (srv *JobServer) SetJobError(job *model.Job, jobError *model.AppError) *model.AppError { 99 if jobError == nil { 100 _, err := srv.Store.Job().UpdateStatus(job.Id, model.JobStatusError) 101 if err != nil { 102 return model.NewAppError("SetJobError", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError) 103 } 104 105 if srv.metrics != nil { 106 srv.metrics.DecrementJobActive(job.Type) 107 } 108 109 return nil 110 } 111 112 job.Status = model.JobStatusError 113 job.Progress = -1 114 if job.Data == nil { 115 job.Data = make(map[string]string) 116 } 117 job.Data["error"] = jobError.Message 118 if jobError.DetailedError != "" { 119 job.Data["error"] += " — " + jobError.DetailedError 120 } 121 updated, err := srv.Store.Job().UpdateOptimistically(job, model.JobStatusInProgress) 122 if err != nil { 123 return model.NewAppError("SetJobError", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError) 124 } 125 if updated && srv.metrics != nil { 126 srv.metrics.DecrementJobActive(job.Type) 127 } 128 129 if !updated { 130 updated, err = srv.Store.Job().UpdateOptimistically(job, model.JobStatusCancelRequested) 131 if err != nil { 132 return model.NewAppError("SetJobError", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError) 133 } 134 if !updated { 135 return model.NewAppError("SetJobError", "jobs.set_job_error.update.error", nil, "id="+job.Id, http.StatusInternalServerError) 136 } 137 } 138 139 return nil 140} 141 142func (srv *JobServer) SetJobCanceled(job *model.Job) *model.AppError { 143 if _, err := srv.Store.Job().UpdateStatus(job.Id, model.JobStatusCanceled); err != nil { 144 return model.NewAppError("SetJobCanceled", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError) 145 } 146 147 if srv.metrics != nil { 148 srv.metrics.DecrementJobActive(job.Type) 149 } 150 151 return nil 152} 153 154func (srv *JobServer) UpdateInProgressJobData(job *model.Job) *model.AppError { 155 job.Status = model.JobStatusInProgress 156 job.LastActivityAt = model.GetMillis() 157 if _, err := srv.Store.Job().UpdateOptimistically(job, model.JobStatusInProgress); err != nil { 158 return model.NewAppError("UpdateInProgressJobData", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError) 159 } 160 return nil 161} 162 163func (srv *JobServer) RequestCancellation(jobId string) *model.AppError { 164 updated, err := srv.Store.Job().UpdateStatusOptimistically(jobId, model.JobStatusPending, model.JobStatusCanceled) 165 if err != nil { 166 return model.NewAppError("RequestCancellation", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError) 167 } 168 if updated { 169 if srv.metrics != nil { 170 job, err := srv.GetJob(jobId) 171 if err != nil { 172 return model.NewAppError("RequestCancellation", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError) 173 } 174 175 srv.metrics.DecrementJobActive(job.Type) 176 } 177 178 return nil 179 } 180 181 updated, err = srv.Store.Job().UpdateStatusOptimistically(jobId, model.JobStatusInProgress, model.JobStatusCancelRequested) 182 if err != nil { 183 return model.NewAppError("RequestCancellation", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError) 184 } 185 186 if updated { 187 return nil 188 } 189 190 return model.NewAppError("RequestCancellation", "jobs.request_cancellation.status.error", nil, "id="+jobId, http.StatusInternalServerError) 191} 192 193func (srv *JobServer) CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) { 194 for { 195 select { 196 case <-ctx.Done(): 197 mlog.Debug("CancellationWatcher for Job Aborting as job has finished.", mlog.String("job_id", jobId)) 198 return 199 case <-time.After(CancelWatcherPollingInterval * time.Millisecond): 200 mlog.Debug("CancellationWatcher for Job started polling.", mlog.String("job_id", jobId)) 201 if jobStatus, err := srv.Store.Job().Get(jobId); err == nil { 202 if jobStatus.Status == model.JobStatusCancelRequested { 203 close(cancelChan) 204 return 205 } 206 } 207 } 208 } 209} 210 211func GenerateNextStartDateTime(now time.Time, nextStartTime time.Time) *time.Time { 212 nextTime := time.Date(now.Year(), now.Month(), now.Day(), nextStartTime.Hour(), nextStartTime.Minute(), 0, 0, time.Local) 213 214 if !now.Before(nextTime) { 215 nextTime = nextTime.AddDate(0, 0, 1) 216 } 217 218 return &nextTime 219} 220 221func (srv *JobServer) CheckForPendingJobsByType(jobType string) (bool, *model.AppError) { 222 count, err := srv.Store.Job().GetCountByStatusAndType(model.JobStatusPending, jobType) 223 if err != nil { 224 return false, model.NewAppError("CheckForPendingJobsByType", "app.job.get_count_by_status_and_type.app_error", nil, err.Error(), http.StatusInternalServerError) 225 } 226 return count > 0, nil 227} 228 229func (srv *JobServer) GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) { 230 statuses := []string{model.JobStatusSuccess} 231 if jobType == model.JobTypeMessageExport { 232 statuses = []string{model.JobStatusWarning, model.JobStatusSuccess} 233 } 234 job, err := srv.Store.Job().GetNewestJobByStatusesAndType(statuses, jobType) 235 var nfErr *store.ErrNotFound 236 if err != nil && !errors.As(err, &nfErr) { 237 return nil, model.NewAppError("GetLastSuccessfulJobByType", "app.job.get_newest_job_by_status_and_type.app_error", nil, err.Error(), http.StatusInternalServerError) 238 } 239 return job, nil 240} 241