1package integration
2
3import (
4	"context"
5	"fmt"
6	"io/ioutil"
7	"net/http"
8	"os"
9	"path/filepath"
10	"runtime"
11	"testing"
12
13	"github.com/pkg/errors"
14
15	"github.com/google/uuid"
16
17	"github.com/mutagen-io/mutagen/pkg/forwarding"
18	"github.com/mutagen-io/mutagen/pkg/forwarding/endpoint/local"
19	"github.com/mutagen-io/mutagen/pkg/integration/fixtures/constants"
20	"github.com/mutagen-io/mutagen/pkg/integration/protocols/netpipe"
21	"github.com/mutagen-io/mutagen/pkg/prompting"
22	"github.com/mutagen-io/mutagen/pkg/selection"
23	"github.com/mutagen-io/mutagen/pkg/synchronization"
24	"github.com/mutagen-io/mutagen/pkg/url"
25)
26
27func waitForSuccessfulSynchronizationCycle(ctx context.Context, sessionId string, allowConflicts, allowProblems bool) error {
28	// Create a session selection specification.
29	selection := &selection.Selection{
30		Specifications: []string{sessionId},
31	}
32
33	// Perform waiting.
34	var previousStateIndex uint64
35	var states []*synchronization.State
36	var err error
37	for {
38		previousStateIndex, states, err = synchronizationManager.List(ctx, selection, previousStateIndex)
39		if err != nil {
40			return errors.Wrap(err, "unable to list session states")
41		} else if len(states) != 1 {
42			return errors.New("invalid number of session states returned")
43		} else if states[0].SuccessfulSynchronizationCycles > 0 {
44			if !allowProblems && (len(states[0].AlphaProblems) > 0 || len(states[0].BetaProblems) > 0) {
45				return errors.New("problems detected (and disallowed)")
46			} else if !allowConflicts && len(states[0].Conflicts) > 0 {
47				return errors.New("conflicts detected (and disallowed)")
48			}
49			return nil
50		}
51	}
52}
53
54func testSessionLifecycle(ctx context.Context, prompter string, alpha, beta *url.URL, configuration *synchronization.Configuration, allowConflicts, allowProblems bool) error {
55	// Create a session.
56	sessionId, err := synchronizationManager.Create(
57		ctx,
58		alpha, beta,
59		configuration,
60		&synchronization.Configuration{},
61		&synchronization.Configuration{},
62		"testSynchronizationSession",
63		nil,
64		false,
65		prompter,
66	)
67	if err != nil {
68		return errors.Wrap(err, "unable to create session")
69	}
70
71	// Wait for the session to have at least one successful synchronization
72	// cycle.
73	// TODO: Should we add a timeout on this?
74	if err := waitForSuccessfulSynchronizationCycle(ctx, sessionId, allowConflicts, allowProblems); err != nil {
75		return errors.Wrap(err, "unable to wait for successful synchronization")
76	}
77
78	// TODO: Add hook for verifying file contents.
79
80	// TODO: Add hook for verifying presence/absence of particular
81	// conflicts/problems and remove that monitoring from
82	// waitForSuccessfulSynchronizationCycle (maybe have it pass back the
83	// relevant state).
84
85	// Create a session selection specification.
86	selection := &selection.Selection{
87		Specifications: []string{sessionId},
88	}
89
90	// Pause the session.
91	if err := synchronizationManager.Pause(ctx, selection, ""); err != nil {
92		return errors.Wrap(err, "unable to pause session")
93	}
94
95	// Resume the session.
96	if err := synchronizationManager.Resume(ctx, selection, ""); err != nil {
97		return errors.Wrap(err, "unable to resume session")
98	}
99
100	// Wait for the session to have at least one additional synchronization
101	// cycle.
102	if err := waitForSuccessfulSynchronizationCycle(ctx, sessionId, allowConflicts, allowProblems); err != nil {
103		return errors.Wrap(err, "unable to wait for additional synchronization")
104	}
105
106	// Attempt an additional resume (this should be a no-op).
107	if err := synchronizationManager.Resume(ctx, selection, ""); err != nil {
108		return errors.Wrap(err, "unable to perform additional resume")
109	}
110
111	// Terminate the session.
112	if err := synchronizationManager.Terminate(ctx, selection, ""); err != nil {
113		return errors.Wrap(err, "unable to terminate session")
114	}
115
116	// TODO: Verify that cleanup took place.
117
118	// Success.
119	return nil
120}
121
122func TestSynchronizationBothRootsNil(t *testing.T) {
123	// Allow this test to run in parallel.
124	t.Parallel()
125
126	// Create a temporary directory and defer its cleanup.
127	directory, err := ioutil.TempDir("", "mutagen_end_to_end")
128	if err != nil {
129		t.Fatal("unable to create temporary directory:", err)
130	}
131	defer os.RemoveAll(directory)
132
133	// Calculate alpha and beta paths.
134	alphaRoot := filepath.Join(directory, "alpha")
135	betaRoot := filepath.Join(directory, "beta")
136
137	// Compute alpha and beta URLs.
138	alphaURL := &url.URL{Path: alphaRoot}
139	betaURL := &url.URL{Path: betaRoot}
140
141	// Compute configuration. We use defaults for everything.
142	configuration := &synchronization.Configuration{}
143
144	// Test the session lifecycle.
145	if err := testSessionLifecycle(context.Background(), "", alphaURL, betaURL, configuration, false, false); err != nil {
146		t.Fatal("session lifecycle test failed:", err)
147	}
148}
149
150func TestSynchronizationGOROOTSrcToBeta(t *testing.T) {
151	// Check the end-to-end test mode and compute the source synchronization
152	// root accordingly. If no mode has been specified, then skip the test.
153	endToEndTestMode := os.Getenv("MUTAGEN_TEST_END_TO_END")
154	var sourceRoot string
155	if endToEndTestMode == "" {
156		t.Skip()
157	} else if endToEndTestMode == "full" {
158		sourceRoot = filepath.Join(runtime.GOROOT(), "src")
159	} else if endToEndTestMode == "slim" {
160		sourceRoot = filepath.Join(runtime.GOROOT(), "src", "bufio")
161	} else {
162		t.Fatal("unknown end-to-end test mode specified:", endToEndTestMode)
163	}
164
165	// Allow the test to run in parallel.
166	t.Parallel()
167
168	// Create a temporary directory and defer its cleanup.
169	directory, err := ioutil.TempDir("", "mutagen_end_to_end")
170	if err != nil {
171		t.Fatal("unable to create temporary directory:", err)
172	}
173	defer os.RemoveAll(directory)
174
175	// Calculate alpha and beta paths.
176	alphaRoot := sourceRoot
177	betaRoot := filepath.Join(directory, "beta")
178
179	// Compute alpha and beta URLs.
180	alphaURL := &url.URL{Path: alphaRoot}
181	betaURL := &url.URL{Path: betaRoot}
182
183	// Compute configuration. We use defaults for everything.
184	configuration := &synchronization.Configuration{}
185
186	// Test the session lifecycle.
187	if err := testSessionLifecycle(context.Background(), "", alphaURL, betaURL, configuration, false, false); err != nil {
188		t.Fatal("session lifecycle test failed:", err)
189	}
190}
191
192func TestSynchronizationGOROOTSrcToAlpha(t *testing.T) {
193	// Check the end-to-end test mode and compute the source synchronization
194	// root accordingly. If no mode has been specified, then skip the test.
195	endToEndTestMode := os.Getenv("MUTAGEN_TEST_END_TO_END")
196	var sourceRoot string
197	if endToEndTestMode == "" {
198		t.Skip()
199	} else if endToEndTestMode == "full" {
200		sourceRoot = filepath.Join(runtime.GOROOT(), "src")
201	} else if endToEndTestMode == "slim" {
202		sourceRoot = filepath.Join(runtime.GOROOT(), "src", "bufio")
203	} else {
204		t.Fatal("unknown end-to-end test mode specified:", endToEndTestMode)
205	}
206
207	// Allow the test to run in parallel.
208	t.Parallel()
209
210	// Create a temporary directory and defer its cleanup.
211	directory, err := ioutil.TempDir("", "mutagen_end_to_end")
212	if err != nil {
213		t.Fatal("unable to create temporary directory:", err)
214	}
215	defer os.RemoveAll(directory)
216
217	// Calculate alpha and beta paths.
218	alphaRoot := filepath.Join(directory, "alpha")
219	betaRoot := sourceRoot
220
221	// Compute alpha and beta URLs.
222	alphaURL := &url.URL{Path: alphaRoot}
223	betaURL := &url.URL{Path: betaRoot}
224
225	// Compute configuration. We use defaults for everything.
226	configuration := &synchronization.Configuration{}
227
228	// Test the session lifecycle.
229	if err := testSessionLifecycle(context.Background(), "", alphaURL, betaURL, configuration, false, false); err != nil {
230		t.Fatal("session lifecycle test failed:", err)
231	}
232}
233
234func TestSynchronizationGOROOTSrcToBetaInMemory(t *testing.T) {
235	// Check the end-to-end test mode and compute the source synchronization
236	// root accordingly. If no mode has been specified, then skip the test.
237	endToEndTestMode := os.Getenv("MUTAGEN_TEST_END_TO_END")
238	var sourceRoot string
239	if endToEndTestMode == "" {
240		t.Skip()
241	} else if endToEndTestMode == "full" {
242		sourceRoot = filepath.Join(runtime.GOROOT(), "src")
243	} else if endToEndTestMode == "slim" {
244		sourceRoot = filepath.Join(runtime.GOROOT(), "src", "bufio")
245	} else {
246		t.Fatal("unknown end-to-end test mode specified:", endToEndTestMode)
247	}
248
249	// Allow the test to run in parallel.
250	t.Parallel()
251
252	// Create a temporary directory and defer its cleanup.
253	directory, err := ioutil.TempDir("", "mutagen_end_to_end")
254	if err != nil {
255		t.Fatal("unable to create temporary directory:", err)
256	}
257	defer os.RemoveAll(directory)
258
259	// Calculate alpha and beta paths.
260	alphaRoot := sourceRoot
261	betaRoot := filepath.Join(directory, "beta")
262
263	// Compute alpha and beta URLs. We use a special protocol with a custom
264	// handler to indicate an in-memory connection.
265	alphaURL := &url.URL{Path: alphaRoot}
266	betaURL := &url.URL{
267		Protocol: netpipe.Protocol_Netpipe,
268		Path:     betaRoot,
269	}
270
271	// Compute configuration. We use defaults for everything.
272	configuration := &synchronization.Configuration{}
273
274	// Test the session lifecycle.
275	if err := testSessionLifecycle(context.Background(), "", alphaURL, betaURL, configuration, false, false); err != nil {
276		t.Fatal("session lifecycle test failed:", err)
277	}
278}
279
280func TestSynchronizationGOROOTSrcToBetaOverSSH(t *testing.T) {
281	// If localhost SSH support isn't available, then skip this test.
282	if os.Getenv("MUTAGEN_TEST_SSH") != "true" {
283		t.Skip()
284	}
285
286	// Check the end-to-end test mode and compute the source synchronization
287	// root accordingly. If no mode has been specified, then skip the test.
288	endToEndTestMode := os.Getenv("MUTAGEN_TEST_END_TO_END")
289	var sourceRoot string
290	if endToEndTestMode == "" {
291		t.Skip()
292	} else if endToEndTestMode == "full" {
293		sourceRoot = filepath.Join(runtime.GOROOT(), "src")
294	} else if endToEndTestMode == "slim" {
295		sourceRoot = filepath.Join(runtime.GOROOT(), "src", "bufio")
296	} else {
297		t.Fatal("unknown end-to-end test mode specified:", endToEndTestMode)
298	}
299
300	// Allow the test to run in parallel.
301	t.Parallel()
302
303	// Create a temporary directory and defer its cleanup.
304	directory, err := ioutil.TempDir("", "mutagen_end_to_end")
305	if err != nil {
306		t.Fatal("unable to create temporary directory:", err)
307	}
308	defer os.RemoveAll(directory)
309
310	// Calculate alpha and beta paths.
311	alphaRoot := sourceRoot
312	betaRoot := filepath.Join(directory, "beta")
313
314	// Compute alpha and beta URLs.
315	alphaURL := &url.URL{Path: alphaRoot}
316	betaURL := &url.URL{
317		Protocol: url.Protocol_SSH,
318		Host:     "localhost",
319		Path:     betaRoot,
320	}
321
322	// Compute configuration. We use defaults for everything.
323	configuration := &synchronization.Configuration{}
324
325	// Test the session lifecycle.
326	if err := testSessionLifecycle(context.Background(), "", alphaURL, betaURL, configuration, false, false); err != nil {
327		t.Fatal("session lifecycle test failed:", err)
328	}
329}
330
331// testWindowsDockerTransportPrompter is a prompting.Prompter implementation
332// that will answer "yes" to all prompts. It's needed to confirm container
333// restart behavior in the Docker transport on Windows.
334type testWindowsDockerTransportPrompter struct{}
335
336func (t *testWindowsDockerTransportPrompter) Message(_ string) error {
337	return nil
338}
339
340func (t *testWindowsDockerTransportPrompter) Prompt(_ string) (string, error) {
341	return "yes", nil
342}
343
344func TestSynchronizationGOROOTSrcToBetaOverDocker(t *testing.T) {
345	// If Docker test support isn't available, then skip this test.
346	if os.Getenv("MUTAGEN_TEST_DOCKER") != "true" {
347		t.Skip()
348	}
349
350	// Check the end-to-end test mode and compute the source synchronization
351	// root accordingly. If no mode has been specified, then skip the test.
352	endToEndTestMode := os.Getenv("MUTAGEN_TEST_END_TO_END")
353	var sourceRoot string
354	if endToEndTestMode == "" {
355		t.Skip()
356	} else if endToEndTestMode == "full" {
357		sourceRoot = filepath.Join(runtime.GOROOT(), "src")
358	} else if endToEndTestMode == "slim" {
359		sourceRoot = filepath.Join(runtime.GOROOT(), "src", "bufio")
360	} else {
361		t.Fatal("unknown end-to-end test mode specified:", endToEndTestMode)
362	}
363
364	// If we're on a POSIX system, then allow this test to run concurrently with
365	// other tests. On Windows, agent installation into Docker containers
366	// requires temporarily halting the container, meaning that multiple
367	// simultaneous Docker tests could conflict with each other, so we don't
368	// allow Docker-based tests to run concurrently on Windows.
369	if runtime.GOOS != "windows" {
370		t.Parallel()
371	}
372
373	// If we're on Windows, register a prompter that will answer yes to
374	// questions about stoping and restarting containers.
375	var prompter string
376	if runtime.GOOS == "windows" {
377		if p, err := prompting.RegisterPrompter(&testWindowsDockerTransportPrompter{}); err != nil {
378			t.Fatal("unable to register prompter:", err)
379		} else {
380			prompter = p
381			defer prompting.UnregisterPrompter(prompter)
382		}
383	}
384
385	// Create a unique directory name for synchronization into the container. We
386	// don't clean it up, because it will be wiped out when the test container
387	// is deleted.
388	randomUUID, err := uuid.NewRandom()
389	if err != nil {
390		t.Fatal("unable to create random directory UUID:", err)
391	}
392
393	// Calculate alpha and beta paths.
394	alphaRoot := sourceRoot
395	betaRoot := "~/" + randomUUID.String()
396
397	// Grab Docker environment variables.
398	environment := make(map[string]string, len(url.DockerEnvironmentVariables))
399	for _, variable := range url.DockerEnvironmentVariables {
400		environment[variable] = os.Getenv(variable)
401	}
402
403	// Compute alpha and beta URLs.
404	alphaURL := &url.URL{Path: alphaRoot}
405	betaURL := &url.URL{
406		Protocol:    url.Protocol_Docker,
407		User:        os.Getenv("MUTAGEN_TEST_DOCKER_USERNAME"),
408		Host:        os.Getenv("MUTAGEN_TEST_DOCKER_CONTAINER_NAME"),
409		Path:        betaRoot,
410		Environment: environment,
411	}
412
413	// Verify that the beta URL is valid (this will validate the test
414	// environment variables as well).
415	if err := betaURL.EnsureValid(); err != nil {
416		t.Fatal("beta URL is invalid:", err)
417	}
418
419	// Compute configuration. We use defaults for everything.
420	configuration := &synchronization.Configuration{}
421
422	// Test the session lifecycle.
423	if err := testSessionLifecycle(context.Background(), prompter, alphaURL, betaURL, configuration, false, false); err != nil {
424		t.Fatal("session lifecycle test failed:", err)
425	}
426}
427
428func init() {
429	// HACK: Disable lazy listener initialization since it makes test
430	// coordination difficult.
431	local.DisableLazyListenerInitialization = true
432}
433
434func TestForwardingToHTTPDemo(t *testing.T) {
435	// If Docker test support isn't available, then skip this test.
436	if os.Getenv("MUTAGEN_TEST_DOCKER") != "true" {
437		t.Skip()
438	}
439
440	// If we're on a POSIX system, then allow this test to run concurrently with
441	// other tests. On Windows, agent installation into Docker containers
442	// requires temporarily halting the container, meaning that multiple
443	// simultaneous Docker tests could conflict with each other, so we don't
444	// allow Docker-based tests to run concurrently on Windows.
445	if runtime.GOOS != "windows" {
446		t.Parallel()
447	}
448
449	// If we're on Windows, register a prompter that will answer yes to
450	// questions about stoping and restarting containers.
451	var prompter string
452	if runtime.GOOS == "windows" {
453		if p, err := prompting.RegisterPrompter(&testWindowsDockerTransportPrompter{}); err != nil {
454			t.Fatal("unable to register prompter:", err)
455		} else {
456			prompter = p
457			defer prompting.UnregisterPrompter(prompter)
458		}
459	}
460
461	// Pick a local listener address.
462	listenerProtocol := "tcp"
463	listenerAddress := "localhost:7070"
464
465	// Compute source and destination URLs.
466	source := &url.URL{
467		Kind:     url.Kind_Forwarding,
468		Protocol: url.Protocol_Local,
469		Path:     listenerProtocol + ":" + listenerAddress,
470	}
471	destination := &url.URL{
472		Kind:     url.Kind_Forwarding,
473		Protocol: url.Protocol_Docker,
474		User:     os.Getenv("MUTAGEN_TEST_DOCKER_USERNAME"),
475		Host:     os.Getenv("MUTAGEN_TEST_DOCKER_CONTAINER_NAME"),
476		Path:     "tcp:" + constants.HTTPDemoBindAddress,
477	}
478
479	// Verify that the destination URL is valid (this will validate the test
480	// environment variables as well).
481	if err := destination.EnsureValid(); err != nil {
482		t.Fatal("beta URL is invalid:", err)
483	}
484
485	// Create a function to perform a simple HTTP request and ensure that the
486	// returned contents are as expected.
487	performHTTPRequest := func() error {
488		// Perform the request and defer closure of the response body.
489		response, err := http.Get(fmt.Sprintf("http://%s/", listenerAddress))
490		if err != nil {
491			return errors.Wrap(err, "unable to perform HTTP GET")
492		}
493		defer response.Body.Close()
494
495		// Read the full body.
496		message, err := ioutil.ReadAll(response.Body)
497		if err != nil {
498			return errors.Wrap(err, "unable to read response body")
499		}
500
501		// Compare the message.
502		if string(message) != constants.HTTPDemoResponse {
503			return errors.New("response does not match expected")
504		}
505
506		// Success.
507		return nil
508	}
509
510	// Create a context to regulate the test.
511	ctx := context.Background()
512
513	// Create a forwarding session. Note that we've disabled lazy listener
514	// initialization using a private API in the init function above, so we can
515	// be sure that the listener has been established (with some non-empty
516	// backlog) by the time creation is complete.
517	sessionID, err := forwardingManager.Create(
518		ctx,
519		source,
520		destination,
521		&forwarding.Configuration{},
522		&forwarding.Configuration{},
523		&forwarding.Configuration{},
524		"testForwardingSession",
525		nil,
526		false,
527		prompter,
528	)
529	if err != nil {
530		t.Fatal("unable to create session:", err)
531	}
532
533	// Attempt an HTTP request.
534	// TODO: Attempt a more complicated exchange here. Maybe gRPC?
535	if err := performHTTPRequest(); err != nil {
536		t.Error("error performing forwarded HTTP request:", err)
537	}
538
539	// Create a session selection specification.
540	selection := &selection.Selection{
541		Specifications: []string{sessionID},
542	}
543
544	// Pause the session.
545	if err := forwardingManager.Pause(ctx, selection, ""); err != nil {
546		t.Error("unable to pause session:", err)
547	}
548
549	// Resume the session.
550	if err := forwardingManager.Resume(ctx, selection, ""); err != nil {
551		t.Error("unable to resume session:", err)
552	}
553
554	// Attempt an HTTP request.
555	// TODO: Attempt a more complicated exchange here. Maybe gRPC?
556	if err := performHTTPRequest(); err != nil {
557		t.Error("error performing forwarded HTTP request:", err)
558	}
559
560	// Attempt an additional resume (this should be a no-op).
561	if err := forwardingManager.Resume(ctx, selection, ""); err != nil {
562		t.Error("unable to perform additional resume:", err)
563	}
564
565	// Terminate the session.
566	if err := forwardingManager.Terminate(ctx, selection, ""); err != nil {
567		t.Error("unable to terminate session:", err)
568	}
569
570	// TODO: Verify that cleanup took place.
571}
572
573// TODO: Add forwarding tests using the netpipe protocol.
574