1package synchronization
2
3import (
4	"context"
5	"fmt"
6	"os"
7	"sync"
8	"time"
9
10	"github.com/pkg/errors"
11
12	"github.com/golang/protobuf/ptypes"
13
14	"github.com/mutagen-io/mutagen/pkg/encoding"
15	"github.com/mutagen-io/mutagen/pkg/logging"
16	"github.com/mutagen-io/mutagen/pkg/mutagen"
17	"github.com/mutagen-io/mutagen/pkg/prompting"
18	"github.com/mutagen-io/mutagen/pkg/state"
19	"github.com/mutagen-io/mutagen/pkg/synchronization/core"
20	"github.com/mutagen-io/mutagen/pkg/synchronization/rsync"
21	"github.com/mutagen-io/mutagen/pkg/url"
22)
23
const (
	// autoReconnectInterval is the period of time to wait before attempting an
	// automatic reconnect after disconnection or a failed reconnect. It also
	// serves as the post-failure throttle interval in the run loop.
	autoReconnectInterval = 15 * time.Second
	// rescanWaitDuration is the period of time to wait before attempting to
	// rescan after an ephemeral scan failure.
	rescanWaitDuration = 5 * time.Second
)
32
// controller manages and executes a single session.
type controller struct {
	// logger is the controller logger.
	logger *logging.Logger
	// sessionPath is the path to the serialized session.
	sessionPath string
	// archivePath is the path to the serialized archive.
	archivePath string
	// stateLock guards and tracks changes to the session member's Paused field
	// and the state member.
	stateLock *state.TrackingLock
	// session encodes the associated session metadata. It is considered static
	// and safe for concurrent access except for its Paused field, for which the
	// stateLock member should be held. It should be saved to disk any time it
	// is modified.
	session *Session
	// mergedAlphaConfiguration is the alpha-specific configuration object
	// (computed from the core configuration and alpha-specific overrides). It
	// is considered static and safe for concurrent access. It is a derived
	// field and not saved to disk.
	mergedAlphaConfiguration *Configuration
	// mergedBetaConfiguration is the beta-specific configuration object
	// (computed from the core configuration and beta-specific overrides). It is
	// considered static and safe for concurrent access. It is a derived field
	// and not saved to disk.
	mergedBetaConfiguration *Configuration
	// state represents the current synchronization state. Access to (and
	// replacement of) this member requires holding stateLock.
	state *State
	// lifecycleLock guards setting of the disabled, cancel, flushRequests, and
	// done members. Access to these members is allowed for the synchronization
	// loop without holding the lock. Any code wishing to set these members
	// should first acquire the lock, then cancel the synchronization loop, and
	// wait for it to complete before making any such changes.
	lifecycleLock sync.Mutex
	// disabled indicates that no more changes to the synchronization loop
	// lifecycle are allowed (i.e. no more synchronization loops can be started
	// for this controller). This is used by terminate and shutdown. It should
	// only be set to true once any existing synchronization loop has been
	// stopped.
	disabled bool
	// cancel cancels the synchronization loop execution context. It should be
	// nil if and only if there is no synchronization loop running.
	cancel context.CancelFunc
	// flushRequests is used to pass flush requests to the synchronization
	// loop. It is buffered, allowing a single request to be queued. All
	// requests passed via this channel must be buffered and contain room for
	// one error.
	flushRequests chan chan error
	// done will be closed by the current synchronization loop when it exits.
	done chan struct{}
}
83
// newSession creates a new session and corresponding controller. Unless the
// session is being created pre-paused, it eagerly connects to any non-tunnel
// endpoints (so that connection failures surface to the creator immediately),
// persists the session metadata and an empty archive to disk, and starts the
// synchronization loop, handing off any pre-connected endpoints to it. On any
// error, pre-connected endpoints are shut down and no controller is returned.
func newSession(
	ctx context.Context,
	logger *logging.Logger,
	tracker *state.Tracker,
	identifier string,
	alpha, beta *url.URL,
	configuration, configurationAlpha, configurationBeta *Configuration,
	name string,
	labels map[string]string,
	paused bool,
	prompter string,
) (*controller, error) {
	// Update status.
	prompting.Message(prompter, "Creating session...")

	// Set the session version.
	version := Version_Version1

	// Compute the creation time and convert it to Protocol Buffers format.
	creationTime := time.Now()
	creationTimeProto, err := ptypes.TimestampProto(creationTime)
	if err != nil {
		return nil, errors.Wrap(err, "unable to convert creation time format")
	}

	// Compute merged endpoint configurations.
	mergedAlphaConfiguration := MergeConfigurations(configuration, configurationAlpha)
	mergedBetaConfiguration := MergeConfigurations(configuration, configurationBeta)

	// If the session isn't being created paused, then try to connect to any
	// endpoints not using the tunnel protocol. The tunnel protocol is the one
	// case where we want to allow asynchronous connectivity (since it doesn't
	// require user input but also isn't guaranteed to connect immediately). If
	// we connect to endpoints here and don't hand them off to the runloop
	// below, then defer their shutdown.
	var alphaEndpoint, betaEndpoint Endpoint
	defer func() {
		// These variables are nil'd out below once ownership is handed off to
		// the run loop, so this cleanup only fires on error return paths (or
		// when the session starts paused and no endpoints were connected).
		if alphaEndpoint != nil {
			alphaEndpoint.Shutdown()
			alphaEndpoint = nil
		}
		if betaEndpoint != nil {
			betaEndpoint.Shutdown()
			betaEndpoint = nil
		}
	}()
	if !paused && alpha.Protocol != url.Protocol_Tunnel {
		logger.Info("Connecting to alpha endpoint")
		alphaEndpoint, err = connect(
			ctx,
			logger.Sublogger("alpha"),
			alpha,
			prompter,
			identifier,
			version,
			mergedAlphaConfiguration,
			true,
		)
		if err != nil {
			logger.Info("Alpha connection failure:", err)
			return nil, errors.Wrap(err, "unable to connect to alpha")
		}
	}
	if !paused && beta.Protocol != url.Protocol_Tunnel {
		logger.Info("Connecting to beta endpoint")
		betaEndpoint, err = connect(
			ctx,
			logger.Sublogger("beta"),
			beta,
			prompter,
			identifier,
			version,
			mergedBetaConfiguration,
			false,
		)
		if err != nil {
			logger.Info("Beta connection failure:", err)
			return nil, errors.Wrap(err, "unable to connect to beta")
		}
	}

	// Create the session and initial archive.
	session := &Session{
		Identifier:           identifier,
		Version:              version,
		CreationTime:         creationTimeProto,
		CreatingVersionMajor: mutagen.VersionMajor,
		CreatingVersionMinor: mutagen.VersionMinor,
		CreatingVersionPatch: mutagen.VersionPatch,
		Alpha:                alpha,
		Beta:                 beta,
		Configuration:        configuration,
		ConfigurationAlpha:   configurationAlpha,
		ConfigurationBeta:    configurationBeta,
		Name:                 name,
		Labels:               labels,
		Paused:               paused,
	}
	archive := &core.Archive{}

	// Compute the session and archive paths.
	sessionPath, err := pathForSession(session.Identifier)
	if err != nil {
		return nil, errors.Wrap(err, "unable to compute session path")
	}
	archivePath, err := pathForArchive(session.Identifier)
	if err != nil {
		return nil, errors.Wrap(err, "unable to compute archive path")
	}

	// Save components to disk. If the archive fails to save, remove the
	// session file so that no half-created session is left behind.
	if err := encoding.MarshalAndSaveProtobuf(sessionPath, session); err != nil {
		return nil, errors.Wrap(err, "unable to save session")
	}
	if err := encoding.MarshalAndSaveProtobuf(archivePath, archive); err != nil {
		os.Remove(sessionPath)
		return nil, errors.Wrap(err, "unable to save archive")
	}

	// Create the controller.
	controller := &controller{
		logger:                   logger,
		sessionPath:              sessionPath,
		archivePath:              archivePath,
		stateLock:                state.NewTrackingLock(tracker),
		session:                  session,
		mergedAlphaConfiguration: mergedAlphaConfiguration,
		mergedBetaConfiguration:  mergedBetaConfiguration,
		state: &State{
			Session: session,
		},
	}

	// If the session isn't being created pre-paused, then start a
	// synchronization loop and mark the endpoints as handed off to that loop so
	// that we don't defer their shutdown.
	if !paused {
		logger.Info("Starting synchronization loop")
		ctx, cancel := context.WithCancel(context.Background())
		controller.cancel = cancel
		controller.flushRequests = make(chan chan error, 1)
		controller.done = make(chan struct{})
		go controller.run(ctx, alphaEndpoint, betaEndpoint)
		// Ownership of the endpoints now belongs to the run loop; nil the
		// locals so the deferred cleanup above doesn't shut them down.
		alphaEndpoint = nil
		betaEndpoint = nil
	}

	// Success.
	logger.Info("Session initialized")
	return controller, nil
}
236
237// loadSession loads an existing session and creates a corresponding controller.
238func loadSession(logger *logging.Logger, tracker *state.Tracker, identifier string) (*controller, error) {
239	// Compute session and archive paths.
240	sessionPath, err := pathForSession(identifier)
241	if err != nil {
242		return nil, errors.Wrap(err, "unable to compute session path")
243	}
244	archivePath, err := pathForArchive(identifier)
245	if err != nil {
246		return nil, errors.Wrap(err, "unable to compute archive path")
247	}
248
249	// Load and validate the session. We have to populate a few optional fields
250	// before validation if they're not set. We can't do this in the Session
251	// literal because they'll be wiped out during unmarshalling, even if not
252	// set.
253	session := &Session{}
254	if err := encoding.LoadAndUnmarshalProtobuf(sessionPath, session); err != nil {
255		return nil, errors.Wrap(err, "unable to load session configuration")
256	}
257	if session.ConfigurationAlpha == nil {
258		session.ConfigurationAlpha = &Configuration{}
259	}
260	if session.ConfigurationBeta == nil {
261		session.ConfigurationBeta = &Configuration{}
262	}
263	if err := session.EnsureValid(); err != nil {
264		return nil, errors.Wrap(err, "invalid session found on disk")
265	}
266
267	// Create the controller.
268	controller := &controller{
269		logger:      logger,
270		sessionPath: sessionPath,
271		archivePath: archivePath,
272		stateLock:   state.NewTrackingLock(tracker),
273		session:     session,
274		mergedAlphaConfiguration: MergeConfigurations(
275			session.Configuration,
276			session.ConfigurationAlpha,
277		),
278		mergedBetaConfiguration: MergeConfigurations(
279			session.Configuration,
280			session.ConfigurationBeta,
281		),
282		state: &State{
283			Session: session,
284		},
285	}
286
287	// If the session isn't marked as paused, start a synchronization loop.
288	if !session.Paused {
289		ctx, cancel := context.WithCancel(context.Background())
290		controller.cancel = cancel
291		controller.flushRequests = make(chan chan error, 1)
292		controller.done = make(chan struct{})
293		go controller.run(ctx, nil, nil)
294	}
295
296	// Success.
297	logger.Info("Session loaded")
298	return controller, nil
299}
300
301// currentState creates a snapshot of the current session state.
302func (c *controller) currentState() *State {
303	// Lock the session state and defer its release. It's very important that we
304	// unlock without a notification here, otherwise we'd trigger an infinite
305	// cycle of list/notify.
306	c.stateLock.Lock()
307	defer c.stateLock.UnlockWithoutNotify()
308
309	// Perform a (pseudo) deep copy of the state.
310	return c.state.Copy()
311}
312
// flush attempts to force a synchronization cycle for the session. Unless
// skipWait is specified, the method will wait until a post-flush
// synchronization cycle has completed. The provided context (which must be
// non-nil) can terminate this wait early.
func (c *controller) flush(ctx context.Context, prompter string, skipWait bool) error {
	// Update status.
	prompting.Message(prompter, fmt.Sprintf("Forcing synchronization cycle for session %s...", c.session.Identifier))

	// Lock the controller's lifecycle and defer its release. Note that the
	// lifecycle lock is held for the full duration of the flush, including any
	// wait on the synchronization cycle.
	c.lifecycleLock.Lock()
	defer c.lifecycleLock.Unlock()

	// Don't allow any operations if the controller is disabled.
	if c.disabled {
		return errors.New("controller disabled")
	}

	// Check if the session is paused by checking whether or not it has a
	// synchronization loop. It's an internal invariant that a session has a
	// synchronization loop if and only if it is not paused, but checking for
	// paused status requires holding the state lock, whereas checking for the
	// existence of a synchronization loop only requires holding the lifecycle
	// lock, which we do at this point.
	if c.cancel == nil {
		return errors.New("session is paused")
	}

	// Create a flush request. The channel must be buffered with room for one
	// error (the synchronization loop enforces this invariant).
	request := make(chan error, 1)

	// If we don't want to wait, then we can simply send the request in a
	// non-blocking manner, in which case either this request (or one that's
	// already queued) will be processed eventually. After that, we're done.
	if skipWait {
		// Send the request in a non-blocking manner.
		select {
		case c.flushRequests <- request:
		default:
		}

		// Success.
		return nil
	}

	// Otherwise we need to send the request in a blocking manner, watching for
	// cancellation in the mean time.
	select {
	case c.flushRequests <- request:
	case <-ctx.Done():
		return errors.New("flush cancelled before request could be sent")
	}

	// Now we need to wait for a response to the request, again watching for
	// cancellation in the mean time.
	select {
	case err := <-request:
		if err != nil {
			return err
		}
	case <-ctx.Done():
		return errors.New("flush cancelled while waiting for synchronization cycle")
	}

	// Success.
	return nil
}
379
// resume attempts to reconnect and resume the session if it isn't currently
// connected and synchronizing. If lifecycleLockHeld is true, then resume will
// assume that the lifecycle lock is held by the caller and will not attempt to
// acquire it.
func (c *controller) resume(ctx context.Context, prompter string, lifecycleLockHeld bool) error {
	// Update status.
	prompting.Message(prompter, fmt.Sprintf("Resuming session %s...", c.session.Identifier))

	// If not already held, acquire the lifecycle lock and defer its release.
	if !lifecycleLockHeld {
		c.lifecycleLock.Lock()
		defer c.lifecycleLock.Unlock()
	}

	// Don't allow any resume operations if the controller is disabled.
	if c.disabled {
		return errors.New("controller disabled")
	}

	// Check if there's an existing synchronization loop (i.e. if the session is
	// unpaused).
	if c.cancel != nil {
		// If there is an existing synchronization loop, check if it's already
		// in a state that's considered "connected".
		c.stateLock.Lock()
		connected := c.state.Status >= Status_Watching
		c.stateLock.UnlockWithoutNotify()

		// If we're already connected, then there's nothing we need to do. We
		// don't even need to mark the session as unpaused because it can't be
		// marked as paused if an existing synchronization loop is running (we
		// enforce this invariant as part of the controller's logic).
		if connected {
			return nil
		}

		// Otherwise, cancel the existing synchronization loop and wait for it
		// to finish.
		//
		// There's something of an efficiency race condition here, because the
		// existing loop might succeed in connecting between the time we check
		// and the time we cancel it. That could happen if an auto-reconnect
		// succeeds or even if the loop was already passed connections and it's
		// just hasn't updated its status yet. But the only danger here is
		// basically wasting those connections, and the window is very small.
		c.cancel()
		<-c.done

		// Nil out any lifecycle state.
		c.cancel = nil
		c.flushRequests = nil
		c.done = nil
	}

	// Mark the session as unpaused and save it to disk. The save error is
	// deliberately deferred (see below) so that a synchronization loop is
	// started regardless.
	c.stateLock.Lock()
	c.session.Paused = false
	saveErr := encoding.MarshalAndSaveProtobuf(c.sessionPath, c.session)
	c.stateLock.Unlock()

	// Attempt to connect to alpha. Connection errors are likewise deferred.
	c.stateLock.Lock()
	c.state.Status = Status_ConnectingAlpha
	c.stateLock.Unlock()
	alpha, alphaConnectErr := connect(
		ctx,
		c.logger.Sublogger("alpha"),
		c.session.Alpha,
		prompter,
		c.session.Identifier,
		c.session.Version,
		c.mergedAlphaConfiguration,
		true,
	)
	c.stateLock.Lock()
	c.state.AlphaConnected = (alpha != nil)
	c.stateLock.Unlock()

	// Attempt to connect to beta.
	c.stateLock.Lock()
	c.state.Status = Status_ConnectingBeta
	c.stateLock.Unlock()
	beta, betaConnectErr := connect(
		ctx,
		c.logger.Sublogger("beta"),
		c.session.Beta,
		prompter,
		c.session.Identifier,
		c.session.Version,
		c.mergedBetaConfiguration,
		false,
	)
	c.stateLock.Lock()
	c.state.BetaConnected = (beta != nil)
	c.stateLock.Unlock()

	// Start the synchronization loop with what we have. Alpha or beta may have
	// failed to connect (and be nil), but in any case that'll just make the run
	// loop keep trying to connect.
	ctx, cancel := context.WithCancel(context.Background())
	c.cancel = cancel
	c.flushRequests = make(chan chan error, 1)
	c.done = make(chan struct{})
	go c.run(ctx, alpha, beta)

	// Report any errors. Since we always want to start a synchronization loop,
	// even on partial or complete failure (since it might be able to
	// auto-reconnect on its own), we wait until the end to report errors.
	if saveErr != nil {
		return errors.Wrap(saveErr, "unable to save session")
	} else if alphaConnectErr != nil {
		return errors.Wrap(alphaConnectErr, "unable to connect to alpha")
	} else if betaConnectErr != nil {
		return errors.Wrap(betaConnectErr, "unable to connect to beta")
	}

	// Success.
	return nil
}
499
// controllerHaltMode represents the behavior to use when halting a session.
type controllerHaltMode uint8

const (
	// controllerHaltModePause indicates that a session should be halted and
	// marked as paused.
	controllerHaltModePause controllerHaltMode = iota
	// controllerHaltModeShutdown indicates that a session should be halted.
	controllerHaltModeShutdown
	// controllerHaltModeTerminate indicates that a session should be halted
	// and then deleted.
	controllerHaltModeTerminate
)
513
514// description returns a human-readable description of a halt mode.
515func (m controllerHaltMode) description() string {
516	switch m {
517	case controllerHaltModePause:
518		return "Pausing"
519	case controllerHaltModeShutdown:
520		return "Shutting down"
521	case controllerHaltModeTerminate:
522		return "Terminating"
523	default:
524		panic("unhandled halt mode")
525	}
526}
527
// halt halts the session with the specified behavior. If lifecycleLockHeld is
// true, then halt will assume that the lifecycle lock is held by the caller and
// will not attempt to acquire it. The context parameter is currently unused.
func (c *controller) halt(_ context.Context, mode controllerHaltMode, prompter string, lifecycleLockHeld bool) error {
	// Update status.
	prompting.Message(prompter, fmt.Sprintf("%s session %s...", mode.description(), c.session.Identifier))

	// If not already held, acquire the lifecycle lock and defer its release.
	if !lifecycleLockHeld {
		c.lifecycleLock.Lock()
		defer c.lifecycleLock.Unlock()
	}

	// Don't allow any additional halt operations if the controller is disabled,
	// because either this session is being terminated or the service is
	// shutting down, and in either case there is no point in halting.
	if c.disabled {
		return errors.New("controller disabled")
	}

	// Kill any existing synchronization loop.
	if c.cancel != nil {
		// Cancel the synchronization loop and wait for it to finish.
		c.cancel()
		<-c.done

		// Nil out any lifecycle state.
		c.cancel = nil
		c.flushRequests = nil
		c.done = nil
	}

	// Handle based on the halt mode.
	if mode == controllerHaltModePause {
		// Mark the session as paused and save it. Both the Paused mutation and
		// the save happen under the state lock (see the session field docs).
		c.stateLock.Lock()
		c.session.Paused = true
		saveErr := encoding.MarshalAndSaveProtobuf(c.sessionPath, c.session)
		c.stateLock.Unlock()
		if saveErr != nil {
			return errors.Wrap(saveErr, "unable to save session")
		}
	} else if mode == controllerHaltModeShutdown {
		// Disable the controller.
		c.disabled = true
	} else if mode == controllerHaltModeTerminate {
		// Disable the controller.
		c.disabled = true

		// Wipe the session information from disk. Both removals are attempted
		// before either error is reported so that a session-file failure
		// doesn't leave the archive behind.
		sessionRemoveErr := os.Remove(c.sessionPath)
		archiveRemoveErr := os.Remove(c.archivePath)
		if sessionRemoveErr != nil {
			return errors.Wrap(sessionRemoveErr, "unable to remove session from disk")
		} else if archiveRemoveErr != nil {
			return errors.Wrap(archiveRemoveErr, "unable to remove archive from disk")
		}
	} else {
		panic("invalid halt mode specified")
	}

	// Success.
	return nil
}
592
593// reset resets synchronization session history by pausing the session (if it's
594// running), overwriting the ancestor data stored on disk with an empty
595// ancestor, and then resuming the session (if it was previously running).
596func (c *controller) reset(ctx context.Context, prompter string) error {
597	// Lock the controller's lifecycle and defer its release.
598	c.lifecycleLock.Lock()
599	defer c.lifecycleLock.Unlock()
600
601	// Check if the session is currently running.
602	running := c.cancel != nil
603
604	// If the session is running, pause it.
605	if running {
606		if err := c.halt(ctx, controllerHaltModePause, prompter, true); err != nil {
607			return fmt.Errorf("unable to pause session: %w", err)
608		}
609	}
610
611	// Reset the session archive on disk.
612	archive := &core.Archive{}
613	if err := encoding.MarshalAndSaveProtobuf(c.archivePath, archive); err != nil {
614		return fmt.Errorf("unable to clear session history: %w", err)
615	}
616
617	// Resume the session if it was previously running.
618	if running {
619		if err := c.resume(ctx, prompter, true); err != nil {
620			return fmt.Errorf("unable to resume session: %w", err)
621		}
622	}
623
624	// Success.
625	return nil
626}
627
// run is the main runloop for the controller, managing connectivity and
// synchronization. It loops until the provided context is cancelled, (re-)
// establishing endpoint connections as needed and invoking synchronize once
// both are available. The alpha and beta arguments may be pre-connected
// endpoints (ownership transfers to this loop) or nil.
func (c *controller) run(ctx context.Context, alpha, beta Endpoint) {
	// Defer resource and state cleanup.
	defer func() {
		// Shutdown any endpoints. These might be non-nil if the runloop was
		// cancelled while partially connected rather than after sync failure.
		if alpha != nil {
			alpha.Shutdown()
		}
		if beta != nil {
			beta.Shutdown()
		}

		// Reset the state.
		c.stateLock.Lock()
		c.state = &State{
			Session: c.session,
		}
		c.stateLock.Unlock()

		// Signal completion.
		close(c.done)
	}()

	// Track the last time that synchronization failed.
	var lastSynchronizationFailureTime time.Time

	// Loop until cancelled.
	for {
		// Loop until we're connected to both endpoints. We do a non-blocking
		// check for cancellation on each reconnect error so that we don't waste
		// resources by trying another connect when the context has been
		// cancelled (it'll be wasteful). This is better than sentinel errors.
		for {
			// Ensure that alpha is connected. Connection errors are discarded
			// here; failures simply leave alpha nil and trigger a retry below.
			if alpha == nil {
				c.stateLock.Lock()
				c.state.Status = Status_ConnectingAlpha
				c.stateLock.Unlock()
				alpha, _ = connect(
					ctx,
					c.logger.Sublogger("alpha"),
					c.session.Alpha,
					"",
					c.session.Identifier,
					c.session.Version,
					c.mergedAlphaConfiguration,
					true,
				)
			}
			c.stateLock.Lock()
			c.state.AlphaConnected = (alpha != nil)
			c.stateLock.Unlock()

			// Check for cancellation to avoid a spurious connection to beta in
			// case cancellation occurred while connecting to alpha.
			select {
			case <-ctx.Done():
				return
			default:
			}

			// Ensure that beta is connected.
			if beta == nil {
				c.stateLock.Lock()
				c.state.Status = Status_ConnectingBeta
				c.stateLock.Unlock()
				beta, _ = connect(
					ctx,
					c.logger.Sublogger("beta"),
					c.session.Beta,
					"",
					c.session.Identifier,
					c.session.Version,
					c.mergedBetaConfiguration,
					false,
				)
			}
			c.stateLock.Lock()
			c.state.BetaConnected = (beta != nil)
			c.stateLock.Unlock()

			// If both endpoints are connected, we're done. We perform this
			// check here (rather than in the loop condition) because if we did
			// it in the loop condition we'd still need a check here to avoid a
			// sleep every time (even if already successfully connected).
			if alpha != nil && beta != nil {
				break
			}

			// If we failed to connect, wait and then retry. Watch for
			// cancellation in the mean time.
			select {
			case <-ctx.Done():
				return
			case <-time.After(autoReconnectInterval):
			}
		}

		// Perform synchronization.
		err := c.synchronize(ctx, alpha, beta)

		// Shutdown the endpoints.
		alpha.Shutdown()
		alpha = nil
		beta.Shutdown()
		beta = nil

		// Reset the synchronization state, but propagate the error that caused
		// failure.
		//
		// NOTE(review): this relies on synchronize always returning a non-nil
		// error (it only exits on failure or cancellation); err.Error() would
		// panic on nil — confirm that contract holds in synchronize.
		c.stateLock.Lock()
		c.state = &State{
			Session:   c.session,
			LastError: err.Error(),
		}
		c.stateLock.Unlock()

		// When synchronization fails, we generally want to restart it as
		// quickly as possible. Thus, if it's been longer than our usual waiting
		// period since synchronization failed last, simply try to reconnect
		// immediately (though still check for cancellation). If it's been less
		// than our usual waiting period since synchronization failed last, then
		// something is probably wrong, so wait for our usual waiting period
		// (while checking and monitoring for cancellation).
		now := time.Now()
		if now.Sub(lastSynchronizationFailureTime) >= autoReconnectInterval {
			select {
			case <-ctx.Done():
				return
			default:
			}
		} else {
			select {
			case <-ctx.Done():
				return
			case <-time.After(autoReconnectInterval):
			}
		}
		lastSynchronizationFailureTime = now
	}
}
770
771// synchronize is the main synchronization loop for the controller.
772func (c *controller) synchronize(ctx context.Context, alpha, beta Endpoint) error {
773	// Clear any error state upon restart of this function. If there was a
774	// terminal error previously caused synchronization to fail, then the user
775	// will have had time to review it (while the run loop is waiting to
776	// reconnect), so it's not like we're getting rid of it too quickly.
777	c.stateLock.Lock()
778	if c.state.LastError != "" {
779		c.state.LastError = ""
780		c.stateLock.Unlock()
781	} else {
782		c.stateLock.UnlockWithoutNotify()
783	}
784
785	// Track any flush request that we've pulled from the queue but haven't
786	// marked as complete. If we bail due to an error, then close out the
787	// request.
788	var flushRequest chan error
789	defer func() {
790		if flushRequest != nil {
791			flushRequest <- errors.New("synchronization cycle failed")
792			flushRequest = nil
793		}
794	}()
795
796	// Load the archive and extract the ancestor.
797	archive := &core.Archive{}
798	if err := encoding.LoadAndUnmarshalProtobuf(c.archivePath, archive); err != nil {
799		return errors.Wrap(err, "unable to load archive")
800	} else if err = archive.Root.EnsureValid(); err != nil {
801		return errors.Wrap(err, "invalid archive found on disk")
802	}
803	ancestor := archive.Root
804
805	// Compute the effective synchronization mode.
806	synchronizationMode := c.session.Configuration.SynchronizationMode
807	if synchronizationMode.IsDefault() {
808		synchronizationMode = c.session.Version.DefaultSynchronizationMode()
809	}
810
811	// Compute, on a per-endpoint basis, whether or not polling should be
812	// disabled.
813	αWatchMode := c.mergedAlphaConfiguration.WatchMode
814	βWatchMode := c.mergedBetaConfiguration.WatchMode
815	if αWatchMode.IsDefault() {
816		αWatchMode = c.session.Version.DefaultWatchMode()
817	}
818	if βWatchMode.IsDefault() {
819		βWatchMode = c.session.Version.DefaultWatchMode()
820	}
821	αDisablePolling := (αWatchMode == WatchMode_WatchModeNoWatch)
822	βDisablePolling := (βWatchMode == WatchMode_WatchModeNoWatch)
823
824	// Create a switch that will allow us to skip polling and force a
825	// synchronization cycle. On startup, we enable this switch and skip polling
826	// to immediately force a check for changes that may have occurred while the
827	// synchronization loop wasn't running. The only time we don't force this
828	// check on startup is when both endpoints have polling disabled, which is
829	// an indication that the session should operate in a fully manual mode.
830	skipPolling := (!αDisablePolling || !βDisablePolling)
831
832	// Create variables to track our reasons for skipping polling.
833	var skippingPollingDueToScanError, skippingPollingDueToMissingFiles bool
834
835	// Loop until there is a synchronization error.
836	for {
837		// Unless we've been requested to skip polling, wait for a dirty state
838		// while monitoring for cancellation. If we've been requested to skip
839		// polling, it should only be for one iteration.
840		if !skipPolling {
841			// Update status to watching.
842			c.stateLock.Lock()
843			c.state.Status = Status_Watching
844			c.stateLock.Unlock()
845
846			// Create a polling context that we can cancel. We don't make it a
847			// subcontext of our own cancellation context because it's easier to
848			// just track cancellation there separately.
849			pollCtx, pollCancel := context.WithCancel(context.Background())
850
851			// Start alpha polling. If alpha has been put into a no-watch mode,
852			// then we still perform polling in order to detect transport errors
853			// that might occur while the session is sitting idle, but we ignore
854			// any non-error responses and instead wait for the polling context
855			// to be cancelled. We perform this ignore operation because we
856			// don't want a broken or malicious endpoint to be able to force
857			// synchronization, especially if its watching has been
858			// intentionally disabled.
859			//
860			// It's worth noting that, because a well-behaved endpoint in
861			// no-watch mode never returns events, we'll always be polling on it
862			// (and thereby testing the transport) right up until the polling
863			// context is cancelled. Thus, there's no need to worry about cases
864			// where the endpoint sends back an event that we ignore and then
865			// has a transport failure without us noticing while we wait on the
866			// polling context (at least not for well-behaved endpoints).
867			αPollResults := make(chan error, 1)
868			go func() {
869				if αDisablePolling {
870					if err := alpha.Poll(pollCtx); err != nil {
871						αPollResults <- err
872					} else {
873						<-pollCtx.Done()
874						αPollResults <- nil
875					}
876				} else {
877					αPollResults <- alpha.Poll(pollCtx)
878				}
879			}()
880
881			// Start beta polling. The logic here mirrors that for alpha above.
882			βPollResults := make(chan error, 1)
883			go func() {
884				if βDisablePolling {
885					if err := beta.Poll(pollCtx); err != nil {
886						βPollResults <- err
887					} else {
888						<-pollCtx.Done()
889						βPollResults <- nil
890					}
891				} else {
892					βPollResults <- beta.Poll(pollCtx)
893				}
894			}()
895
896			// Wait for either poll to return an event or an error, for a flush
897			// request, or for cancellation. In any of these cases, cancel
898			// polling and ensure that both polling operations have completed.
899			var αPollErr, βPollErr error
900			cancelled := false
901			select {
902			case αPollErr = <-αPollResults:
903				pollCancel()
904				βPollErr = <-βPollResults
905			case βPollErr = <-βPollResults:
906				pollCancel()
907				αPollErr = <-αPollResults
908			case flushRequest = <-c.flushRequests:
909				if cap(flushRequest) < 1 {
910					panic("unbuffered flush request")
911				}
912				pollCancel()
913				αPollErr = <-αPollResults
914				βPollErr = <-βPollResults
915			case <-ctx.Done():
916				cancelled = true
917				pollCancel()
918				αPollErr = <-αPollResults
919				βPollErr = <-βPollResults
920			}
921
922			// Watch for errors or cancellation.
923			if cancelled {
924				return errors.New("cancelled during polling")
925			} else if αPollErr != nil {
926				return errors.WrapPollErr, "alpha polling error")
927			} else if βPollErr != nil {
928				return errors.WrapPollErr, "beta polling error")
929			}
930		} else {
931			skipPolling = false
932		}
933
934		// Scan both endpoints in parallel and check for errors. If a flush
935		// request is present, then force both endpoints to perform a full
936		// (warm) re-scan rather than using acceleration.
937		c.stateLock.Lock()
938		c.state.Status = Status_Scanning
939		c.stateLock.Unlock()
940		forceFullScan := flushRequest != nil
941		var αSnapshot, βSnapshot *core.Entry
942		var αPreservesExecutability, βPreservesExecutability bool
943		var αScanErr, βScanErr error
944		var αTryAgain, βTryAgain bool
945		scanDone := &sync.WaitGroup{}
946		scanDone.Add(2)
947		go func() {
948			αSnapshot, αPreservesExecutability, αScanErr, αTryAgain = alpha.Scan(ctx, ancestor, forceFullScan)
949			scanDone.Done()
950		}()
951		go func() {
952			βSnapshot, βPreservesExecutability, βScanErr, βTryAgain = beta.Scan(ctx, ancestor, forceFullScan)
953			scanDone.Done()
954		}()
955		scanDone.Wait()
956
957		// Check if cancellation occurred during scanning.
958		select {
959		case <-ctx.Done():
960			return errors.New("cancelled during scanning")
961		default:
962		}
963
964		// Check for scan errors.
965		if αScanErr != nil {
966			αScanErr = errors.WrapScanErr, "alpha scan error")
967			ifTryAgain {
968				return αScanErr
969			} else {
970				c.stateLock.Lock()
971				c.state.LastError = αScanErr.Error()
972				c.stateLock.Unlock()
973			}
974		}
975		if βScanErr != nil {
976			βScanErr = errors.WrapScanErr, "beta scan error")
977			ifTryAgain {
978				return βScanErr
979			} else {
980				c.stateLock.Lock()
981				c.state.LastError = βScanErr.Error()
982				c.stateLock.Unlock()
983			}
984		}
985
986		// Watch for retry recommendations from scan operations. These occur
987		// when a scan fails and concurrent modifications are suspected as the
988		// culprit. In these cases, we force another synchronization cycle. Note
989		// that, because we skip polling, our flush request, if any, will still
990		// be valid, and we'll be able to respond to it once a successful
991		// synchronization cycle completes.
992		//
993		// TODO: Should we eventually abort synchronization after a certain
994		// number of consecutive scan retries?
995		if αTryAgain || βTryAgain {
996			// If we're already in a synchronization cycle that was forced due
997			// to a previous scan error, and we've now received another retry
998			// recommendation, then wait before attempting a rescan.
999			if skippingPollingDueToScanError {
1000				// Update status to waiting for rescan.
1001				c.stateLock.Lock()
1002				c.state.Status = Status_WaitingForRescan
1003				c.stateLock.Unlock()
1004
1005				// Wait before trying to rescan, but watch for cancellation.
1006				select {
1007				case <-time.After(rescanWaitDuration):
1008				case <-ctx.Done():
1009					return errors.New("cancelled during rescan wait")
1010				}
1011			}
1012
1013			// Retry.
1014			skipPolling = true
1015			skippingPollingDueToScanError = true
1016			continue
1017		}
1018		skippingPollingDueToScanError = false
1019
1020		// Clear the last error (if any) after a successful scan. Since scan
1021		// errors are the only non-terminal errors, and since we know that we've
1022		// cleared any other terminal error at the entry to this loop, we know
1023		// that what we're covering up here can only be a scan error.
1024		c.stateLock.Lock()
1025		if c.state.LastError != "" {
1026			c.state.LastError = ""
1027			c.stateLock.Unlock()
1028		} else {
1029			c.stateLock.UnlockWithoutNotify()
1030		}
1031
1032		// If one side preserves executability and the other does not, then
1033		// propagate executability from the preserving side to the
1034		// non-preserving side.
1035		if αPreservesExecutability && !βPreservesExecutability {
1036			βSnapshot = core.PropagateExecutability(ancestor, αSnapshot, βSnapshot)
1037		} else if βPreservesExecutability && !αPreservesExecutability {
1038			αSnapshot = core.PropagateExecutability(ancestor, βSnapshot, αSnapshot)
1039		}
1040
1041		// Update status to reconciling.
1042		c.stateLock.Lock()
1043		c.state.Status = Status_Reconciling
1044		c.stateLock.Unlock()
1045
1046		// Check if the root is a directory that's been emptied (by deleting a
1047		// non-trivial amount of content) on one endpoint (but not both). This
1048		// can be intentional, but usually indicates that a non-persistent
1049		// filesystem (such as a container filesystem) is being used as the
1050		// synchronization root. In any case, we switch to a halted state and
1051		// wait for the user to either manually propagate the deletion and
1052		// resume the session, recreate the session, or reset the session.
1053		if oneEndpointEmptiedRoot(ancestor, αSnapshot, βSnapshot) {
1054			c.stateLock.Lock()
1055			c.state.Status = Status_HaltedOnRootEmptied
1056			c.stateLock.Unlock()
1057			<-ctx.Done()
1058			return errors.New("cancelled while halted on emptied root")
1059		}
1060
1061		// Perform reconciliation.
1062		ancestorChanges, αTransitions, βTransitions, conflicts := core.Reconcile(
1063			ancestor,
1064			αSnapshot,
1065			βSnapshot,
1066			synchronizationMode,
1067		)
1068
1069		// Create a slim copy of the conflicts so that we don't need to hold
1070		// the full-size versions in memory or send them over the wire.
1071		var slimConflicts []*core.Conflict
1072		if len(conflicts) > 0 {
1073			slimConflicts = make([]*core.Conflict, len(conflicts))
1074			for c, conflict := range conflicts {
1075				slimConflicts[c] = conflict.CopySlim()
1076			}
1077		}
1078		c.stateLock.Lock()
1079		c.state.Conflicts = slimConflicts
1080		c.stateLock.Unlock()
1081
1082		// Check if a root deletion operation is being propagated. This can be
1083		// intentional, accidental, or an indication of a non-persistent
1084		// filesystem (such as a container filesystem). In any case, we switch
1085		// to a halted state and wait for the user to either manually propagate
1086		// the deletion and resume the session, recreate the session, or reset
1087		// the session.
1088		if containsRootDeletionTransitions) || containsRootDeletionTransitions) {
1089			c.stateLock.Lock()
1090			c.state.Status = Status_HaltedOnRootDeletion
1091			c.stateLock.Unlock()
1092			<-ctx.Done()
1093			return errors.New("cancelled while halted on root deletion")
1094		}
1095
1096		// Check if a root type change is being propagated. This can be
1097		// intentional or accidental. In any case, we switch to a halted state
1098		// and wait for the user to manually delete the content that will be
1099		// overwritten by the type change and resume the session.
1100		if containsRootTypeChangeTransitions) || containsRootTypeChangeTransitions) {
1101			c.stateLock.Lock()
1102			c.state.Status = Status_HaltedOnRootTypeChange
1103			c.stateLock.Unlock()
1104			<-ctx.Done()
1105			return errors.New("cancelled while halted on root type change")
1106		}
1107
1108		// Create a monitoring callback for rsync staging.
1109		monitor := func(status *rsync.ReceiverStatus) error {
1110			c.stateLock.Lock()
1111			c.state.StagingStatus = status
1112			c.stateLock.Unlock()
1113			return nil
1114		}
1115
1116		// Stage files on alpha.
1117		c.stateLock.Lock()
1118		c.state.Status = Status_StagingAlpha
1119		c.stateLock.Unlock()
1120		if paths, digests, err := core.TransitionDependenciesTransitions); err != nil {
1121			return errors.Wrap(err, "unable to determine paths for staging on alpha")
1122		} else if len(paths) > 0 {
1123			filteredPaths, signatures, receiver, err := alpha.Stage(paths, digests)
1124			if err != nil {
1125				return errors.Wrap(err, "unable to begin staging on alpha")
1126			}
1127			if !filteredPathsAreSubset(filteredPaths, paths) {
1128				return errors.New("alpha returned incorrect subset of staging paths")
1129			}
1130			if len(filteredPaths) > 0 {
1131				receiver = rsync.NewMonitoringReceiver(receiver, filteredPaths, monitor)
1132				receiver = rsync.NewPreemptableReceiver(ctx, receiver)
1133				if err = beta.Supply(filteredPaths, signatures, receiver); err != nil {
1134					return errors.Wrap(err, "unable to stage files on alpha")
1135				}
1136			}
1137		}
1138
1139		// Stage files on beta.
1140		c.stateLock.Lock()
1141		c.state.Status = Status_StagingBeta
1142		c.stateLock.Unlock()
1143		if paths, digests, err := core.TransitionDependenciesTransitions); err != nil {
1144			return errors.Wrap(err, "unable to determine paths for staging on beta")
1145		} else if len(paths) > 0 {
1146			filteredPaths, signatures, receiver, err := beta.Stage(paths, digests)
1147			if err != nil {
1148				return errors.Wrap(err, "unable to begin staging on beta")
1149			}
1150			if !filteredPathsAreSubset(filteredPaths, paths) {
1151				return errors.New("beta returned incorrect subset of staging paths")
1152			}
1153			if len(filteredPaths) > 0 {
1154				receiver = rsync.NewMonitoringReceiver(receiver, filteredPaths, monitor)
1155				receiver = rsync.NewPreemptableReceiver(ctx, receiver)
1156				if err = alpha.Supply(filteredPaths, signatures, receiver); err != nil {
1157					return errors.Wrap(err, "unable to stage files on beta")
1158				}
1159			}
1160		}
1161
1162		// Perform transitions on both endpoints in parallel. For each side that
1163		// doesn't completely error out, convert its results to ancestor
1164		// changes. Transition errors are checked later, once the ancestor has
1165		// been updated.
1166		c.stateLock.Lock()
1167		c.state.Status = Status_Transitioning
1168		c.stateLock.Unlock()
1169		var αResults, βResults []*core.Entry
1170		var αProblems, βProblems []*core.Problem
1171		var αMissingFiles, βMissingFiles bool
1172		var αTransitionErr, βTransitionErr error
1173		var αChanges, βChanges []*core.Change
1174		transitionDone := &sync.WaitGroup{}
1175		if lenTransitions) > 0 {
1176			transitionDone.Add(1)
1177		}
1178		if lenTransitions) > 0 {
1179			transitionDone.Add(1)
1180		}
1181		if lenTransitions) > 0 {
1182			go func() {
1183				αResults, αProblems, αMissingFiles, αTransitionErr = alpha.Transition(ctx, αTransitions)
1184				if αTransitionErr == nil {
1185					for t, transition := range αTransitions {
1186						αChanges = appendChanges, &core.Change{Path: transition.Path, New: αResults[t]})
1187					}
1188				}
1189				transitionDone.Done()
1190			}()
1191		}
1192		if lenTransitions) > 0 {
1193			go func() {
1194				βResults, βProblems, βMissingFiles, βTransitionErr = beta.Transition(ctx, βTransitions)
1195				if βTransitionErr == nil {
1196					for t, transition := range βTransitions {
1197						βChanges = appendChanges, &core.Change{Path: transition.Path, New: βResults[t]})
1198					}
1199				}
1200				transitionDone.Done()
1201			}()
1202		}
1203		transitionDone.Wait()
1204
1205		// Record problems and then combine changes and propagate them to the
1206		// ancestor. Even if there were transition errors, this code is still
1207		// valid.
1208		c.stateLock.Lock()
1209		c.state.Status = Status_Saving
1210		c.state.AlphaProblems = αProblems
1211		c.state.BetaProblems = βProblems
1212		c.stateLock.Unlock()
1213		ancestorChanges = append(ancestorChanges, αChanges...)
1214		ancestorChanges = append(ancestorChanges, βChanges...)
1215		if newAncestor, err := core.Apply(ancestor, ancestorChanges); err != nil {
1216			return errors.Wrap(err, "unable to propagate changes to ancestor")
1217		} else {
1218			ancestor = newAncestor
1219		}
1220
1221		// Validate the new ancestor before saving it to ensure that our
1222		// reconciliation logic doesn't have any flaws. This is the only time
1223		// that we validate a data structure generated by code in the same
1224		// process (usually our tests are our validation), but this case is
1225		// special because (a) our test cases can't cover every real world
1226		// condition that might arise and (b) if we write a broken ancestor to
1227		// disk, the session is toast. This safety check ensures that even if we
1228		// put out a broken release, or encounter some bizarre real world merge
1229		// case that we didn't consider, things can be fixed.
1230		if err := ancestor.EnsureValid(); err != nil {
1231			return errors.Wrap(err, "new ancestor is invalid")
1232		}
1233
1234		// Save the ancestor.
1235		archive.Root = ancestor
1236		if err := encoding.MarshalAndSaveProtobuf(c.archivePath, archive); err != nil {
1237			return errors.Wrap(err, "unable to save ancestor")
1238		}
1239
1240		// Now check for transition errors.
1241		if αTransitionErr != nil {
1242			return errors.WrapTransitionErr, "unable to apply changes to alpha")
1243		} else if βTransitionErr != nil {
1244			return errors.WrapTransitionErr, "unable to apply changes to beta")
1245		}
1246
1247		// If there were files missing from either endpoint's stager during the
1248		// transition operations, then there were likely concurrent
1249		// modifications during staging. If we see this, then skip polling and
1250		// attempt to run another synchronization cycle immediately, but only if
1251		// we're not already in a synchronization cycle that was forced due to
1252		// previously missing files.
1253		ifMissingFiles || βMissingFiles) && !skippingPollingDueToMissingFiles {
1254			skipPolling = true
1255			skippingPollingDueToMissingFiles = true
1256		} else {
1257			skippingPollingDueToMissingFiles = false
1258		}
1259
1260		// Increment the synchronization cycle count.
1261		c.stateLock.Lock()
1262		c.state.SuccessfulSynchronizationCycles++
1263		c.stateLock.Unlock()
1264
1265		// If a flush request triggered this synchronization cycle, then tell it
1266		// that the cycle has completed and remove it from our tracking.
1267		if flushRequest != nil {
1268			flushRequest <- nil
1269			flushRequest = nil
1270		}
1271	}
1272}
1273