/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Lucene.Net.Support;
using Analyzer = Lucene.Net.Analysis.Analyzer;
using Document = Lucene.Net.Documents.Document;
using AlreadyClosedException = Lucene.Net.Store.AlreadyClosedException;
using Directory = Lucene.Net.Store.Directory;
using ArrayUtil = Lucene.Net.Util.ArrayUtil;
using Constants = Lucene.Net.Util.Constants;
using IndexSearcher = Lucene.Net.Search.IndexSearcher;
using Query = Lucene.Net.Search.Query;
using Scorer = Lucene.Net.Search.Scorer;
using Similarity = Lucene.Net.Search.Similarity;
using Weight = Lucene.Net.Search.Weight;
namespace Lucene.Net.Index
{
/// This class accepts multiple added documents and directly
/// writes a single segment file. It does this more
/// efficiently than creating a single segment per document
/// (with DocumentWriter) and doing standard merges on those
/// segments.
///
/// Each added document is passed to the <see cref="DocConsumer"/>,
/// which in turn processes the document and interacts with
/// other consumers in the indexing chain. Certain
/// consumers, like <see cref="StoredFieldsWriter"/> and
/// <see cref="TermVectorsTermsWriter"/>, digest a document and
/// immediately write bytes to the "doc store" files (ie,
/// they do not consume RAM per document, except while they
/// are processing the document).
///
/// Other consumers, eg <see cref="FreqProxTermsWriter"/> and
/// <see cref="NormsWriter"/>, buffer bytes in RAM and flush only
/// when a new segment is produced.
/// Once we have used our allowed RAM buffer, or the number
/// of added docs is large enough (in the case we are
/// flushing by doc count instead of RAM usage), we create a
/// real segment and flush it to the Directory.
///
/// Threads:
///
/// Multiple threads are allowed into addDocument at once.
/// There is an initial synchronized call to getThreadState
/// which allocates a ThreadState for this thread. The same
/// thread will get the same ThreadState over time (thread
/// affinity) so that if there are consistent patterns (for
/// example each thread is indexing a different content
/// source) then we make better use of RAM. Then
/// processDocument is called on that ThreadState without
/// synchronization (most of the "heavy lifting" is in this
/// call). Finally the synchronized "finishDocument" is
/// called to flush changes to the directory.
///
/// When flush is called by IndexWriter we forcefully idle
/// all threads and flush only once they are all idle. This
/// means you can call flush with a given thread even while
/// other threads are actively adding/deleting documents.
///
///
/// Exceptions:
///
/// Because this class directly updates in-memory posting
/// lists, and flushes stored fields and term vectors
/// directly to files in the directory, there are certain
/// limited times when an exception can corrupt this state.
/// For example, a disk full while flushing stored fields
/// leaves this file in a corrupt state. Or, an OOM
/// exception while appending to the in-memory posting lists
/// can corrupt that posting list. We call such exceptions
/// "aborting exceptions". In these cases we must call
/// abort() to discard all docs added since the last flush.
///
/// All other exceptions ("non-aborting exceptions") can
/// still partially update the index structures. These
/// updates are consistent, but, they represent only a part
/// of the document seen up until the exception was hit.
/// When this happens, we immediately mark the document as
/// deleted so that the document is always atomically ("all
/// or none") added to the index.
///
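/// A rough usage sketch (illustrative only; DocumentsWriter is
/// internal, and in practice IndexWriter constructs and drives it):
/// <code>
/// // many threads may enter concurrently:
/// bool doFlush = docWriter.UpdateDocument(doc, analyzer, delTerm);
/// if (doFlush)
/// {
///     // IndexWriter pauses all threads, then flushes the buffered
///     // docs as a new segment:
///     docWriter.Flush(true);
/// }
/// </code>
///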
public sealed class DocumentsWriter : IDisposable
{
internal class AnonymousClassIndexingChain : IndexingChain
{
internal override DocConsumer GetChain(DocumentsWriter documentsWriter)
{
/*
This is the current indexing chain:
DocConsumer / DocConsumerPerThread
--> code: DocFieldProcessor / DocFieldProcessorPerThread
--> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
--> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
--> code: DocInverter / DocInverterPerThread / DocInverterPerField
--> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
--> code: TermsHash / TermsHashPerThread / TermsHashPerField
--> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
--> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
--> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
--> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
--> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
--> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
*/
// Build up indexing chain:
TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter);
TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
InvertedDocConsumer termsHash = new TermsHash(documentsWriter, true, freqProxWriter, new TermsHash(documentsWriter, false, termVectorsWriter, null));
NormsWriter normsWriter = new NormsWriter();
DocInverter docInverter = new DocInverter(termsHash, normsWriter);
return new DocFieldProcessor(documentsWriter, docInverter);
}
}
private void InitBlock()
{
maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
maxBufferedDeleteTerms = IndexWriter.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
ramBufferSize = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024);
waitQueuePauseBytes = (long) (ramBufferSize * 0.1);
waitQueueResumeBytes = (long) (ramBufferSize * 0.05);
freeTrigger = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024 * 1.05);
freeLevel = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024 * 0.95);
maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS;
skipDocWriter = new SkipDocWriter();
byteBlockAllocator = new ByteBlockAllocator(this, DocumentsWriter.BYTE_BLOCK_SIZE);
perDocAllocator = new ByteBlockAllocator(this, DocumentsWriter.PER_DOC_BLOCK_SIZE);
waitQueue = new WaitQueue(this);
}
internal IndexWriter writer;
internal Directory directory;
internal System.String segment; // Current segment we are working on
private System.String docStoreSegment; // Current doc-store segment we are writing
private int docStoreOffset; // Current starting doc-store offset of current segment
private int nextDocID; // Next docID to be added
private int numDocsInRAM; // # docs buffered in RAM
internal int numDocsInStore; // # docs written to doc stores
// Max # ThreadState instances; if there are more threads
// than this they share ThreadStates
private const int MAX_THREAD_STATE = 5;
private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
private HashMap<ThreadClass, DocumentsWriterThreadState> threadBindings = new HashMap<ThreadClass, DocumentsWriterThreadState>();
private int pauseThreads; // Non-zero when we need all threads to
// pause (eg to flush)
internal bool flushPending; // True when a thread has decided to flush
internal bool bufferIsFull; // True when it's time to write segment
private bool aborting; // True if an abort is pending
private DocFieldProcessor docFieldProcessor;
internal System.IO.StreamWriter infoStream;
internal int maxFieldLength;
internal Similarity similarity;
internal IList<string> newFiles;
internal class DocState
{
internal DocumentsWriter docWriter;
internal Analyzer analyzer;
internal int maxFieldLength;
internal System.IO.StreamWriter infoStream;
internal Similarity similarity;
internal int docID;
internal Document doc;
internal System.String maxTermPrefix;
// Only called by asserts
public bool TestPoint(System.String name)
{
return docWriter.writer.TestPoint(name);
}
public void Clear()
{
// don't hold onto doc nor analyzer, in case it is
// largish:
doc = null;
analyzer = null;
}
}
/// Consumer returns this on each doc. This holds any
/// state that must be flushed synchronized "in docID
/// order". We gather these and flush them in order.
///
internal abstract class DocWriter
{
internal DocWriter next;
internal int docID;
public abstract void Finish();
public abstract void Abort();
public abstract long SizeInBytes();
internal void SetNext(DocWriter next)
{
this.next = next;
}
}
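// (SkipDocWriter, defined later in this file, is the degenerate
// example of a DocWriter: its Finish() and Abort() do nothing and
// it reports zero size; it stands in for documents that produced
// nothing to write.)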
/*
* Create and return a new DocWriterBuffer.
*/
internal PerDocBuffer NewPerDocBuffer()
{
return new PerDocBuffer(this);
}
/*
* RAMFile buffer for DocWriters.
*/
internal class PerDocBuffer : Lucene.Net.Store.RAMFile
{
DocumentsWriter enclosingInstance;
public PerDocBuffer(DocumentsWriter enclosingInstance)
{
this.enclosingInstance = enclosingInstance;
}
/*
* Allocate bytes used from shared pool.
*/
public override byte[] NewBuffer(int size)
{
System.Diagnostics.Debug.Assert(size == PER_DOC_BLOCK_SIZE);
return enclosingInstance.perDocAllocator.GetByteBlock(false);
}
/*
* Recycle the bytes used.
*/
internal void Recycle()
{
lock (this)
{
if (buffers.Count > 0)
{
Length = 0;
// Recycle the blocks
enclosingInstance.perDocAllocator.RecycleByteBlocks(buffers);
buffers.Clear();
sizeInBytes = 0;
System.Diagnostics.Debug.Assert(NumBuffers() == 0);
}
}
}
}
/// The IndexingChain must define the <see cref="GetChain(DocumentsWriter)"/> method
/// which returns the DocConsumer that the DocumentsWriter calls to process the
/// documents.
///
internal abstract class IndexingChain
{
internal abstract DocConsumer GetChain(DocumentsWriter documentsWriter);
}
internal static readonly IndexingChain DefaultIndexingChain;
internal DocConsumer consumer;
// Deletes done after the last flush; these are discarded
// on abort
private BufferedDeletes deletesInRAM = new BufferedDeletes(false);
// Deletes done before the last flush; these are still
// kept on abort
private BufferedDeletes deletesFlushed = new BufferedDeletes(true);
// The max number of delete terms that can be buffered before
// they must be flushed to disk.
private int maxBufferedDeleteTerms;
// How much RAM we can use before flushing. This is 0 if
// we are flushing by doc count instead.
private long ramBufferSize;
private long waitQueuePauseBytes;
private long waitQueueResumeBytes;
// If we've allocated 5% over our RAM budget, we then
// free down to 95%
private long freeTrigger;
private long freeLevel;
// Flush @ this number of docs. If ramBufferSize is
// non-zero we will flush by RAM usage instead.
private int maxBufferedDocs;
private int flushedDocCount; // How many docs already flushed to index
internal void UpdateFlushedDocCount(int n)
{
lock (this)
{
flushedDocCount += n;
}
}
internal int GetFlushedDocCount()
{
lock (this)
{
return flushedDocCount;
}
}
internal void SetFlushedDocCount(int n)
{
lock (this)
{
flushedDocCount = n;
}
}
private bool closed;
internal DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain indexingChain)
{
InitBlock();
this.directory = directory;
this.writer = writer;
this.similarity = writer.Similarity;
flushedDocCount = writer.MaxDoc();
consumer = indexingChain.GetChain(this);
if (consumer is DocFieldProcessor)
{
docFieldProcessor = (DocFieldProcessor) consumer;
}
}
/// Returns true if any of the fields in the current
/// buffered docs have omitTermFreqAndPositions==false
///
internal bool HasProx()
{
return (docFieldProcessor != null) ? docFieldProcessor.fieldInfos.HasProx() : true;
}
/// If non-null, various details of indexing are printed
/// here.
///
internal void SetInfoStream(System.IO.StreamWriter infoStream)
{
lock (this)
{
this.infoStream = infoStream;
for (int i = 0; i < threadStates.Length; i++)
threadStates[i].docState.infoStream = infoStream;
}
}
internal void SetMaxFieldLength(int maxFieldLength)
{
lock (this)
{
this.maxFieldLength = maxFieldLength;
for (int i = 0; i < threadStates.Length; i++)
threadStates[i].docState.maxFieldLength = maxFieldLength;
}
}
internal void SetSimilarity(Similarity similarity)
{
lock (this)
{
this.similarity = similarity;
for (int i = 0; i < threadStates.Length; i++)
threadStates[i].docState.similarity = similarity;
}
}
/// Set how much RAM we can use before flushing.
internal void SetRAMBufferSizeMB(double mb)
{
lock (this)
{
if (mb == IndexWriter.DISABLE_AUTO_FLUSH)
{
ramBufferSize = IndexWriter.DISABLE_AUTO_FLUSH;
waitQueuePauseBytes = 4 * 1024 * 1024;
waitQueueResumeBytes = 2 * 1024 * 1024;
}
else
{
ramBufferSize = (long) (mb * 1024 * 1024);
waitQueuePauseBytes = (long) (ramBufferSize * 0.1);
waitQueueResumeBytes = (long) (ramBufferSize * 0.05);
freeTrigger = (long) (1.05 * ramBufferSize);
freeLevel = (long) (0.95 * ramBufferSize);
}
}
}
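// Illustrative arithmetic: with mb = 16.0 (the 2.9-era default) the
// derived thresholds come out to:
//   ramBufferSize        = 16.0 MB (flush trigger)
//   waitQueuePauseBytes  =  1.6 MB (10%: producers stall once this much
//                                   output is queued for in-docID-order writing)
//   waitQueueResumeBytes =  0.8 MB (5%: stalled producers resume)
//   freeTrigger          = 16.8 MB (105%: start freeing pooled blocks)
//   freeLevel            = 15.2 MB (95%: stop freeing)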
internal double GetRAMBufferSizeMB()
{
lock (this)
{
if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH)
{
return ramBufferSize;
}
else
{
return ramBufferSize / 1024.0 / 1024.0;
}
}
}
/// Gets or sets max buffered docs, which means we will flush by
/// doc count instead of by RAM usage.
///
internal int MaxBufferedDocs
{
get { return maxBufferedDocs; }
set { maxBufferedDocs = value; }
}
/// Get current segment name we are writing.
internal string Segment
{
get { return segment; }
}
/// Returns how many docs are currently buffered in RAM.
internal int NumDocsInRAM
{
get { return numDocsInRAM; }
}
/// Returns the current doc store segment we are writing
/// to.
///
internal string DocStoreSegment
{
get
{
lock (this)
{
return docStoreSegment;
}
}
}
/// Returns the doc offset into the shared doc store for
/// the current buffered docs.
///
internal int DocStoreOffset
{
get { return docStoreOffset; }
}
/// Closes the current open doc stores and returns the doc
/// store segment name. This returns null if there are
/// no buffered documents.
///
internal System.String CloseDocStore()
{
lock (this)
{
System.Diagnostics.Debug.Assert(AllThreadsIdle());
if (infoStream != null)
Message("closeDocStore: " + openFiles.Count + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);
bool success = false;
try
{
InitFlushState(true);
closedFiles.Clear();
consumer.CloseDocStore(flushState);
System.Diagnostics.Debug.Assert(0 == openFiles.Count);
System.String s = docStoreSegment;
docStoreSegment = null;
docStoreOffset = 0;
numDocsInStore = 0;
success = true;
return s;
}
finally
{
if (!success)
{
Abort();
}
}
}
}
private ICollection<string> abortedFiles; // List of files that were written before last abort()
private SegmentWriteState flushState;
internal ICollection<string> AbortedFiles()
{
return abortedFiles;
}
internal void Message(System.String message)
{
if (infoStream != null)
writer.Message("DW: " + message);
}
internal IList<string> openFiles = new List<string>();
internal IList<string> closedFiles = new List<string>();
/* Returns Collection of files in use by this instance,
* including any flushed segments. */
internal IList<string> OpenFiles()
{
lock (this)
{
// ToArray returns a copy
return openFiles.ToArray();
}
}
internal IList<string> ClosedFiles()
{
lock (this)
{
// ToArray returns a copy
return closedFiles.ToArray();
}
}
internal void AddOpenFile(System.String name)
{
lock (this)
{
System.Diagnostics.Debug.Assert(!openFiles.Contains(name));
openFiles.Add(name);
}
}
internal void RemoveOpenFile(System.String name)
{
lock (this)
{
System.Diagnostics.Debug.Assert(openFiles.Contains(name));
openFiles.Remove(name);
closedFiles.Add(name);
}
}
internal void SetAborting()
{
lock (this)
{
aborting = true;
}
}
/// Called if we hit an exception at a bad time (when
/// updating the index files) and must discard all
/// currently buffered docs. This resets our state,
/// discarding any docs added since last flush.
///
internal void Abort()
{
lock (this)
{
try
{
if (infoStream != null)
{
Message("docWriter: now abort");
}
// Forcefully remove waiting ThreadStates from line
waitQueue.Abort();
// Wait for all other threads to finish with
// DocumentsWriter:
PauseAllThreads();
try
{
System.Diagnostics.Debug.Assert(0 == waitQueue.numWaiting);
waitQueue.waitingBytes = 0;
try
{
abortedFiles = OpenFiles();
}
catch (System.Exception)
{
abortedFiles = null;
}
deletesInRAM.Clear();
deletesFlushed.Clear();
openFiles.Clear();
for (int i = 0; i < threadStates.Length; i++)
try
{
threadStates[i].consumer.Abort();
}
catch (System.Exception)
{
}
try
{
consumer.Abort();
}
catch (System.Exception)
{
}
docStoreSegment = null;
numDocsInStore = 0;
docStoreOffset = 0;
// Reset all postings data
DoAfterFlush();
}
finally
{
ResumeAllThreads();
}
}
finally
{
aborting = false;
System.Threading.Monitor.PulseAll(this);
if (infoStream != null)
{
Message("docWriter: done abort; abortedFiles=" + abortedFiles);
}
}
}
}
/// Reset after a flush
private void DoAfterFlush()
{
// All ThreadStates should be idle when we are called
System.Diagnostics.Debug.Assert(AllThreadsIdle());
threadBindings.Clear();
waitQueue.Reset();
segment = null;
numDocsInRAM = 0;
nextDocID = 0;
bufferIsFull = false;
flushPending = false;
for (int i = 0; i < threadStates.Length; i++)
threadStates[i].DoAfterFlush();
numBytesUsed = 0;
}
// Returns true if an abort is in progress
internal bool PauseAllThreads()
{
lock (this)
{
pauseThreads++;
while (!AllThreadsIdle())
{
System.Threading.Monitor.Wait(this);
}
return aborting;
}
}
internal void ResumeAllThreads()
{
lock (this)
{
pauseThreads--;
System.Diagnostics.Debug.Assert(pauseThreads >= 0);
if (0 == pauseThreads)
System.Threading.Monitor.PulseAll(this);
}
}
private bool AllThreadsIdle()
{
lock (this)
{
for (int i = 0; i < threadStates.Length; i++)
if (!threadStates[i].isIdle)
return false;
return true;
}
}
internal bool AnyChanges
{
get
{
lock (this)
{
return numDocsInRAM != 0 || deletesInRAM.numTerms != 0 || deletesInRAM.docIDs.Count != 0 ||
deletesInRAM.queries.Count != 0;
}
}
}
private void InitFlushState(bool onlyDocStore)
{
lock (this)
{
InitSegmentName(onlyDocStore);
flushState = new SegmentWriteState(this, directory, segment, docStoreSegment, numDocsInRAM, numDocsInStore, writer.TermIndexInterval);
}
}
/// Flush all pending docs to a new segment
internal int Flush(bool closeDocStore)
{
lock (this)
{
System.Diagnostics.Debug.Assert(AllThreadsIdle());
System.Diagnostics.Debug.Assert(numDocsInRAM > 0);
System.Diagnostics.Debug.Assert(nextDocID == numDocsInRAM);
System.Diagnostics.Debug.Assert(waitQueue.numWaiting == 0);
System.Diagnostics.Debug.Assert(waitQueue.waitingBytes == 0);
InitFlushState(false);
docStoreOffset = numDocsInStore;
if (infoStream != null)
Message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
bool success = false;
try
{
if (closeDocStore)
{
System.Diagnostics.Debug.Assert(flushState.docStoreSegmentName != null);
System.Diagnostics.Debug.Assert(flushState.docStoreSegmentName.Equals(flushState.segmentName));
CloseDocStore();
flushState.numDocsInStore = 0;
}
ICollection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
for (int i = 0; i < threadStates.Length; i++)
threads.Add(threadStates[i].consumer);
consumer.Flush(threads, flushState);
if (infoStream != null)
{
SegmentInfo si = new SegmentInfo(flushState.segmentName, flushState.numDocs, directory);
long newSegmentSize = si.SizeInBytes();
System.String message = System.String.Format(nf, " oldRAMSize={0:d} newFlushedSize={1:d} docs/MB={2:f} new/old={3:%}",
new System.Object[] { numBytesUsed, newSegmentSize, (numDocsInRAM / (newSegmentSize / 1024.0 / 1024.0)), (100.0 * newSegmentSize / numBytesUsed) });
Message(message);
}
flushedDocCount += flushState.numDocs;
DoAfterFlush();
success = true;
}
finally
{
if (!success)
{
Abort();
}
}
System.Diagnostics.Debug.Assert(waitQueue.waitingBytes == 0);
return flushState.numDocs;
}
}
internal ICollection<string> GetFlushedFiles()
{
return flushState.flushedFiles;
}
/// Build compound file for the segment we just flushed
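/// (For example, flushing segment "_3" packs its files into a single
/// "_3.cfs"; the suffix comes from IndexFileNames.COMPOUND_FILE_EXTENSION.)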
internal void CreateCompoundFile(System.String segment)
{
CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION);
foreach(string flushedFile in flushState.flushedFiles)
{
cfsWriter.AddFile(flushedFile);
}
// Perform the merge
cfsWriter.Close();
}
/// Set flushPending if it is not already set and returns
/// whether it was set. This is used by IndexWriter to
/// trigger a single flush even when multiple threads are
/// trying to do so.
///
internal bool SetFlushPending()
{
lock (this)
{
if (flushPending)
return false;
else
{
flushPending = true;
return true;
}
}
}
internal void ClearFlushPending()
{
lock (this)
{
flushPending = false;
}
}
internal void PushDeletes()
{
lock (this)
{
deletesFlushed.Update(deletesInRAM);
}
}
public void Dispose()
{
// Move to protected method if class becomes unsealed
lock (this)
{
closed = true;
System.Threading.Monitor.PulseAll(this);
}
}
internal void InitSegmentName(bool onlyDocStore)
{
lock (this)
{
if (segment == null && (!onlyDocStore || docStoreSegment == null))
{
segment = writer.NewSegmentName();
System.Diagnostics.Debug.Assert(numDocsInRAM == 0);
}
if (docStoreSegment == null)
{
docStoreSegment = segment;
System.Diagnostics.Debug.Assert(numDocsInStore == 0);
}
}
}
/// Returns a free (idle) ThreadState that may be used for
/// indexing this one document. This call also pauses if a
/// flush is pending. If delTerm is non-null then we
/// buffer this deleted term after the thread state has
/// been acquired.
///
internal DocumentsWriterThreadState GetThreadState(Document doc, Term delTerm)
{
lock (this)
{
// First, find a thread state. If this thread already
// has affinity to a specific ThreadState, use that one
// again.
DocumentsWriterThreadState state = threadBindings[ThreadClass.Current()];
if (state == null)
{
// First time this thread has called us since last
// flush. Find the least loaded thread state:
DocumentsWriterThreadState minThreadState = null;
for (int i = 0; i < threadStates.Length; i++)
{
DocumentsWriterThreadState ts = threadStates[i];
if (minThreadState == null || ts.numThreads < minThreadState.numThreads)
minThreadState = ts;
}
if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.Length >= MAX_THREAD_STATE))
{
state = minThreadState;
state.numThreads++;
}
else
{
// Just create a new "private" thread state
DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1 + threadStates.Length];
if (threadStates.Length > 0)
Array.Copy(threadStates, 0, newArray, 0, threadStates.Length);
state = newArray[threadStates.Length] = new DocumentsWriterThreadState(this);
threadStates = newArray;
}
threadBindings[ThreadClass.Current()] = state;
}
// Next, wait until my thread state is idle (in case
// it's shared with other threads) and for threads to
// not be paused nor a flush pending:
WaitReady(state);
// Allocate segment name if this is the first doc since
// last flush:
InitSegmentName(false);
state.isIdle = false;
bool success = false;
try
{
state.docState.docID = nextDocID;
System.Diagnostics.Debug.Assert(writer.TestPoint("DocumentsWriter.ThreadState.init start"));
if (delTerm != null)
{
AddDeleteTerm(delTerm, state.docState.docID);
state.doFlushAfter = TimeToFlushDeletes();
}
System.Diagnostics.Debug.Assert(writer.TestPoint("DocumentsWriter.ThreadState.init after delTerm"));
nextDocID++;
numDocsInRAM++;
// We must at this point commit to flushing to ensure we
// always get N docs when we flush by doc count, even if
// > 1 thread is adding documents:
if (!flushPending && maxBufferedDocs != IndexWriter.DISABLE_AUTO_FLUSH && numDocsInRAM >= maxBufferedDocs)
{
flushPending = true;
state.doFlushAfter = true;
}
success = true;
}
finally
{
if (!success)
{
// Forcefully idle this ThreadState:
state.isIdle = true;
System.Threading.Monitor.PulseAll(this);
if (state.doFlushAfter)
{
state.doFlushAfter = false;
flushPending = false;
}
}
}
return state;
}
}
/// Returns true if the caller (IndexWriter) should now
/// flush.
///
internal bool AddDocument(Document doc, Analyzer analyzer)
{
return UpdateDocument(doc, analyzer, null);
}
internal bool UpdateDocument(Term t, Document doc, Analyzer analyzer)
{
return UpdateDocument(doc, analyzer, t);
}
internal bool UpdateDocument(Document doc, Analyzer analyzer, Term delTerm)
{
// This call is synchronized but fast
DocumentsWriterThreadState state = GetThreadState(doc, delTerm);
DocState docState = state.docState;
docState.doc = doc;
docState.analyzer = analyzer;
bool doReturnFalse = false; // {{Aroush-2.9}} to handle return from finally clause
bool success = false;
try
{
// This call is not synchronized and does all the
// work
DocWriter perDoc;
try
{
perDoc = state.consumer.ProcessDocument();
}
finally
{
docState.Clear();
}
// This call is synchronized but fast
FinishDocument(state, perDoc);
success = true;
}
finally
{
if (!success)
{
lock (this)
{
if (aborting)
{
state.isIdle = true;
System.Threading.Monitor.PulseAll(this);
Abort();
}
else
{
skipDocWriter.docID = docState.docID;
bool success2 = false;
try
{
waitQueue.Add(skipDocWriter);
success2 = true;
}
finally
{
if (!success2)
{
state.isIdle = true;
System.Threading.Monitor.PulseAll(this);
Abort();
// return false; // {{Aroush-2.9}} this 'return false' is move to outside finally
doReturnFalse = true;
}
}
if (!doReturnFalse) // {{Aroush-2.9}} added because of the above 'return false' removal
{
state.isIdle = true;
System.Threading.Monitor.PulseAll(this);
// If this thread state had decided to flush, we
// must clear it so another thread can flush
if (state.doFlushAfter)
{
state.doFlushAfter = false;
flushPending = false;
System.Threading.Monitor.PulseAll(this);
}
// Immediately mark this document as deleted
// since likely it was partially added. This
// keeps indexing as "all or none" (atomic) when
// adding a document:
AddDeleteDocID(state.docState.docID);
}
}
}
}
}
if (doReturnFalse) // {{Aroush-2.9}} see comment above
{
return false;
}
return state.doFlushAfter || TimeToFlushDeletes();
}
// for testing
internal int GetNumBufferedDeleteTerms()
{
lock (this)
{
return deletesInRAM.numTerms;
}
}
// for testing
internal IDictionary<Term, BufferedDeletes.Num> GetBufferedDeleteTerms()
{
lock (this)
{
return deletesInRAM.terms;
}
}
/// Called whenever a merge has completed and the merged segments had deletions
internal void RemapDeletes(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergeDocCount)
{
lock (this)
{
if (docMaps == null)
// The merged segments had no deletes so docIDs did not change and we have nothing to do
return;
MergeDocIDRemapper mapper = new MergeDocIDRemapper(infos, docMaps, delCounts, merge, mergeDocCount);
deletesInRAM.Remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount);
deletesFlushed.Remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount);
flushedDocCount -= mapper.docShift;
}
}
private void WaitReady(DocumentsWriterThreadState state)
{
lock (this)
{
while (!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || aborting))
{
System.Threading.Monitor.Wait(this);
}
if (closed)
throw new AlreadyClosedException("this IndexWriter is closed");
}
}
internal bool BufferDeleteTerms(Term[] terms)
{
lock (this)
{
WaitReady(null);
for (int i = 0; i < terms.Length; i++)
AddDeleteTerm(terms[i], numDocsInRAM);
return TimeToFlushDeletes();
}
}
internal bool BufferDeleteTerm(Term term)
{
lock (this)
{
WaitReady(null);
AddDeleteTerm(term, numDocsInRAM);
return TimeToFlushDeletes();
}
}
internal bool BufferDeleteQueries(Query[] queries)
{
lock (this)
{
WaitReady(null);
for (int i = 0; i < queries.Length; i++)
AddDeleteQuery(queries[i], numDocsInRAM);
return TimeToFlushDeletes();
}
}
internal bool BufferDeleteQuery(Query query)
{
lock (this)
{
WaitReady(null);
AddDeleteQuery(query, numDocsInRAM);
return TimeToFlushDeletes();
}
}
internal bool DeletesFull()
{
lock (this)
{
return (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH
        && (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + numBytesUsed) >= ramBufferSize)
    || (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH
        && ((deletesInRAM.Size() + deletesFlushed.Size()) >= maxBufferedDeleteTerms));
}
}
internal bool DoApplyDeletes()
{
lock (this)
{
// Very similar to deletesFull(), except we don't count
// numBytesAlloc, because we are checking whether
// deletes (alone) are consuming too many resources now
// and thus should be applied. We apply deletes if RAM
// usage is > 1/2 of our allowed RAM buffer, to prevent
// too-frequent flushing of a long tail of tiny segments
// when merges (which always apply deletes) are
// infrequent.
return (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH
        && (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed) >= ramBufferSize / 2)
    || (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH
        && ((deletesInRAM.Size() + deletesFlushed.Size()) >= maxBufferedDeleteTerms));
}
}
private bool TimeToFlushDeletes()
{
lock (this)
{
return (bufferIsFull || DeletesFull()) && SetFlushPending();
}
}
internal int MaxBufferedDeleteTerms
{
set { this.maxBufferedDeleteTerms = value; }
get { return maxBufferedDeleteTerms; }
}
internal bool HasDeletes()
{
lock (this)
{
return deletesFlushed.Any();
}
}
internal bool ApplyDeletes(SegmentInfos infos)
{
lock (this)
{
if (!HasDeletes())
return false;
if (infoStream != null)
Message("apply " + deletesFlushed.numTerms + " buffered deleted terms and " + deletesFlushed.docIDs.Count + " deleted docIDs and " + deletesFlushed.queries.Count + " deleted queries on " + (+ infos.Count) + " segments.");
int infosEnd = infos.Count;
int docStart = 0;
bool any = false;
for (int i = 0; i < infosEnd; i++)
{
// Make sure we never attempt to apply deletes to
// segment in external dir
System.Diagnostics.Debug.Assert(infos.Info(i).dir == directory);
SegmentReader reader = writer.readerPool.Get(infos.Info(i), false);
try
{
any |= ApplyDeletes(reader, docStart);
docStart += reader.MaxDoc;
}
finally
{
writer.readerPool.Release(reader);
}
}
deletesFlushed.Clear();
return any;
}
}
// used only by assert
private Term lastDeleteTerm;
// used only by assert
private bool CheckDeleteTerm(Term term)
{
if (term != null) {
System.Diagnostics.Debug.Assert(lastDeleteTerm == null || term.CompareTo(lastDeleteTerm) > 0, "lastTerm=" + lastDeleteTerm + " vs term=" + term);
}
lastDeleteTerm = term;
return true;
}
// Apply buffered delete terms, queries and docIDs to the
// provided reader
private bool ApplyDeletes(IndexReader reader, int docIDStart)
{
lock (this)
{
int docEnd = docIDStart + reader.MaxDoc;
bool any = false;
System.Diagnostics.Debug.Assert(CheckDeleteTerm(null));
// Delete by term
TermDocs docs = reader.TermDocs();
try
{
foreach (KeyValuePair<Term, BufferedDeletes.Num> entry in deletesFlushed.terms)
{
Term term = entry.Key;
// LUCENE-2086: we should be iterating a TreeMap,
// here, so terms better be in order:
System.Diagnostics.Debug.Assert(CheckDeleteTerm(term));
docs.Seek(term);
int limit = entry.Value.GetNum();
while (docs.Next())
{
int docID = docs.Doc;
if (docIDStart + docID >= limit)
break;
reader.DeleteDocument(docID);
any = true;
}
}
}
finally
{
docs.Close();
}
// Delete by docID
foreach(int docIdInt in deletesFlushed.docIDs)
{
int docID = docIdInt;
if (docID >= docIDStart && docID < docEnd)
{
reader.DeleteDocument(docID - docIDStart);
any = true;
}
}
// Delete by query
IndexSearcher searcher = new IndexSearcher(reader);
foreach (KeyValuePair<Query, int> entry in deletesFlushed.queries)
{
Query query = (Query) entry.Key;
int limit = (int)entry.Value;
Weight weight = query.Weight(searcher);
Scorer scorer = weight.Scorer(reader, true, false);
if (scorer != null)
{
while (true)
{
int doc = scorer.NextDoc();
if (((long) docIDStart) + doc >= limit)
break;
reader.DeleteDocument(doc);
any = true;
}
}
}
searcher.Close();
return any;
}
}
// Buffer a term in bufferedDeleteTerms, which records the
// current number of documents buffered in ram so that the
// delete term will be applied to those documents as well
// as the disk segments.
private void AddDeleteTerm(Term term, int docCount)
{
lock (this)
{
BufferedDeletes.Num num = deletesInRAM.terms[term];
int docIDUpto = flushedDocCount + docCount;
if (num == null)
deletesInRAM.terms[term] = new BufferedDeletes.Num(docIDUpto);
else
num.SetNum(docIDUpto);
deletesInRAM.numTerms++;
deletesInRAM.AddBytesUsed(BYTES_PER_DEL_TERM + term.Text.Length * CHAR_NUM_BYTE);
}
}
// Buffer a specific docID for deletion. Currently only
// used when we hit an exception when adding a document
private void AddDeleteDocID(int docID)
{
lock (this)
{
deletesInRAM.docIDs.Add(flushedDocCount + docID);
deletesInRAM.AddBytesUsed(BYTES_PER_DEL_DOCID);
}
}
private void AddDeleteQuery(Query query, int docID)
{
lock (this)
{
deletesInRAM.queries[query] = flushedDocCount + docID;
deletesInRAM.AddBytesUsed(BYTES_PER_DEL_QUERY);
}
}
internal bool DoBalanceRAM()
{
lock (this)
{
return ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH
    && !bufferIsFull
    && (numBytesUsed + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed >= ramBufferSize
        || numBytesAlloc >= freeTrigger);
}
}
/// Does the synchronized work to finish/flush the
/// inverted document.
///
private void FinishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter)
{
if (DoBalanceRAM())
// Must call this w/o holding synchronized(this) else
// we'll hit deadlock:
BalanceRAM();
lock (this)
{
System.Diagnostics.Debug.Assert(docWriter == null || docWriter.docID == perThread.docState.docID);
if (aborting)
{
// We are currently aborting, and another thread is
// waiting for me to become idle. We just forcefully
// idle this threadState; it will be fully reset by
// abort()
if (docWriter != null)
try
{
docWriter.Abort();
}
catch (System.Exception)
{
}
perThread.isIdle = true;
System.Threading.Monitor.PulseAll(this);
return;
}
bool doPause;
if (docWriter != null)
doPause = waitQueue.Add(docWriter);
else
{
skipDocWriter.docID = perThread.docState.docID;
doPause = waitQueue.Add(skipDocWriter);
}
if (doPause)
WaitForWaitQueue();
if (bufferIsFull && !flushPending)
{
flushPending = true;
perThread.doFlushAfter = true;
}
perThread.isIdle = true;
System.Threading.Monitor.PulseAll(this);
}
}
internal void WaitForWaitQueue()
{
lock (this)
{
do
{
System.Threading.Monitor.Wait(this);
}
while (!waitQueue.DoResume());
}
}
internal class SkipDocWriter : DocWriter
{
public override void Finish()
{
}
public override void Abort()
{
}
public override long SizeInBytes()
{
return 0;
}
}
internal SkipDocWriter skipDocWriter;
internal long GetRAMUsed()
{
return numBytesUsed + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed;
}
internal long numBytesAlloc;
internal long numBytesUsed;
internal System.Globalization.NumberFormatInfo nf = System.Globalization.CultureInfo.CurrentCulture.NumberFormat;
// Coarse estimates used to measure RAM usage of buffered deletes
internal const int OBJECT_HEADER_BYTES = 8;
internal static readonly int POINTER_NUM_BYTE;
internal const int INT_NUM_BYTE = 4;
internal const int CHAR_NUM_BYTE = 2;
/* Rough logic: HashMap has an array[Entry] w/ varying
load factor (say 2 * POINTER). Entry is object w/ Term
key, BufferedDeletes.Num val, int hash, Entry next
(OBJ_HEADER + 3*POINTER + INT). Term is object w/
String field and String text (OBJ_HEADER + 2*POINTER).
We don't count Term's field since it's interned.
Term's text is String (OBJ_HEADER + 4*INT + POINTER +
OBJ_HEADER + string.length*CHAR). BufferedDeletes.num is
OBJ_HEADER + INT. */
internal static readonly int BYTES_PER_DEL_TERM = 8 * POINTER_NUM_BYTE + 5 * OBJECT_HEADER_BYTES + 6 * INT_NUM_BYTE;
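/* Worked example (illustrative): on a 64-bit runtime POINTER_NUM_BYTE
is 8, so BYTES_PER_DEL_TERM = 8*8 + 5*8 + 6*4 = 128 bytes of fixed
overhead per buffered delete term; AddDeleteTerm separately adds
CHAR_NUM_BYTE (2) per character of the term's text. */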
/* Rough logic: del docIDs are List<Integer>. Say list
allocates ~2X size (2*POINTER). Integer is OBJ_HEADER
+ int */
internal static readonly int BYTES_PER_DEL_DOCID = 2 * POINTER_NUM_BYTE + OBJECT_HEADER_BYTES + INT_NUM_BYTE;
/* Rough logic: HashMap has an array[Entry] w/ varying
load factor (say 2 * POINTER). Entry is object w/
Query key, Integer val, int hash, Entry next
(OBJ_HEADER + 3*POINTER + INT). Query we often
undercount (say 24 bytes). Integer is OBJ_HEADER + INT. */
internal static readonly int BYTES_PER_DEL_QUERY = 5 * POINTER_NUM_BYTE + 2 * OBJECT_HEADER_BYTES + 2 * INT_NUM_BYTE + 24;
/* Initial chunk size of the shared byte[] blocks used to
store postings data */
internal const int BYTE_BLOCK_SHIFT = 15;
internal static readonly int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;
internal static readonly int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
internal static readonly int BYTE_BLOCK_NOT_MASK = ~ BYTE_BLOCK_MASK;
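/* Illustrative: consumers address these shared blocks with a single
"global" byte offset that the constants above decompose as
  block index     = offset >> BYTE_BLOCK_SHIFT  (which 32 KB block)
  offset in block = offset &  BYTE_BLOCK_MASK
so BYTE_BLOCK_SIZE is 1 << 15 = 32768 bytes. */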
internal class ByteBlockAllocator : ByteBlockPool.Allocator
{
public ByteBlockAllocator(DocumentsWriter enclosingInstance, int blockSize)
{
this.blockSize = blockSize;
InitBlock(enclosingInstance);
}
private void InitBlock(DocumentsWriter enclosingInstance)
{
this.enclosingInstance = enclosingInstance;
}
private DocumentsWriter enclosingInstance;
public DocumentsWriter Enclosing_Instance
{
get
{
return enclosingInstance;
}
}
int blockSize;
internal List<byte[]> freeByteBlocks = new List<byte[]>();
/* Allocate another byte[] from the shared pool */
public /*internal*/ override byte[] GetByteBlock(bool trackAllocations)
{
lock (Enclosing_Instance)
{
int size = freeByteBlocks.Count;
byte[] b;
if (0 == size)
{
// Always record a block allocated, even if
// trackAllocations is false. This is necessary
// because this block will be shared between
// things that don't track allocations (term
// vectors) and things that do (freq/prox
// postings).
Enclosing_Instance.numBytesAlloc += blockSize;
b = new byte[blockSize];
}
else
{
b = freeByteBlocks[size - 1];
freeByteBlocks.RemoveAt(size - 1);
}
if (trackAllocations)
Enclosing_Instance.numBytesUsed += blockSize;
System.Diagnostics.Debug.Assert(Enclosing_Instance.numBytesUsed <= Enclosing_Instance.numBytesAlloc);
return b;
}
}
/* Return byte[]'s to the pool */
public /*internal*/ override void RecycleByteBlocks(byte[][] blocks, int start, int end)
{
lock (Enclosing_Instance)
{
for (int i = start; i < end; i++)
{
freeByteBlocks.Add(blocks[i]);
blocks[i] = null;
}
}
}
public /*internal*/ override void RecycleByteBlocks(IList<byte[]> blocks)
{
lock (Enclosing_Instance)
{
int size = blocks.Count;
for (int i = 0; i < size; i++)
freeByteBlocks.Add(blocks[i]);
}
}
}
/* Initial chunk size of the shared int[] blocks used to
store postings data */
internal const int INT_BLOCK_SHIFT = 13;
internal static readonly int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
internal static readonly int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
private List<int[]> freeIntBlocks = new List<int[]>();
/* Allocate another int[] from the shared pool */
internal int[] GetIntBlock(bool trackAllocations)
{
lock (this)
{
int size = freeIntBlocks.Count;
int[] b;
if (0 == size)
{
// Always record a block allocated, even if
// trackAllocations is false. This is necessary
// because this block will be shared between
// things that don't track allocations (term
// vectors) and things that do (freq/prox
// postings).
numBytesAlloc += INT_BLOCK_SIZE * INT_NUM_BYTE;
b = new int[INT_BLOCK_SIZE];
}
else
{
b = freeIntBlocks[size - 1];
freeIntBlocks.RemoveAt(size - 1);
}
if (trackAllocations)
numBytesUsed += INT_BLOCK_SIZE * INT_NUM_BYTE;
System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc);
return b;
}
}
internal void BytesAllocated(long numBytes)
{
lock (this)
{
numBytesAlloc += numBytes;
}
}
internal void BytesUsed(long numBytes)
{
lock (this)
{
numBytesUsed += numBytes;
System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc);
}
}
/* Return int[]s to the pool */
internal void RecycleIntBlocks(int[][] blocks, int start, int end)
{
lock (this)
{
for (int i = start; i < end; i++)
{
freeIntBlocks.Add(blocks[i]);
blocks[i] = null;
}
}
}
internal ByteBlockAllocator byteBlockAllocator;
internal static int PER_DOC_BLOCK_SIZE = 1024;
ByteBlockAllocator perDocAllocator;
/* Initial chunk size of the shared char[] blocks used to
store term text */
internal const int CHAR_BLOCK_SHIFT = 14;
internal static readonly int CHAR_BLOCK_SIZE = 1 << CHAR_BLOCK_SHIFT;
internal static readonly int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;
internal static readonly int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE - 1;
private List<char[]> freeCharBlocks = new List<char[]>();
/* Allocate another char[] from the shared pool */
internal char[] GetCharBlock()
{
lock (this)
{
int size = freeCharBlocks.Count;
char[] c;
if (0 == size)
{
numBytesAlloc += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
c = new char[CHAR_BLOCK_SIZE];
}
else
{
c = freeCharBlocks[size - 1];
freeCharBlocks.RemoveAt(size - 1);
}
// We always track allocations of char blocks, for now,
// because nothing that skips allocation tracking
// (currently only term vectors) uses its own char
// blocks.
numBytesUsed += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc);
return c;
}
}
/* Return char[]s to the pool */
internal void RecycleCharBlocks(char[][] blocks, int numBlocks)
{
lock (this)
{
for (int i = 0; i < numBlocks; i++)
{
freeCharBlocks.Add(blocks[i]);
blocks[i] = null;
}
}
}
internal System.String ToMB(long v)
{
return System.String.Format(nf, "{0:f}", new System.Object[] { (v / 1024F / 1024F) });
}
/* We have four pools of RAM: Postings, byte blocks
* (holds freq/prox posting data), char blocks (holds
* characters in the term) and per-doc buffers (stored fields/term vectors).
* Different docs require varying amount of storage from
* these four classes.
*
* For example, docs with many unique single-occurrence
* short terms will use up the Postings RAM and hardly any
* of the other pools. Whereas docs with very large terms
* will use a lot of char blocks RAM and relatively less of
* the other pools. This method just frees allocations from
* the pools once we are over-budget, which balances the
* pools to match the current docs. */
internal void BalanceRAM()
{
// We flush when we've used our target usage
long flushTrigger = ramBufferSize;
long deletesRAMUsed = deletesInRAM.bytesUsed + deletesFlushed.bytesUsed;
if (numBytesAlloc + deletesRAMUsed > freeTrigger)
{
if (infoStream != null)
Message(
" RAM: now balance allocations: usedMB=" + ToMB(numBytesUsed) +
" vs trigger=" + ToMB(flushTrigger) +
" allocMB=" + ToMB(numBytesAlloc) +
" deletesMB=" + ToMB(deletesRAMUsed) +
" vs trigger=" + ToMB(freeTrigger) +
" byteBlockFree=" + ToMB(byteBlockAllocator.freeByteBlocks.Count * BYTE_BLOCK_SIZE) +
" perDocFree=" + ToMB(perDocAllocator.freeByteBlocks.Count * PER_DOC_BLOCK_SIZE) +
" charBlockFree=" + ToMB(freeCharBlocks.Count * CHAR_BLOCK_SIZE * CHAR_NUM_BYTE));
long startBytesAlloc = numBytesAlloc + deletesRAMUsed;
int iter = 0;
// We free equally from each pool in 32 KB
// chunks until we are below our threshold
// (freeLevel)
bool any = true;
while (numBytesAlloc + deletesRAMUsed > freeLevel)
{
lock (this)
{
if (0 == perDocAllocator.freeByteBlocks.Count
&& 0 == byteBlockAllocator.freeByteBlocks.Count
&& 0 == freeCharBlocks.Count
&& 0 == freeIntBlocks.Count
&& !any)
{
// Nothing else to free -- must flush now.
bufferIsFull = numBytesUsed + deletesRAMUsed > flushTrigger;
if (infoStream != null)
{
if (bufferIsFull)
Message(" nothing to free; now set bufferIsFull");
else
Message(" nothing to free");
}
System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc);
break;
}
if ((0 == iter % 5) && byteBlockAllocator.freeByteBlocks.Count > 0)
{
byteBlockAllocator.freeByteBlocks.RemoveAt(byteBlockAllocator.freeByteBlocks.Count - 1);
numBytesAlloc -= BYTE_BLOCK_SIZE;
}
if ((1 == iter % 5) && freeCharBlocks.Count > 0)
{
freeCharBlocks.RemoveAt(freeCharBlocks.Count - 1);
numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
}
if ((2 == iter % 5) && freeIntBlocks.Count > 0)
{
freeIntBlocks.RemoveAt(freeIntBlocks.Count - 1);
numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE;
}
if ((3 == iter % 5) && perDocAllocator.freeByteBlocks.Count > 0)
{
// Remove upwards of 32 blocks (each block is 1K)
for (int i = 0; i < 32; ++i)
{
perDocAllocator.freeByteBlocks.RemoveAt(perDocAllocator.freeByteBlocks.Count - 1);
numBytesAlloc -= PER_DOC_BLOCK_SIZE;
if (perDocAllocator.freeByteBlocks.Count == 0)
{
break;
}
}
}
}
if ((4 == iter % 5) && any)
// Ask consumer to free any recycled state
any = consumer.FreeRAM();
iter++;
}
if (infoStream != null)
Message(System.String.Format(nf, " after free: freedMB={0:f} usedMB={1:f} allocMB={2:f}",
new System.Object[] { ((startBytesAlloc - numBytesAlloc) / 1024.0 / 1024.0), (numBytesUsed / 1024.0 / 1024.0), (numBytesAlloc / 1024.0 / 1024.0) }));
}
else
{
// If we have not crossed the 100% mark, but have
// crossed the 95% mark of RAM we are actually
// using, go ahead and flush. This prevents
// over-allocating and then freeing, with every
// flush.
lock (this)
{
if (numBytesUsed + deletesRAMUsed > flushTrigger)
{
if (infoStream != null)
Message(System.String.Format(nf, " RAM: now flush @ usedMB={0:f} allocMB={1:f} triggerMB={2:f}",
new object[] { (numBytesUsed / 1024.0 / 1024.0), (numBytesAlloc / 1024.0 / 1024.0), (flushTrigger / 1024.0 / 1024.0) }));
bufferIsFull = true;
}
}
}
}
internal WaitQueue waitQueue;
internal class WaitQueue
{
private void InitBlock(DocumentsWriter enclosingInstance)
{
this.enclosingInstance = enclosingInstance;
}
private DocumentsWriter enclosingInstance;
public DocumentsWriter Enclosing_Instance
{
get
{
return enclosingInstance;
}
}
internal DocWriter[] waiting;
internal int nextWriteDocID;
internal int nextWriteLoc;
internal int numWaiting;
internal long waitingBytes;
public WaitQueue(DocumentsWriter enclosingInstance)
{
InitBlock(enclosingInstance);
waiting = new DocWriter[10];
}
internal void Reset()
{
lock (this)
{
// NOTE: nextWriteLoc doesn't need to be reset
System.Diagnostics.Debug.Assert(numWaiting == 0);
System.Diagnostics.Debug.Assert(waitingBytes == 0);
nextWriteDocID = 0;
}
}
internal bool DoResume()
{
lock (this)
{
return waitingBytes <= Enclosing_Instance.waitQueueResumeBytes;
}
}
internal bool DoPause()
{
lock (this)
{
return waitingBytes > Enclosing_Instance.waitQueuePauseBytes;
}
}
internal void Abort()
{
lock (this)
{
int count = 0;
for (int i = 0; i < waiting.Length; i++)
{
DocWriter doc = waiting[i];
if (doc != null)
{
doc.Abort();
waiting[i] = null;
count++;
}
}
waitingBytes = 0;
System.Diagnostics.Debug.Assert(count == numWaiting);
numWaiting = 0;
}
}
private void WriteDocument(DocWriter doc)
{
System.Diagnostics.Debug.Assert(doc == Enclosing_Instance.skipDocWriter || nextWriteDocID == doc.docID);
bool success = false;
try
{
doc.Finish();
nextWriteDocID++;
Enclosing_Instance.numDocsInStore++;
nextWriteLoc++;
System.Diagnostics.Debug.Assert(nextWriteLoc <= waiting.Length);
if (nextWriteLoc == waiting.Length)
nextWriteLoc = 0;
success = true;
}
finally
{
if (!success)
Enclosing_Instance.SetAborting();
}
}
public bool Add(DocWriter doc)
{
lock (this)
{
System.Diagnostics.Debug.Assert(doc.docID >= nextWriteDocID);
if (doc.docID == nextWriteDocID)
{
WriteDocument(doc);
while (true)
{
doc = waiting[nextWriteLoc];
if (doc != null)
{
numWaiting--;
waiting[nextWriteLoc] = null;
waitingBytes -= doc.SizeInBytes();
WriteDocument(doc);
}
else
break;
}
}
else
{
// I finished before documents that were added
// before me. This can easily happen when I am a
// small doc and the docs before me were large, or,
// just due to luck in the thread scheduling. Just
// add myself to the queue and when that large doc
// finishes, it will flush me:
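// For example (hypothetical docIDs): if nextWriteDocID is 3 and
// this thread finished docID 7 first, gap is 4 and doc 7 parks at
// waiting[(nextWriteLoc + 4) % waiting.Length]; once docs 3..6
// arrive, the in-order branch above writes them and then drains
// doc 7 from the queue.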
int gap = doc.docID - nextWriteDocID;
if (gap >= waiting.Length)
{
// Grow queue
DocWriter[] newArray = new DocWriter[ArrayUtil.GetNextSize(gap)];
System.Diagnostics.Debug.Assert(nextWriteLoc >= 0);
Array.Copy(waiting, nextWriteLoc, newArray, 0, waiting.Length - nextWriteLoc);
Array.Copy(waiting, 0, newArray, waiting.Length - nextWriteLoc, nextWriteLoc);
nextWriteLoc = 0;
waiting = newArray;
gap = doc.docID - nextWriteDocID;
}
int loc = nextWriteLoc + gap;
if (loc >= waiting.Length)
loc -= waiting.Length;
// We should only wrap one time
System.Diagnostics.Debug.Assert(loc < waiting.Length);
// Nobody should be in my spot!
System.Diagnostics.Debug.Assert(waiting [loc] == null);
waiting[loc] = doc;
numWaiting++;
waitingBytes += doc.SizeInBytes();
}
return DoPause();
}
}
}
static DocumentsWriter()
{
DefaultIndexingChain = new AnonymousClassIndexingChain();
POINTER_NUM_BYTE = Constants.JRE_IS_64BIT ? 8 : 4;
}
public static int BYTE_BLOCK_SIZE_ForNUnit
{
get { return BYTE_BLOCK_SIZE; }
}
public static int CHAR_BLOCK_SIZE_ForNUnit
{
get { return CHAR_BLOCK_SIZE; }
}
}
}