1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/db/session.h"
36
37 #include "mongo/db/catalog/index_catalog.h"
38 #include "mongo/db/concurrency/write_conflict_exception.h"
39 #include "mongo/db/db_raii.h"
40 #include "mongo/db/dbdirectclient.h"
41 #include "mongo/db/index/index_access_method.h"
42 #include "mongo/db/namespace_string.h"
43 #include "mongo/db/operation_context.h"
44 #include "mongo/db/ops/update.h"
45 #include "mongo/db/query/get_executor.h"
46 #include "mongo/db/repl/read_concern_args.h"
47 #include "mongo/db/retryable_writes_stats.h"
48 #include "mongo/db/transaction_history_iterator.h"
49 #include "mongo/stdx/memory.h"
50 #include "mongo/transport/transport_layer.h"
51 #include "mongo/util/fail_point_service.h"
52 #include "mongo/util/log.h"
53 #include "mongo/util/mongoutils/str.h"
54
55 namespace mongo {
56 namespace {
57
fassertOnRepeatedExecution(OperationContext * opCtx,const LogicalSessionId & lsid,TxnNumber txnNumber,StmtId stmtId,const repl::OpTime & firstOpTime,const repl::OpTime & secondOpTime)58 void fassertOnRepeatedExecution(OperationContext* opCtx,
59 const LogicalSessionId& lsid,
60 TxnNumber txnNumber,
61 StmtId stmtId,
62 const repl::OpTime& firstOpTime,
63 const repl::OpTime& secondOpTime) {
64 severe() << "Statement id " << stmtId << " from transaction [ " << lsid.toBSON() << ":"
65 << txnNumber << " ] was committed once with opTime " << firstOpTime
66 << " and a second time with opTime " << secondOpTime
67 << ". This indicates possible data corruption or server bug and the process will be "
68 "terminated.";
69 fassertFailed(40526);
70 }
71
72 struct ActiveTransactionHistory {
73 boost::optional<SessionTxnRecord> lastTxnRecord;
74 Session::CommittedStatementTimestampMap committedStatements;
75 bool hasIncompleteHistory{false};
76 };
77
fetchActiveTransactionHistory(OperationContext * opCtx,const LogicalSessionId & lsid)78 ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx,
79 const LogicalSessionId& lsid) {
80 ActiveTransactionHistory result;
81
82 result.lastTxnRecord = [&]() -> boost::optional<SessionTxnRecord> {
83 DBDirectClient client(opCtx);
84 auto result =
85 client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
86 {BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON())});
87 if (result.isEmpty()) {
88 return boost::none;
89 }
90
91 return SessionTxnRecord::parse(IDLParserErrorContext("parse latest txn record for session"),
92 result);
93 }();
94
95 if (!result.lastTxnRecord) {
96 return result;
97 }
98
99 auto it = TransactionHistoryIterator(result.lastTxnRecord->getLastWriteOpTime());
100 while (it.hasNext()) {
101 try {
102 const auto entry = it.next(opCtx);
103 invariant(entry.getStatementId());
104
105 if (*entry.getStatementId() == kIncompleteHistoryStmtId) {
106 // Only the dead end sentinel can have this id for oplog write history
107 invariant(entry.getObject2());
108 invariant(entry.getObject2()->woCompare(Session::kDeadEndSentinel) == 0);
109 result.hasIncompleteHistory = true;
110 continue;
111 }
112
113 const auto insertRes =
114 result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime());
115 if (!insertRes.second) {
116 const auto& existingOpTime = insertRes.first->second;
117 fassertOnRepeatedExecution(opCtx,
118 lsid,
119 result.lastTxnRecord->getTxnNum(),
120 *entry.getStatementId(),
121 existingOpTime,
122 entry.getOpTime());
123 }
124 } catch (const DBException& ex) {
125 if (ex.code() == ErrorCodes::IncompleteTransactionHistory) {
126 result.hasIncompleteHistory = true;
127 break;
128 }
129
130 throw;
131 }
132 }
133
134 return result;
135 }
136
updateSessionEntry(OperationContext * opCtx,const UpdateRequest & updateRequest)137 void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequest) {
138 // Current code only supports replacement update.
139 dassert(UpdateDriver::isDocReplacement(updateRequest.getUpdates()));
140
141 AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX);
142
143 uassert(40527,
144 str::stream() << "Unable to persist transaction state because the session transaction "
145 "collection is missing. This indicates that the "
146 << NamespaceString::kSessionTransactionsTableNamespace.ns()
147 << " collection has been manually deleted.",
148 autoColl.getCollection());
149
150 WriteUnitOfWork wuow(opCtx);
151
152 auto collection = autoColl.getCollection();
153 auto idIndex = collection->getIndexCatalog()->findIdIndex(opCtx);
154
155 uassert(40672,
156 str::stream() << "Failed to fetch _id index for "
157 << NamespaceString::kSessionTransactionsTableNamespace.ns(),
158 idIndex);
159
160 auto indexAccess = collection->getIndexCatalog()->getIndex(idIndex);
161 // Since we are looking up a key inside the _id index, create a key object consisting of only
162 // the _id field.
163 auto idToFetch = updateRequest.getQuery().firstElement();
164 auto toUpdateIdDoc = idToFetch.wrap();
165 dassert(idToFetch.fieldNameStringData() == "_id"_sd);
166 auto recordId = indexAccess->findSingle(opCtx, toUpdateIdDoc);
167 auto startingSnapshotId = opCtx->recoveryUnit()->getSnapshotId();
168
169 if (recordId.isNull()) {
170 // Upsert case.
171 auto status = collection->insertDocument(
172 opCtx, InsertStatement(updateRequest.getUpdates()), nullptr, true, false);
173
174 if (status == ErrorCodes::DuplicateKey) {
175 throw WriteConflictException();
176 }
177
178 uassertStatusOK(status);
179 wuow.commit();
180 return;
181 }
182
183 auto originalRecordData = collection->getRecordStore()->dataFor(opCtx, recordId);
184 auto originalDoc = originalRecordData.toBson();
185
186 invariant(collection->getDefaultCollator() == nullptr);
187 boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContext(opCtx, nullptr));
188
189 auto matcher = fassertStatusOK(
190 40673, MatchExpressionParser::parse(updateRequest.getQuery(), std::move(expCtx)));
191 if (!matcher->matchesBSON(originalDoc)) {
192 // Document no longer match what we expect so throw WCE to make the caller re-examine.
193 throw WriteConflictException();
194 }
195
196 OplogUpdateEntryArgs args;
197 args.nss = NamespaceString::kSessionTransactionsTableNamespace;
198 args.uuid = collection->uuid();
199 args.update = updateRequest.getUpdates();
200 args.criteria = toUpdateIdDoc;
201 args.fromMigrate = false;
202
203 collection->updateDocument(opCtx,
204 recordId,
205 Snapshotted<BSONObj>(startingSnapshotId, originalDoc),
206 updateRequest.getUpdates(),
207 true, // enforceQuota
208 false, // indexesAffected = false because _id is the only index
209 nullptr,
210 &args);
211
212 wuow.commit();
213 }
214
215 // Failpoint which allows different failure actions to happen after each write. Supports the
216 // parameters below, which can be combined with each other (unless explicitly disallowed):
217 //
218 // closeConnection (bool, default = true): Closes the connection on which the write was executed.
219 // failBeforeCommitExceptionCode (int, default = not specified): If set, the specified exception
220 // code will be thrown, which will cause the write to not commit; if not specified, the write
221 // will be allowed to commit.
222 MONGO_FP_DECLARE(onPrimaryTransactionalWrite);
223
224 } // namespace
225
226 const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1));
227
Session(LogicalSessionId sessionId)228 Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}
229
refreshFromStorageIfNeeded(OperationContext * opCtx)230 void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) {
231 invariant(!opCtx->lockState()->isLocked());
232 invariant(repl::ReadConcernArgs::get(opCtx).getLevel() ==
233 repl::ReadConcernLevel::kLocalReadConcern);
234
235 stdx::unique_lock<stdx::mutex> ul(_mutex);
236
237 while (!_isValid) {
238 const int numInvalidations = _numInvalidations;
239
240 ul.unlock();
241
242 auto activeTxnHistory = fetchActiveTransactionHistory(opCtx, _sessionId);
243
244 ul.lock();
245
246 // Protect against concurrent refreshes or invalidations
247 if (!_isValid && _numInvalidations == numInvalidations) {
248 _isValid = true;
249 _lastWrittenSessionRecord = std::move(activeTxnHistory.lastTxnRecord);
250
251 if (_lastWrittenSessionRecord) {
252 _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum();
253 _activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements);
254 _hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory;
255 }
256
257 break;
258 }
259 }
260 }
261
beginTxn(OperationContext * opCtx,TxnNumber txnNumber)262 void Session::beginTxn(OperationContext* opCtx, TxnNumber txnNumber) {
263 invariant(!opCtx->lockState()->isLocked());
264
265 stdx::lock_guard<stdx::mutex> lg(_mutex);
266 _beginTxn(lg, txnNumber);
267 }
268
onWriteOpCompletedOnPrimary(OperationContext * opCtx,TxnNumber txnNumber,std::vector<StmtId> stmtIdsWritten,const repl::OpTime & lastStmtIdWriteOpTime,Date_t lastStmtIdWriteDate)269 void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
270 TxnNumber txnNumber,
271 std::vector<StmtId> stmtIdsWritten,
272 const repl::OpTime& lastStmtIdWriteOpTime,
273 Date_t lastStmtIdWriteDate) {
274 invariant(opCtx->lockState()->inAWriteUnitOfWork());
275
276 stdx::unique_lock<stdx::mutex> ul(_mutex);
277
278 // Sanity check that we don't double-execute statements
279 for (const auto stmtId : stmtIdsWritten) {
280 const auto stmtOpTime = _checkStatementExecuted(ul, txnNumber, stmtId);
281 if (stmtOpTime) {
282 fassertOnRepeatedExecution(
283 opCtx, _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime);
284 }
285 }
286
287 const auto updateRequest =
288 _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate);
289
290 ul.unlock();
291
292 repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
293
294 updateSessionEntry(opCtx, updateRequest);
295 _registerUpdateCacheOnCommit(
296 opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
297 }
298
onMigrateCompletedOnPrimary(OperationContext * opCtx,TxnNumber txnNumber,std::vector<StmtId> stmtIdsWritten,const repl::OpTime & lastStmtIdWriteOpTime,Date_t oplogLastStmtIdWriteDate)299 void Session::onMigrateCompletedOnPrimary(OperationContext* opCtx,
300 TxnNumber txnNumber,
301 std::vector<StmtId> stmtIdsWritten,
302 const repl::OpTime& lastStmtIdWriteOpTime,
303 Date_t oplogLastStmtIdWriteDate) {
304 invariant(opCtx->lockState()->inAWriteUnitOfWork());
305
306 stdx::unique_lock<stdx::mutex> ul(_mutex);
307
308 _checkValid(ul);
309 _checkIsActiveTransaction(ul, txnNumber);
310
311 // We do not migrate transaction oplog entries.
312 const auto updateRequest =
313 _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, oplogLastStmtIdWriteDate);
314
315 ul.unlock();
316
317 repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
318
319 updateSessionEntry(opCtx, updateRequest);
320 _registerUpdateCacheOnCommit(
321 opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
322 }
323
invalidate()324 void Session::invalidate() {
325 stdx::lock_guard<stdx::mutex> lg(_mutex);
326 _isValid = false;
327 _numInvalidations++;
328
329 _lastWrittenSessionRecord.reset();
330
331 _activeTxnNumber = kUninitializedTxnNumber;
332 _activeTxnCommittedStatements.clear();
333 _hasIncompleteHistory = false;
334 }
335
getLastWriteOpTime(TxnNumber txnNumber) const336 repl::OpTime Session::getLastWriteOpTime(TxnNumber txnNumber) const {
337 stdx::lock_guard<stdx::mutex> lg(_mutex);
338 _checkValid(lg);
339 _checkIsActiveTransaction(lg, txnNumber);
340
341 if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber)
342 return {};
343
344 return _lastWrittenSessionRecord->getLastWriteOpTime();
345 }
346
checkStatementExecuted(OperationContext * opCtx,TxnNumber txnNumber,StmtId stmtId) const347 boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationContext* opCtx,
348 TxnNumber txnNumber,
349 StmtId stmtId) const {
350 const auto stmtTimestamp = [&] {
351 stdx::lock_guard<stdx::mutex> lg(_mutex);
352 return _checkStatementExecuted(lg, txnNumber, stmtId);
353 }();
354
355 if (!stmtTimestamp)
356 return boost::none;
357
358 TransactionHistoryIterator txnIter(*stmtTimestamp);
359 while (txnIter.hasNext()) {
360 const auto entry = txnIter.next(opCtx);
361 invariant(entry.getStatementId());
362 if (*entry.getStatementId() == stmtId)
363 return entry;
364 }
365
366 MONGO_UNREACHABLE;
367 }
368
checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber,StmtId stmtId) const369 bool Session::checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const {
370 stdx::lock_guard<stdx::mutex> lg(_mutex);
371 return bool(_checkStatementExecuted(lg, txnNumber, stmtId));
372 }
373
_beginTxn(WithLock wl,TxnNumber txnNumber)374 void Session::_beginTxn(WithLock wl, TxnNumber txnNumber) {
375 _checkValid(wl);
376
377 uassert(ErrorCodes::TransactionTooOld,
378 str::stream() << "Cannot start transaction " << txnNumber << " on session "
379 << getSessionId()
380 << " because a newer transaction "
381 << _activeTxnNumber
382 << " has already started.",
383 txnNumber >= _activeTxnNumber);
384
385 // Check for continuing an existing transaction
386 if (txnNumber == _activeTxnNumber)
387 return;
388
389 _activeTxnNumber = txnNumber;
390 _activeTxnCommittedStatements.clear();
391 _hasIncompleteHistory = false;
392 }
393
_checkValid(WithLock) const394 void Session::_checkValid(WithLock) const {
395 uassert(ErrorCodes::ConflictingOperationInProgress,
396 str::stream() << "Session " << getSessionId()
397 << " was concurrently modified and the operation must be retried.",
398 _isValid);
399 }
400
_checkIsActiveTransaction(WithLock,TxnNumber txnNumber) const401 void Session::_checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const {
402 uassert(ErrorCodes::ConflictingOperationInProgress,
403 str::stream() << "Cannot perform retryability check for transaction " << txnNumber
404 << " on session "
405 << getSessionId()
406 << " because a different transaction "
407 << _activeTxnNumber
408 << " is now active.",
409 txnNumber == _activeTxnNumber);
410 }
411
_checkStatementExecuted(WithLock wl,TxnNumber txnNumber,StmtId stmtId) const412 boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl,
413 TxnNumber txnNumber,
414 StmtId stmtId) const {
415 _checkValid(wl);
416 _checkIsActiveTransaction(wl, txnNumber);
417
418 const auto it = _activeTxnCommittedStatements.find(stmtId);
419 if (it == _activeTxnCommittedStatements.end()) {
420 uassert(ErrorCodes::IncompleteTransactionHistory,
421 str::stream() << "Incomplete history detected for transaction " << txnNumber
422 << " on session "
423 << _sessionId.toBSON(),
424 !_hasIncompleteHistory);
425
426 return boost::none;
427 }
428
429 invariant(_lastWrittenSessionRecord);
430 invariant(_lastWrittenSessionRecord->getTxnNum() == txnNumber);
431
432 return it->second;
433 }
434
_getLastWriteDate(WithLock wl,TxnNumber txnNumber) const435 Date_t Session::_getLastWriteDate(WithLock wl, TxnNumber txnNumber) const {
436 _checkValid(wl);
437 _checkIsActiveTransaction(wl, txnNumber);
438
439 if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber)
440 return {};
441
442 return _lastWrittenSessionRecord->getLastWriteDate();
443 }
444
_makeUpdateRequest(WithLock,TxnNumber newTxnNumber,const repl::OpTime & newLastWriteOpTime,Date_t newLastWriteDate) const445 UpdateRequest Session::_makeUpdateRequest(WithLock,
446 TxnNumber newTxnNumber,
447 const repl::OpTime& newLastWriteOpTime,
448 Date_t newLastWriteDate) const {
449 UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
450
451 const auto updateBSON = [&] {
452 SessionTxnRecord newTxnRecord;
453 newTxnRecord.setSessionId(_sessionId);
454 newTxnRecord.setTxnNum(newTxnNumber);
455 newTxnRecord.setLastWriteOpTime(newLastWriteOpTime);
456 newTxnRecord.setLastWriteDate(newLastWriteDate);
457 return newTxnRecord.toBSON();
458 }();
459 updateRequest.setUpdates(updateBSON);
460
461 if (_lastWrittenSessionRecord) {
462 updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName
463 << _sessionId.toBSON()
464 << SessionTxnRecord::kTxnNumFieldName
465 << _lastWrittenSessionRecord->getTxnNum()
466 << SessionTxnRecord::kLastWriteOpTimeFieldName
467 << _lastWrittenSessionRecord->getLastWriteOpTime()));
468 } else {
469 updateRequest.setQuery(updateBSON);
470 updateRequest.setUpsert(true);
471 }
472
473 return updateRequest;
474 }
475
_registerUpdateCacheOnCommit(OperationContext * opCtx,TxnNumber newTxnNumber,std::vector<StmtId> stmtIdsWritten,const repl::OpTime & lastStmtIdWriteOpTime)476 void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
477 TxnNumber newTxnNumber,
478 std::vector<StmtId> stmtIdsWritten,
479 const repl::OpTime& lastStmtIdWriteOpTime) {
480 opCtx->recoveryUnit()->onCommit([
481 this,
482 opCtx,
483 newTxnNumber,
484 stmtIdsWritten = std::move(stmtIdsWritten),
485 lastStmtIdWriteOpTime
486 ] {
487 RetryableWritesStats::get(opCtx)->incrementTransactionsCollectionWriteCount();
488
489 stdx::lock_guard<stdx::mutex> lg(_mutex);
490
491 if (!_isValid)
492 return;
493
494 // The cache of the last written record must always be advanced after a write so that
495 // subsequent writes have the correct point to start from.
496 if (!_lastWrittenSessionRecord) {
497 _lastWrittenSessionRecord.emplace();
498
499 _lastWrittenSessionRecord->setSessionId(_sessionId);
500 _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
501 _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
502 } else {
503 if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum())
504 _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
505
506 if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime())
507 _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
508 }
509
510 if (newTxnNumber > _activeTxnNumber) {
511 // This call is necessary in order to advance the txn number and reset the cached state
512 // in the case where just before the storage transaction commits, the cache entry gets
513 // invalidated and immediately refreshed while there were no writes for newTxnNumber
514 // yet. In this case _activeTxnNumber will be less than newTxnNumber and we will fail to
515 // update the cache even though the write was successful.
516 _beginTxn(lg, newTxnNumber);
517 }
518
519 if (newTxnNumber == _activeTxnNumber) {
520 for (const auto stmtId : stmtIdsWritten) {
521 if (stmtId == kIncompleteHistoryStmtId) {
522 _hasIncompleteHistory = true;
523 continue;
524 }
525
526 const auto insertRes =
527 _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
528 if (!insertRes.second) {
529 const auto& existingOpTime = insertRes.first->second;
530 fassertOnRepeatedExecution(opCtx,
531 _sessionId,
532 newTxnNumber,
533 stmtId,
534 existingOpTime,
535 lastStmtIdWriteOpTime);
536 }
537 }
538 }
539 });
540
541 MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) {
542 const auto& data = customArgs.getData();
543
544 const auto closeConnectionElem = data["closeConnection"];
545 if (closeConnectionElem.eoo() || closeConnectionElem.Bool()) {
546 auto transportSession = opCtx->getClient()->session();
547 transportSession->getTransportLayer()->end(transportSession);
548 }
549
550 const auto failBeforeCommitExceptionElem = data["failBeforeCommitExceptionCode"];
551 if (!failBeforeCommitExceptionElem.eoo()) {
552 const auto failureCode = ErrorCodes::Error(int(failBeforeCommitExceptionElem.Number()));
553 uasserted(failureCode,
554 str::stream() << "Failing write for " << _sessionId << ":" << newTxnNumber
555 << " due to failpoint. The write must not be reflected.");
556 }
557 }
558 }
559
createMatchingTransactionTableUpdate(const repl::OplogEntry & entry)560 boost::optional<repl::OplogEntry> Session::createMatchingTransactionTableUpdate(
561 const repl::OplogEntry& entry) {
562 auto sessionInfo = entry.getOperationSessionInfo();
563 if (!sessionInfo.getTxnNumber()) {
564 return boost::none;
565 }
566
567 invariant(sessionInfo.getSessionId());
568 invariant(entry.getWallClockTime());
569
570 const auto updateBSON = [&] {
571 SessionTxnRecord newTxnRecord;
572 newTxnRecord.setSessionId(*sessionInfo.getSessionId());
573 newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber());
574 newTxnRecord.setLastWriteOpTime(entry.getOpTime());
575 newTxnRecord.setLastWriteDate(*entry.getWallClockTime());
576 return newTxnRecord.toBSON();
577 }();
578
579 return repl::OplogEntry(
580 entry.getOpTime(),
581 0, // hash
582 repl::OpTypeEnum::kUpdate,
583 NamespaceString::kSessionTransactionsTableNamespace,
584 boost::none, // uuid
585 false, // fromMigrate
586 repl::OplogEntry::kOplogVersion,
587 updateBSON,
588 BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()),
589 {}, // sessionInfo
590 true, // upsert
591 *entry.getWallClockTime(),
592 boost::none, // statementId
593 boost::none, // prevWriteOpTime
594 boost::none, // preImangeOpTime
595 boost::none // postImageOpTime
596 );
597 }
598
599 } // namespace mongo
600