1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #include "mongo/db/exec/text_or.h"
32 
33 #include <map>
34 #include <vector>
35 
36 #include "mongo/db/concurrency/write_conflict_exception.h"
37 #include "mongo/db/exec/filter.h"
38 #include "mongo/db/exec/index_scan.h"
39 #include "mongo/db/exec/scoped_timer.h"
40 #include "mongo/db/exec/working_set.h"
41 #include "mongo/db/exec/working_set_common.h"
42 #include "mongo/db/exec/working_set_computed_data.h"
43 #include "mongo/db/jsobj.h"
44 #include "mongo/db/record_id.h"
45 #include "mongo/stdx/memory.h"
46 
47 namespace mongo {
48 
49 using std::unique_ptr;
50 using std::vector;
51 using std::string;
52 using stdx::make_unique;
53 
54 using fts::FTSSpec;
55 
// Stage type name; the constructor hands this to the PlanStage base class.
const char* TextOrStage::kStageType = "TEXT_OR";
57 
/**
 * Constructs a TEXT_OR stage with no children; child stages are attached afterwards through
 * addChild() / addChildren().
 *
 * 'opCtx' is forwarded to the PlanStage base class together with kStageType.
 * 'ftsSpec' is later consulted by addTerm() to locate the score inside an index key.
 * 'ws' is the working set whose members this stage reads, frees and returns.
 * 'filter' may be null; when set it is applied to index keys in addTerm() and serialized by
 *     getStats().
 * 'index' is the descriptor whose collection supplies the record cursor in initStage().
 */
TextOrStage::TextOrStage(OperationContext* opCtx,
                         const FTSSpec& ftsSpec,
                         WorkingSet* ws,
                         const MatchExpression* filter,
                         IndexDescriptor* index)
    : PlanStage(kStageType, opCtx),
      _ftsSpec(ftsSpec),
      _ws(ws),
      // Nothing has been scored yet, so start out at the end of the (empty) score map.
      _scoreIterator(_scores.end()),
      _filter(filter),
      // No working set member is waiting to be retried.
      _idRetrying(WorkingSet::INVALID_ID),
      _index(index) {}
70 
~TextOrStage()71 TextOrStage::~TextOrStage() {}
72 
addChild(unique_ptr<PlanStage> child)73 void TextOrStage::addChild(unique_ptr<PlanStage> child) {
74     _children.push_back(std::move(child));
75 }
76 
addChildren(Children childrenToAdd)77 void TextOrStage::addChildren(Children childrenToAdd) {
78     _children.insert(_children.end(),
79                      std::make_move_iterator(childrenToAdd.begin()),
80                      std::make_move_iterator(childrenToAdd.end()));
81 }
82 
isEOF()83 bool TextOrStage::isEOF() {
84     return _internalState == State::kDone;
85 }
86 
doSaveState()87 void TextOrStage::doSaveState() {
88     if (_recordCursor) {
89         _recordCursor->saveUnpositioned();
90     }
91 }
92 
doRestoreState()93 void TextOrStage::doRestoreState() {
94     if (_recordCursor) {
95         invariant(_recordCursor->restore());
96     }
97 }
98 
doDetachFromOperationContext()99 void TextOrStage::doDetachFromOperationContext() {
100     if (_recordCursor)
101         _recordCursor->detachFromOperationContext();
102 }
103 
doReattachToOperationContext()104 void TextOrStage::doReattachToOperationContext() {
105     if (_recordCursor)
106         _recordCursor->reattachToOperationContext(getOpCtx());
107 }
108 
doInvalidate(OperationContext * opCtx,const RecordId & dl,InvalidationType type)109 void TextOrStage::doInvalidate(OperationContext* opCtx, const RecordId& dl, InvalidationType type) {
110     // Remove the RecordID from the ScoreMap.
111     ScoreMap::iterator scoreIt = _scores.find(dl);
112     if (scoreIt != _scores.end()) {
113         if (scoreIt == _scoreIterator) {
114             _scoreIterator++;
115         }
116         _scores.erase(scoreIt);
117     }
118 }
119 
getStats()120 std::unique_ptr<PlanStageStats> TextOrStage::getStats() {
121     _commonStats.isEOF = isEOF();
122 
123     if (_filter) {
124         BSONObjBuilder bob;
125         _filter->serialize(&bob);
126         _commonStats.filter = bob.obj();
127     }
128 
129     unique_ptr<PlanStageStats> ret = make_unique<PlanStageStats>(_commonStats, STAGE_TEXT_OR);
130     ret->specific = make_unique<TextOrStats>(_specificStats);
131 
132     for (auto&& child : _children) {
133         ret->children.emplace_back(child->getStats());
134     }
135 
136     return ret;
137 }
138 
getSpecificStats() const139 const SpecificStats* TextOrStage::getSpecificStats() const {
140     return &_specificStats;
141 }
142 
doWork(WorkingSetID * out)143 PlanStage::StageState TextOrStage::doWork(WorkingSetID* out) {
144     if (isEOF()) {
145         return PlanStage::IS_EOF;
146     }
147 
148     PlanStage::StageState stageState = PlanStage::IS_EOF;
149 
150     switch (_internalState) {
151         case State::kInit:
152             stageState = initStage(out);
153             break;
154         case State::kReadingTerms:
155             stageState = readFromChildren(out);
156             break;
157         case State::kReturningResults:
158             stageState = returnResults(out);
159             break;
160         case State::kDone:
161             // Should have been handled above.
162             invariant(false);
163             break;
164     }
165 
166     return stageState;
167 }
168 
initStage(WorkingSetID * out)169 PlanStage::StageState TextOrStage::initStage(WorkingSetID* out) {
170     *out = WorkingSet::INVALID_ID;
171     try {
172         _recordCursor = _index->getCollection()->getCursor(getOpCtx());
173         _internalState = State::kReadingTerms;
174         return PlanStage::NEED_TIME;
175     } catch (const WriteConflictException&) {
176         invariant(_internalState == State::kInit);
177         _recordCursor.reset();
178         return PlanStage::NEED_YIELD;
179     }
180 }
181 
readFromChildren(WorkingSetID * out)182 PlanStage::StageState TextOrStage::readFromChildren(WorkingSetID* out) {
183     // Check to see if there were any children added in the first place.
184     if (_children.size() == 0) {
185         _internalState = State::kDone;
186         return PlanStage::IS_EOF;
187     }
188     invariant(_currentChild < _children.size());
189 
190     // Either retry the last WSM we worked on or get a new one from our current child.
191     WorkingSetID id;
192     StageState childState;
193     if (_idRetrying == WorkingSet::INVALID_ID) {
194         childState = _children[_currentChild]->work(&id);
195     } else {
196         childState = ADVANCED;
197         id = _idRetrying;
198         _idRetrying = WorkingSet::INVALID_ID;
199     }
200 
201     if (PlanStage::ADVANCED == childState) {
202         return addTerm(id, out);
203     } else if (PlanStage::IS_EOF == childState) {
204         // Done with this child.
205         ++_currentChild;
206 
207         if (_currentChild < _children.size()) {
208             // We have another child to read from.
209             return PlanStage::NEED_TIME;
210         }
211 
212         // If we're here we are done reading results.  Move to the next state.
213         _scoreIterator = _scores.begin();
214         _internalState = State::kReturningResults;
215 
216         return PlanStage::NEED_TIME;
217     } else if (PlanStage::FAILURE == childState) {
218         // If a stage fails, it may create a status WSM to indicate why it
219         // failed, in which case 'id' is valid.  If ID is invalid, we
220         // create our own error message.
221         if (WorkingSet::INVALID_ID == id) {
222             mongoutils::str::stream ss;
223             ss << "TEXT_OR stage failed to read in results from child";
224             Status status(ErrorCodes::InternalError, ss);
225             *out = WorkingSetCommon::allocateStatusMember(_ws, status);
226         } else {
227             *out = id;
228         }
229         return PlanStage::FAILURE;
230     } else {
231         // Propagate WSID from below.
232         *out = id;
233         return childState;
234     }
235 }
236 
returnResults(WorkingSetID * out)237 PlanStage::StageState TextOrStage::returnResults(WorkingSetID* out) {
238     if (_scoreIterator == _scores.end()) {
239         _internalState = State::kDone;
240         return PlanStage::IS_EOF;
241     }
242 
243     // Retrieve the record that contains the text score.
244     TextRecordData textRecordData = _scoreIterator->second;
245     ++_scoreIterator;
246 
247     // Ignore non-matched documents.
248     if (textRecordData.score < 0) {
249         invariant(textRecordData.wsid == WorkingSet::INVALID_ID);
250         return PlanStage::NEED_TIME;
251     }
252 
253     WorkingSetMember* wsm = _ws->get(textRecordData.wsid);
254 
255     // Populate the working set member with the text score and return it.
256     wsm->addComputed(new TextScoreComputedData(textRecordData.score));
257     *out = textRecordData.wsid;
258     return PlanStage::ADVANCED;
259 }
260 
addTerm(WorkingSetID wsid,WorkingSetID * out)261 PlanStage::StageState TextOrStage::addTerm(WorkingSetID wsid, WorkingSetID* out) {
262     WorkingSetMember* wsm = _ws->get(wsid);
263     invariant(wsm->getState() == WorkingSetMember::RID_AND_IDX);
264     invariant(1 == wsm->keyData.size());
265     const IndexKeyDatum newKeyData = wsm->keyData.back();  // copy to keep it around.
266     TextRecordData* textRecordData = &_scores[wsm->recordId];
267 
268     if (textRecordData->score < 0) {
269         // We have already rejected this document for not matching the filter.
270         invariant(WorkingSet::INVALID_ID == textRecordData->wsid);
271         _ws->free(wsid);
272         return NEED_TIME;
273     }
274 
275     if (WorkingSet::INVALID_ID == textRecordData->wsid) {
276         // We haven't seen this RecordId before.
277         invariant(textRecordData->score == 0);
278 
279         if (!Filter::passes(newKeyData.keyData, newKeyData.indexKeyPattern, _filter)) {
280             _ws->free(wsid);
281             textRecordData->score = -1;
282             return NEED_TIME;
283         }
284 
285         // Our parent expects RID_AND_OBJ members, so we fetch the document here if we haven't
286         // already.
287         try {
288             if (!WorkingSetCommon::fetch(getOpCtx(), _ws, wsid, _recordCursor)) {
289                 _ws->free(wsid);
290                 textRecordData->score = -1;
291                 return NEED_TIME;
292             }
293             ++_specificStats.fetches;
294         } catch (const WriteConflictException&) {
295             wsm->makeObjOwnedIfNeeded();
296             _idRetrying = wsid;
297             *out = WorkingSet::INVALID_ID;
298             return NEED_YIELD;
299         }
300 
301         textRecordData->wsid = wsid;
302 
303         // Ensure that the BSONObj underlying the WorkingSetMember is owned in case we yield.
304         wsm->makeObjOwnedIfNeeded();
305     } else {
306         // We already have a working set member for this RecordId. Free the new WSM and retrieve the
307         // old one. Note that since we don't keep all index keys, we could get a score that doesn't
308         // match the document, but this has always been a problem.
309         // TODO something to improve the situation.
310         invariant(wsid != textRecordData->wsid);
311         _ws->free(wsid);
312         wsm = _ws->get(textRecordData->wsid);
313     }
314 
315     // Locate score within possibly compound key: {prefix,term,score,suffix}.
316     BSONObjIterator keyIt(newKeyData.keyData);
317     for (unsigned i = 0; i < _ftsSpec.numExtraBefore(); i++) {
318         keyIt.next();
319     }
320 
321     keyIt.next();  // Skip past 'term'.
322 
323     BSONElement scoreElement = keyIt.next();
324     double documentTermScore = scoreElement.number();
325 
326     // Aggregate relevance score, term keys.
327     textRecordData->score += documentTermScore;
328     return NEED_TIME;
329 }
330 
331 }  // namespace mongo
332