1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/db/session.h"
36 
37 #include "mongo/db/catalog/index_catalog.h"
38 #include "mongo/db/concurrency/write_conflict_exception.h"
39 #include "mongo/db/db_raii.h"
40 #include "mongo/db/dbdirectclient.h"
41 #include "mongo/db/index/index_access_method.h"
42 #include "mongo/db/namespace_string.h"
43 #include "mongo/db/operation_context.h"
44 #include "mongo/db/ops/update.h"
45 #include "mongo/db/query/get_executor.h"
46 #include "mongo/db/repl/read_concern_args.h"
47 #include "mongo/db/retryable_writes_stats.h"
48 #include "mongo/db/transaction_history_iterator.h"
49 #include "mongo/stdx/memory.h"
50 #include "mongo/transport/transport_layer.h"
51 #include "mongo/util/fail_point_service.h"
52 #include "mongo/util/log.h"
53 #include "mongo/util/mongoutils/str.h"
54 
55 namespace mongo {
56 namespace {
57 
fassertOnRepeatedExecution(OperationContext * opCtx,const LogicalSessionId & lsid,TxnNumber txnNumber,StmtId stmtId,const repl::OpTime & firstOpTime,const repl::OpTime & secondOpTime)58 void fassertOnRepeatedExecution(OperationContext* opCtx,
59                                 const LogicalSessionId& lsid,
60                                 TxnNumber txnNumber,
61                                 StmtId stmtId,
62                                 const repl::OpTime& firstOpTime,
63                                 const repl::OpTime& secondOpTime) {
64     severe() << "Statement id " << stmtId << " from transaction [ " << lsid.toBSON() << ":"
65              << txnNumber << " ] was committed once with opTime " << firstOpTime
66              << " and a second time with opTime " << secondOpTime
67              << ". This indicates possible data corruption or server bug and the process will be "
68                 "terminated.";
69     fassertFailed(40526);
70 }
71 
72 struct ActiveTransactionHistory {
73     boost::optional<SessionTxnRecord> lastTxnRecord;
74     Session::CommittedStatementTimestampMap committedStatements;
75     bool hasIncompleteHistory{false};
76 };
77 
fetchActiveTransactionHistory(OperationContext * opCtx,const LogicalSessionId & lsid)78 ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx,
79                                                        const LogicalSessionId& lsid) {
80     ActiveTransactionHistory result;
81 
82     result.lastTxnRecord = [&]() -> boost::optional<SessionTxnRecord> {
83         DBDirectClient client(opCtx);
84         auto result =
85             client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
86                            {BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON())});
87         if (result.isEmpty()) {
88             return boost::none;
89         }
90 
91         return SessionTxnRecord::parse(IDLParserErrorContext("parse latest txn record for session"),
92                                        result);
93     }();
94 
95     if (!result.lastTxnRecord) {
96         return result;
97     }
98 
99     auto it = TransactionHistoryIterator(result.lastTxnRecord->getLastWriteOpTime());
100     while (it.hasNext()) {
101         try {
102             const auto entry = it.next(opCtx);
103             invariant(entry.getStatementId());
104 
105             if (*entry.getStatementId() == kIncompleteHistoryStmtId) {
106                 // Only the dead end sentinel can have this id for oplog write history
107                 invariant(entry.getObject2());
108                 invariant(entry.getObject2()->woCompare(Session::kDeadEndSentinel) == 0);
109                 result.hasIncompleteHistory = true;
110                 continue;
111             }
112 
113             const auto insertRes =
114                 result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime());
115             if (!insertRes.second) {
116                 const auto& existingOpTime = insertRes.first->second;
117                 fassertOnRepeatedExecution(opCtx,
118                                            lsid,
119                                            result.lastTxnRecord->getTxnNum(),
120                                            *entry.getStatementId(),
121                                            existingOpTime,
122                                            entry.getOpTime());
123             }
124         } catch (const DBException& ex) {
125             if (ex.code() == ErrorCodes::IncompleteTransactionHistory) {
126                 result.hasIncompleteHistory = true;
127                 break;
128             }
129 
130             throw;
131         }
132     }
133 
134     return result;
135 }
136 
updateSessionEntry(OperationContext * opCtx,const UpdateRequest & updateRequest)137 void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequest) {
138     // Current code only supports replacement update.
139     dassert(UpdateDriver::isDocReplacement(updateRequest.getUpdates()));
140 
141     AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX);
142 
143     uassert(40527,
144             str::stream() << "Unable to persist transaction state because the session transaction "
145                              "collection is missing. This indicates that the "
146                           << NamespaceString::kSessionTransactionsTableNamespace.ns()
147                           << " collection has been manually deleted.",
148             autoColl.getCollection());
149 
150     WriteUnitOfWork wuow(opCtx);
151 
152     auto collection = autoColl.getCollection();
153     auto idIndex = collection->getIndexCatalog()->findIdIndex(opCtx);
154 
155     uassert(40672,
156             str::stream() << "Failed to fetch _id index for "
157                           << NamespaceString::kSessionTransactionsTableNamespace.ns(),
158             idIndex);
159 
160     auto indexAccess = collection->getIndexCatalog()->getIndex(idIndex);
161     // Since we are looking up a key inside the _id index, create a key object consisting of only
162     // the _id field.
163     auto idToFetch = updateRequest.getQuery().firstElement();
164     auto toUpdateIdDoc = idToFetch.wrap();
165     dassert(idToFetch.fieldNameStringData() == "_id"_sd);
166     auto recordId = indexAccess->findSingle(opCtx, toUpdateIdDoc);
167     auto startingSnapshotId = opCtx->recoveryUnit()->getSnapshotId();
168 
169     if (recordId.isNull()) {
170         // Upsert case.
171         auto status = collection->insertDocument(
172             opCtx, InsertStatement(updateRequest.getUpdates()), nullptr, true, false);
173 
174         if (status == ErrorCodes::DuplicateKey) {
175             throw WriteConflictException();
176         }
177 
178         uassertStatusOK(status);
179         wuow.commit();
180         return;
181     }
182 
183     auto originalRecordData = collection->getRecordStore()->dataFor(opCtx, recordId);
184     auto originalDoc = originalRecordData.toBson();
185 
186     invariant(collection->getDefaultCollator() == nullptr);
187     boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContext(opCtx, nullptr));
188 
189     auto matcher = fassertStatusOK(
190         40673, MatchExpressionParser::parse(updateRequest.getQuery(), std::move(expCtx)));
191     if (!matcher->matchesBSON(originalDoc)) {
192         // Document no longer match what we expect so throw WCE to make the caller re-examine.
193         throw WriteConflictException();
194     }
195 
196     OplogUpdateEntryArgs args;
197     args.nss = NamespaceString::kSessionTransactionsTableNamespace;
198     args.uuid = collection->uuid();
199     args.update = updateRequest.getUpdates();
200     args.criteria = toUpdateIdDoc;
201     args.fromMigrate = false;
202 
203     collection->updateDocument(opCtx,
204                                recordId,
205                                Snapshotted<BSONObj>(startingSnapshotId, originalDoc),
206                                updateRequest.getUpdates(),
207                                true,   // enforceQuota
208                                false,  // indexesAffected = false because _id is the only index
209                                nullptr,
210                                &args);
211 
212     wuow.commit();
213 }
214 
215 // Failpoint which allows different failure actions to happen after each write. Supports the
216 // parameters below, which can be combined with each other (unless explicitly disallowed):
217 //
218 // closeConnection (bool, default = true): Closes the connection on which the write was executed.
219 // failBeforeCommitExceptionCode (int, default = not specified): If set, the specified exception
220 //      code will be thrown, which will cause the write to not commit; if not specified, the write
221 //      will be allowed to commit.
222 MONGO_FP_DECLARE(onPrimaryTransactionalWrite);
223 
224 }  // namespace
225 
226 const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1));
227 
Session(LogicalSessionId sessionId)228 Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}
229 
refreshFromStorageIfNeeded(OperationContext * opCtx)230 void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) {
231     invariant(!opCtx->lockState()->isLocked());
232     invariant(repl::ReadConcernArgs::get(opCtx).getLevel() ==
233               repl::ReadConcernLevel::kLocalReadConcern);
234 
235     stdx::unique_lock<stdx::mutex> ul(_mutex);
236 
237     while (!_isValid) {
238         const int numInvalidations = _numInvalidations;
239 
240         ul.unlock();
241 
242         auto activeTxnHistory = fetchActiveTransactionHistory(opCtx, _sessionId);
243 
244         ul.lock();
245 
246         // Protect against concurrent refreshes or invalidations
247         if (!_isValid && _numInvalidations == numInvalidations) {
248             _isValid = true;
249             _lastWrittenSessionRecord = std::move(activeTxnHistory.lastTxnRecord);
250 
251             if (_lastWrittenSessionRecord) {
252                 _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum();
253                 _activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements);
254                 _hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory;
255             }
256 
257             break;
258         }
259     }
260 }
261 
beginTxn(OperationContext * opCtx,TxnNumber txnNumber)262 void Session::beginTxn(OperationContext* opCtx, TxnNumber txnNumber) {
263     invariant(!opCtx->lockState()->isLocked());
264 
265     stdx::lock_guard<stdx::mutex> lg(_mutex);
266     _beginTxn(lg, txnNumber);
267 }
268 
onWriteOpCompletedOnPrimary(OperationContext * opCtx,TxnNumber txnNumber,std::vector<StmtId> stmtIdsWritten,const repl::OpTime & lastStmtIdWriteOpTime,Date_t lastStmtIdWriteDate)269 void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
270                                           TxnNumber txnNumber,
271                                           std::vector<StmtId> stmtIdsWritten,
272                                           const repl::OpTime& lastStmtIdWriteOpTime,
273                                           Date_t lastStmtIdWriteDate) {
274     invariant(opCtx->lockState()->inAWriteUnitOfWork());
275 
276     stdx::unique_lock<stdx::mutex> ul(_mutex);
277 
278     // Sanity check that we don't double-execute statements
279     for (const auto stmtId : stmtIdsWritten) {
280         const auto stmtOpTime = _checkStatementExecuted(ul, txnNumber, stmtId);
281         if (stmtOpTime) {
282             fassertOnRepeatedExecution(
283                 opCtx, _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime);
284         }
285     }
286 
287     const auto updateRequest =
288         _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate);
289 
290     ul.unlock();
291 
292     repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
293 
294     updateSessionEntry(opCtx, updateRequest);
295     _registerUpdateCacheOnCommit(
296         opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
297 }
298 
onMigrateCompletedOnPrimary(OperationContext * opCtx,TxnNumber txnNumber,std::vector<StmtId> stmtIdsWritten,const repl::OpTime & lastStmtIdWriteOpTime,Date_t oplogLastStmtIdWriteDate)299 void Session::onMigrateCompletedOnPrimary(OperationContext* opCtx,
300                                           TxnNumber txnNumber,
301                                           std::vector<StmtId> stmtIdsWritten,
302                                           const repl::OpTime& lastStmtIdWriteOpTime,
303                                           Date_t oplogLastStmtIdWriteDate) {
304     invariant(opCtx->lockState()->inAWriteUnitOfWork());
305 
306     stdx::unique_lock<stdx::mutex> ul(_mutex);
307 
308     _checkValid(ul);
309     _checkIsActiveTransaction(ul, txnNumber);
310 
311     // We do not migrate transaction oplog entries.
312     const auto updateRequest =
313         _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, oplogLastStmtIdWriteDate);
314 
315     ul.unlock();
316 
317     repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
318 
319     updateSessionEntry(opCtx, updateRequest);
320     _registerUpdateCacheOnCommit(
321         opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
322 }
323 
invalidate()324 void Session::invalidate() {
325     stdx::lock_guard<stdx::mutex> lg(_mutex);
326     _isValid = false;
327     _numInvalidations++;
328 
329     _lastWrittenSessionRecord.reset();
330 
331     _activeTxnNumber = kUninitializedTxnNumber;
332     _activeTxnCommittedStatements.clear();
333     _hasIncompleteHistory = false;
334 }
335 
getLastWriteOpTime(TxnNumber txnNumber) const336 repl::OpTime Session::getLastWriteOpTime(TxnNumber txnNumber) const {
337     stdx::lock_guard<stdx::mutex> lg(_mutex);
338     _checkValid(lg);
339     _checkIsActiveTransaction(lg, txnNumber);
340 
341     if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber)
342         return {};
343 
344     return _lastWrittenSessionRecord->getLastWriteOpTime();
345 }
346 
checkStatementExecuted(OperationContext * opCtx,TxnNumber txnNumber,StmtId stmtId) const347 boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationContext* opCtx,
348                                                                   TxnNumber txnNumber,
349                                                                   StmtId stmtId) const {
350     const auto stmtTimestamp = [&] {
351         stdx::lock_guard<stdx::mutex> lg(_mutex);
352         return _checkStatementExecuted(lg, txnNumber, stmtId);
353     }();
354 
355     if (!stmtTimestamp)
356         return boost::none;
357 
358     TransactionHistoryIterator txnIter(*stmtTimestamp);
359     while (txnIter.hasNext()) {
360         const auto entry = txnIter.next(opCtx);
361         invariant(entry.getStatementId());
362         if (*entry.getStatementId() == stmtId)
363             return entry;
364     }
365 
366     MONGO_UNREACHABLE;
367 }
368 
checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber,StmtId stmtId) const369 bool Session::checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const {
370     stdx::lock_guard<stdx::mutex> lg(_mutex);
371     return bool(_checkStatementExecuted(lg, txnNumber, stmtId));
372 }
373 
_beginTxn(WithLock wl,TxnNumber txnNumber)374 void Session::_beginTxn(WithLock wl, TxnNumber txnNumber) {
375     _checkValid(wl);
376 
377     uassert(ErrorCodes::TransactionTooOld,
378             str::stream() << "Cannot start transaction " << txnNumber << " on session "
379                           << getSessionId()
380                           << " because a newer transaction "
381                           << _activeTxnNumber
382                           << " has already started.",
383             txnNumber >= _activeTxnNumber);
384 
385     // Check for continuing an existing transaction
386     if (txnNumber == _activeTxnNumber)
387         return;
388 
389     _activeTxnNumber = txnNumber;
390     _activeTxnCommittedStatements.clear();
391     _hasIncompleteHistory = false;
392 }
393 
_checkValid(WithLock) const394 void Session::_checkValid(WithLock) const {
395     uassert(ErrorCodes::ConflictingOperationInProgress,
396             str::stream() << "Session " << getSessionId()
397                           << " was concurrently modified and the operation must be retried.",
398             _isValid);
399 }
400 
_checkIsActiveTransaction(WithLock,TxnNumber txnNumber) const401 void Session::_checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const {
402     uassert(ErrorCodes::ConflictingOperationInProgress,
403             str::stream() << "Cannot perform retryability check for transaction " << txnNumber
404                           << " on session "
405                           << getSessionId()
406                           << " because a different transaction "
407                           << _activeTxnNumber
408                           << " is now active.",
409             txnNumber == _activeTxnNumber);
410 }
411 
_checkStatementExecuted(WithLock wl,TxnNumber txnNumber,StmtId stmtId) const412 boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl,
413                                                                TxnNumber txnNumber,
414                                                                StmtId stmtId) const {
415     _checkValid(wl);
416     _checkIsActiveTransaction(wl, txnNumber);
417 
418     const auto it = _activeTxnCommittedStatements.find(stmtId);
419     if (it == _activeTxnCommittedStatements.end()) {
420         uassert(ErrorCodes::IncompleteTransactionHistory,
421                 str::stream() << "Incomplete history detected for transaction " << txnNumber
422                               << " on session "
423                               << _sessionId.toBSON(),
424                 !_hasIncompleteHistory);
425 
426         return boost::none;
427     }
428 
429     invariant(_lastWrittenSessionRecord);
430     invariant(_lastWrittenSessionRecord->getTxnNum() == txnNumber);
431 
432     return it->second;
433 }
434 
_getLastWriteDate(WithLock wl,TxnNumber txnNumber) const435 Date_t Session::_getLastWriteDate(WithLock wl, TxnNumber txnNumber) const {
436     _checkValid(wl);
437     _checkIsActiveTransaction(wl, txnNumber);
438 
439     if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber)
440         return {};
441 
442     return _lastWrittenSessionRecord->getLastWriteDate();
443 }
444 
_makeUpdateRequest(WithLock,TxnNumber newTxnNumber,const repl::OpTime & newLastWriteOpTime,Date_t newLastWriteDate) const445 UpdateRequest Session::_makeUpdateRequest(WithLock,
446                                           TxnNumber newTxnNumber,
447                                           const repl::OpTime& newLastWriteOpTime,
448                                           Date_t newLastWriteDate) const {
449     UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
450 
451     const auto updateBSON = [&] {
452         SessionTxnRecord newTxnRecord;
453         newTxnRecord.setSessionId(_sessionId);
454         newTxnRecord.setTxnNum(newTxnNumber);
455         newTxnRecord.setLastWriteOpTime(newLastWriteOpTime);
456         newTxnRecord.setLastWriteDate(newLastWriteDate);
457         return newTxnRecord.toBSON();
458     }();
459     updateRequest.setUpdates(updateBSON);
460 
461     if (_lastWrittenSessionRecord) {
462         updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName
463                                     << _sessionId.toBSON()
464                                     << SessionTxnRecord::kTxnNumFieldName
465                                     << _lastWrittenSessionRecord->getTxnNum()
466                                     << SessionTxnRecord::kLastWriteOpTimeFieldName
467                                     << _lastWrittenSessionRecord->getLastWriteOpTime()));
468     } else {
469         updateRequest.setQuery(updateBSON);
470         updateRequest.setUpsert(true);
471     }
472 
473     return updateRequest;
474 }
475 
_registerUpdateCacheOnCommit(OperationContext * opCtx,TxnNumber newTxnNumber,std::vector<StmtId> stmtIdsWritten,const repl::OpTime & lastStmtIdWriteOpTime)476 void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
477                                            TxnNumber newTxnNumber,
478                                            std::vector<StmtId> stmtIdsWritten,
479                                            const repl::OpTime& lastStmtIdWriteOpTime) {
480     opCtx->recoveryUnit()->onCommit([
481         this,
482         opCtx,
483         newTxnNumber,
484         stmtIdsWritten = std::move(stmtIdsWritten),
485         lastStmtIdWriteOpTime
486     ] {
487         RetryableWritesStats::get(opCtx)->incrementTransactionsCollectionWriteCount();
488 
489         stdx::lock_guard<stdx::mutex> lg(_mutex);
490 
491         if (!_isValid)
492             return;
493 
494         // The cache of the last written record must always be advanced after a write so that
495         // subsequent writes have the correct point to start from.
496         if (!_lastWrittenSessionRecord) {
497             _lastWrittenSessionRecord.emplace();
498 
499             _lastWrittenSessionRecord->setSessionId(_sessionId);
500             _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
501             _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
502         } else {
503             if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum())
504                 _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
505 
506             if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime())
507                 _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
508         }
509 
510         if (newTxnNumber > _activeTxnNumber) {
511             // This call is necessary in order to advance the txn number and reset the cached state
512             // in the case where just before the storage transaction commits, the cache entry gets
513             // invalidated and immediately refreshed while there were no writes for newTxnNumber
514             // yet. In this case _activeTxnNumber will be less than newTxnNumber and we will fail to
515             // update the cache even though the write was successful.
516             _beginTxn(lg, newTxnNumber);
517         }
518 
519         if (newTxnNumber == _activeTxnNumber) {
520             for (const auto stmtId : stmtIdsWritten) {
521                 if (stmtId == kIncompleteHistoryStmtId) {
522                     _hasIncompleteHistory = true;
523                     continue;
524                 }
525 
526                 const auto insertRes =
527                     _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
528                 if (!insertRes.second) {
529                     const auto& existingOpTime = insertRes.first->second;
530                     fassertOnRepeatedExecution(opCtx,
531                                                _sessionId,
532                                                newTxnNumber,
533                                                stmtId,
534                                                existingOpTime,
535                                                lastStmtIdWriteOpTime);
536                 }
537             }
538         }
539     });
540 
541     MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) {
542         const auto& data = customArgs.getData();
543 
544         const auto closeConnectionElem = data["closeConnection"];
545         if (closeConnectionElem.eoo() || closeConnectionElem.Bool()) {
546             auto transportSession = opCtx->getClient()->session();
547             transportSession->getTransportLayer()->end(transportSession);
548         }
549 
550         const auto failBeforeCommitExceptionElem = data["failBeforeCommitExceptionCode"];
551         if (!failBeforeCommitExceptionElem.eoo()) {
552             const auto failureCode = ErrorCodes::Error(int(failBeforeCommitExceptionElem.Number()));
553             uasserted(failureCode,
554                       str::stream() << "Failing write for " << _sessionId << ":" << newTxnNumber
555                                     << " due to failpoint. The write must not be reflected.");
556         }
557     }
558 }
559 
createMatchingTransactionTableUpdate(const repl::OplogEntry & entry)560 boost::optional<repl::OplogEntry> Session::createMatchingTransactionTableUpdate(
561     const repl::OplogEntry& entry) {
562     auto sessionInfo = entry.getOperationSessionInfo();
563     if (!sessionInfo.getTxnNumber()) {
564         return boost::none;
565     }
566 
567     invariant(sessionInfo.getSessionId());
568     invariant(entry.getWallClockTime());
569 
570     const auto updateBSON = [&] {
571         SessionTxnRecord newTxnRecord;
572         newTxnRecord.setSessionId(*sessionInfo.getSessionId());
573         newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber());
574         newTxnRecord.setLastWriteOpTime(entry.getOpTime());
575         newTxnRecord.setLastWriteDate(*entry.getWallClockTime());
576         return newTxnRecord.toBSON();
577     }();
578 
579     return repl::OplogEntry(
580         entry.getOpTime(),
581         0,  // hash
582         repl::OpTypeEnum::kUpdate,
583         NamespaceString::kSessionTransactionsTableNamespace,
584         boost::none,  // uuid
585         false,        // fromMigrate
586         repl::OplogEntry::kOplogVersion,
587         updateBSON,
588         BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()),
589         {},    // sessionInfo
590         true,  // upsert
591         *entry.getWallClockTime(),
592         boost::none,  // statementId
593         boost::none,  // prevWriteOpTime
594         boost::none,  // preImangeOpTime
595         boost::none   // postImageOpTime
596         );
597 }
598 
599 }  // namespace mongo
600