1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #include "mongo/platform/basic.h"
32
33 #include "mongo/db/update/push_node.h"
34
35 #include <numeric>
36
37 #include "mongo/base/simple_string_data_comparator.h"
38 #include "mongo/bson/mutable/algorithm.h"
39 #include "mongo/db/matcher/expression_parser.h"
40 #include "mongo/db/update/update_internal_node.h"
41
42 namespace mongo {
43
44 const StringData PushNode::kEachClauseName = "$each"_sd;
45 const StringData PushNode::kSliceClauseName = "$slice";
46 const StringData PushNode::kSortClauseName = "$sort";
47 const StringData PushNode::kPositionClauseName = "$position";
48
49 namespace {
50
51 /**
52 * When the $sort clause in a $push modifer is an object, that object should pass the checks in
53 * this function.
54 */
checkSortClause(const BSONObj & sortObject)55 Status checkSortClause(const BSONObj& sortObject) {
56 if (sortObject.isEmpty()) {
57 return Status(ErrorCodes::BadValue,
58 "The $sort pattern is empty when it should be a set of fields.");
59 }
60
61 for (auto&& patternElement : sortObject) {
62 double orderVal = patternElement.isNumber() ? patternElement.Number() : 0;
63 if (orderVal != -1 && orderVal != 1) {
64 return Status(ErrorCodes::BadValue, "The $sort element value must be either 1 or -1");
65 }
66
67 FieldRef sortField(patternElement.fieldName());
68 if (sortField.numParts() == 0) {
69 return Status(ErrorCodes::BadValue, "The $sort field cannot be empty");
70 }
71
72 for (size_t i = 0; i < sortField.numParts(); ++i) {
73 if (sortField.getPart(i).size() == 0) {
74 return Status(ErrorCodes::BadValue,
75 str::stream() << "The $sort field is a dotted field "
76 "but has an empty part: "
77 << sortField.dottedField());
78 }
79 }
80 }
81
82 return Status::OK();
83 }
84
85 } // namespace
86
init(BSONElement modExpr,const boost::intrusive_ptr<ExpressionContext> & expCtx)87 Status PushNode::init(BSONElement modExpr, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
88 invariant(modExpr.ok());
89
90 if (modExpr.type() == BSONType::Object && modExpr[kEachClauseName]) {
91 std::set<StringData> validClauseNames{
92 kEachClauseName, kSliceClauseName, kSortClauseName, kPositionClauseName};
93 auto clausesFound =
94 SimpleStringDataComparator::kInstance.makeStringDataUnorderedMap<const BSONElement>();
95
96 for (auto&& modifier : modExpr.embeddedObject()) {
97 auto clauseName = modifier.fieldNameStringData();
98
99 auto foundClauseName = validClauseNames.find(clauseName);
100 if (foundClauseName == validClauseNames.end()) {
101 return Status(ErrorCodes::BadValue,
102 str::stream() << "Unrecognized clause in $push: "
103 << modifier.fieldNameStringData());
104 }
105
106 if (clausesFound.find(*foundClauseName) != clausesFound.end()) {
107 return Status(ErrorCodes::BadValue,
108 str::stream() << "Only one " << clauseName << " is supported.");
109 }
110
111 clausesFound.insert(std::make_pair(*foundClauseName, modifier));
112 }
113
114 // Parse $each.
115 auto eachIt = clausesFound.find(kEachClauseName);
116 invariant(eachIt != clausesFound.end()); // We already checked for a $each clause.
117 const auto& eachClause = eachIt->second;
118 if (eachClause.type() != BSONType::Array) {
119 return Status(ErrorCodes::BadValue,
120 str::stream() << "The argument to $each in $push must be"
121 " an array but it was of type: "
122 << typeName(eachClause.type()));
123 }
124
125 for (auto&& item : eachClause.embeddedObject()) {
126 _valuesToPush.push_back(item);
127 }
128
129 // Parse (optional) $slice.
130 auto sliceIt = clausesFound.find(kSliceClauseName);
131 if (sliceIt != clausesFound.end()) {
132 auto sliceClause = sliceIt->second;
133 auto parsedSliceValue = MatchExpressionParser::parseIntegerElementToLong(sliceClause);
134 if (parsedSliceValue.isOK()) {
135 _slice = parsedSliceValue.getValue();
136 } else {
137 return Status(ErrorCodes::BadValue,
138 str::stream() << "The value for $slice must "
139 "be an integer value but was given type: "
140 << typeName(sliceClause.type()));
141 }
142 }
143
144 // Parse (optional) $sort.
145 auto sortIt = clausesFound.find(kSortClauseName);
146 if (sortIt != clausesFound.end()) {
147 auto sortClause = sortIt->second;
148
149 if (sortClause.type() == BSONType::Object) {
150 auto status = checkSortClause(sortClause.embeddedObject());
151
152 if (status.isOK()) {
153 _sort = PatternElementCmp(sortClause.embeddedObject(), expCtx->getCollator());
154 } else {
155 return status;
156 }
157 } else if (sortClause.isNumber()) {
158 double orderVal = sortClause.Number();
159 if (orderVal == -1 || orderVal == 1) {
160 _sort = PatternElementCmp(BSON("" << orderVal), expCtx->getCollator());
161 } else {
162 return Status(ErrorCodes::BadValue,
163 "The $sort element value must be either 1 or -1");
164 }
165 } else {
166 return Status(ErrorCodes::BadValue,
167 "The $sort is invalid: use 1/-1 to sort the whole element, "
168 "or {field:1/-1} to sort embedded fields");
169 }
170 }
171
172 // Parse (optional) $position.
173 auto positionIt = clausesFound.find(kPositionClauseName);
174 if (positionIt != clausesFound.end()) {
175 auto positionClause = positionIt->second;
176 auto parsedPositionValue =
177 MatchExpressionParser::parseIntegerElementToLong(positionClause);
178 if (parsedPositionValue.isOK()) {
179 _position = parsedPositionValue.getValue();
180 } else {
181 return Status(ErrorCodes::BadValue,
182 str::stream() << "The value for $position must "
183 "be an integer value, not of type: "
184 << typeName(positionClause.type()));
185 }
186 }
187 } else {
188 // No $each clause. We treat the value of $push as the element to add to the array.
189 _valuesToPush.push_back(modExpr);
190 }
191
192 return Status::OK();
193 }
194
insertElementsWithPosition(mutablebson::Element * array,long long position,const std::vector<BSONElement> & valuesToPush)195 ModifierNode::ModifyResult PushNode::insertElementsWithPosition(
196 mutablebson::Element* array, long long position, const std::vector<BSONElement>& valuesToPush) {
197 if (valuesToPush.empty()) {
198 return ModifyResult::kNoOp;
199 }
200
201 auto& document = array->getDocument();
202 auto firstElementToInsert =
203 document.makeElementWithNewFieldName(StringData(), valuesToPush.front());
204
205 // We assume that no array has more than std::numerical_limits<long long>::max() elements.
206 long long arraySize = static_cast<long long>(countChildren(*array));
207
208 // We insert the first element of 'valuesToPush' at the location requested in the 'position'
209 // variable.
210 ModifyResult result;
211 if (arraySize == 0) {
212 invariantOK(array->pushBack(firstElementToInsert));
213 result = ModifyResult::kNormalUpdate;
214 } else if (position > arraySize) {
215 invariantOK(array->pushBack(firstElementToInsert));
216 result = ModifyResult::kArrayAppendUpdate;
217 } else if (position > 0) {
218 auto insertAfter = getNthChild(*array, position - 1);
219 invariantOK(insertAfter.addSiblingRight(firstElementToInsert));
220 result = ModifyResult::kNormalUpdate;
221 } else if (position < 0 && -position < arraySize) {
222 auto insertAfter = getNthChild(*array, arraySize - (-position) - 1);
223 invariantOK(insertAfter.addSiblingRight(firstElementToInsert));
224 result = ModifyResult::kNormalUpdate;
225 } else {
226 invariantOK(array->pushFront(firstElementToInsert));
227 result = ModifyResult::kNormalUpdate;
228 }
229
230 // We insert all the rest of the elements after the one we just inserted.
231 std::accumulate(std::next(valuesToPush.begin()),
232 valuesToPush.end(),
233 firstElementToInsert,
234 [&document](auto& insertAfter, auto& valueToInsert) {
235 auto nextElementToInsert =
236 document.makeElementWithNewFieldName(StringData(), valueToInsert);
237 invariantOK(insertAfter.addSiblingRight(nextElementToInsert));
238 return nextElementToInsert;
239 });
240
241 return result;
242 }
243
performPush(mutablebson::Element * element,FieldRef * elementPath) const244 ModifierNode::ModifyResult PushNode::performPush(mutablebson::Element* element,
245 FieldRef* elementPath) const {
246 if (element->getType() != BSONType::Array) {
247 invariant(elementPath); // We can only hit this error if we are updating an existing path.
248 auto idElem = mutablebson::findFirstChildNamed(element->getDocument().root(), "_id");
249 uasserted(ErrorCodes::BadValue,
250 str::stream() << "The field '" << elementPath->dottedField() << "'"
251 << " must be an array but is of type "
252 << typeName(element->getType())
253 << " in document {"
254 << (idElem.ok() ? idElem.toString() : "no id")
255 << "}");
256 }
257
258 auto result = insertElementsWithPosition(element, _position, _valuesToPush);
259
260 if (_sort) {
261 result = ModifyResult::kNormalUpdate;
262 sortChildren(*element, *_sort);
263 }
264
265 // std::abs(LLONG_MIN) results in undefined behavior on 2's complement systems because the
266 // absolute value of LLONG_MIN cannot be represented in a 'long long'.
267 const auto sliceAbs = _slice == std::numeric_limits<decltype(_slice)>::min()
268 ? std::abs(_slice + 1)
269 : std::abs(_slice);
270
271 while (static_cast<long long>(countChildren(*element)) > sliceAbs) {
272 result = ModifyResult::kNormalUpdate;
273 if (_slice >= 0) {
274 invariantOK(element->popBack());
275 } else {
276 // A negative value in '_slice' trims the array down to abs(_slice) but removes entries
277 // from the front of the array instead of the back.
278 invariantOK(element->popFront());
279 }
280 }
281
282 return result;
283 }
284
updateExistingElement(mutablebson::Element * element,std::shared_ptr<FieldRef> elementPath) const285 ModifierNode::ModifyResult PushNode::updateExistingElement(
286 mutablebson::Element* element, std::shared_ptr<FieldRef> elementPath) const {
287 return performPush(element, elementPath.get());
288 }
289
logUpdate(LogBuilder * logBuilder,StringData pathTaken,mutablebson::Element element,ModifyResult modifyResult) const290 void PushNode::logUpdate(LogBuilder* logBuilder,
291 StringData pathTaken,
292 mutablebson::Element element,
293 ModifyResult modifyResult) const {
294 invariant(logBuilder);
295
296 if (modifyResult == ModifyResult::kNormalUpdate || modifyResult == ModifyResult::kCreated) {
297 // Simple case: log the entires contents of the updated array.
298 uassertStatusOK(logBuilder->addToSetsWithNewFieldName(pathTaken, element));
299 } else if (modifyResult == ModifyResult::kArrayAppendUpdate) {
300 // This update only modified the array by appending entries to the end. Rather than writing
301 // out the entire contents of the array, we create oplog entries for the newly appended
302 // elements.
303 auto numAppended = _valuesToPush.size();
304 auto arraySize = countChildren(element);
305
306 invariant(arraySize > numAppended);
307 auto position = arraySize - numAppended;
308 for (const auto& valueToLog : _valuesToPush) {
309 std::string pathToArrayElement(str::stream() << pathTaken << "." << position);
310 uassertStatusOK(logBuilder->addToSetsWithNewFieldName(pathToArrayElement, valueToLog));
311
312 ++position;
313 }
314 } else {
315 MONGO_UNREACHABLE;
316 }
317 }
318
setValueForNewElement(mutablebson::Element * element) const319 void PushNode::setValueForNewElement(mutablebson::Element* element) const {
320 BSONObj emptyArray;
321 invariantOK(element->setValueArray(emptyArray));
322 (void)performPush(element, nullptr);
323 }
324
325 } // namespace mongo
326