1// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. 2// See LICENSE.txt for license information. 3 4package ebleveengine 5 6import ( 7 "context" 8 "net/http" 9 "strconv" 10 "time" 11 12 "github.com/mattermost/mattermost-server/v6/app" 13 "github.com/mattermost/mattermost-server/v6/jobs" 14 tjobs "github.com/mattermost/mattermost-server/v6/jobs/interfaces" 15 "github.com/mattermost/mattermost-server/v6/model" 16 "github.com/mattermost/mattermost-server/v6/services/searchengine/bleveengine" 17 "github.com/mattermost/mattermost-server/v6/shared/mlog" 18) 19 20const ( 21 BatchSize = 1000 22 TimeBetweenBatches = 100 23 EstimatedPostCount = 10000000 24 EstimatedFilesCount = 100000 25 EstimatedChannelCount = 100000 26 EstimatedUserCount = 10000 27) 28 29func init() { 30 app.RegisterJobsBleveIndexerInterface(func(s *app.Server) tjobs.IndexerJobInterface { 31 return &BleveIndexerInterfaceImpl{s} 32 }) 33} 34 35type BleveIndexerInterfaceImpl struct { 36 Server *app.Server 37} 38 39type BleveIndexerWorker struct { 40 name string 41 stop chan bool 42 stopped chan bool 43 jobs chan model.Job 44 jobServer *jobs.JobServer 45 46 engine *bleveengine.BleveEngine 47} 48 49func (bi *BleveIndexerInterfaceImpl) MakeWorker() model.Worker { 50 if bi.Server.SearchEngine.BleveEngine == nil { 51 return nil 52 } 53 return &BleveIndexerWorker{ 54 name: "BleveIndexer", 55 stop: make(chan bool, 1), 56 stopped: make(chan bool, 1), 57 jobs: make(chan model.Job), 58 jobServer: bi.Server.Jobs, 59 60 engine: bi.Server.SearchEngine.BleveEngine.(*bleveengine.BleveEngine), 61 } 62} 63 64type IndexingProgress struct { 65 Now time.Time 66 StartAtTime int64 67 EndAtTime int64 68 LastEntityTime int64 69 TotalPostsCount int64 70 DonePostsCount int64 71 DonePosts bool 72 TotalFilesCount int64 73 DoneFilesCount int64 74 DoneFiles bool 75 TotalChannelsCount int64 76 DoneChannelsCount int64 77 DoneChannels bool 78 TotalUsersCount int64 79 DoneUsersCount int64 80 DoneUsers bool 81} 82 83func (ip *IndexingProgress) CurrentProgress() int64 { 84 return (ip.DonePostsCount + ip.DoneChannelsCount + ip.DoneUsersCount + ip.DoneFilesCount) * 100 / (ip.TotalPostsCount + ip.TotalChannelsCount + ip.TotalUsersCount + ip.TotalFilesCount) 85} 86 87func (ip *IndexingProgress) IsDone() bool { 88 return ip.DonePosts && ip.DoneChannels && ip.DoneUsers && ip.DoneFiles 89} 90 91func (worker *BleveIndexerWorker) JobChannel() chan<- model.Job { 92 return worker.jobs 93} 94 95func (worker *BleveIndexerWorker) Run() { 96 mlog.Debug("Worker Started", mlog.String("workername", worker.name)) 97 98 defer func() { 99 mlog.Debug("Worker: Finished", mlog.String("workername", worker.name)) 100 worker.stopped <- true 101 }() 102 103 for { 104 select { 105 case <-worker.stop: 106 mlog.Debug("Worker: Received stop signal", mlog.String("workername", worker.name)) 107 return 108 case job := <-worker.jobs: 109 mlog.Debug("Worker: Received a new candidate job.", mlog.String("workername", worker.name)) 110 worker.DoJob(&job) 111 } 112 } 113} 114 115func (worker *BleveIndexerWorker) Stop() { 116 mlog.Debug("Worker Stopping", mlog.String("workername", worker.name)) 117 worker.stop <- true 118 <-worker.stopped 119} 120 121func (worker *BleveIndexerWorker) DoJob(job *model.Job) { 122 claimed, err := worker.jobServer.ClaimJob(job) 123 if err != nil { 124 mlog.Warn("Worker: Error occurred while trying to claim job", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err)) 125 return 126 } 127 if !claimed { 128 return 129 } 130 131 mlog.Info("Worker: Indexing job claimed by worker", mlog.String("workername", worker.name), mlog.String("job_id", job.Id)) 132 133 if !worker.engine.IsActive() { 134 appError := model.NewAppError("BleveIndexerWorker", "bleveengine.indexer.do_job.engine_inactive", nil, "", http.StatusInternalServerError) 135 if err := worker.jobServer.SetJobError(job, appError); err != nil { 136 mlog.Error("Worker: Failed to run job as ") 137 } 138 return 139 } 140 141 progress := IndexingProgress{ 142 Now: time.Now(), 143 DonePosts: false, 144 DoneChannels: false, 145 DoneUsers: false, 146 DoneFiles: false, 147 StartAtTime: 0, 148 EndAtTime: model.GetMillis(), 149 } 150 151 // Extract the start and end times, if they are set. 152 if startString, ok := job.Data["start_time"]; ok { 153 startInt, err := strconv.ParseInt(startString, 10, 64) 154 if err != nil { 155 mlog.Error("Worker: Failed to parse start_time for job", mlog.String("workername", worker.name), mlog.String("start_time", startString), mlog.String("job_id", job.Id), mlog.Err(err)) 156 appError := model.NewAppError("BleveIndexerWorker", "bleveengine.indexer.do_job.parse_start_time.error", nil, err.Error(), http.StatusInternalServerError) 157 if err := worker.jobServer.SetJobError(job, appError); err != nil { 158 mlog.Error("Worker: Failed to set job error", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err), mlog.NamedErr("set_error", appError)) 159 } 160 return 161 } 162 progress.StartAtTime = startInt 163 progress.LastEntityTime = progress.StartAtTime 164 } else { 165 // Set start time to oldest entity in the database. 166 // A user or a channel may be created before any post. 167 oldestEntityCreationTime, err := worker.jobServer.Store.Post().GetOldestEntityCreationTime() 168 if err != nil { 169 mlog.Error("Worker: Failed to fetch oldest entity for job.", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.String("start_time", startString), mlog.Err(err)) 170 appError := model.NewAppError("BleveIndexerWorker", "bleveengine.indexer.do_job.get_oldest_entity.error", nil, err.Error(), http.StatusInternalServerError) 171 if err := worker.jobServer.SetJobError(job, appError); err != nil { 172 mlog.Error("Worker: Failed to set job error", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err), mlog.NamedErr("set_error", appError)) 173 } 174 return 175 } 176 progress.StartAtTime = oldestEntityCreationTime 177 progress.LastEntityTime = progress.StartAtTime 178 } 179 180 if endString, ok := job.Data["end_time"]; ok { 181 endInt, err := strconv.ParseInt(endString, 10, 64) 182 if err != nil { 183 mlog.Error("Worker: Failed to parse end_time for job", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.String("end_time", endString), mlog.Err(err)) 184 appError := model.NewAppError("BleveIndexerWorker", "bleveengine.indexer.do_job.parse_end_time.error", nil, err.Error(), http.StatusInternalServerError) 185 if err := worker.jobServer.SetJobError(job, appError); err != nil { 186 mlog.Error("Worker: Failed to set job errorv", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err), mlog.NamedErr("set_error", appError)) 187 } 188 return 189 } 190 progress.EndAtTime = endInt 191 } 192 193 // Counting all posts may fail or timeout when the posts table is large. If this happens, log a warning, but carry 194 // on with the indexing job anyway. The only issue is that the progress % reporting will be inaccurate. 195 if count, err := worker.jobServer.Store.Post().AnalyticsPostCount("", false, false); err != nil { 196 mlog.Warn("Worker: Failed to fetch total post count for job. An estimated value will be used for progress reporting.", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err)) 197 progress.TotalPostsCount = EstimatedPostCount 198 } else { 199 progress.TotalPostsCount = count 200 } 201 202 // Same possible fail as above can happen when counting channels 203 if count, err := worker.jobServer.Store.Channel().AnalyticsTypeCount("", "O"); err != nil { 204 mlog.Warn("Worker: Failed to fetch total channel count for job. An estimated value will be used for progress reporting.", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err)) 205 progress.TotalChannelsCount = EstimatedChannelCount 206 } else { 207 progress.TotalChannelsCount = count 208 } 209 210 // Same possible fail as above can happen when counting users 211 if count, err := worker.jobServer.Store.User().Count(model.UserCountOptions{}); err != nil { 212 mlog.Warn("Worker: Failed to fetch total user count for job. An estimated value will be used for progress reporting.", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err)) 213 progress.TotalUsersCount = EstimatedUserCount 214 } else { 215 progress.TotalUsersCount = count 216 } 217 218 // Counting all files may fail or timeout when the file_info table is large. If this happens, log a warning, but carry 219 // on with the indexing job anyway. The only issue is that the progress % reporting will be inaccurate. 220 if count, err := worker.jobServer.Store.FileInfo().CountAll(); err != nil { 221 mlog.Warn("Worker: Failed to fetch total file info count for job. An estimated value will be used for progress reporting.", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err)) 222 progress.TotalFilesCount = EstimatedFilesCount 223 } else { 224 progress.TotalFilesCount = count 225 } 226 227 cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background()) 228 cancelWatcherChan := make(chan interface{}, 1) 229 go worker.jobServer.CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan) 230 231 defer cancelCancelWatcher() 232 233 for { 234 select { 235 case <-cancelWatcherChan: 236 mlog.Info("Worker: Indexing job has been canceled via CancellationWatcher", mlog.String("workername", worker.name), mlog.String("job_id", job.Id)) 237 if err := worker.jobServer.SetJobCanceled(job); err != nil { 238 mlog.Error("Worker: Failed to mark job as cancelled", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err)) 239 } 240 return 241 242 case <-worker.stop: 243 mlog.Info("Worker: Indexing has been canceled via Worker Stop", mlog.String("workername", worker.name), mlog.String("job_id", job.Id)) 244 if err := worker.jobServer.SetJobCanceled(job); err != nil { 245 mlog.Error("Worker: Failed to mark job as canceled", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err)) 246 } 247 return 248 249 case <-time.After(TimeBetweenBatches * time.Millisecond): 250 var err *model.AppError 251 if progress, err = worker.IndexBatch(progress); err != nil { 252 mlog.Error("Worker: Failed to index batch for job", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err)) 253 if err2 := worker.jobServer.SetJobError(job, err); err2 != nil { 254 mlog.Error("Worker: Failed to set job error", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err2), mlog.NamedErr("set_error", err)) 255 } 256 return 257 } 258 259 if err := worker.jobServer.SetJobProgress(job, progress.CurrentProgress()); err != nil { 260 mlog.Error("Worker: Failed to set progress for job", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err)) 261 if err2 := worker.jobServer.SetJobError(job, err); err2 != nil { 262 mlog.Error("Worker: Failed to set error for job", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err2), mlog.NamedErr("set_error", err)) 263 } 264 return 265 } 266 267 if progress.IsDone() { 268 if err := worker.jobServer.SetJobSuccess(job); err != nil { 269 mlog.Error("Worker: Failed to set success for job", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err)) 270 if err2 := worker.jobServer.SetJobError(job, err); err2 != nil { 271 mlog.Error("Worker: Failed to set error for job", mlog.String("workername", worker.name), mlog.String("job_id", job.Id), mlog.Err(err2), mlog.NamedErr("set_error", err)) 272 } 273 } 274 mlog.Info("Worker: Indexing job finished successfully", mlog.String("workername", worker.name), mlog.String("job_id", job.Id)) 275 return 276 } 277 } 278 } 279} 280 281func (worker *BleveIndexerWorker) IndexBatch(progress IndexingProgress) (IndexingProgress, *model.AppError) { 282 if !progress.DonePosts { 283 return worker.IndexPostsBatch(progress) 284 } 285 if !progress.DoneChannels { 286 return worker.IndexChannelsBatch(progress) 287 } 288 if !progress.DoneUsers { 289 return worker.IndexUsersBatch(progress) 290 } 291 if !progress.DoneFiles { 292 return worker.IndexFilesBatch(progress) 293 } 294 return progress, model.NewAppError("BleveIndexerWorker", "bleveengine.indexer.index_batch.nothing_left_to_index.error", nil, "", http.StatusInternalServerError) 295} 296 297func (worker *BleveIndexerWorker) IndexPostsBatch(progress IndexingProgress) (IndexingProgress, *model.AppError) { 298 endTime := progress.LastEntityTime + int64(*worker.jobServer.Config().BleveSettings.BulkIndexingTimeWindowSeconds*1000) 299 300 var posts []*model.PostForIndexing 301 302 tries := 0 303 for posts == nil { 304 var err error 305 posts, err = worker.jobServer.Store.Post().GetPostsBatchForIndexing(progress.LastEntityTime, endTime, BatchSize) 306 if err != nil { 307 if tries >= 10 { 308 return progress, model.NewAppError("IndexPostsBatch", "app.post.get_posts_batch_for_indexing.get.app_error", nil, err.Error(), http.StatusInternalServerError) 309 } 310 mlog.Warn("Failed to get posts batch for indexing. Retrying.", mlog.Err(err)) 311 312 // Wait a bit before trying again. 313 time.Sleep(15 * time.Second) 314 } 315 316 tries++ 317 } 318 319 newLastMessageTime, err := worker.BulkIndexPosts(posts, progress) 320 if err != nil { 321 return progress, err 322 } 323 324 // Due to the "endTime" parameter in the store query, we might get an incomplete batch before the end. In this 325 // case, set the "newLastMessageTime" to the endTime so we don't get stuck running the same query in a loop. 326 if len(posts) < BatchSize { 327 newLastMessageTime = endTime 328 } 329 330 // When to Stop: we index either until we pass a batch of messages where the last 331 // message is created at or after the specified end time when setting up the batch 332 // index, or until two consecutive full batches have the same end time of their final 333 // messages. This second case is safe as long as the assumption that the database 334 // cannot contain more messages with the same CreateAt time than the batch size holds. 335 if progress.EndAtTime <= newLastMessageTime { 336 progress.DonePosts = true 337 progress.LastEntityTime = progress.StartAtTime 338 } else if progress.LastEntityTime == newLastMessageTime && len(posts) == BatchSize { 339 mlog.Warn("More posts with the same CreateAt time were detected than the permitted batch size. Aborting indexing job.", mlog.Int64("CreateAt", newLastMessageTime), mlog.Int("Batch Size", BatchSize)) 340 progress.DonePosts = true 341 progress.LastEntityTime = progress.StartAtTime 342 } else { 343 progress.LastEntityTime = newLastMessageTime 344 } 345 346 progress.DonePostsCount += int64(len(posts)) 347 348 return progress, nil 349} 350 351func (worker *BleveIndexerWorker) BulkIndexPosts(posts []*model.PostForIndexing, progress IndexingProgress) (int64, *model.AppError) { 352 lastCreateAt := int64(0) 353 batch := worker.engine.PostIndex.NewBatch() 354 355 for _, post := range posts { 356 if post.DeleteAt == 0 { 357 searchPost := bleveengine.BLVPostFromPostForIndexing(post) 358 batch.Index(searchPost.Id, searchPost) 359 } else { 360 batch.Delete(post.Id) 361 } 362 363 lastCreateAt = post.CreateAt 364 } 365 366 worker.engine.Mutex.RLock() 367 defer worker.engine.Mutex.RUnlock() 368 369 if err := worker.engine.PostIndex.Batch(batch); err != nil { 370 return 0, model.NewAppError("BleveIndexerWorker.BulkIndexPosts", "bleveengine.indexer.do_job.bulk_index_posts.batch_error", nil, err.Error(), http.StatusInternalServerError) 371 } 372 return lastCreateAt, nil 373} 374 375func (worker *BleveIndexerWorker) IndexFilesBatch(progress IndexingProgress) (IndexingProgress, *model.AppError) { 376 endTime := progress.LastEntityTime + int64(*worker.jobServer.Config().BleveSettings.BulkIndexingTimeWindowSeconds*1000) 377 378 var files []*model.FileForIndexing 379 380 tries := 0 381 for files == nil { 382 var err error 383 files, err = worker.jobServer.Store.FileInfo().GetFilesBatchForIndexing(progress.LastEntityTime, endTime, BatchSize) 384 if err != nil { 385 if tries >= 10 { 386 return progress, model.NewAppError("IndexFilesBatch", "app.post.get_files_batch_for_indexing.get.app_error", nil, err.Error(), http.StatusInternalServerError) 387 } 388 mlog.Warn("Failed to get files batch for indexing. Retrying.", mlog.Err(err)) 389 390 // Wait a bit before trying again. 391 time.Sleep(15 * time.Second) 392 } 393 394 tries++ 395 } 396 397 newLastFileTime, err := worker.BulkIndexFiles(files, progress) 398 if err != nil { 399 return progress, err 400 } 401 402 // Due to the "endTime" parameter in the store query, we might get an incomplete batch before the end. In this 403 // case, set the "newLastFileTime" to the endTime so we don't get stuck running the same query in a loop. 404 if len(files) < BatchSize { 405 newLastFileTime = endTime 406 } 407 408 // When to Stop: we index either until we pass a batch of messages where the last 409 // message is created at or after the specified end time when setting up the batch 410 // index, or until two consecutive full batches have the same end time of their final 411 // messages. This second case is safe as long as the assumption that the database 412 // cannot contain more messages with the same CreateAt time than the batch size holds. 413 if progress.EndAtTime <= newLastFileTime { 414 progress.DoneFiles = true 415 progress.LastEntityTime = progress.StartAtTime 416 } else if progress.LastEntityTime == newLastFileTime && len(files) == BatchSize { 417 mlog.Warn("More files with the same CreateAt time were detected than the permitted batch size. Aborting indexing job.", mlog.Int64("CreateAt", newLastFileTime), mlog.Int("Batch Size", BatchSize)) 418 progress.DoneFiles = true 419 progress.LastEntityTime = progress.StartAtTime 420 } else { 421 progress.LastEntityTime = newLastFileTime 422 } 423 424 progress.DoneFilesCount += int64(len(files)) 425 426 return progress, nil 427} 428 429func (worker *BleveIndexerWorker) BulkIndexFiles(files []*model.FileForIndexing, progress IndexingProgress) (int64, *model.AppError) { 430 lastCreateAt := int64(0) 431 batch := worker.engine.FileIndex.NewBatch() 432 433 for _, file := range files { 434 if file.DeleteAt == 0 { 435 searchFile := bleveengine.BLVFileFromFileForIndexing(file) 436 batch.Index(searchFile.Id, searchFile) 437 } else { 438 batch.Delete(file.Id) 439 } 440 441 lastCreateAt = file.CreateAt 442 } 443 444 worker.engine.Mutex.RLock() 445 defer worker.engine.Mutex.RUnlock() 446 447 if err := worker.engine.FileIndex.Batch(batch); err != nil { 448 return 0, model.NewAppError("BleveIndexerWorker.BulkIndexPosts", "bleveengine.indexer.do_job.bulk_index_files.batch_error", nil, err.Error(), http.StatusInternalServerError) 449 } 450 return lastCreateAt, nil 451} 452 453func (worker *BleveIndexerWorker) IndexChannelsBatch(progress IndexingProgress) (IndexingProgress, *model.AppError) { 454 endTime := progress.LastEntityTime + int64(*worker.jobServer.Config().BleveSettings.BulkIndexingTimeWindowSeconds*1000) 455 456 var channels []*model.Channel 457 458 tries := 0 459 for channels == nil { 460 var nErr error 461 channels, nErr = worker.jobServer.Store.Channel().GetChannelsBatchForIndexing(progress.LastEntityTime, endTime, BatchSize) 462 if nErr != nil { 463 if tries >= 10 { 464 return progress, model.NewAppError("BleveIndexerWorker.IndexChannelsBatch", "app.channel.get_channels_batch_for_indexing.get.app_error", nil, nErr.Error(), http.StatusInternalServerError) 465 } 466 467 mlog.Warn("Failed to get channels batch for indexing. Retrying.", mlog.Err(nErr)) 468 469 // Wait a bit before trying again. 470 time.Sleep(15 * time.Second) 471 } 472 tries++ 473 } 474 475 newLastChannelTime, err := worker.BulkIndexChannels(channels, progress) 476 if err != nil { 477 return progress, err 478 } 479 480 // Due to the "endTime" parameter in the store query, we might get an incomplete batch before the end. In this 481 // case, set the "newLastChannelTime" to the endTime so we don't get stuck running the same query in a loop. 482 if len(channels) < BatchSize { 483 newLastChannelTime = endTime 484 } 485 486 // When to Stop: we index either until we pass a batch of channels where the last 487 // channel is created at or after the specified end time when setting up the batch 488 // index, or until two consecutive full batches have the same end time of their final 489 // channels. This second case is safe as long as the assumption that the database 490 // cannot contain more channels with the same CreateAt time than the batch size holds. 491 if progress.EndAtTime <= newLastChannelTime { 492 progress.DoneChannels = true 493 progress.LastEntityTime = progress.StartAtTime 494 } else if progress.LastEntityTime == newLastChannelTime && len(channels) == BatchSize { 495 mlog.Warn("More channels with the same CreateAt time were detected than the permitted batch size. Aborting indexing job.", mlog.Int64("CreateAt", newLastChannelTime), mlog.Int("Batch Size", BatchSize)) 496 progress.DoneChannels = true 497 progress.LastEntityTime = progress.StartAtTime 498 } else { 499 progress.LastEntityTime = newLastChannelTime 500 } 501 502 progress.DoneChannelsCount += int64(len(channels)) 503 504 return progress, nil 505} 506 507func (worker *BleveIndexerWorker) BulkIndexChannels(channels []*model.Channel, progress IndexingProgress) (int64, *model.AppError) { 508 lastCreateAt := int64(0) 509 batch := worker.engine.ChannelIndex.NewBatch() 510 511 for _, channel := range channels { 512 if channel.DeleteAt == 0 { 513 searchChannel := bleveengine.BLVChannelFromChannel(channel) 514 batch.Index(searchChannel.Id, searchChannel) 515 } else { 516 batch.Delete(channel.Id) 517 } 518 519 lastCreateAt = channel.CreateAt 520 } 521 522 worker.engine.Mutex.RLock() 523 defer worker.engine.Mutex.RUnlock() 524 525 if err := worker.engine.ChannelIndex.Batch(batch); err != nil { 526 return 0, model.NewAppError("BleveIndexerWorker.BulkIndexChannels", "bleveengine.indexer.do_job.bulk_index_channels.batch_error", nil, err.Error(), http.StatusInternalServerError) 527 } 528 return lastCreateAt, nil 529} 530 531func (worker *BleveIndexerWorker) IndexUsersBatch(progress IndexingProgress) (IndexingProgress, *model.AppError) { 532 endTime := progress.LastEntityTime + int64(*worker.jobServer.Config().BleveSettings.BulkIndexingTimeWindowSeconds*1000) 533 534 var users []*model.UserForIndexing 535 536 tries := 0 537 for users == nil { 538 if usersBatch, err := worker.jobServer.Store.User().GetUsersBatchForIndexing(progress.LastEntityTime, endTime, BatchSize); err != nil { 539 if tries >= 10 { 540 return progress, model.NewAppError("IndexUsersBatch", "app.user.get_users_batch_for_indexing.get_users.app_error", nil, err.Error(), http.StatusInternalServerError) 541 } 542 mlog.Warn("Failed to get users batch for indexing. Retrying.", mlog.Err(err)) 543 544 // Wait a bit before trying again. 545 time.Sleep(15 * time.Second) 546 } else { 547 users = usersBatch 548 } 549 550 tries++ 551 } 552 553 newLastUserTime, err := worker.BulkIndexUsers(users, progress) 554 if err != nil { 555 return progress, err 556 } 557 558 // Due to the "endTime" parameter in the store query, we might get an incomplete batch before the end. In this 559 // case, set the "newLastUserTime" to the endTime so we don't get stuck running the same query in a loop. 560 if len(users) < BatchSize { 561 newLastUserTime = endTime 562 } 563 564 // When to Stop: we index either until we pass a batch of users where the last 565 // user is created at or after the specified end time when setting up the batch 566 // index, or until two consecutive full batches have the same end time of their final 567 // users. This second case is safe as long as the assumption that the database 568 // cannot contain more users with the same CreateAt time than the batch size holds. 569 if progress.EndAtTime <= newLastUserTime { 570 progress.DoneUsers = true 571 progress.LastEntityTime = progress.StartAtTime 572 } else if progress.LastEntityTime == newLastUserTime && len(users) == BatchSize { 573 mlog.Warn("More users with the same CreateAt time were detected than the permitted batch size. Aborting indexing job.", mlog.Int64("CreateAt", newLastUserTime), mlog.Int("Batch Size", BatchSize)) 574 progress.DoneUsers = true 575 progress.LastEntityTime = progress.StartAtTime 576 } else { 577 progress.LastEntityTime = newLastUserTime 578 } 579 580 progress.DoneUsersCount += int64(len(users)) 581 582 return progress, nil 583} 584 585func (worker *BleveIndexerWorker) BulkIndexUsers(users []*model.UserForIndexing, progress IndexingProgress) (int64, *model.AppError) { 586 lastCreateAt := int64(0) 587 batch := worker.engine.UserIndex.NewBatch() 588 589 for _, user := range users { 590 if user.DeleteAt == 0 { 591 searchUser := bleveengine.BLVUserFromUserForIndexing(user) 592 batch.Index(searchUser.Id, searchUser) 593 } else { 594 batch.Delete(user.Id) 595 } 596 597 lastCreateAt = user.CreateAt 598 } 599 600 worker.engine.Mutex.RLock() 601 defer worker.engine.Mutex.RUnlock() 602 603 if err := worker.engine.UserIndex.Batch(batch); err != nil { 604 return 0, model.NewAppError("BleveIndexerWorker.BulkIndexUsers", "bleveengine.indexer.do_job.bulk_index_users.batch_error", nil, err.Error(), http.StatusInternalServerError) 605 } 606 return lastCreateAt, nil 607} 608