1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #include "mongo/platform/basic.h"
32 
33 #include "mongo/s/write_ops/write_op.h"
34 
35 #include "mongo/util/assert_util.h"
36 
37 namespace mongo {
38 
39 using std::stringstream;
40 using std::vector;
41 
getWriteItem() const42 const BatchItemRef& WriteOp::getWriteItem() const {
43     return _itemRef;
44 }
45 
getWriteState() const46 WriteOpState WriteOp::getWriteState() const {
47     return _state;
48 }
49 
getOpError() const50 const WriteErrorDetail& WriteOp::getOpError() const {
51     dassert(_state == WriteOpState_Error);
52     return *_error;
53 }
54 
targetWrites(OperationContext * opCtx,const NSTargeter & targeter,std::vector<TargetedWrite * > * targetedWrites)55 Status WriteOp::targetWrites(OperationContext* opCtx,
56                              const NSTargeter& targeter,
57                              std::vector<TargetedWrite*>* targetedWrites) {
58     const bool isIndexInsert = _itemRef.getRequest()->isInsertIndexRequest();
59 
60     auto swEndpoints = [&]() -> StatusWith<std::vector<ShardEndpoint>> {
61         if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Insert) {
62             if (isIndexInsert) {
63                 // TODO: Remove the index targeting stuff once there is a command for it?
64                 // TODO: Retry index writes with stale version?
65                 return targeter.targetCollection();
66             }
67 
68             auto swEndpoint = targeter.targetInsert(opCtx, _itemRef.getDocument());
69             if (!swEndpoint.isOK())
70                 return swEndpoint.getStatus();
71 
72             return std::vector<ShardEndpoint>{std::move(swEndpoint.getValue())};
73         } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Update) {
74             return targeter.targetUpdate(opCtx, _itemRef.getUpdate());
75         } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Delete) {
76             return targeter.targetDelete(opCtx, _itemRef.getDelete());
77         } else {
78             MONGO_UNREACHABLE;
79         }
80     }();
81 
82     // If we're targeting more than one endpoint with an update/delete, we have to target everywhere
83     // since we cannot currently retry partial results.
84     //
85     // NOTE: Index inserts are currently specially targeted only at the current collection to avoid
86     // creating collections everywhere.
87     if (swEndpoints.isOK() && swEndpoints.getValue().size() > 1u && !isIndexInsert) {
88         swEndpoints = targeter.targetAllShards(opCtx);
89     }
90 
91     // If we had an error, stop here
92     if (!swEndpoints.isOK())
93         return swEndpoints.getStatus();
94 
95     auto& endpoints = swEndpoints.getValue();
96 
97     for (auto&& endpoint : endpoints) {
98         _childOps.emplace_back(this);
99 
100         WriteOpRef ref(_itemRef.getItemIndex(), _childOps.size() - 1);
101 
102         // For now, multiple endpoints imply no versioning - we can't retry half a multi-write
103         if (endpoints.size() > 1u) {
104             endpoint.shardVersion = ChunkVersion::IGNORED();
105         }
106 
107         targetedWrites->push_back(new TargetedWrite(std::move(endpoint), ref));
108 
109         _childOps.back().pendingWrite = targetedWrites->back();
110         _childOps.back().state = WriteOpState_Pending;
111     }
112 
113     _state = WriteOpState_Pending;
114     return Status::OK();
115 }
116 
getNumTargeted()117 size_t WriteOp::getNumTargeted() {
118     return _childOps.size();
119 }
120 
isRetryErrCode(int errCode)121 static bool isRetryErrCode(int errCode) {
122     return errCode == ErrorCodes::StaleShardVersion;
123 }
124 
125 // Aggregate a bunch of errors for a single op together
combineOpErrors(const vector<ChildWriteOp const * > & errOps,WriteErrorDetail * error)126 static void combineOpErrors(const vector<ChildWriteOp const*>& errOps, WriteErrorDetail* error) {
127     // Special case single response
128     if (errOps.size() == 1) {
129         errOps.front()->error->cloneTo(error);
130         return;
131     }
132 
133     error->setErrCode(ErrorCodes::MultipleErrorsOccurred);
134 
135     // Generate the multi-error message below
136     stringstream msg;
137     msg << "multiple errors for op : ";
138 
139     BSONArrayBuilder errB;
140     for (vector<ChildWriteOp const*>::const_iterator it = errOps.begin(); it != errOps.end();
141          ++it) {
142         const ChildWriteOp* errOp = *it;
143         if (it != errOps.begin())
144             msg << " :: and :: ";
145         msg << errOp->error->getErrMessage();
146         errB.append(errOp->error->toBSON());
147     }
148 
149     error->setErrInfo(BSON("causedBy" << errB.arr()));
150     error->setIndex(errOps.front()->error->getIndex());
151     error->setErrMessage(msg.str());
152 }
153 
154 /**
155  * This is the core function which aggregates all the results of a write operation on multiple
156  * shards and updates the write operation's state.
157  */
_updateOpState()158 void WriteOp::_updateOpState() {
159     std::vector<ChildWriteOp const*> childErrors;
160 
161     bool isRetryError = true;
162     for (const auto& childOp : _childOps) {
163         // Don't do anything till we have all the info
164         if (childOp.state != WriteOpState_Completed && childOp.state != WriteOpState_Error) {
165             return;
166         }
167 
168         if (childOp.state == WriteOpState_Error) {
169             childErrors.push_back(&childOp);
170 
171             // Any non-retry error aborts all
172             if (!isRetryErrCode(childOp.error->getErrCode())) {
173                 isRetryError = false;
174             }
175         }
176     }
177 
178     if (!childErrors.empty() && isRetryError) {
179         // Since we're using broadcast mode for multi-shard writes, which cannot SCE
180         invariant(childErrors.size() == 1u);
181         _state = WriteOpState_Ready;
182     } else if (!childErrors.empty()) {
183         _error.reset(new WriteErrorDetail);
184         combineOpErrors(childErrors, _error.get());
185         _state = WriteOpState_Error;
186     } else {
187         _state = WriteOpState_Completed;
188     }
189 
190     invariant(_state != WriteOpState_Pending);
191     _childOps.clear();
192 }
193 
cancelWrites(const WriteErrorDetail * why)194 void WriteOp::cancelWrites(const WriteErrorDetail* why) {
195     invariant(_state == WriteOpState_Pending || _state == WriteOpState_Ready);
196 
197     for (auto& childOp : _childOps) {
198         if (childOp.state == WriteOpState_Pending) {
199             childOp.endpoint.reset(new ShardEndpoint(childOp.pendingWrite->endpoint));
200             if (why) {
201                 childOp.error.reset(new WriteErrorDetail);
202                 why->cloneTo(childOp.error.get());
203             }
204 
205             childOp.state = WriteOpState_Cancelled;
206         }
207     }
208 
209     _state = WriteOpState_Ready;
210     _childOps.clear();
211 }
212 
noteWriteComplete(const TargetedWrite & targetedWrite)213 void WriteOp::noteWriteComplete(const TargetedWrite& targetedWrite) {
214     const WriteOpRef& ref = targetedWrite.writeOpRef;
215     auto& childOp = _childOps[ref.second];
216 
217     childOp.pendingWrite = NULL;
218     childOp.endpoint.reset(new ShardEndpoint(targetedWrite.endpoint));
219     childOp.state = WriteOpState_Completed;
220     _updateOpState();
221 }
222 
noteWriteError(const TargetedWrite & targetedWrite,const WriteErrorDetail & error)223 void WriteOp::noteWriteError(const TargetedWrite& targetedWrite, const WriteErrorDetail& error) {
224     const WriteOpRef& ref = targetedWrite.writeOpRef;
225     auto& childOp = _childOps[ref.second];
226 
227     childOp.pendingWrite = NULL;
228     childOp.endpoint.reset(new ShardEndpoint(targetedWrite.endpoint));
229     childOp.error.reset(new WriteErrorDetail);
230     error.cloneTo(childOp.error.get());
231     dassert(ref.first == _itemRef.getItemIndex());
232     childOp.error->setIndex(_itemRef.getItemIndex());
233     childOp.state = WriteOpState_Error;
234     _updateOpState();
235 }
236 
setOpError(const WriteErrorDetail & error)237 void WriteOp::setOpError(const WriteErrorDetail& error) {
238     dassert(_state == WriteOpState_Ready);
239     _error.reset(new WriteErrorDetail);
240     error.cloneTo(_error.get());
241     _error->setIndex(_itemRef.getItemIndex());
242     _state = WriteOpState_Error;
243     // No need to updateOpState, set directly
244 }
245 
246 }  // namespace mongo
247