1package sync
2
3import (
4	"context"
5
6	"github.com/pkg/errors"
7
8	"github.com/spf13/cobra"
9
10	"github.com/mutagen-io/mutagen/cmd"
11	"github.com/mutagen-io/mutagen/cmd/mutagen/daemon"
12	"github.com/mutagen-io/mutagen/pkg/grpcutil"
13	"github.com/mutagen-io/mutagen/pkg/selection"
14	synchronizationsvc "github.com/mutagen-io/mutagen/pkg/service/synchronization"
15)
16
17// FlushWithLabelSelector is an orchestration convenience method that invokes
18// the flush command using the specified label selector.
19func FlushWithLabelSelector(labelSelector string, skipWait bool) error {
20	flushConfiguration.labelSelector = labelSelector
21	flushConfiguration.skipWait = skipWait
22	return flushMain(nil, nil)
23}
24
25// FlushWithSessionIdentifiers is an orchestration convenience method that
26// invokes the flush command with the specified session identifiers.
27func FlushWithSessionIdentifiers(sessions []string, skipWait bool) error {
28	flushConfiguration.skipWait = skipWait
29	return flushMain(nil, sessions)
30}
31
32func flushMain(command *cobra.Command, arguments []string) error {
33	// Create session selection specification.
34	selection := &selection.Selection{
35		All:            flushConfiguration.all,
36		Specifications: arguments,
37		LabelSelector:  flushConfiguration.labelSelector,
38	}
39	if err := selection.EnsureValid(); err != nil {
40		return errors.Wrap(err, "invalid session selection specification")
41	}
42
43	// Connect to the daemon and defer closure of the connection.
44	daemonConnection, err := daemon.CreateClientConnection(true, true)
45	if err != nil {
46		return errors.Wrap(err, "unable to connect to daemon")
47	}
48	defer daemonConnection.Close()
49
50	// Create a session service client.
51	sessionService := synchronizationsvc.NewSynchronizationClient(daemonConnection)
52
53	// Invoke the session flush method. The stream will close when the
54	// associated context is cancelled.
55	ctx, cancel := context.WithCancel(context.Background())
56	defer cancel()
57	stream, err := sessionService.Flush(ctx)
58	if err != nil {
59		return errors.Wrap(grpcutil.PeelAwayRPCErrorLayer(err), "unable to invoke flush")
60	}
61
62	// Send the initial request.
63	request := &synchronizationsvc.FlushRequest{
64		Selection: selection,
65		SkipWait:  flushConfiguration.skipWait,
66	}
67	if err := stream.Send(request); err != nil {
68		return errors.Wrap(grpcutil.PeelAwayRPCErrorLayer(err), "unable to send flush request")
69	}
70
71	// Create a status line printer.
72	statusLinePrinter := &cmd.StatusLinePrinter{}
73
74	// Receive and process responses until we're done.
75	for {
76		if response, err := stream.Recv(); err != nil {
77			statusLinePrinter.BreakIfNonEmpty()
78			return errors.Wrap(grpcutil.PeelAwayRPCErrorLayer(err), "flush failed")
79		} else if err = response.EnsureValid(); err != nil {
80			return errors.Wrap(err, "invalid flush response received")
81		} else if response.Message == "" {
82			statusLinePrinter.Clear()
83			return nil
84		} else if response.Message != "" {
85			statusLinePrinter.Print(response.Message)
86			if err := stream.Send(&synchronizationsvc.FlushRequest{}); err != nil {
87				statusLinePrinter.BreakIfNonEmpty()
88				return errors.Wrap(grpcutil.PeelAwayRPCErrorLayer(err), "unable to send message response")
89			}
90		}
91	}
92}
93
94var flushCommand = &cobra.Command{
95	Use:          "flush [<session>...]",
96	Short:        "Force a synchronization cycle",
97	RunE:         flushMain,
98	SilenceUsage: true,
99}
100
// flushConfiguration stores the settings for the flush command. Its fields are
// bound to command line flags in init and are also assigned directly by the
// orchestration convenience functions in this file.
var flushConfiguration struct {
	// help is bound to the manually registered help flag.
	help bool
	// all requests that every session be flushed.
	all bool
	// labelSelector is a label selector used to identify the sessions that
	// should be flushed.
	labelSelector string
	// skipWait is forwarded as the SkipWait field of the flush request. Per its
	// flag description, it avoids waiting for the resulting synchronization
	// cycle(s) to complete.
	skipWait bool
}
113
114func init() {
115	// Grab a handle for the command line flags.
116	flags := flushCommand.Flags()
117
118	// Disable alphabetical sorting of flags in help output.
119	flags.SortFlags = false
120
121	// Manually add a help flag to override the default message. Cobra will
122	// still implement its logic automatically.
123	flags.BoolVarP(&flushConfiguration.help, "help", "h", false, "Show help information")
124
125	// Wire up flush flags.
126	flags.BoolVarP(&flushConfiguration.all, "all", "a", false, "Flush all sessions")
127	flags.StringVar(&flushConfiguration.labelSelector, "label-selector", "", "Flush sessions matching the specified label selector")
128	flags.BoolVar(&flushConfiguration.skipWait, "skip-wait", false, "Avoid waiting for the resulting synchronization cycle(s) to complete")
129}
130