1// Copyright 2016 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package pubsub
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"io"
22	"strings"
23	"sync"
24	"time"
25
26	"cloud.google.com/go/iam"
27	"cloud.google.com/go/internal/optional"
28	"github.com/golang/protobuf/ptypes"
29	durpb "github.com/golang/protobuf/ptypes/duration"
30	gax "github.com/googleapis/gax-go/v2"
31	"golang.org/x/sync/errgroup"
32	pb "google.golang.org/genproto/googleapis/pubsub/v1"
33	fmpb "google.golang.org/genproto/protobuf/field_mask"
34	"google.golang.org/grpc"
35	"google.golang.org/grpc/codes"
36)
37
38// Subscription is a reference to a PubSub subscription.
39type Subscription struct {
40	c *Client
41
42	// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
43	name string
44
45	// Settings for pulling messages. Configure these before calling Receive.
46	ReceiveSettings ReceiveSettings
47
48	mu            sync.Mutex
49	receiveActive bool
50}
51
52// Subscription creates a reference to a subscription.
53func (c *Client) Subscription(id string) *Subscription {
54	return c.SubscriptionInProject(id, c.projectID)
55}
56
57// SubscriptionInProject creates a reference to a subscription in a given project.
58func (c *Client) SubscriptionInProject(id, projectID string) *Subscription {
59	return &Subscription{
60		c:    c,
61		name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id),
62	}
63}
64
65// String returns the globally unique printable name of the subscription.
66func (s *Subscription) String() string {
67	return s.name
68}
69
70// ID returns the unique identifier of the subscription within its project.
71func (s *Subscription) ID() string {
72	slash := strings.LastIndex(s.name, "/")
73	if slash == -1 {
74		// name is not a fully-qualified name.
75		panic("bad subscription name")
76	}
77	return s.name[slash+1:]
78}
79
80// Subscriptions returns an iterator which returns all of the subscriptions for the client's project.
81func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator {
82	it := c.subc.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{
83		Project: c.fullyQualifiedProjectName(),
84	})
85	return &SubscriptionIterator{
86		c: c,
87		next: func() (string, error) {
88			sub, err := it.Next()
89			if err != nil {
90				return "", err
91			}
92			return sub.Name, nil
93		},
94	}
95}
96
97// SubscriptionIterator is an iterator that returns a series of subscriptions.
98type SubscriptionIterator struct {
99	c    *Client
100	next func() (string, error)
101}
102
103// Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned.
104func (subs *SubscriptionIterator) Next() (*Subscription, error) {
105	subName, err := subs.next()
106	if err != nil {
107		return nil, err
108	}
109	return &Subscription{c: subs.c, name: subName}, nil
110}
111
112// PushConfig contains configuration for subscriptions that operate in push mode.
113type PushConfig struct {
114	// A URL locating the endpoint to which messages should be pushed.
115	Endpoint string
116
117	// Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details.
118	Attributes map[string]string
119}
120
121func (pc *PushConfig) toProto() *pb.PushConfig {
122	return &pb.PushConfig{
123		Attributes:   pc.Attributes,
124		PushEndpoint: pc.Endpoint,
125	}
126}
127
128// SubscriptionConfig describes the configuration of a subscription.
129type SubscriptionConfig struct {
130	Topic      *Topic
131	PushConfig PushConfig
132
133	// The default maximum time after a subscriber receives a message before
134	// the subscriber should acknowledge the message. Note: messages which are
135	// obtained via Subscription.Receive need not be acknowledged within this
136	// deadline, as the deadline will be automatically extended.
137	AckDeadline time.Duration
138
139	// Whether to retain acknowledged messages. If true, acknowledged messages
140	// will not be expunged until they fall out of the RetentionDuration window.
141	RetainAckedMessages bool
142
143	// How long to retain messages in backlog, from the time of publish. If
144	// RetainAckedMessages is true, this duration affects the retention of
145	// acknowledged messages, otherwise only unacknowledged messages are retained.
146	// Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes.
147	RetentionDuration time.Duration
148
149	// Expiration policy specifies the conditions for a subscription's expiration.
150	// A subscription is considered active as long as any connected subscriber is
151	// successfully consuming messages from the subscription or is issuing
152	// operations on the subscription. If `expiration_policy` is not set, a
153	// *default policy* with `ttl` of 31 days will be used. The minimum allowed
154	// value for `expiration_policy.ttl` is 1 day.
155	//
156	// It is EXPERIMENTAL and subject to change or removal without notice.
157	ExpirationPolicy time.Duration
158
159	// The set of labels for the subscription.
160	Labels map[string]string
161}
162
163func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
164	var pbPushConfig *pb.PushConfig
165	if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 {
166		pbPushConfig = &pb.PushConfig{
167			Attributes:   cfg.PushConfig.Attributes,
168			PushEndpoint: cfg.PushConfig.Endpoint,
169		}
170	}
171	var retentionDuration *durpb.Duration
172	if cfg.RetentionDuration != 0 {
173		retentionDuration = ptypes.DurationProto(cfg.RetentionDuration)
174	}
175	return &pb.Subscription{
176		Name:                     name,
177		Topic:                    cfg.Topic.name,
178		PushConfig:               pbPushConfig,
179		AckDeadlineSeconds:       trunc32(int64(cfg.AckDeadline.Seconds())),
180		RetainAckedMessages:      cfg.RetainAckedMessages,
181		MessageRetentionDuration: retentionDuration,
182		Labels:                   cfg.Labels,
183		ExpirationPolicy:         expirationPolicyToProto(cfg.ExpirationPolicy),
184	}
185}
186
187func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionConfig, error) {
188	rd := time.Hour * 24 * 7
189	var err error
190	if pbSub.MessageRetentionDuration != nil {
191		rd, err = ptypes.Duration(pbSub.MessageRetentionDuration)
192		if err != nil {
193			return SubscriptionConfig{}, err
194		}
195	}
196	var expirationPolicy time.Duration
197	if ttl := pbSub.ExpirationPolicy.GetTtl(); ttl != nil {
198		expirationPolicy, err = ptypes.Duration(ttl)
199		if err != nil {
200			return SubscriptionConfig{}, err
201		}
202	}
203	return SubscriptionConfig{
204		Topic:       newTopic(c, pbSub.Topic),
205		AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds),
206		PushConfig: PushConfig{
207			Endpoint:   pbSub.PushConfig.PushEndpoint,
208			Attributes: pbSub.PushConfig.Attributes,
209		},
210		RetainAckedMessages: pbSub.RetainAckedMessages,
211		RetentionDuration:   rd,
212		Labels:              pbSub.Labels,
213		ExpirationPolicy:    expirationPolicy,
214	}, nil
215}
216
217// ReceiveSettings configure the Receive method.
218// A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
219type ReceiveSettings struct {
220	// MaxExtension is the maximum period for which the Subscription should
221	// automatically extend the ack deadline for each message.
222	//
223	// The Subscription will automatically extend the ack deadline of all
224	// fetched Messages up to the duration specified. Automatic deadline
225	// extension beyond the initial receipt may be disabled by specifying a
226	// duration less than 0.
227	MaxExtension time.Duration
228
229	// MaxOutstandingMessages is the maximum number of unprocessed messages
230	// (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it
231	// will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages.
232	// If the value is negative, then there will be no limit on the number of
233	// unprocessed messages.
234	MaxOutstandingMessages int
235
236	// MaxOutstandingBytes is the maximum size of unprocessed messages
237	// (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will
238	// be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If
239	// the value is negative, then there will be no limit on the number of bytes
240	// for unprocessed messages.
241	MaxOutstandingBytes int
242
243	// NumGoroutines is the number of goroutines Receive will spawn to pull
244	// messages concurrently. If NumGoroutines is less than 1, it will be treated
245	// as if it were DefaultReceiveSettings.NumGoroutines.
246	//
247	// NumGoroutines does not limit the number of messages that can be processed
248	// concurrently. Even with one goroutine, many messages might be processed at
249	// once, because that goroutine may continually receive messages and invoke the
250	// function passed to Receive on them. To limit the number of messages being
251	// processed concurrently, set MaxOutstandingMessages.
252	NumGoroutines int
253
254	// If Synchronous is true, then no more than MaxOutstandingMessages will be in
255	// memory at one time. (In contrast, when Synchronous is false, more than
256	// MaxOutstandingMessages may have been received from the service and in memory
257	// before being processed.) MaxOutstandingBytes still refers to the total bytes
258	// processed, rather than in memory. NumGoroutines is ignored.
259	// The default is false.
260	Synchronous bool
261}
262
263// For synchronous receive, the time to wait if we are already processing
264// MaxOutstandingMessages. There is no point calling Pull and asking for zero
265// messages, so we pause to allow some message-processing callbacks to finish.
266//
267// The wait time is large enough to avoid consuming significant CPU, but
268// small enough to provide decent throughput. Users who want better
269// throughput should not be using synchronous mode.
270//
271// Waiting might seem like polling, so it's natural to think we could do better by
272// noticing when a callback is finished and immediately calling Pull. But if
273// callbacks finish in quick succession, this will result in frequent Pull RPCs that
274// request a single message, which wastes network bandwidth. Better to wait for a few
275// callbacks to finish, so we make fewer RPCs fetching more messages.
276//
277// This value is unexported so the user doesn't have another knob to think about. Note that
278// it is the same value as the one used for nackTicker, so it matches this client's
279// idea of a duration that is short, but not so short that we perform excessive RPCs.
280const synchronousWaitTime = 100 * time.Millisecond
281
282// This is a var so that tests can change it.
283var minAckDeadline = 10 * time.Second
284
285// DefaultReceiveSettings holds the default values for ReceiveSettings.
286var DefaultReceiveSettings = ReceiveSettings{
287	MaxExtension:           10 * time.Minute,
288	MaxOutstandingMessages: 1000,
289	MaxOutstandingBytes:    1e9, // 1G
290	NumGoroutines:          1,
291}
292
293// Delete deletes the subscription.
294func (s *Subscription) Delete(ctx context.Context) error {
295	return s.c.subc.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: s.name})
296}
297
298// Exists reports whether the subscription exists on the server.
299func (s *Subscription) Exists(ctx context.Context) (bool, error) {
300	_, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
301	if err == nil {
302		return true, nil
303	}
304	if grpc.Code(err) == codes.NotFound {
305		return false, nil
306	}
307	return false, err
308}
309
310// Config fetches the current configuration for the subscription.
311func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) {
312	pbSub, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
313	if err != nil {
314		return SubscriptionConfig{}, err
315	}
316	cfg, err := protoToSubscriptionConfig(pbSub, s.c)
317	if err != nil {
318		return SubscriptionConfig{}, err
319	}
320	return cfg, nil
321}
322
323// SubscriptionConfigToUpdate describes how to update a subscription.
324type SubscriptionConfigToUpdate struct {
325	// If non-nil, the push config is changed.
326	PushConfig *PushConfig
327
328	// If non-zero, the ack deadline is changed.
329	AckDeadline time.Duration
330
331	// If set, RetainAckedMessages is changed.
332	RetainAckedMessages optional.Bool
333
334	// If non-zero, RetentionDuration is changed.
335	RetentionDuration time.Duration
336
337	// If non-zero, Expiration is changed.
338	ExpirationPolicy time.Duration
339
340	// If non-nil, the current set of labels is completely
341	// replaced by the new set.
342	// This field has beta status. It is not subject to the stability guarantee
343	// and may change.
344	Labels map[string]string
345}
346
347// Update changes an existing subscription according to the fields set in cfg.
348// It returns the new SubscriptionConfig.
349//
350// Update returns an error if no fields were modified.
351func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) {
352	req := s.updateRequest(&cfg)
353	if err := cfg.validate(); err != nil {
354		return SubscriptionConfig{}, fmt.Errorf("pubsub: UpdateSubscription %v", err)
355	}
356	if len(req.UpdateMask.Paths) == 0 {
357		return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update")
358	}
359	rpsub, err := s.c.subc.UpdateSubscription(ctx, req)
360	if err != nil {
361		return SubscriptionConfig{}, err
362	}
363	return protoToSubscriptionConfig(rpsub, s.c)
364}
365
366func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.UpdateSubscriptionRequest {
367	psub := &pb.Subscription{Name: s.name}
368	var paths []string
369	if cfg.PushConfig != nil {
370		psub.PushConfig = cfg.PushConfig.toProto()
371		paths = append(paths, "push_config")
372	}
373	if cfg.AckDeadline != 0 {
374		psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds()))
375		paths = append(paths, "ack_deadline_seconds")
376	}
377	if cfg.RetainAckedMessages != nil {
378		psub.RetainAckedMessages = optional.ToBool(cfg.RetainAckedMessages)
379		paths = append(paths, "retain_acked_messages")
380	}
381	if cfg.RetentionDuration != 0 {
382		psub.MessageRetentionDuration = ptypes.DurationProto(cfg.RetentionDuration)
383		paths = append(paths, "message_retention_duration")
384	}
385	if cfg.ExpirationPolicy != 0 {
386		psub.ExpirationPolicy = expirationPolicyToProto(cfg.ExpirationPolicy)
387		paths = append(paths, "expiration_policy")
388	}
389	if cfg.Labels != nil {
390		psub.Labels = cfg.Labels
391		paths = append(paths, "labels")
392	}
393	return &pb.UpdateSubscriptionRequest{
394		Subscription: psub,
395		UpdateMask:   &fmpb.FieldMask{Paths: paths},
396	}
397}
398
399const (
400	// The minimum expiration policy duration is 1 day as per:
401	//    https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L606-L607
402	minExpirationPolicy = 24 * time.Hour
403
404	// If an expiration policy is not specified, the default of 31 days is used as per:
405	//    https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L605-L606
406	defaultExpirationPolicy = 31 * 24 * time.Hour
407)
408
409func (cfg *SubscriptionConfigToUpdate) validate() error {
410	if cfg == nil || cfg.ExpirationPolicy == 0 {
411		return nil
412	}
413	if policy, min := cfg.ExpirationPolicy, minExpirationPolicy; policy < min {
414		return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", policy, min)
415	}
416	return nil
417}
418
419func expirationPolicyToProto(expirationPolicy time.Duration) *pb.ExpirationPolicy {
420	if expirationPolicy == 0 {
421		return nil
422	}
423	return &pb.ExpirationPolicy{
424		Ttl: ptypes.DurationProto(expirationPolicy),
425	}
426}
427
428// IAM returns the subscription's IAM handle.
429func (s *Subscription) IAM() *iam.Handle {
430	return iam.InternalNewHandle(s.c.subc.Connection(), s.name)
431}
432
433// CreateSubscription creates a new subscription on a topic.
434//
435// id is the name of the subscription to create. It must start with a letter,
436// and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-),
437// underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It
438// must be between 3 and 255 characters in length, and must not start with
439// "goog".
440//
441// cfg.Topic is the topic from which the subscription should receive messages. It
442// need not belong to the same project as the subscription. This field is required.
443//
444// cfg.AckDeadline is the maximum time after a subscriber receives a message before
445// the subscriber should acknowledge the message. It must be between 10 and 600
446// seconds (inclusive), and is rounded down to the nearest second. If the
447// provided ackDeadline is 0, then the default value of 10 seconds is used.
448// Note: messages which are obtained via Subscription.Receive need not be
449// acknowledged within this deadline, as the deadline will be automatically
450// extended.
451//
452// cfg.PushConfig may be set to configure this subscription for push delivery.
453//
454// If the subscription already exists an error will be returned.
455func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) {
456	if cfg.Topic == nil {
457		return nil, errors.New("pubsub: require non-nil Topic")
458	}
459	if cfg.AckDeadline == 0 {
460		cfg.AckDeadline = 10 * time.Second
461	}
462	if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second {
463		return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d)
464	}
465
466	sub := c.Subscription(id)
467	_, err := c.subc.CreateSubscription(ctx, cfg.toProto(sub.name))
468	if err != nil {
469		return nil, err
470	}
471	return sub, nil
472}
473
474var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscription")
475
476// Receive calls f with the outstanding messages from the subscription.
477// It blocks until ctx is done, or the service returns a non-retryable error.
478//
479// The standard way to terminate a Receive is to cancel its context:
480//
481//   cctx, cancel := context.WithCancel(ctx)
482//   err := sub.Receive(cctx, callback)
483//   // Call cancel from callback, or another goroutine.
484//
485// If the service returns a non-retryable error, Receive returns that error after
486// all of the outstanding calls to f have returned. If ctx is done, Receive
487// returns nil after all of the outstanding calls to f have returned and
488// all messages have been acknowledged or have expired.
489//
490// Receive calls f concurrently from multiple goroutines. It is encouraged to
491// process messages synchronously in f, even if that processing is relatively
492// time-consuming; Receive will spawn new goroutines for incoming messages,
493// limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings.
494//
495// The context passed to f will be canceled when ctx is Done or there is a
496// fatal service error.
497//
498// Receive will send an ack deadline extension on message receipt, then
499// automatically extend the ack deadline of all fetched Messages up to the
500// period specified by s.ReceiveSettings.MaxExtension.
501//
502// Each Subscription may have only one invocation of Receive active at a time.
503func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error {
504	s.mu.Lock()
505	if s.receiveActive {
506		s.mu.Unlock()
507		return errReceiveInProgress
508	}
509	s.receiveActive = true
510	s.mu.Unlock()
511	defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()
512
513	maxCount := s.ReceiveSettings.MaxOutstandingMessages
514	if maxCount == 0 {
515		maxCount = DefaultReceiveSettings.MaxOutstandingMessages
516	}
517	maxBytes := s.ReceiveSettings.MaxOutstandingBytes
518	if maxBytes == 0 {
519		maxBytes = DefaultReceiveSettings.MaxOutstandingBytes
520	}
521	maxExt := s.ReceiveSettings.MaxExtension
522	if maxExt == 0 {
523		maxExt = DefaultReceiveSettings.MaxExtension
524	} else if maxExt < 0 {
525		// If MaxExtension is negative, disable automatic extension.
526		maxExt = 0
527	}
528	var numGoroutines int
529	switch {
530	case s.ReceiveSettings.Synchronous:
531		numGoroutines = 1
532	case s.ReceiveSettings.NumGoroutines >= 1:
533		numGoroutines = s.ReceiveSettings.NumGoroutines
534	default:
535		numGoroutines = DefaultReceiveSettings.NumGoroutines
536	}
537	// TODO(jba): add tests that verify that ReceiveSettings are correctly processed.
538	po := &pullOptions{
539		maxExtension: maxExt,
540		maxPrefetch:  trunc32(int64(maxCount)),
541		synchronous:  s.ReceiveSettings.Synchronous,
542	}
543	fc := newFlowController(maxCount, maxBytes)
544
545	// Wait for all goroutines started by Receive to return, so instead of an
546	// obscure goroutine leak we have an obvious blocked call to Receive.
547	group, gctx := errgroup.WithContext(ctx)
548	for i := 0; i < numGoroutines; i++ {
549		group.Go(func() error {
550			return s.receive(gctx, po, fc, f)
551		})
552	}
553	return group.Wait()
554}
555
556func (s *Subscription) receive(ctx context.Context, po *pullOptions, fc *flowController, f func(context.Context, *Message)) error {
557	// Cancel a sub-context when we return, to kick the context-aware callbacks
558	// and the goroutine below.
559	ctx2, cancel := context.WithCancel(ctx)
560	// The iterator does not use the context passed to Receive. If it did, canceling
561	// that context would immediately stop the iterator without waiting for unacked
562	// messages.
563	iter := newMessageIterator(s.c.subc, s.name, po)
564
565	// We cannot use errgroup from Receive here. Receive might already be calling group.Wait,
566	// and group.Wait cannot be called concurrently with group.Go. We give each receive() its
567	// own WaitGroup instead.
568	// Since wg.Add is only called from the main goroutine, wg.Wait is guaranteed
569	// to be called after all Adds.
570	var wg sync.WaitGroup
571	wg.Add(1)
572	go func() {
573		<-ctx2.Done()
574		// Call stop when Receive's context is done.
575		// Stop will block until all outstanding messages have been acknowledged
576		// or there was a fatal service error.
577		iter.stop()
578		wg.Done()
579	}()
580	defer wg.Wait()
581
582	defer cancel()
583	for {
584		var maxToPull int32 // maximum number of messages to pull
585		if po.synchronous {
586			if po.maxPrefetch < 0 {
587				// If there is no limit on the number of messages to pull, use a reasonable default.
588				maxToPull = 1000
589			} else {
590				// Limit the number of messages in memory to MaxOutstandingMessages
591				// (here, po.maxPrefetch). For each message currently in memory, we have
592				// called fc.acquire but not fc.release: this is fc.count(). The next
593				// call to Pull should fetch no more than the difference between these
594				// values.
595				maxToPull = po.maxPrefetch - int32(fc.count())
596				if maxToPull <= 0 {
597					// Wait for some callbacks to finish.
598					if err := gax.Sleep(ctx, synchronousWaitTime); err != nil {
599						// Return nil if the context is done, not err.
600						return nil
601					}
602					continue
603				}
604			}
605		}
606		msgs, err := iter.receive(maxToPull)
607		if err == io.EOF {
608			return nil
609		}
610		if err != nil {
611			return err
612		}
613		for i, msg := range msgs {
614			msg := msg
615			// TODO(jba): call acquire closer to when the message is allocated.
616			if err := fc.acquire(ctx, len(msg.Data)); err != nil {
617				// TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.
618				for _, m := range msgs[i:] {
619					m.Nack()
620				}
621				// Return nil if the context is done, not err.
622				return nil
623			}
624			old := msg.doneFunc
625			msgLen := len(msg.Data)
626			msg.doneFunc = func(ackID string, ack bool, receiveTime time.Time) {
627				defer fc.release(msgLen)
628				old(ackID, ack, receiveTime)
629			}
630			wg.Add(1)
631			go func() {
632				defer wg.Done()
633				f(ctx2, msg)
634			}()
635		}
636	}
637}
638
639type pullOptions struct {
640	maxExtension time.Duration
641	maxPrefetch  int32
642	// If true, use unary Pull instead of StreamingPull, and never pull more
643	// than maxPrefetch messages.
644	synchronous bool
645}
646