1 /////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2009-2014 Alan Wright. All rights reserved.
3 // Distributable under the terms of either the Apache License (Version 2.0)
4 // or the GNU Lesser General Public License.
5 /////////////////////////////////////////////////////////////////////////////
6
7 #include "LuceneInc.h"
8 #include "ParallelReader.h"
9 #include "_ParallelReader.h"
10 #include "Document.h"
11 #include "FieldSelector.h"
12 #include "Term.h"
13 #include "FieldCache.h"
14 #include "StringUtils.h"
15
16 namespace Lucene {
17
ParallelReader(bool closeSubReaders)18 ParallelReader::ParallelReader(bool closeSubReaders) {
19 this->readers = Collection<IndexReaderPtr>::newInstance();
20 this->decrefOnClose = Collection<uint8_t>::newInstance();
21 this->fieldToReader = MapStringIndexReader::newInstance();
22 this->readerToFields = MapIndexReaderSetString::newInstance();
23 this->storedFieldReaders = Collection<IndexReaderPtr>::newInstance();
24 this->_maxDoc = 0;
25 this->_numDocs = 0;
26 this->_hasDeletions = false;
27
28 this->incRefReaders = !closeSubReaders;
29 }
30
~ParallelReader()31 ParallelReader::~ParallelReader() {
32 }
33
add(const IndexReaderPtr & reader)34 void ParallelReader::add(const IndexReaderPtr& reader) {
35 ensureOpen();
36 add(reader, false);
37 }
38
add(const IndexReaderPtr & reader,bool ignoreStoredFields)39 void ParallelReader::add(const IndexReaderPtr& reader, bool ignoreStoredFields) {
40 ensureOpen();
41 if (readers.empty()) {
42 this->_maxDoc = reader->maxDoc();
43 this->_numDocs = reader->numDocs();
44 this->_hasDeletions = reader->hasDeletions();
45 }
46
47 if (reader->maxDoc() != _maxDoc) { // check compatibility
48 boost::throw_exception(IllegalArgumentException(L"All readers must have same maxDoc: " + StringUtils::toString(_maxDoc) +
49 L" != " + StringUtils::toString(reader->maxDoc())));
50 }
51 if (reader->numDocs() != _numDocs) {
52 boost::throw_exception(IllegalArgumentException(L"All readers must have same numDocs: " + StringUtils::toString(_numDocs) +
53 L" != " + StringUtils::toString(reader->numDocs())));
54 }
55
56 HashSet<String> fields(reader->getFieldNames(IndexReader::FIELD_OPTION_ALL));
57 readerToFields.put(reader, fields);
58 for (HashSet<String>::iterator field = fields.begin(); field != fields.end(); ++field) { // update fieldToReader map
59 if (!fieldToReader.contains(*field)) {
60 fieldToReader.put(*field, reader);
61 }
62 }
63
64 if (!ignoreStoredFields) {
65 storedFieldReaders.add(reader); // add to storedFieldReaders
66 }
67 readers.add(reader);
68
69 if (incRefReaders) {
70 reader->incRef();
71 }
72
73 decrefOnClose.add(incRefReaders);
74 }
75
/// Returns a clone built by cloning every sub-reader (doReopen(true)).
/// Any LuceneException raised while cloning is rethrown as a RuntimeException with the same message.
/// @param other unused
LuceneObjectPtr ParallelReader::clone(const LuceneObjectPtr& other) {
    SyncLock syncLock(this);
    LuceneObjectPtr cloned;
    try {
        cloned = doReopen(true);
    } catch (LuceneException& e) {
        boost::throw_exception(RuntimeException(e.getError()));
    }
    return cloned;
}
85
reopen()86 IndexReaderPtr ParallelReader::reopen() {
87 SyncLock syncLock(this);
88 return doReopen(false);
89 }
90
doReopen(bool doClone)91 IndexReaderPtr ParallelReader::doReopen(bool doClone) {
92 ensureOpen();
93
94 bool reopened = false;
95 Collection<IndexReaderPtr> newReaders(Collection<IndexReaderPtr>::newInstance());
96
97 bool success = false;
98 LuceneException finally;
99 try {
100 for (Collection<IndexReaderPtr>::iterator oldReader = readers.begin(); oldReader != readers.end(); ++oldReader) {
101 IndexReaderPtr newReader;
102 if (doClone) {
103 newReader = boost::dynamic_pointer_cast<IndexReader>((*oldReader)->clone());
104 } else {
105 newReader = (*oldReader)->reopen();
106 }
107 newReaders.add(newReader);
108 // if at least one of the subreaders was updated we remember that and return a new ParallelReader
109 if (newReader != *oldReader) {
110 reopened = true;
111 }
112 }
113 success = true;
114 } catch (LuceneException& e) {
115 finally = e;
116 }
117 if (!success && reopened) {
118 for (int32_t i = 0; i < newReaders.size(); ++i) {
119 if (newReaders[i] != readers[i]) {
120 try {
121 if (newReaders[i]) {
122 newReaders[i]->close();
123 }
124 } catch (...) {
125 // keep going - we want to clean up as much as possible
126 }
127 }
128 }
129 }
130 finally.throwException();
131
132 if (reopened) {
133 Collection<uint8_t> newDecrefOnClose(Collection<uint8_t>::newInstance());
134 ParallelReaderPtr pr(newLucene<ParallelReader>());
135 for (int32_t i = 0; i < readers.size(); ++i) {
136 IndexReaderPtr oldReader(readers[i]);
137 IndexReaderPtr newReader(newReaders[i]);
138 if (newReader == oldReader) {
139 newDecrefOnClose.add(true);
140 newReader->incRef();
141 } else {
142 // this is a new subreader instance, so on close() we don't decRef but close it
143 newDecrefOnClose.add(false);
144 }
145 pr->add(newReader, !storedFieldReaders.contains(oldReader));
146 }
147 pr->decrefOnClose = newDecrefOnClose;
148 pr->incRefReaders = incRefReaders;
149 return pr;
150 } else {
151 // No subreader was refreshed
152 return shared_from_this();
153 }
154 }
155
numDocs()156 int32_t ParallelReader::numDocs() {
157 // Don't call ensureOpen() here (it could affect performance)
158 return _numDocs;
159 }
160
maxDoc()161 int32_t ParallelReader::maxDoc() {
162 // Don't call ensureOpen() here (it could affect performance)
163 return _maxDoc;
164 }
165
hasDeletions()166 bool ParallelReader::hasDeletions() {
167 // Don't call ensureOpen() here (it could affect performance)
168 return _hasDeletions;
169 }
170
isDeleted(int32_t n)171 bool ParallelReader::isDeleted(int32_t n) {
172 // Don't call ensureOpen() here (it could affect performance)
173 return !readers.empty() ? readers[0]->isDeleted(n) : false; // check first reader
174 }
175
doDelete(int32_t docNum)176 void ParallelReader::doDelete(int32_t docNum) {
177 // delete in all readers
178 for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
179 (*reader)->deleteDocument(docNum);
180 }
181 _hasDeletions = true;
182 }
183
doUndeleteAll()184 void ParallelReader::doUndeleteAll() {
185 // undeleteAll in all readers
186 for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
187 (*reader)->undeleteAll();
188 }
189 _hasDeletions = false;
190 }
191
document(int32_t n,const FieldSelectorPtr & fieldSelector)192 DocumentPtr ParallelReader::document(int32_t n, const FieldSelectorPtr& fieldSelector) {
193 ensureOpen();
194 DocumentPtr result(newLucene<Document>());
195
196 // append fields from storedFieldReaders
197 for (Collection<IndexReaderPtr>::iterator reader = storedFieldReaders.begin(); reader != storedFieldReaders.end(); ++reader) {
198 bool include = !fieldSelector;
199 if (!include) {
200 HashSet<String> fields = readerToFields.get(*reader);
201 for (HashSet<String>::iterator field = fields.begin(); field != fields.end(); ++field) {
202 if (fieldSelector->accept(*field) != FieldSelector::SELECTOR_NO_LOAD) {
203 include = true;
204 break;
205 }
206 }
207 }
208 if (include) {
209 Collection<FieldablePtr> fields((*reader)->document(n, fieldSelector)->getFields());
210 for (Collection<FieldablePtr>::iterator field = fields.begin(); field != fields.end(); ++field) {
211 result->add(*field);
212 }
213 }
214 }
215 return result;
216 }
217
getTermFreqVectors(int32_t docNumber)218 Collection<TermFreqVectorPtr> ParallelReader::getTermFreqVectors(int32_t docNumber) {
219 ensureOpen();
220
221 Collection<TermFreqVectorPtr> results(Collection<TermFreqVectorPtr>::newInstance());
222
223 // get all vectors
224 for (MapStringIndexReader::iterator entry = fieldToReader.begin(); entry != fieldToReader.end(); ++entry) {
225 TermFreqVectorPtr vector(entry->second->getTermFreqVector(docNumber, entry->first));
226 if (vector) {
227 results.add(vector);
228 }
229 }
230
231 return results;
232 }
233
getTermFreqVector(int32_t docNumber,const String & field)234 TermFreqVectorPtr ParallelReader::getTermFreqVector(int32_t docNumber, const String& field) {
235 ensureOpen();
236 MapStringIndexReader::iterator reader = fieldToReader.find(field);
237 return reader == fieldToReader.end() ? TermFreqVectorPtr() : reader->second->getTermFreqVector(docNumber, field);
238 }
239
getTermFreqVector(int32_t docNumber,const String & field,const TermVectorMapperPtr & mapper)240 void ParallelReader::getTermFreqVector(int32_t docNumber, const String& field, const TermVectorMapperPtr& mapper) {
241 ensureOpen();
242 MapStringIndexReader::iterator reader = fieldToReader.find(field);
243 if (reader != fieldToReader.end()) {
244 reader->second->getTermFreqVector(docNumber, field, mapper);
245 }
246 }
247
getTermFreqVector(int32_t docNumber,const TermVectorMapperPtr & mapper)248 void ParallelReader::getTermFreqVector(int32_t docNumber, const TermVectorMapperPtr& mapper) {
249 ensureOpen();
250 for (MapStringIndexReader::iterator entry = fieldToReader.begin(); entry != fieldToReader.end(); ++entry) {
251 entry->second->getTermFreqVector(docNumber, entry->first, mapper);
252 }
253 }
254
hasNorms(const String & field)255 bool ParallelReader::hasNorms(const String& field) {
256 ensureOpen();
257 MapStringIndexReader::iterator reader = fieldToReader.find(field);
258 return reader == fieldToReader.end() ? false : reader->second->hasNorms(field);
259 }
260
norms(const String & field)261 ByteArray ParallelReader::norms(const String& field) {
262 ensureOpen();
263 MapStringIndexReader::iterator reader = fieldToReader.find(field);
264 return reader == fieldToReader.end() ? ByteArray() : reader->second->norms(field);
265 }
266
norms(const String & field,ByteArray norms,int32_t offset)267 void ParallelReader::norms(const String& field, ByteArray norms, int32_t offset) {
268 ensureOpen();
269 MapStringIndexReader::iterator reader = fieldToReader.find(field);
270 if (reader != fieldToReader.end()) {
271 reader->second->norms(field, norms, offset);
272 }
273 }
274
doSetNorm(int32_t doc,const String & field,uint8_t value)275 void ParallelReader::doSetNorm(int32_t doc, const String& field, uint8_t value) {
276 ensureOpen();
277 MapStringIndexReader::iterator reader = fieldToReader.find(field);
278 if (reader != fieldToReader.end()) {
279 reader->second->doSetNorm(doc, field, value);
280 }
281 }
282
terms()283 TermEnumPtr ParallelReader::terms() {
284 ensureOpen();
285 return newLucene<ParallelTermEnum>(shared_from_this());
286 }
287
terms(const TermPtr & t)288 TermEnumPtr ParallelReader::terms(const TermPtr& t) {
289 ensureOpen();
290 return newLucene<ParallelTermEnum>(shared_from_this(), t);
291 }
292
docFreq(const TermPtr & t)293 int32_t ParallelReader::docFreq(const TermPtr& t) {
294 ensureOpen();
295 MapStringIndexReader::iterator reader = fieldToReader.find(t->field());
296 return reader == fieldToReader.end() ? 0 : reader->second->docFreq(t);
297 }
298
termDocs(const TermPtr & term)299 TermDocsPtr ParallelReader::termDocs(const TermPtr& term) {
300 ensureOpen();
301 return newLucene<ParallelTermDocs>(shared_from_this(), term);
302 }
303
termDocs()304 TermDocsPtr ParallelReader::termDocs() {
305 ensureOpen();
306 return newLucene<ParallelTermDocs>(shared_from_this());
307 }
308
termPositions(const TermPtr & term)309 TermPositionsPtr ParallelReader::termPositions(const TermPtr& term) {
310 ensureOpen();
311 return newLucene<ParallelTermPositions>(shared_from_this(), term);
312 }
313
termPositions()314 TermPositionsPtr ParallelReader::termPositions() {
315 ensureOpen();
316 return newLucene<ParallelTermPositions>(shared_from_this());
317 }
318
isCurrent()319 bool ParallelReader::isCurrent() {
320 for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
321 if (!(*reader)->isCurrent()) {
322 return false;
323 }
324 }
325
326 // all subreaders are up to date
327 return true;
328 }
329
isOptimized()330 bool ParallelReader::isOptimized() {
331 for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
332 if (!(*reader)->isOptimized()) {
333 return false;
334 }
335 }
336
337 // all subindexes are optimized
338 return true;
339 }
340
getVersion()341 int64_t ParallelReader::getVersion() {
342 boost::throw_exception(UnsupportedOperationException(L"ParallelReader does not support this method."));
343 return 0;
344 }
345
getSubReaders()346 Collection<IndexReaderPtr> ParallelReader::getSubReaders() {
347 return readers;
348 }
349
doCommit(MapStringString commitUserData)350 void ParallelReader::doCommit(MapStringString commitUserData) {
351 for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
352 (*reader)->commit(commitUserData);
353 }
354 }
355
doClose()356 void ParallelReader::doClose() {
357 SyncLock syncLock(this);
358 for (int32_t i = 0; i < readers.size(); ++i) {
359 if (decrefOnClose[i]) {
360 readers[i]->decRef();
361 } else {
362 readers[i]->close();
363 }
364 }
365
366 FieldCache::DEFAULT()->purge(shared_from_this());
367 }
368
getFieldNames(FieldOption fieldOption)369 HashSet<String> ParallelReader::getFieldNames(FieldOption fieldOption) {
370 ensureOpen();
371 HashSet<String> fieldSet(HashSet<String>::newInstance());
372 for (Collection<IndexReaderPtr>::iterator reader = readers.begin(); reader != readers.end(); ++reader) {
373 HashSet<String> names((*reader)->getFieldNames(fieldOption));
374 fieldSet.addAll(names.begin(), names.end());
375 }
376 return fieldSet;
377 }
378
ParallelTermEnum(const ParallelReaderPtr & reader)379 ParallelTermEnum::ParallelTermEnum(const ParallelReaderPtr& reader) {
380 this->setIterator = false;
381 this->_reader = reader;
382 MapStringIndexReader::iterator indexReader = reader->fieldToReader.begin();
383 if (indexReader != reader->fieldToReader.end()) {
384 this->field = indexReader->first;
385 }
386 if (!field.empty()) {
387 this->termEnum = reader->fieldToReader[field]->terms();
388 }
389 }
390
ParallelTermEnum(const ParallelReaderPtr & reader,const TermPtr & term)391 ParallelTermEnum::ParallelTermEnum(const ParallelReaderPtr& reader, const TermPtr& term) {
392 this->setIterator = false;
393 this->_reader = reader;
394 this->field = term->field();
395 MapStringIndexReader::iterator indexReader = reader->fieldToReader.find(field);
396 if (indexReader != reader->fieldToReader.end()) {
397 this->termEnum = indexReader->second->terms(term);
398 }
399 }
400
~ParallelTermEnum()401 ParallelTermEnum::~ParallelTermEnum() {
402 }
403
next()404 bool ParallelTermEnum::next() {
405 if (!termEnum) {
406 return false;
407 }
408
409 // another term in this field?
410 if (termEnum->next() && termEnum->term()->field() == field) {
411 return true; // yes, keep going
412 }
413
414 termEnum->close(); // close old termEnum
415 ParallelReaderPtr reader(_reader);
416
417 // find the next field with terms, if any
418 if (!setIterator) {
419 fieldIterator = reader->fieldToReader.find(field);
420 ++fieldIterator; // Skip field to get next one
421 setIterator = false;
422 }
423
424 while (fieldIterator != reader->fieldToReader.end()) {
425 field = fieldIterator->first;
426 termEnum = fieldIterator->second->terms(newLucene<Term>(field));
427 ++fieldIterator;
428 TermPtr term(termEnum->term());
429 if (term && term->field() == field) {
430 return true;
431 } else {
432 termEnum->close();
433 }
434 }
435
436 return false; // no more fields
437 }
438
term()439 TermPtr ParallelTermEnum::term() {
440 return termEnum ? termEnum->term() : TermPtr();
441 }
442
docFreq()443 int32_t ParallelTermEnum::docFreq() {
444 return termEnum ? termEnum->docFreq() : 0;
445 }
446
close()447 void ParallelTermEnum::close() {
448 if (termEnum) {
449 termEnum->close();
450 }
451 }
452
ParallelTermDocs(const ParallelReaderPtr & reader)453 ParallelTermDocs::ParallelTermDocs(const ParallelReaderPtr& reader) {
454 this->_reader = reader;
455 }
456
ParallelTermDocs(const ParallelReaderPtr & reader,const TermPtr & term)457 ParallelTermDocs::ParallelTermDocs(const ParallelReaderPtr& reader, const TermPtr& term) {
458 this->_reader = reader;
459 if (!term) {
460 termDocs = reader->readers.empty() ? TermDocsPtr() : reader->readers[0]->termDocs(TermPtr());
461 } else {
462 seek(term);
463 }
464 }
465
~ParallelTermDocs()466 ParallelTermDocs::~ParallelTermDocs() {
467 }
468
doc()469 int32_t ParallelTermDocs::doc() {
470 return termDocs->doc();
471 }
472
freq()473 int32_t ParallelTermDocs::freq() {
474 return termDocs->freq();
475 }
476
seek(const TermPtr & term)477 void ParallelTermDocs::seek(const TermPtr& term) {
478 ParallelReaderPtr reader(_reader);
479 MapStringIndexReader::iterator indexReader = reader->fieldToReader.find(term->field());
480 termDocs = indexReader != reader->fieldToReader.end() ? indexReader->second->termDocs(term) : TermDocsPtr();
481 }
482
seek(const TermEnumPtr & termEnum)483 void ParallelTermDocs::seek(const TermEnumPtr& termEnum) {
484 seek(termEnum->term());
485 }
486
next()487 bool ParallelTermDocs::next() {
488 return termDocs ? termDocs->next() : false;
489 }
490
read(Collection<int32_t> docs,Collection<int32_t> freqs)491 int32_t ParallelTermDocs::read(Collection<int32_t> docs, Collection<int32_t> freqs) {
492 return termDocs ? termDocs->read(docs, freqs) : 0;
493 }
494
skipTo(int32_t target)495 bool ParallelTermDocs::skipTo(int32_t target) {
496 return termDocs ? termDocs->skipTo(target) : false;
497 }
498
close()499 void ParallelTermDocs::close() {
500 if (termDocs) {
501 termDocs->close();
502 }
503 }
504
ParallelTermPositions(const ParallelReaderPtr & reader)505 ParallelTermPositions::ParallelTermPositions(const ParallelReaderPtr& reader) : ParallelTermDocs(reader) {
506 }
507
ParallelTermPositions(const ParallelReaderPtr & reader,const TermPtr & term)508 ParallelTermPositions::ParallelTermPositions(const ParallelReaderPtr& reader, const TermPtr& term) : ParallelTermDocs(reader) {
509 seek(term);
510 }
511
~ParallelTermPositions()512 ParallelTermPositions::~ParallelTermPositions() {
513 }
514
seek(const TermPtr & term)515 void ParallelTermPositions::seek(const TermPtr& term) {
516 ParallelReaderPtr reader(_reader);
517 MapStringIndexReader::iterator indexReader = reader->fieldToReader.find(term->field());
518 termDocs = indexReader != reader->fieldToReader.end() ? indexReader->second->termPositions(term) : TermDocsPtr();
519 }
520
nextPosition()521 int32_t ParallelTermPositions::nextPosition() {
522 // It is an error to call this if there is no next position, eg. if termDocs==null
523 return boost::static_pointer_cast<TermPositions>(termDocs)->nextPosition();
524 }
525
getPayloadLength()526 int32_t ParallelTermPositions::getPayloadLength() {
527 // It is an error to call this if there is no next position, eg. if termDocs==null
528 return boost::static_pointer_cast<TermPositions>(termDocs)->getPayloadLength();
529 }
530
getPayload(ByteArray data,int32_t offset)531 ByteArray ParallelTermPositions::getPayload(ByteArray data, int32_t offset) {
532 // It is an error to call this if there is no next position, eg. if termDocs==null
533 return boost::static_pointer_cast<TermPositions>(termDocs)->getPayload(data, offset);
534 }
535
isPayloadAvailable()536 bool ParallelTermPositions::isPayloadAvailable() {
537 // It is an error to call this if there is no next position, eg. if termDocs==null
538 return boost::static_pointer_cast<TermPositions>(termDocs)->isPayloadAvailable();
539 }
540
541 }
542