1// Copyright (C) MongoDB, Inc. 2014-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7// Package intents provides utilities for performing dump/restore operations.
8package intents
9
10import (
11	"fmt"
12	"io"
13
14	"github.com/mongodb/mongo-tools-common/log"
15	"github.com/mongodb/mongo-tools-common/util"
16	"go.mongodb.org/mongo-driver/bson"
17)
18
// file is the handle type an Intent uses for its BSON and metadata files.
// It combines io.ReadWriteCloser with an explicit Open step and a position
// query.
type file interface {
	io.ReadWriteCloser
	// Open is called before the file is used; it reports any failure as an
	// error. NOTE(review): presumably opens the underlying resource lazily —
	// confirm against the implementers.
	Open() error
	// Pos reports a position within the file.
	// NOTE(review): presumably a byte offset — confirm against the implementers.
	Pos() int64
}
24
// DestinationConflictError is reported when more than one source namespace
// maps to the same destination namespace.
type DestinationConflictError struct {
	// Src is the source namespace taking part in the conflict.
	Src string
	// Dst is the destination namespace that Src maps to.
	Dst string
}

// Error implements the error interface, naming both the source and the
// destination namespace of the conflict.
func (e DestinationConflictError) Error() string {
	const format = "destination conflict: %s (src) => %s (dst)"
	return fmt.Sprintf(format, e.Src, e.Dst)
}
34
// FileNeedsIOBuffer is an interface that denotes that a struct needs
// an IO buffer that is managed by an outside control. This interface
// is used both to hand off a buffer to a struct and to signal that it should
// release its buffer. Added to reduce memory usage as outlined in TOOLS-1088.
type FileNeedsIOBuffer interface {
	// TakeIOBuffer hands the externally managed buffer to the implementer.
	TakeIOBuffer([]byte)
	// ReleaseIOBuffer signals the implementer to release the buffer it was
	// previously given.
	ReleaseIOBuffer()
}
43
// Intent describes one collection taking part in a dump/restore operation:
// its destination namespace, the files that back it, and its collection
// options. mongorestore first scans the directory to generate a list
// of all files to restore and what they map to; each such mapping is an
// Intent.
type Intent struct {
	// Destination namespace info: database name (DB) and collection name (C).
	DB string
	C  string

	// Handles for the collection's BSON data file and metadata file. Either
	// may be nil when the corresponding file has not been seen.
	// NOTE(review): BSONSize is presumably the size of the BSON file in
	// bytes — confirm against the code that sets it.
	BSONFile     file
	BSONSize     int64
	MetadataFile file

	// Indicates where the intent will be read from or written to
	Location         string
	MetadataLocation string

	// Collection options. A "viewOn" key marks the collection as a view.
	Options bson.M

	// UUID (for MongoDB 3.6+) as a big-endian hex string
	UUID string

	// File/collection size, for some prioritizer implementations.
	// Units don't matter as long as they are consistent for a given use case.
	Size int64
}
70
// Namespace returns the intent's destination namespace, formed by joining
// the database and collection names with a dot ("<DB>.<C>").
func (it *Intent) Namespace() string {
	return it.DB + "." + it.C
}
74
75func (it *Intent) IsOplog() bool {
76	if it.DB == "" && it.C == "oplog" {
77		return true
78	}
79	return it.DB == "local" && (it.C == "oplog.rs" || it.C == "oplog.$main")
80}
81
82func (it *Intent) IsUsers() bool {
83	if it.C == "$admin.system.users" {
84		return true
85	}
86	if it.DB == "admin" && it.C == "system.users" {
87		return true
88	}
89	return false
90}
91
92func (it *Intent) IsRoles() bool {
93	if it.C == "$admin.system.roles" {
94		return true
95	}
96	if it.DB == "admin" && it.C == "system.roles" {
97		return true
98	}
99	return false
100}
101
102func (it *Intent) IsAuthVersion() bool {
103	if it.C == "$admin.system.version" {
104		return true
105	}
106	if it.DB == "admin" && it.C == "system.version" {
107		return true
108	}
109	return false
110}
111
// IsSystemIndexes reports whether the intent's collection is
// "system.indexes", regardless of database.
func (it *Intent) IsSystemIndexes() bool {
	return it.C == "system.indexes"
}
115
// IsSystemProfile reports whether the intent's collection is
// "system.profile", regardless of database.
func (it *Intent) IsSystemProfile() bool {
	return it.C == "system.profile"
}
119
120func (it *Intent) IsSpecialCollection() bool {
121	// can't see oplog as special collection because when restore from archive it need to be a RegularCollectionReceiver
122	return it.IsSystemIndexes() || it.IsUsers() || it.IsRoles() || it.IsAuthVersion() || it.IsSystemProfile()
123}
124
125func (it *Intent) IsView() bool {
126	if it.Options == nil {
127		return false
128	}
129	_, isView := it.Options["viewOn"]
130	return isView
131}
132
133func (it *Intent) MergeIntent(newIt *Intent) {
134	// merge new intent into old intent
135	if it.BSONFile == nil {
136		it.BSONFile = newIt.BSONFile
137	}
138	if it.Size == 0 {
139		it.Size = newIt.Size
140	}
141	if it.Location == "" {
142		it.Location = newIt.Location
143	}
144	if it.MetadataFile == nil {
145		it.MetadataFile = newIt.MetadataFile
146	}
147	if it.MetadataLocation == "" {
148		it.MetadataLocation = newIt.MetadataLocation
149	}
150
151}
152
// Manager collects the intents discovered for a dump/restore operation,
// keeps special-case collections (oplog, users, roles, auth version,
// system.indexes) apart from regular ones, and hands regular intents out
// through an IntentPrioritizer once one has been installed.
type Manager struct {
	// intents are for all of the regular user created collections,
	// keyed by source namespace
	intents map[string]*Intent
	// special intents are for all of the collections that are created by mongod
	// and require special handling
	specialIntents map[string]*Intent

	// legacy mongorestore works in the order that paths are discovered,
	// so we need an ordered data structure to preserve this behavior.
	intentsByDiscoveryOrder []*Intent

	// we need different scheduling order depending on the target
	// mongod/mongos and whether or not we are multi threading;
	// the IntentPrioritizer interface encapsulates this.
	prioritizer IntentPrioritizer

	// special cases that should be saved but not be part of the queue.
	// used to deal with oplog and user/roles restoration, which are
	// handled outside of the basic logic of the tool.
	// indexIntents is keyed by database name.
	oplogIntent   *Intent
	usersIntent   *Intent
	rolesIntent   *Intent
	versionIntent *Intent
	indexIntents  map[string]*Intent

	// Tells the manager if it should choose a single oplog when multiple are provided.
	smartPickOplog bool

	// Indicates whether the manager has seen two conflicting oplogs.
	oplogConflict bool

	// prevent conflicting destinations by checking which sources map to the
	// same namespace: destination namespace -> source namespaces
	destinations map[string][]string
}
188
189func NewIntentManager() *Manager {
190	return &Manager{
191		intents:                 map[string]*Intent{},
192		specialIntents:          map[string]*Intent{},
193		intentsByDiscoveryOrder: []*Intent{},
194		indexIntents:            map[string]*Intent{},
195		smartPickOplog:          false,
196		oplogConflict:           false,
197		destinations:            map[string][]string{},
198	}
199}
200
// SetSmartPickOplog enables or disables smart oplog picking. When enabled,
// PutOplogIntent uses a priority system to choose a single oplog intent if
// several are provided.
func (mgr *Manager) SetSmartPickOplog(smartPick bool) {
	mgr.smartPickOplog = smartPick
}
204
205// HasConfigDBIntent returns a bool indicating if any of the intents refer to the "config" database.
206// This can be used to check for possible unwanted conflicts before restoring to a sharded system.
207func (mgr *Manager) HasConfigDBIntent() bool {
208	for _, intent := range mgr.intentsByDiscoveryOrder {
209		if intent.DB == "config" {
210			return true
211		}
212	}
213	return false
214}
215
216// PutOplogIntent takes an intent for an oplog and stores it in the intent manager with the
217// provided key. If the manager has smartPickOplog enabled, then it uses a priority system
218// to determine which oplog intent to maintain as the actual oplog.
219func (mgr *Manager) PutOplogIntent(intent *Intent, managerKey string) {
220	if mgr.smartPickOplog {
221		if existing := mgr.specialIntents[managerKey]; existing != nil {
222			existing.MergeIntent(intent)
223			return
224		}
225		if mgr.oplogIntent == nil {
226			// If there is no oplog intent, make this one the oplog.
227			mgr.oplogIntent = intent
228			mgr.specialIntents[managerKey] = intent
229		} else if intent.DB == "" {
230			// We already have an oplog and this is a top priority oplog.
231			if mgr.oplogIntent.DB == "" {
232				// If the manager's current oplog is also top priority, we have a
233				// conflict and ignore this oplog.
234				mgr.oplogConflict = true
235			} else {
236				// If the manager's current oplog is lower priority, replace it and
237				// move that one to be a normal intent.
238				mgr.putNormalIntent(mgr.oplogIntent)
239				delete(mgr.specialIntents, mgr.oplogIntent.Namespace())
240				mgr.oplogIntent = intent
241				mgr.specialIntents[managerKey] = intent
242			}
243		} else {
244			// We already have an oplog and this is a low priority oplog.
245			if mgr.oplogIntent.DB != "" {
246				// If the manager's current oplog is also low priority, set a conflict.
247				mgr.oplogConflict = true
248			}
249			// No matter what, set this lower priority oplog to be a normal intent.
250			mgr.putNormalIntent(intent)
251		}
252	} else {
253		if intent.DB == "" && intent.C == "oplog" {
254			// If this is a normal oplog, then add it as an oplog intent.
255			if existing := mgr.specialIntents[managerKey]; existing != nil {
256				existing.MergeIntent(intent)
257				return
258			}
259			mgr.oplogIntent = intent
260			mgr.specialIntents[managerKey] = intent
261		} else {
262			mgr.putNormalIntent(intent)
263		}
264	}
265}
266
// putNormalIntent stores intent as a regular intent, using its own
// destination namespace as the source namespace key.
func (mgr *Manager) putNormalIntent(intent *Intent) {
	mgr.putNormalIntentWithNamespace(intent.Namespace(), intent)
}
270
271func (mgr *Manager) putNormalIntentWithNamespace(ns string, intent *Intent) {
272	// BSON and metadata files for the same collection are merged
273	// into the same intent. This is done to allow for simple
274	// pairing of BSON + metadata without keeping track of the
275	// state of the filepath walker
276	if existing := mgr.intents[ns]; existing != nil {
277		if existing.Namespace() != intent.Namespace() {
278			// remove old destination, add new one
279			dst := existing.Namespace()
280			dsts := mgr.destinations[dst]
281			i := util.StringSliceIndex(dsts, ns)
282			mgr.destinations[dst] = append(dsts[:i], dsts[i+1:]...)
283
284			dsts = mgr.destinations[intent.Namespace()]
285			mgr.destinations[intent.Namespace()] = append(dsts, ns)
286		}
287		existing.MergeIntent(intent)
288		return
289	}
290
291	// if key doesn't already exist, add it to the manager
292	mgr.intents[ns] = intent
293	mgr.intentsByDiscoveryOrder = append(mgr.intentsByDiscoveryOrder, intent)
294
295	mgr.destinations[intent.Namespace()] = append(mgr.destinations[intent.Namespace()], ns)
296}
297
298// Put inserts an intent into the manager with the same source namespace as
299// its destinations.
300func (mgr *Manager) Put(intent *Intent) {
301	log.Logvf(log.DebugLow, "enqueued collection '%v'", intent.Namespace())
302	mgr.PutWithNamespace(intent.Namespace(), intent)
303}
304
305// PutWithNamespace inserts an intent into the manager with the source set
306// to the provided namespace. Intents for the same collection are merged
307// together, so that BSON and metadata files for the same collection are
308// returned in the same intent.
309func (mgr *Manager) PutWithNamespace(ns string, intent *Intent) {
310	if intent == nil {
311		panic("cannot insert nil *Intent into IntentManager")
312	}
313	db, _ := util.SplitNamespace(ns)
314
315	// bucket special-case collections
316	if intent.IsOplog() {
317		mgr.PutOplogIntent(intent, intent.Namespace())
318		return
319	}
320	if intent.IsSystemIndexes() {
321		if intent.BSONFile != nil {
322			mgr.indexIntents[db] = intent
323			mgr.specialIntents[ns] = intent
324		}
325		return
326	}
327	if intent.IsUsers() {
328		if intent.BSONFile != nil {
329			mgr.usersIntent = intent
330			mgr.specialIntents[ns] = intent
331		}
332		return
333	}
334	if intent.IsRoles() {
335		if intent.BSONFile != nil {
336			mgr.rolesIntent = intent
337			mgr.specialIntents[ns] = intent
338		}
339		return
340	}
341	if intent.IsAuthVersion() {
342		if intent.BSONFile != nil {
343			mgr.versionIntent = intent
344			mgr.specialIntents[ns] = intent
345		}
346		return
347	}
348
349	mgr.putNormalIntentWithNamespace(ns, intent)
350}
351
// GetOplogConflict reports whether the manager has recorded a conflict
// between oplog intents. The flag is only ever set by PutOplogIntent when
// smart oplog picking is enabled.
func (mgr *Manager) GetOplogConflict() bool {
	return mgr.oplogConflict
}
355
356func (mgr *Manager) GetDestinationConflicts() (errs []DestinationConflictError) {
357	for dst, srcs := range mgr.destinations {
358		if len(srcs) <= 1 {
359			continue
360		}
361		for _, src := range srcs {
362			errs = append(errs, DestinationConflictError{Dst: dst, Src: src})
363		}
364	}
365	return
366}
367
368// Intents returns a slice containing all of the intents in the manager.
369// Intents is not thread safe
370func (mgr *Manager) Intents() []*Intent {
371	allIntents := []*Intent{}
372	for _, intent := range mgr.intents {
373		allIntents = append(allIntents, intent)
374	}
375	for _, intent := range mgr.indexIntents {
376		allIntents = append(allIntents, intent)
377	}
378	if mgr.oplogIntent != nil {
379		allIntents = append(allIntents, mgr.oplogIntent)
380	}
381	if mgr.usersIntent != nil {
382		allIntents = append(allIntents, mgr.usersIntent)
383	}
384	if mgr.rolesIntent != nil {
385		allIntents = append(allIntents, mgr.rolesIntent)
386	}
387	if mgr.versionIntent != nil {
388		allIntents = append(allIntents, mgr.versionIntent)
389	}
390	return allIntents
391}
392
393func (mgr *Manager) IntentForNamespace(ns string) *Intent {
394	intent := mgr.intents[ns]
395	if intent != nil {
396		return intent
397	}
398	intent = mgr.specialIntents[ns]
399	return intent
400}
401
// Pop returns the next available intent from the manager. If the manager is
// empty, it returns nil. Pop is thread safe.
//
// Pop delegates entirely to the installed prioritizer's Get, so the
// guarantees above are those of the IntentPrioritizer implementation. A
// prioritizer must have been installed with Finalize or UsePrioritizer
// before Pop is called.
func (mgr *Manager) Pop() *Intent {
	return mgr.prioritizer.Get()
}
407
408// Peek returns a copy of a stored intent from the manager without removing
409// the intent. This method is useful for edge cases that need to look ahead
410// at what collections are in the manager before they are scheduled.
411//
412// NOTE: There are no guarantees that peek will return a usable
413// intent after Finalize() is called.
414func (mgr *Manager) Peek() *Intent {
415	if len(mgr.intentsByDiscoveryOrder) == 0 {
416		return nil
417	}
418	intentCopy := *mgr.intentsByDiscoveryOrder[0]
419	return &intentCopy
420}
421
// Finish tells the prioritizer that mongorestore is done restoring
// the given collection intent. It delegates to the installed prioritizer's
// Finish, so a prioritizer must have been installed with Finalize or
// UsePrioritizer first.
func (mgr *Manager) Finish(intent *Intent) {
	mgr.prioritizer.Finish(intent)
}
427
// Oplog returns the intent representing the oplog, which isn't
// stored with the other intents, because it is dumped and restored in
// a very different way from other collections. It returns nil if no oplog
// intent has been stored.
func (mgr *Manager) Oplog() *Intent {
	return mgr.oplogIntent
}
434
// SystemIndexes returns the system.indexes intent stored for the database
// dbName, or nil if there is none.
func (mgr *Manager) SystemIndexes(dbName string) *Intent {
	return mgr.indexIntents[dbName]
}
439
440// SystemIndexes returns the databases for which there are system.indexes
441func (mgr *Manager) SystemIndexDBs() []string {
442	databases := []string{}
443	for dbname := range mgr.indexIntents {
444		databases = append(databases, dbname)
445	}
446	return databases
447}
448
// Users returns the intent of the users collection to restore, a special
// case. It returns nil if no users intent has been stored.
func (mgr *Manager) Users() *Intent {
	return mgr.usersIntent
}
453
// Roles returns the intent of the user roles collection to restore, a
// special case. It returns nil if no roles intent has been stored.
func (mgr *Manager) Roles() *Intent {
	return mgr.rolesIntent
}
458
// AuthVersion returns the intent of the version collection to restore, a
// special case. It returns nil if no version intent has been stored.
func (mgr *Manager) AuthVersion() *Intent {
	return mgr.versionIntent
}
463
464// Finalize processes the intents for prioritization. Currently only two
465// kinds of prioritizers are supported. No more "Put" operations may be done
466// after finalize is called.
467func (mgr *Manager) Finalize(pType PriorityType) {
468	switch pType {
469	case Legacy:
470		log.Logv(log.DebugHigh, "finalizing intent manager with legacy prioritizer")
471		mgr.prioritizer = newLegacyPrioritizer(mgr.intentsByDiscoveryOrder)
472	case LongestTaskFirst:
473		log.Logv(log.DebugHigh, "finalizing intent manager with longest task first prioritizer")
474		mgr.prioritizer = newLongestTaskFirstPrioritizer(mgr.intentsByDiscoveryOrder)
475	case MultiDatabaseLTF:
476		log.Logv(log.DebugHigh, "finalizing intent manager with multi-database longest task first prioritizer")
477		mgr.prioritizer = newMultiDatabaseLTFPrioritizer(mgr.intentsByDiscoveryOrder)
478	default:
479		panic("cannot initialize IntentPrioritizer with unknown type")
480	}
481	// release these for the garbage collector and to ensure code correctness
482	mgr.intents = nil
483	mgr.intentsByDiscoveryOrder = nil
484}
485
// UsePrioritizer installs the given prioritizer on the manager, replacing
// any prioritizer installed earlier. Unlike Finalize, it leaves the stored
// intents untouched.
func (mgr *Manager) UsePrioritizer(prioritizer IntentPrioritizer) {
	mgr.prioritizer = prioritizer
}
489