1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #include "mongo/platform/basic.h"
32 
33 #include "mongo/db/repl/multiapplier.h"
34 
35 #include <utility>
36 
37 #include "mongo/db/client.h"
38 #include "mongo/db/operation_context.h"
39 #include "mongo/db/repl/optime.h"
40 #include "mongo/util/destructor_guard.h"
41 
42 namespace mongo {
43 namespace repl {
44 
MultiApplier(executor::TaskExecutor * executor,const Operations & operations,const ApplyOperationFn & applyOperation,const MultiApplyFn & multiApply,const CallbackFn & onCompletion)45 MultiApplier::MultiApplier(executor::TaskExecutor* executor,
46                            const Operations& operations,
47                            const ApplyOperationFn& applyOperation,
48                            const MultiApplyFn& multiApply,
49                            const CallbackFn& onCompletion)
50     : _executor(executor),
51       _operations(operations),
52       _applyOperation(applyOperation),
53       _multiApply(multiApply),
54       _onCompletion(onCompletion) {
55     uassert(ErrorCodes::BadValue, "null replication executor", executor);
56     uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty());
57     uassert(ErrorCodes::FailedToParse,
58             str::stream() << "last operation missing 'ts' field: " << operations.back().raw,
59             operations.back().raw.hasField("ts"));
60     uassert(ErrorCodes::TypeMismatch,
61             str::stream() << "'ts' in last operation not a timestamp: " << operations.back().raw,
62             BSONType::bsonTimestamp == operations.back().raw.getField("ts").type());
63     uassert(ErrorCodes::BadValue, "apply operation function cannot be null", applyOperation);
64     uassert(ErrorCodes::BadValue, "multi apply function cannot be null", multiApply);
65     uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
66 }
67 
~MultiApplier()68 MultiApplier::~MultiApplier() {
69     DESTRUCTOR_GUARD(shutdown(); join(););
70 }
71 
isActive() const72 bool MultiApplier::isActive() const {
73     stdx::lock_guard<stdx::mutex> lk(_mutex);
74     return _isActive_inlock();
75 }
76 
_isActive_inlock() const77 bool MultiApplier::_isActive_inlock() const {
78     return State::kRunning == _state || State::kShuttingDown == _state;
79 }
80 
startup()81 Status MultiApplier::startup() noexcept {
82     stdx::lock_guard<stdx::mutex> lk(_mutex);
83 
84     switch (_state) {
85         case State::kPreStart:
86             _state = State::kRunning;
87             break;
88         case State::kRunning:
89             return Status(ErrorCodes::InternalError, "multi applier already started");
90         case State::kShuttingDown:
91             return Status(ErrorCodes::ShutdownInProgress, "multi applier shutting down");
92         case State::kComplete:
93             return Status(ErrorCodes::ShutdownInProgress, "multi applier completed");
94     }
95 
96     auto scheduleResult =
97         _executor->scheduleWork(stdx::bind(&MultiApplier::_callback, this, stdx::placeholders::_1));
98     if (!scheduleResult.isOK()) {
99         _state = State::kComplete;
100         return scheduleResult.getStatus();
101     }
102 
103     _dbWorkCallbackHandle = scheduleResult.getValue();
104 
105     return Status::OK();
106 }
107 
shutdown()108 void MultiApplier::shutdown() {
109     stdx::lock_guard<stdx::mutex> lk(_mutex);
110     switch (_state) {
111         case State::kPreStart:
112             // Transition directly from PreStart to Complete if not started yet.
113             _state = State::kComplete;
114             return;
115         case State::kRunning:
116             _state = State::kShuttingDown;
117             break;
118         case State::kShuttingDown:
119         case State::kComplete:
120             // Nothing to do if we are already in ShuttingDown or Complete state.
121             return;
122     }
123 
124     if (_dbWorkCallbackHandle.isValid()) {
125         _executor->cancel(_dbWorkCallbackHandle);
126     }
127 }
128 
join()129 void MultiApplier::join() {
130     stdx::unique_lock<stdx::mutex> lk(_mutex);
131     _condition.wait(lk, [this]() { return !_isActive_inlock(); });
132 }
133 
getState_forTest() const134 MultiApplier::State MultiApplier::getState_forTest() const {
135     stdx::lock_guard<stdx::mutex> lk(_mutex);
136     return _state;
137 }
138 
_callback(const executor::TaskExecutor::CallbackArgs & cbd)139 void MultiApplier::_callback(const executor::TaskExecutor::CallbackArgs& cbd) {
140     if (!cbd.status.isOK()) {
141         _finishCallback(cbd.status);
142         return;
143     }
144 
145     invariant(!_operations.empty());
146 
147     StatusWith<OpTime> applyStatus(ErrorCodes::InternalError, "not mutated");
148     try {
149         auto opCtx = cc().makeOperationContext();
150         applyStatus = _multiApply(opCtx.get(), _operations, _applyOperation);
151     } catch (...) {
152         applyStatus = exceptionToStatus();
153     }
154     _finishCallback(applyStatus.getStatus());
155 }
156 
_finishCallback(const Status & result)157 void MultiApplier::_finishCallback(const Status& result) {
158     // After running callback function, clear '_onCompletion' to release any resources that might be
159     // held by this function object.
160     // '_onCompletion' must be moved to a temporary copy and destroyed outside the lock in case
161     // there is any logic that's invoked at the function object's destruction that might call into
162     // this MultiApplier. 'onCompletion' must be declared before lock guard 'lock' so that it is
163     // destroyed outside the lock.
164     decltype(_onCompletion) onCompletion;
165     {
166         stdx::lock_guard<stdx::mutex> lock(_mutex);
167         invariant(_onCompletion);
168         std::swap(_onCompletion, onCompletion);
169     }
170 
171     onCompletion(result);
172 
173     stdx::lock_guard<stdx::mutex> lk(_mutex);
174     invariant(State::kComplete != _state);
175     _state = State::kComplete;
176     _condition.notify_all();
177 }
178 
operator <<(std::ostream & os,const MultiApplier::State & state)179 std::ostream& operator<<(std::ostream& os, const MultiApplier::State& state) {
180     switch (state) {
181         case MultiApplier::State::kPreStart:
182             return os << "PreStart";
183         case MultiApplier::State::kRunning:
184             return os << "Running";
185         case MultiApplier::State::kShuttingDown:
186             return os << "ShuttingDown";
187         case MultiApplier::State::kComplete:
188             return os << "Complete";
189     }
190     MONGO_UNREACHABLE;
191 }
192 
193 }  // namespace repl
194 }  // namespace mongo
195