1// Copyright (C) 2020 The Syncthing Authors. 2// 3// This Source Code Form is subject to the terms of the Mozilla Public 4// License, v. 2.0. If a copy of the MPL was not distributed with this file, 5// You can obtain one at https://mozilla.org/MPL/2.0/. 6 7package model 8 9import ( 10 "context" 11 "fmt" 12 "sync" 13 "time" 14 15 "github.com/thejerf/suture/v4" 16 17 "github.com/syncthing/syncthing/lib/config" 18 "github.com/syncthing/syncthing/lib/db" 19 "github.com/syncthing/syncthing/lib/events" 20 "github.com/syncthing/syncthing/lib/protocol" 21 "github.com/syncthing/syncthing/lib/svcutil" 22) 23 24type indexHandler struct { 25 conn protocol.Connection 26 downloads *deviceDownloadState 27 folder string 28 folderIsReceiveEncrypted bool 29 prevSequence int64 30 evLogger events.Logger 31 token suture.ServiceToken 32 33 cond *sync.Cond 34 paused bool 35 fset *db.FileSet 36 runner service 37} 38 39func newIndexHandler(conn protocol.Connection, downloads *deviceDownloadState, folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo, evLogger events.Logger) *indexHandler { 40 myIndexID := fset.IndexID(protocol.LocalDeviceID) 41 mySequence := fset.Sequence(protocol.LocalDeviceID) 42 var startSequence int64 43 44 // This is the other side's description of what it knows 45 // about us. Lets check to see if we can start sending index 46 // updates directly or need to send the index from start... 47 48 if startInfo.local.IndexID == myIndexID { 49 // They say they've seen our index ID before, so we can 50 // send a delta update only. 51 52 if startInfo.local.MaxSequence > mySequence { 53 // Safety check. They claim to have more or newer 54 // index data than we have - either we have lost 55 // index data, or reset the index without resetting 56 // the IndexID, or something else weird has 57 // happened. We send a full index to reset the 58 // situation. 59 l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", conn.ID().Short(), folder.Description()) 60 startSequence = 0 61 } else { 62 l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", conn.ID().Short(), folder.Description(), startInfo.local.MaxSequence) 63 startSequence = startInfo.local.MaxSequence 64 } 65 } else if startInfo.local.IndexID != 0 { 66 // They say they've seen an index ID from us, but it's 67 // not the right one. Either they are confused or we 68 // must have reset our database since last talking to 69 // them. We'll start with a full index transfer. 70 l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", conn.ID().Short(), folder.Description(), startInfo.local.IndexID, myIndexID) 71 startSequence = 0 72 } else { 73 l.Debugf("Device %v folder %s has no index ID for us", conn.ID().Short(), folder.Description()) 74 } 75 76 // This is the other side's description of themselves. We 77 // check to see that it matches the IndexID we have on file, 78 // otherwise we drop our old index data and expect to get a 79 // completely new set. 80 81 theirIndexID := fset.IndexID(conn.ID()) 82 if startInfo.remote.IndexID == 0 { 83 // They're not announcing an index ID. This means they 84 // do not support delta indexes and we should clear any 85 // information we have from them before accepting their 86 // index, which will presumably be a full index. 87 l.Debugf("Device %v folder %s does not announce an index ID", conn.ID().Short(), folder.Description()) 88 fset.Drop(conn.ID()) 89 } else if startInfo.remote.IndexID != theirIndexID { 90 // The index ID we have on file is not what they're 91 // announcing. They must have reset their database and 92 // will probably send us a full index. We drop any 93 // information we have and remember this new index ID 94 // instead. 95 l.Infof("Device %v folder %s has a new index ID (%v)", conn.ID().Short(), folder.Description(), startInfo.remote.IndexID) 96 fset.Drop(conn.ID()) 97 fset.SetIndexID(conn.ID(), startInfo.remote.IndexID) 98 } 99 100 return &indexHandler{ 101 conn: conn, 102 downloads: downloads, 103 folder: folder.ID, 104 folderIsReceiveEncrypted: folder.Type == config.FolderTypeReceiveEncrypted, 105 prevSequence: startSequence, 106 evLogger: evLogger, 107 108 fset: fset, 109 runner: runner, 110 cond: sync.NewCond(new(sync.Mutex)), 111 } 112} 113 114// waitForFileset waits for the handler to resume and fetches the current fileset. 115func (s *indexHandler) waitForFileset(ctx context.Context) (*db.FileSet, error) { 116 s.cond.L.Lock() 117 defer s.cond.L.Unlock() 118 119 for s.paused { 120 select { 121 case <-ctx.Done(): 122 return nil, ctx.Err() 123 default: 124 s.cond.Wait() 125 } 126 } 127 128 return s.fset, nil 129} 130 131func (s *indexHandler) Serve(ctx context.Context) (err error) { 132 l.Debugf("Starting index handler for %s to %s at %s (slv=%d)", s.folder, s.conn.ID(), s.conn, s.prevSequence) 133 stop := make(chan struct{}) 134 135 defer func() { 136 err = svcutil.NoRestartErr(err) 137 l.Debugf("Exiting index handler for %s to %s at %s: %v", s.folder, s.conn.ID(), s.conn, err) 138 close(stop) 139 }() 140 141 // Broadcast the pause cond when the context quits 142 go func() { 143 select { 144 case <-ctx.Done(): 145 s.cond.Broadcast() 146 case <-stop: 147 } 148 }() 149 150 // We need to send one index, regardless of whether there is something to send or not 151 fset, err := s.waitForFileset(ctx) 152 if err != nil { 153 return err 154 } 155 err = s.sendIndexTo(ctx, fset) 156 157 // Subscribe to LocalIndexUpdated (we have new information to send) and 158 // DeviceDisconnected (it might be us who disconnected, so we should 159 // exit). 160 sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected) 161 defer sub.Unsubscribe() 162 163 evChan := sub.C() 164 ticker := time.NewTicker(time.Minute) 165 defer ticker.Stop() 166 167 for err == nil { 168 fset, err = s.waitForFileset(ctx) 169 if err != nil { 170 return err 171 } 172 173 // While we have sent a sequence at least equal to the one 174 // currently in the database, wait for the local index to update. The 175 // local index may update for other folders than the one we are 176 // sending for. 177 if fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence { 178 select { 179 case <-ctx.Done(): 180 return ctx.Err() 181 case <-evChan: 182 case <-ticker.C: 183 } 184 continue 185 } 186 187 err = s.sendIndexTo(ctx, fset) 188 189 // Wait a short amount of time before entering the next loop. If there 190 // are continuous changes happening to the local index, this gives us 191 // time to batch them up a little. 192 select { 193 case <-ctx.Done(): 194 return ctx.Err() 195 case <-time.After(250 * time.Millisecond): 196 } 197 } 198 199 return err 200} 201 202// resume might be called because the folder was actually resumed, or just 203// because the folder config changed (and thus the runner and potentially fset). 204func (s *indexHandler) resume(fset *db.FileSet, runner service) { 205 s.cond.L.Lock() 206 s.paused = false 207 s.fset = fset 208 s.runner = runner 209 s.cond.Broadcast() 210 s.cond.L.Unlock() 211} 212 213func (s *indexHandler) pause() { 214 s.cond.L.Lock() 215 if s.paused { 216 s.evLogger.Log(events.Failure, "index handler got paused while already paused") 217 } 218 s.paused = true 219 s.fset = nil 220 s.runner = nil 221 s.cond.Broadcast() 222 s.cond.L.Unlock() 223} 224 225// sendIndexTo sends file infos with a sequence number higher than prevSequence and 226// returns the highest sent sequence number. 227func (s *indexHandler) sendIndexTo(ctx context.Context, fset *db.FileSet) error { 228 initial := s.prevSequence == 0 229 batch := db.NewFileInfoBatch(nil) 230 batch.SetFlushFunc(func(fs []protocol.FileInfo) error { 231 l.Debugf("%v: Sending %d files (<%d bytes)", s, len(fs), batch.Size()) 232 if initial { 233 initial = false 234 return s.conn.Index(ctx, s.folder, fs) 235 } 236 return s.conn.IndexUpdate(ctx, s.folder, fs) 237 }) 238 239 var err error 240 var f protocol.FileInfo 241 snap, err := fset.Snapshot() 242 if err != nil { 243 return svcutil.AsFatalErr(err, svcutil.ExitError) 244 } 245 defer snap.Release() 246 previousWasDelete := false 247 snap.WithHaveSequence(s.prevSequence+1, func(fi protocol.FileIntf) bool { 248 // This is to make sure that renames (which is an add followed by a delete) land in the same batch. 249 // Even if the batch is full, we allow a last delete to slip in, we do this by making sure that 250 // the batch ends with a non-delete, or that the last item in the batch is already a delete 251 if batch.Full() && (!fi.IsDeleted() || previousWasDelete) { 252 if err = batch.Flush(); err != nil { 253 return false 254 } 255 } 256 257 if shouldDebug() { 258 if fi.SequenceNo() < s.prevSequence+1 { 259 panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1)) 260 } 261 } 262 263 if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence { 264 l.Warnln("Non-increasing sequence detected: Checking and repairing the db...") 265 // Abort this round of index sending - the next one will pick 266 // up from the last successful one with the repeaired db. 267 defer func() { 268 if fixed, dbErr := fset.RepairSequence(); dbErr != nil { 269 l.Warnln("Failed repairing sequence entries:", dbErr) 270 panic("Failed repairing sequence entries") 271 } else { 272 s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence") 273 l.Infof("Repaired %v sequence entries in database", fixed) 274 } 275 }() 276 return false 277 } 278 279 f = fi.(protocol.FileInfo) 280 281 // If this is a folder receiving encrypted files only, we 282 // mustn't ever send locally changed file infos. Those aren't 283 // encrypted and thus would be a protocol error at the remote. 284 if s.folderIsReceiveEncrypted && fi.IsReceiveOnlyChanged() { 285 return true 286 } 287 288 f = prepareFileInfoForIndex(f) 289 290 previousWasDelete = f.IsDeleted() 291 292 batch.Append(f) 293 return true 294 }) 295 if err != nil { 296 return err 297 } 298 299 err = batch.Flush() 300 301 // True if there was nothing to be sent 302 if f.Sequence == 0 { 303 return err 304 } 305 306 s.prevSequence = f.Sequence 307 return err 308} 309 310func (s *indexHandler) receive(fs []protocol.FileInfo, update bool, op string) error { 311 deviceID := s.conn.ID() 312 313 s.cond.L.Lock() 314 paused := s.paused 315 fset := s.fset 316 runner := s.runner 317 s.cond.L.Unlock() 318 319 if paused { 320 l.Infof("%v for paused folder %q", op, s.folder) 321 return fmt.Errorf("%v: %w", s.folder, ErrFolderPaused) 322 } 323 324 defer runner.SchedulePull() 325 326 s.downloads.Update(s.folder, makeForgetUpdate(fs)) 327 328 if !update { 329 fset.Drop(deviceID) 330 } 331 for i := range fs { 332 // The local attributes should never be transmitted over the wire. 333 // Make sure they look like they weren't. 334 fs[i].LocalFlags = 0 335 fs[i].VersionHash = nil 336 } 337 fset.Update(deviceID, fs) 338 339 seq := fset.Sequence(deviceID) 340 s.evLogger.Log(events.RemoteIndexUpdated, map[string]interface{}{ 341 "device": deviceID.String(), 342 "folder": s.folder, 343 "items": len(fs), 344 "sequence": seq, 345 "version": seq, // legacy for sequence 346 }) 347 348 return nil 349} 350 351func prepareFileInfoForIndex(f protocol.FileInfo) protocol.FileInfo { 352 // Mark the file as invalid if any of the local bad stuff flags are set. 353 f.RawInvalid = f.IsInvalid() 354 // If the file is marked LocalReceive (i.e., changed locally on a 355 // receive only folder) we do not want it to ever become the 356 // globally best version, invalid or not. 357 if f.IsReceiveOnlyChanged() { 358 f.Version = protocol.Vector{} 359 } 360 // never sent externally 361 f.LocalFlags = 0 362 f.VersionHash = nil 363 return f 364} 365 366func (s *indexHandler) String() string { 367 return fmt.Sprintf("indexHandler@%p for %s to %s at %s", s, s.folder, s.conn.ID().Short(), s.conn) 368} 369 370type indexHandlerRegistry struct { 371 sup *suture.Supervisor 372 evLogger events.Logger 373 conn protocol.Connection 374 downloads *deviceDownloadState 375 indexHandlers map[string]*indexHandler 376 startInfos map[string]*clusterConfigDeviceInfo 377 folderStates map[string]*indexHandlerFolderState 378 mut sync.Mutex 379} 380 381type indexHandlerFolderState struct { 382 cfg config.FolderConfiguration 383 fset *db.FileSet 384 runner service 385} 386 387func newIndexHandlerRegistry(conn protocol.Connection, downloads *deviceDownloadState, closed chan struct{}, parentSup *suture.Supervisor, evLogger events.Logger) *indexHandlerRegistry { 388 r := &indexHandlerRegistry{ 389 conn: conn, 390 downloads: downloads, 391 evLogger: evLogger, 392 indexHandlers: make(map[string]*indexHandler), 393 startInfos: make(map[string]*clusterConfigDeviceInfo), 394 folderStates: make(map[string]*indexHandlerFolderState), 395 mut: sync.Mutex{}, 396 } 397 r.sup = suture.New(r.String(), svcutil.SpecWithDebugLogger(l)) 398 ourToken := parentSup.Add(r.sup) 399 r.sup.Add(svcutil.AsService(func(ctx context.Context) error { 400 select { 401 case <-ctx.Done(): 402 return ctx.Err() 403 case <-closed: 404 parentSup.Remove(ourToken) 405 } 406 return nil 407 }, fmt.Sprintf("%v/waitForClosed", r))) 408 return r 409} 410 411func (r *indexHandlerRegistry) String() string { 412 return fmt.Sprintf("indexHandlerRegistry/%v", r.conn.ID().Short()) 413} 414 415func (r *indexHandlerRegistry) GetSupervisor() *suture.Supervisor { 416 return r.sup 417} 418 419func (r *indexHandlerRegistry) startLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo) { 420 if is, ok := r.indexHandlers[folder.ID]; ok { 421 r.sup.RemoveAndWait(is.token, 0) 422 delete(r.indexHandlers, folder.ID) 423 } 424 delete(r.startInfos, folder.ID) 425 426 is := newIndexHandler(r.conn, r.downloads, folder, fset, runner, startInfo, r.evLogger) 427 is.token = r.sup.Add(is) 428 r.indexHandlers[folder.ID] = is 429} 430 431// AddIndexInfo starts an index handler for given folder, unless it is paused. 432// If it is paused, the given startInfo is stored to start the sender once the 433// folder is resumed. 434// If an index handler is already running, it will be stopped first. 435func (r *indexHandlerRegistry) AddIndexInfo(folder string, startInfo *clusterConfigDeviceInfo) { 436 r.mut.Lock() 437 defer r.mut.Unlock() 438 439 if is, ok := r.indexHandlers[folder]; ok { 440 r.sup.RemoveAndWait(is.token, 0) 441 delete(r.indexHandlers, folder) 442 l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.ID().Short(), folder) 443 } 444 folderState, ok := r.folderStates[folder] 445 if !ok { 446 l.Debugf("Pending index handler for device %v and folder %v", r.conn.ID().Short(), folder) 447 r.startInfos[folder] = startInfo 448 return 449 } 450 r.startLocked(folderState.cfg, folderState.fset, folderState.runner, startInfo) 451} 452 453// Remove stops a running index handler or removes one pending to be started. 454// It is a noop if the folder isn't known. 455func (r *indexHandlerRegistry) Remove(folder string) { 456 r.mut.Lock() 457 defer r.mut.Unlock() 458 459 l.Debugf("Removing index handler for device %v and folder %v", r.conn.ID().Short(), folder) 460 if is, ok := r.indexHandlers[folder]; ok { 461 r.sup.RemoveAndWait(is.token, 0) 462 delete(r.indexHandlers, folder) 463 } 464 delete(r.startInfos, folder) 465 l.Debugf("Removed index handler for device %v and folder %v", r.conn.ID().Short(), folder) 466} 467 468// RemoveAllExcept stops all running index handlers and removes those pending to be started, 469// except mentioned ones. 470// It is a noop if the folder isn't known. 471func (r *indexHandlerRegistry) RemoveAllExcept(except map[string]struct{}) { 472 r.mut.Lock() 473 defer r.mut.Unlock() 474 475 for folder, is := range r.indexHandlers { 476 if _, ok := except[folder]; !ok { 477 r.sup.RemoveAndWait(is.token, 0) 478 delete(r.indexHandlers, folder) 479 l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.ID().Short(), folder) 480 } 481 } 482 for folder := range r.startInfos { 483 if _, ok := except[folder]; !ok { 484 delete(r.startInfos, folder) 485 l.Debugf("Removed pending index handler for device %v and folder %v (removeAllExcept)", r.conn.ID().Short(), folder) 486 } 487 } 488} 489 490// RegisterFolderState must be called whenever something about the folder 491// changes. The exception being if the folder is removed entirely, then call 492// Remove. The fset and runner arguments may be nil, if given folder is paused. 493func (r *indexHandlerRegistry) RegisterFolderState(folder config.FolderConfiguration, fset *db.FileSet, runner service) { 494 if !folder.SharedWith(r.conn.ID()) { 495 r.Remove(folder.ID) 496 return 497 } 498 499 r.mut.Lock() 500 if folder.Paused { 501 r.folderPausedLocked(folder.ID) 502 } else { 503 r.folderRunningLocked(folder, fset, runner) 504 } 505 r.mut.Unlock() 506} 507 508// folderPausedLocked stops a running index handler. 509// It is a noop if the folder isn't known or has not been started yet. 510func (r *indexHandlerRegistry) folderPausedLocked(folder string) { 511 l.Debugf("Pausing index handler for device %v and folder %v", r.conn.ID().Short(), folder) 512 delete(r.folderStates, folder) 513 if is, ok := r.indexHandlers[folder]; ok { 514 is.pause() 515 l.Debugf("Paused index handler for device %v and folder %v", r.conn.ID().Short(), folder) 516 } else { 517 l.Debugf("No index handler for device %v and folder %v to pause", r.conn.ID().Short(), folder) 518 } 519} 520 521// folderRunningLocked resumes an already running index handler or starts it, if it 522// was added while paused. 523// It is a noop if the folder isn't known. 524func (r *indexHandlerRegistry) folderRunningLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service) { 525 r.folderStates[folder.ID] = &indexHandlerFolderState{ 526 cfg: folder, 527 fset: fset, 528 runner: runner, 529 } 530 531 is, isOk := r.indexHandlers[folder.ID] 532 if info, ok := r.startInfos[folder.ID]; ok { 533 if isOk { 534 r.sup.RemoveAndWait(is.token, 0) 535 delete(r.indexHandlers, folder.ID) 536 l.Debugf("Removed index handler for device %v and folder %v in resume", r.conn.ID().Short(), folder.ID) 537 } 538 r.startLocked(folder, fset, runner, info) 539 delete(r.startInfos, folder.ID) 540 l.Debugf("Started index handler for device %v and folder %v in resume", r.conn.ID().Short(), folder.ID) 541 } else if isOk { 542 l.Debugf("Resuming index handler for device %v and folder %v", r.conn.ID().Short(), folder) 543 is.resume(fset, runner) 544 } else { 545 l.Debugf("Not resuming index handler for device %v and folder %v as none is paused and there is no start info", r.conn.ID().Short(), folder.ID) 546 } 547} 548 549func (r *indexHandlerRegistry) ReceiveIndex(folder string, fs []protocol.FileInfo, update bool, op string) error { 550 r.mut.Lock() 551 defer r.mut.Unlock() 552 is, isOk := r.indexHandlers[folder] 553 if !isOk { 554 l.Infof("%v for nonexistent or paused folder %q", op, folder) 555 return ErrFolderMissing 556 } 557 return is.receive(fs, update, op) 558} 559 560// makeForgetUpdate takes an index update and constructs a download progress update 561// causing to forget any progress for files which we've just been sent. 562func makeForgetUpdate(files []protocol.FileInfo) []protocol.FileDownloadProgressUpdate { 563 updates := make([]protocol.FileDownloadProgressUpdate, 0, len(files)) 564 for _, file := range files { 565 if file.IsSymlink() || file.IsDirectory() || file.IsDeleted() { 566 continue 567 } 568 updates = append(updates, protocol.FileDownloadProgressUpdate{ 569 Name: file.Name, 570 Version: file.Version, 571 UpdateType: protocol.FileDownloadProgressUpdateTypeForget, 572 }) 573 } 574 return updates 575} 576