1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/db/exec/multi_plan.h"
36 
37 #include <algorithm>
38 #include <math.h>
39 
40 #include "mongo/base/owned_pointer_vector.h"
41 #include "mongo/db/catalog/collection.h"
42 #include "mongo/db/catalog/database.h"
43 #include "mongo/db/client.h"
44 #include "mongo/db/concurrency/write_conflict_exception.h"
45 #include "mongo/db/exec/scoped_timer.h"
46 #include "mongo/db/exec/working_set_common.h"
47 #include "mongo/db/query/explain.h"
48 #include "mongo/db/query/plan_cache.h"
49 #include "mongo/db/query/plan_ranker.h"
50 #include "mongo/db/storage/record_fetcher.h"
51 #include "mongo/stdx/memory.h"
52 #include "mongo/util/log.h"
53 #include "mongo/util/mongoutils/str.h"
54 
55 namespace mongo {
56 
57 using std::endl;
58 using std::list;
59 using std::unique_ptr;
60 using std::vector;
61 using stdx::make_unique;
62 
// static
// Stage-type name reported in explain output and plan summaries.
const char* MultiPlanStage::kStageType = "MULTI_PLAN";
65 
MultiPlanStage::MultiPlanStage(OperationContext* opCtx,
                               const Collection* collection,
                               CanonicalQuery* cq,
                               CachingMode cachingMode)
    : PlanStage(kStageType, opCtx),
      _collection(collection),
      _cachingMode(cachingMode),
      _query(cq),
      // No winner or backup until pickBestPlan() runs.
      _bestPlanIdx(kNoSuchPlan),
      _backupPlanIdx(kNoSuchPlan),
      _failure(false),
      _failureCount(0),
      _statusMemberId(WorkingSet::INVALID_ID) {
    // Plan ranking is meaningless without a collection; fail fast.
    invariant(_collection);
}
81 
addPlan(QuerySolution * solution,PlanStage * root,WorkingSet * ws)82 void MultiPlanStage::addPlan(QuerySolution* solution, PlanStage* root, WorkingSet* ws) {
83     _candidates.push_back(CandidatePlan(solution, root, ws));
84     _children.emplace_back(root);
85 }
86 
isEOF()87 bool MultiPlanStage::isEOF() {
88     if (_failure) {
89         return true;
90     }
91 
92     // If _bestPlanIdx hasn't been found, can't be at EOF
93     if (!bestPlanChosen()) {
94         return false;
95     }
96 
97     // We must have returned all our cached results
98     // and there must be no more results from the best plan.
99     CandidatePlan& bestPlan = _candidates[_bestPlanIdx];
100     return bestPlan.results.empty() && bestPlan.root->isEOF();
101 }
102 
PlanStage::StageState MultiPlanStage::doWork(WorkingSetID* out) {
    // A previous failure (e.g. a failed yield) is sticky: keep reporting it.
    if (_failure) {
        *out = _statusMemberId;
        return PlanStage::FAILURE;
    }

    // NOTE(review): assumes pickBestPlan() has already chosen a winner so that
    // _bestPlanIdx is a valid index — confirm callers uphold this.
    CandidatePlan& bestPlan = _candidates[_bestPlanIdx];

    // Look for an already produced result that provides the data the caller wants.
    if (!bestPlan.results.empty()) {
        *out = bestPlan.results.front();
        bestPlan.results.pop_front();
        return PlanStage::ADVANCED;
    }

    // best plan had no (or has no more) cached results

    StageState state = bestPlan.root->work(out);

    if (PlanStage::FAILURE == state && hasBackupPlan()) {
        LOG(5) << "Best plan errored out switching to backup";
        // Uncache the bad solution if we fall back
        // on the backup solution.
        //
        // XXX: Instead of uncaching we should find a way for the
        // cached plan runner to fall back on a different solution
        // if the best solution fails. Alternatively we could try to
        // defer cache insertion to be after the first produced result.

        _collection->infoCache()->getPlanCache()->remove(*_query).transitional_ignore();

        // Promote the backup to be the winner from now on.
        _bestPlanIdx = _backupPlanIdx;
        _backupPlanIdx = kNoSuchPlan;

        return _candidates[_bestPlanIdx].root->work(out);
    }

    if (hasBackupPlan() && PlanStage::ADVANCED == state) {
        // The winner produced a result, so its blocking stage is past the
        // point of failure risk; the backup plan is no longer needed.
        LOG(5) << "Best plan had a blocking stage, became unblocked";
        _backupPlanIdx = kNoSuchPlan;
    }

    return state;
}
147 
tryYield(PlanYieldPolicy * yieldPolicy)148 Status MultiPlanStage::tryYield(PlanYieldPolicy* yieldPolicy) {
149     // These are the conditions which can cause us to yield:
150     //   1) The yield policy's timer elapsed, or
151     //   2) some stage requested a yield due to a document fetch, or
152     //   3) we need to yield and retry due to a WriteConflictException.
153     // In all cases, the actual yielding happens here.
154     if (yieldPolicy->shouldYield()) {
155         auto yieldStatus = yieldPolicy->yield(_fetcher.get());
156 
157         if (!yieldStatus.isOK()) {
158             _failure = true;
159             _statusMemberId =
160                 WorkingSetCommon::allocateStatusMember(_candidates[0].ws, yieldStatus);
161             return yieldStatus;
162         }
163     }
164 
165     // We're done using the fetcher, so it should be freed. We don't want to
166     // use the same RecordFetcher twice.
167     _fetcher.reset();
168 
169     return Status::OK();
170 }
171 
172 // static
getTrialPeriodWorks(OperationContext * opCtx,const Collection * collection)173 size_t MultiPlanStage::getTrialPeriodWorks(OperationContext* opCtx, const Collection* collection) {
174     // Run each plan some number of times. This number is at least as great as
175     // 'internalQueryPlanEvaluationWorks', but may be larger for big collections.
176     size_t numWorks = internalQueryPlanEvaluationWorks.load();
177     if (NULL != collection) {
178         // For large collections, the number of works is set to be this
179         // fraction of the collection size.
180         double fraction = internalQueryPlanEvaluationCollFraction;
181 
182         numWorks = std::max(static_cast<size_t>(internalQueryPlanEvaluationWorks.load()),
183                             static_cast<size_t>(fraction * collection->numRecords(opCtx)));
184     }
185 
186     return numWorks;
187 }
188 
189 // static
getTrialPeriodNumToReturn(const CanonicalQuery & query)190 size_t MultiPlanStage::getTrialPeriodNumToReturn(const CanonicalQuery& query) {
191     // Determine the number of results which we will produce during the plan
192     // ranking phase before stopping.
193     size_t numResults = static_cast<size_t>(internalQueryPlanEvaluationMaxResults.load());
194     if (query.getQueryRequest().getNToReturn()) {
195         numResults =
196             std::min(static_cast<size_t>(*query.getQueryRequest().getNToReturn()), numResults);
197     } else if (query.getQueryRequest().getLimit()) {
198         numResults = std::min(static_cast<size_t>(*query.getQueryRequest().getLimit()), numResults);
199     }
200 
201     return numResults;
202 }
203 
Status MultiPlanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) {
    // Adds the amount of time taken by pickBestPlan() to executionTimeMillis. There's lots of
    // execution work that happens here, so this is needed for the time accounting to
    // make sense.
    ScopedTimer timer(getClock(), &_commonStats.executionTimeMillis);

    // Trial-period budget: max work() calls per round and max buffered results.
    size_t numWorks = getTrialPeriodWorks(getOpCtx(), _collection);
    size_t numResults = getTrialPeriodNumToReturn(*_query);

    // Work the plans, stopping when a plan hits EOF or returns some
    // fixed number of results.
    for (size_t ix = 0; ix < numWorks; ++ix) {
        bool moreToDo = workAllPlans(numResults, yieldPolicy);
        if (!moreToDo) {
            break;
        }
    }

    if (_failure) {
        // workAllPlans() stashed the error in the working set; surface it.
        invariant(WorkingSet::INVALID_ID != _statusMemberId);
        WorkingSetMember* member = _candidates[0].ws->get(_statusMemberId);
        return WorkingSetCommon::getMemberStatus(*member);
    }

    // After picking best plan, ranking will own plan stats from
    // candidate solutions (winner and losers).
    std::unique_ptr<PlanRankingDecision> ranking(new PlanRankingDecision);
    _bestPlanIdx = PlanRanker::pickBestPlan(_candidates, ranking.get());
    verify(_bestPlanIdx >= 0 && _bestPlanIdx < static_cast<int>(_candidates.size()));

    // Copy candidate order. We will need this to sort candidate stats for explain
    // after transferring ownership of 'ranking' to plan cache.
    std::vector<size_t> candidateOrder = ranking->candidateOrder;

    CandidatePlan& bestCandidate = _candidates[_bestPlanIdx];
    std::list<WorkingSetID>& alreadyProduced = bestCandidate.results;
    const auto& bestSolution = bestCandidate.solution;

    LOG(5) << "Winning solution:\n" << redact(bestSolution->toString());
    LOG(2) << "Winning plan: " << Explain::getPlanSummary(bestCandidate.root);

    // If the winner has a blocking stage and produced nothing during the
    // trial, keep a non-blocking candidate around as a backup in case the
    // winner fails at runtime (see doWork()).
    _backupPlanIdx = kNoSuchPlan;
    if (bestSolution->hasBlockingStage && (0 == alreadyProduced.size())) {
        LOG(5) << "Winner has blocking stage, looking for backup plan...";
        for (size_t ix = 0; ix < _candidates.size(); ++ix) {
            if (!_candidates[ix].solution->hasBlockingStage) {
                LOG(5) << "Candidate " << ix << " is backup child";
                _backupPlanIdx = ix;
                break;
            }
        }
    }

    // Even if the query is of a cacheable shape, the caller might have indicated that we shouldn't
    // write to the plan cache.
    //
    // TODO: We can remove this if we introduce replanning logic to the SubplanStage.
    bool canCache = (_cachingMode == CachingMode::AlwaysCache);
    if (_cachingMode == CachingMode::SometimesCache) {
        // In "sometimes cache" mode, we cache unless we hit one of the special cases below.
        canCache = true;

        if (ranking->tieForBest) {
            // The winning plan tied with the runner-up and we're using "sometimes cache" mode. We
            // will not write a plan cache entry.
            canCache = false;

            // These arrays having two or more entries is implied by 'tieForBest'.
            invariant(ranking->scores.size() > 1U);
            invariant(ranking->candidateOrder.size() > 1U);

            size_t winnerIdx = ranking->candidateOrder[0];
            size_t runnerUpIdx = ranking->candidateOrder[1];

            LOG(1) << "Winning plan tied with runner-up. Not caching."
                   << " ns: " << _collection->ns() << " " << redact(_query->toStringShort())
                   << " winner score: " << ranking->scores[0]
                   << " winner summary: " << Explain::getPlanSummary(_candidates[winnerIdx].root)
                   << " runner-up score: " << ranking->scores[1] << " runner-up summary: "
                   << Explain::getPlanSummary(_candidates[runnerUpIdx].root);
        }

        if (alreadyProduced.empty()) {
            // We're using the "sometimes cache" mode, and the winning plan produced no results
            // during the plan ranking trial period. We will not write a plan cache entry.
            canCache = false;

            size_t winnerIdx = ranking->candidateOrder[0];
            LOG(1) << "Winning plan had zero results. Not caching."
                   << " ns: " << _collection->ns() << " " << redact(_query->toStringShort())
                   << " winner score: " << ranking->scores[0]
                   << " winner summary: " << Explain::getPlanSummary(_candidates[winnerIdx].root);
        }
    }

    // Store the choice we just made in the cache, if the query is of a type that is safe to
    // cache.
    if (PlanCache::shouldCacheQuery(*_query) && canCache) {
        // Create list of candidate solutions for the cache with
        // the best solution at the front.
        std::vector<QuerySolution*> solutions;

        // Generate solutions and ranking decisions sorted by score.
        for (size_t orderingIndex = 0; orderingIndex < candidateOrder.size(); ++orderingIndex) {
            // index into candidates/ranking
            size_t ix = candidateOrder[orderingIndex];
            solutions.push_back(_candidates[ix].solution.get());
        }

        // Check solution cache data. Do not add to cache if
        // we have any invalid SolutionCacheData data.
        // XXX: One known example is 2D queries
        bool validSolutions = true;
        for (size_t ix = 0; ix < solutions.size(); ++ix) {
            if (NULL == solutions[ix]->cacheData.get()) {
                LOG(5) << "Not caching query because this solution has no cache data: "
                       << redact(solutions[ix]->toString());
                validSolutions = false;
                break;
            }
        }

        if (validSolutions) {
            // Note: 'ranking' ownership moves to the plan cache here, which is
            // why candidateOrder was copied above.
            _collection->infoCache()
                ->getPlanCache()
                ->add(*_query,
                      solutions,
                      std::move(ranking),
                      getOpCtx()->getServiceContext()->getPreciseClockSource()->now())
                .transitional_ignore();
        }
    }

    return Status::OK();
}
339 
// Gives each live candidate one work() call. Returns false when the trial
// period should end: a plan hit EOF, a plan buffered 'numResults' results,
// a yield failed, or every candidate has failed.
bool MultiPlanStage::workAllPlans(size_t numResults, PlanYieldPolicy* yieldPolicy) {
    bool doneWorking = false;

    for (size_t ix = 0; ix < _candidates.size(); ++ix) {
        CandidatePlan& candidate = _candidates[ix];
        if (candidate.failed) {
            continue;
        }

        // Might need to yield between calls to work due to the timer elapsing.
        if (!(tryYield(yieldPolicy)).isOK()) {
            return false;
        }

        WorkingSetID id = WorkingSet::INVALID_ID;
        PlanStage::StageState state = candidate.root->work(&id);

        if (PlanStage::ADVANCED == state) {
            // Save result for later.
            WorkingSetMember* member = candidate.ws->get(id);
            // Ensure that the BSONObj underlying the WorkingSetMember is owned in case we choose to
            // return the results from the 'candidate' plan.
            member->makeObjOwnedIfNeeded();
            candidate.results.push_back(id);

            // Once a plan returns enough results, stop working.
            if (candidate.results.size() >= numResults) {
                doneWorking = true;
            }
        } else if (PlanStage::IS_EOF == state) {
            // First plan to hit EOF wins automatically.  Stop evaluating other plans.
            // Assumes that the ranking will pick this plan.
            doneWorking = true;
        } else if (PlanStage::NEED_YIELD == state) {
            if (id == WorkingSet::INVALID_ID) {
                // No fetch to perform: this is a yield-and-retry request, which
                // requires an auto-yieldable policy.
                if (!yieldPolicy->canAutoYield())
                    throw WriteConflictException();
            } else {
                WorkingSetMember* member = candidate.ws->get(id);
                invariant(member->hasFetcher());
                // Transfer ownership of the fetcher and yield.
                _fetcher.reset(member->releaseFetcher());
            }

            if (yieldPolicy->canAutoYield()) {
                yieldPolicy->forceYield();
            }

            if (!(tryYield(yieldPolicy)).isOK()) {
                return false;
            }
        } else if (PlanStage::NEED_TIME != state) {
            // FAILURE or DEAD.  Do we want to just tank that plan and try the rest?  We
            // probably want to fail globally as this shouldn't happen anyway.

            candidate.failed = true;
            ++_failureCount;

            // Propagate most recent seen failure to parent.
            if (PlanStage::FAILURE == state) {
                _statusMemberId = id;
            }

            // All candidates have failed: the whole multi-plan fails.
            if (_failureCount == _candidates.size()) {
                _failure = true;
                return false;
            }
        }
    }

    return !doneWorking;
}
412 
413 namespace {
414 
invalidateHelper(OperationContext * opCtx,WorkingSet * ws,const RecordId & recordId,list<WorkingSetID> * idsToInvalidate,const Collection * collection)415 void invalidateHelper(OperationContext* opCtx,
416                       WorkingSet* ws,  // may flag for review
417                       const RecordId& recordId,
418                       list<WorkingSetID>* idsToInvalidate,
419                       const Collection* collection) {
420     for (auto it = idsToInvalidate->begin(); it != idsToInvalidate->end(); ++it) {
421         WorkingSetMember* member = ws->get(*it);
422         if (member->hasRecordId() && member->recordId == recordId) {
423             WorkingSetCommon::fetchAndInvalidateRecordId(opCtx, member, collection);
424         }
425     }
426 }
427 
428 }  // namespace
429 
doInvalidate(OperationContext * opCtx,const RecordId & recordId,InvalidationType type)430 void MultiPlanStage::doInvalidate(OperationContext* opCtx,
431                                   const RecordId& recordId,
432                                   InvalidationType type) {
433     if (_failure) {
434         return;
435     }
436 
437     if (bestPlanChosen()) {
438         CandidatePlan& bestPlan = _candidates[_bestPlanIdx];
439         invalidateHelper(opCtx, bestPlan.ws, recordId, &bestPlan.results, _collection);
440         if (hasBackupPlan()) {
441             CandidatePlan& backupPlan = _candidates[_backupPlanIdx];
442             invalidateHelper(opCtx, backupPlan.ws, recordId, &backupPlan.results, _collection);
443         }
444     } else {
445         for (size_t ix = 0; ix < _candidates.size(); ++ix) {
446             invalidateHelper(
447                 opCtx, _candidates[ix].ws, recordId, &_candidates[ix].results, _collection);
448         }
449     }
450 }
451 
hasBackupPlan() const452 bool MultiPlanStage::hasBackupPlan() const {
453     return kNoSuchPlan != _backupPlanIdx;
454 }
455 
bestPlanChosen() const456 bool MultiPlanStage::bestPlanChosen() const {
457     return kNoSuchPlan != _bestPlanIdx;
458 }
459 
// Returns the index of the winning candidate plan, or kNoSuchPlan if
// pickBestPlan() has not yet chosen one.
int MultiPlanStage::bestPlanIdx() const {
    return _bestPlanIdx;
}
463 
bestSolution()464 QuerySolution* MultiPlanStage::bestSolution() {
465     if (_bestPlanIdx == kNoSuchPlan)
466         return NULL;
467 
468     return _candidates[_bestPlanIdx].solution.get();
469 }
470 
getStats()471 unique_ptr<PlanStageStats> MultiPlanStage::getStats() {
472     _commonStats.isEOF = isEOF();
473     unique_ptr<PlanStageStats> ret = make_unique<PlanStageStats>(_commonStats, STAGE_MULTI_PLAN);
474     ret->specific = make_unique<MultiPlanStats>(_specificStats);
475     for (auto&& child : _children) {
476         ret->children.emplace_back(child->getStats());
477     }
478     return ret;
479 }
480 
// Exposes the MultiPlanStats; the pointer remains owned by this stage.
const SpecificStats* MultiPlanStage::getSpecificStats() const {
    return &_specificStats;
}
484 
485 }  // namespace mongo
486