// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package mongorestore

import (
	"fmt"
	"io/ioutil"
	"strings"
	"time"

	"github.com/mongodb/mongo-tools-common/bsonutil"
	"github.com/mongodb/mongo-tools-common/db"
	"github.com/mongodb/mongo-tools-common/intents"
	"github.com/mongodb/mongo-tools-common/log"
	"github.com/mongodb/mongo-tools-common/progress"
	"github.com/mongodb/mongo-tools-common/util"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
)

// insertBufferFactor is the capacity of the channel that carries documents
// from the BSON reader goroutine to the insertion workers in
// RestoreCollectionToDB.
const insertBufferFactor = 16

// validIndexOptions are taken from https://github.com/mongodb/mongo/blob/master/src/mongo/db/index/index_descriptor.h
//
// convertLegacyIndexOptions removes every index option whose name is not a key
// of this map.
var validIndexOptions = map[string]bool{
	"2dsphereIndexVersion":    true,
	"background":              true,
	"bits":                    true,
	"bucketSize":              true,
	"coarsestIndexedLevel":    true,
	"collation":               true,
	"default_language":        true,
	"expireAfterSeconds":      true,
	"finestIndexedLevel":      true,
	"key":                     true,
	"language_override":       true,
	"max":                     true,
	"min":                     true,
	"name":                    true,
	"ns":                      true,
	"partialFilterExpression": true,
	"sparse":                  true,
	"storageEngine":           true,
	"textIndexVersion":        true,
	"unique":                  true,
	"v":                       true,
	"weights":                 true,
	"wildcardProjection":      true,
}

// Result encapsulates the outcome of a particular restore attempt.
57type Result struct { 58 Successes int64 59 Failures int64 60 Err error 61} 62 63// log pretty-prints the result, associated with restoring the given namespace 64func (result *Result) log(ns string) { 65 log.Logvf(log.Always, "finished restoring %v (%v %v, %v %v)", 66 ns, result.Successes, util.Pluralize(int(result.Successes), "document", "documents"), 67 result.Failures, util.Pluralize(int(result.Failures), "failure", "failures")) 68} 69 70// combineWith sums the successes and failures from both results and the overwrites the existing Err with the Err from 71// the provided result. 72func (result *Result) combineWith(other Result) { 73 result.Successes += other.Successes 74 result.Failures += other.Failures 75 result.Err = other.Err 76} 77 78// withErr returns a copy of the current result with the provided error 79func (result Result) withErr(err error) Result { 80 result.Err = err 81 return result 82} 83 84func NewResultFromBulkResult(result *mongo.BulkWriteResult, err error) Result { 85 if result == nil { 86 return Result{} 87 } 88 89 nSuccess := result.InsertedCount 90 var nFailure int64 91 92 // if a write concern error is encountered, the failure count may be inaccurate. 93 if bwe, ok := err.(mongo.BulkWriteException); ok { 94 nFailure = int64(len(bwe.WriteErrors)) 95 } 96 97 return Result{nSuccess, nFailure, err} 98} 99 100// RestoreIntents iterates through all of the intents stored in the IntentManager, and restores them. 
101func (restore *MongoRestore) RestoreIntents() Result { 102 log.Logvf(log.DebugLow, "restoring up to %v collections in parallel", restore.OutputOptions.NumParallelCollections) 103 104 if restore.OutputOptions.NumParallelCollections > 0 { 105 resultChan := make(chan Result) 106 107 // start a goroutine for each job thread 108 for i := 0; i < restore.OutputOptions.NumParallelCollections; i++ { 109 go func(id int) { 110 var workerResult Result 111 log.Logvf(log.DebugHigh, "starting restore routine with id=%v", id) 112 var ioBuf []byte 113 for { 114 intent := restore.manager.Pop() 115 if intent == nil { 116 log.Logvf(log.DebugHigh, "ending restore routine with id=%v, no more work to do", id) 117 resultChan <- workerResult // done 118 return 119 } 120 if fileNeedsIOBuffer, ok := intent.BSONFile.(intents.FileNeedsIOBuffer); ok { 121 if ioBuf == nil { 122 ioBuf = make([]byte, db.MaxBSONSize) 123 } 124 fileNeedsIOBuffer.TakeIOBuffer(ioBuf) 125 } 126 result := restore.RestoreIntent(intent) 127 result.log(intent.Namespace()) 128 workerResult.combineWith(result) 129 if result.Err != nil { 130 resultChan <- workerResult.withErr(fmt.Errorf("%v: %v", intent.Namespace(), result.Err)) 131 return 132 } 133 restore.manager.Finish(intent) 134 if fileNeedsIOBuffer, ok := intent.BSONFile.(intents.FileNeedsIOBuffer); ok { 135 fileNeedsIOBuffer.ReleaseIOBuffer() 136 } 137 138 } 139 }(i) 140 } 141 142 var totalResult Result 143 // wait until all goroutines are done or one of them errors out 144 for i := 0; i < restore.OutputOptions.NumParallelCollections; i++ { 145 result := <-resultChan 146 totalResult.combineWith(result) 147 if totalResult.Err != nil { 148 return totalResult 149 } 150 } 151 return totalResult 152 } 153 154 var totalResult Result 155 // single-threaded 156 for { 157 intent := restore.manager.Pop() 158 if intent == nil { 159 break 160 } 161 result := restore.RestoreIntent(intent) 162 result.log(intent.Namespace()) 163 totalResult.combineWith(result) 164 if result.Err != 
nil { 165 return totalResult.withErr(fmt.Errorf("%v: %v", intent.Namespace(), result.Err)) 166 } 167 restore.manager.Finish(intent) 168 } 169 return totalResult 170} 171 172// RestoreIntent attempts to restore a given intent into MongoDB. 173func (restore *MongoRestore) RestoreIntent(intent *intents.Intent) Result { 174 175 collectionExists, err := restore.CollectionExists(intent) 176 if err != nil { 177 return Result{Err: fmt.Errorf("error reading database: %v", err)} 178 } 179 180 if !restore.OutputOptions.Drop && collectionExists { 181 log.Logvf(log.Always, "restoring to existing collection %v without dropping", intent.Namespace()) 182 } 183 184 if restore.OutputOptions.Drop { 185 if collectionExists { 186 if strings.HasPrefix(intent.C, "system.") { 187 log.Logvf(log.Always, "cannot drop system collection %v, skipping", intent.Namespace()) 188 } else { 189 log.Logvf(log.Info, "dropping collection %v before restoring", intent.Namespace()) 190 err = restore.DropCollection(intent) 191 if err != nil { 192 return Result{Err: err} // no context needed 193 } 194 collectionExists = false 195 } 196 } else { 197 log.Logvf(log.DebugLow, "collection %v doesn't exist, skipping drop command", intent.Namespace()) 198 } 199 } 200 201 var options bson.D 202 var indexes []IndexDocument 203 var uuid string 204 205 // get indexes from system.indexes dump if we have it but don't have metadata files 206 if intent.MetadataFile == nil { 207 if _, ok := restore.dbCollectionIndexes[intent.DB]; ok { 208 if indexes, ok = restore.dbCollectionIndexes[intent.DB][intent.C]; ok { 209 log.Logvf(log.Always, "no metadata; falling back to system.indexes") 210 } 211 } 212 } 213 214 logMessageSuffix := "with no metadata" 215 var hasNonSimpleCollation bool 216 // first create the collection with options from the metadata file 217 if intent.MetadataFile != nil { 218 logMessageSuffix = "using options from metadata" 219 err = intent.MetadataFile.Open() 220 if err != nil { 221 return Result{Err: err} 222 
} 223 defer intent.MetadataFile.Close() 224 225 log.Logvf(log.Always, "reading metadata for %v from %v", intent.Namespace(), intent.MetadataLocation) 226 metadataJSON, err := ioutil.ReadAll(intent.MetadataFile) 227 if err != nil { 228 return Result{Err: fmt.Errorf("error reading metadata from %v: %v", intent.MetadataLocation, err)} 229 } 230 metadata, err := restore.MetadataFromJSON(metadataJSON) 231 if err != nil { 232 return Result{Err: fmt.Errorf("error parsing metadata from %v: %v", intent.MetadataLocation, err)} 233 } 234 if metadata != nil { 235 options = metadata.Options 236 indexes = metadata.Indexes 237 if restore.OutputOptions.PreserveUUID { 238 if metadata.UUID == "" { 239 return Result{Err: fmt.Errorf("--preserveUUID used but no UUID found in %v", intent.MetadataLocation)} 240 } 241 uuid = metadata.UUID 242 } 243 244 collation, err := bsonutil.FindSubdocumentByKey("collation", &options) 245 if err == nil { 246 localeValue, err := bsonutil.FindValueByKey("locale", &collation) 247 if err == nil { 248 hasNonSimpleCollation = localeValue != "simple" 249 } 250 } 251 } 252 253 // The only way to specify options on the idIndex is at collection creation time. 254 // This loop pulls out the idIndex from `indexes` and sets it in `options`. 255 for i, index := range indexes { 256 // The index with the name "_id_" will always be the idIndex. 257 if index.Options["name"].(string) == "_id_" { 258 // Remove the index version (to use the default) unless otherwise specified. 259 // If preserving UUID, we have to create a collection via 260 // applyops, which requires the "v" key. 261 if !restore.OutputOptions.KeepIndexVersion && !restore.OutputOptions.PreserveUUID { 262 delete(index.Options, "v") 263 } 264 index.Options["ns"] = intent.Namespace() 265 266 // If the collection has an idIndex, then we are about to create it, so 267 // ignore the value of autoIndexId. 
268 for j, opt := range options { 269 if opt.Key == "autoIndexId" { 270 options = append(options[:j], options[j+1:]...) 271 } 272 } 273 options = append(options, bson.E{"idIndex", index}) 274 indexes = append(indexes[:i], indexes[i+1:]...) 275 break 276 } 277 } 278 279 if restore.OutputOptions.NoOptionsRestore { 280 log.Logv(log.Info, "not restoring collection options") 281 logMessageSuffix = "with no collection options" 282 options = nil 283 } 284 } 285 if !collectionExists { 286 log.Logvf(log.Info, "creating collection %v %s", intent.Namespace(), logMessageSuffix) 287 log.Logvf(log.DebugHigh, "using collection options: %#v", options) 288 err = restore.CreateCollection(intent, options, uuid) 289 if err != nil { 290 return Result{Err: fmt.Errorf("error creating collection %v: %v", intent.Namespace(), err)} 291 } 292 restore.addToKnownCollections(intent) 293 } else { 294 log.Logvf(log.Info, "collection %v already exists - skipping collection create", intent.Namespace()) 295 } 296 297 var result Result 298 if intent.BSONFile != nil { 299 err = intent.BSONFile.Open() 300 if err != nil { 301 return Result{Err: err} 302 } 303 defer intent.BSONFile.Close() 304 305 log.Logvf(log.Always, "restoring %v from %v", intent.Namespace(), intent.Location) 306 307 bsonSource := db.NewDecodedBSONSource(db.NewBSONSource(intent.BSONFile)) 308 defer bsonSource.Close() 309 310 result = restore.RestoreCollectionToDB(intent.DB, intent.C, bsonSource, intent.BSONFile, intent.Size) 311 if result.Err != nil { 312 result.Err = fmt.Errorf("error restoring from %v: %v", intent.Location, result.Err) 313 return result 314 } 315 } 316 317 // finally, add indexes 318 if len(indexes) > 0 && !restore.OutputOptions.NoIndexRestore { 319 log.Logvf(log.Always, "restoring indexes for collection %v from metadata", intent.Namespace()) 320 if restore.OutputOptions.ConvertLegacyIndexes { 321 convertLegacyIndexes(indexes) 322 } 323 if restore.OutputOptions.FixDottedHashedIndexes { 324 
fixDottedHashedIndexes(indexes) 325 } 326 err = restore.CreateIndexes(intent, indexes, hasNonSimpleCollation) 327 if err != nil { 328 result.Err = fmt.Errorf("error creating indexes for %v: %v", intent.Namespace(), err) 329 return result 330 } 331 } else { 332 log.Logv(log.Always, "no indexes to restore") 333 } 334 335 return result 336} 337 338func convertLegacyIndexes(indexes []IndexDocument) { 339 for _, index := range indexes { 340 convertLegacyIndexKeys(index) 341 convertLegacyIndexOptions(index) 342 } 343} 344 345func fixDottedHashedIndexes(indexes []IndexDocument) { 346 for _, index := range indexes { 347 fixDottedHashedIndex(index) 348 } 349} 350 351// fixDottedHashedIndex fixes the issue introduced by a server bug where hashed index constraints are not 352// correctly enforced under all circumstance by changing the hashed index on the dotted field to an 353// ascending single field index. 354func fixDottedHashedIndex(index IndexDocument) { 355 indexFields := index.Key 356 for i, field := range indexFields { 357 fieldName := field.Key 358 if strings.Contains(fieldName, ".") && field.Value == "hashed" { 359 // Change the hashed index to single field index 360 indexFields[i].Value = int32(1) 361 } 362 } 363} 364 365func convertLegacyIndexKeys(index IndexDocument) { 366 var converted bool 367 originalJSONString := createExtJSONString(index.Key) 368 for j, elem := range index.Key { 369 switch v := elem.Value.(type) { 370 case int32, int64, float64: 371 // Only convert 0 value 372 if v == 0 { 373 index.Key[j].Value = 1 374 converted = true 375 } 376 case primitive.Decimal128: 377 // Note, this doesn't catch Decimal values which are equivalent to "0" (e.g. 0.00 or -0). 
378 // These values are so unlikely we just ignore them 379 zeroVal, err := primitive.ParseDecimal128("0") 380 if err == nil { 381 if v == zeroVal { 382 index.Key[j].Value = 1 383 converted = true 384 } 385 } 386 case string: 387 // Only convert an empty string 388 if v == "" { 389 index.Key[j].Value = 1 390 converted = true 391 } 392 default: 393 // Convert all types that aren't strings or numbers 394 index.Key[j].Value = 1 395 converted = true 396 } 397 } 398 if converted { 399 newJSONString := createExtJSONString(index.Key) 400 log.Logvf(log.Always, "convertLegacyIndexes: converted index values '%s' to '%s' on collection '%s'", 401 originalJSONString, newJSONString, index.Options["ns"]) 402 } 403} 404 405func convertLegacyIndexOptions(index IndexDocument) { 406 var converted bool 407 originalJSONString := createExtJSONString(index.Options) 408 for key := range index.Options { 409 if _, ok := validIndexOptions[key]; !ok { 410 delete(index.Options, key) 411 converted = true 412 } 413 } 414 if converted { 415 newJSONString := createExtJSONString(index.Options) 416 log.Logvf(log.Always, "convertLegacyIndexes: converted index options '%s' to '%s'", 417 originalJSONString, newJSONString) 418 } 419} 420 421func createExtJSONString(doc interface{}) string { 422 // by default return "<unable to format document>"" since we don't 423 // want to throw an error when formatting informational messages. 424 // An error would be inconsequential. 425 JSONString := "<unable to format document>" 426 JSONBytes, err := bson.MarshalExtJSON(doc, false, false) 427 if err == nil { 428 JSONString = string(JSONBytes) 429 } 430 return JSONString 431} 432 433// RestoreCollectionToDB pipes the given BSON data into the database. 434// Returns the number of documents restored and any errors that occurred. 
435func (restore *MongoRestore) RestoreCollectionToDB(dbName, colName string, 436 bsonSource *db.DecodedBSONSource, file PosReader, fileSize int64) Result { 437 438 var termErr error 439 session, err := restore.SessionProvider.GetSession() 440 if err != nil { 441 return Result{Err: fmt.Errorf("error establishing connection: %v", err)} 442 } 443 444 collection := session.Database(dbName).Collection(colName) 445 446 documentCount := int64(0) 447 watchProgressor := progress.NewCounter(fileSize) 448 if restore.ProgressManager != nil { 449 name := fmt.Sprintf("%v.%v", dbName, colName) 450 restore.ProgressManager.Attach(name, watchProgressor) 451 defer restore.ProgressManager.Detach(name) 452 } 453 454 maxInsertWorkers := restore.OutputOptions.NumInsertionWorkers 455 456 docChan := make(chan bson.Raw, insertBufferFactor) 457 resultChan := make(chan Result, maxInsertWorkers) 458 459 // stream documents for this collection on docChan 460 go func() { 461 for { 462 doc := bsonSource.LoadNext() 463 if doc == nil { 464 break 465 } 466 select { 467 case <-restore.termChan: 468 log.Logvf(log.Always, "terminating read on %v.%v", dbName, colName) 469 termErr = util.ErrTerminated 470 close(docChan) 471 return 472 default: 473 rawBytes := make([]byte, len(doc)) 474 copy(rawBytes, doc) 475 docChan <- bson.Raw(rawBytes) 476 documentCount++ 477 } 478 } 479 close(docChan) 480 }() 481 482 log.Logvf(log.DebugLow, "using %v insertion workers", maxInsertWorkers) 483 484 for i := 0; i < maxInsertWorkers; i++ { 485 go func() { 486 var result Result 487 488 bulk := db.NewUnorderedBufferedBulkInserter(collection, restore.OutputOptions.BulkBufferSize). 
489 SetOrdered(restore.OutputOptions.MaintainInsertionOrder) 490 bulk.SetBypassDocumentValidation(restore.OutputOptions.BypassDocumentValidation) 491 for rawDoc := range docChan { 492 if restore.objCheck { 493 result.Err = bson.Unmarshal(rawDoc, &bson.D{}) 494 if result.Err != nil { 495 resultChan <- result 496 return 497 } 498 } 499 result.combineWith(NewResultFromBulkResult(bulk.InsertRaw(rawDoc))) 500 result.Err = db.FilterError(restore.OutputOptions.StopOnError, result.Err) 501 if result.Err != nil { 502 resultChan <- result 503 return 504 } 505 watchProgressor.Set(file.Pos()) 506 } 507 // flush the remaining docs 508 result.combineWith(NewResultFromBulkResult(bulk.Flush())) 509 resultChan <- result.withErr(db.FilterError(restore.OutputOptions.StopOnError, result.Err)) 510 return 511 }() 512 513 // sleep to prevent all threads from inserting at the same time at start 514 time.Sleep(time.Duration(i) * 10 * time.Millisecond) 515 } 516 517 var totalResult Result 518 var finalErr error 519 520 // wait until all insert jobs finish 521 for done := 0; done < maxInsertWorkers; done++ { 522 totalResult.combineWith(<-resultChan) 523 if finalErr == nil && totalResult.Err != nil { 524 finalErr = totalResult.Err 525 close(restore.termChan) 526 } 527 } 528 529 if finalErr != nil { 530 totalResult.Err = finalErr 531 } else if err = bsonSource.Err(); err != nil { 532 totalResult.Err = fmt.Errorf("reading bson input: %v", err) 533 } else if termErr != nil { 534 totalResult.Err = termErr 535 } 536 return totalResult 537} 538