1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package wire 15 16import ( 17 "context" 18 "errors" 19 "sort" 20 "testing" 21 "time" 22 23 "cloud.google.com/go/internal/testutil" 24 "cloud.google.com/go/pubsublite/internal/test" 25 "github.com/google/uuid" 26 "google.golang.org/grpc/codes" 27 "google.golang.org/grpc/status" 28 29 pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" 30) 31 32func TestPartitionSet(t *testing.T) { 33 partitions := newPartitionSet(&pb.PartitionAssignment{ 34 Partitions: []int64{8, 5, 8, 1}, 35 }) 36 37 wantPartitions := []int{1, 5, 8} 38 for _, partition := range wantPartitions { 39 if !partitions.Contains(partition) { 40 t.Errorf("Contains(%d) got false, want true", partition) 41 } 42 } 43 for _, partition := range []int{2, 3, 4, 6, 7} { 44 if partitions.Contains(partition) { 45 t.Errorf("Contains(%d) got true, want false", partition) 46 } 47 } 48 49 gotPartitions := partitions.Ints() 50 sort.Ints(gotPartitions) 51 if !testutil.Equal(gotPartitions, wantPartitions) { 52 t.Errorf("Ints() got %v, want %v", gotPartitions, wantPartitions) 53 } 54} 55 56var fakeUUID = [16]byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5'} 57 58func fakeGenerateUUID() (uuid.UUID, error) { 59 return fakeUUID, nil 60} 61 62// testAssigner wraps an assigner for ease of testing. 63type testAssigner struct { 64 // Fake error to simulate receiver unable to handle assignment. 65 recvError error 66 t *testing.T 67 asn *assigner 68 partitions chan []int 69 70 serviceTestProxy 71} 72 73func newTestAssigner(t *testing.T, subscription string, recvErr error) *testAssigner { 74 ctx := context.Background() 75 assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testServer.ClientConn()) 76 if err != nil { 77 t.Fatal(err) 78 } 79 80 ta := &testAssigner{ 81 recvError: recvErr, 82 t: t, 83 partitions: make(chan []int, 1), 84 } 85 asn, err := newAssigner(ctx, assignmentClient, fakeGenerateUUID, testReceiveSettings(), subscription, ta.receiveAssignment) 86 if err != nil { 87 t.Fatal(err) 88 } 89 ta.asn = asn 90 ta.initAndStart(t, ta.asn, "Assigner", assignmentClient) 91 return ta 92} 93 94func (ta *testAssigner) receiveAssignment(partitions partitionSet) error { 95 p := partitions.Ints() 96 sort.Ints(p) 97 ta.partitions <- p 98 99 if ta.recvError != nil { 100 return ta.recvError 101 } 102 return nil 103} 104 105func (ta *testAssigner) NextPartitions() []int { 106 select { 107 case <-time.After(serviceTestWaitTimeout): 108 ta.t.Errorf("%s partitions not received within %v", ta.name, serviceTestWaitTimeout) 109 return nil 110 case p := <-ta.partitions: 111 return p 112 } 113} 114 115func TestAssignerNoInitialResponse(t *testing.T) { 116 subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs" 117 118 verifiers := test.NewVerifiers(t) 119 stream := test.NewRPCVerifier(t) 120 barrier := stream.PushWithBarrier(initAssignmentReq(subscription, fakeUUID[:]), nil, nil) 121 verifiers.AddAssignmentStream(subscription, stream) 122 123 mockServer.OnTestStart(verifiers) 124 defer mockServer.OnTestEnd() 125 126 asn := newTestAssigner(t, subscription, nil) 127 128 // Assigner starts even though no initial response was received from the 129 // server. 130 if gotErr := asn.StartError(); gotErr != nil { 131 t.Errorf("Start() got err: (%v)", gotErr) 132 } 133 // To ensure test is deterministic, i.e. server must receive initial request 134 // before stopping the client. 135 barrier.Release() 136 asn.StopVerifyNoError() 137} 138 139func TestAssignerReconnect(t *testing.T) { 140 subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs" 141 permanentErr := status.Error(codes.FailedPrecondition, "failed") 142 143 verifiers := test.NewVerifiers(t) 144 145 // Simulate a transient error that results in a reconnect. 146 stream1 := test.NewRPCVerifier(t) 147 stream1.Push(initAssignmentReq(subscription, fakeUUID[:]), nil, status.Error(codes.Unavailable, "server unavailable")) 148 verifiers.AddAssignmentStream(subscription, stream1) 149 150 // Send 2 partition assignments before terminating with permanent error. 151 stream2 := test.NewRPCVerifier(t) 152 stream2.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{3, 2, 4}), nil) 153 stream2.Push(assignmentAckReq(), assignmentResp([]int64{0, 3, 3}), nil) 154 stream2.Push(assignmentAckReq(), nil, permanentErr) 155 verifiers.AddAssignmentStream(subscription, stream2) 156 157 mockServer.OnTestStart(verifiers) 158 defer mockServer.OnTestEnd() 159 160 asn := newTestAssigner(t, subscription, nil) 161 162 if gotErr := asn.StartError(); gotErr != nil { 163 t.Errorf("Start() got err: (%v)", gotErr) 164 } 165 if got, want := asn.NextPartitions(), []int{2, 3, 4}; !testutil.Equal(got, want) { 166 t.Errorf("Partition assignment #1: got %v, want %v", got, want) 167 } 168 if got, want := asn.NextPartitions(), []int{0, 3}; !testutil.Equal(got, want) { 169 t.Errorf("Partition assignment #2: got %v, want %v", got, want) 170 } 171 if gotErr, wantErr := asn.FinalError(), permanentErr; !test.ErrorEqual(gotErr, wantErr) { 172 t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr) 173 } 174} 175 176func TestAssignerHandlePartitionFailure(t *testing.T) { 177 subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs" 178 179 verifiers := test.NewVerifiers(t) 180 stream := test.NewRPCVerifier(t) 181 stream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1, 2}), nil) 182 verifiers.AddAssignmentStream(subscription, stream) 183 184 mockServer.OnTestStart(verifiers) 185 defer mockServer.OnTestEnd() 186 187 // Simulates the assigningSubscriber discarding assignments. 188 wantErr := errors.New("subscriber shutting down") 189 asn := newTestAssigner(t, subscription, wantErr) 190 191 if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, wantErr) { 192 t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr) 193 } 194 if got, want := asn.NextPartitions(), []int{1, 2}; !testutil.Equal(got, want) { 195 t.Errorf("Partition assignments: got %v, want %v", got, want) 196 } 197} 198