1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13
14package wire
15
16import (
17	"context"
18	"errors"
19	"sort"
20	"testing"
21	"time"
22
23	"cloud.google.com/go/internal/testutil"
24	"cloud.google.com/go/pubsublite/internal/test"
25	"github.com/google/uuid"
26	"google.golang.org/grpc/codes"
27	"google.golang.org/grpc/status"
28
29	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
30)
31
32func TestPartitionSet(t *testing.T) {
33	partitions := newPartitionSet(&pb.PartitionAssignment{
34		Partitions: []int64{8, 5, 8, 1},
35	})
36
37	wantPartitions := []int{1, 5, 8}
38	for _, partition := range wantPartitions {
39		if !partitions.Contains(partition) {
40			t.Errorf("Contains(%d) got false, want true", partition)
41		}
42	}
43	for _, partition := range []int{2, 3, 4, 6, 7} {
44		if partitions.Contains(partition) {
45			t.Errorf("Contains(%d) got true, want false", partition)
46		}
47	}
48
49	gotPartitions := partitions.Ints()
50	sort.Ints(gotPartitions)
51	if !testutil.Equal(gotPartitions, wantPartitions) {
52		t.Errorf("Ints() got %v, want %v", gotPartitions, wantPartitions)
53	}
54}
55
// fakeUUID is the fixed 16-byte value handed out by fakeGenerateUUID; its bytes
// are the ASCII digits "0123456789012345".
var fakeUUID = [16]byte{
	'0', '1', '2', '3', '4', '5', '6', '7',
	'8', '9', '0', '1', '2', '3', '4', '5',
}
57
58func fakeGenerateUUID() (uuid.UUID, error) {
59	return fakeUUID, nil
60}
61
62// testAssigner wraps an assigner for ease of testing.
63type testAssigner struct {
64	// Fake error to simulate receiver unable to handle assignment.
65	recvError  error
66	t          *testing.T
67	asn        *assigner
68	partitions chan []int
69
70	serviceTestProxy
71}
72
73func newTestAssigner(t *testing.T, subscription string, recvErr error) *testAssigner {
74	ctx := context.Background()
75	assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testServer.ClientConn())
76	if err != nil {
77		t.Fatal(err)
78	}
79
80	ta := &testAssigner{
81		recvError:  recvErr,
82		t:          t,
83		partitions: make(chan []int, 1),
84	}
85	asn, err := newAssigner(ctx, assignmentClient, fakeGenerateUUID, testReceiveSettings(), subscription, ta.receiveAssignment)
86	if err != nil {
87		t.Fatal(err)
88	}
89	ta.asn = asn
90	ta.initAndStart(t, ta.asn, "Assigner", assignmentClient)
91	return ta
92}
93
94func (ta *testAssigner) receiveAssignment(partitions partitionSet) error {
95	p := partitions.Ints()
96	sort.Ints(p)
97	ta.partitions <- p
98
99	if ta.recvError != nil {
100		return ta.recvError
101	}
102	return nil
103}
104
105func (ta *testAssigner) NextPartitions() []int {
106	select {
107	case <-time.After(serviceTestWaitTimeout):
108		ta.t.Errorf("%s partitions not received within %v", ta.name, serviceTestWaitTimeout)
109		return nil
110	case p := <-ta.partitions:
111		return p
112	}
113}
114
115func TestAssignerNoInitialResponse(t *testing.T) {
116	subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs"
117
118	verifiers := test.NewVerifiers(t)
119	stream := test.NewRPCVerifier(t)
120	barrier := stream.PushWithBarrier(initAssignmentReq(subscription, fakeUUID[:]), nil, nil)
121	verifiers.AddAssignmentStream(subscription, stream)
122
123	mockServer.OnTestStart(verifiers)
124	defer mockServer.OnTestEnd()
125
126	asn := newTestAssigner(t, subscription, nil)
127
128	// Assigner starts even though no initial response was received from the
129	// server.
130	if gotErr := asn.StartError(); gotErr != nil {
131		t.Errorf("Start() got err: (%v)", gotErr)
132	}
133	// To ensure test is deterministic, i.e. server must receive initial request
134	// before stopping the client.
135	barrier.Release()
136	asn.StopVerifyNoError()
137}
138
139func TestAssignerReconnect(t *testing.T) {
140	subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs"
141	permanentErr := status.Error(codes.FailedPrecondition, "failed")
142
143	verifiers := test.NewVerifiers(t)
144
145	// Simulate a transient error that results in a reconnect.
146	stream1 := test.NewRPCVerifier(t)
147	stream1.Push(initAssignmentReq(subscription, fakeUUID[:]), nil, status.Error(codes.Unavailable, "server unavailable"))
148	verifiers.AddAssignmentStream(subscription, stream1)
149
150	// Send 2 partition assignments before terminating with permanent error.
151	stream2 := test.NewRPCVerifier(t)
152	stream2.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{3, 2, 4}), nil)
153	stream2.Push(assignmentAckReq(), assignmentResp([]int64{0, 3, 3}), nil)
154	stream2.Push(assignmentAckReq(), nil, permanentErr)
155	verifiers.AddAssignmentStream(subscription, stream2)
156
157	mockServer.OnTestStart(verifiers)
158	defer mockServer.OnTestEnd()
159
160	asn := newTestAssigner(t, subscription, nil)
161
162	if gotErr := asn.StartError(); gotErr != nil {
163		t.Errorf("Start() got err: (%v)", gotErr)
164	}
165	if got, want := asn.NextPartitions(), []int{2, 3, 4}; !testutil.Equal(got, want) {
166		t.Errorf("Partition assignment #1: got %v, want %v", got, want)
167	}
168	if got, want := asn.NextPartitions(), []int{0, 3}; !testutil.Equal(got, want) {
169		t.Errorf("Partition assignment #2: got %v, want %v", got, want)
170	}
171	if gotErr, wantErr := asn.FinalError(), permanentErr; !test.ErrorEqual(gotErr, wantErr) {
172		t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
173	}
174}
175
176func TestAssignerHandlePartitionFailure(t *testing.T) {
177	subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs"
178
179	verifiers := test.NewVerifiers(t)
180	stream := test.NewRPCVerifier(t)
181	stream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1, 2}), nil)
182	verifiers.AddAssignmentStream(subscription, stream)
183
184	mockServer.OnTestStart(verifiers)
185	defer mockServer.OnTestEnd()
186
187	// Simulates the assigningSubscriber discarding assignments.
188	wantErr := errors.New("subscriber shutting down")
189	asn := newTestAssigner(t, subscription, wantErr)
190
191	if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, wantErr) {
192		t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
193	}
194	if got, want := asn.NextPartitions(), []int{1, 2}; !testutil.Equal(got, want) {
195		t.Errorf("Partition assignments: got %v, want %v", got, want)
196	}
197}
198