1// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
2// See LICENSE.txt for license information.
3
4package jobs
5
6import (
7	"context"
8	"errors"
9	"net/http"
10	"time"
11
12	"github.com/mattermost/mattermost-server/v6/model"
13	"github.com/mattermost/mattermost-server/v6/shared/mlog"
14	"github.com/mattermost/mattermost-server/v6/store"
15)
16
17const (
18	CancelWatcherPollingInterval = 5000
19)
20
21func (srv *JobServer) CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) {
22	job := model.Job{
23		Id:       model.NewId(),
24		Type:     jobType,
25		CreateAt: model.GetMillis(),
26		Status:   model.JobStatusPending,
27		Data:     jobData,
28	}
29
30	if err := job.IsValid(); err != nil {
31		return nil, err
32	}
33
34	if _, err := srv.Store.Job().Save(&job); err != nil {
35		return nil, model.NewAppError("CreateJob", "app.job.save.app_error", nil, err.Error(), http.StatusInternalServerError)
36	}
37
38	return &job, nil
39}
40
41func (srv *JobServer) GetJob(id string) (*model.Job, *model.AppError) {
42	job, err := srv.Store.Job().Get(id)
43	if err != nil {
44		var nfErr *store.ErrNotFound
45		switch {
46		case errors.As(err, &nfErr):
47			return nil, model.NewAppError("GetJob", "app.job.get.app_error", nil, nfErr.Error(), http.StatusNotFound)
48		default:
49			return nil, model.NewAppError("GetJob", "app.job.get.app_error", nil, err.Error(), http.StatusInternalServerError)
50		}
51	}
52
53	return job, nil
54}
55
56func (srv *JobServer) ClaimJob(job *model.Job) (bool, *model.AppError) {
57	updated, err := srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JobStatusPending, model.JobStatusInProgress)
58	if err != nil {
59		return false, model.NewAppError("ClaimJob", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError)
60	}
61
62	if updated && srv.metrics != nil {
63		srv.metrics.IncrementJobActive(job.Type)
64	}
65
66	return updated, nil
67}
68
69func (srv *JobServer) SetJobProgress(job *model.Job, progress int64) *model.AppError {
70	job.Status = model.JobStatusInProgress
71	job.Progress = progress
72
73	if _, err := srv.Store.Job().UpdateOptimistically(job, model.JobStatusInProgress); err != nil {
74		return model.NewAppError("SetJobProgress", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError)
75	}
76	return nil
77}
78
79func (srv *JobServer) SetJobWarning(job *model.Job) *model.AppError {
80	if _, err := srv.Store.Job().UpdateStatus(job.Id, model.JobStatusWarning); err != nil {
81		return model.NewAppError("SetJobWarning", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError)
82	}
83	return nil
84}
85
86func (srv *JobServer) SetJobSuccess(job *model.Job) *model.AppError {
87	if _, err := srv.Store.Job().UpdateStatus(job.Id, model.JobStatusSuccess); err != nil {
88		return model.NewAppError("SetJobSuccess", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError)
89	}
90
91	if srv.metrics != nil {
92		srv.metrics.DecrementJobActive(job.Type)
93	}
94
95	return nil
96}
97
98func (srv *JobServer) SetJobError(job *model.Job, jobError *model.AppError) *model.AppError {
99	if jobError == nil {
100		_, err := srv.Store.Job().UpdateStatus(job.Id, model.JobStatusError)
101		if err != nil {
102			return model.NewAppError("SetJobError", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError)
103		}
104
105		if srv.metrics != nil {
106			srv.metrics.DecrementJobActive(job.Type)
107		}
108
109		return nil
110	}
111
112	job.Status = model.JobStatusError
113	job.Progress = -1
114	if job.Data == nil {
115		job.Data = make(map[string]string)
116	}
117	job.Data["error"] = jobError.Message
118	if jobError.DetailedError != "" {
119		job.Data["error"] += " — " + jobError.DetailedError
120	}
121	updated, err := srv.Store.Job().UpdateOptimistically(job, model.JobStatusInProgress)
122	if err != nil {
123		return model.NewAppError("SetJobError", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError)
124	}
125	if updated && srv.metrics != nil {
126		srv.metrics.DecrementJobActive(job.Type)
127	}
128
129	if !updated {
130		updated, err = srv.Store.Job().UpdateOptimistically(job, model.JobStatusCancelRequested)
131		if err != nil {
132			return model.NewAppError("SetJobError", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError)
133		}
134		if !updated {
135			return model.NewAppError("SetJobError", "jobs.set_job_error.update.error", nil, "id="+job.Id, http.StatusInternalServerError)
136		}
137	}
138
139	return nil
140}
141
142func (srv *JobServer) SetJobCanceled(job *model.Job) *model.AppError {
143	if _, err := srv.Store.Job().UpdateStatus(job.Id, model.JobStatusCanceled); err != nil {
144		return model.NewAppError("SetJobCanceled", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError)
145	}
146
147	if srv.metrics != nil {
148		srv.metrics.DecrementJobActive(job.Type)
149	}
150
151	return nil
152}
153
154func (srv *JobServer) UpdateInProgressJobData(job *model.Job) *model.AppError {
155	job.Status = model.JobStatusInProgress
156	job.LastActivityAt = model.GetMillis()
157	if _, err := srv.Store.Job().UpdateOptimistically(job, model.JobStatusInProgress); err != nil {
158		return model.NewAppError("UpdateInProgressJobData", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError)
159	}
160	return nil
161}
162
163func (srv *JobServer) RequestCancellation(jobId string) *model.AppError {
164	updated, err := srv.Store.Job().UpdateStatusOptimistically(jobId, model.JobStatusPending, model.JobStatusCanceled)
165	if err != nil {
166		return model.NewAppError("RequestCancellation", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError)
167	}
168	if updated {
169		if srv.metrics != nil {
170			job, err := srv.GetJob(jobId)
171			if err != nil {
172				return model.NewAppError("RequestCancellation", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError)
173			}
174
175			srv.metrics.DecrementJobActive(job.Type)
176		}
177
178		return nil
179	}
180
181	updated, err = srv.Store.Job().UpdateStatusOptimistically(jobId, model.JobStatusInProgress, model.JobStatusCancelRequested)
182	if err != nil {
183		return model.NewAppError("RequestCancellation", "app.job.update.app_error", nil, err.Error(), http.StatusInternalServerError)
184	}
185
186	if updated {
187		return nil
188	}
189
190	return model.NewAppError("RequestCancellation", "jobs.request_cancellation.status.error", nil, "id="+jobId, http.StatusInternalServerError)
191}
192
193func (srv *JobServer) CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) {
194	for {
195		select {
196		case <-ctx.Done():
197			mlog.Debug("CancellationWatcher for Job Aborting as job has finished.", mlog.String("job_id", jobId))
198			return
199		case <-time.After(CancelWatcherPollingInterval * time.Millisecond):
200			mlog.Debug("CancellationWatcher for Job started polling.", mlog.String("job_id", jobId))
201			if jobStatus, err := srv.Store.Job().Get(jobId); err == nil {
202				if jobStatus.Status == model.JobStatusCancelRequested {
203					close(cancelChan)
204					return
205				}
206			}
207		}
208	}
209}
210
211func GenerateNextStartDateTime(now time.Time, nextStartTime time.Time) *time.Time {
212	nextTime := time.Date(now.Year(), now.Month(), now.Day(), nextStartTime.Hour(), nextStartTime.Minute(), 0, 0, time.Local)
213
214	if !now.Before(nextTime) {
215		nextTime = nextTime.AddDate(0, 0, 1)
216	}
217
218	return &nextTime
219}
220
221func (srv *JobServer) CheckForPendingJobsByType(jobType string) (bool, *model.AppError) {
222	count, err := srv.Store.Job().GetCountByStatusAndType(model.JobStatusPending, jobType)
223	if err != nil {
224		return false, model.NewAppError("CheckForPendingJobsByType", "app.job.get_count_by_status_and_type.app_error", nil, err.Error(), http.StatusInternalServerError)
225	}
226	return count > 0, nil
227}
228
229func (srv *JobServer) GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) {
230	statuses := []string{model.JobStatusSuccess}
231	if jobType == model.JobTypeMessageExport {
232		statuses = []string{model.JobStatusWarning, model.JobStatusSuccess}
233	}
234	job, err := srv.Store.Job().GetNewestJobByStatusesAndType(statuses, jobType)
235	var nfErr *store.ErrNotFound
236	if err != nil && !errors.As(err, &nfErr) {
237		return nil, model.NewAppError("GetLastSuccessfulJobByType", "app.job.get_newest_job_by_status_and_type.app_error", nil, err.Error(), http.StatusInternalServerError)
238	}
239	return job, nil
240}
241