1// Copyright 2018 The Go Cloud Development Kit Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14package pubsub 15 16import ( 17 "context" 18 "errors" 19 "fmt" 20 "net/url" 21 "strings" 22 "sync" 23 "testing" 24 "time" 25 26 "github.com/google/go-cmp/cmp" 27 "gocloud.dev/gcerrors" 28 "gocloud.dev/internal/gcerr" 29 "gocloud.dev/internal/testing/octest" 30 "gocloud.dev/pubsub/batcher" 31 "gocloud.dev/pubsub/driver" 32) 33 34type driverTopic struct { 35 driver.Topic 36 subs []*driverSub 37} 38 39func (t *driverTopic) SendBatch(ctx context.Context, ms []*driver.Message) error { 40 for _, s := range t.subs { 41 select { 42 case <-s.sem: 43 s.q = append(s.q, ms...) 44 s.sem <- struct{}{} 45 case <-ctx.Done(): 46 return ctx.Err() 47 } 48 } 49 return nil 50} 51 52func (*driverTopic) IsRetryable(error) bool { return false } 53func (*driverTopic) ErrorCode(error) gcerrors.ErrorCode { return gcerrors.Unknown } 54func (*driverTopic) Close() error { return nil } 55 56type driverSub struct { 57 driver.Subscription 58 sem chan struct{} 59 // Normally this queue would live on a separate server in the cloud. 60 q []*driver.Message 61} 62 63func NewDriverSub() *driverSub { 64 ds := &driverSub{ 65 sem: make(chan struct{}, 1), 66 } 67 ds.sem <- struct{}{} 68 return ds 69} 70 71func (s *driverSub) ReceiveBatch(ctx context.Context, maxMessages int) ([]*driver.Message, error) { 72 for { 73 select { 74 case <-s.sem: 75 ms := s.grabQueue(maxMessages) 76 if len(ms) != 0 { 77 return ms, nil 78 } 79 case <-ctx.Done(): 80 return nil, ctx.Err() 81 default: 82 } 83 } 84} 85 86func (s *driverSub) grabQueue(maxMessages int) []*driver.Message { 87 defer func() { s.sem <- struct{}{} }() 88 if len(s.q) > 0 { 89 if len(s.q) <= maxMessages { 90 ms := s.q 91 s.q = nil 92 return ms 93 } 94 ms := s.q[:maxMessages] 95 s.q = s.q[maxMessages:] 96 return ms 97 } 98 return nil 99} 100 101func (s *driverSub) SendAcks(ctx context.Context, ackIDs []driver.AckID) error { 102 return nil 103} 104 105func (*driverSub) IsRetryable(error) bool { return false } 106func (*driverSub) ErrorCode(error) gcerrors.ErrorCode { return gcerrors.Internal } 107func (*driverSub) CanNack() bool { return false } 108func (*driverSub) Close() error { return nil } 109 110func TestSendReceive(t *testing.T) { 111 ctx := context.Background() 112 ds := NewDriverSub() 113 dt := &driverTopic{ 114 subs: []*driverSub{ds}, 115 } 116 topic := NewTopic(dt, nil) 117 defer topic.Shutdown(ctx) 118 m := &Message{Body: []byte("user signed up")} 119 if err := topic.Send(ctx, m); err != nil { 120 t.Fatal(err) 121 } 122 123 sub := NewSubscription(ds, nil, nil) 124 defer sub.Shutdown(ctx) 125 m2, err := sub.Receive(ctx) 126 if err != nil { 127 t.Fatal(err) 128 } 129 if string(m2.Body) != string(m.Body) { 130 t.Fatalf("received message has body %q, want %q", m2.Body, m.Body) 131 } 132 m2.Ack() 133} 134 135func TestConcurrentReceivesGetAllTheMessages(t *testing.T) { 136 howManyToSend := int(1e3) 137 ctx, cancel := context.WithCancel(context.Background()) 138 dt := &driverTopic{} 139 140 // wg is used to wait until all messages are received. 141 var wg sync.WaitGroup 142 wg.Add(howManyToSend) 143 144 // Make a subscription. 145 ds := NewDriverSub() 146 dt.subs = append(dt.subs, ds) 147 s := NewSubscription(ds, nil, nil) 148 defer s.Shutdown(ctx) 149 150 // Start 10 goroutines to receive from it. 151 var mu sync.Mutex 152 receivedMsgs := make(map[string]bool) 153 for i := 0; i < 10; i++ { 154 go func() { 155 for { 156 m, err := s.Receive(ctx) 157 if err != nil { 158 // Permanent error; ctx cancelled or subscription closed is 159 // expected once we've received all the messages. 160 mu.Lock() 161 n := len(receivedMsgs) 162 mu.Unlock() 163 if n != howManyToSend { 164 t.Errorf("Worker's Receive failed before all messages were received (%d)", n) 165 } 166 return 167 } 168 m.Ack() 169 mu.Lock() 170 receivedMsgs[string(m.Body)] = true 171 mu.Unlock() 172 wg.Done() 173 } 174 }() 175 } 176 177 // Send messages. Each message has a unique body used as a key to receivedMsgs. 178 topic := NewTopic(dt, nil) 179 defer topic.Shutdown(ctx) 180 for i := 0; i < howManyToSend; i++ { 181 key := fmt.Sprintf("message #%d", i) 182 m := &Message{Body: []byte(key)} 183 if err := topic.Send(ctx, m); err != nil { 184 t.Fatal(err) 185 } 186 } 187 188 // Wait for the goroutines to receive all of the messages, then cancel the 189 // ctx so they all exit. 190 wg.Wait() 191 defer cancel() 192 193 // Check that all the messages were received. 194 for i := 0; i < howManyToSend; i++ { 195 key := fmt.Sprintf("message #%d", i) 196 if !receivedMsgs[key] { 197 t.Errorf("message %q was not received", key) 198 } 199 } 200} 201 202func TestCancelSend(t *testing.T) { 203 ctx, cancel := context.WithCancel(context.Background()) 204 ds := NewDriverSub() 205 dt := &driverTopic{ 206 subs: []*driverSub{ds}, 207 } 208 topic := NewTopic(dt, nil) 209 defer topic.Shutdown(ctx) 210 m := &Message{} 211 212 // Intentionally break the driver subscription by acquiring its semaphore. 213 // Now topic.Send will have to wait for cancellation. 214 <-ds.sem 215 216 cancel() 217 if err := topic.Send(ctx, m); err == nil { 218 t.Error("got nil, want cancellation error") 219 } 220} 221 222func TestCancelReceive(t *testing.T) { 223 ctx, cancel := context.WithCancel(context.Background()) 224 ds := NewDriverSub() 225 s := NewSubscription(ds, nil, nil) 226 defer s.Shutdown(ctx) 227 cancel() 228 // Without cancellation, this Receive would hang. 229 if _, err := s.Receive(ctx); err == nil { 230 t.Error("got nil, want cancellation error") 231 } 232} 233 234type blockingDriverSub struct { 235 driver.Subscription 236 inReceiveBatch chan struct{} 237} 238 239func (b blockingDriverSub) ReceiveBatch(ctx context.Context, maxMessages int) ([]*driver.Message, error) { 240 b.inReceiveBatch <- struct{}{} 241 <-ctx.Done() 242 return nil, ctx.Err() 243} 244func (blockingDriverSub) CanNack() bool { return false } 245func (blockingDriverSub) IsRetryable(error) bool { return false } 246func (blockingDriverSub) Close() error { return nil } 247 248func TestCancelTwoReceives(t *testing.T) { 249 // We want to create the following situation: 250 // 1. Goroutine 1 calls Receive, obtains the lock (Subscription.mu), 251 // then releases the lock and calls driver.ReceiveBatch, which hangs. 252 // 2. Goroutine 2 calls Receive. 253 // 3. The context passed to the Goroutine 2 call is canceled. 254 // We expect Goroutine 2's Receive to exit immediately. That won't 255 // happen if Receive holds the lock during the call to ReceiveBatch. 256 inReceiveBatch := make(chan struct{}) 257 s := NewSubscription(blockingDriverSub{inReceiveBatch: inReceiveBatch}, nil, nil) 258 defer s.Shutdown(context.Background()) 259 go func() { 260 _, err := s.Receive(context.Background()) 261 // This should happen at the very end of the test, during Shutdown. 262 if err != context.Canceled { 263 t.Errorf("got %v, want context.Canceled", err) 264 } 265 }() 266 <-inReceiveBatch 267 // Give the Receive call time to block on the mutex before timing out. 268 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) 269 defer cancel() 270 errc := make(chan error) 271 go func() { 272 _, err := s.Receive(ctx) 273 errc <- err 274 }() 275 err := <-errc 276 if err != context.DeadlineExceeded { 277 t.Errorf("got %v, want context.DeadlineExceeded", err) 278 } 279} 280 281func TestRetryTopic(t *testing.T) { 282 // Test that Send is retried if the driver returns a retryable error. 283 ctx := context.Background() 284 ft := &failTopic{} 285 topic := NewTopic(ft, nil) 286 defer topic.Shutdown(ctx) 287 err := topic.Send(ctx, &Message{}) 288 if err != nil { 289 t.Errorf("Send: got %v, want nil", err) 290 } 291 if got, want := ft.calls, nRetryCalls+1; got != want { 292 t.Errorf("calls: got %d, want %d", got, want) 293 } 294} 295 296var errRetry = errors.New("retry") 297 298func isRetryable(err error) bool { 299 return err == errRetry 300} 301 302const nRetryCalls = 2 303 304// failTopic helps test retries for SendBatch. 305// 306// SendBatch will fail nRetryCall times before succeeding. 307type failTopic struct { 308 driver.Topic 309 calls int 310} 311 312func (t *failTopic) SendBatch(ctx context.Context, ms []*driver.Message) error { 313 t.calls++ 314 if t.calls <= nRetryCalls { 315 return errRetry 316 } 317 return nil 318} 319 320func (*failTopic) IsRetryable(err error) bool { return isRetryable(err) } 321func (*failTopic) ErrorCode(error) gcerrors.ErrorCode { return gcerrors.Unknown } 322func (*failTopic) Close() error { return nil } 323 324func TestRetryReceive(t *testing.T) { 325 ctx := context.Background() 326 fs := &failSub{fail: true} 327 sub := NewSubscription(fs, nil, nil) 328 defer sub.Shutdown(ctx) 329 m, err := sub.Receive(ctx) 330 if err != nil { 331 t.Fatalf("Receive: got %v, want nil", err) 332 } 333 m.Ack() 334 if got, want := fs.calls, nRetryCalls+1; got != want { 335 t.Errorf("calls: got %d, want %d", got, want) 336 } 337} 338 339// TestBatchSizeDecay verifies that the batch size decays when no messages are available. 340// (see https://github.com/google/go-cloud/issues/2849). 341func TestBatchSizeDecays(t *testing.T) { 342 ctx := context.Background() 343 fs := &failSub{} 344 // Allow multiple handlers and cap max batch size to ensure we get concurrency. 345 sub := NewSubscription(fs, &batcher.Options{MaxHandlers: 10, MaxBatchSize: 2}, nil) 346 defer sub.Shutdown(ctx) 347 348 // Records the last batch size. 349 var mu sync.Mutex 350 lastMaxMessages := 0 351 sub.preReceiveBatchHook = func(maxMessages int) { 352 mu.Lock() 353 defer mu.Unlock() 354 lastMaxMessages = maxMessages 355 } 356 357 // Do some receives to allow the number of batches to increase past 1. 358 for n := 0; n < 100; n++ { 359 m, err := sub.Receive(ctx) 360 if err != nil { 361 t.Fatalf("Receive: got %v, want nil", err) 362 } 363 m.Ack() 364 } 365 366 // Tell the failSub to start returning no messages. 367 fs.mu.Lock() 368 fs.empty = true 369 fs.mu.Unlock() 370 371 mu.Lock() 372 highWaterMarkBatchSize := lastMaxMessages 373 if lastMaxMessages <= 1 { 374 t.Fatal("max messages wasn't greater than 1") 375 } 376 mu.Unlock() 377 378 // Make a bunch of calls to Receive to drain any outstanding 379 // messages, and wait some extra time during which we should 380 // continue polling, and the batch size should decay. 381 for { 382 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) 383 defer cancel() 384 m, err := sub.Receive(ctx) 385 if err != nil { 386 // Expected: no more messages, and timed out. 387 break 388 } 389 // Drained a message. 390 m.Ack() 391 } 392 393 // Verify that the batch size decayed. 394 mu.Lock() 395 if lastMaxMessages >= highWaterMarkBatchSize { 396 t.Fatalf("wanted batch size to decay; high water mark was %d, now %d", highWaterMarkBatchSize, lastMaxMessages) 397 } 398 mu.Unlock() 399} 400 401// TestRetryReceiveBatches verifies that batching and retries work without races 402// (see https://github.com/google/go-cloud/issues/2676). 403func TestRetryReceiveInBatchesDoesntRace(t *testing.T) { 404 ctx := context.Background() 405 fs := &failSub{} 406 // Allow multiple handlers and cap max batch size to ensure we get concurrency. 407 sub := NewSubscription(fs, &batcher.Options{MaxHandlers: 10, MaxBatchSize: 2}, nil) 408 defer sub.Shutdown(ctx) 409 410 // Do some receives to allow the number of batches to increase past 1. 411 for n := 0; n < 100; n++ { 412 m, err := sub.Receive(ctx) 413 if err != nil { 414 t.Fatalf("Receive: got %v, want nil", err) 415 } 416 m.Ack() 417 } 418 // Tell the failSub to start failing. 419 fs.mu.Lock() 420 fs.fail = true 421 fs.mu.Unlock() 422 423 // This call to Receive should result in nRetryCalls+1 calls to ReceiveBatch for 424 // each batch. In the issue noted above, this would cause a race. 425 for n := 0; n < 100; n++ { 426 m, err := sub.Receive(ctx) 427 if err != nil { 428 t.Fatalf("Receive: got %v, want nil", err) 429 } 430 m.Ack() 431 } 432 // Don't try to verify the exact number of calls, as it is unpredictable 433 // based on the timing of the batching. 434} 435 436// failSub helps test retries for ReceiveBatch. 437// 438// Once start=true, ReceiveBatch will fail nRetryCalls times before succeeding. 439type failSub struct { 440 driver.Subscription 441 fail bool 442 empty bool 443 calls int 444 mu sync.Mutex 445} 446 447func (t *failSub) ReceiveBatch(ctx context.Context, maxMessages int) ([]*driver.Message, error) { 448 t.mu.Lock() 449 defer t.mu.Unlock() 450 if t.fail { 451 t.calls++ 452 if t.calls <= nRetryCalls { 453 return nil, errRetry 454 } 455 } 456 if t.empty { 457 t.calls++ 458 return nil, nil 459 } 460 return []*driver.Message{{Body: []byte("")}}, nil 461} 462 463func (*failSub) SendAcks(ctx context.Context, ackIDs []driver.AckID) error { return nil } 464func (*failSub) IsRetryable(err error) bool { return isRetryable(err) } 465func (*failSub) CanNack() bool { return false } 466func (*failSub) Close() error { return nil } 467 468// TODO(jba): add a test for retry of SendAcks. 469 470var errDriver = errors.New("driver error") 471 472type erroringTopic struct { 473 driver.Topic 474} 475 476func (erroringTopic) SendBatch(context.Context, []*driver.Message) error { return errDriver } 477func (erroringTopic) IsRetryable(err error) bool { return isRetryable(err) } 478func (erroringTopic) ErrorCode(error) gcerrors.ErrorCode { return gcerrors.AlreadyExists } 479func (erroringTopic) Close() error { return errDriver } 480 481type erroringSubscription struct { 482 driver.Subscription 483} 484 485func (erroringSubscription) ReceiveBatch(context.Context, int) ([]*driver.Message, error) { 486 return nil, errDriver 487} 488 489func (erroringSubscription) SendAcks(context.Context, []driver.AckID) error { return errDriver } 490func (erroringSubscription) IsRetryable(err error) bool { return isRetryable(err) } 491func (erroringSubscription) ErrorCode(error) gcerrors.ErrorCode { return gcerrors.AlreadyExists } 492func (erroringSubscription) CanNack() bool { return false } 493func (erroringSubscription) Close() error { return errDriver } 494 495// TestErrorsAreWrapped tests that all errors returned from the driver are 496// wrapped exactly once by the portable type. 497func TestErrorsAreWrapped(t *testing.T) { 498 ctx := context.Background() 499 500 verify := func(err error) { 501 t.Helper() 502 if err == nil { 503 t.Errorf("got nil error, wanted non-nil") 504 return 505 } 506 if e, ok := err.(*gcerr.Error); !ok { 507 t.Errorf("not wrapped: %v", err) 508 } else if got := e.Unwrap(); got != errDriver { 509 t.Errorf("got %v for wrapped error, not errDriver", got) 510 } 511 if s := err.Error(); !strings.HasPrefix(s, "pubsub ") { 512 t.Errorf("Error() for wrapped error doesn't start with 'pubsub': prefix: %s", s) 513 } 514 } 515 516 topic := NewTopic(erroringTopic{}, nil) 517 verify(topic.Send(ctx, &Message{})) 518 err := topic.Shutdown(ctx) 519 verify(err) 520 521 sub := NewSubscription(erroringSubscription{}, nil, nil) 522 _, err = sub.Receive(ctx) 523 verify(err) 524 err = sub.Shutdown(ctx) 525 verify(err) 526} 527 528func TestOpenCensus(t *testing.T) { 529 ctx := context.Background() 530 te := octest.NewTestExporter(OpenCensusViews) 531 defer te.Unregister() 532 533 ds := NewDriverSub() 534 dt := &driverTopic{ 535 subs: []*driverSub{ds}, 536 } 537 topic := NewTopic(dt, nil) 538 defer topic.Shutdown(ctx) 539 sub := NewSubscription(ds, nil, nil) 540 defer sub.Shutdown(ctx) 541 if err := topic.Send(ctx, &Message{Body: []byte("x")}); err != nil { 542 t.Fatal(err) 543 } 544 if err := topic.Shutdown(ctx); err != nil { 545 t.Fatal(err) 546 } 547 msg, err := sub.Receive(ctx) 548 if err != nil { 549 t.Fatal(err) 550 } 551 msg.Ack() 552 if err := sub.Shutdown(ctx); err != nil { 553 t.Fatal(err) 554 } 555 _, _ = sub.Receive(ctx) 556 557 diff := octest.Diff(te.Spans(), te.Counts(), "gocloud.dev/pubsub", "gocloud.dev/pubsub", []octest.Call{ 558 {Method: "driver.Topic.SendBatch", Code: gcerrors.OK}, 559 {Method: "Topic.Send", Code: gcerrors.OK}, 560 {Method: "Topic.Shutdown", Code: gcerrors.OK}, 561 {Method: "driver.Subscription.ReceiveBatch", Code: gcerrors.OK}, 562 {Method: "Subscription.Receive", Code: gcerrors.OK}, 563 {Method: "driver.Subscription.SendAcks", Code: gcerrors.OK}, 564 {Method: "Subscription.Shutdown", Code: gcerrors.OK}, 565 {Method: "Subscription.Receive", Code: gcerrors.FailedPrecondition}, 566 }) 567 if diff != "" { 568 t.Error(diff) 569 } 570} 571 572var ( 573 testOpenOnce sync.Once 574 testOpenGot *url.URL 575) 576 577func TestURLMux(t *testing.T) { 578 ctx := context.Background() 579 580 mux := new(URLMux) 581 fake := &fakeOpener{} 582 mux.RegisterTopic("foo", fake) 583 mux.RegisterTopic("err", fake) 584 mux.RegisterSubscription("foo", fake) 585 mux.RegisterSubscription("err", fake) 586 587 if diff := cmp.Diff(mux.TopicSchemes(), []string{"err", "foo"}); diff != "" { 588 t.Errorf("Schemes: %s", diff) 589 } 590 if !mux.ValidTopicScheme("foo") || !mux.ValidTopicScheme("err") { 591 t.Errorf("ValidTopicScheme didn't return true for valid scheme") 592 } 593 if mux.ValidTopicScheme("foo2") || mux.ValidTopicScheme("http") { 594 t.Errorf("ValidTopicScheme didn't return false for invalid scheme") 595 } 596 597 if diff := cmp.Diff(mux.SubscriptionSchemes(), []string{"err", "foo"}); diff != "" { 598 t.Errorf("Schemes: %s", diff) 599 } 600 if !mux.ValidSubscriptionScheme("foo") || !mux.ValidSubscriptionScheme("err") { 601 t.Errorf("ValidSubscriptionScheme didn't return true for valid scheme") 602 } 603 if mux.ValidSubscriptionScheme("foo2") || mux.ValidSubscriptionScheme("http") { 604 t.Errorf("ValidSubscriptionScheme didn't return false for invalid scheme") 605 } 606 607 for _, tc := range []struct { 608 name string 609 url string 610 wantErr bool 611 }{ 612 { 613 name: "empty URL", 614 wantErr: true, 615 }, 616 { 617 name: "invalid URL", 618 url: ":foo", 619 wantErr: true, 620 }, 621 { 622 name: "invalid URL no scheme", 623 url: "foo", 624 wantErr: true, 625 }, 626 { 627 name: "unregistered scheme", 628 url: "bar://myps", 629 wantErr: true, 630 }, 631 { 632 name: "func returns error", 633 url: "err://myps", 634 wantErr: true, 635 }, 636 { 637 name: "no query options", 638 url: "foo://myps", 639 }, 640 { 641 name: "empty query options", 642 url: "foo://myps?", 643 }, 644 { 645 name: "query options", 646 url: "foo://myps?aAa=bBb&cCc=dDd", 647 }, 648 { 649 name: "multiple query options", 650 url: "foo://myps?x=a&x=b&x=c", 651 }, 652 { 653 name: "fancy ps name", 654 url: "foo:///foo/bar/baz", 655 }, 656 { 657 name: "using api schema prefix", 658 url: "pubsub+foo://foo", 659 }, 660 } { 661 t.Run("topic: "+tc.name, func(t *testing.T) { 662 _, gotErr := mux.OpenTopic(ctx, tc.url) 663 if (gotErr != nil) != tc.wantErr { 664 t.Fatalf("got err %v, want error %v", gotErr, tc.wantErr) 665 } 666 if gotErr != nil { 667 return 668 } 669 if got := fake.u.String(); got != tc.url { 670 t.Errorf("got %q want %q", got, tc.url) 671 } 672 // Repeat with OpenTopicURL. 673 parsed, err := url.Parse(tc.url) 674 if err != nil { 675 t.Fatal(err) 676 } 677 _, gotErr = mux.OpenTopicURL(ctx, parsed) 678 if gotErr != nil { 679 t.Fatalf("got err %v, want nil", gotErr) 680 } 681 if got := fake.u.String(); got != tc.url { 682 t.Errorf("got %q want %q", got, tc.url) 683 } 684 }) 685 t.Run("subscription: "+tc.name, func(t *testing.T) { 686 _, gotErr := mux.OpenSubscription(ctx, tc.url) 687 if (gotErr != nil) != tc.wantErr { 688 t.Fatalf("got err %v, want error %v", gotErr, tc.wantErr) 689 } 690 if gotErr != nil { 691 return 692 } 693 if got := fake.u.String(); got != tc.url { 694 t.Errorf("got %q want %q", got, tc.url) 695 } 696 // Repeat with OpenSubscriptionURL. 697 parsed, err := url.Parse(tc.url) 698 if err != nil { 699 t.Fatal(err) 700 } 701 _, gotErr = mux.OpenSubscriptionURL(ctx, parsed) 702 if gotErr != nil { 703 t.Fatalf("got err %v, want nil", gotErr) 704 } 705 if got := fake.u.String(); got != tc.url { 706 t.Errorf("got %q want %q", got, tc.url) 707 } 708 }) 709 } 710} 711 712type fakeOpener struct { 713 u *url.URL // last url passed to OpenTopicURL/OpenSubscriptionURL 714} 715 716func (o *fakeOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*Topic, error) { 717 if u.Scheme == "err" { 718 return nil, errors.New("fail") 719 } 720 o.u = u 721 return nil, nil 722} 723 724func (o *fakeOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*Subscription, error) { 725 if u.Scheme == "err" { 726 return nil, errors.New("fail") 727 } 728 o.u = u 729 return nil, nil 730} 731