/////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2009-2014 Alan Wright. All rights reserved.
// Distributable under the terms of either the Apache License (Version 2.0)
// or the GNU Lesser General Public License.
/////////////////////////////////////////////////////////////////////////////

#include "LuceneInc.h"
#include "SegmentMerger.h"
#include "MergePolicy.h"
#include "IndexWriter.h"
#include "IndexOutput.h"
#include "FieldInfos.h"
#include "FieldInfo.h"
#include "FieldsReader.h"
#include "FieldsWriter.h"
#include "IndexFileNames.h"
#include "CompoundFileWriter.h"
#include "SegmentReader.h"
#include "_SegmentReader.h"
#include "Directory.h"
#include "TermPositions.h"
#include "TermVectorsReader.h"
#include "TermVectorsWriter.h"
#include "FormatPostingsDocsConsumer.h"
#include "FormatPostingsFieldsWriter.h"
#include "FormatPostingsPositionsConsumer.h"
#include "FormatPostingsTermsConsumer.h"
#include "SegmentMergeInfo.h"
#include "SegmentMergeQueue.h"
#include "SegmentWriteState.h"
#include "TestPoint.h"
#include "MiscUtils.h"
#include "StringUtils.h"

namespace Lucene {

/// Maximum number of contiguous documents to bulk-copy when merging stored fields
const int32_t SegmentMerger::MAX_RAW_MERGE_DOCS = 4192;

/// Header written at the start of the merged norms file: the bytes 'N', 'R', 'M'
/// followed by the format version (-1, stored as 0xFF)
const uint8_t SegmentMerger::NORMS_HEADER[] = {'N', 'R', 'M', static_cast<uint8_t>(-1) };
const int32_t SegmentMerger::NORMS_HEADER_LENGTH = 4;

SegmentMerger::SegmentMerger(const DirectoryPtr& dir, const String& name) {
    readers = Collection<IndexReaderPtr>::newInstance();
    termIndexInterval = IndexWriter::DEFAULT_TERM_INDEX_INTERVAL;
    mergedDocs = 0;
    mergeDocStores = false;
    omitTermFreqAndPositions = false;

    directory = dir;
    segment = name;
    checkAbort = newLucene<CheckAbortNull>();
}

SegmentMerger::SegmentMerger(const IndexWriterPtr& writer, const String& name, const OneMergePtr& merge) {
    readers = Collection<IndexReaderPtr>::newInstance();
    mergedDocs = 0;
    mergeDocStores = false;
    omitTermFreqAndPositions = false;

    directory = writer->getDirectory();
    segment = name;

    if (merge) {
        checkAbort = newLucene<CheckAbort>(merge, directory);
    } else {
        checkAbort = newLucene<CheckAbortNull>();
    }
    termIndexInterval = writer->getTermIndexInterval();
}

SegmentMerger::~SegmentMerger() {
}

bool SegmentMerger::hasProx() {
    return fieldInfos->hasProx();
}

void SegmentMerger::add(const IndexReaderPtr& reader) {
    readers.add(reader);
}

IndexReaderPtr SegmentMerger::segmentReader(int32_t i) {
    return readers[i];
}

int32_t SegmentMerger::merge() {
    return merge(true);
}

int32_t SegmentMerger::merge(bool mergeDocStores) {
    this->mergeDocStores = mergeDocStores;

    // NOTE: it's important to add calls to checkAbort.work(...) if you make any changes to this method that will spend a lot of time.
    // The frequency of this check impacts how long IndexWriter.close(false) takes to actually stop the threads.

    mergedDocs = mergeFields();
    mergeTerms();
    mergeNorms();

    if (mergeDocStores && fieldInfos->hasVectors()) {
        mergeVectors();
    }

    return mergedDocs;
}
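// A minimal usage sketch (hypothetical caller; the directory, segment name and
// readers below are illustrative, not part of this file):
//
//   SegmentMergerPtr merger(newLucene<SegmentMerger>(dir, L"_merged"));
//   merger->add(readerA);
//   merger->add(readerB);
//   int32_t docCount = merger->merge(); // writes fields, terms, norms (and vectors)
//   merger->closeReaders();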

void SegmentMerger::closeReaders() {
    for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
        (*reader)->close();
    }
}

HashSet<String> SegmentMerger::getMergedFiles() {
    HashSet<String> fileSet(HashSet<String>::newInstance());

    // Basic files
    for (HashSet<String>::iterator ext = IndexFileNames::COMPOUND_EXTENSIONS().begin(); ext != IndexFileNames::COMPOUND_EXTENSIONS().end(); ++ext) {
        if (*ext == IndexFileNames::PROX_EXTENSION() && !hasProx()) {
            continue;
        }

        if (mergeDocStores || (*ext != IndexFileNames::FIELDS_EXTENSION() && *ext != IndexFileNames::FIELDS_INDEX_EXTENSION())) {
            fileSet.add(segment + L"." + *ext);
        }
    }

    // Fieldable norm files
    for (int32_t i = 0; i < fieldInfos->size(); ++i) {
        FieldInfoPtr fi(fieldInfos->fieldInfo(i));
        if (fi->isIndexed && !fi->omitNorms) {
            fileSet.add(segment + L"." + IndexFileNames::NORMS_EXTENSION());
            break;
        }
    }

    // Vector files
    if (fieldInfos->hasVectors() && mergeDocStores) {
        for (HashSet<String>::iterator ext = IndexFileNames::VECTOR_EXTENSIONS().begin(); ext != IndexFileNames::VECTOR_EXTENSIONS().end(); ++ext) {
            fileSet.add(segment + L"." + *ext);
        }
    }

    return fileSet;
}

HashSet<String> SegmentMerger::createCompoundFile(const String& fileName) {
    HashSet<String> files(getMergedFiles());
    CompoundFileWriterPtr cfsWriter(newLucene<CompoundFileWriter>(directory, fileName, checkAbort));

    // Now merge all added files
    for (HashSet<String>::iterator file = files.begin(); file != files.end(); ++file) {
        cfsWriter->addFile(*file);
    }

    // Perform the merge
    cfsWriter->close();

    return files;
}
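// Callers that use the compound file format would typically follow merge() with,
// for example (illustrative name):
//
//   merger->createCompoundFile(segmentName + L".cfs");
//
// which packs the files reported by getMergedFiles() into a single .cfs file.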

void SegmentMerger::addIndexed(const IndexReaderPtr& reader, const FieldInfosPtr& fInfos, HashSet<String> names,
                               bool storeTermVectors, bool storePositionWithTermVector,
                               bool storeOffsetWithTermVector, bool storePayloads, bool omitTFAndPositions) {
    for (HashSet<String>::iterator field = names.begin(); field != names.end(); ++field) {
        fInfos->add(*field, true, storeTermVectors, storePositionWithTermVector, storeOffsetWithTermVector,
                    !reader->hasNorms(*field), storePayloads, omitTFAndPositions);
    }
}

void SegmentMerger::setMatchingSegmentReaders() {
    // If the i'th reader is a SegmentReader and has identical fieldName -> number mapping, then
    // this array will be non-null at position i
    int32_t numReaders = readers.size();
    matchingSegmentReaders = Collection<SegmentReaderPtr>::newInstance(numReaders);

    // If this reader is a SegmentReader, and all of its field name -> number mappings match the
    // "merged" FieldInfos, then we can do a bulk copy of the stored fields
    for (int32_t i = 0; i < numReaders; ++i) {
        IndexReaderPtr reader(readers[i]);
        SegmentReaderPtr segmentReader(boost::dynamic_pointer_cast<SegmentReader>(reader));
        if (segmentReader) {
            bool same = true;
            FieldInfosPtr segmentFieldInfos(segmentReader->fieldInfos());
            int32_t numFieldInfos = segmentFieldInfos->size();
            for (int32_t j = 0; same && j < numFieldInfos; ++j) {
                same = (fieldInfos->fieldName(j) == segmentFieldInfos->fieldName(j));
            }
            if (same) {
                matchingSegmentReaders[i] = segmentReader;
            }
        }
    }

    // Used for bulk-reading raw bytes for stored fields
    rawDocLengths = Collection<int32_t>::newInstance(MAX_RAW_MERGE_DOCS);
    rawDocLengths2 = Collection<int32_t>::newInstance(MAX_RAW_MERGE_DOCS);
}

int32_t SegmentMerger::mergeFields() {
    if (!mergeDocStores) {
        // When we are not merging the doc stores, the field name -> number mappings are the
        // same across these segments, so we clone the FieldInfos of the last segment to
        // preserve that numbering
        fieldInfos = boost::dynamic_pointer_cast<FieldInfos>(boost::dynamic_pointer_cast<SegmentReader>(readers[readers.size() - 1])->core->fieldInfos->clone());
    } else {
        fieldInfos = newLucene<FieldInfos>();    // merge field names
    }

    for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
        SegmentReaderPtr segmentReader(boost::dynamic_pointer_cast<SegmentReader>(*reader));
        if (segmentReader) {
            FieldInfosPtr readerFieldInfos(segmentReader->fieldInfos());
            int32_t numReaderFieldInfos = readerFieldInfos->size();
            for (int32_t j = 0; j < numReaderFieldInfos; ++j) {
                FieldInfoPtr fi(readerFieldInfos->fieldInfo(j));
                fieldInfos->add(fi->name, fi->isIndexed, fi->storeTermVector, fi->storePositionWithTermVector,
                                fi->storeOffsetWithTermVector, !(*reader)->hasNorms(fi->name), fi->storePayloads,
                                fi->omitTermFreqAndPositions);
            }
        } else {
            addIndexed(*reader, fieldInfos, (*reader)->getFieldNames(IndexReader::FIELD_OPTION_TERMVECTOR_WITH_POSITION_OFFSET), true, true, true, false, false);
            addIndexed(*reader, fieldInfos, (*reader)->getFieldNames(IndexReader::FIELD_OPTION_TERMVECTOR_WITH_POSITION), true, true, false, false, false);
            addIndexed(*reader, fieldInfos, (*reader)->getFieldNames(IndexReader::FIELD_OPTION_TERMVECTOR_WITH_OFFSET), true, false, true, false, false);
            addIndexed(*reader, fieldInfos, (*reader)->getFieldNames(IndexReader::FIELD_OPTION_TERMVECTOR), true, false, false, false, false);
            addIndexed(*reader, fieldInfos, (*reader)->getFieldNames(IndexReader::FIELD_OPTION_OMIT_TERM_FREQ_AND_POSITIONS), false, false, false, false, true);
            addIndexed(*reader, fieldInfos, (*reader)->getFieldNames(IndexReader::FIELD_OPTION_STORES_PAYLOADS), false, false, false, true, false);
            addIndexed(*reader, fieldInfos, (*reader)->getFieldNames(IndexReader::FIELD_OPTION_INDEXED), false, false, false, false, false);
            fieldInfos->add((*reader)->getFieldNames(IndexReader::FIELD_OPTION_UNINDEXED), false);
        }
    }
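    // Persist the merged field name -> number mapping as the segment's .fnm file;
    // the field, term, norm and vector files written below all rely on this numbering.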
    fieldInfos->write(directory, segment + L".fnm");

    int32_t docCount = 0;

    setMatchingSegmentReaders();

    if (mergeDocStores) {
        // merge field values
        FieldsWriterPtr fieldsWriter(newLucene<FieldsWriter>(directory, segment, fieldInfos));

        LuceneException finally;
        try {
            int32_t idx = 0;
            for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
                SegmentReaderPtr matchingSegmentReader(matchingSegmentReaders[idx++]);
                FieldsReaderPtr matchingFieldsReader;
                if (matchingSegmentReader) {
                    FieldsReaderPtr fieldsReader(matchingSegmentReader->getFieldsReader());
                    if (fieldsReader && fieldsReader->canReadRawDocs()) {
                        matchingFieldsReader = fieldsReader;
                    }
                }
                if ((*reader)->hasDeletions()) {
                    docCount += copyFieldsWithDeletions(fieldsWriter, *reader, matchingFieldsReader);
                } else {
                    docCount += copyFieldsNoDeletions(fieldsWriter, *reader, matchingFieldsReader);
                }
            }
        } catch (LuceneException& e) {
            finally = e;
        }
        fieldsWriter->close();
        finally.throwException();

        String fileName(segment + L"." + IndexFileNames::FIELDS_INDEX_EXTENSION());
        int64_t fdxFileLength = directory->fileLength(fileName);

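        // Sanity check: the stored-fields index (.fdx) is a 4-byte format header
        // followed by one 8-byte file pointer per document, so a correct merge
        // yields exactly 4 + docCount * 8 bytes.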
        if (4 + ((int64_t)docCount) * 8 != fdxFileLength) {
            boost::throw_exception(RuntimeException(L"mergeFields produced an invalid result: docCount is " +
                                                    StringUtils::toString(docCount) + L" but fdx file size is " +
                                                    StringUtils::toString(fdxFileLength) + L" file=" + fileName +
                                                    L" file exists?=" + StringUtils::toString(directory->fileExists(fileName)) +
                                                    L"; now aborting this merge to prevent index corruption"));
        }
    } else {
        // If we are skipping the doc stores, that means there are no deletions in any of these segments,
        // so we just sum numDocs() of each segment to get total docCount
        for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
            docCount += (*reader)->numDocs();
        }
    }

    return docCount;
}

int32_t SegmentMerger::copyFieldsWithDeletions(const FieldsWriterPtr& fieldsWriter, const IndexReaderPtr& reader, const FieldsReaderPtr& matchingFieldsReader) {
    int32_t docCount = 0;
    int32_t maxDoc = reader->maxDoc();
    if (matchingFieldsReader) {
        // We can bulk-copy because the fieldInfos are "congruent"
        for (int32_t j = 0; j < maxDoc;) {
            if (reader->isDeleted(j)) {
                // skip deleted docs
                ++j;
                continue;
            }
            // We can optimize this case (doing a bulk byte copy) since the field numbers are identical
            int32_t start = j;
            int32_t numDocs = 0;
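            // Accumulate a contiguous run of live docs, stopping at the next
            // deletion, the end of the segment, or MAX_RAW_MERGE_DOCS.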
            do {
                ++j;
                ++numDocs;
                if (j >= maxDoc) {
                    break;
                }
                if (reader->isDeleted(j)) {
                    ++j;
                    break;
                }
            } while (numDocs < MAX_RAW_MERGE_DOCS);

            IndexInputPtr stream(matchingFieldsReader->rawDocs(rawDocLengths, start, numDocs));
            fieldsWriter->addRawDocuments(stream, rawDocLengths, numDocs);
            docCount += numDocs;
            checkAbort->work(300 * numDocs);
        }
    } else {
        for (int32_t j = 0; j < maxDoc; ++j) {
            if (reader->isDeleted(j)) {
                // skip deleted docs
                continue;
            }

            // copy the live document's stored fields into the merged segment
            fieldsWriter->addDocument(reader->document(j));
            ++docCount;
            checkAbort->work(300);
        }
    }
    return docCount;
}

int32_t SegmentMerger::copyFieldsNoDeletions(const FieldsWriterPtr& fieldsWriter, const IndexReaderPtr& reader, const FieldsReaderPtr& matchingFieldsReader) {
    int32_t docCount = 0;
    int32_t maxDoc = reader->maxDoc();
    if (matchingFieldsReader) {
        // We can bulk-copy because the fieldInfos are "congruent"
        while (docCount < maxDoc) {
            int32_t len = std::min(MAX_RAW_MERGE_DOCS, maxDoc - docCount);
            IndexInputPtr stream(matchingFieldsReader->rawDocs(rawDocLengths, docCount, len));
            fieldsWriter->addRawDocuments(stream, rawDocLengths, len);
            docCount += len;
            checkAbort->work(300 * len);
        }
    } else {
        for (; docCount < maxDoc; ++docCount) {
            // copy each document's stored fields into the merged segment
            fieldsWriter->addDocument(reader->document(docCount));
            checkAbort->work(300);
        }
    }
    return docCount;
}

void SegmentMerger::mergeVectors() {
    TermVectorsWriterPtr termVectorsWriter(newLucene<TermVectorsWriter>(directory, segment, fieldInfos));

    LuceneException finally;
    try {
        int32_t idx = 0;
        for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
            SegmentReaderPtr matchingSegmentReader(matchingSegmentReaders[idx++]);
            TermVectorsReaderPtr matchingVectorsReader;
            if (matchingSegmentReader) {
                TermVectorsReaderPtr vectorsReader(matchingSegmentReader->getTermVectorsReaderOrig());

                // If the TV* files are an older format then they cannot read raw docs
                if (vectorsReader && vectorsReader->canReadRawDocs()) {
                    matchingVectorsReader = vectorsReader;
                }
            }
            if ((*reader)->hasDeletions()) {
                copyVectorsWithDeletions(termVectorsWriter, matchingVectorsReader, *reader);
            } else {
                copyVectorsNoDeletions(termVectorsWriter, matchingVectorsReader, *reader);
            }
        }
    } catch (LuceneException& e) {
        finally = e;
    }
    termVectorsWriter->close();
    finally.throwException();

    String fileName(segment + L"." + IndexFileNames::VECTORS_INDEX_EXTENSION());
    int64_t tvxSize = directory->fileLength(fileName);

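    // Sanity check: the term-vectors index (.tvx) is a 4-byte format header plus
    // two 8-byte pointers per document (into .tvd and .tvf), so a correct merge
    // yields exactly 4 + mergedDocs * 16 bytes.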
    if (4 + ((int64_t)mergedDocs) * 16 != tvxSize) {
        boost::throw_exception(RuntimeException(L"mergeVectors produced an invalid result: mergedDocs is " +
                                                StringUtils::toString(mergedDocs) + L" but tvx size is " +
                                                StringUtils::toString(tvxSize) + L" file=" + fileName +
                                                L" file exists?=" + StringUtils::toString(directory->fileExists(fileName)) +
                                                L"; now aborting this merge to prevent index corruption"));
    }
}

void SegmentMerger::copyVectorsWithDeletions(const TermVectorsWriterPtr& termVectorsWriter, const TermVectorsReaderPtr& matchingVectorsReader, const IndexReaderPtr& reader) {
    int32_t maxDoc = reader->maxDoc();
    if (matchingVectorsReader) {
        // We can bulk-copy because the fieldInfos are "congruent"
        for (int32_t docNum = 0; docNum < maxDoc;) {
            if (reader->isDeleted(docNum)) {
                // skip deleted docs
                ++docNum;
                continue;
            }
            // We can optimize this case (doing a bulk byte copy) since the field numbers are identical
            int32_t start = docNum;
            int32_t numDocs = 0;
            do {
                ++docNum;
                ++numDocs;
                if (docNum >= maxDoc) {
                    break;
                }
                if (reader->isDeleted(docNum)) {
                    ++docNum;
                    break;
                }
            } while (numDocs < MAX_RAW_MERGE_DOCS);

            matchingVectorsReader->rawDocs(rawDocLengths, rawDocLengths2, start, numDocs);
            termVectorsWriter->addRawDocuments(matchingVectorsReader, rawDocLengths, rawDocLengths2, numDocs);
            checkAbort->work(300 * numDocs);
        }
    } else {
        for (int32_t docNum = 0; docNum < maxDoc; ++docNum) {
            if (reader->isDeleted(docNum)) {
                // skip deleted docs
                continue;
            }

            // copy this document's term vectors into the merged segment
            termVectorsWriter->addAllDocVectors(reader->getTermFreqVectors(docNum));
            checkAbort->work(300);
        }
    }
}

void SegmentMerger::copyVectorsNoDeletions(const TermVectorsWriterPtr& termVectorsWriter, const TermVectorsReaderPtr& matchingVectorsReader, const IndexReaderPtr& reader) {
    int32_t maxDoc = reader->maxDoc();
    if (matchingVectorsReader) {
        // We can bulk-copy because the fieldInfos are "congruent"
        int32_t docCount = 0;
        while (docCount < maxDoc) {
            int32_t len = std::min(MAX_RAW_MERGE_DOCS, maxDoc - docCount);
            matchingVectorsReader->rawDocs(rawDocLengths, rawDocLengths2, docCount, len);
            termVectorsWriter->addRawDocuments(matchingVectorsReader, rawDocLengths, rawDocLengths2, len);
            docCount += len;
            checkAbort->work(300 * len);
        }
    } else {
        for (int32_t docNum = 0; docNum < maxDoc; ++docNum) {
            // copy each document's term vectors into the merged segment
            termVectorsWriter->addAllDocVectors(reader->getTermFreqVectors(docNum));
            checkAbort->work(300);
        }
    }
}

void SegmentMerger::mergeTerms() {
    TestScope testScope(L"SegmentMerger", L"mergeTerms");

    SegmentWriteStatePtr state(newLucene<SegmentWriteState>(DocumentsWriterPtr(), directory, segment, L"", mergedDocs, 0, termIndexInterval));

    FormatPostingsFieldsConsumerPtr consumer(newLucene<FormatPostingsFieldsWriter>(state, fieldInfos));

    LuceneException finally;
    try {
        queue = newLucene<SegmentMergeQueue>(readers.size());
        mergeTermInfos(consumer);
    } catch (LuceneException& e) {
        finally = e;
    }
    consumer->finish();
    if (queue) {
        queue->close();
    }
    finally.throwException();
}

void SegmentMerger::mergeTermInfos(const FormatPostingsFieldsConsumerPtr& consumer) {
    int32_t base = 0;
    int32_t readerCount = readers.size();
    for (int32_t i = 0; i < readerCount; ++i) {
        IndexReaderPtr reader(readers[i]);
        TermEnumPtr termEnum(reader->terms());
        SegmentMergeInfoPtr smi(newLucene<SegmentMergeInfo>(base, termEnum, reader));
        Collection<int32_t> docMap(smi->getDocMap());
        if (docMap) {
            if (!docMaps) {
                docMaps = Collection< Collection<int32_t> >::newInstance(readerCount);
                delCounts = Collection<int32_t>::newInstance(readerCount);
            }
            docMaps[i] = docMap;
            IndexReaderPtr segmentMergeReader(smi->_reader);
            delCounts[i] = segmentMergeReader->maxDoc() - segmentMergeReader->numDocs();
        }

        base += reader->numDocs();

        BOOST_ASSERT(reader->numDocs() == reader->maxDoc() - smi->delCount);

        if (smi->next()) {
            queue->add(smi);    // initialize queue
        } else {
            smi->close();
        }
    }

    Collection<SegmentMergeInfoPtr> match(Collection<SegmentMergeInfoPtr>::newInstance(readers.size()));

    String currentField;
    FormatPostingsTermsConsumerPtr termsConsumer;

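    // Standard k-way merge over the term enumerators: repeatedly pop every segment
    // positioned on the smallest term, append their merged postings, then advance
    // those segments and push them back onto the queue.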
    while (!queue->empty()) {
        int32_t matchSize = 0; // pop matching terms
        match[matchSize++] = queue->pop();
        TermPtr term(match[0]->term);
        SegmentMergeInfoPtr top(queue->empty() ? SegmentMergeInfoPtr() : queue->top());

        while (top && term->compareTo(top->term) == 0) {
            match[matchSize++] = queue->pop();
            top = queue->top();
        }

        if (currentField != term->_field) {
            currentField = term->_field;
            if (termsConsumer) {
                termsConsumer->finish();
            }
            FieldInfoPtr fieldInfo(fieldInfos->fieldInfo(currentField));
            termsConsumer = consumer->addField(fieldInfo);
            omitTermFreqAndPositions = fieldInfo->omitTermFreqAndPositions;
        }

        int32_t df = appendPostings(termsConsumer, match, matchSize); // add new TermInfo

        checkAbort->work(df / 3.0);

        while (matchSize > 0) {
            SegmentMergeInfoPtr smi(match[--matchSize]);
            if (smi->next()) {
                queue->add(smi);    // restore queue
            } else {
                smi->close();    // done with a segment
            }
        }
    }
}

Collection< Collection<int32_t> > SegmentMerger::getDocMaps() {
    return docMaps;
}

Collection<int32_t> SegmentMerger::getDelCounts() {
    return delCounts;
}

int32_t SegmentMerger::appendPostings(const FormatPostingsTermsConsumerPtr& termsConsumer, Collection<SegmentMergeInfoPtr> smis, int32_t n) {
    FormatPostingsDocsConsumerPtr docConsumer(termsConsumer->addTerm(smis[0]->term->_text));
    int32_t df = 0;
    for (int32_t i = 0; i < n; ++i) {
        SegmentMergeInfoPtr smi(smis[i]);
        TermPositionsPtr postings(smi->getPositions());
        BOOST_ASSERT(postings);
        int32_t base = smi->base;
        Collection<int32_t> docMap(smi->getDocMap());
        postings->seek(smi->termEnum);

        while (postings->next()) {
            ++df;
            int32_t doc = postings->doc();
            if (docMap) {
                doc = docMap[doc];    // map around deletions
            }
            doc += base; // convert to merged space

            int32_t freq = postings->freq();
            FormatPostingsPositionsConsumerPtr posConsumer(docConsumer->addDoc(doc, freq));

            if (!omitTermFreqAndPositions) {
                for (int32_t j = 0; j < freq; ++j) {
                    int32_t position = postings->nextPosition();
                    int32_t payloadLength = postings->getPayloadLength();
                    if (payloadLength > 0) {
                        if (!payloadBuffer) {
                            payloadBuffer = ByteArray::newInstance(payloadLength);
                        }
                        if (payloadBuffer.size() < payloadLength) {
                            payloadBuffer.resize(payloadLength);
                        }
                        postings->getPayload(payloadBuffer, 0);
                    }
                    posConsumer->addPosition(position, payloadBuffer, 0, payloadLength);
                }
                posConsumer->finish();
            }
        }
    }
    docConsumer->finish();

    return df;
}

void SegmentMerger::mergeNorms() {
    ByteArray normBuffer;
    IndexOutputPtr output;
    LuceneException finally;
    try {
        int32_t numFieldInfos = fieldInfos->size();
        for (int32_t i = 0; i < numFieldInfos; ++i) {
            FieldInfoPtr fi(fieldInfos->fieldInfo(i));
            if (fi->isIndexed && !fi->omitNorms) {
                if (!output) {
                    output = directory->createOutput(segment + L"." + IndexFileNames::NORMS_EXTENSION());
                    output->writeBytes(NORMS_HEADER, SIZEOF_ARRAY(NORMS_HEADER));
                }
                for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
                    int32_t maxDoc = (*reader)->maxDoc();

                    if (!normBuffer) {
                        normBuffer = ByteArray::newInstance(maxDoc);
                    }
                    if (normBuffer.size() < maxDoc) { // the buffer is too small for the current segment
                        normBuffer.resize(maxDoc);
                    }
                    MiscUtils::arrayFill(normBuffer.get(), 0, normBuffer.size(), 0);
                    (*reader)->norms(fi->name, normBuffer, 0);
                    if (!(*reader)->hasDeletions()) {
                        // optimized case for segments without deleted docs
                        output->writeBytes(normBuffer.get(), maxDoc);
                    } else {
                        // this segment has deleted docs, so we have to check for every doc if it is deleted or not
                        for (int32_t k = 0; k < maxDoc; ++k) {
                            if (!(*reader)->isDeleted(k)) {
                                output->writeByte(normBuffer[k]);
                            }
                        }
                    }
                    checkAbort->work(maxDoc);
                }
            }
        }
    } catch (LuceneException& e) {
        finally = e;
    }
    if (output) {
        output->close();
    }
    finally.throwException();
}

CheckAbort::CheckAbort(const OneMergePtr& merge, const DirectoryPtr& dir) {
    workCount = 0;
    this->merge = merge;
    this->_dir = dir;
}

CheckAbort::~CheckAbort() {
}

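// Accumulates abstract work units and polls the merge-abort flag once roughly
// every 10000 units, so the check stays cheap inside the hot copy loops.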
void CheckAbort::work(double units) {
    workCount += units;
    if (workCount >= 10000.0) {
        merge->checkAborted(DirectoryPtr(_dir));
        workCount = 0;
    }
}

CheckAbortNull::CheckAbortNull() : CheckAbort(OneMergePtr(), DirectoryPtr()) {
}

CheckAbortNull::~CheckAbortNull() {
}

void CheckAbortNull::work(double units) {
    // do nothing
}

}