// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.

package indexer

import (
	"context"
	"net/http"
	"strconv"
	"time"

	"github.com/mattermost/mattermost-server/v6/app"
	"github.com/mattermost/mattermost-server/v6/jobs"
	tjobs "github.com/mattermost/mattermost-server/v6/jobs/interfaces"
	"github.com/mattermost/mattermost-server/v6/model"
	"github.com/mattermost/mattermost-server/v6/services/searchengine/bleveengine"
	"github.com/mattermost/mattermost-server/v6/shared/mlog"
)

const (
	BatchSize             = 1000
	TimeBetweenBatches    = 100
	EstimatedPostCount    = 10000000
	EstimatedFilesCount   = 100000
	EstimatedChannelCount = 100000
	EstimatedUserCount    = 10000
)

func init() {
	app.RegisterJobsBleveIndexerInterface(func(s *app.Server) tjobs.IndexerJobInterface {
		return &BleveIndexerInterfaceImpl{s}
	})
}

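// BleveIndexerInterfaceImpl implements the jobs indexer interface for the
// Bleve search engine and holds a reference to the server it runs against.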
type BleveIndexerInterfaceImpl struct {
	Server *app.Server
}

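// BleveIndexerWorker is the job worker that bulk indexes posts, files,
// channels and users into the Bleve indexes.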
type BleveIndexerWorker struct {
	name      string
	stop      chan bool
	stopped   chan bool
	jobs      chan model.Job
	jobServer *jobs.JobServer

	engine *bleveengine.BleveEngine
}

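// MakeWorker returns a worker bound to the server's Bleve engine, or nil if
// the Bleve engine is not configured.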
func (bi *BleveIndexerInterfaceImpl) MakeWorker() model.Worker {
	if bi.Server.SearchEngine.BleveEngine == nil {
		return nil
	}
	return &BleveIndexerWorker{
		name:      "BleveIndexer",
		stop:      make(chan bool, 1),
		stopped:   make(chan bool, 1),
		jobs:      make(chan model.Job),
		jobServer: bi.Server.Jobs,

		engine: bi.Server.SearchEngine.BleveEngine.(*bleveengine.BleveEngine),
	}
}

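// IndexingProgress tracks how far the indexing job has advanced through the
// time window being indexed and how many entities of each type have been
// processed so far.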
type IndexingProgress struct {
	Now                time.Time
	StartAtTime        int64
	EndAtTime          int64
	LastEntityTime     int64
	TotalPostsCount    int64
	DonePostsCount     int64
	DonePosts          bool
	TotalFilesCount    int64
	DoneFilesCount     int64
	DoneFiles          bool
	TotalChannelsCount int64
	DoneChannelsCount  int64
	DoneChannels       bool
	TotalUsersCount    int64
	DoneUsersCount     int64
	DoneUsers          bool
}

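// CurrentProgress returns the overall completion percentage across all entity
// types. For example, with 5000 of 10000 posts done and none of the remaining
// 10000 channels, users and files indexed yet, it reports 25.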
func (ip *IndexingProgress) CurrentProgress() int64 {
	return (ip.DonePostsCount + ip.DoneChannelsCount + ip.DoneUsersCount + ip.DoneFilesCount) * 100 / (ip.TotalPostsCount + ip.TotalChannelsCount + ip.TotalUsersCount + ip.TotalFilesCount)
}

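// IsDone reports whether every entity type has been fully indexed.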
func (ip *IndexingProgress) IsDone() bool {
	return ip.DonePosts && ip.DoneChannels && ip.DoneUsers && ip.DoneFiles
}

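// JobChannel returns the channel the job server uses to hand jobs to this worker.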
func (worker *BleveIndexerWorker) JobChannel() chan<- model.Job {
	return worker.jobs
}

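// Run processes incoming jobs until the worker is told to stop.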
func (worker *BleveIndexerWorker) Run() {
	mlog.Debug("Worker Started", mlog.String("workername", worker.name))

	defer func() {
		mlog.Debug("Worker: Finished", mlog.String("workername", worker.name))
		worker.stopped <- true
	}()

	for {
		select {
		case <-worker.stop:
			mlog.Debug("Worker: Received stop signal", mlog.String("workername", worker.name))
			return
		case job := <-worker.jobs:
			mlog.Debug("Worker: Received a new candidate job.", mlog.String("workername", worker.name))
			worker.DoJob(&job)
		}
	}
}

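// Stop signals the worker to stop and waits until it has finished.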
func (worker *BleveIndexerWorker) Stop() {
	mlog.Debug("Worker Stopping", mlog.String("workername", worker.name))
	worker.stop <- true
	<-worker.stopped
}

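// DoJob claims the given job, determines the time window and entity totals to
// index, and then indexes batches, pausing TimeBetweenBatches milliseconds
// between them, until the job finishes, fails or is canceled.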
func (worker *BleveIndexerWorker) DoJob(job *model.Job) {
	claimed, err := worker.jobServer.ClaimJob(job)
	if err != nil {
		mlog.Warn("Worker: Error occurred while trying to claim job", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err))
		return
	}
	if !claimed {
		return
	}

	mlog.Info("Worker: Indexing job claimed by worker", mlog.String("workername", worker.name), mlog.String("job_id", job.Id))

	if !worker.engine.IsActive() {
		appError := model.NewAppError("BleveIndexerWorker", "bleveengine.indexer.do_job.engine_inactive", nil, "", http.StatusInternalServerError)
		if err := worker.jobServer.SetJobError(job, appError); err != nil {
			mlog.Error("Worker: Failed to set job error", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err), mlog.NamedErr("set_error", appError))
		}
		return
	}

	progress := IndexingProgress{
		Now:          time.Now(),
		DonePosts:    false,
		DoneChannels: false,
		DoneUsers:    false,
		DoneFiles:    false,
		StartAtTime:  0,
		EndAtTime:    model.GetMillis(),
	}

	// Extract the start and end times, if they are set.
	if startString, ok := job.Data["start_time"]; ok {
		startInt, err := strconv.ParseInt(startString, 10, 64)
		if err != nil {
			mlog.Error("Worker: Failed to parse start_time for job", mlog.String("workername", worker.name), mlog.String("start_time", startString), mlog.String("job_id", job.Id), mlog.Err(err))
			appError := model.NewAppError("BleveIndexerWorker", "bleveengine.indexer.do_job.parse_start_time.error", nil, err.Error(), http.StatusInternalServerError)
			if err := worker.jobServer.SetJobError(job, appError); err != nil {
				mlog.Error("Worker: Failed to set job error", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err), mlog.NamedErr("set_error", appError))
			}
			return
		}
		progress.StartAtTime = startInt
		progress.LastEntityTime = progress.StartAtTime
	} else {
		// Set start time to oldest entity in the database.
		// A user or a channel may be created before any post.
		oldestEntityCreationTime, err := worker.jobServer.Store.Post().GetOldestEntityCreationTime()
		if err != nil {
			mlog.Error("Worker: Failed to fetch oldest entity for job.", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.String("start_time", startString), mlog.Err(err))
			appError := model.NewAppError("BleveIndexerWorker", "bleveengine.indexer.do_job.get_oldest_entity.error", nil, err.Error(), http.StatusInternalServerError)
			if err := worker.jobServer.SetJobError(job, appError); err != nil {
				mlog.Error("Worker: Failed to set job error", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err), mlog.NamedErr("set_error", appError))
			}
			return
		}
		progress.StartAtTime = oldestEntityCreationTime
		progress.LastEntityTime = progress.StartAtTime
	}

	if endString, ok := job.Data["end_time"]; ok {
		endInt, err := strconv.ParseInt(endString, 10, 64)
		if err != nil {
			mlog.Error("Worker: Failed to parse end_time for job", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.String("end_time", endString), mlog.Err(err))
			appError := model.NewAppError("BleveIndexerWorker", "bleveengine.indexer.do_job.parse_end_time.error", nil, err.Error(), http.StatusInternalServerError)
			if err := worker.jobServer.SetJobError(job, appError); err != nil {
				mlog.Error("Worker: Failed to set job error", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err), mlog.NamedErr("set_error", appError))
			}
			return
		}
		progress.EndAtTime = endInt
	}

	// Counting all posts may fail or timeout when the posts table is large. If this happens, log a warning, but carry
	// on with the indexing job anyway. The only issue is that the progress % reporting will be inaccurate.
	if count, err := worker.jobServer.Store.Post().AnalyticsPostCount("", false, false); err != nil {
		mlog.Warn("Worker: Failed to fetch total post count for job. An estimated value will be used for progress reporting.", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err))
		progress.TotalPostsCount = EstimatedPostCount
	} else {
		progress.TotalPostsCount = count
	}

	// The same failure as above can happen when counting channels; fall back to an estimate if it does.
	if count, err := worker.jobServer.Store.Channel().AnalyticsTypeCount("", "O"); err != nil {
		mlog.Warn("Worker: Failed to fetch total channel count for job. An estimated value will be used for progress reporting.", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err))
		progress.TotalChannelsCount = EstimatedChannelCount
	} else {
		progress.TotalChannelsCount = count
	}

	// The same failure as above can happen when counting users; fall back to an estimate if it does.
	if count, err := worker.jobServer.Store.User().Count(model.UserCountOptions{}); err != nil {
		mlog.Warn("Worker: Failed to fetch total user count for job. An estimated value will be used for progress reporting.", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err))
		progress.TotalUsersCount = EstimatedUserCount
	} else {
		progress.TotalUsersCount = count
	}

	// Counting all files may fail or timeout when the file_info table is large. If this happens, log a warning, but carry
	// on with the indexing job anyway. The only issue is that the progress % reporting will be inaccurate.
	if count, err := worker.jobServer.Store.FileInfo().CountAll(); err != nil {
		mlog.Warn("Worker: Failed to fetch total file info count for job. An estimated value will be used for progress reporting.", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err))
		progress.TotalFilesCount = EstimatedFilesCount
	} else {
		progress.TotalFilesCount = count
	}

	cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background())
	cancelWatcherChan := make(chan interface{}, 1)
	go worker.jobServer.CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan)

	defer cancelCancelWatcher()

	for {
		select {
		case <-cancelWatcherChan:
			mlog.Info("Worker: Indexing job has been canceled via CancellationWatcher", mlog.String("workername", worker.name), mlog.String("job_id", job.Id))
			if err := worker.jobServer.SetJobCanceled(job); err != nil {
				mlog.Error("Worker: Failed to mark job as canceled", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err))
			}
			return

		case <-worker.stop:
			mlog.Info("Worker: Indexing has been canceled via Worker Stop", mlog.String("workername", worker.name), mlog.String("job_id", job.Id))
			if err := worker.jobServer.SetJobCanceled(job); err != nil {
				mlog.Error("Worker: Failed to mark job as canceled", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err))
			}
			return

		case <-time.After(TimeBetweenBatches * time.Millisecond):
			var err *model.AppError
			if progress, err = worker.IndexBatch(progress); err != nil {
				mlog.Error("Worker: Failed to index batch for job", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err))
				if err2 := worker.jobServer.SetJobError(job, err); err2 != nil {
					mlog.Error("Worker: Failed to set job error", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err2), mlog.NamedErr("set_error", err))
				}
				return
			}

			if err := worker.jobServer.SetJobProgress(job, progress.CurrentProgress()); err != nil {
				mlog.Error("Worker: Failed to set progress for job", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err))
				if err2 := worker.jobServer.SetJobError(job, err); err2 != nil {
					mlog.Error("Worker: Failed to set error for job", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err2), mlog.NamedErr("set_error", err))
				}
				return
			}

			if progress.IsDone() {
				if err := worker.jobServer.SetJobSuccess(job); err != nil {
					mlog.Error("Worker: Failed to set success for job", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err))
					if err2 := worker.jobServer.SetJobError(job, err); err2 != nil {
						mlog.Error("Worker: Failed to set error for job", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err2), mlog.NamedErr("set_error", err))
					}
				}
				mlog.Info("Worker: Indexing job finished successfully", mlog.String("workername", worker.name), mlog.String("job_id", job.Id))
				return
			}
		}
	}
}

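// IndexBatch indexes the next batch of whichever entity type is still
// incomplete, in the order posts, channels, users, files.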
func (worker *BleveIndexerWorker) IndexBatch(progress IndexingProgress) (IndexingProgress, *model.AppError) {
	if !progress.DonePosts {
		return worker.IndexPostsBatch(progress)
	}
	if !progress.DoneChannels {
		return worker.IndexChannelsBatch(progress)
	}
	if !progress.DoneUsers {
		return worker.IndexUsersBatch(progress)
	}
	if !progress.DoneFiles {
		return worker.IndexFilesBatch(progress)
	}
	return progress, model.NewAppError("BleveIndexerWorker", "bleveengine.indexer.index_batch.nothing_left_to_index.error", nil, "", http.StatusInternalServerError)
}

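// IndexPostsBatch fetches the next batch of posts inside the bulk indexing
// time window and indexes it, advancing the progress accordingly.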
func (worker *BleveIndexerWorker) IndexPostsBatch(progress IndexingProgress) (IndexingProgress, *model.AppError) {
	endTime := progress.LastEntityTime + int64(*worker.jobServer.Config().BleveSettings.BulkIndexingTimeWindowSeconds*1000)

	var posts []*model.PostForIndexing

	tries := 0
	for posts == nil {
		var err error
		posts, err = worker.jobServer.Store.Post().GetPostsBatchForIndexing(progress.LastEntityTime, endTime, BatchSize)
		if err != nil {
			if tries >= 10 {
				return progress, model.NewAppError("IndexPostsBatch", "app.post.get_posts_batch_for_indexing.get.app_error", nil, err.Error(), http.StatusInternalServerError)
			}
			mlog.Warn("Failed to get posts batch for indexing. Retrying.", mlog.Err(err))

			// Wait a bit before trying again.
			time.Sleep(15 * time.Second)
		}

		tries++
	}

	newLastMessageTime, err := worker.BulkIndexPosts(posts, progress)
	if err != nil {
		return progress, err
	}

	// Due to the "endTime" parameter in the store query, we might get an incomplete batch before the end. In this
	// case, set the "newLastMessageTime" to the endTime so we don't get stuck running the same query in a loop.
	if len(posts) < BatchSize {
		newLastMessageTime = endTime
	}

	// When to Stop: we index either until we pass a batch of messages where the last
	// message is created at or after the specified end time when setting up the batch
	// index, or until two consecutive full batches have the same end time of their final
	// messages. This second case is safe as long as the assumption that the database
	// cannot contain more messages with the same CreateAt time than the batch size holds.
	if progress.EndAtTime <= newLastMessageTime {
		progress.DonePosts = true
		progress.LastEntityTime = progress.StartAtTime
	} else if progress.LastEntityTime == newLastMessageTime && len(posts) == BatchSize {
		mlog.Warn("More posts with the same CreateAt time were detected than the permitted batch size. Aborting indexing job.", mlog.Int64("CreateAt", newLastMessageTime), mlog.Int("Batch Size", BatchSize))
		progress.DonePosts = true
		progress.LastEntityTime = progress.StartAtTime
	} else {
		progress.LastEntityTime = newLastMessageTime
	}

	progress.DonePostsCount += int64(len(posts))

	return progress, nil
}

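// BulkIndexPosts indexes (or deletes, for removed posts) the given posts in a
// single Bleve batch and returns the CreateAt of the last post processed.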
func (worker *BleveIndexerWorker) BulkIndexPosts(posts []*model.PostForIndexing, progress IndexingProgress) (int64, *model.AppError) {
	lastCreateAt := int64(0)
	batch := worker.engine.PostIndex.NewBatch()

	for _, post := range posts {
		if post.DeleteAt == 0 {
			searchPost := bleveengine.BLVPostFromPostForIndexing(post)
			batch.Index(searchPost.Id, searchPost)
		} else {
			batch.Delete(post.Id)
		}

		lastCreateAt = post.CreateAt
	}

	worker.engine.Mutex.RLock()
	defer worker.engine.Mutex.RUnlock()

	if err := worker.engine.PostIndex.Batch(batch); err != nil {
		return 0, model.NewAppError("BleveIndexerWorker.BulkIndexPosts", "bleveengine.indexer.do_job.bulk_index_posts.batch_error", nil, err.Error(), http.StatusInternalServerError)
	}
	return lastCreateAt, nil
}

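// IndexFilesBatch fetches the next batch of files inside the bulk indexing
// time window and indexes it, advancing the progress accordingly.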
func (worker *BleveIndexerWorker) IndexFilesBatch(progress IndexingProgress) (IndexingProgress, *model.AppError) {
	endTime := progress.LastEntityTime + int64(*worker.jobServer.Config().BleveSettings.BulkIndexingTimeWindowSeconds*1000)

	var files []*model.FileForIndexing

	tries := 0
	for files == nil {
		var err error
		files, err = worker.jobServer.Store.FileInfo().GetFilesBatchForIndexing(progress.LastEntityTime, endTime, BatchSize)
		if err != nil {
			if tries >= 10 {
				return progress, model.NewAppError("IndexFilesBatch", "app.post.get_files_batch_for_indexing.get.app_error", nil, err.Error(), http.StatusInternalServerError)
			}
			mlog.Warn("Failed to get files batch for indexing. Retrying.", mlog.Err(err))

			// Wait a bit before trying again.
			time.Sleep(15 * time.Second)
		}

		tries++
	}

	newLastFileTime, err := worker.BulkIndexFiles(files, progress)
	if err != nil {
		return progress, err
	}

	// Due to the "endTime" parameter in the store query, we might get an incomplete batch before the end. In this
	// case, set the "newLastFileTime" to the endTime so we don't get stuck running the same query in a loop.
	if len(files) < BatchSize {
		newLastFileTime = endTime
	}

	// When to Stop: we index either until we pass a batch of files where the last
	// file is created at or after the specified end time when setting up the batch
	// index, or until two consecutive full batches have the same end time of their final
	// files. This second case is safe as long as the assumption that the database
	// cannot contain more files with the same CreateAt time than the batch size holds.
	if progress.EndAtTime <= newLastFileTime {
		progress.DoneFiles = true
		progress.LastEntityTime = progress.StartAtTime
	} else if progress.LastEntityTime == newLastFileTime && len(files) == BatchSize {
		mlog.Warn("More files with the same CreateAt time were detected than the permitted batch size. Aborting indexing job.", mlog.Int64("CreateAt", newLastFileTime), mlog.Int("Batch Size", BatchSize))
		progress.DoneFiles = true
		progress.LastEntityTime = progress.StartAtTime
	} else {
		progress.LastEntityTime = newLastFileTime
	}

	progress.DoneFilesCount += int64(len(files))

	return progress, nil
}

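// BulkIndexFiles indexes (or deletes, for removed files) the given files in a
// single Bleve batch and returns the CreateAt of the last file processed.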
func (worker *BleveIndexerWorker) BulkIndexFiles(files []*model.FileForIndexing, progress IndexingProgress) (int64, *model.AppError) {
	lastCreateAt := int64(0)
	batch := worker.engine.FileIndex.NewBatch()

	for _, file := range files {
		if file.DeleteAt == 0 {
			searchFile := bleveengine.BLVFileFromFileForIndexing(file)
			batch.Index(searchFile.Id, searchFile)
		} else {
			batch.Delete(file.Id)
		}

		lastCreateAt = file.CreateAt
	}

	worker.engine.Mutex.RLock()
	defer worker.engine.Mutex.RUnlock()

	if err := worker.engine.FileIndex.Batch(batch); err != nil {
		return 0, model.NewAppError("BleveIndexerWorker.BulkIndexFiles", "bleveengine.indexer.do_job.bulk_index_files.batch_error", nil, err.Error(), http.StatusInternalServerError)
	}
	return lastCreateAt, nil
}

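// IndexChannelsBatch fetches the next batch of channels inside the bulk
// indexing time window and indexes it, advancing the progress accordingly.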
func (worker *BleveIndexerWorker) IndexChannelsBatch(progress IndexingProgress) (IndexingProgress, *model.AppError) {
	endTime := progress.LastEntityTime + int64(*worker.jobServer.Config().BleveSettings.BulkIndexingTimeWindowSeconds*1000)

	var channels []*model.Channel

	tries := 0
	for channels == nil {
		var nErr error
		channels, nErr = worker.jobServer.Store.Channel().GetChannelsBatchForIndexing(progress.LastEntityTime, endTime, BatchSize)
		if nErr != nil {
			if tries >= 10 {
				return progress, model.NewAppError("BleveIndexerWorker.IndexChannelsBatch", "app.channel.get_channels_batch_for_indexing.get.app_error", nil, nErr.Error(), http.StatusInternalServerError)
			}

			mlog.Warn("Failed to get channels batch for indexing. Retrying.", mlog.Err(nErr))

			// Wait a bit before trying again.
			time.Sleep(15 * time.Second)
		}
		tries++
	}

	newLastChannelTime, err := worker.BulkIndexChannels(channels, progress)
	if err != nil {
		return progress, err
	}

	// Due to the "endTime" parameter in the store query, we might get an incomplete batch before the end. In this
	// case, set the "newLastChannelTime" to the endTime so we don't get stuck running the same query in a loop.
	if len(channels) < BatchSize {
		newLastChannelTime = endTime
	}

	// When to Stop: we index either until we pass a batch of channels where the last
	// channel is created at or after the specified end time when setting up the batch
	// index, or until two consecutive full batches have the same end time of their final
	// channels. This second case is safe as long as the assumption that the database
	// cannot contain more channels with the same CreateAt time than the batch size holds.
	if progress.EndAtTime <= newLastChannelTime {
		progress.DoneChannels = true
		progress.LastEntityTime = progress.StartAtTime
	} else if progress.LastEntityTime == newLastChannelTime && len(channels) == BatchSize {
		mlog.Warn("More channels with the same CreateAt time were detected than the permitted batch size. Aborting indexing job.", mlog.Int64("CreateAt", newLastChannelTime), mlog.Int("Batch Size", BatchSize))
		progress.DoneChannels = true
		progress.LastEntityTime = progress.StartAtTime
	} else {
		progress.LastEntityTime = newLastChannelTime
	}

	progress.DoneChannelsCount += int64(len(channels))

	return progress, nil
}

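// BulkIndexChannels indexes (or deletes, for removed channels) the given
// channels in a single Bleve batch and returns the CreateAt of the last
// channel processed.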
func (worker *BleveIndexerWorker) BulkIndexChannels(channels []*model.Channel, progress IndexingProgress) (int64, *model.AppError) {
	lastCreateAt := int64(0)
	batch := worker.engine.ChannelIndex.NewBatch()

	for _, channel := range channels {
		if channel.DeleteAt == 0 {
			searchChannel := bleveengine.BLVChannelFromChannel(channel)
			batch.Index(searchChannel.Id, searchChannel)
		} else {
			batch.Delete(channel.Id)
		}

		lastCreateAt = channel.CreateAt
	}

	worker.engine.Mutex.RLock()
	defer worker.engine.Mutex.RUnlock()

	if err := worker.engine.ChannelIndex.Batch(batch); err != nil {
		return 0, model.NewAppError("BleveIndexerWorker.BulkIndexChannels", "bleveengine.indexer.do_job.bulk_index_channels.batch_error", nil, err.Error(), http.StatusInternalServerError)
	}
	return lastCreateAt, nil
}

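// IndexUsersBatch fetches the next batch of users inside the bulk indexing
// time window and indexes it, advancing the progress accordingly.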
func (worker *BleveIndexerWorker) IndexUsersBatch(progress IndexingProgress) (IndexingProgress, *model.AppError) {
	endTime := progress.LastEntityTime + int64(*worker.jobServer.Config().BleveSettings.BulkIndexingTimeWindowSeconds*1000)

	var users []*model.UserForIndexing

	tries := 0
	for users == nil {
		if usersBatch, err := worker.jobServer.Store.User().GetUsersBatchForIndexing(progress.LastEntityTime, endTime, BatchSize); err != nil {
			if tries >= 10 {
				return progress, model.NewAppError("IndexUsersBatch", "app.user.get_users_batch_for_indexing.get_users.app_error", nil, err.Error(), http.StatusInternalServerError)
			}
			mlog.Warn("Failed to get users batch for indexing. Retrying.", mlog.Err(err))

			// Wait a bit before trying again.
			time.Sleep(15 * time.Second)
		} else {
			users = usersBatch
		}

		tries++
	}

	newLastUserTime, err := worker.BulkIndexUsers(users, progress)
	if err != nil {
		return progress, err
	}

	// Due to the "endTime" parameter in the store query, we might get an incomplete batch before the end. In this
	// case, set the "newLastUserTime" to the endTime so we don't get stuck running the same query in a loop.
	if len(users) < BatchSize {
		newLastUserTime = endTime
	}

	// When to Stop: we index either until we pass a batch of users where the last
	// user is created at or after the specified end time when setting up the batch
	// index, or until two consecutive full batches have the same end time of their final
	// users. This second case is safe as long as the assumption that the database
	// cannot contain more users with the same CreateAt time than the batch size holds.
	if progress.EndAtTime <= newLastUserTime {
		progress.DoneUsers = true
		progress.LastEntityTime = progress.StartAtTime
	} else if progress.LastEntityTime == newLastUserTime && len(users) == BatchSize {
		mlog.Warn("More users with the same CreateAt time were detected than the permitted batch size. Aborting indexing job.", mlog.Int64("CreateAt", newLastUserTime), mlog.Int("Batch Size", BatchSize))
		progress.DoneUsers = true
		progress.LastEntityTime = progress.StartAtTime
	} else {
		progress.LastEntityTime = newLastUserTime
	}

	progress.DoneUsersCount += int64(len(users))

	return progress, nil
}

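// BulkIndexUsers indexes (or deletes, for removed users) the given users in a
// single Bleve batch and returns the CreateAt of the last user processed.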
func (worker *BleveIndexerWorker) BulkIndexUsers(users []*model.UserForIndexing, progress IndexingProgress) (int64, *model.AppError) {
	lastCreateAt := int64(0)
	batch := worker.engine.UserIndex.NewBatch()

	for _, user := range users {
		if user.DeleteAt == 0 {
			searchUser := bleveengine.BLVUserFromUserForIndexing(user)
			batch.Index(searchUser.Id, searchUser)
		} else {
			batch.Delete(user.Id)
		}

		lastCreateAt = user.CreateAt
	}

	worker.engine.Mutex.RLock()
	defer worker.engine.Mutex.RUnlock()

	if err := worker.engine.UserIndex.Batch(batch); err != nil {
		return 0, model.NewAppError("BleveIndexerWorker.BulkIndexUsers", "bleveengine.indexer.do_job.bulk_index_users.batch_error", nil, err.Error(), http.StatusInternalServerError)
	}
	return lastCreateAt, nil
}