1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Code generated by protoc-gen-go_gapic. DO NOT EDIT.
16
17package pubsub
18
19import (
20	"context"
21	"fmt"
22	"math"
23	"net/url"
24	"time"
25
26	"github.com/golang/protobuf/proto"
27	gax "github.com/googleapis/gax-go/v2"
28	"google.golang.org/api/iterator"
29	"google.golang.org/api/option"
30	gtransport "google.golang.org/api/transport/grpc"
31	pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
32	"google.golang.org/grpc"
33	"google.golang.org/grpc/codes"
34	"google.golang.org/grpc/metadata"
35)
36
37var newSubscriberClientHook clientHook
38
39// SubscriberCallOptions contains the retry settings for each method of SubscriberClient.
40type SubscriberCallOptions struct {
41	CreateSubscription []gax.CallOption
42	GetSubscription    []gax.CallOption
43	UpdateSubscription []gax.CallOption
44	ListSubscriptions  []gax.CallOption
45	DeleteSubscription []gax.CallOption
46	ModifyAckDeadline  []gax.CallOption
47	Acknowledge        []gax.CallOption
48	Pull               []gax.CallOption
49	StreamingPull      []gax.CallOption
50	ModifyPushConfig   []gax.CallOption
51	GetSnapshot        []gax.CallOption
52	ListSnapshots      []gax.CallOption
53	CreateSnapshot     []gax.CallOption
54	UpdateSnapshot     []gax.CallOption
55	DeleteSnapshot     []gax.CallOption
56	Seek               []gax.CallOption
57}
58
59func defaultSubscriberClientOptions() []option.ClientOption {
60	return []option.ClientOption{
61		option.WithEndpoint("pubsub.googleapis.com:443"),
62		option.WithGRPCDialOption(grpc.WithDisableServiceConfig()),
63		option.WithScopes(DefaultAuthScopes()...),
64		option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
65			grpc.MaxCallRecvMsgSize(math.MaxInt32))),
66	}
67}
68
69func defaultSubscriberCallOptions() *SubscriberCallOptions {
70	return &SubscriberCallOptions{
71		CreateSubscription: []gax.CallOption{
72			gax.WithRetry(func() gax.Retryer {
73				return gax.OnCodes([]codes.Code{
74					codes.Unknown,
75					codes.Aborted,
76					codes.Unavailable,
77				}, gax.Backoff{
78					Initial:    100 * time.Millisecond,
79					Max:        60000 * time.Millisecond,
80					Multiplier: 1.30,
81				})
82			}),
83		},
84		GetSubscription: []gax.CallOption{
85			gax.WithRetry(func() gax.Retryer {
86				return gax.OnCodes([]codes.Code{
87					codes.Unknown,
88					codes.Aborted,
89					codes.Unavailable,
90				}, gax.Backoff{
91					Initial:    100 * time.Millisecond,
92					Max:        60000 * time.Millisecond,
93					Multiplier: 1.30,
94				})
95			}),
96		},
97		UpdateSubscription: []gax.CallOption{
98			gax.WithRetry(func() gax.Retryer {
99				return gax.OnCodes([]codes.Code{
100					codes.Unavailable,
101				}, gax.Backoff{
102					Initial:    100 * time.Millisecond,
103					Max:        60000 * time.Millisecond,
104					Multiplier: 1.30,
105				})
106			}),
107		},
108		ListSubscriptions: []gax.CallOption{
109			gax.WithRetry(func() gax.Retryer {
110				return gax.OnCodes([]codes.Code{
111					codes.Unknown,
112					codes.Aborted,
113					codes.Unavailable,
114				}, gax.Backoff{
115					Initial:    100 * time.Millisecond,
116					Max:        60000 * time.Millisecond,
117					Multiplier: 1.30,
118				})
119			}),
120		},
121		DeleteSubscription: []gax.CallOption{
122			gax.WithRetry(func() gax.Retryer {
123				return gax.OnCodes([]codes.Code{
124					codes.Unavailable,
125				}, gax.Backoff{
126					Initial:    100 * time.Millisecond,
127					Max:        60000 * time.Millisecond,
128					Multiplier: 1.30,
129				})
130			}),
131		},
132		ModifyAckDeadline: []gax.CallOption{
133			gax.WithRetry(func() gax.Retryer {
134				return gax.OnCodes([]codes.Code{
135					codes.Unavailable,
136				}, gax.Backoff{
137					Initial:    100 * time.Millisecond,
138					Max:        60000 * time.Millisecond,
139					Multiplier: 1.30,
140				})
141			}),
142		},
143		Acknowledge: []gax.CallOption{
144			gax.WithRetry(func() gax.Retryer {
145				return gax.OnCodes([]codes.Code{
146					codes.Unavailable,
147				}, gax.Backoff{
148					Initial:    100 * time.Millisecond,
149					Max:        60000 * time.Millisecond,
150					Multiplier: 1.30,
151				})
152			}),
153		},
154		Pull: []gax.CallOption{
155			gax.WithRetry(func() gax.Retryer {
156				return gax.OnCodes([]codes.Code{
157					codes.Unknown,
158					codes.Aborted,
159					codes.Unavailable,
160				}, gax.Backoff{
161					Initial:    100 * time.Millisecond,
162					Max:        60000 * time.Millisecond,
163					Multiplier: 1.30,
164				})
165			}),
166		},
167		StreamingPull: []gax.CallOption{
168			gax.WithRetry(func() gax.Retryer {
169				return gax.OnCodes([]codes.Code{
170					codes.DeadlineExceeded,
171					codes.ResourceExhausted,
172					codes.Aborted,
173					codes.Internal,
174					codes.Unavailable,
175				}, gax.Backoff{
176					Initial:    100 * time.Millisecond,
177					Max:        60000 * time.Millisecond,
178					Multiplier: 1.30,
179				})
180			}),
181		},
182		ModifyPushConfig: []gax.CallOption{
183			gax.WithRetry(func() gax.Retryer {
184				return gax.OnCodes([]codes.Code{
185					codes.Unavailable,
186				}, gax.Backoff{
187					Initial:    100 * time.Millisecond,
188					Max:        60000 * time.Millisecond,
189					Multiplier: 1.30,
190				})
191			}),
192		},
193		GetSnapshot: []gax.CallOption{
194			gax.WithRetry(func() gax.Retryer {
195				return gax.OnCodes([]codes.Code{
196					codes.Unknown,
197					codes.Aborted,
198					codes.Unavailable,
199				}, gax.Backoff{
200					Initial:    100 * time.Millisecond,
201					Max:        60000 * time.Millisecond,
202					Multiplier: 1.30,
203				})
204			}),
205		},
206		ListSnapshots: []gax.CallOption{
207			gax.WithRetry(func() gax.Retryer {
208				return gax.OnCodes([]codes.Code{
209					codes.Unknown,
210					codes.Aborted,
211					codes.Unavailable,
212				}, gax.Backoff{
213					Initial:    100 * time.Millisecond,
214					Max:        60000 * time.Millisecond,
215					Multiplier: 1.30,
216				})
217			}),
218		},
219		CreateSnapshot: []gax.CallOption{
220			gax.WithRetry(func() gax.Retryer {
221				return gax.OnCodes([]codes.Code{
222					codes.Unavailable,
223				}, gax.Backoff{
224					Initial:    100 * time.Millisecond,
225					Max:        60000 * time.Millisecond,
226					Multiplier: 1.30,
227				})
228			}),
229		},
230		UpdateSnapshot: []gax.CallOption{
231			gax.WithRetry(func() gax.Retryer {
232				return gax.OnCodes([]codes.Code{
233					codes.Unavailable,
234				}, gax.Backoff{
235					Initial:    100 * time.Millisecond,
236					Max:        60000 * time.Millisecond,
237					Multiplier: 1.30,
238				})
239			}),
240		},
241		DeleteSnapshot: []gax.CallOption{
242			gax.WithRetry(func() gax.Retryer {
243				return gax.OnCodes([]codes.Code{
244					codes.Unavailable,
245				}, gax.Backoff{
246					Initial:    100 * time.Millisecond,
247					Max:        60000 * time.Millisecond,
248					Multiplier: 1.30,
249				})
250			}),
251		},
252		Seek: []gax.CallOption{
253			gax.WithRetry(func() gax.Retryer {
254				return gax.OnCodes([]codes.Code{
255					codes.Unknown,
256					codes.Aborted,
257					codes.Unavailable,
258				}, gax.Backoff{
259					Initial:    100 * time.Millisecond,
260					Max:        60000 * time.Millisecond,
261					Multiplier: 1.30,
262				})
263			}),
264		},
265	}
266}
267
268// SubscriberClient is a client for interacting with Cloud Pub/Sub API.
269//
270// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
271type SubscriberClient struct {
272	// Connection pool of gRPC connections to the service.
273	connPool gtransport.ConnPool
274
275	// flag to opt out of default deadlines via GOOGLE_API_GO_EXPERIMENTAL_DISABLE_DEFAULT_DEADLINE
276	disableDeadlines bool
277
278	// The gRPC API client.
279	subscriberClient pubsubpb.SubscriberClient
280
281	// The call options for this service.
282	CallOptions *SubscriberCallOptions
283
284	// The x-goog-* metadata to be sent with each request.
285	xGoogMetadata metadata.MD
286}
287
288// NewSubscriberClient creates a new subscriber client.
289//
290// The service that an application uses to manipulate subscriptions and to
291// consume messages from a subscription via the Pull method or by
292// establishing a bi-directional stream using the StreamingPull method.
293func NewSubscriberClient(ctx context.Context, opts ...option.ClientOption) (*SubscriberClient, error) {
294	clientOpts := defaultSubscriberClientOptions()
295
296	if newSubscriberClientHook != nil {
297		hookOpts, err := newSubscriberClientHook(ctx, clientHookParams{})
298		if err != nil {
299			return nil, err
300		}
301		clientOpts = append(clientOpts, hookOpts...)
302	}
303
304	disableDeadlines, err := checkDisableDeadlines()
305	if err != nil {
306		return nil, err
307	}
308
309	connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...)
310	if err != nil {
311		return nil, err
312	}
313	c := &SubscriberClient{
314		connPool:         connPool,
315		disableDeadlines: disableDeadlines,
316		CallOptions:      defaultSubscriberCallOptions(),
317
318		subscriberClient: pubsubpb.NewSubscriberClient(connPool),
319	}
320	c.setGoogleClientInfo()
321
322	return c, nil
323}
324
325// Connection returns a connection to the API service.
326//
327// Deprecated.
328func (c *SubscriberClient) Connection() *grpc.ClientConn {
329	return c.connPool.Conn()
330}
331
332// Close closes the connection to the API service. The user should invoke this when
333// the client is no longer required.
334func (c *SubscriberClient) Close() error {
335	return c.connPool.Close()
336}
337
338// setGoogleClientInfo sets the name and version of the application in
339// the `x-goog-api-client` header passed on each request. Intended for
340// use by Google-written clients.
341func (c *SubscriberClient) setGoogleClientInfo(keyval ...string) {
342	kv := append([]string{"gl-go", versionGo()}, keyval...)
343	kv = append(kv, "gapic", versionClient, "gax", gax.Version, "grpc", grpc.Version)
344	c.xGoogMetadata = metadata.Pairs("x-goog-api-client", gax.XGoogHeader(kv...))
345}
346
347// CreateSubscription creates a subscription to a given topic. See the [resource name rules]
348// (https://cloud.google.com/pubsub/docs/admin#resource_names (at https://cloud.google.com/pubsub/docs/admin#resource_names)).
349// If the subscription already exists, returns ALREADY_EXISTS.
350// If the corresponding topic doesn’t exist, returns NOT_FOUND.
351//
352// If the name is not provided in the request, the server will assign a random
353// name for this subscription on the same project as the topic, conforming
354// to the [resource name format]
355// (https://cloud.google.com/pubsub/docs/admin#resource_names (at https://cloud.google.com/pubsub/docs/admin#resource_names)). The generated
356// name is populated in the returned Subscription object. Note that for REST
357// API requests, you must specify a name in the request.
358func (c *SubscriberClient) CreateSubscription(ctx context.Context, req *pubsubpb.Subscription, opts ...gax.CallOption) (*pubsubpb.Subscription, error) {
359	if _, ok := ctx.Deadline(); !ok && !c.disableDeadlines {
360		cctx, cancel := context.WithTimeout(ctx, 60000*time.Millisecond)
361		defer cancel()
362		ctx = cctx
363	}
364	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName())))
365	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
366	opts = append(c.CallOptions.CreateSubscription[0:len(c.CallOptions.CreateSubscription):len(c.CallOptions.CreateSubscription)], opts...)
367	var resp *pubsubpb.Subscription
368	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
369		var err error
370		resp, err = c.subscriberClient.CreateSubscription(ctx, req, settings.GRPC...)
371		return err
372	}, opts...)
373	if err != nil {
374		return nil, err
375	}
376	return resp, nil
377}
378
379// GetSubscription gets the configuration details of a subscription.
380func (c *SubscriberClient) GetSubscription(ctx context.Context, req *pubsubpb.GetSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.Subscription, error) {
381	if _, ok := ctx.Deadline(); !ok && !c.disableDeadlines {
382		cctx, cancel := context.WithTimeout(ctx, 60000*time.Millisecond)
383		defer cancel()
384		ctx = cctx
385	}
386	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(req.GetSubscription())))
387	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
388	opts = append(c.CallOptions.GetSubscription[0:len(c.CallOptions.GetSubscription):len(c.CallOptions.GetSubscription)], opts...)
389	var resp *pubsubpb.Subscription
390	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
391		var err error
392		resp, err = c.subscriberClient.GetSubscription(ctx, req, settings.GRPC...)
393		return err
394	}, opts...)
395	if err != nil {
396		return nil, err
397	}
398	return resp, nil
399}
400
401// UpdateSubscription updates an existing subscription. Note that certain properties of a
402// subscription, such as its topic, are not modifiable.
403func (c *SubscriberClient) UpdateSubscription(ctx context.Context, req *pubsubpb.UpdateSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.Subscription, error) {
404	if _, ok := ctx.Deadline(); !ok && !c.disableDeadlines {
405		cctx, cancel := context.WithTimeout(ctx, 60000*time.Millisecond)
406		defer cancel()
407		ctx = cctx
408	}
409	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "subscription.name", url.QueryEscape(req.GetSubscription().GetName())))
410	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
411	opts = append(c.CallOptions.UpdateSubscription[0:len(c.CallOptions.UpdateSubscription):len(c.CallOptions.UpdateSubscription)], opts...)
412	var resp *pubsubpb.Subscription
413	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
414		var err error
415		resp, err = c.subscriberClient.UpdateSubscription(ctx, req, settings.GRPC...)
416		return err
417	}, opts...)
418	if err != nil {
419		return nil, err
420	}
421	return resp, nil
422}
423
424// ListSubscriptions lists matching subscriptions.
425func (c *SubscriberClient) ListSubscriptions(ctx context.Context, req *pubsubpb.ListSubscriptionsRequest, opts ...gax.CallOption) *SubscriptionIterator {
426	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "project", url.QueryEscape(req.GetProject())))
427	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
428	opts = append(c.CallOptions.ListSubscriptions[0:len(c.CallOptions.ListSubscriptions):len(c.CallOptions.ListSubscriptions)], opts...)
429	it := &SubscriptionIterator{}
430	req = proto.Clone(req).(*pubsubpb.ListSubscriptionsRequest)
431	it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Subscription, string, error) {
432		var resp *pubsubpb.ListSubscriptionsResponse
433		req.PageToken = pageToken
434		if pageSize > math.MaxInt32 {
435			req.PageSize = math.MaxInt32
436		} else {
437			req.PageSize = int32(pageSize)
438		}
439		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
440			var err error
441			resp, err = c.subscriberClient.ListSubscriptions(ctx, req, settings.GRPC...)
442			return err
443		}, opts...)
444		if err != nil {
445			return nil, "", err
446		}
447
448		it.Response = resp
449		return resp.GetSubscriptions(), resp.GetNextPageToken(), nil
450	}
451	fetch := func(pageSize int, pageToken string) (string, error) {
452		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
453		if err != nil {
454			return "", err
455		}
456		it.items = append(it.items, items...)
457		return nextPageToken, nil
458	}
459	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
460	it.pageInfo.MaxSize = int(req.GetPageSize())
461	it.pageInfo.Token = req.GetPageToken()
462	return it
463}
464
465// DeleteSubscription deletes an existing subscription. All messages retained in the subscription
466// are immediately dropped. Calls to Pull after deletion will return
467// NOT_FOUND. After a subscription is deleted, a new one may be created with
468// the same name, but the new one has no association with the old
469// subscription or its topic unless the same topic is specified.
470func (c *SubscriberClient) DeleteSubscription(ctx context.Context, req *pubsubpb.DeleteSubscriptionRequest, opts ...gax.CallOption) error {
471	if _, ok := ctx.Deadline(); !ok && !c.disableDeadlines {
472		cctx, cancel := context.WithTimeout(ctx, 60000*time.Millisecond)
473		defer cancel()
474		ctx = cctx
475	}
476	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(req.GetSubscription())))
477	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
478	opts = append(c.CallOptions.DeleteSubscription[0:len(c.CallOptions.DeleteSubscription):len(c.CallOptions.DeleteSubscription)], opts...)
479	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
480		var err error
481		_, err = c.subscriberClient.DeleteSubscription(ctx, req, settings.GRPC...)
482		return err
483	}, opts...)
484	return err
485}
486
487// ModifyAckDeadline modifies the ack deadline for a specific message. This method is useful
488// to indicate that more time is needed to process a message by the
489// subscriber, or to make the message available for redelivery if the
490// processing was interrupted. Note that this does not modify the
491// subscription-level ackDeadlineSeconds used for subsequent messages.
492func (c *SubscriberClient) ModifyAckDeadline(ctx context.Context, req *pubsubpb.ModifyAckDeadlineRequest, opts ...gax.CallOption) error {
493	if _, ok := ctx.Deadline(); !ok && !c.disableDeadlines {
494		cctx, cancel := context.WithTimeout(ctx, 60000*time.Millisecond)
495		defer cancel()
496		ctx = cctx
497	}
498	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(req.GetSubscription())))
499	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
500	opts = append(c.CallOptions.ModifyAckDeadline[0:len(c.CallOptions.ModifyAckDeadline):len(c.CallOptions.ModifyAckDeadline)], opts...)
501	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
502		var err error
503		_, err = c.subscriberClient.ModifyAckDeadline(ctx, req, settings.GRPC...)
504		return err
505	}, opts...)
506	return err
507}
508
509// Acknowledge acknowledges the messages associated with the ack_ids in the
510// AcknowledgeRequest. The Pub/Sub system can remove the relevant messages
511// from the subscription.
512//
513// Acknowledging a message whose ack deadline has expired may succeed,
514// but such a message may be redelivered later. Acknowledging a message more
515// than once will not result in an error.
516func (c *SubscriberClient) Acknowledge(ctx context.Context, req *pubsubpb.AcknowledgeRequest, opts ...gax.CallOption) error {
517	if _, ok := ctx.Deadline(); !ok && !c.disableDeadlines {
518		cctx, cancel := context.WithTimeout(ctx, 60000*time.Millisecond)
519		defer cancel()
520		ctx = cctx
521	}
522	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(req.GetSubscription())))
523	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
524	opts = append(c.CallOptions.Acknowledge[0:len(c.CallOptions.Acknowledge):len(c.CallOptions.Acknowledge)], opts...)
525	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
526		var err error
527		_, err = c.subscriberClient.Acknowledge(ctx, req, settings.GRPC...)
528		return err
529	}, opts...)
530	return err
531}
532
533// Pull pulls messages from the server. The server may return UNAVAILABLE if
534// there are too many concurrent pull requests pending for the given
535// subscription.
536func (c *SubscriberClient) Pull(ctx context.Context, req *pubsubpb.PullRequest, opts ...gax.CallOption) (*pubsubpb.PullResponse, error) {
537	if _, ok := ctx.Deadline(); !ok && !c.disableDeadlines {
538		cctx, cancel := context.WithTimeout(ctx, 60000*time.Millisecond)
539		defer cancel()
540		ctx = cctx
541	}
542	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(req.GetSubscription())))
543	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
544	opts = append(c.CallOptions.Pull[0:len(c.CallOptions.Pull):len(c.CallOptions.Pull)], opts...)
545	var resp *pubsubpb.PullResponse
546	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
547		var err error
548		resp, err = c.subscriberClient.Pull(ctx, req, settings.GRPC...)
549		return err
550	}, opts...)
551	if err != nil {
552		return nil, err
553	}
554	return resp, nil
555}
556
557// StreamingPull establishes a stream with the server, which sends messages down to the
558// client. The client streams acknowledgements and ack deadline modifications
559// back to the server. The server will close the stream and return the status
560// on any error. The server may close the stream with status UNAVAILABLE to
561// reassign server-side resources, in which case, the client should
562// re-establish the stream. Flow control can be achieved by configuring the
563// underlying RPC channel.
564func (c *SubscriberClient) StreamingPull(ctx context.Context, opts ...gax.CallOption) (pubsubpb.Subscriber_StreamingPullClient, error) {
565	ctx = insertMetadata(ctx, c.xGoogMetadata)
566	opts = append(c.CallOptions.StreamingPull[0:len(c.CallOptions.StreamingPull):len(c.CallOptions.StreamingPull)], opts...)
567	var resp pubsubpb.Subscriber_StreamingPullClient
568	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
569		var err error
570		resp, err = c.subscriberClient.StreamingPull(ctx, settings.GRPC...)
571		return err
572	}, opts...)
573	if err != nil {
574		return nil, err
575	}
576	return resp, nil
577}
578
579// ModifyPushConfig modifies the PushConfig for a specified subscription.
580//
581// This may be used to change a push subscription to a pull one (signified by
582// an empty PushConfig) or vice versa, or change the endpoint URL and other
583// attributes of a push subscription. Messages will accumulate for delivery
584// continuously through the call regardless of changes to the PushConfig.
585func (c *SubscriberClient) ModifyPushConfig(ctx context.Context, req *pubsubpb.ModifyPushConfigRequest, opts ...gax.CallOption) error {
586	if _, ok := ctx.Deadline(); !ok && !c.disableDeadlines {
587		cctx, cancel := context.WithTimeout(ctx, 60000*time.Millisecond)
588		defer cancel()
589		ctx = cctx
590	}
591	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(req.GetSubscription())))
592	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
593	opts = append(c.CallOptions.ModifyPushConfig[0:len(c.CallOptions.ModifyPushConfig):len(c.CallOptions.ModifyPushConfig)], opts...)
594	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
595		var err error
596		_, err = c.subscriberClient.ModifyPushConfig(ctx, req, settings.GRPC...)
597		return err
598	}, opts...)
599	return err
600}
601
602// GetSnapshot gets the configuration details of a snapshot. Snapshots are used in
603// Seek (at https://cloud.google.com/pubsub/docs/replay-overview)
604// operations, which allow you to manage message acknowledgments in bulk. That
605// is, you can set the acknowledgment state of messages in an existing
606// subscription to the state captured by a snapshot.
607func (c *SubscriberClient) GetSnapshot(ctx context.Context, req *pubsubpb.GetSnapshotRequest, opts ...gax.CallOption) (*pubsubpb.Snapshot, error) {
608	if _, ok := ctx.Deadline(); !ok && !c.disableDeadlines {
609		cctx, cancel := context.WithTimeout(ctx, 60000*time.Millisecond)
610		defer cancel()
611		ctx = cctx
612	}
613	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "snapshot", url.QueryEscape(req.GetSnapshot())))
614	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
615	opts = append(c.CallOptions.GetSnapshot[0:len(c.CallOptions.GetSnapshot):len(c.CallOptions.GetSnapshot)], opts...)
616	var resp *pubsubpb.Snapshot
617	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
618		var err error
619		resp, err = c.subscriberClient.GetSnapshot(ctx, req, settings.GRPC...)
620		return err
621	}, opts...)
622	if err != nil {
623		return nil, err
624	}
625	return resp, nil
626}
627
628// ListSnapshots lists the existing snapshots. Snapshots are used in Seek (at https://cloud.google.com/pubsub/docs/replay-overview) operations, which
629// allow you to manage message acknowledgments in bulk. That is, you can set
630// the acknowledgment state of messages in an existing subscription to the
631// state captured by a snapshot.
632func (c *SubscriberClient) ListSnapshots(ctx context.Context, req *pubsubpb.ListSnapshotsRequest, opts ...gax.CallOption) *SnapshotIterator {
633	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "project", url.QueryEscape(req.GetProject())))
634	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
635	opts = append(c.CallOptions.ListSnapshots[0:len(c.CallOptions.ListSnapshots):len(c.CallOptions.ListSnapshots)], opts...)
636	it := &SnapshotIterator{}
637	req = proto.Clone(req).(*pubsubpb.ListSnapshotsRequest)
638	it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Snapshot, string, error) {
639		var resp *pubsubpb.ListSnapshotsResponse
640		req.PageToken = pageToken
641		if pageSize > math.MaxInt32 {
642			req.PageSize = math.MaxInt32
643		} else {
644			req.PageSize = int32(pageSize)
645		}
646		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
647			var err error
648			resp, err = c.subscriberClient.ListSnapshots(ctx, req, settings.GRPC...)
649			return err
650		}, opts...)
651		if err != nil {
652			return nil, "", err
653		}
654
655		it.Response = resp
656		return resp.GetSnapshots(), resp.GetNextPageToken(), nil
657	}
658	fetch := func(pageSize int, pageToken string) (string, error) {
659		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
660		if err != nil {
661			return "", err
662		}
663		it.items = append(it.items, items...)
664		return nextPageToken, nil
665	}
666	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
667	it.pageInfo.MaxSize = int(req.GetPageSize())
668	it.pageInfo.Token = req.GetPageToken()
669	return it
670}
671
672// CreateSnapshot creates a snapshot from the requested subscription. Snapshots are used in
673// Seek (at https://cloud.google.com/pubsub/docs/replay-overview) operations,
674// which allow you to manage message acknowledgments in bulk. That is, you can
675// set the acknowledgment state of messages in an existing subscription to the
676// state captured by a snapshot.
677// If the snapshot already exists, returns ALREADY_EXISTS.
678// If the requested subscription doesn’t exist, returns NOT_FOUND.
679// If the backlog in the subscription is too old – and the resulting snapshot
680// would expire in less than 1 hour – then FAILED_PRECONDITION is returned.
681// See also the Snapshot.expire_time field. If the name is not provided in
682// the request, the server will assign a random
683// name for this snapshot on the same project as the subscription, conforming
684// to the [resource name format]
685// (https://cloud.google.com/pubsub/docs/admin#resource_names (at https://cloud.google.com/pubsub/docs/admin#resource_names)). The
686// generated name is populated in the returned Snapshot object. Note that for
687// REST API requests, you must specify a name in the request.
688func (c *SubscriberClient) CreateSnapshot(ctx context.Context, req *pubsubpb.CreateSnapshotRequest, opts ...gax.CallOption) (*pubsubpb.Snapshot, error) {
689	if _, ok := ctx.Deadline(); !ok && !c.disableDeadlines {
690		cctx, cancel := context.WithTimeout(ctx, 60000*time.Millisecond)
691		defer cancel()
692		ctx = cctx
693	}
694	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName())))
695	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
696	opts = append(c.CallOptions.CreateSnapshot[0:len(c.CallOptions.CreateSnapshot):len(c.CallOptions.CreateSnapshot)], opts...)
697	var resp *pubsubpb.Snapshot
698	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
699		var err error
700		resp, err = c.subscriberClient.CreateSnapshot(ctx, req, settings.GRPC...)
701		return err
702	}, opts...)
703	if err != nil {
704		return nil, err
705	}
706	return resp, nil
707}
708
709// UpdateSnapshot updates an existing snapshot. Snapshots are used in
710// Seek (at https://cloud.google.com/pubsub/docs/replay-overview)
711// operations, which allow
712// you to manage message acknowledgments in bulk. That is, you can set the
713// acknowledgment state of messages in an existing subscription to the state
714// captured by a snapshot.
715func (c *SubscriberClient) UpdateSnapshot(ctx context.Context, req *pubsubpb.UpdateSnapshotRequest, opts ...gax.CallOption) (*pubsubpb.Snapshot, error) {
716	if _, ok := ctx.Deadline(); !ok && !c.disableDeadlines {
717		cctx, cancel := context.WithTimeout(ctx, 60000*time.Millisecond)
718		defer cancel()
719		ctx = cctx
720	}
721	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "snapshot.name", url.QueryEscape(req.GetSnapshot().GetName())))
722	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
723	opts = append(c.CallOptions.UpdateSnapshot[0:len(c.CallOptions.UpdateSnapshot):len(c.CallOptions.UpdateSnapshot)], opts...)
724	var resp *pubsubpb.Snapshot
725	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
726		var err error
727		resp, err = c.subscriberClient.UpdateSnapshot(ctx, req, settings.GRPC...)
728		return err
729	}, opts...)
730	if err != nil {
731		return nil, err
732	}
733	return resp, nil
734}
735
736// DeleteSnapshot removes an existing snapshot. Snapshots are used in [Seek]
737// (https://cloud.google.com/pubsub/docs/replay-overview (at https://cloud.google.com/pubsub/docs/replay-overview)) operations, which
738// allow you to manage message acknowledgments in bulk. That is, you can set
739// the acknowledgment state of messages in an existing subscription to the
740// state captured by a snapshot.
741// When the snapshot is deleted, all messages retained in the snapshot
742// are immediately dropped. After a snapshot is deleted, a new one may be
743// created with the same name, but the new one has no association with the old
744// snapshot or its subscription, unless the same subscription is specified.
745func (c *SubscriberClient) DeleteSnapshot(ctx context.Context, req *pubsubpb.DeleteSnapshotRequest, opts ...gax.CallOption) error {
746	if _, ok := ctx.Deadline(); !ok && !c.disableDeadlines {
747		cctx, cancel := context.WithTimeout(ctx, 60000*time.Millisecond)
748		defer cancel()
749		ctx = cctx
750	}
751	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "snapshot", url.QueryEscape(req.GetSnapshot())))
752	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
753	opts = append(c.CallOptions.DeleteSnapshot[0:len(c.CallOptions.DeleteSnapshot):len(c.CallOptions.DeleteSnapshot)], opts...)
754	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
755		var err error
756		_, err = c.subscriberClient.DeleteSnapshot(ctx, req, settings.GRPC...)
757		return err
758	}, opts...)
759	return err
760}
761
762// Seek seeks an existing subscription to a point in time or to a given snapshot,
763// whichever is provided in the request. Snapshots are used in Seek (at https://cloud.google.com/pubsub/docs/replay-overview) operations, which
764// allow you to manage message acknowledgments in bulk. That is, you can set
765// the acknowledgment state of messages in an existing subscription to the
766// state captured by a snapshot. Note that both the subscription and the
767// snapshot must be on the same topic.
768func (c *SubscriberClient) Seek(ctx context.Context, req *pubsubpb.SeekRequest, opts ...gax.CallOption) (*pubsubpb.SeekResponse, error) {
769	if _, ok := ctx.Deadline(); !ok && !c.disableDeadlines {
770		cctx, cancel := context.WithTimeout(ctx, 60000*time.Millisecond)
771		defer cancel()
772		ctx = cctx
773	}
774	md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(req.GetSubscription())))
775	ctx = insertMetadata(ctx, c.xGoogMetadata, md)
776	opts = append(c.CallOptions.Seek[0:len(c.CallOptions.Seek):len(c.CallOptions.Seek)], opts...)
777	var resp *pubsubpb.SeekResponse
778	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
779		var err error
780		resp, err = c.subscriberClient.Seek(ctx, req, settings.GRPC...)
781		return err
782	}, opts...)
783	if err != nil {
784		return nil, err
785	}
786	return resp, nil
787}
788
// SnapshotIterator manages a stream of *pubsubpb.Snapshot.
//
// Results are consumed one at a time through Next; PageInfo exposes the
// pagination state.
type SnapshotIterator struct {
	// items buffers fetched snapshots that have not been handed out yet.
	// Next pops from the front; takeBuf hands the whole slice off and resets
	// it to nil.
	items []*pubsubpb.Snapshot
	// pageInfo is the pagination state returned by PageInfo.
	pageInfo *iterator.PageInfo
	// nextFunc is called by Next before it reads items; a non-nil error is
	// returned to the caller unchanged.
	// NOTE(review): presumably refills items when the buffer is empty — it is
	// wired up outside this type; confirm where the iterator is constructed.
	nextFunc func() error

	// Response is the raw response for the current page.
	// It must be cast to the RPC response type.
	// Calling Next() or InternalFetch() updates this value.
	Response interface{}

	// InternalFetch is for use by the Google Cloud Libraries only.
	// It is not part of the stable interface of this package.
	//
	// InternalFetch returns results from a single call to the underlying RPC.
	// The number of results is no greater than pageSize.
	// If there are no more results, nextPageToken is empty and err is nil.
	InternalFetch func(pageSize int, pageToken string) (results []*pubsubpb.Snapshot, nextPageToken string, err error)
}
808
809// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
810func (it *SnapshotIterator) PageInfo() *iterator.PageInfo {
811	return it.pageInfo
812}
813
814// Next returns the next result. Its second return value is iterator.Done if there are no more
815// results. Once Next returns Done, all subsequent calls will return Done.
816func (it *SnapshotIterator) Next() (*pubsubpb.Snapshot, error) {
817	var item *pubsubpb.Snapshot
818	if err := it.nextFunc(); err != nil {
819		return item, err
820	}
821	item = it.items[0]
822	it.items = it.items[1:]
823	return item, nil
824}
825
826func (it *SnapshotIterator) bufLen() int {
827	return len(it.items)
828}
829
830func (it *SnapshotIterator) takeBuf() interface{} {
831	b := it.items
832	it.items = nil
833	return b
834}
835
// SubscriptionIterator manages a stream of *pubsubpb.Subscription.
//
// Results are consumed one at a time through Next; PageInfo exposes the
// pagination state.
type SubscriptionIterator struct {
	// items buffers fetched subscriptions that have not been handed out yet.
	// Next pops from the front; takeBuf hands the whole slice off and resets
	// it to nil.
	items []*pubsubpb.Subscription
	// pageInfo is the pagination state returned by PageInfo.
	pageInfo *iterator.PageInfo
	// nextFunc is called by Next before it reads items; a non-nil error is
	// returned to the caller unchanged.
	// NOTE(review): presumably refills items when the buffer is empty — it is
	// wired up outside this type; confirm where the iterator is constructed.
	nextFunc func() error

	// Response is the raw response for the current page.
	// It must be cast to the RPC response type.
	// Calling Next() or InternalFetch() updates this value.
	Response interface{}

	// InternalFetch is for use by the Google Cloud Libraries only.
	// It is not part of the stable interface of this package.
	//
	// InternalFetch returns results from a single call to the underlying RPC.
	// The number of results is no greater than pageSize.
	// If there are no more results, nextPageToken is empty and err is nil.
	InternalFetch func(pageSize int, pageToken string) (results []*pubsubpb.Subscription, nextPageToken string, err error)
}
855
856// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
857func (it *SubscriptionIterator) PageInfo() *iterator.PageInfo {
858	return it.pageInfo
859}
860
861// Next returns the next result. Its second return value is iterator.Done if there are no more
862// results. Once Next returns Done, all subsequent calls will return Done.
863func (it *SubscriptionIterator) Next() (*pubsubpb.Subscription, error) {
864	var item *pubsubpb.Subscription
865	if err := it.nextFunc(); err != nil {
866		return item, err
867	}
868	item = it.items[0]
869	it.items = it.items[1:]
870	return item, nil
871}
872
873func (it *SubscriptionIterator) bufLen() int {
874	return len(it.items)
875}
876
877func (it *SubscriptionIterator) takeBuf() interface{} {
878	b := it.items
879	it.items = nil
880	return b
881}
882