1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13
14package pscompat
15
16import (
17	"context"
18	"errors"
19	"testing"
20
21	"cloud.google.com/go/pubsub"
22	"cloud.google.com/go/pubsublite/internal/test"
23	"cloud.google.com/go/pubsublite/internal/wire"
24
25	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
26)
27
// mockWirePublisher is a mock implementation of the wire.Publisher interface.
// It uses test.RPCVerifier to install fake PublishResults for each Publish
// call.
//
// Each Publish call hands the message to Verifier.Pop and forwards the
// returned response or error to the caller's result callback. An error
// returned by Pop is also remembered and subsequently reported by
// WaitStarted, WaitStopped and Error.
type mockWirePublisher struct {
	Verifier *test.RPCVerifier // consulted by Publish for the response or error to report
	Stopped  bool              // set to true by Stop
	err      error             // last error returned by Verifier.Pop in Publish, if any
}
36
37func (mp *mockWirePublisher) Publish(msg *pb.PubSubMessage, onResult wire.PublishResultFunc) {
38	resp, err := mp.Verifier.Pop(msg)
39	if err != nil {
40		mp.err = err
41		onResult(nil, err)
42		return
43	}
44	result := resp.(*wire.MessageMetadata)
45	onResult(result, nil)
46}
47
48func (mp *mockWirePublisher) Start()             {}
49func (mp *mockWirePublisher) Stop()              { mp.Stopped = true }
50func (mp *mockWirePublisher) WaitStarted() error { return mp.err }
51func (mp *mockWirePublisher) WaitStopped() error { return mp.err }
52func (mp *mockWirePublisher) Error() error       { return mp.err }
53
54func newTestPublisherClient(verifier *test.RPCVerifier, settings PublishSettings) *PublisherClient {
55	return &PublisherClient{
56		settings: settings,
57		wirePub:  &mockWirePublisher{Verifier: verifier},
58	}
59}
60
61func TestPublisherClientTransformMessage(t *testing.T) {
62	ctx := context.Background()
63	input := &pubsub.Message{
64		Data:        []byte("data"),
65		OrderingKey: "ordering_key",
66		Attributes:  map[string]string{"attr": "value"},
67	}
68	fakeResponse := &wire.MessageMetadata{
69		Partition: 2,
70		Offset:    42,
71	}
72	wantResultID := "2:42"
73
74	for _, tc := range []struct {
75		desc string
76		// mutateSettings is passed a copy of DefaultPublishSettings to mutate.
77		mutateSettings func(settings *PublishSettings)
78		wantMsg        *pb.PubSubMessage
79	}{
80		{
81			desc:           "default settings",
82			mutateSettings: func(settings *PublishSettings) {},
83			wantMsg: &pb.PubSubMessage{
84				Data: []byte("data"),
85				Key:  []byte("ordering_key"),
86				Attributes: map[string]*pb.AttributeValues{
87					"attr": {Values: [][]byte{[]byte("value")}},
88				},
89			},
90		},
91		{
92			desc: "custom key extractor",
93			mutateSettings: func(settings *PublishSettings) {
94				settings.KeyExtractor = func(msg *pubsub.Message) []byte {
95					return msg.Data
96				}
97			},
98			wantMsg: &pb.PubSubMessage{
99				Data: []byte("data"),
100				Key:  []byte("data"),
101				Attributes: map[string]*pb.AttributeValues{
102					"attr": {Values: [][]byte{[]byte("value")}},
103				},
104			},
105		},
106		{
107			desc: "custom message transformer",
108			mutateSettings: func(settings *PublishSettings) {
109				settings.KeyExtractor = func(msg *pubsub.Message) []byte {
110					return msg.Data
111				}
112				settings.MessageTransformer = func(from *pubsub.Message, to *pb.PubSubMessage) error {
113					// Swaps data and key.
114					to.Data = []byte(from.OrderingKey)
115					to.Key = from.Data
116					return nil
117				}
118			},
119			wantMsg: &pb.PubSubMessage{
120				Data: []byte("ordering_key"),
121				Key:  []byte("data"),
122			},
123		},
124	} {
125		t.Run(tc.desc, func(t *testing.T) {
126			settings := DefaultPublishSettings
127			tc.mutateSettings(&settings)
128
129			verifier := test.NewRPCVerifier(t)
130			verifier.Push(tc.wantMsg, fakeResponse, nil)
131			defer verifier.Flush()
132
133			pubClient := newTestPublisherClient(verifier, settings)
134			result := pubClient.Publish(ctx, input)
135
136			gotID, err := result.Get(ctx)
137			if err != nil {
138				t.Errorf("Publish() got err: %v", err)
139			}
140			if gotID != wantResultID {
141				t.Errorf("Publish() got id: %q, want: %q", gotID, wantResultID)
142			}
143		})
144	}
145}
146
147func TestPublisherClientTransformMessageError(t *testing.T) {
148	wantErr := errors.New("message could not be converted")
149
150	settings := DefaultPublishSettings
151	settings.MessageTransformer = func(_ *pubsub.Message, _ *pb.PubSubMessage) error {
152		return wantErr
153	}
154
155	// No publish calls expected.
156	verifier := test.NewRPCVerifier(t)
157	defer verifier.Flush()
158
159	ctx := context.Background()
160	input := &pubsub.Message{
161		Data: []byte("data"),
162	}
163	pubClient := newTestPublisherClient(verifier, settings)
164	result := pubClient.Publish(ctx, input)
165
166	_, gotErr := result.Get(ctx)
167	if !test.ErrorEqual(gotErr, wantErr) {
168		t.Errorf("Publish() got err: (%v), want err: (%v)", gotErr, wantErr)
169	}
170	if !test.ErrorEqual(pubClient.Error(), wantErr) {
171		t.Errorf("PublisherClient.Error() got: (%v), want: (%v)", pubClient.Error(), wantErr)
172	}
173	if got, want := pubClient.wirePub.(*mockWirePublisher).Stopped, true; got != want {
174		t.Errorf("Publisher.Stopped: got %v, want %v", got, want)
175	}
176}
177