1// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. 2// See LICENSE.txt for license information. 3 4package export_delete 5 6import ( 7 "path/filepath" 8 "time" 9 10 "github.com/mattermost/mattermost-server/v6/app" 11 "github.com/mattermost/mattermost-server/v6/jobs" 12 tjobs "github.com/mattermost/mattermost-server/v6/jobs/interfaces" 13 "github.com/mattermost/mattermost-server/v6/model" 14 "github.com/mattermost/mattermost-server/v6/shared/mlog" 15) 16 17func init() { 18 app.RegisterJobsExportDeleteInterface(func(s *app.Server) tjobs.ExportDeleteInterface { 19 a := app.New(app.ServerConnector(s)) 20 return &ExportDeleteInterfaceImpl{a} 21 }) 22} 23 24type ExportDeleteInterfaceImpl struct { 25 app *app.App 26} 27 28type ExportDeleteWorker struct { 29 name string 30 stopChan chan struct{} 31 stoppedChan chan struct{} 32 jobsChan chan model.Job 33 jobServer *jobs.JobServer 34 app *app.App 35} 36 37func (i *ExportDeleteInterfaceImpl) MakeWorker() model.Worker { 38 return &ExportDeleteWorker{ 39 name: "ExportDelete", 40 stopChan: make(chan struct{}), 41 stoppedChan: make(chan struct{}), 42 jobsChan: make(chan model.Job), 43 jobServer: i.app.Srv().Jobs, 44 app: i.app, 45 } 46} 47 48func (w *ExportDeleteWorker) JobChannel() chan<- model.Job { 49 return w.jobsChan 50} 51 52func (w *ExportDeleteWorker) Run() { 53 mlog.Debug("Worker started", mlog.String("worker", w.name)) 54 55 defer func() { 56 mlog.Debug("Worker finished", mlog.String("worker", w.name)) 57 close(w.stoppedChan) 58 }() 59 60 for { 61 select { 62 case <-w.stopChan: 63 mlog.Debug("Worker received stop signal", mlog.String("worker", w.name)) 64 return 65 case job := <-w.jobsChan: 66 mlog.Debug("Worker received a new candidate job.", mlog.String("worker", w.name)) 67 w.doJob(&job) 68 } 69 } 70} 71 72func (w *ExportDeleteWorker) Stop() { 73 mlog.Debug("Worker stopping", mlog.String("worker", w.name)) 74 close(w.stopChan) 75 <-w.stoppedChan 76} 77 78func (w *ExportDeleteWorker) doJob(job *model.Job) { 79 if claimed, err := w.jobServer.ClaimJob(job); err != nil { 80 mlog.Warn("Worker experienced an error while trying to claim job", 81 mlog.String("worker", w.name), 82 mlog.String("job_id", job.Id), 83 mlog.String("error", err.Error())) 84 return 85 } else if !claimed { 86 return 87 } 88 89 exportPath := *w.app.Config().ExportSettings.Directory 90 retentionTime := time.Duration(*w.app.Config().ExportSettings.RetentionDays) * 24 * time.Hour 91 exports, appErr := w.app.ListDirectory(exportPath) 92 if appErr != nil { 93 w.setJobError(job, appErr) 94 return 95 } 96 97 var hasErrs bool 98 for i := range exports { 99 filename := filepath.Base(exports[i]) 100 modTime, appErr := w.app.FileModTime(filepath.Join(exportPath, filename)) 101 if appErr != nil { 102 mlog.Debug("Worker: Failed to get file modification time", 103 mlog.Err(appErr), mlog.String("export", exports[i])) 104 hasErrs = true 105 continue 106 } 107 108 if time.Now().After(modTime.Add(retentionTime)) { 109 // remove file data from storage. 110 if appErr := w.app.RemoveFile(exports[i]); appErr != nil { 111 mlog.Debug("Worker: Failed to remove file", 112 mlog.Err(appErr), mlog.String("export", exports[i])) 113 hasErrs = true 114 continue 115 } 116 } 117 } 118 119 if hasErrs { 120 mlog.Warn("Worker: errors occurred") 121 } 122 123 mlog.Info("Worker: Job is complete", mlog.String("worker", w.name), mlog.String("job_id", job.Id)) 124 w.setJobSuccess(job) 125} 126 127func (w *ExportDeleteWorker) setJobSuccess(job *model.Job) { 128 if err := w.app.Srv().Jobs.SetJobSuccess(job); err != nil { 129 mlog.Error("Worker: Failed to set success for job", mlog.String("worker", w.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) 130 w.setJobError(job, err) 131 } 132} 133 134func (w *ExportDeleteWorker) setJobError(job *model.Job, appError *model.AppError) { 135 if err := w.app.Srv().Jobs.SetJobError(job, appError); err != nil { 136 mlog.Error("Worker: Failed to set job error", mlog.String("worker", w.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) 137 } 138} 139