1 /////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2009-2014 Alan Wright. All rights reserved.
3 // Distributable under the terms of either the Apache License (Version 2.0)
4 // or the GNU Lesser General Public License.
5 /////////////////////////////////////////////////////////////////////////////
6 
7 #include "LuceneInc.h"
8 #include "FreqProxTermsWriter.h"
9 #include "FreqProxTermsWriterPerThread.h"
10 #include "FreqProxTermsWriterPerField.h"
11 #include "FreqProxFieldMergeState.h"
12 #include "TermsHashConsumerPerThread.h"
13 #include "TermsHashConsumerPerField.h"
14 #include "TermsHashPerField.h"
15 #include "TermsHashPerThread.h"
16 #include "FormatPostingsDocsConsumer.h"
17 #include "FormatPostingsFieldsConsumer.h"
18 #include "FormatPostingsFieldsWriter.h"
19 #include "FormatPostingsTermsConsumer.h"
20 #include "FormatPostingsPositionsConsumer.h"
21 #include "FieldInfo.h"
22 #include "ByteSliceReader.h"
23 #include "RawPostingList.h"
24 #include "DocumentsWriter.h"
25 #include "UTF8Stream.h"
26 #include "TestPoint.h"
27 
28 namespace Lucene {
29 
~FreqProxTermsWriter()30 FreqProxTermsWriter::~FreqProxTermsWriter() {
31 }
32 
addThread(const TermsHashPerThreadPtr & perThread)33 TermsHashConsumerPerThreadPtr FreqProxTermsWriter::addThread(const TermsHashPerThreadPtr& perThread) {
34     return newLucene<FreqProxTermsWriterPerThread>(perThread);
35 }
36 
createPostings(Collection<RawPostingListPtr> postings,int32_t start,int32_t count)37 void FreqProxTermsWriter::createPostings(Collection<RawPostingListPtr> postings, int32_t start, int32_t count) {
38     int32_t end = start + count;
39     for (int32_t i = start; i < end; ++i) {
40         postings[i] = newLucene<FreqProxTermsWriterPostingList>();
41     }
42 }
43 
compareText(const wchar_t * text1,int32_t pos1,const wchar_t * text2,int32_t pos2)44 int32_t FreqProxTermsWriter::compareText(const wchar_t* text1, int32_t pos1, const wchar_t* text2, int32_t pos2) {
45     while (true) {
46         wchar_t c1 = text1[pos1++];
47         wchar_t c2 = text2[pos2++];
48         if (c1 != c2) {
49             if (c2 == UTF8Base::UNICODE_TERMINATOR) {
50                 return 1;
51             } else if (c1 == UTF8Base::UNICODE_TERMINATOR) {
52                 return -1;
53             } else {
54                 return (c1 - c2);
55             }
56         } else if (c1 == UTF8Base::UNICODE_TERMINATOR) {
57             return 0;
58         }
59     }
60 }
61 
closeDocStore(const SegmentWriteStatePtr & state)62 void FreqProxTermsWriter::closeDocStore(const SegmentWriteStatePtr& state) {
63 }
64 
abort()65 void FreqProxTermsWriter::abort() {
66 }
67 
flush(MapTermsHashConsumerPerThreadCollectionTermsHashConsumerPerField threadsAndFields,const SegmentWriteStatePtr & state)68 void FreqProxTermsWriter::flush(MapTermsHashConsumerPerThreadCollectionTermsHashConsumerPerField threadsAndFields, const SegmentWriteStatePtr& state) {
69     // Gather all FieldData's that have postings, across all ThreadStates
70     Collection<FreqProxTermsWriterPerFieldPtr> allFields(Collection<FreqProxTermsWriterPerFieldPtr>::newInstance());
71 
72     for (MapTermsHashConsumerPerThreadCollectionTermsHashConsumerPerField::iterator entry = threadsAndFields.begin(); entry != threadsAndFields.end(); ++entry) {
73         for (Collection<TermsHashConsumerPerFieldPtr>::iterator perField = entry->second.begin(); perField != entry->second.end(); ++perField) {
74             FreqProxTermsWriterPerFieldPtr freqProxPerField(boost::static_pointer_cast<FreqProxTermsWriterPerField>(*perField));
75             if (TermsHashPerFieldPtr(freqProxPerField->_termsHashPerField)->numPostings > 0) {
76                 allFields.add(freqProxPerField);
77             }
78         }
79     }
80 
81     // Sort by field name
82     std::sort(allFields.begin(), allFields.end(), luceneCompare<FreqProxTermsWriterPerFieldPtr>());
83 
84     int32_t numAllFields = allFields.size();
85 
86     FormatPostingsFieldsConsumerPtr consumer(newLucene<FormatPostingsFieldsWriter>(state, fieldInfos));
87 
88     // Current writer chain:
89     // FormatPostingsFieldsConsumer
90     // -> IMPL: FormatPostingsFieldsWriter
91     //  -> FormatPostingsTermsConsumer
92     //   -> IMPL: FormatPostingsTermsWriter
93     //       -> FormatPostingsDocConsumer
94     //        -> IMPL: FormatPostingsDocWriter
95     //          -> FormatPostingsPositionsConsumer
96     //            -> IMPL: FormatPostingsPositionsWriter
97 
98     int32_t start = 0;
99     while (start < numAllFields) {
100         FieldInfoPtr fieldInfo(allFields[start]->fieldInfo);
101         String fieldName(fieldInfo->name);
102 
103         int32_t end = start + 1;
104         while (end < numAllFields && allFields[end]->fieldInfo->name == fieldName) {
105             ++end;
106         }
107 
108         Collection<FreqProxTermsWriterPerFieldPtr> fields(Collection<FreqProxTermsWriterPerFieldPtr>::newInstance(end - start));
109         for (int32_t i = start; i < end; ++i) {
110             fields[i - start] = allFields[i];
111 
112             // Aggregate the storePayload as seen by the same field across multiple threads
113             if (fields[i - start]->hasPayloads) {
114                 fieldInfo->storePayloads = true;
115             }
116         }
117 
118         // If this field has postings then add them to the segment
119         appendPostings(fields, consumer);
120 
121         for (int32_t i = 0; i < fields.size(); ++i) {
122             TermsHashPerFieldPtr perField(fields[i]->_termsHashPerField);
123             int32_t numPostings = perField->numPostings;
124             perField->reset();
125             perField->shrinkHash(numPostings);
126             fields[i]->reset();
127         }
128 
129         start = end;
130     }
131 
132     for (MapTermsHashConsumerPerThreadCollectionTermsHashConsumerPerField::iterator entry = threadsAndFields.begin(); entry != threadsAndFields.end(); ++entry) {
133         TermsHashPerThreadPtr(boost::static_pointer_cast<FreqProxTermsWriterPerThread>(entry->first)->_termsHashPerThread)->reset(true);
134     }
135 
136     consumer->finish();
137 }
138 
appendPostings(Collection<FreqProxTermsWriterPerFieldPtr> fields,const FormatPostingsFieldsConsumerPtr & consumer)139 void FreqProxTermsWriter::appendPostings(Collection<FreqProxTermsWriterPerFieldPtr> fields, const FormatPostingsFieldsConsumerPtr& consumer) {
140     TestScope testScope(L"FreqProxTermsWriter", L"appendPostings");
141     int32_t numFields = fields.size();
142 
143     Collection<FreqProxFieldMergeStatePtr> mergeStates(Collection<FreqProxFieldMergeStatePtr>::newInstance(numFields));
144 
145     for (int32_t i = 0; i < numFields; ++i) {
146         FreqProxFieldMergeStatePtr fms(newLucene<FreqProxFieldMergeState>(fields[i]));
147         mergeStates[i] = fms;
148 
149         BOOST_ASSERT(fms->field->fieldInfo == fields[0]->fieldInfo);
150 
151         // Should always be true
152         bool result = fms->nextTerm();
153         BOOST_ASSERT(result);
154     }
155 
156     FormatPostingsTermsConsumerPtr termsConsumer(consumer->addField(fields[0]->fieldInfo));
157 
158     Collection<FreqProxFieldMergeStatePtr> termStates(Collection<FreqProxFieldMergeStatePtr>::newInstance(numFields));
159 
160     bool currentFieldOmitTermFreqAndPositions = fields[0]->fieldInfo->omitTermFreqAndPositions;
161 
162     while (numFields > 0) {
163         // Get the next term to merge
164         termStates[0] = mergeStates[0];
165         int32_t numToMerge = 1;
166 
167         for (int32_t i = 1; i < numFields; ++i) {
168             CharArray text = mergeStates[i]->text;
169             int32_t textOffset = mergeStates[i]->textOffset;
170             int32_t cmp = compareText(text.get(), textOffset, termStates[0]->text.get(), termStates[0]->textOffset);
171 
172             if (cmp < 0) {
173                 termStates[0] = mergeStates[i];
174                 numToMerge = 1;
175             } else if (cmp == 0) {
176                 termStates[numToMerge++] = mergeStates[i];
177             }
178         }
179 
180         FormatPostingsDocsConsumerPtr docConsumer(termsConsumer->addTerm(termStates[0]->text, termStates[0]->textOffset));
181 
182         // Now termStates has numToMerge FieldMergeStates which all share the same term.  Now we must
183         // interleave the docID streams.
184         while (numToMerge > 0) {
185             FreqProxFieldMergeStatePtr minState(termStates[0]);
186             for (int32_t i = 1; i < numToMerge; ++i) {
187                 if (termStates[i]->docID < minState->docID) {
188                     minState = termStates[i];
189                 }
190             }
191 
192             int32_t termDocFreq = minState->termFreq;
193 
194             FormatPostingsPositionsConsumerPtr posConsumer(docConsumer->addDoc(minState->docID, termDocFreq));
195 
196             ByteSliceReaderPtr prox(minState->prox);
197 
198             // Carefully copy over the prox + payload info, changing the format to match Lucene's segment format.
199             if (!currentFieldOmitTermFreqAndPositions) {
200                 // omitTermFreqAndPositions == false so we do write positions & payload
201                 int32_t position = 0;
202                 for (int32_t j = 0; j < termDocFreq; ++j) {
203                     int32_t code = prox->readVInt();
204                     position += (code >> 1);
205 
206                     int32_t payloadLength;
207                     if ((code & 1) != 0) {
208                         // This position has a payload
209                         payloadLength = prox->readVInt();
210 
211                         if (!payloadBuffer) {
212                             payloadBuffer = ByteArray::newInstance(payloadLength);
213                         }
214                         if (payloadBuffer.size() < payloadLength) {
215                             payloadBuffer.resize(payloadLength);
216                         }
217 
218                         prox->readBytes(payloadBuffer.get(), 0, payloadLength);
219                     } else {
220                         payloadLength = 0;
221                     }
222 
223                     posConsumer->addPosition(position, payloadBuffer, 0, payloadLength);
224                 }
225 
226                 posConsumer->finish();
227             }
228 
229             if (!minState->nextDoc()) {
230                 // Remove from termStates
231                 int32_t upto = 0;
232                 for (int32_t i = 0; i < numToMerge; ++i) {
233                     if (termStates[i] != minState) {
234                         termStates[upto++] = termStates[i];
235                     }
236                 }
237                 --numToMerge;
238                 BOOST_ASSERT(upto == numToMerge);
239 
240                 // Advance this state to the next term
241 
242                 if (!minState->nextTerm()) {
243                     // OK, no more terms, so remove from mergeStates as well
244                     upto = 0;
245                     for (int32_t i = 0; i < numFields; ++i) {
246                         if (mergeStates[i] != minState) {
247                             mergeStates[upto++] = mergeStates[i];
248                         }
249                     }
250                     --numFields;
251                     BOOST_ASSERT(upto == numFields);
252                 }
253             }
254         }
255 
256         docConsumer->finish();
257     }
258 
259     termsConsumer->finish();
260 }
261 
bytesPerPosting()262 int32_t FreqProxTermsWriter::bytesPerPosting() {
263     return RawPostingList::BYTES_SIZE + 4 * DocumentsWriter::INT_NUM_BYTE;
264 }
265 
FreqProxTermsWriterPostingList()266 FreqProxTermsWriterPostingList::FreqProxTermsWriterPostingList() {
267     docFreq = 0;
268     lastDocID = 0;
269     lastDocCode = 0;
270     lastPosition = 0;
271 }
272 
~FreqProxTermsWriterPostingList()273 FreqProxTermsWriterPostingList::~FreqProxTermsWriterPostingList() {
274 }
275 
276 }
277