1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/db/exec/multi_plan.h"
36
37 #include <algorithm>
38 #include <math.h>
39
40 #include "mongo/base/owned_pointer_vector.h"
41 #include "mongo/db/catalog/collection.h"
42 #include "mongo/db/catalog/database.h"
43 #include "mongo/db/client.h"
44 #include "mongo/db/concurrency/write_conflict_exception.h"
45 #include "mongo/db/exec/scoped_timer.h"
46 #include "mongo/db/exec/working_set_common.h"
47 #include "mongo/db/query/explain.h"
48 #include "mongo/db/query/plan_cache.h"
49 #include "mongo/db/query/plan_ranker.h"
50 #include "mongo/db/storage/record_fetcher.h"
51 #include "mongo/stdx/memory.h"
52 #include "mongo/util/log.h"
53 #include "mongo/util/mongoutils/str.h"
54
55 namespace mongo {
56
57 using std::endl;
58 using std::list;
59 using std::unique_ptr;
60 using std::vector;
61 using stdx::make_unique;
62
// static
// Stage-type label reported in explain output and plan summaries.
const char* MultiPlanStage::kStageType = "MULTI_PLAN";
65
// Constructs a multi-plan stage that will race candidate plans (added later via
// addPlan()) against each other for the query 'cq' over 'collection'.
// 'cachingMode' controls whether the winning plan may be written to the plan
// cache. Best/backup indices start at kNoSuchPlan: no winner until
// pickBestPlan() runs.
MultiPlanStage::MultiPlanStage(OperationContext* opCtx,
                               const Collection* collection,
                               CanonicalQuery* cq,
                               CachingMode cachingMode)
    : PlanStage(kStageType, opCtx),
      _collection(collection),
      _cachingMode(cachingMode),
      _query(cq),
      _bestPlanIdx(kNoSuchPlan),
      _backupPlanIdx(kNoSuchPlan),
      _failure(false),
      _failureCount(0),
      _statusMemberId(WorkingSet::INVALID_ID) {
    // A null collection would make trial sizing and cache writes impossible.
    invariant(_collection);
}
81
addPlan(QuerySolution * solution,PlanStage * root,WorkingSet * ws)82 void MultiPlanStage::addPlan(QuerySolution* solution, PlanStage* root, WorkingSet* ws) {
83 _candidates.push_back(CandidatePlan(solution, root, ws));
84 _children.emplace_back(root);
85 }
86
isEOF()87 bool MultiPlanStage::isEOF() {
88 if (_failure) {
89 return true;
90 }
91
92 // If _bestPlanIdx hasn't been found, can't be at EOF
93 if (!bestPlanChosen()) {
94 return false;
95 }
96
97 // We must have returned all our cached results
98 // and there must be no more results from the best plan.
99 CandidatePlan& bestPlan = _candidates[_bestPlanIdx];
100 return bestPlan.results.empty() && bestPlan.root->isEOF();
101 }
102
doWork(WorkingSetID * out)103 PlanStage::StageState MultiPlanStage::doWork(WorkingSetID* out) {
104 if (_failure) {
105 *out = _statusMemberId;
106 return PlanStage::FAILURE;
107 }
108
109 CandidatePlan& bestPlan = _candidates[_bestPlanIdx];
110
111 // Look for an already produced result that provides the data the caller wants.
112 if (!bestPlan.results.empty()) {
113 *out = bestPlan.results.front();
114 bestPlan.results.pop_front();
115 return PlanStage::ADVANCED;
116 }
117
118 // best plan had no (or has no more) cached results
119
120 StageState state = bestPlan.root->work(out);
121
122 if (PlanStage::FAILURE == state && hasBackupPlan()) {
123 LOG(5) << "Best plan errored out switching to backup";
124 // Uncache the bad solution if we fall back
125 // on the backup solution.
126 //
127 // XXX: Instead of uncaching we should find a way for the
128 // cached plan runner to fall back on a different solution
129 // if the best solution fails. Alternatively we could try to
130 // defer cache insertion to be after the first produced result.
131
132 _collection->infoCache()->getPlanCache()->remove(*_query).transitional_ignore();
133
134 _bestPlanIdx = _backupPlanIdx;
135 _backupPlanIdx = kNoSuchPlan;
136
137 return _candidates[_bestPlanIdx].root->work(out);
138 }
139
140 if (hasBackupPlan() && PlanStage::ADVANCED == state) {
141 LOG(5) << "Best plan had a blocking stage, became unblocked";
142 _backupPlanIdx = kNoSuchPlan;
143 }
144
145 return state;
146 }
147
tryYield(PlanYieldPolicy * yieldPolicy)148 Status MultiPlanStage::tryYield(PlanYieldPolicy* yieldPolicy) {
149 // These are the conditions which can cause us to yield:
150 // 1) The yield policy's timer elapsed, or
151 // 2) some stage requested a yield due to a document fetch, or
152 // 3) we need to yield and retry due to a WriteConflictException.
153 // In all cases, the actual yielding happens here.
154 if (yieldPolicy->shouldYield()) {
155 auto yieldStatus = yieldPolicy->yield(_fetcher.get());
156
157 if (!yieldStatus.isOK()) {
158 _failure = true;
159 _statusMemberId =
160 WorkingSetCommon::allocateStatusMember(_candidates[0].ws, yieldStatus);
161 return yieldStatus;
162 }
163 }
164
165 // We're done using the fetcher, so it should be freed. We don't want to
166 // use the same RecordFetcher twice.
167 _fetcher.reset();
168
169 return Status::OK();
170 }
171
172 // static
getTrialPeriodWorks(OperationContext * opCtx,const Collection * collection)173 size_t MultiPlanStage::getTrialPeriodWorks(OperationContext* opCtx, const Collection* collection) {
174 // Run each plan some number of times. This number is at least as great as
175 // 'internalQueryPlanEvaluationWorks', but may be larger for big collections.
176 size_t numWorks = internalQueryPlanEvaluationWorks.load();
177 if (NULL != collection) {
178 // For large collections, the number of works is set to be this
179 // fraction of the collection size.
180 double fraction = internalQueryPlanEvaluationCollFraction;
181
182 numWorks = std::max(static_cast<size_t>(internalQueryPlanEvaluationWorks.load()),
183 static_cast<size_t>(fraction * collection->numRecords(opCtx)));
184 }
185
186 return numWorks;
187 }
188
189 // static
getTrialPeriodNumToReturn(const CanonicalQuery & query)190 size_t MultiPlanStage::getTrialPeriodNumToReturn(const CanonicalQuery& query) {
191 // Determine the number of results which we will produce during the plan
192 // ranking phase before stopping.
193 size_t numResults = static_cast<size_t>(internalQueryPlanEvaluationMaxResults.load());
194 if (query.getQueryRequest().getNToReturn()) {
195 numResults =
196 std::min(static_cast<size_t>(*query.getQueryRequest().getNToReturn()), numResults);
197 } else if (query.getQueryRequest().getLimit()) {
198 numResults = std::min(static_cast<size_t>(*query.getQueryRequest().getLimit()), numResults);
199 }
200
201 return numResults;
202 }
203
// Runs the trial period, ranks the candidates, records the winner (and, when
// the winner blocks, a non-blocking backup), and conditionally writes the
// ranking decision to the plan cache. Returns a non-OK Status if all
// candidates failed or a yield failed during the trial.
Status MultiPlanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) {
    // Adds the amount of time taken by pickBestPlan() to executionTimeMillis. There's lots of
    // execution work that happens here, so this is needed for the time accounting to
    // make sense.
    ScopedTimer timer(getClock(), &_commonStats.executionTimeMillis);

    size_t numWorks = getTrialPeriodWorks(getOpCtx(), _collection);
    size_t numResults = getTrialPeriodNumToReturn(*_query);

    // Work the plans, stopping when a plan hits EOF or returns some
    // fixed number of results.
    for (size_t ix = 0; ix < numWorks; ++ix) {
        bool moreToDo = workAllPlans(numResults, yieldPolicy);
        if (!moreToDo) {
            break;
        }
    }

    // '_failure' is set by workAllPlans()/tryYield(); the error Status was
    // stashed as a working-set member whose id is '_statusMemberId'.
    if (_failure) {
        invariant(WorkingSet::INVALID_ID != _statusMemberId);
        WorkingSetMember* member = _candidates[0].ws->get(_statusMemberId);
        return WorkingSetCommon::getMemberStatus(*member);
    }

    // After picking best plan, ranking will own plan stats from
    // candidate solutions (winner and losers).
    std::unique_ptr<PlanRankingDecision> ranking(new PlanRankingDecision);
    _bestPlanIdx = PlanRanker::pickBestPlan(_candidates, ranking.get());
    verify(_bestPlanIdx >= 0 && _bestPlanIdx < static_cast<int>(_candidates.size()));

    // Copy candidate order. We will need this to sort candidate stats for explain
    // after transferring ownership of 'ranking' to plan cache.
    std::vector<size_t> candidateOrder = ranking->candidateOrder;

    CandidatePlan& bestCandidate = _candidates[_bestPlanIdx];
    std::list<WorkingSetID>& alreadyProduced = bestCandidate.results;
    const auto& bestSolution = bestCandidate.solution;

    LOG(5) << "Winning solution:\n" << redact(bestSolution->toString());
    LOG(2) << "Winning plan: " << Explain::getPlanSummary(bestCandidate.root);

    // If the winner blocks (e.g. sort) and produced nothing during the trial,
    // keep the first non-blocking candidate as a fallback for doWork().
    _backupPlanIdx = kNoSuchPlan;
    if (bestSolution->hasBlockingStage && (0 == alreadyProduced.size())) {
        LOG(5) << "Winner has blocking stage, looking for backup plan...";
        for (size_t ix = 0; ix < _candidates.size(); ++ix) {
            if (!_candidates[ix].solution->hasBlockingStage) {
                LOG(5) << "Candidate " << ix << " is backup child";
                _backupPlanIdx = ix;
                break;
            }
        }
    }

    // Even if the query is of a cacheable shape, the caller might have indicated that we shouldn't
    // write to the plan cache.
    //
    // TODO: We can remove this if we introduce replanning logic to the SubplanStage.
    bool canCache = (_cachingMode == CachingMode::AlwaysCache);
    if (_cachingMode == CachingMode::SometimesCache) {
        // In "sometimes cache" mode, we cache unless we hit one of the special cases below.
        canCache = true;

        if (ranking->tieForBest) {
            // The winning plan tied with the runner-up and we're using "sometimes cache" mode. We
            // will not write a plan cache entry.
            canCache = false;

            // These arrays having two or more entries is implied by 'tieForBest'.
            invariant(ranking->scores.size() > 1U);
            invariant(ranking->candidateOrder.size() > 1U);

            size_t winnerIdx = ranking->candidateOrder[0];
            size_t runnerUpIdx = ranking->candidateOrder[1];

            LOG(1) << "Winning plan tied with runner-up. Not caching."
                   << " ns: " << _collection->ns() << " " << redact(_query->toStringShort())
                   << " winner score: " << ranking->scores[0]
                   << " winner summary: " << Explain::getPlanSummary(_candidates[winnerIdx].root)
                   << " runner-up score: " << ranking->scores[1] << " runner-up summary: "
                   << Explain::getPlanSummary(_candidates[runnerUpIdx].root);
        }

        if (alreadyProduced.empty()) {
            // We're using the "sometimes cache" mode, and the winning plan produced no results
            // during the plan ranking trial period. We will not write a plan cache entry.
            canCache = false;

            size_t winnerIdx = ranking->candidateOrder[0];
            LOG(1) << "Winning plan had zero results. Not caching."
                   << " ns: " << _collection->ns() << " " << redact(_query->toStringShort())
                   << " winner score: " << ranking->scores[0]
                   << " winner summary: " << Explain::getPlanSummary(_candidates[winnerIdx].root);
        }
    }

    // Store the choice we just made in the cache, if the query is of a type that is safe to
    // cache.
    if (PlanCache::shouldCacheQuery(*_query) && canCache) {
        // Create list of candidate solutions for the cache with
        // the best solution at the front.
        std::vector<QuerySolution*> solutions;

        // Generate solutions and ranking decisions sorted by score.
        for (size_t orderingIndex = 0; orderingIndex < candidateOrder.size(); ++orderingIndex) {
            // index into candidates/ranking
            size_t ix = candidateOrder[orderingIndex];
            solutions.push_back(_candidates[ix].solution.get());
        }

        // Check solution cache data. Do not add to cache if
        // we have any invalid SolutionCacheData data.
        // XXX: One known example is 2D queries
        bool validSolutions = true;
        for (size_t ix = 0; ix < solutions.size(); ++ix) {
            if (NULL == solutions[ix]->cacheData.get()) {
                LOG(5) << "Not caching query because this solution has no cache data: "
                       << redact(solutions[ix]->toString());
                validSolutions = false;
                break;
            }
        }

        if (validSolutions) {
            // Note: 'ranking' is moved into the cache here; 'candidateOrder'
            // was copied above precisely because of this ownership transfer.
            _collection->infoCache()
                ->getPlanCache()
                ->add(*_query,
                      solutions,
                      std::move(ranking),
                      getOpCtx()->getServiceContext()->getPreciseClockSource()->now())
                .transitional_ignore();
        }
    }

    return Status::OK();
}
339
// Gives every surviving candidate one work() call, buffering ADVANCED results
// for later replay. Returns true if the trial should continue, false when a
// plan hit EOF, a plan buffered 'numResults' results, a yield failed, or every
// candidate failed.
bool MultiPlanStage::workAllPlans(size_t numResults, PlanYieldPolicy* yieldPolicy) {
    bool doneWorking = false;

    for (size_t ix = 0; ix < _candidates.size(); ++ix) {
        CandidatePlan& candidate = _candidates[ix];
        if (candidate.failed) {
            continue;
        }

        // Might need to yield between calls to work due to the timer elapsing.
        if (!(tryYield(yieldPolicy)).isOK()) {
            return false;
        }

        WorkingSetID id = WorkingSet::INVALID_ID;
        PlanStage::StageState state = candidate.root->work(&id);

        if (PlanStage::ADVANCED == state) {
            // Save result for later.
            WorkingSetMember* member = candidate.ws->get(id);
            // Ensure that the BSONObj underlying the WorkingSetMember is owned in case we choose to
            // return the results from the 'candidate' plan.
            member->makeObjOwnedIfNeeded();
            candidate.results.push_back(id);

            // Once a plan returns enough results, stop working.
            if (candidate.results.size() >= numResults) {
                doneWorking = true;
            }
        } else if (PlanStage::IS_EOF == state) {
            // First plan to hit EOF wins automatically. Stop evaluating other plans.
            // Assumes that the ranking will pick this plan.
            doneWorking = true;
        } else if (PlanStage::NEED_YIELD == state) {
            if (id == WorkingSet::INVALID_ID) {
                // No fetcher was provided: this yield request must come from a
                // storage-level retry; without auto-yield support we can only
                // propagate a WriteConflictException.
                if (!yieldPolicy->canAutoYield())
                    throw WriteConflictException();
            } else {
                WorkingSetMember* member = candidate.ws->get(id);
                invariant(member->hasFetcher());
                // Transfer ownership of the fetcher and yield.
                _fetcher.reset(member->releaseFetcher());
            }

            if (yieldPolicy->canAutoYield()) {
                yieldPolicy->forceYield();
            }

            // Perform the requested yield; on failure, '_failure' is set inside
            // tryYield() and we abort the trial.
            if (!(tryYield(yieldPolicy)).isOK()) {
                return false;
            }
        } else if (PlanStage::NEED_TIME != state) {
            // FAILURE or DEAD. Do we want to just tank that plan and try the rest? We
            // probably want to fail globally as this shouldn't happen anyway.

            candidate.failed = true;
            ++_failureCount;

            // Propagate most recent seen failure to parent.
            if (PlanStage::FAILURE == state) {
                _statusMemberId = id;
            }

            // Only fail the whole stage when no candidate remains.
            if (_failureCount == _candidates.size()) {
                _failure = true;
                return false;
            }
        }
    }

    return !doneWorking;
}
412
413 namespace {
414
invalidateHelper(OperationContext * opCtx,WorkingSet * ws,const RecordId & recordId,list<WorkingSetID> * idsToInvalidate,const Collection * collection)415 void invalidateHelper(OperationContext* opCtx,
416 WorkingSet* ws, // may flag for review
417 const RecordId& recordId,
418 list<WorkingSetID>* idsToInvalidate,
419 const Collection* collection) {
420 for (auto it = idsToInvalidate->begin(); it != idsToInvalidate->end(); ++it) {
421 WorkingSetMember* member = ws->get(*it);
422 if (member->hasRecordId() && member->recordId == recordId) {
423 WorkingSetCommon::fetchAndInvalidateRecordId(opCtx, member, collection);
424 }
425 }
426 }
427
428 } // namespace
429
doInvalidate(OperationContext * opCtx,const RecordId & recordId,InvalidationType type)430 void MultiPlanStage::doInvalidate(OperationContext* opCtx,
431 const RecordId& recordId,
432 InvalidationType type) {
433 if (_failure) {
434 return;
435 }
436
437 if (bestPlanChosen()) {
438 CandidatePlan& bestPlan = _candidates[_bestPlanIdx];
439 invalidateHelper(opCtx, bestPlan.ws, recordId, &bestPlan.results, _collection);
440 if (hasBackupPlan()) {
441 CandidatePlan& backupPlan = _candidates[_backupPlanIdx];
442 invalidateHelper(opCtx, backupPlan.ws, recordId, &backupPlan.results, _collection);
443 }
444 } else {
445 for (size_t ix = 0; ix < _candidates.size(); ++ix) {
446 invalidateHelper(
447 opCtx, _candidates[ix].ws, recordId, &_candidates[ix].results, _collection);
448 }
449 }
450 }
451
hasBackupPlan() const452 bool MultiPlanStage::hasBackupPlan() const {
453 return kNoSuchPlan != _backupPlanIdx;
454 }
455
bestPlanChosen() const456 bool MultiPlanStage::bestPlanChosen() const {
457 return kNoSuchPlan != _bestPlanIdx;
458 }
459
// Index of the winning candidate, or kNoSuchPlan before pickBestPlan() runs.
int MultiPlanStage::bestPlanIdx() const {
    return _bestPlanIdx;
}
463
bestSolution()464 QuerySolution* MultiPlanStage::bestSolution() {
465 if (_bestPlanIdx == kNoSuchPlan)
466 return NULL;
467
468 return _candidates[_bestPlanIdx].solution.get();
469 }
470
getStats()471 unique_ptr<PlanStageStats> MultiPlanStage::getStats() {
472 _commonStats.isEOF = isEOF();
473 unique_ptr<PlanStageStats> ret = make_unique<PlanStageStats>(_commonStats, STAGE_MULTI_PLAN);
474 ret->specific = make_unique<MultiPlanStats>(_specificStats);
475 for (auto&& child : _children) {
476 ret->children.emplace_back(child->getStats());
477 }
478 return ret;
479 }
480
// Non-owning pointer to this stage's MULTI_PLAN-specific stats.
const SpecificStats* MultiPlanStage::getSpecificStats() const {
    return &_specificStats;
}
484
485 } // namespace mongo
486