1// Copyright (C) MongoDB, Inc. 2014-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongorestore
8
9import (
10	"fmt"
11	"io/ioutil"
12	"strings"
13	"time"
14
15	"github.com/mongodb/mongo-tools-common/bsonutil"
16	"github.com/mongodb/mongo-tools-common/db"
17	"github.com/mongodb/mongo-tools-common/intents"
18	"github.com/mongodb/mongo-tools-common/log"
19	"github.com/mongodb/mongo-tools-common/progress"
20	"github.com/mongodb/mongo-tools-common/util"
21
22	"go.mongodb.org/mongo-driver/bson"
23	"go.mongodb.org/mongo-driver/bson/primitive"
24	"go.mongodb.org/mongo-driver/mongo"
25)
26
// insertBufferFactor is the capacity of the buffered channel that carries raw
// documents from the BSON reader goroutine to the insertion workers in
// RestoreCollectionToDB.
const insertBufferFactor = 16
28
// validIndexOptions is the set of index option names that
// convertLegacyIndexOptions keeps: any option whose key is not present in this
// map is deleted from the index options during legacy index conversion. Only
// key presence is checked, not the boolean value.
//
// The names are taken from https://github.com/mongodb/mongo/blob/master/src/mongo/db/index/index_descriptor.h
var validIndexOptions = map[string]bool{
	"2dsphereIndexVersion":    true,
	"background":              true,
	"bits":                    true,
	"bucketSize":              true,
	"coarsestIndexedLevel":    true,
	"collation":               true,
	"default_language":        true,
	"expireAfterSeconds":      true,
	"finestIndexedLevel":      true,
	"key":                     true,
	"language_override":       true,
	"max":                     true,
	"min":                     true,
	"name":                    true,
	"ns":                      true,
	"partialFilterExpression": true,
	"sparse":                  true,
	"storageEngine":           true,
	"textIndexVersion":        true,
	"unique":                  true,
	"v":                       true,
	"weights":                 true,
	"wildcardProjection":      true,
}
55
// Result encapsulates the outcome of a particular restore attempt.
//
// The field order is relied upon by positional composite literals in this
// file, so new fields must not be inserted between the existing ones.
type Result struct {
	Successes int64 // number of documents reported as inserted
	Failures  int64 // number of write errors reported for the attempt
	Err       error // error associated with the attempt, or nil
}
62
63// log pretty-prints the result, associated with restoring the given namespace
64func (result *Result) log(ns string) {
65	log.Logvf(log.Always, "finished restoring %v (%v %v, %v %v)",
66		ns, result.Successes, util.Pluralize(int(result.Successes), "document", "documents"),
67		result.Failures, util.Pluralize(int(result.Failures), "failure", "failures"))
68}
69
70// combineWith sums the successes and failures from both results and the overwrites the existing Err with the Err from
71// the provided result.
72func (result *Result) combineWith(other Result) {
73	result.Successes += other.Successes
74	result.Failures += other.Failures
75	result.Err = other.Err
76}
77
78// withErr returns a copy of the current result with the provided error
79func (result Result) withErr(err error) Result {
80	result.Err = err
81	return result
82}
83
84func NewResultFromBulkResult(result *mongo.BulkWriteResult, err error) Result {
85	if result == nil {
86		return Result{}
87	}
88
89	nSuccess := result.InsertedCount
90	var nFailure int64
91
92	// if a write concern error is encountered, the failure count may be inaccurate.
93	if bwe, ok := err.(mongo.BulkWriteException); ok {
94		nFailure = int64(len(bwe.WriteErrors))
95	}
96
97	return Result{nSuccess, nFailure, err}
98}
99
100// RestoreIntents iterates through all of the intents stored in the IntentManager, and restores them.
101func (restore *MongoRestore) RestoreIntents() Result {
102	log.Logvf(log.DebugLow, "restoring up to %v collections in parallel", restore.OutputOptions.NumParallelCollections)
103
104	if restore.OutputOptions.NumParallelCollections > 0 {
105		resultChan := make(chan Result)
106
107		// start a goroutine for each job thread
108		for i := 0; i < restore.OutputOptions.NumParallelCollections; i++ {
109			go func(id int) {
110				var workerResult Result
111				log.Logvf(log.DebugHigh, "starting restore routine with id=%v", id)
112				var ioBuf []byte
113				for {
114					intent := restore.manager.Pop()
115					if intent == nil {
116						log.Logvf(log.DebugHigh, "ending restore routine with id=%v, no more work to do", id)
117						resultChan <- workerResult // done
118						return
119					}
120					if fileNeedsIOBuffer, ok := intent.BSONFile.(intents.FileNeedsIOBuffer); ok {
121						if ioBuf == nil {
122							ioBuf = make([]byte, db.MaxBSONSize)
123						}
124						fileNeedsIOBuffer.TakeIOBuffer(ioBuf)
125					}
126					result := restore.RestoreIntent(intent)
127					result.log(intent.Namespace())
128					workerResult.combineWith(result)
129					if result.Err != nil {
130						resultChan <- workerResult.withErr(fmt.Errorf("%v: %v", intent.Namespace(), result.Err))
131						return
132					}
133					restore.manager.Finish(intent)
134					if fileNeedsIOBuffer, ok := intent.BSONFile.(intents.FileNeedsIOBuffer); ok {
135						fileNeedsIOBuffer.ReleaseIOBuffer()
136					}
137
138				}
139			}(i)
140		}
141
142		var totalResult Result
143		// wait until all goroutines are done or one of them errors out
144		for i := 0; i < restore.OutputOptions.NumParallelCollections; i++ {
145			result := <-resultChan
146			totalResult.combineWith(result)
147			if totalResult.Err != nil {
148				return totalResult
149			}
150		}
151		return totalResult
152	}
153
154	var totalResult Result
155	// single-threaded
156	for {
157		intent := restore.manager.Pop()
158		if intent == nil {
159			break
160		}
161		result := restore.RestoreIntent(intent)
162		result.log(intent.Namespace())
163		totalResult.combineWith(result)
164		if result.Err != nil {
165			return totalResult.withErr(fmt.Errorf("%v: %v", intent.Namespace(), result.Err))
166		}
167		restore.manager.Finish(intent)
168	}
169	return totalResult
170}
171
172// RestoreIntent attempts to restore a given intent into MongoDB.
173func (restore *MongoRestore) RestoreIntent(intent *intents.Intent) Result {
174
175	collectionExists, err := restore.CollectionExists(intent)
176	if err != nil {
177		return Result{Err: fmt.Errorf("error reading database: %v", err)}
178	}
179
180	if !restore.OutputOptions.Drop && collectionExists {
181		log.Logvf(log.Always, "restoring to existing collection %v without dropping", intent.Namespace())
182	}
183
184	if restore.OutputOptions.Drop {
185		if collectionExists {
186			if strings.HasPrefix(intent.C, "system.") {
187				log.Logvf(log.Always, "cannot drop system collection %v, skipping", intent.Namespace())
188			} else {
189				log.Logvf(log.Info, "dropping collection %v before restoring", intent.Namespace())
190				err = restore.DropCollection(intent)
191				if err != nil {
192					return Result{Err: err} // no context needed
193				}
194				collectionExists = false
195			}
196		} else {
197			log.Logvf(log.DebugLow, "collection %v doesn't exist, skipping drop command", intent.Namespace())
198		}
199	}
200
201	var options bson.D
202	var indexes []IndexDocument
203	var uuid string
204
205	// get indexes from system.indexes dump if we have it but don't have metadata files
206	if intent.MetadataFile == nil {
207		if _, ok := restore.dbCollectionIndexes[intent.DB]; ok {
208			if indexes, ok = restore.dbCollectionIndexes[intent.DB][intent.C]; ok {
209				log.Logvf(log.Always, "no metadata; falling back to system.indexes")
210			}
211		}
212	}
213
214	logMessageSuffix := "with no metadata"
215	var hasNonSimpleCollation bool
216	// first create the collection with options from the metadata file
217	if intent.MetadataFile != nil {
218		logMessageSuffix = "using options from metadata"
219		err = intent.MetadataFile.Open()
220		if err != nil {
221			return Result{Err: err}
222		}
223		defer intent.MetadataFile.Close()
224
225		log.Logvf(log.Always, "reading metadata for %v from %v", intent.Namespace(), intent.MetadataLocation)
226		metadataJSON, err := ioutil.ReadAll(intent.MetadataFile)
227		if err != nil {
228			return Result{Err: fmt.Errorf("error reading metadata from %v: %v", intent.MetadataLocation, err)}
229		}
230		metadata, err := restore.MetadataFromJSON(metadataJSON)
231		if err != nil {
232			return Result{Err: fmt.Errorf("error parsing metadata from %v: %v", intent.MetadataLocation, err)}
233		}
234		if metadata != nil {
235			options = metadata.Options
236			indexes = metadata.Indexes
237			if restore.OutputOptions.PreserveUUID {
238				if metadata.UUID == "" {
239					return Result{Err: fmt.Errorf("--preserveUUID used but no UUID found in %v", intent.MetadataLocation)}
240				}
241				uuid = metadata.UUID
242			}
243
244			collation, err := bsonutil.FindSubdocumentByKey("collation", &options)
245			if err == nil {
246				localeValue, err := bsonutil.FindValueByKey("locale", &collation)
247				if err == nil {
248					hasNonSimpleCollation = localeValue != "simple"
249				}
250			}
251		}
252
253		// The only way to specify options on the idIndex is at collection creation time.
254		// This loop pulls out the idIndex from `indexes` and sets it in `options`.
255		for i, index := range indexes {
256			// The index with the name "_id_" will always be the idIndex.
257			if index.Options["name"].(string) == "_id_" {
258				// Remove the index version (to use the default) unless otherwise specified.
259				// If preserving UUID, we have to create a collection via
260				// applyops, which requires the "v" key.
261				if !restore.OutputOptions.KeepIndexVersion && !restore.OutputOptions.PreserveUUID {
262					delete(index.Options, "v")
263				}
264				index.Options["ns"] = intent.Namespace()
265
266				// If the collection has an idIndex, then we are about to create it, so
267				// ignore the value of autoIndexId.
268				for j, opt := range options {
269					if opt.Key == "autoIndexId" {
270						options = append(options[:j], options[j+1:]...)
271					}
272				}
273				options = append(options, bson.E{"idIndex", index})
274				indexes = append(indexes[:i], indexes[i+1:]...)
275				break
276			}
277		}
278
279		if restore.OutputOptions.NoOptionsRestore {
280			log.Logv(log.Info, "not restoring collection options")
281			logMessageSuffix = "with no collection options"
282			options = nil
283		}
284	}
285	if !collectionExists {
286		log.Logvf(log.Info, "creating collection %v %s", intent.Namespace(), logMessageSuffix)
287		log.Logvf(log.DebugHigh, "using collection options: %#v", options)
288		err = restore.CreateCollection(intent, options, uuid)
289		if err != nil {
290			return Result{Err: fmt.Errorf("error creating collection %v: %v", intent.Namespace(), err)}
291		}
292		restore.addToKnownCollections(intent)
293	} else {
294		log.Logvf(log.Info, "collection %v already exists - skipping collection create", intent.Namespace())
295	}
296
297	var result Result
298	if intent.BSONFile != nil {
299		err = intent.BSONFile.Open()
300		if err != nil {
301			return Result{Err: err}
302		}
303		defer intent.BSONFile.Close()
304
305		log.Logvf(log.Always, "restoring %v from %v", intent.Namespace(), intent.Location)
306
307		bsonSource := db.NewDecodedBSONSource(db.NewBSONSource(intent.BSONFile))
308		defer bsonSource.Close()
309
310		result = restore.RestoreCollectionToDB(intent.DB, intent.C, bsonSource, intent.BSONFile, intent.Size)
311		if result.Err != nil {
312			result.Err = fmt.Errorf("error restoring from %v: %v", intent.Location, result.Err)
313			return result
314		}
315	}
316
317	// finally, add indexes
318	if len(indexes) > 0 && !restore.OutputOptions.NoIndexRestore {
319		log.Logvf(log.Always, "restoring indexes for collection %v from metadata", intent.Namespace())
320		if restore.OutputOptions.ConvertLegacyIndexes {
321			convertLegacyIndexes(indexes)
322		}
323		if restore.OutputOptions.FixDottedHashedIndexes {
324			fixDottedHashedIndexes(indexes)
325		}
326		err = restore.CreateIndexes(intent, indexes, hasNonSimpleCollation)
327		if err != nil {
328			result.Err = fmt.Errorf("error creating indexes for %v: %v", intent.Namespace(), err)
329			return result
330		}
331	} else {
332		log.Logv(log.Always, "no indexes to restore")
333	}
334
335	return result
336}
337
338func convertLegacyIndexes(indexes []IndexDocument) {
339	for _, index := range indexes {
340		convertLegacyIndexKeys(index)
341		convertLegacyIndexOptions(index)
342	}
343}
344
345func fixDottedHashedIndexes(indexes []IndexDocument) {
346	for _, index := range indexes {
347		fixDottedHashedIndex(index)
348	}
349}
350
351// fixDottedHashedIndex fixes the issue introduced by a server bug where hashed index constraints are not
352// correctly enforced under all circumstance by changing the hashed index on the dotted field to an
353// ascending single field index.
354func fixDottedHashedIndex(index IndexDocument) {
355	indexFields := index.Key
356	for i, field := range indexFields {
357		fieldName := field.Key
358		if strings.Contains(fieldName, ".") && field.Value == "hashed" {
359			// Change the hashed index to single field index
360			indexFields[i].Value = int32(1)
361		}
362	}
363}
364
365func convertLegacyIndexKeys(index IndexDocument) {
366	var converted bool
367	originalJSONString := createExtJSONString(index.Key)
368	for j, elem := range index.Key {
369		switch v := elem.Value.(type) {
370		case int32, int64, float64:
371			// Only convert 0 value
372			if v == 0 {
373				index.Key[j].Value = 1
374				converted = true
375			}
376		case primitive.Decimal128:
377			// Note, this doesn't catch Decimal values which are equivalent to "0" (e.g. 0.00 or -0).
378			// These values are so unlikely we just ignore them
379			zeroVal, err := primitive.ParseDecimal128("0")
380			if err == nil {
381				if v == zeroVal {
382					index.Key[j].Value = 1
383					converted = true
384				}
385			}
386		case string:
387			// Only convert an empty string
388			if v == "" {
389				index.Key[j].Value = 1
390				converted = true
391			}
392		default:
393			// Convert all types that aren't strings or numbers
394			index.Key[j].Value = 1
395			converted = true
396		}
397	}
398	if converted {
399		newJSONString := createExtJSONString(index.Key)
400		log.Logvf(log.Always, "convertLegacyIndexes: converted index values '%s' to '%s' on collection '%s'",
401			originalJSONString, newJSONString, index.Options["ns"])
402	}
403}
404
405func convertLegacyIndexOptions(index IndexDocument) {
406	var converted bool
407	originalJSONString := createExtJSONString(index.Options)
408	for key := range index.Options {
409		if _, ok := validIndexOptions[key]; !ok {
410			delete(index.Options, key)
411			converted = true
412		}
413	}
414	if converted {
415		newJSONString := createExtJSONString(index.Options)
416		log.Logvf(log.Always, "convertLegacyIndexes: converted index options '%s' to '%s'",
417			originalJSONString, newJSONString)
418	}
419}
420
421func createExtJSONString(doc interface{}) string {
422	// by default return "<unable to format document>"" since we don't
423	// want to throw an error when formatting informational messages.
424	// An error would be inconsequential.
425	JSONString := "<unable to format document>"
426	JSONBytes, err := bson.MarshalExtJSON(doc, false, false)
427	if err == nil {
428		JSONString = string(JSONBytes)
429	}
430	return JSONString
431}
432
433// RestoreCollectionToDB pipes the given BSON data into the database.
434// Returns the number of documents restored and any errors that occurred.
435func (restore *MongoRestore) RestoreCollectionToDB(dbName, colName string,
436	bsonSource *db.DecodedBSONSource, file PosReader, fileSize int64) Result {
437
438	var termErr error
439	session, err := restore.SessionProvider.GetSession()
440	if err != nil {
441		return Result{Err: fmt.Errorf("error establishing connection: %v", err)}
442	}
443
444	collection := session.Database(dbName).Collection(colName)
445
446	documentCount := int64(0)
447	watchProgressor := progress.NewCounter(fileSize)
448	if restore.ProgressManager != nil {
449		name := fmt.Sprintf("%v.%v", dbName, colName)
450		restore.ProgressManager.Attach(name, watchProgressor)
451		defer restore.ProgressManager.Detach(name)
452	}
453
454	maxInsertWorkers := restore.OutputOptions.NumInsertionWorkers
455
456	docChan := make(chan bson.Raw, insertBufferFactor)
457	resultChan := make(chan Result, maxInsertWorkers)
458
459	// stream documents for this collection on docChan
460	go func() {
461		for {
462			doc := bsonSource.LoadNext()
463			if doc == nil {
464				break
465			}
466			select {
467			case <-restore.termChan:
468				log.Logvf(log.Always, "terminating read on %v.%v", dbName, colName)
469				termErr = util.ErrTerminated
470				close(docChan)
471				return
472			default:
473				rawBytes := make([]byte, len(doc))
474				copy(rawBytes, doc)
475				docChan <- bson.Raw(rawBytes)
476				documentCount++
477			}
478		}
479		close(docChan)
480	}()
481
482	log.Logvf(log.DebugLow, "using %v insertion workers", maxInsertWorkers)
483
484	for i := 0; i < maxInsertWorkers; i++ {
485		go func() {
486			var result Result
487
488			bulk := db.NewUnorderedBufferedBulkInserter(collection, restore.OutputOptions.BulkBufferSize).
489				SetOrdered(restore.OutputOptions.MaintainInsertionOrder)
490			bulk.SetBypassDocumentValidation(restore.OutputOptions.BypassDocumentValidation)
491			for rawDoc := range docChan {
492				if restore.objCheck {
493					result.Err = bson.Unmarshal(rawDoc, &bson.D{})
494					if result.Err != nil {
495						resultChan <- result
496						return
497					}
498				}
499				result.combineWith(NewResultFromBulkResult(bulk.InsertRaw(rawDoc)))
500				result.Err = db.FilterError(restore.OutputOptions.StopOnError, result.Err)
501				if result.Err != nil {
502					resultChan <- result
503					return
504				}
505				watchProgressor.Set(file.Pos())
506			}
507			// flush the remaining docs
508			result.combineWith(NewResultFromBulkResult(bulk.Flush()))
509			resultChan <- result.withErr(db.FilterError(restore.OutputOptions.StopOnError, result.Err))
510			return
511		}()
512
513		// sleep to prevent all threads from inserting at the same time at start
514		time.Sleep(time.Duration(i) * 10 * time.Millisecond)
515	}
516
517	var totalResult Result
518	var finalErr error
519
520	// wait until all insert jobs finish
521	for done := 0; done < maxInsertWorkers; done++ {
522		totalResult.combineWith(<-resultChan)
523		if finalErr == nil && totalResult.Err != nil {
524			finalErr = totalResult.Err
525			close(restore.termChan)
526		}
527	}
528
529	if finalErr != nil {
530		totalResult.Err = finalErr
531	} else if err = bsonSource.Err(); err != nil {
532		totalResult.Err = fmt.Errorf("reading bson input: %v", err)
533	} else if termErr != nil {
534		totalResult.Err = termErr
535	}
536	return totalResult
537}
538