1// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
2// See LICENSE.txt for license information.
3
4package jobs
5
6import (
7	"errors"
8
9	"github.com/mattermost/mattermost-server/v6/model"
10	"github.com/mattermost/mattermost-server/v6/services/configservice"
11	"github.com/mattermost/mattermost-server/v6/shared/mlog"
12)
13
14type Workers struct {
15	ConfigService configservice.ConfigService
16	Watcher       *Watcher
17
18	DataRetention            model.Worker
19	MessageExport            model.Worker
20	ElasticsearchIndexing    model.Worker
21	ElasticsearchAggregation model.Worker
22	LdapSync                 model.Worker
23	Migrations               model.Worker
24	Plugins                  model.Worker
25	BleveIndexing            model.Worker
26	ExpiryNotify             model.Worker
27	ProductNotices           model.Worker
28	ActiveUsers              model.Worker
29	ImportProcess            model.Worker
30	ImportDelete             model.Worker
31	ExportProcess            model.Worker
32	ExportDelete             model.Worker
33	Cloud                    model.Worker
34	ResendInvitationEmail    model.Worker
35	ExtractContent           model.Worker
36
37	listenerId string
38	running    bool
39}
40
41var (
42	ErrWorkersNotRunning    = errors.New("job workers are not running")
43	ErrWorkersRunning       = errors.New("job workers are running")
44	ErrWorkersUninitialized = errors.New("job workers are not initialized")
45)
46
47func (srv *JobServer) InitWorkers() error {
48	srv.mut.Lock()
49	defer srv.mut.Unlock()
50
51	if srv.workers != nil && srv.workers.running {
52		return ErrWorkersRunning
53	}
54
55	workers := &Workers{
56		ConfigService: srv.ConfigService,
57	}
58	workers.Watcher = srv.MakeWatcher(workers, DefaultWatcherPollingInterval)
59
60	if srv.DataRetentionJob != nil {
61		workers.DataRetention = srv.DataRetentionJob.MakeWorker()
62	}
63
64	if srv.MessageExportJob != nil {
65		workers.MessageExport = srv.MessageExportJob.MakeWorker()
66	}
67
68	if elasticsearchIndexerInterface := srv.ElasticsearchIndexer; elasticsearchIndexerInterface != nil {
69		workers.ElasticsearchIndexing = elasticsearchIndexerInterface.MakeWorker()
70	}
71
72	if elasticsearchAggregatorInterface := srv.ElasticsearchAggregator; elasticsearchAggregatorInterface != nil {
73		workers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeWorker()
74	}
75
76	if ldapSyncInterface := srv.LdapSync; ldapSyncInterface != nil {
77		workers.LdapSync = ldapSyncInterface.MakeWorker()
78	}
79
80	if migrationsInterface := srv.Migrations; migrationsInterface != nil {
81		workers.Migrations = migrationsInterface.MakeWorker()
82	}
83
84	if pluginsInterface := srv.Plugins; pluginsInterface != nil {
85		workers.Plugins = pluginsInterface.MakeWorker()
86	}
87
88	if bleveIndexerInterface := srv.BleveIndexer; bleveIndexerInterface != nil {
89		workers.BleveIndexing = bleveIndexerInterface.MakeWorker()
90	}
91
92	if expiryNotifyInterface := srv.ExpiryNotify; expiryNotifyInterface != nil {
93		workers.ExpiryNotify = expiryNotifyInterface.MakeWorker()
94	}
95
96	if activeUsersInterface := srv.ActiveUsers; activeUsersInterface != nil {
97		workers.ActiveUsers = activeUsersInterface.MakeWorker()
98	}
99
100	if productNoticesInterface := srv.ProductNotices; productNoticesInterface != nil {
101		workers.ProductNotices = productNoticesInterface.MakeWorker()
102	}
103
104	if importProcessInterface := srv.ImportProcess; importProcessInterface != nil {
105		workers.ImportProcess = importProcessInterface.MakeWorker()
106	}
107
108	if importDeleteInterface := srv.ImportDelete; importDeleteInterface != nil {
109		workers.ImportDelete = importDeleteInterface.MakeWorker()
110	}
111
112	if exportProcessInterface := srv.ExportProcess; exportProcessInterface != nil {
113		workers.ExportProcess = exportProcessInterface.MakeWorker()
114	}
115
116	if exportDeleteInterface := srv.ExportDelete; exportDeleteInterface != nil {
117		workers.ExportDelete = exportDeleteInterface.MakeWorker()
118	}
119
120	if cloudInterface := srv.Cloud; cloudInterface != nil {
121		workers.Cloud = cloudInterface.MakeWorker()
122	}
123
124	if resendInvitationEmailInterface := srv.ResendInvitationEmails; resendInvitationEmailInterface != nil {
125		workers.ResendInvitationEmail = resendInvitationEmailInterface.MakeWorker()
126	}
127
128	if extractContentInterface := srv.ExtractContent; extractContentInterface != nil {
129		workers.ExtractContent = extractContentInterface.MakeWorker()
130	}
131
132	srv.workers = workers
133
134	return nil
135}
136
137// Start starts the workers. This call is not safe for concurrent use.
138// Synchronization should be implemented by the caller.
139func (workers *Workers) Start() {
140	mlog.Info("Starting workers")
141
142	if workers.DataRetention != nil {
143		go workers.DataRetention.Run()
144	}
145
146	if workers.MessageExport != nil && *workers.ConfigService.Config().MessageExportSettings.EnableExport {
147		go workers.MessageExport.Run()
148	}
149
150	if workers.ElasticsearchIndexing != nil && *workers.ConfigService.Config().ElasticsearchSettings.EnableIndexing {
151		go workers.ElasticsearchIndexing.Run()
152	}
153
154	if workers.ElasticsearchAggregation != nil && *workers.ConfigService.Config().ElasticsearchSettings.EnableIndexing {
155		go workers.ElasticsearchAggregation.Run()
156	}
157
158	if workers.LdapSync != nil && *workers.ConfigService.Config().LdapSettings.EnableSync {
159		go workers.LdapSync.Run()
160	}
161
162	if workers.Migrations != nil {
163		go workers.Migrations.Run()
164	}
165
166	if workers.Plugins != nil {
167		go workers.Plugins.Run()
168	}
169
170	if workers.BleveIndexing != nil && *workers.ConfigService.Config().BleveSettings.EnableIndexing && *workers.ConfigService.Config().BleveSettings.IndexDir != "" {
171		go workers.BleveIndexing.Run()
172	}
173
174	if workers.ExpiryNotify != nil {
175		go workers.ExpiryNotify.Run()
176	}
177
178	if workers.ActiveUsers != nil {
179		go workers.ActiveUsers.Run()
180	}
181
182	if workers.ProductNotices != nil {
183		go workers.ProductNotices.Run()
184	}
185
186	if workers.ImportProcess != nil {
187		go workers.ImportProcess.Run()
188	}
189
190	if workers.ImportDelete != nil {
191		go workers.ImportDelete.Run()
192	}
193
194	if workers.ExportProcess != nil {
195		go workers.ExportProcess.Run()
196	}
197
198	if workers.ExportDelete != nil {
199		go workers.ExportDelete.Run()
200	}
201
202	if workers.Cloud != nil {
203		go workers.Cloud.Run()
204	}
205
206	if workers.ResendInvitationEmail != nil {
207		go workers.ResendInvitationEmail.Run()
208	}
209
210	if workers.ExtractContent != nil {
211		go workers.ExtractContent.Run()
212	}
213
214	go workers.Watcher.Start()
215
216	workers.listenerId = workers.ConfigService.AddConfigListener(workers.handleConfigChange)
217	workers.running = true
218}
219
220func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
221	mlog.Debug("Workers received config change.")
222
223	if workers.DataRetention != nil {
224		if (!*oldConfig.DataRetentionSettings.EnableMessageDeletion && !*oldConfig.DataRetentionSettings.EnableFileDeletion) && (*newConfig.DataRetentionSettings.EnableMessageDeletion || *newConfig.DataRetentionSettings.EnableFileDeletion) {
225			go workers.DataRetention.Run()
226		} else if (*oldConfig.DataRetentionSettings.EnableMessageDeletion || *oldConfig.DataRetentionSettings.EnableFileDeletion) && (!*newConfig.DataRetentionSettings.EnableMessageDeletion && !*newConfig.DataRetentionSettings.EnableFileDeletion) {
227			workers.DataRetention.Stop()
228		}
229	}
230
231	if workers.MessageExport != nil {
232		if !*oldConfig.MessageExportSettings.EnableExport && *newConfig.MessageExportSettings.EnableExport {
233			go workers.MessageExport.Run()
234		} else if *oldConfig.MessageExportSettings.EnableExport && !*newConfig.MessageExportSettings.EnableExport {
235			workers.MessageExport.Stop()
236		}
237	}
238
239	if workers.ElasticsearchIndexing != nil {
240		if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing {
241			go workers.ElasticsearchIndexing.Run()
242		} else if *oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing {
243			workers.ElasticsearchIndexing.Stop()
244		}
245	}
246
247	if workers.ElasticsearchAggregation != nil {
248		if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing {
249			go workers.ElasticsearchAggregation.Run()
250		} else if *oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing {
251			workers.ElasticsearchAggregation.Stop()
252		}
253	}
254
255	if workers.LdapSync != nil {
256		if !*oldConfig.LdapSettings.EnableSync && *newConfig.LdapSettings.EnableSync {
257			go workers.LdapSync.Run()
258		} else if *oldConfig.LdapSettings.EnableSync && !*newConfig.LdapSettings.EnableSync {
259			workers.LdapSync.Stop()
260		}
261	}
262
263	if workers.BleveIndexing != nil {
264		if !*oldConfig.BleveSettings.EnableIndexing && *newConfig.BleveSettings.EnableIndexing {
265			go workers.BleveIndexing.Run()
266		} else if *oldConfig.BleveSettings.EnableIndexing && !*newConfig.BleveSettings.EnableIndexing {
267			workers.BleveIndexing.Stop()
268		}
269	}
270}
271
272// Stop stops the workers. This call is not safe for concurrent use.
273// Synchronization should be implemented by the caller.
274func (workers *Workers) Stop() {
275	workers.ConfigService.RemoveConfigListener(workers.listenerId)
276
277	workers.Watcher.Stop()
278
279	if workers.DataRetention != nil && (*workers.ConfigService.Config().DataRetentionSettings.EnableMessageDeletion || *workers.ConfigService.Config().DataRetentionSettings.EnableFileDeletion) {
280		workers.DataRetention.Stop()
281	}
282
283	if workers.MessageExport != nil && *workers.ConfigService.Config().MessageExportSettings.EnableExport {
284		workers.MessageExport.Stop()
285	}
286
287	if workers.ElasticsearchIndexing != nil && *workers.ConfigService.Config().ElasticsearchSettings.EnableIndexing {
288		workers.ElasticsearchIndexing.Stop()
289	}
290
291	if workers.ElasticsearchAggregation != nil && *workers.ConfigService.Config().ElasticsearchSettings.EnableIndexing {
292		workers.ElasticsearchAggregation.Stop()
293	}
294
295	if workers.LdapSync != nil && *workers.ConfigService.Config().LdapSettings.EnableSync {
296		workers.LdapSync.Stop()
297	}
298
299	if workers.Migrations != nil {
300		workers.Migrations.Stop()
301	}
302
303	if workers.Plugins != nil {
304		workers.Plugins.Stop()
305	}
306
307	if workers.BleveIndexing != nil && *workers.ConfigService.Config().BleveSettings.EnableIndexing {
308		workers.BleveIndexing.Stop()
309	}
310
311	if workers.ExpiryNotify != nil {
312		workers.ExpiryNotify.Stop()
313	}
314
315	if workers.ActiveUsers != nil {
316		workers.ActiveUsers.Stop()
317	}
318
319	if workers.ProductNotices != nil {
320		workers.ProductNotices.Stop()
321	}
322
323	if workers.ImportProcess != nil {
324		workers.ImportProcess.Stop()
325	}
326
327	if workers.ImportDelete != nil {
328		workers.ImportDelete.Stop()
329	}
330
331	if workers.ExportProcess != nil {
332		workers.ExportProcess.Stop()
333	}
334
335	if workers.ExportDelete != nil {
336		workers.ExportDelete.Stop()
337	}
338
339	if workers.Cloud != nil {
340		workers.Cloud.Stop()
341	}
342
343	if workers.ResendInvitationEmail != nil {
344		workers.ResendInvitationEmail.Stop()
345	}
346
347	if workers.ExtractContent != nil {
348		workers.ExtractContent.Stop()
349	}
350
351	workers.running = false
352
353	mlog.Info("Stopped workers")
354}
355