// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package mongorestore

import (
	"fmt"
	"strconv"
	"strings"

	"github.com/mongodb/mongo-tools-common/db"
	"github.com/mongodb/mongo-tools-common/intents"
	"github.com/mongodb/mongo-tools-common/log"
	"github.com/mongodb/mongo-tools-common/progress"
	"github.com/mongodb/mongo-tools-common/txn"
	"github.com/mongodb/mongo-tools-common/util"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
)

// oplogMaxCommandSize sets the maximum size for multiple buffered ops in the
// applyOps command. This is to prevent pathological cases where the array overhead
// of many small operations can overflow the maximum command size.
// Note that ops > 8MB will still be buffered, just as single elements.
const oplogMaxCommandSize = 1024 * 1024 * 8

type oplogContext struct {
	progressor *progress.CountProgressor
	session    *mongo.Client
	totalOps   int
	txnBuffer  *txn.Buffer
}

// RestoreOplog attempts to restore a MongoDB oplog.
func (restore *MongoRestore) RestoreOplog() error {
	log.Logv(log.Always, "replaying oplog")
	intent := restore.manager.Oplog()
	if intent == nil {
		// this should not be reached
		log.Logv(log.Always, "no oplog file provided, skipping oplog application")
		return nil
	}
	if err := intent.BSONFile.Open(); err != nil {
		return err
	}
	if fileNeedsIOBuffer, ok := intent.BSONFile.(intents.FileNeedsIOBuffer); ok {
		fileNeedsIOBuffer.TakeIOBuffer(make([]byte, db.MaxBSONSize))
	}
	defer intent.BSONFile.Close()

	// NewBufferlessBSONSource reads each bson document into its own buffer
	// because bson.Unmarshal currently can't unmarshal binary types without
	// them referencing the source buffer.
	// We also increase the max BSON size by 16 KiB to accommodate the maximum
	// document size of 16 MiB plus any additional oplog-specific data.
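	// (For reference, db.MaxBSONSize is the 16 MiB document limit, so the cap
	// set below works out to 16 MiB + 16 KiB = 16,793,600 bytes.)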
	bsonSource := db.NewBufferlessBSONSource(intent.BSONFile)
	bsonSource.SetMaxBSONSize(db.MaxBSONSize + 16*1024)
	decodedBsonSource := db.NewDecodedBSONSource(bsonSource)
	defer decodedBsonSource.Close()

	session, err := restore.SessionProvider.GetSession()
	if err != nil {
		return fmt.Errorf("error establishing connection: %v", err)
	}

	oplogCtx := &oplogContext{
		progressor: progress.NewCounter(intent.BSONSize),
		txnBuffer:  txn.NewBuffer(),
		session:    session,
	}
	defer oplogCtx.txnBuffer.Stop()

	if restore.ProgressManager != nil {
		restore.ProgressManager.Attach("oplog", oplogCtx.progressor)
		defer restore.ProgressManager.Detach("oplog")
	}

	for {
		rawOplogEntry := decodedBsonSource.LoadNext()
		if rawOplogEntry == nil {
			break
		}
		oplogCtx.progressor.Inc(int64(len(rawOplogEntry)))

		entryAsOplog := db.Oplog{}
		err = bson.Unmarshal(rawOplogEntry, &entryAsOplog)
		if err != nil {
			return fmt.Errorf("error reading oplog: %v", err)
		}
		if entryAsOplog.Operation == "n" {
			// skip no-ops
			continue
		}
		if !restore.TimestampBeforeLimit(entryAsOplog.Timestamp) {
			log.Logvf(
				log.DebugLow,
				"timestamp %v is not below limit of %v; ending oplog restoration",
				entryAsOplog.Timestamp,
				restore.oplogLimit,
			)
			break
		}

		meta, err := txn.NewMeta(entryAsOplog)
		if err != nil {
			return fmt.Errorf("error getting op metadata: %v", err)
		}

		if meta.IsTxn() {
			err := restore.HandleTxnOp(oplogCtx, meta, entryAsOplog)
			if err != nil {
				return fmt.Errorf("error handling transaction oplog entry: %v", err)
			}
		} else {
			err := restore.HandleNonTxnOp(oplogCtx, entryAsOplog)
			if err != nil {
				return fmt.Errorf("error applying oplog: %v", err)
			}
		}
	}

	if fileNeedsIOBuffer, ok := intent.BSONFile.(intents.FileNeedsIOBuffer); ok {
		fileNeedsIOBuffer.ReleaseIOBuffer()
	}

	log.Logvf(log.Always, "applied %v oplog entries", oplogCtx.totalOps)
	if err := decodedBsonSource.Err(); err != nil {
		return fmt.Errorf("error reading oplog bson input: %v", err)
	}
	return nil
}

// HandleNonTxnOp applies a single oplog entry that is not part of a
// transaction, filtering UUIDs from it unless they are being preserved.
func (restore *MongoRestore) HandleNonTxnOp(oplogCtx *oplogContext, op db.Oplog) error {
	oplogCtx.totalOps++

	op, err := restore.filterUUIDs(op)
	if err != nil {
		return fmt.Errorf("error filtering UUIDs from oplog: %v", err)
	}

	return restore.ApplyOps(oplogCtx.session, []interface{}{op})
}

// HandleTxnOp buffers a transaction oplog entry. On an abort entry the
// buffered transaction is purged; on a commit entry the buffered ops are
// streamed out and applied one at a time.
func (restore *MongoRestore) HandleTxnOp(oplogCtx *oplogContext, meta txn.Meta, op db.Oplog) error {
	err := oplogCtx.txnBuffer.AddOp(meta, op)
	if err != nil {
		return fmt.Errorf("error buffering transaction oplog entry: %v", err)
	}

	if meta.IsAbort() {
		err := oplogCtx.txnBuffer.PurgeTxn(meta)
		if err != nil {
			return fmt.Errorf("error cleaning up transaction buffer on abort: %v", err)
		}
		return nil
	}

	if !meta.IsCommit() {
		return nil
	}

	// From here, we're applying transaction entries
	ops, errs := oplogCtx.txnBuffer.GetTxnStream(meta)

Loop:
	for {
		select {
		case o, ok := <-ops:
			if !ok {
				break Loop
			}
			err = restore.HandleNonTxnOp(oplogCtx, o)
			if err != nil {
				return fmt.Errorf("error applying transaction op: %v", err)
			}
		case err := <-errs:
			if err != nil {
				return fmt.Errorf("error replaying transaction: %v", err)
			}
			break Loop
		}
	}

	err = oplogCtx.txnBuffer.PurgeTxn(meta)
	if err != nil {
		return fmt.Errorf("error cleaning up transaction buffer: %v", err)
	}

	return nil
}

// ApplyOps is a wrapper for the applyOps database command. We pass in
// a session to avoid opening a new connection for a few inserts at a time.
func (restore *MongoRestore) ApplyOps(session *mongo.Client, entries []interface{}) error {
	singleRes := session.Database("admin").RunCommand(nil, bson.D{{"applyOps", entries}})
	if err := singleRes.Err(); err != nil {
		return fmt.Errorf("applyOps: %v", err)
	}
	res := bson.M{}
	singleRes.Decode(&res)
	if util.IsFalsy(res["ok"]) {
		return fmt.Errorf("applyOps command: %v", res["errmsg"])
	}

	return nil
}

// TimestampBeforeLimit returns true if the given timestamp is allowed to be
// applied to mongorestore's target database.
func (restore *MongoRestore) TimestampBeforeLimit(ts primitive.Timestamp) bool {
	if restore.oplogLimit.T == 0 && restore.oplogLimit.I == 0 {
		// always valid if there is no --oplogLimit set
		return true
	}
	return util.TimestampGreaterThan(restore.oplogLimit, ts)
}

// ParseTimestampFlag takes in a string in the form <time_t>:<ordinal>,
// where <time_t> is the seconds since the UNIX epoch, and <ordinal> represents
// a counter of operations in the oplog that occurred in the specified second.
// It parses this timestamp string and returns a primitive.Timestamp.
func ParseTimestampFlag(ts string) (primitive.Timestamp, error) {
	var seconds, increment int
	timestampFields := strings.Split(ts, ":")
	if len(timestampFields) > 2 {
		return primitive.Timestamp{}, fmt.Errorf("too many : characters")
	}

	seconds, err := strconv.Atoi(timestampFields[0])
	if err != nil {
		return primitive.Timestamp{}, fmt.Errorf("error parsing timestamp seconds: %v", err)
	}

	// parse the increment field if it exists
	if len(timestampFields) == 2 {
		if len(timestampFields[1]) > 0 {
			increment, err = strconv.Atoi(timestampFields[1])
			if err != nil {
				return primitive.Timestamp{}, fmt.Errorf("error parsing timestamp increment: %v", err)
			}
		} else {
			// handle the case where the user writes "<time_t>:" with no ordinal
			increment = 0
		}
	}

	return primitive.Timestamp{T: uint32(seconds), I: uint32(increment)}, nil
}

// needsCreateIndexWorkaround returns true for server versions 3.6.0-3.6.8 and
// 4.0.0-4.0.2, which require a 'ui' field in the createIndexes command.
func (restore *MongoRestore) needsCreateIndexWorkaround() bool {
	sv := restore.serverVersion
	if (sv.GTE(db.Version{3, 6, 0}) && sv.LTE(db.Version{3, 6, 8})) ||
		(sv.GTE(db.Version{4, 0, 0}) && sv.LTE(db.Version{4, 0, 2})) {
		return true
	}
	return false
}

// filterUUIDs removes 'ui' entries from ops, including nested applyOps ops.
// It also modifies ops that rely on 'ui'.
func (restore *MongoRestore) filterUUIDs(op db.Oplog) (db.Oplog, error) {
	// Remove UUIDs from oplog entries
	if !restore.OutputOptions.PreserveUUID {
		op.UI = nil

		// The createIndexes oplog command requires 'ui' for some server versions, so
		// in that case we fall back to an old-style system.indexes insert.
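		// (Illustration: a "c" op whose object starts with {createIndexes: "<coll>"}
		// is rewritten by convertCreateIndexToIndexInsert below into an "i" op that
		// inserts the index spec, with an added "ns" field, into <db>.system.indexes.)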
		if op.Operation == "c" && op.Object[0].Key == "createIndexes" && restore.needsCreateIndexWorkaround() {
			return convertCreateIndexToIndexInsert(op)
		}
	}

	// Check for and filter nested applyOps ops
	if op.Operation == "c" && isApplyOpsCmd(op.Object) {
		filtered, err := restore.newFilteredApplyOps(op.Object)
		if err != nil {
			return db.Oplog{}, err
		}
		op.Object = filtered
	}

	return op, nil
}

// convertCreateIndexToIndexInsert converts from new-style create indexes
// command to old style special index insert.
func convertCreateIndexToIndexInsert(op db.Oplog) (db.Oplog, error) {
	dbName, _ := util.SplitNamespace(op.Namespace)

	cmdValue := op.Object[0].Value
	collName, ok := cmdValue.(string)
	if !ok {
		return db.Oplog{}, fmt.Errorf("unknown format for createIndexes")
	}

	indexSpec := op.Object[1:]
	if len(indexSpec) < 3 {
		return db.Oplog{}, fmt.Errorf("unknown format for createIndexes, index spec " +
			"must have at least \"v\", \"key\", and \"name\" fields")
	}

	// createIndexes does not include the "ns" field but index inserts
	// do. Add it after the "v", "key", and "name" fields.
	ns := bson.D{{"ns", fmt.Sprintf("%s.%s", dbName, collName)}}
	indexSpec = append(indexSpec[:3], append(ns, indexSpec[3:]...)...)
	op.Object = indexSpec
	op.Namespace = fmt.Sprintf("%s.system.indexes", dbName)
	op.Operation = "i"

	return op, nil
}

// isApplyOpsCmd returns true if a document seems to be an applyOps command.
func isApplyOpsCmd(cmd bson.D) bool {
	for _, v := range cmd {
		if v.Key == "applyOps" {
			return true
		}
	}
	return false
}

// newFilteredApplyOps iterates over nested ops in an applyOps document and
// returns a new applyOps document that omits the 'ui' field from nested ops.
func (restore *MongoRestore) newFilteredApplyOps(cmd bson.D) (bson.D, error) {
	ops, err := unwrapNestedApplyOps(cmd)
	if err != nil {
		return nil, err
	}

	filtered := make([]db.Oplog, len(ops))
	for i, v := range ops {
		filtered[i], err = restore.filterUUIDs(v)
		if err != nil {
			return nil, err
		}
	}

	doc, err := wrapNestedApplyOps(filtered)
	if err != nil {
		return nil, err
	}

	return doc, nil
}

// nestedApplyOps models an applyOps command document
type nestedApplyOps struct {
	ApplyOps []db.Oplog `bson:"applyOps"`
}

// unwrapNestedApplyOps converts a bson.D to a typed data structure.
// Unfortunately, we're forced to convert by marshaling to bytes and
// unmarshalling.
func unwrapNestedApplyOps(doc bson.D) ([]db.Oplog, error) {
	// Doc to bytes
	bs, err := bson.Marshal(doc)
	if err != nil {
		return nil, fmt.Errorf("cannot remarshal nested applyOps: %s", err)
	}

	// Bytes to typed data
	var cmd nestedApplyOps
	err = bson.Unmarshal(bs, &cmd)
	if err != nil {
		return nil, fmt.Errorf("cannot unwrap nested applyOps: %s", err)
	}

	return cmd.ApplyOps, nil
}

// wrapNestedApplyOps converts a typed data structure to a bson.D.
// Unfortunately, we're forced to convert by marshaling to bytes and
// unmarshalling.
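// For example, wrapping a single insert op yields a document shaped roughly
// like {"applyOps": [{"op": "i", "ns": "...", "o": {...}}]}.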
func wrapNestedApplyOps(ops []db.Oplog) (bson.D, error) {
	cmd := &nestedApplyOps{ApplyOps: ops}

	// Typed data to bytes
	raw, err := bson.Marshal(cmd)
	if err != nil {
		return nil, fmt.Errorf("cannot rewrap nested applyOps op: %s", err)
	}

	// Bytes to doc
	var doc bson.D
	err = bson.Unmarshal(raw, &doc)
	if err != nil {
		return nil, fmt.Errorf("cannot reunmarshal nested applyOps op: %s", err)
	}

	return doc, nil
}