1package sync 2 3import ( 4 "context" 5 6 "github.com/pkg/errors" 7 8 "github.com/spf13/cobra" 9 10 "github.com/mutagen-io/mutagen/cmd" 11 "github.com/mutagen-io/mutagen/cmd/mutagen/daemon" 12 "github.com/mutagen-io/mutagen/pkg/grpcutil" 13 "github.com/mutagen-io/mutagen/pkg/selection" 14 synchronizationsvc "github.com/mutagen-io/mutagen/pkg/service/synchronization" 15) 16 17// FlushWithLabelSelector is an orchestration convenience method that invokes 18// the flush command using the specified label selector. 19func FlushWithLabelSelector(labelSelector string, skipWait bool) error { 20 flushConfiguration.labelSelector = labelSelector 21 flushConfiguration.skipWait = skipWait 22 return flushMain(nil, nil) 23} 24 25// FlushWithSessionIdentifiers is an orchestration convenience method that 26// invokes the flush command with the specified session identifiers. 27func FlushWithSessionIdentifiers(sessions []string, skipWait bool) error { 28 flushConfiguration.skipWait = skipWait 29 return flushMain(nil, sessions) 30} 31 32func flushMain(command *cobra.Command, arguments []string) error { 33 // Create session selection specification. 34 selection := &selection.Selection{ 35 All: flushConfiguration.all, 36 Specifications: arguments, 37 LabelSelector: flushConfiguration.labelSelector, 38 } 39 if err := selection.EnsureValid(); err != nil { 40 return errors.Wrap(err, "invalid session selection specification") 41 } 42 43 // Connect to the daemon and defer closure of the connection. 44 daemonConnection, err := daemon.CreateClientConnection(true, true) 45 if err != nil { 46 return errors.Wrap(err, "unable to connect to daemon") 47 } 48 defer daemonConnection.Close() 49 50 // Create a session service client. 51 sessionService := synchronizationsvc.NewSynchronizationClient(daemonConnection) 52 53 // Invoke the session flush method. The stream will close when the 54 // associated context is cancelled. 55 ctx, cancel := context.WithCancel(context.Background()) 56 defer cancel() 57 stream, err := sessionService.Flush(ctx) 58 if err != nil { 59 return errors.Wrap(grpcutil.PeelAwayRPCErrorLayer(err), "unable to invoke flush") 60 } 61 62 // Send the initial request. 63 request := &synchronizationsvc.FlushRequest{ 64 Selection: selection, 65 SkipWait: flushConfiguration.skipWait, 66 } 67 if err := stream.Send(request); err != nil { 68 return errors.Wrap(grpcutil.PeelAwayRPCErrorLayer(err), "unable to send flush request") 69 } 70 71 // Create a status line printer. 72 statusLinePrinter := &cmd.StatusLinePrinter{} 73 74 // Receive and process responses until we're done. 75 for { 76 if response, err := stream.Recv(); err != nil { 77 statusLinePrinter.BreakIfNonEmpty() 78 return errors.Wrap(grpcutil.PeelAwayRPCErrorLayer(err), "flush failed") 79 } else if err = response.EnsureValid(); err != nil { 80 return errors.Wrap(err, "invalid flush response received") 81 } else if response.Message == "" { 82 statusLinePrinter.Clear() 83 return nil 84 } else if response.Message != "" { 85 statusLinePrinter.Print(response.Message) 86 if err := stream.Send(&synchronizationsvc.FlushRequest{}); err != nil { 87 statusLinePrinter.BreakIfNonEmpty() 88 return errors.Wrap(grpcutil.PeelAwayRPCErrorLayer(err), "unable to send message response") 89 } 90 } 91 } 92} 93 94var flushCommand = &cobra.Command{ 95 Use: "flush [<session>...]", 96 Short: "Force a synchronization cycle", 97 RunE: flushMain, 98 SilenceUsage: true, 99} 100 101var flushConfiguration struct { 102 // help indicates whether or not to show help information and exit. 103 help bool 104 // all indicates whether or not all sessions should be flushed. 105 all bool 106 // labelSelector encodes a label selector to be used in identifying which 107 // sessions should be paused. 108 labelSelector string 109 // skipWait indicates whether or not the flush operation should block until 110 // a synchronization cycle completes for each sesion requested. 111 skipWait bool 112} 113 114func init() { 115 // Grab a handle for the command line flags. 116 flags := flushCommand.Flags() 117 118 // Disable alphabetical sorting of flags in help output. 119 flags.SortFlags = false 120 121 // Manually add a help flag to override the default message. Cobra will 122 // still implement its logic automatically. 123 flags.BoolVarP(&flushConfiguration.help, "help", "h", false, "Show help information") 124 125 // Wire up flush flags. 126 flags.BoolVarP(&flushConfiguration.all, "all", "a", false, "Flush all sessions") 127 flags.StringVar(&flushConfiguration.labelSelector, "label-selector", "", "Flush sessions matching the specified label selector") 128 flags.BoolVar(&flushConfiguration.skipWait, "skip-wait", false, "Avoid waiting for the resulting synchronization cycle(s) to complete") 129} 130