1// Copyright (C) 2014 The Syncthing Authors. 2// 3// This Source Code Form is subject to the terms of the Mozilla Public 4// License, v. 2.0. If a copy of the MPL was not distributed with this file, 5// You can obtain one at https://mozilla.org/MPL/2.0/. 6 7// Package db provides a set type to track local/remote files with newness 8// checks. We must do a certain amount of normalization in here. We will get 9// fed paths with either native or wire-format separators and encodings 10// depending on who calls us. We transform paths to wire-format (NFC and 11// slashes) on the way to the database, and transform to native format 12// (varying separator and encoding) on the way back out. 13package db 14 15import ( 16 "fmt" 17 18 "github.com/syncthing/syncthing/lib/db/backend" 19 "github.com/syncthing/syncthing/lib/fs" 20 "github.com/syncthing/syncthing/lib/osutil" 21 "github.com/syncthing/syncthing/lib/protocol" 22 "github.com/syncthing/syncthing/lib/sync" 23) 24 25type FileSet struct { 26 folder string 27 fs fs.Filesystem 28 db *Lowlevel 29 meta *metadataTracker 30 31 updateMutex sync.Mutex // protects database updates and the corresponding metadata changes 32} 33 34// The Iterator is called with either a protocol.FileInfo or a 35// FileInfoTruncated (depending on the method) and returns true to 36// continue iteration, false to stop. 37type Iterator func(f protocol.FileIntf) bool 38 39func NewFileSet(folder string, fs fs.Filesystem, db *Lowlevel) (*FileSet, error) { 40 select { 41 case <-db.oneFileSetCreated: 42 default: 43 close(db.oneFileSetCreated) 44 } 45 meta, err := db.loadMetadataTracker(folder) 46 if err != nil { 47 db.handleFailure(err) 48 return nil, err 49 } 50 s := &FileSet{ 51 folder: folder, 52 fs: fs, 53 db: db, 54 meta: meta, 55 updateMutex: sync.NewMutex(), 56 } 57 if id := s.IndexID(protocol.LocalDeviceID); id == 0 { 58 // No index ID set yet. We create one now. 59 id = protocol.NewIndexID() 60 err := s.db.setIndexID(protocol.LocalDeviceID[:], []byte(s.folder), id) 61 if err != nil && !backend.IsClosed(err) { 62 fatalError(err, fmt.Sprintf("%s Creating new IndexID", s.folder), s.db) 63 } 64 } 65 return s, nil 66} 67 68func (s *FileSet) Drop(device protocol.DeviceID) { 69 opStr := fmt.Sprintf("%s Drop(%v)", s.folder, device) 70 l.Debugf(opStr) 71 72 s.updateMutex.Lock() 73 defer s.updateMutex.Unlock() 74 75 if err := s.db.dropDeviceFolder(device[:], []byte(s.folder), s.meta); backend.IsClosed(err) { 76 return 77 } else if err != nil { 78 fatalError(err, opStr, s.db) 79 } 80 81 if device == protocol.LocalDeviceID { 82 s.meta.resetCounts(device) 83 // We deliberately do not reset the sequence number here. Dropping 84 // all files for the local device ID only happens in testing - which 85 // expects the sequence to be retained, like an old Replace() of all 86 // files would do. However, if we ever did it "in production" we 87 // would anyway want to retain the sequence for delta indexes to be 88 // happy. 89 } else { 90 // Here, on the other hand, we want to make sure that any file 91 // announced from the remote is newer than our current sequence 92 // number. 93 s.meta.resetAll(device) 94 } 95 96 t, err := s.db.newReadWriteTransaction() 97 if backend.IsClosed(err) { 98 return 99 } else if err != nil { 100 fatalError(err, opStr, s.db) 101 } 102 defer t.close() 103 104 if err := s.meta.toDB(t, []byte(s.folder)); backend.IsClosed(err) { 105 return 106 } else if err != nil { 107 fatalError(err, opStr, s.db) 108 } 109 if err := t.Commit(); backend.IsClosed(err) { 110 return 111 } else if err != nil { 112 fatalError(err, opStr, s.db) 113 } 114} 115 116func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) { 117 opStr := fmt.Sprintf("%s Update(%v, [%d])", s.folder, device, len(fs)) 118 l.Debugf(opStr) 119 120 // do not modify fs in place, it is still used in outer scope 121 fs = append([]protocol.FileInfo(nil), fs...) 122 123 // If one file info is present multiple times, only keep the last. 124 // Updating the same file multiple times is problematic, because the 125 // previous updates won't yet be represented in the db when we update it 126 // again. Additionally even if that problem was taken care of, it would 127 // be pointless because we remove the previously added file info again 128 // right away. 129 fs = normalizeFilenamesAndDropDuplicates(fs) 130 131 s.updateMutex.Lock() 132 defer s.updateMutex.Unlock() 133 134 if device == protocol.LocalDeviceID { 135 // For the local device we have a bunch of metadata to track. 136 if err := s.db.updateLocalFiles([]byte(s.folder), fs, s.meta); err != nil && !backend.IsClosed(err) { 137 fatalError(err, opStr, s.db) 138 } 139 return 140 } 141 // Easy case, just update the files and we're done. 142 if err := s.db.updateRemoteFiles([]byte(s.folder), device[:], fs, s.meta); err != nil && !backend.IsClosed(err) { 143 fatalError(err, opStr, s.db) 144 } 145} 146 147type Snapshot struct { 148 folder string 149 t readOnlyTransaction 150 meta *countsMap 151 fatalError func(error, string) 152} 153 154func (s *FileSet) Snapshot() (*Snapshot, error) { 155 opStr := fmt.Sprintf("%s Snapshot()", s.folder) 156 l.Debugf(opStr) 157 t, err := s.db.newReadOnlyTransaction() 158 if err != nil { 159 s.db.handleFailure(err) 160 return nil, err 161 } 162 return &Snapshot{ 163 folder: s.folder, 164 t: t, 165 meta: s.meta.Snapshot(), 166 fatalError: func(err error, opStr string) { 167 fatalError(err, opStr, s.db) 168 }, 169 }, nil 170} 171 172func (s *Snapshot) Release() { 173 s.t.close() 174} 175 176func (s *Snapshot) WithNeed(device protocol.DeviceID, fn Iterator) { 177 opStr := fmt.Sprintf("%s WithNeed(%v)", s.folder, device) 178 l.Debugf(opStr) 179 if err := s.t.withNeed([]byte(s.folder), device[:], false, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) { 180 s.fatalError(err, opStr) 181 } 182} 183 184func (s *Snapshot) WithNeedTruncated(device protocol.DeviceID, fn Iterator) { 185 opStr := fmt.Sprintf("%s WithNeedTruncated(%v)", s.folder, device) 186 l.Debugf(opStr) 187 if err := s.t.withNeed([]byte(s.folder), device[:], true, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) { 188 s.fatalError(err, opStr) 189 } 190} 191 192func (s *Snapshot) WithHave(device protocol.DeviceID, fn Iterator) { 193 opStr := fmt.Sprintf("%s WithHave(%v)", s.folder, device) 194 l.Debugf(opStr) 195 if err := s.t.withHave([]byte(s.folder), device[:], nil, false, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) { 196 s.fatalError(err, opStr) 197 } 198} 199 200func (s *Snapshot) WithHaveTruncated(device protocol.DeviceID, fn Iterator) { 201 opStr := fmt.Sprintf("%s WithHaveTruncated(%v)", s.folder, device) 202 l.Debugf(opStr) 203 if err := s.t.withHave([]byte(s.folder), device[:], nil, true, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) { 204 s.fatalError(err, opStr) 205 } 206} 207 208func (s *Snapshot) WithHaveSequence(startSeq int64, fn Iterator) { 209 opStr := fmt.Sprintf("%s WithHaveSequence(%v)", s.folder, startSeq) 210 l.Debugf(opStr) 211 if err := s.t.withHaveSequence([]byte(s.folder), startSeq, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) { 212 s.fatalError(err, opStr) 213 } 214} 215 216// Except for an item with a path equal to prefix, only children of prefix are iterated. 217// E.g. for prefix "dir", "dir/file" is iterated, but "dir.file" is not. 218func (s *Snapshot) WithPrefixedHaveTruncated(device protocol.DeviceID, prefix string, fn Iterator) { 219 opStr := fmt.Sprintf(`%s WithPrefixedHaveTruncated(%v, "%v")`, s.folder, device, prefix) 220 l.Debugf(opStr) 221 if err := s.t.withHave([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(prefix)), true, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) { 222 s.fatalError(err, opStr) 223 } 224} 225 226func (s *Snapshot) WithGlobal(fn Iterator) { 227 opStr := fmt.Sprintf("%s WithGlobal()", s.folder) 228 l.Debugf(opStr) 229 if err := s.t.withGlobal([]byte(s.folder), nil, false, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) { 230 s.fatalError(err, opStr) 231 } 232} 233 234func (s *Snapshot) WithGlobalTruncated(fn Iterator) { 235 opStr := fmt.Sprintf("%s WithGlobalTruncated()", s.folder) 236 l.Debugf(opStr) 237 if err := s.t.withGlobal([]byte(s.folder), nil, true, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) { 238 s.fatalError(err, opStr) 239 } 240} 241 242// Except for an item with a path equal to prefix, only children of prefix are iterated. 243// E.g. for prefix "dir", "dir/file" is iterated, but "dir.file" is not. 244func (s *Snapshot) WithPrefixedGlobalTruncated(prefix string, fn Iterator) { 245 opStr := fmt.Sprintf(`%s WithPrefixedGlobalTruncated("%v")`, s.folder, prefix) 246 l.Debugf(opStr) 247 if err := s.t.withGlobal([]byte(s.folder), []byte(osutil.NormalizedFilename(prefix)), true, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) { 248 s.fatalError(err, opStr) 249 } 250} 251 252func (s *Snapshot) Get(device protocol.DeviceID, file string) (protocol.FileInfo, bool) { 253 opStr := fmt.Sprintf("%s Get(%v)", s.folder, file) 254 l.Debugf(opStr) 255 f, ok, err := s.t.getFile([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(file))) 256 if backend.IsClosed(err) { 257 return protocol.FileInfo{}, false 258 } else if err != nil { 259 s.fatalError(err, opStr) 260 } 261 f.Name = osutil.NativeFilename(f.Name) 262 return f, ok 263} 264 265func (s *Snapshot) GetGlobal(file string) (protocol.FileInfo, bool) { 266 opStr := fmt.Sprintf("%s GetGlobal(%v)", s.folder, file) 267 l.Debugf(opStr) 268 _, fi, ok, err := s.t.getGlobal(nil, []byte(s.folder), []byte(osutil.NormalizedFilename(file)), false) 269 if backend.IsClosed(err) { 270 return protocol.FileInfo{}, false 271 } else if err != nil { 272 s.fatalError(err, opStr) 273 } 274 if !ok { 275 return protocol.FileInfo{}, false 276 } 277 f := fi.(protocol.FileInfo) 278 f.Name = osutil.NativeFilename(f.Name) 279 return f, true 280} 281 282func (s *Snapshot) GetGlobalTruncated(file string) (FileInfoTruncated, bool) { 283 opStr := fmt.Sprintf("%s GetGlobalTruncated(%v)", s.folder, file) 284 l.Debugf(opStr) 285 _, fi, ok, err := s.t.getGlobal(nil, []byte(s.folder), []byte(osutil.NormalizedFilename(file)), true) 286 if backend.IsClosed(err) { 287 return FileInfoTruncated{}, false 288 } else if err != nil { 289 s.fatalError(err, opStr) 290 } 291 if !ok { 292 return FileInfoTruncated{}, false 293 } 294 f := fi.(FileInfoTruncated) 295 f.Name = osutil.NativeFilename(f.Name) 296 return f, true 297} 298 299func (s *Snapshot) Availability(file string) []protocol.DeviceID { 300 opStr := fmt.Sprintf("%s Availability(%v)", s.folder, file) 301 l.Debugf(opStr) 302 av, err := s.t.availability([]byte(s.folder), []byte(osutil.NormalizedFilename(file))) 303 if backend.IsClosed(err) { 304 return nil 305 } else if err != nil { 306 s.fatalError(err, opStr) 307 } 308 return av 309} 310 311func (s *Snapshot) DebugGlobalVersions(file string) VersionList { 312 opStr := fmt.Sprintf("%s DebugGlobalVersions(%v)", s.folder, file) 313 l.Debugf(opStr) 314 vl, err := s.t.getGlobalVersions(nil, []byte(s.folder), []byte(osutil.NormalizedFilename(file))) 315 if backend.IsClosed(err) || backend.IsNotFound(err) { 316 return VersionList{} 317 } else if err != nil { 318 s.fatalError(err, opStr) 319 } 320 return vl 321} 322 323func (s *Snapshot) Sequence(device protocol.DeviceID) int64 { 324 return s.meta.Counts(device, 0).Sequence 325} 326 327// RemoteSequence returns the change version for the given folder, as 328// sent by remote peers. This is guaranteed to increment if the contents of 329// the remote or global folder has changed. 330func (s *Snapshot) RemoteSequence() int64 { 331 var ver int64 332 333 for _, device := range s.meta.devices() { 334 ver += s.Sequence(device) 335 } 336 337 return ver 338} 339 340func (s *Snapshot) LocalSize() Counts { 341 local := s.meta.Counts(protocol.LocalDeviceID, 0) 342 return local.Add(s.ReceiveOnlyChangedSize()) 343} 344 345func (s *Snapshot) ReceiveOnlyChangedSize() Counts { 346 return s.meta.Counts(protocol.LocalDeviceID, protocol.FlagLocalReceiveOnly) 347} 348 349func (s *Snapshot) GlobalSize() Counts { 350 return s.meta.Counts(protocol.GlobalDeviceID, 0) 351} 352 353func (s *Snapshot) NeedSize(device protocol.DeviceID) Counts { 354 return s.meta.Counts(device, needFlag) 355} 356 357func (s *Snapshot) WithBlocksHash(hash []byte, fn Iterator) { 358 opStr := fmt.Sprintf(`%s WithBlocksHash("%x")`, s.folder, hash) 359 l.Debugf(opStr) 360 if err := s.t.withBlocksHash([]byte(s.folder), hash, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) { 361 s.fatalError(err, opStr) 362 } 363} 364 365func (s *FileSet) Sequence(device protocol.DeviceID) int64 { 366 return s.meta.Sequence(device) 367} 368 369func (s *FileSet) IndexID(device protocol.DeviceID) protocol.IndexID { 370 opStr := fmt.Sprintf("%s IndexID(%v)", s.folder, device) 371 l.Debugf(opStr) 372 id, err := s.db.getIndexID(device[:], []byte(s.folder)) 373 if backend.IsClosed(err) { 374 return 0 375 } else if err != nil { 376 fatalError(err, opStr, s.db) 377 } 378 return id 379} 380 381func (s *FileSet) SetIndexID(device protocol.DeviceID, id protocol.IndexID) { 382 if device == protocol.LocalDeviceID { 383 panic("do not explicitly set index ID for local device") 384 } 385 opStr := fmt.Sprintf("%s SetIndexID(%v, %v)", s.folder, device, id) 386 l.Debugf(opStr) 387 if err := s.db.setIndexID(device[:], []byte(s.folder), id); err != nil && !backend.IsClosed(err) { 388 fatalError(err, opStr, s.db) 389 } 390} 391 392func (s *FileSet) MtimeFS() fs.Filesystem { 393 opStr := fmt.Sprintf("%s MtimeFS()", s.folder) 394 l.Debugf(opStr) 395 prefix, err := s.db.keyer.GenerateMtimesKey(nil, []byte(s.folder)) 396 if backend.IsClosed(err) { 397 return nil 398 } else if err != nil { 399 fatalError(err, opStr, s.db) 400 } 401 kv := NewNamespacedKV(s.db, string(prefix)) 402 return fs.NewMtimeFS(s.fs, kv) 403} 404 405func (s *FileSet) ListDevices() []protocol.DeviceID { 406 return s.meta.devices() 407} 408 409func (s *FileSet) RepairSequence() (int, error) { 410 s.updateAndGCMutexLock() // Ensures consistent locking order 411 defer s.updateMutex.Unlock() 412 defer s.db.gcMut.RUnlock() 413 return s.db.repairSequenceGCLocked(s.folder, s.meta) 414} 415 416func (s *FileSet) updateAndGCMutexLock() { 417 s.updateMutex.Lock() 418 s.db.gcMut.RLock() 419} 420 421// DropFolder clears out all information related to the given folder from the 422// database. 423func DropFolder(db *Lowlevel, folder string) { 424 opStr := fmt.Sprintf("DropFolder(%v)", folder) 425 l.Debugf(opStr) 426 droppers := []func([]byte) error{ 427 db.dropFolder, 428 db.dropMtimes, 429 db.dropFolderMeta, 430 db.dropFolderIndexIDs, 431 db.folderIdx.Delete, 432 } 433 for _, drop := range droppers { 434 if err := drop([]byte(folder)); backend.IsClosed(err) { 435 return 436 } else if err != nil { 437 fatalError(err, opStr, db) 438 } 439 } 440} 441 442// DropDeltaIndexIDs removes all delta index IDs from the database. 443// This will cause a full index transmission on the next connection. 444// Must be called before using FileSets, i.e. before NewFileSet is called for 445// the first time. 446func DropDeltaIndexIDs(db *Lowlevel) { 447 select { 448 case <-db.oneFileSetCreated: 449 panic("DropDeltaIndexIDs must not be called after NewFileSet for the same Lowlevel") 450 default: 451 } 452 opStr := "DropDeltaIndexIDs" 453 l.Debugf(opStr) 454 err := db.dropIndexIDs() 455 if backend.IsClosed(err) { 456 return 457 } else if err != nil { 458 fatalError(err, opStr, db) 459 } 460} 461 462func normalizeFilenamesAndDropDuplicates(fs []protocol.FileInfo) []protocol.FileInfo { 463 positions := make(map[string]int, len(fs)) 464 for i, f := range fs { 465 norm := osutil.NormalizedFilename(f.Name) 466 if pos, ok := positions[norm]; ok { 467 fs[pos] = protocol.FileInfo{} 468 } 469 positions[norm] = i 470 fs[i].Name = norm 471 } 472 for i := 0; i < len(fs); { 473 if fs[i].Name == "" { 474 fs = append(fs[:i], fs[i+1:]...) 475 continue 476 } 477 i++ 478 } 479 return fs 480} 481 482func nativeFileIterator(fn Iterator) Iterator { 483 return func(fi protocol.FileIntf) bool { 484 switch f := fi.(type) { 485 case protocol.FileInfo: 486 f.Name = osutil.NativeFilename(f.Name) 487 return fn(f) 488 case FileInfoTruncated: 489 f.Name = osutil.NativeFilename(f.Name) 490 return fn(f) 491 default: 492 panic("unknown interface type") 493 } 494 } 495} 496 497func fatalError(err error, opStr string, db *Lowlevel) { 498 db.checkErrorForRepair(err) 499 l.Warnf("Fatal error: %v: %v", opStr, err) 500 panic(ldbPathRe.ReplaceAllString(err.Error(), "$1 x: ")) 501} 502