1 /////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2009-2014 Alan Wright. All rights reserved.
3 // Distributable under the terms of either the Apache License (Version 2.0)
4 // or the GNU Lesser General Public License.
5 /////////////////////////////////////////////////////////////////////////////
6
7 #include "LuceneInc.h"
8 #include "IndexWriter.h"
9 #include "_IndexWriter.h"
10 #include "Directory.h"
11 #include "Analyzer.h"
12 #include "KeepOnlyLastCommitDeletionPolicy.h"
13 #include "DocumentsWriter.h"
14 #include "IndexFileDeleter.h"
15 #include "IndexFileNames.h"
16 #include "Lock.h"
17 #include "SegmentInfo.h"
18 #include "SegmentReader.h"
19 #include "ReadOnlyDirectoryReader.h"
20 #include "BufferedIndexInput.h"
21 #include "LogByteSizeMergePolicy.h"
22 #include "LogDocMergePolicy.h"
23 #include "Similarity.h"
24 #include "ConcurrentMergeScheduler.h"
25 #include "CompoundFileWriter.h"
26 #include "SegmentMerger.h"
27 #include "DateTools.h"
28 #include "Constants.h"
29 #include "InfoStream.h"
30 #include "TestPoint.h"
31 #include "StringUtils.h"
32
33 namespace Lucene {
34
/// The normal read buffer size defaults to 1024, but increasing this during merging seems to
/// yield performance gains. However we don't want to increase it too much because there are
/// quite a few BufferedIndexInputs created during merging.
const int32_t IndexWriter::MERGE_READ_BUFFER_SIZE = 4096;

// Monotonically increasing id handed out under messageIDLock; tags info-stream output per writer.
int32_t IndexWriter::MESSAGE_ID = 0;
// Process-wide default info stream picked up by newly initialized writers.
InfoStreamPtr IndexWriter::defaultInfoStream;

/// Default value for the write lock timeout (1,000 milliseconds).
int64_t IndexWriter::WRITE_LOCK_TIMEOUT = 1000;

// Name of the lock file used to guarantee a single writer per directory.
const String IndexWriter::WRITE_LOCK_NAME = L"write.lock";

/// Value to denote a flush trigger is disabled.
const int32_t IndexWriter::DISABLE_AUTO_FLUSH = -1;

/// Disabled by default (because IndexWriter flushes by RAM usage by default).
const int32_t IndexWriter::DEFAULT_MAX_BUFFERED_DOCS = IndexWriter::DISABLE_AUTO_FLUSH;

/// Default value is 16 MB (which means flush when buffered docs consume 16 MB RAM).
const double IndexWriter::DEFAULT_RAM_BUFFER_SIZE_MB = 16.0;

/// Disabled by default (because IndexWriter flushes by RAM usage by default).
const int32_t IndexWriter::DEFAULT_MAX_BUFFERED_DELETE_TERMS = IndexWriter::DISABLE_AUTO_FLUSH;

/// Default value is 10000.
const int32_t IndexWriter::DEFAULT_MAX_FIELD_LENGTH = 10000;

/// Default value is 128.
const int32_t IndexWriter::DEFAULT_TERM_INDEX_INTERVAL = 128;

/// Sets the maximum field length to INT_MAX
const int32_t IndexWriter::MaxFieldLengthUNLIMITED = INT_MAX;

/// Sets the maximum field length to {@link #DEFAULT_MAX_FIELD_LENGTH}
const int32_t IndexWriter::MaxFieldLengthLIMITED = IndexWriter::DEFAULT_MAX_FIELD_LENGTH;
71
// Construct a writer on directory d, explicitly creating (create=true) or appending to
// (create=false) the index; mfl caps indexed field length. Real setup runs in initialize().
IndexWriter::IndexWriter(const DirectoryPtr& d, const AnalyzerPtr& a, bool create, int32_t mfl) {
    this->directory = d;
    this->analyzer = a;
    this->create = create;
    this->maxFieldLength = mfl;
}
78
// Construct a writer that creates a new index only when none exists in d yet;
// otherwise it appends to the existing index.
IndexWriter::IndexWriter(const DirectoryPtr& d, const AnalyzerPtr& a, int32_t mfl) {
    this->directory = d;
    this->analyzer = a;
    this->create = !IndexReader::indexExists(d);
    this->maxFieldLength = mfl;
}
85
// As above (create only if no index exists), with a custom deletion policy that
// controls when old commit points are removed.
IndexWriter::IndexWriter(const DirectoryPtr& d, const AnalyzerPtr& a, const IndexDeletionPolicyPtr& deletionPolicy, int32_t mfl) {
    this->directory = d;
    this->analyzer = a;
    this->deletionPolicy = deletionPolicy;
    this->create = !IndexReader::indexExists(d);
    this->maxFieldLength = mfl;
}
93
// Construct a writer with explicit create flag and a custom deletion policy.
IndexWriter::IndexWriter(const DirectoryPtr& d, const AnalyzerPtr& a, bool create, const IndexDeletionPolicyPtr& deletionPolicy, int32_t mfl) {
    this->directory = d;
    this->analyzer = a;
    this->create = create;
    this->deletionPolicy = deletionPolicy;
    this->maxFieldLength = mfl;
}
101
// Expert constructor: additionally accepts a custom indexing chain and a specific
// commit point to open against (both consumed by initialize()).
IndexWriter::IndexWriter(const DirectoryPtr& d, const AnalyzerPtr& a, bool create, const IndexDeletionPolicyPtr& deletionPolicy, int32_t mfl, const IndexingChainPtr& indexingChain, const IndexCommitPtr& commit) {
    this->directory = d;
    this->analyzer = a;
    this->create = create;
    this->deletionPolicy = deletionPolicy;
    this->maxFieldLength = mfl;
    this->indexingChain = indexingChain;
    this->indexCommit = commit;
}
111
// Open an existing index (never creates: create is forced to false) against a
// specific prior commit point.
IndexWriter::IndexWriter(const DirectoryPtr& d, const AnalyzerPtr& a, const IndexDeletionPolicyPtr& deletionPolicy, int32_t mfl, const IndexCommitPtr& commit) {
    this->directory = d;
    this->analyzer = a;
    this->create = false;
    this->deletionPolicy = deletionPolicy;
    this->maxFieldLength = mfl;
    this->indexCommit = commit;
}
120
// Destructor is intentionally empty: cleanup (releasing the write lock, closing
// merge machinery) is performed explicitly via close()/rollback().
IndexWriter::~IndexWriter() {
}
123
// Second-stage construction (called after the shared_ptr exists, since it uses
// shared_from_this()): sets all member defaults, acquires the directory write lock,
// then reads or creates the segments file. If anything fails after the lock is
// obtained, the lock is released so the directory is not left locked.
void IndexWriter::initialize() {
    messageID = -1;
    messageIDLock = newInstance<Synchronize>();
    setMessageID(defaultInfoStream);
    this->writeLockTimeout = WRITE_LOCK_TIMEOUT;
    this->segmentInfos = newLucene<SegmentInfos>();
    pendingMerges = Collection<OneMergePtr>::newInstance();
    mergeExceptions = Collection<OneMergePtr>::newInstance();
    segmentsToOptimize = SetSegmentInfo::newInstance();
    optimizeMaxNumSegments = 0;
    mergingSegments = SetSegmentInfo::newInstance();
    runningMerges = SetOneMerge::newInstance();
    synced = HashSet<String>::newInstance();
    syncing = HashSet<String>::newInstance();
    changeCount = 0;
    lastCommitChangeCount = 0;
    poolReaders = false;
    readCount = 0;
    writeThread = 0;
    upgradeCount = 0;
    readerTermsIndexDivisor = IndexReader::DEFAULT_TERMS_INDEX_DIVISOR;
    readerPool = newLucene<ReaderPool>(shared_from_this());
    closed = false;
    closing = false;
    hitOOM = false;
    stopMerges = false;
    mergeGen = 0;
    flushCount = 0;
    flushDeletesCount = 0;
    localFlushedDocCount = 0;
    pendingCommitChangeCount = 0;
    mergePolicy = newLucene<LogByteSizeMergePolicy>(shared_from_this());
    mergeScheduler = newLucene<ConcurrentMergeScheduler>();
    similarity = Similarity::getDefault();
    termIndexInterval = DEFAULT_TERM_INDEX_INTERVAL;
    commitLock = newInstance<Synchronize>();

    if (!indexingChain) {
        indexingChain = DocumentsWriter::getDefaultIndexingChain();
    }

    if (create) {
        directory->clearLock(WRITE_LOCK_NAME); // clear the write lock in case it's leftover
    }

    LockPtr writeLock(directory->makeLock(WRITE_LOCK_NAME));

    if (!writeLock->obtain((int32_t)writeLockTimeout)) { // obtain write lock
        boost::throw_exception(LockObtainFailedException(L"Index locked for write: " + writeLock->toString()));
    }
    this->writeLock = writeLock;

    bool success = false;
    LuceneException finally;

    try {
        if (create) {
            // Try to read first. This is to allow create against an index that's currently open for
            // searching. In this case we write the next segments_N file with no segments
            bool doCommit;
            try {
                segmentInfos->read(directory);
                segmentInfos->clear();
                doCommit = false;
            } catch (LuceneException&) {
                // Likely this means it's a fresh directory
                doCommit = true;
            }

            if (doCommit) {
                // Only commit if there is no segments file in this dir already.
                segmentInfos->commit(directory);
                HashSet<String> files(segmentInfos->files(directory, true));
                synced.addAll(files.begin(), files.end());
            } else {
                // Record that we have a change (zero out all segments) pending
                ++changeCount;
            }
        } else {
            segmentInfos->read(directory);

            if (indexCommit) {
                // Swap out all segments, but, keep metadata in SegmentInfos, like version & generation, to
                // preserve write-once. This is important if readers are open against the future commit points.
                if (indexCommit->getDirectory() != directory) {
                    boost::throw_exception(IllegalArgumentException(L"IndexCommit's directory doesn't match my directory"));
                }
                SegmentInfosPtr oldInfos(newLucene<SegmentInfos>());
                oldInfos->read(directory, indexCommit->getSegmentsFileName());
                segmentInfos->replace(oldInfos);
                ++changeCount;
                if (infoStream) {
                    message(L"init: loaded commit \"" + indexCommit->getSegmentsFileName() + L"\"");
                }
            }

            // We assume that this segments_N was previously properly sync'd
            HashSet<String> files(segmentInfos->files(directory, true));
            synced.addAll(files.begin(), files.end());
        }

        // Snapshot the starting state so rollback() can restore it.
        setRollbackSegmentInfos(segmentInfos);

        docWriter = newLucene<DocumentsWriter>(directory, shared_from_this(), indexingChain);
        docWriter->setInfoStream(infoStream);
        docWriter->setMaxFieldLength(maxFieldLength);

        // Default deleter (for backwards compatibility) is KeepOnlyLastCommitDeleter
        deleter = newLucene<IndexFileDeleter>(directory, deletionPolicy ? deletionPolicy : newLucene<KeepOnlyLastCommitDeletionPolicy>(), segmentInfos, infoStream, docWriter, synced);

        if (deleter->startingCommitDeleted) {
            // Deletion policy deleted the "head" commit point. We have to mark ourself as changed so that if we
            // are closed without any further changes we write a new segments_N file.
            ++changeCount;
        }

        pushMaxBufferedDocs();

        if (infoStream) {
            message(L"init: create=" + StringUtils::toString(create));
        }
        messageState();

        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }

    if (!success) {
        if (infoStream) {
            message(L"init: hit exception on init; releasing write lock");
        }
        try {
            this->writeLock->release();
        } catch (...) {
            // don't mask the original exception
        }
        this->writeLock.reset();
    }

    finally.throwException();
}
266
MAX_TERM_LENGTH()267 int32_t IndexWriter::MAX_TERM_LENGTH() {
268 static int32_t _MAX_TERM_LENGTH = 0;
269 if (_MAX_TERM_LENGTH == 0) {
270 _MAX_TERM_LENGTH = DocumentsWriter::MAX_TERM_LENGTH;
271 }
272 return _MAX_TERM_LENGTH;
273 }
274
// Open a near-real-time reader using the writer's current terms-index divisor.
IndexReaderPtr IndexWriter::getReader() {
    return getReader(readerTermsIndexDivisor);
}
278
// Open a near-real-time reader reflecting all buffered changes: flushes pending
// docs/deletes, then wraps the current SegmentInfos in a read-only reader.
// Flush and reader construction happen under the writer lock so segmentInfos
// cannot change in between; maybeMerge() runs afterwards, outside the lock.
IndexReaderPtr IndexWriter::getReader(int32_t termInfosIndexDivisor) {
    ensureOpen();

    if (infoStream) {
        message(L"flush at getReader");
    }

    // Do this up front before flushing so that the readers obtained during this flush are pooled, the first time
    // this method is called
    poolReaders = true;

    // Prevent segmentInfos from changing while opening the reader; in theory we could do similar retry logic,
    // just like we do when loading segments_N
    IndexReaderPtr r;
    {
        SyncLock syncLock(this);
        flush(false, true, true);
        r = newLucene<ReadOnlyDirectoryReader>(shared_from_this(), segmentInfos, termInfosIndexDivisor);
    }
    maybeMerge();
    return r;
}
301
// Returns the number of deleted docs for a segment: from the pooled SegmentReader
// when one is open (may include uncommitted deletes), else from the SegmentInfo's
// recorded delete count. The pooled reader is always released, even on exception.
int32_t IndexWriter::numDeletedDocs(const SegmentInfoPtr& info) {
    SegmentReaderPtr reader(readerPool->getIfExists(info));
    int32_t deletedDocs = 0;
    LuceneException finally;
    try {
        deletedDocs = reader ? reader->numDeletedDocs() : info->getDelCount();
    } catch (LuceneException& e) {
        finally = e;
    }
    if (reader) {
        readerPool->release(reader);
    }
    finally.throwException();
    return deletedDocs;
}
317
// Block until this thread holds the exclusive "write" side of the internal
// reader/writer gate (no writer thread and no outstanding readers), then record
// this thread as the writer. Must not already be the writer (would self-deadlock).
void IndexWriter::acquireWrite() {
    SyncLock syncLock(this);
    BOOST_ASSERT(writeThread != LuceneThread::currentId());
    while (writeThread != 0 || readCount > 0) {
        doWait();
    }

    // we could have been closed while we were waiting
    ensureOpen();

    writeThread = LuceneThread::currentId();
}
330
// Release the exclusive write side of the gate and wake any waiters.
// Caller must be the thread that acquired it.
void IndexWriter::releaseWrite() {
    SyncLock syncLock(this);
    BOOST_ASSERT(writeThread == LuceneThread::currentId());
    writeThread = 0;
    notifyAll();
}
337
// Take a shared "read" slot on the gate: waits while another thread holds the
// write side (the current writer thread may re-enter as a reader).
void IndexWriter::acquireRead() {
    SyncLock syncLock(this);
    int64_t current = LuceneThread::currentId();
    while (writeThread != 0 && writeThread != current) {
        doWait();
    }
    ++readCount;
}
346
// Upgrade a held read slot to the exclusive write side. upgradeCount tracks
// concurrent upgraders so the wait condition excludes our own read slot; on
// success our read slot is converted into the write hold.
void IndexWriter::upgradeReadToWrite() {
    SyncLock syncLock(this);
    BOOST_ASSERT(readCount > 0);
    ++upgradeCount;
    while (readCount > upgradeCount || writeThread != 0) {
        doWait();
    }
    writeThread = LuceneThread::currentId();
    --readCount;
    --upgradeCount;
}
358
// Drop a shared read slot and wake waiters (e.g. a blocked acquireWrite()).
void IndexWriter::releaseRead() {
    SyncLock syncLock(this);
    --readCount;
    BOOST_ASSERT(readCount >= 0);
    notifyAll();
}
365
isOpen(bool includePendingClose)366 bool IndexWriter::isOpen(bool includePendingClose) {
367 SyncLock syncLock(this);
368 return !(closed || (includePendingClose && closing));
369 }
370
ensureOpen(bool includePendingClose)371 void IndexWriter::ensureOpen(bool includePendingClose) {
372 SyncLock syncLock(this);
373 if (!isOpen(includePendingClose)) {
374 boost::throw_exception(AlreadyClosedException(L"This IndexWriter is closed"));
375 }
376 }
377
// Strict variant: a writer that has merely started closing is already "closed".
void IndexWriter::ensureOpen() {
    ensureOpen(true);
}
381
// Write one line to the info stream, prefixed with this writer's message id,
// a timestamp, and the calling thread id. No-op when no info stream is set.
void IndexWriter::message(const String& message) {
    if (infoStream) {
        *infoStream << L"IW " << StringUtils::toString(messageID);
        *infoStream << L" [" << DateTools::timeToString(MiscUtils::currentTimeMillis(), DateTools::RESOLUTION_SECOND);
        *infoStream << L"; " << StringUtils::toString(LuceneThread::currentId()) << L"]: " << message << L"\n";
    }
}
389
// Install the info stream and, on the first non-null stream, allocate a unique
// message id from the global counter (guarded by messageIDLock).
void IndexWriter::setMessageID(const InfoStreamPtr& infoStream) {
    SyncLock syncLock(this);
    if (infoStream && messageID == -1) {
        SyncLock messageLock(messageIDLock);
        messageID = MESSAGE_ID++;
    }
    this->infoStream = infoStream;
}
398
getLogMergePolicy()399 LogMergePolicyPtr IndexWriter::getLogMergePolicy() {
400 LogMergePolicyPtr logMergePolicy(boost::dynamic_pointer_cast<LogMergePolicy>(mergePolicy));
401 if (logMergePolicy) {
402 return logMergePolicy;
403 }
404 boost::throw_exception(IllegalArgumentException(L"This method can only be called when the merge policy is the default LogMergePolicy"));
405 return LogMergePolicyPtr();
406 }
407
// Delegates to the LogMergePolicy; throws if a non-log merge policy is installed.
bool IndexWriter::getUseCompoundFile() {
    return getLogMergePolicy()->getUseCompoundFile();
}
411
// Toggle compound-file format for both regular segments and doc stores.
// Throws if a non-log merge policy is installed.
void IndexWriter::setUseCompoundFile(bool value) {
    getLogMergePolicy()->setUseCompoundFile(value);
    getLogMergePolicy()->setUseCompoundDocStore(value);
}
416
// Set the Similarity used for scoring norms; also propagated to DocumentsWriter.
void IndexWriter::setSimilarity(const SimilarityPtr& similarity) {
    ensureOpen();
    this->similarity = similarity;
    docWriter->setSimilarity(similarity);
}
422
// Current Similarity implementation in use.
SimilarityPtr IndexWriter::getSimilarity() {
    ensureOpen();
    return this->similarity;
}
427
// Set the interval between indexed terms (trade-off: memory vs. term lookup cost).
void IndexWriter::setTermIndexInterval(int32_t interval) {
    ensureOpen();
    this->termIndexInterval = interval;
}
432
int32_t IndexWriter::getTermIndexInterval() {
    // We pass false because this method is called by SegmentMerger while we are in the process of closing
    ensureOpen(false);
    return termIndexInterval;
}
438
// Snapshot (deep-clone) the given SegmentInfos as the state rollback() restores,
// and build a segment -> position map used to detect external changes.
void IndexWriter::setRollbackSegmentInfos(const SegmentInfosPtr& infos) {
    SyncLock syncLock(this);
    rollbackSegmentInfos = boost::dynamic_pointer_cast<SegmentInfos>(infos->clone());
    BOOST_ASSERT(!rollbackSegmentInfos->hasExternalSegments(directory));
    rollbackSegments = MapSegmentInfoInt::newInstance();
    int32_t size = rollbackSegmentInfos->size();
    for (int32_t i = 0; i < size; ++i) {
        rollbackSegments.put(rollbackSegmentInfos->info(i), i);
    }
}
449
// Install a new merge policy, closing the previous one if it differs, then
// re-push the maxBufferedDocs hint to the new policy.
void IndexWriter::setMergePolicy(const MergePolicyPtr& mp) {
    ensureOpen();
    if (!mp) {
        boost::throw_exception(NullPointerException(L"MergePolicy must be non-null"));
    }

    if (mergePolicy != mp) {
        mergePolicy->close();
    }
    mergePolicy = mp;
    pushMaxBufferedDocs();
    if (infoStream) {
        message(L"setMergePolicy");
    }
}
465
// Current merge policy.
MergePolicyPtr IndexWriter::getMergePolicy() {
    ensureOpen();
    return mergePolicy;
}
470
// Install a new merge scheduler; when it differs from the current one, running
// merges are waited out and the old scheduler closed first.
void IndexWriter::setMergeScheduler(const MergeSchedulerPtr& mergeScheduler) {
    SyncLock syncLock(this);
    ensureOpen();
    if (!mergeScheduler) {
        boost::throw_exception(NullPointerException(L"MergeScheduler must be non-null"));
    }
    if (this->mergeScheduler != mergeScheduler) {
        finishMerges(true);
        this->mergeScheduler->close();
    }
    this->mergeScheduler = mergeScheduler;
    if (infoStream) {
        message(L"setMergeScheduler");
    }
}
486
// Current merge scheduler.
MergeSchedulerPtr IndexWriter::getMergeScheduler() {
    ensureOpen();
    return mergeScheduler;
}
491
// Delegates to the LogMergePolicy; throws if a non-log merge policy is installed.
void IndexWriter::setMaxMergeDocs(int32_t maxMergeDocs) {
    getLogMergePolicy()->setMaxMergeDocs(maxMergeDocs);
}
495
// Delegates to the LogMergePolicy; throws if a non-log merge policy is installed.
int32_t IndexWriter::getMaxMergeDocs() {
    return getLogMergePolicy()->getMaxMergeDocs();
}
499
// Set the maximum number of terms indexed per field; also pushed to DocumentsWriter.
void IndexWriter::setMaxFieldLength(int32_t maxFieldLength) {
    ensureOpen();
    this->maxFieldLength = maxFieldLength;
    docWriter->setMaxFieldLength(maxFieldLength);
    if (infoStream) {
        message(L"setMaxFieldLength " + StringUtils::toString(maxFieldLength));
    }
}
508
// Current per-field term limit.
int32_t IndexWriter::getMaxFieldLength() {
    ensureOpen();
    return maxFieldLength;
}
513
// Set the terms-index divisor applied to readers opened via getReader();
// must be >= 1.
void IndexWriter::setReaderTermsIndexDivisor(int32_t divisor) {
    ensureOpen();
    if (divisor <= 0) {
        boost::throw_exception(IllegalArgumentException(L"divisor must be >= 1 (got " + StringUtils::toString(divisor) + L")"));
    }
    readerTermsIndexDivisor = divisor;
    if (infoStream) {
        message(L"setReaderTermsIndexDivisor " + StringUtils::toString(readerTermsIndexDivisor));
    }
}
524
// Current terms-index divisor for near-real-time readers.
int32_t IndexWriter::getReaderTermsIndexDivisor() {
    ensureOpen();
    return readerTermsIndexDivisor;
}
529
// Set the doc-count flush trigger (DISABLE_AUTO_FLUSH to disable). At least one
// of the doc-count and RAM triggers must remain enabled.
void IndexWriter::setMaxBufferedDocs(int32_t maxBufferedDocs) {
    ensureOpen();
    if (maxBufferedDocs != DISABLE_AUTO_FLUSH && maxBufferedDocs < 2) {
        boost::throw_exception(IllegalArgumentException(L"maxBufferedDocs must at least be 2 when enabled"));
    }
    if (maxBufferedDocs == DISABLE_AUTO_FLUSH && getRAMBufferSizeMB() == DISABLE_AUTO_FLUSH) {
        boost::throw_exception(IllegalArgumentException(L"at least one of ramBufferSize and maxBufferedDocs must be enabled"));
    }
    docWriter->setMaxBufferedDocs(maxBufferedDocs);
    pushMaxBufferedDocs();
    if (infoStream) {
        message(L"setMaxBufferedDocs " + StringUtils::toString(maxBufferedDocs));
    }
}
544
// If the doc-count flush trigger is enabled and the merge policy is a
// LogDocMergePolicy, keep its minMergeDocs in sync with maxBufferedDocs so
// newly flushed segments are considered for merging at the right size.
void IndexWriter::pushMaxBufferedDocs() {
    if (docWriter->getMaxBufferedDocs() != DISABLE_AUTO_FLUSH) {
        LogDocMergePolicyPtr lmp(boost::dynamic_pointer_cast<LogDocMergePolicy>(mergePolicy));
        if (lmp) {
            int32_t maxBufferedDocs = docWriter->getMaxBufferedDocs();
            if (lmp->getMinMergeDocs() != maxBufferedDocs) {
                if (infoStream) {
                    message(L"now push maxBufferedDocs " + StringUtils::toString(maxBufferedDocs) + L" to LogDocMergePolicy");
                }
                lmp->setMinMergeDocs(maxBufferedDocs);
            }
        }
    }
}
559
// Current doc-count flush trigger (DISABLE_AUTO_FLUSH when disabled).
int32_t IndexWriter::getMaxBufferedDocs() {
    ensureOpen();
    return docWriter->getMaxBufferedDocs();
}
564
// Set the RAM-usage flush trigger in MB (DISABLE_AUTO_FLUSH to disable). Capped
// below 2048 MB. At least one of the RAM and doc-count triggers must stay enabled.
// NOTE: the mb == DISABLE_AUTO_FLUSH compares double against -1, matching the
// sentinel convention inherited from Java Lucene.
void IndexWriter::setRAMBufferSizeMB(double mb) {
    if (mb > 2048.0) {
        boost::throw_exception(IllegalArgumentException(L"ramBufferSize " + StringUtils::toString(mb) + L" is too large; should be comfortably less than 2048"));
    }
    if (mb != DISABLE_AUTO_FLUSH && mb <= 0.0) {
        boost::throw_exception(IllegalArgumentException(L"ramBufferSize should be > 0.0 MB when enabled"));
    }
    if (mb == DISABLE_AUTO_FLUSH && getMaxBufferedDocs() == DISABLE_AUTO_FLUSH) {
        boost::throw_exception(IllegalArgumentException(L"at least one of ramBufferSize and maxBufferedDocs must be enabled"));
    }
    docWriter->setRAMBufferSizeMB(mb);
    if (infoStream) {
        message(L"setRAMBufferSizeMB " + StringUtils::toString(mb));
    }
}
580
// Current RAM flush trigger in MB.
double IndexWriter::getRAMBufferSizeMB() {
    return docWriter->getRAMBufferSizeMB();
}
584
// Set how many buffered delete terms trigger a flush (DISABLE_AUTO_FLUSH to disable).
void IndexWriter::setMaxBufferedDeleteTerms(int32_t maxBufferedDeleteTerms) {
    ensureOpen();
    if (maxBufferedDeleteTerms != DISABLE_AUTO_FLUSH && maxBufferedDeleteTerms < 1) {
        boost::throw_exception(IllegalArgumentException(L"maxBufferedDeleteTerms must at least be 1 when enabled"));
    }
    docWriter->setMaxBufferedDeleteTerms(maxBufferedDeleteTerms);
    if (infoStream) {
        message(L"setMaxBufferedDeleteTerms " + StringUtils::toString(maxBufferedDeleteTerms));
    }
}
595
// Current buffered-delete-terms flush trigger.
int32_t IndexWriter::getMaxBufferedDeleteTerms() {
    ensureOpen();
    return docWriter->getMaxBufferedDeleteTerms();
}
600
// Delegates to the LogMergePolicy; throws if a non-log merge policy is installed.
void IndexWriter::setMergeFactor(int32_t mergeFactor) {
    getLogMergePolicy()->setMergeFactor(mergeFactor);
}
604
// Delegates to the LogMergePolicy; throws if a non-log merge policy is installed.
int32_t IndexWriter::getMergeFactor() {
    return getLogMergePolicy()->getMergeFactor();
}
608
// Set the process-wide default info stream used by writers created afterwards.
void IndexWriter::setDefaultInfoStream(const InfoStreamPtr& infoStream) {
    IndexWriter::defaultInfoStream = infoStream;
}
612
// Process-wide default info stream (may be null).
InfoStreamPtr IndexWriter::getDefaultInfoStream() {
    return IndexWriter::defaultInfoStream;
}
616
// Install an info stream on this writer and its collaborators (DocumentsWriter,
// IndexFileDeleter), then log the current configuration.
void IndexWriter::setInfoStream(const InfoStreamPtr& infoStream) {
    ensureOpen();
    setMessageID(infoStream);
    docWriter->setInfoStream(infoStream);
    deleter->setInfoStream(infoStream);
    messageState();
}
624
messageState()625 void IndexWriter::messageState() {
626 if (infoStream) {
627 message(L"ramBufferSizeMB=" + StringUtils::toString(docWriter->getRAMBufferSizeMB()) +
628 L" maxBufferedDocs=" + StringUtils::toString(docWriter->getMaxBufferedDocs()) +
629 L" maxBuffereDeleteTerms=" + StringUtils::toString(docWriter->getMaxBufferedDeleteTerms()) +
630 L" maxFieldLength=" + StringUtils::toString(maxFieldLength) +
631 L" index=" + segString());
632 }
633 }
634
// Info stream currently installed on this writer (may be null).
InfoStreamPtr IndexWriter::getInfoStream() {
    ensureOpen();
    return infoStream;
}
639
verbose()640 bool IndexWriter::verbose() {
641 return infoStream.get() != NULL;
642 }
643
// Set this writer's timeout (ms) for obtaining the directory write lock.
void IndexWriter::setWriteLockTimeout(int64_t writeLockTimeout) {
    ensureOpen();
    this->writeLockTimeout = writeLockTimeout;
}
648
// Current write-lock timeout (ms) for this writer.
int64_t IndexWriter::getWriteLockTimeout() {
    ensureOpen();
    return writeLockTimeout;
}
653
// Set the process-wide default write-lock timeout used by new writers.
void IndexWriter::setDefaultWriteLockTimeout(int64_t writeLockTimeout) {
    IndexWriter::WRITE_LOCK_TIMEOUT = writeLockTimeout;
}
657
// Process-wide default write-lock timeout (ms).
int64_t IndexWriter::getDefaultWriteLockTimeout() {
    return IndexWriter::WRITE_LOCK_TIMEOUT;
}
661
// Close the writer, waiting for any running merges to complete.
void IndexWriter::close() {
    close(true);
}
665
// Close the writer; waitForMerges selects whether running merges finish or are
// aborted. shouldClose() arbitrates so only one thread performs the close.
void IndexWriter::close(bool waitForMerges) {
    // Ensure that only one thread actually gets to do the closing
    if (shouldClose()) {
        // If any methods have hit std::bad_alloc, then abort on close, in case the internal state of IndexWriter
        // or DocumentsWriter is corrupt
        if (hitOOM) {
            rollbackInternal();
        } else {
            closeInternal(waitForMerges);
        }
    }
}
678
// Returns true when this thread wins the right to perform the close (and marks
// closing=true); returns false if the writer is already closed. While another
// thread is mid-close, waits for it to finish either way before deciding.
bool IndexWriter::shouldClose() {
    SyncLock syncLock(this);
    while (true) {
        if (!closed) {
            if (!closing) {
                closing = true;
                return true;
            } else {
                // Another thread is presently trying to close; wait until it finishes one way (closes
                // successfully) or another (fails to close)
                doWait();
            }
        } else {
            return false;
        }
    }
}
696
// Perform the actual close: flush buffered state, drain or abort merges, run a
// final commit, release pooled readers / deleter / write lock, and mark closed.
// On failure, closing is reset and doc-writer threads resumed so the writer
// remains usable; the original exception is rethrown at the end.
void IndexWriter::closeInternal(bool waitForMerges) {
    docWriter->pauseAllThreads();

    LuceneException finally;
    try {
        if (infoStream) {
            message(L"now flush at close");
        }

        docWriter->close();

        // Only allow a new merge to be triggered if we are going to wait for merges
        if (!hitOOM) {
            flush(waitForMerges, true, true);
        }

        // Give merge scheduler last chance to run, in case any pending merges are waiting
        if (waitForMerges) {
            mergeScheduler->merge(shared_from_this());
        }

        mergePolicy->close();

        finishMerges(waitForMerges);
        stopMerges = true;

        mergeScheduler->close();

        if (infoStream) {
            message(L"now call final commit()");
        }

        if (!hitOOM) {
            commit(0);
        }

        if (infoStream) {
            message(L"at close: " + segString());
        }

        {
            SyncLock syncLock(this);
            readerPool->close();
            docWriter.reset();
            deleter->close();
        }

        if (writeLock) {
            writeLock->release(); // release write lock
            writeLock.reset();
        }

        {
            SyncLock syncLock(this);
            closed = true;
        }
    } catch (std::bad_alloc& oom) {
        finally = handleOOM(oom, L"closeInternal");
    } catch (LuceneException& e) {
        finally = e;
    }
    {
        SyncLock syncLock(this);
        closing = false;
        notifyAll();
        if (!closed) {
            if (docWriter) {
                docWriter->resumeAllThreads();
            }
            if (infoStream) {
                message(L"hit exception while closing");
            }
        }
    }
    finally.throwException();
}
773
// Close the shared doc store files (stored fields / term vectors) and, when the
// merge policy requests it, pack them into a single compound (.cfx) file,
// updating each SegmentInfo that references this doc store. On a failed
// compound build, the partial file is deleted and the doc writer aborted.
// Returns whether the compound doc store format was used.
bool IndexWriter::flushDocStores() {
    SyncLock syncLock(this);

    if (infoStream) {
        message(L"flushDocStores segment=" + docWriter->getDocStoreSegment());
    }

    bool useCompoundDocStore = false;

    if (infoStream) {
        message(L"closeDocStores segment=" + docWriter->getDocStoreSegment());
    }

    String docStoreSegment;

    bool success = false;
    LuceneException finally;
    try {
        docStoreSegment = docWriter->closeDocStore();
        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }
    if (!success && infoStream) {
        message(L"hit exception closing doc store segment");
    }
    finally.throwException();

    if (infoStream) {
        message(L"flushDocStores files=" + StringUtils::toString(docWriter->closedFiles()));
    }

    useCompoundDocStore = mergePolicy->useCompoundDocStore(segmentInfos);
    HashSet<String> closedFiles(docWriter->closedFiles());

    if (useCompoundDocStore && !docStoreSegment.empty() && !closedFiles.empty()) {
        // Now build compound doc store file
        if (infoStream) {
            message(L"create compound file " + docStoreSegment + L"." + IndexFileNames::COMPOUND_FILE_STORE_EXTENSION());
        }

        success = false;

        int32_t numSegments = segmentInfos->size();
        String compoundFileName(docStoreSegment + L"." + IndexFileNames::COMPOUND_FILE_STORE_EXTENSION());

        try {
            CompoundFileWriterPtr cfsWriter(newLucene<CompoundFileWriter>(directory, compoundFileName));
            for (HashSet<String>::iterator file = closedFiles.begin(); file != closedFiles.end(); ++file) {
                cfsWriter->addFile(*file);
            }

            // Perform the merge
            cfsWriter->close();
            success = true;
        } catch (LuceneException& e) {
            finally = e;
        }

        if (!success) {
            if (infoStream) {
                message(L"hit exception building compound file doc store for segment " + docStoreSegment);
            }
            deleter->deleteFile(compoundFileName);
            docWriter->abort();
        }
        finally.throwException();

        // Flag every segment sharing this doc store as now stored in compound format.
        for (int32_t i = 0; i < numSegments; ++i) {
            SegmentInfoPtr si(segmentInfos->info(i));
            if (si->getDocStoreOffset() != -1 && si->getDocStoreSegment() == docStoreSegment) {
                si->setDocStoreIsCompoundFile(true);
            }
        }

        checkpoint();

        // In case the files we just merged into a CFS were not previously checkpointed
        deleter->deleteNewFiles(docWriter->closedFiles());
    }

    return useCompoundDocStore;
}
857
// Directory this writer operates on.
DirectoryPtr IndexWriter::getDirectory() {
    ensureOpen(false); // Pass false because the flush during closing calls getDirectory
    return directory;
}
862
// Default analyzer used when addDocument(doc) is called without one.
AnalyzerPtr IndexWriter::getAnalyzer() {
    ensureOpen();
    return analyzer;
}
867
// Total docs: those flushed into segments plus docs still buffered in RAM.
// Does NOT subtract deletions (compare numDocs()).
int32_t IndexWriter::maxDoc() {
    SyncLock syncLock(this);
    int32_t count = docWriter ? docWriter->getNumDocsInRAM() : 0;
    for (int32_t i = 0; i < segmentInfos->size(); ++i) {
        count += segmentInfos->info(i)->docCount;
    }
    return count;
}
876
// Like maxDoc(), but subtracts each segment's recorded delete count.
// Buffered (not yet flushed) deletions are not reflected here.
int32_t IndexWriter::numDocs() {
    SyncLock syncLock(this);
    int32_t count = docWriter ? docWriter->getNumDocsInRAM() : 0;
    for (int32_t i = 0; i < segmentInfos->size(); ++i) {
        SegmentInfoPtr info(segmentInfos->info(i));
        count += info->docCount - info->getDelCount();
    }
    return count;
}
886
// True if any deletions exist: either buffered in the doc writer or recorded
// in any flushed segment.
bool IndexWriter::hasDeletions() {
    SyncLock syncLock(this);
    ensureOpen();
    if (docWriter->hasDeletes()) {
        return true;
    }
    for (int32_t i = 0; i < segmentInfos->size(); ++i) {
        if (segmentInfos->info(i)->hasDeletions()) {
            return true;
        }
    }
    return false;
}
900
// Add a document using the writer's default analyzer.
void IndexWriter::addDocument(const DocumentPtr& doc) {
    addDocument(doc, analyzer);
}
904
// Add a document analyzed with the given analyzer. If DocumentsWriter reports
// a flush is due, flush (without forcing doc-store close or applying deletes).
// If the add fails, any files the aborted doc left behind are deleted before
// the exception is rethrown; std::bad_alloc is funneled through handleOOM.
void IndexWriter::addDocument(const DocumentPtr& doc, const AnalyzerPtr& analyzer) {
    ensureOpen();
    bool doFlush = false;
    bool success = false;
    try {
        LuceneException finally;
        try {
            doFlush = docWriter->addDocument(doc, analyzer);
            success = true;
        } catch (LuceneException& e) {
            finally = e;
        }
        if (!success) {
            if (infoStream) {
                message(L"hit exception adding document");
            }
            {
                SyncLock syncLock(this);
                // If docWriter has some aborted files that were never incref'd, then we clean them up here
                if (docWriter) {
                    HashSet<String> files(docWriter->abortedFiles());
                    if (files) {
                        deleter->deleteNewFiles(files);
                    }
                }
            }
        }
        finally.throwException();
        if (doFlush) {
            flush(true, false, false);
        }
    } catch (std::bad_alloc& oom) {
        boost::throw_exception(handleOOM(oom, L"addDocument"));
    }
}
940
// Buffer a delete-by-term; flushes if the buffered-delete threshold is reached.
void IndexWriter::deleteDocuments(const TermPtr& term) {
    ensureOpen();
    try {
        bool doFlush = docWriter->bufferDeleteTerm(term);
        if (doFlush) {
            flush(true, false, false);
        }
    } catch (std::bad_alloc& oom) {
        boost::throw_exception(handleOOM(oom, L"deleteDocuments(Term)"));
    }
}
952
deleteDocuments(Collection<TermPtr> terms)953 void IndexWriter::deleteDocuments(Collection<TermPtr> terms) {
954 ensureOpen();
955 try {
956 bool doFlush = docWriter->bufferDeleteTerms(terms);
957 if (doFlush) {
958 flush(true, false, false);
959 }
960 } catch (std::bad_alloc& oom) {
961 boost::throw_exception(handleOOM(oom, L"deleteDocuments(VectorTerm)"));
962 }
963 }
964
deleteDocuments(const QueryPtr & query)965 void IndexWriter::deleteDocuments(const QueryPtr& query) {
966 ensureOpen();
967 bool doFlush = docWriter->bufferDeleteQuery(query);
968 if (doFlush) {
969 flush(true, false, false);
970 }
971 }
972
deleteDocuments(Collection<QueryPtr> queries)973 void IndexWriter::deleteDocuments(Collection<QueryPtr> queries) {
974 ensureOpen();
975 bool doFlush = docWriter->bufferDeleteQueries(queries);
976 if (doFlush) {
977 flush(true, false, false);
978 }
979 }
980
/// Atomically deletes documents containing term and adds the new document,
/// using the writer's current analyzer. Delegates to the three-arg overload.
void IndexWriter::updateDocument(const TermPtr& term, const DocumentPtr& doc) {
    ensureOpen();
    updateDocument(term, doc, getAnalyzer());
}
985
/// Atomically deletes documents containing term and adds the new document
/// analyzed with the supplied analyzer. Mirrors addDocument's failure
/// handling: on exception, aborted DocumentsWriter files are cleaned up
/// before the original exception is rethrown; a flush runs afterwards when
/// DocumentsWriter signals its threshold was crossed.
void IndexWriter::updateDocument(const TermPtr& term, const DocumentPtr& doc, const AnalyzerPtr& analyzer) {
    ensureOpen();
    try {
        bool doFlush = false;
        bool success = false;
        LuceneException finally;
        try {
            doFlush = docWriter->updateDocument(term, doc, analyzer);
            success = true;
        } catch (LuceneException& e) {
            finally = e; // defer rethrow so the cleanup below always runs
        }
        if (!success) {
            if (infoStream) {
                message(L"hit exception updating document");
            }

            {
                SyncLock syncLock(this);
                // If docWriter has some aborted files that were never incref'd, then we clean them up here
                if (docWriter) {
                    HashSet<String> files(docWriter->abortedFiles());
                    if (files) {
                        deleter->deleteNewFiles(files);
                    }
                }
            }
        }
        finally.throwException();
        if (doFlush) {
            flush(true, false, false);
        }
    } catch (std::bad_alloc& oom) {
        // Route allocation failure through handleOOM so the writer records it
        boost::throw_exception(handleOOM(oom, L"updateDocument"));
    }
}
1022
/// Returns the number of segments currently in the index (test/diagnostic helper).
int32_t IndexWriter::getSegmentCount() {
    SyncLock syncLock(this);
    return segmentInfos->size();
}
1027
/// Returns the number of documents currently buffered in RAM (test/diagnostic helper).
int32_t IndexWriter::getNumBufferedDocuments() {
    SyncLock syncLock(this);
    return docWriter->getNumDocsInRAM();
}
1032
getDocCount(int32_t i)1033 int32_t IndexWriter::getDocCount(int32_t i) {
1034 SyncLock syncLock(this);
1035 return (i >= 0 && i < segmentInfos->size()) ? segmentInfos->info(i)->docCount : -1;
1036 }
1037
/// Returns how many flushes have occurred (test/diagnostic helper).
int32_t IndexWriter::getFlushCount() {
    SyncLock syncLock(this);
    return flushCount;
}
1042
/// Returns how many delete flushes have occurred (test/diagnostic helper).
int32_t IndexWriter::getFlushDeletesCount() {
    SyncLock syncLock(this);
    return flushDeletesCount;
}
1047
/// Allocates and returns the next unique segment name ("_" + counter in
/// max-radix form). Locks segmentInfos rather than the writer itself.
String IndexWriter::newSegmentName() {
    // Cannot synchronize on IndexWriter because that causes deadlock
    SyncLock segmentLock(segmentInfos);

    // Important to increment changeCount so that the segmentInfos is written on close.
    // Otherwise we could close, re-open and re-return the same segment name that was
    // previously returned which can cause problems at least with ConcurrentMergeScheduler.
    ++changeCount;
    return L"_" + StringUtils::toString(segmentInfos->counter++, StringUtils::CHARACTER_MAX_RADIX);
}
1058
/// Fully optimizes the index (down to 1 segment), waiting for completion.
void IndexWriter::optimize() {
    optimize(true);
}
1062
/// Optimizes the index down to at most maxNumSegments segments, waiting for completion.
void IndexWriter::optimize(int32_t maxNumSegments) {
    optimize(maxNumSegments, true);
}
1066
/// Fully optimizes the index (down to 1 segment); doWait controls whether the
/// call blocks until background merges finish.
void IndexWriter::optimize(bool doWait) {
    optimize(1, doWait);
}
1070
/// Requests an optimize down to at most maxNumSegments segments. Flushes
/// first, marks all current segments plus pending/running merges as optimize
/// candidates, kicks the merge scheduler, then (if doWait) blocks until all
/// optimize merges complete, forwarding background merge exceptions.
void IndexWriter::optimize(int32_t maxNumSegments, bool doWait) {
    ensureOpen();

    if (maxNumSegments < 1) {
        boost::throw_exception(IllegalArgumentException(L"maxNumSegments must be >= 1; got " + StringUtils::toString(maxNumSegments)));
    }

    if (infoStream) {
        message(L"optimize: index now " + segString());
    }

    flush(true, false, true);

    {
        SyncLock syncLock(this);

        resetMergeExceptions();
        segmentsToOptimize.clear();
        optimizeMaxNumSegments = maxNumSegments;
        int32_t numSegments = segmentInfos->size();
        for (int32_t i = 0; i < numSegments; ++i) {
            segmentsToOptimize.add(segmentInfos->info(i));
        }

        // Now mark all pending & running merges as optimize merge
        for (Collection<OneMergePtr>::iterator merge = pendingMerges.begin(); merge != pendingMerges.end(); ++merge) {
            (*merge)->optimize = true;
            (*merge)->maxNumSegmentsOptimize = maxNumSegments;
        }

        for (SetOneMerge::iterator merge = runningMerges.begin(); merge != runningMerges.end(); ++merge) {
            (*merge)->optimize = true;
            (*merge)->maxNumSegmentsOptimize = maxNumSegments;
        }
    }

    maybeMerge(maxNumSegments, true);

    if (doWait) {
        {
            SyncLock syncLock(this);
            while (true) {
                if (hitOOM) {
                    boost::throw_exception(IllegalStateException(L"this writer hit an OutOfMemoryError; cannot complete optimize"));
                }

                if (!mergeExceptions.empty()) {
                    // Forward any exceptions in background merge threads to the current thread
                    for (Collection<OneMergePtr>::iterator merge = mergeExceptions.begin(); merge != mergeExceptions.end(); ++merge) {
                        if ((*merge)->optimize) {
                            LuceneException err = (*merge)->getException();
                            if (!err.isNull()) {
                                boost::throw_exception(IOException(L"background merge hit exception: " + (*merge)->segString(directory)));
                            }
                        }
                    }
                }

                // Sleep on the writer's monitor until no optimize merges remain pending/running
                if (optimizeMergesPending()) {
                    IndexWriter::doWait();
                } else {
                    break;
                }
            }
        }

        // If close is called while we are still running, throw an exception so the calling thread will know the
        // optimize did not complete
        ensureOpen();
    }

    // NOTE: in the ConcurrentMergeScheduler case, when doWait is false, we can return immediately while background
    // threads accomplish the optimization
}
1145
optimizeMergesPending()1146 bool IndexWriter::optimizeMergesPending() {
1147 SyncLock syncLock(this);
1148
1149 for (Collection<OneMergePtr>::iterator merge = pendingMerges.begin(); merge != pendingMerges.end(); ++merge) {
1150 if ((*merge)->optimize) {
1151 return true;
1152 }
1153 }
1154
1155 for (SetOneMerge::iterator merge = runningMerges.begin(); merge != runningMerges.end(); ++merge) {
1156 if ((*merge)->optimize) {
1157 return true;
1158 }
1159 }
1160
1161 return false;
1162 }
1163
/// Asks the merge policy for merges that expunge deleted documents, registers
/// and schedules them, then (if doWait) blocks until those specific merges are
/// no longer pending or running, forwarding any merge exception.
void IndexWriter::expungeDeletes(bool doWait) {
    ensureOpen();

    if (infoStream) {
        message(L"expungeDeletes: index now " + segString());
    }

    MergeSpecificationPtr spec;

    {
        SyncLock syncLock(this);
        spec = mergePolicy->findMergesToExpungeDeletes(segmentInfos);
        for (Collection<OneMergePtr>::iterator merge = spec->merges.begin(); merge != spec->merges.end(); ++merge) {
            registerMerge(*merge);
        }
    }

    mergeScheduler->merge(shared_from_this());

    if (doWait) {
        {
            SyncLock syncLock(this);
            bool running = true;
            while (running) {
                if (hitOOM) {
                    boost::throw_exception(IllegalStateException(L"this writer hit an OutOfMemoryError; cannot complete expungeDeletes"));
                }

                // Check each merge that MergePolicy asked us to do, to see if any of them are still running and
                // if any of them have hit an exception.
                running = false;
                for (Collection<OneMergePtr>::iterator merge = spec->merges.begin(); merge != spec->merges.end(); ++merge) {
                    if (pendingMerges.contains(*merge) || runningMerges.contains(*merge)) {
                        running = true;
                    }
                    LuceneException err = (*merge)->getException();
                    if (!err.isNull()) {
                        boost::throw_exception(IOException(L"background merge hit exception: " + (*merge)->segString(directory)));
                    }
                }

                // If any of our merges are still running, wait
                if (running) {
                    IndexWriter::doWait();
                }
            }
        }
    }

    // NOTE: in the ConcurrentMergeScheduler case, when doWait is false, we can return immediately while background
    // threads accomplish the optimization
}
1216
/// Expunges deleted documents, waiting for the merges to complete.
void IndexWriter::expungeDeletes() {
    expungeDeletes(true);
}
1220
/// Asks the merge policy for normal (non-optimize) merges and schedules them.
void IndexWriter::maybeMerge() {
    maybeMerge(false);
}
1224
/// Schedules merges; when optimize is true, targets a single segment.
void IndexWriter::maybeMerge(bool optimize) {
    maybeMerge(1, optimize);
}
1228
/// Updates the pending merge queue from the merge policy, then hands the
/// writer to the merge scheduler to run them.
void IndexWriter::maybeMerge(int32_t maxNumSegmentsOptimize, bool optimize) {
    updatePendingMerges(maxNumSegmentsOptimize, optimize);
    mergeScheduler->merge(shared_from_this());
}
1233
/// Consults the merge policy (optimize or normal path) and registers each
/// returned merge as pending. No-ops when merges are stopped or the writer
/// has hit an out-of-memory condition.
void IndexWriter::updatePendingMerges(int32_t maxNumSegmentsOptimize, bool optimize) {
    SyncLock syncLock(this);
    BOOST_ASSERT(!optimize || maxNumSegmentsOptimize > 0);

    if (stopMerges) {
        return;
    }

    // Do not start new merges if we've hit std::bad_alloc
    if (hitOOM) {
        return;
    }

    MergeSpecificationPtr spec;

    if (optimize) {
        spec = mergePolicy->findMergesForOptimize(segmentInfos, maxNumSegmentsOptimize, segmentsToOptimize);

        if (spec) {
            // Tag each merge so downstream code treats it as part of the optimize
            for (Collection<OneMergePtr>::iterator merge = spec->merges.begin(); merge != spec->merges.end(); ++merge) {
                (*merge)->optimize = true;
                (*merge)->maxNumSegmentsOptimize = maxNumSegmentsOptimize;
            }
        }
    } else {
        spec = mergePolicy->findMerges(segmentInfos);
    }

    if (spec) {
        for (Collection<OneMergePtr>::iterator merge = spec->merges.begin(); merge != spec->merges.end(); ++merge) {
            registerMerge(*merge);
        }
    }
}
1268
getNextMerge()1269 OneMergePtr IndexWriter::getNextMerge() {
1270 SyncLock syncLock(this);
1271 if (pendingMerges.empty()) {
1272 return OneMergePtr();
1273 } else {
1274 // Advance the merge from pending to running
1275 OneMergePtr merge(pendingMerges.removeFirst());
1276 runningMerges.add(merge);
1277 return merge;
1278 }
1279 }
1280
/// Like getNextMerge, but only returns a pending merge that involves external
/// segments; returns a null OneMergePtr when none qualifies.
OneMergePtr IndexWriter::getNextExternalMerge() {
    SyncLock syncLock(this);
    if (pendingMerges.empty()) {
        return OneMergePtr();
    } else {
        for (Collection<OneMergePtr>::iterator merge = pendingMerges.begin(); merge != pendingMerges.end(); ++merge) {
            if ((*merge)->isExternal) {
                // Advance the merge from pending to running
                OneMergePtr running(*merge);
                runningMerges.add(*merge);
                // Safe to remove mid-iteration: we return immediately afterwards
                pendingMerges.remove(merge);
                return running;
            }
        }
    }

    // All existing merges do not involve external segments
    return OneMergePtr();
}
1300
/// Begins an addIndexes-style transaction: acquires the write lock (upgrading
/// an existing read lock if haveReadLock), snapshots segmentInfos and the
/// flushed doc count for rollback, and incRefs current files so they survive
/// until commit/rollback. Must not be called with buffered docs or deletes.
void IndexWriter::startTransaction(bool haveReadLock) {
    SyncLock syncLock(this);
    bool success = false;
    LuceneException finally;
    try {
        if (infoStream) {
            message(L"now start transaction");
        }

        BOOST_ASSERT(docWriter->getNumBufferedDeleteTerms() == 0); // calling startTransaction with buffered delete terms not supported
        BOOST_ASSERT(docWriter->getNumDocsInRAM() == 0); // calling startTransaction with buffered documents not supported

        ensureOpen();

        // If a transaction is trying to roll back (because addIndexes hit an exception) then wait here until that's done
        while (stopMerges) {
            doWait();
        }

        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }

    // Release the write lock if our caller held it, on hitting an exception
    if (!success && haveReadLock) {
        releaseRead();
    }
    finally.throwException();

    if (haveReadLock) {
        upgradeReadToWrite();
    } else {
        acquireWrite();
    }

    success = false;

    try {
        // Clone the current segmentInfos so rollbackTransaction can restore it
        localRollbackSegmentInfos = boost::dynamic_pointer_cast<SegmentInfos>(segmentInfos->clone());

        BOOST_ASSERT(!hasExternalSegments());

        localFlushedDocCount = docWriter->getFlushedDocCount();

        // We must "protect" our files at this point from deletion in case we need to rollback
        deleter->incRef(segmentInfos, false);

        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }

    if (!success) {
        // Releases the write lock taken above
        finishAddIndexes();
    }
    finally.throwException();
}
1359
/// Aborts a transaction started by startTransaction: restores the saved
/// flushed doc count and segmentInfos snapshot, aborts merges, releases the
/// write lock, and asks the deleter to remove any files created meanwhile.
void IndexWriter::rollbackTransaction() {
    SyncLock syncLock(this);

    if (infoStream) {
        message(L"now rollback transaction");
    }

    if (docWriter) {
        docWriter->setFlushedDocCount(localFlushedDocCount);
    }

    // Must finish merges before rolling back segmentInfos so merges don't hit exceptions on trying to commit
    // themselves, don't get files deleted out from under them, etc.
    finishMerges(false);

    // Keep the same segmentInfos instance but replace all of its SegmentInfo instances. This is so the next
    // attempt to commit using this instance of IndexWriter will always write to a new generation ("write once").
    segmentInfos->clear();
    segmentInfos->addAll(localRollbackSegmentInfos);
    localRollbackSegmentInfos.reset();

    // This must come after we rollback segmentInfos, so that if a commit() kicks off it does not see the
    // segmentInfos with external segments.
    finishAddIndexes();

    // Ask deleter to locate unreferenced files we had created & remove them
    deleter->checkpoint(segmentInfos, false);

    // Remove the incRef we did in startTransaction
    deleter->decRef(segmentInfos);

    // Also ask deleter to remove any newly created files that were never incref'd; this "garbage" is created
    // when a merge kicks off but aborts part way through before it had a chance to incRef the files it had
    // partially created
    deleter->refresh();

    // Wake any threads blocked in doWait()
    notifyAll();

    BOOST_ASSERT(!hasExternalSegments());
}
1400
/// Completes a transaction started by startTransaction: checkpoints the new
/// segmentInfos, drops the rollback snapshot's file refs, and releases the
/// write lock.
void IndexWriter::commitTransaction() {
    SyncLock syncLock(this);

    if (infoStream) {
        message(L"now commit transaction");
    }

    // Give deleter a chance to remove files now
    checkpoint();

    // Remove the incRef we did in startTransaction.
    deleter->decRef(localRollbackSegmentInfos);

    localRollbackSegmentInfos.reset();

    BOOST_ASSERT(!hasExternalSegments());

    // Releases the write lock taken in startTransaction
    finishAddIndexes();
}
1420
rollback()1421 void IndexWriter::rollback() {
1422 ensureOpen();
1423
1424 // Ensure that only one thread actually gets to do the closing
1425 if (shouldClose()) {
1426 rollbackInternal();
1427 }
1428 }
1429
/// Performs the actual rollback: aborts merges and buffered docs, rolls back
/// any pending commit, restores segmentInfos to the last committed state,
/// cleans up unreferenced files, then closes the writer. On failure the
/// closing flag is cleared so another thread may retry.
void IndexWriter::rollbackInternal() {
    bool success = false;

    if (infoStream) {
        message(L"rollback");
    }

    docWriter->pauseAllThreads();
    LuceneException finally;
    try {
        finishMerges(false);

        // Must pre-close these two, in case they increment changeCount so that we can then set it to false before
        // calling closeInternal
        mergePolicy->close();
        mergeScheduler->close();

        {
            SyncLock syncLock(this);

            if (pendingCommit) {
                // Undo a prepareCommit that was never finished
                pendingCommit->rollbackCommit(directory);
                deleter->decRef(pendingCommit);
                pendingCommit.reset();
                notifyAll();
            }

            // Keep the same segmentInfos instance but replace all of its SegmentInfo instances. This is so the next
            // attempt to commit using this instance of IndexWriter will always write to a new generation ("write once").
            segmentInfos->clear();
            segmentInfos->addAll(rollbackSegmentInfos);

            BOOST_ASSERT(!hasExternalSegments());

            docWriter->abort();

            bool test = testPoint(L"rollback before checkpoint");
            BOOST_ASSERT(test);

            // Ask deleter to locate unreferenced files & remove them
            deleter->checkpoint(segmentInfos, false);
            deleter->refresh();
        }

        // Don't bother saving any changes in our segmentInfos
        readerPool->clear(SegmentInfosPtr());

        lastCommitChangeCount = changeCount;

        success = true;
    } catch (std::bad_alloc& oom) {
        finally = handleOOM(oom, L"rollbackInternal");
    } catch (LuceneException& e) {
        finally = e;
    }
    {
        SyncLock syncLock(this);

        if (!success) {
            // Undo the shouldClose() claim so a later close/rollback can proceed
            docWriter->resumeAllThreads();
            closing = false;
            notifyAll();
            if (infoStream) {
                message(L"hit exception during rollback");
            }
        }
    }
    finally.throwException();

    closeInternal(false);
}
1501
/// Removes every document from the index: aborts merges and buffered docs,
/// clears all segments, and has the deleter remove the now-unreferenced
/// files. Marks the index changed so the empty state is committed.
void IndexWriter::deleteAll() {
    SyncLock syncLock(this);
    bool success = false;
    docWriter->pauseAllThreads();
    LuceneException finally;
    try {
        // Abort any running merges
        finishMerges(false);

        // Remove any buffered docs
        docWriter->abort();
        docWriter->setFlushedDocCount(0);

        // Remove all segments
        segmentInfos->clear();

        // Ask deleter to locate unreferenced files & remove them
        deleter->checkpoint(segmentInfos, false);
        deleter->refresh();

        // Don't bother saving any changes in our segmentInfos
        readerPool->clear(SegmentInfosPtr());

        // Mark that the index has changed
        ++changeCount;

        success = true;
    } catch (std::bad_alloc& oom) {
        finally = handleOOM(oom, L"deleteAll");
    } catch (LuceneException& e) {
        finally = e;
    }

    // Always resume indexing threads, even on failure
    docWriter->resumeAllThreads();
    if (!success && infoStream) {
        message(L"hit exception during deleteAll");
    }

    finally.throwException();
}
1542
/// Either waits for all merges to complete (waitForMerges == true) or aborts
/// them: pending merges are aborted and finished immediately, running merges
/// are asked to abort and we block until they notice and stop.
void IndexWriter::finishMerges(bool waitForMerges) {
    SyncLock syncLock(this);
    if (!waitForMerges) {
        // Prevent new merges from being registered while we abort
        stopMerges = true;

        // Abort all pending and running merges
        for (Collection<OneMergePtr>::iterator merge = pendingMerges.begin(); merge != pendingMerges.end(); ++merge) {
            if (infoStream) {
                message(L"now abort pending merge " + (*merge)->segString(directory));
            }
            (*merge)->abort();
            mergeFinish(*merge);
        }
        pendingMerges.clear();

        for (SetOneMerge::iterator merge = runningMerges.begin(); merge != runningMerges.end(); ++merge) {
            if (infoStream) {
                message(L"now abort running merge " + (*merge)->segString(directory));
            }
            (*merge)->abort();
        }

        // Ensure any running addIndexes finishes. It's fine if a new one attempts to start because its merges
        // will quickly see the stopMerges == true and abort.
        acquireRead();
        releaseRead();

        // These merges periodically check whether they have been aborted, and stop if so. We wait here to make
        // sure they all stop. It should not take very long because the merge threads periodically check if they
        // are aborted.
        while (!runningMerges.empty()) {
            if (infoStream) {
                message(L"now wait for " + StringUtils::toString(runningMerges.size()) + L" running merge to abort");
            }
            doWait();
        }

        stopMerges = false;
        notifyAll();

        BOOST_ASSERT(mergingSegments.empty());

        if (infoStream) {
            message(L"all running merges have aborted");
        }
    } else {
        // waitForMerges() will ensure any running addIndexes finishes. It's fine if a new one attempts to start
        // because from our caller above the call will see that we are in the process of closing, and will throw
        // an AlreadyClosed exception.
        IndexWriter::waitForMerges();
    }
}
1595
/// Blocks until no merges remain pending or running (and any in-flight
/// addIndexes has finished).
void IndexWriter::waitForMerges() {
    SyncLock syncLock(this);
    // Ensure any running addIndexes finishes.
    acquireRead();
    releaseRead();

    // doWait() releases the monitor while sleeping, letting merge threads progress
    while (!pendingMerges.empty() || !runningMerges.empty()) {
        doWait();
    }

    // sanity check
    BOOST_ASSERT(mergingSegments.empty());
}
1609
/// Records an index change (bumping changeCount) and lets the deleter
/// checkpoint against the current segmentInfos so obsolete files can go.
void IndexWriter::checkpoint() {
    SyncLock syncLock(this);
    ++changeCount;
    deleter->checkpoint(segmentInfos, false);
}
1615
/// Ends an addIndexes transaction by releasing the write lock acquired in
/// startTransaction.
void IndexWriter::finishAddIndexes() {
    releaseWrite();
}
1619
/// Takes the read lock to block any concurrent addIndexes, then re-verifies
/// the writer is still open; on failure the read lock is released before the
/// exception propagates. Callers must later call resumeAddIndexes().
void IndexWriter::blockAddIndexes(bool includePendingClose) {
    acquireRead();

    bool success = false;
    LuceneException finally;
    try {
        // Make sure we are still open since we could have waited quite a while for last addIndexes to finish
        ensureOpen(includePendingClose);
        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }

    if (!success) {
        releaseRead();
    }
    finally.throwException();
}
1638
/// Releases the read lock taken by blockAddIndexes, letting addIndexes proceed.
void IndexWriter::resumeAddIndexes() {
    releaseRead();
}
1642
/// Clears recorded background-merge exceptions and advances the merge
/// generation counter.
void IndexWriter::resetMergeExceptions() {
    SyncLock syncLock(this);
    mergeExceptions.clear();
    ++mergeGen;
}
1648
noDupDirs(Collection<DirectoryPtr> dirs)1649 void IndexWriter::noDupDirs(Collection<DirectoryPtr> dirs) {
1650 Collection<DirectoryPtr> dups(Collection<DirectoryPtr>::newInstance());
1651
1652 for (Collection<DirectoryPtr>::iterator dir = dirs.begin(); dir != dirs.end(); ++dir) {
1653 for (Collection<DirectoryPtr>::iterator dup = dups.begin(); dup != dups.end(); ++dup) {
1654 if (*dup == *dir) {
1655 boost::throw_exception(IllegalArgumentException(L"Directory " + (*dir)->getLockID() + L" appears more than once"));
1656 }
1657 }
1658 if (*dir == directory) {
1659 boost::throw_exception(IllegalArgumentException(L"Cannot add directory to itself"));
1660 }
1661 dups.add(*dir);
1662 }
1663 }
1664
/// Adds all segments from the given directories without optimizing. Runs
/// inside a transaction: reads each directory's SegmentInfos, appends the
/// infos, merges as needed, then copies any remaining external segments into
/// this directory before committing (or rolls back on failure).
void IndexWriter::addIndexesNoOptimize(Collection<DirectoryPtr> dirs) {
    ensureOpen();

    noDupDirs(dirs);

    // Do not allow add docs or deletes while we are running
    docWriter->pauseAllThreads();

    LuceneException finally;
    try {
        if (infoStream) {
            message(L"flush at addIndexesNoOptimize");
        }
        flush(true, false, true);

        bool success = false;

        startTransaction(false);

        try {
            int32_t docCount = 0;

            {
                SyncLock syncLock(this);
                ensureOpen();

                for (Collection<DirectoryPtr>::iterator dir = dirs.begin(); dir != dirs.end(); ++dir) {
                    if (directory == *dir) {
                        // cannot add this index: segments may be deleted in merge before added
                        boost::throw_exception(IllegalArgumentException(L"Cannot add this index to itself"));
                    }

                    SegmentInfosPtr sis(newLucene<SegmentInfos>()); // read infos from dir
                    sis->read(*dir);

                    for (int32_t j = 0; j < sis->size(); ++j) {
                        SegmentInfoPtr info(sis->info(j));
                        BOOST_ASSERT(!segmentInfos->contains(info));
                        docCount += info->docCount;
                        segmentInfos->add(info); // add each info
                    }
                }
            }

            // Notify DocumentsWriter that the flushed count just increased
            docWriter->updateFlushedDocCount(docCount);

            maybeMerge();

            ensureOpen();

            // If after merging there remain segments in the index that are in a different directory, just copy these
            // over into our index. This is necessary (before finishing the transaction) to avoid leaving the index
            // in an unusable (inconsistent) state.
            resolveExternalSegments();

            ensureOpen();

            success = true;
        } catch (LuceneException& e) {
            finally = e;
        }

        if (success) {
            commitTransaction();
        } else {
            rollbackTransaction();
        }
    } catch (std::bad_alloc& oom) {
        finally = handleOOM(oom, L"addIndexesNoOptimize");
    } catch (LuceneException& e) {
        finally = e;
    }
    // Always let indexing threads continue, even on failure
    if (docWriter) {
        docWriter->resumeAllThreads();
    }
    finally.throwException();
}
1743
/// Returns true if segmentInfos references segments living in a directory
/// other than this writer's own.
bool IndexWriter::hasExternalSegments() {
    return segmentInfos->hasExternalSegments(directory);
}
1747
/// Copies every external segment into this writer's directory by running
/// single-segment merges. Loops until no external segments remain, grabbing
/// merges registered by other threads (or waiting on them) when a segment is
/// already covered; finally kicks the scheduler if anything was copied.
void IndexWriter::resolveExternalSegments() {
    bool any = false;
    bool done = false;

    while (!done) {
        SegmentInfoPtr info;
        OneMergePtr merge;

        {
            SyncLock syncLock(this);
            if (stopMerges) {
                boost::throw_exception(MergeAbortedException(L"rollback() was called or addIndexes* hit an unhandled exception"));
            }

            int32_t numSegments = segmentInfos->size();

            done = true;
            for (int32_t i = 0; i < numSegments; ++i) {
                info = segmentInfos->info(i);
                if (info->dir != directory) {
                    done = false;
                    // One-segment merge; compound file only when the merge policy is a LogMergePolicy with CFS enabled
                    OneMergePtr newMerge(newLucene<OneMerge>(segmentInfos->range(i, i + 1), boost::dynamic_pointer_cast<LogMergePolicy>(mergePolicy) && getUseCompoundFile()));

                    // Returns true if no running merge conflicts with this one (and, records this merge as
                    // pending), ie, this segment is not currently being merged
                    if (registerMerge(newMerge)) {
                        merge = newMerge;

                        // If this segment is not currently being merged, then advance it to running & run
                        // the merge ourself (below)
                        pendingMerges.remove(merge);
                        runningMerges.add(merge);
                        break;
                    }
                }
            }

            if (!done && !merge) {
                // We are not yet done (external segments still exist in segmentInfos), yet, all such segments
                // are currently "covered" by a pending or running merge. We now try to grab any pending merge
                // that involves external segments
                merge = getNextExternalMerge();
            }

            if (!done && !merge) {
                // We are not yet done, and, all external segments fall under merges that the merge scheduler is
                // currently running. So, we now wait and check back to see if the merge has completed.
                doWait();
            }
        }

        if (merge) {
            any = true;
            // Run the merge synchronously on this thread
            IndexWriter::merge(merge);
        }
    }

    if (any) {
        // Sometimes, on copying an external segment over, more merges may become necessary
        mergeScheduler->merge(shared_from_this());
    }
}
1810
/// Merges the provided readers (plus this index's existing single segment, if
/// any) into one new segment, replacing the current contents. Runs as two
/// transactions: the merge itself, then (for LogMergePolicy with CFS enabled)
/// building the compound file. Pre-acquires the read lock so no concurrent
/// addIndexes can start between the flush/optimize and the transaction.
void IndexWriter::addIndexes(Collection<IndexReaderPtr> readers) {
    ensureOpen();

    // Do not allow add docs or deletes while we are running
    docWriter->pauseAllThreads();

    // We must pre-acquire a read lock here (and upgrade to write lock in startTransaction below) so that no
    // other addIndexes is allowed to start up after we have flushed & optimized but before we then start our
    // transaction. This is because the merging below requires that only one segment is present in the index
    acquireRead();

    LuceneException finally;
    try {
        SegmentInfoPtr info;
        String mergedName;
        SegmentMergerPtr merger;

        bool success = false;

        try {
            flush(true, false, true);
            optimize(); // start with zero or 1 seg
            success = true;
        } catch (LuceneException& e) {
            finally = e;
        }

        // Take care to release the read lock if we hit an exception before starting the transaction
        if (!success) {
            releaseRead();
        }
        finally.throwException();

        // true means we already have a read lock; if this call hits an exception it will release the write lock
        startTransaction(true);

        try {
            mergedName = newSegmentName();
            merger = newLucene<SegmentMerger>(shared_from_this(), mergedName, OneMergePtr());

            SegmentReaderPtr sReader;

            {
                SyncLock syncLock(this);
                if (segmentInfos->size() == 1) { // add existing index, if any
                    sReader = readerPool->get(segmentInfos->info(0), true, BufferedIndexInput::BUFFER_SIZE, -1);
                }
            }

            success = false;

            try {
                if (sReader) {
                    merger->add(sReader);
                }

                for (Collection<IndexReaderPtr>::iterator i = readers.begin(); i != readers.end(); ++i) {
                    merger->add(*i);
                }

                int32_t docCount = merger->merge(); // merge 'em

                {
                    SyncLock syncLock(this);
                    segmentInfos->clear(); // pop old infos & add new
                    info = newLucene<SegmentInfo>(mergedName, docCount, directory, false, true, -1, L"", false, merger->hasProx());
                    setDiagnostics(info, L"addIndexes(Collection<IndexReaderPtr>)");
                    segmentInfos->add(info);
                }

                // Notify DocumentsWriter that the flushed count just increased
                docWriter->updateFlushedDocCount(docCount);

                success = true;
            } catch (LuceneException& e) {
                finally = e;
            }

            // Return the pooled reader regardless of success
            if (sReader) {
                readerPool->release(sReader);
            }
        } catch (LuceneException& e) {
            finally = e;
        }

        if (!success) {
            if (infoStream) {
                message(L"hit exception in addIndexes during merge");
            }
            rollbackTransaction();
        } else {
            commitTransaction();
        }

        finally.throwException();

        if (boost::dynamic_pointer_cast<LogMergePolicy>(mergePolicy) && getUseCompoundFile()) {
            HashSet<String> files;

            {
                SyncLock syncLock(this);
                // Must incRef our files so that if another thread is running merge/optimize, it doesn't delete our
                // segment's files before we have a change to finish making the compound file.
                if (segmentInfos->contains(info)) {
                    files = info->files();
                    deleter->incRef(files);
                }
            }

            if (files) {
                // Second transaction: build the compound file and flip the segment to use it
                success = false;

                startTransaction(false);

                try {
                    merger->createCompoundFile(mergedName + L".cfs");

                    {
                        SyncLock syncLock(this);
                        info->setUseCompoundFile(true);
                    }

                    success = true;
                } catch (LuceneException& e) {
                    finally = e;
                }

                {
                    SyncLock syncLock(this);
                    // Balance the incRef above, whether or not we succeeded
                    deleter->decRef(files);
                }

                if (!success) {
                    if (infoStream) {
                        message(L"hit exception building compound file in addIndexes during merge");
                    }
                    rollbackTransaction();
                } else {
                    commitTransaction();
                }
            }
        }
    } catch (std::bad_alloc& oom) {
        finally = handleOOM(oom, L"addIndexes(Collection<IndexReaderPtr>)");
    } catch (LuceneException& e) {
        finally = e;
    }
    // Always let indexing threads continue, even on failure
    if (docWriter) {
        docWriter->resumeAllThreads();
    }
    finally.throwException();
}
1963
/// Hook called after a flush completes; default implementation does nothing.
void IndexWriter::doAfterFlush() {
    // override
}
1967
/// Hook called before a flush begins; default implementation does nothing.
void IndexWriter::doBeforeFlush() {
    // override
}
1971
/// First phase of a two-phase commit with no user data; see
/// prepareCommit(commitUserData).
void IndexWriter::prepareCommit() {
    ensureOpen();
    prepareCommit(MapStringString());
}
1976
/// First phase of a two-phase commit: flushes all buffered state and starts
/// the commit, leaving it pending until commit() (or rollback) completes it.
/// Fails if the writer hit OOM or a prepared commit is already pending.
void IndexWriter::prepareCommit(MapStringString commitUserData) {
    if (hitOOM) {
        boost::throw_exception(IllegalStateException(L"this writer hit an OutOfMemoryError; cannot commit"));
    }

    if (pendingCommit) {
        boost::throw_exception(IllegalStateException(L"prepareCommit was already called with no corresponding call to commit"));
    }

    if (infoStream) {
        message(L"prepareCommit: flush");
    }

    flush(true, true, true);

    startCommit(0, commitUserData);
}
1994
/// Internal commit used with a size hint: starts and finishes the commit in
/// one step under the commit lock, with no user data.
void IndexWriter::commit(int64_t sizeInBytes) {
    SyncLock messageLock(commitLock);
    startCommit(sizeInBytes, MapStringString());
    finishCommit();
}
2000
/// Commits all pending changes with no user data.
void IndexWriter::commit() {
    commit(MapStringString());
}
2004
/// Commits all pending changes, recording commitUserData in the segments
/// file. Under the commit lock: runs prepareCommit first unless one is
/// already pending, then finishes the commit.
void IndexWriter::commit(MapStringString commitUserData) {
    ensureOpen();

    if (infoStream) {
        message(L"commit: start");
    }

    {
        SyncLock messageLock(commitLock);

        if (infoStream) {
            message(L"commit: enter lock");
        }

        if (!pendingCommit) {
            if (infoStream) {
                message(L"commit: now prepare");
            }
            prepareCommit(commitUserData);
        } else if (infoStream) {
            message(L"commit: already prepared");
        }

        finishCommit();
    }
}
2031
/// Second phase of a two-phase commit: finishes writing the pending commit point
/// (if any), records it as the new rollback point, and notifies the deleter so it
/// can reclaim files no longer referenced.  No-op if nothing was prepared.
void IndexWriter::finishCommit() {
    SyncLock syncLock(this);
    if (pendingCommit) {
        LuceneException finally;
        try {
            if (infoStream) {
                message(L"commit: pendingCommit != null");
            }
            // Complete writing the pending segments file to the directory.
            pendingCommit->finishCommit(directory);
            if (infoStream) {
                message(L"commit: wrote segments file \"" + pendingCommit->getCurrentSegmentFileName() + L"\"");
            }
            lastCommitChangeCount = pendingCommitChangeCount;
            segmentInfos->updateGeneration(pendingCommit);
            segmentInfos->setUserData(pendingCommit->getUserData());
            setRollbackSegmentInfos(pendingCommit);
            // Checkpoint the successful commit so obsolete files can be deleted.
            deleter->checkpoint(pendingCommit, true);
        } catch (LuceneException& e) {
            finally = e;
        }

        // finally: always drop our file refs, clear the pending state and wake
        // any threads waiting for this commit, then rethrow any saved exception.
        deleter->decRef(pendingCommit);
        pendingCommit.reset();
        notifyAll();
        finally.throwException();
    } else if (infoStream) {
        message(L"commit: pendingCommit == null; skip");
    }

    if (infoStream) {
        message(L"commit: done");
    }
}
2065
flush(bool triggerMerge,bool flushDocStores,bool flushDeletes)2066 void IndexWriter::flush(bool triggerMerge, bool flushDocStores, bool flushDeletes) {
2067 // We can be called during close, when closing = true, so we must pass false to ensureOpen
2068 ensureOpen(false);
2069 if (doFlush(flushDocStores, flushDeletes) && triggerMerge) {
2070 maybeMerge();
2071 }
2072 }
2073
/// Synchronized wrapper around doFlushInternal.  Emulates nested try/finally:
/// the DocumentsWriter's RAM is rebalanced and its flush-pending flag cleared
/// even when the flush throws.  Returns true if documents were flushed.
bool IndexWriter::doFlush(bool flushDocStores, bool flushDeletes) {
    TestScope testScope(L"IndexWriter", L"doFlush");
    SyncLock syncLock(this);
    bool success = false;
    LuceneException finally;
    try {
        try {
            success = doFlushInternal(flushDocStores, flushDeletes);
        } catch (LuceneException& e) {
            finally = e;
        }
        // inner finally: rebalance RAM buffers even if the flush failed
        if (docWriter->doBalanceRAM()) {
            docWriter->balanceRAM();
        }
        finally.throwException();
    } catch (LuceneException& e) {
        finally = e;
    }
    // outer finally: always clear the pending-flush flag before rethrowing
    docWriter->clearFlushPending();
    finally.throwException();
    return success;
}
2096
/// Flushes the in-RAM buffered documents (and, as needed, shared doc stores and
/// buffered deletes) into a new on-disk segment.  Returns true if any documents
/// were flushed.  Indexing threads are paused for the duration; OOM during the
/// flush marks the writer unusable via handleOOM.
bool IndexWriter::doFlushInternal(bool flushDocStores, bool flushDeletes) {
    SyncLock syncLock(this);
    if (hitOOM) {
        boost::throw_exception(IllegalStateException(L"this writer hit an OutOfMemoryError; cannot flush"));
    }

    // May be called while closing, so don't reject a closing writer.
    ensureOpen(false);

    BOOST_ASSERT(testPoint(L"startDoFlush"));

    doBeforeFlush();

    ++flushCount;

    // If we are flushing because too many deletes accumulated, then we should apply the deletes to free RAM
    if (docWriter->doApplyDeletes()) {
        flushDeletes = true;
    }

    // Make sure no threads are actively adding a document. Returns true if docWriter is currently aborting, in
    // which case we skip flushing this segment
    if (infoStream) {
        message(L"flush: now pause all indexing threads");
    }
    if (docWriter->pauseAllThreads()) {
        docWriter->resumeAllThreads();
        return false;
    }

    bool flushDocs = false;

    LuceneException finally;
    try {
        SegmentInfoPtr newSegment;

        int32_t numDocs = docWriter->getNumDocsInRAM();

        // Always flush docs if there are any
        flushDocs = (numDocs > 0);

        String docStoreSegment(docWriter->getDocStoreSegment());

        // Can only have buffered docs when a doc store segment exists.
        BOOST_ASSERT(!docStoreSegment.empty() || numDocs == 0);

        if (docStoreSegment.empty()) {
            flushDocStores = false;
        }

        int32_t docStoreOffset = docWriter->getDocStoreOffset();

        bool docStoreIsCompoundFile = false;

        if (infoStream) {
            message(L" flush: segment=" + docWriter->getSegment() +
                    L" docStoreSegment=" + StringUtils::toString(docWriter->getDocStoreSegment()) +
                    L" docStoreOffset=" + StringUtils::toString(docStoreOffset) +
                    L" flushDocs=" + StringUtils::toString(flushDocs) +
                    L" flushDeletes=" + StringUtils::toString(flushDeletes) +
                    L" flushDocStores=" + StringUtils::toString(flushDocStores) +
                    L" numDocs=" + StringUtils::toString(numDocs) +
                    L" numBufDelTerms=" + StringUtils::toString(docWriter->getNumBufferedDeleteTerms()));
            message(L" index before flush " + segString());
        }

        // Check if the doc stores must be separately flushed because other segments, besides the one we are
        // about to flush, reference it
        if (flushDocStores && (!flushDocs || docWriter->getSegment() != docWriter->getDocStoreSegment())) {
            // We must separately flush the doc store
            if (infoStream) {
                message(L" flush shared docStore segment " + docStoreSegment);
            }

            docStoreIsCompoundFile = IndexWriter::flushDocStores();
            flushDocStores = false;
        }

        String segment(docWriter->getSegment());

        // If we are flushing docs, segment must not be null
        BOOST_ASSERT(!segment.empty() || !flushDocs);

        if (flushDocs) {
            bool success = false;
            int32_t flushedDocCount;

            try {
                flushedDocCount = docWriter->flush(flushDocStores);
                if (infoStream) {
                    message(L"flushedFiles=" + StringUtils::toString(docWriter->getFlushedFiles()));
                }
                success = true;
            } catch (LuceneException& e) {
                finally = e;
            }

            if (!success) {
                if (infoStream) {
                    message(L"hit exception flushing segment " + segment);
                }
                // Remove any partially written files for this segment.
                deleter->refresh(segment);
            }

            finally.throwException();

            if (docStoreOffset == 0 && flushDocStores) {
                // This means we are flushing private doc stores with this segment, so it will not be shared
                // with other segments
                BOOST_ASSERT(!docStoreSegment.empty());
                BOOST_ASSERT(docStoreSegment == segment);
                docStoreOffset = -1;
                docStoreIsCompoundFile = false;
                docStoreSegment.clear();
            }

            // Create new SegmentInfo, but do not add to our segmentInfos until deletes are flushed successfully.
            newSegment = newLucene<SegmentInfo>(segment, flushedDocCount, directory, false, true, docStoreOffset, docStoreSegment, docStoreIsCompoundFile, docWriter->hasProx());
            setDiagnostics(newSegment, L"flush");
        }

        docWriter->pushDeletes();

        if (flushDocs) {
            segmentInfos->add(newSegment);
            checkpoint();
        }

        if (flushDocs && mergePolicy->useCompoundFile(segmentInfos, newSegment)) {
            // Now build compound file
            bool success = false;
            try {
                docWriter->createCompoundFile(segment);
                success = true;
            } catch (LuceneException& e) {
                finally = e;
            }

            if (!success) {
                if (infoStream) {
                    message(L"hit exception creating compound file for newly flushed segment " + segment);
                }
                deleter->deleteFile(segment + L"." + IndexFileNames::COMPOUND_FILE_EXTENSION());
            }

            finally.throwException();

            newSegment->setUseCompoundFile(true);
            checkpoint();
        }

        if (flushDeletes) {
            applyDeletes();
        }

        if (flushDocs) {
            checkpoint();
        }

        doAfterFlush();
    } catch (std::bad_alloc& oom) {
        finally = handleOOM(oom, L"doFlush");
        flushDocs = false;
    } catch (LuceneException& e) {
        finally = e;
    }
    // finally: indexing threads were paused above; always resume them.
    docWriter->resumeAllThreads();
    finally.throwException();

    return flushDocs;
}
2266
/// Returns the number of bytes of RAM currently used by the DocumentsWriter's buffers.
int64_t IndexWriter::ramSizeInBytes() {
    ensureOpen();
    return docWriter->getRAMUsed();
}
2271
/// Returns the number of documents currently buffered in RAM (not yet flushed).
int32_t IndexWriter::numRamDocs() {
    SyncLock syncLock(this);
    ensureOpen();
    return docWriter->getNumDocsInRAM();
}
2277
ensureContiguousMerge(const OneMergePtr & merge)2278 int32_t IndexWriter::ensureContiguousMerge(const OneMergePtr& merge) {
2279 int32_t first = segmentInfos->find(merge->segments->info(0));
2280 if (first == -1) {
2281 boost::throw_exception(MergeException(L"Could not find segment " + merge->segments->info(0)->name + L" in current index " + segString()));
2282 }
2283
2284 int32_t numSegments = segmentInfos->size();
2285 int32_t numSegmentsToMerge = merge->segments->size();
2286
2287 for (int32_t i = 0; i < numSegmentsToMerge; ++i) {
2288 SegmentInfoPtr info(merge->segments->info(i));
2289
2290 if (first + i >= numSegments || !segmentInfos->info(first + i)->equals(info)) {
2291 if (!segmentInfos->contains(info)) {
2292 boost::throw_exception(MergeException(L"MergePolicy selected a segment (" + info->name + L") that is not in the current index " + segString()));
2293 } else {
2294 boost::throw_exception(MergeException(L"MergePolicy selected non-contiguous segments to merge (" + merge->segString(directory) + L" vs " + segString() + L"), which IndexWriter (currently) cannot handle"));
2295 }
2296 }
2297 }
2298
2299 return first;
2300 }
2301
/// Carries over deletes that arrived on the source segments after the merge started.
/// The merge collapsed away the deletes that existed when it began; any deletes flushed
/// since then must be re-applied to the merged reader, remapped to the new docIDs.
void IndexWriter::commitMergedDeletes(const OneMergePtr& merge, const SegmentReaderPtr& mergeReader) {
    SyncLock syncLock(this);
    BOOST_ASSERT(testPoint(L"startCommitMergeDeletes"));

    SegmentInfosPtr sourceSegments(merge->segments);

    if (infoStream) {
        message(L"commitMergeDeletes " + merge->segString(directory));
    }

    // Carefully merge deletes that occurred after we started merging
    int32_t docUpto = 0;   // next docID in the merged segment
    int32_t delCount = 0;  // total new deletes applied to mergeReader

    for (int32_t i = 0; i < sourceSegments->size(); ++i) {
        SegmentInfoPtr info(sourceSegments->info(i));
        int32_t docCount = info->docCount;
        // readersClone[i] is the snapshot taken when the merge started;
        // readers[i] is the live reader that may have accumulated new deletes.
        SegmentReaderPtr previousReader(merge->readersClone[i]);
        SegmentReaderPtr currentReader(merge->readers[i]);
        if (previousReader->hasDeletions()) {
            // There were deletes on this segment when the merge started. The merge has collapsed away those deletes,
            // but if new deletes were flushed since the merge started, we must now carefully keep any newly flushed
            // deletes but mapping them to the new docIDs.

            if (currentReader->numDeletedDocs() > previousReader->numDeletedDocs()) {
                // This means this segment has had new deletes committed since we started the merge, so we must merge them
                for (int32_t j = 0; j < docCount; ++j) {
                    if (previousReader->isDeleted(j)) {
                        // Already deleted at merge start; it has no slot in the merged segment.
                        BOOST_ASSERT(currentReader->isDeleted(j));
                    } else {
                        if (currentReader->isDeleted(j)) {
                            mergeReader->doDelete(docUpto);
                            ++delCount;
                        }
                        ++docUpto;
                    }
                }
            } else {
                docUpto += docCount - previousReader->numDeletedDocs();
            }
        } else if (currentReader->hasDeletions()) {
            // This segment had no deletes before but now it does
            for (int32_t j = 0; j < docCount; ++j) {
                if (currentReader->isDeleted(j)) {
                    mergeReader->doDelete(docUpto);
                    ++delCount;
                }
                ++docUpto;
            }
        } else {
            // No deletes before or after
            docUpto += info->docCount;
        }
    }

    BOOST_ASSERT(mergeReader->numDeletedDocs() == delCount);

    mergeReader->_hasChanges = (delCount > 0);
}
2361
/// Installs the completed merge into the index: carries over late-arriving deletes,
/// remaps buffered deletes, and replaces the source segments with the merged segment
/// in segmentInfos.  Returns false if the merge was aborted and nothing was committed.
bool IndexWriter::commitMerge(const OneMergePtr& merge, const SegmentMergerPtr& merger, int32_t mergedDocCount, const SegmentReaderPtr& mergedReader) {
    SyncLock syncLock(this);
    BOOST_ASSERT(testPoint(L"startCommitMerge"));

    if (hitOOM) {
        boost::throw_exception(IllegalStateException(L"this writer hit an OutOfMemoryError; cannot complete merge"));
    }

    if (infoStream) {
        message(L"commitMerge: " + merge->segString(directory) + L" index=" + segString());
    }

    BOOST_ASSERT(merge->registerDone);

    // If merge was explicitly aborted, or, if rollback() or rollbackTransaction() had been called since our merge
    // started (which results in an unqualified deleter.refresh() call that will remove any index file that current
    // segments does not reference), we abort this merge
    if (merge->isAborted()) {
        if (infoStream) {
            message(L"commitMerge: skipping merge " + merge->segString(directory) + L": it was aborted");
        }
        return false;
    }

    int32_t start = ensureContiguousMerge(merge);

    commitMergedDeletes(merge, mergedReader);
    // Remap buffered delete docIDs to account for the segments that just collapsed.
    docWriter->remapDeletes(segmentInfos, merger->getDocMaps(), merger->getDelCounts(), merge, mergedDocCount);

    // If the doc store we are using has been closed and is in now compound format (but wasn't when we started),
    // then we will switch to the compound format as well
    setMergeDocStoreIsCompoundFile(merge);

    merge->info->setHasProx(merger->hasProx());

    // Replace the contiguous run of source segments with the single merged segment.
    segmentInfos->remove(start, start + merge->segments->size());
    BOOST_ASSERT(!segmentInfos->contains(merge->info));
    segmentInfos->add(start, merge->info);

    closeMergeReaders(merge, false);

    // Must note the change to segmentInfos so any commits in-flight don't lose it
    checkpoint();

    // If the merged segments had pending changes, clear them so that they don't bother writing
    // them to disk, updating SegmentInfo, etc.
    readerPool->clear(merge->segments);

    if (merge->optimize) {
        // cascade the optimize
        segmentsToOptimize.add(merge->info);
    }
    return true;
}
2416
/// Records the exception on the merge (so a waiting optimize() can see it) and decides
/// whether it must propagate.  Returns the exception to rethrow, or an empty
/// LuceneException when the failure can be safely ignored.
LuceneException IndexWriter::handleMergeException(const LuceneException& exc, const OneMergePtr& merge) {
    if (infoStream) {
        message(L"handleMergeException: merge=" + merge->segString(directory) + L" exc=" + exc.getError());
    }

    // Set the exception on the merge so the root cause is visible to optimize().
    merge->setException(exc);
    addMergeException(merge);

    if (exc.getType() == LuceneException::MergeAborted) {
        // An aborted merge (close(false)/rollback) is normally ignorable, unless it
        // involves segments from external directories, in which case it must propagate
        // so, e.g., the rollbackTransaction code in addIndexes* runs.
        return merge->isExternal ? exc : LuceneException();
    }

    if (exc.getType() == LuceneException::IO || exc.getType() == LuceneException::Runtime) {
        return exc;
    }

    // Unexpected exception type - should not get here.
    return RuntimeException();
}
2443
/// Runs a single registered merge end-to-end: initializes it, performs the heavy
/// lifting in mergeMiddle, and always unregisters it via mergeFinish.  On failure the
/// partially written segment's files are removed; on success new merges may be queued.
void IndexWriter::merge(const OneMergePtr& merge) {
    bool success = false;

    try {
        LuceneException finally;
        try {
            try {
                mergeInit(merge);
                if (infoStream) {
                    message(L"now merge\n merge=" + merge->segString(directory) + L"\n index=" + segString());
                }

                mergeMiddle(merge);
                mergeSuccess(merge);
                success = true;
            } catch (LuceneException& e) {
                // May swallow MergeAborted (see handleMergeException).
                finally = handleMergeException(e, merge);
            }

            // finally: always unregister the merge, clean up on failure, and
            // possibly schedule follow-on merges - all under the writer monitor.
            {
                SyncLock syncLock(this);
                mergeFinish(merge);

                if (!success) {
                    if (infoStream) {
                        message(L"hit exception during merge");
                    }

                    // Remove the partially written merged segment's files.
                    if (merge->info && !segmentInfos->contains(merge->info)) {
                        deleter->refresh(merge->info->name);
                    }
                }

                // This merge (and, generally, any change to the segments) may now enable
                // new merges, so we call merge policy & update pending merges.
                if (success && !merge->isAborted() && !closed && !closing) {
                    updatePendingMerges(merge->maxNumSegmentsOptimize, merge->optimize);
                }
            }
        } catch (LuceneException& e) {
            finally = e;
        }
        finally.throwException();
    } catch (std::bad_alloc& oom) {
        boost::throw_exception(handleOOM(oom, L"merge"));
    }
}
2491
/// Extension hook invoked after a merge completes successfully (see merge()).
/// The default implementation does nothing; subclasses may override.
void IndexWriter::mergeSuccess(const OneMergePtr& merge) {
    // override
}
2495
registerMerge(const OneMergePtr & merge)2496 bool IndexWriter::registerMerge(const OneMergePtr& merge) {
2497 SyncLock syncLock(this);
2498
2499 if (merge->registerDone) {
2500 return true;
2501 }
2502
2503 if (stopMerges) {
2504 merge->abort();
2505 boost::throw_exception(MergeAbortedException(L"merge is aborted: " + merge->segString(directory)));
2506 }
2507
2508 int32_t count = merge->segments->size();
2509 bool isExternal = false;
2510 for (int32_t i = 0; i < count; ++i) {
2511 SegmentInfoPtr info(merge->segments->info(i));
2512 if (mergingSegments.contains(info)) {
2513 return false;
2514 }
2515 if (!segmentInfos->contains(info)) {
2516 return false;
2517 }
2518 if (info->dir != directory) {
2519 isExternal = true;
2520 }
2521 if (segmentsToOptimize.contains(info)) {
2522 merge->optimize = true;
2523 merge->maxNumSegmentsOptimize = optimizeMaxNumSegments;
2524 }
2525 }
2526
2527 ensureContiguousMerge(merge);
2528
2529 pendingMerges.add(merge);
2530
2531 if (infoStream) {
2532 message(L"add merge to pendingMerges: " + merge->segString(directory) + L" [total " + StringUtils::toString(pendingMerges.size()) + L" pending]");
2533 }
2534
2535 merge->mergeGen = mergeGen;
2536 merge->isExternal = isExternal;
2537
2538 // OK it does not conflict; now record that this merge is running (while synchronized)
2539 // to avoid race condition where two conflicting merges from different threads, start
2540 for (int32_t i = 0; i < count; ++i) {
2541 mergingSegments.add(merge->segments->info(i));
2542 }
2543
2544 // Merge is now registered
2545 merge->registerDone = true;
2546 return true;
2547 }
2548
mergeInit(const OneMergePtr & merge)2549 void IndexWriter::mergeInit(const OneMergePtr& merge) {
2550 SyncLock syncLock(this);
2551 bool success = false;
2552 LuceneException finally;
2553 try {
2554 _mergeInit(merge);
2555 success = true;
2556 } catch (LuceneException& e) {
2557 finally = e;
2558 }
2559
2560 if (!success) {
2561 mergeFinish(merge);
2562 }
2563 finally.throwException();
2564 }
2565
/// Prepares a registered merge for execution: applies buffered deletes, decides whether
/// the doc stores (stored fields & vectors) must be merged or can be shared, flushes the
/// live doc stores if they are referenced, and binds the merged segment's SegmentInfo.
/// No-op if already initialized or the merge was aborted.
void IndexWriter::_mergeInit(const OneMergePtr& merge) {
    SyncLock syncLock(this);
    bool test = testPoint(L"startMergeInit");
    BOOST_ASSERT(test);

    BOOST_ASSERT(merge->registerDone);
    BOOST_ASSERT(!merge->optimize || merge->maxNumSegmentsOptimize > 0);

    if (hitOOM) {
        boost::throw_exception(IllegalStateException(L"this writer hit an OutOfMemoryError; cannot merge"));
    }

    if (merge->info) {
        // mergeInit already done
        return;
    }

    if (merge->isAborted()) {
        return;
    }

    applyDeletes();

    SegmentInfosPtr sourceSegments(merge->segments);
    int32_t end = sourceSegments->size();

    // Check whether this merge will allow us to skip merging the doc stores (stored field & vectors).
    // This is a very substantial optimization (saves tons of IO).
    DirectoryPtr lastDir(directory);
    String lastDocStoreSegment;
    int32_t next = -1;

    bool mergeDocStores = false;
    bool doFlushDocStore = false;
    String currentDocStoreSegment(docWriter->getDocStoreSegment());

    // Test each segment to be merged: check if we need to flush/merge doc stores
    for (int32_t i = 0; i < end; ++i) {
        SegmentInfoPtr si(sourceSegments->info(i));

        // If it has deletions we must merge the doc stores
        if (si->hasDeletions()) {
            mergeDocStores = true;
        }

        // If it has its own (private) doc stores we must merge the doc stores
        if (si->getDocStoreOffset() == -1) {
            mergeDocStores = true;
        }

        // If it has a different doc store segment than previous segments, we must merge the doc stores
        String docStoreSegment(si->getDocStoreSegment());
        if (docStoreSegment.empty()) {
            mergeDocStores = true;
        } else if (lastDocStoreSegment.empty()) {
            lastDocStoreSegment = docStoreSegment;
        } else if (lastDocStoreSegment != docStoreSegment) {
            mergeDocStores = true;
        }

        // Segments' docScoreOffsets must be in-order, contiguous. For the default merge policy now
        // this will always be the case but for an arbitrary merge policy this may not be the case
        if (next == -1) {
            next = si->getDocStoreOffset() + si->docCount;
        } else if (next != si->getDocStoreOffset()) {
            mergeDocStores = true;
        } else {
            next = si->getDocStoreOffset() + si->docCount;
        }

        // If the segment comes from a different directory we must merge
        if (lastDir != si->dir) {
            mergeDocStores = true;
        }

        // If the segment is referencing the current "live" doc store outputs then we must merge
        if (si->getDocStoreOffset() != -1 && !currentDocStoreSegment.empty() && si->getDocStoreSegment() == currentDocStoreSegment) {
            doFlushDocStore = true;
        }
    }

    // if a mergedSegmentWarmer is installed, we must merge the doc stores because we will open a full
    // SegmentReader on the merged segment
    if (!mergeDocStores && mergedSegmentWarmer && !currentDocStoreSegment.empty() && !lastDocStoreSegment.empty() && lastDocStoreSegment == currentDocStoreSegment) {
        mergeDocStores = true;
    }

    int32_t docStoreOffset;
    String docStoreSegment;
    bool docStoreIsCompoundFile;

    if (mergeDocStores) {
        // Merged segment gets its own private doc stores.
        docStoreOffset = -1;
        docStoreSegment.clear();
        docStoreIsCompoundFile = false;
    } else {
        // All source segments share one doc store; the merged segment keeps referencing it.
        SegmentInfoPtr si(sourceSegments->info(0));
        docStoreOffset = si->getDocStoreOffset();
        docStoreSegment = si->getDocStoreSegment();
        docStoreIsCompoundFile = si->getDocStoreIsCompoundFile();
    }

    if (mergeDocStores && doFlushDocStore) {
        // SegmentMerger intends to merge the doc stores (stored fields, vectors), and at
        // least one of the segments to be merged refers to the currently live doc stores.
        if (infoStream) {
            message(L"now flush at merge");
        }
        doFlush(true, false);
    }

    merge->mergeDocStores = mergeDocStores;

    // Bind a new segment name here so even with ConcurrentMergePolicy we keep deterministic segment names.
    merge->info = newLucene<SegmentInfo>(newSegmentName(), 0, directory, false, true, docStoreOffset, docStoreSegment, docStoreIsCompoundFile, false);

    MapStringString details(MapStringString::newInstance());
    details.put(L"optimize", StringUtils::toString(merge->optimize));
    details.put(L"mergeFactor", StringUtils::toString(end));
    details.put(L"mergeDocStores", StringUtils::toString(mergeDocStores));
    setDiagnostics(merge->info, L"merge", details);

    // Also enroll the merged segment into mergingSegments; this prevents it from getting
    // selected for a merge after our merge is done but while we are building the CFS
    mergingSegments.add(merge->info);
}
2692
/// Records provenance diagnostics on a segment with no extra detail entries;
/// see setDiagnostics(info, source, details).
void IndexWriter::setDiagnostics(const SegmentInfoPtr& info, const String& source) {
    setDiagnostics(info, source, MapStringString());
}
2696
setDiagnostics(const SegmentInfoPtr & info,const String & source,MapStringString details)2697 void IndexWriter::setDiagnostics(const SegmentInfoPtr& info, const String& source, MapStringString details) {
2698 MapStringString diagnostics(MapStringString::newInstance());
2699 diagnostics.put(L"source", source);
2700 diagnostics.put(L"lucene.version", Constants::LUCENE_VERSION);
2701 diagnostics.put(L"os", Constants::OS_NAME);
2702 if (details) {
2703 diagnostics.putAll(details.begin(), details.end());
2704 }
2705 info->setDiagnostics(diagnostics);
2706 }
2707
mergeFinish(const OneMergePtr & merge)2708 void IndexWriter::mergeFinish(const OneMergePtr& merge) {
2709 SyncLock syncLock(this);
2710 // Optimize, addIndexes or finishMerges may be waiting on merges to finish.
2711 notifyAll();
2712
2713 // It's possible we are called twice, eg if there was an exception inside mergeInit
2714 if (merge->registerDone) {
2715 SegmentInfosPtr sourceSegments(merge->segments);
2716 int32_t end = sourceSegments->size();
2717 for (int32_t i = 0; i < end; ++i) {
2718 mergingSegments.remove(sourceSegments->info(i));
2719 }
2720
2721 mergingSegments.remove(merge->info);
2722 merge->registerDone = false;
2723 }
2724
2725 runningMerges.remove(merge);
2726 }
2727
setMergeDocStoreIsCompoundFile(const OneMergePtr & merge)2728 void IndexWriter::setMergeDocStoreIsCompoundFile(const OneMergePtr& merge) {
2729 SyncLock syncLock(this);
2730
2731 String mergeDocStoreSegment(merge->info->getDocStoreSegment());
2732 if (!mergeDocStoreSegment.empty() && !merge->info->getDocStoreIsCompoundFile()) {
2733 int32_t size = segmentInfos->size();
2734 for (int32_t i = 0; i < size; ++i) {
2735 SegmentInfoPtr info(segmentInfos->info(i));
2736 String docStoreSegment(info->getDocStoreSegment());
2737 if (!docStoreSegment.empty() && docStoreSegment == mergeDocStoreSegment && info->getDocStoreIsCompoundFile()) {
2738 merge->info->setDocStoreIsCompoundFile(true);
2739 break;
2740 }
2741 }
2742 }
2743 }
2744
closeMergeReaders(const OneMergePtr & merge,bool suppressExceptions)2745 void IndexWriter::closeMergeReaders(const OneMergePtr& merge, bool suppressExceptions) {
2746 SyncLock syncLock(this);
2747
2748 int32_t numSegments = merge->segments->size();
2749 if (suppressExceptions) {
2750 // Suppress any new exceptions so we throw the original cause
2751 for (int32_t i = 0; i < numSegments; ++i) {
2752 if (merge->readers[i]) {
2753 try {
2754 readerPool->release(merge->readers[i], false);
2755 } catch (...) {
2756 }
2757 merge->readers[i].reset();
2758 }
2759
2760 if (merge->readersClone[i]) {
2761 try {
2762 merge->readersClone[i]->close();
2763 } catch (...) {
2764 }
2765 // This was a private clone and we had the only reference
2766 BOOST_ASSERT(merge->readersClone[i]->getRefCount() == 0);
2767 merge->readersClone[i].reset();
2768 }
2769 }
2770 } else {
2771 for (int32_t i = 0; i < numSegments; ++i) {
2772 if (merge->readers[i]) {
2773 readerPool->release(merge->readers[i], true);
2774 merge->readers[i].reset();
2775 }
2776
2777 if (merge->readersClone[i]) {
2778 merge->readersClone[i]->close();
2779 // This was a private clone and we had the only reference
2780 BOOST_ASSERT(merge->readersClone[i]->getRefCount() == 0);
2781 merge->readersClone[i].reset();
2782 }
2783 }
2784 }
2785 }
2786
/// Does the time-consuming part of a merge without holding the writer monitor for the
/// whole duration: opens readers for the source segments, runs SegmentMerger, optionally
/// builds the compound file, warms the merged reader, and commits the merge.  Returns
/// the merged document count, or 0 if the merge was aborted along the way.
int32_t IndexWriter::mergeMiddle(const OneMergePtr& merge) {
    merge->checkAborted(directory);

    String mergedName(merge->info->name);
    int32_t mergedDocCount = 0;

    SegmentInfosPtr sourceSegments(merge->segments);
    int32_t numSegments = sourceSegments->size();

    if (infoStream) {
        message(L"merging " + merge->segString(directory));
    }

    SegmentMergerPtr merger(newLucene<SegmentMerger>(shared_from_this(), mergedName, merge));

    merge->readers = Collection<SegmentReaderPtr>::newInstance(numSegments);
    merge->readersClone = Collection<SegmentReaderPtr>::newInstance(numSegments);

    bool mergeDocStores = false;

    String currentDocStoreSegment;
    {
        SyncLock syncLock(this);
        currentDocStoreSegment = docWriter->getDocStoreSegment();
    }

    // Becomes true if any source segment references the live doc store segment.
    bool currentDSSMerged = false;

    LuceneException finally;
    // This is try/finally to make sure merger's readers are closed
    bool success = false;
    try {
        int32_t totDocCount = 0;
        for (int32_t i = 0; i < numSegments; ++i) {
            SegmentInfoPtr info(sourceSegments->info(i));

            // Hold onto the "live" reader; we will use this to commit merged deletes
            merge->readers[i] = readerPool->get(info, merge->mergeDocStores, MERGE_READ_BUFFER_SIZE, -1);
            SegmentReaderPtr reader(merge->readers[i]);

            // We clone the segment readers because other deletes may come in while we're merging so we need readers that will not change
            merge->readersClone[i] = boost::dynamic_pointer_cast<SegmentReader>(reader->clone(true));
            SegmentReaderPtr clone(merge->readersClone[i]);
            merger->add(clone);

            if (clone->hasDeletions()) {
                mergeDocStores = true;
            }

            if (info->getDocStoreOffset() != -1 && !currentDocStoreSegment.empty()) {
                currentDSSMerged = currentDSSMerged || (currentDocStoreSegment == info->getDocStoreSegment());
            }

            totDocCount += clone->numDocs();
        }

        if (infoStream) {
            message(L"merge: total " + StringUtils::toString(totDocCount) + L" docs");
        }

        merge->checkAborted(directory);

        // If deletions have arrived and it has now become necessary to merge doc stores, go and open them
        if (mergeDocStores && !merge->mergeDocStores) {
            merge->mergeDocStores = true;

            {
                SyncLock syncLock(this);
                if (currentDSSMerged) {
                    if (infoStream) {
                        message(L"now flush at mergeMiddle");
                    }
                    doFlush(true, false);
                }
            }

            for (Collection<SegmentReaderPtr>::iterator reader = merge->readersClone.begin(); reader != merge->readersClone.end(); ++reader) {
                (*reader)->openDocStores();
            }

            // Clear DSS
            merge->info->setDocStore(-1, L"", false);
        }

        // This is where all the work happens
        merge->info->docCount = merger->merge(merge->mergeDocStores);
        mergedDocCount = merge->info->docCount;

        BOOST_ASSERT(mergedDocCount == totDocCount);

        if (merge->useCompoundFile) {
            success = false;

            String compoundFileName(IndexFileNames::segmentFileName(mergedName, IndexFileNames::COMPOUND_FILE_EXTENSION()));

            try {
                if (infoStream) {
                    message(L"create compound file " + compoundFileName);
                }
                merger->createCompoundFile(compoundFileName);
                success = true;
            } catch (IOException& ioe) {
                SyncLock syncLock(this);
                if (merge->isAborted()) {
                    // This can happen if rollback or close(false) is called - fall through to logic
                    // below to remove the partially created CFS
                } else {
                    finally = handleMergeException(ioe, merge);
                }
            } catch (LuceneException& e) {
                finally = handleMergeException(e, merge);
            }

            if (!success) {
                if (infoStream) {
                    message(L"hit exception creating compound file during merge");
                }
                {
                    SyncLock syncLock(this);
                    deleter->deleteFile(compoundFileName);
                    deleter->deleteNewFiles(merger->getMergedFiles());
                }
            }

            finally.throwException();

            success = false;

            {
                SyncLock syncLock(this);

                // delete new non cfs files directly: they were never registered with IFD
                deleter->deleteNewFiles(merger->getMergedFiles());

                if (merge->isAborted()) {
                    if (infoStream) {
                        message(L"abort merge after building CFS");
                    }
                    deleter->deleteFile(compoundFileName);
                    // TemporaryException signals "aborted" and is translated to return 0 below.
                    boost::throw_exception(TemporaryException());
                }
            }

            merge->info->setUseCompoundFile(true);
        }

        int32_t termsIndexDivisor = -1;
        bool loadDocStores = false;

        // if the merged segment warmer was not installed when this merge was started, causing us
        // to not force the docStores to close, we can't warm it now
        bool canWarm = (merge->info->getDocStoreSegment().empty() || currentDocStoreSegment.empty() || merge->info->getDocStoreSegment() == currentDocStoreSegment);

        if (poolReaders && mergedSegmentWarmer && canWarm) {
            // Load terms index & doc stores so the segment warmer can run searches, load documents/term vectors
            termsIndexDivisor = readerTermsIndexDivisor;
            loadDocStores = true;
        }

        SegmentReaderPtr mergedReader(readerPool->get(merge->info, loadDocStores, BufferedIndexInput::BUFFER_SIZE, termsIndexDivisor));

        try {
            if (poolReaders && mergedSegmentWarmer) {
                mergedSegmentWarmer->warm(mergedReader);
            }
            if (!commitMerge(merge, merger, mergedDocCount, mergedReader)) {
                // commitMerge will return false if this merge was aborted
                boost::throw_exception(TemporaryException());
            }
        } catch (LuceneException& e) {
            finally = e;
        }

        // finally: always release the merged reader back to the pool.
        {
            SyncLock syncLock(this);
            readerPool->release(mergedReader);
        }

        finally.throwException();

        success = true;
    } catch (LuceneException& e) {
        finally = e;
    }

    // Readers are already closed in commitMerge if we didn't hit an exc
    if (!success) {
        closeMergeReaders(merge, true);
    }

    // has this merge been aborted?
    if (finally.getType() == LuceneException::Temporary) {
        return 0;
    }

    finally.throwException();

    return mergedDocCount;
}
2986
addMergeException(const OneMergePtr & merge)2987 void IndexWriter::addMergeException(const OneMergePtr& merge) {
2988 SyncLock syncLock(this);
2989 BOOST_ASSERT(!merge->getException().isNull());
2990 if (!mergeExceptions.contains(merge) && mergeGen == merge->mergeGen) {
2991 mergeExceptions.add(merge);
2992 }
2993 }
2994
applyDeletes()2995 bool IndexWriter::applyDeletes() {
2996 TestScope testScope(L"IndexWriter", L"applyDeletes");
2997 SyncLock syncLock(this);
2998 BOOST_ASSERT(testPoint(L"startApplyDeletes"));
2999 ++flushDeletesCount;
3000 bool success = false;
3001 bool changed = false;
3002
3003 LuceneException finally;
3004 try {
3005 changed = docWriter->applyDeletes(segmentInfos);
3006 success = true;
3007 } catch (LuceneException& e) {
3008 finally = e;
3009 }
3010
3011 if (!success && infoStream) {
3012 message(L"hit exception flushing deletes");
3013 }
3014
3015 finally.throwException();
3016
3017 if (changed) {
3018 checkpoint();
3019 }
3020 return changed;
3021 }
3022
/// Returns the number of delete terms currently buffered by DocumentsWriter (test aid).
int32_t IndexWriter::getBufferedDeleteTermsSize() {
    SyncLock syncLock(this);
    return docWriter->getBufferedDeleteTerms().size();
}
3027
/// Returns DocumentsWriter's running count of buffered delete terms (test aid).
int32_t IndexWriter::getNumBufferedDeleteTerms() {
    SyncLock syncLock(this);
    return docWriter->getNumBufferedDeleteTerms();
}
3032
newestSegment()3033 SegmentInfoPtr IndexWriter::newestSegment() {
3034 return !segmentInfos->empty() ? segmentInfos->info(segmentInfos->size() - 1) : SegmentInfoPtr();
3035 }
3036
/// Returns a debug description of the writer's current segments (for infoStream).
String IndexWriter::segString() {
    return segString(segmentInfos);
}
3040
segString(const SegmentInfosPtr & infos)3041 String IndexWriter::segString(const SegmentInfosPtr& infos) {
3042 SyncLock syncLock(this);
3043 StringStream buffer;
3044 int32_t count = infos->size();
3045 for (int32_t i = 0; i < count; ++i) {
3046 if (i > 0) {
3047 buffer << L" ";
3048 }
3049 SegmentInfoPtr info(infos->info(i));
3050 buffer << info->segString(directory);
3051 if (info->dir != directory) {
3052 buffer << L"**";
3053 }
3054 }
3055 return buffer.str();
3056 }
3057
startSync(const String & fileName,HashSet<String> pending)3058 bool IndexWriter::startSync(const String& fileName, HashSet<String> pending) {
3059 SyncLock syncedLock(&synced);
3060 if (!synced.contains(fileName)) {
3061 if (!syncing.contains(fileName)) {
3062 syncing.add(fileName);
3063 return true;
3064 } else {
3065 pending.add(fileName);
3066 return false;
3067 }
3068 } else {
3069 return false;
3070 }
3071 }
3072
/// Mark a sync claimed via startSync() as finished.  On success the file is
/// recorded as durable; on failure it simply leaves the syncing set so that
/// waiters can detect the failure.
void IndexWriter::finishSync(const String& fileName, bool success) {
    SyncLock syncedLock(&synced);
    BOOST_ASSERT(syncing.contains(fileName));
    syncing.remove(fileName);
    if (success) {
        synced.add(fileName);
    }
    // Notify even on failure so threads blocked in waitForAllSynced can re-check
    synced.notifyAll();
}
3082
/// Blocks until every file in the given set has appeared in 'synced', or
/// returns false if one of them failed to sync in another thread.
/// NOTE(review): the parameter shadows the member HashSet also named
/// 'syncing', so the inner contains() check below tests the pending set
/// passed in, not the member -- this mirrors the equivalent Java Lucene
/// code, but confirm the shadowing is intended.
bool IndexWriter::waitForAllSynced(HashSet<String> syncing) {
    SyncLock syncedLock(&synced);
    for (HashSet<String>::iterator fileName = syncing.begin(); fileName != syncing.end(); ++fileName) {
        while (!synced.contains(*fileName)) {
            if (!syncing.contains(*fileName)) {
                // There was an error because a file that was previously syncing failed to appear in synced
                return false;
            } else {
                synced.wait();
            }
        }
    }
    return true;
}
3097
doWait()3098 void IndexWriter::doWait() {
3099 SyncLock syncLock(this);
3100 // NOTE: the callers of this method should in theory be able to do simply wait(), but, as a defense against
3101 // thread timing hazards where notifyAll() fails to be called, we wait for at most 1 second and then return
3102 // so caller can check if wait conditions are satisfied
3103 wait(1000);
3104 }
3105
/// Phase one of a two-phase commit: clone the current segmentInfos, sync every
/// file it references to stable storage, then prepare (but do not finish) a new
/// segments_N file, leaving it in pendingCommit for commit()/finishCommit().
/// Safe for multiple threads; a superseded commit attempt silently backs out.
void IndexWriter::startCommit(int64_t sizeInBytes, MapStringString commitUserData) {
    BOOST_ASSERT(testPoint(L"startStartCommit"));

    // Once the writer hit std::bad_alloc its state is suspect; refuse to commit
    if (hitOOM) {
        boost::throw_exception(IllegalStateException(L"this writer hit an OutOfMemoryError; cannot commit"));
    }

    try {
        if (infoStream) {
            message(L"startCommit(): start sizeInBytes=" + StringUtils::toString(sizeInBytes));
        }

        SegmentInfosPtr toSync;
        int64_t myChangeCount = 0;
        LuceneException finally;

        {
            SyncLock syncLock(this);

            // Wait for any running addIndexes to complete first, then block any from running
            // until we've copied the segmentInfos we intend to sync
            blockAddIndexes(false);

            // On commit the segmentInfos must never reference a segment in another directory
            BOOST_ASSERT(!hasExternalSegments());

            try {
                BOOST_ASSERT(lastCommitChangeCount <= changeCount);
                myChangeCount = changeCount;

                if (changeCount == lastCommitChangeCount) {
                    if (infoStream) {
                        message(L" skip startCommit(): no changes pending");
                    }
                    // TemporaryException is used as a non-error early-exit signal
                    boost::throw_exception(TemporaryException());
                }

                // First, we clone & incref the segmentInfos we intend to sync, then, without locking, we sync() each
                // file referenced by toSync, in the background. Multiple threads can be doing this at once, if say
                // a large merge and a small merge finish at the same time

                if (infoStream) {
                    message(L"startCommit index=" + segString(segmentInfos) + L" changeCount=" + StringUtils::toString(changeCount));
                }

                readerPool->commit();

                // It's possible another flush (that did not close the open do stores) snook in after the flush we
                // just did, so we remove any tail segments referencing the open doc store from the SegmentInfos
                // we are about to sync (the main SegmentInfos will keep them)
                toSync = boost::dynamic_pointer_cast<SegmentInfos>(segmentInfos->clone());

                String dss(docWriter->getDocStoreSegment());
                if (!dss.empty()) {
                    while (true) {
                        String dss2(toSync->info(toSync->size() - 1)->getDocStoreSegment());
                        if (dss2.empty() || dss2 != dss) {
                            break;
                        }
                        toSync->remove(toSync->size() - 1);
                        ++changeCount;
                    }
                }

                if (commitUserData) {
                    toSync->setUserData(commitUserData);
                }

                // Hold the files alive (incRef) until the commit either finishes or backs out
                deleter->incRef(toSync, false);

                HashSet<String> files(toSync->files(directory, false));
                for (HashSet<String>::iterator fileName = files.begin(); fileName != files.end(); ++fileName) {
                    BOOST_ASSERT(directory->fileExists(*fileName));

                    // If this trips it means we are missing a call to .checkpoint somewhere, because by the
                    // time we are called, deleter should know about every file referenced by the current head
                    // segmentInfos
                    BOOST_ASSERT(deleter->exists(*fileName));
                }
            } catch (LuceneException& e) {
                finally = e;
            }
            // Always unblock addIndexes, whether or not the snapshot succeeded
            resumeAddIndexes();

            // no changes pending?
            if (finally.getType() == LuceneException::Temporary) {
                return;
            }

            finally.throwException();
        }

        BOOST_ASSERT(testPoint(L"midStartCommit"));

        bool setPending = false;

        try {
            // Loop until all files toSync references are sync'd
            while (true) {
                HashSet<String> pending(HashSet<String>::newInstance());
                HashSet<String> files(toSync->files(directory, false));
                for (HashSet<String>::iterator fileName = files.begin(); fileName != files.end(); ++fileName) {
                    if (startSync(*fileName, pending)) {
                        bool success = false;
                        try {
                            // Because we incRef'd this commit point above, the file had better exist
                            BOOST_ASSERT(directory->fileExists(*fileName));

                            if (infoStream) {
                                message(L"now sync " + *fileName);
                            }
                            directory->sync(*fileName);
                            success = true;
                        } catch (LuceneException& e) {
                            finally = e;
                        }
                        // Release our claim on the file (recording success/failure) before rethrowing
                        finishSync(*fileName, success);
                        finally.throwException();
                    }
                }

                // All files that I require are either synced or being synced by other threads. If they are being
                // synced, we must at this point block until they are done. If this returns false, that means an
                // error in another thread resulted in failing to actually sync one of our files, so we repeat
                if (waitForAllSynced(pending)) {
                    break;
                }
            }

            BOOST_ASSERT(testPoint(L"midStartCommit2"));

            {
                SyncLock syncLock(this);

                // If someone saved a newer version of segments file since I first started syncing
                // my version, I can safely skip saving myself since I've been superseded

                while (true) {
                    if (myChangeCount <= lastCommitChangeCount) {
                        if (infoStream) {
                            message(L"sync superseded by newer infos");
                        }
                        break;
                    } else if (!pendingCommit) {
                        // My turn to commit
                        if (segmentInfos->getGeneration() > toSync->getGeneration()) {
                            toSync->updateGeneration(segmentInfos);
                        }

                        bool success = false;
                        try {
                            // Exception here means nothing is prepared (this method unwinds
                            // everything it did on an exception)
                            try {
                                toSync->prepareCommit(directory);
                            } catch (LuceneException& e) {
                                finally = e;
                            }

                            // Have our master segmentInfos record the generations we just prepared. We do this on
                            // error or success so we don't double-write a segments_N file.
                            segmentInfos->updateGeneration(toSync);
                            finally.throwException();

                            BOOST_ASSERT(!pendingCommit);
                            setPending = true;
                            pendingCommit = toSync;
                            pendingCommitChangeCount = myChangeCount;
                            success = true;
                        } catch (LuceneException& e) {
                            finally = e;
                        }

                        if (!success && infoStream) {
                            message(L"hit exception committing segments file");
                        }
                        finally.throwException();
                        break;
                    } else {
                        // Must wait for other commit to complete
                        doWait();
                    }
                }
            }

            if (infoStream) {
                message(L"done all syncs");
            }
            BOOST_ASSERT(testPoint(L"midStartCommitSuccess"));
        } catch (LuceneException& e) {
            finally = e;
        }

        {
            SyncLock syncLock(this);
            // If we did not hand toSync off to pendingCommit, release its file refs
            if (!setPending) {
                deleter->decRef(toSync);
            }
        }
        finally.throwException();
    } catch (std::bad_alloc& oom) {
        boost::throw_exception(handleOOM(oom, L"startCommit"));
    }
    BOOST_ASSERT(testPoint(L"finishStartCommit"));
}
3311
isLocked(const DirectoryPtr & directory)3312 bool IndexWriter::isLocked(const DirectoryPtr& directory) {
3313 return directory->makeLock(WRITE_LOCK_NAME)->isLocked();
3314 }
3315
unlock(const DirectoryPtr & directory)3316 void IndexWriter::unlock(const DirectoryPtr& directory) {
3317 directory->makeLock(IndexWriter::WRITE_LOCK_NAME)->release();
3318 }
3319
/// Install a warmer that is invoked on newly merged segments before they are
/// committed to the index (see the warm() call after a merge completes).
void IndexWriter::setMergedSegmentWarmer(const IndexReaderWarmerPtr& warmer) {
    mergedSegmentWarmer = warmer;
}
3323
/// Returns the merged-segment warmer previously set, or null if none.
IndexReaderWarmerPtr IndexWriter::getMergedSegmentWarmer() {
    return mergedSegmentWarmer;
}
3327
/// Record that the writer hit std::bad_alloc at the given location; sets
/// hitOOM so subsequent mutating operations refuse to run, and returns the
/// OutOfMemoryError for the caller to throw.  The original bad_alloc itself
/// is intentionally not propagated.
LuceneException IndexWriter::handleOOM(const std::bad_alloc& oom, const String& location) {
    if (infoStream) {
        message(L"hit OutOfMemoryError inside " + location);
    }
    hitOOM = true;
    return OutOfMemoryError();
}
3335
/// Hook used only inside BOOST_ASSERT at named points of the writer's
/// execution; tests can override/observe these points, while the production
/// implementation always reports success.
bool IndexWriter::testPoint(const String& name) {
    return true;
}
3339
nrtIsCurrent(const SegmentInfosPtr & infos)3340 bool IndexWriter::nrtIsCurrent(const SegmentInfosPtr& infos) {
3341 SyncLock syncLock(this);
3342 if (!infos->equals(segmentInfos)) {
3343 // if any structural changes (new segments), we are stale
3344 return false;
3345 } else if (infos->getGeneration() != segmentInfos->getGeneration()) {
3346 // if any commit took place since we were opened, we are stale
3347 return false;
3348 } else {
3349 return !docWriter->anyChanges();
3350 }
3351 }
3352
/// Returns true once this writer has been closed.
bool IndexWriter::isClosed() {
    SyncLock syncLock(this);
    return closed;
}
3357
ReaderPool(const IndexWriterPtr & writer)3358 ReaderPool::ReaderPool(const IndexWriterPtr& writer) {
3359 readerMap = MapSegmentInfoSegmentReader::newInstance();
3360 _indexWriter = writer;
3361 }
3362
ReaderPool::~ReaderPool() {
    // Nothing to do here; pooled readers are explicitly shut down via close().
}
3365
clear(const SegmentInfosPtr & infos)3366 void ReaderPool::clear(const SegmentInfosPtr& infos) {
3367 SyncLock syncLock(this);
3368 if (!infos) {
3369 for (MapSegmentInfoSegmentReader::iterator ent = readerMap.begin(); ent != readerMap.end(); ++ent) {
3370 ent->second->_hasChanges = false;
3371 }
3372 } else {
3373 for (int32_t i = 0; i < infos->size(); ++i) {
3374 MapSegmentInfoSegmentReader::iterator ent = readerMap.find(infos->info(i));
3375 if (ent != readerMap.end()) {
3376 ent->second->_hasChanges = false;
3377 }
3378 }
3379 }
3380 }
3381
infoIsLive(const SegmentInfoPtr & info)3382 bool ReaderPool::infoIsLive(const SegmentInfoPtr& info) {
3383 SyncLock syncLock(this);
3384 IndexWriterPtr indexWriter(_indexWriter);
3385 int32_t idx = indexWriter->segmentInfos->find(info);
3386 BOOST_ASSERT(idx != -1);
3387 BOOST_ASSERT(indexWriter->segmentInfos->info(idx) == info);
3388 return true;
3389 }
3390
/// Map the given SegmentInfo to the live instance in the writer's current
/// segmentInfos; if it is no longer present, the caller's instance is returned.
SegmentInfoPtr ReaderPool::mapToLive(const SegmentInfoPtr& info) {
    SyncLock syncLock(this);
    IndexWriterPtr writer(_indexWriter);
    int32_t pos = writer->segmentInfos->find(info);
    if (pos == -1) {
        // Segment was dropped from the index; hand back what we were given
        return info;
    }
    return writer->segmentInfos->info(pos);
}
3401
/// Release the writer's ref on sr without forcing it out of the pool.
void ReaderPool::release(const SegmentReaderPtr& sr) {
    release(sr, false);
}
3405
/// Release the writer's ref on sr.  If the reader is pooled and no longer
/// needed (drop requested, or pooling is off and we hold the last ref), it is
/// closed, evicted from the pool, and any new deletes it wrote are
/// checkpointed with the deleter.
void ReaderPool::release(const SegmentReaderPtr& sr, bool drop) {
    SyncLock syncLock(this);
    IndexWriterPtr indexWriter(_indexWriter);

    bool pooled = readerMap.contains(sr->getSegmentInfo());

    // If pooled, the pool's entry must be this exact reader instance
    BOOST_ASSERT(!pooled || readerMap.get(sr->getSegmentInfo()) == sr);

    // Drop caller's ref; for an external reader (not pooled), this decRef will close it
    sr->decRef();

    if (pooled && (drop || (!indexWriter->poolReaders && sr->getRefCount() == 1))) {
        // We invoke deleter.checkpoint below, so we must be sync'd on IW if there are changes
        BOOST_ASSERT(!sr->_hasChanges || holdsLock());

        // Discard (don't save) changes when we are dropping the reader; this is used only on the
        // sub-readers after a successful merge.
        sr->_hasChanges = sr->_hasChanges && !drop;

        // Remember before close(), which commits and resets the flag's context
        bool hasChanges = sr->_hasChanges;

        // Drop our ref - this will commit any pending changes to the dir
        sr->close();

        // We are the last ref to this reader; since we're not pooling readers, we release it
        readerMap.remove(sr->getSegmentInfo());

        if (hasChanges) {
            // Must checkpoint with deleter, because this segment reader will have created new
            // _X_N.del file.
            indexWriter->deleter->checkpoint(indexWriter->segmentInfos, false);
        }
    }
}
3440
/// Commit pending changes on every pooled reader, checkpoint with the deleter,
/// then drop the pool's refs and empty the pool.
void ReaderPool::close() {
    SyncLock syncLock(this);
    IndexWriterPtr indexWriter(_indexWriter);

    // We invoke deleter.checkpoint below, so we must be sync'd on IW
    BOOST_ASSERT(holdsLock());

    for (MapSegmentInfoSegmentReader::iterator iter = readerMap.begin(); iter != readerMap.end(); ++iter) {
        if (iter->second->_hasChanges) {
            // Only live segments should still carry uncommitted changes here
            BOOST_ASSERT(infoIsLive(iter->second->getSegmentInfo()));
            iter->second->doCommit(MapStringString());

            // Must checkpoint with deleter, because this segment reader will have created
            // new _X_N.del file.
            indexWriter->deleter->checkpoint(indexWriter->segmentInfos, false);
        }

        // NOTE: it is allowed that this decRef does not actually close the SR; this can happen when a
        // near real-time reader is kept open after the IndexWriter instance is closed
        iter->second->decRef();
    }
    readerMap.clear();
}
3464
/// Commit pending changes on every pooled reader (without releasing them),
/// checkpointing with the deleter for each reader that wrote new files.
void ReaderPool::commit() {
    SyncLock syncLock(this);
    IndexWriterPtr indexWriter(_indexWriter);

    // We invoke deleter.checkpoint below, so we must be sync'd on IW
    BOOST_ASSERT(holdsLock());

    for (MapSegmentInfoSegmentReader::iterator ent = readerMap.begin(); ent != readerMap.end(); ++ent) {
        if (ent->second->_hasChanges) {
            // Only live segments should carry uncommitted changes here
            BOOST_ASSERT(infoIsLive(ent->second->getSegmentInfo()));
            ent->second->doCommit(MapStringString());

            // Must checkpoint with deleter, because this segment reader will have created
            // new _X_N.del file.
            indexWriter->deleter->checkpoint(indexWriter->segmentInfos, false);
        }
    }
}
3483
getReadOnlyClone(const SegmentInfoPtr & info,bool doOpenStores,int32_t termInfosIndexDivisor)3484 IndexReaderPtr ReaderPool::getReadOnlyClone(const SegmentInfoPtr& info, bool doOpenStores, int32_t termInfosIndexDivisor) {
3485 SyncLock syncLock(this);
3486 SegmentReaderPtr sr(get(info, doOpenStores, BufferedIndexInput::BUFFER_SIZE, termInfosIndexDivisor));
3487 IndexReaderPtr clone;
3488 LuceneException finally;
3489 try {
3490 clone = boost::dynamic_pointer_cast<IndexReader>(sr->clone(true));
3491 } catch (LuceneException& e) {
3492 finally = e;
3493 }
3494 sr->decRef();
3495 finally.throwException();
3496 return clone;
3497 }
3498
/// Obtain a SegmentReader using the writer's configured terms index divisor
/// and the default read buffer size; must be returned via release().
SegmentReaderPtr ReaderPool::get(const SegmentInfoPtr& info, bool doOpenStores) {
    return get(info, doOpenStores, BufferedIndexInput::BUFFER_SIZE, IndexWriterPtr(_indexWriter)->readerTermsIndexDivisor);
}
3502
/// Obtain a (possibly pooled) SegmentReader for the given segment, opening it
/// on first use.  Callers receive their own ref and must return it via
/// release().  Readers for external directories are never pooled.
SegmentReaderPtr ReaderPool::get(const SegmentInfoPtr& info, bool doOpenStores, int32_t readBufferSize, int32_t termsIndexDivisor) {
    SyncLock syncLock(this);
    IndexWriterPtr indexWriter(_indexWriter);
    // When pooling, the reader may be reused by later callers, so ignore the
    // caller's buffer-size hint and use the default
    if (indexWriter->poolReaders) {
        readBufferSize = BufferedIndexInput::BUFFER_SIZE;
    }

    SegmentReaderPtr sr(readerMap.get(info));
    if (!sr) {
        // Returns a ref, which we xfer to readerMap
        sr = SegmentReader::get(false, info->dir, info, readBufferSize, doOpenStores, termsIndexDivisor);
        if (info->dir == indexWriter->directory) {
            // Only pool if reader is not external
            readerMap.put(info, sr);
        }
    } else {
        // Reader already pooled: upgrade it in place if the caller needs more
        if (doOpenStores) {
            sr->openDocStores();
        }
        if (termsIndexDivisor != -1 && !sr->termsIndexLoaded()) {
            // If this reader was originally opened because we needed to merge it, we didn't load the terms
            // index. But now, if the caller wants the terms index (eg because it's doing deletes, or an NRT
            // reader is being opened) we ask the reader to load its terms index.
            sr->loadTermsIndex(termsIndexDivisor);
        }
    }

    // Return a ref to our caller
    if (info->dir == indexWriter->directory) {
        // Only incRef if we pooled (reader is not external)
        sr->incRef();
    }
    return sr;
}
3537
getIfExists(const SegmentInfoPtr & info)3538 SegmentReaderPtr ReaderPool::getIfExists(const SegmentInfoPtr& info) {
3539 SyncLock syncLock(this);
3540 SegmentReaderPtr sr(readerMap.get(info));
3541 if (sr) {
3542 sr->incRef();
3543 }
3544 return sr;
3545 }
3546
IndexReaderWarmer::~IndexReaderWarmer() {
    // Abstract warmer base: nothing to clean up.
}
3549
3550 }
3551