// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Code generated by protoc-gen-go_gapic. DO NOT EDIT.
16
17package pubsub
18
19import (
20	"context"
21	"fmt"
22	"math"
23	"net/url"
24	"time"
25
26	"github.com/golang/protobuf/proto"
27	gax "github.com/googleapis/gax-go/v2"
28	"google.golang.org/api/iterator"
29	"google.golang.org/api/option"
30	gtransport "google.golang.org/api/transport/grpc"
31	pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
32	"google.golang.org/grpc"
33	"google.golang.org/grpc/codes"
34	"google.golang.org/grpc/metadata"
35)
36
// newPublisherClientHook, when non-nil, is invoked by NewPublisherClient to
// obtain extra client options. Those options are appended after the defaults
// from defaultPublisherClientOptions and before any caller-supplied options.
var newPublisherClientHook clientHook
38
// PublisherCallOptions contains the retry settings for each method of PublisherClient.
//
// Each field holds the default gax call options for the PublisherClient
// method of the same name. That method places them ahead of whatever
// options the caller passes on an individual call.
type PublisherCallOptions struct {
	CreateTopic            []gax.CallOption
	UpdateTopic            []gax.CallOption
	Publish                []gax.CallOption
	GetTopic               []gax.CallOption
	ListTopics             []gax.CallOption
	ListTopicSubscriptions []gax.CallOption
	ListTopicSnapshots     []gax.CallOption
	DeleteTopic            []gax.CallOption
	DetachSubscription     []gax.CallOption
}
51
52func defaultPublisherClientOptions() []option.ClientOption {
53	return []option.ClientOption{
54		option.WithEndpoint("pubsub.googleapis.com:443"),
55		option.WithGRPCDialOption(grpc.WithDisableServiceConfig()),
56		option.WithScopes(DefaultAuthScopes()...),
57		option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
58			grpc.MaxCallRecvMsgSize(math.MaxInt32))),
59	}
60}
61
62func defaultPublisherCallOptions() *PublisherCallOptions {
63	return &PublisherCallOptions{
64		CreateTopic: []gax.CallOption{
65			gax.WithRetry(func() gax.Retryer {
66				return gax.OnCodes([]codes.Code{
67					codes.Unavailable,
68				}, gax.Backoff{
69					Initial:    100 * time.Millisecond,
70					Max:        60000 * time.Millisecond,
71					Multiplier: 1.30,
72				})
73			}),
74		},
75		UpdateTopic: []gax.CallOption{
76			gax.WithRetry(func() gax.Retryer {
77				return gax.OnCodes([]codes.Code{
78					codes.Unavailable,
79				}, gax.Backoff{
80					Initial:    100 * time.Millisecond,
81					Max:        60000 * time.Millisecond,
82					Multiplier: 1.30,
83				})
84			}),
85		},
86		Publish: []gax.CallOption{
87			gax.WithRetry(func() gax.Retryer {
88				return gax.OnCodes([]codes.Code{
89					codes.Aborted,
90					codes.Canceled,
91					codes.Internal,
92					codes.ResourceExhausted,
93					codes.Unknown,
94					codes.Unavailable,
95					codes.DeadlineExceeded,
96				}, gax.Backoff{
97					Initial:    100 * time.Millisecond,
98					Max:        60000 * time.Millisecond,
99					Multiplier: 1.30,
100				})
101			}),
102		},
103		GetTopic: []gax.CallOption{
104			gax.WithRetry(func() gax.Retryer {
105				return gax.OnCodes([]codes.Code{
106					codes.Unknown,
107					codes.Aborted,
108					codes.Unavailable,
109				}, gax.Backoff{
110					Initial:    100 * time.Millisecond,
111					Max:        60000 * time.Millisecond,
112					Multiplier: 1.30,
113				})
114			}),
115		},
116		ListTopics: []gax.CallOption{
117			gax.WithRetry(func() gax.Retryer {
118				return gax.OnCodes([]codes.Code{
119					codes.Unknown,
120					codes.Aborted,
121					codes.Unavailable,
122				}, gax.Backoff{
123					Initial:    100 * time.Millisecond,
124					Max:        60000 * time.Millisecond,
125					Multiplier: 1.30,
126				})
127			}),
128		},
129		ListTopicSubscriptions: []gax.CallOption{
130			gax.WithRetry(func() gax.Retryer {
131				return gax.OnCodes([]codes.Code{
132					codes.Unknown,
133					codes.Aborted,
134					codes.Unavailable,
135				}, gax.Backoff{
136					Initial:    100 * time.Millisecond,
137					Max:        60000 * time.Millisecond,
138					Multiplier: 1.30,
139				})
140			}),
141		},
142		ListTopicSnapshots: []gax.CallOption{
143			gax.WithRetry(func() gax.Retryer {
144				return gax.OnCodes([]codes.Code{
145					codes.Unknown,
146					codes.Aborted,
147					codes.Unavailable,
148				}, gax.Backoff{
149					Initial:    100 * time.Millisecond,
150					Max:        60000 * time.Millisecond,
151					Multiplier: 1.30,
152				})
153			}),
154		},
155		DeleteTopic: []gax.CallOption{
156			gax.WithRetry(func() gax.Retryer {
157				return gax.OnCodes([]codes.Code{
158					codes.Unavailable,
159				}, gax.Backoff{
160					Initial:    100 * time.Millisecond,
161					Max:        60000 * time.Millisecond,
162					Multiplier: 1.30,
163				})
164			}),
165		},
166		DetachSubscription: []gax.CallOption{
167			gax.WithRetry(func() gax.Retryer {
168				return gax.OnCodes([]codes.Code{
169					codes.Unavailable,
170				}, gax.Backoff{
171					Initial:    100 * time.Millisecond,
172					Max:        60000 * time.Millisecond,
173					Multiplier: 1.30,
174				})
175			}),
176		},
177	}
178}
179
// PublisherClient is a client for interacting with Cloud Pub/Sub API.
//
// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
//
// Instances are built by NewPublisherClient, which dials the connection
// pool, installs the default call options and sets the x-goog-api-client
// metadata.
type PublisherClient struct {
	// Connection pool of gRPC connections to the service.
	// Close closes it; Connection returns a connection taken from it.
	connPool gtransport.ConnPool

	// The gRPC API client, created over connPool.
	publisherClient pubsubpb.PublisherClient

	// The call options for this service. Each method places its entry ahead
	// of the options supplied on the individual call.
	CallOptions *PublisherCallOptions

	// The x-goog-* metadata to be sent with each request.
	// It is populated by setGoogleClientInfo.
	xGoogMetadata metadata.MD
}
196
197// NewPublisherClient creates a new publisher client.
198//
199// The service that an application uses to manipulate topics, and to send
200// messages to a topic.
201func NewPublisherClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error) {
202	clientOpts := defaultPublisherClientOptions()
203
204	if newPublisherClientHook != nil {
205		hookOpts, err := newPublisherClientHook(ctx, clientHookParams{})
206		if err != nil {
207			return nil, err
208		}
209		clientOpts = append(clientOpts, hookOpts...)
210	}
211
212	connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...)
213	if err != nil {
214		return nil, err
215	}
216	c := &PublisherClient{
217		connPool:    connPool,
218		CallOptions: defaultPublisherCallOptions(),
219
220		publisherClient: pubsubpb.NewPublisherClient(connPool),
221	}
222	c.setGoogleClientInfo()
223
224	return c, nil
225}
226
227// Connection returns a connection to the API service.
228//
229// Deprecated.
230func (c *PublisherClient) Connection() *grpc.ClientConn {
231	return c.connPool.Conn()
232}
233
234// Close closes the connection to the API service. The user should invoke this when
235// the client is no longer required.
236func (c *PublisherClient) Close() error {
237	return c.connPool.Close()
238}
239
240// setGoogleClientInfo sets the name and version of the application in
241// the `x-goog-api-client` header passed on each request. Intended for
242// use by Google-written clients.
243func (c *PublisherClient) setGoogleClientInfo(keyval ...string) {
244	kv := append([]string{"gl-go", versionGo()}, keyval...)
245	kv = append(kv, "gapic", versionClient, "gax", gax.Version, "grpc", grpc.Version)
246	c.xGoogMetadata = metadata.Pairs("x-goog-api-client", gax.XGoogHeader(kv...))
247}
248
249// CreateTopic creates the given topic with the given name. See the
250// resource name rules (at https://cloud.google.com/pubsub/docs/admin#resource_names).
251func (c *PublisherClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
252	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName())))
253	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
254	opts = append(c.CallOptions.CreateTopic[0:len(c.CallOptions.CreateTopic):len(c.CallOptions.CreateTopic)], opts...)
255	var resp *pubsubpb.Topic
256	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
257		var err error
258		resp, err = c.publisherClient.CreateTopic(ctx, req, settings.GRPC...)
259		return err
260	}, opts...)
261	if err != nil {
262		return nil, err
263	}
264	return resp, nil
265}
266
267// UpdateTopic updates an existing topic. Note that certain properties of a
268// topic are not modifiable.
269func (c *PublisherClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
270	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic.name", url.QueryEscape(req.GetTopic().GetName())))
271	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
272	opts = append(c.CallOptions.UpdateTopic[0:len(c.CallOptions.UpdateTopic):len(c.CallOptions.UpdateTopic)], opts...)
273	var resp *pubsubpb.Topic
274	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
275		var err error
276		resp, err = c.publisherClient.UpdateTopic(ctx, req, settings.GRPC...)
277		return err
278	}, opts...)
279	if err != nil {
280		return nil, err
281	}
282	return resp, nil
283}
284
285// Publish adds one or more messages to the topic. Returns NOT_FOUND if the topic
286// does not exist.
287func (c *PublisherClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error) {
288	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic())))
289	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
290	opts = append(c.CallOptions.Publish[0:len(c.CallOptions.Publish):len(c.CallOptions.Publish)], opts...)
291	var resp *pubsubpb.PublishResponse
292	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
293		var err error
294		resp, err = c.publisherClient.Publish(ctx, req, settings.GRPC...)
295		return err
296	}, opts...)
297	if err != nil {
298		return nil, err
299	}
300	return resp, nil
301}
302
303// GetTopic gets the configuration of a topic.
304func (c *PublisherClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
305	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic())))
306	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
307	opts = append(c.CallOptions.GetTopic[0:len(c.CallOptions.GetTopic):len(c.CallOptions.GetTopic)], opts...)
308	var resp *pubsubpb.Topic
309	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
310		var err error
311		resp, err = c.publisherClient.GetTopic(ctx, req, settings.GRPC...)
312		return err
313	}, opts...)
314	if err != nil {
315		return nil, err
316	}
317	return resp, nil
318}
319
320// ListTopics lists matching topics.
321func (c *PublisherClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIterator {
322	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "project", url.QueryEscape(req.GetProject())))
323	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
324	opts = append(c.CallOptions.ListTopics[0:len(c.CallOptions.ListTopics):len(c.CallOptions.ListTopics)], opts...)
325	it := &TopicIterator{}
326	req = proto.Clone(req).(*pubsubpb.ListTopicsRequest)
327	it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Topic, string, error) {
328		var resp *pubsubpb.ListTopicsResponse
329		req.PageToken = pageToken
330		if pageSize > math.MaxInt32 {
331			req.PageSize = math.MaxInt32
332		} else {
333			req.PageSize = int32(pageSize)
334		}
335		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
336			var err error
337			resp, err = c.publisherClient.ListTopics(ctx, req, settings.GRPC...)
338			return err
339		}, opts...)
340		if err != nil {
341			return nil, "", err
342		}
343
344		it.Response = resp
345		return resp.Topics, resp.NextPageToken, nil
346	}
347	fetch := func(pageSize int, pageToken string) (string, error) {
348		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
349		if err != nil {
350			return "", err
351		}
352		it.items = append(it.items, items...)
353		return nextPageToken, nil
354	}
355	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
356	it.pageInfo.MaxSize = int(req.PageSize)
357	it.pageInfo.Token = req.PageToken
358	return it
359}
360
361// ListTopicSubscriptions lists the names of the attached subscriptions on this topic.
362func (c *PublisherClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator {
363	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic())))
364	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
365	opts = append(c.CallOptions.ListTopicSubscriptions[0:len(c.CallOptions.ListTopicSubscriptions):len(c.CallOptions.ListTopicSubscriptions)], opts...)
366	it := &StringIterator{}
367	req = proto.Clone(req).(*pubsubpb.ListTopicSubscriptionsRequest)
368	it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) {
369		var resp *pubsubpb.ListTopicSubscriptionsResponse
370		req.PageToken = pageToken
371		if pageSize > math.MaxInt32 {
372			req.PageSize = math.MaxInt32
373		} else {
374			req.PageSize = int32(pageSize)
375		}
376		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
377			var err error
378			resp, err = c.publisherClient.ListTopicSubscriptions(ctx, req, settings.GRPC...)
379			return err
380		}, opts...)
381		if err != nil {
382			return nil, "", err
383		}
384
385		it.Response = resp
386		return resp.Subscriptions, resp.NextPageToken, nil
387	}
388	fetch := func(pageSize int, pageToken string) (string, error) {
389		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
390		if err != nil {
391			return "", err
392		}
393		it.items = append(it.items, items...)
394		return nextPageToken, nil
395	}
396	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
397	it.pageInfo.MaxSize = int(req.PageSize)
398	it.pageInfo.Token = req.PageToken
399	return it
400}
401
402// ListTopicSnapshots lists the names of the snapshots on this topic. Snapshots are used in
403// Seek (at https://cloud.google.com/pubsub/docs/replay-overview)
404// operations, which allow
405// you to manage message acknowledgments in bulk. That is, you can set the
406// acknowledgment state of messages in an existing subscription to the state
407// captured by a snapshot.
408func (c *PublisherClient) ListTopicSnapshots(ctx context.Context, req *pubsubpb.ListTopicSnapshotsRequest, opts ...gax.CallOption) *StringIterator {
409	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic())))
410	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
411	opts = append(c.CallOptions.ListTopicSnapshots[0:len(c.CallOptions.ListTopicSnapshots):len(c.CallOptions.ListTopicSnapshots)], opts...)
412	it := &StringIterator{}
413	req = proto.Clone(req).(*pubsubpb.ListTopicSnapshotsRequest)
414	it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) {
415		var resp *pubsubpb.ListTopicSnapshotsResponse
416		req.PageToken = pageToken
417		if pageSize > math.MaxInt32 {
418			req.PageSize = math.MaxInt32
419		} else {
420			req.PageSize = int32(pageSize)
421		}
422		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
423			var err error
424			resp, err = c.publisherClient.ListTopicSnapshots(ctx, req, settings.GRPC...)
425			return err
426		}, opts...)
427		if err != nil {
428			return nil, "", err
429		}
430
431		it.Response = resp
432		return resp.Snapshots, resp.NextPageToken, nil
433	}
434	fetch := func(pageSize int, pageToken string) (string, error) {
435		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
436		if err != nil {
437			return "", err
438		}
439		it.items = append(it.items, items...)
440		return nextPageToken, nil
441	}
442	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
443	it.pageInfo.MaxSize = int(req.PageSize)
444	it.pageInfo.Token = req.PageToken
445	return it
446}
447
448// DeleteTopic deletes the topic with the given name. Returns NOT_FOUND if the topic
449// does not exist. After a topic is deleted, a new topic may be created with
450// the same name; this is an entirely new topic with none of the old
451// configuration or subscriptions. Existing subscriptions to this topic are
452// not deleted, but their topic field is set to _deleted-topic_.
453func (c *PublisherClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error {
454	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic())))
455	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
456	opts = append(c.CallOptions.DeleteTopic[0:len(c.CallOptions.DeleteTopic):len(c.CallOptions.DeleteTopic)], opts...)
457	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
458		var err error
459		_, err = c.publisherClient.DeleteTopic(ctx, req, settings.GRPC...)
460		return err
461	}, opts...)
462	return err
463}
464
465// DetachSubscription detaches a subscription from this topic. All messages retained in the
466// subscription are dropped. Subsequent Pull and StreamingPull requests
467// will return FAILED_PRECONDITION. If the subscription is a push
468// subscription, pushes to the endpoint will stop.
469func (c *PublisherClient) DetachSubscription(ctx context.Context, req *pubsubpb.DetachSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.DetachSubscriptionResponse, error) {
470	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(req.GetSubscription())))
471	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
472	opts = append(c.CallOptions.DetachSubscription[0:len(c.CallOptions.DetachSubscription):len(c.CallOptions.DetachSubscription)], opts...)
473	var resp *pubsubpb.DetachSubscriptionResponse
474	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
475		var err error
476		resp, err = c.publisherClient.DetachSubscription(ctx, req, settings.GRPC...)
477		return err
478	}, opts...)
479	if err != nil {
480		return nil, err
481	}
482	return resp, nil
483}
484
// StringIterator manages a stream of string.
//
// In this file it is returned by ListTopicSubscriptions and
// ListTopicSnapshots, which populate every field.
type StringIterator struct {
	// items buffers fetched results that Next has not yet handed out.
	// pageInfo and nextFunc are the pair returned by iterator.NewPageInfo.
	items    []string
	pageInfo *iterator.PageInfo
	nextFunc func() error

	// Response is the raw response for the current page.
	// It must be cast to the RPC response type.
	// Calling Next() or InternalFetch() updates this value.
	Response interface{}

	// InternalFetch is for use by the Google Cloud Libraries only.
	// It is not part of the stable interface of this package.
	//
	// InternalFetch returns results from a single call to the underlying RPC.
	// The number of results is no greater than pageSize.
	// If there are no more results, nextPageToken is empty and err is nil.
	InternalFetch func(pageSize int, pageToken string) (results []string, nextPageToken string, err error)
}
504
505// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
506func (it *StringIterator) PageInfo() *iterator.PageInfo {
507	return it.pageInfo
508}
509
510// Next returns the next result. Its second return value is iterator.Done if there are no more
511// results. Once Next returns Done, all subsequent calls will return Done.
512func (it *StringIterator) Next() (string, error) {
513	var item string
514	if err := it.nextFunc(); err != nil {
515		return item, err
516	}
517	item = it.items[0]
518	it.items = it.items[1:]
519	return item, nil
520}
521
522func (it *StringIterator) bufLen() int {
523	return len(it.items)
524}
525
526func (it *StringIterator) takeBuf() interface{} {
527	b := it.items
528	it.items = nil
529	return b
530}
531
// TopicIterator manages a stream of *pubsubpb.Topic.
//
// In this file it is returned by ListTopics, which populates every field.
type TopicIterator struct {
	// items buffers fetched results that Next has not yet handed out.
	// pageInfo and nextFunc are the pair returned by iterator.NewPageInfo.
	items    []*pubsubpb.Topic
	pageInfo *iterator.PageInfo
	nextFunc func() error

	// Response is the raw response for the current page.
	// It must be cast to the RPC response type.
	// Calling Next() or InternalFetch() updates this value.
	Response interface{}

	// InternalFetch is for use by the Google Cloud Libraries only.
	// It is not part of the stable interface of this package.
	//
	// InternalFetch returns results from a single call to the underlying RPC.
	// The number of results is no greater than pageSize.
	// If there are no more results, nextPageToken is empty and err is nil.
	InternalFetch func(pageSize int, pageToken string) (results []*pubsubpb.Topic, nextPageToken string, err error)
}
551
552// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
553func (it *TopicIterator) PageInfo() *iterator.PageInfo {
554	return it.pageInfo
555}
556
557// Next returns the next result. Its second return value is iterator.Done if there are no more
558// results. Once Next returns Done, all subsequent calls will return Done.
559func (it *TopicIterator) Next() (*pubsubpb.Topic, error) {
560	var item *pubsubpb.Topic
561	if err := it.nextFunc(); err != nil {
562		return item, err
563	}
564	item = it.items[0]
565	it.items = it.items[1:]
566	return item, nil
567}
568
569func (it *TopicIterator) bufLen() int {
570	return len(it.items)
571}
572
573func (it *TopicIterator) takeBuf() interface{} {
574	b := it.items
575	it.items = nil
576	return b
577}
578