// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Code generated by gapic-generator. DO NOT EDIT.
16
17package pubsub
18
19import (
20	"context"
21	"fmt"
22	"math"
23	"time"
24
25	"github.com/golang/protobuf/proto"
26	gax "github.com/googleapis/gax-go/v2"
27	"google.golang.org/api/iterator"
28	"google.golang.org/api/option"
29	"google.golang.org/api/transport"
30	pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
31	"google.golang.org/grpc"
32	"google.golang.org/grpc/codes"
33	"google.golang.org/grpc/metadata"
34)
35
36// PublisherCallOptions contains the retry settings for each method of PublisherClient.
37type PublisherCallOptions struct {
38	CreateTopic            []gax.CallOption
39	UpdateTopic            []gax.CallOption
40	Publish                []gax.CallOption
41	GetTopic               []gax.CallOption
42	ListTopics             []gax.CallOption
43	ListTopicSubscriptions []gax.CallOption
44	DeleteTopic            []gax.CallOption
45}
46
47func defaultPublisherClientOptions() []option.ClientOption {
48	return []option.ClientOption{
49		option.WithEndpoint("pubsub.googleapis.com:443"),
50		option.WithScopes(DefaultAuthScopes()...),
51	}
52}
53
54func defaultPublisherCallOptions() *PublisherCallOptions {
55	retry := map[[2]string][]gax.CallOption{
56		{"default", "idempotent"}: {
57			gax.WithRetry(func() gax.Retryer {
58				return gax.OnCodes([]codes.Code{
59					codes.Aborted,
60					codes.Unavailable,
61					codes.Unknown,
62				}, gax.Backoff{
63					Initial:    100 * time.Millisecond,
64					Max:        60000 * time.Millisecond,
65					Multiplier: 1.3,
66				})
67			}),
68		},
69		{"default", "non_idempotent"}: {
70			gax.WithRetry(func() gax.Retryer {
71				return gax.OnCodes([]codes.Code{
72					codes.Unavailable,
73				}, gax.Backoff{
74					Initial:    100 * time.Millisecond,
75					Max:        60000 * time.Millisecond,
76					Multiplier: 1.3,
77				})
78			}),
79		},
80		{"messaging", "publish"}: {
81			gax.WithRetry(func() gax.Retryer {
82				return gax.OnCodes([]codes.Code{
83					codes.Aborted,
84					codes.Canceled,
85					codes.DeadlineExceeded,
86					codes.Internal,
87					codes.ResourceExhausted,
88					codes.Unavailable,
89					codes.Unknown,
90				}, gax.Backoff{
91					Initial:    100 * time.Millisecond,
92					Max:        60000 * time.Millisecond,
93					Multiplier: 1.3,
94				})
95			}),
96		},
97	}
98	return &PublisherCallOptions{
99		CreateTopic:            retry[[2]string{"default", "non_idempotent"}],
100		UpdateTopic:            retry[[2]string{"default", "non_idempotent"}],
101		Publish:                retry[[2]string{"messaging", "publish"}],
102		GetTopic:               retry[[2]string{"default", "idempotent"}],
103		ListTopics:             retry[[2]string{"default", "idempotent"}],
104		ListTopicSubscriptions: retry[[2]string{"default", "idempotent"}],
105		DeleteTopic:            retry[[2]string{"default", "non_idempotent"}],
106	}
107}
108
// PublisherClient is a client for interacting with Google Cloud Pub/Sub API.
// It wraps the generated gRPC Publisher stub and applies per-method call
// options and request metadata to every call.
//
// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
type PublisherClient struct {
	// The connection to the service; returned by Connection and closed by Close.
	conn *grpc.ClientConn

	// The gRPC API client that every RPC method delegates to.
	publisherClient pubsubpb.PublisherClient

	// The call options for this service. Each method applies its entry
	// before the options passed at the call site.
	CallOptions *PublisherCallOptions

	// The x-goog-* metadata to be sent with each request; populated by
	// SetGoogleClientInfo.
	xGoogMetadata metadata.MD
}
125
126// NewPublisherClient creates a new publisher client.
127//
128// The service that an application uses to manipulate topics, and to send
129// messages to a topic.
130func NewPublisherClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error) {
131	conn, err := transport.DialGRPC(ctx, append(defaultPublisherClientOptions(), opts...)...)
132	if err != nil {
133		return nil, err
134	}
135	c := &PublisherClient{
136		conn:        conn,
137		CallOptions: defaultPublisherCallOptions(),
138
139		publisherClient: pubsubpb.NewPublisherClient(conn),
140	}
141	c.SetGoogleClientInfo()
142	return c, nil
143}
144
// Connection returns the gRPC connection this client uses to reach the API
// service. It is the same connection that Close closes.
func (c *PublisherClient) Connection() *grpc.ClientConn {
	return c.conn
}
149
150// Close closes the connection to the API service. The user should invoke this when
151// the client is no longer required.
152func (c *PublisherClient) Close() error {
153	return c.conn.Close()
154}
155
156// SetGoogleClientInfo sets the name and version of the application in
157// the `x-goog-api-client` header passed on each request. Intended for
158// use by Google-written clients.
159func (c *PublisherClient) SetGoogleClientInfo(keyval ...string) {
160	kv := append([]string{"gl-go", versionGo()}, keyval...)
161	kv = append(kv, "gapic", versionClient, "gax", gax.Version, "grpc", grpc.Version)
162	c.xGoogMetadata = metadata.Pairs("x-goog-api-client", gax.XGoogHeader(kv...))
163}
164
165// CreateTopic creates the given topic with the given name. See the
166// <a href="https://cloud.google.com/pubsub/docs/admin#resource_names">
167// resource name rules</a>.
168func (c *PublisherClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
169	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "name", req.GetName()))
170	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
171	opts = append(c.CallOptions.CreateTopic[0:len(c.CallOptions.CreateTopic):len(c.CallOptions.CreateTopic)], opts...)
172	var resp *pubsubpb.Topic
173	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
174		var err error
175		resp, err = c.publisherClient.CreateTopic(ctx, req, settings.GRPC...)
176		return err
177	}, opts...)
178	if err != nil {
179		return nil, err
180	}
181	return resp, nil
182}
183
184// UpdateTopic updates an existing topic. Note that certain properties of a
185// topic are not modifiable.
186func (c *PublisherClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
187	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic.name", req.GetTopic().GetName()))
188	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
189	opts = append(c.CallOptions.UpdateTopic[0:len(c.CallOptions.UpdateTopic):len(c.CallOptions.UpdateTopic)], opts...)
190	var resp *pubsubpb.Topic
191	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
192		var err error
193		resp, err = c.publisherClient.UpdateTopic(ctx, req, settings.GRPC...)
194		return err
195	}, opts...)
196	if err != nil {
197		return nil, err
198	}
199	return resp, nil
200}
201
202// Publish adds one or more messages to the topic. Returns NOT_FOUND if the topic
203// does not exist.
204func (c *PublisherClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error) {
205	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", req.GetTopic()))
206	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
207	opts = append(c.CallOptions.Publish[0:len(c.CallOptions.Publish):len(c.CallOptions.Publish)], opts...)
208	var resp *pubsubpb.PublishResponse
209	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
210		var err error
211		resp, err = c.publisherClient.Publish(ctx, req, settings.GRPC...)
212		return err
213	}, opts...)
214	if err != nil {
215		return nil, err
216	}
217	return resp, nil
218}
219
220// GetTopic gets the configuration of a topic.
221func (c *PublisherClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
222	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", req.GetTopic()))
223	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
224	opts = append(c.CallOptions.GetTopic[0:len(c.CallOptions.GetTopic):len(c.CallOptions.GetTopic)], opts...)
225	var resp *pubsubpb.Topic
226	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
227		var err error
228		resp, err = c.publisherClient.GetTopic(ctx, req, settings.GRPC...)
229		return err
230	}, opts...)
231	if err != nil {
232		return nil, err
233	}
234	return resp, nil
235}
236
237// ListTopics lists matching topics.
238func (c *PublisherClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIterator {
239	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "project", req.GetProject()))
240	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
241	opts = append(c.CallOptions.ListTopics[0:len(c.CallOptions.ListTopics):len(c.CallOptions.ListTopics)], opts...)
242	it := &TopicIterator{}
243	req = proto.Clone(req).(*pubsubpb.ListTopicsRequest)
244	it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Topic, string, error) {
245		var resp *pubsubpb.ListTopicsResponse
246		req.PageToken = pageToken
247		if pageSize > math.MaxInt32 {
248			req.PageSize = math.MaxInt32
249		} else {
250			req.PageSize = int32(pageSize)
251		}
252		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
253			var err error
254			resp, err = c.publisherClient.ListTopics(ctx, req, settings.GRPC...)
255			return err
256		}, opts...)
257		if err != nil {
258			return nil, "", err
259		}
260		return resp.Topics, resp.NextPageToken, nil
261	}
262	fetch := func(pageSize int, pageToken string) (string, error) {
263		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
264		if err != nil {
265			return "", err
266		}
267		it.items = append(it.items, items...)
268		return nextPageToken, nil
269	}
270	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
271	it.pageInfo.MaxSize = int(req.PageSize)
272	it.pageInfo.Token = req.PageToken
273	return it
274}
275
276// ListTopicSubscriptions lists the names of the subscriptions on this topic.
277func (c *PublisherClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator {
278	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", req.GetTopic()))
279	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
280	opts = append(c.CallOptions.ListTopicSubscriptions[0:len(c.CallOptions.ListTopicSubscriptions):len(c.CallOptions.ListTopicSubscriptions)], opts...)
281	it := &StringIterator{}
282	req = proto.Clone(req).(*pubsubpb.ListTopicSubscriptionsRequest)
283	it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) {
284		var resp *pubsubpb.ListTopicSubscriptionsResponse
285		req.PageToken = pageToken
286		if pageSize > math.MaxInt32 {
287			req.PageSize = math.MaxInt32
288		} else {
289			req.PageSize = int32(pageSize)
290		}
291		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
292			var err error
293			resp, err = c.publisherClient.ListTopicSubscriptions(ctx, req, settings.GRPC...)
294			return err
295		}, opts...)
296		if err != nil {
297			return nil, "", err
298		}
299		return resp.Subscriptions, resp.NextPageToken, nil
300	}
301	fetch := func(pageSize int, pageToken string) (string, error) {
302		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
303		if err != nil {
304			return "", err
305		}
306		it.items = append(it.items, items...)
307		return nextPageToken, nil
308	}
309	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
310	it.pageInfo.MaxSize = int(req.PageSize)
311	it.pageInfo.Token = req.PageToken
312	return it
313}
314
315// DeleteTopic deletes the topic with the given name. Returns NOT_FOUND if the topic
316// does not exist. After a topic is deleted, a new topic may be created with
317// the same name; this is an entirely new topic with none of the old
318// configuration or subscriptions. Existing subscriptions to this topic are
319// not deleted, but their topic field is set to _deleted-topic_.
320func (c *PublisherClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error {
321	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", req.GetTopic()))
322	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
323	opts = append(c.CallOptions.DeleteTopic[0:len(c.CallOptions.DeleteTopic):len(c.CallOptions.DeleteTopic)], opts...)
324	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
325		var err error
326		_, err = c.publisherClient.DeleteTopic(ctx, req, settings.GRPC...)
327		return err
328	}, opts...)
329	return err
330}
331
// StringIterator manages a stream of string.
// Results are buffered in items and handed out one at a time by Next.
type StringIterator struct {
	// items holds fetched results that Next has not yet returned.
	items    []string
	// pageInfo is the pagination state exposed through PageInfo.
	pageInfo *iterator.PageInfo
	// nextFunc is called by Next before it reads from items; a non-nil error
	// is returned to the caller unchanged.
	nextFunc func() error

	// InternalFetch is for use by the Google Cloud Libraries only.
	// It is not part of the stable interface of this package.
	//
	// InternalFetch returns results from a single call to the underlying RPC.
	// The number of results is no greater than pageSize.
	// If there are no more results, nextPageToken is empty and err is nil.
	InternalFetch func(pageSize int, pageToken string) (results []string, nextPageToken string, err error)
}
346
// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
// It returns the iterator's own PageInfo pointer, not a copy.
func (it *StringIterator) PageInfo() *iterator.PageInfo {
	return it.pageInfo
}
351
352// Next returns the next result. Its second return value is iterator.Done if there are no more
353// results. Once Next returns Done, all subsequent calls will return Done.
354func (it *StringIterator) Next() (string, error) {
355	var item string
356	if err := it.nextFunc(); err != nil {
357		return item, err
358	}
359	item = it.items[0]
360	it.items = it.items[1:]
361	return item, nil
362}
363
// bufLen reports how many fetched results are buffered and not yet returned
// by Next. It is handed to iterator.NewPageInfo as a callback.
func (it *StringIterator) bufLen() int {
	return len(it.items)
}
367
368func (it *StringIterator) takeBuf() interface{} {
369	b := it.items
370	it.items = nil
371	return b
372}
373
// TopicIterator manages a stream of *pubsubpb.Topic.
// Results are buffered in items and handed out one at a time by Next.
type TopicIterator struct {
	// items holds fetched results that Next has not yet returned.
	items    []*pubsubpb.Topic
	// pageInfo is the pagination state exposed through PageInfo.
	pageInfo *iterator.PageInfo
	// nextFunc is called by Next before it reads from items; a non-nil error
	// is returned to the caller unchanged.
	nextFunc func() error

	// InternalFetch is for use by the Google Cloud Libraries only.
	// It is not part of the stable interface of this package.
	//
	// InternalFetch returns results from a single call to the underlying RPC.
	// The number of results is no greater than pageSize.
	// If there are no more results, nextPageToken is empty and err is nil.
	InternalFetch func(pageSize int, pageToken string) (results []*pubsubpb.Topic, nextPageToken string, err error)
}
388
// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
// It returns the iterator's own PageInfo pointer, not a copy.
func (it *TopicIterator) PageInfo() *iterator.PageInfo {
	return it.pageInfo
}
393
394// Next returns the next result. Its second return value is iterator.Done if there are no more
395// results. Once Next returns Done, all subsequent calls will return Done.
396func (it *TopicIterator) Next() (*pubsubpb.Topic, error) {
397	var item *pubsubpb.Topic
398	if err := it.nextFunc(); err != nil {
399		return item, err
400	}
401	item = it.items[0]
402	it.items = it.items[1:]
403	return item, nil
404}
405
// bufLen reports how many fetched results are buffered and not yet returned
// by Next. It is handed to iterator.NewPageInfo as a callback.
func (it *TopicIterator) bufLen() int {
	return len(it.items)
}
409
410func (it *TopicIterator) takeBuf() interface{} {
411	b := it.items
412	it.items = nil
413	return b
414}
415