/////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2009-2014 Alan Wright. All rights reserved.
// Distributable under the terms of either the Apache License (Version 2.0)
// or the GNU Lesser General Public License.
/////////////////////////////////////////////////////////////////////////////

#include "LuceneInc.h"
#include "IndexWriter.h"
#include "_IndexWriter.h"
#include "Directory.h"
#include "Analyzer.h"
#include "KeepOnlyLastCommitDeletionPolicy.h"
#include "DocumentsWriter.h"
#include "IndexFileDeleter.h"
#include "IndexFileNames.h"
#include "Lock.h"
#include "SegmentInfo.h"
#include "SegmentReader.h"
#include "ReadOnlyDirectoryReader.h"
#include "BufferedIndexInput.h"
#include "LogByteSizeMergePolicy.h"
#include "LogDocMergePolicy.h"
#include "Similarity.h"
#include "ConcurrentMergeScheduler.h"
#include "CompoundFileWriter.h"
#include "SegmentMerger.h"
#include "DateTools.h"
#include "Constants.h"
#include "InfoStream.h"
#include "TestPoint.h"
#include "StringUtils.h"

namespace Lucene {

/// The normal read buffer size defaults to 1024, but increasing this during merging seems to
/// yield performance gains.  However we don't want to increase it too much because there are
/// quite a few BufferedIndexInputs created during merging.
const int32_t IndexWriter::MERGE_READ_BUFFER_SIZE = 4096;

int32_t IndexWriter::MESSAGE_ID = 0;
InfoStreamPtr IndexWriter::defaultInfoStream;

/// Default value for the write lock timeout (1,000 milliseconds).
int64_t IndexWriter::WRITE_LOCK_TIMEOUT = 1000;

const String IndexWriter::WRITE_LOCK_NAME = L"write.lock";

/// Value to denote a flush trigger is disabled.
const int32_t IndexWriter::DISABLE_AUTO_FLUSH = -1;

/// Disabled by default (because IndexWriter flushes by RAM usage by default).
const int32_t IndexWriter::DEFAULT_MAX_BUFFERED_DOCS = IndexWriter::DISABLE_AUTO_FLUSH;

/// Default value is 16 MB (which means flush when buffered docs consume 16 MB RAM).
const double IndexWriter::DEFAULT_RAM_BUFFER_SIZE_MB = 16.0;

/// Disabled by default (because IndexWriter flushes by RAM usage by default).
const int32_t IndexWriter::DEFAULT_MAX_BUFFERED_DELETE_TERMS = IndexWriter::DISABLE_AUTO_FLUSH;

/// Default value is 10000.
const int32_t IndexWriter::DEFAULT_MAX_FIELD_LENGTH = 10000;

/// Default value is 128.
const int32_t IndexWriter::DEFAULT_TERM_INDEX_INTERVAL = 128;

/// Sets the maximum field length to INT_MAX.
const int32_t IndexWriter::MaxFieldLengthUNLIMITED = INT_MAX;

/// Sets the maximum field length to {@link #DEFAULT_MAX_FIELD_LENGTH}.
const int32_t IndexWriter::MaxFieldLengthLIMITED = IndexWriter::DEFAULT_MAX_FIELD_LENGTH;

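/// Note: Lucene++ uses two-phase construction; these constructors only record their
/// arguments, and the real work of opening the index (acquiring the write lock, reading or
/// creating segments_N, wiring up the deleter and DocumentsWriter) happens in initialize(),
/// which newLucene<IndexWriter>(...) invokes after construction, once shared_from_this()
/// is safe to call.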
IndexWriter::IndexWriter(const DirectoryPtr& d, const AnalyzerPtr& a, bool create, int32_t mfl) {
    this->directory = d;
    this->analyzer = a;
    this->create = create;
    this->maxFieldLength = mfl;
}

IndexWriter::IndexWriter(const DirectoryPtr& d, const AnalyzerPtr& a, int32_t mfl) {
    this->directory = d;
    this->analyzer = a;
    this->create = !IndexReader::indexExists(d);
    this->maxFieldLength = mfl;
}

IndexWriter::IndexWriter(const DirectoryPtr& d, const AnalyzerPtr& a, const IndexDeletionPolicyPtr& deletionPolicy, int32_t mfl) {
    this->directory = d;
    this->analyzer = a;
    this->deletionPolicy = deletionPolicy;
    this->create = !IndexReader::indexExists(d);
    this->maxFieldLength = mfl;
}

IndexWriter::IndexWriter(const DirectoryPtr& d, const AnalyzerPtr& a, bool create, const IndexDeletionPolicyPtr& deletionPolicy, int32_t mfl) {
    this->directory = d;
    this->analyzer = a;
    this->create = create;
    this->deletionPolicy = deletionPolicy;
    this->maxFieldLength = mfl;
}

IndexWriter::IndexWriter(const DirectoryPtr& d, const AnalyzerPtr& a, bool create, const IndexDeletionPolicyPtr& deletionPolicy, int32_t mfl, const IndexingChainPtr& indexingChain, const IndexCommitPtr& commit) {
    this->directory = d;
    this->analyzer = a;
    this->create = create;
    this->deletionPolicy = deletionPolicy;
    this->maxFieldLength = mfl;
    this->indexingChain = indexingChain;
    this->indexCommit = commit;
}

IndexWriter::IndexWriter(const DirectoryPtr& d, const AnalyzerPtr& a, const IndexDeletionPolicyPtr& deletionPolicy, int32_t mfl, const IndexCommitPtr& commit) {
    this->directory = d;
    this->analyzer = a;
    this->create = false;
    this->deletionPolicy = deletionPolicy;
    this->maxFieldLength = mfl;
    this->indexCommit = commit;
}

IndexWriter::~IndexWriter() {
}

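/// Completes construction: assigns defaults (merge policy/scheduler, similarity, buffers),
/// optionally clears a leftover write lock when creating, obtains the write lock, then either
/// creates a fresh segments_N or reads the existing one (or the one named by an explicit
/// IndexCommit), and finally sets up the DocumentsWriter and IndexFileDeleter.  On any
/// failure the write lock is released so another writer can open the index.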
void IndexWriter::initialize() {
    messageID = -1;
    messageIDLock = newInstance<Synchronize>();
    setMessageID(defaultInfoStream);
    this->writeLockTimeout = WRITE_LOCK_TIMEOUT;
    this->segmentInfos = newLucene<SegmentInfos>();
    pendingMerges = Collection<OneMergePtr>::newInstance();
    mergeExceptions = Collection<OneMergePtr>::newInstance();
    segmentsToOptimize = SetSegmentInfo::newInstance();
    optimizeMaxNumSegments = 0;
    mergingSegments = SetSegmentInfo::newInstance();
    runningMerges = SetOneMerge::newInstance();
    synced = HashSet<String>::newInstance();
    syncing = HashSet<String>::newInstance();
    changeCount = 0;
    lastCommitChangeCount = 0;
    poolReaders = false;
    readCount = 0;
    writeThread = 0;
    upgradeCount = 0;
    readerTermsIndexDivisor = IndexReader::DEFAULT_TERMS_INDEX_DIVISOR;
    readerPool = newLucene<ReaderPool>(shared_from_this());
    closed = false;
    closing = false;
    hitOOM = false;
    stopMerges = false;
    mergeGen = 0;
    flushCount = 0;
    flushDeletesCount = 0;
    localFlushedDocCount = 0;
    pendingCommitChangeCount = 0;
    mergePolicy = newLucene<LogByteSizeMergePolicy>(shared_from_this());
    mergeScheduler = newLucene<ConcurrentMergeScheduler>();
    similarity = Similarity::getDefault();
    termIndexInterval = DEFAULT_TERM_INDEX_INTERVAL;
    commitLock = newInstance<Synchronize>();

    if (!indexingChain) {
        indexingChain = DocumentsWriter::getDefaultIndexingChain();
    }

    if (create) {
        directory->clearLock(WRITE_LOCK_NAME);    // clear the write lock in case it's leftover
    }

    LockPtr writeLock(directory->makeLock(WRITE_LOCK_NAME));

    if (!writeLock->obtain((int32_t)writeLockTimeout)) { // obtain write lock
        boost::throw_exception(LockObtainFailedException(L"Index locked for write: " + writeLock->toString()));
    }
    this->writeLock = writeLock;

    bool success = false;
    LuceneException finally;

    try {
        if (create) {
            // Try to read first.  This is to allow create against an index that's currently open for
            // searching.  In this case we write the next segments_N file with no segments.
            bool doCommit;
            try {
                segmentInfos->read(directory);
                segmentInfos->clear();
                doCommit = false;
            } catch (LuceneException&) {
                // Likely this means it's a fresh directory
                doCommit = true;
            }

            if (doCommit) {
                // Only commit if there is no segments file in this dir already.
                segmentInfos->commit(directory);
                HashSet<String> files(segmentInfos->files(directory, true));
                synced.addAll(files.begin(), files.end());
            } else {
                // Record that we have a change (zero out all segments) pending
                ++changeCount;
            }
        } else {
            segmentInfos->read(directory);

            if (indexCommit) {
                // Swap out all segments, but keep metadata in SegmentInfos, like version & generation, to
                // preserve write-once.  This is important if readers are open against future commit points.
                if (indexCommit->getDirectory() != directory) {
                    boost::throw_exception(IllegalArgumentException(L"IndexCommit's directory doesn't match my directory"));
                }
                SegmentInfosPtr oldInfos(newLucene<SegmentInfos>());
                oldInfos->read(directory, indexCommit->getSegmentsFileName());
                segmentInfos->replace(oldInfos);
                ++changeCount;
                if (infoStream) {
                    message(L"init: loaded commit \"" + indexCommit->getSegmentsFileName() + L"\"");
                }
            }

            // We assume that this segments_N was previously properly sync'd
            HashSet<String> files(segmentInfos->files(directory, true));
            synced.addAll(files.begin(), files.end());
        }

        setRollbackSegmentInfos(segmentInfos);

        docWriter = newLucene<DocumentsWriter>(directory, shared_from_this(), indexingChain);
        docWriter->setInfoStream(infoStream);
        docWriter->setMaxFieldLength(maxFieldLength);

        // Default deleter (for backwards compatibility) is KeepOnlyLastCommitDeleter
        deleter = newLucene<IndexFileDeleter>(directory, deletionPolicy ? deletionPolicy : newLucene<KeepOnlyLastCommitDeletionPolicy>(), segmentInfos, infoStream, docWriter, synced);

        if (deleter->startingCommitDeleted) {
            // Deletion policy deleted the "head" commit point.  We have to mark ourselves as changed so that if we
            // are closed without any further changes we write a new segments_N file.
            ++changeCount;
        }

        pushMaxBufferedDocs();

        if (infoStream) {
            message(L"init: create=" + StringUtils::toString(create));
        }
        messageState();

        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }

    if (!success) {
        if (infoStream) {
            message(L"init: hit exception on init; releasing write lock");
        }
        try {
            this->writeLock->release();
        } catch (...) {
            // don't mask the original exception
        }
        this->writeLock.reset();
    }

    finally.throwException();
}

int32_t IndexWriter::MAX_TERM_LENGTH() {
    static int32_t _MAX_TERM_LENGTH = 0;
    if (_MAX_TERM_LENGTH == 0) {
        _MAX_TERM_LENGTH = DocumentsWriter::MAX_TERM_LENGTH;
    }
    return _MAX_TERM_LENGTH;
}

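/// Returns a read-only reader over the current (possibly uncommitted) state of the index,
/// flushing buffered docs and deletes first; this is the near-real-time path, since the
/// reader sees changes without requiring a full commit().  A minimal usage sketch (the
/// writer/analyzer setup here is hypothetical, not taken from this file):
///
///   IndexWriterPtr writer = newLucene<IndexWriter>(dir, analyzer, IndexWriter::MaxFieldLengthLIMITED);
///   writer->addDocument(doc);
///   IndexReaderPtr reader = writer->getReader(); // sees doc without commit()
///   // ... search against reader, then reader->close() when done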
IndexReaderPtr IndexWriter::getReader() {
    return getReader(readerTermsIndexDivisor);
}

IndexReaderPtr IndexWriter::getReader(int32_t termInfosIndexDivisor) {
    ensureOpen();

    if (infoStream) {
        message(L"flush at getReader");
    }

    // Do this up front before flushing so that the readers obtained during this flush are pooled, the first time
    // this method is called
    poolReaders = true;

    // Prevent segmentInfos from changing while opening the reader; in theory we could do similar retry logic,
    // just like we do when loading segments_N
    IndexReaderPtr r;
    {
        SyncLock syncLock(this);
        flush(false, true, true);
        r = newLucene<ReadOnlyDirectoryReader>(shared_from_this(), segmentInfos, termInfosIndexDivisor);
    }
    maybeMerge();
    return r;
}

int32_t IndexWriter::numDeletedDocs(const SegmentInfoPtr& info) {
    SegmentReaderPtr reader(readerPool->getIfExists(info));
    int32_t deletedDocs = 0;
    LuceneException finally;
    try {
        deletedDocs = reader ? reader->numDeletedDocs() : info->getDelCount();
    } catch (LuceneException& e) {
        finally = e;
    }
    if (reader) {
        readerPool->release(reader);
    }
    finally.throwException();
    return deletedDocs;
}

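/// The following acquire/release/upgrade methods implement a simple internal readers-writer
/// gate (readCount/writeThread), not to be confused with the on-disk write.lock: operations
/// like addIndexes take the "read" side so several may run concurrently, while transactional
/// sections take the exclusive "write" side, waiting until all readers have drained.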
void IndexWriter::acquireWrite() {
    SyncLock syncLock(this);
    BOOST_ASSERT(writeThread != LuceneThread::currentId());
    while (writeThread != 0 || readCount > 0) {
        doWait();
    }

    // we could have been closed while we were waiting
    ensureOpen();

    writeThread = LuceneThread::currentId();
}

void IndexWriter::releaseWrite() {
    SyncLock syncLock(this);
    BOOST_ASSERT(writeThread == LuceneThread::currentId());
    writeThread = 0;
    notifyAll();
}

void IndexWriter::acquireRead() {
    SyncLock syncLock(this);
    int64_t current = LuceneThread::currentId();
    while (writeThread != 0 && writeThread != current) {
        doWait();
    }
    ++readCount;
}

void IndexWriter::upgradeReadToWrite() {
    SyncLock syncLock(this);
    BOOST_ASSERT(readCount > 0);
    ++upgradeCount;
    while (readCount > upgradeCount || writeThread != 0) {
        doWait();
    }
    writeThread = LuceneThread::currentId();
    --readCount;
    --upgradeCount;
}

void IndexWriter::releaseRead() {
    SyncLock syncLock(this);
    --readCount;
    BOOST_ASSERT(readCount >= 0);
    notifyAll();
}

bool IndexWriter::isOpen(bool includePendingClose) {
    SyncLock syncLock(this);
    return !(closed || (includePendingClose && closing));
}

void IndexWriter::ensureOpen(bool includePendingClose) {
    SyncLock syncLock(this);
    if (!isOpen(includePendingClose)) {
        boost::throw_exception(AlreadyClosedException(L"This IndexWriter is closed"));
    }
}

void IndexWriter::ensureOpen() {
    ensureOpen(true);
}

void IndexWriter::message(const String& message) {
    if (infoStream) {
        *infoStream << L"IW " << StringUtils::toString(messageID);
        *infoStream << L" [" << DateTools::timeToString(MiscUtils::currentTimeMillis(), DateTools::RESOLUTION_SECOND);
        *infoStream << L"; " << StringUtils::toString(LuceneThread::currentId()) << L"]: " << message << L"\n";
    }
}

void IndexWriter::setMessageID(const InfoStreamPtr& infoStream) {
    SyncLock syncLock(this);
    if (infoStream && messageID == -1) {
        SyncLock messageLock(messageIDLock);
        messageID = MESSAGE_ID++;
    }
    this->infoStream = infoStream;
}

LogMergePolicyPtr IndexWriter::getLogMergePolicy() {
    LogMergePolicyPtr logMergePolicy(boost::dynamic_pointer_cast<LogMergePolicy>(mergePolicy));
    if (logMergePolicy) {
        return logMergePolicy;
    }
    boost::throw_exception(IllegalArgumentException(L"This method can only be called when the merge policy is the default LogMergePolicy"));
    return LogMergePolicyPtr();
}

bool IndexWriter::getUseCompoundFile() {
    return getLogMergePolicy()->getUseCompoundFile();
}

void IndexWriter::setUseCompoundFile(bool value) {
    getLogMergePolicy()->setUseCompoundFile(value);
    getLogMergePolicy()->setUseCompoundDocStore(value);
}

void IndexWriter::setSimilarity(const SimilarityPtr& similarity) {
    ensureOpen();
    this->similarity = similarity;
    docWriter->setSimilarity(similarity);
}

SimilarityPtr IndexWriter::getSimilarity() {
    ensureOpen();
    return this->similarity;
}

void IndexWriter::setTermIndexInterval(int32_t interval) {
    ensureOpen();
    this->termIndexInterval = interval;
}

int32_t IndexWriter::getTermIndexInterval() {
    // We pass false because this method is called by SegmentMerger while we are in the process of closing
    ensureOpen(false);
    return termIndexInterval;
}

void IndexWriter::setRollbackSegmentInfos(const SegmentInfosPtr& infos) {
    SyncLock syncLock(this);
    rollbackSegmentInfos = boost::dynamic_pointer_cast<SegmentInfos>(infos->clone());
    BOOST_ASSERT(!rollbackSegmentInfos->hasExternalSegments(directory));
    rollbackSegments = MapSegmentInfoInt::newInstance();
    int32_t size = rollbackSegmentInfos->size();
    for (int32_t i = 0; i < size; ++i) {
        rollbackSegments.put(rollbackSegmentInfos->info(i), i);
    }
}

void IndexWriter::setMergePolicy(const MergePolicyPtr& mp) {
    ensureOpen();
    if (!mp) {
        boost::throw_exception(NullPointerException(L"MergePolicy must be non-null"));
    }

    if (mergePolicy != mp) {
        mergePolicy->close();
    }
    mergePolicy = mp;
    pushMaxBufferedDocs();
    if (infoStream) {
        message(L"setMergePolicy");
    }
}

MergePolicyPtr IndexWriter::getMergePolicy() {
    ensureOpen();
    return mergePolicy;
}

void IndexWriter::setMergeScheduler(const MergeSchedulerPtr& mergeScheduler) {
    SyncLock syncLock(this);
    ensureOpen();
    if (!mergeScheduler) {
        boost::throw_exception(NullPointerException(L"MergeScheduler must be non-null"));
    }
    if (this->mergeScheduler != mergeScheduler) {
        finishMerges(true);
        this->mergeScheduler->close();
    }
    this->mergeScheduler = mergeScheduler;
    if (infoStream) {
        message(L"setMergeScheduler");
    }
}

MergeSchedulerPtr IndexWriter::getMergeScheduler() {
    ensureOpen();
    return mergeScheduler;
}

void IndexWriter::setMaxMergeDocs(int32_t maxMergeDocs) {
    getLogMergePolicy()->setMaxMergeDocs(maxMergeDocs);
}

int32_t IndexWriter::getMaxMergeDocs() {
    return getLogMergePolicy()->getMaxMergeDocs();
}

void IndexWriter::setMaxFieldLength(int32_t maxFieldLength) {
    ensureOpen();
    this->maxFieldLength = maxFieldLength;
    docWriter->setMaxFieldLength(maxFieldLength);
    if (infoStream) {
        message(L"setMaxFieldLength " + StringUtils::toString(maxFieldLength));
    }
}

int32_t IndexWriter::getMaxFieldLength() {
    ensureOpen();
    return maxFieldLength;
}

void IndexWriter::setReaderTermsIndexDivisor(int32_t divisor) {
    ensureOpen();
    if (divisor <= 0) {
        boost::throw_exception(IllegalArgumentException(L"divisor must be >= 1 (got " + StringUtils::toString(divisor) + L")"));
    }
    readerTermsIndexDivisor = divisor;
    if (infoStream) {
        message(L"setReaderTermsIndexDivisor " + StringUtils::toString(readerTermsIndexDivisor));
    }
}

int32_t IndexWriter::getReaderTermsIndexDivisor() {
    ensureOpen();
    return readerTermsIndexDivisor;
}

void IndexWriter::setMaxBufferedDocs(int32_t maxBufferedDocs) {
    ensureOpen();
    if (maxBufferedDocs != DISABLE_AUTO_FLUSH && maxBufferedDocs < 2) {
        boost::throw_exception(IllegalArgumentException(L"maxBufferedDocs must at least be 2 when enabled"));
    }
    if (maxBufferedDocs == DISABLE_AUTO_FLUSH && getRAMBufferSizeMB() == DISABLE_AUTO_FLUSH) {
        boost::throw_exception(IllegalArgumentException(L"at least one of ramBufferSize and maxBufferedDocs must be enabled"));
    }
    docWriter->setMaxBufferedDocs(maxBufferedDocs);
    pushMaxBufferedDocs();
    if (infoStream) {
        message(L"setMaxBufferedDocs " + StringUtils::toString(maxBufferedDocs));
    }
}

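/// If we are flushing by doc count (not by RAM usage), and the merge policy is a
/// LogDocMergePolicy, push maxBufferedDocs down as its minMergeDocs so that newly flushed
/// segments and small merged segments are treated consistently.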
void IndexWriter::pushMaxBufferedDocs() {
    if (docWriter->getMaxBufferedDocs() != DISABLE_AUTO_FLUSH) {
        LogDocMergePolicyPtr lmp(boost::dynamic_pointer_cast<LogDocMergePolicy>(mergePolicy));
        if (lmp) {
            int32_t maxBufferedDocs = docWriter->getMaxBufferedDocs();
            if (lmp->getMinMergeDocs() != maxBufferedDocs) {
                if (infoStream) {
                    message(L"now push maxBufferedDocs " + StringUtils::toString(maxBufferedDocs) + L" to LogDocMergePolicy");
                }
                lmp->setMinMergeDocs(maxBufferedDocs);
            }
        }
    }
}

int32_t IndexWriter::getMaxBufferedDocs() {
    ensureOpen();
    return docWriter->getMaxBufferedDocs();
}

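/// Sets the amount of RAM (in MB) the DocumentsWriter may consume buffering added documents
/// and deletions before a flush is triggered.  The 2048 MB ceiling below exists because the
/// writer tracks buffered bytes with 32-bit counters, so values at or above 2 GB would
/// overflow them.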
void IndexWriter::setRAMBufferSizeMB(double mb) {
    if (mb > 2048.0) {
        boost::throw_exception(IllegalArgumentException(L"ramBufferSize " + StringUtils::toString(mb) + L" is too large; should be comfortably less than 2048"));
    }
    if (mb != DISABLE_AUTO_FLUSH && mb <= 0.0) {
        boost::throw_exception(IllegalArgumentException(L"ramBufferSize should be > 0.0 MB when enabled"));
    }
    if (mb == DISABLE_AUTO_FLUSH && getMaxBufferedDocs() == DISABLE_AUTO_FLUSH) {
        boost::throw_exception(IllegalArgumentException(L"at least one of ramBufferSize and maxBufferedDocs must be enabled"));
    }
    docWriter->setRAMBufferSizeMB(mb);
    if (infoStream) {
        message(L"setRAMBufferSizeMB " + StringUtils::toString(mb));
    }
}

double IndexWriter::getRAMBufferSizeMB() {
    return docWriter->getRAMBufferSizeMB();
}

void IndexWriter::setMaxBufferedDeleteTerms(int32_t maxBufferedDeleteTerms) {
    ensureOpen();
    if (maxBufferedDeleteTerms != DISABLE_AUTO_FLUSH && maxBufferedDeleteTerms < 1) {
        boost::throw_exception(IllegalArgumentException(L"maxBufferedDeleteTerms must at least be 1 when enabled"));
    }
    docWriter->setMaxBufferedDeleteTerms(maxBufferedDeleteTerms);
    if (infoStream) {
        message(L"setMaxBufferedDeleteTerms " + StringUtils::toString(maxBufferedDeleteTerms));
    }
}

int32_t IndexWriter::getMaxBufferedDeleteTerms() {
    ensureOpen();
    return docWriter->getMaxBufferedDeleteTerms();
}

void IndexWriter::setMergeFactor(int32_t mergeFactor) {
    getLogMergePolicy()->setMergeFactor(mergeFactor);
}

int32_t IndexWriter::getMergeFactor() {
    return getLogMergePolicy()->getMergeFactor();
}

void IndexWriter::setDefaultInfoStream(const InfoStreamPtr& infoStream) {
    IndexWriter::defaultInfoStream = infoStream;
}

InfoStreamPtr IndexWriter::getDefaultInfoStream() {
    return IndexWriter::defaultInfoStream;
}

void IndexWriter::setInfoStream(const InfoStreamPtr& infoStream) {
    ensureOpen();
    setMessageID(infoStream);
    docWriter->setInfoStream(infoStream);
    deleter->setInfoStream(infoStream);
    messageState();
}

void IndexWriter::messageState() {
    if (infoStream) {
        message(L"ramBufferSizeMB=" + StringUtils::toString(docWriter->getRAMBufferSizeMB()) +
                L" maxBufferedDocs=" + StringUtils::toString(docWriter->getMaxBufferedDocs()) +
                L" maxBufferedDeleteTerms=" + StringUtils::toString(docWriter->getMaxBufferedDeleteTerms()) +
                L" maxFieldLength=" + StringUtils::toString(maxFieldLength) +
                L" index=" + segString());
    }
}

InfoStreamPtr IndexWriter::getInfoStream() {
    ensureOpen();
    return infoStream;
}

bool IndexWriter::verbose() {
    return infoStream.get() != NULL;
}

void IndexWriter::setWriteLockTimeout(int64_t writeLockTimeout) {
    ensureOpen();
    this->writeLockTimeout = writeLockTimeout;
}

int64_t IndexWriter::getWriteLockTimeout() {
    ensureOpen();
    return writeLockTimeout;
}

void IndexWriter::setDefaultWriteLockTimeout(int64_t writeLockTimeout) {
    IndexWriter::WRITE_LOCK_TIMEOUT = writeLockTimeout;
}

int64_t IndexWriter::getDefaultWriteLockTimeout() {
    return IndexWriter::WRITE_LOCK_TIMEOUT;
}

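/// Commits all pending changes and closes the writer.  close(true) waits for any running
/// merges to complete first; close(false) aborts them.  If a previous operation hit
/// std::bad_alloc the writer's state may be corrupt, so close() rolls back to the last
/// commit instead of committing.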
void IndexWriter::close() {
    close(true);
}

void IndexWriter::close(bool waitForMerges) {
    // Ensure that only one thread actually gets to do the closing
    if (shouldClose()) {
        // If any methods have hit std::bad_alloc, then abort on close, in case the internal state of IndexWriter
        // or DocumentsWriter is corrupt
        if (hitOOM) {
            rollbackInternal();
        } else {
            closeInternal(waitForMerges);
        }
    }
}

bool IndexWriter::shouldClose() {
    SyncLock syncLock(this);
    while (true) {
        if (!closed) {
            if (!closing) {
                closing = true;
                return true;
            } else {
                // Another thread is presently trying to close; wait until it finishes one way (closes
                // successfully) or another (fails to close)
                doWait();
            }
        } else {
            return false;
        }
    }
}

void IndexWriter::closeInternal(bool waitForMerges) {
    docWriter->pauseAllThreads();

    LuceneException finally;
    try {
        if (infoStream) {
            message(L"now flush at close");
        }

        docWriter->close();

        // Only allow a new merge to be triggered if we are going to wait for merges
        if (!hitOOM) {
            flush(waitForMerges, true, true);
        }

        // Give merge scheduler last chance to run, in case any pending merges are waiting
        if (waitForMerges) {
            mergeScheduler->merge(shared_from_this());
        }

        mergePolicy->close();

        finishMerges(waitForMerges);
        stopMerges = true;

        mergeScheduler->close();

        if (infoStream) {
            message(L"now call final commit()");
        }

        if (!hitOOM) {
            commit(0);
        }

        if (infoStream) {
            message(L"at close: " + segString());
        }

        {
            SyncLock syncLock(this);
            readerPool->close();
            docWriter.reset();
            deleter->close();
        }

        if (writeLock) {
            writeLock->release(); // release write lock
            writeLock.reset();
        }

        {
            SyncLock syncLock(this);
            closed = true;
        }
    } catch (std::bad_alloc& oom) {
        finally = handleOOM(oom, L"closeInternal");
    } catch (LuceneException& e) {
        finally = e;
    }
    {
        SyncLock syncLock(this);
        closing = false;
        notifyAll();
        if (!closed) {
            if (docWriter) {
                docWriter->resumeAllThreads();
            }
            if (infoStream) {
                message(L"hit exception while closing");
            }
        }
    }
    finally.throwException();
}

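/// Closes the shared doc stores (stored fields and term vectors files) for the current batch
/// of segments and, if the merge policy requests it, repackages those files into a single
/// compound doc store file (COMPOUND_FILE_STORE_EXTENSION), updating each affected
/// SegmentInfo.  Returns whether a compound doc store is in use.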
bool IndexWriter::flushDocStores() {
    SyncLock syncLock(this);

    if (infoStream) {
        message(L"flushDocStores segment=" + docWriter->getDocStoreSegment());
    }

    bool useCompoundDocStore = false;

    if (infoStream) {
        message(L"closeDocStores segment=" + docWriter->getDocStoreSegment());
    }

    String docStoreSegment;

    bool success = false;
    LuceneException finally;
    try {
        docStoreSegment = docWriter->closeDocStore();
        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }
    if (!success && infoStream) {
        message(L"hit exception closing doc store segment");
    }
    finally.throwException();

    if (infoStream) {
        message(L"flushDocStores files=" + StringUtils::toString(docWriter->closedFiles()));
    }

    useCompoundDocStore = mergePolicy->useCompoundDocStore(segmentInfos);
    HashSet<String> closedFiles(docWriter->closedFiles());

    if (useCompoundDocStore && !docStoreSegment.empty() && !closedFiles.empty()) {
        // Now build compound doc store file
        if (infoStream) {
            message(L"create compound file " + docStoreSegment + L"." + IndexFileNames::COMPOUND_FILE_STORE_EXTENSION());
        }

        success = false;

        int32_t numSegments = segmentInfos->size();
        String compoundFileName(docStoreSegment + L"." + IndexFileNames::COMPOUND_FILE_STORE_EXTENSION());

        try {
            CompoundFileWriterPtr cfsWriter(newLucene<CompoundFileWriter>(directory, compoundFileName));
            for (HashSet<String>::iterator file = closedFiles.begin(); file != closedFiles.end(); ++file) {
                cfsWriter->addFile(*file);
            }

            // Perform the merge
            cfsWriter->close();
            success = true;
        } catch (LuceneException& e) {
            finally = e;
        }

        if (!success) {
            if (infoStream) {
                message(L"hit exception building compound file doc store for segment " + docStoreSegment);
            }
            deleter->deleteFile(compoundFileName);
            docWriter->abort();
        }
        finally.throwException();

        for (int32_t i = 0; i < numSegments; ++i) {
            SegmentInfoPtr si(segmentInfos->info(i));
            if (si->getDocStoreOffset() != -1 && si->getDocStoreSegment() == docStoreSegment) {
                si->setDocStoreIsCompoundFile(true);
            }
        }

        checkpoint();

        // In case the files we just merged into a CFS were not previously checkpointed
        deleter->deleteNewFiles(docWriter->closedFiles());
    }

    return useCompoundDocStore;
}

DirectoryPtr IndexWriter::getDirectory() {
    ensureOpen(false); // Pass false because the flush during closing calls getDirectory
    return directory;
}

AnalyzerPtr IndexWriter::getAnalyzer() {
    ensureOpen();
    return analyzer;
}

int32_t IndexWriter::maxDoc() {
    SyncLock syncLock(this);
    int32_t count = docWriter ? docWriter->getNumDocsInRAM() : 0;
    for (int32_t i = 0; i < segmentInfos->size(); ++i) {
        count += segmentInfos->info(i)->docCount;
    }
    return count;
}

int32_t IndexWriter::numDocs() {
    SyncLock syncLock(this);
    int32_t count = docWriter ? docWriter->getNumDocsInRAM() : 0;
    for (int32_t i = 0; i < segmentInfos->size(); ++i) {
        SegmentInfoPtr info(segmentInfos->info(i));
        count += info->docCount - info->getDelCount();
    }
    return count;
}

bool IndexWriter::hasDeletions() {
    SyncLock syncLock(this);
    ensureOpen();
    if (docWriter->hasDeletes()) {
        return true;
    }
    for (int32_t i = 0; i < segmentInfos->size(); ++i) {
        if (segmentInfos->info(i)->hasDeletions()) {
            return true;
        }
    }
    return false;
}

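/// Adds a document to the index, analyzing its fields with the writer's analyzer (or the
/// per-call analyzer in the overload below).  May trigger a flush, and hence segment merges,
/// so a single call can occasionally do far more work than an in-memory insert.  A minimal,
/// hypothetical sketch of the common indexing loop (Document/Field setup is not part of this
/// file):
///
///   DocumentPtr doc = newLucene<Document>();
///   doc->add(newLucene<Field>(L"body", text, Field::STORE_YES, Field::INDEX_ANALYZED));
///   writer->addDocument(doc);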
void IndexWriter::addDocument(const DocumentPtr& doc) {
    addDocument(doc, analyzer);
}

void IndexWriter::addDocument(const DocumentPtr& doc, const AnalyzerPtr& analyzer) {
    ensureOpen();
    bool doFlush = false;
    bool success = false;
    try {
        LuceneException finally;
        try {
            doFlush = docWriter->addDocument(doc, analyzer);
            success = true;
        } catch (LuceneException& e) {
            finally = e;
        }
        if (!success) {
            if (infoStream) {
                message(L"hit exception adding document");
            }
            {
                SyncLock syncLock(this);
                // If docWriter has some aborted files that were never incref'd, then we clean them up here
                if (docWriter) {
                    HashSet<String> files(docWriter->abortedFiles());
                    if (files) {
                        deleter->deleteNewFiles(files);
                    }
                }
            }
        }
        finally.throwException();
        if (doFlush) {
            flush(true, false, false);
        }
    } catch (std::bad_alloc& oom) {
        boost::throw_exception(handleOOM(oom, L"addDocument"));
    }
}

void IndexWriter::deleteDocuments(const TermPtr& term) {
    ensureOpen();
    try {
        bool doFlush = docWriter->bufferDeleteTerm(term);
        if (doFlush) {
            flush(true, false, false);
        }
    } catch (std::bad_alloc& oom) {
        boost::throw_exception(handleOOM(oom, L"deleteDocuments(Term)"));
    }
}

void IndexWriter::deleteDocuments(Collection<TermPtr> terms) {
    ensureOpen();
    try {
        bool doFlush = docWriter->bufferDeleteTerms(terms);
        if (doFlush) {
            flush(true, false, false);
        }
    } catch (std::bad_alloc& oom) {
        boost::throw_exception(handleOOM(oom, L"deleteDocuments(VectorTerm)"));
    }
}

void IndexWriter::deleteDocuments(const QueryPtr& query) {
    ensureOpen();
    bool doFlush = docWriter->bufferDeleteQuery(query);
    if (doFlush) {
        flush(true, false, false);
    }
}

void IndexWriter::deleteDocuments(Collection<QueryPtr> queries) {
    ensureOpen();
    bool doFlush = docWriter->bufferDeleteQueries(queries);
    if (doFlush) {
        flush(true, false, false);
    }
}

void IndexWriter::updateDocument(const TermPtr& term, const DocumentPtr& doc) {
    ensureOpen();
    updateDocument(term, doc, getAnalyzer());
}

void IndexWriter::updateDocument(const TermPtr& term, const DocumentPtr& doc, const AnalyzerPtr& analyzer) {
    ensureOpen();
    try {
        bool doFlush = false;
        bool success = false;
        LuceneException finally;
        try {
            doFlush = docWriter->updateDocument(term, doc, analyzer);
            success = true;
        } catch (LuceneException& e) {
            finally = e;
        }
        if (!success) {
            if (infoStream) {
                message(L"hit exception updating document");
            }

            {
                SyncLock syncLock(this);
                // If docWriter has some aborted files that were never incref'd, then we clean them up here
                if (docWriter) {
                    HashSet<String> files(docWriter->abortedFiles());
                    if (files) {
                        deleter->deleteNewFiles(files);
                    }
                }
            }
        }
        finally.throwException();
        if (doFlush) {
            flush(true, false, false);
        }
    } catch (std::bad_alloc& oom) {
        boost::throw_exception(handleOOM(oom, L"updateDocument"));
    }
}

int32_t IndexWriter::getSegmentCount() {
    SyncLock syncLock(this);
    return segmentInfos->size();
}

int32_t IndexWriter::getNumBufferedDocuments() {
    SyncLock syncLock(this);
    return docWriter->getNumDocsInRAM();
}

int32_t IndexWriter::getDocCount(int32_t i) {
    SyncLock syncLock(this);
    return (i >= 0 && i < segmentInfos->size()) ? segmentInfos->info(i)->docCount : -1;
}

int32_t IndexWriter::getFlushCount() {
    SyncLock syncLock(this);
    return flushCount;
}

int32_t IndexWriter::getFlushDeletesCount() {
    SyncLock syncLock(this);
    return flushDeletesCount;
}

String IndexWriter::newSegmentName() {
    // Cannot synchronize on IndexWriter because that causes deadlock
    SyncLock segmentLock(segmentInfos);

    // Important to increment changeCount so that the segmentInfos is written on close.
    // Otherwise we could close, re-open and re-return the same segment name that was
    // previously returned which can cause problems at least with ConcurrentMergeScheduler.
    ++changeCount;
    return L"_" + StringUtils::toString(segmentInfos->counter++, StringUtils::CHARACTER_MAX_RADIX);
}

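/// Requests that the index be merged down to at most maxNumSegments segments (one, for the
/// no-argument form).  This marks existing pending/running merges as "optimize" merges, asks
/// the merge policy for the remaining work, and, when doWait is true, blocks until all
/// optimize merges finish, rethrowing any exception a background merge thread hit.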
void IndexWriter::optimize() {
    optimize(true);
}

void IndexWriter::optimize(int32_t maxNumSegments) {
    optimize(maxNumSegments, true);
}

void IndexWriter::optimize(bool doWait) {
    optimize(1, doWait);
}

void IndexWriter::optimize(int32_t maxNumSegments, bool doWait) {
    ensureOpen();

    if (maxNumSegments < 1) {
        boost::throw_exception(IllegalArgumentException(L"maxNumSegments must be >= 1; got " + StringUtils::toString(maxNumSegments)));
    }

    if (infoStream) {
        message(L"optimize: index now " + segString());
    }

    flush(true, false, true);

    {
        SyncLock syncLock(this);

        resetMergeExceptions();
        segmentsToOptimize.clear();
        optimizeMaxNumSegments = maxNumSegments;
        int32_t numSegments = segmentInfos->size();
        for (int32_t i = 0; i < numSegments; ++i) {
            segmentsToOptimize.add(segmentInfos->info(i));
        }

        // Now mark all pending & running merges as optimize merge
        for (Collection<OneMergePtr>::iterator merge = pendingMerges.begin(); merge != pendingMerges.end(); ++merge) {
            (*merge)->optimize = true;
            (*merge)->maxNumSegmentsOptimize = maxNumSegments;
        }

        for (SetOneMerge::iterator merge = runningMerges.begin(); merge != runningMerges.end(); ++merge) {
            (*merge)->optimize = true;
            (*merge)->maxNumSegmentsOptimize = maxNumSegments;
        }
    }

    maybeMerge(maxNumSegments, true);

    if (doWait) {
        {
            SyncLock syncLock(this);
            while (true) {
                if (hitOOM) {
                    boost::throw_exception(IllegalStateException(L"this writer hit an OutOfMemoryError; cannot complete optimize"));
                }

                if (!mergeExceptions.empty()) {
                    // Forward any exceptions in background merge threads to the current thread
                    for (Collection<OneMergePtr>::iterator merge = mergeExceptions.begin(); merge != mergeExceptions.end(); ++merge) {
                        if ((*merge)->optimize) {
                            LuceneException err = (*merge)->getException();
                            if (!err.isNull()) {
                                boost::throw_exception(IOException(L"background merge hit exception: " + (*merge)->segString(directory)));
                            }
                        }
                    }
                }

                if (optimizeMergesPending()) {
                    IndexWriter::doWait();
                } else {
                    break;
                }
            }
        }

        // If close is called while we are still running, throw an exception so the calling thread will know the
        // optimize did not complete
        ensureOpen();
    }

    // NOTE: in the ConcurrentMergeScheduler case, when doWait is false, we can return immediately while background
    // threads accomplish the optimization
}

bool IndexWriter::optimizeMergesPending() {
    SyncLock syncLock(this);

    for (Collection<OneMergePtr>::iterator merge = pendingMerges.begin(); merge != pendingMerges.end(); ++merge) {
        if ((*merge)->optimize) {
            return true;
        }
    }

    for (SetOneMerge::iterator merge = runningMerges.begin(); merge != runningMerges.end(); ++merge) {
        if ((*merge)->optimize) {
            return true;
        }
    }

    return false;
}

void IndexWriter::expungeDeletes(bool doWait) {
    ensureOpen();

    if (infoStream) {
        message(L"expungeDeletes: index now " + segString());
    }

    MergeSpecificationPtr spec;

    {
        SyncLock syncLock(this);
        spec = mergePolicy->findMergesToExpungeDeletes(segmentInfos);
        for (Collection<OneMergePtr>::iterator merge = spec->merges.begin(); merge != spec->merges.end(); ++merge) {
            registerMerge(*merge);
        }
    }

    mergeScheduler->merge(shared_from_this());

    if (doWait) {
        {
            SyncLock syncLock(this);
            bool running = true;
            while (running) {
                if (hitOOM) {
                    boost::throw_exception(IllegalStateException(L"this writer hit an OutOfMemoryError; cannot complete expungeDeletes"));
                }

                // Check each merge that MergePolicy asked us to do, to see if any of them are still running and
                // if any of them have hit an exception.
                running = false;
                for (Collection<OneMergePtr>::iterator merge = spec->merges.begin(); merge != spec->merges.end(); ++merge) {
                    if (pendingMerges.contains(*merge) || runningMerges.contains(*merge)) {
                        running = true;
                    }
                    LuceneException err = (*merge)->getException();
                    if (!err.isNull()) {
                        boost::throw_exception(IOException(L"background merge hit exception: " + (*merge)->segString(directory)));
                    }
                }

                // If any of our merges are still running, wait
                if (running) {
                    IndexWriter::doWait();
                }
            }
        }
    }

    // NOTE: in the ConcurrentMergeScheduler case, when doWait is false, we can return immediately while background
    // threads accomplish the optimization
}

void IndexWriter::expungeDeletes() {
    expungeDeletes(true);
}

void IndexWriter::maybeMerge() {
    maybeMerge(false);
}

void IndexWriter::maybeMerge(bool optimize) {
    maybeMerge(1, optimize);
}

void IndexWriter::maybeMerge(int32_t maxNumSegmentsOptimize, bool optimize) {
    updatePendingMerges(maxNumSegmentsOptimize, optimize);
    mergeScheduler->merge(shared_from_this());
}

void IndexWriter::updatePendingMerges(int32_t maxNumSegmentsOptimize, bool optimize) {
    SyncLock syncLock(this);
    BOOST_ASSERT(!optimize || maxNumSegmentsOptimize > 0);

    if (stopMerges) {
        return;
    }

    // Do not start new merges if we've hit std::bad_alloc
    if (hitOOM) {
        return;
    }

    MergeSpecificationPtr spec;

    if (optimize) {
        spec = mergePolicy->findMergesForOptimize(segmentInfos, maxNumSegmentsOptimize, segmentsToOptimize);

        if (spec) {
            for (Collection<OneMergePtr>::iterator merge = spec->merges.begin(); merge != spec->merges.end(); ++merge) {
                (*merge)->optimize = true;
                (*merge)->maxNumSegmentsOptimize = maxNumSegmentsOptimize;
            }
        }
    } else {
        spec = mergePolicy->findMerges(segmentInfos);
    }

    if (spec) {
        for (Collection<OneMergePtr>::iterator merge = spec->merges.begin(); merge != spec->merges.end(); ++merge) {
            registerMerge(*merge);
        }
    }
}

OneMergePtr IndexWriter::getNextMerge() {
    SyncLock syncLock(this);
    if (pendingMerges.empty()) {
        return OneMergePtr();
    } else {
        // Advance the merge from pending to running
        OneMergePtr merge(pendingMerges.removeFirst());
        runningMerges.add(merge);
        return merge;
    }
}

OneMergePtr IndexWriter::getNextExternalMerge() {
    SyncLock syncLock(this);
    if (pendingMerges.empty()) {
        return OneMergePtr();
    } else {
        for (Collection<OneMergePtr>::iterator merge = pendingMerges.begin(); merge != pendingMerges.end(); ++merge) {
            if ((*merge)->isExternal) {
                // Advance the merge from pending to running
                OneMergePtr running(*merge);
                runningMerges.add(*merge);
                pendingMerges.remove(merge);
                return running;
            }
        }
    }

    // All existing merges do not involve external segments
    return OneMergePtr();
}

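/// Begins a "transaction" for addIndexes: snapshots the current segmentInfos (and flushed
/// doc count) so the whole operation can be rolled back atomically, takes the exclusive
/// write side of the internal gate (upgrading from read if the caller already holds it),
/// and incRefs the current files so the deleter cannot remove them while the transaction
/// is in flight.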
startTransaction(bool haveReadLock)1301 void IndexWriter::startTransaction(bool haveReadLock) {
1302     SyncLock syncLock(this);
1303     bool success = false;
1304     LuceneException finally;
1305     try {
1306         if (infoStream) {
1307             message(L"now start transaction");
1308         }
1309 
1310         BOOST_ASSERT(docWriter->getNumBufferedDeleteTerms() == 0); // calling startTransaction with buffered delete terms not supported
1311         BOOST_ASSERT(docWriter->getNumDocsInRAM() == 0); // calling startTransaction with buffered documents not supported
1312 
1313         ensureOpen();
1314 
1315         // If a transaction is trying to roll back (because addIndexes hit an exception) then wait here until that's done
1316         while (stopMerges) {
1317             doWait();
1318         }
1319 
1320         success = true;
1321     } catch (LuceneException& e) {
1322         finally = e;
1323     }
1324 
1325     // Release the write lock if our caller held it, on hitting an exception
1326     if (!success && haveReadLock) {
1327         releaseRead();
1328     }
1329     finally.throwException();
1330 
1331     if (haveReadLock) {
1332         upgradeReadToWrite();
1333     } else {
1334         acquireWrite();
1335     }
1336 
1337     success = false;
1338 
1339     try {
1340         localRollbackSegmentInfos = boost::dynamic_pointer_cast<SegmentInfos>(segmentInfos->clone());
1341 
1342         BOOST_ASSERT(!hasExternalSegments());
1343 
1344         localFlushedDocCount = docWriter->getFlushedDocCount();
1345 
1346         // We must "protect" our files at this point from deletion in case we need to rollback
1347         deleter->incRef(segmentInfos, false);
1348 
1349         success = true;
1350     } catch (LuceneException& e) {
1351         finally = e;
1352     }
1353 
1354     if (!success) {
1355         finishAddIndexes();
1356     }
1357     finally.throwException();
1358 }
1359 
rollbackTransaction()1360 void IndexWriter::rollbackTransaction() {
1361     SyncLock syncLock(this);
1362 
1363     if (infoStream) {
1364         message(L"now rollback transaction");
1365     }
1366 
1367     if (docWriter) {
1368         docWriter->setFlushedDocCount(localFlushedDocCount);
1369     }
1370 
1371     // Must finish merges before rolling back segmentInfos so merges don't hit  exceptions on trying to commit
1372     // themselves, don't get files deleted out  from under them, etc.
1373     finishMerges(false);
1374 
1375     // Keep the same segmentInfos instance but replace all of its SegmentInfo instances.  This is so the next
1376     // attempt to commit using this instance of IndexWriter will always write to a new generation ("write once").
1377     segmentInfos->clear();
1378     segmentInfos->addAll(localRollbackSegmentInfos);
1379     localRollbackSegmentInfos.reset();
1380 
1381     // This must come after we rollback segmentInfos, so that if a commit() kicks off it does not see the
1382     // segmentInfos with external segments.
1383     finishAddIndexes();
1384 
1385     // Ask deleter to locate unreferenced files we had created & remove them
1386     deleter->checkpoint(segmentInfos, false);
1387 
1388     // Remove the incRef we did in startTransaction
1389     deleter->decRef(segmentInfos);
1390 
1391     // Also ask deleter to remove any newly created files that were never incref'd; this "garbage" is created
1392     // when a merge kicks off but aborts part way through before it had a chance to incRef the files it had
1393     // partially created
1394     deleter->refresh();
1395 
1396     notifyAll();
1397 
1398     BOOST_ASSERT(!hasExternalSegments());
1399 }
1400 
commitTransaction()1401 void IndexWriter::commitTransaction() {
1402     SyncLock syncLock(this);
1403 
1404     if (infoStream) {
1405         message(L"now commit transaction");
1406     }
1407 
1408     // Give deleter a chance to remove files now
1409     checkpoint();
1410 
1411     // Remove the incRef we did in startTransaction.
1412     deleter->decRef(localRollbackSegmentInfos);
1413 
1414     localRollbackSegmentInfos.reset();
1415 
1416     BOOST_ASSERT(!hasExternalSegments());
1417 
1418     finishAddIndexes();
1419 }
1420 
rollback()1421 void IndexWriter::rollback() {
1422     ensureOpen();
1423 
1424     // Ensure that only one thread actually gets to do the closing
1425     if (shouldClose()) {
1426         rollbackInternal();
1427     }
1428 }

void IndexWriter::rollbackInternal() {
    bool success = false;

    if (infoStream) {
        message(L"rollback");
    }

    docWriter->pauseAllThreads();
    LuceneException finally;
    try {
        finishMerges(false);

        // Must pre-close these two, in case they increment changeCount, so that we can then bring
        // lastCommitChangeCount back in sync with changeCount before calling closeInternal
        mergePolicy->close();
        mergeScheduler->close();

        {
            SyncLock syncLock(this);

            if (pendingCommit) {
                pendingCommit->rollbackCommit(directory);
                deleter->decRef(pendingCommit);
                pendingCommit.reset();
                notifyAll();
            }

            // Keep the same segmentInfos instance but replace all of its SegmentInfo instances.  This is so the next
            // attempt to commit using this instance of IndexWriter will always write to a new generation ("write once").
            segmentInfos->clear();
            segmentInfos->addAll(rollbackSegmentInfos);

            BOOST_ASSERT(!hasExternalSegments());

            docWriter->abort();

            bool test = testPoint(L"rollback before checkpoint");
            BOOST_ASSERT(test);

            // Ask deleter to locate unreferenced files & remove them
            deleter->checkpoint(segmentInfos, false);
            deleter->refresh();
        }

        // Don't bother saving any changes in our segmentInfos
        readerPool->clear(SegmentInfosPtr());

        lastCommitChangeCount = changeCount;

        success = true;
    } catch (std::bad_alloc& oom) {
        finally = handleOOM(oom, L"rollbackInternal");
    } catch (LuceneException& e) {
        finally = e;
    }
    {
        SyncLock syncLock(this);

        if (!success) {
            docWriter->resumeAllThreads();
            closing = false;
            notifyAll();
            if (infoStream) {
                message(L"hit exception during rollback");
            }
        }
    }
    finally.throwException();

    closeInternal(false);
}

void IndexWriter::deleteAll() {
    SyncLock syncLock(this);
    bool success = false;
    docWriter->pauseAllThreads();
    LuceneException finally;
    try {
        // Abort any running merges
        finishMerges(false);

        // Remove any buffered docs
        docWriter->abort();
        docWriter->setFlushedDocCount(0);

        // Remove all segments
        segmentInfos->clear();

        // Ask deleter to locate unreferenced files & remove them
        deleter->checkpoint(segmentInfos, false);
        deleter->refresh();

        // Don't bother saving any changes in our segmentInfos
        readerPool->clear(SegmentInfosPtr());

        // Mark that the index has changed
        ++changeCount;

        success = true;
    } catch (std::bad_alloc& oom) {
        finally = handleOOM(oom, L"deleteAll");
    } catch (LuceneException& e) {
        finally = e;
    }

    docWriter->resumeAllThreads();
    if (!success && infoStream) {
        message(L"hit exception during deleteAll");
    }

    finally.throwException();
}
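
// A minimal usage sketch (hypothetical setup): deleteAll() is much faster than deleting every
// document by term or query, and, because no commit happens here, it can still be undone by
// rollback().
//
//   writer->deleteAll();  // index appears empty to readers opened after the next commit
//   writer->rollback();   // ...or abandon the deletion (and close the writer), restoring the last commit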

void IndexWriter::finishMerges(bool waitForMerges) {
    SyncLock syncLock(this);
    if (!waitForMerges) {
        stopMerges = true;

        // Abort all pending and running merges
        for (Collection<OneMergePtr>::iterator merge = pendingMerges.begin(); merge != pendingMerges.end(); ++merge) {
            if (infoStream) {
                message(L"now abort pending merge " + (*merge)->segString(directory));
            }
            (*merge)->abort();
            mergeFinish(*merge);
        }
        pendingMerges.clear();

        for (SetOneMerge::iterator merge = runningMerges.begin(); merge != runningMerges.end(); ++merge) {
            if (infoStream) {
                message(L"now abort running merge " + (*merge)->segString(directory));
            }
            (*merge)->abort();
        }

        // Ensure any running addIndexes finishes.  It's fine if a new one attempts to start because its merges
        // will quickly see that stopMerges == true and abort.
        acquireRead();
        releaseRead();

        // These merges periodically check whether they have been aborted, and stop if so.  We wait here to make
        // sure they all stop.  It should not take very long because the merge threads periodically check if they
        // are aborted.
        while (!runningMerges.empty()) {
            if (infoStream) {
                message(L"now wait for " + StringUtils::toString(runningMerges.size()) + L" running merge(s) to abort");
            }
            doWait();
        }

        stopMerges = false;
        notifyAll();

        BOOST_ASSERT(mergingSegments.empty());

        if (infoStream) {
            message(L"all running merges have aborted");
        }
    } else {
        // waitForMerges() will ensure any running addIndexes finishes.  It's fine if a new one attempts to start
        // because, from our caller above, the call will see that we are in the process of closing and will throw
        // an AlreadyClosed exception.
        IndexWriter::waitForMerges();
    }
}

void IndexWriter::waitForMerges() {
    SyncLock syncLock(this);
    // Ensure any running addIndexes finishes.
    acquireRead();
    releaseRead();

    while (!pendingMerges.empty() || !runningMerges.empty()) {
        doWait();
    }

    // sanity check
    BOOST_ASSERT(mergingSegments.empty());
}

void IndexWriter::checkpoint() {
    SyncLock syncLock(this);
    ++changeCount;
    deleter->checkpoint(segmentInfos, false);
}

void IndexWriter::finishAddIndexes() {
    releaseWrite();
}

void IndexWriter::blockAddIndexes(bool includePendingClose) {
    acquireRead();

    bool success = false;
    LuceneException finally;
    try {
        // Make sure we are still open, since we could have waited quite a while for the last addIndexes to finish
        ensureOpen(includePendingClose);
        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }

    if (!success) {
        releaseRead();
    }
    finally.throwException();
}

void IndexWriter::resumeAddIndexes() {
    releaseRead();
}

void IndexWriter::resetMergeExceptions() {
    SyncLock syncLock(this);
    mergeExceptions.clear();
    ++mergeGen;
}

void IndexWriter::noDupDirs(Collection<DirectoryPtr> dirs) {
    Collection<DirectoryPtr> dups(Collection<DirectoryPtr>::newInstance());

    for (Collection<DirectoryPtr>::iterator dir = dirs.begin(); dir != dirs.end(); ++dir) {
        for (Collection<DirectoryPtr>::iterator dup = dups.begin(); dup != dups.end(); ++dup) {
            if (*dup == *dir) {
                boost::throw_exception(IllegalArgumentException(L"Directory " + (*dir)->getLockID() + L" appears more than once"));
            }
        }
        if (*dir == directory) {
            boost::throw_exception(IllegalArgumentException(L"Cannot add directory to itself"));
        }
        dups.add(*dir);
    }
}

void IndexWriter::addIndexesNoOptimize(Collection<DirectoryPtr> dirs) {
    ensureOpen();

    noDupDirs(dirs);

    // Do not allow adding docs or deletes while we are running
    docWriter->pauseAllThreads();

    LuceneException finally;
    try {
        if (infoStream) {
            message(L"flush at addIndexesNoOptimize");
        }
        flush(true, false, true);

        bool success = false;

        startTransaction(false);

        try {
            int32_t docCount = 0;

            {
                SyncLock syncLock(this);
                ensureOpen();

                for (Collection<DirectoryPtr>::iterator dir = dirs.begin(); dir != dirs.end(); ++dir) {
                    if (directory == *dir) {
                        // cannot add this index: segments may be deleted in merge before they are added
                        boost::throw_exception(IllegalArgumentException(L"Cannot add this index to itself"));
                    }

                    SegmentInfosPtr sis(newLucene<SegmentInfos>()); // read infos from dir
                    sis->read(*dir);

                    for (int32_t j = 0; j < sis->size(); ++j) {
                        SegmentInfoPtr info(sis->info(j));
                        BOOST_ASSERT(!segmentInfos->contains(info));
                        docCount += info->docCount;
                        segmentInfos->add(info); // add each info
                    }
                }
            }

            // Notify DocumentsWriter that the flushed count just increased
            docWriter->updateFlushedDocCount(docCount);

            maybeMerge();

            ensureOpen();

            // If after merging there remain segments in the index that are in a different directory, just copy these
            // over into our index.  This is necessary (before finishing the transaction) to avoid leaving the index
            // in an unusable (inconsistent) state.
            resolveExternalSegments();

            ensureOpen();

            success = true;
        } catch (LuceneException& e) {
            finally = e;
        }

        if (success) {
            commitTransaction();
        } else {
            rollbackTransaction();
        }
    } catch (std::bad_alloc& oom) {
        finally = handleOOM(oom, L"addIndexesNoOptimize");
    } catch (LuceneException& e) {
        finally = e;
    }
    if (docWriter) {
        docWriter->resumeAllThreads();
    }
    finally.throwException();
}
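
// A minimal usage sketch (hypothetical setup; dir1/dir2 name other index directories): this copies
// or merges the source segments into this writer's index without forcing a full optimize().
//
//   Collection<DirectoryPtr> dirs(Collection<DirectoryPtr>::newInstance());
//   dirs.add(dir1);
//   dirs.add(dir2);
//   writer->addIndexesNoOptimize(dirs);
//   writer->commit(); // the added segments only become visible once committed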

bool IndexWriter::hasExternalSegments() {
    return segmentInfos->hasExternalSegments(directory);
}

void IndexWriter::resolveExternalSegments() {
    bool any = false;
    bool done = false;

    while (!done) {
        SegmentInfoPtr info;
        OneMergePtr merge;

        {
            SyncLock syncLock(this);
            if (stopMerges) {
                boost::throw_exception(MergeAbortedException(L"rollback() was called or addIndexes* hit an unhandled exception"));
            }

            int32_t numSegments = segmentInfos->size();

            done = true;
            for (int32_t i = 0; i < numSegments; ++i) {
                info = segmentInfos->info(i);
                if (info->dir != directory) {
                    done = false;
                    OneMergePtr newMerge(newLucene<OneMerge>(segmentInfos->range(i, i + 1), boost::dynamic_pointer_cast<LogMergePolicy>(mergePolicy) && getUseCompoundFile()));

                    // Returns true if no running merge conflicts with this one (and records this merge as
                    // pending), i.e. this segment is not currently being merged
                    if (registerMerge(newMerge)) {
                        merge = newMerge;

                        // If this segment is not currently being merged, then advance it to running and run
                        // the merge ourselves (below)
                        pendingMerges.remove(merge);
                        runningMerges.add(merge);
                        break;
                    }
                }
            }

            if (!done && !merge) {
                // We are not yet done (external segments still exist in segmentInfos), but all such segments
                // are currently "covered" by a pending or running merge.  We now try to grab any pending merge
                // that involves external segments
                merge = getNextExternalMerge();
            }

            if (!done && !merge) {
                // We are not yet done, and all external segments fall under merges that the merge scheduler is
                // currently running.  So we now wait and check back to see if the merge has completed.
                doWait();
            }
        }

        if (merge) {
            any = true;
            IndexWriter::merge(merge);
        }
    }

    if (any) {
        // Sometimes, on copying an external segment over, more merges may become necessary
        mergeScheduler->merge(shared_from_this());
    }
}

void IndexWriter::addIndexes(Collection<IndexReaderPtr> readers) {
    ensureOpen();

    // Do not allow adding docs or deletes while we are running
    docWriter->pauseAllThreads();

    // We must pre-acquire a read lock here (and upgrade to write lock in startTransaction below) so that no
    // other addIndexes is allowed to start up after we have flushed & optimized but before we then start our
    // transaction.  This is because the merging below requires that only one segment is present in the index
    acquireRead();

    LuceneException finally;
    try {
        SegmentInfoPtr info;
        String mergedName;
        SegmentMergerPtr merger;

        bool success = false;

        try {
            flush(true, false, true);
            optimize(); // start with zero or one segment
            success = true;
        } catch (LuceneException& e) {
            finally = e;
        }

        // Take care to release the read lock if we hit an exception before starting the transaction
        if (!success) {
            releaseRead();
        }
        finally.throwException();

        // true means we already have a read lock; if this call hits an exception it will release the write lock
        startTransaction(true);

        try {
            mergedName = newSegmentName();
            merger = newLucene<SegmentMerger>(shared_from_this(), mergedName, OneMergePtr());

            SegmentReaderPtr sReader;

            {
                SyncLock syncLock(this);
                if (segmentInfos->size() == 1) { // add existing index, if any
                    sReader = readerPool->get(segmentInfos->info(0), true, BufferedIndexInput::BUFFER_SIZE, -1);
                }
            }

            success = false;

            try {
                if (sReader) {
                    merger->add(sReader);
                }

                for (Collection<IndexReaderPtr>::iterator i = readers.begin(); i != readers.end(); ++i) {
                    merger->add(*i);
                }

                int32_t docCount = merger->merge(); // merge 'em

                {
                    SyncLock syncLock(this);
                    segmentInfos->clear(); // pop old infos & add new
                    info = newLucene<SegmentInfo>(mergedName, docCount, directory, false, true, -1, L"", false, merger->hasProx());
                    setDiagnostics(info, L"addIndexes(Collection<IndexReaderPtr>)");
                    segmentInfos->add(info);
                }

                // Notify DocumentsWriter that the flushed count just increased
                docWriter->updateFlushedDocCount(docCount);

                success = true;
            } catch (LuceneException& e) {
                finally = e;
            }

            if (sReader) {
                readerPool->release(sReader);
            }
        } catch (LuceneException& e) {
            finally = e;
        }

        if (!success) {
            if (infoStream) {
                message(L"hit exception in addIndexes during merge");
            }
            rollbackTransaction();
        } else {
            commitTransaction();
        }

        finally.throwException();

        if (boost::dynamic_pointer_cast<LogMergePolicy>(mergePolicy) && getUseCompoundFile()) {
            HashSet<String> files;

            {
                SyncLock syncLock(this);
                // Must incRef our files so that, if another thread is running merge/optimize, it doesn't delete our
                // segment's files before we have a chance to finish making the compound file.
                if (segmentInfos->contains(info)) {
                    files = info->files();
                    deleter->incRef(files);
                }
            }

            if (files) {
                success = false;

                startTransaction(false);

                try {
                    merger->createCompoundFile(mergedName + L".cfs");

                    {
                        SyncLock syncLock(this);
                        info->setUseCompoundFile(true);
                    }

                    success = true;
                } catch (LuceneException& e) {
                    finally = e;
                }

                {
                    SyncLock syncLock(this);
                    deleter->decRef(files);
                }

                if (!success) {
                    if (infoStream) {
                        message(L"hit exception building compound file in addIndexes during merge");
                    }
                    rollbackTransaction();
                } else {
                    commitTransaction();
                }
            }
        }
    } catch (std::bad_alloc& oom) {
        finally = handleOOM(oom, L"addIndexes(Collection<IndexReaderPtr>)");
    } catch (LuceneException& e) {
        finally = e;
    }
    if (docWriter) {
        docWriter->resumeAllThreads();
    }
    finally.throwException();
}
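
// A minimal usage sketch (hypothetical setup; reader is an already-open IndexReaderPtr over another
// index): unlike addIndexesNoOptimize, this variant merges everything down to a single segment, so
// it is more expensive but works with any IndexReader, not just directory-backed indexes.
//
//   Collection<IndexReaderPtr> readers(Collection<IndexReaderPtr>::newInstance());
//   readers.add(reader);
//   writer->addIndexes(readers);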

void IndexWriter::doAfterFlush() {
    // override
}

void IndexWriter::doBeforeFlush() {
    // override
}

void IndexWriter::prepareCommit() {
    ensureOpen();
    prepareCommit(MapStringString());
}

void IndexWriter::prepareCommit(MapStringString commitUserData) {
    if (hitOOM) {
        boost::throw_exception(IllegalStateException(L"this writer hit an OutOfMemoryError; cannot commit"));
    }

    if (pendingCommit) {
        boost::throw_exception(IllegalStateException(L"prepareCommit was already called with no corresponding call to commit"));
    }

    if (infoStream) {
        message(L"prepareCommit: flush");
    }

    flush(true, true, true);

    startCommit(0, commitUserData);
}

void IndexWriter::commit(int64_t sizeInBytes) {
    SyncLock messageLock(commitLock);
    startCommit(sizeInBytes, MapStringString());
    finishCommit();
}

void IndexWriter::commit() {
    commit(MapStringString());
}

void IndexWriter::commit(MapStringString commitUserData) {
    ensureOpen();

    if (infoStream) {
        message(L"commit: start");
    }

    {
        SyncLock messageLock(commitLock);

        if (infoStream) {
            message(L"commit: enter lock");
        }

        if (!pendingCommit) {
            if (infoStream) {
                message(L"commit: now prepare");
            }
            prepareCommit(commitUserData);
        } else if (infoStream) {
            message(L"commit: already prepared");
        }

        finishCommit();
    }
}
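
// A minimal two-phase-commit sketch (hypothetical setup; "db" stands for some external
// transactional store being kept in sync with the index): prepareCommit() does all of the
// expensive work up front, so the subsequent commit() is then very unlikely to fail.
//
//   writer->prepareCommit();
//   try {
//       db->commit();       // commit the other resource first
//       writer->commit();   // then publish the prepared index commit
//   } catch (...) {
//       writer->rollback(); // abandon the prepared commit if the other resource failed
//   }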

void IndexWriter::finishCommit() {
    SyncLock syncLock(this);
    if (pendingCommit) {
        LuceneException finally;
        try {
            if (infoStream) {
                message(L"commit: pendingCommit != null");
            }
            pendingCommit->finishCommit(directory);
            if (infoStream) {
                message(L"commit: wrote segments file \"" + pendingCommit->getCurrentSegmentFileName() + L"\"");
            }
            lastCommitChangeCount = pendingCommitChangeCount;
            segmentInfos->updateGeneration(pendingCommit);
            segmentInfos->setUserData(pendingCommit->getUserData());
            setRollbackSegmentInfos(pendingCommit);
            deleter->checkpoint(pendingCommit, true);
        } catch (LuceneException& e) {
            finally = e;
        }

        deleter->decRef(pendingCommit);
        pendingCommit.reset();
        notifyAll();
        finally.throwException();
    } else if (infoStream) {
        message(L"commit: pendingCommit == null; skip");
    }

    if (infoStream) {
        message(L"commit: done");
    }
}

void IndexWriter::flush(bool triggerMerge, bool flushDocStores, bool flushDeletes) {
    // We can be called during close, when closing == true, so we must pass false to ensureOpen
    ensureOpen(false);
    if (doFlush(flushDocStores, flushDeletes) && triggerMerge) {
        maybeMerge();
    }
}
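
// A hedged configuration sketch (values illustrative; assumes the standard buffer-tuning setters
// from Lucene's IndexWriter API): explicit flush() calls are rarely needed because flushing is
// normally driven by RAM usage or by document/delete counts.
//
//   writer->setRAMBufferSizeMB(32.0);                                   // flush once buffered docs exceed 32 MB
//   writer->setMaxBufferedDocs(IndexWriter::DISABLE_AUTO_FLUSH);        // ...not by doc count
//   writer->setMaxBufferedDeleteTerms(IndexWriter::DISABLE_AUTO_FLUSH); // ...nor by delete count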

bool IndexWriter::doFlush(bool flushDocStores, bool flushDeletes) {
    TestScope testScope(L"IndexWriter", L"doFlush");
    SyncLock syncLock(this);
    bool success = false;
    LuceneException finally;
    try {
        try {
            success = doFlushInternal(flushDocStores, flushDeletes);
        } catch (LuceneException& e) {
            finally = e;
        }
        if (docWriter->doBalanceRAM()) {
            docWriter->balanceRAM();
        }
        finally.throwException();
    } catch (LuceneException& e) {
        finally = e;
    }
    docWriter->clearFlushPending();
    finally.throwException();
    return success;
}

bool IndexWriter::doFlushInternal(bool flushDocStores, bool flushDeletes) {
    SyncLock syncLock(this);
    if (hitOOM) {
        boost::throw_exception(IllegalStateException(L"this writer hit an OutOfMemoryError; cannot flush"));
    }

    ensureOpen(false);

    BOOST_ASSERT(testPoint(L"startDoFlush"));

    doBeforeFlush();

    ++flushCount;

    // If we are flushing because too many deletes accumulated, then we should apply the deletes to free RAM
    if (docWriter->doApplyDeletes()) {
        flushDeletes = true;
    }

    // Make sure no threads are actively adding a document.  pauseAllThreads returns true if docWriter is
    // currently aborting, in which case we skip flushing this segment
    if (infoStream) {
        message(L"flush: now pause all indexing threads");
    }
    if (docWriter->pauseAllThreads()) {
        docWriter->resumeAllThreads();
        return false;
    }

    bool flushDocs = false;

    LuceneException finally;
    try {
        SegmentInfoPtr newSegment;

        int32_t numDocs = docWriter->getNumDocsInRAM();

        // Always flush docs if there are any
        flushDocs = (numDocs > 0);

        String docStoreSegment(docWriter->getDocStoreSegment());

        BOOST_ASSERT(!docStoreSegment.empty() || numDocs == 0);

        if (docStoreSegment.empty()) {
            flushDocStores = false;
        }

        int32_t docStoreOffset = docWriter->getDocStoreOffset();

        bool docStoreIsCompoundFile = false;

        if (infoStream) {
            message(L" flush: segment=" + docWriter->getSegment() +
                    L" docStoreSegment=" + StringUtils::toString(docWriter->getDocStoreSegment()) +
                    L" docStoreOffset=" + StringUtils::toString(docStoreOffset) +
                    L" flushDocs=" + StringUtils::toString(flushDocs) +
                    L" flushDeletes=" + StringUtils::toString(flushDeletes) +
                    L" flushDocStores=" + StringUtils::toString(flushDocStores) +
                    L" numDocs=" + StringUtils::toString(numDocs) +
                    L" numBufDelTerms=" + StringUtils::toString(docWriter->getNumBufferedDeleteTerms()));
            message(L" index before flush " + segString());
        }

        // Check if the doc stores must be separately flushed because other segments, besides the one we are
        // about to flush, reference them
        if (flushDocStores && (!flushDocs || docWriter->getSegment() != docWriter->getDocStoreSegment())) {
            // We must separately flush the doc store
            if (infoStream) {
                message(L" flush shared docStore segment " + docStoreSegment);
            }

            docStoreIsCompoundFile = IndexWriter::flushDocStores();
            flushDocStores = false;
        }

        String segment(docWriter->getSegment());

        // If we are flushing docs, segment must not be null
        BOOST_ASSERT(!segment.empty() || !flushDocs);

        if (flushDocs) {
            bool success = false;
            int32_t flushedDocCount;

            try {
                flushedDocCount = docWriter->flush(flushDocStores);
                if (infoStream) {
                    message(L"flushedFiles=" + StringUtils::toString(docWriter->getFlushedFiles()));
                }
                success = true;
            } catch (LuceneException& e) {
                finally = e;
            }

            if (!success) {
                if (infoStream) {
                    message(L"hit exception flushing segment " + segment);
                }
                deleter->refresh(segment);
            }

            finally.throwException();

            if (docStoreOffset == 0 && flushDocStores) {
                // This means we are flushing private doc stores with this segment, so it will not be shared
                // with other segments
                BOOST_ASSERT(!docStoreSegment.empty());
                BOOST_ASSERT(docStoreSegment == segment);
                docStoreOffset = -1;
                docStoreIsCompoundFile = false;
                docStoreSegment.clear();
            }

            // Create new SegmentInfo, but do not add to our segmentInfos until deletes are flushed successfully.
            newSegment = newLucene<SegmentInfo>(segment, flushedDocCount, directory, false, true, docStoreOffset, docStoreSegment, docStoreIsCompoundFile, docWriter->hasProx());
            setDiagnostics(newSegment, L"flush");
        }

        docWriter->pushDeletes();

        if (flushDocs) {
            segmentInfos->add(newSegment);
            checkpoint();
        }

        if (flushDocs && mergePolicy->useCompoundFile(segmentInfos, newSegment)) {
            // Now build compound file
            bool success = false;
            try {
                docWriter->createCompoundFile(segment);
                success = true;
            } catch (LuceneException& e) {
                finally = e;
            }

            if (!success) {
                if (infoStream) {
                    message(L"hit exception creating compound file for newly flushed segment " + segment);
                }
                deleter->deleteFile(segment + L"." + IndexFileNames::COMPOUND_FILE_EXTENSION());
            }

            finally.throwException();

            newSegment->setUseCompoundFile(true);
            checkpoint();
        }

        if (flushDeletes) {
            applyDeletes();
        }

        if (flushDocs) {
            checkpoint();
        }

        doAfterFlush();
    } catch (std::bad_alloc& oom) {
        finally = handleOOM(oom, L"doFlush");
        flushDocs = false;
    } catch (LuceneException& e) {
        finally = e;
    }
    docWriter->resumeAllThreads();
    finally.throwException();

    return flushDocs;
}

int64_t IndexWriter::ramSizeInBytes() {
    ensureOpen();
    return docWriter->getRAMUsed();
}

int32_t IndexWriter::numRamDocs() {
    SyncLock syncLock(this);
    ensureOpen();
    return docWriter->getNumDocsInRAM();
}
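
// A minimal monitoring sketch ("threshold" is a hypothetical application-defined limit): these two
// accessors make the RAM-based flush policy observable from application code.
//
//   int64_t ram = writer->ramSizeInBytes(); // bytes buffered in the DocumentsWriter
//   int32_t docs = writer->numRamDocs();    // documents buffered but not yet flushed
//   if (ram > threshold) {
//       writer->commit();                   // hypothetical application-level policy
//   }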

int32_t IndexWriter::ensureContiguousMerge(const OneMergePtr& merge) {
    int32_t first = segmentInfos->find(merge->segments->info(0));
    if (first == -1) {
        boost::throw_exception(MergeException(L"Could not find segment " + merge->segments->info(0)->name + L" in current index " + segString()));
    }

    int32_t numSegments = segmentInfos->size();
    int32_t numSegmentsToMerge = merge->segments->size();

    for (int32_t i = 0; i < numSegmentsToMerge; ++i) {
        SegmentInfoPtr info(merge->segments->info(i));

        if (first + i >= numSegments || !segmentInfos->info(first + i)->equals(info)) {
            if (!segmentInfos->contains(info)) {
                boost::throw_exception(MergeException(L"MergePolicy selected a segment (" + info->name + L") that is not in the current index " + segString()));
            } else {
                boost::throw_exception(MergeException(L"MergePolicy selected non-contiguous segments to merge (" + merge->segString(directory) + L" vs " + segString() + L"), which IndexWriter (currently) cannot handle"));
            }
        }
    }

    return first;
}
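
// Worked example (illustrative segment names): if the index is [_0, _1, _2, _3], a merge of
// [_1, _2] is contiguous and returns first == 1, while a merge of [_0, _2] throws MergeException
// because _1 sits between the selected segments.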

void IndexWriter::commitMergedDeletes(const OneMergePtr& merge, const SegmentReaderPtr& mergeReader) {
    SyncLock syncLock(this);
    BOOST_ASSERT(testPoint(L"startCommitMergeDeletes"));

    SegmentInfosPtr sourceSegments(merge->segments);

    if (infoStream) {
        message(L"commitMergeDeletes " + merge->segString(directory));
    }

    // Carefully merge deletes that occurred after we started merging
    int32_t docUpto = 0;
    int32_t delCount = 0;

    for (int32_t i = 0; i < sourceSegments->size(); ++i) {
        SegmentInfoPtr info(sourceSegments->info(i));
        int32_t docCount = info->docCount;
        SegmentReaderPtr previousReader(merge->readersClone[i]);
        SegmentReaderPtr currentReader(merge->readers[i]);
        if (previousReader->hasDeletions()) {
            // There were deletes on this segment when the merge started.  The merge has collapsed away those deletes,
            // but if new deletes were flushed since the merge started, we must now carefully keep any newly flushed
            // deletes while mapping them to the new docIDs.

            if (currentReader->numDeletedDocs() > previousReader->numDeletedDocs()) {
                // This means this segment has had new deletes committed since we started the merge, so we must merge them
                for (int32_t j = 0; j < docCount; ++j) {
                    if (previousReader->isDeleted(j)) {
                        BOOST_ASSERT(currentReader->isDeleted(j));
                    } else {
                        if (currentReader->isDeleted(j)) {
                            mergeReader->doDelete(docUpto);
                            ++delCount;
                        }
                        ++docUpto;
                    }
                }
            } else {
                docUpto += docCount - previousReader->numDeletedDocs();
            }
        } else if (currentReader->hasDeletions()) {
            // This segment had no deletes before but now it does
            for (int32_t j = 0; j < docCount; ++j) {
                if (currentReader->isDeleted(j)) {
                    mergeReader->doDelete(docUpto);
                    ++delCount;
                }
                ++docUpto;
            }
        } else {
            // No deletes before or after
            docUpto += info->docCount;
        }
    }

    BOOST_ASSERT(mergeReader->numDeletedDocs() == delCount);

    mergeReader->_hasChanges = (delCount > 0);
}
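
// Worked example (illustrative): suppose a source segment has 5 docs and doc 1 was already deleted
// when the merge started, so the merged segment contains only docs {0, 2, 3, 4}, renumbered to
// {0, 1, 2, 3}.  If doc 3 is then deleted while the merge runs, the loop above skips the collapsed
// doc 1 and reaches docUpto == 2 when j == 3, correctly deleting remapped docID 2 in mergeReader.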

bool IndexWriter::commitMerge(const OneMergePtr& merge, const SegmentMergerPtr& merger, int32_t mergedDocCount, const SegmentReaderPtr& mergedReader) {
    SyncLock syncLock(this);
    BOOST_ASSERT(testPoint(L"startCommitMerge"));

    if (hitOOM) {
        boost::throw_exception(IllegalStateException(L"this writer hit an OutOfMemoryError; cannot complete merge"));
    }

    if (infoStream) {
        message(L"commitMerge: " + merge->segString(directory) + L" index=" + segString());
    }

    BOOST_ASSERT(merge->registerDone);

    // If merge was explicitly aborted, or, if rollback() or rollbackTransaction() had been called since our merge
    // started (which results in an unqualified deleter.refresh() call that will remove any index file that current
    // segments does not reference), we abort this merge
    if (merge->isAborted()) {
        if (infoStream) {
            message(L"commitMerge: skipping merge " + merge->segString(directory) + L": it was aborted");
        }
        return false;
    }

    int32_t start = ensureContiguousMerge(merge);

    commitMergedDeletes(merge, mergedReader);
    docWriter->remapDeletes(segmentInfos, merger->getDocMaps(), merger->getDelCounts(), merge, mergedDocCount);

    // If the doc store we are using has been closed and is now in compound format (but wasn't when we started),
    // then we will switch to the compound format as well
    setMergeDocStoreIsCompoundFile(merge);

    merge->info->setHasProx(merger->hasProx());

    segmentInfos->remove(start, start + merge->segments->size());
    BOOST_ASSERT(!segmentInfos->contains(merge->info));
    segmentInfos->add(start, merge->info);

    closeMergeReaders(merge, false);

    // Must note the change to segmentInfos so any commits in-flight don't lose it
    checkpoint();

    // If the merged segments had pending changes, clear them so that we don't bother writing
    // them to disk, updating SegmentInfo, etc.
    readerPool->clear(merge->segments);

    if (merge->optimize) {
        // cascade the optimize
        segmentsToOptimize.add(merge->info);
    }
    return true;
}

LuceneException IndexWriter::handleMergeException(const LuceneException& exc, const OneMergePtr& merge) {
    if (infoStream) {
        message(L"handleMergeException: merge=" + merge->segString(directory) + L" exc=" + exc.getError());
    }

    // Set the exception on the merge, so if optimize() is waiting on us it sees the root cause exception
    merge->setException(exc);
    addMergeException(merge);

    switch (exc.getType()) {
    case LuceneException::MergeAborted:
        // We can ignore this exception (it happens when close(false) or rollback is called), unless the
        // merge involves segments from external directories, in which case we must throw it so that, for
        // example, the rollbackTransaction code in addIndexes* is executed.
        if (merge->isExternal) {
            return exc;
        }
        break;
    case LuceneException::IO:
    case LuceneException::Runtime:
        return exc;
    default:
        return RuntimeException(); // Should not get here
    }
    return LuceneException();
}

void IndexWriter::merge(const OneMergePtr& merge) {
    bool success = false;

    try {
        LuceneException finally;
        try {
            try {
                mergeInit(merge);
                if (infoStream) {
                    message(L"now merge\n merge=" + merge->segString(directory) + L"\n index=" + segString());
                }

                mergeMiddle(merge);
                mergeSuccess(merge);
                success = true;
            } catch (LuceneException& e) {
                finally = handleMergeException(e, merge);
            }

            {
                SyncLock syncLock(this);
                mergeFinish(merge);

                if (!success) {
                    if (infoStream) {
                        message(L"hit exception during merge");
                    }

                    if (merge->info && !segmentInfos->contains(merge->info)) {
                        deleter->refresh(merge->info->name);
                    }
                }

                // This merge (and, generally, any change to the segments) may now enable
                // new merges, so we call merge policy & update pending merges.
                if (success && !merge->isAborted() && !closed && !closing) {
                    updatePendingMerges(merge->maxNumSegmentsOptimize, merge->optimize);
                }
            }
        } catch (LuceneException& e) {
            finally = e;
        }
        finally.throwException();
    } catch (std::bad_alloc& oom) {
        boost::throw_exception(handleOOM(oom, L"merge"));
    }
}

void IndexWriter::mergeSuccess(const OneMergePtr& merge) {
    // override
}

bool IndexWriter::registerMerge(const OneMergePtr& merge) {
    SyncLock syncLock(this);

    if (merge->registerDone) {
        return true;
    }

    if (stopMerges) {
        merge->abort();
        boost::throw_exception(MergeAbortedException(L"merge is aborted: " + merge->segString(directory)));
    }

    int32_t count = merge->segments->size();
    bool isExternal = false;
    for (int32_t i = 0; i < count; ++i) {
        SegmentInfoPtr info(merge->segments->info(i));
        if (mergingSegments.contains(info)) {
            return false;
        }
        if (!segmentInfos->contains(info)) {
            return false;
        }
        if (info->dir != directory) {
            isExternal = true;
        }
        if (segmentsToOptimize.contains(info)) {
            merge->optimize = true;
            merge->maxNumSegmentsOptimize = optimizeMaxNumSegments;
        }
    }

    ensureContiguousMerge(merge);

    pendingMerges.add(merge);

    if (infoStream) {
        message(L"add merge to pendingMerges: " + merge->segString(directory) + L" [total " + StringUtils::toString(pendingMerges.size()) + L" pending]");
    }

    merge->mergeGen = mergeGen;
    merge->isExternal = isExternal;

    // OK, it does not conflict; now record that this merge is running (while synchronized)
    // to avoid the race condition where two conflicting merges from different threads start
    for (int32_t i = 0; i < count; ++i) {
        mergingSegments.add(merge->segments->info(i));
    }

    // Merge is now registered
    merge->registerDone = true;
    return true;
}

void IndexWriter::mergeInit(const OneMergePtr& merge) {
    SyncLock syncLock(this);
    bool success = false;
    LuceneException finally;
    try {
        _mergeInit(merge);
        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }

    if (!success) {
        mergeFinish(merge);
    }
    finally.throwException();
}

void IndexWriter::_mergeInit(const OneMergePtr& merge) {
    SyncLock syncLock(this);
    bool test = testPoint(L"startMergeInit");
    BOOST_ASSERT(test);

    BOOST_ASSERT(merge->registerDone);
    BOOST_ASSERT(!merge->optimize || merge->maxNumSegmentsOptimize > 0);

    if (hitOOM) {
        boost::throw_exception(IllegalStateException(L"this writer hit an OutOfMemoryError; cannot merge"));
    }

    if (merge->info) {
        // mergeInit already done
        return;
    }

    if (merge->isAborted()) {
        return;
    }

    applyDeletes();

    SegmentInfosPtr sourceSegments(merge->segments);
    int32_t end = sourceSegments->size();

    // Check whether this merge will allow us to skip merging the doc stores (stored fields & vectors).
    // This is a very substantial optimization (saves tons of IO).
    DirectoryPtr lastDir(directory);
    String lastDocStoreSegment;
    int32_t next = -1;

    bool mergeDocStores = false;
    bool doFlushDocStore = false;
    String currentDocStoreSegment(docWriter->getDocStoreSegment());

    // Test each segment to be merged: check if we need to flush/merge doc stores
    for (int32_t i = 0; i < end; ++i) {
        SegmentInfoPtr si(sourceSegments->info(i));

        // If it has deletions we must merge the doc stores
        if (si->hasDeletions()) {
            mergeDocStores = true;
        }

        // If it has its own (private) doc stores we must merge the doc stores
        if (si->getDocStoreOffset() == -1) {
            mergeDocStores = true;
        }

        // If it has a different doc store segment than previous segments, we must merge the doc stores
        String docStoreSegment(si->getDocStoreSegment());
        if (docStoreSegment.empty()) {
            mergeDocStores = true;
        } else if (lastDocStoreSegment.empty()) {
            lastDocStoreSegment = docStoreSegment;
        } else if (lastDocStoreSegment != docStoreSegment) {
            mergeDocStores = true;
        }

        // Segments' docStoreOffsets must be in-order and contiguous.  For the default merge policy now
        // this will always be the case, but for an arbitrary merge policy it may not be
        if (next == -1) {
            next = si->getDocStoreOffset() + si->docCount;
        } else if (next != si->getDocStoreOffset()) {
            mergeDocStores = true;
        } else {
            next = si->getDocStoreOffset() + si->docCount;
        }

        // If the segment comes from a different directory we must merge
        if (lastDir != si->dir) {
            mergeDocStores = true;
        }

        // If the segment is referencing the current "live" doc store outputs then we must merge
        if (si->getDocStoreOffset() != -1 && !currentDocStoreSegment.empty() && si->getDocStoreSegment() == currentDocStoreSegment) {
            doFlushDocStore = true;
        }
    }

    // If a mergedSegmentWarmer is installed, we must merge the doc stores because we will open a full
    // SegmentReader on the merged segment
    if (!mergeDocStores && mergedSegmentWarmer && !currentDocStoreSegment.empty() && !lastDocStoreSegment.empty() && lastDocStoreSegment == currentDocStoreSegment) {
        mergeDocStores = true;
    }

    int32_t docStoreOffset;
    String docStoreSegment;
    bool docStoreIsCompoundFile;

    if (mergeDocStores) {
        docStoreOffset = -1;
        docStoreSegment.clear();
        docStoreIsCompoundFile = false;
    } else {
        SegmentInfoPtr si(sourceSegments->info(0));
        docStoreOffset = si->getDocStoreOffset();
        docStoreSegment = si->getDocStoreSegment();
        docStoreIsCompoundFile = si->getDocStoreIsCompoundFile();
    }

    if (mergeDocStores && doFlushDocStore) {
        // SegmentMerger intends to merge the doc stores (stored fields, vectors), and at
        // least one of the segments to be merged refers to the currently live doc stores.
        if (infoStream) {
            message(L"now flush at merge");
        }
        doFlush(true, false);
    }

    merge->mergeDocStores = mergeDocStores;

    // Bind a new segment name here so even with ConcurrentMergeScheduler we keep deterministic segment names.
    merge->info = newLucene<SegmentInfo>(newSegmentName(), 0, directory, false, true, docStoreOffset, docStoreSegment, docStoreIsCompoundFile, false);

    MapStringString details(MapStringString::newInstance());
    details.put(L"optimize", StringUtils::toString(merge->optimize));
    details.put(L"mergeFactor", StringUtils::toString(end));
    details.put(L"mergeDocStores", StringUtils::toString(mergeDocStores));
    setDiagnostics(merge->info, L"merge", details);

    // Also enroll the merged segment into mergingSegments; this prevents it from getting
    // selected for a merge after our merge is done but while we are building the CFS
    mergingSegments.add(merge->info);
}

void IndexWriter::setDiagnostics(const SegmentInfoPtr& info, const String& source) {
    setDiagnostics(info, source, MapStringString());
}

void IndexWriter::setDiagnostics(const SegmentInfoPtr& info, const String& source, MapStringString details) {
    MapStringString diagnostics(MapStringString::newInstance());
    diagnostics.put(L"source", source);
    diagnostics.put(L"lucene.version", Constants::LUCENE_VERSION);
    diagnostics.put(L"os", Constants::OS_NAME);
    if (details) {
        diagnostics.putAll(details.begin(), details.end());
    }
    info->setDiagnostics(diagnostics);
}
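
// Illustrative result: after setDiagnostics(info, L"flush"), info's diagnostics map would hold
// {source=flush, lucene.version=<Constants::LUCENE_VERSION>, os=<Constants::OS_NAME>}, with any
// caller-supplied details (e.g. the optimize/mergeFactor/mergeDocStores entries above) merged in.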
2707 
mergeFinish(const OneMergePtr & merge)2708 void IndexWriter::mergeFinish(const OneMergePtr& merge) {
2709     SyncLock syncLock(this);
2710     // Optimize, addIndexes or finishMerges may be waiting on merges to finish.
2711     notifyAll();
2712 
2713     // It's possible we are called twice, eg if there was an exception inside mergeInit
2714     if (merge->registerDone) {
2715         SegmentInfosPtr sourceSegments(merge->segments);
2716         int32_t end = sourceSegments->size();
2717         for (int32_t i = 0; i < end; ++i) {
2718             mergingSegments.remove(sourceSegments->info(i));
2719         }
2720 
2721         mergingSegments.remove(merge->info);
2722         merge->registerDone = false;
2723     }
2724 
2725     runningMerges.remove(merge);
2726 }
2727 
setMergeDocStoreIsCompoundFile(const OneMergePtr & merge)2728 void IndexWriter::setMergeDocStoreIsCompoundFile(const OneMergePtr& merge) {
2729     SyncLock syncLock(this);
2730 
2731     String mergeDocStoreSegment(merge->info->getDocStoreSegment());
2732     if (!mergeDocStoreSegment.empty() && !merge->info->getDocStoreIsCompoundFile()) {
2733         int32_t size = segmentInfos->size();
2734         for (int32_t i = 0; i < size; ++i) {
2735             SegmentInfoPtr info(segmentInfos->info(i));
2736             String docStoreSegment(info->getDocStoreSegment());
2737             if (!docStoreSegment.empty() && docStoreSegment == mergeDocStoreSegment && info->getDocStoreIsCompoundFile()) {
2738                 merge->info->setDocStoreIsCompoundFile(true);
2739                 break;
2740             }
2741         }
2742     }
2743 }
2744 
closeMergeReaders(const OneMergePtr & merge,bool suppressExceptions)2745 void IndexWriter::closeMergeReaders(const OneMergePtr& merge, bool suppressExceptions) {
2746     SyncLock syncLock(this);
2747 
2748     int32_t numSegments = merge->segments->size();
2749     if (suppressExceptions) {
2750         // Suppress any new exceptions so we throw the original cause
2751         for (int32_t i = 0; i < numSegments; ++i) {
2752             if (merge->readers[i]) {
2753                 try {
2754                     readerPool->release(merge->readers[i], false);
2755                 } catch (...) {
2756                 }
2757                 merge->readers[i].reset();
2758             }
2759 
2760             if (merge->readersClone[i]) {
2761                 try {
2762                     merge->readersClone[i]->close();
2763                 } catch (...) {
2764                 }
2765                 // This was a private clone and we had the only reference
2766                 BOOST_ASSERT(merge->readersClone[i]->getRefCount() == 0);
2767                 merge->readersClone[i].reset();
2768             }
2769         }
2770     } else {
2771         for (int32_t i = 0; i < numSegments; ++i) {
2772             if (merge->readers[i]) {
2773                 readerPool->release(merge->readers[i], true);
2774                 merge->readers[i].reset();
2775             }
2776 
2777             if (merge->readersClone[i]) {
2778                 merge->readersClone[i]->close();
2779                 // This was a private clone and we had the only reference
2780                 BOOST_ASSERT(merge->readersClone[i]->getRefCount() == 0);
2781                 merge->readersClone[i].reset();
2782             }
2783         }
2784     }
2785 }
2786 
mergeMiddle(const OneMergePtr & merge)2787 int32_t IndexWriter::mergeMiddle(const OneMergePtr& merge) {
2788     merge->checkAborted(directory);
2789 
2790     String mergedName(merge->info->name);
2791     int32_t mergedDocCount = 0;
2792 
2793     SegmentInfosPtr sourceSegments(merge->segments);
2794     int32_t numSegments = sourceSegments->size();
2795 
2796     if (infoStream) {
2797         message(L"merging " + merge->segString(directory));
2798     }
2799 
2800     SegmentMergerPtr merger(newLucene<SegmentMerger>(shared_from_this(), mergedName, merge));
2801 
2802     merge->readers = Collection<SegmentReaderPtr>::newInstance(numSegments);
2803     merge->readersClone = Collection<SegmentReaderPtr>::newInstance(numSegments);
2804 
2805     bool mergeDocStores = false;
2806 
2807     String currentDocStoreSegment;
2808     {
2809         SyncLock syncLock(this);
2810         currentDocStoreSegment = docWriter->getDocStoreSegment();
2811     }
2812 
2813     bool currentDSSMerged = false;
2814 
2815     LuceneException finally;
2816     // This is try/finally to make sure merger's readers are closed
2817     bool success = false;
2818     try {
2819         int32_t totDocCount = 0;
2820         for (int32_t i = 0; i < numSegments; ++i) {
2821             SegmentInfoPtr info(sourceSegments->info(i));
2822 
2823             // Hold onto the "live" reader; we will use this to commit merged deletes
2824             merge->readers[i] = readerPool->get(info, merge->mergeDocStores, MERGE_READ_BUFFER_SIZE, -1);
2825             SegmentReaderPtr reader(merge->readers[i]);
2826 
2827             // We clone the segment readers because other deletes may come in while we're merging so we need readers that will not change
2828             merge->readersClone[i] = boost::dynamic_pointer_cast<SegmentReader>(reader->clone(true));
2829             SegmentReaderPtr clone(merge->readersClone[i]);
2830             merger->add(clone);
2831 
2832             if (clone->hasDeletions()) {
2833                 mergeDocStores = true;
2834             }
2835 
2836             if (info->getDocStoreOffset() != -1 && !currentDocStoreSegment.empty()) {
2837                 currentDSSMerged = currentDSSMerged || (currentDocStoreSegment == info->getDocStoreSegment());
2838             }
2839 
2840             totDocCount += clone->numDocs();
2841         }
2842 
2843         if (infoStream) {
2844             message(L"merge: total " + StringUtils::toString(totDocCount) + L" docs");
2845         }
2846 
2847         merge->checkAborted(directory);
2848 
2849         // If deletions have arrived and it has now become necessary to merge doc stores, go and open them
2850         if (mergeDocStores && !merge->mergeDocStores) {
2851             merge->mergeDocStores = true;
2852 
2853             {
2854                 SyncLock syncLock(this);
2855                 if (currentDSSMerged) {
2856                     if (infoStream) {
2857                         message(L"now flush at mergeMiddle");
2858                     }
2859                     doFlush(true, false);
2860                 }
2861             }
2862 
2863             for (Collection<SegmentReaderPtr>::iterator reader = merge->readersClone.begin(); reader != merge->readersClone.end(); ++reader) {
2864                 (*reader)->openDocStores();
2865             }
2866 
2867             // Clear DSS
2868             merge->info->setDocStore(-1, L"", false);
2869         }
2870 
2871         // This is where all the work happens
2872         merge->info->docCount = merger->merge(merge->mergeDocStores);
2873         mergedDocCount = merge->info->docCount;
2874 
2875         BOOST_ASSERT(mergedDocCount == totDocCount);
2876 
2877         if (merge->useCompoundFile) {
2878             success = false;
2879 
2880             String compoundFileName(IndexFileNames::segmentFileName(mergedName, IndexFileNames::COMPOUND_FILE_EXTENSION()));
2881 
2882             try {
2883                 if (infoStream) {
2884                     message(L"create compound file " + compoundFileName);
2885                 }
2886                 merger->createCompoundFile(compoundFileName);
2887                 success = true;
2888             } catch (IOException& ioe) {
2889                 SyncLock syncLock(this);
2890                 if (merge->isAborted()) {
2891                     // This can happen if rollback or close(false) is called - fall through to logic
2892                     // below to remove the partially created CFS
2893                 } else {
2894                     finally = handleMergeException(ioe, merge);
2895                 }
2896             } catch (LuceneException& e) {
2897                 finally = handleMergeException(e, merge);
2898             }
2899 
2900             if (!success) {
2901                 if (infoStream) {
2902                     message(L"hit exception creating compound file during merge");
2903                 }
2904                 {
2905                     SyncLock syncLock(this);
2906                     deleter->deleteFile(compoundFileName);
2907                     deleter->deleteNewFiles(merger->getMergedFiles());
2908                 }
2909             }
2910 
2911             finally.throwException();
2912 
            success = false;

            {
                SyncLock syncLock(this);

                // Delete new non-CFS files directly: they were never registered with the IndexFileDeleter (IFD)
                deleter->deleteNewFiles(merger->getMergedFiles());

                if (merge->isAborted()) {
                    if (infoStream) {
                        message(L"abort merge after building CFS");
                    }
                    deleter->deleteFile(compoundFileName);
                    boost::throw_exception(TemporaryException());
                }
            }

            merge->info->setUseCompoundFile(true);
        }

        int32_t termsIndexDivisor = -1;
        bool loadDocStores = false;

        // If the merged segment warmer was not installed when this merge started, we did not force
        // the doc stores to close, so we cannot warm the reader now
        bool canWarm = (merge->info->getDocStoreSegment().empty() || currentDocStoreSegment.empty() || merge->info->getDocStoreSegment() == currentDocStoreSegment);

        if (poolReaders && mergedSegmentWarmer && canWarm) {
            // Load terms index & doc stores so the segment warmer can run searches and load documents/term vectors
            termsIndexDivisor = readerTermsIndexDivisor;
            loadDocStores = true;
        }

        SegmentReaderPtr mergedReader(readerPool->get(merge->info, loadDocStores, BufferedIndexInput::BUFFER_SIZE, termsIndexDivisor));

        try {
            if (poolReaders && mergedSegmentWarmer) {
                mergedSegmentWarmer->warm(mergedReader);
            }
            if (!commitMerge(merge, merger, mergedDocCount, mergedReader)) {
                // commitMerge will return false if this merge was aborted
                boost::throw_exception(TemporaryException());
            }
        } catch (LuceneException& e) {
            finally = e;
        }

        {
            SyncLock syncLock(this);
            readerPool->release(mergedReader);
        }

        finally.throwException();

        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }

    // Readers are already closed in commitMerge if we didn't hit an exception
    if (!success) {
        closeMergeReaders(merge, true);
    }

    // Has this merge been aborted?
    if (finally.getType() == LuceneException::Temporary) {
        return 0;
    }

    finally.throwException();

    return mergedDocCount;
}
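
// A note on the error-handling idiom used throughout this file: C++ has no
// try/finally, so a caught LuceneException is parked in a local "finally"
// variable, the cleanup that must always run executes unconditionally, and the
// parked exception is rethrown via finally.throwException(), which does nothing
// when no exception was caught.  TemporaryException serves as a control-flow
// sentinel (e.g. "merge aborted") and is filtered by checking
// finally.getType() == LuceneException::Temporary.  A minimal sketch of the
// pattern (doWork/cleanup are hypothetical names):
//
//     LuceneException finally;
//     try {
//         doWork();
//     } catch (LuceneException& e) {
//         finally = e;
//     }
//     cleanup(); // always runs, exception or not
//     finally.throwException(); // rethrows doWork()'s exception, if any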

void IndexWriter::addMergeException(const OneMergePtr& merge) {
    SyncLock syncLock(this);
    BOOST_ASSERT(!merge->getException().isNull());
    if (!mergeExceptions.contains(merge) && mergeGen == merge->mergeGen) {
        mergeExceptions.add(merge);
    }
}

bool IndexWriter::applyDeletes() {
    TestScope testScope(L"IndexWriter", L"applyDeletes");
    SyncLock syncLock(this);
    BOOST_ASSERT(testPoint(L"startApplyDeletes"));
    ++flushDeletesCount;
    bool success = false;
    bool changed = false;

    LuceneException finally;
    try {
        changed = docWriter->applyDeletes(segmentInfos);
        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }

    if (!success && infoStream) {
        message(L"hit exception flushing deletes");
    }

    finally.throwException();

    if (changed) {
        checkpoint();
    }
    return changed;
}

int32_t IndexWriter::getBufferedDeleteTermsSize() {
    SyncLock syncLock(this);
    return docWriter->getBufferedDeleteTerms().size();
}

int32_t IndexWriter::getNumBufferedDeleteTerms() {
    SyncLock syncLock(this);
    return docWriter->getNumBufferedDeleteTerms();
}

SegmentInfoPtr IndexWriter::newestSegment() {
    return !segmentInfos->empty() ? segmentInfos->info(segmentInfos->size() - 1) : SegmentInfoPtr();
}

String IndexWriter::segString() {
    return segString(segmentInfos);
}

String IndexWriter::segString(const SegmentInfosPtr& infos) {
    SyncLock syncLock(this);
    StringStream buffer;
    int32_t count = infos->size();
    for (int32_t i = 0; i < count; ++i) {
        if (i > 0) {
            buffer << L" ";
        }
        SegmentInfoPtr info(infos->info(i));
        buffer << info->segString(directory);
        if (info->dir != directory) {
            buffer << L"**";
        }
    }
    return buffer.str();
}
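
// The string produced above is a space-separated list of per-segment summaries,
// with "**" appended to any segment whose directory differs from this writer's,
// i.e. an external segment that has not yet been copied into this index.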

bool IndexWriter::startSync(const String& fileName, HashSet<String> pending) {
    SyncLock syncedLock(&synced);
    if (!synced.contains(fileName)) {
        if (!syncing.contains(fileName)) {
            syncing.add(fileName);
            return true;
        } else {
            pending.add(fileName);
            return false;
        }
    } else {
        return false;
    }
}

void IndexWriter::finishSync(const String& fileName, bool success) {
    SyncLock syncedLock(&synced);
    BOOST_ASSERT(syncing.contains(fileName));
    syncing.remove(fileName);
    if (success) {
        synced.add(fileName);
    }
    synced.notifyAll();
}

bool IndexWriter::waitForAllSynced(HashSet<String> syncing) {
    SyncLock syncedLock(&synced);
    for (HashSet<String>::iterator fileName = syncing.begin(); fileName != syncing.end(); ++fileName) {
        while (!synced.contains(*fileName)) {
            if (!syncing.contains(*fileName)) {
                // The file left syncing without ever appearing in synced, which means the thread syncing it hit an error
                return false;
            } else {
                synced.wait();
            }
        }
    }
    return true;
}
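
// startSync()/finishSync()/waitForAllSynced() together form a claim-then-wait
// protocol over the shared "syncing" and "synced" sets: the thread that wins
// startSync() fsyncs the file itself, while losers record the file in a local
// pending set and block in waitForAllSynced() until the winner either publishes
// it to "synced" or fails (forcing the caller to retry).  The commit loop in
// startCommit() below drives this; a condensed sketch of one iteration
// (illustrative only, mirroring that loop):
//
//     HashSet<String> pending(HashSet<String>::newInstance());
//     if (startSync(fileName, pending)) { // we own the fsync for this file
//         bool success = false;
//         try {
//             directory->sync(fileName);
//             success = true;
//         } catch (LuceneException& e) {
//             finally = e;
//         }
//         finishSync(fileName, success); // publish the result, wake waiters
//         finally.throwException();
//     }
//     // ... then block on files other threads claimed:
//     if (!waitForAllSynced(pending)) { /* a peer failed - retry the loop */ }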

void IndexWriter::doWait() {
    SyncLock syncLock(this);
    // NOTE: the callers of this method should in theory be able to simply call wait(), but, as a
    // defense against thread timing hazards where notifyAll() fails to be called, we wait for at
    // most 1 second and then return so the caller can check if wait conditions are satisfied
    wait(1000);
}

void IndexWriter::startCommit(int64_t sizeInBytes, MapStringString commitUserData) {
    BOOST_ASSERT(testPoint(L"startStartCommit"));

    if (hitOOM) {
        boost::throw_exception(IllegalStateException(L"this writer hit an OutOfMemoryError; cannot commit"));
    }

    try {
        if (infoStream) {
            message(L"startCommit(): start sizeInBytes=" + StringUtils::toString(sizeInBytes));
        }

        SegmentInfosPtr toSync;
        int64_t myChangeCount = 0;
        LuceneException finally;

        {
            SyncLock syncLock(this);

            // Wait for any running addIndexes to complete first, then block any from running
            // until we've copied the segmentInfos we intend to sync
            blockAddIndexes(false);

            // On commit the segmentInfos must never reference a segment in another directory
            BOOST_ASSERT(!hasExternalSegments());

            try {
                BOOST_ASSERT(lastCommitChangeCount <= changeCount);
                myChangeCount = changeCount;

                if (changeCount == lastCommitChangeCount) {
                    if (infoStream) {
                        message(L" skip startCommit(): no changes pending");
                    }
                    boost::throw_exception(TemporaryException());
                }

                // First, we clone & incref the segmentInfos we intend to sync, then, without locking, we sync() each
                // file referenced by toSync, in the background.  Multiple threads can be doing this at once, if say
                // a large merge and a small merge finish at the same time

                if (infoStream) {
                    message(L"startCommit index=" + segString(segmentInfos) + L" changeCount=" + StringUtils::toString(changeCount));
                }

                readerPool->commit();

                // It's possible another flush (one that did not close the open doc stores) snuck in after the
                // flush we just did, so we remove any tail segments referencing the open doc store from the
                // SegmentInfos we are about to sync (the main SegmentInfos will keep them)
                toSync = boost::dynamic_pointer_cast<SegmentInfos>(segmentInfos->clone());

                String dss(docWriter->getDocStoreSegment());
                if (!dss.empty()) {
                    while (true) {
                        String dss2(toSync->info(toSync->size() - 1)->getDocStoreSegment());
                        if (dss2.empty() || dss2 != dss) {
                            break;
                        }
                        toSync->remove(toSync->size() - 1);
                        ++changeCount;
                    }
                }

                if (commitUserData) {
                    toSync->setUserData(commitUserData);
                }

                deleter->incRef(toSync, false);

                HashSet<String> files(toSync->files(directory, false));
                for (HashSet<String>::iterator fileName = files.begin(); fileName != files.end(); ++fileName) {
                    BOOST_ASSERT(directory->fileExists(*fileName));

                    // If this trips it means we are missing a call to .checkpoint somewhere, because by the
                    // time we are called, deleter should know about every file referenced by the current head
                    // segmentInfos
                    BOOST_ASSERT(deleter->exists(*fileName));
                }
            } catch (LuceneException& e) {
                finally = e;
            }
            resumeAddIndexes();

            // no changes pending?
            if (finally.getType() == LuceneException::Temporary) {
                return;
            }

            finally.throwException();
        }

        BOOST_ASSERT(testPoint(L"midStartCommit"));

        bool setPending = false;

        try {
            // Loop until all files toSync references are sync'd
            while (true) {
                HashSet<String> pending(HashSet<String>::newInstance());
                HashSet<String> files(toSync->files(directory, false));
                for (HashSet<String>::iterator fileName = files.begin(); fileName != files.end(); ++fileName) {
                    if (startSync(*fileName, pending)) {
                        bool success = false;
                        try {
                            // Because we incRef'd this commit point above, the file had better exist
                            BOOST_ASSERT(directory->fileExists(*fileName));

                            if (infoStream) {
                                message(L"now sync " + *fileName);
                            }
                            directory->sync(*fileName);
                            success = true;
                        } catch (LuceneException& e) {
                            finally = e;
                        }
                        finishSync(*fileName, success);
                        finally.throwException();
                    }
                }

                // All files that I require are either synced or being synced by other threads.  If they are being
                // synced, we must at this point block until they are done.  If this returns false, that means an
                // error in another thread resulted in failing to actually sync one of our files, so we repeat
                if (waitForAllSynced(pending)) {
                    break;
                }
            }

            BOOST_ASSERT(testPoint(L"midStartCommit2"));

            {
                SyncLock syncLock(this);

                // If someone saved a newer version of segments file since I first started syncing
                // my version, I can safely skip saving myself since I've been superseded

                while (true) {
                    if (myChangeCount <= lastCommitChangeCount) {
                        if (infoStream) {
                            message(L"sync superseded by newer infos");
                        }
                        break;
                    } else if (!pendingCommit) {
                        // My turn to commit
                        if (segmentInfos->getGeneration() > toSync->getGeneration()) {
                            toSync->updateGeneration(segmentInfos);
                        }

                        bool success = false;
                        try {
                            // Exception here means nothing is prepared (this method unwinds
                            // everything it did on an exception)
                            try {
                                toSync->prepareCommit(directory);
                            } catch (LuceneException& e) {
                                finally = e;
                            }

                            // Have our master segmentInfos record the generations we just prepared.  We do this on
                            // error or success so we don't double-write a segments_N file.
                            segmentInfos->updateGeneration(toSync);
                            finally.throwException();

                            BOOST_ASSERT(!pendingCommit);
                            setPending = true;
                            pendingCommit = toSync;
                            pendingCommitChangeCount = myChangeCount;
                            success = true;
                        } catch (LuceneException& e) {
                            finally = e;
                        }

                        if (!success && infoStream) {
                            message(L"hit exception committing segments file");
                        }
                        finally.throwException();
                        break;
                    } else {
                        // Must wait for other commit to complete
                        doWait();
                    }
                }
            }

            if (infoStream) {
                message(L"done all syncs");
            }
            BOOST_ASSERT(testPoint(L"midStartCommitSuccess"));
        } catch (LuceneException& e) {
            finally = e;
        }

        {
            SyncLock syncLock(this);
            if (!setPending) {
                deleter->decRef(toSync);
            }
        }
        finally.throwException();
    } catch (std::bad_alloc& oom) {
        boost::throw_exception(handleOOM(oom, L"startCommit"));
    }
    BOOST_ASSERT(testPoint(L"finishStartCommit"));
}
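
// startCommit() is the first half of the writer's two-phase commit: it fsyncs
// every file the new segments_N will reference, then writes that segments_N via
// SegmentInfos::prepareCommit() and parks the result in pendingCommit.  The
// completion half, driven by the commit path defined earlier in this file,
// later makes the prepared segments_N the live commit point; until then,
// readers continue to open the previous commit.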

bool IndexWriter::isLocked(const DirectoryPtr& directory) {
    return directory->makeLock(WRITE_LOCK_NAME)->isLocked();
}

void IndexWriter::unlock(const DirectoryPtr& directory) {
    directory->makeLock(IndexWriter::WRITE_LOCK_NAME)->release();
}
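
// Typical use of the two static helpers above is clearing a stale write.lock
// left behind by a crashed process.  A minimal sketch (the index path is
// hypothetical; only unlock when no other writer can still be running):
//
//     DirectoryPtr dir(FSDirectory::open(L"/path/to/index"));
//     if (IndexWriter::isLocked(dir)) {
//         IndexWriter::unlock(dir); // forcibly releases write.lock
//     }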

void IndexWriter::setMergedSegmentWarmer(const IndexReaderWarmerPtr& warmer) {
    mergedSegmentWarmer = warmer;
}

IndexReaderWarmerPtr IndexWriter::getMergedSegmentWarmer() {
    return mergedSegmentWarmer;
}
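
// A merged segment warmer lets near-real-time search absorb the first-use cost
// of a freshly merged segment before it is published.  A minimal subclass
// sketch (assumes the abstract warm(const IndexReaderPtr&) interface declared
// with IndexWriter; the class name is hypothetical):
//
//     class PrimingWarmer : public IndexReaderWarmer {
//     public:
//         virtual ~PrimingWarmer() {}
//         virtual void warm(const IndexReaderPtr& reader) {
//             // Run representative queries or touch norms/term vectors here
//             // to prime caches before the segment goes live.
//         }
//     };
//
//     writer->setMergedSegmentWarmer(newLucene<PrimingWarmer>());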

LuceneException IndexWriter::handleOOM(const std::bad_alloc& oom, const String& location) {
    if (infoStream) {
        message(L"hit OutOfMemoryError inside " + location);
    }
    hitOOM = true;
    return OutOfMemoryError();
}

bool IndexWriter::testPoint(const String& name) {
    return true;
}

bool IndexWriter::nrtIsCurrent(const SegmentInfosPtr& infos) {
    SyncLock syncLock(this);
    if (!infos->equals(segmentInfos)) {
        // if any structural changes (new segments), we are stale
        return false;
    } else if (infos->getGeneration() != segmentInfos->getGeneration()) {
        // if any commit took place since we were opened, we are stale
        return false;
    } else {
        return !docWriter->anyChanges();
    }
}
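
// nrtIsCurrent() backs isCurrent() for near-real-time readers obtained from
// this writer: such a reader is current only if the segment set and generation
// it was opened against are unchanged and nothing is buffered in the
// DocumentsWriter.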

bool IndexWriter::isClosed() {
    SyncLock syncLock(this);
    return closed;
}

ReaderPool::ReaderPool(const IndexWriterPtr& writer) {
    readerMap = MapSegmentInfoSegmentReader::newInstance();
    _indexWriter = writer;
}

ReaderPool::~ReaderPool() {
}

void ReaderPool::clear(const SegmentInfosPtr& infos) {
    SyncLock syncLock(this);
    if (!infos) {
        for (MapSegmentInfoSegmentReader::iterator ent = readerMap.begin(); ent != readerMap.end(); ++ent) {
            ent->second->_hasChanges = false;
        }
    } else {
        for (int32_t i = 0; i < infos->size(); ++i) {
            MapSegmentInfoSegmentReader::iterator ent = readerMap.find(infos->info(i));
            if (ent != readerMap.end()) {
                ent->second->_hasChanges = false;
            }
        }
    }
}

bool ReaderPool::infoIsLive(const SegmentInfoPtr& info) {
    SyncLock syncLock(this);
    IndexWriterPtr indexWriter(_indexWriter);
    int32_t idx = indexWriter->segmentInfos->find(info);
    BOOST_ASSERT(idx != -1);
    BOOST_ASSERT(indexWriter->segmentInfos->info(idx) == info);
    return true;
}

SegmentInfoPtr ReaderPool::mapToLive(const SegmentInfoPtr& info) {
    SyncLock syncLock(this);
    IndexWriterPtr indexWriter(_indexWriter);
    int32_t idx = indexWriter->segmentInfos->find(info);
    SegmentInfoPtr _info(info);
    if (idx != -1) {
        _info = indexWriter->segmentInfos->info(idx);
    }
    return _info;
}
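
// mapToLive() matters because segmentInfos may come to hold a different
// SegmentInfo instance for the same segment (e.g. after a rollback replaces the
// in-memory infos) while a merge still holds the old instance; looking the info
// up redirects to the instance the writer now considers live, or returns the
// argument unchanged if the segment no longer exists.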

void ReaderPool::release(const SegmentReaderPtr& sr) {
    release(sr, false);
}

void ReaderPool::release(const SegmentReaderPtr& sr, bool drop) {
    SyncLock syncLock(this);
    IndexWriterPtr indexWriter(_indexWriter);

    bool pooled = readerMap.contains(sr->getSegmentInfo());

    BOOST_ASSERT(!pooled || readerMap.get(sr->getSegmentInfo()) == sr);

    // Drop caller's ref; for an external reader (not pooled), this decRef will close it
    sr->decRef();

    if (pooled && (drop || (!indexWriter->poolReaders && sr->getRefCount() == 1))) {
        // We invoke deleter.checkpoint below, so we must be sync'd on IW if there are changes
        BOOST_ASSERT(!sr->_hasChanges || holdsLock());

        // Discard (don't save) changes when we are dropping the reader; this is used only on the
        // sub-readers after a successful merge.
        sr->_hasChanges = sr->_hasChanges && !drop;

        bool hasChanges = sr->_hasChanges;

        // Drop our ref - this will commit any pending changes to the dir
        sr->close();

        // We hold the last pooled ref (or are dropping the reader), so remove it from the pool
        readerMap.remove(sr->getSegmentInfo());

        if (hasChanges) {
            // Must checkpoint with deleter, because this segment reader will have created new
            // _X_N.del file.
            indexWriter->deleter->checkpoint(indexWriter->segmentInfos, false);
        }
    }
}

void ReaderPool::close() {
    SyncLock syncLock(this);
    IndexWriterPtr indexWriter(_indexWriter);

    // We invoke deleter.checkpoint below, so we must be sync'd on IW
    BOOST_ASSERT(holdsLock());

    for (MapSegmentInfoSegmentReader::iterator iter = readerMap.begin(); iter != readerMap.end(); ++iter) {
        if (iter->second->_hasChanges) {
            BOOST_ASSERT(infoIsLive(iter->second->getSegmentInfo()));
            iter->second->doCommit(MapStringString());

            // Must checkpoint with deleter, because this segment reader will have created
            // new _X_N.del file.
            indexWriter->deleter->checkpoint(indexWriter->segmentInfos, false);
        }

        // NOTE: it is allowed that this decRef does not actually close the SR; this can happen when a
        // near real-time reader is kept open after the IndexWriter instance is closed
        iter->second->decRef();
    }
    readerMap.clear();
}

void ReaderPool::commit() {
    SyncLock syncLock(this);
    IndexWriterPtr indexWriter(_indexWriter);

    // We invoke deleter.checkpoint below, so we must be sync'd on IW
    BOOST_ASSERT(holdsLock());

    for (MapSegmentInfoSegmentReader::iterator ent = readerMap.begin(); ent != readerMap.end(); ++ent) {
        if (ent->second->_hasChanges) {
            BOOST_ASSERT(infoIsLive(ent->second->getSegmentInfo()));
            ent->second->doCommit(MapStringString());

            // Must checkpoint with deleter, because this segment reader will have created
            // new _X_N.del file.
            indexWriter->deleter->checkpoint(indexWriter->segmentInfos, false);
        }
    }
}

IndexReaderPtr ReaderPool::getReadOnlyClone(const SegmentInfoPtr& info, bool doOpenStores, int32_t termInfosIndexDivisor) {
    SyncLock syncLock(this);
    SegmentReaderPtr sr(get(info, doOpenStores, BufferedIndexInput::BUFFER_SIZE, termInfosIndexDivisor));
    IndexReaderPtr clone;
    LuceneException finally;
    try {
        clone = boost::dynamic_pointer_cast<IndexReader>(sr->clone(true));
    } catch (LuceneException& e) {
        finally = e;
    }
    sr->decRef();
    finally.throwException();
    return clone;
}

SegmentReaderPtr ReaderPool::get(const SegmentInfoPtr& info, bool doOpenStores) {
    return get(info, doOpenStores, BufferedIndexInput::BUFFER_SIZE, IndexWriterPtr(_indexWriter)->readerTermsIndexDivisor);
}

SegmentReaderPtr ReaderPool::get(const SegmentInfoPtr& info, bool doOpenStores, int32_t readBufferSize, int32_t termsIndexDivisor) {
    SyncLock syncLock(this);
    IndexWriterPtr indexWriter(_indexWriter);
    if (indexWriter->poolReaders) {
        readBufferSize = BufferedIndexInput::BUFFER_SIZE;
    }

    SegmentReaderPtr sr(readerMap.get(info));
    if (!sr) {
        // Returns a ref, which we transfer to readerMap
        sr = SegmentReader::get(false, info->dir, info, readBufferSize, doOpenStores, termsIndexDivisor);
        if (info->dir == indexWriter->directory) {
            // Only pool if reader is not external
            readerMap.put(info, sr);
        }
    } else {
        if (doOpenStores) {
            sr->openDocStores();
        }
        if (termsIndexDivisor != -1 && !sr->termsIndexLoaded()) {
            // If this reader was originally opened because we needed to merge it, we didn't load the terms
            // index.  But now, if the caller wants the terms index (e.g. because it's doing deletes, or an NRT
            // reader is being opened), we ask the reader to load its terms index.
            sr->loadTermsIndex(termsIndexDivisor);
        }
    }

    // Return a ref to our caller
    if (info->dir == indexWriter->directory) {
        // Only incRef if we pooled (reader is not external)
        sr->incRef();
    }
    return sr;
}
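
// Every successful get() hands the caller a reference that must be balanced by
// release() (above), even when the reader stays pooled.  A sketch of the usage
// pattern inside IndexWriter (illustrative; someInfo is a hypothetical
// SegmentInfoPtr):
//
//     SegmentReaderPtr reader(readerPool->get(someInfo, true));
//     LuceneException finally;
//     try {
//         // ... use the reader: search, apply deletes, warm, etc ...
//     } catch (LuceneException& e) {
//         finally = e;
//     }
//     readerPool->release(reader); // balances get(); may close an external reader
//     finally.throwException();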

SegmentReaderPtr ReaderPool::getIfExists(const SegmentInfoPtr& info) {
    SyncLock syncLock(this);
    SegmentReaderPtr sr(readerMap.get(info));
    if (sr) {
        sr->incRef();
    }
    return sr;
}

IndexReaderWarmer::~IndexReaderWarmer() {
}

}