1// Copyright 2016 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package pubsub
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"io"
22	"strings"
23	"sync"
24	"time"
25
26	"cloud.google.com/go/iam"
27	"cloud.google.com/go/internal/optional"
28	"github.com/golang/protobuf/ptypes"
29	durpb "github.com/golang/protobuf/ptypes/duration"
30	gax "github.com/googleapis/gax-go/v2"
31	"golang.org/x/sync/errgroup"
32	pb "google.golang.org/genproto/googleapis/pubsub/v1"
33	fmpb "google.golang.org/genproto/protobuf/field_mask"
34	"google.golang.org/grpc/codes"
35	"google.golang.org/grpc/status"
36)
37
38// Subscription is a reference to a PubSub subscription.
39type Subscription struct {
40	c *Client
41
42	// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
43	name string
44
45	// Settings for pulling messages. Configure these before calling Receive.
46	ReceiveSettings ReceiveSettings
47
48	mu            sync.Mutex
49	receiveActive bool
50}
51
52// Subscription creates a reference to a subscription.
53func (c *Client) Subscription(id string) *Subscription {
54	return c.SubscriptionInProject(id, c.projectID)
55}
56
57// SubscriptionInProject creates a reference to a subscription in a given project.
58func (c *Client) SubscriptionInProject(id, projectID string) *Subscription {
59	return &Subscription{
60		c:    c,
61		name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id),
62	}
63}
64
65// String returns the globally unique printable name of the subscription.
66func (s *Subscription) String() string {
67	return s.name
68}
69
70// ID returns the unique identifier of the subscription within its project.
71func (s *Subscription) ID() string {
72	slash := strings.LastIndex(s.name, "/")
73	if slash == -1 {
74		// name is not a fully-qualified name.
75		panic("bad subscription name")
76	}
77	return s.name[slash+1:]
78}
79
80// Subscriptions returns an iterator which returns all of the subscriptions for the client's project.
81func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator {
82	it := c.subc.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{
83		Project: c.fullyQualifiedProjectName(),
84	})
85	return &SubscriptionIterator{
86		c: c,
87		next: func() (string, error) {
88			sub, err := it.Next()
89			if err != nil {
90				return "", err
91			}
92			return sub.Name, nil
93		},
94	}
95}
96
97// SubscriptionIterator is an iterator that returns a series of subscriptions.
98type SubscriptionIterator struct {
99	c    *Client
100	next func() (string, error)
101}
102
103// Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned.
104func (subs *SubscriptionIterator) Next() (*Subscription, error) {
105	subName, err := subs.next()
106	if err != nil {
107		return nil, err
108	}
109	return &Subscription{c: subs.c, name: subName}, nil
110}
111
112// PushConfig contains configuration for subscriptions that operate in push mode.
113type PushConfig struct {
114	// A URL locating the endpoint to which messages should be pushed.
115	Endpoint string
116
117	// Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details.
118	Attributes map[string]string
119
120	// AuthenticationMethod is used by push endpoints to verify the source
121	// of push requests.
122	// It can be used with push endpoints that are private by default to
123	// allow requests only from the Cloud Pub/Sub system, for example.
124	// This field is optional and should be set only by users interested in
125	// authenticated push.
126	AuthenticationMethod AuthenticationMethod
127}
128
129func (pc *PushConfig) toProto() *pb.PushConfig {
130	if pc == nil {
131		return nil
132	}
133	pbCfg := &pb.PushConfig{
134		Attributes:   pc.Attributes,
135		PushEndpoint: pc.Endpoint,
136	}
137	if authMethod := pc.AuthenticationMethod; authMethod != nil {
138		switch am := authMethod.(type) {
139		case *OIDCToken:
140			pbCfg.AuthenticationMethod = am.toProto()
141		default: // TODO: add others here when GAIC adds more definitions.
142		}
143	}
144	return pbCfg
145}
146
// AuthenticationMethod is implemented by the types a push endpoint can use to
// verify the source of push requests.
// This interface defines fields that are part of a closed alpha that may not
// be accessible to all users.
type AuthenticationMethod interface {
	// isAuthMethod is unexported, so only types in this package can
	// implement the interface.
	isAuthMethod() bool
}
153
// OIDCToken lets a PushConfig authenticate push requests with the
// OpenID Connect protocol, https://openid.net/connect/
type OIDCToken struct {
	// Audience is the audience used when the OIDC token is generated. The
	// audience claim names the recipients the JWT is intended for. It is a
	// single case-sensitive string; multiple values (an array) are not
	// supported. For more on the OIDC JWT audience see
	// https://tools.ietf.org/html/rfc7519#section-4.1.3
	// Note: if not specified, the Push endpoint URL will be used.
	Audience string

	// ServiceAccountEmail is the service account email used to generate the
	// OpenID Connect token.
	// The caller of:
	//  * CreateSubscription
	//  * UpdateSubscription
	//  * ModifyPushConfig
	// must have the iam.serviceAccounts.actAs permission for the service account.
	// See https://cloud.google.com/iam/docs/understanding-roles#service-accounts-roles.
	ServiceAccountEmail string
}
174
175var _ AuthenticationMethod = (*OIDCToken)(nil)
176
177func (oidcToken *OIDCToken) isAuthMethod() bool { return true }
178
179func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ {
180	if oidcToken == nil {
181		return nil
182	}
183	return &pb.PushConfig_OidcToken_{
184		OidcToken: &pb.PushConfig_OidcToken{
185			Audience:            oidcToken.Audience,
186			ServiceAccountEmail: oidcToken.ServiceAccountEmail,
187		},
188	}
189}
190
191// SubscriptionConfig describes the configuration of a subscription.
192type SubscriptionConfig struct {
193	Topic      *Topic
194	PushConfig PushConfig
195
196	// The default maximum time after a subscriber receives a message before
197	// the subscriber should acknowledge the message. Note: messages which are
198	// obtained via Subscription.Receive need not be acknowledged within this
199	// deadline, as the deadline will be automatically extended.
200	AckDeadline time.Duration
201
202	// Whether to retain acknowledged messages. If true, acknowledged messages
203	// will not be expunged until they fall out of the RetentionDuration window.
204	RetainAckedMessages bool
205
206	// How long to retain messages in backlog, from the time of publish. If
207	// RetainAckedMessages is true, this duration affects the retention of
208	// acknowledged messages, otherwise only unacknowledged messages are retained.
209	// Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes.
210	RetentionDuration time.Duration
211
212	// Expiration policy specifies the conditions for a subscription's expiration.
213	// A subscription is considered active as long as any connected subscriber is
214	// successfully consuming messages from the subscription or is issuing
215	// operations on the subscription. If `expiration_policy` is not set, a
216	// *default policy* with `ttl` of 31 days will be used. The minimum allowed
217	// value for `expiration_policy.ttl` is 1 day.
218	//
219	// Use time.Duration(0) to indicate that the subscription should never expire.
220	ExpirationPolicy optional.Duration
221
222	// The set of labels for the subscription.
223	Labels map[string]string
224
225	// DeadLetterPolicy specifies the conditions for dead lettering messages in
226	// a subscription. If not set, dead lettering is disabled.
227	//
228	// It is EXPERIMENTAL and a part of a closed alpha that may not be
229	// accessible to all users. This field is subject to change or removal
230	// without notice.
231	DeadLetterPolicy *DeadLetterPolicy
232}
233
234func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
235	var pbPushConfig *pb.PushConfig
236	if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil {
237		pbPushConfig = cfg.PushConfig.toProto()
238	}
239	var retentionDuration *durpb.Duration
240	if cfg.RetentionDuration != 0 {
241		retentionDuration = ptypes.DurationProto(cfg.RetentionDuration)
242	}
243	var pbDeadLetter *pb.DeadLetterPolicy
244	if cfg.DeadLetterPolicy != nil {
245		pbDeadLetter = cfg.DeadLetterPolicy.toProto()
246	}
247	return &pb.Subscription{
248		Name:                     name,
249		Topic:                    cfg.Topic.name,
250		PushConfig:               pbPushConfig,
251		AckDeadlineSeconds:       trunc32(int64(cfg.AckDeadline.Seconds())),
252		RetainAckedMessages:      cfg.RetainAckedMessages,
253		MessageRetentionDuration: retentionDuration,
254		Labels:                   cfg.Labels,
255		ExpirationPolicy:         expirationPolicyToProto(cfg.ExpirationPolicy),
256		DeadLetterPolicy:         pbDeadLetter,
257	}
258}
259
260func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionConfig, error) {
261	rd := time.Hour * 24 * 7
262	var err error
263	if pbSub.MessageRetentionDuration != nil {
264		rd, err = ptypes.Duration(pbSub.MessageRetentionDuration)
265		if err != nil {
266			return SubscriptionConfig{}, err
267		}
268	}
269	var expirationPolicy time.Duration
270	if ttl := pbSub.ExpirationPolicy.GetTtl(); ttl != nil {
271		expirationPolicy, err = ptypes.Duration(ttl)
272		if err != nil {
273			return SubscriptionConfig{}, err
274		}
275	}
276	dlp := protoToDLP(pbSub.DeadLetterPolicy)
277	subC := SubscriptionConfig{
278		Topic:               newTopic(c, pbSub.Topic),
279		AckDeadline:         time.Second * time.Duration(pbSub.AckDeadlineSeconds),
280		RetainAckedMessages: pbSub.RetainAckedMessages,
281		RetentionDuration:   rd,
282		Labels:              pbSub.Labels,
283		ExpirationPolicy:    expirationPolicy,
284		DeadLetterPolicy:    dlp,
285	}
286	pc := protoToPushConfig(pbSub.PushConfig)
287	if pc != nil {
288		subC.PushConfig = *pc
289	}
290	return subC, nil
291}
292
293func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig {
294	if pbPc == nil {
295		return nil
296	}
297	pc := &PushConfig{
298		Endpoint:   pbPc.PushEndpoint,
299		Attributes: pbPc.Attributes,
300	}
301	if am := pbPc.AuthenticationMethod; am != nil {
302		if oidcToken, ok := am.(*pb.PushConfig_OidcToken_); ok && oidcToken != nil && oidcToken.OidcToken != nil {
303			pc.AuthenticationMethod = &OIDCToken{
304				Audience:            oidcToken.OidcToken.GetAudience(),
305				ServiceAccountEmail: oidcToken.OidcToken.GetServiceAccountEmail(),
306			}
307		}
308	}
309	return pc
310}
311
// DeadLetterPolicy specifies the conditions under which messages in a
// subscription are dead lettered.
//
// It is EXPERIMENTAL and a part of a closed alpha that may not be
// accessible to all users.
type DeadLetterPolicy struct {
	// DeadLetterTopic is copied unchanged into the protobuf policy.
	// NOTE(review): presumably a fully qualified topic name; confirm against
	// the API reference.
	DeadLetterTopic string
	// MaxDeliveryAttempts is sent to the service as an int32.
	MaxDeliveryAttempts int
}
321
322func (dlp *DeadLetterPolicy) toProto() *pb.DeadLetterPolicy {
323	if dlp == nil {
324		return nil
325	}
326	return &pb.DeadLetterPolicy{
327		DeadLetterTopic:     dlp.DeadLetterTopic,
328		MaxDeliveryAttempts: int32(dlp.MaxDeliveryAttempts),
329	}
330}
331func protoToDLP(pbDLP *pb.DeadLetterPolicy) *DeadLetterPolicy {
332	if pbDLP == nil {
333		return nil
334	}
335	return &DeadLetterPolicy{
336		DeadLetterTopic:     pbDLP.GetDeadLetterTopic(),
337		MaxDeliveryAttempts: int(pbDLP.MaxDeliveryAttempts),
338	}
339}
340
// ReceiveSettings configures the Receive method.
// A zero ReceiveSettings behaves the same as DefaultReceiveSettings.
type ReceiveSettings struct {
	// MaxExtension is the maximum period for which the Subscription
	// automatically extends the ack deadline of each message.
	//
	// The ack deadline of every fetched Message is extended automatically up
	// to this duration. A zero value selects
	// DefaultReceiveSettings.MaxExtension. A duration less than 0 disables
	// automatic deadline extension beyond the initial receipt.
	MaxExtension time.Duration

	// MaxExtensionPeriod is the maximum duration by which the ack deadline is
	// extended at a time. The deadline keeps being extended by up to this
	// duration until MaxExtension is reached. Setting MaxExtensionPeriod
	// bounds how long it can take before a message is redelivered when the
	// subscriber fails to extend the deadline.
	//
	// A duration less than or equal to 0 disables MaxExtensionPeriod.
	MaxExtensionPeriod time.Duration

	// MaxOutstandingMessages is the maximum number of unprocessed messages
	// (unacknowledged but not yet expired). A value of 0 selects
	// DefaultReceiveSettings.MaxOutstandingMessages. A negative value removes
	// the limit on the number of unprocessed messages.
	MaxOutstandingMessages int

	// MaxOutstandingBytes is the maximum total size of unprocessed messages
	// (unacknowledged but not yet expired). A value of 0 selects
	// DefaultReceiveSettings.MaxOutstandingBytes. A negative value removes
	// the limit on the number of bytes for unprocessed messages.
	MaxOutstandingBytes int

	// NumGoroutines is the number of goroutines Receive spawns to pull
	// messages concurrently. A value less than 1 selects
	// DefaultReceiveSettings.NumGoroutines.
	//
	// NumGoroutines does not limit how many messages are processed
	// concurrently. Even a single goroutine may process many messages at once,
	// because it can keep receiving messages and invoking the function passed
	// to Receive on them. To limit the number of messages being processed
	// concurrently, set MaxOutstandingMessages.
	NumGoroutines int

	// Synchronous, when true, guarantees that no more than
	// MaxOutstandingMessages are in memory at one time. (In contrast, when
	// Synchronous is false, more than MaxOutstandingMessages may have been
	// received from the service and be in memory before being processed.)
	// MaxOutstandingBytes still refers to the total bytes processed, rather
	// than in memory. NumGoroutines is ignored.
	// The default is false.
	Synchronous bool
}
396
// synchronousWaitTime is how long a synchronous receive pauses when
// MaxOutstandingMessages are already being processed. Calling Pull to ask for
// zero messages is pointless, so the loop sleeps to let some message-processing
// callbacks finish.
//
// The pause is long enough to avoid burning significant CPU and short enough
// to give decent throughput. Users who need better throughput should not use
// synchronous mode.
//
// Sleeping may look like polling, and it is tempting to call Pull as soon as a
// callback finishes instead. But when callbacks finish in quick succession,
// that produces frequent Pull RPCs that each request a single message, which
// wastes network bandwidth. Waiting for a few callbacks to finish means fewer
// RPCs that each fetch more messages.
//
// The value is unexported so users do not have another knob to think about.
// Note that it is the same value as the one used for nackTicker, so it matches
// this client's idea of a duration that is short, but not so short that we
// perform excessive RPCs.
const synchronousWaitTime = 100 * time.Millisecond
415
// minAckDeadline is a variable rather than a constant so that tests can
// change it.
var minAckDeadline = 10 * time.Second
418
419// DefaultReceiveSettings holds the default values for ReceiveSettings.
420var DefaultReceiveSettings = ReceiveSettings{
421	MaxExtension:           60 * time.Minute,
422	MaxExtensionPeriod:     -1,
423	MaxOutstandingMessages: 1000,
424	MaxOutstandingBytes:    1e9, // 1G
425	NumGoroutines:          1,
426}
427
428// Delete deletes the subscription.
429func (s *Subscription) Delete(ctx context.Context) error {
430	return s.c.subc.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: s.name})
431}
432
433// Exists reports whether the subscription exists on the server.
434func (s *Subscription) Exists(ctx context.Context) (bool, error) {
435	_, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
436	if err == nil {
437		return true, nil
438	}
439	if status.Code(err) == codes.NotFound {
440		return false, nil
441	}
442	return false, err
443}
444
445// Config fetches the current configuration for the subscription.
446func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) {
447	pbSub, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
448	if err != nil {
449		return SubscriptionConfig{}, err
450	}
451	cfg, err := protoToSubscriptionConfig(pbSub, s.c)
452	if err != nil {
453		return SubscriptionConfig{}, err
454	}
455	return cfg, nil
456}
457
458// SubscriptionConfigToUpdate describes how to update a subscription.
459type SubscriptionConfigToUpdate struct {
460	// If non-nil, the push config is changed.
461	PushConfig *PushConfig
462
463	// If non-zero, the ack deadline is changed.
464	AckDeadline time.Duration
465
466	// If set, RetainAckedMessages is changed.
467	RetainAckedMessages optional.Bool
468
469	// If non-zero, RetentionDuration is changed.
470	RetentionDuration time.Duration
471
472	// If non-zero, Expiration is changed.
473	ExpirationPolicy optional.Duration
474
475	// If non-nil, DeadLetterPolicy is changed.
476	//
477	// It is EXPERIMENTAL and a part of a closed alpha that may not be
478	// accessible to all users.
479	DeadLetterPolicy *DeadLetterPolicy
480
481	// If non-nil, the current set of labels is completely
482	// replaced by the new set.
483	// This field has beta status. It is not subject to the stability guarantee
484	// and may change.
485	Labels map[string]string
486}
487
488// Update changes an existing subscription according to the fields set in cfg.
489// It returns the new SubscriptionConfig.
490//
491// Update returns an error if no fields were modified.
492func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) {
493	req := s.updateRequest(&cfg)
494	if err := cfg.validate(); err != nil {
495		return SubscriptionConfig{}, fmt.Errorf("pubsub: UpdateSubscription %v", err)
496	}
497	if len(req.UpdateMask.Paths) == 0 {
498		return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update")
499	}
500	rpsub, err := s.c.subc.UpdateSubscription(ctx, req)
501	if err != nil {
502		return SubscriptionConfig{}, err
503	}
504	return protoToSubscriptionConfig(rpsub, s.c)
505}
506
507func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.UpdateSubscriptionRequest {
508	psub := &pb.Subscription{Name: s.name}
509	var paths []string
510	if cfg.PushConfig != nil {
511		psub.PushConfig = cfg.PushConfig.toProto()
512		paths = append(paths, "push_config")
513	}
514	if cfg.AckDeadline != 0 {
515		psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds()))
516		paths = append(paths, "ack_deadline_seconds")
517	}
518	if cfg.RetainAckedMessages != nil {
519		psub.RetainAckedMessages = optional.ToBool(cfg.RetainAckedMessages)
520		paths = append(paths, "retain_acked_messages")
521	}
522	if cfg.RetentionDuration != 0 {
523		psub.MessageRetentionDuration = ptypes.DurationProto(cfg.RetentionDuration)
524		paths = append(paths, "message_retention_duration")
525	}
526	if cfg.ExpirationPolicy != nil {
527		psub.ExpirationPolicy = expirationPolicyToProto(cfg.ExpirationPolicy)
528		paths = append(paths, "expiration_policy")
529	}
530	if cfg.DeadLetterPolicy != nil {
531		psub.DeadLetterPolicy = cfg.DeadLetterPolicy.toProto()
532		paths = append(paths, "dead_letter_policy")
533	}
534	if cfg.Labels != nil {
535		psub.Labels = cfg.Labels
536		paths = append(paths, "labels")
537	}
538	return &pb.UpdateSubscriptionRequest{
539		Subscription: psub,
540		UpdateMask:   &fmpb.FieldMask{Paths: paths},
541	}
542}
543
const (
	// minExpirationPolicy is the shortest allowed expiration policy duration,
	// 1 day, as per:
	//    https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L606-L607
	minExpirationPolicy = 24 * time.Hour

	// defaultExpirationPolicy is the 31-day default used when no expiration
	// policy is specified, as per:
	//    https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L605-L606
	defaultExpirationPolicy = 31 * 24 * time.Hour
)
553
554func (cfg *SubscriptionConfigToUpdate) validate() error {
555	if cfg == nil || cfg.ExpirationPolicy == nil {
556		return nil
557	}
558	policy, min := optional.ToDuration(cfg.ExpirationPolicy), minExpirationPolicy
559	if policy == 0 || policy >= min {
560		return nil
561	}
562	return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", policy, min)
563}
564
565func expirationPolicyToProto(expirationPolicy optional.Duration) *pb.ExpirationPolicy {
566	if expirationPolicy == nil {
567		return nil
568	}
569
570	dur := optional.ToDuration(expirationPolicy)
571	var ttl *durpb.Duration
572	// As per:
573	//    https://godoc.org/google.golang.org/genproto/googleapis/pubsub/v1#ExpirationPolicy.Ttl
574	// if ExpirationPolicy.Ttl is set to nil, the expirationPolicy is toggled to NEVER expire.
575	if dur != 0 {
576		ttl = ptypes.DurationProto(dur)
577	}
578	return &pb.ExpirationPolicy{
579		Ttl: ttl,
580	}
581}
582
583// IAM returns the subscription's IAM handle.
584func (s *Subscription) IAM() *iam.Handle {
585	return iam.InternalNewHandle(s.c.subc.Connection(), s.name)
586}
587
588// CreateSubscription creates a new subscription on a topic.
589//
590// id is the name of the subscription to create. It must start with a letter,
591// and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-),
592// underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It
593// must be between 3 and 255 characters in length, and must not start with
594// "goog".
595//
596// cfg.Topic is the topic from which the subscription should receive messages. It
597// need not belong to the same project as the subscription. This field is required.
598//
599// cfg.AckDeadline is the maximum time after a subscriber receives a message before
600// the subscriber should acknowledge the message. It must be between 10 and 600
601// seconds (inclusive), and is rounded down to the nearest second. If the
602// provided ackDeadline is 0, then the default value of 10 seconds is used.
603// Note: messages which are obtained via Subscription.Receive need not be
604// acknowledged within this deadline, as the deadline will be automatically
605// extended.
606//
607// cfg.PushConfig may be set to configure this subscription for push delivery.
608//
609// If the subscription already exists an error will be returned.
610func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) {
611	if cfg.Topic == nil {
612		return nil, errors.New("pubsub: require non-nil Topic")
613	}
614	if cfg.AckDeadline == 0 {
615		cfg.AckDeadline = 10 * time.Second
616	}
617	if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second {
618		return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d)
619	}
620
621	sub := c.Subscription(id)
622	_, err := c.subc.CreateSubscription(ctx, cfg.toProto(sub.name))
623	if err != nil {
624		return nil, err
625	}
626	return sub, nil
627}
628
// errReceiveInProgress is returned by Receive when another Receive call is
// already active on the same Subscription.
var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscription")
630
631// Receive calls f with the outstanding messages from the subscription.
632// It blocks until ctx is done, or the service returns a non-retryable error.
633//
634// The standard way to terminate a Receive is to cancel its context:
635//
636//   cctx, cancel := context.WithCancel(ctx)
637//   err := sub.Receive(cctx, callback)
638//   // Call cancel from callback, or another goroutine.
639//
640// If the service returns a non-retryable error, Receive returns that error after
641// all of the outstanding calls to f have returned. If ctx is done, Receive
642// returns nil after all of the outstanding calls to f have returned and
643// all messages have been acknowledged or have expired.
644//
645// Receive calls f concurrently from multiple goroutines. It is encouraged to
646// process messages synchronously in f, even if that processing is relatively
647// time-consuming; Receive will spawn new goroutines for incoming messages,
648// limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings.
649//
650// The context passed to f will be canceled when ctx is Done or there is a
651// fatal service error.
652//
653// Receive will send an ack deadline extension on message receipt, then
654// automatically extend the ack deadline of all fetched Messages up to the
655// period specified by s.ReceiveSettings.MaxExtension.
656//
657// Each Subscription may have only one invocation of Receive active at a time.
658func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error {
659	s.mu.Lock()
660	if s.receiveActive {
661		s.mu.Unlock()
662		return errReceiveInProgress
663	}
664	s.receiveActive = true
665	s.mu.Unlock()
666	defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()
667
668	maxCount := s.ReceiveSettings.MaxOutstandingMessages
669	if maxCount == 0 {
670		maxCount = DefaultReceiveSettings.MaxOutstandingMessages
671	}
672	maxBytes := s.ReceiveSettings.MaxOutstandingBytes
673	if maxBytes == 0 {
674		maxBytes = DefaultReceiveSettings.MaxOutstandingBytes
675	}
676	maxExt := s.ReceiveSettings.MaxExtension
677	if maxExt == 0 {
678		maxExt = DefaultReceiveSettings.MaxExtension
679	} else if maxExt < 0 {
680		// If MaxExtension is negative, disable automatic extension.
681		maxExt = 0
682	}
683	var numGoroutines int
684	switch {
685	case s.ReceiveSettings.Synchronous:
686		numGoroutines = 1
687	case s.ReceiveSettings.NumGoroutines >= 1:
688		numGoroutines = s.ReceiveSettings.NumGoroutines
689	default:
690		numGoroutines = DefaultReceiveSettings.NumGoroutines
691	}
692	// TODO(jba): add tests that verify that ReceiveSettings are correctly processed.
693	po := &pullOptions{
694		maxExtension: maxExt,
695		maxPrefetch:  trunc32(int64(maxCount)),
696		synchronous:  s.ReceiveSettings.Synchronous,
697	}
698	fc := newFlowController(maxCount, maxBytes)
699
700	// Wait for all goroutines started by Receive to return, so instead of an
701	// obscure goroutine leak we have an obvious blocked call to Receive.
702	group, gctx := errgroup.WithContext(ctx)
703	for i := 0; i < numGoroutines; i++ {
704		group.Go(func() error {
705			return s.receive(gctx, po, fc, f)
706		})
707	}
708	return group.Wait()
709}
710
711func (s *Subscription) receive(ctx context.Context, po *pullOptions, fc *flowController, f func(context.Context, *Message)) error {
712	// Cancel a sub-context when we return, to kick the context-aware callbacks
713	// and the goroutine below.
714	ctx2, cancel := context.WithCancel(ctx)
715	// The iterator does not use the context passed to Receive. If it did, canceling
716	// that context would immediately stop the iterator without waiting for unacked
717	// messages.
718	iter := newMessageIterator(s.c.subc, s.name, &s.ReceiveSettings.MaxExtensionPeriod, po)
719
720	// We cannot use errgroup from Receive here. Receive might already be calling group.Wait,
721	// and group.Wait cannot be called concurrently with group.Go. We give each receive() its
722	// own WaitGroup instead.
723	// Since wg.Add is only called from the main goroutine, wg.Wait is guaranteed
724	// to be called after all Adds.
725	var wg sync.WaitGroup
726	wg.Add(1)
727	go func() {
728		<-ctx2.Done()
729		// Call stop when Receive's context is done.
730		// Stop will block until all outstanding messages have been acknowledged
731		// or there was a fatal service error.
732		iter.stop()
733		wg.Done()
734	}()
735	defer wg.Wait()
736
737	defer cancel()
738	for {
739		var maxToPull int32 // maximum number of messages to pull
740		if po.synchronous {
741			if po.maxPrefetch < 0 {
742				// If there is no limit on the number of messages to pull, use a reasonable default.
743				maxToPull = 1000
744			} else {
745				// Limit the number of messages in memory to MaxOutstandingMessages
746				// (here, po.maxPrefetch). For each message currently in memory, we have
747				// called fc.acquire but not fc.release: this is fc.count(). The next
748				// call to Pull should fetch no more than the difference between these
749				// values.
750				maxToPull = po.maxPrefetch - int32(fc.count())
751				if maxToPull <= 0 {
752					// Wait for some callbacks to finish.
753					if err := gax.Sleep(ctx, synchronousWaitTime); err != nil {
754						// Return nil if the context is done, not err.
755						return nil
756					}
757					continue
758				}
759			}
760		}
761		msgs, err := iter.receive(maxToPull)
762		if err == io.EOF {
763			return nil
764		}
765		if err != nil {
766			return err
767		}
768		for i, msg := range msgs {
769			msg := msg
770			// TODO(jba): call acquire closer to when the message is allocated.
771			if err := fc.acquire(ctx, len(msg.Data)); err != nil {
772				// TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.
773				for _, m := range msgs[i:] {
774					m.Nack()
775				}
776				// Return nil if the context is done, not err.
777				return nil
778			}
779			old := msg.doneFunc
780			msgLen := len(msg.Data)
781			msg.doneFunc = func(ackID string, ack bool, receiveTime time.Time) {
782				defer fc.release(msgLen)
783				old(ackID, ack, receiveTime)
784			}
785			wg.Add(1)
786			go func() {
787				defer wg.Done()
788				f(ctx2, msg)
789			}()
790		}
791	}
792}
793
// pullOptions carries the normalized settings that Receive hands to the
// message iterator and the pull loop.
type pullOptions struct {
	// maxExtension is the resolved MaxExtension; Receive sets it to zero when
	// automatic extension is disabled.
	maxExtension time.Duration
	// maxPrefetch is the resolved MaxOutstandingMessages, truncated to int32.
	maxPrefetch int32
	// synchronous, if true, selects unary Pull instead of StreamingPull and
	// never pulls more than maxPrefetch messages.
	synchronous bool
}
801