package synchronization

import (
	"context"
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/pkg/errors"

	"github.com/golang/protobuf/ptypes"

	"github.com/mutagen-io/mutagen/pkg/encoding"
	"github.com/mutagen-io/mutagen/pkg/logging"
	"github.com/mutagen-io/mutagen/pkg/mutagen"
	"github.com/mutagen-io/mutagen/pkg/prompting"
	"github.com/mutagen-io/mutagen/pkg/state"
	"github.com/mutagen-io/mutagen/pkg/synchronization/core"
	"github.com/mutagen-io/mutagen/pkg/synchronization/rsync"
	"github.com/mutagen-io/mutagen/pkg/url"
)

const (
	// autoReconnectInterval is the period of time to wait before attempting an
	// automatic reconnect after disconnection or a failed reconnect.
	autoReconnectInterval = 15 * time.Second
	// rescanWaitDuration is the period of time to wait before attempting to
	// rescan after an ephemeral scan failure.
	rescanWaitDuration = 5 * time.Second
)

// controller manages and executes a single session.
type controller struct {
	// logger is the controller logger.
	logger *logging.Logger
	// sessionPath is the path to the serialized session.
	sessionPath string
	// archivePath is the path to the serialized archive.
	archivePath string
	// stateLock guards and tracks changes to the session member's Paused field
	// and the state member.
	stateLock *state.TrackingLock
	// session encodes the associated session metadata. It is considered static
	// and safe for concurrent access except for its Paused field, for which the
	// stateLock member should be held. It should be saved to disk any time it
	// is modified.
	session *Session
	// mergedAlphaConfiguration is the alpha-specific configuration object
	// (computed from the core configuration and alpha-specific overrides). It
	// is considered static and safe for concurrent access. It is a derived
	// field and not saved to disk.
	mergedAlphaConfiguration *Configuration
	// mergedBetaConfiguration is the beta-specific configuration object
	// (computed from the core configuration and beta-specific overrides). It is
	// considered static and safe for concurrent access. It is a derived field
	// and not saved to disk.
	mergedBetaConfiguration *Configuration
	// state represents the current synchronization state.
	state *State
	// lifecycleLock guards setting of the disabled, cancel, flushRequests, and
	// done members. Access to these members is allowed for the synchronization
	// loop without holding the lock. Any code wishing to set these members
	// should first acquire the lock, then cancel the synchronization loop, and
	// wait for it to complete before making any such changes.
	lifecycleLock sync.Mutex
	// disabled indicates that no more changes to the synchronization loop
	// lifecycle are allowed (i.e. no more synchronization loops can be started
	// for this controller). This is used by terminate and shutdown. It should
	// only be set to true once any existing synchronization loop has been
	// stopped.
	disabled bool
	// cancel cancels the synchronization loop execution context. It should be
	// nil if and only if there is no synchronization loop running.
	cancel context.CancelFunc
	// flushRequests is used to pass flush requests to the synchronization
	// loop. It is buffered, allowing a single request to be queued. All
	// requests passed via this channel must be buffered and contain room for
	// one error.
	flushRequests chan chan error
	// done will be closed by the current synchronization loop when it exits.
	done chan struct{}
}

// newSession creates a new session and corresponding controller.
func newSession(
	ctx context.Context,
	logger *logging.Logger,
	tracker *state.Tracker,
	identifier string,
	alpha, beta *url.URL,
	configuration, configurationAlpha, configurationBeta *Configuration,
	name string,
	labels map[string]string,
	paused bool,
	prompter string,
) (*controller, error) {
	// Update status.
	prompting.Message(prompter, "Creating session...")

	// Set the session version.
	version := Version_Version1

	// Compute the creation time and convert it to Protocol Buffers format.
	// NOTE(review): github.com/golang/protobuf/ptypes is deprecated in favor
	// of google.golang.org/protobuf/types/known/timestamppb — consider
	// migrating when the module's protobuf dependency is updated.
	creationTime := time.Now()
	creationTimeProto, err := ptypes.TimestampProto(creationTime)
	if err != nil {
		return nil, errors.Wrap(err, "unable to convert creation time format")
	}

	// Compute merged endpoint configurations.
	mergedAlphaConfiguration := MergeConfigurations(configuration, configurationAlpha)
	mergedBetaConfiguration := MergeConfigurations(configuration, configurationBeta)

	// If the session isn't being created paused, then try to connect to any
	// endpoints not using the tunnel protocol. The tunnel protocol is the one
	// case where we want to allow asynchronous connectivity (since it doesn't
	// require user input but also isn't guaranteed to connect immediately). If
	// we connect to endpoints here and don't hand them off to the runloop
	// below, then defer their shutdown.
	var alphaEndpoint, betaEndpoint Endpoint
	defer func() {
		if alphaEndpoint != nil {
			alphaEndpoint.Shutdown()
			alphaEndpoint = nil
		}
		if betaEndpoint != nil {
			betaEndpoint.Shutdown()
			betaEndpoint = nil
		}
	}()
	if !paused && alpha.Protocol != url.Protocol_Tunnel {
		logger.Info("Connecting to alpha endpoint")
		alphaEndpoint, err = connect(
			ctx,
			logger.Sublogger("alpha"),
			alpha,
			prompter,
			identifier,
			version,
			mergedAlphaConfiguration,
			true,
		)
		if err != nil {
			logger.Info("Alpha connection failure:", err)
			return nil, errors.Wrap(err, "unable to connect to alpha")
		}
	}
	if !paused && beta.Protocol != url.Protocol_Tunnel {
		logger.Info("Connecting to beta endpoint")
		betaEndpoint, err = connect(
			ctx,
			logger.Sublogger("beta"),
			beta,
			prompter,
			identifier,
			version,
			mergedBetaConfiguration,
			false,
		)
		if err != nil {
			logger.Info("Beta connection failure:", err)
			return nil, errors.Wrap(err, "unable to connect to beta")
		}
	}

	// Create the session and initial archive.
	session := &Session{
		Identifier:           identifier,
		Version:              version,
		CreationTime:         creationTimeProto,
		CreatingVersionMajor: mutagen.VersionMajor,
		CreatingVersionMinor: mutagen.VersionMinor,
		CreatingVersionPatch: mutagen.VersionPatch,
		Alpha:                alpha,
		Beta:                 beta,
		Configuration:        configuration,
		ConfigurationAlpha:   configurationAlpha,
		ConfigurationBeta:    configurationBeta,
		Name:                 name,
		Labels:               labels,
		Paused:               paused,
	}
	archive := &core.Archive{}

	// Compute the session and archive paths.
	sessionPath, err := pathForSession(session.Identifier)
	if err != nil {
		return nil, errors.Wrap(err, "unable to compute session path")
	}
	archivePath, err := pathForArchive(session.Identifier)
	if err != nil {
		return nil, errors.Wrap(err, "unable to compute archive path")
	}

	// Save components to disk. If the archive can't be saved, remove the
	// session file so that we don't leave an orphaned session on disk.
	if err := encoding.MarshalAndSaveProtobuf(sessionPath, session); err != nil {
		return nil, errors.Wrap(err, "unable to save session")
	}
	if err := encoding.MarshalAndSaveProtobuf(archivePath, archive); err != nil {
		os.Remove(sessionPath)
		return nil, errors.Wrap(err, "unable to save archive")
	}

	// Create the controller.
	controller := &controller{
		logger:                   logger,
		sessionPath:              sessionPath,
		archivePath:              archivePath,
		stateLock:                state.NewTrackingLock(tracker),
		session:                  session,
		mergedAlphaConfiguration: mergedAlphaConfiguration,
		mergedBetaConfiguration:  mergedBetaConfiguration,
		state: &State{
			Session: session,
		},
	}

	// If the session isn't being created pre-paused, then start a
	// synchronization loop and mark the endpoints as handed off to that loop so
	// that we don't defer their shutdown.
	if !paused {
		logger.Info("Starting synchronization loop")
		ctx, cancel := context.WithCancel(context.Background())
		controller.cancel = cancel
		controller.flushRequests = make(chan chan error, 1)
		controller.done = make(chan struct{})
		go controller.run(ctx, alphaEndpoint, betaEndpoint)
		// The run loop now owns the endpoints, so nil them out to keep the
		// deferred cleanup above from shutting them down.
		alphaEndpoint = nil
		betaEndpoint = nil
	}

	// Success.
	logger.Info("Session initialized")
	return controller, nil
}

// loadSession loads an existing session and creates a corresponding controller.
func loadSession(logger *logging.Logger, tracker *state.Tracker, identifier string) (*controller, error) {
	// Compute session and archive paths.
	sessionPath, err := pathForSession(identifier)
	if err != nil {
		return nil, errors.Wrap(err, "unable to compute session path")
	}
	archivePath, err := pathForArchive(identifier)
	if err != nil {
		return nil, errors.Wrap(err, "unable to compute archive path")
	}

	// Load and validate the session. We have to populate a few optional fields
	// before validation if they're not set. We can't do this in the Session
	// literal because they'll be wiped out during unmarshalling, even if not
	// set.
	session := &Session{}
	if err := encoding.LoadAndUnmarshalProtobuf(sessionPath, session); err != nil {
		return nil, errors.Wrap(err, "unable to load session configuration")
	}
	if session.ConfigurationAlpha == nil {
		session.ConfigurationAlpha = &Configuration{}
	}
	if session.ConfigurationBeta == nil {
		session.ConfigurationBeta = &Configuration{}
	}
	if err := session.EnsureValid(); err != nil {
		return nil, errors.Wrap(err, "invalid session found on disk")
	}

	// Create the controller.
	controller := &controller{
		logger:      logger,
		sessionPath: sessionPath,
		archivePath: archivePath,
		stateLock:   state.NewTrackingLock(tracker),
		session:     session,
		mergedAlphaConfiguration: MergeConfigurations(
			session.Configuration,
			session.ConfigurationAlpha,
		),
		mergedBetaConfiguration: MergeConfigurations(
			session.Configuration,
			session.ConfigurationBeta,
		),
		state: &State{
			Session: session,
		},
	}

	// If the session isn't marked as paused, start a synchronization loop. The
	// endpoints are nil here — the run loop will establish connections itself.
	if !session.Paused {
		ctx, cancel := context.WithCancel(context.Background())
		controller.cancel = cancel
		controller.flushRequests = make(chan chan error, 1)
		controller.done = make(chan struct{})
		go controller.run(ctx, nil, nil)
	}

	// Success.
	logger.Info("Session loaded")
	return controller, nil
}

// currentState creates a snapshot of the current session state.
func (c *controller) currentState() *State {
	// Lock the session state and defer its release. It's very important that we
	// unlock without a notification here, otherwise we'd trigger an infinite
	// cycle of list/notify.
	c.stateLock.Lock()
	defer c.stateLock.UnlockWithoutNotify()

	// Perform a (pseudo) deep copy of the state.
	return c.state.Copy()
}

// flush attempts to force a synchronization cycle for the session. Unless
// skipWait is specified, the method will wait until a post-flush
// synchronization cycle has completed. The provided context (which must be
// non-nil) can terminate this wait early.
func (c *controller) flush(ctx context.Context, prompter string, skipWait bool) error {
	// Update status.
	prompting.Message(prompter, fmt.Sprintf("Forcing synchronization cycle for session %s...", c.session.Identifier))

	// Lock the controller's lifecycle and defer its release.
	c.lifecycleLock.Lock()
	defer c.lifecycleLock.Unlock()

	// Don't allow any operations if the controller is disabled.
	if c.disabled {
		return errors.New("controller disabled")
	}

	// Check if the session is paused by checking whether or not it has a
	// synchronization loop. It's an internal invariant that a session has a
	// synchronization loop if and only if it is not paused, but checking for
	// paused status requires holding the state lock, whereas checking for the
	// existence of a synchronization loop only requires holding the lifecycle
	// lock, which we do at this point.
	if c.cancel == nil {
		return errors.New("session is paused")
	}

	// Create a flush request.
	request := make(chan error, 1)

	// If we don't want to wait, then we can simply send the request in a
	// non-blocking manner, in which case either this request (or one that's
	// already queued) will be processed eventually. After that, we're done.
	if skipWait {
		// Send the request in a non-blocking manner.
		select {
		case c.flushRequests <- request:
		default:
		}

		// Success.
		return nil
	}

	// Otherwise we need to send the request in a blocking manner, watching for
	// cancellation in the meantime.
	select {
	case c.flushRequests <- request:
	case <-ctx.Done():
		return errors.New("flush cancelled before request could be sent")
	}

	// Now we need to wait for a response to the request, again watching for
	// cancellation in the meantime.
	select {
	case err := <-request:
		if err != nil {
			return err
		}
	case <-ctx.Done():
		return errors.New("flush cancelled while waiting for synchronization cycle")
	}

	// Success.
	return nil
}

// resume attempts to reconnect and resume the session if it isn't currently
// connected and synchronizing. If lifecycleLockHeld is true, then resume will
// assume that the lifecycle lock is held by the caller and will not attempt to
// acquire it.
func (c *controller) resume(ctx context.Context, prompter string, lifecycleLockHeld bool) error {
	// Update status.
	prompting.Message(prompter, fmt.Sprintf("Resuming session %s...", c.session.Identifier))

	// If not already held, acquire the lifecycle lock and defer its release.
	if !lifecycleLockHeld {
		c.lifecycleLock.Lock()
		defer c.lifecycleLock.Unlock()
	}

	// Don't allow any resume operations if the controller is disabled.
	if c.disabled {
		return errors.New("controller disabled")
	}

	// Check if there's an existing synchronization loop (i.e. if the session is
	// unpaused).
	if c.cancel != nil {
		// If there is an existing synchronization loop, check if it's already
		// in a state that's considered "connected".
		c.stateLock.Lock()
		connected := c.state.Status >= Status_Watching
		c.stateLock.UnlockWithoutNotify()

		// If we're already connected, then there's nothing we need to do. We
		// don't even need to mark the session as unpaused because it can't be
		// marked as paused if an existing synchronization loop is running (we
		// enforce this invariant as part of the controller's logic).
		if connected {
			return nil
		}

		// Otherwise, cancel the existing synchronization loop and wait for it
		// to finish.
		//
		// There's something of an efficiency race condition here, because the
		// existing loop might succeed in connecting between the time we check
		// and the time we cancel it. That could happen if an auto-reconnect
		// succeeds or even if the loop was already passed connections and it
		// just hasn't updated its status yet. But the only danger here is
		// basically wasting those connections, and the window is very small.
		c.cancel()
		<-c.done

		// Nil out any lifecycle state.
		c.cancel = nil
		c.flushRequests = nil
		c.done = nil
	}

	// Mark the session as unpaused and save it to disk.
	c.stateLock.Lock()
	c.session.Paused = false
	saveErr := encoding.MarshalAndSaveProtobuf(c.sessionPath, c.session)
	c.stateLock.Unlock()

	// Attempt to connect to alpha.
	c.stateLock.Lock()
	c.state.Status = Status_ConnectingAlpha
	c.stateLock.Unlock()
	alpha, alphaConnectErr := connect(
		ctx,
		c.logger.Sublogger("alpha"),
		c.session.Alpha,
		prompter,
		c.session.Identifier,
		c.session.Version,
		c.mergedAlphaConfiguration,
		true,
	)
	c.stateLock.Lock()
	c.state.AlphaConnected = (alpha != nil)
	c.stateLock.Unlock()

	// Attempt to connect to beta.
	c.stateLock.Lock()
	c.state.Status = Status_ConnectingBeta
	c.stateLock.Unlock()
	beta, betaConnectErr := connect(
		ctx,
		c.logger.Sublogger("beta"),
		c.session.Beta,
		prompter,
		c.session.Identifier,
		c.session.Version,
		c.mergedBetaConfiguration,
		false,
	)
	c.stateLock.Lock()
	c.state.BetaConnected = (beta != nil)
	c.stateLock.Unlock()

	// Start the synchronization loop with what we have. Alpha or beta may have
	// failed to connect (and be nil), but in any case that'll just make the run
	// loop keep trying to connect.
	ctx, cancel := context.WithCancel(context.Background())
	c.cancel = cancel
	c.flushRequests = make(chan chan error, 1)
	c.done = make(chan struct{})
	go c.run(ctx, alpha, beta)

	// Report any errors. Since we always want to start a synchronization loop,
	// even on partial or complete failure (since it might be able to
	// auto-reconnect on its own), we wait until the end to report errors.
	if saveErr != nil {
		return errors.Wrap(saveErr, "unable to save session")
	} else if alphaConnectErr != nil {
		return errors.Wrap(alphaConnectErr, "unable to connect to alpha")
	} else if betaConnectErr != nil {
		return errors.Wrap(betaConnectErr, "unable to connect to beta")
	}

	// Success.
	return nil
}

// controllerHaltMode represents the behavior to use when halting a session.
type controllerHaltMode uint8

const (
	// controllerHaltModePause indicates that a session should be halted and
	// marked as paused.
	controllerHaltModePause controllerHaltMode = iota
	// controllerHaltModeShutdown indicates that a session should be halted.
	controllerHaltModeShutdown
	// controllerHaltModeTerminate indicates that a session should be halted
	// and then deleted.
	controllerHaltModeTerminate
)

// description returns a human-readable description of a halt mode.
func (m controllerHaltMode) description() string {
	switch m {
	case controllerHaltModePause:
		return "Pausing"
	case controllerHaltModeShutdown:
		return "Shutting down"
	case controllerHaltModeTerminate:
		return "Terminating"
	default:
		panic("unhandled halt mode")
	}
}

// halt halts the session with the specified behavior. If lifecycleLockHeld is
// true, then halt will assume that the lifecycle lock is held by the caller and
// will not attempt to acquire it.
func (c *controller) halt(_ context.Context, mode controllerHaltMode, prompter string, lifecycleLockHeld bool) error {
	// Update status.
	prompting.Message(prompter, fmt.Sprintf("%s session %s...", mode.description(), c.session.Identifier))

	// If not already held, acquire the lifecycle lock and defer its release.
	if !lifecycleLockHeld {
		c.lifecycleLock.Lock()
		defer c.lifecycleLock.Unlock()
	}

	// Don't allow any additional halt operations if the controller is disabled,
	// because either this session is being terminated or the service is
	// shutting down, and in either case there is no point in halting.
	if c.disabled {
		return errors.New("controller disabled")
	}

	// Kill any existing synchronization loop.
	if c.cancel != nil {
		// Cancel the synchronization loop and wait for it to finish.
		c.cancel()
		<-c.done

		// Nil out any lifecycle state.
		c.cancel = nil
		c.flushRequests = nil
		c.done = nil
	}

	// Handle based on the halt mode.
	if mode == controllerHaltModePause {
		// Mark the session as paused and save it.
		c.stateLock.Lock()
		c.session.Paused = true
		saveErr := encoding.MarshalAndSaveProtobuf(c.sessionPath, c.session)
		c.stateLock.Unlock()
		if saveErr != nil {
			return errors.Wrap(saveErr, "unable to save session")
		}
	} else if mode == controllerHaltModeShutdown {
		// Disable the controller.
		c.disabled = true
	} else if mode == controllerHaltModeTerminate {
		// Disable the controller.
		c.disabled = true

		// Wipe the session information from disk.
		sessionRemoveErr := os.Remove(c.sessionPath)
		archiveRemoveErr := os.Remove(c.archivePath)
		if sessionRemoveErr != nil {
			return errors.Wrap(sessionRemoveErr, "unable to remove session from disk")
		} else if archiveRemoveErr != nil {
			return errors.Wrap(archiveRemoveErr, "unable to remove archive from disk")
		}
	} else {
		panic("invalid halt mode specified")
	}

	// Success.
	return nil
}

// reset resets synchronization session history by pausing the session (if it's
// running), overwriting the ancestor data stored on disk with an empty
// ancestor, and then resuming the session (if it was previously running).
func (c *controller) reset(ctx context.Context, prompter string) error {
	// Lock the controller's lifecycle and defer its release. Note that halt
	// and resume are invoked below with lifecycleLockHeld set to true, since
	// we hold the lock for the duration of this method.
	c.lifecycleLock.Lock()
	defer c.lifecycleLock.Unlock()

	// Check if the session is currently running.
	running := c.cancel != nil

	// If the session is running, pause it.
	if running {
		if err := c.halt(ctx, controllerHaltModePause, prompter, true); err != nil {
			return fmt.Errorf("unable to pause session: %w", err)
		}
	}

	// Reset the session archive on disk.
	archive := &core.Archive{}
	if err := encoding.MarshalAndSaveProtobuf(c.archivePath, archive); err != nil {
		return fmt.Errorf("unable to clear session history: %w", err)
	}

	// Resume the session if it was previously running.
	if running {
		if err := c.resume(ctx, prompter, true); err != nil {
			return fmt.Errorf("unable to resume session: %w", err)
		}
	}

	// Success.
	return nil
}

// run is the main runloop for the controller, managing connectivity and
// synchronization.
func (c *controller) run(ctx context.Context, alpha, beta Endpoint) {
	// Defer resource and state cleanup.
	defer func() {
		// Shutdown any endpoints. These might be non-nil if the runloop was
		// cancelled while partially connected rather than after sync failure.
		if alpha != nil {
			alpha.Shutdown()
		}
		if beta != nil {
			beta.Shutdown()
		}

		// Reset the state.
		c.stateLock.Lock()
		c.state = &State{
			Session: c.session,
		}
		c.stateLock.Unlock()

		// Signal completion.
		close(c.done)
	}()

	// Track the last time that synchronization failed.
	var lastSynchronizationFailureTime time.Time

	// Loop until cancelled.
	for {
		// Loop until we're connected to both endpoints. We do a non-blocking
		// check for cancellation on each reconnect error so that we don't waste
		// resources by trying another connect when the context has been
		// cancelled (it'll be wasteful). This is better than sentinel errors.
		for {
			// Ensure that alpha is connected. No prompter is passed here since
			// this is a background reconnect with no user interaction.
			if alpha == nil {
				c.stateLock.Lock()
				c.state.Status = Status_ConnectingAlpha
				c.stateLock.Unlock()
				alpha, _ = connect(
					ctx,
					c.logger.Sublogger("alpha"),
					c.session.Alpha,
					"",
					c.session.Identifier,
					c.session.Version,
					c.mergedAlphaConfiguration,
					true,
				)
			}
			c.stateLock.Lock()
			c.state.AlphaConnected = (alpha != nil)
			c.stateLock.Unlock()

			// Check for cancellation to avoid a spurious connection to beta in
			// case cancellation occurred while connecting to alpha.
			select {
			case <-ctx.Done():
				return
			default:
			}

			// Ensure that beta is connected.
			if beta == nil {
				c.stateLock.Lock()
				c.state.Status = Status_ConnectingBeta
				c.stateLock.Unlock()
				beta, _ = connect(
					ctx,
					c.logger.Sublogger("beta"),
					c.session.Beta,
					"",
					c.session.Identifier,
					c.session.Version,
					c.mergedBetaConfiguration,
					false,
				)
			}
			c.stateLock.Lock()
			c.state.BetaConnected = (beta != nil)
			c.stateLock.Unlock()

			// If both endpoints are connected, we're done. We perform this
			// check here (rather than in the loop condition) because if we did
			// it in the loop condition we'd still need a check here to avoid a
			// sleep every time (even if already successfully connected).
			if alpha != nil && beta != nil {
				break
			}

			// If we failed to connect, wait and then retry. Watch for
			// cancellation in the meantime.
			select {
			case <-ctx.Done():
				return
			case <-time.After(autoReconnectInterval):
			}
		}

		// Perform synchronization. This call blocks until synchronization
		// fails (it only ever returns a non-nil error).
		err := c.synchronize(ctx, alpha, beta)

		// Shutdown the endpoints.
		alpha.Shutdown()
		alpha = nil
		beta.Shutdown()
		beta = nil

		// Reset the synchronization state, but propagate the error that caused
		// failure.
		c.stateLock.Lock()
		c.state = &State{
			Session:   c.session,
			LastError: err.Error(),
		}
		c.stateLock.Unlock()

		// When synchronization fails, we generally want to restart it as
		// quickly as possible. Thus, if it's been longer than our usual waiting
		// period since synchronization failed last, simply try to reconnect
		// immediately (though still check for cancellation). If it's been less
		// than our usual waiting period since synchronization failed last, then
		// something is probably wrong, so wait for our usual waiting period
		// (while checking and monitoring for cancellation).
		now := time.Now()
		if now.Sub(lastSynchronizationFailureTime) >= autoReconnectInterval {
			select {
			case <-ctx.Done():
				return
			default:
			}
		} else {
			select {
			case <-ctx.Done():
				return
			case <-time.After(autoReconnectInterval):
			}
		}
		lastSynchronizationFailureTime = now
	}
}

// synchronize is the main synchronization loop for the controller.
func (c *controller) synchronize(ctx context.Context, alpha, beta Endpoint) error {
	// Clear any error state upon restart of this function. If a terminal error
	// previously caused synchronization to fail, then the user will have had
	// time to review it (while the run loop is waiting to reconnect), so it's
	// not like we're getting rid of it too quickly.
	c.stateLock.Lock()
	if c.state.LastError != "" {
		c.state.LastError = ""
		c.stateLock.Unlock()
	} else {
		c.stateLock.UnlockWithoutNotify()
	}

	// Track any flush request that we've pulled from the queue but haven't
	// marked as complete. If we bail due to an error, then close out the
	// request.
	var flushRequest chan error
	defer func() {
		if flushRequest != nil {
			flushRequest <- errors.New("synchronization cycle failed")
			flushRequest = nil
		}
	}()

	// Load the archive and extract the ancestor.
	archive := &core.Archive{}
	if err := encoding.LoadAndUnmarshalProtobuf(c.archivePath, archive); err != nil {
		return errors.Wrap(err, "unable to load archive")
	} else if err = archive.Root.EnsureValid(); err != nil {
		return errors.Wrap(err, "invalid archive found on disk")
	}
	ancestor := archive.Root

	// Compute the effective synchronization mode.
806 synchronizationMode := c.session.Configuration.SynchronizationMode 807 if synchronizationMode.IsDefault() { 808 synchronizationMode = c.session.Version.DefaultSynchronizationMode() 809 } 810 811 // Compute, on a per-endpoint basis, whether or not polling should be 812 // disabled. 813 αWatchMode := c.mergedAlphaConfiguration.WatchMode 814 βWatchMode := c.mergedBetaConfiguration.WatchMode 815 if αWatchMode.IsDefault() { 816 αWatchMode = c.session.Version.DefaultWatchMode() 817 } 818 if βWatchMode.IsDefault() { 819 βWatchMode = c.session.Version.DefaultWatchMode() 820 } 821 αDisablePolling := (αWatchMode == WatchMode_WatchModeNoWatch) 822 βDisablePolling := (βWatchMode == WatchMode_WatchModeNoWatch) 823 824 // Create a switch that will allow us to skip polling and force a 825 // synchronization cycle. On startup, we enable this switch and skip polling 826 // to immediately force a check for changes that may have occurred while the 827 // synchronization loop wasn't running. The only time we don't force this 828 // check on startup is when both endpoints have polling disabled, which is 829 // an indication that the session should operate in a fully manual mode. 830 skipPolling := (!αDisablePolling || !βDisablePolling) 831 832 // Create variables to track our reasons for skipping polling. 833 var skippingPollingDueToScanError, skippingPollingDueToMissingFiles bool 834 835 // Loop until there is a synchronization error. 836 for { 837 // Unless we've been requested to skip polling, wait for a dirty state 838 // while monitoring for cancellation. If we've been requested to skip 839 // polling, it should only be for one iteration. 840 if !skipPolling { 841 // Update status to watching. 842 c.stateLock.Lock() 843 c.state.Status = Status_Watching 844 c.stateLock.Unlock() 845 846 // Create a polling context that we can cancel. We don't make it a 847 // subcontext of our own cancellation context because it's easier to 848 // just track cancellation there separately. 
			pollCtx, pollCancel := context.WithCancel(context.Background())

			// Start alpha polling. If alpha has been put into a no-watch mode,
			// then we still perform polling in order to detect transport errors
			// that might occur while the session is sitting idle, but we ignore
			// any non-error responses and instead wait for the polling context
			// to be cancelled. We perform this ignore operation because we
			// don't want a broken or malicious endpoint to be able to force
			// synchronization, especially if its watching has been
			// intentionally disabled.
			//
			// It's worth noting that, because a well-behaved endpoint in
			// no-watch mode never returns events, we'll always be polling on it
			// (and thereby testing the transport) right up until the polling
			// context is cancelled. Thus, there's no need to worry about cases
			// where the endpoint sends back an event that we ignore and then
			// has a transport failure without us noticing while we wait on the
			// polling context (at least not for well-behaved endpoints).
			αPollResults := make(chan error, 1)
			go func() {
				if αDisablePolling {
					if err := alpha.Poll(pollCtx); err != nil {
						αPollResults <- err
					} else {
						<-pollCtx.Done()
						αPollResults <- nil
					}
				} else {
					αPollResults <- alpha.Poll(pollCtx)
				}
			}()

			// Start beta polling. The logic here mirrors that for alpha above.
			βPollResults := make(chan error, 1)
			go func() {
				if βDisablePolling {
					if err := beta.Poll(pollCtx); err != nil {
						βPollResults <- err
					} else {
						<-pollCtx.Done()
						βPollResults <- nil
					}
				} else {
					βPollResults <- beta.Poll(pollCtx)
				}
			}()

			// Wait for either poll to return an event or an error, for a flush
			// request, or for cancellation. In any of these cases, cancel
			// polling and ensure that both polling operations have completed.
			// Draining both result channels is safe (and non-blocking in the
			// long run) because each channel is buffered with capacity one and
			// each polling goroutine sends exactly one value.
			var αPollErr, βPollErr error
			cancelled := false
			select {
			case αPollErr = <-αPollResults:
				pollCancel()
				βPollErr = <-βPollResults
			case βPollErr = <-βPollResults:
				pollCancel()
				αPollErr = <-αPollResults
			case flushRequest = <-c.flushRequests:
				// Flush requests must be buffered with room for at least one
				// error (see the flushRequests field documentation). An
				// unbuffered request is a programming error, so panic.
				if cap(flushRequest) < 1 {
					panic("unbuffered flush request")
				}
				pollCancel()
				αPollErr = <-αPollResults
				βPollErr = <-βPollResults
			case <-ctx.Done():
				cancelled = true
				pollCancel()
				αPollErr = <-αPollResults
				βPollErr = <-βPollResults
			}

			// Watch for errors or cancellation.
			if cancelled {
				return errors.New("cancelled during polling")
			} else if αPollErr != nil {
				return errors.Wrap(αPollErr, "alpha polling error")
			} else if βPollErr != nil {
				return errors.Wrap(βPollErr, "beta polling error")
			}
		} else {
			// Polling was skipped for this cycle (forced by a previous scan
			// retry or missing staged files). Reset the flag so that
			// subsequent cycles poll normally unless re-forced below.
			skipPolling = false
		}

		// Scan both endpoints in parallel and check for errors. If a flush
		// request is present, then force both endpoints to perform a full
		// (warm) re-scan rather than using acceleration.
		c.stateLock.Lock()
		c.state.Status = Status_Scanning
		c.stateLock.Unlock()
		forceFullScan := flushRequest != nil
		var αSnapshot, βSnapshot *core.Entry
		var αPreservesExecutability, βPreservesExecutability bool
		var αScanErr, βScanErr error
		var αTryAgain, βTryAgain bool
		scanDone := &sync.WaitGroup{}
		scanDone.Add(2)
		go func() {
			αSnapshot, αPreservesExecutability, αScanErr, αTryAgain = alpha.Scan(ctx, ancestor, forceFullScan)
			scanDone.Done()
		}()
		go func() {
			βSnapshot, βPreservesExecutability, βScanErr, βTryAgain = beta.Scan(ctx, ancestor, forceFullScan)
			scanDone.Done()
		}()
		scanDone.Wait()

		// Check if cancellation occurred during scanning.
		select {
		case <-ctx.Done():
			return errors.New("cancelled during scanning")
		default:
		}

		// Check for scan errors. A scan error is terminal (returned) unless
		// the endpoint recommends a retry, in which case it's recorded as the
		// session's last error and handled by the retry logic below.
		if αScanErr != nil {
			αScanErr = errors.Wrap(αScanErr, "alpha scan error")
			if !αTryAgain {
				return αScanErr
			} else {
				c.stateLock.Lock()
				c.state.LastError = αScanErr.Error()
				c.stateLock.Unlock()
			}
		}
		if βScanErr != nil {
			βScanErr = errors.Wrap(βScanErr, "beta scan error")
			if !βTryAgain {
				return βScanErr
			} else {
				c.stateLock.Lock()
				c.state.LastError = βScanErr.Error()
				c.stateLock.Unlock()
			}
		}

		// Watch for retry recommendations from scan operations. These occur
		// when a scan fails and concurrent modifications are suspected as the
		// culprit. In these cases, we force another synchronization cycle. Note
		// that, because we skip polling, our flush request, if any, will still
		// be valid, and we'll be able to respond to it once a successful
		// synchronization cycle completes.
		//
		// TODO: Should we eventually abort synchronization after a certain
		// number of consecutive scan retries?
		if αTryAgain || βTryAgain {
			// If we're already in a synchronization cycle that was forced due
			// to a previous scan error, and we've now received another retry
			// recommendation, then wait before attempting a rescan.
			if skippingPollingDueToScanError {
				// Update status to waiting for rescan.
				c.stateLock.Lock()
				c.state.Status = Status_WaitingForRescan
				c.stateLock.Unlock()

				// Wait before trying to rescan, but watch for cancellation.
				select {
				case <-time.After(rescanWaitDuration):
				case <-ctx.Done():
					return errors.New("cancelled during rescan wait")
				}
			}

			// Retry.
			skipPolling = true
			skippingPollingDueToScanError = true
			continue
		}
		skippingPollingDueToScanError = false

		// Clear the last error (if any) after a successful scan. Since scan
		// errors are the only non-terminal errors, and since we know that we've
		// cleared any other terminal error at the entry to this loop, we know
		// that what we're covering up here can only be a scan error.
		c.stateLock.Lock()
		if c.state.LastError != "" {
			c.state.LastError = ""
			c.stateLock.Unlock()
		} else {
			// No state actually changed, so unlock without triggering a
			// state change notification to observers.
			c.stateLock.UnlockWithoutNotify()
		}

		// If one side preserves executability and the other does not, then
		// propagate executability from the preserving side to the
		// non-preserving side.
		if αPreservesExecutability && !βPreservesExecutability {
			βSnapshot = core.PropagateExecutability(ancestor, αSnapshot, βSnapshot)
		} else if βPreservesExecutability && !αPreservesExecutability {
			αSnapshot = core.PropagateExecutability(ancestor, βSnapshot, αSnapshot)
		}

		// Update status to reconciling.
		c.stateLock.Lock()
		c.state.Status = Status_Reconciling
		c.stateLock.Unlock()

		// Check if the root is a directory that's been emptied (by deleting a
		// non-trivial amount of content) on one endpoint (but not both). This
		// can be intentional, but usually indicates that a non-persistent
		// filesystem (such as a container filesystem) is being used as the
		// synchronization root. In any case, we switch to a halted state and
		// wait for the user to either manually propagate the deletion and
		// resume the session, recreate the session, or reset the session.
		if oneEndpointEmptiedRoot(ancestor, αSnapshot, βSnapshot) {
			c.stateLock.Lock()
			c.state.Status = Status_HaltedOnRootEmptied
			c.stateLock.Unlock()
			<-ctx.Done()
			return errors.New("cancelled while halted on emptied root")
		}

		// Perform reconciliation.
		ancestorChanges, αTransitions, βTransitions, conflicts := core.Reconcile(
			ancestor,
			αSnapshot,
			βSnapshot,
			synchronizationMode,
		)

		// Create a slim copy of the conflicts so that we don't need to hold
		// the full-size versions in memory or send them over the wire.
		var slimConflicts []*core.Conflict
		if len(conflicts) > 0 {
			slimConflicts = make([]*core.Conflict, len(conflicts))
			for c, conflict := range conflicts {
				slimConflicts[c] = conflict.CopySlim()
			}
		}
		c.stateLock.Lock()
		c.state.Conflicts = slimConflicts
		c.stateLock.Unlock()

		// Check if a root deletion operation is being propagated. This can be
		// intentional, accidental, or an indication of a non-persistent
		// filesystem (such as a container filesystem). In any case, we switch
		// to a halted state and wait for the user to either manually propagate
		// the deletion and resume the session, recreate the session, or reset
		// the session.
		if containsRootDeletion(αTransitions) || containsRootDeletion(βTransitions) {
			c.stateLock.Lock()
			c.state.Status = Status_HaltedOnRootDeletion
			c.stateLock.Unlock()
			<-ctx.Done()
			return errors.New("cancelled while halted on root deletion")
		}

		// Check if a root type change is being propagated. This can be
		// intentional or accidental. In any case, we switch to a halted state
		// and wait for the user to manually delete the content that will be
		// overwritten by the type change and resume the session.
		if containsRootTypeChange(αTransitions) || containsRootTypeChange(βTransitions) {
			c.stateLock.Lock()
			c.state.Status = Status_HaltedOnRootTypeChange
			c.stateLock.Unlock()
			<-ctx.Done()
			return errors.New("cancelled while halted on root type change")
		}

		// Create a monitoring callback for rsync staging that publishes
		// receiver progress into the session state.
		monitor := func(status *rsync.ReceiverStatus) error {
			c.stateLock.Lock()
			c.state.StagingStatus = status
			c.stateLock.Unlock()
			return nil
		}

		// Stage files on alpha. Alpha's receiver accepts the incoming file
		// contents while beta supplies them via the rsync pipeline.
		c.stateLock.Lock()
		c.state.Status = Status_StagingAlpha
		c.stateLock.Unlock()
		if paths, digests, err := core.TransitionDependencies(αTransitions); err != nil {
			return errors.Wrap(err, "unable to determine paths for staging on alpha")
		} else if len(paths) > 0 {
			filteredPaths, signatures, receiver, err := alpha.Stage(paths, digests)
			if err != nil {
				return errors.Wrap(err, "unable to begin staging on alpha")
			}
			if !filteredPathsAreSubset(filteredPaths, paths) {
				return errors.New("alpha returned incorrect subset of staging paths")
			}
			if len(filteredPaths) > 0 {
				receiver = rsync.NewMonitoringReceiver(receiver, filteredPaths, monitor)
				receiver = rsync.NewPreemptableReceiver(ctx, receiver)
				if err = beta.Supply(filteredPaths, signatures, receiver); err != nil {
					return errors.Wrap(err, "unable to stage files on alpha")
				}
			}
		}

		// Stage files on beta. The logic here mirrors that for alpha above,
		// with alpha acting as the supplier.
		c.stateLock.Lock()
		c.state.Status = Status_StagingBeta
		c.stateLock.Unlock()
		if paths, digests, err := core.TransitionDependencies(βTransitions); err != nil {
			return errors.Wrap(err, "unable to determine paths for staging on beta")
		} else if len(paths) > 0 {
			filteredPaths, signatures, receiver, err := beta.Stage(paths, digests)
			if err != nil {
				return errors.Wrap(err, "unable to begin staging on beta")
			}
			if !filteredPathsAreSubset(filteredPaths, paths) {
				return errors.New("beta returned incorrect subset of staging paths")
			}
			if len(filteredPaths) > 0 {
				receiver = rsync.NewMonitoringReceiver(receiver, filteredPaths, monitor)
				receiver = rsync.NewPreemptableReceiver(ctx, receiver)
				if err = alpha.Supply(filteredPaths, signatures, receiver); err != nil {
					return errors.Wrap(err, "unable to stage files on beta")
				}
			}
		}

		// Perform transitions on both endpoints in parallel. For each side that
		// doesn't completely error out, convert its results to ancestor
		// changes. Transition errors are checked later, once the ancestor has
		// been updated.
		c.stateLock.Lock()
		c.state.Status = Status_Transitioning
		c.stateLock.Unlock()
		var αResults, βResults []*core.Entry
		var αProblems, βProblems []*core.Problem
		var αMissingFiles, βMissingFiles bool
		var αTransitionErr, βTransitionErr error
		var αChanges, βChanges []*core.Change
		transitionDone := &sync.WaitGroup{}
		if len(αTransitions) > 0 {
			transitionDone.Add(1)
		}
		if len(βTransitions) > 0 {
			transitionDone.Add(1)
		}
		if len(αTransitions) > 0 {
			go func() {
				αResults, αProblems, αMissingFiles, αTransitionErr = alpha.Transition(ctx, αTransitions)
				if αTransitionErr == nil {
					for t, transition := range αTransitions {
						αChanges = append(αChanges, &core.Change{Path: transition.Path, New: αResults[t]})
					}
				}
				transitionDone.Done()
			}()
		}
		if len(βTransitions) > 0 {
			go func() {
				βResults, βProblems, βMissingFiles, βTransitionErr = beta.Transition(ctx, βTransitions)
				if βTransitionErr == nil {
					for t, transition := range βTransitions {
						βChanges = append(βChanges, &core.Change{Path: transition.Path, New: βResults[t]})
					}
				}
				transitionDone.Done()
			}()
		}
		transitionDone.Wait()

		// Record problems and then combine changes and propagate them to the
		// ancestor. Even if there were transition errors, this code is still
		// valid.
		c.stateLock.Lock()
		c.state.Status = Status_Saving
		c.state.AlphaProblems = αProblems
		c.state.BetaProblems = βProblems
		c.stateLock.Unlock()
		ancestorChanges = append(ancestorChanges, αChanges...)
		ancestorChanges = append(ancestorChanges, βChanges...)
		if newAncestor, err := core.Apply(ancestor, ancestorChanges); err != nil {
			return errors.Wrap(err, "unable to propagate changes to ancestor")
		} else {
			ancestor = newAncestor
		}

		// Validate the new ancestor before saving it to ensure that our
		// reconciliation logic doesn't have any flaws. This is the only time
		// that we validate a data structure generated by code in the same
		// process (usually our tests are our validation), but this case is
		// special because (a) our test cases can't cover every real world
		// condition that might arise and (b) if we write a broken ancestor to
		// disk, the session is toast. This safety check ensures that even if we
		// put out a broken release, or encounter some bizarre real world merge
		// case that we didn't consider, things can be fixed.
		if err := ancestor.EnsureValid(); err != nil {
			return errors.Wrap(err, "new ancestor is invalid")
		}

		// Save the ancestor.
		archive.Root = ancestor
		if err := encoding.MarshalAndSaveProtobuf(c.archivePath, archive); err != nil {
			return errors.Wrap(err, "unable to save ancestor")
		}

		// Now check for transition errors. These are deliberately checked
		// only after the ancestor has been persisted so that any partial
		// progress is not lost.
		if αTransitionErr != nil {
			return errors.Wrap(αTransitionErr, "unable to apply changes to alpha")
		} else if βTransitionErr != nil {
			return errors.Wrap(βTransitionErr, "unable to apply changes to beta")
		}

		// If there were files missing from either endpoint's stager during the
		// transition operations, then there were likely concurrent
		// modifications during staging. If we see this, then skip polling and
		// attempt to run another synchronization cycle immediately, but only if
		// we're not already in a synchronization cycle that was forced due to
		// previously missing files.
		if (αMissingFiles || βMissingFiles) && !skippingPollingDueToMissingFiles {
			skipPolling = true
			skippingPollingDueToMissingFiles = true
		} else {
			skippingPollingDueToMissingFiles = false
		}

		// Increment the synchronization cycle count.
		c.stateLock.Lock()
		c.state.SuccessfulSynchronizationCycles++
		c.stateLock.Unlock()

		// If a flush request triggered this synchronization cycle, then tell it
		// that the cycle has completed and remove it from our tracking.
		if flushRequest != nil {
			flushRequest <- nil
			flushRequest = nil
		}
	}
}