/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Lucene.Net.Support;
using Analyzer = Lucene.Net.Analysis.Analyzer;
using Document = Lucene.Net.Documents.Document;
using AlreadyClosedException = Lucene.Net.Store.AlreadyClosedException;
using Directory = Lucene.Net.Store.Directory;
using ArrayUtil = Lucene.Net.Util.ArrayUtil;
using Constants = Lucene.Net.Util.Constants;
using IndexSearcher = Lucene.Net.Search.IndexSearcher;
using Query = Lucene.Net.Search.Query;
using Scorer = Lucene.Net.Search.Scorer;
using Similarity = Lucene.Net.Search.Similarity;
using Weight = Lucene.Net.Search.Weight;

namespace Lucene.Net.Index
{
    /// <summary> This class accepts multiple added documents and directly
    /// writes a single segment file.  It does this more
    /// efficiently than creating a single segment per document
    /// (with DocumentWriter) and doing standard merges on those
    /// segments.
    ///
    /// Each added document is passed to the <see cref="DocConsumer" />,
    /// which in turn processes the document and interacts with
    /// other consumers in the indexing chain.  Certain
    /// consumers, like <see cref="StoredFieldsWriter" /> and
    /// <see cref="TermVectorsTermsWriter" />, digest a document and
    /// immediately write bytes to the "doc store" files (ie,
    /// they do not consume RAM per document, except while they
    /// are processing the document).
    ///
    /// Other consumers, eg <see cref="FreqProxTermsWriter" /> and
    /// <see cref="NormsWriter" />, buffer bytes in RAM and flush only
    /// when a new segment is produced.
    /// Once we have used our allowed RAM buffer, or the number
    /// of added docs is large enough (in the case where we are
    /// flushing by doc count instead of RAM usage), we create a
    /// real segment and flush it to the Directory.
    ///
    /// Threads:
    ///
    /// Multiple threads are allowed into AddDocument at once.
    /// There is an initial synchronized call to GetThreadState
    /// which allocates a ThreadState for this thread.  The same
    /// thread will get the same ThreadState over time (thread
    /// affinity) so that if there are consistent patterns (for
    /// example each thread is indexing a different content
    /// source) then we make better use of RAM.  Then
    /// ProcessDocument is called on that ThreadState without
    /// synchronization (most of the "heavy lifting" is in this
    /// call).  Finally the synchronized FinishDocument is
    /// called to flush changes to the directory.
    ///
    /// When flush is called by IndexWriter we forcefully idle
    /// all threads and flush only once they are all idle.  This
    /// means you can call flush with a given thread even while
    /// other threads are actively adding/deleting documents.
    ///
    /// Exceptions:
    ///
    /// Because this class directly updates in-memory posting
    /// lists, and flushes stored fields and term vectors
    /// directly to files in the directory, there are certain
    /// limited times when an exception can corrupt this state.
    /// For example, a disk-full error while flushing stored fields
    /// leaves this file in a corrupt state.  Or, an OOM
    /// exception while appending to the in-memory posting lists
    /// can corrupt that posting list.  We call such exceptions
    /// "aborting exceptions".  In these cases we must call
    /// Abort() to discard all docs added since the last flush.
    ///
    /// All other exceptions ("non-aborting exceptions") can
    /// still partially update the index structures.  These
    /// updates are consistent, but they represent only a part
    /// of the document seen up until the exception was hit.
    /// When this happens, we immediately mark the document as
    /// deleted so that the document is always atomically ("all
    /// or none") added to the index.
    /// </summary>
    public sealed class DocumentsWriter : IDisposable
    {
        internal class AnonymousClassIndexingChain : IndexingChain
        {
            internal override DocConsumer GetChain(DocumentsWriter documentsWriter)
            {
                /*
                This is the current indexing chain:

                DocConsumer / DocConsumerPerThread
                  --> code: DocFieldProcessor / DocFieldProcessorPerThread
                    --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
                      --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
                        --> code: DocInverter / DocInverterPerThread / DocInverterPerField
                          --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
                            --> code: TermsHash / TermsHashPerThread / TermsHashPerField
                              --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
                                --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
                                --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
                          --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
                            --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
                        --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
                */

                // Build up indexing chain:

                TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter);
                TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();

                InvertedDocConsumer termsHash = new TermsHash(documentsWriter, true, freqProxWriter,
                    new TermsHash(documentsWriter, false, termVectorsWriter, null));
                NormsWriter normsWriter = new NormsWriter();
                DocInverter docInverter = new DocInverter(termsHash, normsWriter);
                return new DocFieldProcessor(documentsWriter, docInverter);
            }
        }
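
        // Illustrative sketch (not part of this class): the threading model described
        // above is only visible to callers through IndexWriter.  Several threads may
        // call IndexWriter.AddDocument concurrently and each calling thread is bound
        // to a ThreadState here.  Assuming an already-opened Directory "dir" and an
        // Analyzer "analyzer" (hypothetical names), usage might look like:
        //
        //   using (var writer = new IndexWriter(dir, analyzer, IndexWriter.MaxFieldLength.LIMITED))
        //   {
        //       Parallel.For(0, 4, _ =>
        //       {
        //           var doc = new Document();
        //           doc.Add(new Field("content", "some text", Field.Store.YES, Field.Index.ANALYZED));
        //           writer.AddDocument(doc);   // DocumentsWriter.UpdateDocument runs on the calling thread
        //       });
        //       writer.Commit();               // flushes the buffered docs to a segment
        //   }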

        private void InitBlock()
        {
            maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
            maxBufferedDeleteTerms = IndexWriter.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
            ramBufferSize = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024);
            waitQueuePauseBytes = (long) (ramBufferSize * 0.1);
            waitQueueResumeBytes = (long) (ramBufferSize * 0.05);
            freeTrigger = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024 * 1.05);
            freeLevel = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024 * 0.95);
            maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS;
            skipDocWriter = new SkipDocWriter();
            byteBlockAllocator = new ByteBlockAllocator(this, DocumentsWriter.BYTE_BLOCK_SIZE);
            perDocAllocator = new ByteBlockAllocator(this, DocumentsWriter.PER_DOC_BLOCK_SIZE);
            waitQueue = new WaitQueue(this);
        }

        internal IndexWriter writer;
        internal Directory directory;

        internal System.String segment;         // Current segment we are working on
        private System.String docStoreSegment;  // Current doc-store segment we are writing
        private int docStoreOffset;             // Current starting doc-store offset of current segment

        private int nextDocID;                  // Next docID to be added
        private int numDocsInRAM;               // # docs buffered in RAM
        internal int numDocsInStore;            // # docs written to doc stores

        // Max # ThreadState instances; if there are more threads
        // than this they share ThreadStates
        private const int MAX_THREAD_STATE = 5;
        private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
        private HashMap<ThreadClass, DocumentsWriterThreadState> threadBindings = new HashMap<ThreadClass, DocumentsWriterThreadState>();

        private int pauseThreads;               // Non-zero when we need all threads to pause (eg to flush)
        internal bool flushPending;             // True when a thread has decided to flush
        internal bool bufferIsFull;             // True when it's time to write segment
        private bool aborting;                  // True if an abort is pending

        private DocFieldProcessor docFieldProcessor;

        internal System.IO.StreamWriter infoStream;
        internal int maxFieldLength;
        internal Similarity similarity;

        internal IList<string> newFiles;

        internal class DocState
        {
            internal DocumentsWriter docWriter;
            internal Analyzer analyzer;
            internal int maxFieldLength;
            internal System.IO.StreamWriter infoStream;
            internal Similarity similarity;
            internal int docID;
            internal Document doc;
            internal System.String maxTermPrefix;

            // Only called by asserts
            public bool TestPoint(System.String name)
            {
                return docWriter.writer.TestPoint(name);
            }

            public void Clear()
            {
                // don't hold onto doc nor analyzer, in case it is largish:
                doc = null;
                analyzer = null;
            }
        }

        /// <summary>Consumer returns this on each doc.  This holds any
        /// state that must be flushed synchronized "in docID
        /// order".  We gather these and flush them in order.
        /// </summary>
        internal abstract class DocWriter
        {
            internal DocWriter next;
            internal int docID;

            public abstract void Finish();
            public abstract void Abort();
            public abstract long SizeInBytes();

            internal void SetNext(DocWriter next)
            {
                this.next = next;
            }
        }

        /*
         * Create and return a new DocWriterBuffer.
         */
        internal PerDocBuffer NewPerDocBuffer()
        {
            return new PerDocBuffer(this);
        }

        /*
         * RAMFile buffer for DocWriters.
         */
        internal class PerDocBuffer : Lucene.Net.Store.RAMFile
        {
            DocumentsWriter enclosingInstance;

            public PerDocBuffer(DocumentsWriter enclosingInstance)
            {
                this.enclosingInstance = enclosingInstance;
            }

            /*
             * Allocate bytes used from shared pool.
             */
            public override byte[] NewBuffer(int size)
            {
                System.Diagnostics.Debug.Assert(size == PER_DOC_BLOCK_SIZE);
                return enclosingInstance.perDocAllocator.GetByteBlock(false);
            }

            /*
             * Recycle the bytes used.
             */
            internal void Recycle()
            {
                lock (this)
                {
                    if (buffers.Count > 0)
                    {
                        Length = 0;

                        // Recycle the blocks
                        enclosingInstance.perDocAllocator.RecycleByteBlocks(buffers);
                        buffers.Clear();
                        sizeInBytes = 0;

                        System.Diagnostics.Debug.Assert(NumBuffers() == 0);
                    }
                }
            }
        }

        /// <summary> The IndexingChain must define the <see cref="GetChain(DocumentsWriter)" /> method
        /// which returns the DocConsumer that the DocumentsWriter calls to process the
        /// documents.
        /// </summary>
        internal abstract class IndexingChain
        {
            internal abstract DocConsumer GetChain(DocumentsWriter documentsWriter);
        }

        internal static readonly IndexingChain DefaultIndexingChain;

        internal DocConsumer consumer;

        // Deletes done after the last flush; these are discarded
        // on abort
        private BufferedDeletes deletesInRAM = new BufferedDeletes(false);

        // Deletes done before the last flush; these are still
        // kept on abort
        private BufferedDeletes deletesFlushed = new BufferedDeletes(true);

        // The max number of delete terms that can be buffered before
        // they must be flushed to disk.
        private int maxBufferedDeleteTerms;

        // How much RAM we can use before flushing.  This is 0 if
        // we are flushing by doc count instead.
        private long ramBufferSize;
        private long waitQueuePauseBytes;
        private long waitQueueResumeBytes;

        // If we've allocated 5% over our RAM budget, we then
        // free down to 95%
        private long freeTrigger;
        private long freeLevel;
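
        // Worked example of the thresholds above, assuming the stock default of
        // IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB = 16.0 (verify against your build):
        //   ramBufferSize        = 16.0 MB (16,777,216 bytes)  -- flush trigger
        //   freeTrigger  (105%) ~= 16.8 MB  -- above this, start freeing pooled blocks
        //   freeLevel    ( 95%) ~= 15.2 MB  -- stop freeing once allocation drops below this
        //   waitQueuePauseBytes  (10%) ~= 1.6 MB  -- pause adding threads when this much is queued for writing
        //   waitQueueResumeBytes ( 5%) ~= 0.8 MB  -- resume once the wait queue drains below this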

        // Flush @ this number of docs.  If ramBufferSize is
        // non-zero we will flush by RAM usage instead.
        private int maxBufferedDocs;

        private int flushedDocCount; // How many docs already flushed to index

        internal void UpdateFlushedDocCount(int n)
        {
            lock (this)
            {
                flushedDocCount += n;
            }
        }

        internal int GetFlushedDocCount()
        {
            lock (this)
            {
                return flushedDocCount;
            }
        }

        internal void SetFlushedDocCount(int n)
        {
            lock (this)
            {
                flushedDocCount = n;
            }
        }

        private bool closed;

        internal DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain indexingChain)
        {
            InitBlock();
            this.directory = directory;
            this.writer = writer;
            this.similarity = writer.Similarity;
            flushedDocCount = writer.MaxDoc();

            consumer = indexingChain.GetChain(this);
            if (consumer is DocFieldProcessor)
            {
                docFieldProcessor = (DocFieldProcessor) consumer;
            }
        }

        /// <summary>Returns true if any of the fields in the current
        /// buffered docs have omitTermFreqAndPositions==false
        /// </summary>
        internal bool HasProx()
        {
            return (docFieldProcessor != null) ? docFieldProcessor.fieldInfos.HasProx() : true;
        }

        /// <summary>If non-null, various details of indexing are printed
        /// here.
        /// </summary>
        internal void SetInfoStream(System.IO.StreamWriter infoStream)
        {
            lock (this)
            {
                this.infoStream = infoStream;
                for (int i = 0; i < threadStates.Length; i++)
                    threadStates[i].docState.infoStream = infoStream;
            }
        }

        internal void SetMaxFieldLength(int maxFieldLength)
        {
            lock (this)
            {
                this.maxFieldLength = maxFieldLength;
                for (int i = 0; i < threadStates.Length; i++)
                    threadStates[i].docState.maxFieldLength = maxFieldLength;
            }
        }

        internal void SetSimilarity(Similarity similarity)
        {
            lock (this)
            {
                this.similarity = similarity;
                for (int i = 0; i < threadStates.Length; i++)
                    threadStates[i].docState.similarity = similarity;
            }
        }

        /// <summary>Set how much RAM we can use before flushing. </summary>
        internal void SetRAMBufferSizeMB(double mb)
        {
            lock (this)
            {
                if (mb == IndexWriter.DISABLE_AUTO_FLUSH)
                {
                    ramBufferSize = IndexWriter.DISABLE_AUTO_FLUSH;
                    waitQueuePauseBytes = 4 * 1024 * 1024;
                    waitQueueResumeBytes = 2 * 1024 * 1024;
                }
                else
                {
                    ramBufferSize = (long) (mb * 1024 * 1024);
                    waitQueuePauseBytes = (long) (ramBufferSize * 0.1);
                    waitQueueResumeBytes = (long) (ramBufferSize * 0.05);
                    freeTrigger = (long) (1.05 * ramBufferSize);
                    freeLevel = (long) (0.95 * ramBufferSize);
                }
            }
        }

        internal double GetRAMBufferSizeMB()
        {
            lock (this)
            {
                if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH)
                {
                    return ramBufferSize;
                }
                else
                {
                    return ramBufferSize / 1024.0 / 1024.0;
                }
            }
        }
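
        // Sketch of how the two flush policies are normally configured from the public
        // API ("writer" is a hypothetical IndexWriter instance, values are arbitrary;
        // check the setter names against your version of the IndexWriter API):
        //
        //   writer.SetRAMBufferSizeMB(32.0);                            // flush by RAM usage...
        //   writer.SetMaxBufferedDocs(IndexWriter.DISABLE_AUTO_FLUSH);  // ...not by doc count
        //
        // or, to flush purely by document count:
        //
        //   writer.SetRAMBufferSizeMB(IndexWriter.DISABLE_AUTO_FLUSH);
        //   writer.SetMaxBufferedDocs(1000);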

        /// <summary>Gets or sets max buffered docs, which means we will flush by
        /// doc count instead of by RAM usage.
        /// </summary>
        internal int MaxBufferedDocs
        {
            get { return maxBufferedDocs; }
            set { maxBufferedDocs = value; }
        }

        /// <summary>Get current segment name we are writing. </summary>
        internal string Segment
        {
            get { return segment; }
        }

        /// <summary>Returns how many docs are currently buffered in RAM. </summary>
        internal int NumDocsInRAM
        {
            get { return numDocsInRAM; }
        }

        /// <summary>Returns the current doc store segment we are writing
        /// to.
        /// </summary>
        internal string DocStoreSegment
        {
            get
            {
                lock (this)
                {
                    return docStoreSegment;
                }
            }
        }

        /// <summary>Returns the doc offset into the shared doc store for
        /// the current buffered docs.
        /// </summary>
        internal int DocStoreOffset
        {
            get { return docStoreOffset; }
        }
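
        // Sketch of the shared doc store layout (illustrative numbers only): several
        // segments flushed from the same writer session can share one set of stored
        // fields/term vectors files.  If segment _0 is flushed with 100 docs and then
        // segment _1 with 50 docs, both sharing doc store "_0":
        //   segment _0: docStoreSegment = "_0", DocStoreOffset = 0
        //   segment _1: docStoreSegment = "_0", DocStoreOffset = 100
        // CloseDocStore() below ends the shared store so the next segment starts a new one.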

        /// <summary>Closes the current open doc stores and returns the doc
        /// store segment name.  This returns null if there are
        /// no buffered documents.
        /// </summary>
        internal System.String CloseDocStore()
        {
            lock (this)
            {
                System.Diagnostics.Debug.Assert(AllThreadsIdle());

                if (infoStream != null)
                    Message("closeDocStore: " + openFiles.Count + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);

                bool success = false;

                try
                {
                    InitFlushState(true);
                    closedFiles.Clear();

                    consumer.CloseDocStore(flushState);
                    System.Diagnostics.Debug.Assert(0 == openFiles.Count);

                    System.String s = docStoreSegment;
                    docStoreSegment = null;
                    docStoreOffset = 0;
                    numDocsInStore = 0;
                    success = true;
                    return s;
                }
                finally
                {
                    if (!success)
                    {
                        Abort();
                    }
                }
            }
        }

        private ICollection<string> abortedFiles; // List of files that were written before last abort()

        private SegmentWriteState flushState;

        internal ICollection<string> AbortedFiles()
        {
            return abortedFiles;
        }

        internal void Message(System.String message)
        {
            if (infoStream != null)
                writer.Message("DW: " + message);
        }

        internal IList<string> openFiles = new List<string>();
        internal IList<string> closedFiles = new List<string>();

        /* Returns Collection of files in use by this instance,
         * including any flushed segments. */
        internal IList<string> OpenFiles()
        {
            lock (this)
            {
                // ToArray returns a copy
                return openFiles.ToArray();
            }
        }

        internal IList<string> ClosedFiles()
        {
            lock (this)
            {
                // ToArray returns a copy
                return closedFiles.ToArray();
            }
        }

        internal void AddOpenFile(System.String name)
        {
            lock (this)
            {
                System.Diagnostics.Debug.Assert(!openFiles.Contains(name));
                openFiles.Add(name);
            }
        }

        internal void RemoveOpenFile(System.String name)
        {
            lock (this)
            {
                System.Diagnostics.Debug.Assert(openFiles.Contains(name));
                openFiles.Remove(name);
                closedFiles.Add(name);
            }
        }

        internal void SetAborting()
        {
            lock (this)
            {
                aborting = true;
            }
        }

        /// <summary>Called if we hit an exception at a bad time (when
        /// updating the index files) and must discard all
        /// currently buffered docs.  This resets our state,
        /// discarding any docs added since last flush.
        /// </summary>
        internal void Abort()
        {
            lock (this)
            {
                try
                {
                    if (infoStream != null)
                    {
                        Message("docWriter: now abort");
                    }

                    // Forcefully remove waiting ThreadStates from line
                    waitQueue.Abort();

                    // Wait for all other threads to finish with
                    // DocumentsWriter:
                    PauseAllThreads();

                    try
                    {
                        System.Diagnostics.Debug.Assert(0 == waitQueue.numWaiting);

                        waitQueue.waitingBytes = 0;

                        try
                        {
                            abortedFiles = OpenFiles();
                        }
                        catch (System.Exception)
                        {
                            abortedFiles = null;
                        }

                        deletesInRAM.Clear();
                        deletesFlushed.Clear();
                        openFiles.Clear();

                        for (int i = 0; i < threadStates.Length; i++)
                            try
                            {
                                threadStates[i].consumer.Abort();
                            }
                            catch (System.Exception)
                            {
                            }

                        try
                        {
                            consumer.Abort();
                        }
                        catch (System.Exception)
                        {
                        }

                        docStoreSegment = null;
                        numDocsInStore = 0;
                        docStoreOffset = 0;

                        // Reset all postings data
                        DoAfterFlush();
                    }
                    finally
                    {
                        ResumeAllThreads();
                    }
                }
                finally
                {
                    aborting = false;
                    System.Threading.Monitor.PulseAll(this);
                    if (infoStream != null)
                    {
                        Message("docWriter: done abort; abortedFiles=" + abortedFiles);
                    }
                }
            }
        }

        /// <summary>Reset after a flush </summary>
        private void DoAfterFlush()
        {
            // All ThreadStates should be idle when we are called
            System.Diagnostics.Debug.Assert(AllThreadsIdle());
            threadBindings.Clear();
            waitQueue.Reset();
            segment = null;
            numDocsInRAM = 0;
            nextDocID = 0;
            bufferIsFull = false;
            flushPending = false;
            for (int i = 0; i < threadStates.Length; i++)
                threadStates[i].DoAfterFlush();
            numBytesUsed = 0;
        }

        // Returns true if an abort is in progress
        internal bool PauseAllThreads()
        {
            lock (this)
            {
                pauseThreads++;
                while (!AllThreadsIdle())
                {
                    System.Threading.Monitor.Wait(this);
                }

                return aborting;
            }
        }

        internal void ResumeAllThreads()
        {
            lock (this)
            {
                pauseThreads--;
                System.Diagnostics.Debug.Assert(pauseThreads >= 0);
                if (0 == pauseThreads)
                    System.Threading.Monitor.PulseAll(this);
            }
        }

        private bool AllThreadsIdle()
        {
            lock (this)
            {
                for (int i = 0; i < threadStates.Length; i++)
                    if (!threadStates[i].isIdle)
                        return false;
                return true;
            }
        }

        internal bool AnyChanges
        {
            get
            {
                lock (this)
                {
                    return numDocsInRAM != 0 || deletesInRAM.numTerms != 0 || deletesInRAM.docIDs.Count != 0 ||
                           deletesInRAM.queries.Count != 0;
                }
            }
        }

        private void InitFlushState(bool onlyDocStore)
        {
            lock (this)
            {
                InitSegmentName(onlyDocStore);
                flushState = new SegmentWriteState(this, directory, segment, docStoreSegment, numDocsInRAM, numDocsInStore, writer.TermIndexInterval);
            }
        }

        /// <summary>Flush all pending docs to a new segment </summary>
        internal int Flush(bool closeDocStore)
        {
            lock (this)
            {
                System.Diagnostics.Debug.Assert(AllThreadsIdle());

                System.Diagnostics.Debug.Assert(numDocsInRAM > 0);

                System.Diagnostics.Debug.Assert(nextDocID == numDocsInRAM);
System.Diagnostics.Debug.Assert(waitQueue.numWaiting == 0); 758 System.Diagnostics.Debug.Assert(waitQueue.waitingBytes == 0); 759 760 InitFlushState(false); 761 762 docStoreOffset = numDocsInStore; 763 764 if (infoStream != null) 765 Message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM); 766 767 bool success = false; 768 769 try 770 { 771 772 if (closeDocStore) 773 { 774 System.Diagnostics.Debug.Assert(flushState.docStoreSegmentName != null); 775 System.Diagnostics.Debug.Assert(flushState.docStoreSegmentName.Equals(flushState.segmentName)); 776 CloseDocStore(); 777 flushState.numDocsInStore = 0; 778 } 779 780 ICollection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>(); 781 for (int i = 0; i < threadStates.Length; i++) 782 threads.Add(threadStates[i].consumer); 783 consumer.Flush(threads, flushState); 784 785 if (infoStream != null) 786 { 787 SegmentInfo si = new SegmentInfo(flushState.segmentName, flushState.numDocs, directory); 788 long newSegmentSize = si.SizeInBytes(); 789 System.String message = System.String.Format(nf, " oldRAMSize={0:d} newFlushedSize={1:d} docs/MB={2:f} new/old={3:%}", 790 new System.Object[] { numBytesUsed, newSegmentSize, (numDocsInRAM / (newSegmentSize / 1024.0 / 1024.0)), (100.0 * newSegmentSize / numBytesUsed) }); 791 Message(message); 792 } 793 794 flushedDocCount += flushState.numDocs; 795 796 DoAfterFlush(); 797 798 success = true; 799 } 800 finally 801 { 802 if (!success) 803 { 804 Abort(); 805 } 806 } 807 808 System.Diagnostics.Debug.Assert(waitQueue.waitingBytes == 0); 809 810 return flushState.numDocs; 811 } 812 } 813 GetFlushedFiles()814 internal ICollection<string> GetFlushedFiles() 815 { 816 return flushState.flushedFiles; 817 } 818 819 /// <summary>Build compound file for the segment we just flushed </summary> CreateCompoundFile(System.String segment)820 internal void CreateCompoundFile(System.String segment) 821 { 822 823 CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION); 824 foreach(string flushedFile in flushState.flushedFiles) 825 { 826 cfsWriter.AddFile(flushedFile); 827 } 828 829 // Perform the merge 830 cfsWriter.Close(); 831 } 832 833 /// <summary>Set flushPending if it is not already set and returns 834 /// whether it was set. This is used by IndexWriter to 835 /// trigger a single flush even when multiple threads are 836 /// trying to do so. 
837 /// </summary> SetFlushPending()838 internal bool SetFlushPending() 839 { 840 lock (this) 841 { 842 if (flushPending) 843 return false; 844 else 845 { 846 flushPending = true; 847 return true; 848 } 849 } 850 } 851 ClearFlushPending()852 internal void ClearFlushPending() 853 { 854 lock (this) 855 { 856 flushPending = false; 857 } 858 } 859 PushDeletes()860 internal void PushDeletes() 861 { 862 lock (this) 863 { 864 deletesFlushed.Update(deletesInRAM); 865 } 866 } 867 Dispose()868 public void Dispose() 869 { 870 // Move to protected method if class becomes unsealed 871 lock (this) 872 { 873 closed = true; 874 System.Threading.Monitor.PulseAll(this); 875 } 876 } 877 InitSegmentName(bool onlyDocStore)878 internal void InitSegmentName(bool onlyDocStore) 879 { 880 lock (this) 881 { 882 if (segment == null && (!onlyDocStore || docStoreSegment == null)) 883 { 884 segment = writer.NewSegmentName(); 885 System.Diagnostics.Debug.Assert(numDocsInRAM == 0); 886 } 887 if (docStoreSegment == null) 888 { 889 docStoreSegment = segment; 890 System.Diagnostics.Debug.Assert(numDocsInStore == 0); 891 } 892 } 893 } 894 895 /// <summary>Returns a free (idle) ThreadState that may be used for 896 /// indexing this one document. This call also pauses if a 897 /// flush is pending. If delTerm is non-null then we 898 /// buffer this deleted term after the thread state has 899 /// been acquired. 900 /// </summary> GetThreadState(Document doc, Term delTerm)901 internal DocumentsWriterThreadState GetThreadState(Document doc, Term delTerm) 902 { 903 lock (this) 904 { 905 906 // First, find a thread state. If this thread already 907 // has affinity to a specific ThreadState, use that one 908 // again. 909 DocumentsWriterThreadState state = threadBindings[ThreadClass.Current()]; 910 if (state == null) 911 { 912 913 // First time this thread has called us since last 914 // flush. 
Find the least loaded thread state: 915 DocumentsWriterThreadState minThreadState = null; 916 for (int i = 0; i < threadStates.Length; i++) 917 { 918 DocumentsWriterThreadState ts = threadStates[i]; 919 if (minThreadState == null || ts.numThreads < minThreadState.numThreads) 920 minThreadState = ts; 921 } 922 if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.Length >= MAX_THREAD_STATE)) 923 { 924 state = minThreadState; 925 state.numThreads++; 926 } 927 else 928 { 929 // Just create a new "private" thread state 930 DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1 + threadStates.Length]; 931 if (threadStates.Length > 0) 932 Array.Copy(threadStates, 0, newArray, 0, threadStates.Length); 933 state = newArray[threadStates.Length] = new DocumentsWriterThreadState(this); 934 threadStates = newArray; 935 } 936 threadBindings[ThreadClass.Current()] = state; 937 } 938 939 // Next, wait until my thread state is idle (in case 940 // it's shared with other threads) and for threads to 941 // not be paused nor a flush pending: 942 WaitReady(state); 943 944 // Allocate segment name if this is the first doc since 945 // last flush: 946 InitSegmentName(false); 947 948 state.isIdle = false; 949 950 bool success = false; 951 try 952 { 953 state.docState.docID = nextDocID; 954 955 System.Diagnostics.Debug.Assert(writer.TestPoint("DocumentsWriter.ThreadState.init start")); 956 957 if (delTerm != null) 958 { 959 AddDeleteTerm(delTerm, state.docState.docID); 960 state.doFlushAfter = TimeToFlushDeletes(); 961 } 962 963 System.Diagnostics.Debug.Assert(writer.TestPoint("DocumentsWriter.ThreadState.init after delTerm")); 964 965 nextDocID++; 966 numDocsInRAM++; 967 968 // We must at this point commit to flushing to ensure we 969 // always get N docs when we flush by doc count, even if 970 // > 1 thread is adding documents: 971 if (!flushPending && maxBufferedDocs != IndexWriter.DISABLE_AUTO_FLUSH && numDocsInRAM >= maxBufferedDocs) 972 { 973 flushPending = true; 974 state.doFlushAfter = true; 975 } 976 977 success = true; 978 } 979 finally 980 { 981 if (!success) 982 { 983 // Forcefully idle this ThreadState: 984 state.isIdle = true; 985 System.Threading.Monitor.PulseAll(this); 986 if (state.doFlushAfter) 987 { 988 state.doFlushAfter = false; 989 flushPending = false; 990 } 991 } 992 } 993 994 return state; 995 } 996 } 997 998 /// <summary>Returns true if the caller (IndexWriter) should now 999 /// flush. 
1000 /// </summary> AddDocument(Document doc, Analyzer analyzer)1001 internal bool AddDocument(Document doc, Analyzer analyzer) 1002 { 1003 return UpdateDocument(doc, analyzer, null); 1004 } 1005 UpdateDocument(Term t, Document doc, Analyzer analyzer)1006 internal bool UpdateDocument(Term t, Document doc, Analyzer analyzer) 1007 { 1008 return UpdateDocument(doc, analyzer, t); 1009 } 1010 UpdateDocument(Document doc, Analyzer analyzer, Term delTerm)1011 internal bool UpdateDocument(Document doc, Analyzer analyzer, Term delTerm) 1012 { 1013 1014 // This call is synchronized but fast 1015 DocumentsWriterThreadState state = GetThreadState(doc, delTerm); 1016 1017 DocState docState = state.docState; 1018 docState.doc = doc; 1019 docState.analyzer = analyzer; 1020 1021 bool doReturnFalse = false; // {{Aroush-2.9}} to handle return from finally clause 1022 1023 bool success = false; 1024 try 1025 { 1026 // This call is not synchronized and does all the 1027 // work 1028 DocWriter perDoc; 1029 try 1030 { 1031 perDoc = state.consumer.ProcessDocument(); 1032 } 1033 finally 1034 { 1035 docState.Clear(); 1036 } 1037 // This call is synchronized but fast 1038 FinishDocument(state, perDoc); 1039 success = true; 1040 } 1041 finally 1042 { 1043 if (!success) 1044 { 1045 lock (this) 1046 { 1047 1048 if (aborting) 1049 { 1050 state.isIdle = true; 1051 System.Threading.Monitor.PulseAll(this); 1052 Abort(); 1053 } 1054 else 1055 { 1056 skipDocWriter.docID = docState.docID; 1057 bool success2 = false; 1058 try 1059 { 1060 waitQueue.Add(skipDocWriter); 1061 success2 = true; 1062 } 1063 finally 1064 { 1065 if (!success2) 1066 { 1067 state.isIdle = true; 1068 System.Threading.Monitor.PulseAll(this); 1069 Abort(); 1070 // return false; // {{Aroush-2.9}} this 'return false' is move to outside finally 1071 doReturnFalse = true; 1072 } 1073 } 1074 1075 if (!doReturnFalse) // {{Aroush-2.9}} added because of the above 'return false' removal 1076 { 1077 state.isIdle = true; 1078 System.Threading.Monitor.PulseAll(this); 1079 1080 // If this thread state had decided to flush, we 1081 // must clear it so another thread can flush 1082 if (state.doFlushAfter) 1083 { 1084 state.doFlushAfter = false; 1085 flushPending = false; 1086 System.Threading.Monitor.PulseAll(this); 1087 } 1088 1089 // Immediately mark this document as deleted 1090 // since likely it was partially added. 
This 1091 // keeps indexing as "all or none" (atomic) when 1092 // adding a document: 1093 AddDeleteDocID(state.docState.docID); 1094 } 1095 } 1096 } 1097 } 1098 } 1099 1100 if (doReturnFalse) // {{Aroush-2.9}} see comment abouve 1101 { 1102 return false; 1103 } 1104 1105 return state.doFlushAfter || TimeToFlushDeletes(); 1106 } 1107 1108 // for testing GetNumBufferedDeleteTerms()1109 internal int GetNumBufferedDeleteTerms() 1110 { 1111 lock (this) 1112 { 1113 return deletesInRAM.numTerms; 1114 } 1115 } 1116 1117 // for testing GetBufferedDeleteTerms()1118 internal IDictionary<Term, BufferedDeletes.Num> GetBufferedDeleteTerms() 1119 { 1120 lock (this) 1121 { 1122 return deletesInRAM.terms; 1123 } 1124 } 1125 1126 /// <summary>Called whenever a merge has completed and the merged segments had deletions </summary> RemapDeletes(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergeDocCount)1127 internal void RemapDeletes(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergeDocCount) 1128 { 1129 lock (this) 1130 { 1131 if (docMaps == null) 1132 // The merged segments had no deletes so docIDs did not change and we have nothing to do 1133 return ; 1134 MergeDocIDRemapper mapper = new MergeDocIDRemapper(infos, docMaps, delCounts, merge, mergeDocCount); 1135 deletesInRAM.Remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); 1136 deletesFlushed.Remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); 1137 flushedDocCount -= mapper.docShift; 1138 } 1139 } 1140 WaitReady(DocumentsWriterThreadState state)1141 private void WaitReady(DocumentsWriterThreadState state) 1142 { 1143 lock (this) 1144 { 1145 1146 while (!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || aborting)) 1147 { 1148 System.Threading.Monitor.Wait(this); 1149 } 1150 1151 if (closed) 1152 throw new AlreadyClosedException("this IndexWriter is closed"); 1153 } 1154 } 1155 BufferDeleteTerms(Term[] terms)1156 internal bool BufferDeleteTerms(Term[] terms) 1157 { 1158 lock (this) 1159 { 1160 WaitReady(null); 1161 for (int i = 0; i < terms.Length; i++) 1162 AddDeleteTerm(terms[i], numDocsInRAM); 1163 return TimeToFlushDeletes(); 1164 } 1165 } 1166 BufferDeleteTerm(Term term)1167 internal bool BufferDeleteTerm(Term term) 1168 { 1169 lock (this) 1170 { 1171 WaitReady(null); 1172 AddDeleteTerm(term, numDocsInRAM); 1173 return TimeToFlushDeletes(); 1174 } 1175 } 1176 BufferDeleteQueries(Query[] queries)1177 internal bool BufferDeleteQueries(Query[] queries) 1178 { 1179 lock (this) 1180 { 1181 WaitReady(null); 1182 for (int i = 0; i < queries.Length; i++) 1183 AddDeleteQuery(queries[i], numDocsInRAM); 1184 return TimeToFlushDeletes(); 1185 } 1186 } 1187 BufferDeleteQuery(Query query)1188 internal bool BufferDeleteQuery(Query query) 1189 { 1190 lock (this) 1191 { 1192 WaitReady(null); 1193 AddDeleteQuery(query, numDocsInRAM); 1194 return TimeToFlushDeletes(); 1195 } 1196 } 1197 DeletesFull()1198 internal bool DeletesFull() 1199 { 1200 lock (this) 1201 { 1202 return (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + numBytesUsed) >= ramBufferSize) || (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH && ((deletesInRAM.Size() + deletesFlushed.Size()) >= maxBufferedDeleteTerms)); 1203 } 1204 } 1205 DoApplyDeletes()1206 internal bool DoApplyDeletes() 1207 { 1208 lock (this) 1209 { 1210 // Very similar to deletesFull(), except we don't count 1211 // numBytesAlloc, 
because we are checking whether 1212 // deletes (alone) are consuming too many resources now 1213 // and thus should be applied. We apply deletes if RAM 1214 // usage is > 1/2 of our allowed RAM buffer, to prevent 1215 // too-frequent flushing of a long tail of tiny segments 1216 // when merges (which always apply deletes) are 1217 // infrequent. 1218 return (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed) >= ramBufferSize / 2) || (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH && ((deletesInRAM.Size() + deletesFlushed.Size()) >= maxBufferedDeleteTerms)); 1219 } 1220 } 1221 TimeToFlushDeletes()1222 private bool TimeToFlushDeletes() 1223 { 1224 lock (this) 1225 { 1226 return (bufferIsFull || DeletesFull()) && SetFlushPending(); 1227 } 1228 } 1229 1230 internal int MaxBufferedDeleteTerms 1231 { 1232 set { this.maxBufferedDeleteTerms = value; } 1233 get { return maxBufferedDeleteTerms; } 1234 } 1235 HasDeletes()1236 internal bool HasDeletes() 1237 { 1238 lock (this) 1239 { 1240 return deletesFlushed.Any(); 1241 } 1242 } 1243 ApplyDeletes(SegmentInfos infos)1244 internal bool ApplyDeletes(SegmentInfos infos) 1245 { 1246 lock (this) 1247 { 1248 if (!HasDeletes()) 1249 return false; 1250 1251 if (infoStream != null) 1252 Message("apply " + deletesFlushed.numTerms + " buffered deleted terms and " + deletesFlushed.docIDs.Count + " deleted docIDs and " + deletesFlushed.queries.Count + " deleted queries on " + (+ infos.Count) + " segments."); 1253 1254 int infosEnd = infos.Count; 1255 1256 int docStart = 0; 1257 bool any = false; 1258 for (int i = 0; i < infosEnd; i++) 1259 { 1260 1261 // Make sure we never attempt to apply deletes to 1262 // segment in external dir 1263 System.Diagnostics.Debug.Assert(infos.Info(i).dir == directory); 1264 1265 SegmentReader reader = writer.readerPool.Get(infos.Info(i), false); 1266 try 1267 { 1268 any |= ApplyDeletes(reader, docStart); 1269 docStart += reader.MaxDoc; 1270 } 1271 finally 1272 { 1273 writer.readerPool.Release(reader); 1274 } 1275 } 1276 1277 deletesFlushed.Clear(); 1278 1279 return any; 1280 } 1281 } 1282 1283 // used only by assert 1284 private Term lastDeleteTerm; 1285 1286 // used only by assert CheckDeleteTerm(Term term)1287 private bool CheckDeleteTerm(Term term) 1288 { 1289 if (term != null) { 1290 System.Diagnostics.Debug.Assert(lastDeleteTerm == null || term.CompareTo(lastDeleteTerm) > 0, "lastTerm=" + lastDeleteTerm + " vs term=" + term); 1291 } 1292 lastDeleteTerm = term; 1293 return true; 1294 } 1295 1296 // Apply buffered delete terms, queries and docIDs to the 1297 // provided reader ApplyDeletes(IndexReader reader, int docIDStart)1298 private bool ApplyDeletes(IndexReader reader, int docIDStart) 1299 { 1300 lock (this) 1301 { 1302 int docEnd = docIDStart + reader.MaxDoc; 1303 bool any = false; 1304 1305 System.Diagnostics.Debug.Assert(CheckDeleteTerm(null)); 1306 1307 // Delete by term 1308 TermDocs docs = reader.TermDocs(); 1309 try 1310 { 1311 foreach(KeyValuePair<Term, BufferedDeletes.Num> entry in deletesFlushed.terms) 1312 { 1313 Term term = entry.Key; 1314 // LUCENE-2086: we should be iterating a TreeMap, 1315 // here, so terms better be in order: 1316 System.Diagnostics.Debug.Assert(CheckDeleteTerm(term)); 1317 docs.Seek(term); 1318 int limit = entry.Value.GetNum(); 1319 while (docs.Next()) 1320 { 1321 int docID = docs.Doc; 1322 if (docIDStart + docID >= limit) 1323 break; 1324 reader.DeleteDocument(docID); 1325 any = true; 1326 } 1327 } 1328 } 1329 finally 1330 { 
1331 docs.Close(); 1332 } 1333 1334 // Delete by docID 1335 foreach(int docIdInt in deletesFlushed.docIDs) 1336 { 1337 int docID = docIdInt; 1338 if (docID >= docIDStart && docID < docEnd) 1339 { 1340 reader.DeleteDocument(docID - docIDStart); 1341 any = true; 1342 } 1343 } 1344 1345 // Delete by query 1346 IndexSearcher searcher = new IndexSearcher(reader); 1347 foreach(KeyValuePair<Query, int> entry in deletesFlushed.queries) 1348 { 1349 Query query = (Query) entry.Key; 1350 int limit = (int)entry.Value; 1351 Weight weight = query.Weight(searcher); 1352 Scorer scorer = weight.Scorer(reader, true, false); 1353 if (scorer != null) 1354 { 1355 while (true) 1356 { 1357 int doc = scorer.NextDoc(); 1358 if (((long) docIDStart) + doc >= limit) 1359 break; 1360 reader.DeleteDocument(doc); 1361 any = true; 1362 } 1363 } 1364 } 1365 searcher.Close(); 1366 return any; 1367 } 1368 } 1369 1370 // Buffer a term in bufferedDeleteTerms, which records the 1371 // current number of documents buffered in ram so that the 1372 // delete term will be applied to those documents as well 1373 // as the disk segments. AddDeleteTerm(Term term, int docCount)1374 private void AddDeleteTerm(Term term, int docCount) 1375 { 1376 lock (this) 1377 { 1378 BufferedDeletes.Num num = deletesInRAM.terms[term]; 1379 int docIDUpto = flushedDocCount + docCount; 1380 if (num == null) 1381 deletesInRAM.terms[term] = new BufferedDeletes.Num(docIDUpto); 1382 else 1383 num.SetNum(docIDUpto); 1384 deletesInRAM.numTerms++; 1385 1386 deletesInRAM.AddBytesUsed(BYTES_PER_DEL_TERM + term.Text.Length * CHAR_NUM_BYTE); 1387 } 1388 } 1389 1390 // Buffer a specific docID for deletion. Currently only 1391 // used when we hit a exception when adding a document AddDeleteDocID(int docID)1392 private void AddDeleteDocID(int docID) 1393 { 1394 lock (this) 1395 { 1396 deletesInRAM.docIDs.Add(flushedDocCount + docID); 1397 deletesInRAM.AddBytesUsed(BYTES_PER_DEL_DOCID); 1398 } 1399 } 1400 AddDeleteQuery(Query query, int docID)1401 private void AddDeleteQuery(Query query, int docID) 1402 { 1403 lock (this) 1404 { 1405 deletesInRAM.queries[query] = flushedDocCount + docID; 1406 deletesInRAM.AddBytesUsed(BYTES_PER_DEL_QUERY); 1407 } 1408 } 1409 DoBalanceRAM()1410 internal bool DoBalanceRAM() 1411 { 1412 lock (this) 1413 { 1414 return ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && !bufferIsFull && (numBytesUsed + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed >= ramBufferSize || numBytesAlloc >= freeTrigger); 1415 } 1416 } 1417 1418 /// <summary>Does the synchronized work to finish/flush the 1419 /// inverted document. 1420 /// </summary> FinishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter)1421 private void FinishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) 1422 { 1423 1424 if (DoBalanceRAM()) 1425 // Must call this w/o holding synchronized(this) else 1426 // we'll hit deadlock: 1427 BalanceRAM(); 1428 1429 lock (this) 1430 { 1431 1432 System.Diagnostics.Debug.Assert(docWriter == null || docWriter.docID == perThread.docState.docID); 1433 1434 if (aborting) 1435 { 1436 1437 // We are currently aborting, and another thread is 1438 // waiting for me to become idle. 
We just forcefully 1439 // idle this threadState; it will be fully reset by 1440 // abort() 1441 if (docWriter != null) 1442 try 1443 { 1444 docWriter.Abort(); 1445 } 1446 catch (System.Exception) 1447 { 1448 } 1449 1450 perThread.isIdle = true; 1451 System.Threading.Monitor.PulseAll(this); 1452 return ; 1453 } 1454 1455 bool doPause; 1456 1457 if (docWriter != null) 1458 doPause = waitQueue.Add(docWriter); 1459 else 1460 { 1461 skipDocWriter.docID = perThread.docState.docID; 1462 doPause = waitQueue.Add(skipDocWriter); 1463 } 1464 1465 if (doPause) 1466 WaitForWaitQueue(); 1467 1468 if (bufferIsFull && !flushPending) 1469 { 1470 flushPending = true; 1471 perThread.doFlushAfter = true; 1472 } 1473 1474 perThread.isIdle = true; 1475 System.Threading.Monitor.PulseAll(this); 1476 } 1477 } 1478 WaitForWaitQueue()1479 internal void WaitForWaitQueue() 1480 { 1481 lock (this) 1482 { 1483 do 1484 { 1485 System.Threading.Monitor.Wait(this); 1486 } 1487 while (!waitQueue.DoResume()); 1488 } 1489 } 1490 1491 internal class SkipDocWriter:DocWriter 1492 { Finish()1493 public override void Finish() 1494 { 1495 } Abort()1496 public override void Abort() 1497 { 1498 } SizeInBytes()1499 public override long SizeInBytes() 1500 { 1501 return 0; 1502 } 1503 } 1504 internal SkipDocWriter skipDocWriter; 1505 GetRAMUsed()1506 internal long GetRAMUsed() 1507 { 1508 return numBytesUsed + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed; 1509 } 1510 1511 internal long numBytesAlloc; 1512 internal long numBytesUsed; 1513 1514 internal System.Globalization.NumberFormatInfo nf = System.Globalization.CultureInfo.CurrentCulture.NumberFormat; 1515 1516 // Coarse estimates used to measure RAM usage of buffered deletes 1517 internal const int OBJECT_HEADER_BYTES = 8; 1518 internal static readonly int POINTER_NUM_BYTE; 1519 internal const int INT_NUM_BYTE = 4; 1520 internal const int CHAR_NUM_BYTE = 2; 1521 1522 /* Rough logic: HashMap has an array[Entry] w/ varying 1523 load factor (say 2 * POINTER). Entry is object w/ Term 1524 key, BufferedDeletes.Num val, int hash, Entry next 1525 (OBJ_HEADER + 3*POINTER + INT). Term is object w/ 1526 String field and String text (OBJ_HEADER + 2*POINTER). 1527 We don't count Term's field since it's interned. 1528 Term's text is String (OBJ_HEADER + 4*INT + POINTER + 1529 OBJ_HEADER + string.length*CHAR). BufferedDeletes.num is 1530 OBJ_HEADER + INT. */ 1531 1532 internal static readonly int BYTES_PER_DEL_TERM = 8 * POINTER_NUM_BYTE + 5 * OBJECT_HEADER_BYTES + 6 * INT_NUM_BYTE; 1533 1534 /* Rough logic: del docIDs are List<Integer>. Say list 1535 allocates ~2X size (2*POINTER). Integer is OBJ_HEADER 1536 + int */ 1537 internal static readonly int BYTES_PER_DEL_DOCID = 2 * POINTER_NUM_BYTE + OBJECT_HEADER_BYTES + INT_NUM_BYTE; 1538 1539 /* Rough logic: HashMap has an array[Entry] w/ varying 1540 load factor (say 2 * POINTER). Entry is object w/ 1541 Query key, Integer val, int hash, Entry next 1542 (OBJ_HEADER + 3*POINTER + INT). Query we often 1543 undercount (say 24 bytes). Integer is OBJ_HEADER + INT. 
*/ 1544 internal static readonly int BYTES_PER_DEL_QUERY = 5 * POINTER_NUM_BYTE + 2 * OBJECT_HEADER_BYTES + 2 * INT_NUM_BYTE + 24; 1545 1546 /* Initial chunks size of the shared byte[] blocks used to 1547 store postings data */ 1548 internal const int BYTE_BLOCK_SHIFT = 15; 1549 internal static readonly int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT; 1550 internal static readonly int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1; 1551 internal static readonly int BYTE_BLOCK_NOT_MASK = ~ BYTE_BLOCK_MASK; 1552 1553 internal class ByteBlockAllocator : ByteBlockPool.Allocator 1554 { ByteBlockAllocator(DocumentsWriter enclosingInstance, int blockSize)1555 public ByteBlockAllocator(DocumentsWriter enclosingInstance, int blockSize) 1556 { 1557 this.blockSize = blockSize; 1558 InitBlock(enclosingInstance); 1559 } InitBlock(DocumentsWriter enclosingInstance)1560 private void InitBlock(DocumentsWriter enclosingInstance) 1561 { 1562 this.enclosingInstance = enclosingInstance; 1563 } 1564 private DocumentsWriter enclosingInstance; 1565 public DocumentsWriter Enclosing_Instance 1566 { 1567 get 1568 { 1569 return enclosingInstance; 1570 } 1571 1572 } 1573 1574 int blockSize; 1575 internal List<byte[]> freeByteBlocks = new List<byte[]>(); 1576 1577 /* Allocate another byte[] from the shared pool */ GetByteBlock(bool trackAllocations)1578 public /*internal*/ override byte[] GetByteBlock(bool trackAllocations) 1579 { 1580 lock (Enclosing_Instance) 1581 { 1582 int size = freeByteBlocks.Count; 1583 byte[] b; 1584 if (0 == size) 1585 { 1586 // Always record a block allocated, even if 1587 // trackAllocations is false. This is necessary 1588 // because this block will be shared between 1589 // things that don't track allocations (term 1590 // vectors) and things that do (freq/prox 1591 // postings). 1592 Enclosing_Instance.numBytesAlloc += blockSize; 1593 b = new byte[blockSize]; 1594 } 1595 else 1596 { 1597 b = freeByteBlocks[size - 1]; 1598 freeByteBlocks.RemoveAt(size - 1); 1599 } 1600 if (trackAllocations) 1601 Enclosing_Instance.numBytesUsed += blockSize; 1602 System.Diagnostics.Debug.Assert(Enclosing_Instance.numBytesUsed <= Enclosing_Instance.numBytesAlloc); 1603 return b; 1604 } 1605 } 1606 1607 /* Return byte[]'s to the pool */ RecycleByteBlocks(byte[][] blocks, int start, int end)1608 public /*internal*/ override void RecycleByteBlocks(byte[][] blocks, int start, int end) 1609 { 1610 lock (Enclosing_Instance) 1611 { 1612 for (int i = start; i < end; i++) 1613 { 1614 freeByteBlocks.Add(blocks[i]); 1615 blocks[i] = null; 1616 } 1617 } 1618 } 1619 RecycleByteBlocks(IList<byte[]> blocks)1620 public /*internal*/ override void RecycleByteBlocks(IList<byte[]> blocks) 1621 { 1622 lock (Enclosing_Instance) 1623 { 1624 int size = blocks.Count; 1625 for(int i=0;i<size;i++) 1626 freeByteBlocks.Add(blocks[i]); 1627 } 1628 } 1629 } 1630 1631 /* Initial chunks size of the shared int[] blocks used to 1632 store postings data */ 1633 internal const int INT_BLOCK_SHIFT = 13; 1634 internal static readonly int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT; 1635 internal static readonly int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1; 1636 1637 private List<int[]> freeIntBlocks = new List<int[]>(); 1638 1639 /* Allocate another int[] from the shared pool */ GetIntBlock(bool trackAllocations)1640 internal int[] GetIntBlock(bool trackAllocations) 1641 { 1642 lock (this) 1643 { 1644 int size = freeIntBlocks.Count; 1645 int[] b; 1646 if (0 == size) 1647 { 1648 // Always record a block allocated, even if 1649 // trackAllocations is false. 
This is necessary 1650 // because this block will be shared between 1651 // things that don't track allocations (term 1652 // vectors) and things that do (freq/prox 1653 // postings). 1654 numBytesAlloc += INT_BLOCK_SIZE * INT_NUM_BYTE; 1655 b = new int[INT_BLOCK_SIZE]; 1656 } 1657 else 1658 { 1659 b = freeIntBlocks[size - 1]; 1660 freeIntBlocks.RemoveAt(size - 1); 1661 } 1662 if (trackAllocations) 1663 numBytesUsed += INT_BLOCK_SIZE * INT_NUM_BYTE; 1664 System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc); 1665 return b; 1666 } 1667 } 1668 BytesAllocated(long numBytes)1669 internal void BytesAllocated(long numBytes) 1670 { 1671 lock (this) 1672 { 1673 numBytesAlloc += numBytes; 1674 } 1675 } 1676 BytesUsed(long numBytes)1677 internal void BytesUsed(long numBytes) 1678 { 1679 lock (this) 1680 { 1681 numBytesUsed += numBytes; 1682 System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc); 1683 } 1684 } 1685 1686 /* Return int[]s to the pool */ RecycleIntBlocks(int[][] blocks, int start, int end)1687 internal void RecycleIntBlocks(int[][] blocks, int start, int end) 1688 { 1689 lock (this) 1690 { 1691 for (int i = start; i < end; i++) 1692 { 1693 freeIntBlocks.Add(blocks[i]); 1694 blocks[i] = null; 1695 } 1696 } 1697 } 1698 1699 internal ByteBlockAllocator byteBlockAllocator; 1700 1701 internal static int PER_DOC_BLOCK_SIZE = 1024; 1702 1703 ByteBlockAllocator perDocAllocator; 1704 1705 /* Initial chunk size of the shared char[] blocks used to 1706 store term text */ 1707 internal const int CHAR_BLOCK_SHIFT = 14; 1708 internal static readonly int CHAR_BLOCK_SIZE = 1 << CHAR_BLOCK_SHIFT; 1709 internal static readonly int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1; 1710 1711 internal static readonly int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE - 1; 1712 1713 private List<char[]> freeCharBlocks = new List<char[]>(); 1714 1715 /* Allocate another char[] from the shared pool */ GetCharBlock()1716 internal char[] GetCharBlock() 1717 { 1718 lock (this) 1719 { 1720 int size = freeCharBlocks.Count; 1721 char[] c; 1722 if (0 == size) 1723 { 1724 numBytesAlloc += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE; 1725 c = new char[CHAR_BLOCK_SIZE]; 1726 } 1727 else 1728 { 1729 c = freeCharBlocks[size - 1]; 1730 freeCharBlocks.RemoveAt(size - 1); 1731 } 1732 // We always track allocations of char blocks, for now, 1733 // because nothing that skips allocation tracking 1734 // (currently only term vectors) uses its own char 1735 // blocks. 1736 numBytesUsed += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE; 1737 System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc); 1738 return c; 1739 } 1740 } 1741 1742 /* Return char[]s to the pool */ RecycleCharBlocks(char[][] blocks, int numBlocks)1743 internal void RecycleCharBlocks(char[][] blocks, int numBlocks) 1744 { 1745 lock (this) 1746 { 1747 for (int i = 0; i < numBlocks; i++) 1748 { 1749 freeCharBlocks.Add(blocks[i]); 1750 blocks[i] = null; 1751 } 1752 } 1753 } 1754 ToMB(long v)1755 internal System.String ToMB(long v) 1756 { 1757 return System.String.Format(nf, "{0:f}", new System.Object[] { (v / 1024F / 1024F) }); 1758 } 1759 1760 1761 /* We have four pools of RAM: Postings, byte blocks 1762 * (holds freq/prox posting data), char blocks (holds 1763 * characters in the term) and per-doc buffers (stored fields/term vectors). 1764 * Different docs require varying amount of storage from 1765 * these four classes. 1766 * 1767 * For example, docs with many unique single-occurrence 1768 * short terms will use up the Postings RAM and hardly any 1769 * of the other two. 
Whereas docs with very large terms 1770 * will use alot of char blocks RAM and relatively less of 1771 * the other two. This method just frees allocations from 1772 * the pools once we are over-budget, which balances the 1773 * pools to match the current docs. */ BalanceRAM()1774 internal void BalanceRAM() 1775 { 1776 1777 // We flush when we've used our target usage 1778 long flushTrigger = ramBufferSize; 1779 1780 long deletesRAMUsed = deletesInRAM.bytesUsed + deletesFlushed.bytesUsed; 1781 1782 if (numBytesAlloc + deletesRAMUsed > freeTrigger) 1783 { 1784 1785 if (infoStream != null) 1786 Message( 1787 " RAM: now balance allocations: usedMB=" + ToMB(numBytesUsed) + 1788 " vs trigger=" + ToMB(flushTrigger) + 1789 " allocMB=" + ToMB(numBytesAlloc) + 1790 " deletesMB=" + ToMB(deletesRAMUsed) + 1791 " vs trigger=" + ToMB(freeTrigger) + 1792 " byteBlockFree=" + ToMB(byteBlockAllocator.freeByteBlocks.Count * BYTE_BLOCK_SIZE) + 1793 " perDocFree=" + ToMB(perDocAllocator.freeByteBlocks.Count * PER_DOC_BLOCK_SIZE) + 1794 " charBlockFree=" + ToMB(freeCharBlocks.Count * CHAR_BLOCK_SIZE * CHAR_NUM_BYTE)); 1795 1796 long startBytesAlloc = numBytesAlloc + deletesRAMUsed; 1797 1798 int iter = 0; 1799 1800 // We free equally from each pool in 32 KB 1801 // chunks until we are below our threshold 1802 // (freeLevel) 1803 1804 bool any = true; 1805 1806 while (numBytesAlloc + deletesRAMUsed > freeLevel) 1807 { 1808 1809 lock (this) 1810 { 1811 if (0 == perDocAllocator.freeByteBlocks.Count 1812 && 0 == byteBlockAllocator.freeByteBlocks.Count 1813 && 0 == freeCharBlocks.Count 1814 && 0 == freeIntBlocks.Count 1815 && !any) 1816 { 1817 // Nothing else to free -- must flush now. 1818 bufferIsFull = numBytesUsed + deletesRAMUsed > flushTrigger; 1819 if (infoStream != null) 1820 { 1821 if (bufferIsFull) 1822 Message(" nothing to free; now set bufferIsFull"); 1823 else 1824 Message(" nothing to free"); 1825 } 1826 System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc); 1827 break; 1828 } 1829 1830 if ((0 == iter % 5) && byteBlockAllocator.freeByteBlocks.Count > 0) 1831 { 1832 byteBlockAllocator.freeByteBlocks.RemoveAt(byteBlockAllocator.freeByteBlocks.Count - 1); 1833 numBytesAlloc -= BYTE_BLOCK_SIZE; 1834 } 1835 1836 if ((1 == iter % 5) && freeCharBlocks.Count > 0) 1837 { 1838 freeCharBlocks.RemoveAt(freeCharBlocks.Count - 1); 1839 numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE; 1840 } 1841 1842 if ((2 == iter % 5) && freeIntBlocks.Count > 0) 1843 { 1844 freeIntBlocks.RemoveAt(freeIntBlocks.Count - 1); 1845 numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE; 1846 } 1847 1848 if ((3 == iter % 5) && perDocAllocator.freeByteBlocks.Count > 0) 1849 { 1850 // Remove upwards of 32 blocks (each block is 1K) 1851 for (int i = 0; i < 32; ++i) 1852 { 1853 perDocAllocator.freeByteBlocks.RemoveAt(perDocAllocator.freeByteBlocks.Count - 1); 1854 numBytesAlloc -= PER_DOC_BLOCK_SIZE; 1855 if (perDocAllocator.freeByteBlocks.Count == 0) 1856 { 1857 break; 1858 } 1859 } 1860 } 1861 } 1862 1863 if ((4 == iter % 5) && any) 1864 // Ask consumer to free any recycled state 1865 any = consumer.FreeRAM(); 1866 1867 iter++; 1868 } 1869 1870 if (infoStream != null) 1871 Message(System.String.Format(nf, " after free: freedMB={0:f} usedMB={1:f} allocMB={2:f}", 1872 new System.Object[] { ((startBytesAlloc - numBytesAlloc) / 1024.0 / 1024.0), (numBytesUsed / 1024.0 / 1024.0), (numBytesAlloc / 1024.0 / 1024.0) })); 1873 } 1874 else 1875 { 1876 // If we have not crossed the 100% mark, but have 1877 // crossed the 95% mark of RAM we are 
actually 1878 // using, go ahead and flush. This prevents 1879 // over-allocating and then freeing, with every 1880 // flush. 1881 lock (this) 1882 { 1883 1884 if (numBytesUsed + deletesRAMUsed > flushTrigger) 1885 { 1886 if (infoStream != null) 1887 Message(System.String.Format(nf, " RAM: now flush @ usedMB={0:f} allocMB={1:f} triggerMB={2:f}", 1888 new object[] { (numBytesUsed / 1024.0 / 1024.0), (numBytesAlloc / 1024.0 / 1024.0), (flushTrigger / 1024.0 / 1024.0) })); 1889 1890 bufferIsFull = true; 1891 } 1892 } 1893 } 1894 } 1895 1896 internal WaitQueue waitQueue; 1897 1898 internal class WaitQueue 1899 { InitBlock(DocumentsWriter enclosingInstance)1900 private void InitBlock(DocumentsWriter enclosingInstance) 1901 { 1902 this.enclosingInstance = enclosingInstance; 1903 } 1904 private DocumentsWriter enclosingInstance; 1905 public DocumentsWriter Enclosing_Instance 1906 { 1907 get 1908 { 1909 return enclosingInstance; 1910 } 1911 1912 } 1913 internal DocWriter[] waiting; 1914 internal int nextWriteDocID; 1915 internal int nextWriteLoc; 1916 internal int numWaiting; 1917 internal long waitingBytes; 1918 WaitQueue(DocumentsWriter enclosingInstance)1919 public WaitQueue(DocumentsWriter enclosingInstance) 1920 { 1921 InitBlock(enclosingInstance); 1922 waiting = new DocWriter[10]; 1923 } 1924 Reset()1925 internal void Reset() 1926 { 1927 lock (this) 1928 { 1929 // NOTE: nextWriteLoc doesn't need to be reset 1930 System.Diagnostics.Debug.Assert(numWaiting == 0); 1931 System.Diagnostics.Debug.Assert(waitingBytes == 0); 1932 nextWriteDocID = 0; 1933 } 1934 } 1935 DoResume()1936 internal bool DoResume() 1937 { 1938 lock (this) 1939 { 1940 return waitingBytes <= Enclosing_Instance.waitQueueResumeBytes; 1941 } 1942 } 1943 DoPause()1944 internal bool DoPause() 1945 { 1946 lock (this) 1947 { 1948 return waitingBytes > Enclosing_Instance.waitQueuePauseBytes; 1949 } 1950 } 1951 Abort()1952 internal void Abort() 1953 { 1954 lock (this) 1955 { 1956 int count = 0; 1957 for (int i = 0; i < waiting.Length; i++) 1958 { 1959 DocWriter doc = waiting[i]; 1960 if (doc != null) 1961 { 1962 doc.Abort(); 1963 waiting[i] = null; 1964 count++; 1965 } 1966 } 1967 waitingBytes = 0; 1968 System.Diagnostics.Debug.Assert(count == numWaiting); 1969 numWaiting = 0; 1970 } 1971 } 1972 WriteDocument(DocWriter doc)1973 private void WriteDocument(DocWriter doc) 1974 { 1975 System.Diagnostics.Debug.Assert(doc == Enclosing_Instance.skipDocWriter || nextWriteDocID == doc.docID); 1976 bool success = false; 1977 try 1978 { 1979 doc.Finish(); 1980 nextWriteDocID++; 1981 Enclosing_Instance.numDocsInStore++; 1982 nextWriteLoc++; 1983 System.Diagnostics.Debug.Assert(nextWriteLoc <= waiting.Length); 1984 if (nextWriteLoc == waiting.Length) 1985 nextWriteLoc = 0; 1986 success = true; 1987 } 1988 finally 1989 { 1990 if (!success) 1991 Enclosing_Instance.SetAborting(); 1992 } 1993 } 1994 Add(DocWriter doc)1995 public bool Add(DocWriter doc) 1996 { 1997 lock (this) 1998 { 1999 2000 System.Diagnostics.Debug.Assert(doc.docID >= nextWriteDocID); 2001 2002 if (doc.docID == nextWriteDocID) 2003 { 2004 WriteDocument(doc); 2005 while (true) 2006 { 2007 doc = waiting[nextWriteLoc]; 2008 if (doc != null) 2009 { 2010 numWaiting--; 2011 waiting[nextWriteLoc] = null; 2012 waitingBytes -= doc.SizeInBytes(); 2013 WriteDocument(doc); 2014 } 2015 else 2016 break; 2017 } 2018 } 2019 else 2020 { 2021 2022 // I finished before documents that were added 2023 // before me. 
                    // This can easily happen when I am a
                    // small doc and the docs before me were large, or,
                    // just due to luck in the thread scheduling.  Just
                    // add myself to the queue and when that large doc
                    // finishes, it will flush me:
                    int gap = doc.docID - nextWriteDocID;
                    if (gap >= waiting.Length)
                    {
                        // Grow queue
                        DocWriter[] newArray = new DocWriter[ArrayUtil.GetNextSize(gap)];
                        System.Diagnostics.Debug.Assert(nextWriteLoc >= 0);
                        Array.Copy(waiting, nextWriteLoc, newArray, 0, waiting.Length - nextWriteLoc);
                        Array.Copy(waiting, 0, newArray, waiting.Length - nextWriteLoc, nextWriteLoc);
                        nextWriteLoc = 0;
                        waiting = newArray;
                        gap = doc.docID - nextWriteDocID;
                    }

                    int loc = nextWriteLoc + gap;
                    if (loc >= waiting.Length)
                        loc -= waiting.Length;

                    // We should only wrap one time
                    System.Diagnostics.Debug.Assert(loc < waiting.Length);

                    // Nobody should be in my spot!
                    System.Diagnostics.Debug.Assert(waiting[loc] == null);
                    waiting[loc] = doc;
                    numWaiting++;
                    waitingBytes += doc.SizeInBytes();
                }

                return DoPause();
            }
        }
    }

    static DocumentsWriter()
    {
        DefaultIndexingChain = new AnonymousClassIndexingChain();
        POINTER_NUM_BYTE = Constants.JRE_IS_64BIT ? 8 : 4;
    }

    public static int BYTE_BLOCK_SIZE_ForNUnit
    {
        get { return BYTE_BLOCK_SIZE; }
    }

    public static int CHAR_BLOCK_SIZE_ForNUnit
    {
        get { return CHAR_BLOCK_SIZE; }
    }
    }
}