1// Copyright (C) MongoDB, Inc. 2014-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7// Package intents provides utilities for performing dump/restore operations. 8package intents 9 10import ( 11 "fmt" 12 "io" 13 14 "github.com/mongodb/mongo-tools-common/log" 15 "github.com/mongodb/mongo-tools-common/util" 16 "go.mongodb.org/mongo-driver/bson" 17) 18 19type file interface { 20 io.ReadWriteCloser 21 Open() error 22 Pos() int64 23} 24 25// DestinationConflictError occurs when multiple namespaces map to the same 26// destination. 27type DestinationConflictError struct { 28 Src, Dst string 29} 30 31func (e DestinationConflictError) Error() string { 32 return fmt.Sprintf("destination conflict: %s (src) => %s (dst)", e.Src, e.Dst) 33} 34 35// FileNeedsIOBuffer is an interface that denotes that a struct needs 36// an IO buffer that is managed by an outside control. This interface 37// is used to both hand off a buffer to a struct and signal that it should 38// release its buffer. Added to reduce memory usage as outlined in TOOLS-1088. 39type FileNeedsIOBuffer interface { 40 TakeIOBuffer([]byte) 41 ReleaseIOBuffer() 42} 43 44// mongorestore first scans the directory to generate a list 45// of all files to restore and what they map to. TODO comments 46type Intent struct { 47 // Destination namespace info 48 DB string 49 C string 50 51 // File locations as absolute paths 52 BSONFile file 53 BSONSize int64 54 MetadataFile file 55 56 // Indicates where the intent will be read from or written to 57 Location string 58 MetadataLocation string 59 60 // Collection options 61 Options bson.M 62 63 // UUID (for MongoDB 3.6+) as a big-endian hex string 64 UUID string 65 66 // File/collection size, for some prioritizer implementations. 67 // Units don't matter as long as they are consistent for a given use case. 68 Size int64 69} 70 71func (it *Intent) Namespace() string { 72 return it.DB + "." + it.C 73} 74 75func (it *Intent) IsOplog() bool { 76 if it.DB == "" && it.C == "oplog" { 77 return true 78 } 79 return it.DB == "local" && (it.C == "oplog.rs" || it.C == "oplog.$main") 80} 81 82func (it *Intent) IsUsers() bool { 83 if it.C == "$admin.system.users" { 84 return true 85 } 86 if it.DB == "admin" && it.C == "system.users" { 87 return true 88 } 89 return false 90} 91 92func (it *Intent) IsRoles() bool { 93 if it.C == "$admin.system.roles" { 94 return true 95 } 96 if it.DB == "admin" && it.C == "system.roles" { 97 return true 98 } 99 return false 100} 101 102func (it *Intent) IsAuthVersion() bool { 103 if it.C == "$admin.system.version" { 104 return true 105 } 106 if it.DB == "admin" && it.C == "system.version" { 107 return true 108 } 109 return false 110} 111 112func (it *Intent) IsSystemIndexes() bool { 113 return it.C == "system.indexes" 114} 115 116func (it *Intent) IsSystemProfile() bool { 117 return it.C == "system.profile" 118} 119 120func (it *Intent) IsSpecialCollection() bool { 121 // can't see oplog as special collection because when restore from archive it need to be a RegularCollectionReceiver 122 return it.IsSystemIndexes() || it.IsUsers() || it.IsRoles() || it.IsAuthVersion() || it.IsSystemProfile() 123} 124 125func (it *Intent) IsView() bool { 126 if it.Options == nil { 127 return false 128 } 129 _, isView := it.Options["viewOn"] 130 return isView 131} 132 133func (it *Intent) MergeIntent(newIt *Intent) { 134 // merge new intent into old intent 135 if it.BSONFile == nil { 136 it.BSONFile = newIt.BSONFile 137 } 138 if it.Size == 0 { 139 it.Size = newIt.Size 140 } 141 if it.Location == "" { 142 it.Location = newIt.Location 143 } 144 if it.MetadataFile == nil { 145 it.MetadataFile = newIt.MetadataFile 146 } 147 if it.MetadataLocation == "" { 148 it.MetadataLocation = newIt.MetadataLocation 149 } 150 151} 152 153type Manager struct { 154 // intents are for all of the regular user created collections 155 intents map[string]*Intent 156 // special intents are for all of the collections that are created by mongod 157 // and require special handling 158 specialIntents map[string]*Intent 159 160 // legacy mongorestore works in the order that paths are discovered, 161 // so we need an ordered data structure to preserve this behavior. 162 intentsByDiscoveryOrder []*Intent 163 164 // we need different scheduling order depending on the target 165 // mongod/mongos and whether or not we are multi threading; 166 // the IntentPrioritizer interface encapsulates this. 167 prioritizer IntentPrioritizer 168 169 // special cases that should be saved but not be part of the queue. 170 // used to deal with oplog and user/roles restoration, which are 171 // handled outside of the basic logic of the tool 172 oplogIntent *Intent 173 usersIntent *Intent 174 rolesIntent *Intent 175 versionIntent *Intent 176 indexIntents map[string]*Intent 177 178 // Tells the manager if it should choose a single oplog when multiple are provided. 179 smartPickOplog bool 180 181 // Indicates if an the manager has seen two conflicting oplogs. 182 oplogConflict bool 183 184 // prevent conflicting destinations by checking which sources map to the 185 // same namespace 186 destinations map[string][]string 187} 188 189func NewIntentManager() *Manager { 190 return &Manager{ 191 intents: map[string]*Intent{}, 192 specialIntents: map[string]*Intent{}, 193 intentsByDiscoveryOrder: []*Intent{}, 194 indexIntents: map[string]*Intent{}, 195 smartPickOplog: false, 196 oplogConflict: false, 197 destinations: map[string][]string{}, 198 } 199} 200 201func (mgr *Manager) SetSmartPickOplog(smartPick bool) { 202 mgr.smartPickOplog = smartPick 203} 204 205// HasConfigDBIntent returns a bool indicating if any of the intents refer to the "config" database. 206// This can be used to check for possible unwanted conflicts before restoring to a sharded system. 207func (mgr *Manager) HasConfigDBIntent() bool { 208 for _, intent := range mgr.intentsByDiscoveryOrder { 209 if intent.DB == "config" { 210 return true 211 } 212 } 213 return false 214} 215 216// PutOplogIntent takes an intent for an oplog and stores it in the intent manager with the 217// provided key. If the manager has smartPickOplog enabled, then it uses a priority system 218// to determine which oplog intent to maintain as the actual oplog. 219func (mgr *Manager) PutOplogIntent(intent *Intent, managerKey string) { 220 if mgr.smartPickOplog { 221 if existing := mgr.specialIntents[managerKey]; existing != nil { 222 existing.MergeIntent(intent) 223 return 224 } 225 if mgr.oplogIntent == nil { 226 // If there is no oplog intent, make this one the oplog. 227 mgr.oplogIntent = intent 228 mgr.specialIntents[managerKey] = intent 229 } else if intent.DB == "" { 230 // We already have an oplog and this is a top priority oplog. 231 if mgr.oplogIntent.DB == "" { 232 // If the manager's current oplog is also top priority, we have a 233 // conflict and ignore this oplog. 234 mgr.oplogConflict = true 235 } else { 236 // If the manager's current oplog is lower priority, replace it and 237 // move that one to be a normal intent. 238 mgr.putNormalIntent(mgr.oplogIntent) 239 delete(mgr.specialIntents, mgr.oplogIntent.Namespace()) 240 mgr.oplogIntent = intent 241 mgr.specialIntents[managerKey] = intent 242 } 243 } else { 244 // We already have an oplog and this is a low priority oplog. 245 if mgr.oplogIntent.DB != "" { 246 // If the manager's current oplog is also low priority, set a conflict. 247 mgr.oplogConflict = true 248 } 249 // No matter what, set this lower priority oplog to be a normal intent. 250 mgr.putNormalIntent(intent) 251 } 252 } else { 253 if intent.DB == "" && intent.C == "oplog" { 254 // If this is a normal oplog, then add it as an oplog intent. 255 if existing := mgr.specialIntents[managerKey]; existing != nil { 256 existing.MergeIntent(intent) 257 return 258 } 259 mgr.oplogIntent = intent 260 mgr.specialIntents[managerKey] = intent 261 } else { 262 mgr.putNormalIntent(intent) 263 } 264 } 265} 266 267func (mgr *Manager) putNormalIntent(intent *Intent) { 268 mgr.putNormalIntentWithNamespace(intent.Namespace(), intent) 269} 270 271func (mgr *Manager) putNormalIntentWithNamespace(ns string, intent *Intent) { 272 // BSON and metadata files for the same collection are merged 273 // into the same intent. This is done to allow for simple 274 // pairing of BSON + metadata without keeping track of the 275 // state of the filepath walker 276 if existing := mgr.intents[ns]; existing != nil { 277 if existing.Namespace() != intent.Namespace() { 278 // remove old destination, add new one 279 dst := existing.Namespace() 280 dsts := mgr.destinations[dst] 281 i := util.StringSliceIndex(dsts, ns) 282 mgr.destinations[dst] = append(dsts[:i], dsts[i+1:]...) 283 284 dsts = mgr.destinations[intent.Namespace()] 285 mgr.destinations[intent.Namespace()] = append(dsts, ns) 286 } 287 existing.MergeIntent(intent) 288 return 289 } 290 291 // if key doesn't already exist, add it to the manager 292 mgr.intents[ns] = intent 293 mgr.intentsByDiscoveryOrder = append(mgr.intentsByDiscoveryOrder, intent) 294 295 mgr.destinations[intent.Namespace()] = append(mgr.destinations[intent.Namespace()], ns) 296} 297 298// Put inserts an intent into the manager with the same source namespace as 299// its destinations. 300func (mgr *Manager) Put(intent *Intent) { 301 log.Logvf(log.DebugLow, "enqueued collection '%v'", intent.Namespace()) 302 mgr.PutWithNamespace(intent.Namespace(), intent) 303} 304 305// PutWithNamespace inserts an intent into the manager with the source set 306// to the provided namespace. Intents for the same collection are merged 307// together, so that BSON and metadata files for the same collection are 308// returned in the same intent. 309func (mgr *Manager) PutWithNamespace(ns string, intent *Intent) { 310 if intent == nil { 311 panic("cannot insert nil *Intent into IntentManager") 312 } 313 db, _ := util.SplitNamespace(ns) 314 315 // bucket special-case collections 316 if intent.IsOplog() { 317 mgr.PutOplogIntent(intent, intent.Namespace()) 318 return 319 } 320 if intent.IsSystemIndexes() { 321 if intent.BSONFile != nil { 322 mgr.indexIntents[db] = intent 323 mgr.specialIntents[ns] = intent 324 } 325 return 326 } 327 if intent.IsUsers() { 328 if intent.BSONFile != nil { 329 mgr.usersIntent = intent 330 mgr.specialIntents[ns] = intent 331 } 332 return 333 } 334 if intent.IsRoles() { 335 if intent.BSONFile != nil { 336 mgr.rolesIntent = intent 337 mgr.specialIntents[ns] = intent 338 } 339 return 340 } 341 if intent.IsAuthVersion() { 342 if intent.BSONFile != nil { 343 mgr.versionIntent = intent 344 mgr.specialIntents[ns] = intent 345 } 346 return 347 } 348 349 mgr.putNormalIntentWithNamespace(ns, intent) 350} 351 352func (mgr *Manager) GetOplogConflict() bool { 353 return mgr.oplogConflict 354} 355 356func (mgr *Manager) GetDestinationConflicts() (errs []DestinationConflictError) { 357 for dst, srcs := range mgr.destinations { 358 if len(srcs) <= 1 { 359 continue 360 } 361 for _, src := range srcs { 362 errs = append(errs, DestinationConflictError{Dst: dst, Src: src}) 363 } 364 } 365 return 366} 367 368// Intents returns a slice containing all of the intents in the manager. 369// Intents is not thread safe 370func (mgr *Manager) Intents() []*Intent { 371 allIntents := []*Intent{} 372 for _, intent := range mgr.intents { 373 allIntents = append(allIntents, intent) 374 } 375 for _, intent := range mgr.indexIntents { 376 allIntents = append(allIntents, intent) 377 } 378 if mgr.oplogIntent != nil { 379 allIntents = append(allIntents, mgr.oplogIntent) 380 } 381 if mgr.usersIntent != nil { 382 allIntents = append(allIntents, mgr.usersIntent) 383 } 384 if mgr.rolesIntent != nil { 385 allIntents = append(allIntents, mgr.rolesIntent) 386 } 387 if mgr.versionIntent != nil { 388 allIntents = append(allIntents, mgr.versionIntent) 389 } 390 return allIntents 391} 392 393func (mgr *Manager) IntentForNamespace(ns string) *Intent { 394 intent := mgr.intents[ns] 395 if intent != nil { 396 return intent 397 } 398 intent = mgr.specialIntents[ns] 399 return intent 400} 401 402// Pop returns the next available intent from the manager. If the manager is 403// empty, it returns nil. Pop is thread safe. 404func (mgr *Manager) Pop() *Intent { 405 return mgr.prioritizer.Get() 406} 407 408// Peek returns a copy of a stored intent from the manager without removing 409// the intent. This method is useful for edge cases that need to look ahead 410// at what collections are in the manager before they are scheduled. 411// 412// NOTE: There are no guarantees that peek will return a usable 413// intent after Finalize() is called. 414func (mgr *Manager) Peek() *Intent { 415 if len(mgr.intentsByDiscoveryOrder) == 0 { 416 return nil 417 } 418 intentCopy := *mgr.intentsByDiscoveryOrder[0] 419 return &intentCopy 420} 421 422// Finish tells the prioritizer that mongorestore is done restoring 423// the given collection intent. 424func (mgr *Manager) Finish(intent *Intent) { 425 mgr.prioritizer.Finish(intent) 426} 427 428// Oplog returns the intent representing the oplog, which isn't 429// stored with the other intents, because it is dumped and restored in 430// a very different way from other collections. 431func (mgr *Manager) Oplog() *Intent { 432 return mgr.oplogIntent 433} 434 435// SystemIndexes returns the system.indexes bson for a database 436func (mgr *Manager) SystemIndexes(dbName string) *Intent { 437 return mgr.indexIntents[dbName] 438} 439 440// SystemIndexes returns the databases for which there are system.indexes 441func (mgr *Manager) SystemIndexDBs() []string { 442 databases := []string{} 443 for dbname := range mgr.indexIntents { 444 databases = append(databases, dbname) 445 } 446 return databases 447} 448 449// Users returns the intent of the users collection to restore, a special case 450func (mgr *Manager) Users() *Intent { 451 return mgr.usersIntent 452} 453 454// Roles returns the intent of the user roles collection to restore, a special case 455func (mgr *Manager) Roles() *Intent { 456 return mgr.rolesIntent 457} 458 459// AuthVersion returns the intent of the version collection to restore, a special case 460func (mgr *Manager) AuthVersion() *Intent { 461 return mgr.versionIntent 462} 463 464// Finalize processes the intents for prioritization. Currently only two 465// kinds of prioritizers are supported. No more "Put" operations may be done 466// after finalize is called. 467func (mgr *Manager) Finalize(pType PriorityType) { 468 switch pType { 469 case Legacy: 470 log.Logv(log.DebugHigh, "finalizing intent manager with legacy prioritizer") 471 mgr.prioritizer = newLegacyPrioritizer(mgr.intentsByDiscoveryOrder) 472 case LongestTaskFirst: 473 log.Logv(log.DebugHigh, "finalizing intent manager with longest task first prioritizer") 474 mgr.prioritizer = newLongestTaskFirstPrioritizer(mgr.intentsByDiscoveryOrder) 475 case MultiDatabaseLTF: 476 log.Logv(log.DebugHigh, "finalizing intent manager with multi-database longest task first prioritizer") 477 mgr.prioritizer = newMultiDatabaseLTFPrioritizer(mgr.intentsByDiscoveryOrder) 478 default: 479 panic("cannot initialize IntentPrioritizer with unknown type") 480 } 481 // release these for the garbage collector and to ensure code correctness 482 mgr.intents = nil 483 mgr.intentsByDiscoveryOrder = nil 484} 485 486func (mgr *Manager) UsePrioritizer(prioritizer IntentPrioritizer) { 487 mgr.prioritizer = prioritizer 488} 489