1 /////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2009-2014 Alan Wright. All rights reserved.
3 // Distributable under the terms of either the Apache License (Version 2.0)
4 // or the GNU Lesser General Public License.
5 /////////////////////////////////////////////////////////////////////////////
6
7 #include "LuceneInc.h"
8 #include "DocumentsWriter.h"
9 #include "DocumentsWriterThreadState.h"
10 #include "LuceneThread.h"
11 #include "IndexWriter.h"
12 #include "_IndexWriter.h"
13 #include "IndexReader.h"
14 #include "IndexSearcher.h"
15 #include "DocFieldProcessor.h"
16 #include "Term.h"
17 #include "TermDocs.h"
18 #include "TermVectorsTermsWriter.h"
19 #include "FreqProxTermsWriter.h"
20 #include "TermsHashConsumer.h"
21 #include "InvertedDocConsumer.h"
22 #include "TermsHash.h"
23 #include "DocInverter.h"
24 #include "NormsWriter.h"
25 #include "BufferedDeletes.h"
26 #include "FieldInfos.h"
27 #include "InfoStream.h"
28 #include "DocConsumerPerThread.h"
29 #include "SegmentWriteState.h"
30 #include "IndexFileNames.h"
31 #include "CompoundFileWriter.h"
32 #include "MergeDocIDRemapper.h"
33 #include "SegmentReader.h"
34 #include "SegmentInfos.h"
35 #include "SegmentInfo.h"
36 #include "Query.h"
37 #include "Weight.h"
38 #include "Scorer.h"
39 #include "TestPoint.h"
40 #include "MiscUtils.h"
41 #include "StringUtils.h"
42
43 namespace Lucene {
44
/// Max # ThreadState instances; if there are more threads than this they share ThreadStates
const int32_t DocumentsWriter::MAX_THREAD_STATE = 5;

/// Coarse estimates used to measure RAM usage of buffered deletes
const int32_t DocumentsWriter::OBJECT_HEADER_BYTES = 8;
#ifdef LPP_BUILD_64
const int32_t DocumentsWriter::POINTER_NUM_BYTE = 8;
#else
const int32_t DocumentsWriter::POINTER_NUM_BYTE = 4;
#endif
const int32_t DocumentsWriter::INT_NUM_BYTE = 4;
#ifdef LPP_UNICODE_CHAR_SIZE_4
const int32_t DocumentsWriter::CHAR_NUM_BYTE = 4;
#else
const int32_t DocumentsWriter::CHAR_NUM_BYTE = 2;
#endif

/// Rough logic: HashMap has an array[Entry] with varying load factor (say 2 * POINTER). Entry is object
/// with Term key, BufferedDeletes.Num val, int hash, Entry next (OBJ_HEADER + 3*POINTER + INT). Term is
/// object with String field and String text (OBJ_HEADER + 2*POINTER). We don't count Term's field since
/// it's interned. Term's text is String (OBJ_HEADER + 4*INT + POINTER + OBJ_HEADER + string.length*CHAR).
/// BufferedDeletes.num is OBJ_HEADER + INT.
const int32_t DocumentsWriter::BYTES_PER_DEL_TERM = 8 * DocumentsWriter::POINTER_NUM_BYTE + 5 *
        DocumentsWriter::OBJECT_HEADER_BYTES + 6 *
        DocumentsWriter::INT_NUM_BYTE;

/// Rough logic: del docIDs are List<Integer>. Say list allocates ~2X size (2*POINTER). Integer is
/// OBJ_HEADER + int
const int32_t DocumentsWriter::BYTES_PER_DEL_DOCID = 2 * DocumentsWriter::POINTER_NUM_BYTE +
        DocumentsWriter::OBJECT_HEADER_BYTES +
        DocumentsWriter::INT_NUM_BYTE;

/// Rough logic: HashMap has an array[Entry] with varying load factor (say 2 * POINTER). Entry is object
/// with Query key, Integer val, int hash, Entry next (OBJ_HEADER + 3*POINTER + INT). Query we often undercount
/// (say 24 bytes). Integer is OBJ_HEADER + INT.
const int32_t DocumentsWriter::BYTES_PER_DEL_QUERY = 5 * DocumentsWriter::POINTER_NUM_BYTE + 2 *
        DocumentsWriter::OBJECT_HEADER_BYTES + 2 *
        DocumentsWriter::INT_NUM_BYTE + 24;

/// Initial chunk size of the shared byte[] blocks used to store postings data
const int32_t DocumentsWriter::BYTE_BLOCK_SHIFT = 15;
const int32_t DocumentsWriter::BYTE_BLOCK_SIZE = 1 << DocumentsWriter::BYTE_BLOCK_SHIFT;
const int32_t DocumentsWriter::BYTE_BLOCK_MASK = DocumentsWriter::BYTE_BLOCK_SIZE - 1;
const int32_t DocumentsWriter::BYTE_BLOCK_NOT_MASK = ~DocumentsWriter::BYTE_BLOCK_MASK;

/// Initial chunk size of the shared char[] blocks used to store term text
const int32_t DocumentsWriter::CHAR_BLOCK_SHIFT = 14;
const int32_t DocumentsWriter::CHAR_BLOCK_SIZE = 1 << DocumentsWriter::CHAR_BLOCK_SHIFT;
const int32_t DocumentsWriter::CHAR_BLOCK_MASK = DocumentsWriter::CHAR_BLOCK_SIZE - 1;

/// Longest term text we accept; terms must fit inside one char block
const int32_t DocumentsWriter::MAX_TERM_LENGTH = DocumentsWriter::CHAR_BLOCK_SIZE - 1;

/// Initial chunk size of the shared int[] blocks used to store postings data
const int32_t DocumentsWriter::INT_BLOCK_SHIFT = 13;
const int32_t DocumentsWriter::INT_BLOCK_SIZE = 1 << DocumentsWriter::INT_BLOCK_SHIFT;
const int32_t DocumentsWriter::INT_BLOCK_MASK = DocumentsWriter::INT_BLOCK_SIZE - 1;

/// Allocation unit for per-document buffers (see PerDocBuffer / perDocAllocator)
const int32_t DocumentsWriter::PER_DOC_BLOCK_SIZE = 1024;
103
DocumentsWriter(const DirectoryPtr & directory,const IndexWriterPtr & writer,const IndexingChainPtr & indexingChain)104 DocumentsWriter::DocumentsWriter(const DirectoryPtr& directory, const IndexWriterPtr& writer, const IndexingChainPtr& indexingChain) {
105 this->threadStates = Collection<DocumentsWriterThreadStatePtr>::newInstance();
106 this->threadBindings = MapThreadDocumentsWriterThreadState::newInstance();
107 this->_openFiles = HashSet<String>::newInstance();
108 this->_closedFiles = HashSet<String>::newInstance();
109 this->freeIntBlocks = Collection<IntArray>::newInstance();
110 this->freeCharBlocks = Collection<CharArray>::newInstance();
111
112 this->directory = directory;
113 this->_writer = writer;
114 this->indexingChain = indexingChain;
115 }
116
DocumentsWriter::~DocumentsWriter() {
    // No explicit cleanup needed: all members are reference-counted Lucene objects/containers.
}
119
/// Second-phase construction, run after the owning shared_ptr exists: several members
/// (waitQueue, allocators, the indexing chain) require shared_from_this(), which is not
/// available inside the constructor.
void DocumentsWriter::initialize() {
    docStoreOffset = 0;
    nextDocID = 0;
    numDocsInRAM = 0;
    numDocsInStore = 0;
    pauseThreads = 0;
    flushPending = false;
    bufferIsFull = false;
    aborting = false;
    maxFieldLength = IndexWriter::DEFAULT_MAX_FIELD_LENGTH;
    // Two delete buffers: pending deletes not yet flushed, and deletes already pushed to flushed segments.
    deletesInRAM = newLucene<BufferedDeletes>(false);
    deletesFlushed = newLucene<BufferedDeletes>(true);
    maxBufferedDeleteTerms = IndexWriter::DEFAULT_MAX_BUFFERED_DELETE_TERMS;
    ramBufferSize = (int64_t)(IndexWriter::DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024);
    // Wait queue pauses at 10% of the RAM budget and resumes at 5%.
    waitQueuePauseBytes = (int64_t)((double)ramBufferSize * 0.1);
    waitQueueResumeBytes = (int64_t)((double)ramBufferSize * 0.05);
    // Free recycled blocks when allocation exceeds 105% of budget, down to 95%.
    freeTrigger = (int64_t)(IndexWriter::DEFAULT_RAM_BUFFER_SIZE_MB * 1024.0 * 1024.0 * 1.05);
    freeLevel = (int64_t)(IndexWriter::DEFAULT_RAM_BUFFER_SIZE_MB * 1024.0 * 1024.0 * 0.95);
    maxBufferedDocs = IndexWriter::DEFAULT_MAX_BUFFERED_DOCS;
    flushedDocCount = 0;
    closed = false;
    waitQueue = newLucene<WaitQueue>(shared_from_this());
    skipDocWriter = newLucene<SkipDocWriter>();
    numBytesAlloc = 0;
    numBytesUsed = 0;
    byteBlockAllocator = newLucene<ByteBlockAllocator>(shared_from_this(), BYTE_BLOCK_SIZE);
    perDocAllocator = newLucene<ByteBlockAllocator>(shared_from_this(), PER_DOC_BLOCK_SIZE);

    IndexWriterPtr writer(_writer);
    this->similarity = writer->getSimilarity();
    // Overwrites the zero above: start the flushed count at the writer's current doc count.
    flushedDocCount = writer->maxDoc();

    consumer = indexingChain->getChain(shared_from_this());
    // Non-null only when the chain's head is a DocFieldProcessor (the default chain).
    docFieldProcessor = boost::dynamic_pointer_cast<DocFieldProcessor>(consumer);
}
155
/// Create a new per-document growable buffer backed by this writer's perDocAllocator.
PerDocBufferPtr DocumentsWriter::newPerDocBuffer() {
    return newLucene<PerDocBuffer>(shared_from_this());
}
159
/// Return the process-wide default indexing chain, creating it on first use.
/// NOTE(review): the lazy init below is not synchronized; if two threads call this
/// concurrently the chain could be created twice — confirm callers serialize this.
IndexingChainPtr DocumentsWriter::getDefaultIndexingChain() {
    static DefaultIndexingChainPtr defaultIndexingChain;
    if (!defaultIndexingChain) {
        defaultIndexingChain = newLucene<DefaultIndexingChain>();
        // Register with CycleCheck so the static is accounted for at shutdown.
        CycleCheck::addStatic(defaultIndexingChain);
    }
    return defaultIndexingChain;
}
168
updateFlushedDocCount(int32_t n)169 void DocumentsWriter::updateFlushedDocCount(int32_t n) {
170 SyncLock syncLock(this);
171 flushedDocCount += n;
172 }
173
/// Return the number of documents already flushed to the index (synchronized read).
int32_t DocumentsWriter::getFlushedDocCount() {
    SyncLock syncLock(this);
    return flushedDocCount;
}
178
/// Overwrite the flushed-document count (synchronized write).
void DocumentsWriter::setFlushedDocCount(int32_t n) {
    SyncLock syncLock(this);
    flushedDocCount = n;
}
183
hasProx()184 bool DocumentsWriter::hasProx() {
185 return docFieldProcessor ? docFieldProcessor->fieldInfos->hasProx() : true;
186 }
187
setInfoStream(const InfoStreamPtr & infoStream)188 void DocumentsWriter::setInfoStream(const InfoStreamPtr& infoStream) {
189 SyncLock syncLock(this);
190 this->infoStream = infoStream;
191 for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
192 (*threadState)->docState->infoStream = infoStream;
193 }
194 }
195
setMaxFieldLength(int32_t maxFieldLength)196 void DocumentsWriter::setMaxFieldLength(int32_t maxFieldLength) {
197 SyncLock syncLock(this);
198 this->maxFieldLength = maxFieldLength;
199 for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
200 (*threadState)->docState->maxFieldLength = maxFieldLength;
201 }
202 }
203
setSimilarity(const SimilarityPtr & similarity)204 void DocumentsWriter::setSimilarity(const SimilarityPtr& similarity) {
205 SyncLock syncLock(this);
206 this->similarity = similarity;
207 for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
208 (*threadState)->docState->similarity = similarity;
209 }
210 }
211
setRAMBufferSizeMB(double mb)212 void DocumentsWriter::setRAMBufferSizeMB(double mb) {
213 SyncLock syncLock(this);
214 if (mb == IndexWriter::DISABLE_AUTO_FLUSH) {
215 ramBufferSize = IndexWriter::DISABLE_AUTO_FLUSH;
216 waitQueuePauseBytes = 4 * 1024 * 1024;
217 waitQueueResumeBytes = 2 * 1024 * 1024;
218 } else {
219 ramBufferSize = (int64_t)(mb * 1024.0 * 1024.0);
220 waitQueuePauseBytes = (int64_t)((double)ramBufferSize * 0.1);
221 waitQueueResumeBytes = (int64_t)((double)ramBufferSize * 0.05);
222 freeTrigger = (int64_t)(1.05 * (double)ramBufferSize);
223 freeLevel = (int64_t)(0.95 * (double)ramBufferSize);
224 }
225 }
226
getRAMBufferSizeMB()227 double DocumentsWriter::getRAMBufferSizeMB() {
228 SyncLock syncLock(this);
229 if (ramBufferSize == IndexWriter::DISABLE_AUTO_FLUSH) {
230 return (double)ramBufferSize;
231 } else {
232 return (double)ramBufferSize / 1024.0 / 1024.0;
233 }
234 }
235
/// Set the doc-count flush trigger (or IndexWriter::DISABLE_AUTO_FLUSH). Unsynchronized,
/// matching the single plain write it performs.
void DocumentsWriter::setMaxBufferedDocs(int32_t count) {
    maxBufferedDocs = count;
}
239
/// Return the doc-count flush trigger.
int32_t DocumentsWriter::getMaxBufferedDocs() {
    return maxBufferedDocs;
}
243
/// Return the segment name currently being buffered (empty if none allocated yet).
String DocumentsWriter::getSegment() {
    return segment;
}
247
/// Return the number of documents currently buffered in RAM.
int32_t DocumentsWriter::getNumDocsInRAM() {
    return numDocsInRAM;
}
251
/// Return the segment name currently receiving shared doc-store (stored fields / vectors) files.
String DocumentsWriter::getDocStoreSegment() {
    SyncLock syncLock(this);
    return docStoreSegment;
}
256
/// Return this buffered segment's starting doc offset within the shared doc store.
int32_t DocumentsWriter::getDocStoreOffset() {
    return docStoreOffset;
}
260
/// Close the shared doc-store files and reset doc-store state; returns the name of the
/// doc-store segment that was closed (empty if none). On any failure the whole writer
/// is aborted, then the original exception is rethrown.
String DocumentsWriter::closeDocStore() {
    TestScope testScope(L"DocumentsWriter", L"closeDocStore");
    SyncLock syncLock(this);
    BOOST_ASSERT(allThreadsIdle());

    if (infoStream) {
        message(L"closeDocStore: " + StringUtils::toString(_openFiles.size()) + L" files to flush to segment " +
                docStoreSegment + L" numDocs=" + StringUtils::toString(numDocsInStore));
    }

    bool success = false;
    LuceneException finally;
    String s;
    try {
        // onlyDocStore=true: allocate a doc-store segment name if needed, without
        // committing to a full postings flush.
        initFlushState(true);
        _closedFiles.clear();

        consumer->closeDocStore(flushState);
        // Closing the doc store must have closed every file it had open.
        BOOST_ASSERT(_openFiles.empty());

        // Capture the name before clearing so we can return it.
        s = docStoreSegment;
        docStoreSegment.clear();
        docStoreOffset = 0;
        numDocsInStore = 0;
        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }
    if (!success) {
        // Partial doc-store state is unrecoverable: discard all buffered state.
        abort();
    }
    finally.throwException();
    return s;
}
295
/// Return the files that were open when the last abort() happened (may be null if the
/// snapshot could not be taken); IndexWriter uses these for cleanup.
HashSet<String> DocumentsWriter::abortedFiles() {
    return _abortedFiles;
}
299
message(const String & message)300 void DocumentsWriter::message(const String& message) {
301 if (infoStream) {
302 *infoStream << L"DW " << message << L"\n";
303 }
304 }
305
openFiles()306 HashSet<String> DocumentsWriter::openFiles() {
307 SyncLock syncLock(this);
308 return HashSet<String>::newInstance(_openFiles.begin(), _openFiles.end());
309 }
310
closedFiles()311 HashSet<String> DocumentsWriter::closedFiles() {
312 SyncLock syncLock(this);
313 return HashSet<String>::newInstance(_closedFiles.begin(), _closedFiles.end());
314 }
315
/// Record that a file has been opened for writing; must not already be tracked.
void DocumentsWriter::addOpenFile(const String& name) {
    SyncLock syncLock(this);
    BOOST_ASSERT(!_openFiles.contains(name));
    _openFiles.add(name);
}
321
/// Move a file from the open set to the closed set; it must currently be tracked as open.
void DocumentsWriter::removeOpenFile(const String& name) {
    SyncLock syncLock(this);
    BOOST_ASSERT(_openFiles.contains(name));
    _openFiles.remove(name);
    _closedFiles.add(name);
}
328
/// Flag that an unrecoverable error occurred; indexing threads will route into abort().
void DocumentsWriter::setAborting() {
    SyncLock syncLock(this);
    aborting = true;
}
333
/// Discard all buffered documents and deletes after an unrecoverable error: waits for
/// other indexing threads to go idle, snapshots the open files (for IndexWriter cleanup),
/// aborts every per-thread consumer, resets doc-store state, then clears the aborting
/// flag and wakes all waiters. The first exception seen is rethrown at the end.
void DocumentsWriter::abort() {
    TestScope testScope(L"DocumentsWriter", L"abort");
    SyncLock syncLock(this);
    LuceneException finally;
    try {
        if (infoStream) {
            message(L"docWriter: now abort");
        }

        // Forcefully remove waiting ThreadStates from line
        waitQueue->abort();

        // Wait for all other threads to finish with DocumentsWriter
        pauseAllThreads();

        try {
            BOOST_ASSERT(waitQueue->numWaiting == 0);

            waitQueue->waitingBytes = 0;

            // Best-effort snapshot; cleared on failure so IndexWriter knows it is unreliable.
            try {
                _abortedFiles = openFiles();
            } catch (...) {
                _abortedFiles.reset();
            }

            deletesInRAM->clear();
            deletesFlushed->clear();
            _openFiles.clear();

            // Abort each per-thread consumer independently; one failure must not
            // prevent the others from being aborted.
            for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
                try {
                    (*threadState)->consumer->abort();
                } catch (...) {
                }
            }

            try {
                consumer->abort();
            } catch (...) {
            }

            docStoreSegment.clear();
            numDocsInStore = 0;
            docStoreOffset = 0;

            // Reset all postings data
            doAfterFlush();
        } catch (LuceneException& e) {
            finally = e;
        }
        // Always undo the pauseAllThreads() above, even after a failure.
        resumeAllThreads();
    } catch (LuceneException& e) {
        // Keep the first exception; don't let resumeAllThreads() mask it.
        if (finally.isNull()) {
            finally = e;
        }
    }
    aborting = false;
    notifyAll();
    if (infoStream) {
        message(L"docWriter: done abort");
    }
    finally.throwException();
}
398
/// Reset all in-RAM buffering state after a flush (or abort): clears thread bindings,
/// the wait queue, the pending segment name, and doc/byte counters, and resets each
/// per-thread state.
void DocumentsWriter::doAfterFlush() {
    // All ThreadStates should be idle when we are called
    BOOST_ASSERT(allThreadsIdle());
    threadBindings.clear();
    waitQueue->reset();
    segment.clear();
    numDocsInRAM = 0;
    nextDocID = 0;
    bufferIsFull = false;
    flushPending = false;
    for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
        (*threadState)->doAfterFlush();
    }
    numBytesUsed = 0;
}
414
/// Block new indexing work (by incrementing pauseThreads, which waitReady() checks) and
/// wait until every ThreadState is idle. Returns whether an abort is in progress.
/// Must be balanced by a resumeAllThreads() call.
bool DocumentsWriter::pauseAllThreads() {
    SyncLock syncLock(this);
    ++pauseThreads;
    while (!allThreadsIdle()) {
        // Bounded wait; re-checks the condition each wakeup.
        wait(1000);
    }
    return aborting;
}
423
/// Undo one pauseAllThreads() call; wakes waiting indexing threads once no pauses remain.
void DocumentsWriter::resumeAllThreads() {
    SyncLock syncLock(this);
    --pauseThreads;
    BOOST_ASSERT(pauseThreads >= 0);
    if (pauseThreads == 0) {
        notifyAll();
    }
}
432
allThreadsIdle()433 bool DocumentsWriter::allThreadsIdle() {
434 SyncLock syncLock(this);
435 for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
436 if (!(*threadState)->isIdle) {
437 return false;
438 }
439 }
440 return true;
441 }
442
anyChanges()443 bool DocumentsWriter::anyChanges() {
444 SyncLock syncLock(this);
445 return (numDocsInRAM != 0 || deletesInRAM->numTerms != 0 || !deletesInRAM->docIDs.empty() || !deletesInRAM->queries.empty());
446 }
447
/// (Re)build flushState for an upcoming flush; onlyDocStore=true allocates only a
/// doc-store segment name (see initSegmentName) without committing to a postings flush.
void DocumentsWriter::initFlushState(bool onlyDocStore) {
    SyncLock syncLock(this);
    initSegmentName(onlyDocStore);
    flushState = newLucene<SegmentWriteState>(shared_from_this(), directory, segment, docStoreSegment, numDocsInRAM, numDocsInStore, IndexWriterPtr(_writer)->getTermIndexInterval());
}
453
flush(bool _closeDocStore)454 int32_t DocumentsWriter::flush(bool _closeDocStore) {
455 SyncLock syncLock(this);
456 BOOST_ASSERT(allThreadsIdle());
457
458 BOOST_ASSERT(numDocsInRAM > 0);
459
460 BOOST_ASSERT(nextDocID == numDocsInRAM);
461 BOOST_ASSERT(waitQueue->numWaiting == 0);
462 BOOST_ASSERT(waitQueue->waitingBytes == 0);
463
464 initFlushState(false);
465
466 docStoreOffset = numDocsInStore;
467
468 if (infoStream) {
469 message(L"flush postings as segment " + flushState->segmentName + L" numDocs=" + StringUtils::toString(numDocsInRAM));
470 }
471
472 bool success = false;
473 LuceneException finally;
474
475 try {
476 if (_closeDocStore) {
477 BOOST_ASSERT(!flushState->docStoreSegmentName.empty());
478 BOOST_ASSERT(flushState->docStoreSegmentName == flushState->segmentName);
479
480 closeDocStore();
481 flushState->numDocsInStore = 0;
482 }
483
484 Collection<DocConsumerPerThreadPtr> threads(Collection<DocConsumerPerThreadPtr>::newInstance());
485 for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
486 threads.add((*threadState)->consumer);
487 }
488 consumer->flush(threads, flushState);
489
490 if (infoStream) {
491 SegmentInfoPtr si(newLucene<SegmentInfo>(flushState->segmentName, flushState->numDocs, directory));
492 int64_t newSegmentSize = si->sizeInBytes();
493 if (infoStream) {
494 message(L" oldRAMSize=" + StringUtils::toString(numBytesUsed) + L" newFlushedSize=" +
495 StringUtils::toString(newSegmentSize) + L" docs/MB=" +
496 StringUtils::toString((double)numDocsInRAM / ((double)newSegmentSize / 1024.0 / 1024.0)) +
497 L" new/old=" + StringUtils::toString(100.0 * (double)newSegmentSize / (double)numBytesUsed) + L"%");
498 }
499 }
500
501 flushedDocCount += flushState->numDocs;
502
503 doAfterFlush();
504
505 success = true;
506 } catch (LuceneException& e) {
507 finally = e;
508 }
509 if (!success) {
510 abort();
511 }
512 finally.throwException();
513
514 BOOST_ASSERT(waitQueue->waitingBytes == 0);
515
516 return flushState->numDocs;
517 }
518
/// Return the set of files written by the most recent flush (from flushState).
HashSet<String> DocumentsWriter::getFlushedFiles() {
    return flushState->flushedFiles;
}
522
createCompoundFile(const String & segment)523 void DocumentsWriter::createCompoundFile(const String& segment) {
524 CompoundFileWriterPtr cfsWriter(newLucene<CompoundFileWriter>(directory, segment + L"." + IndexFileNames::COMPOUND_FILE_EXTENSION()));
525 for (HashSet<String>::iterator flushedFile = flushState->flushedFiles.begin(); flushedFile != flushState->flushedFiles.end(); ++flushedFile) {
526 cfsWriter->addFile(*flushedFile);
527 }
528
529 // Perform the merge
530 cfsWriter->close();
531 }
532
setFlushPending()533 bool DocumentsWriter::setFlushPending() {
534 SyncLock syncLock(this);
535 if (flushPending) {
536 return false;
537 } else {
538 flushPending = true;
539 return true;
540 }
541 }
542
/// Release the flush-pending claim taken by setFlushPending().
void DocumentsWriter::clearFlushPending() {
    SyncLock syncLock(this);
    flushPending = false;
}
547
/// Move the in-RAM buffered deletes into the flushed-deletes buffer (called at flush time).
void DocumentsWriter::pushDeletes() {
    SyncLock syncLock(this);
    deletesFlushed->update(deletesInRAM);
}
552
/// Mark the writer closed and wake all threads blocked in waitReady(), which will then
/// throw AlreadyClosedException.
void DocumentsWriter::close() {
    SyncLock syncLock(this);
    closed = true;
    notifyAll();
}
558
/// Lazily allocate segment names. The postings segment name is allocated unless
/// onlyDocStore is true and a doc-store segment already exists; the doc-store segment
/// name, when empty, always adopts the current postings segment name.
void DocumentsWriter::initSegmentName(bool onlyDocStore) {
    SyncLock syncLock(this);
    if (segment.empty() && (!onlyDocStore || docStoreSegment.empty())) {
        segment = IndexWriterPtr(_writer)->newSegmentName();
        // A fresh segment name implies nothing has been buffered into it yet.
        BOOST_ASSERT(numDocsInRAM == 0);
    }
    if (docStoreSegment.empty()) {
        docStoreSegment = segment;
        BOOST_ASSERT(numDocsInStore == 0);
    }
}
570
/// Acquire a ThreadState for the calling thread, wait until it is usable, assign the
/// next docID, and buffer delTerm (if any). On success the returned state has
/// isIdle=false and the caller must later call finishDocument / release it. On failure
/// the state is forcibly idled and any flush claim it made is released.
DocumentsWriterThreadStatePtr DocumentsWriter::getThreadState(const DocumentPtr& doc, const TermPtr& delTerm) {
    SyncLock syncLock(this);
    // First, find a thread state. If this thread already has affinity to a specific ThreadState, use that one again.
    DocumentsWriterThreadStatePtr state(threadBindings.get(LuceneThread::currentId()));
    if (!state) {
        // First time this thread has called us since last flush. Find the least loaded thread state
        DocumentsWriterThreadStatePtr minThreadState;
        for (Collection<DocumentsWriterThreadStatePtr>::iterator threadState = threadStates.begin(); threadState != threadStates.end(); ++threadState) {
            if (!minThreadState || (*threadState)->numThreads < minThreadState->numThreads) {
                minThreadState = *threadState;
            }
        }
        // Share an existing state when one is unused or we've hit MAX_THREAD_STATE.
        if (minThreadState && (minThreadState->numThreads == 0 || threadStates.size() >= MAX_THREAD_STATE)) {
            state = minThreadState;
            ++state->numThreads;
        } else {
            // Just create a new "private" thread state
            threadStates.resize(threadStates.size() + 1);
            state = newLucene<DocumentsWriterThreadState>(shared_from_this());
            threadStates[threadStates.size() - 1] = state;
        }
        threadBindings.put(LuceneThread::currentId(), state);
    }

    // Next, wait until my thread state is idle (in case it's shared with other threads) and for threads to
    // not be paused nor a flush pending
    waitReady(state);

    // Allocate segment name if this is the first doc since last flush
    initSegmentName(false);

    state->isIdle = false;

    bool success = false;
    LuceneException finally;
    try {
        state->docState->docID = nextDocID;

        BOOST_ASSERT(IndexWriterPtr(_writer)->testPoint(L"DocumentsWriter.ThreadState.init start"));

        if (delTerm) {
            addDeleteTerm(delTerm, state->docState->docID);
            state->doFlushAfter = timeToFlushDeletes();
        }

        BOOST_ASSERT(IndexWriterPtr(_writer)->testPoint(L"DocumentsWriter.ThreadState.init after delTerm"));

        ++nextDocID;
        ++numDocsInRAM;

        // We must at this point commit to flushing to ensure we always get N docs when we flush by doc
        // count, even if > 1 thread is adding documents
        if (!flushPending && maxBufferedDocs != IndexWriter::DISABLE_AUTO_FLUSH && numDocsInRAM >= maxBufferedDocs) {
            flushPending = true;
            state->doFlushAfter = true;
        }

        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }
    if (!success) {
        // Forcefully idle this ThreadState
        state->isIdle = true;
        notifyAll();
        // Release any flush claim so another thread can flush.
        if (state->doFlushAfter) {
            state->doFlushAfter = false;
            flushPending = false;
        }
    }
    finally.throwException();

    return state;
}
645
/// Buffer a document with no associated delete; returns true if the caller should flush.
bool DocumentsWriter::addDocument(const DocumentPtr& doc, const AnalyzerPtr& analyzer) {
    return updateDocument(doc, analyzer, TermPtr());
}
649
/// Convenience overload: delete-by-term t then add doc, atomically; forwards to the
/// (doc, analyzer, delTerm) overload.
bool DocumentsWriter::updateDocument(const TermPtr& t, const DocumentPtr& doc, const AnalyzerPtr& analyzer) {
    return updateDocument(doc, analyzer, t);
}
653
/// Atomically buffer (delTerm-delete +) add of one document. Only getThreadState and
/// finishDocument run under the lock; the expensive processDocument runs unsynchronized.
/// Returns true when the caller (IndexWriter) should trigger a flush. On failure the
/// partially-added doc is either aborted away or marked deleted so indexing stays
/// all-or-none per document.
bool DocumentsWriter::updateDocument(const DocumentPtr& doc, const AnalyzerPtr& analyzer, const TermPtr& delTerm) {
    // This call is synchronized but fast
    DocumentsWriterThreadStatePtr state(getThreadState(doc, delTerm));

    DocStatePtr docState(state->docState);
    docState->doc = doc;
    docState->analyzer = analyzer;

    bool success = false;
    LuceneException finally;
    try {
        // This call is not synchronized and does all the work
        DocWriterPtr perDoc;
        try {
            perDoc = state->consumer->processDocument();
        } catch (LuceneException& e) {
            finally = e;
        }
        // Always release doc/analyzer references, even when processing failed.
        docState->clear();
        finally.throwException();

        // This call is synchronized but fast
        finishDocument(state, perDoc);

        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }
    if (!success) {
        SyncLock syncLock(this);
        if (aborting) {
            // Another thread (or this failure) is aborting: idle the state and join the abort.
            state->isIdle = true;
            notifyAll();
            abort();
        } else {
            // Keep docID accounting consistent by pushing a no-op writer for this docID.
            skipDocWriter->docID = docState->docID;
            bool success2 = false;
            try {
                waitQueue->add(skipDocWriter);
                success2 = true;
            } catch (LuceneException& e) {
                finally = e;
            }
            if (!success2) {
                state->isIdle = true;
                notifyAll();
                abort();
                return false;
            }

            state->isIdle = true;
            notifyAll();

            // If this thread state had decided to flush, we must clear it so another thread can flush
            if (state->doFlushAfter) {
                state->doFlushAfter = false;
                flushPending = false;
                notifyAll();
            }

            // Immediately mark this document as deleted since likely it was partially added. This keeps
            // indexing as "all or none" (atomic) when adding a document
            addDeleteDocID(state->docState->docID);
        }
    }

    finally.throwException();

    return (state->doFlushAfter || timeToFlushDeletes());
}
724
/// Return how many delete-by-term operations are buffered in RAM (not yet flushed).
int32_t DocumentsWriter::getNumBufferedDeleteTerms() {
    SyncLock syncLock(this);
    return deletesInRAM->numTerms;
}
729
/// Return the in-RAM map of buffered delete terms to their docID-upto limits.
MapTermNum DocumentsWriter::getBufferedDeleteTerms() {
    SyncLock syncLock(this);
    return deletesInRAM->terms;
}
734
/// After a merge completes, remap buffered delete docIDs to the post-merge doc numbering.
/// docMaps is null when the merged segments had no deletions, in which case nothing moved.
void DocumentsWriter::remapDeletes(const SegmentInfosPtr& infos, Collection< Collection<int32_t> > docMaps, Collection<int32_t> delCounts, const OneMergePtr& merge, int32_t mergeDocCount) {
    SyncLock syncLock(this);
    if (!docMaps) {
        // The merged segments had no deletes so docIDs did not change and we have nothing to do
        return;
    }
    MergeDocIDRemapperPtr mapper(newLucene<MergeDocIDRemapper>(infos, docMaps, delCounts, merge, mergeDocCount));
    deletesInRAM->remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount);
    deletesFlushed->remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount);
    // The merge compacted away deleted docs; shift our flushed-doc counter accordingly.
    flushedDocCount -= mapper->docShift;
}
746
/// Block until indexing may proceed: the given state (if any) is idle, no pause is in
/// effect, no flush is pending, and no abort is running. Throws AlreadyClosedException
/// if the writer was closed while waiting.
void DocumentsWriter::waitReady(const DocumentsWriterThreadStatePtr& state) {
    SyncLock syncLock(this);
    while (!closed && ((state && !state->isIdle) || pauseThreads != 0 || flushPending || aborting)) {
        // Bounded wait; loop re-evaluates all conditions on each wakeup.
        wait(1000);
    }
    if (closed) {
        boost::throw_exception(AlreadyClosedException(L"this IndexWriter is closed"));
    }
}
756
bufferDeleteTerms(Collection<TermPtr> terms)757 bool DocumentsWriter::bufferDeleteTerms(Collection<TermPtr> terms) {
758 SyncLock syncLock(this);
759 waitReady(DocumentsWriterThreadStatePtr());
760 for (Collection<TermPtr>::iterator term = terms.begin(); term != terms.end(); ++term) {
761 addDeleteTerm(*term, numDocsInRAM);
762 }
763 return timeToFlushDeletes();
764 }
765
/// Buffer a single delete-by-term, applying to all docs currently in RAM.
/// Returns true if the buffered deletes now warrant a flush.
bool DocumentsWriter::bufferDeleteTerm(const TermPtr& term) {
    SyncLock syncLock(this);
    waitReady(DocumentsWriterThreadStatePtr());
    addDeleteTerm(term, numDocsInRAM);
    return timeToFlushDeletes();
}
772
bufferDeleteQueries(Collection<QueryPtr> queries)773 bool DocumentsWriter::bufferDeleteQueries(Collection<QueryPtr> queries) {
774 SyncLock syncLock(this);
775 waitReady(DocumentsWriterThreadStatePtr());
776 for (Collection<QueryPtr>::iterator query = queries.begin(); query != queries.end(); ++query) {
777 addDeleteQuery(*query, numDocsInRAM);
778 }
779 return timeToFlushDeletes();
780 }
781
/// Buffer a single delete-by-query, applying to all docs currently in RAM.
/// Returns true if the buffered deletes now warrant a flush.
bool DocumentsWriter::bufferDeleteQuery(const QueryPtr& query) {
    SyncLock syncLock(this);
    waitReady(DocumentsWriterThreadStatePtr());
    addDeleteQuery(query, numDocsInRAM);
    return timeToFlushDeletes();
}
788
deletesFull()789 bool DocumentsWriter::deletesFull() {
790 SyncLock syncLock(this);
791 return ((ramBufferSize != IndexWriter::DISABLE_AUTO_FLUSH &&
792 (deletesInRAM->bytesUsed + deletesFlushed->bytesUsed + numBytesUsed) >= ramBufferSize) ||
793 (maxBufferedDeleteTerms != IndexWriter::DISABLE_AUTO_FLUSH &&
794 ((deletesInRAM->size() + deletesFlushed->size()) >= maxBufferedDeleteTerms)));
795 }
796
doApplyDeletes()797 bool DocumentsWriter::doApplyDeletes() {
798 SyncLock syncLock(this);
799 // Very similar to deletesFull(), except we don't count numBytesAlloc, because we are checking whether
800 // deletes (alone) are consuming too many resources now and thus should be applied. We apply deletes
801 // if RAM usage is > 1/2 of our allowed RAM buffer, to prevent too-frequent flushing of a long tail of
802 // tiny segments when merges (which always apply deletes) are infrequent.
803 return ((ramBufferSize != IndexWriter::DISABLE_AUTO_FLUSH &&
804 (deletesInRAM->bytesUsed + deletesFlushed->bytesUsed) >= ramBufferSize / 2) ||
805 (maxBufferedDeleteTerms != IndexWriter::DISABLE_AUTO_FLUSH &&
806 ((deletesInRAM->size() + deletesFlushed->size()) >= maxBufferedDeleteTerms)));
807 }
808
/// True when a flush should happen for deletes. Note the order matters: setFlushPending()
/// has a side effect (claiming the flush) and is only evaluated when the buffer/deletes
/// are actually full, thanks to && short-circuiting.
bool DocumentsWriter::timeToFlushDeletes() {
    SyncLock syncLock(this);
    return ((bufferIsFull || deletesFull()) && setFlushPending());
}
813
/// Debug helper used inside BOOST_ASSERT: verifies delete terms are visited in strictly
/// increasing order. Always returns true so it is a no-op in release builds; pass a null
/// term to reset the sequence.
bool DocumentsWriter::checkDeleteTerm(const TermPtr& term) {
    if (term) {
        BOOST_ASSERT(!lastDeleteTerm || term->compareTo(lastDeleteTerm) > 0);
    }
    lastDeleteTerm = term;
    return true;
}
821
/// Set the buffered-delete-terms flush trigger (or IndexWriter::DISABLE_AUTO_FLUSH).
void DocumentsWriter::setMaxBufferedDeleteTerms(int32_t maxBufferedDeleteTerms) {
    this->maxBufferedDeleteTerms = maxBufferedDeleteTerms;
}
825
/// Return the buffered-delete-terms flush trigger.
int32_t DocumentsWriter::getMaxBufferedDeleteTerms() {
    return maxBufferedDeleteTerms;
}
829
/// True when there are flushed deletes waiting to be applied to segments.
bool DocumentsWriter::hasDeletes() {
    SyncLock syncLock(this);
    return deletesFlushed->any();
}
834
/// Apply all flushed deletes to every segment in infos, walking segments in order and
/// tracking each segment's starting docID. Returns true if any document was deleted.
/// Clears the flushed deletes when done.
bool DocumentsWriter::applyDeletes(const SegmentInfosPtr& infos) {
    SyncLock syncLock(this);
    if (!hasDeletes()) {
        return false;
    }

    if (infoStream) {
        message(L"apply " + StringUtils::toString(deletesFlushed->numTerms) + L" buffered deleted terms and " +
                StringUtils::toString(deletesFlushed->docIDs.size()) + L" deleted docIDs and " +
                StringUtils::toString(deletesFlushed->queries.size()) + L" deleted queries on " +
                StringUtils::toString(infos->size()) + L" segments.");
    }

    int32_t infosEnd = infos->size();

    int32_t docStart = 0;
    bool any = false;
    IndexWriterPtr writer(_writer);

    for (int32_t i = 0; i < infosEnd; ++i) {
        // Make sure we never attempt to apply deletes to segment in external dir
        BOOST_ASSERT(infos->info(i)->dir == directory);

        // Borrow a reader from the writer's pool; released below whether or not we throw.
        SegmentReaderPtr reader(writer->readerPool->get(infos->info(i), false));
        LuceneException finally;
        try {
            if (applyDeletes(reader, docStart)) {
                any = true;
            }
            // Next segment's docIDs start after this one's maxDoc.
            docStart += reader->maxDoc();
        } catch (LuceneException& e) {
            finally = e;
        }
        writer->readerPool->release(reader);
        finally.throwException();
    }

    deletesFlushed->clear();

    return any;
}
876
/// Apply flushed deletes to one segment reader whose docs occupy the global docID range
/// [docIDStart, docIDStart + maxDoc). Each buffered delete carries a "limit" docID so it
/// only affects documents added before the delete was buffered. Returns true if any
/// document in this segment was deleted.
bool DocumentsWriter::applyDeletes(const IndexReaderPtr& reader, int32_t docIDStart) {
    SyncLock syncLock(this);
    int32_t docEnd = docIDStart + reader->maxDoc();
    bool any = false;

    // Reset the in-order assertion helper before iterating terms.
    BOOST_ASSERT(checkDeleteTerm(TermPtr()));

    // Delete by term
    TermDocsPtr docs(reader->termDocs());
    LuceneException finally;
    try {
        for (MapTermNum::iterator entry = deletesFlushed->terms.begin(); entry != deletesFlushed->terms.end(); ++entry) {
            // we should be iterating a Map here, so terms better be in order
            BOOST_ASSERT(checkDeleteTerm(entry->first));
            docs->seek(entry->first);
            // Only delete docs added before this delete was buffered.
            int32_t limit = entry->second->getNum();
            while (docs->next()) {
                int32_t docID = docs->doc();
                if (docIDStart + docID >= limit) {
                    break;
                }
                reader->deleteDocument(docID);
                any = true;
            }
        }
    } catch (LuceneException& e) {
        finally = e;
    }
    // Always close the term docs enumerator before rethrowing.
    docs->close();
    finally.throwException();

    // Delete by docID
    for (Collection<int32_t>::iterator docID = deletesFlushed->docIDs.begin(); docID != deletesFlushed->docIDs.end(); ++docID) {
        // Only docIDs falling inside this segment's global range apply here.
        if (*docID >= docIDStart && *docID < docEnd) {
            reader->deleteDocument(*docID - docIDStart);
            any = true;
        }
    }

    // Delete by query
    IndexSearcherPtr searcher(newLucene<IndexSearcher>(reader));
    for (MapQueryInt::iterator entry = deletesFlushed->queries.begin(); entry != deletesFlushed->queries.end(); ++entry) {
        WeightPtr weight(entry->first->weight(searcher));
        ScorerPtr scorer(weight->scorer(reader, true, false));
        if (scorer) {
            while (true) {
                int32_t doc = scorer->nextDoc();
                // int64_t arithmetic so NO_MORE_DOCS-style sentinels don't overflow.
                if ((int64_t)docIDStart + doc >= entry->second) {
                    break;
                }
                reader->deleteDocument(doc);
                any = true;
            }
        }
    }
    searcher->close();
    return any;
}
935
addDeleteTerm(const TermPtr & term,int32_t docCount)936 void DocumentsWriter::addDeleteTerm(const TermPtr& term, int32_t docCount) {
937 SyncLock syncLock(this);
938 NumPtr num(deletesInRAM->terms.get(term));
939 int32_t docIDUpto = flushedDocCount + docCount;
940 if (!num) {
941 deletesInRAM->terms.put(term, newLucene<Num>(docIDUpto));
942 } else {
943 num->setNum(docIDUpto);
944 }
945 ++deletesInRAM->numTerms;
946
947 deletesInRAM->addBytesUsed(BYTES_PER_DEL_TERM + term->_text.length() * CHAR_NUM_BYTE);
948 }
949
/// Buffer a delete-by-docID; the docID is local to the in-RAM buffer and is made
/// absolute by adding flushedDocCount.
void DocumentsWriter::addDeleteDocID(int32_t docID) {
    SyncLock syncLock(this);
    deletesInRAM->docIDs.add(flushedDocCount + docID);
    deletesInRAM->addBytesUsed(BYTES_PER_DEL_DOCID);
}
955
/// Buffer a delete-by-query; the stored value is the absolute docID limit up to
/// which the query applies.
void DocumentsWriter::addDeleteQuery(const QueryPtr& query, int32_t docID) {
    SyncLock syncLock(this);
    deletesInRAM->queries.put(query, flushedDocCount + docID);
    deletesInRAM->addBytesUsed(BYTES_PER_DEL_QUERY);
}
961
doBalanceRAM()962 bool DocumentsWriter::doBalanceRAM() {
963 SyncLock syncLock(this);
964 return (ramBufferSize != IndexWriter::DISABLE_AUTO_FLUSH && !bufferIsFull &&
965 (numBytesUsed + deletesInRAM->bytesUsed + deletesFlushed->bytesUsed >= ramBufferSize ||
966 numBytesAlloc >= freeTrigger));
967 }
968
/// Called when a thread finishes (or aborts) processing one document.  Enqueues the
/// produced DocWriter on the wait queue so documents are written to the store in
/// docID order, then marks the thread state idle.
void DocumentsWriter::finishDocument(const DocumentsWriterThreadStatePtr& perThread, const DocWriterPtr& docWriter) {
    if (doBalanceRAM()) {
        // Must call this without holding synchronized(this) else we'll hit deadlock
        balanceRAM();
    }

    {
        SyncLock syncLock(this);
        BOOST_ASSERT(!docWriter || docWriter->docID == perThread->docState->docID);

        if (aborting) {
            // We are currently aborting, and another thread is waiting for me to become idle. We
            // just forcefully idle this threadState; it will be fully reset by abort()
            if (docWriter) {
                try {
                    docWriter->abort();
                } catch (...) {
                    // Deliberately swallowed: abort() is best-effort cleanup while the
                    // whole writer is already aborting.
                }
            }

            perThread->isIdle = true;
            notifyAll();
            return;
        }

        bool doPause;

        if (docWriter) {
            doPause = waitQueue->add(docWriter);
        } else {
            // No output for this doc (e.g. it hit an aborting exception): enqueue the shared
            // skipDocWriter placeholder so docID ordering in the wait queue is preserved.
            skipDocWriter->docID = perThread->docState->docID;
            doPause = waitQueue->add(skipDocWriter);
        }

        if (doPause) {
            // Too many bytes are waiting out-of-order; stall until the queue drains.
            waitForWaitQueue();
        }

        if (bufferIsFull && !flushPending) {
            // RAM buffer filled up while we held no lock; schedule a flush on this thread.
            flushPending = true;
            perThread->doFlushAfter = true;
        }

        perThread->isIdle = true;
        notifyAll();
    }
}
1016
waitForWaitQueue()1017 void DocumentsWriter::waitForWaitQueue() {
1018 SyncLock syncLock(this);
1019 do {
1020 wait(1000);
1021 } while (!waitQueue->doResume());
1022 }
1023
/// Total RAM in use by indexing buffers plus in-RAM and flushed deletes.
/// NOTE(review): read without SyncLock — presumably an intentionally approximate
/// snapshot; confirm callers tolerate a stale value.
int64_t DocumentsWriter::getRAMUsed() {
    return numBytesUsed + deletesInRAM->bytesUsed + deletesFlushed->bytesUsed;
}
1027
getIntBlock(bool trackAllocations)1028 IntArray DocumentsWriter::getIntBlock(bool trackAllocations) {
1029 SyncLock syncLock(this);
1030 int32_t size = freeIntBlocks.size();
1031 IntArray b;
1032 if (size == 0) {
1033 // Always record a block allocated, even if trackAllocations is false. This is necessary because
1034 // this block will be shared between things that don't track allocations (term vectors) and things
1035 // that do (freq/prox postings).
1036 numBytesAlloc += INT_BLOCK_SIZE * INT_NUM_BYTE;
1037 b = IntArray::newInstance(INT_BLOCK_SIZE);
1038 } else {
1039 b = freeIntBlocks.removeLast();
1040 }
1041 if (trackAllocations) {
1042 numBytesUsed += INT_BLOCK_SIZE * INT_NUM_BYTE;
1043 }
1044 BOOST_ASSERT(numBytesUsed <= numBytesAlloc);
1045 return b;
1046 }
1047
/// Record numBytes of newly allocated (but not necessarily used) RAM.
void DocumentsWriter::bytesAllocated(int64_t numBytes) {
    SyncLock syncLock(this);
    numBytesAlloc += numBytes;
}
1052
/// Record numBytes of RAM now in active use; used bytes may never exceed allocated bytes.
void DocumentsWriter::bytesUsed(int64_t numBytes) {
    SyncLock syncLock(this);
    numBytesUsed += numBytes;
    BOOST_ASSERT(numBytesUsed <= numBytesAlloc);
}
1058
recycleIntBlocks(Collection<IntArray> blocks,int32_t start,int32_t end)1059 void DocumentsWriter::recycleIntBlocks(Collection<IntArray> blocks, int32_t start, int32_t end) {
1060 SyncLock syncLock(this);
1061 for (int32_t i = start; i < end; ++i) {
1062 freeIntBlocks.add(blocks[i]);
1063 blocks[i].reset();
1064 }
1065 }
1066
getCharBlock()1067 CharArray DocumentsWriter::getCharBlock() {
1068 SyncLock syncLock(this);
1069 int32_t size = freeCharBlocks.size();
1070 CharArray c;
1071 if (size == 0) {
1072 numBytesAlloc += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
1073 c = CharArray::newInstance(CHAR_BLOCK_SIZE);
1074 } else {
1075 c = freeCharBlocks.removeLast();
1076 }
1077 // We always track allocations of char blocks for now because nothing that skips allocation tracking
1078 // (currently only term vectors) uses its own char blocks.
1079 numBytesUsed += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
1080 BOOST_ASSERT(numBytesUsed <= numBytesAlloc);
1081 return c;
1082 }
1083
recycleCharBlocks(Collection<CharArray> blocks,int32_t numBlocks)1084 void DocumentsWriter::recycleCharBlocks(Collection<CharArray> blocks, int32_t numBlocks) {
1085 SyncLock syncLock(this);
1086 for (int32_t i = 0; i < numBlocks; ++i) {
1087 freeCharBlocks.add(blocks[i]);
1088 blocks[i].reset();
1089 }
1090 }
1091
toMB(int64_t v)1092 String DocumentsWriter::toMB(int64_t v) {
1093 return StringUtils::toString((double)v / 1024.0 / 1024.0);
1094 }
1095
/// Keep allocated RAM under control: if allocation crossed the free trigger, give
/// back recycled blocks (round-robin across the pools, 32KB-ish at a time) until we
/// drop below freeLevel; otherwise, if used RAM crossed the flush trigger, mark the
/// buffer full so a flush happens.  Must NOT be called while holding this object's
/// lock for the whole duration (see finishDocument).
void DocumentsWriter::balanceRAM() {
    // We flush when we've used our target usage
    int64_t flushTrigger = ramBufferSize;

    int64_t deletesRAMUsed = deletesInRAM->bytesUsed + deletesFlushed->bytesUsed;

    if (numBytesAlloc + deletesRAMUsed > freeTrigger) {
        if (infoStream) {
            message(L"  RAM: now balance allocations: usedMB=" + toMB(numBytesUsed) +
                    L" vs trigger=" + toMB(flushTrigger) +
                    L" allocMB=" + toMB(numBytesAlloc) +
                    L" deletesMB=" + toMB(deletesRAMUsed) +
                    L" vs trigger=" + toMB(freeTrigger) +
                    L" byteBlockFree=" + toMB(byteBlockAllocator->freeByteBlocks.size() * BYTE_BLOCK_SIZE) +
                    L" perDocFree=" + toMB(perDocAllocator->freeByteBlocks.size() * PER_DOC_BLOCK_SIZE) +
                    L" charBlockFree=" + toMB(freeCharBlocks.size() * CHAR_BLOCK_SIZE * CHAR_NUM_BYTE));
        }

        int64_t startBytesAlloc = numBytesAlloc + deletesRAMUsed;

        int32_t iter = 0;

        // We free equally from each pool in 32 KB chunks until we are below our threshold (freeLevel)

        bool any = true;

        while (numBytesAlloc + deletesRAMUsed > freeLevel) {
            {
                SyncLock syncLock(this);
                // If all free lists are empty and the consumer had nothing left to free
                // on the previous round, there's nothing more to reclaim: flush instead.
                if (perDocAllocator->freeByteBlocks.empty() && byteBlockAllocator->freeByteBlocks.empty() &&
                        freeCharBlocks.empty() && freeIntBlocks.empty() && !any) {
                    // Nothing else to free -- must flush now.
                    bufferIsFull = (numBytesUsed + deletesRAMUsed > flushTrigger);
                    if (infoStream) {
                        if (bufferIsFull) {
                            message(L"    nothing to free; now set bufferIsFull");
                        } else {
                            message(L"    nothing to free");
                        }
                    }
                    BOOST_ASSERT(numBytesUsed <= numBytesAlloc);
                    break;
                }

                // Round-robin: iterations 0..3 each target one pool; iteration 4 asks the consumer.
                if ((iter % 5) == 0 && !byteBlockAllocator->freeByteBlocks.empty()) {
                    byteBlockAllocator->freeByteBlocks.removeLast();
                    numBytesAlloc -= BYTE_BLOCK_SIZE;
                }

                if ((iter % 5) == 1 && !freeCharBlocks.empty()) {
                    freeCharBlocks.removeLast();
                    numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
                }

                if ((iter % 5) == 2 && !freeIntBlocks.empty()) {
                    freeIntBlocks.removeLast();
                    numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE;
                }

                if ((iter % 5) == 3 && !perDocAllocator->freeByteBlocks.empty()) {
                    // Remove upwards of 32 blocks (each block is 1K)
                    for (int32_t i = 0; i < 32; ++i) {
                        perDocAllocator->freeByteBlocks.removeLast();
                        numBytesAlloc -= PER_DOC_BLOCK_SIZE;
                        if (perDocAllocator->freeByteBlocks.empty()) {
                            break;
                        }
                    }
                }
            }

            // Consumer callback happens outside the lock.
            if ((iter % 5) == 4 && any) {
                // Ask consumer to free any recycled state
                any = consumer->freeRAM();
            }

            ++iter;
        }

        if (infoStream) {
            message(L"    after free: freedMB=" + StringUtils::toString((double)(startBytesAlloc - numBytesAlloc - deletesRAMUsed) / 1024.0 / 1024.0) +
                    L" usedMB=" + StringUtils::toString((double)(numBytesUsed + deletesRAMUsed) / 1024.0 / 1024.0) +
                    L" allocMB=" + StringUtils::toString((double)numBytesAlloc / 1024.0 / 1024.0));
        }
    } else {
        // If we have not crossed the 100% mark, but have crossed the 95% mark of RAM we are actually
        // using, go ahead and flush. This prevents over-allocating and then freeing, with every flush.
        SyncLock syncLock(this);
        if (numBytesUsed + deletesRAMUsed > flushTrigger) {
            if (infoStream) {
                message(L"  RAM: now flush @ usedMB=" + StringUtils::toString((double)numBytesUsed / 1024.0 / 1024.0) +
                        L" allocMB=" + StringUtils::toString((double)numBytesAlloc / 1024.0 / 1024.0) +
                        L" deletesMB=" + StringUtils::toString((double)deletesRAMUsed / 1024.0 / 1024.0) +
                        L" triggerMB=" + StringUtils::toString((double)flushTrigger / 1024.0 / 1024.0));
            }
            bufferIsFull = true;
        }
    }
}
1195
DocState::DocState() {
    // Value members are zeroed here; pointer members are wired up by the owner.
    maxFieldLength = 0;
    docID = 0;
}

DocState::~DocState() {
}
1203
/// Forward a named test point to the owning IndexWriter (hook used by unit tests).
bool DocState::testPoint(const String& name) {
    return IndexWriterPtr(DocumentsWriterPtr(_docWriter)->_writer)->testPoint(name);
}
1207
/// Drop per-document references once the document has been processed.
void DocState::clear() {
    // don't hold onto doc nor analyzer, in case it is large
    doc.reset();
    analyzer.reset();
}
1213
PerDocBuffer::PerDocBuffer(const DocumentsWriterPtr& docWriter) {
    // Hold a weak back-reference to the owning DocumentsWriter.
    _docWriter = docWriter;
}

PerDocBuffer::~PerDocBuffer() {
}
1220
/// Allocate a buffer from the shared per-doc block pool; the pool only hands out
/// fixed PER_DOC_BLOCK_SIZE blocks, hence the assert.
ByteArray PerDocBuffer::newBuffer(int32_t size) {
    BOOST_ASSERT(size == DocumentsWriter::PER_DOC_BLOCK_SIZE);
    return DocumentsWriterPtr(_docWriter)->perDocAllocator->getByteBlock(false);
}
1225
/// Return all blocks held by this buffer to the per-doc allocator and reset
/// the buffer to empty.
void PerDocBuffer::recycle() {
    SyncLock syncLock(this);
    if (!buffers.empty()) {
        setLength(0);

        // Recycle the blocks
        DocumentsWriterPtr(_docWriter)->perDocAllocator->recycleByteBlocks(buffers);
        buffers.clear();
        sizeInBytes = 0;

        BOOST_ASSERT(numBuffers() == 0);
    }
}
1239
DocWriter::DocWriter() {
    docID = 0;
}

DocWriter::~DocWriter() {
}

/// Link this writer to the next one in a chain of per-document writers.
void DocWriter::setNext(const DocWriterPtr& next) {
    this->next = next;
}
1250
IndexingChain::~IndexingChain() {
}

DefaultIndexingChain::~DefaultIndexingChain() {
}
1256
getChain(const DocumentsWriterPtr & documentsWriter)1257 DocConsumerPtr DefaultIndexingChain::getChain(const DocumentsWriterPtr& documentsWriter) {
1258 TermsHashConsumerPtr termVectorsWriter(newLucene<TermVectorsTermsWriter>(documentsWriter));
1259 TermsHashConsumerPtr freqProxWriter(newLucene<FreqProxTermsWriter>());
1260
1261 InvertedDocConsumerPtr termsHash(newLucene<TermsHash>(documentsWriter, true, freqProxWriter,
1262 newLucene<TermsHash>(documentsWriter, false,
1263 termVectorsWriter, TermsHashPtr())));
1264
1265 DocInverterPtr docInverter(newLucene<DocInverter>(termsHash, newLucene<NormsWriter>()));
1266 return newLucene<DocFieldProcessor>(documentsWriter, docInverter);
1267 }
1268
SkipDocWriter::~SkipDocWriter() {
}

/// SkipDocWriter is a shared no-op placeholder enqueued for documents that produced
/// no output; it keeps docID ordering in the wait queue without writing anything.
void SkipDocWriter::finish() {
}

void SkipDocWriter::abort() {
}

int64_t SkipDocWriter::sizeInBytes() {
    return 0;
}
1281
WaitQueue::WaitQueue(const DocumentsWriterPtr& docWriter) {
    this->_docWriter = docWriter;
    // Circular buffer of out-of-order finished docs, indexed relative to nextWriteLoc.
    waiting = Collection<DocWriterPtr>::newInstance(10);
    nextWriteDocID = 0;
    nextWriteLoc = 0;
    numWaiting = 0;
    waitingBytes = 0;
}

WaitQueue::~WaitQueue() {
}
1293
/// Reset docID sequencing after a flush/abort; asserts the queue is already empty.
void WaitQueue::reset() {
    SyncLock syncLock(this);
    // NOTE: nextWriteLoc doesn't need to be reset
    BOOST_ASSERT(numWaiting == 0);
    BOOST_ASSERT(waitingBytes == 0);
    nextWriteDocID = 0;
}
1301
doResume()1302 bool WaitQueue::doResume() {
1303 SyncLock syncLock(this);
1304 return (waitingBytes <= DocumentsWriterPtr(_docWriter)->waitQueueResumeBytes);
1305 }
1306
doPause()1307 bool WaitQueue::doPause() {
1308 SyncLock syncLock(this);
1309 return (waitingBytes > DocumentsWriterPtr(_docWriter)->waitQueuePauseBytes);
1310 }
1311
abort()1312 void WaitQueue::abort() {
1313 SyncLock syncLock(this);
1314 int32_t count = 0;
1315 for (Collection<DocWriterPtr>::iterator doc = waiting.begin(); doc != waiting.end(); ++doc) {
1316 if (*doc) {
1317 (*doc)->abort();
1318 doc->reset();
1319 ++count;
1320 }
1321 }
1322 waitingBytes = 0;
1323 BOOST_ASSERT(count == numWaiting);
1324 numWaiting = 0;
1325 }
1326
writeDocument(const DocWriterPtr & doc)1327 void WaitQueue::writeDocument(const DocWriterPtr& doc) {
1328 DocumentsWriterPtr docWriter(_docWriter);
1329 BOOST_ASSERT(doc == DocumentsWriterPtr(docWriter)->skipDocWriter || nextWriteDocID == doc->docID);
1330 bool success = false;
1331 LuceneException finally;
1332 try {
1333 doc->finish();
1334 ++nextWriteDocID;
1335 ++docWriter->numDocsInStore;
1336 ++nextWriteLoc;
1337 BOOST_ASSERT(nextWriteLoc <= waiting.size());
1338 if (nextWriteLoc == waiting.size()) {
1339 nextWriteLoc = 0;
1340 }
1341 success = true;
1342 } catch (LuceneException& e) {
1343 finally = e;
1344 }
1345 if (!success) {
1346 docWriter->setAborting();
1347 }
1348 finally.throwException();
1349 }
1350
/// Add a finished document to the queue.  If it is the next docID in sequence it is
/// written immediately, along with any directly-following docs already waiting;
/// otherwise it is parked in the circular buffer (growing it if the docID gap is too
/// large).  Returns true if the caller should pause (too many bytes waiting).
bool WaitQueue::add(const DocWriterPtr& doc) {
    DocWriterPtr _doc(doc);
    SyncLock syncLock(this);
    BOOST_ASSERT(_doc->docID >= nextWriteDocID);
    if (_doc->docID == nextWriteDocID) {
        writeDocument(_doc);
        // Drain any consecutive docs that were waiting on this one.
        while (true) {
            _doc = waiting[nextWriteLoc];
            if (_doc) {
                --numWaiting;
                waiting[nextWriteLoc].reset();
                waitingBytes -= _doc->sizeInBytes();
                writeDocument(_doc);
            } else {
                break;
            }
        }
    } else {
        // I finished before documents that were added before me. This can easily happen when I am a small doc
        // and the docs before me were large, or just due to luck in the thread scheduling. Just add myself to
        // the queue and when that large doc finishes, it will flush me
        int32_t gap = _doc->docID - nextWriteDocID;
        if (gap >= waiting.size()) {
            // Grow queue
            Collection<DocWriterPtr> newArray(Collection<DocWriterPtr>::newInstance(MiscUtils::getNextSize(gap)));
            BOOST_ASSERT(nextWriteLoc >= 0);
            // Unroll the circular buffer so the new array starts at write position 0.
            MiscUtils::arrayCopy(waiting.begin(), nextWriteLoc, newArray.begin(), 0, waiting.size() - nextWriteLoc);
            MiscUtils::arrayCopy(waiting.begin(), 0, newArray.begin(), waiting.size() - nextWriteLoc, nextWriteLoc);
            nextWriteLoc = 0;
            waiting = newArray;
            gap = _doc->docID - nextWriteDocID;
        }

        int32_t loc = nextWriteLoc + gap;
        if (loc >= waiting.size()) {
            loc -= waiting.size();
        }

        // We should only wrap one time
        BOOST_ASSERT(loc < waiting.size());

        // Nobody should be in my spot!
        BOOST_ASSERT(!waiting[loc]);
        waiting[loc] = _doc;
        ++numWaiting;
        waitingBytes += _doc->sizeInBytes();
    }

    return doPause();
}
1401
ByteBlockAllocator::ByteBlockAllocator(const DocumentsWriterPtr& docWriter, int32_t blockSize) {
    // All blocks handed out by this allocator have the same fixed size.
    this->blockSize = blockSize;
    this->freeByteBlocks = Collection<ByteArray>::newInstance();
    this->_docWriter = docWriter;
}

ByteBlockAllocator::~ByteBlockAllocator() {
}
1410
getByteBlock(bool trackAllocations)1411 ByteArray ByteBlockAllocator::getByteBlock(bool trackAllocations) {
1412 DocumentsWriterPtr docWriter(_docWriter);
1413 SyncLock syncLock(docWriter);
1414 int32_t size = freeByteBlocks.size();
1415 ByteArray b;
1416 if (size == 0) {
1417 // Always record a block allocated, even if trackAllocations is false. This is necessary because this block will
1418 // be shared between things that don't track allocations (term vectors) and things that do (freq/prox postings).
1419 docWriter->numBytesAlloc += blockSize;
1420 b = ByteArray::newInstance(blockSize);
1421 MiscUtils::arrayFill(b.get(), 0, b.size(), 0);
1422 } else {
1423 b = freeByteBlocks.removeLast();
1424 }
1425 if (trackAllocations) {
1426 docWriter->numBytesUsed += blockSize;
1427 }
1428 BOOST_ASSERT(docWriter->numBytesUsed <= docWriter->numBytesAlloc);
1429 return b;
1430 }
1431
recycleByteBlocks(Collection<ByteArray> blocks,int32_t start,int32_t end)1432 void ByteBlockAllocator::recycleByteBlocks(Collection<ByteArray> blocks, int32_t start, int32_t end) {
1433 DocumentsWriterPtr docWriter(_docWriter);
1434 SyncLock syncLock(docWriter);
1435 for (int32_t i = start; i < end; ++i) {
1436 freeByteBlocks.add(blocks[i]);
1437 blocks[i].reset();
1438 }
1439 }
1440
recycleByteBlocks(Collection<ByteArray> blocks)1441 void ByteBlockAllocator::recycleByteBlocks(Collection<ByteArray> blocks) {
1442 DocumentsWriterPtr docWriter(_docWriter);
1443 SyncLock syncLock(docWriter);
1444 int32_t size = blocks.size();
1445 for (int32_t i = 0; i < size; ++i) {
1446 freeByteBlocks.add(blocks[i]);
1447 }
1448 }
1449
1450 }
1451