1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #include "mongo/platform/basic.h"
32
33 #include "mongo/db/repl/multiapplier.h"
34
35 #include <utility>
36
37 #include "mongo/db/client.h"
38 #include "mongo/db/operation_context.h"
39 #include "mongo/db/repl/optime.h"
40 #include "mongo/util/destructor_guard.h"
41
42 namespace mongo {
43 namespace repl {
44
MultiApplier(executor::TaskExecutor * executor,const Operations & operations,const ApplyOperationFn & applyOperation,const MultiApplyFn & multiApply,const CallbackFn & onCompletion)45 MultiApplier::MultiApplier(executor::TaskExecutor* executor,
46 const Operations& operations,
47 const ApplyOperationFn& applyOperation,
48 const MultiApplyFn& multiApply,
49 const CallbackFn& onCompletion)
50 : _executor(executor),
51 _operations(operations),
52 _applyOperation(applyOperation),
53 _multiApply(multiApply),
54 _onCompletion(onCompletion) {
55 uassert(ErrorCodes::BadValue, "null replication executor", executor);
56 uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty());
57 uassert(ErrorCodes::FailedToParse,
58 str::stream() << "last operation missing 'ts' field: " << operations.back().raw,
59 operations.back().raw.hasField("ts"));
60 uassert(ErrorCodes::TypeMismatch,
61 str::stream() << "'ts' in last operation not a timestamp: " << operations.back().raw,
62 BSONType::bsonTimestamp == operations.back().raw.getField("ts").type());
63 uassert(ErrorCodes::BadValue, "apply operation function cannot be null", applyOperation);
64 uassert(ErrorCodes::BadValue, "multi apply function cannot be null", multiApply);
65 uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
66 }
67
~MultiApplier()68 MultiApplier::~MultiApplier() {
69 DESTRUCTOR_GUARD(shutdown(); join(););
70 }
71
isActive() const72 bool MultiApplier::isActive() const {
73 stdx::lock_guard<stdx::mutex> lk(_mutex);
74 return _isActive_inlock();
75 }
76
_isActive_inlock() const77 bool MultiApplier::_isActive_inlock() const {
78 return State::kRunning == _state || State::kShuttingDown == _state;
79 }
80
startup()81 Status MultiApplier::startup() noexcept {
82 stdx::lock_guard<stdx::mutex> lk(_mutex);
83
84 switch (_state) {
85 case State::kPreStart:
86 _state = State::kRunning;
87 break;
88 case State::kRunning:
89 return Status(ErrorCodes::InternalError, "multi applier already started");
90 case State::kShuttingDown:
91 return Status(ErrorCodes::ShutdownInProgress, "multi applier shutting down");
92 case State::kComplete:
93 return Status(ErrorCodes::ShutdownInProgress, "multi applier completed");
94 }
95
96 auto scheduleResult =
97 _executor->scheduleWork(stdx::bind(&MultiApplier::_callback, this, stdx::placeholders::_1));
98 if (!scheduleResult.isOK()) {
99 _state = State::kComplete;
100 return scheduleResult.getStatus();
101 }
102
103 _dbWorkCallbackHandle = scheduleResult.getValue();
104
105 return Status::OK();
106 }
107
shutdown()108 void MultiApplier::shutdown() {
109 stdx::lock_guard<stdx::mutex> lk(_mutex);
110 switch (_state) {
111 case State::kPreStart:
112 // Transition directly from PreStart to Complete if not started yet.
113 _state = State::kComplete;
114 return;
115 case State::kRunning:
116 _state = State::kShuttingDown;
117 break;
118 case State::kShuttingDown:
119 case State::kComplete:
120 // Nothing to do if we are already in ShuttingDown or Complete state.
121 return;
122 }
123
124 if (_dbWorkCallbackHandle.isValid()) {
125 _executor->cancel(_dbWorkCallbackHandle);
126 }
127 }
128
join()129 void MultiApplier::join() {
130 stdx::unique_lock<stdx::mutex> lk(_mutex);
131 _condition.wait(lk, [this]() { return !_isActive_inlock(); });
132 }
133
getState_forTest() const134 MultiApplier::State MultiApplier::getState_forTest() const {
135 stdx::lock_guard<stdx::mutex> lk(_mutex);
136 return _state;
137 }
138
_callback(const executor::TaskExecutor::CallbackArgs & cbd)139 void MultiApplier::_callback(const executor::TaskExecutor::CallbackArgs& cbd) {
140 if (!cbd.status.isOK()) {
141 _finishCallback(cbd.status);
142 return;
143 }
144
145 invariant(!_operations.empty());
146
147 StatusWith<OpTime> applyStatus(ErrorCodes::InternalError, "not mutated");
148 try {
149 auto opCtx = cc().makeOperationContext();
150 applyStatus = _multiApply(opCtx.get(), _operations, _applyOperation);
151 } catch (...) {
152 applyStatus = exceptionToStatus();
153 }
154 _finishCallback(applyStatus.getStatus());
155 }
156
_finishCallback(const Status & result)157 void MultiApplier::_finishCallback(const Status& result) {
158 // After running callback function, clear '_onCompletion' to release any resources that might be
159 // held by this function object.
160 // '_onCompletion' must be moved to a temporary copy and destroyed outside the lock in case
161 // there is any logic that's invoked at the function object's destruction that might call into
162 // this MultiApplier. 'onCompletion' must be declared before lock guard 'lock' so that it is
163 // destroyed outside the lock.
164 decltype(_onCompletion) onCompletion;
165 {
166 stdx::lock_guard<stdx::mutex> lock(_mutex);
167 invariant(_onCompletion);
168 std::swap(_onCompletion, onCompletion);
169 }
170
171 onCompletion(result);
172
173 stdx::lock_guard<stdx::mutex> lk(_mutex);
174 invariant(State::kComplete != _state);
175 _state = State::kComplete;
176 _condition.notify_all();
177 }
178
operator <<(std::ostream & os,const MultiApplier::State & state)179 std::ostream& operator<<(std::ostream& os, const MultiApplier::State& state) {
180 switch (state) {
181 case MultiApplier::State::kPreStart:
182 return os << "PreStart";
183 case MultiApplier::State::kRunning:
184 return os << "Running";
185 case MultiApplier::State::kShuttingDown:
186 return os << "ShuttingDown";
187 case MultiApplier::State::kComplete:
188 return os << "Complete";
189 }
190 MONGO_UNREACHABLE;
191 }
192
193 } // namespace repl
194 } // namespace mongo
195