1package synchronization 2 3import ( 4 "context" 5 6 "github.com/pkg/errors" 7 8 "github.com/mutagen-io/mutagen/pkg/prompting" 9 "github.com/mutagen-io/mutagen/pkg/synchronization" 10) 11 12// Server provides an implementation of the Synchronization service. 13type Server struct { 14 // manager is the underlying session manager. 15 manager *synchronization.Manager 16} 17 18// NewServer creates a new session server. 19func NewServer(manager *synchronization.Manager) *Server { 20 return &Server{ 21 manager: manager, 22 } 23} 24 25// Create creates a new session. 26func (s *Server) Create(stream Synchronization_CreateServer) error { 27 // Receive and validate the request. 28 request, err := stream.Recv() 29 if err != nil { 30 return errors.Wrap(err, "unable to receive request") 31 } else if err = request.ensureValid(true); err != nil { 32 return errors.Wrap(err, "received invalid create request") 33 } 34 35 // Wrap the stream in a prompter and register it with the prompt server. 36 prompter, err := prompting.RegisterPrompter(&createStreamPrompter{stream}) 37 if err != nil { 38 return errors.Wrap(err, "unable to register prompter") 39 } 40 41 // Perform creation. 42 session, err := s.manager.Create( 43 stream.Context(), 44 request.Specification.Alpha, 45 request.Specification.Beta, 46 request.Specification.Configuration, 47 request.Specification.ConfigurationAlpha, 48 request.Specification.ConfigurationBeta, 49 request.Specification.Name, 50 request.Specification.Labels, 51 request.Specification.Paused, 52 prompter, 53 ) 54 55 // Unregister the prompter. 56 prompting.UnregisterPrompter(prompter) 57 58 // Handle any errors. 59 if err != nil { 60 return err 61 } 62 63 // Signal completion. 64 if err := stream.Send(&CreateResponse{Session: session}); err != nil { 65 return errors.Wrap(err, "unable to send response") 66 } 67 68 // Success. 69 return nil 70} 71 72// List queries session status. 73func (s *Server) List(ctx context.Context, request *ListRequest) (*ListResponse, error) { 74 // Validate the request. 75 if err := request.ensureValid(); err != nil { 76 return nil, errors.Wrap(err, "received invalid list request") 77 } 78 79 // Perform listing. 80 stateIndex, states, err := s.manager.List(ctx, request.Selection, request.PreviousStateIndex) 81 if err != nil { 82 return nil, err 83 } 84 85 // Success. 86 return &ListResponse{ 87 StateIndex: stateIndex, 88 SessionStates: states, 89 }, nil 90} 91 92// Flush flushes sessions. 93func (s *Server) Flush(stream Synchronization_FlushServer) error { 94 // Receive the first request. 95 request, err := stream.Recv() 96 if err != nil { 97 return errors.Wrap(err, "unable to receive request") 98 } else if err = request.ensureValid(true); err != nil { 99 return errors.Wrap(err, "received invalid flush request") 100 } 101 102 // Wrap the stream in a prompter and register it with the prompt server. 103 prompter, err := prompting.RegisterPrompter(&flushStreamPrompter{stream}) 104 if err != nil { 105 return errors.Wrap(err, "unable to register prompter") 106 } 107 108 // Perform flush(es). 109 err = s.manager.Flush(stream.Context(), request.Selection, prompter, request.SkipWait) 110 111 // Unregister the prompter. 112 prompting.UnregisterPrompter(prompter) 113 114 // Handle any errors. 115 if err != nil { 116 return err 117 } 118 119 // Signal completion. 120 if err := stream.Send(&FlushResponse{}); err != nil { 121 return errors.Wrap(err, "unable to send response") 122 } 123 124 // Success. 125 return nil 126} 127 128// Pause pauses sessions. 129func (s *Server) Pause(stream Synchronization_PauseServer) error { 130 // Receive the first request. 131 request, err := stream.Recv() 132 if err != nil { 133 return errors.Wrap(err, "unable to receive request") 134 } else if err = request.ensureValid(true); err != nil { 135 return errors.Wrap(err, "received invalid pause request") 136 } 137 138 // Wrap the stream in a prompter and register it with the prompt server. 139 prompter, err := prompting.RegisterPrompter(&pauseStreamPrompter{stream}) 140 if err != nil { 141 return errors.Wrap(err, "unable to register prompter") 142 } 143 144 // Perform pause(s). 145 err = s.manager.Pause(stream.Context(), request.Selection, prompter) 146 147 // Unregister the prompter. 148 prompting.UnregisterPrompter(prompter) 149 150 // Handle any errors. 151 if err != nil { 152 return err 153 } 154 155 // Signal completion. 156 if err := stream.Send(&PauseResponse{}); err != nil { 157 return errors.Wrap(err, "unable to send response") 158 } 159 160 // Success. 161 return nil 162} 163 164// Resume resumes sessions. 165func (s *Server) Resume(stream Synchronization_ResumeServer) error { 166 // Receive the first request. 167 request, err := stream.Recv() 168 if err != nil { 169 return errors.Wrap(err, "unable to receive request") 170 } else if err = request.ensureValid(true); err != nil { 171 return errors.Wrap(err, "received invalid resume request") 172 } 173 174 // Wrap the stream in a prompter and register it with the prompt server. 175 prompter, err := prompting.RegisterPrompter(&resumeStreamPrompter{stream}) 176 if err != nil { 177 return errors.Wrap(err, "unable to register prompter") 178 } 179 180 // Perform resume(s). 181 err = s.manager.Resume(stream.Context(), request.Selection, prompter) 182 183 // Unregister the prompter. 184 prompting.UnregisterPrompter(prompter) 185 186 // Handle any errors. 187 if err != nil { 188 return err 189 } 190 191 // Signal completion. 192 if err := stream.Send(&ResumeResponse{}); err != nil { 193 return errors.Wrap(err, "unable to send response") 194 } 195 196 // Success. 197 return nil 198} 199 200// Reset resets sessions. 201func (s *Server) Reset(stream Synchronization_ResetServer) error { 202 // Receive the first request. 203 request, err := stream.Recv() 204 if err != nil { 205 return errors.Wrap(err, "unable to receive request") 206 } else if err = request.ensureValid(true); err != nil { 207 return errors.Wrap(err, "received invalid reset request") 208 } 209 210 // Wrap the stream in a prompter and register it with the prompt server. 211 prompter, err := prompting.RegisterPrompter(&resetStreamPrompter{stream}) 212 if err != nil { 213 return errors.Wrap(err, "unable to register prompter") 214 } 215 216 // Perform reset(s). 217 err = s.manager.Reset(stream.Context(), request.Selection, prompter) 218 219 // Unregister the prompter. 220 prompting.UnregisterPrompter(prompter) 221 222 // Handle any errors. 223 if err != nil { 224 return err 225 } 226 227 // Signal completion. 228 if err := stream.Send(&ResetResponse{}); err != nil { 229 return errors.Wrap(err, "unable to send response") 230 } 231 232 // Success. 233 return nil 234} 235 236// Terminate terminates sessions. 237func (s *Server) Terminate(stream Synchronization_TerminateServer) error { 238 // Receive the first request. 239 request, err := stream.Recv() 240 if err != nil { 241 return errors.Wrap(err, "unable to receive request") 242 } else if err = request.ensureValid(true); err != nil { 243 return errors.Wrap(err, "received invalid terminate request") 244 } 245 246 // Wrap the stream in a prompter and register it with the prompt server. 247 prompter, err := prompting.RegisterPrompter(&terminateStreamPrompter{stream}) 248 if err != nil { 249 return errors.Wrap(err, "unable to register prompter") 250 } 251 252 // Perform termination. 253 err = s.manager.Terminate(stream.Context(), request.Selection, prompter) 254 255 // Unregister the prompter. 256 prompting.UnregisterPrompter(prompter) 257 258 // Handle any errors. 259 if err != nil { 260 return err 261 } 262 263 // Signal completion. 264 if err := stream.Send(&TerminateResponse{}); err != nil { 265 return errors.Wrap(err, "unable to send response") 266 } 267 268 // Success. 269 return nil 270} 271