1package synchronization
2
3import (
4	"context"
5
6	"github.com/pkg/errors"
7
8	"github.com/mutagen-io/mutagen/pkg/prompting"
9	"github.com/mutagen-io/mutagen/pkg/synchronization"
10)
11
12// Server provides an implementation of the Synchronization service.
13type Server struct {
14	// manager is the underlying session manager.
15	manager *synchronization.Manager
16}
17
18// NewServer creates a new session server.
19func NewServer(manager *synchronization.Manager) *Server {
20	return &Server{
21		manager: manager,
22	}
23}
24
25// Create creates a new session.
26func (s *Server) Create(stream Synchronization_CreateServer) error {
27	// Receive and validate the request.
28	request, err := stream.Recv()
29	if err != nil {
30		return errors.Wrap(err, "unable to receive request")
31	} else if err = request.ensureValid(true); err != nil {
32		return errors.Wrap(err, "received invalid create request")
33	}
34
35	// Wrap the stream in a prompter and register it with the prompt server.
36	prompter, err := prompting.RegisterPrompter(&createStreamPrompter{stream})
37	if err != nil {
38		return errors.Wrap(err, "unable to register prompter")
39	}
40
41	// Perform creation.
42	session, err := s.manager.Create(
43		stream.Context(),
44		request.Specification.Alpha,
45		request.Specification.Beta,
46		request.Specification.Configuration,
47		request.Specification.ConfigurationAlpha,
48		request.Specification.ConfigurationBeta,
49		request.Specification.Name,
50		request.Specification.Labels,
51		request.Specification.Paused,
52		prompter,
53	)
54
55	// Unregister the prompter.
56	prompting.UnregisterPrompter(prompter)
57
58	// Handle any errors.
59	if err != nil {
60		return err
61	}
62
63	// Signal completion.
64	if err := stream.Send(&CreateResponse{Session: session}); err != nil {
65		return errors.Wrap(err, "unable to send response")
66	}
67
68	// Success.
69	return nil
70}
71
72// List queries session status.
73func (s *Server) List(ctx context.Context, request *ListRequest) (*ListResponse, error) {
74	// Validate the request.
75	if err := request.ensureValid(); err != nil {
76		return nil, errors.Wrap(err, "received invalid list request")
77	}
78
79	// Perform listing.
80	stateIndex, states, err := s.manager.List(ctx, request.Selection, request.PreviousStateIndex)
81	if err != nil {
82		return nil, err
83	}
84
85	// Success.
86	return &ListResponse{
87		StateIndex:    stateIndex,
88		SessionStates: states,
89	}, nil
90}
91
92// Flush flushes sessions.
93func (s *Server) Flush(stream Synchronization_FlushServer) error {
94	// Receive the first request.
95	request, err := stream.Recv()
96	if err != nil {
97		return errors.Wrap(err, "unable to receive request")
98	} else if err = request.ensureValid(true); err != nil {
99		return errors.Wrap(err, "received invalid flush request")
100	}
101
102	// Wrap the stream in a prompter and register it with the prompt server.
103	prompter, err := prompting.RegisterPrompter(&flushStreamPrompter{stream})
104	if err != nil {
105		return errors.Wrap(err, "unable to register prompter")
106	}
107
108	// Perform flush(es).
109	err = s.manager.Flush(stream.Context(), request.Selection, prompter, request.SkipWait)
110
111	// Unregister the prompter.
112	prompting.UnregisterPrompter(prompter)
113
114	// Handle any errors.
115	if err != nil {
116		return err
117	}
118
119	// Signal completion.
120	if err := stream.Send(&FlushResponse{}); err != nil {
121		return errors.Wrap(err, "unable to send response")
122	}
123
124	// Success.
125	return nil
126}
127
128// Pause pauses sessions.
129func (s *Server) Pause(stream Synchronization_PauseServer) error {
130	// Receive the first request.
131	request, err := stream.Recv()
132	if err != nil {
133		return errors.Wrap(err, "unable to receive request")
134	} else if err = request.ensureValid(true); err != nil {
135		return errors.Wrap(err, "received invalid pause request")
136	}
137
138	// Wrap the stream in a prompter and register it with the prompt server.
139	prompter, err := prompting.RegisterPrompter(&pauseStreamPrompter{stream})
140	if err != nil {
141		return errors.Wrap(err, "unable to register prompter")
142	}
143
144	// Perform pause(s).
145	err = s.manager.Pause(stream.Context(), request.Selection, prompter)
146
147	// Unregister the prompter.
148	prompting.UnregisterPrompter(prompter)
149
150	// Handle any errors.
151	if err != nil {
152		return err
153	}
154
155	// Signal completion.
156	if err := stream.Send(&PauseResponse{}); err != nil {
157		return errors.Wrap(err, "unable to send response")
158	}
159
160	// Success.
161	return nil
162}
163
164// Resume resumes sessions.
165func (s *Server) Resume(stream Synchronization_ResumeServer) error {
166	// Receive the first request.
167	request, err := stream.Recv()
168	if err != nil {
169		return errors.Wrap(err, "unable to receive request")
170	} else if err = request.ensureValid(true); err != nil {
171		return errors.Wrap(err, "received invalid resume request")
172	}
173
174	// Wrap the stream in a prompter and register it with the prompt server.
175	prompter, err := prompting.RegisterPrompter(&resumeStreamPrompter{stream})
176	if err != nil {
177		return errors.Wrap(err, "unable to register prompter")
178	}
179
180	// Perform resume(s).
181	err = s.manager.Resume(stream.Context(), request.Selection, prompter)
182
183	// Unregister the prompter.
184	prompting.UnregisterPrompter(prompter)
185
186	// Handle any errors.
187	if err != nil {
188		return err
189	}
190
191	// Signal completion.
192	if err := stream.Send(&ResumeResponse{}); err != nil {
193		return errors.Wrap(err, "unable to send response")
194	}
195
196	// Success.
197	return nil
198}
199
200// Reset resets sessions.
201func (s *Server) Reset(stream Synchronization_ResetServer) error {
202	// Receive the first request.
203	request, err := stream.Recv()
204	if err != nil {
205		return errors.Wrap(err, "unable to receive request")
206	} else if err = request.ensureValid(true); err != nil {
207		return errors.Wrap(err, "received invalid reset request")
208	}
209
210	// Wrap the stream in a prompter and register it with the prompt server.
211	prompter, err := prompting.RegisterPrompter(&resetStreamPrompter{stream})
212	if err != nil {
213		return errors.Wrap(err, "unable to register prompter")
214	}
215
216	// Perform reset(s).
217	err = s.manager.Reset(stream.Context(), request.Selection, prompter)
218
219	// Unregister the prompter.
220	prompting.UnregisterPrompter(prompter)
221
222	// Handle any errors.
223	if err != nil {
224		return err
225	}
226
227	// Signal completion.
228	if err := stream.Send(&ResetResponse{}); err != nil {
229		return errors.Wrap(err, "unable to send response")
230	}
231
232	// Success.
233	return nil
234}
235
236// Terminate terminates sessions.
237func (s *Server) Terminate(stream Synchronization_TerminateServer) error {
238	// Receive the first request.
239	request, err := stream.Recv()
240	if err != nil {
241		return errors.Wrap(err, "unable to receive request")
242	} else if err = request.ensureValid(true); err != nil {
243		return errors.Wrap(err, "received invalid terminate request")
244	}
245
246	// Wrap the stream in a prompter and register it with the prompt server.
247	prompter, err := prompting.RegisterPrompter(&terminateStreamPrompter{stream})
248	if err != nil {
249		return errors.Wrap(err, "unable to register prompter")
250	}
251
252	// Perform termination.
253	err = s.manager.Terminate(stream.Context(), request.Selection, prompter)
254
255	// Unregister the prompter.
256	prompting.UnregisterPrompter(prompter)
257
258	// Handle any errors.
259	if err != nil {
260		return err
261	}
262
263	// Signal completion.
264	if err := stream.Send(&TerminateResponse{}); err != nil {
265		return errors.Wrap(err, "unable to send response")
266	}
267
268	// Success.
269	return nil
270}
271