1package integration 2 3import ( 4 "context" 5 "fmt" 6 "io/ioutil" 7 "net/http" 8 "os" 9 "path/filepath" 10 "runtime" 11 "testing" 12 13 "github.com/pkg/errors" 14 15 "github.com/google/uuid" 16 17 "github.com/mutagen-io/mutagen/pkg/forwarding" 18 "github.com/mutagen-io/mutagen/pkg/forwarding/endpoint/local" 19 "github.com/mutagen-io/mutagen/pkg/integration/fixtures/constants" 20 "github.com/mutagen-io/mutagen/pkg/integration/protocols/netpipe" 21 "github.com/mutagen-io/mutagen/pkg/prompting" 22 "github.com/mutagen-io/mutagen/pkg/selection" 23 "github.com/mutagen-io/mutagen/pkg/synchronization" 24 "github.com/mutagen-io/mutagen/pkg/url" 25) 26 27func waitForSuccessfulSynchronizationCycle(ctx context.Context, sessionId string, allowConflicts, allowProblems bool) error { 28 // Create a session selection specification. 29 selection := &selection.Selection{ 30 Specifications: []string{sessionId}, 31 } 32 33 // Perform waiting. 34 var previousStateIndex uint64 35 var states []*synchronization.State 36 var err error 37 for { 38 previousStateIndex, states, err = synchronizationManager.List(ctx, selection, previousStateIndex) 39 if err != nil { 40 return errors.Wrap(err, "unable to list session states") 41 } else if len(states) != 1 { 42 return errors.New("invalid number of session states returned") 43 } else if states[0].SuccessfulSynchronizationCycles > 0 { 44 if !allowProblems && (len(states[0].AlphaProblems) > 0 || len(states[0].BetaProblems) > 0) { 45 return errors.New("problems detected (and disallowed)") 46 } else if !allowConflicts && len(states[0].Conflicts) > 0 { 47 return errors.New("conflicts detected (and disallowed)") 48 } 49 return nil 50 } 51 } 52} 53 54func testSessionLifecycle(ctx context.Context, prompter string, alpha, beta *url.URL, configuration *synchronization.Configuration, allowConflicts, allowProblems bool) error { 55 // Create a session. 56 sessionId, err := synchronizationManager.Create( 57 ctx, 58 alpha, beta, 59 configuration, 60 &synchronization.Configuration{}, 61 &synchronization.Configuration{}, 62 "testSynchronizationSession", 63 nil, 64 false, 65 prompter, 66 ) 67 if err != nil { 68 return errors.Wrap(err, "unable to create session") 69 } 70 71 // Wait for the session to have at least one successful synchronization 72 // cycle. 73 // TODO: Should we add a timeout on this? 74 if err := waitForSuccessfulSynchronizationCycle(ctx, sessionId, allowConflicts, allowProblems); err != nil { 75 return errors.Wrap(err, "unable to wait for successful synchronization") 76 } 77 78 // TODO: Add hook for verifying file contents. 79 80 // TODO: Add hook for verifying presence/absence of particular 81 // conflicts/problems and remove that monitoring from 82 // waitForSuccessfulSynchronizationCycle (maybe have it pass back the 83 // relevant state). 84 85 // Create a session selection specification. 86 selection := &selection.Selection{ 87 Specifications: []string{sessionId}, 88 } 89 90 // Pause the session. 91 if err := synchronizationManager.Pause(ctx, selection, ""); err != nil { 92 return errors.Wrap(err, "unable to pause session") 93 } 94 95 // Resume the session. 96 if err := synchronizationManager.Resume(ctx, selection, ""); err != nil { 97 return errors.Wrap(err, "unable to resume session") 98 } 99 100 // Wait for the session to have at least one additional synchronization 101 // cycle. 102 if err := waitForSuccessfulSynchronizationCycle(ctx, sessionId, allowConflicts, allowProblems); err != nil { 103 return errors.Wrap(err, "unable to wait for additional synchronization") 104 } 105 106 // Attempt an additional resume (this should be a no-op). 107 if err := synchronizationManager.Resume(ctx, selection, ""); err != nil { 108 return errors.Wrap(err, "unable to perform additional resume") 109 } 110 111 // Terminate the session. 112 if err := synchronizationManager.Terminate(ctx, selection, ""); err != nil { 113 return errors.Wrap(err, "unable to terminate session") 114 } 115 116 // TODO: Verify that cleanup took place. 117 118 // Success. 119 return nil 120} 121 122func TestSynchronizationBothRootsNil(t *testing.T) { 123 // Allow this test to run in parallel. 124 t.Parallel() 125 126 // Create a temporary directory and defer its cleanup. 127 directory, err := ioutil.TempDir("", "mutagen_end_to_end") 128 if err != nil { 129 t.Fatal("unable to create temporary directory:", err) 130 } 131 defer os.RemoveAll(directory) 132 133 // Calculate alpha and beta paths. 134 alphaRoot := filepath.Join(directory, "alpha") 135 betaRoot := filepath.Join(directory, "beta") 136 137 // Compute alpha and beta URLs. 138 alphaURL := &url.URL{Path: alphaRoot} 139 betaURL := &url.URL{Path: betaRoot} 140 141 // Compute configuration. We use defaults for everything. 142 configuration := &synchronization.Configuration{} 143 144 // Test the session lifecycle. 145 if err := testSessionLifecycle(context.Background(), "", alphaURL, betaURL, configuration, false, false); err != nil { 146 t.Fatal("session lifecycle test failed:", err) 147 } 148} 149 150func TestSynchronizationGOROOTSrcToBeta(t *testing.T) { 151 // Check the end-to-end test mode and compute the source synchronization 152 // root accordingly. If no mode has been specified, then skip the test. 153 endToEndTestMode := os.Getenv("MUTAGEN_TEST_END_TO_END") 154 var sourceRoot string 155 if endToEndTestMode == "" { 156 t.Skip() 157 } else if endToEndTestMode == "full" { 158 sourceRoot = filepath.Join(runtime.GOROOT(), "src") 159 } else if endToEndTestMode == "slim" { 160 sourceRoot = filepath.Join(runtime.GOROOT(), "src", "bufio") 161 } else { 162 t.Fatal("unknown end-to-end test mode specified:", endToEndTestMode) 163 } 164 165 // Allow the test to run in parallel. 166 t.Parallel() 167 168 // Create a temporary directory and defer its cleanup. 169 directory, err := ioutil.TempDir("", "mutagen_end_to_end") 170 if err != nil { 171 t.Fatal("unable to create temporary directory:", err) 172 } 173 defer os.RemoveAll(directory) 174 175 // Calculate alpha and beta paths. 176 alphaRoot := sourceRoot 177 betaRoot := filepath.Join(directory, "beta") 178 179 // Compute alpha and beta URLs. 180 alphaURL := &url.URL{Path: alphaRoot} 181 betaURL := &url.URL{Path: betaRoot} 182 183 // Compute configuration. We use defaults for everything. 184 configuration := &synchronization.Configuration{} 185 186 // Test the session lifecycle. 187 if err := testSessionLifecycle(context.Background(), "", alphaURL, betaURL, configuration, false, false); err != nil { 188 t.Fatal("session lifecycle test failed:", err) 189 } 190} 191 192func TestSynchronizationGOROOTSrcToAlpha(t *testing.T) { 193 // Check the end-to-end test mode and compute the source synchronization 194 // root accordingly. If no mode has been specified, then skip the test. 195 endToEndTestMode := os.Getenv("MUTAGEN_TEST_END_TO_END") 196 var sourceRoot string 197 if endToEndTestMode == "" { 198 t.Skip() 199 } else if endToEndTestMode == "full" { 200 sourceRoot = filepath.Join(runtime.GOROOT(), "src") 201 } else if endToEndTestMode == "slim" { 202 sourceRoot = filepath.Join(runtime.GOROOT(), "src", "bufio") 203 } else { 204 t.Fatal("unknown end-to-end test mode specified:", endToEndTestMode) 205 } 206 207 // Allow the test to run in parallel. 208 t.Parallel() 209 210 // Create a temporary directory and defer its cleanup. 211 directory, err := ioutil.TempDir("", "mutagen_end_to_end") 212 if err != nil { 213 t.Fatal("unable to create temporary directory:", err) 214 } 215 defer os.RemoveAll(directory) 216 217 // Calculate alpha and beta paths. 218 alphaRoot := filepath.Join(directory, "alpha") 219 betaRoot := sourceRoot 220 221 // Compute alpha and beta URLs. 222 alphaURL := &url.URL{Path: alphaRoot} 223 betaURL := &url.URL{Path: betaRoot} 224 225 // Compute configuration. We use defaults for everything. 226 configuration := &synchronization.Configuration{} 227 228 // Test the session lifecycle. 229 if err := testSessionLifecycle(context.Background(), "", alphaURL, betaURL, configuration, false, false); err != nil { 230 t.Fatal("session lifecycle test failed:", err) 231 } 232} 233 234func TestSynchronizationGOROOTSrcToBetaInMemory(t *testing.T) { 235 // Check the end-to-end test mode and compute the source synchronization 236 // root accordingly. If no mode has been specified, then skip the test. 237 endToEndTestMode := os.Getenv("MUTAGEN_TEST_END_TO_END") 238 var sourceRoot string 239 if endToEndTestMode == "" { 240 t.Skip() 241 } else if endToEndTestMode == "full" { 242 sourceRoot = filepath.Join(runtime.GOROOT(), "src") 243 } else if endToEndTestMode == "slim" { 244 sourceRoot = filepath.Join(runtime.GOROOT(), "src", "bufio") 245 } else { 246 t.Fatal("unknown end-to-end test mode specified:", endToEndTestMode) 247 } 248 249 // Allow the test to run in parallel. 250 t.Parallel() 251 252 // Create a temporary directory and defer its cleanup. 253 directory, err := ioutil.TempDir("", "mutagen_end_to_end") 254 if err != nil { 255 t.Fatal("unable to create temporary directory:", err) 256 } 257 defer os.RemoveAll(directory) 258 259 // Calculate alpha and beta paths. 260 alphaRoot := sourceRoot 261 betaRoot := filepath.Join(directory, "beta") 262 263 // Compute alpha and beta URLs. We use a special protocol with a custom 264 // handler to indicate an in-memory connection. 265 alphaURL := &url.URL{Path: alphaRoot} 266 betaURL := &url.URL{ 267 Protocol: netpipe.Protocol_Netpipe, 268 Path: betaRoot, 269 } 270 271 // Compute configuration. We use defaults for everything. 272 configuration := &synchronization.Configuration{} 273 274 // Test the session lifecycle. 275 if err := testSessionLifecycle(context.Background(), "", alphaURL, betaURL, configuration, false, false); err != nil { 276 t.Fatal("session lifecycle test failed:", err) 277 } 278} 279 280func TestSynchronizationGOROOTSrcToBetaOverSSH(t *testing.T) { 281 // If localhost SSH support isn't available, then skip this test. 282 if os.Getenv("MUTAGEN_TEST_SSH") != "true" { 283 t.Skip() 284 } 285 286 // Check the end-to-end test mode and compute the source synchronization 287 // root accordingly. If no mode has been specified, then skip the test. 288 endToEndTestMode := os.Getenv("MUTAGEN_TEST_END_TO_END") 289 var sourceRoot string 290 if endToEndTestMode == "" { 291 t.Skip() 292 } else if endToEndTestMode == "full" { 293 sourceRoot = filepath.Join(runtime.GOROOT(), "src") 294 } else if endToEndTestMode == "slim" { 295 sourceRoot = filepath.Join(runtime.GOROOT(), "src", "bufio") 296 } else { 297 t.Fatal("unknown end-to-end test mode specified:", endToEndTestMode) 298 } 299 300 // Allow the test to run in parallel. 301 t.Parallel() 302 303 // Create a temporary directory and defer its cleanup. 304 directory, err := ioutil.TempDir("", "mutagen_end_to_end") 305 if err != nil { 306 t.Fatal("unable to create temporary directory:", err) 307 } 308 defer os.RemoveAll(directory) 309 310 // Calculate alpha and beta paths. 311 alphaRoot := sourceRoot 312 betaRoot := filepath.Join(directory, "beta") 313 314 // Compute alpha and beta URLs. 315 alphaURL := &url.URL{Path: alphaRoot} 316 betaURL := &url.URL{ 317 Protocol: url.Protocol_SSH, 318 Host: "localhost", 319 Path: betaRoot, 320 } 321 322 // Compute configuration. We use defaults for everything. 323 configuration := &synchronization.Configuration{} 324 325 // Test the session lifecycle. 326 if err := testSessionLifecycle(context.Background(), "", alphaURL, betaURL, configuration, false, false); err != nil { 327 t.Fatal("session lifecycle test failed:", err) 328 } 329} 330 331// testWindowsDockerTransportPrompter is a prompting.Prompter implementation 332// that will answer "yes" to all prompts. It's needed to confirm container 333// restart behavior in the Docker transport on Windows. 334type testWindowsDockerTransportPrompter struct{} 335 336func (t *testWindowsDockerTransportPrompter) Message(_ string) error { 337 return nil 338} 339 340func (t *testWindowsDockerTransportPrompter) Prompt(_ string) (string, error) { 341 return "yes", nil 342} 343 344func TestSynchronizationGOROOTSrcToBetaOverDocker(t *testing.T) { 345 // If Docker test support isn't available, then skip this test. 346 if os.Getenv("MUTAGEN_TEST_DOCKER") != "true" { 347 t.Skip() 348 } 349 350 // Check the end-to-end test mode and compute the source synchronization 351 // root accordingly. If no mode has been specified, then skip the test. 352 endToEndTestMode := os.Getenv("MUTAGEN_TEST_END_TO_END") 353 var sourceRoot string 354 if endToEndTestMode == "" { 355 t.Skip() 356 } else if endToEndTestMode == "full" { 357 sourceRoot = filepath.Join(runtime.GOROOT(), "src") 358 } else if endToEndTestMode == "slim" { 359 sourceRoot = filepath.Join(runtime.GOROOT(), "src", "bufio") 360 } else { 361 t.Fatal("unknown end-to-end test mode specified:", endToEndTestMode) 362 } 363 364 // If we're on a POSIX system, then allow this test to run concurrently with 365 // other tests. On Windows, agent installation into Docker containers 366 // requires temporarily halting the container, meaning that multiple 367 // simultaneous Docker tests could conflict with each other, so we don't 368 // allow Docker-based tests to run concurrently on Windows. 369 if runtime.GOOS != "windows" { 370 t.Parallel() 371 } 372 373 // If we're on Windows, register a prompter that will answer yes to 374 // questions about stoping and restarting containers. 375 var prompter string 376 if runtime.GOOS == "windows" { 377 if p, err := prompting.RegisterPrompter(&testWindowsDockerTransportPrompter{}); err != nil { 378 t.Fatal("unable to register prompter:", err) 379 } else { 380 prompter = p 381 defer prompting.UnregisterPrompter(prompter) 382 } 383 } 384 385 // Create a unique directory name for synchronization into the container. We 386 // don't clean it up, because it will be wiped out when the test container 387 // is deleted. 388 randomUUID, err := uuid.NewRandom() 389 if err != nil { 390 t.Fatal("unable to create random directory UUID:", err) 391 } 392 393 // Calculate alpha and beta paths. 394 alphaRoot := sourceRoot 395 betaRoot := "~/" + randomUUID.String() 396 397 // Grab Docker environment variables. 398 environment := make(map[string]string, len(url.DockerEnvironmentVariables)) 399 for _, variable := range url.DockerEnvironmentVariables { 400 environment[variable] = os.Getenv(variable) 401 } 402 403 // Compute alpha and beta URLs. 404 alphaURL := &url.URL{Path: alphaRoot} 405 betaURL := &url.URL{ 406 Protocol: url.Protocol_Docker, 407 User: os.Getenv("MUTAGEN_TEST_DOCKER_USERNAME"), 408 Host: os.Getenv("MUTAGEN_TEST_DOCKER_CONTAINER_NAME"), 409 Path: betaRoot, 410 Environment: environment, 411 } 412 413 // Verify that the beta URL is valid (this will validate the test 414 // environment variables as well). 415 if err := betaURL.EnsureValid(); err != nil { 416 t.Fatal("beta URL is invalid:", err) 417 } 418 419 // Compute configuration. We use defaults for everything. 420 configuration := &synchronization.Configuration{} 421 422 // Test the session lifecycle. 423 if err := testSessionLifecycle(context.Background(), prompter, alphaURL, betaURL, configuration, false, false); err != nil { 424 t.Fatal("session lifecycle test failed:", err) 425 } 426} 427 428func init() { 429 // HACK: Disable lazy listener initialization since it makes test 430 // coordination difficult. 431 local.DisableLazyListenerInitialization = true 432} 433 434func TestForwardingToHTTPDemo(t *testing.T) { 435 // If Docker test support isn't available, then skip this test. 436 if os.Getenv("MUTAGEN_TEST_DOCKER") != "true" { 437 t.Skip() 438 } 439 440 // If we're on a POSIX system, then allow this test to run concurrently with 441 // other tests. On Windows, agent installation into Docker containers 442 // requires temporarily halting the container, meaning that multiple 443 // simultaneous Docker tests could conflict with each other, so we don't 444 // allow Docker-based tests to run concurrently on Windows. 445 if runtime.GOOS != "windows" { 446 t.Parallel() 447 } 448 449 // If we're on Windows, register a prompter that will answer yes to 450 // questions about stoping and restarting containers. 451 var prompter string 452 if runtime.GOOS == "windows" { 453 if p, err := prompting.RegisterPrompter(&testWindowsDockerTransportPrompter{}); err != nil { 454 t.Fatal("unable to register prompter:", err) 455 } else { 456 prompter = p 457 defer prompting.UnregisterPrompter(prompter) 458 } 459 } 460 461 // Pick a local listener address. 462 listenerProtocol := "tcp" 463 listenerAddress := "localhost:7070" 464 465 // Compute source and destination URLs. 466 source := &url.URL{ 467 Kind: url.Kind_Forwarding, 468 Protocol: url.Protocol_Local, 469 Path: listenerProtocol + ":" + listenerAddress, 470 } 471 destination := &url.URL{ 472 Kind: url.Kind_Forwarding, 473 Protocol: url.Protocol_Docker, 474 User: os.Getenv("MUTAGEN_TEST_DOCKER_USERNAME"), 475 Host: os.Getenv("MUTAGEN_TEST_DOCKER_CONTAINER_NAME"), 476 Path: "tcp:" + constants.HTTPDemoBindAddress, 477 } 478 479 // Verify that the destination URL is valid (this will validate the test 480 // environment variables as well). 481 if err := destination.EnsureValid(); err != nil { 482 t.Fatal("beta URL is invalid:", err) 483 } 484 485 // Create a function to perform a simple HTTP request and ensure that the 486 // returned contents are as expected. 487 performHTTPRequest := func() error { 488 // Perform the request and defer closure of the response body. 489 response, err := http.Get(fmt.Sprintf("http://%s/", listenerAddress)) 490 if err != nil { 491 return errors.Wrap(err, "unable to perform HTTP GET") 492 } 493 defer response.Body.Close() 494 495 // Read the full body. 496 message, err := ioutil.ReadAll(response.Body) 497 if err != nil { 498 return errors.Wrap(err, "unable to read response body") 499 } 500 501 // Compare the message. 502 if string(message) != constants.HTTPDemoResponse { 503 return errors.New("response does not match expected") 504 } 505 506 // Success. 507 return nil 508 } 509 510 // Create a context to regulate the test. 511 ctx := context.Background() 512 513 // Create a forwarding session. Note that we've disabled lazy listener 514 // initialization using a private API in the init function above, so we can 515 // be sure that the listener has been established (with some non-empty 516 // backlog) by the time creation is complete. 517 sessionID, err := forwardingManager.Create( 518 ctx, 519 source, 520 destination, 521 &forwarding.Configuration{}, 522 &forwarding.Configuration{}, 523 &forwarding.Configuration{}, 524 "testForwardingSession", 525 nil, 526 false, 527 prompter, 528 ) 529 if err != nil { 530 t.Fatal("unable to create session:", err) 531 } 532 533 // Attempt an HTTP request. 534 // TODO: Attempt a more complicated exchange here. Maybe gRPC? 535 if err := performHTTPRequest(); err != nil { 536 t.Error("error performing forwarded HTTP request:", err) 537 } 538 539 // Create a session selection specification. 540 selection := &selection.Selection{ 541 Specifications: []string{sessionID}, 542 } 543 544 // Pause the session. 545 if err := forwardingManager.Pause(ctx, selection, ""); err != nil { 546 t.Error("unable to pause session:", err) 547 } 548 549 // Resume the session. 550 if err := forwardingManager.Resume(ctx, selection, ""); err != nil { 551 t.Error("unable to resume session:", err) 552 } 553 554 // Attempt an HTTP request. 555 // TODO: Attempt a more complicated exchange here. Maybe gRPC? 556 if err := performHTTPRequest(); err != nil { 557 t.Error("error performing forwarded HTTP request:", err) 558 } 559 560 // Attempt an additional resume (this should be a no-op). 561 if err := forwardingManager.Resume(ctx, selection, ""); err != nil { 562 t.Error("unable to perform additional resume:", err) 563 } 564 565 // Terminate the session. 566 if err := forwardingManager.Terminate(ctx, selection, ""); err != nil { 567 t.Error("unable to terminate session:", err) 568 } 569 570 // TODO: Verify that cleanup took place. 571} 572 573// TODO: Add forwarding tests using the netpipe protocol. 574