1 /////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2009-2014 Alan Wright. All rights reserved.
3 // Distributable under the terms of either the Apache License (Version 2.0)
4 // or the GNU Lesser General Public License.
5 /////////////////////////////////////////////////////////////////////////////
6 
7 #include "LuceneInc.h"
8 #include "DocumentsWriter.h"
9 #include "DocumentsWriterThreadState.h"
10 #include "LuceneThread.h"
11 #include "IndexWriter.h"
12 #include "_IndexWriter.h"
13 #include "IndexReader.h"
14 #include "IndexSearcher.h"
15 #include "DocFieldProcessor.h"
16 #include "Term.h"
17 #include "TermDocs.h"
18 #include "TermVectorsTermsWriter.h"
19 #include "FreqProxTermsWriter.h"
20 #include "TermsHashConsumer.h"
21 #include "InvertedDocConsumer.h"
22 #include "TermsHash.h"
23 #include "DocInverter.h"
24 #include "NormsWriter.h"
25 #include "BufferedDeletes.h"
26 #include "FieldInfos.h"
27 #include "InfoStream.h"
28 #include "DocConsumerPerThread.h"
29 #include "SegmentWriteState.h"
30 #include "IndexFileNames.h"
31 #include "CompoundFileWriter.h"
32 #include "MergeDocIDRemapper.h"
33 #include "SegmentReader.h"
34 #include "SegmentInfos.h"
35 #include "SegmentInfo.h"
36 #include "Query.h"
37 #include "Weight.h"
38 #include "Scorer.h"
39 #include "TestPoint.h"
40 #include "MiscUtils.h"
41 #include "StringUtils.h"
42 
43 namespace Lucene {
44 
/// Max # ThreadState instances; if there are more threads than this they share ThreadStates
const int32_t DocumentsWriter::MAX_THREAD_STATE = 5;

/// Coarse estimates used to measure RAM usage of buffered deletes
const int32_t DocumentsWriter::OBJECT_HEADER_BYTES = 8;
// Pointer width tracks the build target; LPP_BUILD_64 selects the 64-bit build.
#ifdef LPP_BUILD_64
const int32_t DocumentsWriter::POINTER_NUM_BYTE = 8;
#else
const int32_t DocumentsWriter::POINTER_NUM_BYTE = 4;
#endif
const int32_t DocumentsWriter::INT_NUM_BYTE = 4;
// Character width follows the configured wchar/unicode storage size (UTF-32 vs UTF-16).
#ifdef LPP_UNICODE_CHAR_SIZE_4
const int32_t DocumentsWriter::CHAR_NUM_BYTE = 4;
#else
const int32_t DocumentsWriter::CHAR_NUM_BYTE = 2;
#endif

/// Rough logic: HashMap has an array[Entry] with varying load factor (say 2 * POINTER).  Entry is object
/// with Term key, BufferedDeletes.Num val, int hash, Entry next (OBJ_HEADER + 3*POINTER + INT).  Term is
/// object with String field and String text (OBJ_HEADER + 2*POINTER).  We don't count Term's field since
/// it's interned.  Term's text is String (OBJ_HEADER + 4*INT + POINTER + OBJ_HEADER + string.length*CHAR).
/// BufferedDeletes.num is OBJ_HEADER + INT.
const int32_t DocumentsWriter::BYTES_PER_DEL_TERM = 8 * DocumentsWriter::POINTER_NUM_BYTE + 5 *
        DocumentsWriter::OBJECT_HEADER_BYTES + 6 *
        DocumentsWriter::INT_NUM_BYTE;

/// Rough logic: del docIDs are List<Integer>.  Say list allocates ~2X size (2*POINTER).  Integer is
/// OBJ_HEADER + int
const int32_t DocumentsWriter::BYTES_PER_DEL_DOCID = 2 * DocumentsWriter::POINTER_NUM_BYTE +
        DocumentsWriter::OBJECT_HEADER_BYTES +
        DocumentsWriter::INT_NUM_BYTE;

/// Rough logic: HashMap has an array[Entry] with varying load factor (say 2 * POINTER).  Entry is object
/// with Query key, Integer val, int hash, Entry next (OBJ_HEADER + 3*POINTER + INT).  Query we often undercount
/// (say 24 bytes).  Integer is OBJ_HEADER + INT.
const int32_t DocumentsWriter::BYTES_PER_DEL_QUERY = 5 * DocumentsWriter::POINTER_NUM_BYTE + 2 *
        DocumentsWriter::OBJECT_HEADER_BYTES + 2 *
        DocumentsWriter::INT_NUM_BYTE + 24;

/// Initial chunks size of the shared byte[] blocks used to store postings data
const int32_t DocumentsWriter::BYTE_BLOCK_SHIFT = 15;
const int32_t DocumentsWriter::BYTE_BLOCK_SIZE = 1 << DocumentsWriter::BYTE_BLOCK_SHIFT;
const int32_t DocumentsWriter::BYTE_BLOCK_MASK = DocumentsWriter::BYTE_BLOCK_SIZE - 1;
const int32_t DocumentsWriter::BYTE_BLOCK_NOT_MASK = ~DocumentsWriter::BYTE_BLOCK_MASK;

/// Initial chunk size of the shared char[] blocks used to store term text
const int32_t DocumentsWriter::CHAR_BLOCK_SHIFT = 14;
const int32_t DocumentsWriter::CHAR_BLOCK_SIZE = 1 << DocumentsWriter::CHAR_BLOCK_SHIFT;
const int32_t DocumentsWriter::CHAR_BLOCK_MASK = DocumentsWriter::CHAR_BLOCK_SIZE - 1;

// A term must fit inside one char block (minus one for bookkeeping).
const int32_t DocumentsWriter::MAX_TERM_LENGTH = DocumentsWriter::CHAR_BLOCK_SIZE - 1;

/// Initial chunks size of the shared int[] blocks used to store postings data
const int32_t DocumentsWriter::INT_BLOCK_SHIFT = 13;
const int32_t DocumentsWriter::INT_BLOCK_SIZE = 1 << DocumentsWriter::INT_BLOCK_SHIFT;
const int32_t DocumentsWriter::INT_BLOCK_MASK = DocumentsWriter::INT_BLOCK_SIZE - 1;

// Allocation unit for per-document stored-field/vector buffers.
const int32_t DocumentsWriter::PER_DOC_BLOCK_SIZE = 1024;
103 
// Construct the writer over the given directory.  Only collections that do
// not need shared_from_this() are created here; the heavier setup (allocators,
// wait queue, consumer chain) is deferred to initialize().
DocumentsWriter::DocumentsWriter(const DirectoryPtr& directory,  const IndexWriterPtr& writer, const IndexingChainPtr& indexingChain) {
    this->threadStates = Collection<DocumentsWriterThreadStatePtr>::newInstance();
    this->threadBindings = MapThreadDocumentsWriterThreadState::newInstance();
    this->_openFiles = HashSet<String>::newInstance();
    this->_closedFiles = HashSet<String>::newInstance();
    this->freeIntBlocks = Collection<IntArray>::newInstance();
    this->freeCharBlocks = Collection<CharArray>::newInstance();

    this->directory = directory;
    // _writer is read back later via IndexWriterPtr(_writer) -- presumably a
    // weak reference to avoid an ownership cycle; confirm against the header.
    this->_writer = writer;
    this->indexingChain = indexingChain;
}

DocumentsWriter::~DocumentsWriter() {
}
119 
// Second-phase construction: runs after the shared_ptr owning this object
// exists, so shared_from_this() is safe to call here (it is not safe in the
// constructor).
void DocumentsWriter::initialize() {
    docStoreOffset = 0;
    nextDocID = 0;
    numDocsInRAM = 0;
    numDocsInStore = 0;
    pauseThreads = 0;
    flushPending = false;
    bufferIsFull = false;
    aborting = false;
    maxFieldLength = IndexWriter::DEFAULT_MAX_FIELD_LENGTH;
    deletesInRAM = newLucene<BufferedDeletes>(false);
    deletesFlushed = newLucene<BufferedDeletes>(true);
    maxBufferedDeleteTerms = IndexWriter::DEFAULT_MAX_BUFFERED_DELETE_TERMS;
    ramBufferSize = (int64_t)(IndexWriter::DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024);
    // Wait queue pauses at 10% of the RAM budget and resumes at 5%.
    waitQueuePauseBytes = (int64_t)((double)ramBufferSize * 0.1);
    waitQueueResumeBytes = (int64_t)((double)ramBufferSize * 0.05);
    // Free recycled blocks when allocation exceeds 105% of the budget, down to 95%.
    freeTrigger = (int64_t)(IndexWriter::DEFAULT_RAM_BUFFER_SIZE_MB * 1024.0 * 1024.0 * 1.05);
    freeLevel = (int64_t)(IndexWriter::DEFAULT_RAM_BUFFER_SIZE_MB * 1024.0 * 1024.0 * 0.95);
    maxBufferedDocs = IndexWriter::DEFAULT_MAX_BUFFERED_DOCS;
    flushedDocCount = 0;
    closed = false;
    waitQueue = newLucene<WaitQueue>(shared_from_this());
    skipDocWriter = newLucene<SkipDocWriter>();
    numBytesAlloc = 0;
    numBytesUsed = 0;
    byteBlockAllocator = newLucene<ByteBlockAllocator>(shared_from_this(), BYTE_BLOCK_SIZE);
    perDocAllocator = newLucene<ByteBlockAllocator>(shared_from_this(), PER_DOC_BLOCK_SIZE);

    IndexWriterPtr writer(_writer);
    this->similarity = writer->getSimilarity();
    // Start the flushed count from whatever the writer already holds.
    flushedDocCount = writer->maxDoc();

    consumer = indexingChain->getChain(shared_from_this());
    // Non-null only when the chain's head is a DocFieldProcessor (the default chain).
    docFieldProcessor = boost::dynamic_pointer_cast<DocFieldProcessor>(consumer);
}
155 
newPerDocBuffer()156 PerDocBufferPtr DocumentsWriter::newPerDocBuffer() {
157     return newLucene<PerDocBuffer>(shared_from_this());
158 }
159 
// Lazily build the process-wide default indexing chain, registering it with
// CycleCheck so the static instance is accounted for at shutdown.
// NOTE(review): the null-check/assign pair is not synchronized; concurrent
// first calls could construct the chain twice -- confirm callers serialize
// the first use.
IndexingChainPtr DocumentsWriter::getDefaultIndexingChain() {
    static DefaultIndexingChainPtr defaultIndexingChain;
    if (!defaultIndexingChain) {
        defaultIndexingChain = newLucene<DefaultIndexingChain>();
        CycleCheck::addStatic(defaultIndexingChain);
    }
    return defaultIndexingChain;
}
168 
updateFlushedDocCount(int32_t n)169 void DocumentsWriter::updateFlushedDocCount(int32_t n) {
170     SyncLock syncLock(this);
171     flushedDocCount += n;
172 }
173 
getFlushedDocCount()174 int32_t DocumentsWriter::getFlushedDocCount() {
175     SyncLock syncLock(this);
176     return flushedDocCount;
177 }
178 
setFlushedDocCount(int32_t n)179 void DocumentsWriter::setFlushedDocCount(int32_t n) {
180     SyncLock syncLock(this);
181     flushedDocCount = n;
182 }
183 
hasProx()184 bool DocumentsWriter::hasProx() {
185     return docFieldProcessor ? docFieldProcessor->fieldInfos->hasProx() : true;
186 }
187 
setInfoStream(const InfoStreamPtr & infoStream)188 void DocumentsWriter::setInfoStream(const InfoStreamPtr& infoStream) {
189     SyncLock syncLock(this);
190     this->infoStream = infoStream;
191     for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
192         (*threadState)->docState->infoStream = infoStream;
193     }
194 }
195 
setMaxFieldLength(int32_t maxFieldLength)196 void DocumentsWriter::setMaxFieldLength(int32_t maxFieldLength) {
197     SyncLock syncLock(this);
198     this->maxFieldLength = maxFieldLength;
199     for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
200         (*threadState)->docState->maxFieldLength = maxFieldLength;
201     }
202 }
203 
setSimilarity(const SimilarityPtr & similarity)204 void DocumentsWriter::setSimilarity(const SimilarityPtr& similarity) {
205     SyncLock syncLock(this);
206     this->similarity = similarity;
207     for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
208         (*threadState)->docState->similarity = similarity;
209     }
210 }
211 
setRAMBufferSizeMB(double mb)212 void DocumentsWriter::setRAMBufferSizeMB(double mb) {
213     SyncLock syncLock(this);
214     if (mb == IndexWriter::DISABLE_AUTO_FLUSH) {
215         ramBufferSize = IndexWriter::DISABLE_AUTO_FLUSH;
216         waitQueuePauseBytes = 4 * 1024 * 1024;
217         waitQueueResumeBytes = 2 * 1024 * 1024;
218     } else {
219         ramBufferSize = (int64_t)(mb * 1024.0 * 1024.0);
220         waitQueuePauseBytes = (int64_t)((double)ramBufferSize * 0.1);
221         waitQueueResumeBytes = (int64_t)((double)ramBufferSize * 0.05);
222         freeTrigger = (int64_t)(1.05 * (double)ramBufferSize);
223         freeLevel = (int64_t)(0.95 * (double)ramBufferSize);
224     }
225 }
226 
getRAMBufferSizeMB()227 double DocumentsWriter::getRAMBufferSizeMB() {
228     SyncLock syncLock(this);
229     if (ramBufferSize == IndexWriter::DISABLE_AUTO_FLUSH) {
230         return (double)ramBufferSize;
231     } else {
232         return (double)ramBufferSize / 1024.0 / 1024.0;
233     }
234 }
235 
// Set the doc-count flush trigger (IndexWriter::DISABLE_AUTO_FLUSH disables it).
void DocumentsWriter::setMaxBufferedDocs(int32_t count) {
    maxBufferedDocs = count;
}

int32_t DocumentsWriter::getMaxBufferedDocs() {
    return maxBufferedDocs;
}

// Name of the segment currently being buffered (empty until first doc).
String DocumentsWriter::getSegment() {
    return segment;
}

// Number of docs buffered in RAM since the last flush.
int32_t DocumentsWriter::getNumDocsInRAM() {
    return numDocsInRAM;
}

// Name of the shared doc-store segment (synchronized read).
String DocumentsWriter::getDocStoreSegment() {
    SyncLock syncLock(this);
    return docStoreSegment;
}

// Offset of the current segment's docs within the shared doc store.
int32_t DocumentsWriter::getDocStoreOffset() {
    return docStoreOffset;
}
260 
// Close the shared doc-store files (stored fields, term vectors) and return
// the doc-store segment name they were written under.  On any failure the
// whole buffer is aborted so the index stays consistent.  Caller-visible
// state (docStoreSegment/offset/numDocsInStore) is reset on success.
String DocumentsWriter::closeDocStore() {
    TestScope testScope(L"DocumentsWriter", L"closeDocStore");
    SyncLock syncLock(this);
    BOOST_ASSERT(allThreadsIdle());

    if (infoStream) {
        message(L"closeDocStore: " + StringUtils::toString(_openFiles.size()) + L" files to flush to segment " +
                docStoreSegment + L" numDocs=" + StringUtils::toString(numDocsInStore));
    }

    bool success = false;
    // finally + throwException() emulates Java's try/finally.
    LuceneException finally;
    String s;
    try {
        initFlushState(true);
        _closedFiles.clear();

        consumer->closeDocStore(flushState);
        // Closing the doc store must have closed every tracked open file.
        BOOST_ASSERT(_openFiles.empty());

        s = docStoreSegment;
        docStoreSegment.clear();
        docStoreOffset = 0;
        numDocsInStore = 0;
        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }
    if (!success) {
        abort();
    }
    finally.throwException();
    return s;
}
295 
// Files that were open when the last abort happened (may be unset).
// NOTE(review): read without the writer lock -- confirm callers tolerate a
// concurrently-updated snapshot.
HashSet<String> DocumentsWriter::abortedFiles() {
    return _abortedFiles;
}

// Emit a diagnostic line to the info stream, if one is configured.
void DocumentsWriter::message(const String& message) {
    if (infoStream) {
        *infoStream << L"DW " << message << L"\n";
    }
}
305 
openFiles()306 HashSet<String> DocumentsWriter::openFiles() {
307     SyncLock syncLock(this);
308     return HashSet<String>::newInstance(_openFiles.begin(), _openFiles.end());
309 }
310 
closedFiles()311 HashSet<String> DocumentsWriter::closedFiles() {
312     SyncLock syncLock(this);
313     return HashSet<String>::newInstance(_closedFiles.begin(), _closedFiles.end());
314 }
315 
// Register a newly-opened output file; a file may be tracked only once.
void DocumentsWriter::addOpenFile(const String& name) {
    SyncLock syncLock(this);
    BOOST_ASSERT(!_openFiles.contains(name));
    _openFiles.add(name);
}

// Move a file from the open set to the closed set; it must be tracked as open.
void DocumentsWriter::removeOpenFile(const String& name) {
    SyncLock syncLock(this);
    BOOST_ASSERT(_openFiles.contains(name));
    _openFiles.remove(name);
    _closedFiles.add(name);
}

// Flag that an abort is underway so in-flight threads can bail out early.
void DocumentsWriter::setAborting() {
    SyncLock syncLock(this);
    aborting = true;
}
333 
// Discard every buffered document and delete, returning the writer to a
// clean state.  Called when a flush or a document add hits an unrecoverable
// error.  The nested LuceneException/finally pattern emulates Java
// try/finally: the first captured exception is rethrown at the end, after
// threads have been resumed and waiters notified.
void DocumentsWriter::abort() {
    TestScope testScope(L"DocumentsWriter", L"abort");
    SyncLock syncLock(this);
    LuceneException finally;
    try {
        if (infoStream) {
            message(L"docWriter: now abort");
        }

        // Forcefully remove waiting ThreadStates from line
        waitQueue->abort();

        // Wait for all other threads to finish with DocumentsWriter
        pauseAllThreads();

        try {
            BOOST_ASSERT(waitQueue->numWaiting == 0);

            waitQueue->waitingBytes = 0;

            // Best effort: remember which files were open so the caller can
            // delete them; if even that fails, clear the record entirely.
            try {
                _abortedFiles = openFiles();
            } catch (...) {
                _abortedFiles.reset();
            }

            deletesInRAM->clear();
            deletesFlushed->clear();
            _openFiles.clear();

            // Abort each per-thread consumer; individual failures are ignored
            // so every thread state still gets aborted.
            for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
                try {
                    (*threadState)->consumer->abort();
                } catch (...) {
                }
            }

            try {
                consumer->abort();
            } catch (...) {
            }

            docStoreSegment.clear();
            numDocsInStore = 0;
            docStoreOffset = 0;

            // Reset all postings data
            doAfterFlush();
        } catch (LuceneException& e) {
            finally = e;
        }
        // Always resume, even if the cleanup above failed.
        resumeAllThreads();
    } catch (LuceneException& e) {
        // Keep the first exception; don't let resumeAllThreads() mask it.
        if (finally.isNull()) {
            finally = e;
        }
    }
    aborting = false;
    notifyAll();
    if (infoStream) {
        message(L"docWriter: done abort");
    }
    finally.throwException();
}
398 
doAfterFlush()399 void DocumentsWriter::doAfterFlush() {
400     // All ThreadStates should be idle when we are called
401     BOOST_ASSERT(allThreadsIdle());
402     threadBindings.clear();
403     waitQueue->reset();
404     segment.clear();
405     numDocsInRAM = 0;
406     nextDocID = 0;
407     bufferIsFull = false;
408     flushPending = false;
409     for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
410         (*threadState)->doAfterFlush();
411     }
412     numBytesUsed = 0;
413 }
414 
// Block until every indexing thread is idle.  pauseThreads is a counter so
// nested pause/resume pairs compose.  Returns whether an abort is in
// progress, so callers can bail out instead of proceeding.
bool DocumentsWriter::pauseAllThreads() {
    SyncLock syncLock(this);
    ++pauseThreads;
    while (!allThreadsIdle()) {
        wait(1000);
    }
    return aborting;
}

// Undo one pauseAllThreads(); wake waiters once the last pause is released.
void DocumentsWriter::resumeAllThreads() {
    SyncLock syncLock(this);
    --pauseThreads;
    BOOST_ASSERT(pauseThreads >= 0);
    if (pauseThreads == 0) {
        notifyAll();
    }
}
432 
allThreadsIdle()433 bool DocumentsWriter::allThreadsIdle() {
434     SyncLock syncLock(this);
435     for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
436         if (!(*threadState)->isIdle) {
437             return false;
438         }
439     }
440     return true;
441 }
442 
anyChanges()443 bool DocumentsWriter::anyChanges() {
444     SyncLock syncLock(this);
445     return (numDocsInRAM != 0 || deletesInRAM->numTerms != 0 || !deletesInRAM->docIDs.empty() || !deletesInRAM->queries.empty());
446 }
447 
// Prepare the SegmentWriteState used by the consumer chain for the next
// flush (or doc-store close, when onlyDocStore is true).
void DocumentsWriter::initFlushState(bool onlyDocStore) {
    SyncLock syncLock(this);
    initSegmentName(onlyDocStore);
    flushState = newLucene<SegmentWriteState>(shared_from_this(), directory, segment, docStoreSegment, numDocsInRAM, numDocsInStore, IndexWriterPtr(_writer)->getTermIndexInterval());
}
453 
flush(bool _closeDocStore)454 int32_t DocumentsWriter::flush(bool _closeDocStore) {
455     SyncLock syncLock(this);
456     BOOST_ASSERT(allThreadsIdle());
457 
458     BOOST_ASSERT(numDocsInRAM > 0);
459 
460     BOOST_ASSERT(nextDocID == numDocsInRAM);
461     BOOST_ASSERT(waitQueue->numWaiting == 0);
462     BOOST_ASSERT(waitQueue->waitingBytes == 0);
463 
464     initFlushState(false);
465 
466     docStoreOffset = numDocsInStore;
467 
468     if (infoStream) {
469         message(L"flush postings as segment " + flushState->segmentName + L" numDocs=" + StringUtils::toString(numDocsInRAM));
470     }
471 
472     bool success = false;
473     LuceneException finally;
474 
475     try {
476         if (_closeDocStore) {
477             BOOST_ASSERT(!flushState->docStoreSegmentName.empty());
478             BOOST_ASSERT(flushState->docStoreSegmentName == flushState->segmentName);
479 
480             closeDocStore();
481             flushState->numDocsInStore = 0;
482         }
483 
484         Collection<DocConsumerPerThreadPtr> threads(Collection<DocConsumerPerThreadPtr>::newInstance());
485         for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
486             threads.add((*threadState)->consumer);
487         }
488         consumer->flush(threads, flushState);
489 
490         if (infoStream) {
491             SegmentInfoPtr si(newLucene<SegmentInfo>(flushState->segmentName, flushState->numDocs, directory));
492             int64_t newSegmentSize = si->sizeInBytes();
493             if (infoStream) {
494                 message(L"  oldRAMSize=" + StringUtils::toString(numBytesUsed) + L" newFlushedSize=" +
495                         StringUtils::toString(newSegmentSize) + L" docs/MB=" +
496                         StringUtils::toString((double)numDocsInRAM / ((double)newSegmentSize / 1024.0 / 1024.0)) +
497                         L" new/old=" + StringUtils::toString(100.0 * (double)newSegmentSize / (double)numBytesUsed) + L"%");
498             }
499         }
500 
501         flushedDocCount += flushState->numDocs;
502 
503         doAfterFlush();
504 
505         success = true;
506     } catch (LuceneException& e) {
507         finally = e;
508     }
509     if (!success) {
510         abort();
511     }
512     finally.throwException();
513 
514     BOOST_ASSERT(waitQueue->waitingBytes == 0);
515 
516     return flushState->numDocs;
517 }
518 
// Files written by the most recent flush (valid only after flush()).
HashSet<String> DocumentsWriter::getFlushedFiles() {
    return flushState->flushedFiles;
}

// Bundle the last flush's files into a single compound (.cfs) file for the
// given segment name.
void DocumentsWriter::createCompoundFile(const String& segment) {
    CompoundFileWriterPtr cfsWriter(newLucene<CompoundFileWriter>(directory, segment + L"." + IndexFileNames::COMPOUND_FILE_EXTENSION()));
    for (HashSet<String>::iterator flushedFile = flushState->flushedFiles.begin(); flushedFile != flushState->flushedFiles.end(); ++flushedFile) {
        cfsWriter->addFile(*flushedFile);
    }

    // Perform the merge
    cfsWriter->close();
}
532 
setFlushPending()533 bool DocumentsWriter::setFlushPending() {
534     SyncLock syncLock(this);
535     if (flushPending) {
536         return false;
537     } else {
538         flushPending = true;
539         return true;
540     }
541 }
542 
clearFlushPending()543 void DocumentsWriter::clearFlushPending() {
544     SyncLock syncLock(this);
545     flushPending = false;
546 }
547 
pushDeletes()548 void DocumentsWriter::pushDeletes() {
549     SyncLock syncLock(this);
550     deletesFlushed->update(deletesInRAM);
551 }
552 
// Mark the writer closed and wake every thread blocked in waitReady(), which
// will then throw AlreadyClosedException.
void DocumentsWriter::close() {
    SyncLock syncLock(this);
    closed = true;
    notifyAll();
}

// Allocate the segment name (and doc-store segment name) lazily, on the
// first document since the last flush.  When onlyDocStore is true, only the
// doc-store name is required, so the main segment name is left unset if a
// doc-store name already exists.
void DocumentsWriter::initSegmentName(bool onlyDocStore) {
    SyncLock syncLock(this);
    if (segment.empty() && (!onlyDocStore || docStoreSegment.empty())) {
        segment = IndexWriterPtr(_writer)->newSegmentName();
        BOOST_ASSERT(numDocsInRAM == 0);
    }
    if (docStoreSegment.empty()) {
        // The doc store initially shares the new segment's name.
        docStoreSegment = segment;
        BOOST_ASSERT(numDocsInStore == 0);
    }
}
570 
// Reserve a ThreadState for the current thread, assign the next docID, and
// buffer the optional delete term.  Threads keep affinity to a state between
// flushes; new threads get the least-loaded state, or a fresh one while
// fewer than MAX_THREAD_STATE exist.  On failure all partial bookkeeping is
// rolled back before rethrowing.
DocumentsWriterThreadStatePtr DocumentsWriter::getThreadState(const DocumentPtr& doc, const TermPtr& delTerm) {
    SyncLock syncLock(this);
    // First, find a thread state.  If this thread already has affinity to a specific ThreadState, use that one again.
    DocumentsWriterThreadStatePtr state(threadBindings.get(LuceneThread::currentId()));
    if (!state) {
        // First time this thread has called us since last flush.  Find the least loaded thread state
        DocumentsWriterThreadStatePtr minThreadState;
        for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
            if (!minThreadState || (*threadState)->numThreads < minThreadState->numThreads) {
                minThreadState = *threadState;
            }
        }
        // Share an existing state if one is unused or we've hit the cap.
        if (minThreadState && (minThreadState->numThreads == 0 || threadStates.size() >= MAX_THREAD_STATE)) {
            state = minThreadState;
            ++state->numThreads;
        } else {
            // Just create a new "private" thread state
            threadStates.resize(threadStates.size() + 1);
            state = newLucene<DocumentsWriterThreadState>(shared_from_this());
            threadStates[threadStates.size() - 1] = state;
        }
        threadBindings.put(LuceneThread::currentId(), state);
    }

    // Next, wait until my thread state is idle (in case it's shared with other threads) and for threads to
    // not be paused nor a flush pending
    waitReady(state);

    // Allocate segment name if this is the first doc since last flush
    initSegmentName(false);

    state->isIdle = false;

    bool success = false;
    LuceneException finally;
    try {
        state->docState->docID = nextDocID;

        BOOST_ASSERT(IndexWriterPtr(_writer)->testPoint(L"DocumentsWriter.ThreadState.init start"));

        if (delTerm) {
            // Buffer the delete against this (not-yet-added) docID.
            addDeleteTerm(delTerm, state->docState->docID);
            state->doFlushAfter = timeToFlushDeletes();
        }

        BOOST_ASSERT(IndexWriterPtr(_writer)->testPoint(L"DocumentsWriter.ThreadState.init after delTerm"));

        ++nextDocID;
        ++numDocsInRAM;

        // We must at this point commit to flushing to ensure we always get N docs when we flush by doc
        // count, even if > 1 thread is adding documents
        if (!flushPending && maxBufferedDocs != IndexWriter::DISABLE_AUTO_FLUSH && numDocsInRAM >= maxBufferedDocs) {
            flushPending = true;
            state->doFlushAfter = true;
        }

        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }
    if (!success) {
        // Forcefully idle this ThreadState
        state->isIdle = true;
        notifyAll();
        // Give up any flush claim this state made so another thread can flush.
        if (state->doFlushAfter) {
            state->doFlushAfter = false;
            flushPending = false;
        }
    }
    finally.throwException();

    return state;
}
645 
// Add a document with no accompanying delete: delegates with a null term.
bool DocumentsWriter::addDocument(const DocumentPtr& doc, const AnalyzerPtr& analyzer) {
    return updateDocument(doc, analyzer, TermPtr());
}

// Delete-then-add convenience overload; just reorders arguments for the worker.
bool DocumentsWriter::updateDocument(const TermPtr& t, const DocumentPtr& doc, const AnalyzerPtr& analyzer) {
    return updateDocument(doc, analyzer, t);
}
653 
// Buffer one document (atomically deleting delTerm first, if given).  The
// heavy inversion work runs outside the writer lock; only thread-state
// acquisition and finishDocument() synchronize.  If processing fails the
// docID is immediately buffered as a delete so the add stays all-or-none.
// Returns true when the caller should trigger a flush.
bool DocumentsWriter::updateDocument(const DocumentPtr& doc, const AnalyzerPtr& analyzer, const TermPtr& delTerm) {
    // This call is synchronized but fast
    DocumentsWriterThreadStatePtr state(getThreadState(doc, delTerm));

    DocStatePtr docState(state->docState);
    docState->doc = doc;
    docState->analyzer = analyzer;

    bool success = false;
    LuceneException finally;
    try {
        // This call is not synchronized and does all the work
        DocWriterPtr perDoc;
        try {
            perDoc = state->consumer->processDocument();
        } catch (LuceneException& e) {
            finally = e;
        }
        // Always release the doc/analyzer references, even on failure.
        docState->clear();
        finally.throwException();

        // This call is synchronized but fast
        finishDocument(state, perDoc);

        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }
    if (!success) {
        SyncLock syncLock(this);
        if (aborting) {
            state->isIdle = true;
            notifyAll();
            abort();
        } else {
            // Push a placeholder into the wait queue so in-order doc writing
            // is not stalled by this failed docID.
            skipDocWriter->docID = docState->docID;
            bool success2 = false;
            try {
                waitQueue->add(skipDocWriter);
                success2 = true;
            } catch (LuceneException& e) {
                finally = e;
            }
            if (!success2) {
                state->isIdle = true;
                notifyAll();
                abort();
                return false;
            }

            state->isIdle = true;
            notifyAll();

            // If this thread state had decided to flush, we must clear it so another thread can flush
            if (state->doFlushAfter) {
                state->doFlushAfter = false;
                flushPending = false;
                notifyAll();
            }

            // Immediately mark this document as deleted since likely it was partially added.  This keeps
            // indexing as "all or none" (atomic) when adding a document
            addDeleteDocID(state->docState->docID);
        }
    }

    finally.throwException();

    return (state->doFlushAfter || timeToFlushDeletes());
}
724 
// Number of distinct delete terms buffered in RAM (synchronized read).
int32_t DocumentsWriter::getNumBufferedDeleteTerms() {
    SyncLock syncLock(this);
    return deletesInRAM->numTerms;
}

// The live map of buffered delete terms; callers must not mutate it.
MapTermNum DocumentsWriter::getBufferedDeleteTerms() {
    SyncLock syncLock(this);
    return deletesInRAM->terms;
}
734 
// After a merge completes, renumber the docIDs recorded in buffered deletes
// so they match the post-merge segment layout.  A null docMaps means the
// merged segments had no deletions, so no IDs moved.
void DocumentsWriter::remapDeletes(const SegmentInfosPtr& infos, Collection< Collection<int32_t> > docMaps, Collection<int32_t> delCounts, const OneMergePtr& merge, int32_t mergeDocCount) {
    SyncLock syncLock(this);
    if (!docMaps) {
        // The merged segments had no deletes so docIDs did not change and we have nothing to do
        return;
    }
    MergeDocIDRemapperPtr mapper(newLucene<MergeDocIDRemapper>(infos, docMaps, delCounts, merge, mergeDocCount));
    deletesInRAM->remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount);
    deletesFlushed->remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount);
    // Deleted docs removed by the merge shift the global flushed-doc count down.
    flushedDocCount -= mapper->docShift;
}
746 
// Block until the given state (if any) is idle and no pause, pending flush,
// or abort is in progress.  Throws AlreadyClosedException once the writer is
// closed; close() wakes all waiters so this loop exits promptly.
void DocumentsWriter::waitReady(const DocumentsWriterThreadStatePtr& state) {
    SyncLock syncLock(this);
    while (!closed && ((state && !state->isIdle) || pauseThreads != 0 || flushPending || aborting)) {
        wait(1000);
    }
    if (closed) {
        boost::throw_exception(AlreadyClosedException(L"this IndexWriter is closed"));
    }
}
756 
bufferDeleteTerms(Collection<TermPtr> terms)757 bool DocumentsWriter::bufferDeleteTerms(Collection<TermPtr> terms) {
758     SyncLock syncLock(this);
759     waitReady(DocumentsWriterThreadStatePtr());
760     for (Collection<TermPtr>::iterator term = terms.begin(); term != terms.end(); ++term) {
761         addDeleteTerm(*term, numDocsInRAM);
762     }
763     return timeToFlushDeletes();
764 }
765 
bufferDeleteTerm(const TermPtr & term)766 bool DocumentsWriter::bufferDeleteTerm(const TermPtr& term) {
767     SyncLock syncLock(this);
768     waitReady(DocumentsWriterThreadStatePtr());
769     addDeleteTerm(term, numDocsInRAM);
770     return timeToFlushDeletes();
771 }
772 
bufferDeleteQueries(Collection<QueryPtr> queries)773 bool DocumentsWriter::bufferDeleteQueries(Collection<QueryPtr> queries) {
774     SyncLock syncLock(this);
775     waitReady(DocumentsWriterThreadStatePtr());
776     for (Collection<QueryPtr>::iterator query = queries.begin(); query != queries.end(); ++query) {
777         addDeleteQuery(*query, numDocsInRAM);
778     }
779     return timeToFlushDeletes();
780 }
781 
bufferDeleteQuery(const QueryPtr & query)782 bool DocumentsWriter::bufferDeleteQuery(const QueryPtr& query) {
783     SyncLock syncLock(this);
784     waitReady(DocumentsWriterThreadStatePtr());
785     addDeleteQuery(query, numDocsInRAM);
786     return timeToFlushDeletes();
787 }
788 
// True when buffered deletes (plus indexing RAM) exceed the RAM budget, or
// the buffered-term count exceeds maxBufferedDeleteTerms.
bool DocumentsWriter::deletesFull() {
    SyncLock syncLock(this);
    return ((ramBufferSize != IndexWriter::DISABLE_AUTO_FLUSH &&
             (deletesInRAM->bytesUsed + deletesFlushed->bytesUsed + numBytesUsed) >= ramBufferSize) ||
            (maxBufferedDeleteTerms != IndexWriter::DISABLE_AUTO_FLUSH &&
             ((deletesInRAM->size() + deletesFlushed->size()) >= maxBufferedDeleteTerms)));
}

bool DocumentsWriter::doApplyDeletes() {
    SyncLock syncLock(this);
    // Very similar to deletesFull(), except we don't count numBytesAlloc, because we are checking whether
    // deletes (alone) are consuming too many resources now and thus should be applied.  We apply deletes
    // if RAM usage is > 1/2 of our allowed RAM buffer, to prevent too-frequent flushing of a long tail of
    // tiny segments when merges (which always apply deletes) are infrequent.
    return ((ramBufferSize != IndexWriter::DISABLE_AUTO_FLUSH &&
             (deletesInRAM->bytesUsed + deletesFlushed->bytesUsed) >= ramBufferSize / 2) ||
            (maxBufferedDeleteTerms != IndexWriter::DISABLE_AUTO_FLUSH &&
             ((deletesInRAM->size() + deletesFlushed->size()) >= maxBufferedDeleteTerms)));
}

// Decide whether this caller should flush deletes now; true only for the one
// caller that wins the setFlushPending() race.
bool DocumentsWriter::timeToFlushDeletes() {
    SyncLock syncLock(this);
    return ((bufferIsFull || deletesFull()) && setFlushPending());
}
813 
checkDeleteTerm(const TermPtr & term)814 bool DocumentsWriter::checkDeleteTerm(const TermPtr& term) {
815     if (term) {
816         BOOST_ASSERT(!lastDeleteTerm || term->compareTo(lastDeleteTerm) > 0);
817     }
818     lastDeleteTerm = term;
819     return true;
820 }
821 
/// Set the limit on buffered delete terms before a delete flush is forced
/// (IndexWriter::DISABLE_AUTO_FLUSH disables this trigger; see deletesFull()).
void DocumentsWriter::setMaxBufferedDeleteTerms(int32_t maxBufferedDeleteTerms) {
    this->maxBufferedDeleteTerms = maxBufferedDeleteTerms;
}
825 
/// Return the current buffered-delete-terms flush limit.
int32_t DocumentsWriter::getMaxBufferedDeleteTerms() {
    return maxBufferedDeleteTerms;
}
829 
hasDeletes()830 bool DocumentsWriter::hasDeletes() {
831     SyncLock syncLock(this);
832     return deletesFlushed->any();
833 }
834 
/// Apply all flushed deletes (terms, docIDs and queries) to every segment in
/// infos, in order.  Returns true if any document was actually deleted.
bool DocumentsWriter::applyDeletes(const SegmentInfosPtr& infos) {
    SyncLock syncLock(this);
    if (!hasDeletes()) {
        return false;
    }

    if (infoStream) {
        message(L"apply " + StringUtils::toString(deletesFlushed->numTerms) + L" buffered deleted terms and " +
                StringUtils::toString(deletesFlushed->docIDs.size()) + L" deleted docIDs and " +
                StringUtils::toString(deletesFlushed->queries.size()) + L" deleted queries on " +
                StringUtils::toString(infos->size()) + L" segments.");
    }

    int32_t infosEnd = infos->size();

    // docStart accumulates each segment's maxDoc so per-segment docIDs can be
    // mapped to the absolute docID space used by the buffered deletes.
    int32_t docStart = 0;
    bool any = false;
    IndexWriterPtr writer(_writer);

    for (int32_t i = 0; i < infosEnd; ++i) {
        // Make sure we never attempt to apply deletes to segment in external dir
        BOOST_ASSERT(infos->info(i)->dir == directory);

        // Check the reader out of the writer's pool; it must be released even
        // when applying deletes throws, hence the try/catch "finally" below.
        SegmentReaderPtr reader(writer->readerPool->get(infos->info(i), false));
        LuceneException finally;
        try {
            if (applyDeletes(reader, docStart)) {
                any = true;
            }
            docStart += reader->maxDoc();
        } catch (LuceneException& e) {
            finally = e;
        }
        writer->readerPool->release(reader);
        finally.throwException();
    }

    // Every flushed delete has now been materialized into the segments.
    deletesFlushed->clear();

    return any;
}
876 
/// Apply the flushed deletes to a single segment reader whose first document
/// has absolute id docIDStart.  Returns true if any document was deleted.
bool DocumentsWriter::applyDeletes(const IndexReaderPtr& reader, int32_t docIDStart) {
    SyncLock syncLock(this);
    int32_t docEnd = docIDStart + reader->maxDoc();
    bool any = false;

    // Reset the ascending-order debug check before walking the term map.
    BOOST_ASSERT(checkDeleteTerm(TermPtr()));

    // Delete by term
    TermDocsPtr docs(reader->termDocs());
    LuceneException finally;
    try {
        for (MapTermNum::iterator entry = deletesFlushed->terms.begin(); entry != deletesFlushed->terms.end(); ++entry) {
            // we should be iterating a Map here, so terms better be in order
            BOOST_ASSERT(checkDeleteTerm(entry->first));
            docs->seek(entry->first);
            // limit is the absolute docID bound: only docs added before the
            // delete was buffered (absolute id < limit) may be deleted.
            int32_t limit = entry->second->getNum();
            while (docs->next()) {
                int32_t docID = docs->doc();
                if (docIDStart + docID >= limit) {
                    break;
                }
                reader->deleteDocument(docID);
                any = true;
            }
        }
    } catch (LuceneException& e) {
        finally = e;
    }
    // Emulated finally: close the TermDocs before rethrowing any failure.
    docs->close();
    finally.throwException();

    // Delete by docID: only ids falling inside this segment's absolute range.
    for (Collection<int32_t>::iterator docID = deletesFlushed->docIDs.begin(); docID != deletesFlushed->docIDs.end(); ++docID) {
        if (*docID >= docIDStart && *docID < docEnd) {
            reader->deleteDocument(*docID - docIDStart);
            any = true;
        }
    }

    // Delete by query
    IndexSearcherPtr searcher(newLucene<IndexSearcher>(reader));
    for (MapQueryInt::iterator entry = deletesFlushed->queries.begin(); entry != deletesFlushed->queries.end(); ++entry) {
        WeightPtr weight(entry->first->weight(searcher));
        ScorerPtr scorer(weight->scorer(reader, true, false));
        if (scorer) {
            while (true) {
                int32_t doc = scorer->nextDoc();
                // Widen to 64-bit before adding so the comparison cannot
                // overflow for very large doc values (e.g. an end-of-stream
                // sentinel from nextDoc() -- NOTE(review): confirm contract).
                if ((int64_t)docIDStart + doc >= entry->second) {
                    break;
                }
                reader->deleteDocument(doc);
                any = true;
            }
        }
    }
    searcher->close();
    return any;
}
935 
addDeleteTerm(const TermPtr & term,int32_t docCount)936 void DocumentsWriter::addDeleteTerm(const TermPtr& term, int32_t docCount) {
937     SyncLock syncLock(this);
938     NumPtr num(deletesInRAM->terms.get(term));
939     int32_t docIDUpto = flushedDocCount + docCount;
940     if (!num) {
941         deletesInRAM->terms.put(term, newLucene<Num>(docIDUpto));
942     } else {
943         num->setNum(docIDUpto);
944     }
945     ++deletesInRAM->numTerms;
946 
947     deletesInRAM->addBytesUsed(BYTES_PER_DEL_TERM + term->_text.length() * CHAR_NUM_BYTE);
948 }
949 
addDeleteDocID(int32_t docID)950 void DocumentsWriter::addDeleteDocID(int32_t docID) {
951     SyncLock syncLock(this);
952     deletesInRAM->docIDs.add(flushedDocCount + docID);
953     deletesInRAM->addBytesUsed(BYTES_PER_DEL_DOCID);
954 }
955 
addDeleteQuery(const QueryPtr & query,int32_t docID)956 void DocumentsWriter::addDeleteQuery(const QueryPtr& query, int32_t docID) {
957     SyncLock syncLock(this);
958     deletesInRAM->queries.put(query, flushedDocCount + docID);
959     deletesInRAM->addBytesUsed(BYTES_PER_DEL_QUERY);
960 }
961 
doBalanceRAM()962 bool DocumentsWriter::doBalanceRAM() {
963     SyncLock syncLock(this);
964     return (ramBufferSize != IndexWriter::DISABLE_AUTO_FLUSH && !bufferIsFull &&
965             (numBytesUsed + deletesInRAM->bytesUsed + deletesFlushed->bytesUsed >= ramBufferSize ||
966              numBytesAlloc >= freeTrigger));
967 }
968 
/// Hand a finished (or skipped) document to the wait queue and idle the
/// per-thread state, honoring abort and flush-pending transitions.
void DocumentsWriter::finishDocument(const DocumentsWriterThreadStatePtr& perThread, const DocWriterPtr& docWriter) {
    if (doBalanceRAM()) {
        // Must call this without holding synchronized(this) else we'll hit deadlock
        balanceRAM();
    }

    {
        SyncLock syncLock(this);
        BOOST_ASSERT(!docWriter || docWriter->docID == perThread->docState->docID);

        if (aborting) {
            // We are currently aborting, and another thread is waiting for me to become idle.  We
            // just forcefully idle this threadState; it will be fully reset by abort()
            if (docWriter) {
                try {
                    docWriter->abort();
                } catch (...) {
                    // best-effort: swallow so the thread state is still idled below
                }
            }

            perThread->isIdle = true;
            notifyAll();
            return;
        }

        bool doPause;

        if (docWriter) {
            doPause = waitQueue->add(docWriter);
        } else {
            // No doc writer to enqueue: push the shared placeholder with this
            // thread's docID so the queue's in-order docID sequence stays intact.
            skipDocWriter->docID = perThread->docState->docID;
            doPause = waitQueue->add(skipDocWriter);
        }

        if (doPause) {
            // Too many bytes are parked out-of-order; stall until the queue drains.
            waitForWaitQueue();
        }

        if (bufferIsFull && !flushPending) {
            // RAM filled up during this doc; claim the flush and have this
            // thread perform it once it returns.
            flushPending = true;
            perThread->doFlushAfter = true;
        }

        perThread->isIdle = true;
        notifyAll();
    }
}
1016 
waitForWaitQueue()1017 void DocumentsWriter::waitForWaitQueue() {
1018     SyncLock syncLock(this);
1019     do {
1020         wait(1000);
1021     } while (!waitQueue->doResume());
1022 }
1023 
getRAMUsed()1024 int64_t DocumentsWriter::getRAMUsed() {
1025     return numBytesUsed + deletesInRAM->bytesUsed + deletesFlushed->bytesUsed;
1026 }
1027 
getIntBlock(bool trackAllocations)1028 IntArray DocumentsWriter::getIntBlock(bool trackAllocations) {
1029     SyncLock syncLock(this);
1030     int32_t size = freeIntBlocks.size();
1031     IntArray b;
1032     if (size == 0) {
1033         // Always record a block allocated, even if trackAllocations is false.  This is necessary because
1034         // this block will be shared between things that don't track allocations (term vectors) and things
1035         // that do (freq/prox postings).
1036         numBytesAlloc += INT_BLOCK_SIZE * INT_NUM_BYTE;
1037         b = IntArray::newInstance(INT_BLOCK_SIZE);
1038     } else {
1039         b = freeIntBlocks.removeLast();
1040     }
1041     if (trackAllocations) {
1042         numBytesUsed += INT_BLOCK_SIZE * INT_NUM_BYTE;
1043     }
1044     BOOST_ASSERT(numBytesUsed <= numBytesAlloc);
1045     return b;
1046 }
1047 
/// Account for numBytes of newly allocated (not necessarily in-use) buffer space.
void DocumentsWriter::bytesAllocated(int64_t numBytes) {
    SyncLock syncLock(this);
    numBytesAlloc += numBytes;
}
1052 
/// Account for numBytes of buffer space now actively in use.  Used bytes can
/// never exceed allocated bytes (asserted below).
void DocumentsWriter::bytesUsed(int64_t numBytes) {
    SyncLock syncLock(this);
    numBytesUsed += numBytes;
    BOOST_ASSERT(numBytesUsed <= numBytesAlloc);
}
1058 
recycleIntBlocks(Collection<IntArray> blocks,int32_t start,int32_t end)1059 void DocumentsWriter::recycleIntBlocks(Collection<IntArray> blocks, int32_t start, int32_t end) {
1060     SyncLock syncLock(this);
1061     for (int32_t i = start; i < end; ++i) {
1062         freeIntBlocks.add(blocks[i]);
1063         blocks[i].reset();
1064     }
1065 }
1066 
getCharBlock()1067 CharArray DocumentsWriter::getCharBlock() {
1068     SyncLock syncLock(this);
1069     int32_t size = freeCharBlocks.size();
1070     CharArray c;
1071     if (size == 0) {
1072         numBytesAlloc += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
1073         c = CharArray::newInstance(CHAR_BLOCK_SIZE);
1074     } else {
1075         c = freeCharBlocks.removeLast();
1076     }
1077     // We always track allocations of char blocks for now because nothing that skips allocation tracking
1078     // (currently only term vectors) uses its own char blocks.
1079     numBytesUsed += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
1080     BOOST_ASSERT(numBytesUsed <= numBytesAlloc);
1081     return c;
1082 }
1083 
recycleCharBlocks(Collection<CharArray> blocks,int32_t numBlocks)1084 void DocumentsWriter::recycleCharBlocks(Collection<CharArray> blocks, int32_t numBlocks) {
1085     SyncLock syncLock(this);
1086     for (int32_t i = 0; i < numBlocks; ++i) {
1087         freeCharBlocks.add(blocks[i]);
1088         blocks[i].reset();
1089     }
1090 }
1091 
toMB(int64_t v)1092 String DocumentsWriter::toMB(int64_t v) {
1093     return StringUtils::toString((double)v / 1024.0 / 1024.0);
1094 }
1095 
/// Free allocated-but-unused blocks, or flag a flush, to bring overall RAM
/// consumption back under the configured triggers.  Per finishDocument(),
/// this must be called WITHOUT holding the DocumentsWriter lock; only the
/// pool manipulation inside the loop is synchronized.
void DocumentsWriter::balanceRAM() {
    // We flush when we've used our target usage
    int64_t flushTrigger = ramBufferSize;

    // Snapshot of delete RAM usage taken once, outside the lock.
    int64_t deletesRAMUsed = deletesInRAM->bytesUsed + deletesFlushed->bytesUsed;

    if (numBytesAlloc + deletesRAMUsed > freeTrigger) {
        if (infoStream) {
            message(L"  RAM: now balance allocations: usedMB=" + toMB(numBytesUsed) +
                    L" vs trigger=" + toMB(flushTrigger) +
                    L" allocMB=" + toMB(numBytesAlloc) +
                    L" deletesMB=" + toMB(deletesRAMUsed) +
                    L" vs trigger=" + toMB(freeTrigger) +
                    L" byteBlockFree=" + toMB(byteBlockAllocator->freeByteBlocks.size() * BYTE_BLOCK_SIZE) +
                    L" perDocFree=" + toMB(perDocAllocator->freeByteBlocks.size() * PER_DOC_BLOCK_SIZE) +
                    L" charBlockFree=" + toMB(freeCharBlocks.size() * CHAR_BLOCK_SIZE * CHAR_NUM_BYTE));
        }

        int64_t startBytesAlloc = numBytesAlloc + deletesRAMUsed;

        int32_t iter = 0;

        // We free equally from each pool in 32 KB chunks until we are below our threshold (freeLevel)

        bool any = true;

        // Round-robin over the free pools; (iter % 5) selects which pool to
        // trim this pass (the fifth slot asks the consumer to free state).
        while (numBytesAlloc + deletesRAMUsed > freeLevel) {
            {
                SyncLock syncLock(this);
                if (perDocAllocator->freeByteBlocks.empty() && byteBlockAllocator->freeByteBlocks.empty() &&
                        freeCharBlocks.empty() && freeIntBlocks.empty() && !any) {
                    // Nothing else to free -- must flush now.
                    bufferIsFull = (numBytesUsed + deletesRAMUsed > flushTrigger);
                    if (infoStream) {
                        if (bufferIsFull) {
                            message(L"    nothing to free; now set bufferIsFull");
                        } else {
                            message(L"    nothing to free");
                        }
                    }
                    BOOST_ASSERT(numBytesUsed <= numBytesAlloc);
                    break;
                }

                if ((iter % 5) == 0 && !byteBlockAllocator->freeByteBlocks.empty()) {
                    byteBlockAllocator->freeByteBlocks.removeLast();
                    numBytesAlloc -= BYTE_BLOCK_SIZE;
                }

                if ((iter % 5) == 1 && !freeCharBlocks.empty()) {
                    freeCharBlocks.removeLast();
                    numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
                }

                if ((iter % 5) == 2 && !freeIntBlocks.empty()) {
                    freeIntBlocks.removeLast();
                    numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE;
                }

                if ((iter % 5) == 3 && !perDocAllocator->freeByteBlocks.empty()) {
                    // Remove upwards of 32 blocks (each block is 1K)
                    for (int32_t i = 0; i < 32; ++i) {
                        perDocAllocator->freeByteBlocks.removeLast();
                        numBytesAlloc -= PER_DOC_BLOCK_SIZE;
                        if (perDocAllocator->freeByteBlocks.empty()) {
                            break;
                        }
                    }
                }
            }

            // Consumer callback runs outside the lock scope above.
            if ((iter % 5) == 4 && any) {
                // Ask consumer to free any recycled state
                any = consumer->freeRAM();
            }

            ++iter;
        }

        if (infoStream) {
            message(L"    after free: freedMB=" + StringUtils::toString((double)(startBytesAlloc - numBytesAlloc - deletesRAMUsed) / 1024.0 / 1024.0) +
                    L" usedMB=" + StringUtils::toString((double)(numBytesUsed + deletesRAMUsed) / 1024.0 / 1024.0) +
                    L" allocMB=" + StringUtils::toString((double)numBytesAlloc / 1024.0 / 1024.0));
        }
    } else {
        // If we have not crossed the 100% mark, but have crossed the 95% mark of RAM we are actually
        // using, go ahead and flush.  This prevents over-allocating and then freeing, with every flush.
        SyncLock syncLock(this);
        if (numBytesUsed + deletesRAMUsed > flushTrigger) {
            if (infoStream) {
                message(L"  RAM: now flush @ usedMB=" + StringUtils::toString((double)numBytesUsed / 1024.0 / 1024.0) +
                        L" allocMB=" + StringUtils::toString((double)numBytesAlloc / 1024.0 / 1024.0) +
                        L" deletesMB=" + StringUtils::toString((double)deletesRAMUsed / 1024.0 / 1024.0) +
                        L" triggerMB=" + StringUtils::toString((double)flushTrigger / 1024.0 / 1024.0));
            }
            bufferIsFull = true;
        }
    }
}
1195 
DocState()1196 DocState::DocState() {
1197     maxFieldLength = 0;
1198     docID = 0;
1199 }
1200 
// Nothing to release explicitly; members clean up via their own destructors.
DocState::~DocState() {
}
1203 
testPoint(const String & name)1204 bool DocState::testPoint(const String& name) {
1205     return IndexWriterPtr(DocumentsWriterPtr(_docWriter)->_writer)->testPoint(name);
1206 }
1207 
clear()1208 void DocState::clear() {
1209     // don't hold onto doc nor analyzer, in case it is large
1210     doc.reset();
1211     analyzer.reset();
1212 }
1213 
/// Per-document RAM buffer that draws its byte blocks from the owning
/// DocumentsWriter's per-doc allocator (see newBuffer()).
PerDocBuffer::PerDocBuffer(const DocumentsWriterPtr& docWriter) {
    _docWriter = docWriter;
}
1217 
// Nothing to release explicitly; members clean up via their own destructors.
PerDocBuffer::~PerDocBuffer() {
}
1220 
newBuffer(int32_t size)1221 ByteArray PerDocBuffer::newBuffer(int32_t size) {
1222     BOOST_ASSERT(size == DocumentsWriter::PER_DOC_BLOCK_SIZE);
1223     return DocumentsWriterPtr(_docWriter)->perDocAllocator->getByteBlock(false);
1224 }
1225 
/// Return this buffer's byte blocks to the per-doc allocator and reset the
/// buffer to empty.
void PerDocBuffer::recycle() {
    SyncLock syncLock(this);
    if (!buffers.empty()) {
        setLength(0);

        // Recycle the blocks
        DocumentsWriterPtr(_docWriter)->perDocAllocator->recycleByteBlocks(buffers);
        buffers.clear();
        sizeInBytes = 0;

        BOOST_ASSERT(numBuffers() == 0);
    }
}
1239 
// Default to docID 0 until the owner assigns the real id.
DocWriter::DocWriter() {
    docID = 0;
}
1243 
// Nothing to release explicitly; members clean up via their own destructors.
DocWriter::~DocWriter() {
}
1246 
/// Link the next DocWriter in this writer's chain.
void DocWriter::setNext(const DocWriterPtr& next) {
    this->next = next;
}
1250 
// Abstract base: no resources to release.
IndexingChain::~IndexingChain() {
}
1253 
// No resources to release; the chain itself is built on demand in getChain().
DefaultIndexingChain::~DefaultIndexingChain() {
}
1256 
/// Build the default indexing chain for a DocumentsWriter:
///   DocFieldProcessor -> DocInverter -> TermsHash (freq/prox postings)
///                                   \-> NormsWriter
/// with a nested secondary TermsHash feeding the term-vectors writer.
DocConsumerPtr DefaultIndexingChain::getChain(const DocumentsWriterPtr& documentsWriter) {
    TermsHashConsumerPtr termVectorsWriter(newLucene<TermVectorsTermsWriter>(documentsWriter));
    TermsHashConsumerPtr freqProxWriter(newLucene<FreqProxTermsWriter>());

    // Primary hash (trackAllocations=true) feeds postings; the nested
    // secondary hash (trackAllocations=false) feeds term vectors.
    InvertedDocConsumerPtr termsHash(newLucene<TermsHash>(documentsWriter, true, freqProxWriter,
                                     newLucene<TermsHash>(documentsWriter, false,
                                             termVectorsWriter, TermsHashPtr())));

    DocInverterPtr docInverter(newLucene<DocInverter>(termsHash, newLucene<NormsWriter>()));
    return newLucene<DocFieldProcessor>(documentsWriter, docInverter);
}
1268 
// Nothing to release; this placeholder owns no state beyond its docID.
SkipDocWriter::~SkipDocWriter() {
}
1271 
// Nothing to write: the placeholder only advances the wait queue's docID sequence.
void SkipDocWriter::finish() {
}
1274 
// Nothing to clean up on abort for a skipped document.
void SkipDocWriter::abort() {
}
1277 
// A skipped document consumes no buffered bytes.
int64_t SkipDocWriter::sizeInBytes() {
    return 0;
}
1281 
WaitQueue(const DocumentsWriterPtr & docWriter)1282 WaitQueue::WaitQueue(const DocumentsWriterPtr& docWriter) {
1283     this->_docWriter = docWriter;
1284     waiting = Collection<DocWriterPtr>::newInstance(10);
1285     nextWriteDocID = 0;
1286     nextWriteLoc = 0;
1287     numWaiting = 0;
1288     waitingBytes = 0;
1289 }
1290 
// Nothing to release explicitly; members clean up via their own destructors.
WaitQueue::~WaitQueue() {
}
1293 
/// Reset the docID sequence.  May only be called when the queue is empty
/// (all parked docs have been written or aborted), per the asserts.
void WaitQueue::reset() {
    SyncLock syncLock(this);
    // NOTE: nextWriteLoc doesn't need to be reset
    BOOST_ASSERT(numWaiting == 0);
    BOOST_ASSERT(waitingBytes == 0);
    nextWriteDocID = 0;
}
1301 
doResume()1302 bool WaitQueue::doResume() {
1303     SyncLock syncLock(this);
1304     return (waitingBytes <= DocumentsWriterPtr(_docWriter)->waitQueueResumeBytes);
1305 }
1306 
doPause()1307 bool WaitQueue::doPause() {
1308     SyncLock syncLock(this);
1309     return (waitingBytes > DocumentsWriterPtr(_docWriter)->waitQueuePauseBytes);
1310 }
1311 
abort()1312 void WaitQueue::abort() {
1313     SyncLock syncLock(this);
1314     int32_t count = 0;
1315     for (Collection<DocWriterPtr>::iterator doc = waiting.begin(); doc != waiting.end(); ++doc) {
1316         if (*doc) {
1317             (*doc)->abort();
1318             doc->reset();
1319             ++count;
1320         }
1321     }
1322     waitingBytes = 0;
1323     BOOST_ASSERT(count == numWaiting);
1324     numWaiting = 0;
1325 }
1326 
writeDocument(const DocWriterPtr & doc)1327 void WaitQueue::writeDocument(const DocWriterPtr& doc) {
1328     DocumentsWriterPtr docWriter(_docWriter);
1329     BOOST_ASSERT(doc == DocumentsWriterPtr(docWriter)->skipDocWriter || nextWriteDocID == doc->docID);
1330     bool success = false;
1331     LuceneException finally;
1332     try {
1333         doc->finish();
1334         ++nextWriteDocID;
1335         ++docWriter->numDocsInStore;
1336         ++nextWriteLoc;
1337         BOOST_ASSERT(nextWriteLoc <= waiting.size());
1338         if (nextWriteLoc == waiting.size()) {
1339             nextWriteLoc = 0;
1340         }
1341         success = true;
1342     } catch (LuceneException& e) {
1343         finally = e;
1344     }
1345     if (!success) {
1346         docWriter->setAborting();
1347     }
1348     finally.throwException();
1349 }
1350 
/// Enqueue a finished doc.  Docs must reach the store in docID order: if this
/// doc is next in line it is written immediately (along with any directly
/// following parked docs); otherwise it parks in the circular `waiting`
/// buffer.  Returns true when the caller should pause (doPause()).
bool WaitQueue::add(const DocWriterPtr& doc) {
    DocWriterPtr _doc(doc);
    SyncLock syncLock(this);
    BOOST_ASSERT(_doc->docID >= nextWriteDocID);
    if (_doc->docID == nextWriteDocID) {
        // In-order arrival: write it, then drain consecutive parked docs.
        writeDocument(_doc);
        while (true) {
            _doc = waiting[nextWriteLoc];
            if (_doc) {
                --numWaiting;
                waiting[nextWriteLoc].reset();
                waitingBytes -= _doc->sizeInBytes();
                writeDocument(_doc);
            } else {
                break;
            }
        }
    } else {
        // I finished before documents that were added before me.  This can easily happen when I am a small doc
        // and the docs before me were large, or just due to luck in the thread scheduling.  Just add myself to
        // the queue and when that large doc finishes, it will flush me
        int32_t gap = _doc->docID - nextWriteDocID;
        if (gap >= waiting.size()) {
            // Grow queue
            Collection<DocWriterPtr> newArray(Collection<DocWriterPtr>::newInstance(MiscUtils::getNextSize(gap)));
            BOOST_ASSERT(nextWriteLoc >= 0);
            // Unwrap the circular buffer into the new array starting at slot 0.
            MiscUtils::arrayCopy(waiting.begin(), nextWriteLoc, newArray.begin(), 0, waiting.size() - nextWriteLoc);
            MiscUtils::arrayCopy(waiting.begin(), 0, newArray.begin(), waiting.size() - nextWriteLoc, nextWriteLoc);
            nextWriteLoc = 0;
            waiting = newArray;
            gap = _doc->docID - nextWriteDocID;
        }

        int32_t loc = nextWriteLoc + gap;
        if (loc >= waiting.size()) {
            loc -= waiting.size();
        }

        // We should only wrap one time
        BOOST_ASSERT(loc < waiting.size());

        // Nobody should be in my spot!
        BOOST_ASSERT(!waiting[loc]);
        waiting[loc] = _doc;
        ++numWaiting;
        waitingBytes += _doc->sizeInBytes();
    }

    return doPause();
}
1401 
ByteBlockAllocator(const DocumentsWriterPtr & docWriter,int32_t blockSize)1402 ByteBlockAllocator::ByteBlockAllocator(const DocumentsWriterPtr& docWriter, int32_t blockSize) {
1403     this->blockSize = blockSize;
1404     this->freeByteBlocks = Collection<ByteArray>::newInstance();
1405     this->_docWriter = docWriter;
1406 }
1407 
// Nothing to release explicitly; pooled blocks free via their own destructors.
ByteBlockAllocator::~ByteBlockAllocator() {
}
1410 
getByteBlock(bool trackAllocations)1411 ByteArray ByteBlockAllocator::getByteBlock(bool trackAllocations) {
1412     DocumentsWriterPtr docWriter(_docWriter);
1413     SyncLock syncLock(docWriter);
1414     int32_t size = freeByteBlocks.size();
1415     ByteArray b;
1416     if (size == 0) {
1417         // Always record a block allocated, even if trackAllocations is false.  This is necessary because this block will
1418         // be shared between things that don't track allocations (term vectors) and things that do (freq/prox postings).
1419         docWriter->numBytesAlloc += blockSize;
1420         b = ByteArray::newInstance(blockSize);
1421         MiscUtils::arrayFill(b.get(), 0, b.size(), 0);
1422     } else {
1423         b = freeByteBlocks.removeLast();
1424     }
1425     if (trackAllocations) {
1426         docWriter->numBytesUsed += blockSize;
1427     }
1428     BOOST_ASSERT(docWriter->numBytesUsed <= docWriter->numBytesAlloc);
1429     return b;
1430 }
1431 
recycleByteBlocks(Collection<ByteArray> blocks,int32_t start,int32_t end)1432 void ByteBlockAllocator::recycleByteBlocks(Collection<ByteArray> blocks, int32_t start, int32_t end) {
1433     DocumentsWriterPtr docWriter(_docWriter);
1434     SyncLock syncLock(docWriter);
1435     for (int32_t i = start; i < end; ++i) {
1436         freeByteBlocks.add(blocks[i]);
1437         blocks[i].reset();
1438     }
1439 }
1440 
recycleByteBlocks(Collection<ByteArray> blocks)1441 void ByteBlockAllocator::recycleByteBlocks(Collection<ByteArray> blocks) {
1442     DocumentsWriterPtr docWriter(_docWriter);
1443     SyncLock syncLock(docWriter);
1444     int32_t size = blocks.size();
1445     for (int32_t i = 0; i < size; ++i) {
1446         freeByteBlocks.add(blocks[i]);
1447     }
1448 }
1449 
1450 }
1451