1// Copyright 2016 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package pubsub
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"io"
22	"strings"
23	"sync"
24	"time"
25
26	"cloud.google.com/go/iam"
27	"cloud.google.com/go/internal/optional"
28	"cloud.google.com/go/pubsub/internal/scheduler"
29	"github.com/golang/protobuf/ptypes"
30	durpb "github.com/golang/protobuf/ptypes/duration"
31	gax "github.com/googleapis/gax-go/v2"
32	"golang.org/x/sync/errgroup"
33	pb "google.golang.org/genproto/googleapis/pubsub/v1"
34	fmpb "google.golang.org/genproto/protobuf/field_mask"
35	"google.golang.org/grpc/codes"
36	"google.golang.org/grpc/status"
37)
38
39// Subscription is a reference to a PubSub subscription.
40type Subscription struct {
41	c *Client
42
43	// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
44	name string
45
46	// Settings for pulling messages. Configure these before calling Receive.
47	ReceiveSettings ReceiveSettings
48
49	mu            sync.Mutex
50	receiveActive bool
51}
52
53// Subscription creates a reference to a subscription.
54func (c *Client) Subscription(id string) *Subscription {
55	return c.SubscriptionInProject(id, c.projectID)
56}
57
58// SubscriptionInProject creates a reference to a subscription in a given project.
59func (c *Client) SubscriptionInProject(id, projectID string) *Subscription {
60	return &Subscription{
61		c:    c,
62		name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id),
63	}
64}
65
66// String returns the globally unique printable name of the subscription.
67func (s *Subscription) String() string {
68	return s.name
69}
70
71// ID returns the unique identifier of the subscription within its project.
72func (s *Subscription) ID() string {
73	slash := strings.LastIndex(s.name, "/")
74	if slash == -1 {
75		// name is not a fully-qualified name.
76		panic("bad subscription name")
77	}
78	return s.name[slash+1:]
79}
80
81// Subscriptions returns an iterator which returns all of the subscriptions for the client's project.
82func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator {
83	it := c.subc.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{
84		Project: c.fullyQualifiedProjectName(),
85	})
86	return &SubscriptionIterator{
87		c: c,
88		next: func() (string, error) {
89			sub, err := it.Next()
90			if err != nil {
91				return "", err
92			}
93			return sub.Name, nil
94		},
95	}
96}
97
98// SubscriptionIterator is an iterator that returns a series of subscriptions.
99type SubscriptionIterator struct {
100	c    *Client
101	next func() (string, error)
102}
103
104// Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned.
105func (subs *SubscriptionIterator) Next() (*Subscription, error) {
106	subName, err := subs.next()
107	if err != nil {
108		return nil, err
109	}
110	return &Subscription{c: subs.c, name: subName}, nil
111}
112
113// PushConfig contains configuration for subscriptions that operate in push mode.
114type PushConfig struct {
115	// A URL locating the endpoint to which messages should be pushed.
116	Endpoint string
117
118	// Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details.
119	Attributes map[string]string
120
121	// AuthenticationMethod is used by push endpoints to verify the source
122	// of push requests.
123	// It can be used with push endpoints that are private by default to
124	// allow requests only from the Cloud Pub/Sub system, for example.
125	// This field is optional and should be set only by users interested in
126	// authenticated push.
127	AuthenticationMethod AuthenticationMethod
128}
129
130func (pc *PushConfig) toProto() *pb.PushConfig {
131	if pc == nil {
132		return nil
133	}
134	pbCfg := &pb.PushConfig{
135		Attributes:   pc.Attributes,
136		PushEndpoint: pc.Endpoint,
137	}
138	if authMethod := pc.AuthenticationMethod; authMethod != nil {
139		switch am := authMethod.(type) {
140		case *OIDCToken:
141			pbCfg.AuthenticationMethod = am.toProto()
142		default: // TODO: add others here when GAIC adds more definitions.
143		}
144	}
145	return pbCfg
146}
147
148// AuthenticationMethod is used by push points to verify the source of push requests.
149// This interface defines fields that are part of a closed alpha that may not be accessible
150// to all users.
151type AuthenticationMethod interface {
152	isAuthMethod() bool
153}
154
155// OIDCToken allows PushConfigs to be authenticated using
156// the OpenID Connect protocol https://openid.net/connect/
157type OIDCToken struct {
158	// Audience to be used when generating OIDC token. The audience claim
159	// identifies the recipients that the JWT is intended for. The audience
160	// value is a single case-sensitive string. Having multiple values (array)
161	// for the audience field is not supported. More info about the OIDC JWT
162	// token audience here: https://tools.ietf.org/html/rfc7519#section-4.1.3
163	// Note: if not specified, the Push endpoint URL will be used.
164	Audience string
165
166	// The service account email to be used for generating the OpenID Connect token.
167	// The caller of:
168	//  * CreateSubscription
169	//  * UpdateSubscription
170	//  * ModifyPushConfig
171	// calls must have the iam.serviceAccounts.actAs permission for the service account.
172	// See https://cloud.google.com/iam/docs/understanding-roles#service-accounts-roles.
173	ServiceAccountEmail string
174}
175
176var _ AuthenticationMethod = (*OIDCToken)(nil)
177
178func (oidcToken *OIDCToken) isAuthMethod() bool { return true }
179
180func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ {
181	if oidcToken == nil {
182		return nil
183	}
184	return &pb.PushConfig_OidcToken_{
185		OidcToken: &pb.PushConfig_OidcToken{
186			Audience:            oidcToken.Audience,
187			ServiceAccountEmail: oidcToken.ServiceAccountEmail,
188		},
189	}
190}
191
192// SubscriptionConfig describes the configuration of a subscription.
193type SubscriptionConfig struct {
194	Topic      *Topic
195	PushConfig PushConfig
196
197	// The default maximum time after a subscriber receives a message before
198	// the subscriber should acknowledge the message. Note: messages which are
199	// obtained via Subscription.Receive need not be acknowledged within this
200	// deadline, as the deadline will be automatically extended.
201	AckDeadline time.Duration
202
203	// Whether to retain acknowledged messages. If true, acknowledged messages
204	// will not be expunged until they fall out of the RetentionDuration window.
205	RetainAckedMessages bool
206
207	// How long to retain messages in backlog, from the time of publish. If
208	// RetainAckedMessages is true, this duration affects the retention of
209	// acknowledged messages, otherwise only unacknowledged messages are retained.
210	// Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes.
211	RetentionDuration time.Duration
212
213	// Expiration policy specifies the conditions for a subscription's expiration.
214	// A subscription is considered active as long as any connected subscriber is
215	// successfully consuming messages from the subscription or is issuing
216	// operations on the subscription. If `expiration_policy` is not set, a
217	// *default policy* with `ttl` of 31 days will be used. The minimum allowed
218	// value for `expiration_policy.ttl` is 1 day.
219	//
220	// Use time.Duration(0) to indicate that the subscription should never expire.
221	ExpirationPolicy optional.Duration
222
223	// The set of labels for the subscription.
224	Labels map[string]string
225
226	// EnableMessageOrdering enables message ordering.
227	//
228	// It is EXPERIMENTAL and a part of a closed alpha that may not be
229	// accessible to all users. This field is subject to change or removal
230	// without notice.
231	EnableMessageOrdering bool
232
233	// DeadLetterPolicy specifies the conditions for dead lettering messages in
234	// a subscription. If not set, dead lettering is disabled.
235	DeadLetterPolicy *DeadLetterPolicy
236
237	// Filter is an expression written in the Cloud Pub/Sub filter language. If
238	// non-empty, then only `PubsubMessage`s whose `attributes` field matches the
239	// filter are delivered on this subscription. If empty, then no messages are
240	// filtered out. Cannot be changed after the subscription is created.
241	//
242	// It is EXPERIMENTAL and a part of a closed alpha that may not be
243	// accessible to all users. This field is subject to change or removal
244	// without notice.
245	Filter string
246
247	// RetryPolicy specifies how Cloud Pub/Sub retries message delivery.
248	RetryPolicy *RetryPolicy
249
250	// Detached indicates whether the subscription is detached from its topic.
251	// Detached subscriptions don't receive messages from their topic and don't
252	// retain any backlog. `Pull` and `StreamingPull` requests will return
253	// FAILED_PRECONDITION. If the subscription is a push subscription, pushes to
254	// the endpoint will not be made.
255	Detached bool
256}
257
258func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
259	var pbPushConfig *pb.PushConfig
260	if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil {
261		pbPushConfig = cfg.PushConfig.toProto()
262	}
263	var retentionDuration *durpb.Duration
264	if cfg.RetentionDuration != 0 {
265		retentionDuration = ptypes.DurationProto(cfg.RetentionDuration)
266	}
267	var pbDeadLetter *pb.DeadLetterPolicy
268	if cfg.DeadLetterPolicy != nil {
269		pbDeadLetter = cfg.DeadLetterPolicy.toProto()
270	}
271	var pbRetryPolicy *pb.RetryPolicy
272	if cfg.RetryPolicy != nil {
273		pbRetryPolicy = cfg.RetryPolicy.toProto()
274	}
275	return &pb.Subscription{
276		Name:                     name,
277		Topic:                    cfg.Topic.name,
278		PushConfig:               pbPushConfig,
279		AckDeadlineSeconds:       trunc32(int64(cfg.AckDeadline.Seconds())),
280		RetainAckedMessages:      cfg.RetainAckedMessages,
281		MessageRetentionDuration: retentionDuration,
282		Labels:                   cfg.Labels,
283		ExpirationPolicy:         expirationPolicyToProto(cfg.ExpirationPolicy),
284		EnableMessageOrdering:    cfg.EnableMessageOrdering,
285		DeadLetterPolicy:         pbDeadLetter,
286		Filter:                   cfg.Filter,
287		RetryPolicy:              pbRetryPolicy,
288		Detached:                 cfg.Detached,
289	}
290}
291
292func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionConfig, error) {
293	rd := time.Hour * 24 * 7
294	var err error
295	if pbSub.MessageRetentionDuration != nil {
296		rd, err = ptypes.Duration(pbSub.MessageRetentionDuration)
297		if err != nil {
298			return SubscriptionConfig{}, err
299		}
300	}
301	var expirationPolicy time.Duration
302	if ttl := pbSub.ExpirationPolicy.GetTtl(); ttl != nil {
303		expirationPolicy, err = ptypes.Duration(ttl)
304		if err != nil {
305			return SubscriptionConfig{}, err
306		}
307	}
308	dlp := protoToDLP(pbSub.DeadLetterPolicy)
309	rp := protoToRetryPolicy(pbSub.RetryPolicy)
310	subC := SubscriptionConfig{
311		Topic:               newTopic(c, pbSub.Topic),
312		AckDeadline:         time.Second * time.Duration(pbSub.AckDeadlineSeconds),
313		RetainAckedMessages: pbSub.RetainAckedMessages,
314		RetentionDuration:   rd,
315		Labels:              pbSub.Labels,
316		ExpirationPolicy:    expirationPolicy,
317		DeadLetterPolicy:    dlp,
318		Filter:              pbSub.Filter,
319		RetryPolicy:         rp,
320		Detached:            pbSub.Detached,
321	}
322	pc := protoToPushConfig(pbSub.PushConfig)
323	if pc != nil {
324		subC.PushConfig = *pc
325	}
326	return subC, nil
327}
328
329func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig {
330	if pbPc == nil {
331		return nil
332	}
333	pc := &PushConfig{
334		Endpoint:   pbPc.PushEndpoint,
335		Attributes: pbPc.Attributes,
336	}
337	if am := pbPc.AuthenticationMethod; am != nil {
338		if oidcToken, ok := am.(*pb.PushConfig_OidcToken_); ok && oidcToken != nil && oidcToken.OidcToken != nil {
339			pc.AuthenticationMethod = &OIDCToken{
340				Audience:            oidcToken.OidcToken.GetAudience(),
341				ServiceAccountEmail: oidcToken.OidcToken.GetServiceAccountEmail(),
342			}
343		}
344	}
345	return pc
346}
347
348// DeadLetterPolicy specifies the conditions for dead lettering messages in
349// a subscription.
350type DeadLetterPolicy struct {
351	DeadLetterTopic     string
352	MaxDeliveryAttempts int
353}
354
355func (dlp *DeadLetterPolicy) toProto() *pb.DeadLetterPolicy {
356	if dlp == nil || dlp.DeadLetterTopic == "" {
357		return nil
358	}
359	return &pb.DeadLetterPolicy{
360		DeadLetterTopic:     dlp.DeadLetterTopic,
361		MaxDeliveryAttempts: int32(dlp.MaxDeliveryAttempts),
362	}
363}
364func protoToDLP(pbDLP *pb.DeadLetterPolicy) *DeadLetterPolicy {
365	if pbDLP == nil {
366		return nil
367	}
368	return &DeadLetterPolicy{
369		DeadLetterTopic:     pbDLP.GetDeadLetterTopic(),
370		MaxDeliveryAttempts: int(pbDLP.MaxDeliveryAttempts),
371	}
372}
373
374// RetryPolicy specifies how Cloud Pub/Sub retries message delivery.
375//
376// Retry delay will be exponential based on provided minimum and maximum
377// backoffs. https://en.wikipedia.org/wiki/Exponential_backoff.
378//
379// RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded
380// events for a given message.
381//
382// Retry Policy is implemented on a best effort basis. At times, the delay
383// between consecutive deliveries may not match the configuration. That is,
384// delay can be more or less than configured backoff.
385type RetryPolicy struct {
386	// MinimumBackoff is the minimum delay between consecutive deliveries of a
387	// given message. Value should be between 0 and 600 seconds. Defaults to 10 seconds.
388	MinimumBackoff optional.Duration
389	// MaximumBackoff is the maximum delay between consecutive deliveries of a
390	// given message. Value should be between 0 and 600 seconds. Defaults to 10 seconds.
391	MaximumBackoff optional.Duration
392}
393
394func (rp *RetryPolicy) toProto() *pb.RetryPolicy {
395	if rp == nil {
396		return nil
397	}
398	// If RetryPolicy is the empty struct, take this as an instruction
399	// to remove RetryPolicy from the subscription.
400	if rp.MinimumBackoff == nil && rp.MaximumBackoff == nil {
401		return nil
402	}
403
404	// Initialize minDur and maxDur to be negative, such that if the conversion from an
405	// optional fails, RetryPolicy won't be updated in the proto as it will remain nil.
406	var minDur time.Duration = -1
407	var maxDur time.Duration = -1
408	if rp.MinimumBackoff != nil {
409		minDur = optional.ToDuration(rp.MinimumBackoff)
410	}
411	if rp.MaximumBackoff != nil {
412		maxDur = optional.ToDuration(rp.MaximumBackoff)
413	}
414
415	var minDurPB, maxDurPB *durpb.Duration
416	if minDur > 0 {
417		minDurPB = ptypes.DurationProto(minDur)
418	}
419	if maxDur > 0 {
420		maxDurPB = ptypes.DurationProto(maxDur)
421	}
422
423	return &pb.RetryPolicy{
424		MinimumBackoff: minDurPB,
425		MaximumBackoff: maxDurPB,
426	}
427}
428
429func protoToRetryPolicy(rp *pb.RetryPolicy) *RetryPolicy {
430	if rp == nil {
431		return nil
432	}
433	var minBackoff, maxBackoff time.Duration
434	var err error
435	if rp.MinimumBackoff != nil {
436		minBackoff, err = ptypes.Duration(rp.MinimumBackoff)
437		if err != nil {
438			return nil
439		}
440	}
441	if rp.MaximumBackoff != nil {
442		maxBackoff, err = ptypes.Duration(rp.MaximumBackoff)
443		if err != nil {
444			return nil
445		}
446	}
447
448	retryPolicy := &RetryPolicy{
449		MinimumBackoff: minBackoff,
450		MaximumBackoff: maxBackoff,
451	}
452	return retryPolicy
453}
454
455// ReceiveSettings configure the Receive method.
456// A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
457type ReceiveSettings struct {
458	// MaxExtension is the maximum period for which the Subscription should
459	// automatically extend the ack deadline for each message.
460	//
461	// The Subscription will automatically extend the ack deadline of all
462	// fetched Messages up to the duration specified. Automatic deadline
463	// extension beyond the initial receipt may be disabled by specifying a
464	// duration less than 0.
465	MaxExtension time.Duration
466
467	// MaxExtensionPeriod is the maximum duration by which to extend the ack
468	// deadline at a time. The ack deadline will continue to be extended by up
469	// to this duration until MaxExtension is reached. Setting MaxExtensionPeriod
470	// bounds the maximum amount of time before a message redelivery in the
471	// event the subscriber fails to extend the deadline.
472	//
473	// MaxExtensionPeriod configuration can be disabled by specifying a
474	// duration less than (or equal to) 0.
475	MaxExtensionPeriod time.Duration
476
477	// MaxOutstandingMessages is the maximum number of unprocessed messages
478	// (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it
479	// will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages.
480	// If the value is negative, then there will be no limit on the number of
481	// unprocessed messages.
482	MaxOutstandingMessages int
483
484	// MaxOutstandingBytes is the maximum size of unprocessed messages
485	// (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will
486	// be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If
487	// the value is negative, then there will be no limit on the number of bytes
488	// for unprocessed messages.
489	MaxOutstandingBytes int
490
491	// NumGoroutines is the number of goroutines that each datastructure along
492	// the Receive path will spawn. Adjusting this value adjusts concurrency
493	// along the receive path.
494	//
495	// NumGoroutines defaults to DefaultReceiveSettings.NumGoroutines.
496	//
497	// NumGoroutines does not limit the number of messages that can be processed
498	// concurrently. Even with one goroutine, many messages might be processed at
499	// once, because that goroutine may continually receive messages and invoke the
500	// function passed to Receive on them. To limit the number of messages being
501	// processed concurrently, set MaxOutstandingMessages.
502	NumGoroutines int
503
504	// If Synchronous is true, then no more than MaxOutstandingMessages will be in
505	// memory at one time. (In contrast, when Synchronous is false, more than
506	// MaxOutstandingMessages may have been received from the service and in memory
507	// before being processed.) MaxOutstandingBytes still refers to the total bytes
508	// processed, rather than in memory. NumGoroutines is ignored.
509	// The default is false.
510	Synchronous bool
511}
512
513// For synchronous receive, the time to wait if we are already processing
514// MaxOutstandingMessages. There is no point calling Pull and asking for zero
515// messages, so we pause to allow some message-processing callbacks to finish.
516//
517// The wait time is large enough to avoid consuming significant CPU, but
518// small enough to provide decent throughput. Users who want better
519// throughput should not be using synchronous mode.
520//
521// Waiting might seem like polling, so it's natural to think we could do better by
522// noticing when a callback is finished and immediately calling Pull. But if
523// callbacks finish in quick succession, this will result in frequent Pull RPCs that
524// request a single message, which wastes network bandwidth. Better to wait for a few
525// callbacks to finish, so we make fewer RPCs fetching more messages.
526//
527// This value is unexported so the user doesn't have another knob to think about. Note that
528// it is the same value as the one used for nackTicker, so it matches this client's
529// idea of a duration that is short, but not so short that we perform excessive RPCs.
530const synchronousWaitTime = 100 * time.Millisecond
531
532// This is a var so that tests can change it.
533var minAckDeadline = 10 * time.Second
534
535// DefaultReceiveSettings holds the default values for ReceiveSettings.
536var DefaultReceiveSettings = ReceiveSettings{
537	MaxExtension:           60 * time.Minute,
538	MaxExtensionPeriod:     0,
539	MaxOutstandingMessages: 1000,
540	MaxOutstandingBytes:    1e9, // 1G
541	NumGoroutines:          10,
542}
543
544// Delete deletes the subscription.
545func (s *Subscription) Delete(ctx context.Context) error {
546	return s.c.subc.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: s.name})
547}
548
549// Exists reports whether the subscription exists on the server.
550func (s *Subscription) Exists(ctx context.Context) (bool, error) {
551	_, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
552	if err == nil {
553		return true, nil
554	}
555	if status.Code(err) == codes.NotFound {
556		return false, nil
557	}
558	return false, err
559}
560
561// Config fetches the current configuration for the subscription.
562func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) {
563	pbSub, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
564	if err != nil {
565		return SubscriptionConfig{}, err
566	}
567	cfg, err := protoToSubscriptionConfig(pbSub, s.c)
568	if err != nil {
569		return SubscriptionConfig{}, err
570	}
571	return cfg, nil
572}
573
574// SubscriptionConfigToUpdate describes how to update a subscription.
575type SubscriptionConfigToUpdate struct {
576	// If non-nil, the push config is changed.
577	PushConfig *PushConfig
578
579	// If non-zero, the ack deadline is changed.
580	AckDeadline time.Duration
581
582	// If set, RetainAckedMessages is changed.
583	RetainAckedMessages optional.Bool
584
585	// If non-zero, RetentionDuration is changed.
586	RetentionDuration time.Duration
587
588	// If non-zero, Expiration is changed.
589	ExpirationPolicy optional.Duration
590
591	// If non-nil, DeadLetterPolicy is changed. To remove dead lettering from
592	// a subscription, use the zero value for this struct.
593	DeadLetterPolicy *DeadLetterPolicy
594
595	// If non-nil, the current set of labels is completely
596	// replaced by the new set.
597	// This field has beta status. It is not subject to the stability guarantee
598	// and may change.
599	Labels map[string]string
600
601	// If non-nil, RetryPolicy is changed. To remove an existing retry policy
602	// (to redeliver messages as soon as possible) use a pointer to the zero value
603	// for this struct.
604	RetryPolicy *RetryPolicy
605}
606
607// Update changes an existing subscription according to the fields set in cfg.
608// It returns the new SubscriptionConfig.
609//
610// Update returns an error if no fields were modified.
611func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) {
612	req := s.updateRequest(&cfg)
613	if err := cfg.validate(); err != nil {
614		return SubscriptionConfig{}, fmt.Errorf("pubsub: UpdateSubscription %v", err)
615	}
616	if len(req.UpdateMask.Paths) == 0 {
617		return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update")
618	}
619	rpsub, err := s.c.subc.UpdateSubscription(ctx, req)
620	if err != nil {
621		return SubscriptionConfig{}, err
622	}
623	return protoToSubscriptionConfig(rpsub, s.c)
624}
625
626func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.UpdateSubscriptionRequest {
627	psub := &pb.Subscription{Name: s.name}
628	var paths []string
629	if cfg.PushConfig != nil {
630		psub.PushConfig = cfg.PushConfig.toProto()
631		paths = append(paths, "push_config")
632	}
633	if cfg.AckDeadline != 0 {
634		psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds()))
635		paths = append(paths, "ack_deadline_seconds")
636	}
637	if cfg.RetainAckedMessages != nil {
638		psub.RetainAckedMessages = optional.ToBool(cfg.RetainAckedMessages)
639		paths = append(paths, "retain_acked_messages")
640	}
641	if cfg.RetentionDuration != 0 {
642		psub.MessageRetentionDuration = ptypes.DurationProto(cfg.RetentionDuration)
643		paths = append(paths, "message_retention_duration")
644	}
645	if cfg.ExpirationPolicy != nil {
646		psub.ExpirationPolicy = expirationPolicyToProto(cfg.ExpirationPolicy)
647		paths = append(paths, "expiration_policy")
648	}
649	if cfg.DeadLetterPolicy != nil {
650		psub.DeadLetterPolicy = cfg.DeadLetterPolicy.toProto()
651		paths = append(paths, "dead_letter_policy")
652	}
653	if cfg.Labels != nil {
654		psub.Labels = cfg.Labels
655		paths = append(paths, "labels")
656	}
657	if cfg.RetryPolicy != nil {
658		psub.RetryPolicy = cfg.RetryPolicy.toProto()
659		paths = append(paths, "retry_policy")
660	}
661	return &pb.UpdateSubscriptionRequest{
662		Subscription: psub,
663		UpdateMask:   &fmpb.FieldMask{Paths: paths},
664	}
665}
666
667const (
668	// The minimum expiration policy duration is 1 day as per:
669	//    https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L606-L607
670	minExpirationPolicy = 24 * time.Hour
671
672	// If an expiration policy is not specified, the default of 31 days is used as per:
673	//    https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L605-L606
674	defaultExpirationPolicy = 31 * 24 * time.Hour
675)
676
677func (cfg *SubscriptionConfigToUpdate) validate() error {
678	if cfg == nil || cfg.ExpirationPolicy == nil {
679		return nil
680	}
681	expPolicy, min := optional.ToDuration(cfg.ExpirationPolicy), minExpirationPolicy
682	if expPolicy != 0 && expPolicy < min {
683		return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", expPolicy, min)
684	}
685	return nil
686}
687
688func expirationPolicyToProto(expirationPolicy optional.Duration) *pb.ExpirationPolicy {
689	if expirationPolicy == nil {
690		return nil
691	}
692
693	dur := optional.ToDuration(expirationPolicy)
694	var ttl *durpb.Duration
695	// As per:
696	//    https://godoc.org/google.golang.org/genproto/googleapis/pubsub/v1#ExpirationPolicy.Ttl
697	// if ExpirationPolicy.Ttl is set to nil, the expirationPolicy is toggled to NEVER expire.
698	if dur != 0 {
699		ttl = ptypes.DurationProto(dur)
700	}
701	return &pb.ExpirationPolicy{
702		Ttl: ttl,
703	}
704}
705
706// IAM returns the subscription's IAM handle.
707func (s *Subscription) IAM() *iam.Handle {
708	return iam.InternalNewHandle(s.c.subc.Connection(), s.name)
709}
710
711// CreateSubscription creates a new subscription on a topic.
712//
713// id is the name of the subscription to create. It must start with a letter,
714// and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-),
715// underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It
716// must be between 3 and 255 characters in length, and must not start with
717// "goog".
718//
719// cfg.Topic is the topic from which the subscription should receive messages. It
720// need not belong to the same project as the subscription. This field is required.
721//
722// cfg.AckDeadline is the maximum time after a subscriber receives a message before
723// the subscriber should acknowledge the message. It must be between 10 and 600
724// seconds (inclusive), and is rounded down to the nearest second. If the
725// provided ackDeadline is 0, then the default value of 10 seconds is used.
726// Note: messages which are obtained via Subscription.Receive need not be
727// acknowledged within this deadline, as the deadline will be automatically
728// extended.
729//
730// cfg.PushConfig may be set to configure this subscription for push delivery.
731//
732// If the subscription already exists an error will be returned.
733func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) {
734	if cfg.Topic == nil {
735		return nil, errors.New("pubsub: require non-nil Topic")
736	}
737	if cfg.AckDeadline == 0 {
738		cfg.AckDeadline = 10 * time.Second
739	}
740	if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second {
741		return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d)
742	}
743
744	sub := c.Subscription(id)
745	_, err := c.subc.CreateSubscription(ctx, cfg.toProto(sub.name))
746	if err != nil {
747		return nil, err
748	}
749	return sub, nil
750}
751
752var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscription")
753
754// Receive calls f with the outstanding messages from the subscription.
755// It blocks until ctx is done, or the service returns a non-retryable error.
756//
757// The standard way to terminate a Receive is to cancel its context:
758//
759//   cctx, cancel := context.WithCancel(ctx)
760//   err := sub.Receive(cctx, callback)
761//   // Call cancel from callback, or another goroutine.
762//
763// If the service returns a non-retryable error, Receive returns that error after
764// all of the outstanding calls to f have returned. If ctx is done, Receive
765// returns nil after all of the outstanding calls to f have returned and
766// all messages have been acknowledged or have expired.
767//
768// Receive calls f concurrently from multiple goroutines. It is encouraged to
769// process messages synchronously in f, even if that processing is relatively
770// time-consuming; Receive will spawn new goroutines for incoming messages,
771// limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings.
772//
773// The context passed to f will be canceled when ctx is Done or there is a
774// fatal service error.
775//
776// Receive will send an ack deadline extension on message receipt, then
777// automatically extend the ack deadline of all fetched Messages up to the
778// period specified by s.ReceiveSettings.MaxExtension.
779//
780// Each Subscription may have only one invocation of Receive active at a time.
781func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error {
782	s.mu.Lock()
783	if s.receiveActive {
784		s.mu.Unlock()
785		return errReceiveInProgress
786	}
787	s.receiveActive = true
788	s.mu.Unlock()
789	defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()
790
791	maxCount := s.ReceiveSettings.MaxOutstandingMessages
792	if maxCount == 0 {
793		maxCount = DefaultReceiveSettings.MaxOutstandingMessages
794	}
795	maxBytes := s.ReceiveSettings.MaxOutstandingBytes
796	if maxBytes == 0 {
797		maxBytes = DefaultReceiveSettings.MaxOutstandingBytes
798	}
799	maxExt := s.ReceiveSettings.MaxExtension
800	if maxExt == 0 {
801		maxExt = DefaultReceiveSettings.MaxExtension
802	} else if maxExt < 0 {
803		// If MaxExtension is negative, disable automatic extension.
804		maxExt = 0
805	}
806	var numGoroutines int
807	switch {
808	case s.ReceiveSettings.Synchronous:
809		numGoroutines = 1
810	case s.ReceiveSettings.NumGoroutines >= 1:
811		numGoroutines = s.ReceiveSettings.NumGoroutines
812	default:
813		numGoroutines = DefaultReceiveSettings.NumGoroutines
814	}
815	// TODO(jba): add tests that verify that ReceiveSettings are correctly processed.
816	po := &pullOptions{
817		maxExtension: maxExt,
818		maxPrefetch:  trunc32(int64(maxCount)),
819		synchronous:  s.ReceiveSettings.Synchronous,
820	}
821	fc := newFlowController(maxCount, maxBytes)
822
823	sched := scheduler.NewReceiveScheduler(maxCount)
824
825	// Wait for all goroutines started by Receive to return, so instead of an
826	// obscure goroutine leak we have an obvious blocked call to Receive.
827	group, gctx := errgroup.WithContext(ctx)
828
829	type closeablePair struct {
830		wg   *sync.WaitGroup
831		iter *messageIterator
832	}
833
834	var pairs []closeablePair
835
836	// Cancel a sub-context which, when we finish a single receiver, will kick
837	// off the context-aware callbacks and the goroutine below (which stops
838	// all receivers, iterators, and the scheduler).
839	ctx2, cancel2 := context.WithCancel(gctx)
840	defer cancel2()
841
842	for i := 0; i < numGoroutines; i++ {
843		// The iterator does not use the context passed to Receive. If it did,
844		// canceling that context would immediately stop the iterator without
845		// waiting for unacked messages.
846		iter := newMessageIterator(s.c.subc, s.name, &s.ReceiveSettings.MaxExtension, po)
847
848		// We cannot use errgroup from Receive here. Receive might already be
849		// calling group.Wait, and group.Wait cannot be called concurrently with
850		// group.Go. We give each receive() its own WaitGroup instead.
851		//
852		// Since wg.Add is only called from the main goroutine, wg.Wait is
853		// guaranteed to be called after all Adds.
854		var wg sync.WaitGroup
855		wg.Add(1)
856		pairs = append(pairs, closeablePair{wg: &wg, iter: iter})
857
858		group.Go(func() error {
859			defer wg.Wait()
860			defer cancel2()
861			for {
862				var maxToPull int32 // maximum number of messages to pull
863				if po.synchronous {
864					if po.maxPrefetch < 0 {
865						// If there is no limit on the number of messages to
866						// pull, use a reasonable default.
867						maxToPull = 1000
868					} else {
869						// Limit the number of messages in memory to MaxOutstandingMessages
870						// (here, po.maxPrefetch). For each message currently in memory, we have
871						// called fc.acquire but not fc.release: this is fc.count(). The next
872						// call to Pull should fetch no more than the difference between these
873						// values.
874						maxToPull = po.maxPrefetch - int32(fc.count())
875						if maxToPull <= 0 {
876							// Wait for some callbacks to finish.
877							if err := gax.Sleep(ctx, synchronousWaitTime); err != nil {
878								// Return nil if the context is done, not err.
879								return nil
880							}
881							continue
882						}
883					}
884				}
885				msgs, err := iter.receive(maxToPull)
886				if err == io.EOF {
887					return nil
888				}
889				if err != nil {
890					return err
891				}
892				for i, msg := range msgs {
893					msg := msg
894					// TODO(jba): call acquire closer to when the message is allocated.
895					if err := fc.acquire(ctx, len(msg.Data)); err != nil {
896						// TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.
897						for _, m := range msgs[i:] {
898							m.Nack()
899						}
900						// Return nil if the context is done, not err.
901						return nil
902					}
903					old := msg.doneFunc
904					msgLen := len(msg.Data)
905					msg.doneFunc = func(ackID string, ack bool, receiveTime time.Time) {
906						defer fc.release(msgLen)
907						old(ackID, ack, receiveTime)
908					}
909					wg.Add(1)
910					// TODO(deklerk): Can we have a generic handler at the
911					// constructor level?
912					if err := sched.Add(msg.OrderingKey, msg, func(msg interface{}) {
913						defer wg.Done()
914						f(ctx2, msg.(*Message))
915					}); err != nil {
916						wg.Done()
917						return err
918					}
919				}
920			}
921		})
922	}
923
924	go func() {
925		<-ctx2.Done()
926
927		// Wait for all iterators to stop.
928		for _, p := range pairs {
929			p.iter.stop()
930			p.wg.Done()
931		}
932
933		// This _must_ happen after every iterator has stopped, or some
934		// iterator will still have undelivered messages but the scheduler will
935		// already be shut down.
936		sched.Shutdown()
937	}()
938
939	return group.Wait()
940}
941
942type pullOptions struct {
943	maxExtension time.Duration
944	maxPrefetch  int32
945	// If true, use unary Pull instead of StreamingPull, and never pull more
946	// than maxPrefetch messages.
947	synchronous bool
948}
949