1// Copyright 2018 The Go Cloud Development Kit Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14package pubsub
15
16import (
17	"context"
18	"errors"
19	"fmt"
20	"net/url"
21	"strings"
22	"sync"
23	"testing"
24	"time"
25
26	"github.com/google/go-cmp/cmp"
27	"gocloud.dev/gcerrors"
28	"gocloud.dev/internal/gcerr"
29	"gocloud.dev/internal/testing/octest"
30	"gocloud.dev/pubsub/batcher"
31	"gocloud.dev/pubsub/driver"
32)
33
34type driverTopic struct {
35	driver.Topic
36	subs []*driverSub
37}
38
39func (t *driverTopic) SendBatch(ctx context.Context, ms []*driver.Message) error {
40	for _, s := range t.subs {
41		select {
42		case <-s.sem:
43			s.q = append(s.q, ms...)
44			s.sem <- struct{}{}
45		case <-ctx.Done():
46			return ctx.Err()
47		}
48	}
49	return nil
50}
51
52func (*driverTopic) IsRetryable(error) bool             { return false }
53func (*driverTopic) ErrorCode(error) gcerrors.ErrorCode { return gcerrors.Unknown }
54func (*driverTopic) Close() error                       { return nil }
55
56type driverSub struct {
57	driver.Subscription
58	sem chan struct{}
59	// Normally this queue would live on a separate server in the cloud.
60	q []*driver.Message
61}
62
63func NewDriverSub() *driverSub {
64	ds := &driverSub{
65		sem: make(chan struct{}, 1),
66	}
67	ds.sem <- struct{}{}
68	return ds
69}
70
71func (s *driverSub) ReceiveBatch(ctx context.Context, maxMessages int) ([]*driver.Message, error) {
72	for {
73		select {
74		case <-s.sem:
75			ms := s.grabQueue(maxMessages)
76			if len(ms) != 0 {
77				return ms, nil
78			}
79		case <-ctx.Done():
80			return nil, ctx.Err()
81		default:
82		}
83	}
84}
85
86func (s *driverSub) grabQueue(maxMessages int) []*driver.Message {
87	defer func() { s.sem <- struct{}{} }()
88	if len(s.q) > 0 {
89		if len(s.q) <= maxMessages {
90			ms := s.q
91			s.q = nil
92			return ms
93		}
94		ms := s.q[:maxMessages]
95		s.q = s.q[maxMessages:]
96		return ms
97	}
98	return nil
99}
100
101func (s *driverSub) SendAcks(ctx context.Context, ackIDs []driver.AckID) error {
102	return nil
103}
104
105func (*driverSub) IsRetryable(error) bool             { return false }
106func (*driverSub) ErrorCode(error) gcerrors.ErrorCode { return gcerrors.Internal }
107func (*driverSub) CanNack() bool                      { return false }
108func (*driverSub) Close() error                       { return nil }
109
110func TestSendReceive(t *testing.T) {
111	ctx := context.Background()
112	ds := NewDriverSub()
113	dt := &driverTopic{
114		subs: []*driverSub{ds},
115	}
116	topic := NewTopic(dt, nil)
117	defer topic.Shutdown(ctx)
118	m := &Message{Body: []byte("user signed up")}
119	if err := topic.Send(ctx, m); err != nil {
120		t.Fatal(err)
121	}
122
123	sub := NewSubscription(ds, nil, nil)
124	defer sub.Shutdown(ctx)
125	m2, err := sub.Receive(ctx)
126	if err != nil {
127		t.Fatal(err)
128	}
129	if string(m2.Body) != string(m.Body) {
130		t.Fatalf("received message has body %q, want %q", m2.Body, m.Body)
131	}
132	m2.Ack()
133}
134
135func TestConcurrentReceivesGetAllTheMessages(t *testing.T) {
136	howManyToSend := int(1e3)
137	ctx, cancel := context.WithCancel(context.Background())
138	dt := &driverTopic{}
139
140	// wg is used to wait until all messages are received.
141	var wg sync.WaitGroup
142	wg.Add(howManyToSend)
143
144	// Make a subscription.
145	ds := NewDriverSub()
146	dt.subs = append(dt.subs, ds)
147	s := NewSubscription(ds, nil, nil)
148	defer s.Shutdown(ctx)
149
150	// Start 10 goroutines to receive from it.
151	var mu sync.Mutex
152	receivedMsgs := make(map[string]bool)
153	for i := 0; i < 10; i++ {
154		go func() {
155			for {
156				m, err := s.Receive(ctx)
157				if err != nil {
158					// Permanent error; ctx cancelled or subscription closed is
159					// expected once we've received all the messages.
160					mu.Lock()
161					n := len(receivedMsgs)
162					mu.Unlock()
163					if n != howManyToSend {
164						t.Errorf("Worker's Receive failed before all messages were received (%d)", n)
165					}
166					return
167				}
168				m.Ack()
169				mu.Lock()
170				receivedMsgs[string(m.Body)] = true
171				mu.Unlock()
172				wg.Done()
173			}
174		}()
175	}
176
177	// Send messages. Each message has a unique body used as a key to receivedMsgs.
178	topic := NewTopic(dt, nil)
179	defer topic.Shutdown(ctx)
180	for i := 0; i < howManyToSend; i++ {
181		key := fmt.Sprintf("message #%d", i)
182		m := &Message{Body: []byte(key)}
183		if err := topic.Send(ctx, m); err != nil {
184			t.Fatal(err)
185		}
186	}
187
188	// Wait for the goroutines to receive all of the messages, then cancel the
189	// ctx so they all exit.
190	wg.Wait()
191	defer cancel()
192
193	// Check that all the messages were received.
194	for i := 0; i < howManyToSend; i++ {
195		key := fmt.Sprintf("message #%d", i)
196		if !receivedMsgs[key] {
197			t.Errorf("message %q was not received", key)
198		}
199	}
200}
201
202func TestCancelSend(t *testing.T) {
203	ctx, cancel := context.WithCancel(context.Background())
204	ds := NewDriverSub()
205	dt := &driverTopic{
206		subs: []*driverSub{ds},
207	}
208	topic := NewTopic(dt, nil)
209	defer topic.Shutdown(ctx)
210	m := &Message{}
211
212	// Intentionally break the driver subscription by acquiring its semaphore.
213	// Now topic.Send will have to wait for cancellation.
214	<-ds.sem
215
216	cancel()
217	if err := topic.Send(ctx, m); err == nil {
218		t.Error("got nil, want cancellation error")
219	}
220}
221
222func TestCancelReceive(t *testing.T) {
223	ctx, cancel := context.WithCancel(context.Background())
224	ds := NewDriverSub()
225	s := NewSubscription(ds, nil, nil)
226	defer s.Shutdown(ctx)
227	cancel()
228	// Without cancellation, this Receive would hang.
229	if _, err := s.Receive(ctx); err == nil {
230		t.Error("got nil, want cancellation error")
231	}
232}
233
234type blockingDriverSub struct {
235	driver.Subscription
236	inReceiveBatch chan struct{}
237}
238
239func (b blockingDriverSub) ReceiveBatch(ctx context.Context, maxMessages int) ([]*driver.Message, error) {
240	b.inReceiveBatch <- struct{}{}
241	<-ctx.Done()
242	return nil, ctx.Err()
243}
244func (blockingDriverSub) CanNack() bool          { return false }
245func (blockingDriverSub) IsRetryable(error) bool { return false }
246func (blockingDriverSub) Close() error           { return nil }
247
248func TestCancelTwoReceives(t *testing.T) {
249	// We want to create the following situation:
250	// 1. Goroutine 1 calls Receive, obtains the lock (Subscription.mu),
251	//    then releases the lock and calls driver.ReceiveBatch, which hangs.
252	// 2. Goroutine 2 calls Receive.
253	// 3. The context passed to the Goroutine 2 call is canceled.
254	// We expect Goroutine 2's Receive to exit immediately. That won't
255	// happen if Receive holds the lock during the call to ReceiveBatch.
256	inReceiveBatch := make(chan struct{})
257	s := NewSubscription(blockingDriverSub{inReceiveBatch: inReceiveBatch}, nil, nil)
258	defer s.Shutdown(context.Background())
259	go func() {
260		_, err := s.Receive(context.Background())
261		// This should happen at the very end of the test, during Shutdown.
262		if err != context.Canceled {
263			t.Errorf("got %v, want context.Canceled", err)
264		}
265	}()
266	<-inReceiveBatch
267	// Give the Receive call time to block on the mutex before timing out.
268	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
269	defer cancel()
270	errc := make(chan error)
271	go func() {
272		_, err := s.Receive(ctx)
273		errc <- err
274	}()
275	err := <-errc
276	if err != context.DeadlineExceeded {
277		t.Errorf("got %v, want context.DeadlineExceeded", err)
278	}
279}
280
281func TestRetryTopic(t *testing.T) {
282	// Test that Send is retried if the driver returns a retryable error.
283	ctx := context.Background()
284	ft := &failTopic{}
285	topic := NewTopic(ft, nil)
286	defer topic.Shutdown(ctx)
287	err := topic.Send(ctx, &Message{})
288	if err != nil {
289		t.Errorf("Send: got %v, want nil", err)
290	}
291	if got, want := ft.calls, nRetryCalls+1; got != want {
292		t.Errorf("calls: got %d, want %d", got, want)
293	}
294}
295
296var errRetry = errors.New("retry")
297
298func isRetryable(err error) bool {
299	return err == errRetry
300}
301
302const nRetryCalls = 2
303
304// failTopic helps test retries for SendBatch.
305//
306// SendBatch will fail nRetryCall times before succeeding.
307type failTopic struct {
308	driver.Topic
309	calls int
310}
311
312func (t *failTopic) SendBatch(ctx context.Context, ms []*driver.Message) error {
313	t.calls++
314	if t.calls <= nRetryCalls {
315		return errRetry
316	}
317	return nil
318}
319
320func (*failTopic) IsRetryable(err error) bool         { return isRetryable(err) }
321func (*failTopic) ErrorCode(error) gcerrors.ErrorCode { return gcerrors.Unknown }
322func (*failTopic) Close() error                       { return nil }
323
324func TestRetryReceive(t *testing.T) {
325	ctx := context.Background()
326	fs := &failSub{fail: true}
327	sub := NewSubscription(fs, nil, nil)
328	defer sub.Shutdown(ctx)
329	m, err := sub.Receive(ctx)
330	if err != nil {
331		t.Fatalf("Receive: got %v, want nil", err)
332	}
333	m.Ack()
334	if got, want := fs.calls, nRetryCalls+1; got != want {
335		t.Errorf("calls: got %d, want %d", got, want)
336	}
337}
338
339// TestBatchSizeDecay verifies that the batch size decays when no messages are available.
340// (see https://github.com/google/go-cloud/issues/2849).
341func TestBatchSizeDecays(t *testing.T) {
342	ctx := context.Background()
343	fs := &failSub{}
344	// Allow multiple handlers and cap max batch size to ensure we get concurrency.
345	sub := NewSubscription(fs, &batcher.Options{MaxHandlers: 10, MaxBatchSize: 2}, nil)
346	defer sub.Shutdown(ctx)
347
348	// Records the last batch size.
349	var mu sync.Mutex
350	lastMaxMessages := 0
351	sub.preReceiveBatchHook = func(maxMessages int) {
352		mu.Lock()
353		defer mu.Unlock()
354		lastMaxMessages = maxMessages
355	}
356
357	// Do some receives to allow the number of batches to increase past 1.
358	for n := 0; n < 100; n++ {
359		m, err := sub.Receive(ctx)
360		if err != nil {
361			t.Fatalf("Receive: got %v, want nil", err)
362		}
363		m.Ack()
364	}
365
366	// Tell the failSub to start returning no messages.
367	fs.mu.Lock()
368	fs.empty = true
369	fs.mu.Unlock()
370
371	mu.Lock()
372	highWaterMarkBatchSize := lastMaxMessages
373	if lastMaxMessages <= 1 {
374		t.Fatal("max messages wasn't greater than 1")
375	}
376	mu.Unlock()
377
378	// Make a bunch of calls to Receive to drain any outstanding
379	// messages, and wait some extra time during which we should
380	// continue polling, and the batch size should decay.
381	for {
382		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
383		defer cancel()
384		m, err := sub.Receive(ctx)
385		if err != nil {
386			// Expected: no more messages, and timed out.
387			break
388		}
389		// Drained a message.
390		m.Ack()
391	}
392
393	// Verify that the batch size decayed.
394	mu.Lock()
395	if lastMaxMessages >= highWaterMarkBatchSize {
396		t.Fatalf("wanted batch size to decay; high water mark was %d, now %d", highWaterMarkBatchSize, lastMaxMessages)
397	}
398	mu.Unlock()
399}
400
401// TestRetryReceiveBatches verifies that batching and retries work without races
402// (see https://github.com/google/go-cloud/issues/2676).
403func TestRetryReceiveInBatchesDoesntRace(t *testing.T) {
404	ctx := context.Background()
405	fs := &failSub{}
406	// Allow multiple handlers and cap max batch size to ensure we get concurrency.
407	sub := NewSubscription(fs, &batcher.Options{MaxHandlers: 10, MaxBatchSize: 2}, nil)
408	defer sub.Shutdown(ctx)
409
410	// Do some receives to allow the number of batches to increase past 1.
411	for n := 0; n < 100; n++ {
412		m, err := sub.Receive(ctx)
413		if err != nil {
414			t.Fatalf("Receive: got %v, want nil", err)
415		}
416		m.Ack()
417	}
418	// Tell the failSub to start failing.
419	fs.mu.Lock()
420	fs.fail = true
421	fs.mu.Unlock()
422
423	// This call to Receive should result in nRetryCalls+1 calls to ReceiveBatch for
424	// each batch. In the issue noted above, this would cause a race.
425	for n := 0; n < 100; n++ {
426		m, err := sub.Receive(ctx)
427		if err != nil {
428			t.Fatalf("Receive: got %v, want nil", err)
429		}
430		m.Ack()
431	}
432	// Don't try to verify the exact number of calls, as it is unpredictable
433	// based on the timing of the batching.
434}
435
436// failSub helps test retries for ReceiveBatch.
437//
438// Once start=true, ReceiveBatch will fail nRetryCalls times before succeeding.
439type failSub struct {
440	driver.Subscription
441	fail  bool
442	empty bool
443	calls int
444	mu    sync.Mutex
445}
446
447func (t *failSub) ReceiveBatch(ctx context.Context, maxMessages int) ([]*driver.Message, error) {
448	t.mu.Lock()
449	defer t.mu.Unlock()
450	if t.fail {
451		t.calls++
452		if t.calls <= nRetryCalls {
453			return nil, errRetry
454		}
455	}
456	if t.empty {
457		t.calls++
458		return nil, nil
459	}
460	return []*driver.Message{{Body: []byte("")}}, nil
461}
462
463func (*failSub) SendAcks(ctx context.Context, ackIDs []driver.AckID) error { return nil }
464func (*failSub) IsRetryable(err error) bool                                { return isRetryable(err) }
465func (*failSub) CanNack() bool                                             { return false }
466func (*failSub) Close() error                                              { return nil }
467
468// TODO(jba): add a test for retry of SendAcks.
469
470var errDriver = errors.New("driver error")
471
472type erroringTopic struct {
473	driver.Topic
474}
475
476func (erroringTopic) SendBatch(context.Context, []*driver.Message) error { return errDriver }
477func (erroringTopic) IsRetryable(err error) bool                         { return isRetryable(err) }
478func (erroringTopic) ErrorCode(error) gcerrors.ErrorCode                 { return gcerrors.AlreadyExists }
479func (erroringTopic) Close() error                                       { return errDriver }
480
481type erroringSubscription struct {
482	driver.Subscription
483}
484
485func (erroringSubscription) ReceiveBatch(context.Context, int) ([]*driver.Message, error) {
486	return nil, errDriver
487}
488
489func (erroringSubscription) SendAcks(context.Context, []driver.AckID) error { return errDriver }
490func (erroringSubscription) IsRetryable(err error) bool                     { return isRetryable(err) }
491func (erroringSubscription) ErrorCode(error) gcerrors.ErrorCode             { return gcerrors.AlreadyExists }
492func (erroringSubscription) CanNack() bool                                  { return false }
493func (erroringSubscription) Close() error                                   { return errDriver }
494
495// TestErrorsAreWrapped tests that all errors returned from the driver are
496// wrapped exactly once by the portable type.
497func TestErrorsAreWrapped(t *testing.T) {
498	ctx := context.Background()
499
500	verify := func(err error) {
501		t.Helper()
502		if err == nil {
503			t.Errorf("got nil error, wanted non-nil")
504			return
505		}
506		if e, ok := err.(*gcerr.Error); !ok {
507			t.Errorf("not wrapped: %v", err)
508		} else if got := e.Unwrap(); got != errDriver {
509			t.Errorf("got %v for wrapped error, not errDriver", got)
510		}
511		if s := err.Error(); !strings.HasPrefix(s, "pubsub ") {
512			t.Errorf("Error() for wrapped error doesn't start with 'pubsub': prefix: %s", s)
513		}
514	}
515
516	topic := NewTopic(erroringTopic{}, nil)
517	verify(topic.Send(ctx, &Message{}))
518	err := topic.Shutdown(ctx)
519	verify(err)
520
521	sub := NewSubscription(erroringSubscription{}, nil, nil)
522	_, err = sub.Receive(ctx)
523	verify(err)
524	err = sub.Shutdown(ctx)
525	verify(err)
526}
527
528func TestOpenCensus(t *testing.T) {
529	ctx := context.Background()
530	te := octest.NewTestExporter(OpenCensusViews)
531	defer te.Unregister()
532
533	ds := NewDriverSub()
534	dt := &driverTopic{
535		subs: []*driverSub{ds},
536	}
537	topic := NewTopic(dt, nil)
538	defer topic.Shutdown(ctx)
539	sub := NewSubscription(ds, nil, nil)
540	defer sub.Shutdown(ctx)
541	if err := topic.Send(ctx, &Message{Body: []byte("x")}); err != nil {
542		t.Fatal(err)
543	}
544	if err := topic.Shutdown(ctx); err != nil {
545		t.Fatal(err)
546	}
547	msg, err := sub.Receive(ctx)
548	if err != nil {
549		t.Fatal(err)
550	}
551	msg.Ack()
552	if err := sub.Shutdown(ctx); err != nil {
553		t.Fatal(err)
554	}
555	_, _ = sub.Receive(ctx)
556
557	diff := octest.Diff(te.Spans(), te.Counts(), "gocloud.dev/pubsub", "gocloud.dev/pubsub", []octest.Call{
558		{Method: "driver.Topic.SendBatch", Code: gcerrors.OK},
559		{Method: "Topic.Send", Code: gcerrors.OK},
560		{Method: "Topic.Shutdown", Code: gcerrors.OK},
561		{Method: "driver.Subscription.ReceiveBatch", Code: gcerrors.OK},
562		{Method: "Subscription.Receive", Code: gcerrors.OK},
563		{Method: "driver.Subscription.SendAcks", Code: gcerrors.OK},
564		{Method: "Subscription.Shutdown", Code: gcerrors.OK},
565		{Method: "Subscription.Receive", Code: gcerrors.FailedPrecondition},
566	})
567	if diff != "" {
568		t.Error(diff)
569	}
570}
571
572var (
573	testOpenOnce sync.Once
574	testOpenGot  *url.URL
575)
576
577func TestURLMux(t *testing.T) {
578	ctx := context.Background()
579
580	mux := new(URLMux)
581	fake := &fakeOpener{}
582	mux.RegisterTopic("foo", fake)
583	mux.RegisterTopic("err", fake)
584	mux.RegisterSubscription("foo", fake)
585	mux.RegisterSubscription("err", fake)
586
587	if diff := cmp.Diff(mux.TopicSchemes(), []string{"err", "foo"}); diff != "" {
588		t.Errorf("Schemes: %s", diff)
589	}
590	if !mux.ValidTopicScheme("foo") || !mux.ValidTopicScheme("err") {
591		t.Errorf("ValidTopicScheme didn't return true for valid scheme")
592	}
593	if mux.ValidTopicScheme("foo2") || mux.ValidTopicScheme("http") {
594		t.Errorf("ValidTopicScheme didn't return false for invalid scheme")
595	}
596
597	if diff := cmp.Diff(mux.SubscriptionSchemes(), []string{"err", "foo"}); diff != "" {
598		t.Errorf("Schemes: %s", diff)
599	}
600	if !mux.ValidSubscriptionScheme("foo") || !mux.ValidSubscriptionScheme("err") {
601		t.Errorf("ValidSubscriptionScheme didn't return true for valid scheme")
602	}
603	if mux.ValidSubscriptionScheme("foo2") || mux.ValidSubscriptionScheme("http") {
604		t.Errorf("ValidSubscriptionScheme didn't return false for invalid scheme")
605	}
606
607	for _, tc := range []struct {
608		name    string
609		url     string
610		wantErr bool
611	}{
612		{
613			name:    "empty URL",
614			wantErr: true,
615		},
616		{
617			name:    "invalid URL",
618			url:     ":foo",
619			wantErr: true,
620		},
621		{
622			name:    "invalid URL no scheme",
623			url:     "foo",
624			wantErr: true,
625		},
626		{
627			name:    "unregistered scheme",
628			url:     "bar://myps",
629			wantErr: true,
630		},
631		{
632			name:    "func returns error",
633			url:     "err://myps",
634			wantErr: true,
635		},
636		{
637			name: "no query options",
638			url:  "foo://myps",
639		},
640		{
641			name: "empty query options",
642			url:  "foo://myps?",
643		},
644		{
645			name: "query options",
646			url:  "foo://myps?aAa=bBb&cCc=dDd",
647		},
648		{
649			name: "multiple query options",
650			url:  "foo://myps?x=a&x=b&x=c",
651		},
652		{
653			name: "fancy ps name",
654			url:  "foo:///foo/bar/baz",
655		},
656		{
657			name: "using api schema prefix",
658			url:  "pubsub+foo://foo",
659		},
660	} {
661		t.Run("topic: "+tc.name, func(t *testing.T) {
662			_, gotErr := mux.OpenTopic(ctx, tc.url)
663			if (gotErr != nil) != tc.wantErr {
664				t.Fatalf("got err %v, want error %v", gotErr, tc.wantErr)
665			}
666			if gotErr != nil {
667				return
668			}
669			if got := fake.u.String(); got != tc.url {
670				t.Errorf("got %q want %q", got, tc.url)
671			}
672			// Repeat with OpenTopicURL.
673			parsed, err := url.Parse(tc.url)
674			if err != nil {
675				t.Fatal(err)
676			}
677			_, gotErr = mux.OpenTopicURL(ctx, parsed)
678			if gotErr != nil {
679				t.Fatalf("got err %v, want nil", gotErr)
680			}
681			if got := fake.u.String(); got != tc.url {
682				t.Errorf("got %q want %q", got, tc.url)
683			}
684		})
685		t.Run("subscription: "+tc.name, func(t *testing.T) {
686			_, gotErr := mux.OpenSubscription(ctx, tc.url)
687			if (gotErr != nil) != tc.wantErr {
688				t.Fatalf("got err %v, want error %v", gotErr, tc.wantErr)
689			}
690			if gotErr != nil {
691				return
692			}
693			if got := fake.u.String(); got != tc.url {
694				t.Errorf("got %q want %q", got, tc.url)
695			}
696			// Repeat with OpenSubscriptionURL.
697			parsed, err := url.Parse(tc.url)
698			if err != nil {
699				t.Fatal(err)
700			}
701			_, gotErr = mux.OpenSubscriptionURL(ctx, parsed)
702			if gotErr != nil {
703				t.Fatalf("got err %v, want nil", gotErr)
704			}
705			if got := fake.u.String(); got != tc.url {
706				t.Errorf("got %q want %q", got, tc.url)
707			}
708		})
709	}
710}
711
712type fakeOpener struct {
713	u *url.URL // last url passed to OpenTopicURL/OpenSubscriptionURL
714}
715
716func (o *fakeOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*Topic, error) {
717	if u.Scheme == "err" {
718		return nil, errors.New("fail")
719	}
720	o.u = u
721	return nil, nil
722}
723
724func (o *fakeOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*Subscription, error) {
725	if u.Scheme == "err" {
726		return nil, errors.New("fail")
727	}
728	o.u = u
729	return nil, nil
730}
731