1// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. 2// See LICENSE.txt for license information. 3 4package jobs 5 6import ( 7 "errors" 8 9 "github.com/mattermost/mattermost-server/v6/model" 10 "github.com/mattermost/mattermost-server/v6/services/configservice" 11 "github.com/mattermost/mattermost-server/v6/shared/mlog" 12) 13 14type Workers struct { 15 ConfigService configservice.ConfigService 16 Watcher *Watcher 17 18 DataRetention model.Worker 19 MessageExport model.Worker 20 ElasticsearchIndexing model.Worker 21 ElasticsearchAggregation model.Worker 22 LdapSync model.Worker 23 Migrations model.Worker 24 Plugins model.Worker 25 BleveIndexing model.Worker 26 ExpiryNotify model.Worker 27 ProductNotices model.Worker 28 ActiveUsers model.Worker 29 ImportProcess model.Worker 30 ImportDelete model.Worker 31 ExportProcess model.Worker 32 ExportDelete model.Worker 33 Cloud model.Worker 34 ResendInvitationEmail model.Worker 35 ExtractContent model.Worker 36 37 listenerId string 38 running bool 39} 40 41var ( 42 ErrWorkersNotRunning = errors.New("job workers are not running") 43 ErrWorkersRunning = errors.New("job workers are running") 44 ErrWorkersUninitialized = errors.New("job workers are not initialized") 45) 46 47func (srv *JobServer) InitWorkers() error { 48 srv.mut.Lock() 49 defer srv.mut.Unlock() 50 51 if srv.workers != nil && srv.workers.running { 52 return ErrWorkersRunning 53 } 54 55 workers := &Workers{ 56 ConfigService: srv.ConfigService, 57 } 58 workers.Watcher = srv.MakeWatcher(workers, DefaultWatcherPollingInterval) 59 60 if srv.DataRetentionJob != nil { 61 workers.DataRetention = srv.DataRetentionJob.MakeWorker() 62 } 63 64 if srv.MessageExportJob != nil { 65 workers.MessageExport = srv.MessageExportJob.MakeWorker() 66 } 67 68 if elasticsearchIndexerInterface := srv.ElasticsearchIndexer; elasticsearchIndexerInterface != nil { 69 workers.ElasticsearchIndexing = elasticsearchIndexerInterface.MakeWorker() 70 } 71 72 if elasticsearchAggregatorInterface := srv.ElasticsearchAggregator; elasticsearchAggregatorInterface != nil { 73 workers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeWorker() 74 } 75 76 if ldapSyncInterface := srv.LdapSync; ldapSyncInterface != nil { 77 workers.LdapSync = ldapSyncInterface.MakeWorker() 78 } 79 80 if migrationsInterface := srv.Migrations; migrationsInterface != nil { 81 workers.Migrations = migrationsInterface.MakeWorker() 82 } 83 84 if pluginsInterface := srv.Plugins; pluginsInterface != nil { 85 workers.Plugins = pluginsInterface.MakeWorker() 86 } 87 88 if bleveIndexerInterface := srv.BleveIndexer; bleveIndexerInterface != nil { 89 workers.BleveIndexing = bleveIndexerInterface.MakeWorker() 90 } 91 92 if expiryNotifyInterface := srv.ExpiryNotify; expiryNotifyInterface != nil { 93 workers.ExpiryNotify = expiryNotifyInterface.MakeWorker() 94 } 95 96 if activeUsersInterface := srv.ActiveUsers; activeUsersInterface != nil { 97 workers.ActiveUsers = activeUsersInterface.MakeWorker() 98 } 99 100 if productNoticesInterface := srv.ProductNotices; productNoticesInterface != nil { 101 workers.ProductNotices = productNoticesInterface.MakeWorker() 102 } 103 104 if importProcessInterface := srv.ImportProcess; importProcessInterface != nil { 105 workers.ImportProcess = importProcessInterface.MakeWorker() 106 } 107 108 if importDeleteInterface := srv.ImportDelete; importDeleteInterface != nil { 109 workers.ImportDelete = importDeleteInterface.MakeWorker() 110 } 111 112 if exportProcessInterface := srv.ExportProcess; exportProcessInterface != nil { 113 workers.ExportProcess = exportProcessInterface.MakeWorker() 114 } 115 116 if exportDeleteInterface := srv.ExportDelete; exportDeleteInterface != nil { 117 workers.ExportDelete = exportDeleteInterface.MakeWorker() 118 } 119 120 if cloudInterface := srv.Cloud; cloudInterface != nil { 121 workers.Cloud = cloudInterface.MakeWorker() 122 } 123 124 if resendInvitationEmailInterface := srv.ResendInvitationEmails; resendInvitationEmailInterface != nil { 125 workers.ResendInvitationEmail = resendInvitationEmailInterface.MakeWorker() 126 } 127 128 if extractContentInterface := srv.ExtractContent; extractContentInterface != nil { 129 workers.ExtractContent = extractContentInterface.MakeWorker() 130 } 131 132 srv.workers = workers 133 134 return nil 135} 136 137// Start starts the workers. This call is not safe for concurrent use. 138// Synchronization should be implemented by the caller. 139func (workers *Workers) Start() { 140 mlog.Info("Starting workers") 141 142 if workers.DataRetention != nil { 143 go workers.DataRetention.Run() 144 } 145 146 if workers.MessageExport != nil && *workers.ConfigService.Config().MessageExportSettings.EnableExport { 147 go workers.MessageExport.Run() 148 } 149 150 if workers.ElasticsearchIndexing != nil && *workers.ConfigService.Config().ElasticsearchSettings.EnableIndexing { 151 go workers.ElasticsearchIndexing.Run() 152 } 153 154 if workers.ElasticsearchAggregation != nil && *workers.ConfigService.Config().ElasticsearchSettings.EnableIndexing { 155 go workers.ElasticsearchAggregation.Run() 156 } 157 158 if workers.LdapSync != nil && *workers.ConfigService.Config().LdapSettings.EnableSync { 159 go workers.LdapSync.Run() 160 } 161 162 if workers.Migrations != nil { 163 go workers.Migrations.Run() 164 } 165 166 if workers.Plugins != nil { 167 go workers.Plugins.Run() 168 } 169 170 if workers.BleveIndexing != nil && *workers.ConfigService.Config().BleveSettings.EnableIndexing && *workers.ConfigService.Config().BleveSettings.IndexDir != "" { 171 go workers.BleveIndexing.Run() 172 } 173 174 if workers.ExpiryNotify != nil { 175 go workers.ExpiryNotify.Run() 176 } 177 178 if workers.ActiveUsers != nil { 179 go workers.ActiveUsers.Run() 180 } 181 182 if workers.ProductNotices != nil { 183 go workers.ProductNotices.Run() 184 } 185 186 if workers.ImportProcess != nil { 187 go workers.ImportProcess.Run() 188 } 189 190 if workers.ImportDelete != nil { 191 go workers.ImportDelete.Run() 192 } 193 194 if workers.ExportProcess != nil { 195 go workers.ExportProcess.Run() 196 } 197 198 if workers.ExportDelete != nil { 199 go workers.ExportDelete.Run() 200 } 201 202 if workers.Cloud != nil { 203 go workers.Cloud.Run() 204 } 205 206 if workers.ResendInvitationEmail != nil { 207 go workers.ResendInvitationEmail.Run() 208 } 209 210 if workers.ExtractContent != nil { 211 go workers.ExtractContent.Run() 212 } 213 214 go workers.Watcher.Start() 215 216 workers.listenerId = workers.ConfigService.AddConfigListener(workers.handleConfigChange) 217 workers.running = true 218} 219 220func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { 221 mlog.Debug("Workers received config change.") 222 223 if workers.DataRetention != nil { 224 if (!*oldConfig.DataRetentionSettings.EnableMessageDeletion && !*oldConfig.DataRetentionSettings.EnableFileDeletion) && (*newConfig.DataRetentionSettings.EnableMessageDeletion || *newConfig.DataRetentionSettings.EnableFileDeletion) { 225 go workers.DataRetention.Run() 226 } else if (*oldConfig.DataRetentionSettings.EnableMessageDeletion || *oldConfig.DataRetentionSettings.EnableFileDeletion) && (!*newConfig.DataRetentionSettings.EnableMessageDeletion && !*newConfig.DataRetentionSettings.EnableFileDeletion) { 227 workers.DataRetention.Stop() 228 } 229 } 230 231 if workers.MessageExport != nil { 232 if !*oldConfig.MessageExportSettings.EnableExport && *newConfig.MessageExportSettings.EnableExport { 233 go workers.MessageExport.Run() 234 } else if *oldConfig.MessageExportSettings.EnableExport && !*newConfig.MessageExportSettings.EnableExport { 235 workers.MessageExport.Stop() 236 } 237 } 238 239 if workers.ElasticsearchIndexing != nil { 240 if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing { 241 go workers.ElasticsearchIndexing.Run() 242 } else if *oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing { 243 workers.ElasticsearchIndexing.Stop() 244 } 245 } 246 247 if workers.ElasticsearchAggregation != nil { 248 if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing { 249 go workers.ElasticsearchAggregation.Run() 250 } else if *oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing { 251 workers.ElasticsearchAggregation.Stop() 252 } 253 } 254 255 if workers.LdapSync != nil { 256 if !*oldConfig.LdapSettings.EnableSync && *newConfig.LdapSettings.EnableSync { 257 go workers.LdapSync.Run() 258 } else if *oldConfig.LdapSettings.EnableSync && !*newConfig.LdapSettings.EnableSync { 259 workers.LdapSync.Stop() 260 } 261 } 262 263 if workers.BleveIndexing != nil { 264 if !*oldConfig.BleveSettings.EnableIndexing && *newConfig.BleveSettings.EnableIndexing { 265 go workers.BleveIndexing.Run() 266 } else if *oldConfig.BleveSettings.EnableIndexing && !*newConfig.BleveSettings.EnableIndexing { 267 workers.BleveIndexing.Stop() 268 } 269 } 270} 271 272// Stop stops the workers. This call is not safe for concurrent use. 273// Synchronization should be implemented by the caller. 274func (workers *Workers) Stop() { 275 workers.ConfigService.RemoveConfigListener(workers.listenerId) 276 277 workers.Watcher.Stop() 278 279 if workers.DataRetention != nil && (*workers.ConfigService.Config().DataRetentionSettings.EnableMessageDeletion || *workers.ConfigService.Config().DataRetentionSettings.EnableFileDeletion) { 280 workers.DataRetention.Stop() 281 } 282 283 if workers.MessageExport != nil && *workers.ConfigService.Config().MessageExportSettings.EnableExport { 284 workers.MessageExport.Stop() 285 } 286 287 if workers.ElasticsearchIndexing != nil && *workers.ConfigService.Config().ElasticsearchSettings.EnableIndexing { 288 workers.ElasticsearchIndexing.Stop() 289 } 290 291 if workers.ElasticsearchAggregation != nil && *workers.ConfigService.Config().ElasticsearchSettings.EnableIndexing { 292 workers.ElasticsearchAggregation.Stop() 293 } 294 295 if workers.LdapSync != nil && *workers.ConfigService.Config().LdapSettings.EnableSync { 296 workers.LdapSync.Stop() 297 } 298 299 if workers.Migrations != nil { 300 workers.Migrations.Stop() 301 } 302 303 if workers.Plugins != nil { 304 workers.Plugins.Stop() 305 } 306 307 if workers.BleveIndexing != nil && *workers.ConfigService.Config().BleveSettings.EnableIndexing { 308 workers.BleveIndexing.Stop() 309 } 310 311 if workers.ExpiryNotify != nil { 312 workers.ExpiryNotify.Stop() 313 } 314 315 if workers.ActiveUsers != nil { 316 workers.ActiveUsers.Stop() 317 } 318 319 if workers.ProductNotices != nil { 320 workers.ProductNotices.Stop() 321 } 322 323 if workers.ImportProcess != nil { 324 workers.ImportProcess.Stop() 325 } 326 327 if workers.ImportDelete != nil { 328 workers.ImportDelete.Stop() 329 } 330 331 if workers.ExportProcess != nil { 332 workers.ExportProcess.Stop() 333 } 334 335 if workers.ExportDelete != nil { 336 workers.ExportDelete.Stop() 337 } 338 339 if workers.Cloud != nil { 340 workers.Cloud.Stop() 341 } 342 343 if workers.ResendInvitationEmail != nil { 344 workers.ResendInvitationEmail.Stop() 345 } 346 347 if workers.ExtractContent != nil { 348 workers.ExtractContent.Stop() 349 } 350 351 workers.running = false 352 353 mlog.Info("Stopped workers") 354} 355