1 /////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2009-2014 Alan Wright. All rights reserved.
3 // Distributable under the terms of either the Apache License (Version 2.0)
4 // or the GNU Lesser General Public License.
5 /////////////////////////////////////////////////////////////////////////////
6
7 #include "LuceneInc.h"
8 #include "DocFieldProcessorPerThread.h"
9 #include "DocFieldProcessorPerField.h"
10 #include "DocFieldProcessor.h"
11 #include "DocFieldConsumer.h"
12 #include "DocFieldConsumerPerThread.h"
13 #include "DocFieldConsumerPerField.h"
14 #include "DocumentsWriterThreadState.h"
15 #include "DocumentsWriter.h"
16 #include "StoredFieldsWriter.h"
17 #include "StoredFieldsWriterPerThread.h"
18 #include "SegmentWriteState.h"
19 #include "FieldInfo.h"
20 #include "FieldInfos.h"
21 #include "Fieldable.h"
22 #include "IndexWriter.h"
23 #include "Document.h"
24 #include "InfoStream.h"
25 #include "MiscUtils.h"
26 #include "StringUtils.h"
27
28 namespace Lucene {
29
DocFieldProcessorPerThread(const DocumentsWriterThreadStatePtr & threadState,const DocFieldProcessorPtr & docFieldProcessor)30 DocFieldProcessorPerThread::DocFieldProcessorPerThread(const DocumentsWriterThreadStatePtr& threadState, const DocFieldProcessorPtr& docFieldProcessor) {
31 _fields = Collection<DocFieldProcessorPerFieldPtr>::newInstance(1);
32 fieldHash = Collection<DocFieldProcessorPerFieldPtr>::newInstance(2);
33 hashMask = 1;
34 fieldGen = 0;
35 fieldCount = 0;
36 totalFieldCount = 0;
37
38 this->docState = threadState->docState;
39 this->_docFieldProcessor = docFieldProcessor;
40 this->fieldInfos = docFieldProcessor->fieldInfos;
41
42 docFreeList = Collection<DocFieldProcessorPerThreadPerDocPtr>::newInstance(1);
43 freeCount = 0;
44 allocCount = 0;
45 }
46
// Nothing to release explicitly; all members are reference-counted collections/pointers.
DocFieldProcessorPerThread::~DocFieldProcessorPerThread() {
}
49
initialize()50 void DocFieldProcessorPerThread::initialize() {
51 DocFieldProcessorPtr docFieldProcessor(_docFieldProcessor);
52 consumer = docFieldProcessor->consumer->addThread(shared_from_this());
53 fieldsWriter = docFieldProcessor->fieldsWriter->addThread(docState);
54 }
55
abort()56 void DocFieldProcessorPerThread::abort() {
57 for (Collection<DocFieldProcessorPerFieldPtr>::iterator field = fieldHash.begin(); field != fieldHash.end(); ++field) {
58 DocFieldProcessorPerFieldPtr current(*field);
59 while (current) {
60 DocFieldProcessorPerFieldPtr next(current->next);
61 current->abort();
62 current = next;
63 }
64 }
65 fieldsWriter->abort();
66 consumer->abort();
67 }
68
fields()69 Collection<DocFieldConsumerPerFieldPtr> DocFieldProcessorPerThread::fields() {
70 Collection<DocFieldConsumerPerFieldPtr> fields(Collection<DocFieldConsumerPerFieldPtr>::newInstance());
71 for (Collection<DocFieldProcessorPerFieldPtr>::iterator field = fieldHash.begin(); field != fieldHash.end(); ++field) {
72 DocFieldProcessorPerFieldPtr current(*field);
73 while (current) {
74 fields.add(current->consumer);
75 current = current->next;
76 }
77 }
78 BOOST_ASSERT(fields.size() == totalFieldCount);
79 return fields;
80 }
81
// Called after a flush: purge per-field processors for fields that were not
// seen since the previous flush (lastGen still -1), freeing their resources.
// Fields that were seen have their lastGen reset for the next flush cycle.
void DocFieldProcessorPerThread::trimFields(const SegmentWriteStatePtr& state) {
    for (Collection<DocFieldProcessorPerFieldPtr>::iterator perField = fieldHash.begin(); perField != fieldHash.end(); ++perField) {
        DocFieldProcessorPerFieldPtr current(*perField);
        DocFieldProcessorPerFieldPtr lastPerField; // predecessor of current in this bucket's chain (null while at the head)

        while (current) {
            if (current->lastGen == -1) {
                // This field was not seen since the previous flush, so, free up its resources now

                // Unhash: splice current out of the bucket's singly-linked chain
                if (!lastPerField) {
                    *perField = current->next; // removing the bucket head
                } else {
                    lastPerField->next = current->next;
                }

                DocumentsWriterPtr docWriter(state->_docWriter);
                if (docWriter->infoStream) {
                    *(docWriter->infoStream) << L" purge field=" << current->fieldInfo->name << L"\n";
                }

                --totalFieldCount;
            } else {
                // Reset; this node survives and becomes the predecessor for the next iteration
                current->lastGen = -1;
                lastPerField = current;
            }

            // Safe: the splice above only rewired predecessors, not current->next
            current = current->next;
        }
    }
}
114
rehash()115 void DocFieldProcessorPerThread::rehash() {
116 int32_t newHashSize = (fieldHash.size() * 2);
117 BOOST_ASSERT(newHashSize > fieldHash.size());
118
119 Collection<DocFieldProcessorPerFieldPtr> newHashArray(Collection<DocFieldProcessorPerFieldPtr>::newInstance(newHashSize));
120
121 // Rehash
122 int32_t newHashMask = newHashSize - 1;
123 for (Collection<DocFieldProcessorPerFieldPtr>::iterator fp0 = fieldHash.begin(); fp0 != fieldHash.end(); ++fp0) {
124 DocFieldProcessorPerFieldPtr current(*fp0);
125 while (current) {
126 int32_t hashPos2 = StringUtils::hashCode(current->fieldInfo->name) & newHashMask;
127 DocFieldProcessorPerFieldPtr nextFP0(current->next);
128 current->next = newHashArray[hashPos2];
129 newHashArray[hashPos2] = current;
130 current = nextFP0;
131 }
132 }
133
134 fieldHash = newHashArray;
135 hashMask = newHashMask;
136 }
137
138 struct lessFieldInfoName {
operator ()Lucene::lessFieldInfoName139 inline bool operator()(const DocFieldProcessorPerFieldPtr& first, const DocFieldProcessorPerFieldPtr& second) const {
140 return (first->fieldInfo->name < second->fieldInfo->name);
141 }
142 };
143
// Process one document: absorb new/changed fields into the field hash, feed
// stored fields to fieldsWriter and indexed fields to the consumer, and return
// a DocWriter that will write the finished doc (or null if nothing to write).
DocWriterPtr DocFieldProcessorPerThread::processDocument() {
    consumer->startDocument();
    fieldsWriter->startDocument();

    DocumentPtr doc(docState->doc);

    DocFieldProcessorPtr docFieldProcessor(_docFieldProcessor);
    DocumentsWriterPtr docWriter(docFieldProcessor->_docWriter);
    bool testPoint = IndexWriterPtr(docWriter->_writer)->testPoint(L"DocumentsWriter.ThreadState.init start");
    BOOST_ASSERT(testPoint);

    fieldCount = 0;
    // Generation stamp for this document: used below to detect the first
    // occurrence of a field within this same document.
    int32_t thisFieldGen = fieldGen++;

    Collection<FieldablePtr> docFields(doc->getFields());

    // Absorb any new fields first seen in this document.
    // Also absorb any changes to fields we had already seen before (eg suddenly turning on norms or
    // vectors, etc.)
    for (Collection<FieldablePtr>::iterator field = docFields.begin(); field != docFields.end(); ++field) {
        String fieldName((*field)->name());

        // Make sure we have a PerField allocated
        int32_t hashPos = StringUtils::hashCode(fieldName) & hashMask;

        // Walk the bucket's chain looking for this field name
        DocFieldProcessorPerFieldPtr fp(fieldHash[hashPos]);
        while (fp && fp->fieldInfo->name != fieldName) {
            fp = fp->next;
        }

        if (!fp) {
            // First time this field is seen by this thread: register it and
            // prepend a new per-field processor to the bucket's chain.
            FieldInfoPtr fi(fieldInfos->add(fieldName, (*field)->isIndexed(), (*field)->isTermVectorStored(),
                            (*field)->isStorePositionWithTermVector(), (*field)->isStoreOffsetWithTermVector(),
                            (*field)->getOmitNorms(), false, (*field)->getOmitTermFreqAndPositions()));

            fp = newLucene<DocFieldProcessorPerField>(shared_from_this(), fi);
            fp->next = fieldHash[hashPos];
            fieldHash[hashPos] = fp;
            ++totalFieldCount;

            // Keep load factor below 0.5 (fp is already inserted, so rehash moves it too)
            if (totalFieldCount >= fieldHash.size() / 2) {
                rehash();
            }
        } else {
            // Known field: merge in any changed per-field index options
            fp->fieldInfo->update((*field)->isIndexed(), (*field)->isTermVectorStored(),
                                  (*field)->isStorePositionWithTermVector(), (*field)->isStoreOffsetWithTermVector(),
                                  (*field)->getOmitNorms(), false, (*field)->getOmitTermFreqAndPositions());
        }

        if (thisFieldGen != fp->lastGen) {
            // First time we're seeing this field for this doc
            fp->fieldCount = 0;

            if (fieldCount == _fields.size()) {
                _fields.resize(_fields.size() * 2);
            }

            _fields[fieldCount++] = fp;
            fp->lastGen = thisFieldGen;
        }

        // Append this Fieldable instance (a field may occur multiple times per doc)
        if (fp->fieldCount == fp->fields.size()) {
            fp->fields.resize(fp->fields.size() * 2);
        }

        fp->fields[fp->fieldCount++] = *field;
        if ((*field)->isStored()) {
            fieldsWriter->addField(*field, fp->fieldInfo);
        }
    }

    // If we are writing vectors then we must visit fields in sorted order so they are written in sorted order.
    std::sort(_fields.begin(), _fields.begin() + fieldCount, lessFieldInfoName());

    for (int32_t i = 0; i < fieldCount; ++i) {
        _fields[i]->consumer->processFields(_fields[i]->fields, _fields[i]->fieldCount);
    }

    // Warn (once per doc) about terms exceeding MAX_TERM_LENGTH that were skipped
    if (!docState->maxTermPrefix.empty() && docState->infoStream) {
        *(docState->infoStream) << L"WARNING: document contains at least one immense term (longer than the max length " <<
                                StringUtils::toString(DocumentsWriter::MAX_TERM_LENGTH) << L"), all of which were skipped. " <<
                                L"Please correct the analyzer to not produce such terms. The prefix of the first immense " <<
                                L"term is: '" << StringUtils::toString(docState->maxTermPrefix) << L"...'\n";
        docState->maxTermPrefix.clear();
    }

    // Combine the stored-fields writer's doc and the consumer's doc: return
    // whichever exists, or a recycled PerDoc wrapper that finishes both.
    DocWriterPtr one(fieldsWriter->finishDocument());
    DocWriterPtr two(consumer->finishDocument());

    if (!one) {
        return two;
    } else if (!two) {
        return one;
    } else {
        DocFieldProcessorPerThreadPerDocPtr both(getPerDoc());
        both->docID = docState->docID;
        BOOST_ASSERT(one->docID == docState->docID);
        BOOST_ASSERT(two->docID == docState->docID);
        both->one = one;
        both->two = two;
        return both;
    }
}
247
getPerDoc()248 DocFieldProcessorPerThreadPerDocPtr DocFieldProcessorPerThread::getPerDoc() {
249 SyncLock syncLock(this);
250 if (freeCount == 0) {
251 ++allocCount;
252 if (allocCount > docFreeList.size()) {
253 // Grow our free list up front to make sure we have enough space to recycle all
254 // outstanding PerDoc instances
255 BOOST_ASSERT(allocCount == docFreeList.size() + 1);
256 docFreeList.resize(MiscUtils::getNextSize(allocCount));
257 }
258 return newLucene<DocFieldProcessorPerThreadPerDoc>(shared_from_this());
259 } else {
260 return docFreeList[--freeCount];
261 }
262 }
263
// Return a finished/aborted PerDoc to the recycle pool; getPerDoc() pre-grew
// docFreeList so there is always room (asserted below).
void DocFieldProcessorPerThread::freePerDoc(const DocFieldProcessorPerThreadPerDocPtr& perDoc) {
    SyncLock syncLock(this);
    BOOST_ASSERT(freeCount < docFreeList.size());
    docFreeList[freeCount++] = perDoc;
}
269
DocFieldProcessorPerThreadPerDoc(const DocFieldProcessorPerThreadPtr & docProcessor)270 DocFieldProcessorPerThreadPerDoc::DocFieldProcessorPerThreadPerDoc(const DocFieldProcessorPerThreadPtr& docProcessor) {
271 this->_docProcessor = docProcessor;
272 }
273
// Nothing to release explicitly; members are reference-counted pointers.
DocFieldProcessorPerThreadPerDoc::~DocFieldProcessorPerThreadPerDoc() {
}
276
sizeInBytes()277 int64_t DocFieldProcessorPerThreadPerDoc::sizeInBytes() {
278 return one->sizeInBytes() + two->sizeInBytes();
279 }
280
finish()281 void DocFieldProcessorPerThreadPerDoc::finish() {
282 LuceneException finally;
283 try {
284 try {
285 one->finish();
286 } catch (LuceneException& e) {
287 finally = e;
288 }
289 two->finish();
290 } catch (LuceneException& e) {
291 finally = e;
292 }
293 DocFieldProcessorPerThreadPtr(_docProcessor)->freePerDoc(shared_from_this());
294 finally.throwException();
295 }
296
abort()297 void DocFieldProcessorPerThreadPerDoc::abort() {
298 LuceneException finally;
299 try {
300 try {
301 one->abort();
302 } catch (LuceneException& e) {
303 finally = e;
304 }
305 two->abort();
306 } catch (LuceneException& e) {
307 finally = e;
308 }
309 DocFieldProcessorPerThreadPtr(_docProcessor)->freePerDoc(shared_from_this());
310 finally.throwException();
311 }
312
313 }
314