1// Copyright (c) 2018 Couchbase, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package scorch 16 17import ( 18 "encoding/json" 19 "fmt" 20 "io/ioutil" 21 "os" 22 "sync" 23 "sync/atomic" 24 "time" 25 26 "github.com/RoaringBitmap/roaring" 27 "github.com/blevesearch/bleve/analysis" 28 "github.com/blevesearch/bleve/document" 29 "github.com/blevesearch/bleve/index" 30 "github.com/blevesearch/bleve/index/scorch/segment" 31 "github.com/blevesearch/bleve/index/scorch/segment/zap" 32 "github.com/blevesearch/bleve/index/store" 33 "github.com/blevesearch/bleve/registry" 34 bolt "github.com/etcd-io/bbolt" 35) 36 37const Name = "scorch" 38 39const Version uint8 = 2 40 41var ErrClosed = fmt.Errorf("scorch closed") 42 43type Scorch struct { 44 nextSegmentID uint64 45 stats Stats 46 iStats internalStats 47 48 readOnly bool 49 version uint8 50 config map[string]interface{} 51 analysisQueue *index.AnalysisQueue 52 path string 53 54 unsafeBatch bool 55 56 rootLock sync.RWMutex 57 root *IndexSnapshot // holds 1 ref-count on the root 58 rootPersisted []chan error // closed when root is persisted 59 persistedCallbacks []index.BatchCallback 60 nextSnapshotEpoch uint64 61 eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC. 62 ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet. 63 64 numSnapshotsToKeep int 65 closeCh chan struct{} 66 introductions chan *segmentIntroduction 67 persists chan *persistIntroduction 68 merges chan *segmentMerge 69 introducerNotifier chan *epochWatcher 70 revertToSnapshots chan *snapshotReversion 71 persisterNotifier chan *epochWatcher 72 rootBolt *bolt.DB 73 asyncTasks sync.WaitGroup 74 75 onEvent func(event Event) 76 onAsyncError func(err error) 77 78 pauseLock sync.RWMutex 79 80 pauseCount uint64 81} 82 83type internalStats struct { 84 persistEpoch uint64 85 persistSnapshotSize uint64 86 mergeEpoch uint64 87 mergeSnapshotSize uint64 88 newSegBufBytesAdded uint64 89 newSegBufBytesRemoved uint64 90 analysisBytesAdded uint64 91 analysisBytesRemoved uint64 92} 93 94func NewScorch(storeName string, 95 config map[string]interface{}, 96 analysisQueue *index.AnalysisQueue) (index.Index, error) { 97 rv := &Scorch{ 98 version: Version, 99 config: config, 100 analysisQueue: analysisQueue, 101 nextSnapshotEpoch: 1, 102 closeCh: make(chan struct{}), 103 ineligibleForRemoval: map[string]bool{}, 104 } 105 rv.root = &IndexSnapshot{parent: rv, refs: 1, creator: "NewScorch"} 106 ro, ok := config["read_only"].(bool) 107 if ok { 108 rv.readOnly = ro 109 } 110 ub, ok := config["unsafe_batch"].(bool) 111 if ok { 112 rv.unsafeBatch = ub 113 } 114 ecbName, ok := config["eventCallbackName"].(string) 115 if ok { 116 rv.onEvent = RegistryEventCallbacks[ecbName] 117 } 118 aecbName, ok := config["asyncErrorCallbackName"].(string) 119 if ok { 120 rv.onAsyncError = RegistryAsyncErrorCallbacks[aecbName] 121 } 122 return rv, nil 123} 124 125func (s *Scorch) paused() uint64 { 126 s.pauseLock.Lock() 127 pc := s.pauseCount 128 s.pauseLock.Unlock() 129 return pc 130} 131 132func (s *Scorch) incrPause() { 133 s.pauseLock.Lock() 134 s.pauseCount++ 135 s.pauseLock.Unlock() 136} 137 138func (s *Scorch) decrPause() { 139 s.pauseLock.Lock() 140 s.pauseCount-- 141 s.pauseLock.Unlock() 142} 143 144func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) { 145 if s.onEvent != nil { 146 s.incrPause() 147 s.onEvent(Event{Kind: kind, Scorch: s, Duration: dur}) 148 s.decrPause() 149 } 150} 151 152func (s *Scorch) fireAsyncError(err error) { 153 if s.onAsyncError != nil { 154 s.onAsyncError(err) 155 } 156 atomic.AddUint64(&s.stats.TotOnErrors, 1) 157} 158 159func (s *Scorch) Open() error { 160 err := s.openBolt() 161 if err != nil { 162 return err 163 } 164 165 s.asyncTasks.Add(1) 166 go s.mainLoop() 167 168 if !s.readOnly && s.path != "" { 169 s.asyncTasks.Add(1) 170 go s.persisterLoop() 171 s.asyncTasks.Add(1) 172 go s.mergerLoop() 173 } 174 175 return nil 176} 177 178func (s *Scorch) openBolt() error { 179 var ok bool 180 s.path, ok = s.config["path"].(string) 181 if !ok { 182 return fmt.Errorf("must specify path") 183 } 184 if s.path == "" { 185 s.unsafeBatch = true 186 } 187 188 var rootBoltOpt *bolt.Options 189 if s.readOnly { 190 rootBoltOpt = &bolt.Options{ 191 ReadOnly: true, 192 } 193 } else { 194 if s.path != "" { 195 err := os.MkdirAll(s.path, 0700) 196 if err != nil { 197 return err 198 } 199 } 200 } 201 202 rootBoltPath := s.path + string(os.PathSeparator) + "root.bolt" 203 var err error 204 if s.path != "" { 205 s.rootBolt, err = bolt.Open(rootBoltPath, 0600, rootBoltOpt) 206 if err != nil { 207 return err 208 } 209 210 // now see if there is any existing state to load 211 err = s.loadFromBolt() 212 if err != nil { 213 _ = s.Close() 214 return err 215 } 216 } 217 218 atomic.StoreUint64(&s.stats.TotFileSegmentsAtRoot, uint64(len(s.root.segment))) 219 220 s.introductions = make(chan *segmentIntroduction) 221 s.persists = make(chan *persistIntroduction) 222 s.merges = make(chan *segmentMerge) 223 s.introducerNotifier = make(chan *epochWatcher, 1) 224 s.revertToSnapshots = make(chan *snapshotReversion) 225 s.persisterNotifier = make(chan *epochWatcher, 1) 226 227 if !s.readOnly && s.path != "" { 228 err := s.removeOldZapFiles() // Before persister or merger create any new files. 229 if err != nil { 230 _ = s.Close() 231 return err 232 } 233 } 234 235 s.numSnapshotsToKeep = NumSnapshotsToKeep 236 if v, ok := s.config["numSnapshotsToKeep"]; ok { 237 var t int 238 if t, err = parseToInteger(v); err != nil { 239 return fmt.Errorf("numSnapshotsToKeep parse err: %v", err) 240 } 241 if t > 0 { 242 s.numSnapshotsToKeep = t 243 } 244 } 245 246 return nil 247} 248 249func (s *Scorch) Close() (err error) { 250 startTime := time.Now() 251 defer func() { 252 s.fireEvent(EventKindClose, time.Since(startTime)) 253 }() 254 255 s.fireEvent(EventKindCloseStart, 0) 256 257 // signal to async tasks we want to close 258 close(s.closeCh) 259 // wait for them to close 260 s.asyncTasks.Wait() 261 // now close the root bolt 262 if s.rootBolt != nil { 263 err = s.rootBolt.Close() 264 s.rootLock.Lock() 265 if s.root != nil { 266 _ = s.root.DecRef() 267 } 268 s.root = nil 269 s.rootLock.Unlock() 270 } 271 272 return 273} 274 275func (s *Scorch) Update(doc *document.Document) error { 276 b := index.NewBatch() 277 b.Update(doc) 278 return s.Batch(b) 279} 280 281func (s *Scorch) Delete(id string) error { 282 b := index.NewBatch() 283 b.Delete(id) 284 return s.Batch(b) 285} 286 287// Batch applices a batch of changes to the index atomically 288func (s *Scorch) Batch(batch *index.Batch) (err error) { 289 start := time.Now() 290 291 defer func() { 292 s.fireEvent(EventKindBatchIntroduction, time.Since(start)) 293 }() 294 295 resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps)) 296 297 var numUpdates uint64 298 var numDeletes uint64 299 var numPlainTextBytes uint64 300 var ids []string 301 for docID, doc := range batch.IndexOps { 302 if doc != nil { 303 // insert _id field 304 doc.AddField(document.NewTextFieldCustom("_id", nil, []byte(doc.ID), document.IndexField|document.StoreField, nil)) 305 numUpdates++ 306 numPlainTextBytes += doc.NumPlainTextBytes() 307 } else { 308 numDeletes++ 309 } 310 ids = append(ids, docID) 311 } 312 313 // FIXME could sort ids list concurrent with analysis? 314 315 if numUpdates > 0 { 316 go func() { 317 for _, doc := range batch.IndexOps { 318 if doc != nil { 319 aw := index.NewAnalysisWork(s, doc, resultChan) 320 // put the work on the queue 321 s.analysisQueue.Queue(aw) 322 } 323 } 324 }() 325 } 326 327 // wait for analysis result 328 analysisResults := make([]*index.AnalysisResult, int(numUpdates)) 329 var itemsDeQueued uint64 330 var totalAnalysisSize int 331 for itemsDeQueued < numUpdates { 332 result := <-resultChan 333 resultSize := result.Size() 334 atomic.AddUint64(&s.iStats.analysisBytesAdded, uint64(resultSize)) 335 totalAnalysisSize += resultSize 336 analysisResults[itemsDeQueued] = result 337 itemsDeQueued++ 338 } 339 close(resultChan) 340 defer atomic.AddUint64(&s.iStats.analysisBytesRemoved, uint64(totalAnalysisSize)) 341 342 atomic.AddUint64(&s.stats.TotAnalysisTime, uint64(time.Since(start))) 343 344 indexStart := time.Now() 345 346 // notify handlers that we're about to introduce a segment 347 s.fireEvent(EventKindBatchIntroductionStart, 0) 348 349 var newSegment segment.Segment 350 var bufBytes uint64 351 if len(analysisResults) > 0 { 352 newSegment, bufBytes, err = zap.AnalysisResultsToSegmentBase(analysisResults, DefaultChunkFactor) 353 if err != nil { 354 return err 355 } 356 atomic.AddUint64(&s.iStats.newSegBufBytesAdded, bufBytes) 357 } else { 358 atomic.AddUint64(&s.stats.TotBatchesEmpty, 1) 359 } 360 361 err = s.prepareSegment(newSegment, ids, batch.InternalOps, batch.PersistedCallback()) 362 if err != nil { 363 if newSegment != nil { 364 _ = newSegment.Close() 365 } 366 atomic.AddUint64(&s.stats.TotOnErrors, 1) 367 } else { 368 atomic.AddUint64(&s.stats.TotUpdates, numUpdates) 369 atomic.AddUint64(&s.stats.TotDeletes, numDeletes) 370 atomic.AddUint64(&s.stats.TotBatches, 1) 371 atomic.AddUint64(&s.stats.TotIndexedPlainTextBytes, numPlainTextBytes) 372 } 373 374 atomic.AddUint64(&s.iStats.newSegBufBytesRemoved, bufBytes) 375 atomic.AddUint64(&s.stats.TotIndexTime, uint64(time.Since(indexStart))) 376 377 return err 378} 379 380func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string, 381 internalOps map[string][]byte, persistedCallback index.BatchCallback) error { 382 383 // new introduction 384 introduction := &segmentIntroduction{ 385 id: atomic.AddUint64(&s.nextSegmentID, 1), 386 data: newSegment, 387 ids: ids, 388 obsoletes: make(map[uint64]*roaring.Bitmap), 389 internal: internalOps, 390 applied: make(chan error), 391 persistedCallback: persistedCallback, 392 } 393 394 if !s.unsafeBatch { 395 introduction.persisted = make(chan error, 1) 396 } 397 398 // optimistically prepare obsoletes outside of rootLock 399 s.rootLock.RLock() 400 root := s.root 401 root.AddRef() 402 s.rootLock.RUnlock() 403 404 defer func() { _ = root.DecRef() }() 405 406 for _, seg := range root.segment { 407 delta, err := seg.segment.DocNumbers(ids) 408 if err != nil { 409 return err 410 } 411 introduction.obsoletes[seg.id] = delta 412 } 413 414 introStartTime := time.Now() 415 416 s.introductions <- introduction 417 418 // block until this segment is applied 419 err := <-introduction.applied 420 if err != nil { 421 return err 422 } 423 424 if introduction.persisted != nil { 425 err = <-introduction.persisted 426 } 427 428 introTime := uint64(time.Since(introStartTime)) 429 atomic.AddUint64(&s.stats.TotBatchIntroTime, introTime) 430 if atomic.LoadUint64(&s.stats.MaxBatchIntroTime) < introTime { 431 atomic.StoreUint64(&s.stats.MaxBatchIntroTime, introTime) 432 } 433 434 return err 435} 436 437func (s *Scorch) SetInternal(key, val []byte) error { 438 b := index.NewBatch() 439 b.SetInternal(key, val) 440 return s.Batch(b) 441} 442 443func (s *Scorch) DeleteInternal(key []byte) error { 444 b := index.NewBatch() 445 b.DeleteInternal(key) 446 return s.Batch(b) 447} 448 449// Reader returns a low-level accessor on the index data. Close it to 450// release associated resources. 451func (s *Scorch) Reader() (index.IndexReader, error) { 452 return s.currentSnapshot(), nil 453} 454 455func (s *Scorch) currentSnapshot() *IndexSnapshot { 456 s.rootLock.RLock() 457 rv := s.root 458 if rv != nil { 459 rv.AddRef() 460 } 461 s.rootLock.RUnlock() 462 return rv 463} 464 465func (s *Scorch) Stats() json.Marshaler { 466 return &s.stats 467} 468 469func (s *Scorch) diskFileStats(rootSegmentPaths map[string]struct{}) (uint64, 470 uint64, uint64) { 471 var numFilesOnDisk, numBytesUsedDisk, numBytesOnDiskByRoot uint64 472 if s.path != "" { 473 finfos, err := ioutil.ReadDir(s.path) 474 if err == nil { 475 for _, finfo := range finfos { 476 if !finfo.IsDir() { 477 numBytesUsedDisk += uint64(finfo.Size()) 478 numFilesOnDisk++ 479 if rootSegmentPaths != nil { 480 fname := s.path + string(os.PathSeparator) + finfo.Name() 481 if _, fileAtRoot := rootSegmentPaths[fname]; fileAtRoot { 482 numBytesOnDiskByRoot += uint64(finfo.Size()) 483 } 484 } 485 } 486 } 487 } 488 } 489 // if no root files path given, then consider all disk files. 490 if rootSegmentPaths == nil { 491 return numFilesOnDisk, numBytesUsedDisk, numBytesUsedDisk 492 } 493 494 return numFilesOnDisk, numBytesUsedDisk, numBytesOnDiskByRoot 495} 496 497func (s *Scorch) rootDiskSegmentsPaths() map[string]struct{} { 498 rv := make(map[string]struct{}, len(s.root.segment)) 499 for _, segmentSnapshot := range s.root.segment { 500 switch seg := segmentSnapshot.segment.(type) { 501 case *zap.Segment: 502 rv[seg.Path()] = struct{}{} 503 } 504 } 505 return rv 506} 507 508func (s *Scorch) StatsMap() map[string]interface{} { 509 m := s.stats.ToMap() 510 511 s.rootLock.RLock() 512 rootSegPaths := s.rootDiskSegmentsPaths() 513 m["CurFilesIneligibleForRemoval"] = uint64(len(s.ineligibleForRemoval)) 514 s.rootLock.RUnlock() 515 516 numFilesOnDisk, numBytesUsedDisk, numBytesOnDiskByRoot := s.diskFileStats(rootSegPaths) 517 518 m["CurOnDiskBytes"] = numBytesUsedDisk 519 m["CurOnDiskFiles"] = numFilesOnDisk 520 521 // TODO: consider one day removing these backwards compatible 522 // names for apps using the old names 523 m["updates"] = m["TotUpdates"] 524 m["deletes"] = m["TotDeletes"] 525 m["batches"] = m["TotBatches"] 526 m["errors"] = m["TotOnErrors"] 527 m["analysis_time"] = m["TotAnalysisTime"] 528 m["index_time"] = m["TotIndexTime"] 529 m["term_searchers_started"] = m["TotTermSearchersStarted"] 530 m["term_searchers_finished"] = m["TotTermSearchersFinished"] 531 m["num_plain_text_bytes_indexed"] = m["TotIndexedPlainTextBytes"] 532 m["num_items_introduced"] = m["TotIntroducedItems"] 533 m["num_items_persisted"] = m["TotPersistedItems"] 534 m["num_recs_to_persist"] = m["TotItemsToPersist"] 535 // total disk bytes found in index directory inclusive of older snapshots 536 m["num_bytes_used_disk"] = numBytesUsedDisk 537 // total disk bytes by the latest root index, exclusive of older snapshots 538 m["num_bytes_used_disk_by_root"] = numBytesOnDiskByRoot 539 m["num_files_on_disk"] = numFilesOnDisk 540 m["num_root_memorysegments"] = m["TotMemorySegmentsAtRoot"] 541 m["num_root_filesegments"] = m["TotFileSegmentsAtRoot"] 542 m["num_persister_nap_pause_completed"] = m["TotPersisterNapPauseCompleted"] 543 m["num_persister_nap_merger_break"] = m["TotPersisterMergerNapBreak"] 544 m["total_compaction_written_bytes"] = m["TotFileMergeWrittenBytes"] 545 546 return m 547} 548 549func (s *Scorch) Analyze(d *document.Document) *index.AnalysisResult { 550 rv := &index.AnalysisResult{ 551 Document: d, 552 Analyzed: make([]analysis.TokenFrequencies, len(d.Fields)+len(d.CompositeFields)), 553 Length: make([]int, len(d.Fields)+len(d.CompositeFields)), 554 } 555 556 for i, field := range d.Fields { 557 if field.Options().IsIndexed() { 558 fieldLength, tokenFreqs := field.Analyze() 559 rv.Analyzed[i] = tokenFreqs 560 rv.Length[i] = fieldLength 561 562 if len(d.CompositeFields) > 0 && field.Name() != "_id" { 563 // see if any of the composite fields need this 564 for _, compositeField := range d.CompositeFields { 565 compositeField.Compose(field.Name(), fieldLength, tokenFreqs) 566 } 567 } 568 } 569 } 570 571 return rv 572} 573 574func (s *Scorch) Advanced() (store.KVStore, error) { 575 return nil, nil 576} 577 578func (s *Scorch) AddEligibleForRemoval(epoch uint64) { 579 s.rootLock.Lock() 580 if s.root == nil || s.root.epoch != epoch { 581 s.eligibleForRemoval = append(s.eligibleForRemoval, epoch) 582 } 583 s.rootLock.Unlock() 584} 585 586func (s *Scorch) MemoryUsed() (memUsed uint64) { 587 indexSnapshot := s.currentSnapshot() 588 if indexSnapshot == nil { 589 return 590 } 591 592 defer func() { 593 _ = indexSnapshot.Close() 594 }() 595 596 // Account for current root snapshot overhead 597 memUsed += uint64(indexSnapshot.Size()) 598 599 // Account for snapshot that the persister may be working on 600 persistEpoch := atomic.LoadUint64(&s.iStats.persistEpoch) 601 persistSnapshotSize := atomic.LoadUint64(&s.iStats.persistSnapshotSize) 602 if persistEpoch != 0 && indexSnapshot.epoch > persistEpoch { 603 // the snapshot that the persister is working on isn't the same as 604 // the current snapshot 605 memUsed += persistSnapshotSize 606 } 607 608 // Account for snapshot that the merger may be working on 609 mergeEpoch := atomic.LoadUint64(&s.iStats.mergeEpoch) 610 mergeSnapshotSize := atomic.LoadUint64(&s.iStats.mergeSnapshotSize) 611 if mergeEpoch != 0 && indexSnapshot.epoch > mergeEpoch { 612 // the snapshot that the merger is working on isn't the same as 613 // the current snapshot 614 memUsed += mergeSnapshotSize 615 } 616 617 memUsed += (atomic.LoadUint64(&s.iStats.newSegBufBytesAdded) - 618 atomic.LoadUint64(&s.iStats.newSegBufBytesRemoved)) 619 620 memUsed += (atomic.LoadUint64(&s.iStats.analysisBytesAdded) - 621 atomic.LoadUint64(&s.iStats.analysisBytesRemoved)) 622 623 return memUsed 624} 625 626func (s *Scorch) markIneligibleForRemoval(filename string) { 627 s.rootLock.Lock() 628 s.ineligibleForRemoval[filename] = true 629 s.rootLock.Unlock() 630} 631 632func (s *Scorch) unmarkIneligibleForRemoval(filename string) { 633 s.rootLock.Lock() 634 delete(s.ineligibleForRemoval, filename) 635 s.rootLock.Unlock() 636} 637 638func init() { 639 registry.RegisterIndexType(Name, NewScorch) 640} 641 642func parseToInteger(i interface{}) (int, error) { 643 switch v := i.(type) { 644 case float64: 645 return int(v), nil 646 case int: 647 return v, nil 648 649 default: 650 return 0, fmt.Errorf("expects int or float64 value") 651 } 652} 653