1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #include "mongo/platform/basic.h"
32
33 #include "mongo/s/write_ops/write_op.h"
34
35 #include "mongo/util/assert_util.h"
36
37 namespace mongo {
38
39 using std::stringstream;
40 using std::vector;
41
getWriteItem() const42 const BatchItemRef& WriteOp::getWriteItem() const {
43 return _itemRef;
44 }
45
getWriteState() const46 WriteOpState WriteOp::getWriteState() const {
47 return _state;
48 }
49
getOpError() const50 const WriteErrorDetail& WriteOp::getOpError() const {
51 dassert(_state == WriteOpState_Error);
52 return *_error;
53 }
54
targetWrites(OperationContext * opCtx,const NSTargeter & targeter,std::vector<TargetedWrite * > * targetedWrites)55 Status WriteOp::targetWrites(OperationContext* opCtx,
56 const NSTargeter& targeter,
57 std::vector<TargetedWrite*>* targetedWrites) {
58 const bool isIndexInsert = _itemRef.getRequest()->isInsertIndexRequest();
59
60 auto swEndpoints = [&]() -> StatusWith<std::vector<ShardEndpoint>> {
61 if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Insert) {
62 if (isIndexInsert) {
63 // TODO: Remove the index targeting stuff once there is a command for it?
64 // TODO: Retry index writes with stale version?
65 return targeter.targetCollection();
66 }
67
68 auto swEndpoint = targeter.targetInsert(opCtx, _itemRef.getDocument());
69 if (!swEndpoint.isOK())
70 return swEndpoint.getStatus();
71
72 return std::vector<ShardEndpoint>{std::move(swEndpoint.getValue())};
73 } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Update) {
74 return targeter.targetUpdate(opCtx, _itemRef.getUpdate());
75 } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Delete) {
76 return targeter.targetDelete(opCtx, _itemRef.getDelete());
77 } else {
78 MONGO_UNREACHABLE;
79 }
80 }();
81
82 // If we're targeting more than one endpoint with an update/delete, we have to target everywhere
83 // since we cannot currently retry partial results.
84 //
85 // NOTE: Index inserts are currently specially targeted only at the current collection to avoid
86 // creating collections everywhere.
87 if (swEndpoints.isOK() && swEndpoints.getValue().size() > 1u && !isIndexInsert) {
88 swEndpoints = targeter.targetAllShards(opCtx);
89 }
90
91 // If we had an error, stop here
92 if (!swEndpoints.isOK())
93 return swEndpoints.getStatus();
94
95 auto& endpoints = swEndpoints.getValue();
96
97 for (auto&& endpoint : endpoints) {
98 _childOps.emplace_back(this);
99
100 WriteOpRef ref(_itemRef.getItemIndex(), _childOps.size() - 1);
101
102 // For now, multiple endpoints imply no versioning - we can't retry half a multi-write
103 if (endpoints.size() > 1u) {
104 endpoint.shardVersion = ChunkVersion::IGNORED();
105 }
106
107 targetedWrites->push_back(new TargetedWrite(std::move(endpoint), ref));
108
109 _childOps.back().pendingWrite = targetedWrites->back();
110 _childOps.back().state = WriteOpState_Pending;
111 }
112
113 _state = WriteOpState_Pending;
114 return Status::OK();
115 }
116
getNumTargeted()117 size_t WriteOp::getNumTargeted() {
118 return _childOps.size();
119 }
120
isRetryErrCode(int errCode)121 static bool isRetryErrCode(int errCode) {
122 return errCode == ErrorCodes::StaleShardVersion;
123 }
124
125 // Aggregate a bunch of errors for a single op together
combineOpErrors(const vector<ChildWriteOp const * > & errOps,WriteErrorDetail * error)126 static void combineOpErrors(const vector<ChildWriteOp const*>& errOps, WriteErrorDetail* error) {
127 // Special case single response
128 if (errOps.size() == 1) {
129 errOps.front()->error->cloneTo(error);
130 return;
131 }
132
133 error->setErrCode(ErrorCodes::MultipleErrorsOccurred);
134
135 // Generate the multi-error message below
136 stringstream msg;
137 msg << "multiple errors for op : ";
138
139 BSONArrayBuilder errB;
140 for (vector<ChildWriteOp const*>::const_iterator it = errOps.begin(); it != errOps.end();
141 ++it) {
142 const ChildWriteOp* errOp = *it;
143 if (it != errOps.begin())
144 msg << " :: and :: ";
145 msg << errOp->error->getErrMessage();
146 errB.append(errOp->error->toBSON());
147 }
148
149 error->setErrInfo(BSON("causedBy" << errB.arr()));
150 error->setIndex(errOps.front()->error->getIndex());
151 error->setErrMessage(msg.str());
152 }
153
154 /**
155 * This is the core function which aggregates all the results of a write operation on multiple
156 * shards and updates the write operation's state.
157 */
_updateOpState()158 void WriteOp::_updateOpState() {
159 std::vector<ChildWriteOp const*> childErrors;
160
161 bool isRetryError = true;
162 for (const auto& childOp : _childOps) {
163 // Don't do anything till we have all the info
164 if (childOp.state != WriteOpState_Completed && childOp.state != WriteOpState_Error) {
165 return;
166 }
167
168 if (childOp.state == WriteOpState_Error) {
169 childErrors.push_back(&childOp);
170
171 // Any non-retry error aborts all
172 if (!isRetryErrCode(childOp.error->getErrCode())) {
173 isRetryError = false;
174 }
175 }
176 }
177
178 if (!childErrors.empty() && isRetryError) {
179 // Since we're using broadcast mode for multi-shard writes, which cannot SCE
180 invariant(childErrors.size() == 1u);
181 _state = WriteOpState_Ready;
182 } else if (!childErrors.empty()) {
183 _error.reset(new WriteErrorDetail);
184 combineOpErrors(childErrors, _error.get());
185 _state = WriteOpState_Error;
186 } else {
187 _state = WriteOpState_Completed;
188 }
189
190 invariant(_state != WriteOpState_Pending);
191 _childOps.clear();
192 }
193
cancelWrites(const WriteErrorDetail * why)194 void WriteOp::cancelWrites(const WriteErrorDetail* why) {
195 invariant(_state == WriteOpState_Pending || _state == WriteOpState_Ready);
196
197 for (auto& childOp : _childOps) {
198 if (childOp.state == WriteOpState_Pending) {
199 childOp.endpoint.reset(new ShardEndpoint(childOp.pendingWrite->endpoint));
200 if (why) {
201 childOp.error.reset(new WriteErrorDetail);
202 why->cloneTo(childOp.error.get());
203 }
204
205 childOp.state = WriteOpState_Cancelled;
206 }
207 }
208
209 _state = WriteOpState_Ready;
210 _childOps.clear();
211 }
212
noteWriteComplete(const TargetedWrite & targetedWrite)213 void WriteOp::noteWriteComplete(const TargetedWrite& targetedWrite) {
214 const WriteOpRef& ref = targetedWrite.writeOpRef;
215 auto& childOp = _childOps[ref.second];
216
217 childOp.pendingWrite = NULL;
218 childOp.endpoint.reset(new ShardEndpoint(targetedWrite.endpoint));
219 childOp.state = WriteOpState_Completed;
220 _updateOpState();
221 }
222
noteWriteError(const TargetedWrite & targetedWrite,const WriteErrorDetail & error)223 void WriteOp::noteWriteError(const TargetedWrite& targetedWrite, const WriteErrorDetail& error) {
224 const WriteOpRef& ref = targetedWrite.writeOpRef;
225 auto& childOp = _childOps[ref.second];
226
227 childOp.pendingWrite = NULL;
228 childOp.endpoint.reset(new ShardEndpoint(targetedWrite.endpoint));
229 childOp.error.reset(new WriteErrorDetail);
230 error.cloneTo(childOp.error.get());
231 dassert(ref.first == _itemRef.getItemIndex());
232 childOp.error->setIndex(_itemRef.getItemIndex());
233 childOp.state = WriteOpState_Error;
234 _updateOpState();
235 }
236
setOpError(const WriteErrorDetail & error)237 void WriteOp::setOpError(const WriteErrorDetail& error) {
238 dassert(_state == WriteOpState_Ready);
239 _error.reset(new WriteErrorDetail);
240 error.cloneTo(_error.get());
241 _error->setIndex(_itemRef.getItemIndex());
242 _state = WriteOpState_Error;
243 // No need to updateOpState, set directly
244 }
245
246 } // namespace mongo
247