1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Code generated by protoc-gen-go_gapic. DO NOT EDIT.
16
17package pubsub
18
19import (
20	"context"
21	"fmt"
22	"math"
23	"net/url"
24	"time"
25
26	"github.com/golang/protobuf/proto"
27	gax "github.com/googleapis/gax-go/v2"
28	"google.golang.org/api/iterator"
29	"google.golang.org/api/option"
30	gtransport "google.golang.org/api/transport/grpc"
31	pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
32	"google.golang.org/grpc"
33	"google.golang.org/grpc/codes"
34	"google.golang.org/grpc/metadata"
35)
36
37var newPublisherClientHook clientHook
38
39// PublisherCallOptions contains the retry settings for each method of PublisherClient.
40type PublisherCallOptions struct {
41	CreateTopic            []gax.CallOption
42	UpdateTopic            []gax.CallOption
43	Publish                []gax.CallOption
44	GetTopic               []gax.CallOption
45	ListTopics             []gax.CallOption
46	ListTopicSubscriptions []gax.CallOption
47	ListTopicSnapshots     []gax.CallOption
48	DeleteTopic            []gax.CallOption
49}
50
51func defaultPublisherClientOptions() []option.ClientOption {
52	return []option.ClientOption{
53		option.WithEndpoint("pubsub.googleapis.com:443"),
54		option.WithGRPCDialOption(grpc.WithDisableServiceConfig()),
55		option.WithScopes(DefaultAuthScopes()...),
56		option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
57			grpc.MaxCallRecvMsgSize(math.MaxInt32))),
58	}
59}
60
61func defaultPublisherCallOptions() *PublisherCallOptions {
62	return &PublisherCallOptions{
63		CreateTopic: []gax.CallOption{
64			gax.WithRetry(func() gax.Retryer {
65				return gax.OnCodes([]codes.Code{
66					codes.Unavailable,
67				}, gax.Backoff{
68					Initial:    100 * time.Millisecond,
69					Max:        60000 * time.Millisecond,
70					Multiplier: 1.30,
71				})
72			}),
73		},
74		UpdateTopic: []gax.CallOption{
75			gax.WithRetry(func() gax.Retryer {
76				return gax.OnCodes([]codes.Code{
77					codes.Unavailable,
78				}, gax.Backoff{
79					Initial:    100 * time.Millisecond,
80					Max:        60000 * time.Millisecond,
81					Multiplier: 1.30,
82				})
83			}),
84		},
85		Publish: []gax.CallOption{
86			gax.WithRetry(func() gax.Retryer {
87				return gax.OnCodes([]codes.Code{
88					codes.Aborted,
89					codes.Canceled,
90					codes.Internal,
91					codes.ResourceExhausted,
92					codes.Unknown,
93					codes.Unavailable,
94					codes.DeadlineExceeded,
95				}, gax.Backoff{
96					Initial:    100 * time.Millisecond,
97					Max:        60000 * time.Millisecond,
98					Multiplier: 1.30,
99				})
100			}),
101		},
102		GetTopic: []gax.CallOption{
103			gax.WithRetry(func() gax.Retryer {
104				return gax.OnCodes([]codes.Code{
105					codes.Unknown,
106					codes.Aborted,
107					codes.Unavailable,
108				}, gax.Backoff{
109					Initial:    100 * time.Millisecond,
110					Max:        60000 * time.Millisecond,
111					Multiplier: 1.30,
112				})
113			}),
114		},
115		ListTopics: []gax.CallOption{
116			gax.WithRetry(func() gax.Retryer {
117				return gax.OnCodes([]codes.Code{
118					codes.Unknown,
119					codes.Aborted,
120					codes.Unavailable,
121				}, gax.Backoff{
122					Initial:    100 * time.Millisecond,
123					Max:        60000 * time.Millisecond,
124					Multiplier: 1.30,
125				})
126			}),
127		},
128		ListTopicSubscriptions: []gax.CallOption{
129			gax.WithRetry(func() gax.Retryer {
130				return gax.OnCodes([]codes.Code{
131					codes.Unknown,
132					codes.Aborted,
133					codes.Unavailable,
134				}, gax.Backoff{
135					Initial:    100 * time.Millisecond,
136					Max:        60000 * time.Millisecond,
137					Multiplier: 1.30,
138				})
139			}),
140		},
141		ListTopicSnapshots: []gax.CallOption{
142			gax.WithRetry(func() gax.Retryer {
143				return gax.OnCodes([]codes.Code{
144					codes.Unknown,
145					codes.Aborted,
146					codes.Unavailable,
147				}, gax.Backoff{
148					Initial:    100 * time.Millisecond,
149					Max:        60000 * time.Millisecond,
150					Multiplier: 1.30,
151				})
152			}),
153		},
154		DeleteTopic: []gax.CallOption{
155			gax.WithRetry(func() gax.Retryer {
156				return gax.OnCodes([]codes.Code{
157					codes.Unavailable,
158				}, gax.Backoff{
159					Initial:    100 * time.Millisecond,
160					Max:        60000 * time.Millisecond,
161					Multiplier: 1.30,
162				})
163			}),
164		},
165	}
166}
167
168// PublisherClient is a client for interacting with Cloud Pub/Sub API.
169//
170// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
171type PublisherClient struct {
172	// Connection pool of gRPC connections to the service.
173	connPool gtransport.ConnPool
174
175	// The gRPC API client.
176	publisherClient pubsubpb.PublisherClient
177
178	// The call options for this service.
179	CallOptions *PublisherCallOptions
180
181	// The x-goog-* metadata to be sent with each request.
182	xGoogMetadata metadata.MD
183}
184
185// NewPublisherClient creates a new publisher client.
186//
187// The service that an application uses to manipulate topics, and to send
188// messages to a topic.
189func NewPublisherClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error) {
190	clientOpts := defaultPublisherClientOptions()
191
192	if newPublisherClientHook != nil {
193		hookOpts, err := newPublisherClientHook(ctx, clientHookParams{})
194		if err != nil {
195			return nil, err
196		}
197		clientOpts = append(clientOpts, hookOpts...)
198	}
199
200	connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...)
201	if err != nil {
202		return nil, err
203	}
204	c := &PublisherClient{
205		connPool:    connPool,
206		CallOptions: defaultPublisherCallOptions(),
207
208		publisherClient: pubsubpb.NewPublisherClient(connPool),
209	}
210	c.setGoogleClientInfo()
211
212	return c, nil
213}
214
215// Connection returns a connection to the API service.
216//
217// Deprecated.
218func (c *PublisherClient) Connection() *grpc.ClientConn {
219	return c.connPool.Conn()
220}
221
222// Close closes the connection to the API service. The user should invoke this when
223// the client is no longer required.
224func (c *PublisherClient) Close() error {
225	return c.connPool.Close()
226}
227
228// setGoogleClientInfo sets the name and version of the application in
229// the `x-goog-api-client` header passed on each request. Intended for
230// use by Google-written clients.
231func (c *PublisherClient) setGoogleClientInfo(keyval ...string) {
232	kv := append([]string{"gl-go", versionGo()}, keyval...)
233	kv = append(kv, "gapic", versionClient, "gax", gax.Version, "grpc", grpc.Version)
234	c.xGoogMetadata = metadata.Pairs("x-goog-api-client", gax.XGoogHeader(kv...))
235}
236
237// CreateTopic creates the given topic with the given name. See the
238// resource name rules (at https://cloud.google.com/pubsub/docs/admin#resource_names).
239func (c *PublisherClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
240	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName())))
241	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
242	opts = append(c.CallOptions.CreateTopic[0:len(c.CallOptions.CreateTopic):len(c.CallOptions.CreateTopic)], opts...)
243	var resp *pubsubpb.Topic
244	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
245		var err error
246		resp, err = c.publisherClient.CreateTopic(ctx, req, settings.GRPC...)
247		return err
248	}, opts...)
249	if err != nil {
250		return nil, err
251	}
252	return resp, nil
253}
254
255// UpdateTopic updates an existing topic. Note that certain properties of a
256// topic are not modifiable.
257func (c *PublisherClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
258	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic.name", url.QueryEscape(req.GetTopic().GetName())))
259	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
260	opts = append(c.CallOptions.UpdateTopic[0:len(c.CallOptions.UpdateTopic):len(c.CallOptions.UpdateTopic)], opts...)
261	var resp *pubsubpb.Topic
262	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
263		var err error
264		resp, err = c.publisherClient.UpdateTopic(ctx, req, settings.GRPC...)
265		return err
266	}, opts...)
267	if err != nil {
268		return nil, err
269	}
270	return resp, nil
271}
272
273// Publish adds one or more messages to the topic. Returns NOT_FOUND if the topic
274// does not exist.
275func (c *PublisherClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error) {
276	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic())))
277	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
278	opts = append(c.CallOptions.Publish[0:len(c.CallOptions.Publish):len(c.CallOptions.Publish)], opts...)
279	var resp *pubsubpb.PublishResponse
280	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
281		var err error
282		resp, err = c.publisherClient.Publish(ctx, req, settings.GRPC...)
283		return err
284	}, opts...)
285	if err != nil {
286		return nil, err
287	}
288	return resp, nil
289}
290
291// GetTopic gets the configuration of a topic.
292func (c *PublisherClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
293	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic())))
294	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
295	opts = append(c.CallOptions.GetTopic[0:len(c.CallOptions.GetTopic):len(c.CallOptions.GetTopic)], opts...)
296	var resp *pubsubpb.Topic
297	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
298		var err error
299		resp, err = c.publisherClient.GetTopic(ctx, req, settings.GRPC...)
300		return err
301	}, opts...)
302	if err != nil {
303		return nil, err
304	}
305	return resp, nil
306}
307
308// ListTopics lists matching topics.
309func (c *PublisherClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIterator {
310	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "project", url.QueryEscape(req.GetProject())))
311	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
312	opts = append(c.CallOptions.ListTopics[0:len(c.CallOptions.ListTopics):len(c.CallOptions.ListTopics)], opts...)
313	it := &TopicIterator{}
314	req = proto.Clone(req).(*pubsubpb.ListTopicsRequest)
315	it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Topic, string, error) {
316		var resp *pubsubpb.ListTopicsResponse
317		req.PageToken = pageToken
318		if pageSize > math.MaxInt32 {
319			req.PageSize = math.MaxInt32
320		} else {
321			req.PageSize = int32(pageSize)
322		}
323		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
324			var err error
325			resp, err = c.publisherClient.ListTopics(ctx, req, settings.GRPC...)
326			return err
327		}, opts...)
328		if err != nil {
329			return nil, "", err
330		}
331
332		it.Response = resp
333		return resp.Topics, resp.NextPageToken, nil
334	}
335	fetch := func(pageSize int, pageToken string) (string, error) {
336		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
337		if err != nil {
338			return "", err
339		}
340		it.items = append(it.items, items...)
341		return nextPageToken, nil
342	}
343	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
344	it.pageInfo.MaxSize = int(req.PageSize)
345	it.pageInfo.Token = req.PageToken
346	return it
347}
348
349// ListTopicSubscriptions lists the names of the subscriptions on this topic.
350func (c *PublisherClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator {
351	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic())))
352	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
353	opts = append(c.CallOptions.ListTopicSubscriptions[0:len(c.CallOptions.ListTopicSubscriptions):len(c.CallOptions.ListTopicSubscriptions)], opts...)
354	it := &StringIterator{}
355	req = proto.Clone(req).(*pubsubpb.ListTopicSubscriptionsRequest)
356	it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) {
357		var resp *pubsubpb.ListTopicSubscriptionsResponse
358		req.PageToken = pageToken
359		if pageSize > math.MaxInt32 {
360			req.PageSize = math.MaxInt32
361		} else {
362			req.PageSize = int32(pageSize)
363		}
364		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
365			var err error
366			resp, err = c.publisherClient.ListTopicSubscriptions(ctx, req, settings.GRPC...)
367			return err
368		}, opts...)
369		if err != nil {
370			return nil, "", err
371		}
372
373		it.Response = resp
374		return resp.Subscriptions, resp.NextPageToken, nil
375	}
376	fetch := func(pageSize int, pageToken string) (string, error) {
377		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
378		if err != nil {
379			return "", err
380		}
381		it.items = append(it.items, items...)
382		return nextPageToken, nil
383	}
384	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
385	it.pageInfo.MaxSize = int(req.PageSize)
386	it.pageInfo.Token = req.PageToken
387	return it
388}
389
390// ListTopicSnapshots lists the names of the snapshots on this topic. Snapshots are used in
391// Seek (at https://cloud.google.com/pubsub/docs/replay-overview)
392// operations, which allow
393// you to manage message acknowledgments in bulk. That is, you can set the
394// acknowledgment state of messages in an existing subscription to the state
395// captured by a snapshot.
396func (c *PublisherClient) ListTopicSnapshots(ctx context.Context, req *pubsubpb.ListTopicSnapshotsRequest, opts ...gax.CallOption) *StringIterator {
397	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic())))
398	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
399	opts = append(c.CallOptions.ListTopicSnapshots[0:len(c.CallOptions.ListTopicSnapshots):len(c.CallOptions.ListTopicSnapshots)], opts...)
400	it := &StringIterator{}
401	req = proto.Clone(req).(*pubsubpb.ListTopicSnapshotsRequest)
402	it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) {
403		var resp *pubsubpb.ListTopicSnapshotsResponse
404		req.PageToken = pageToken
405		if pageSize > math.MaxInt32 {
406			req.PageSize = math.MaxInt32
407		} else {
408			req.PageSize = int32(pageSize)
409		}
410		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
411			var err error
412			resp, err = c.publisherClient.ListTopicSnapshots(ctx, req, settings.GRPC...)
413			return err
414		}, opts...)
415		if err != nil {
416			return nil, "", err
417		}
418
419		it.Response = resp
420		return resp.Snapshots, resp.NextPageToken, nil
421	}
422	fetch := func(pageSize int, pageToken string) (string, error) {
423		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
424		if err != nil {
425			return "", err
426		}
427		it.items = append(it.items, items...)
428		return nextPageToken, nil
429	}
430	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
431	it.pageInfo.MaxSize = int(req.PageSize)
432	it.pageInfo.Token = req.PageToken
433	return it
434}
435
436// DeleteTopic deletes the topic with the given name. Returns NOT_FOUND if the topic
437// does not exist. After a topic is deleted, a new topic may be created with
438// the same name; this is an entirely new topic with none of the old
439// configuration or subscriptions. Existing subscriptions to this topic are
440// not deleted, but their topic field is set to _deleted-topic_.
441func (c *PublisherClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error {
442	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic())))
443	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
444	opts = append(c.CallOptions.DeleteTopic[0:len(c.CallOptions.DeleteTopic):len(c.CallOptions.DeleteTopic)], opts...)
445	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
446		var err error
447		_, err = c.publisherClient.DeleteTopic(ctx, req, settings.GRPC...)
448		return err
449	}, opts...)
450	return err
451}
452
453// StringIterator manages a stream of string.
454type StringIterator struct {
455	items    []string
456	pageInfo *iterator.PageInfo
457	nextFunc func() error
458
459	// Response is the raw response for the current page.
460	// It must be cast to the RPC response type.
461	// Calling Next() or InternalFetch() updates this value.
462	Response interface{}
463
464	// InternalFetch is for use by the Google Cloud Libraries only.
465	// It is not part of the stable interface of this package.
466	//
467	// InternalFetch returns results from a single call to the underlying RPC.
468	// The number of results is no greater than pageSize.
469	// If there are no more results, nextPageToken is empty and err is nil.
470	InternalFetch func(pageSize int, pageToken string) (results []string, nextPageToken string, err error)
471}
472
473// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
474func (it *StringIterator) PageInfo() *iterator.PageInfo {
475	return it.pageInfo
476}
477
478// Next returns the next result. Its second return value is iterator.Done if there are no more
479// results. Once Next returns Done, all subsequent calls will return Done.
480func (it *StringIterator) Next() (string, error) {
481	var item string
482	if err := it.nextFunc(); err != nil {
483		return item, err
484	}
485	item = it.items[0]
486	it.items = it.items[1:]
487	return item, nil
488}
489
490func (it *StringIterator) bufLen() int {
491	return len(it.items)
492}
493
494func (it *StringIterator) takeBuf() interface{} {
495	b := it.items
496	it.items = nil
497	return b
498}
499
500// TopicIterator manages a stream of *pubsubpb.Topic.
501type TopicIterator struct {
502	items    []*pubsubpb.Topic
503	pageInfo *iterator.PageInfo
504	nextFunc func() error
505
506	// Response is the raw response for the current page.
507	// It must be cast to the RPC response type.
508	// Calling Next() or InternalFetch() updates this value.
509	Response interface{}
510
511	// InternalFetch is for use by the Google Cloud Libraries only.
512	// It is not part of the stable interface of this package.
513	//
514	// InternalFetch returns results from a single call to the underlying RPC.
515	// The number of results is no greater than pageSize.
516	// If there are no more results, nextPageToken is empty and err is nil.
517	InternalFetch func(pageSize int, pageToken string) (results []*pubsubpb.Topic, nextPageToken string, err error)
518}
519
520// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
521func (it *TopicIterator) PageInfo() *iterator.PageInfo {
522	return it.pageInfo
523}
524
525// Next returns the next result. Its second return value is iterator.Done if there are no more
526// results. Once Next returns Done, all subsequent calls will return Done.
527func (it *TopicIterator) Next() (*pubsubpb.Topic, error) {
528	var item *pubsubpb.Topic
529	if err := it.nextFunc(); err != nil {
530		return item, err
531	}
532	item = it.items[0]
533	it.items = it.items[1:]
534	return item, nil
535}
536
537func (it *TopicIterator) bufLen() int {
538	return len(it.items)
539}
540
541func (it *TopicIterator) takeBuf() interface{} {
542	b := it.items
543	it.items = nil
544	return b
545}
546