1 /////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2009-2014 Alan Wright. All rights reserved.
3 // Distributable under the terms of either the Apache License (Version 2.0)
4 // or the GNU Lesser General Public License.
5 /////////////////////////////////////////////////////////////////////////////
6 
7 #include "LuceneInc.h"
8 #include "DocFieldProcessorPerThread.h"
9 #include "DocFieldProcessorPerField.h"
10 #include "DocFieldProcessor.h"
11 #include "DocFieldConsumer.h"
12 #include "DocFieldConsumerPerThread.h"
13 #include "DocFieldConsumerPerField.h"
14 #include "DocumentsWriterThreadState.h"
15 #include "DocumentsWriter.h"
16 #include "StoredFieldsWriter.h"
17 #include "StoredFieldsWriterPerThread.h"
18 #include "SegmentWriteState.h"
19 #include "FieldInfo.h"
20 #include "FieldInfos.h"
21 #include "Fieldable.h"
22 #include "IndexWriter.h"
23 #include "Document.h"
24 #include "InfoStream.h"
25 #include "MiscUtils.h"
26 #include "StringUtils.h"
27 
28 namespace Lucene {
29 
DocFieldProcessorPerThread(const DocumentsWriterThreadStatePtr & threadState,const DocFieldProcessorPtr & docFieldProcessor)30 DocFieldProcessorPerThread::DocFieldProcessorPerThread(const DocumentsWriterThreadStatePtr& threadState, const DocFieldProcessorPtr& docFieldProcessor) {
31     _fields = Collection<DocFieldProcessorPerFieldPtr>::newInstance(1);
32     fieldHash = Collection<DocFieldProcessorPerFieldPtr>::newInstance(2);
33     hashMask = 1;
34     fieldGen = 0;
35     fieldCount = 0;
36     totalFieldCount = 0;
37 
38     this->docState = threadState->docState;
39     this->_docFieldProcessor = docFieldProcessor;
40     this->fieldInfos = docFieldProcessor->fieldInfos;
41 
42     docFreeList = Collection<DocFieldProcessorPerThreadPerDocPtr>::newInstance(1);
43     freeCount = 0;
44     allocCount = 0;
45 }
46 
// Nothing to release explicitly; all members are reference-counted.
DocFieldProcessorPerThread::~DocFieldProcessorPerThread() {
}
49 
initialize()50 void DocFieldProcessorPerThread::initialize() {
51     DocFieldProcessorPtr docFieldProcessor(_docFieldProcessor);
52     consumer = docFieldProcessor->consumer->addThread(shared_from_this());
53     fieldsWriter = docFieldProcessor->fieldsWriter->addThread(docState);
54 }
55 
abort()56 void DocFieldProcessorPerThread::abort() {
57     for (Collection<DocFieldProcessorPerFieldPtr>::iterator field = fieldHash.begin(); field != fieldHash.end(); ++field) {
58         DocFieldProcessorPerFieldPtr current(*field);
59         while (current) {
60             DocFieldProcessorPerFieldPtr next(current->next);
61             current->abort();
62             current = next;
63         }
64     }
65     fieldsWriter->abort();
66     consumer->abort();
67 }
68 
fields()69 Collection<DocFieldConsumerPerFieldPtr> DocFieldProcessorPerThread::fields() {
70     Collection<DocFieldConsumerPerFieldPtr> fields(Collection<DocFieldConsumerPerFieldPtr>::newInstance());
71     for (Collection<DocFieldProcessorPerFieldPtr>::iterator field = fieldHash.begin(); field != fieldHash.end(); ++field) {
72         DocFieldProcessorPerFieldPtr current(*field);
73         while (current) {
74             fields.add(current->consumer);
75             current = current->next;
76         }
77     }
78     BOOST_ASSERT(fields.size() == totalFieldCount);
79     return fields;
80 }
81 
// Called after a flush: frees per-field state for any field that was not seen
// since the previous flush (lastGen still -1), and resets lastGen on the
// surviving fields so the next flush can make the same decision.
void DocFieldProcessorPerThread::trimFields(const SegmentWriteStatePtr& state) {
    for (Collection<DocFieldProcessorPerFieldPtr>::iterator perField = fieldHash.begin(); perField != fieldHash.end(); ++perField) {
        DocFieldProcessorPerFieldPtr current(*perField);
        // Trailing pointer: the last entry we decided to KEEP in this chain.
        DocFieldProcessorPerFieldPtr lastPerField;

        while (current) {
            if (current->lastGen == -1) {
                // This field was not seen since the previous flush, so, free up its resources now

                // Unhash: splice current out of the chain.  Note we do NOT
                // advance lastPerField here -- it still points at the last
                // kept entry.
                if (!lastPerField) {
                    // current is the chain head; new head is its successor.
                    *perField = current->next;
                } else {
                    lastPerField->next = current->next;
                }

                DocumentsWriterPtr docWriter(state->_docWriter);
                if (docWriter->infoStream) {
                    *(docWriter->infoStream) << L"  purge field=" << current->fieldInfo->name << L"\n";
                }

                --totalFieldCount;
            } else {
                // Reset: field survived; clear its generation for the next flush.
                current->lastGen = -1;
                lastPerField = current;
            }

            // Safe even after unhashing: current->next is untouched above.
            current = current->next;
        }
    }
}
114 
rehash()115 void DocFieldProcessorPerThread::rehash() {
116     int32_t newHashSize = (fieldHash.size() * 2);
117     BOOST_ASSERT(newHashSize > fieldHash.size());
118 
119     Collection<DocFieldProcessorPerFieldPtr> newHashArray(Collection<DocFieldProcessorPerFieldPtr>::newInstance(newHashSize));
120 
121     // Rehash
122     int32_t newHashMask = newHashSize - 1;
123     for (Collection<DocFieldProcessorPerFieldPtr>::iterator fp0 = fieldHash.begin(); fp0 != fieldHash.end(); ++fp0) {
124         DocFieldProcessorPerFieldPtr current(*fp0);
125         while (current) {
126             int32_t hashPos2 = StringUtils::hashCode(current->fieldInfo->name) & newHashMask;
127             DocFieldProcessorPerFieldPtr nextFP0(current->next);
128             current->next = newHashArray[hashPos2];
129             newHashArray[hashPos2] = current;
130             current = nextFP0;
131         }
132     }
133 
134     fieldHash = newHashArray;
135     hashMask = newHashMask;
136 }
137 
138 struct lessFieldInfoName {
operator ()Lucene::lessFieldInfoName139     inline bool operator()(const DocFieldProcessorPerFieldPtr& first, const DocFieldProcessorPerFieldPtr& second) const {
140         return (first->fieldInfo->name < second->fieldInfo->name);
141     }
142 };
143 
// Processes docState->doc through the indexing chain: absorbs new/changed
// fields into the per-thread field hash, routes stored fields to fieldsWriter
// and all fields to the consumer, then merges the two resulting DocWriters
// into one (or returns whichever is non-null).
DocWriterPtr DocFieldProcessorPerThread::processDocument() {
    consumer->startDocument();
    fieldsWriter->startDocument();

    DocumentPtr doc(docState->doc);

    DocFieldProcessorPtr docFieldProcessor(_docFieldProcessor);
    DocumentsWriterPtr docWriter(docFieldProcessor->_docWriter);
    bool testPoint = IndexWriterPtr(docWriter->_writer)->testPoint(L"DocumentsWriter.ThreadState.init start");
    BOOST_ASSERT(testPoint);

    fieldCount = 0;
    // Generation stamp for this document; comparing against fp->lastGen below
    // detects the first occurrence of a field within this document.
    int32_t thisFieldGen = fieldGen++;

    Collection<FieldablePtr> docFields(doc->getFields());

    // Absorb any new fields first seen in this document.
    // Also absorb any changes to fields we had already seen before (eg suddenly turning on norms or
    // vectors, etc.)
    for (Collection<FieldablePtr>::iterator field = docFields.begin(); field != docFields.end(); ++field) {
        String fieldName((*field)->name());

        // Make sure we have a PerField allocated
        int32_t hashPos = StringUtils::hashCode(fieldName) & hashMask;

        // Walk this bucket's chain looking for an existing processor.
        DocFieldProcessorPerFieldPtr fp(fieldHash[hashPos]);
        while (fp && fp->fieldInfo->name != fieldName) {
            fp = fp->next;
        }

        if (!fp) {
            // First time this field is seen on this thread: register it with
            // fieldInfos and push a new processor onto the chain head.
            FieldInfoPtr fi(fieldInfos->add(fieldName, (*field)->isIndexed(), (*field)->isTermVectorStored(),
                                            (*field)->isStorePositionWithTermVector(), (*field)->isStoreOffsetWithTermVector(),
                                            (*field)->getOmitNorms(), false, (*field)->getOmitTermFreqAndPositions()));

            fp = newLucene<DocFieldProcessorPerField>(shared_from_this(), fi);
            fp->next = fieldHash[hashPos];
            fieldHash[hashPos] = fp;
            ++totalFieldCount;

            // Keep load factor below 1/2.
            if (totalFieldCount >= fieldHash.size() / 2) {
                rehash();
            }
        } else {
            // Known field: merge in any changed per-field flags.
            fp->fieldInfo->update((*field)->isIndexed(), (*field)->isTermVectorStored(),
                                  (*field)->isStorePositionWithTermVector(), (*field)->isStoreOffsetWithTermVector(),
                                  (*field)->getOmitNorms(), false, (*field)->getOmitTermFreqAndPositions());
        }

        if (thisFieldGen != fp->lastGen) {
            // First time we're seeing this field for this doc
            fp->fieldCount = 0;

            if (fieldCount == _fields.size()) {
                _fields.resize(_fields.size() * 2);
            }

            _fields[fieldCount++] = fp;
            fp->lastGen = thisFieldGen;
        }

        // Append this Fieldable instance to the per-field list (a field can
        // occur multiple times in one document), growing as needed.
        if (fp->fieldCount == fp->fields.size()) {
            fp->fields.resize(fp->fields.size() * 2);
        }

        fp->fields[fp->fieldCount++] = *field;
        if ((*field)->isStored()) {
            fieldsWriter->addField(*field, fp->fieldInfo);
        }
    }

    // If we are writing vectors then we must visit fields in sorted order so they are written in sorted order.
    std::sort(_fields.begin(), _fields.begin() + fieldCount, lessFieldInfoName());

    for (int32_t i = 0; i < fieldCount; ++i) {
        _fields[i]->consumer->processFields(_fields[i]->fields, _fields[i]->fieldCount);
    }

    // Warn (once per document) if the analyzer produced a term longer than the
    // indexer's hard limit; such terms were skipped, not indexed.
    if (!docState->maxTermPrefix.empty() && docState->infoStream) {
        *(docState->infoStream) << L"WARNING: document contains at least one immense term (longer than the max length " <<
                                StringUtils::toString(DocumentsWriter::MAX_TERM_LENGTH) << L"), all of which were skipped.  " <<
                                L"Please correct the analyzer to not produce such terms.  The prefix of the first immense " <<
                                L"term is: '" << StringUtils::toString(docState->maxTermPrefix) << L"...'\n";
        docState->maxTermPrefix.clear();
    }

    // Combine the stored-fields writer's output with the consumer's output.
    DocWriterPtr one(fieldsWriter->finishDocument());
    DocWriterPtr two(consumer->finishDocument());

    if (!one) {
        return two;
    } else if (!two) {
        return one;
    } else {
        // Both produced output: wrap them in a (recycled) PerDoc pair.
        DocFieldProcessorPerThreadPerDocPtr both(getPerDoc());
        both->docID = docState->docID;
        BOOST_ASSERT(one->docID == docState->docID);
        BOOST_ASSERT(two->docID == docState->docID);
        both->one = one;
        both->two = two;
        return both;
    }
}
247 
getPerDoc()248 DocFieldProcessorPerThreadPerDocPtr DocFieldProcessorPerThread::getPerDoc() {
249     SyncLock syncLock(this);
250     if (freeCount == 0) {
251         ++allocCount;
252         if (allocCount > docFreeList.size()) {
253             // Grow our free list up front to make sure we have enough space to recycle all
254             // outstanding PerDoc instances
255             BOOST_ASSERT(allocCount == docFreeList.size() + 1);
256             docFreeList.resize(MiscUtils::getNextSize(allocCount));
257         }
258         return newLucene<DocFieldProcessorPerThreadPerDoc>(shared_from_this());
259     } else {
260         return docFreeList[--freeCount];
261     }
262 }
263 
// Returns a PerDoc to the recycling pool; counterpart of getPerDoc().
void DocFieldProcessorPerThread::freePerDoc(const DocFieldProcessorPerThreadPerDocPtr& perDoc) {
    SyncLock syncLock(this);
    // getPerDoc() pre-grows docFreeList for every allocation, so there is
    // always room to take this instance back.
    BOOST_ASSERT(freeCount < docFreeList.size());
    docFreeList[freeCount++] = perDoc;
}
269 
// Keeps a (weak) back-reference to the owning processor so finish()/abort()
// can recycle this instance via freePerDoc().
DocFieldProcessorPerThreadPerDoc::DocFieldProcessorPerThreadPerDoc(const DocFieldProcessorPerThreadPtr& docProcessor) {
    this->_docProcessor = docProcessor;
}
273 
// Nothing to release explicitly; members are reference-counted.
DocFieldProcessorPerThreadPerDoc::~DocFieldProcessorPerThreadPerDoc() {
}
276 
sizeInBytes()277 int64_t DocFieldProcessorPerThreadPerDoc::sizeInBytes() {
278     return one->sizeInBytes() + two->sizeInBytes();
279 }
280 
// Finishes both wrapped writers, emulating Java's nested try/finally:
// two->finish() runs even if one->finish() throws, this instance is always
// recycled, and the last caught exception (if any) is rethrown at the end.
void DocFieldProcessorPerThreadPerDoc::finish() {
    LuceneException finally;
    try {
        try {
            one->finish();
        } catch (LuceneException& e) {
            // Defer: still give two a chance to finish.
            finally = e;
        }
        two->finish();
    } catch (LuceneException& e) {
        // A failure in two overrides any deferred failure from one.
        finally = e;
    }
    // Always recycle before propagating any deferred exception.
    DocFieldProcessorPerThreadPtr(_docProcessor)->freePerDoc(shared_from_this());
    finally.throwException();
}
296 
// Aborts both wrapped writers with the same try/finally emulation as
// finish(): both abort() calls are attempted, the instance is always
// recycled, and the last caught exception (if any) is rethrown.
void DocFieldProcessorPerThreadPerDoc::abort() {
    LuceneException finally;
    try {
        try {
            one->abort();
        } catch (LuceneException& e) {
            // Defer: still give two a chance to abort.
            finally = e;
        }
        two->abort();
    } catch (LuceneException& e) {
        // A failure in two overrides any deferred failure from one.
        finally = e;
    }
    // Always recycle before propagating any deferred exception.
    DocFieldProcessorPerThreadPtr(_docProcessor)->freePerDoc(shared_from_this());
    finally.throwException();
}
312 
313 }
314