/////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2009-2014 Alan Wright. All rights reserved.
// Distributable under the terms of either the Apache License (Version 2.0)
// or the GNU Lesser General Public License.
/////////////////////////////////////////////////////////////////////////////

#include "LuceneInc.h"
#include "SegmentMerger.h"
#include "MergePolicy.h"
#include "IndexWriter.h"
#include "IndexOutput.h"
#include "FieldInfos.h"
#include "FieldInfo.h"
#include "FieldsReader.h"
#include "FieldsWriter.h"
#include "IndexFileNames.h"
#include "CompoundFileWriter.h"
#include "SegmentReader.h"
#include "_SegmentReader.h"
#include "Directory.h"
#include "TermPositions.h"
#include "TermVectorsReader.h"
#include "TermVectorsWriter.h"
#include "FormatPostingsDocsConsumer.h"
#include "FormatPostingsFieldsWriter.h"
#include "FormatPostingsPositionsConsumer.h"
#include "FormatPostingsTermsConsumer.h"
#include "SegmentMergeInfo.h"
#include "SegmentMergeQueue.h"
#include "SegmentWriteState.h"
#include "TestPoint.h"
#include "MiscUtils.h"
#include "StringUtils.h"

namespace Lucene {

/// Maximum number of contiguous documents to bulk-copy when merging stored fields
const int32_t SegmentMerger::MAX_RAW_MERGE_DOCS = 4192;

/// Norms file header: the magic bytes 'N', 'R', 'M' followed by a format version byte (currently -1)
const uint8_t SegmentMerger::NORMS_HEADER[] = {'N', 'R', 'M', static_cast<uint8_t>(-1) };
const int32_t SegmentMerger::NORMS_HEADER_LENGTH = 4;

SegmentMerger::SegmentMerger(const DirectoryPtr& dir, const String& name) {
    readers = Collection<IndexReaderPtr>::newInstance();
    termIndexInterval = IndexWriter::DEFAULT_TERM_INDEX_INTERVAL;
    mergedDocs = 0;
    mergeDocStores = false;
    omitTermFreqAndPositions = false;

    directory = dir;
    segment = name;
    checkAbort = newLucene<CheckAbortNull>();
}

SegmentMerger::SegmentMerger(const IndexWriterPtr& writer, const String& name, const OneMergePtr& merge) {
    readers = Collection<IndexReaderPtr>::newInstance();
    mergedDocs = 0;
    mergeDocStores = false;
    omitTermFreqAndPositions = false;

    directory = writer->getDirectory();
    segment = name;

    if (merge) {
        checkAbort = newLucene<CheckAbort>(merge, directory);
    } else {
        checkAbort = newLucene<CheckAbortNull>();
    }
    termIndexInterval = writer->getTermIndexInterval();
}

SegmentMerger::~SegmentMerger() {
}

bool SegmentMerger::hasProx() {
    return fieldInfos->hasProx();
}

void SegmentMerger::add(const IndexReaderPtr& reader) {
    readers.add(reader);
}

IndexReaderPtr SegmentMerger::segmentReader(int32_t i) {
    return readers[i];
}

int32_t SegmentMerger::merge() {
    return merge(true);
}

int32_t SegmentMerger::merge(bool mergeDocStores) {
    this->mergeDocStores = mergeDocStores;

    // NOTE: add calls to checkAbort->work(...) whenever you add work to this method that can take
    // significant time. The frequency of this check determines how long IndexWriter::close(false)
    // takes to actually stop the merge threads.

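    // Merge order matters: mergeFields() must run first because it builds the merged FieldInfos
    // (the field name -> number mapping) that mergeTerms(), mergeNorms() and mergeVectors() all
    // consult, and it establishes mergedDocs, which mergeTerms() and mergeVectors() depend on.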
    mergedDocs = mergeFields();
    mergeTerms();
    mergeNorms();

    if (mergeDocStores && fieldInfos->hasVectors()) {
        mergeVectors();
    }

    return mergedDocs;
}

void SegmentMerger::closeReaders() {
    for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
        (*reader)->close();
    }
}

HashSet<String> SegmentMerger::getMergedFiles() {
    HashSet<String> fileSet(HashSet<String>::newInstance());

    // Basic files
    for (HashSet<String>::iterator ext = IndexFileNames::COMPOUND_EXTENSIONS().begin(); ext != IndexFileNames::COMPOUND_EXTENSIONS().end(); ++ext) {
        if (*ext == IndexFileNames::PROX_EXTENSION() && !hasProx()) {
            continue;
        }

        if (mergeDocStores || (*ext != IndexFileNames::FIELDS_EXTENSION() && *ext != IndexFileNames::FIELDS_INDEX_EXTENSION())) {
            fileSet.add(segment + L"." + *ext);
        }
    }

    // Fieldable norm files
    for (int32_t i = 0; i < fieldInfos->size(); ++i) {
        FieldInfoPtr fi(fieldInfos->fieldInfo(i));
        if (fi->isIndexed && !fi->omitNorms) {
            fileSet.add(segment + L"." + IndexFileNames::NORMS_EXTENSION());
            break;
        }
    }

    // Vector files
    if (fieldInfos->hasVectors() && mergeDocStores) {
        for (HashSet<String>::iterator ext = IndexFileNames::VECTOR_EXTENSIONS().begin(); ext != IndexFileNames::VECTOR_EXTENSIONS().end(); ++ext) {
            fileSet.add(segment + L"." + *ext);
        }
    }

    return fileSet;
}

HashSet<String> SegmentMerger::createCompoundFile(const String& fileName) {
    HashSet<String> files(getMergedFiles());
    CompoundFileWriterPtr cfsWriter(newLucene<CompoundFileWriter>(directory, fileName, checkAbort));

    // Now merge all added files
    for (HashSet<String>::iterator file = files.begin(); file != files.end(); ++file) {
        cfsWriter->addFile(*file);
    }

    // Perform the merge
    cfsWriter->close();

    return files;
}

void SegmentMerger::addIndexed(const IndexReaderPtr& reader, const FieldInfosPtr& fInfos, HashSet<String> names,
                               bool storeTermVectors, bool storePositionWithTermVector,
                               bool storeOffsetWithTermVector, bool storePayloads, bool omitTFAndPositions) {
    for (HashSet<String>::iterator field = names.begin(); field != names.end(); ++field) {
        fInfos->add(*field, true, storeTermVectors, storePositionWithTermVector, storeOffsetWithTermVector,
                    !reader->hasNorms(*field), storePayloads, omitTFAndPositions);
    }
}

void SegmentMerger::setMatchingSegmentReaders() {
    // If the i'th reader is a SegmentReader and has identical fieldName -> number mapping, then
    // this array will be non-null at position i
    int32_t numReaders = readers.size();
    matchingSegmentReaders = Collection<SegmentReaderPtr>::newInstance(numReaders);

    // If this reader is a SegmentReader, and all of its field name -> number mappings match the
    // "merged" FieldInfos, then we can do a bulk copy of the stored fields
    for (int32_t i = 0; i < numReaders; ++i) {
        IndexReaderPtr reader(readers[i]);
        SegmentReaderPtr segmentReader(boost::dynamic_pointer_cast<SegmentReader>(reader));
        if (segmentReader) {
            bool same = true;
            FieldInfosPtr segmentFieldInfos(segmentReader->fieldInfos());
            int32_t numFieldInfos = segmentFieldInfos->size();
            for (int32_t j = 0; same && j < numFieldInfos; ++j) {
                same = (fieldInfos->fieldName(j) == segmentFieldInfos->fieldName(j));
            }
            if (same) {
                matchingSegmentReaders[i] = segmentReader;
            }
        }
    }
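
    // Congruence matters because each stored-field record in the .fdt file embeds its field
    // number: a raw byte copy is only valid when every field number denotes the same field in
    // the source segment and in the merged segment. If, say, "title" were field 0 in one and
    // field 1 in the other, bulk-copied records would come back attributed to the wrong field.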

    // Used for bulk-reading raw bytes for stored fields
    rawDocLengths = Collection<int32_t>::newInstance(MAX_RAW_MERGE_DOCS);
    rawDocLengths2 = Collection<int32_t>::newInstance(MAX_RAW_MERGE_DOCS);
}

int32_t SegmentMerger::mergeFields() {
    if (!mergeDocStores) {
        // When we're not merging doc stores, every segment's field name -> number mapping is the
        // same, so we start from the fieldInfos of the last segment to preserve that numbering
        fieldInfos = boost::dynamic_pointer_cast<FieldInfos>(boost::dynamic_pointer_cast<SegmentReader>(readers[readers.size() - 1])->core->fieldInfos->clone());
    } else {
        fieldInfos = newLucene<FieldInfos>(); // merge field names
    }

    for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
        SegmentReaderPtr segmentReader(boost::dynamic_pointer_cast<SegmentReader>(*reader));
        if (segmentReader) {
            FieldInfosPtr readerFieldInfos(segmentReader->fieldInfos());
            int32_t numReaderFieldInfos = readerFieldInfos->size();
            for (int32_t j = 0; j < numReaderFieldInfos; ++j) {
                FieldInfoPtr fi(readerFieldInfos->fieldInfo(j));
                fieldInfos->add(fi->name, fi->isIndexed, fi->storeTermVector, fi->storePositionWithTermVector,
                                fi->storeOffsetWithTermVector, !(*reader)->hasNorms(fi->name), fi->storePayloads,
                                fi->omitTermFreqAndPositions);
            }
        } else {
            addIndexed(*reader, fieldInfos, (*reader)->getFieldNames(IndexReader::FIELD_OPTION_TERMVECTOR_WITH_POSITION_OFFSET), true, true, true, false, false);
            addIndexed(*reader, fieldInfos, (*reader)->getFieldNames(IndexReader::FIELD_OPTION_TERMVECTOR_WITH_POSITION), true, true, false, false, false);
            addIndexed(*reader, fieldInfos, (*reader)->getFieldNames(IndexReader::FIELD_OPTION_TERMVECTOR_WITH_OFFSET), true, false, true, false, false);
            addIndexed(*reader, fieldInfos, (*reader)->getFieldNames(IndexReader::FIELD_OPTION_TERMVECTOR), true, false, false, false, false);
            addIndexed(*reader, fieldInfos, (*reader)->getFieldNames(IndexReader::FIELD_OPTION_OMIT_TERM_FREQ_AND_POSITIONS), false, false, false, false, true);
            addIndexed(*reader, fieldInfos, (*reader)->getFieldNames(IndexReader::FIELD_OPTION_STORES_PAYLOADS), false, false, false, true, false);
            addIndexed(*reader, fieldInfos, (*reader)->getFieldNames(IndexReader::FIELD_OPTION_INDEXED), false, false, false, false, false);
            fieldInfos->add((*reader)->getFieldNames(IndexReader::FIELD_OPTION_UNINDEXED), false);
        }
    }
    fieldInfos->write(directory, segment + L".fnm");

    int32_t docCount = 0;

    setMatchingSegmentReaders();

    if (mergeDocStores) {
        // merge field values
        FieldsWriterPtr fieldsWriter(newLucene<FieldsWriter>(directory, segment, fieldInfos));

        LuceneException finally;
        try {
            int32_t idx = 0;
            for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
                SegmentReaderPtr matchingSegmentReader(matchingSegmentReaders[idx++]);
                FieldsReaderPtr matchingFieldsReader;
                if (matchingSegmentReader) {
                    FieldsReaderPtr fieldsReader(matchingSegmentReader->getFieldsReader());
                    if (fieldsReader && fieldsReader->canReadRawDocs()) {
                        matchingFieldsReader = fieldsReader;
                    }
                }
                if ((*reader)->hasDeletions()) {
                    docCount += copyFieldsWithDeletions(fieldsWriter, *reader, matchingFieldsReader);
                } else {
                    docCount += copyFieldsNoDeletions(fieldsWriter, *reader, matchingFieldsReader);
                }
            }
        } catch (LuceneException& e) {
            finally = e;
        }
        fieldsWriter->close();
        finally.throwException();

        String fileName(segment + L"." + IndexFileNames::FIELDS_INDEX_EXTENSION());
        int64_t fdxFileLength = directory->fileLength(fileName);
        if (4 + ((int64_t)docCount) * 8 != fdxFileLength) {
            boost::throw_exception(RuntimeException(L"mergeFields produced an invalid result: docCount is " +
                                                    StringUtils::toString(docCount) + L" but fdx file size is " +
                                                    StringUtils::toString(fdxFileLength) + L" file=" + fileName +
                                                    L" file exists?=" + StringUtils::toString(directory->fileExists(fileName)) +
                                                    L"; now aborting this merge to prevent index corruption"));
        }
    } else {
        // If we are skipping the doc stores, that means there are no deletions in any of these segments,
        // so we just sum numDocs() of each segment to get total docCount
        for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
            docCount += (*reader)->numDocs();
        }
    }

    return docCount;
}

int32_t SegmentMerger::copyFieldsWithDeletions(const FieldsWriterPtr& fieldsWriter, const IndexReaderPtr& reader, const FieldsReaderPtr& matchingFieldsReader) {
    int32_t docCount = 0;
    int32_t maxDoc = reader->maxDoc();
    if (matchingFieldsReader) {
        // We can bulk-copy because the fieldInfos are "congruent"
        for (int32_t j = 0; j < maxDoc;) {
            if (reader->isDeleted(j)) {
                // skip deleted docs
                ++j;
                continue;
            }
            // We can optimize this case (doing a bulk byte copy) since the field numbers are identical
            int32_t start = j;
            int32_t numDocs = 0;
            do {
                ++j;
                ++numDocs;
                if (j >= maxDoc) {
                    break;
                }
                if (reader->isDeleted(j)) {
                    ++j;
                    break;
                }
            } while (numDocs < MAX_RAW_MERGE_DOCS);
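
            // Worked example of the run collection above: with maxDoc = 6 and only doc 3 deleted,
            // the first pass leaves start = 0, numDocs = 3 and j = 4, so docs 0..2 are copied raw
            // in one shot; the next pass then picks up docs 4..5.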
            IndexInputPtr stream(matchingFieldsReader->rawDocs(rawDocLengths, start, numDocs));
            fieldsWriter->addRawDocuments(stream, rawDocLengths, numDocs);
            docCount += numDocs;
            checkAbort->work(300 * numDocs);
        }
    } else {
        for (int32_t j = 0; j < maxDoc; ++j) {
            if (reader->isDeleted(j)) {
                // skip deleted docs
                continue;
            }

            // NOTE: it's very important to first assign to doc then pass it to fieldsWriter->addDocument; see LUCENE-1282
            fieldsWriter->addDocument(reader->document(j));
            ++docCount;
            checkAbort->work(300);
        }
    }
    return docCount;
}

int32_t SegmentMerger::copyFieldsNoDeletions(const FieldsWriterPtr& fieldsWriter, const IndexReaderPtr& reader, const FieldsReaderPtr& matchingFieldsReader) {
    int32_t docCount = 0;
    int32_t maxDoc = reader->maxDoc();
    if (matchingFieldsReader) {
        // We can bulk-copy because the fieldInfos are "congruent"
        while (docCount < maxDoc) {
            int32_t len = std::min(MAX_RAW_MERGE_DOCS, maxDoc - docCount);
            IndexInputPtr stream(matchingFieldsReader->rawDocs(rawDocLengths, docCount, len));
            fieldsWriter->addRawDocuments(stream, rawDocLengths, len);
            docCount += len;
            checkAbort->work(300 * len);
        }
    } else {
        for (; docCount < maxDoc; ++docCount) {
            // NOTE: it's very important to first assign to doc then pass it to fieldsWriter->addDocument; see LUCENE-1282
            fieldsWriter->addDocument(reader->document(docCount));
            checkAbort->work(300);
        }
    }
    return docCount;
}

void SegmentMerger::mergeVectors() {
    TermVectorsWriterPtr termVectorsWriter(newLucene<TermVectorsWriter>(directory, segment, fieldInfos));

    LuceneException finally;
    try {
        int32_t idx = 0;
        for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
            SegmentReaderPtr matchingSegmentReader(matchingSegmentReaders[idx++]);
            TermVectorsReaderPtr matchingVectorsReader;
            if (matchingSegmentReader) {
                TermVectorsReaderPtr vectorsReader(matchingSegmentReader->getTermVectorsReaderOrig());

                // If the TV* files are an older format then they cannot read raw docs
                if (vectorsReader && vectorsReader->canReadRawDocs()) {
                    matchingVectorsReader = vectorsReader;
                }
            }
            if ((*reader)->hasDeletions()) {
                copyVectorsWithDeletions(termVectorsWriter, matchingVectorsReader, *reader);
            } else {
                copyVectorsNoDeletions(termVectorsWriter, matchingVectorsReader, *reader);
            }
        }
    } catch (LuceneException& e) {
        finally = e;
    }
    termVectorsWriter->close();
    finally.throwException();

    String fileName(segment + L"." + IndexFileNames::VECTORS_INDEX_EXTENSION());
    int64_t tvxSize = directory->fileLength(fileName);

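    // Sanity check mirroring mergeFields(): the .tvx file holds a 4-byte header plus 16 bytes
    // per document (two 8-byte pointers, one into the .tvd file and one into the .tvf file),
    // so its expected size is 4 + mergedDocs * 16.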
    if (4 + ((int64_t)mergedDocs) * 16 != tvxSize) {
        boost::throw_exception(RuntimeException(L"mergeVectors produced an invalid result: mergedDocs is " +
                                                StringUtils::toString(mergedDocs) + L" but tvx size is " +
                                                StringUtils::toString(tvxSize) + L" file=" + fileName +
                                                L" file exists?=" + StringUtils::toString(directory->fileExists(fileName)) +
                                                L"; now aborting this merge to prevent index corruption"));
    }
}

void SegmentMerger::copyVectorsWithDeletions(const TermVectorsWriterPtr& termVectorsWriter, const TermVectorsReaderPtr& matchingVectorsReader, const IndexReaderPtr& reader) {
    int32_t maxDoc = reader->maxDoc();
    if (matchingVectorsReader) {
        // We can bulk-copy because the fieldInfos are "congruent"
        for (int32_t docNum = 0; docNum < maxDoc;) {
            if (reader->isDeleted(docNum)) {
                // skip deleted docs
                ++docNum;
                continue;
            }
            // We can optimize this case (doing a bulk byte copy) since the field numbers are identical
            int32_t start = docNum;
            int32_t numDocs = 0;
            do {
                ++docNum;
                ++numDocs;
                if (docNum >= maxDoc) {
                    break;
                }
                if (reader->isDeleted(docNum)) {
                    ++docNum;
                    break;
                }
            } while (numDocs < MAX_RAW_MERGE_DOCS);

            matchingVectorsReader->rawDocs(rawDocLengths, rawDocLengths2, start, numDocs);
            termVectorsWriter->addRawDocuments(matchingVectorsReader, rawDocLengths, rawDocLengths2, numDocs);
            checkAbort->work(300 * numDocs);
        }
    } else {
        for (int32_t docNum = 0; docNum < maxDoc; ++docNum) {
            if (reader->isDeleted(docNum)) {
                // skip deleted docs
                continue;
            }

            // NOTE: it's very important to first assign to vectors then pass it to termVectorsWriter->addAllDocVectors; see LUCENE-1282
            termVectorsWriter->addAllDocVectors(reader->getTermFreqVectors(docNum));
            checkAbort->work(300);
        }
    }
}

void SegmentMerger::copyVectorsNoDeletions(const TermVectorsWriterPtr& termVectorsWriter, const TermVectorsReaderPtr& matchingVectorsReader, const IndexReaderPtr& reader) {
    int32_t maxDoc = reader->maxDoc();
    if (matchingVectorsReader) {
        // We can bulk-copy because the fieldInfos are "congruent"
        int32_t docCount = 0;
        while (docCount < maxDoc) {
            int32_t len = std::min(MAX_RAW_MERGE_DOCS, maxDoc - docCount);
            matchingVectorsReader->rawDocs(rawDocLengths, rawDocLengths2, docCount, len);
            termVectorsWriter->addRawDocuments(matchingVectorsReader, rawDocLengths, rawDocLengths2, len);
            docCount += len;
            checkAbort->work(300 * len);
        }
    } else {
        for (int32_t docNum = 0; docNum < maxDoc; ++docNum) {
            // NOTE: it's very important to first assign to vectors then pass it to termVectorsWriter->addAllDocVectors; see LUCENE-1282
            termVectorsWriter->addAllDocVectors(reader->getTermFreqVectors(docNum));
            checkAbort->work(300);
        }
    }
}

void SegmentMerger::mergeTerms() {
    TestScope testScope(L"SegmentMerger", L"mergeTerms");

    SegmentWriteStatePtr state(newLucene<SegmentWriteState>(DocumentsWriterPtr(), directory, segment, L"", mergedDocs, 0, termIndexInterval));

    FormatPostingsFieldsConsumerPtr consumer(newLucene<FormatPostingsFieldsWriter>(state, fieldInfos));

    LuceneException finally;
    try {
        queue = newLucene<SegmentMergeQueue>(readers.size());
        mergeTermInfos(consumer);
    } catch (LuceneException& e) {
        finally = e;
    }
    consumer->finish();
    if (queue) {
        queue->close();
    }
    finally.throwException();
}

void SegmentMerger::mergeTermInfos(const FormatPostingsFieldsConsumerPtr& consumer) {
    int32_t base = 0;
    int32_t readerCount = readers.size();
    for (int32_t i = 0; i < readerCount; ++i) {
        IndexReaderPtr reader(readers[i]);
        TermEnumPtr termEnum(reader->terms());
        SegmentMergeInfoPtr smi(newLucene<SegmentMergeInfo>(base, termEnum, reader));
        Collection<int32_t> docMap(smi->getDocMap());
        if (docMap) {
            if (!docMaps) {
                docMaps = Collection< Collection<int32_t> >::newInstance(readerCount);
                delCounts = Collection<int32_t>::newInstance(readerCount);
            }
            docMaps[i] = docMap;
            IndexReaderPtr segmentMergeReader(smi->_reader);
            delCounts[i] = segmentMergeReader->maxDoc() - segmentMergeReader->numDocs();
        }

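        // base advances by numDocs(), not maxDoc(): deleted docs are squeezed out of the merged
        // doc-id space, so each reader's surviving docs land contiguously after those of the
        // readers before it.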
        base += reader->numDocs();

        BOOST_ASSERT(reader->numDocs() == reader->maxDoc() - smi->delCount);

        if (smi->next()) {
            queue->add(smi); // initialize queue
        } else {
            smi->close();
        }
    }

    Collection<SegmentMergeInfoPtr> match(Collection<SegmentMergeInfoPtr>::newInstance(readers.size()));

    String currentField;
    FormatPostingsTermsConsumerPtr termsConsumer;

    while (!queue->empty()) {
        int32_t matchSize = 0; // pop matching terms
        match[matchSize++] = queue->pop();
        TermPtr term(match[0]->term);
        SegmentMergeInfoPtr top(queue->empty() ? SegmentMergeInfoPtr() : queue->top());

        while (top && term->compareTo(top->term) == 0) {
            match[matchSize++] = queue->pop();
            top = queue->top();
        }
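
        // Classic k-way merge on a priority queue ordered by term: everything popped above shares
        // the smallest current term. For example, if two readers are both positioned on "body:foo",
        // matchSize becomes 2 and their postings are combined by a single appendPostings() call.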

        if (currentField != term->_field) {
            currentField = term->_field;
            if (termsConsumer) {
                termsConsumer->finish();
            }
            FieldInfoPtr fieldInfo(fieldInfos->fieldInfo(currentField));
            termsConsumer = consumer->addField(fieldInfo);
            omitTermFreqAndPositions = fieldInfo->omitTermFreqAndPositions;
        }

        int32_t df = appendPostings(termsConsumer, match, matchSize); // add new TermInfo

        checkAbort->work(df / 3.0);

        while (matchSize > 0) {
            SegmentMergeInfoPtr smi(match[--matchSize]);
            if (smi->next()) {
                queue->add(smi); // restore queue
            } else {
                smi->close(); // done with a segment
            }
        }
    }
}

Collection< Collection<int32_t> > SegmentMerger::getDocMaps() {
    return docMaps;
}

Collection<int32_t> SegmentMerger::getDelCounts() {
    return delCounts;
}

int32_t SegmentMerger::appendPostings(const FormatPostingsTermsConsumerPtr& termsConsumer, Collection<SegmentMergeInfoPtr> smis, int32_t n) {
    FormatPostingsDocsConsumerPtr docConsumer(termsConsumer->addTerm(smis[0]->term->_text));
    int32_t df = 0;
    for (int32_t i = 0; i < n; ++i) {
        SegmentMergeInfoPtr smi(smis[i]);
        TermPositionsPtr postings(smi->getPositions());
        BOOST_ASSERT(postings);
        int32_t base = smi->base;
        Collection<int32_t> docMap(smi->getDocMap());
        postings->seek(smi->termEnum);

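        // Doc-id remapping, worked through: docMap (built by SegmentMergeInfo::getDocMap) compacts
        // segment-local ids around deletions, e.g. docs {0, 1, 2, 3} with doc 1 deleted map to
        // {0, -1, 1, 2}, and base then shifts the result into merged space, so with base = 100,
        // local doc 3 becomes merged doc 102. Deleted docs never reach the mapping because
        // postings->next() skips them.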
        while (postings->next()) {
            ++df;
            int32_t doc = postings->doc();
            if (docMap) {
                doc = docMap[doc]; // map around deletions
            }
            doc += base; // convert to merged space

            int32_t freq = postings->freq();
            FormatPostingsPositionsConsumerPtr posConsumer(docConsumer->addDoc(doc, freq));

            if (!omitTermFreqAndPositions) {
                for (int32_t j = 0; j < freq; ++j) {
                    int32_t position = postings->nextPosition();
                    int32_t payloadLength = postings->getPayloadLength();
                    if (payloadLength > 0) {
                        if (!payloadBuffer) {
                            payloadBuffer = ByteArray::newInstance(payloadLength);
                        }
                        if (payloadBuffer.size() < payloadLength) {
                            payloadBuffer.resize(payloadLength);
                        }
                        postings->getPayload(payloadBuffer, 0);
                    }
                    posConsumer->addPosition(position, payloadBuffer, 0, payloadLength);
                }
                posConsumer->finish();
            }
        }
    }
    docConsumer->finish();

    return df;
}

void SegmentMerger::mergeNorms() {
    ByteArray normBuffer;
    IndexOutputPtr output;
    LuceneException finally;
    try {
        int32_t numFieldInfos = fieldInfos->size();
        for (int32_t i = 0; i < numFieldInfos; ++i) {
            FieldInfoPtr fi(fieldInfos->fieldInfo(i));
            if (fi->isIndexed && !fi->omitNorms) {
                if (!output) {
                    output = directory->createOutput(segment + L"." + IndexFileNames::NORMS_EXTENSION());
                    output->writeBytes(NORMS_HEADER, SIZEOF_ARRAY(NORMS_HEADER));
                }
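                // Resulting .nrm layout: NORMS_HEADER, then for each indexed field that keeps
                // norms, one norm byte per surviving (non-deleted) doc with the readers
                // concatenated in order; a single buffer sized to the largest maxDoc is reused.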
                for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
                    int32_t maxDoc = (*reader)->maxDoc();

                    if (!normBuffer) {
                        normBuffer = ByteArray::newInstance(maxDoc);
                    }
                    if (normBuffer.size() < maxDoc) { // the buffer is too small for the current segment
                        normBuffer.resize(maxDoc);
                    }
                    MiscUtils::arrayFill(normBuffer.get(), 0, normBuffer.size(), 0);
                    (*reader)->norms(fi->name, normBuffer, 0);
                    if (!(*reader)->hasDeletions()) {
                        // optimized case for segments without deleted docs
                        output->writeBytes(normBuffer.get(), maxDoc);
                    } else {
                        // this segment has deleted docs, so we have to check every doc whether it is deleted
                        for (int32_t k = 0; k < maxDoc; ++k) {
                            if (!(*reader)->isDeleted(k)) {
                                output->writeByte(normBuffer[k]);
                            }
                        }
                    }
                    checkAbort->work(maxDoc);
                }
            }
        }
    } catch (LuceneException& e) {
        finally = e;
    }
    if (output) {
        output->close();
    }
    finally.throwException();
}

CheckAbort::CheckAbort(const OneMergePtr& merge, const DirectoryPtr& dir) {
    workCount = 0;
    this->merge = merge;
    this->_dir = dir;
}

CheckAbort::~CheckAbort() {
}

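// work() accumulates abstract units of effort (callers charge 300 per copied document and maxDoc
// per norms pass). Once 10000 units accrue, the counter resets and merge->checkAborted() is
// polled (roughly every 33 copied docs), which throws if the merge has been aborted.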
void CheckAbort::work(double units) {
    workCount += units;
    if (workCount >= 10000.0) {
        merge->checkAborted(DirectoryPtr(_dir));
        workCount = 0;
    }
}

CheckAbortNull::CheckAbortNull() : CheckAbort(OneMergePtr(), DirectoryPtr()) {
}

CheckAbortNull::~CheckAbortNull() {
}

void CheckAbortNull::work(double units) {
    // do nothing
}

}
