1 /////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2009-2014 Alan Wright. All rights reserved.
3 // Distributable under the terms of either the Apache License (Version 2.0)
4 // or the GNU Lesser General Public License.
5 /////////////////////////////////////////////////////////////////////////////
6 
7 #include "LuceneInc.h"
8 #include "ParallelReader.h"
9 #include "_ParallelReader.h"
10 #include "Document.h"
11 #include "FieldSelector.h"
12 #include "Term.h"
13 #include "FieldCache.h"
14 #include "StringUtils.h"
15 
16 namespace Lucene {
17 
ParallelReader(bool closeSubReaders)18 ParallelReader::ParallelReader(bool closeSubReaders) {
19     this->readers = Collection<IndexReaderPtr>::newInstance();
20     this->decrefOnClose = Collection<uint8_t>::newInstance();
21     this->fieldToReader = MapStringIndexReader::newInstance();
22     this->readerToFields = MapIndexReaderSetString::newInstance();
23     this->storedFieldReaders = Collection<IndexReaderPtr>::newInstance();
24     this->_maxDoc = 0;
25     this->_numDocs = 0;
26     this->_hasDeletions = false;
27 
28     this->incRefReaders = !closeSubReaders;
29 }
30 
~ParallelReader()31 ParallelReader::~ParallelReader() {
32 }
33 
add(const IndexReaderPtr & reader)34 void ParallelReader::add(const IndexReaderPtr& reader) {
35     ensureOpen();
36     add(reader, false);
37 }
38 
add(const IndexReaderPtr & reader,bool ignoreStoredFields)39 void ParallelReader::add(const IndexReaderPtr& reader, bool ignoreStoredFields) {
40     ensureOpen();
41     if (readers.empty()) {
42         this->_maxDoc = reader->maxDoc();
43         this->_numDocs = reader->numDocs();
44         this->_hasDeletions = reader->hasDeletions();
45     }
46 
47     if (reader->maxDoc() != _maxDoc) { // check compatibility
48         boost::throw_exception(IllegalArgumentException(L"All readers must have same maxDoc: " + StringUtils::toString(_maxDoc) +
49                                L" != " + StringUtils::toString(reader->maxDoc())));
50     }
51     if (reader->numDocs() != _numDocs) {
52         boost::throw_exception(IllegalArgumentException(L"All readers must have same numDocs: " + StringUtils::toString(_numDocs) +
53                                L" != " + StringUtils::toString(reader->numDocs())));
54     }
55 
56     HashSet<String> fields(reader->getFieldNames(IndexReader::FIELD_OPTION_ALL));
57     readerToFields.put(reader, fields);
58     for (HashSet<String>::iterator field = fields.begin(); field != fields.end(); ++field) { // update fieldToReader map
59         if (!fieldToReader.contains(*field)) {
60             fieldToReader.put(*field, reader);
61         }
62     }
63 
64     if (!ignoreStoredFields) {
65         storedFieldReaders.add(reader);    // add to storedFieldReaders
66     }
67     readers.add(reader);
68 
69     if (incRefReaders) {
70         reader->incRef();
71     }
72 
73     decrefOnClose.add(incRefReaders);
74 }
75 
// Clones this reader by running doReopen(true) under this object's lock.
// Any LuceneException raised while doing so is rethrown as a
// RuntimeException carrying the same error text. The `other` argument is not
// used by this implementation.
LuceneObjectPtr ParallelReader::clone(const LuceneObjectPtr& other) {
    SyncLock syncLock(this);
    LuceneObjectPtr cloned;
    try {
        cloned = doReopen(true);
    } catch (LuceneException& e) {
        boost::throw_exception(RuntimeException(e.getError()));
    }
    return cloned;
}
85 
reopen()86 IndexReaderPtr ParallelReader::reopen() {
87     SyncLock syncLock(this);
88     return doReopen(false);
89 }
90 
doReopen(bool doClone)91 IndexReaderPtr ParallelReader::doReopen(bool doClone) {
92     ensureOpen();
93 
94     bool reopened = false;
95     Collection<IndexReaderPtr> newReaders(Collection<IndexReaderPtr>::newInstance());
96 
97     bool success = false;
98     LuceneException finally;
99     try {
100         for (Collection<IndexReaderPtr>::iterator oldReader = readers.begin(); oldReader != readers.end(); ++oldReader) {
101             IndexReaderPtr newReader;
102             if (doClone) {
103                 newReader = boost::dynamic_pointer_cast<IndexReader>((*oldReader)->clone());
104             } else {
105                 newReader = (*oldReader)->reopen();
106             }
107             newReaders.add(newReader);
108             // if at least one of the subreaders was updated we remember that and return a new ParallelReader
109             if (newReader != *oldReader) {
110                 reopened = true;
111             }
112         }
113         success = true;
114     } catch (LuceneException& e) {
115         finally = e;
116     }
117     if (!success && reopened) {
118         for (int32_t i = 0; i < newReaders.size(); ++i) {
119             if (newReaders[i] != readers[i]) {
120                 try {
121                     if (newReaders[i]) {
122                         newReaders[i]->close();
123                     }
124                 } catch (...) {
125                     // keep going - we want to clean up as much as possible
126                 }
127             }
128         }
129     }
130     finally.throwException();
131 
132     if (reopened) {
133         Collection<uint8_t> newDecrefOnClose(Collection<uint8_t>::newInstance());
134         ParallelReaderPtr pr(newLucene<ParallelReader>());
135         for (int32_t i = 0; i < readers.size(); ++i) {
136             IndexReaderPtr oldReader(readers[i]);
137             IndexReaderPtr newReader(newReaders[i]);
138             if (newReader == oldReader) {
139                 newDecrefOnClose.add(true);
140                 newReader->incRef();
141             } else {
142                 // this is a new subreader instance, so on close() we don't decRef but close it
143                 newDecrefOnClose.add(false);
144             }
145             pr->add(newReader, !storedFieldReaders.contains(oldReader));
146         }
147         pr->decrefOnClose = newDecrefOnClose;
148         pr->incRefReaders = incRefReaders;
149         return pr;
150     } else {
151         // No subreader was refreshed
152         return shared_from_this();
153     }
154 }
155 
numDocs()156 int32_t ParallelReader::numDocs() {
157     // Don't call ensureOpen() here (it could affect performance)
158     return _numDocs;
159 }
160 
maxDoc()161 int32_t ParallelReader::maxDoc() {
162     // Don't call ensureOpen() here (it could affect performance)
163     return _maxDoc;
164 }
165 
hasDeletions()166 bool ParallelReader::hasDeletions() {
167     // Don't call ensureOpen() here (it could affect performance)
168     return _hasDeletions;
169 }
170 
isDeleted(int32_t n)171 bool ParallelReader::isDeleted(int32_t n) {
172     // Don't call ensureOpen() here (it could affect performance)
173     return !readers.empty() ? readers[0]->isDeleted(n) : false; // check first reader
174 }
175 
doDelete(int32_t docNum)176 void ParallelReader::doDelete(int32_t docNum) {
177     // delete in all readers
178     for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
179         (*reader)->deleteDocument(docNum);
180     }
181     _hasDeletions = true;
182 }
183 
doUndeleteAll()184 void ParallelReader::doUndeleteAll() {
185     // undeleteAll in all readers
186     for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
187         (*reader)->undeleteAll();
188     }
189     _hasDeletions = false;
190 }
191 
document(int32_t n,const FieldSelectorPtr & fieldSelector)192 DocumentPtr ParallelReader::document(int32_t n, const FieldSelectorPtr& fieldSelector) {
193     ensureOpen();
194     DocumentPtr result(newLucene<Document>());
195 
196     // append fields from storedFieldReaders
197     for (Collection<IndexReaderPtr>::iterator reader = storedFieldReaders.begin(); reader != storedFieldReaders.end(); ++reader) {
198         bool include = !fieldSelector;
199         if (!include) {
200             HashSet<String> fields = readerToFields.get(*reader);
201             for (HashSet<String>::iterator field = fields.begin(); field != fields.end(); ++field) {
202                 if (fieldSelector->accept(*field) != FieldSelector::SELECTOR_NO_LOAD) {
203                     include = true;
204                     break;
205                 }
206             }
207         }
208         if (include) {
209             Collection<FieldablePtr> fields((*reader)->document(n, fieldSelector)->getFields());
210             for (Collection<FieldablePtr>::iterator field = fields.begin(); field != fields.end(); ++field) {
211                 result->add(*field);
212             }
213         }
214     }
215     return result;
216 }
217 
getTermFreqVectors(int32_t docNumber)218 Collection<TermFreqVectorPtr> ParallelReader::getTermFreqVectors(int32_t docNumber) {
219     ensureOpen();
220 
221     Collection<TermFreqVectorPtr> results(Collection<TermFreqVectorPtr>::newInstance());
222 
223     // get all vectors
224     for (MapStringIndexReader::iterator entry = fieldToReader.begin(); entry != fieldToReader.end(); ++entry) {
225         TermFreqVectorPtr vector(entry->second->getTermFreqVector(docNumber, entry->first));
226         if (vector) {
227             results.add(vector);
228         }
229     }
230 
231     return results;
232 }
233 
getTermFreqVector(int32_t docNumber,const String & field)234 TermFreqVectorPtr ParallelReader::getTermFreqVector(int32_t docNumber, const String& field) {
235     ensureOpen();
236     MapStringIndexReader::iterator reader = fieldToReader.find(field);
237     return reader == fieldToReader.end() ? TermFreqVectorPtr() : reader->second->getTermFreqVector(docNumber, field);
238 }
239 
getTermFreqVector(int32_t docNumber,const String & field,const TermVectorMapperPtr & mapper)240 void ParallelReader::getTermFreqVector(int32_t docNumber, const String& field, const TermVectorMapperPtr& mapper) {
241     ensureOpen();
242     MapStringIndexReader::iterator reader = fieldToReader.find(field);
243     if (reader != fieldToReader.end()) {
244         reader->second->getTermFreqVector(docNumber, field, mapper);
245     }
246 }
247 
getTermFreqVector(int32_t docNumber,const TermVectorMapperPtr & mapper)248 void ParallelReader::getTermFreqVector(int32_t docNumber, const TermVectorMapperPtr& mapper) {
249     ensureOpen();
250     for (MapStringIndexReader::iterator entry = fieldToReader.begin(); entry != fieldToReader.end(); ++entry) {
251         entry->second->getTermFreqVector(docNumber, entry->first, mapper);
252     }
253 }
254 
hasNorms(const String & field)255 bool ParallelReader::hasNorms(const String& field) {
256     ensureOpen();
257     MapStringIndexReader::iterator reader = fieldToReader.find(field);
258     return reader == fieldToReader.end() ? false : reader->second->hasNorms(field);
259 }
260 
norms(const String & field)261 ByteArray ParallelReader::norms(const String& field) {
262     ensureOpen();
263     MapStringIndexReader::iterator reader = fieldToReader.find(field);
264     return reader == fieldToReader.end() ? ByteArray() : reader->second->norms(field);
265 }
266 
norms(const String & field,ByteArray norms,int32_t offset)267 void ParallelReader::norms(const String& field, ByteArray norms, int32_t offset) {
268     ensureOpen();
269     MapStringIndexReader::iterator reader = fieldToReader.find(field);
270     if (reader != fieldToReader.end()) {
271         reader->second->norms(field, norms, offset);
272     }
273 }
274 
doSetNorm(int32_t doc,const String & field,uint8_t value)275 void ParallelReader::doSetNorm(int32_t doc, const String& field, uint8_t value) {
276     ensureOpen();
277     MapStringIndexReader::iterator reader = fieldToReader.find(field);
278     if (reader != fieldToReader.end()) {
279         reader->second->doSetNorm(doc, field, value);
280     }
281 }
282 
terms()283 TermEnumPtr ParallelReader::terms() {
284     ensureOpen();
285     return newLucene<ParallelTermEnum>(shared_from_this());
286 }
287 
terms(const TermPtr & t)288 TermEnumPtr ParallelReader::terms(const TermPtr& t) {
289     ensureOpen();
290     return newLucene<ParallelTermEnum>(shared_from_this(), t);
291 }
292 
docFreq(const TermPtr & t)293 int32_t ParallelReader::docFreq(const TermPtr& t) {
294     ensureOpen();
295     MapStringIndexReader::iterator reader = fieldToReader.find(t->field());
296     return reader == fieldToReader.end() ? 0 : reader->second->docFreq(t);
297 }
298 
termDocs(const TermPtr & term)299 TermDocsPtr ParallelReader::termDocs(const TermPtr& term) {
300     ensureOpen();
301     return newLucene<ParallelTermDocs>(shared_from_this(), term);
302 }
303 
termDocs()304 TermDocsPtr ParallelReader::termDocs() {
305     ensureOpen();
306     return newLucene<ParallelTermDocs>(shared_from_this());
307 }
308 
termPositions(const TermPtr & term)309 TermPositionsPtr ParallelReader::termPositions(const TermPtr& term) {
310     ensureOpen();
311     return newLucene<ParallelTermPositions>(shared_from_this(), term);
312 }
313 
termPositions()314 TermPositionsPtr ParallelReader::termPositions() {
315     ensureOpen();
316     return newLucene<ParallelTermPositions>(shared_from_this());
317 }
318 
isCurrent()319 bool ParallelReader::isCurrent() {
320     for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
321         if (!(*reader)->isCurrent()) {
322             return false;
323         }
324     }
325 
326     // all subreaders are up to date
327     return true;
328 }
329 
isOptimized()330 bool ParallelReader::isOptimized() {
331     for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
332         if (!(*reader)->isOptimized()) {
333             return false;
334         }
335     }
336 
337     // all subindexes are optimized
338     return true;
339 }
340 
getVersion()341 int64_t ParallelReader::getVersion() {
342     boost::throw_exception(UnsupportedOperationException(L"ParallelReader does not support this method."));
343     return 0;
344 }
345 
getSubReaders()346 Collection<IndexReaderPtr> ParallelReader::getSubReaders() {
347     return readers;
348 }
349 
doCommit(MapStringString commitUserData)350 void ParallelReader::doCommit(MapStringString commitUserData) {
351     for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
352         (*reader)->commit(commitUserData);
353     }
354 }
355 
doClose()356 void ParallelReader::doClose() {
357     SyncLock syncLock(this);
358     for (int32_t i = 0; i < readers.size(); ++i) {
359         if (decrefOnClose[i]) {
360             readers[i]->decRef();
361         } else {
362             readers[i]->close();
363         }
364     }
365 
366     FieldCache::DEFAULT()->purge(shared_from_this());
367 }
368 
getFieldNames(FieldOption fieldOption)369 HashSet<String> ParallelReader::getFieldNames(FieldOption fieldOption) {
370     ensureOpen();
371     HashSet<String> fieldSet(HashSet<String>::newInstance());
372     for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
373         HashSet<String> names((*reader)->getFieldNames(fieldOption));
374         fieldSet.addAll(names.begin(), names.end());
375     }
376     return fieldSet;
377 }
378 
ParallelTermEnum(const ParallelReaderPtr & reader)379 ParallelTermEnum::ParallelTermEnum(const ParallelReaderPtr& reader) {
380     this->setIterator = false;
381     this->_reader = reader;
382     MapStringIndexReader::iterator indexReader = reader->fieldToReader.begin();
383     if (indexReader != reader->fieldToReader.end()) {
384         this->field = indexReader->first;
385     }
386     if (!field.empty()) {
387         this->termEnum = reader->fieldToReader[field]->terms();
388     }
389 }
390 
ParallelTermEnum(const ParallelReaderPtr & reader,const TermPtr & term)391 ParallelTermEnum::ParallelTermEnum(const ParallelReaderPtr& reader, const TermPtr& term) {
392     this->setIterator = false;
393     this->_reader = reader;
394     this->field = term->field();
395     MapStringIndexReader::iterator indexReader = reader->fieldToReader.find(field);
396     if (indexReader != reader->fieldToReader.end()) {
397         this->termEnum = indexReader->second->terms(term);
398     }
399 }
400 
~ParallelTermEnum()401 ParallelTermEnum::~ParallelTermEnum() {
402 }
403 
next()404 bool ParallelTermEnum::next() {
405     if (!termEnum) {
406         return false;
407     }
408 
409     // another term in this field?
410     if (termEnum->next() && termEnum->term()->field() == field) {
411         return true;    // yes, keep going
412     }
413 
414     termEnum->close(); // close old termEnum
415     ParallelReaderPtr reader(_reader);
416 
417     // find the next field with terms, if any
418     if (!setIterator) {
419         fieldIterator = reader->fieldToReader.find(field);
420         ++fieldIterator; // Skip field to get next one
421         setIterator = false;
422     }
423 
424     while (fieldIterator != reader->fieldToReader.end()) {
425         field = fieldIterator->first;
426         termEnum = fieldIterator->second->terms(newLucene<Term>(field));
427         ++fieldIterator;
428         TermPtr term(termEnum->term());
429         if (term && term->field() == field) {
430             return true;
431         } else {
432             termEnum->close();
433         }
434     }
435 
436     return false; // no more fields
437 }
438 
term()439 TermPtr ParallelTermEnum::term() {
440     return termEnum ? termEnum->term() : TermPtr();
441 }
442 
docFreq()443 int32_t ParallelTermEnum::docFreq() {
444     return termEnum ? termEnum->docFreq() : 0;
445 }
446 
close()447 void ParallelTermEnum::close() {
448     if (termEnum) {
449         termEnum->close();
450     }
451 }
452 
ParallelTermDocs(const ParallelReaderPtr & reader)453 ParallelTermDocs::ParallelTermDocs(const ParallelReaderPtr& reader) {
454     this->_reader = reader;
455 }
456 
ParallelTermDocs(const ParallelReaderPtr & reader,const TermPtr & term)457 ParallelTermDocs::ParallelTermDocs(const ParallelReaderPtr& reader, const TermPtr& term) {
458     this->_reader = reader;
459     if (!term) {
460         termDocs = reader->readers.empty() ? TermDocsPtr() : reader->readers[0]->termDocs(TermPtr());
461     } else {
462         seek(term);
463     }
464 }
465 
~ParallelTermDocs()466 ParallelTermDocs::~ParallelTermDocs() {
467 }
468 
doc()469 int32_t ParallelTermDocs::doc() {
470     return termDocs->doc();
471 }
472 
freq()473 int32_t ParallelTermDocs::freq() {
474     return termDocs->freq();
475 }
476 
seek(const TermPtr & term)477 void ParallelTermDocs::seek(const TermPtr& term) {
478     ParallelReaderPtr reader(_reader);
479     MapStringIndexReader::iterator indexReader = reader->fieldToReader.find(term->field());
480     termDocs = indexReader != reader->fieldToReader.end() ? indexReader->second->termDocs(term) : TermDocsPtr();
481 }
482 
seek(const TermEnumPtr & termEnum)483 void ParallelTermDocs::seek(const TermEnumPtr& termEnum) {
484     seek(termEnum->term());
485 }
486 
next()487 bool ParallelTermDocs::next() {
488     return termDocs ? termDocs->next() : false;
489 }
490 
read(Collection<int32_t> docs,Collection<int32_t> freqs)491 int32_t ParallelTermDocs::read(Collection<int32_t> docs, Collection<int32_t> freqs) {
492     return termDocs ? termDocs->read(docs, freqs) : 0;
493 }
494 
skipTo(int32_t target)495 bool ParallelTermDocs::skipTo(int32_t target) {
496     return termDocs ? termDocs->skipTo(target) : false;
497 }
498 
close()499 void ParallelTermDocs::close() {
500     if (termDocs) {
501         termDocs->close();
502     }
503 }
504 
ParallelTermPositions(const ParallelReaderPtr & reader)505 ParallelTermPositions::ParallelTermPositions(const ParallelReaderPtr& reader) : ParallelTermDocs(reader) {
506 }
507 
ParallelTermPositions(const ParallelReaderPtr & reader,const TermPtr & term)508 ParallelTermPositions::ParallelTermPositions(const ParallelReaderPtr& reader, const TermPtr& term) : ParallelTermDocs(reader) {
509     seek(term);
510 }
511 
~ParallelTermPositions()512 ParallelTermPositions::~ParallelTermPositions() {
513 }
514 
seek(const TermPtr & term)515 void ParallelTermPositions::seek(const TermPtr& term) {
516     ParallelReaderPtr reader(_reader);
517     MapStringIndexReader::iterator indexReader = reader->fieldToReader.find(term->field());
518     termDocs = indexReader != reader->fieldToReader.end() ? indexReader->second->termPositions(term) : TermDocsPtr();
519 }
520 
nextPosition()521 int32_t ParallelTermPositions::nextPosition() {
522     // It is an error to call this if there is no next position, eg. if termDocs==null
523     return boost::static_pointer_cast<TermPositions>(termDocs)->nextPosition();
524 }
525 
getPayloadLength()526 int32_t ParallelTermPositions::getPayloadLength() {
527     // It is an error to call this if there is no next position, eg. if termDocs==null
528     return boost::static_pointer_cast<TermPositions>(termDocs)->getPayloadLength();
529 }
530 
getPayload(ByteArray data,int32_t offset)531 ByteArray ParallelTermPositions::getPayload(ByteArray data, int32_t offset) {
532     // It is an error to call this if there is no next position, eg. if termDocs==null
533     return boost::static_pointer_cast<TermPositions>(termDocs)->getPayload(data, offset);
534 }
535 
isPayloadAvailable()536 bool ParallelTermPositions::isPayloadAvailable() {
537     // It is an error to call this if there is no next position, eg. if termDocs==null
538     return boost::static_pointer_cast<TermPositions>(termDocs)->isPayloadAvailable();
539 }
540 
541 }
542