1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package pscompat 15 16import ( 17 "context" 18 "errors" 19 "testing" 20 21 "cloud.google.com/go/pubsub" 22 "cloud.google.com/go/pubsublite/internal/test" 23 "cloud.google.com/go/pubsublite/internal/wire" 24 25 pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" 26) 27 28// mockWirePublisher is a mock implementation of the wire.Publisher interface. 29// It uses test.RPCVerifier to install fake PublishResults for each Publish 30// call. 31type mockWirePublisher struct { 32 Verifier *test.RPCVerifier 33 Stopped bool 34 err error 35} 36 37func (mp *mockWirePublisher) Publish(msg *pb.PubSubMessage, onResult wire.PublishResultFunc) { 38 resp, err := mp.Verifier.Pop(msg) 39 if err != nil { 40 mp.err = err 41 onResult(nil, err) 42 return 43 } 44 result := resp.(*wire.MessageMetadata) 45 onResult(result, nil) 46} 47 48func (mp *mockWirePublisher) Start() {} 49func (mp *mockWirePublisher) Stop() { mp.Stopped = true } 50func (mp *mockWirePublisher) WaitStarted() error { return mp.err } 51func (mp *mockWirePublisher) WaitStopped() error { return mp.err } 52func (mp *mockWirePublisher) Error() error { return mp.err } 53 54func newTestPublisherClient(verifier *test.RPCVerifier, settings PublishSettings) *PublisherClient { 55 return &PublisherClient{ 56 settings: settings, 57 wirePub: &mockWirePublisher{Verifier: verifier}, 58 } 59} 60 61func TestPublisherClientTransformMessage(t *testing.T) { 62 ctx := context.Background() 63 input := &pubsub.Message{ 64 Data: []byte("data"), 65 OrderingKey: "ordering_key", 66 Attributes: map[string]string{"attr": "value"}, 67 } 68 fakeResponse := &wire.MessageMetadata{ 69 Partition: 2, 70 Offset: 42, 71 } 72 wantResultID := "2:42" 73 74 for _, tc := range []struct { 75 desc string 76 // mutateSettings is passed a copy of DefaultPublishSettings to mutate. 77 mutateSettings func(settings *PublishSettings) 78 wantMsg *pb.PubSubMessage 79 }{ 80 { 81 desc: "default settings", 82 mutateSettings: func(settings *PublishSettings) {}, 83 wantMsg: &pb.PubSubMessage{ 84 Data: []byte("data"), 85 Key: []byte("ordering_key"), 86 Attributes: map[string]*pb.AttributeValues{ 87 "attr": {Values: [][]byte{[]byte("value")}}, 88 }, 89 }, 90 }, 91 { 92 desc: "custom key extractor", 93 mutateSettings: func(settings *PublishSettings) { 94 settings.KeyExtractor = func(msg *pubsub.Message) []byte { 95 return msg.Data 96 } 97 }, 98 wantMsg: &pb.PubSubMessage{ 99 Data: []byte("data"), 100 Key: []byte("data"), 101 Attributes: map[string]*pb.AttributeValues{ 102 "attr": {Values: [][]byte{[]byte("value")}}, 103 }, 104 }, 105 }, 106 { 107 desc: "custom message transformer", 108 mutateSettings: func(settings *PublishSettings) { 109 settings.KeyExtractor = func(msg *pubsub.Message) []byte { 110 return msg.Data 111 } 112 settings.MessageTransformer = func(from *pubsub.Message, to *pb.PubSubMessage) error { 113 // Swaps data and key. 114 to.Data = []byte(from.OrderingKey) 115 to.Key = from.Data 116 return nil 117 } 118 }, 119 wantMsg: &pb.PubSubMessage{ 120 Data: []byte("ordering_key"), 121 Key: []byte("data"), 122 }, 123 }, 124 } { 125 t.Run(tc.desc, func(t *testing.T) { 126 settings := DefaultPublishSettings 127 tc.mutateSettings(&settings) 128 129 verifier := test.NewRPCVerifier(t) 130 verifier.Push(tc.wantMsg, fakeResponse, nil) 131 defer verifier.Flush() 132 133 pubClient := newTestPublisherClient(verifier, settings) 134 result := pubClient.Publish(ctx, input) 135 136 gotID, err := result.Get(ctx) 137 if err != nil { 138 t.Errorf("Publish() got err: %v", err) 139 } 140 if gotID != wantResultID { 141 t.Errorf("Publish() got id: %q, want: %q", gotID, wantResultID) 142 } 143 }) 144 } 145} 146 147func TestPublisherClientTransformMessageError(t *testing.T) { 148 wantErr := errors.New("message could not be converted") 149 150 settings := DefaultPublishSettings 151 settings.MessageTransformer = func(_ *pubsub.Message, _ *pb.PubSubMessage) error { 152 return wantErr 153 } 154 155 // No publish calls expected. 156 verifier := test.NewRPCVerifier(t) 157 defer verifier.Flush() 158 159 ctx := context.Background() 160 input := &pubsub.Message{ 161 Data: []byte("data"), 162 } 163 pubClient := newTestPublisherClient(verifier, settings) 164 result := pubClient.Publish(ctx, input) 165 166 _, gotErr := result.Get(ctx) 167 if !test.ErrorEqual(gotErr, wantErr) { 168 t.Errorf("Publish() got err: (%v), want err: (%v)", gotErr, wantErr) 169 } 170 if !test.ErrorEqual(pubClient.Error(), wantErr) { 171 t.Errorf("PublisherClient.Error() got: (%v), want: (%v)", pubClient.Error(), wantErr) 172 } 173 if got, want := pubClient.wirePub.(*mockWirePublisher).Stopped, true; got != want { 174 t.Errorf("Publisher.Stopped: got %v, want %v", got, want) 175 } 176} 177