1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #include "mongo/db/exec/text_or.h"
32
33 #include <map>
34 #include <vector>
35
36 #include "mongo/db/concurrency/write_conflict_exception.h"
37 #include "mongo/db/exec/filter.h"
38 #include "mongo/db/exec/index_scan.h"
39 #include "mongo/db/exec/scoped_timer.h"
40 #include "mongo/db/exec/working_set.h"
41 #include "mongo/db/exec/working_set_common.h"
42 #include "mongo/db/exec/working_set_computed_data.h"
43 #include "mongo/db/jsobj.h"
44 #include "mongo/db/record_id.h"
45 #include "mongo/stdx/memory.h"
46
47 namespace mongo {
48
49 using std::unique_ptr;
50 using std::vector;
51 using std::string;
52 using stdx::make_unique;
53
54 using fts::FTSSpec;
55
56 const char* TextOrStage::kStageType = "TEXT_OR";
57
TextOrStage(OperationContext * opCtx,const FTSSpec & ftsSpec,WorkingSet * ws,const MatchExpression * filter,IndexDescriptor * index)58 TextOrStage::TextOrStage(OperationContext* opCtx,
59 const FTSSpec& ftsSpec,
60 WorkingSet* ws,
61 const MatchExpression* filter,
62 IndexDescriptor* index)
63 : PlanStage(kStageType, opCtx),
64 _ftsSpec(ftsSpec),
65 _ws(ws),
66 _scoreIterator(_scores.end()),
67 _filter(filter),
68 _idRetrying(WorkingSet::INVALID_ID),
69 _index(index) {}
70
~TextOrStage()71 TextOrStage::~TextOrStage() {}
72
addChild(unique_ptr<PlanStage> child)73 void TextOrStage::addChild(unique_ptr<PlanStage> child) {
74 _children.push_back(std::move(child));
75 }
76
addChildren(Children childrenToAdd)77 void TextOrStage::addChildren(Children childrenToAdd) {
78 _children.insert(_children.end(),
79 std::make_move_iterator(childrenToAdd.begin()),
80 std::make_move_iterator(childrenToAdd.end()));
81 }
82
isEOF()83 bool TextOrStage::isEOF() {
84 return _internalState == State::kDone;
85 }
86
doSaveState()87 void TextOrStage::doSaveState() {
88 if (_recordCursor) {
89 _recordCursor->saveUnpositioned();
90 }
91 }
92
doRestoreState()93 void TextOrStage::doRestoreState() {
94 if (_recordCursor) {
95 invariant(_recordCursor->restore());
96 }
97 }
98
doDetachFromOperationContext()99 void TextOrStage::doDetachFromOperationContext() {
100 if (_recordCursor)
101 _recordCursor->detachFromOperationContext();
102 }
103
doReattachToOperationContext()104 void TextOrStage::doReattachToOperationContext() {
105 if (_recordCursor)
106 _recordCursor->reattachToOperationContext(getOpCtx());
107 }
108
doInvalidate(OperationContext * opCtx,const RecordId & dl,InvalidationType type)109 void TextOrStage::doInvalidate(OperationContext* opCtx, const RecordId& dl, InvalidationType type) {
110 // Remove the RecordID from the ScoreMap.
111 ScoreMap::iterator scoreIt = _scores.find(dl);
112 if (scoreIt != _scores.end()) {
113 if (scoreIt == _scoreIterator) {
114 _scoreIterator++;
115 }
116 _scores.erase(scoreIt);
117 }
118 }
119
getStats()120 std::unique_ptr<PlanStageStats> TextOrStage::getStats() {
121 _commonStats.isEOF = isEOF();
122
123 if (_filter) {
124 BSONObjBuilder bob;
125 _filter->serialize(&bob);
126 _commonStats.filter = bob.obj();
127 }
128
129 unique_ptr<PlanStageStats> ret = make_unique<PlanStageStats>(_commonStats, STAGE_TEXT_OR);
130 ret->specific = make_unique<TextOrStats>(_specificStats);
131
132 for (auto&& child : _children) {
133 ret->children.emplace_back(child->getStats());
134 }
135
136 return ret;
137 }
138
getSpecificStats() const139 const SpecificStats* TextOrStage::getSpecificStats() const {
140 return &_specificStats;
141 }
142
doWork(WorkingSetID * out)143 PlanStage::StageState TextOrStage::doWork(WorkingSetID* out) {
144 if (isEOF()) {
145 return PlanStage::IS_EOF;
146 }
147
148 PlanStage::StageState stageState = PlanStage::IS_EOF;
149
150 switch (_internalState) {
151 case State::kInit:
152 stageState = initStage(out);
153 break;
154 case State::kReadingTerms:
155 stageState = readFromChildren(out);
156 break;
157 case State::kReturningResults:
158 stageState = returnResults(out);
159 break;
160 case State::kDone:
161 // Should have been handled above.
162 invariant(false);
163 break;
164 }
165
166 return stageState;
167 }
168
initStage(WorkingSetID * out)169 PlanStage::StageState TextOrStage::initStage(WorkingSetID* out) {
170 *out = WorkingSet::INVALID_ID;
171 try {
172 _recordCursor = _index->getCollection()->getCursor(getOpCtx());
173 _internalState = State::kReadingTerms;
174 return PlanStage::NEED_TIME;
175 } catch (const WriteConflictException&) {
176 invariant(_internalState == State::kInit);
177 _recordCursor.reset();
178 return PlanStage::NEED_YIELD;
179 }
180 }
181
readFromChildren(WorkingSetID * out)182 PlanStage::StageState TextOrStage::readFromChildren(WorkingSetID* out) {
183 // Check to see if there were any children added in the first place.
184 if (_children.size() == 0) {
185 _internalState = State::kDone;
186 return PlanStage::IS_EOF;
187 }
188 invariant(_currentChild < _children.size());
189
190 // Either retry the last WSM we worked on or get a new one from our current child.
191 WorkingSetID id;
192 StageState childState;
193 if (_idRetrying == WorkingSet::INVALID_ID) {
194 childState = _children[_currentChild]->work(&id);
195 } else {
196 childState = ADVANCED;
197 id = _idRetrying;
198 _idRetrying = WorkingSet::INVALID_ID;
199 }
200
201 if (PlanStage::ADVANCED == childState) {
202 return addTerm(id, out);
203 } else if (PlanStage::IS_EOF == childState) {
204 // Done with this child.
205 ++_currentChild;
206
207 if (_currentChild < _children.size()) {
208 // We have another child to read from.
209 return PlanStage::NEED_TIME;
210 }
211
212 // If we're here we are done reading results. Move to the next state.
213 _scoreIterator = _scores.begin();
214 _internalState = State::kReturningResults;
215
216 return PlanStage::NEED_TIME;
217 } else if (PlanStage::FAILURE == childState) {
218 // If a stage fails, it may create a status WSM to indicate why it
219 // failed, in which case 'id' is valid. If ID is invalid, we
220 // create our own error message.
221 if (WorkingSet::INVALID_ID == id) {
222 mongoutils::str::stream ss;
223 ss << "TEXT_OR stage failed to read in results from child";
224 Status status(ErrorCodes::InternalError, ss);
225 *out = WorkingSetCommon::allocateStatusMember(_ws, status);
226 } else {
227 *out = id;
228 }
229 return PlanStage::FAILURE;
230 } else {
231 // Propagate WSID from below.
232 *out = id;
233 return childState;
234 }
235 }
236
returnResults(WorkingSetID * out)237 PlanStage::StageState TextOrStage::returnResults(WorkingSetID* out) {
238 if (_scoreIterator == _scores.end()) {
239 _internalState = State::kDone;
240 return PlanStage::IS_EOF;
241 }
242
243 // Retrieve the record that contains the text score.
244 TextRecordData textRecordData = _scoreIterator->second;
245 ++_scoreIterator;
246
247 // Ignore non-matched documents.
248 if (textRecordData.score < 0) {
249 invariant(textRecordData.wsid == WorkingSet::INVALID_ID);
250 return PlanStage::NEED_TIME;
251 }
252
253 WorkingSetMember* wsm = _ws->get(textRecordData.wsid);
254
255 // Populate the working set member with the text score and return it.
256 wsm->addComputed(new TextScoreComputedData(textRecordData.score));
257 *out = textRecordData.wsid;
258 return PlanStage::ADVANCED;
259 }
260
addTerm(WorkingSetID wsid,WorkingSetID * out)261 PlanStage::StageState TextOrStage::addTerm(WorkingSetID wsid, WorkingSetID* out) {
262 WorkingSetMember* wsm = _ws->get(wsid);
263 invariant(wsm->getState() == WorkingSetMember::RID_AND_IDX);
264 invariant(1 == wsm->keyData.size());
265 const IndexKeyDatum newKeyData = wsm->keyData.back(); // copy to keep it around.
266 TextRecordData* textRecordData = &_scores[wsm->recordId];
267
268 if (textRecordData->score < 0) {
269 // We have already rejected this document for not matching the filter.
270 invariant(WorkingSet::INVALID_ID == textRecordData->wsid);
271 _ws->free(wsid);
272 return NEED_TIME;
273 }
274
275 if (WorkingSet::INVALID_ID == textRecordData->wsid) {
276 // We haven't seen this RecordId before.
277 invariant(textRecordData->score == 0);
278
279 if (!Filter::passes(newKeyData.keyData, newKeyData.indexKeyPattern, _filter)) {
280 _ws->free(wsid);
281 textRecordData->score = -1;
282 return NEED_TIME;
283 }
284
285 // Our parent expects RID_AND_OBJ members, so we fetch the document here if we haven't
286 // already.
287 try {
288 if (!WorkingSetCommon::fetch(getOpCtx(), _ws, wsid, _recordCursor)) {
289 _ws->free(wsid);
290 textRecordData->score = -1;
291 return NEED_TIME;
292 }
293 ++_specificStats.fetches;
294 } catch (const WriteConflictException&) {
295 wsm->makeObjOwnedIfNeeded();
296 _idRetrying = wsid;
297 *out = WorkingSet::INVALID_ID;
298 return NEED_YIELD;
299 }
300
301 textRecordData->wsid = wsid;
302
303 // Ensure that the BSONObj underlying the WorkingSetMember is owned in case we yield.
304 wsm->makeObjOwnedIfNeeded();
305 } else {
306 // We already have a working set member for this RecordId. Free the new WSM and retrieve the
307 // old one. Note that since we don't keep all index keys, we could get a score that doesn't
308 // match the document, but this has always been a problem.
309 // TODO something to improve the situation.
310 invariant(wsid != textRecordData->wsid);
311 _ws->free(wsid);
312 wsm = _ws->get(textRecordData->wsid);
313 }
314
315 // Locate score within possibly compound key: {prefix,term,score,suffix}.
316 BSONObjIterator keyIt(newKeyData.keyData);
317 for (unsigned i = 0; i < _ftsSpec.numExtraBefore(); i++) {
318 keyIt.next();
319 }
320
321 keyIt.next(); // Skip past 'term'.
322
323 BSONElement scoreElement = keyIt.next();
324 double documentTermScore = scoreElement.number();
325
326 // Aggregate relevance score, term keys.
327 textRecordData->score += documentTermScore;
328 return NEED_TIME;
329 }
330
331 } // namespace mongo
332