1// Copyright 2019 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Code generated by gapic-generator. DO NOT EDIT.
16
17package pubsub
18
19import (
20	"context"
21	"fmt"
22	"math"
23	"net/url"
24	"time"
25
26	"github.com/golang/protobuf/proto"
27	gax "github.com/googleapis/gax-go/v2"
28	"google.golang.org/api/iterator"
29	"google.golang.org/api/option"
30	"google.golang.org/api/transport"
31	pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
32	"google.golang.org/grpc"
33	"google.golang.org/grpc/codes"
34	"google.golang.org/grpc/metadata"
35)
36
// PublisherCallOptions contains the retry settings for each method of PublisherClient.
//
// Each field holds the gax.CallOption values that the PublisherClient method of
// the same name places ahead of any per-call options supplied by the caller.
type PublisherCallOptions struct {
	CreateTopic            []gax.CallOption
	UpdateTopic            []gax.CallOption
	Publish                []gax.CallOption
	GetTopic               []gax.CallOption
	ListTopics             []gax.CallOption
	ListTopicSubscriptions []gax.CallOption
	DeleteTopic            []gax.CallOption
}
47
48func defaultPublisherClientOptions() []option.ClientOption {
49	return []option.ClientOption{
50		option.WithEndpoint("pubsub.googleapis.com:443"),
51		option.WithScopes(DefaultAuthScopes()...),
52		option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
53			grpc.MaxCallRecvMsgSize(math.MaxInt32))),
54	}
55}
56
57func defaultPublisherCallOptions() *PublisherCallOptions {
58	retry := map[[2]string][]gax.CallOption{
59		{"default", "idempotent"}: {
60			gax.WithRetry(func() gax.Retryer {
61				return gax.OnCodes([]codes.Code{
62					codes.Aborted,
63					codes.Unavailable,
64					codes.Unknown,
65				}, gax.Backoff{
66					Initial:    100 * time.Millisecond,
67					Max:        60000 * time.Millisecond,
68					Multiplier: 1.3,
69				})
70			}),
71		},
72		{"default", "non_idempotent"}: {
73			gax.WithRetry(func() gax.Retryer {
74				return gax.OnCodes([]codes.Code{
75					codes.Unavailable,
76				}, gax.Backoff{
77					Initial:    100 * time.Millisecond,
78					Max:        60000 * time.Millisecond,
79					Multiplier: 1.3,
80				})
81			}),
82		},
83		{"messaging", "publish"}: {
84			gax.WithRetry(func() gax.Retryer {
85				return gax.OnCodes([]codes.Code{
86					codes.Aborted,
87					codes.Canceled,
88					codes.DeadlineExceeded,
89					codes.Internal,
90					codes.ResourceExhausted,
91					codes.Unavailable,
92					codes.Unknown,
93				}, gax.Backoff{
94					Initial:    100 * time.Millisecond,
95					Max:        60000 * time.Millisecond,
96					Multiplier: 1.3,
97				})
98			}),
99		},
100	}
101	return &PublisherCallOptions{
102		CreateTopic:            retry[[2]string{"default", "non_idempotent"}],
103		UpdateTopic:            retry[[2]string{"default", "non_idempotent"}],
104		Publish:                retry[[2]string{"messaging", "publish"}],
105		GetTopic:               retry[[2]string{"default", "idempotent"}],
106		ListTopics:             retry[[2]string{"default", "idempotent"}],
107		ListTopicSubscriptions: retry[[2]string{"default", "idempotent"}],
108		DeleteTopic:            retry[[2]string{"default", "non_idempotent"}],
109	}
110}
111
// PublisherClient is a client for interacting with Google Cloud Pub/Sub API.
//
// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
type PublisherClient struct {
	// The connection to the service. It is returned by Connection and
	// closed by Close.
	conn *grpc.ClientConn

	// The gRPC API client, created on top of conn.
	publisherClient pubsubpb.PublisherClient

	// The call options for this service. Each RPC method places its entry
	// ahead of the per-call options it is given.
	CallOptions *PublisherCallOptions

	// The x-goog-* metadata to be sent with each request. It is set by
	// SetGoogleClientInfo.
	xGoogMetadata metadata.MD
}
128
129// NewPublisherClient creates a new publisher client.
130//
131// The service that an application uses to manipulate topics, and to send
132// messages to a topic.
133func NewPublisherClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error) {
134	conn, err := transport.DialGRPC(ctx, append(defaultPublisherClientOptions(), opts...)...)
135	if err != nil {
136		return nil, err
137	}
138	c := &PublisherClient{
139		conn:        conn,
140		CallOptions: defaultPublisherCallOptions(),
141
142		publisherClient: pubsubpb.NewPublisherClient(conn),
143	}
144	c.SetGoogleClientInfo()
145	return c, nil
146}
147
// Connection returns the client's connection to the API service.
//
// The returned *grpc.ClientConn is the one held by this client; it is the
// same connection that Close closes.
func (c *PublisherClient) Connection() *grpc.ClientConn {
	return c.conn
}
152
// Close closes the connection to the API service. The user should invoke this when
// the client is no longer required.
//
// It returns whatever error closing the underlying gRPC connection reports.
func (c *PublisherClient) Close() error {
	return c.conn.Close()
}
158
159// SetGoogleClientInfo sets the name and version of the application in
160// the `x-goog-api-client` header passed on each request. Intended for
161// use by Google-written clients.
162func (c *PublisherClient) SetGoogleClientInfo(keyval ...string) {
163	kv := append([]string{"gl-go", versionGo()}, keyval...)
164	kv = append(kv, "gapic", versionClient, "gax", gax.Version, "grpc", grpc.Version)
165	c.xGoogMetadata = metadata.Pairs("x-goog-api-client", gax.XGoogHeader(kv...))
166}
167
168// CreateTopic creates the given topic with the given name. See the
169// <a href="https://cloud.google.com/pubsub/docs/admin#resource_names">
170// resource name rules</a>.
171func (c *PublisherClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
172	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName())))
173	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
174	opts = append(c.CallOptions.CreateTopic[0:len(c.CallOptions.CreateTopic):len(c.CallOptions.CreateTopic)], opts...)
175	var resp *pubsubpb.Topic
176	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
177		var err error
178		resp, err = c.publisherClient.CreateTopic(ctx, req, settings.GRPC...)
179		return err
180	}, opts...)
181	if err != nil {
182		return nil, err
183	}
184	return resp, nil
185}
186
187// UpdateTopic updates an existing topic. Note that certain properties of a
188// topic are not modifiable.
189func (c *PublisherClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
190	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic.name", url.QueryEscape(req.GetTopic().GetName())))
191	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
192	opts = append(c.CallOptions.UpdateTopic[0:len(c.CallOptions.UpdateTopic):len(c.CallOptions.UpdateTopic)], opts...)
193	var resp *pubsubpb.Topic
194	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
195		var err error
196		resp, err = c.publisherClient.UpdateTopic(ctx, req, settings.GRPC...)
197		return err
198	}, opts...)
199	if err != nil {
200		return nil, err
201	}
202	return resp, nil
203}
204
205// Publish adds one or more messages to the topic. Returns NOT_FOUND if the topic
206// does not exist.
207func (c *PublisherClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error) {
208	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic())))
209	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
210	opts = append(c.CallOptions.Publish[0:len(c.CallOptions.Publish):len(c.CallOptions.Publish)], opts...)
211	var resp *pubsubpb.PublishResponse
212	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
213		var err error
214		resp, err = c.publisherClient.Publish(ctx, req, settings.GRPC...)
215		return err
216	}, opts...)
217	if err != nil {
218		return nil, err
219	}
220	return resp, nil
221}
222
223// GetTopic gets the configuration of a topic.
224func (c *PublisherClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
225	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic())))
226	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
227	opts = append(c.CallOptions.GetTopic[0:len(c.CallOptions.GetTopic):len(c.CallOptions.GetTopic)], opts...)
228	var resp *pubsubpb.Topic
229	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
230		var err error
231		resp, err = c.publisherClient.GetTopic(ctx, req, settings.GRPC...)
232		return err
233	}, opts...)
234	if err != nil {
235		return nil, err
236	}
237	return resp, nil
238}
239
240// ListTopics lists matching topics.
241func (c *PublisherClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIterator {
242	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "project", url.QueryEscape(req.GetProject())))
243	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
244	opts = append(c.CallOptions.ListTopics[0:len(c.CallOptions.ListTopics):len(c.CallOptions.ListTopics)], opts...)
245	it := &TopicIterator{}
246	req = proto.Clone(req).(*pubsubpb.ListTopicsRequest)
247	it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Topic, string, error) {
248		var resp *pubsubpb.ListTopicsResponse
249		req.PageToken = pageToken
250		if pageSize > math.MaxInt32 {
251			req.PageSize = math.MaxInt32
252		} else {
253			req.PageSize = int32(pageSize)
254		}
255		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
256			var err error
257			resp, err = c.publisherClient.ListTopics(ctx, req, settings.GRPC...)
258			return err
259		}, opts...)
260		if err != nil {
261			return nil, "", err
262		}
263		return resp.Topics, resp.NextPageToken, nil
264	}
265	fetch := func(pageSize int, pageToken string) (string, error) {
266		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
267		if err != nil {
268			return "", err
269		}
270		it.items = append(it.items, items...)
271		return nextPageToken, nil
272	}
273	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
274	it.pageInfo.MaxSize = int(req.PageSize)
275	it.pageInfo.Token = req.PageToken
276	return it
277}
278
279// ListTopicSubscriptions lists the names of the subscriptions on this topic.
280func (c *PublisherClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator {
281	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic())))
282	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
283	opts = append(c.CallOptions.ListTopicSubscriptions[0:len(c.CallOptions.ListTopicSubscriptions):len(c.CallOptions.ListTopicSubscriptions)], opts...)
284	it := &StringIterator{}
285	req = proto.Clone(req).(*pubsubpb.ListTopicSubscriptionsRequest)
286	it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) {
287		var resp *pubsubpb.ListTopicSubscriptionsResponse
288		req.PageToken = pageToken
289		if pageSize > math.MaxInt32 {
290			req.PageSize = math.MaxInt32
291		} else {
292			req.PageSize = int32(pageSize)
293		}
294		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
295			var err error
296			resp, err = c.publisherClient.ListTopicSubscriptions(ctx, req, settings.GRPC...)
297			return err
298		}, opts...)
299		if err != nil {
300			return nil, "", err
301		}
302		return resp.Subscriptions, resp.NextPageToken, nil
303	}
304	fetch := func(pageSize int, pageToken string) (string, error) {
305		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
306		if err != nil {
307			return "", err
308		}
309		it.items = append(it.items, items...)
310		return nextPageToken, nil
311	}
312	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
313	it.pageInfo.MaxSize = int(req.PageSize)
314	it.pageInfo.Token = req.PageToken
315	return it
316}
317
318// DeleteTopic deletes the topic with the given name. Returns NOT_FOUND if the topic
319// does not exist. After a topic is deleted, a new topic may be created with
320// the same name; this is an entirely new topic with none of the old
321// configuration or subscriptions. Existing subscriptions to this topic are
322// not deleted, but their topic field is set to _deleted-topic_.
323func (c *PublisherClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error {
324	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic())))
325	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
326	opts = append(c.CallOptions.DeleteTopic[0:len(c.CallOptions.DeleteTopic):len(c.CallOptions.DeleteTopic)], opts...)
327	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
328		var err error
329		_, err = c.publisherClient.DeleteTopic(ctx, req, settings.GRPC...)
330		return err
331	}, opts...)
332	return err
333}
334
// StringIterator manages a stream of string.
//
// Values are produced by ListTopicSubscriptions, which wires up all of the
// fields below.
type StringIterator struct {
	// items buffers fetched results that Next has not yet handed out.
	items []string
	// pageInfo and nextFunc are the pair returned by iterator.NewPageInfo.
	pageInfo *iterator.PageInfo
	nextFunc func() error

	// InternalFetch is for use by the Google Cloud Libraries only.
	// It is not part of the stable interface of this package.
	//
	// InternalFetch returns results from a single call to the underlying RPC.
	// The number of results is no greater than pageSize.
	// If there are no more results, nextPageToken is empty and err is nil.
	InternalFetch func(pageSize int, pageToken string) (results []string, nextPageToken string, err error)
}
349
// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
//
// It returns the PageInfo stored on this iterator.
func (it *StringIterator) PageInfo() *iterator.PageInfo {
	return it.pageInfo
}
354
355// Next returns the next result. Its second return value is iterator.Done if there are no more
356// results. Once Next returns Done, all subsequent calls will return Done.
357func (it *StringIterator) Next() (string, error) {
358	var item string
359	if err := it.nextFunc(); err != nil {
360		return item, err
361	}
362	item = it.items[0]
363	it.items = it.items[1:]
364	return item, nil
365}
366
// bufLen reports how many fetched results are currently buffered in it.items.
// It is one of the callbacks handed to iterator.NewPageInfo.
func (it *StringIterator) bufLen() int {
	return len(it.items)
}
370
371func (it *StringIterator) takeBuf() interface{} {
372	b := it.items
373	it.items = nil
374	return b
375}
376
// TopicIterator manages a stream of *pubsubpb.Topic.
//
// Values are produced by ListTopics, which wires up all of the fields below.
type TopicIterator struct {
	// items buffers fetched results that Next has not yet handed out.
	items []*pubsubpb.Topic
	// pageInfo and nextFunc are the pair returned by iterator.NewPageInfo.
	pageInfo *iterator.PageInfo
	nextFunc func() error

	// InternalFetch is for use by the Google Cloud Libraries only.
	// It is not part of the stable interface of this package.
	//
	// InternalFetch returns results from a single call to the underlying RPC.
	// The number of results is no greater than pageSize.
	// If there are no more results, nextPageToken is empty and err is nil.
	InternalFetch func(pageSize int, pageToken string) (results []*pubsubpb.Topic, nextPageToken string, err error)
}
391
// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
//
// It returns the PageInfo stored on this iterator.
func (it *TopicIterator) PageInfo() *iterator.PageInfo {
	return it.pageInfo
}
396
397// Next returns the next result. Its second return value is iterator.Done if there are no more
398// results. Once Next returns Done, all subsequent calls will return Done.
399func (it *TopicIterator) Next() (*pubsubpb.Topic, error) {
400	var item *pubsubpb.Topic
401	if err := it.nextFunc(); err != nil {
402		return item, err
403	}
404	item = it.items[0]
405	it.items = it.items[1:]
406	return item, nil
407}
408
// bufLen reports how many fetched results are currently buffered in it.items.
// It is one of the callbacks handed to iterator.NewPageInfo.
func (it *TopicIterator) bufLen() int {
	return len(it.items)
}
412
413func (it *TopicIterator) takeBuf() interface{} {
414	b := it.items
415	it.items = nil
416	return b
417}
418