1// Copyright 2017, Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// AUTO-GENERATED CODE. DO NOT EDIT.
16
17package pubsub
18
19import (
20	"math"
21	"time"
22
23	"cloud.google.com/go/iam"
24	"cloud.google.com/go/internal/version"
25	gax "github.com/googleapis/gax-go"
26	"golang.org/x/net/context"
27	"google.golang.org/api/iterator"
28	"google.golang.org/api/option"
29	"google.golang.org/api/transport"
30	pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
31	"google.golang.org/grpc"
32	"google.golang.org/grpc/codes"
33	"google.golang.org/grpc/metadata"
34)
35
36// SubscriberCallOptions contains the retry settings for each method of SubscriberClient.
37type SubscriberCallOptions struct {
38	CreateSubscription []gax.CallOption
39	GetSubscription    []gax.CallOption
40	UpdateSubscription []gax.CallOption
41	ListSubscriptions  []gax.CallOption
42	DeleteSubscription []gax.CallOption
43	ModifyAckDeadline  []gax.CallOption
44	Acknowledge        []gax.CallOption
45	Pull               []gax.CallOption
46	StreamingPull      []gax.CallOption
47	ModifyPushConfig   []gax.CallOption
48	ListSnapshots      []gax.CallOption
49	CreateSnapshot     []gax.CallOption
50	UpdateSnapshot     []gax.CallOption
51	DeleteSnapshot     []gax.CallOption
52	Seek               []gax.CallOption
53}
54
55func defaultSubscriberClientOptions() []option.ClientOption {
56	return []option.ClientOption{
57		option.WithEndpoint("pubsub.googleapis.com:443"),
58		option.WithScopes(DefaultAuthScopes()...),
59	}
60}
61
62func defaultSubscriberCallOptions() *SubscriberCallOptions {
63	retry := map[[2]string][]gax.CallOption{
64		{"default", "idempotent"}: {
65			gax.WithRetry(func() gax.Retryer {
66				return gax.OnCodes([]codes.Code{
67					codes.DeadlineExceeded,
68					codes.Unavailable,
69				}, gax.Backoff{
70					Initial:    100 * time.Millisecond,
71					Max:        60000 * time.Millisecond,
72					Multiplier: 1.3,
73				})
74			}),
75		},
76		{"messaging", "pull"}: {
77			gax.WithRetry(func() gax.Retryer {
78				return gax.OnCodes([]codes.Code{
79					codes.Canceled,
80					codes.DeadlineExceeded,
81					codes.Internal,
82					codes.ResourceExhausted,
83					codes.Unavailable,
84				}, gax.Backoff{
85					Initial:    100 * time.Millisecond,
86					Max:        60000 * time.Millisecond,
87					Multiplier: 1.3,
88				})
89			}),
90		},
91		{"streaming_messaging", "pull"}: {
92			gax.WithRetry(func() gax.Retryer {
93				return gax.OnCodes([]codes.Code{
94					codes.Canceled,
95					codes.DeadlineExceeded,
96					codes.Internal,
97					codes.ResourceExhausted,
98					codes.Unavailable,
99				}, gax.Backoff{
100					Initial:    100 * time.Millisecond,
101					Max:        60000 * time.Millisecond,
102					Multiplier: 1.3,
103				})
104			}),
105		},
106	}
107	return &SubscriberCallOptions{
108		CreateSubscription: retry[[2]string{"default", "idempotent"}],
109		GetSubscription:    retry[[2]string{"default", "idempotent"}],
110		UpdateSubscription: retry[[2]string{"default", "idempotent"}],
111		ListSubscriptions:  retry[[2]string{"default", "idempotent"}],
112		DeleteSubscription: retry[[2]string{"default", "idempotent"}],
113		ModifyAckDeadline:  retry[[2]string{"default", "non_idempotent"}],
114		Acknowledge:        retry[[2]string{"messaging", "non_idempotent"}],
115		Pull:               retry[[2]string{"messaging", "pull"}],
116		StreamingPull:      retry[[2]string{"streaming_messaging", "pull"}],
117		ModifyPushConfig:   retry[[2]string{"default", "non_idempotent"}],
118		ListSnapshots:      retry[[2]string{"default", "idempotent"}],
119		CreateSnapshot:     retry[[2]string{"default", "idempotent"}],
120		UpdateSnapshot:     retry[[2]string{"default", "idempotent"}],
121		DeleteSnapshot:     retry[[2]string{"default", "idempotent"}],
122		Seek:               retry[[2]string{"default", "non_idempotent"}],
123	}
124}
125
126// SubscriberClient is a client for interacting with Google Cloud Pub/Sub API.
127type SubscriberClient struct {
128	// The connection to the service.
129	conn *grpc.ClientConn
130
131	// The gRPC API client.
132	subscriberClient pubsubpb.SubscriberClient
133
134	// The call options for this service.
135	CallOptions *SubscriberCallOptions
136
137	// The metadata to be sent with each request.
138	Metadata metadata.MD
139}
140
141// NewSubscriberClient creates a new subscriber client.
142//
143// The service that an application uses to manipulate subscriptions and to
144// consume messages from a subscription via the Pull method.
145func NewSubscriberClient(ctx context.Context, opts ...option.ClientOption) (*SubscriberClient, error) {
146	conn, err := transport.DialGRPC(ctx, append(defaultSubscriberClientOptions(), opts...)...)
147	if err != nil {
148		return nil, err
149	}
150	c := &SubscriberClient{
151		conn:        conn,
152		CallOptions: defaultSubscriberCallOptions(),
153
154		subscriberClient: pubsubpb.NewSubscriberClient(conn),
155	}
156	c.SetGoogleClientInfo()
157	return c, nil
158}
159
160// Connection returns the client's connection to the API service.
161func (c *SubscriberClient) Connection() *grpc.ClientConn {
162	return c.conn
163}
164
165// Close closes the connection to the API service. The user should invoke this when
166// the client is no longer required.
167func (c *SubscriberClient) Close() error {
168	return c.conn.Close()
169}
170
171// SetGoogleClientInfo sets the name and version of the application in
172// the `x-goog-api-client` header passed on each request. Intended for
173// use by Google-written clients.
174func (c *SubscriberClient) SetGoogleClientInfo(keyval ...string) {
175	kv := append([]string{"gl-go", version.Go()}, keyval...)
176	kv = append(kv, "gapic", version.Repo, "gax", gax.Version, "grpc", grpc.Version)
177	c.Metadata = metadata.Pairs("x-goog-api-client", gax.XGoogHeader(kv...))
178}
179
180// SubscriberProjectPath returns the path for the project resource.
181func SubscriberProjectPath(project string) string {
182	return "" +
183		"projects/" +
184		project +
185		""
186}
187
188// SubscriberSnapshotPath returns the path for the snapshot resource.
189func SubscriberSnapshotPath(project, snapshot string) string {
190	return "" +
191		"projects/" +
192		project +
193		"/snapshots/" +
194		snapshot +
195		""
196}
197
198// SubscriberSubscriptionPath returns the path for the subscription resource.
199func SubscriberSubscriptionPath(project, subscription string) string {
200	return "" +
201		"projects/" +
202		project +
203		"/subscriptions/" +
204		subscription +
205		""
206}
207
208// SubscriberTopicPath returns the path for the topic resource.
209func SubscriberTopicPath(project, topic string) string {
210	return "" +
211		"projects/" +
212		project +
213		"/topics/" +
214		topic +
215		""
216}
217
218func (c *SubscriberClient) SubscriptionIAM(subscription *pubsubpb.Subscription) *iam.Handle {
219	return iam.InternalNewHandle(c.Connection(), subscription.Name)
220}
221
222func (c *SubscriberClient) TopicIAM(topic *pubsubpb.Topic) *iam.Handle {
223	return iam.InternalNewHandle(c.Connection(), topic.Name)
224}
225
226// CreateSubscription creates a subscription to a given topic.
227// If the subscription already exists, returns ALREADY_EXISTS.
228// If the corresponding topic doesn't exist, returns NOT_FOUND.
229//
230// If the name is not provided in the request, the server will assign a random
231// name for this subscription on the same project as the topic, conforming
232// to the
233// resource name format (at https://cloud.google.com/pubsub/docs/overview#names).
234// The generated name is populated in the returned Subscription object.
235// Note that for REST API requests, you must specify a name in the request.
236func (c *SubscriberClient) CreateSubscription(ctx context.Context, req *pubsubpb.Subscription, opts ...gax.CallOption) (*pubsubpb.Subscription, error) {
237	ctx = insertMetadata(ctx, c.Metadata)
238	opts = append(c.CallOptions.CreateSubscription[0:len(c.CallOptions.CreateSubscription):len(c.CallOptions.CreateSubscription)], opts...)
239	var resp *pubsubpb.Subscription
240	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
241		var err error
242		resp, err = c.subscriberClient.CreateSubscription(ctx, req, settings.GRPC...)
243		return err
244	}, opts...)
245	if err != nil {
246		return nil, err
247	}
248	return resp, nil
249}
250
251// GetSubscription gets the configuration details of a subscription.
252func (c *SubscriberClient) GetSubscription(ctx context.Context, req *pubsubpb.GetSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.Subscription, error) {
253	ctx = insertMetadata(ctx, c.Metadata)
254	opts = append(c.CallOptions.GetSubscription[0:len(c.CallOptions.GetSubscription):len(c.CallOptions.GetSubscription)], opts...)
255	var resp *pubsubpb.Subscription
256	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
257		var err error
258		resp, err = c.subscriberClient.GetSubscription(ctx, req, settings.GRPC...)
259		return err
260	}, opts...)
261	if err != nil {
262		return nil, err
263	}
264	return resp, nil
265}
266
267// UpdateSubscription updates an existing subscription. Note that certain properties of a
268// subscription, such as its topic, are not modifiable.
269// NOTE:  The style guide requires body: "subscription" instead of body: "*".
270// Keeping the latter for internal consistency in V1, however it should be
271// corrected in V2.  See
272// https://cloud.google.com/apis/design/standard_methods#update for details.
273func (c *SubscriberClient) UpdateSubscription(ctx context.Context, req *pubsubpb.UpdateSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.Subscription, error) {
274	ctx = insertMetadata(ctx, c.Metadata)
275	opts = append(c.CallOptions.UpdateSubscription[0:len(c.CallOptions.UpdateSubscription):len(c.CallOptions.UpdateSubscription)], opts...)
276	var resp *pubsubpb.Subscription
277	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
278		var err error
279		resp, err = c.subscriberClient.UpdateSubscription(ctx, req, settings.GRPC...)
280		return err
281	}, opts...)
282	if err != nil {
283		return nil, err
284	}
285	return resp, nil
286}
287
288// ListSubscriptions lists matching subscriptions.
289func (c *SubscriberClient) ListSubscriptions(ctx context.Context, req *pubsubpb.ListSubscriptionsRequest, opts ...gax.CallOption) *SubscriptionIterator {
290	ctx = insertMetadata(ctx, c.Metadata)
291	opts = append(c.CallOptions.ListSubscriptions[0:len(c.CallOptions.ListSubscriptions):len(c.CallOptions.ListSubscriptions)], opts...)
292	it := &SubscriptionIterator{}
293	it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Subscription, string, error) {
294		var resp *pubsubpb.ListSubscriptionsResponse
295		req.PageToken = pageToken
296		if pageSize > math.MaxInt32 {
297			req.PageSize = math.MaxInt32
298		} else {
299			req.PageSize = int32(pageSize)
300		}
301		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
302			var err error
303			resp, err = c.subscriberClient.ListSubscriptions(ctx, req, settings.GRPC...)
304			return err
305		}, opts...)
306		if err != nil {
307			return nil, "", err
308		}
309		return resp.Subscriptions, resp.NextPageToken, nil
310	}
311	fetch := func(pageSize int, pageToken string) (string, error) {
312		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
313		if err != nil {
314			return "", err
315		}
316		it.items = append(it.items, items...)
317		return nextPageToken, nil
318	}
319	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
320	return it
321}
322
323// DeleteSubscription deletes an existing subscription. All messages retained in the subscription
324// are immediately dropped. Calls to Pull after deletion will return
325// NOT_FOUND. After a subscription is deleted, a new one may be created with
326// the same name, but the new one has no association with the old
327// subscription or its topic unless the same topic is specified.
328func (c *SubscriberClient) DeleteSubscription(ctx context.Context, req *pubsubpb.DeleteSubscriptionRequest, opts ...gax.CallOption) error {
329	ctx = insertMetadata(ctx, c.Metadata)
330	opts = append(c.CallOptions.DeleteSubscription[0:len(c.CallOptions.DeleteSubscription):len(c.CallOptions.DeleteSubscription)], opts...)
331	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
332		var err error
333		_, err = c.subscriberClient.DeleteSubscription(ctx, req, settings.GRPC...)
334		return err
335	}, opts...)
336	return err
337}
338
339// ModifyAckDeadline modifies the ack deadline for a specific message. This method is useful
340// to indicate that more time is needed to process a message by the
341// subscriber, or to make the message available for redelivery if the
342// processing was interrupted. Note that this does not modify the
343// subscription-level ackDeadlineSeconds used for subsequent messages.
344func (c *SubscriberClient) ModifyAckDeadline(ctx context.Context, req *pubsubpb.ModifyAckDeadlineRequest, opts ...gax.CallOption) error {
345	ctx = insertMetadata(ctx, c.Metadata)
346	opts = append(c.CallOptions.ModifyAckDeadline[0:len(c.CallOptions.ModifyAckDeadline):len(c.CallOptions.ModifyAckDeadline)], opts...)
347	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
348		var err error
349		_, err = c.subscriberClient.ModifyAckDeadline(ctx, req, settings.GRPC...)
350		return err
351	}, opts...)
352	return err
353}
354
355// Acknowledge acknowledges the messages associated with the ack_ids in the
356// AcknowledgeRequest. The Pub/Sub system can remove the relevant messages
357// from the subscription.
358//
359// Acknowledging a message whose ack deadline has expired may succeed,
360// but such a message may be redelivered later. Acknowledging a message more
361// than once will not result in an error.
362func (c *SubscriberClient) Acknowledge(ctx context.Context, req *pubsubpb.AcknowledgeRequest, opts ...gax.CallOption) error {
363	ctx = insertMetadata(ctx, c.Metadata)
364	opts = append(c.CallOptions.Acknowledge[0:len(c.CallOptions.Acknowledge):len(c.CallOptions.Acknowledge)], opts...)
365	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
366		var err error
367		_, err = c.subscriberClient.Acknowledge(ctx, req, settings.GRPC...)
368		return err
369	}, opts...)
370	return err
371}
372
373// Pull pulls messages from the server. Returns an empty list if there are no
374// messages available in the backlog. The server may return UNAVAILABLE if
375// there are too many concurrent pull requests pending for the given
376// subscription.
377func (c *SubscriberClient) Pull(ctx context.Context, req *pubsubpb.PullRequest, opts ...gax.CallOption) (*pubsubpb.PullResponse, error) {
378	ctx = insertMetadata(ctx, c.Metadata)
379	opts = append(c.CallOptions.Pull[0:len(c.CallOptions.Pull):len(c.CallOptions.Pull)], opts...)
380	var resp *pubsubpb.PullResponse
381	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
382		var err error
383		resp, err = c.subscriberClient.Pull(ctx, req, settings.GRPC...)
384		return err
385	}, opts...)
386	if err != nil {
387		return nil, err
388	}
389	return resp, nil
390}
391
392// StreamingPull (EXPERIMENTAL) StreamingPull is an experimental feature. This RPC will
393// respond with UNIMPLEMENTED errors unless you have been invited to test
394// this feature. Contact cloud-pubsub@google.com with any questions.
395//
396// Establishes a stream with the server, which sends messages down to the
397// client. The client streams acknowledgements and ack deadline modifications
398// back to the server. The server will close the stream and return the status
399// on any error. The server may close the stream with status OK to reassign
400// server-side resources, in which case, the client should re-establish the
401// stream. UNAVAILABLE may also be returned in the case of a transient error
402// (e.g., a server restart). These should also be retried by the client. Flow
403// control can be achieved by configuring the underlying RPC channel.
404func (c *SubscriberClient) StreamingPull(ctx context.Context, opts ...gax.CallOption) (pubsubpb.Subscriber_StreamingPullClient, error) {
405	ctx = insertMetadata(ctx, c.Metadata)
406	opts = append(c.CallOptions.StreamingPull[0:len(c.CallOptions.StreamingPull):len(c.CallOptions.StreamingPull)], opts...)
407	var resp pubsubpb.Subscriber_StreamingPullClient
408	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
409		var err error
410		resp, err = c.subscriberClient.StreamingPull(ctx, settings.GRPC...)
411		return err
412	}, opts...)
413	if err != nil {
414		return nil, err
415	}
416	return resp, nil
417}
418
419// ModifyPushConfig modifies the PushConfig for a specified subscription.
420//
421// This may be used to change a push subscription to a pull one (signified by
422// an empty PushConfig) or vice versa, or change the endpoint URL and other
423// attributes of a push subscription. Messages will accumulate for delivery
424// continuously through the call regardless of changes to the PushConfig.
425func (c *SubscriberClient) ModifyPushConfig(ctx context.Context, req *pubsubpb.ModifyPushConfigRequest, opts ...gax.CallOption) error {
426	ctx = insertMetadata(ctx, c.Metadata)
427	opts = append(c.CallOptions.ModifyPushConfig[0:len(c.CallOptions.ModifyPushConfig):len(c.CallOptions.ModifyPushConfig)], opts...)
428	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
429		var err error
430		_, err = c.subscriberClient.ModifyPushConfig(ctx, req, settings.GRPC...)
431		return err
432	}, opts...)
433	return err
434}
435
436// ListSnapshots lists the existing snapshots.
437func (c *SubscriberClient) ListSnapshots(ctx context.Context, req *pubsubpb.ListSnapshotsRequest, opts ...gax.CallOption) *SnapshotIterator {
438	ctx = insertMetadata(ctx, c.Metadata)
439	opts = append(c.CallOptions.ListSnapshots[0:len(c.CallOptions.ListSnapshots):len(c.CallOptions.ListSnapshots)], opts...)
440	it := &SnapshotIterator{}
441	it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Snapshot, string, error) {
442		var resp *pubsubpb.ListSnapshotsResponse
443		req.PageToken = pageToken
444		if pageSize > math.MaxInt32 {
445			req.PageSize = math.MaxInt32
446		} else {
447			req.PageSize = int32(pageSize)
448		}
449		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
450			var err error
451			resp, err = c.subscriberClient.ListSnapshots(ctx, req, settings.GRPC...)
452			return err
453		}, opts...)
454		if err != nil {
455			return nil, "", err
456		}
457		return resp.Snapshots, resp.NextPageToken, nil
458	}
459	fetch := func(pageSize int, pageToken string) (string, error) {
460		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
461		if err != nil {
462			return "", err
463		}
464		it.items = append(it.items, items...)
465		return nextPageToken, nil
466	}
467	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
468	return it
469}
470
471// CreateSnapshot creates a snapshot from the requested subscription.
472// If the snapshot already exists, returns ALREADY_EXISTS.
473// If the requested subscription doesn't exist, returns NOT_FOUND.
474//
475// If the name is not provided in the request, the server will assign a random
476// name for this snapshot on the same project as the subscription, conforming
477// to the
478// resource name format (at https://cloud.google.com/pubsub/docs/overview#names).
479// The generated name is populated in the returned Snapshot object.
480// Note that for REST API requests, you must specify a name in the request.
481func (c *SubscriberClient) CreateSnapshot(ctx context.Context, req *pubsubpb.CreateSnapshotRequest, opts ...gax.CallOption) (*pubsubpb.Snapshot, error) {
482	ctx = insertMetadata(ctx, c.Metadata)
483	opts = append(c.CallOptions.CreateSnapshot[0:len(c.CallOptions.CreateSnapshot):len(c.CallOptions.CreateSnapshot)], opts...)
484	var resp *pubsubpb.Snapshot
485	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
486		var err error
487		resp, err = c.subscriberClient.CreateSnapshot(ctx, req, settings.GRPC...)
488		return err
489	}, opts...)
490	if err != nil {
491		return nil, err
492	}
493	return resp, nil
494}
495
496// UpdateSnapshot updates an existing snapshot. Note that certain properties of a snapshot
497// are not modifiable.
498// NOTE:  The style guide requires body: "snapshot" instead of body: "*".
499// Keeping the latter for internal consistency in V1, however it should be
500// corrected in V2.  See
501// https://cloud.google.com/apis/design/standard_methods#update for details.
502func (c *SubscriberClient) UpdateSnapshot(ctx context.Context, req *pubsubpb.UpdateSnapshotRequest, opts ...gax.CallOption) (*pubsubpb.Snapshot, error) {
503	ctx = insertMetadata(ctx, c.Metadata)
504	opts = append(c.CallOptions.UpdateSnapshot[0:len(c.CallOptions.UpdateSnapshot):len(c.CallOptions.UpdateSnapshot)], opts...)
505	var resp *pubsubpb.Snapshot
506	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
507		var err error
508		resp, err = c.subscriberClient.UpdateSnapshot(ctx, req, settings.GRPC...)
509		return err
510	}, opts...)
511	if err != nil {
512		return nil, err
513	}
514	return resp, nil
515}
516
517// DeleteSnapshot removes an existing snapshot. All messages retained in the snapshot
518// are immediately dropped. After a snapshot is deleted, a new one may be
519// created with the same name, but the new one has no association with the old
520// snapshot or its subscription, unless the same subscription is specified.
521func (c *SubscriberClient) DeleteSnapshot(ctx context.Context, req *pubsubpb.DeleteSnapshotRequest, opts ...gax.CallOption) error {
522	ctx = insertMetadata(ctx, c.Metadata)
523	opts = append(c.CallOptions.DeleteSnapshot[0:len(c.CallOptions.DeleteSnapshot):len(c.CallOptions.DeleteSnapshot)], opts...)
524	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
525		var err error
526		_, err = c.subscriberClient.DeleteSnapshot(ctx, req, settings.GRPC...)
527		return err
528	}, opts...)
529	return err
530}
531
532// Seek seeks an existing subscription to a point in time or to a given snapshot,
533// whichever is provided in the request.
534func (c *SubscriberClient) Seek(ctx context.Context, req *pubsubpb.SeekRequest, opts ...gax.CallOption) (*pubsubpb.SeekResponse, error) {
535	ctx = insertMetadata(ctx, c.Metadata)
536	opts = append(c.CallOptions.Seek[0:len(c.CallOptions.Seek):len(c.CallOptions.Seek)], opts...)
537	var resp *pubsubpb.SeekResponse
538	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
539		var err error
540		resp, err = c.subscriberClient.Seek(ctx, req, settings.GRPC...)
541		return err
542	}, opts...)
543	if err != nil {
544		return nil, err
545	}
546	return resp, nil
547}
548
549// SnapshotIterator manages a stream of *pubsubpb.Snapshot.
550type SnapshotIterator struct {
551	items    []*pubsubpb.Snapshot
552	pageInfo *iterator.PageInfo
553	nextFunc func() error
554
555	// InternalFetch is for use by the Google Cloud Libraries only.
556	// It is not part of the stable interface of this package.
557	//
558	// InternalFetch returns results from a single call to the underlying RPC.
559	// The number of results is no greater than pageSize.
560	// If there are no more results, nextPageToken is empty and err is nil.
561	InternalFetch func(pageSize int, pageToken string) (results []*pubsubpb.Snapshot, nextPageToken string, err error)
562}
563
564// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
565func (it *SnapshotIterator) PageInfo() *iterator.PageInfo {
566	return it.pageInfo
567}
568
569// Next returns the next result. Its second return value is iterator.Done if there are no more
570// results. Once Next returns Done, all subsequent calls will return Done.
571func (it *SnapshotIterator) Next() (*pubsubpb.Snapshot, error) {
572	var item *pubsubpb.Snapshot
573	if err := it.nextFunc(); err != nil {
574		return item, err
575	}
576	item = it.items[0]
577	it.items = it.items[1:]
578	return item, nil
579}
580
581func (it *SnapshotIterator) bufLen() int {
582	return len(it.items)
583}
584
585func (it *SnapshotIterator) takeBuf() interface{} {
586	b := it.items
587	it.items = nil
588	return b
589}
590
591// SubscriptionIterator manages a stream of *pubsubpb.Subscription.
592type SubscriptionIterator struct {
593	items    []*pubsubpb.Subscription
594	pageInfo *iterator.PageInfo
595	nextFunc func() error
596
597	// InternalFetch is for use by the Google Cloud Libraries only.
598	// It is not part of the stable interface of this package.
599	//
600	// InternalFetch returns results from a single call to the underlying RPC.
601	// The number of results is no greater than pageSize.
602	// If there are no more results, nextPageToken is empty and err is nil.
603	InternalFetch func(pageSize int, pageToken string) (results []*pubsubpb.Subscription, nextPageToken string, err error)
604}
605
606// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
607func (it *SubscriptionIterator) PageInfo() *iterator.PageInfo {
608	return it.pageInfo
609}
610
611// Next returns the next result. Its second return value is iterator.Done if there are no more
612// results. Once Next returns Done, all subsequent calls will return Done.
613func (it *SubscriptionIterator) Next() (*pubsubpb.Subscription, error) {
614	var item *pubsubpb.Subscription
615	if err := it.nextFunc(); err != nil {
616		return item, err
617	}
618	item = it.items[0]
619	it.items = it.items[1:]
620	return item, nil
621}
622
623func (it *SubscriptionIterator) bufLen() int {
624	return len(it.items)
625}
626
627func (it *SubscriptionIterator) takeBuf() interface{} {
628	b := it.items
629	it.items = nil
630	return b
631}
632