1// Copyright 2016 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package pubsub 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "io" 22 "strings" 23 "sync" 24 "time" 25 26 "cloud.google.com/go/iam" 27 "cloud.google.com/go/internal/optional" 28 "github.com/golang/protobuf/ptypes" 29 durpb "github.com/golang/protobuf/ptypes/duration" 30 gax "github.com/googleapis/gax-go/v2" 31 "golang.org/x/sync/errgroup" 32 pb "google.golang.org/genproto/googleapis/pubsub/v1" 33 fmpb "google.golang.org/genproto/protobuf/field_mask" 34 "google.golang.org/grpc/codes" 35 "google.golang.org/grpc/status" 36) 37 38// Subscription is a reference to a PubSub subscription. 39type Subscription struct { 40 c *Client 41 42 // The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>" 43 name string 44 45 // Settings for pulling messages. Configure these before calling Receive. 46 ReceiveSettings ReceiveSettings 47 48 mu sync.Mutex 49 receiveActive bool 50} 51 52// Subscription creates a reference to a subscription. 53func (c *Client) Subscription(id string) *Subscription { 54 return c.SubscriptionInProject(id, c.projectID) 55} 56 57// SubscriptionInProject creates a reference to a subscription in a given project. 58func (c *Client) SubscriptionInProject(id, projectID string) *Subscription { 59 return &Subscription{ 60 c: c, 61 name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id), 62 } 63} 64 65// String returns the globally unique printable name of the subscription. 66func (s *Subscription) String() string { 67 return s.name 68} 69 70// ID returns the unique identifier of the subscription within its project. 71func (s *Subscription) ID() string { 72 slash := strings.LastIndex(s.name, "/") 73 if slash == -1 { 74 // name is not a fully-qualified name. 75 panic("bad subscription name") 76 } 77 return s.name[slash+1:] 78} 79 80// Subscriptions returns an iterator which returns all of the subscriptions for the client's project. 81func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator { 82 it := c.subc.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{ 83 Project: c.fullyQualifiedProjectName(), 84 }) 85 return &SubscriptionIterator{ 86 c: c, 87 next: func() (string, error) { 88 sub, err := it.Next() 89 if err != nil { 90 return "", err 91 } 92 return sub.Name, nil 93 }, 94 } 95} 96 97// SubscriptionIterator is an iterator that returns a series of subscriptions. 98type SubscriptionIterator struct { 99 c *Client 100 next func() (string, error) 101} 102 103// Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned. 104func (subs *SubscriptionIterator) Next() (*Subscription, error) { 105 subName, err := subs.next() 106 if err != nil { 107 return nil, err 108 } 109 return &Subscription{c: subs.c, name: subName}, nil 110} 111 112// PushConfig contains configuration for subscriptions that operate in push mode. 113type PushConfig struct { 114 // A URL locating the endpoint to which messages should be pushed. 115 Endpoint string 116 117 // Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details. 118 Attributes map[string]string 119 120 // AuthenticationMethod is used by push endpoints to verify the source 121 // of push requests. 122 // It can be used with push endpoints that are private by default to 123 // allow requests only from the Cloud Pub/Sub system, for example. 124 // This field is optional and should be set only by users interested in 125 // authenticated push. 126 // 127 // It is EXPERIMENTAL and a part of a closed alpha that may not be 128 // accessible to all users. This field is subject to change or removal 129 // without notice. 130 AuthenticationMethod AuthenticationMethod 131} 132 133func (pc *PushConfig) toProto() *pb.PushConfig { 134 if pc == nil { 135 return nil 136 } 137 pbCfg := &pb.PushConfig{ 138 Attributes: pc.Attributes, 139 PushEndpoint: pc.Endpoint, 140 } 141 if authMethod := pc.AuthenticationMethod; authMethod != nil { 142 switch am := authMethod.(type) { 143 case *OIDCToken: 144 pbCfg.AuthenticationMethod = am.toProto() 145 default: // TODO: add others here when GAIC adds more definitions. 146 } 147 } 148 return pbCfg 149} 150 151// AuthenticationMethod is used by push points to verify the source of push requests. 152// This interface defines fields that are part of a closed alpha that may not be accessible 153// to all users. 154type AuthenticationMethod interface { 155 isAuthMethod() bool 156} 157 158// OIDCToken allows PushConfigs to be authenticated using 159// the OpenID Connect protocol https://openid.net/connect/ 160type OIDCToken struct { 161 // Audience to be used when generating OIDC token. The audience claim 162 // identifies the recipients that the JWT is intended for. The audience 163 // value is a single case-sensitive string. Having multiple values (array) 164 // for the audience field is not supported. More info about the OIDC JWT 165 // token audience here: https://tools.ietf.org/html/rfc7519#section-4.1.3 166 // Note: if not specified, the Push endpoint URL will be used. 167 Audience string 168 169 // The service account email to be used for generating the OpenID Connect token. 170 // The caller of: 171 // * CreateSubscription 172 // * UpdateSubscription 173 // * ModifyPushConfig 174 // calls must have the iam.serviceAccounts.actAs permission for the service account. 175 // See https://cloud.google.com/iam/docs/understanding-roles#service-accounts-roles. 176 ServiceAccountEmail string 177} 178 179var _ AuthenticationMethod = (*OIDCToken)(nil) 180 181func (oidcToken *OIDCToken) isAuthMethod() bool { return true } 182 183func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ { 184 if oidcToken == nil { 185 return nil 186 } 187 return &pb.PushConfig_OidcToken_{ 188 OidcToken: &pb.PushConfig_OidcToken{ 189 Audience: oidcToken.Audience, 190 ServiceAccountEmail: oidcToken.ServiceAccountEmail, 191 }, 192 } 193} 194 195// SubscriptionConfig describes the configuration of a subscription. 196type SubscriptionConfig struct { 197 Topic *Topic 198 PushConfig PushConfig 199 200 // The default maximum time after a subscriber receives a message before 201 // the subscriber should acknowledge the message. Note: messages which are 202 // obtained via Subscription.Receive need not be acknowledged within this 203 // deadline, as the deadline will be automatically extended. 204 AckDeadline time.Duration 205 206 // Whether to retain acknowledged messages. If true, acknowledged messages 207 // will not be expunged until they fall out of the RetentionDuration window. 208 RetainAckedMessages bool 209 210 // How long to retain messages in backlog, from the time of publish. If 211 // RetainAckedMessages is true, this duration affects the retention of 212 // acknowledged messages, otherwise only unacknowledged messages are retained. 213 // Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes. 214 RetentionDuration time.Duration 215 216 // Expiration policy specifies the conditions for a subscription's expiration. 217 // A subscription is considered active as long as any connected subscriber is 218 // successfully consuming messages from the subscription or is issuing 219 // operations on the subscription. If `expiration_policy` is not set, a 220 // *default policy* with `ttl` of 31 days will be used. The minimum allowed 221 // value for `expiration_policy.ttl` is 1 day. 222 // 223 // Use time.Duration(0) to indicate that the subscription should never expire. 224 // 225 // It is EXPERIMENTAL and subject to change or removal without notice. 226 ExpirationPolicy optional.Duration 227 228 // The set of labels for the subscription. 229 Labels map[string]string 230} 231 232func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { 233 var pbPushConfig *pb.PushConfig 234 if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil { 235 pbPushConfig = cfg.PushConfig.toProto() 236 } 237 var retentionDuration *durpb.Duration 238 if cfg.RetentionDuration != 0 { 239 retentionDuration = ptypes.DurationProto(cfg.RetentionDuration) 240 } 241 return &pb.Subscription{ 242 Name: name, 243 Topic: cfg.Topic.name, 244 PushConfig: pbPushConfig, 245 AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), 246 RetainAckedMessages: cfg.RetainAckedMessages, 247 MessageRetentionDuration: retentionDuration, 248 Labels: cfg.Labels, 249 ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy), 250 } 251} 252 253func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionConfig, error) { 254 rd := time.Hour * 24 * 7 255 var err error 256 if pbSub.MessageRetentionDuration != nil { 257 rd, err = ptypes.Duration(pbSub.MessageRetentionDuration) 258 if err != nil { 259 return SubscriptionConfig{}, err 260 } 261 } 262 var expirationPolicy time.Duration 263 if ttl := pbSub.ExpirationPolicy.GetTtl(); ttl != nil { 264 expirationPolicy, err = ptypes.Duration(ttl) 265 if err != nil { 266 return SubscriptionConfig{}, err 267 } 268 } 269 subC := SubscriptionConfig{ 270 Topic: newTopic(c, pbSub.Topic), 271 AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds), 272 RetainAckedMessages: pbSub.RetainAckedMessages, 273 RetentionDuration: rd, 274 Labels: pbSub.Labels, 275 ExpirationPolicy: expirationPolicy, 276 } 277 pc := protoToPushConfig(pbSub.PushConfig) 278 if pc != nil { 279 subC.PushConfig = *pc 280 } 281 return subC, nil 282} 283 284func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig { 285 if pbPc == nil { 286 return nil 287 } 288 pc := &PushConfig{ 289 Endpoint: pbPc.PushEndpoint, 290 Attributes: pbPc.Attributes, 291 } 292 if am := pbPc.AuthenticationMethod; am != nil { 293 if oidcToken, ok := am.(*pb.PushConfig_OidcToken_); ok && oidcToken != nil && oidcToken.OidcToken != nil { 294 pc.AuthenticationMethod = &OIDCToken{ 295 Audience: oidcToken.OidcToken.GetAudience(), 296 ServiceAccountEmail: oidcToken.OidcToken.GetServiceAccountEmail(), 297 } 298 } 299 } 300 return pc 301} 302 303// ReceiveSettings configure the Receive method. 304// A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings. 305type ReceiveSettings struct { 306 // MaxExtension is the maximum period for which the Subscription should 307 // automatically extend the ack deadline for each message. 308 // 309 // The Subscription will automatically extend the ack deadline of all 310 // fetched Messages up to the duration specified. Automatic deadline 311 // extension beyond the initial receipt may be disabled by specifying a 312 // duration less than 0. 313 MaxExtension time.Duration 314 315 // MaxOutstandingMessages is the maximum number of unprocessed messages 316 // (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it 317 // will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages. 318 // If the value is negative, then there will be no limit on the number of 319 // unprocessed messages. 320 MaxOutstandingMessages int 321 322 // MaxOutstandingBytes is the maximum size of unprocessed messages 323 // (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will 324 // be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If 325 // the value is negative, then there will be no limit on the number of bytes 326 // for unprocessed messages. 327 MaxOutstandingBytes int 328 329 // NumGoroutines is the number of goroutines Receive will spawn to pull 330 // messages concurrently. If NumGoroutines is less than 1, it will be treated 331 // as if it were DefaultReceiveSettings.NumGoroutines. 332 // 333 // NumGoroutines does not limit the number of messages that can be processed 334 // concurrently. Even with one goroutine, many messages might be processed at 335 // once, because that goroutine may continually receive messages and invoke the 336 // function passed to Receive on them. To limit the number of messages being 337 // processed concurrently, set MaxOutstandingMessages. 338 NumGoroutines int 339 340 // If Synchronous is true, then no more than MaxOutstandingMessages will be in 341 // memory at one time. (In contrast, when Synchronous is false, more than 342 // MaxOutstandingMessages may have been received from the service and in memory 343 // before being processed.) MaxOutstandingBytes still refers to the total bytes 344 // processed, rather than in memory. NumGoroutines is ignored. 345 // The default is false. 346 Synchronous bool 347} 348 349// For synchronous receive, the time to wait if we are already processing 350// MaxOutstandingMessages. There is no point calling Pull and asking for zero 351// messages, so we pause to allow some message-processing callbacks to finish. 352// 353// The wait time is large enough to avoid consuming significant CPU, but 354// small enough to provide decent throughput. Users who want better 355// throughput should not be using synchronous mode. 356// 357// Waiting might seem like polling, so it's natural to think we could do better by 358// noticing when a callback is finished and immediately calling Pull. But if 359// callbacks finish in quick succession, this will result in frequent Pull RPCs that 360// request a single message, which wastes network bandwidth. Better to wait for a few 361// callbacks to finish, so we make fewer RPCs fetching more messages. 362// 363// This value is unexported so the user doesn't have another knob to think about. Note that 364// it is the same value as the one used for nackTicker, so it matches this client's 365// idea of a duration that is short, but not so short that we perform excessive RPCs. 366const synchronousWaitTime = 100 * time.Millisecond 367 368// This is a var so that tests can change it. 369var minAckDeadline = 10 * time.Second 370 371// DefaultReceiveSettings holds the default values for ReceiveSettings. 372var DefaultReceiveSettings = ReceiveSettings{ 373 MaxExtension: 10 * time.Minute, 374 MaxOutstandingMessages: 1000, 375 MaxOutstandingBytes: 1e9, // 1G 376 NumGoroutines: 1, 377} 378 379// Delete deletes the subscription. 380func (s *Subscription) Delete(ctx context.Context) error { 381 return s.c.subc.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: s.name}) 382} 383 384// Exists reports whether the subscription exists on the server. 385func (s *Subscription) Exists(ctx context.Context) (bool, error) { 386 _, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name}) 387 if err == nil { 388 return true, nil 389 } 390 if status.Code(err) == codes.NotFound { 391 return false, nil 392 } 393 return false, err 394} 395 396// Config fetches the current configuration for the subscription. 397func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) { 398 pbSub, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name}) 399 if err != nil { 400 return SubscriptionConfig{}, err 401 } 402 cfg, err := protoToSubscriptionConfig(pbSub, s.c) 403 if err != nil { 404 return SubscriptionConfig{}, err 405 } 406 return cfg, nil 407} 408 409// SubscriptionConfigToUpdate describes how to update a subscription. 410type SubscriptionConfigToUpdate struct { 411 // If non-nil, the push config is changed. 412 PushConfig *PushConfig 413 414 // If non-zero, the ack deadline is changed. 415 AckDeadline time.Duration 416 417 // If set, RetainAckedMessages is changed. 418 RetainAckedMessages optional.Bool 419 420 // If non-zero, RetentionDuration is changed. 421 RetentionDuration time.Duration 422 423 // If non-zero, Expiration is changed. 424 ExpirationPolicy optional.Duration 425 426 // If non-nil, the current set of labels is completely 427 // replaced by the new set. 428 // This field has beta status. It is not subject to the stability guarantee 429 // and may change. 430 Labels map[string]string 431} 432 433// Update changes an existing subscription according to the fields set in cfg. 434// It returns the new SubscriptionConfig. 435// 436// Update returns an error if no fields were modified. 437func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) { 438 req := s.updateRequest(&cfg) 439 if err := cfg.validate(); err != nil { 440 return SubscriptionConfig{}, fmt.Errorf("pubsub: UpdateSubscription %v", err) 441 } 442 if len(req.UpdateMask.Paths) == 0 { 443 return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update") 444 } 445 rpsub, err := s.c.subc.UpdateSubscription(ctx, req) 446 if err != nil { 447 return SubscriptionConfig{}, err 448 } 449 return protoToSubscriptionConfig(rpsub, s.c) 450} 451 452func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.UpdateSubscriptionRequest { 453 psub := &pb.Subscription{Name: s.name} 454 var paths []string 455 if cfg.PushConfig != nil { 456 psub.PushConfig = cfg.PushConfig.toProto() 457 paths = append(paths, "push_config") 458 } 459 if cfg.AckDeadline != 0 { 460 psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds())) 461 paths = append(paths, "ack_deadline_seconds") 462 } 463 if cfg.RetainAckedMessages != nil { 464 psub.RetainAckedMessages = optional.ToBool(cfg.RetainAckedMessages) 465 paths = append(paths, "retain_acked_messages") 466 } 467 if cfg.RetentionDuration != 0 { 468 psub.MessageRetentionDuration = ptypes.DurationProto(cfg.RetentionDuration) 469 paths = append(paths, "message_retention_duration") 470 } 471 if cfg.ExpirationPolicy != nil { 472 psub.ExpirationPolicy = expirationPolicyToProto(cfg.ExpirationPolicy) 473 paths = append(paths, "expiration_policy") 474 } 475 if cfg.Labels != nil { 476 psub.Labels = cfg.Labels 477 paths = append(paths, "labels") 478 } 479 return &pb.UpdateSubscriptionRequest{ 480 Subscription: psub, 481 UpdateMask: &fmpb.FieldMask{Paths: paths}, 482 } 483} 484 485const ( 486 // The minimum expiration policy duration is 1 day as per: 487 // https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L606-L607 488 minExpirationPolicy = 24 * time.Hour 489 490 // If an expiration policy is not specified, the default of 31 days is used as per: 491 // https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L605-L606 492 defaultExpirationPolicy = 31 * 24 * time.Hour 493) 494 495func (cfg *SubscriptionConfigToUpdate) validate() error { 496 if cfg == nil || cfg.ExpirationPolicy == nil { 497 return nil 498 } 499 policy, min := optional.ToDuration(cfg.ExpirationPolicy), minExpirationPolicy 500 if policy == 0 || policy >= min { 501 return nil 502 } 503 return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", policy, min) 504} 505 506func expirationPolicyToProto(expirationPolicy optional.Duration) *pb.ExpirationPolicy { 507 if expirationPolicy == nil { 508 return nil 509 } 510 511 dur := optional.ToDuration(expirationPolicy) 512 var ttl *durpb.Duration 513 // As per: 514 // https://godoc.org/google.golang.org/genproto/googleapis/pubsub/v1#ExpirationPolicy.Ttl 515 // if ExpirationPolicy.Ttl is set to nil, the expirationPolicy is toggled to NEVER expire. 516 if dur != 0 { 517 ttl = ptypes.DurationProto(dur) 518 } 519 return &pb.ExpirationPolicy{ 520 Ttl: ttl, 521 } 522} 523 524// IAM returns the subscription's IAM handle. 525func (s *Subscription) IAM() *iam.Handle { 526 return iam.InternalNewHandle(s.c.subc.Connection(), s.name) 527} 528 529// CreateSubscription creates a new subscription on a topic. 530// 531// id is the name of the subscription to create. It must start with a letter, 532// and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), 533// underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It 534// must be between 3 and 255 characters in length, and must not start with 535// "goog". 536// 537// cfg.Topic is the topic from which the subscription should receive messages. It 538// need not belong to the same project as the subscription. This field is required. 539// 540// cfg.AckDeadline is the maximum time after a subscriber receives a message before 541// the subscriber should acknowledge the message. It must be between 10 and 600 542// seconds (inclusive), and is rounded down to the nearest second. If the 543// provided ackDeadline is 0, then the default value of 10 seconds is used. 544// Note: messages which are obtained via Subscription.Receive need not be 545// acknowledged within this deadline, as the deadline will be automatically 546// extended. 547// 548// cfg.PushConfig may be set to configure this subscription for push delivery. 549// 550// If the subscription already exists an error will be returned. 551func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) { 552 if cfg.Topic == nil { 553 return nil, errors.New("pubsub: require non-nil Topic") 554 } 555 if cfg.AckDeadline == 0 { 556 cfg.AckDeadline = 10 * time.Second 557 } 558 if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second { 559 return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d) 560 } 561 562 sub := c.Subscription(id) 563 _, err := c.subc.CreateSubscription(ctx, cfg.toProto(sub.name)) 564 if err != nil { 565 return nil, err 566 } 567 return sub, nil 568} 569 570var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscription") 571 572// Receive calls f with the outstanding messages from the subscription. 573// It blocks until ctx is done, or the service returns a non-retryable error. 574// 575// The standard way to terminate a Receive is to cancel its context: 576// 577// cctx, cancel := context.WithCancel(ctx) 578// err := sub.Receive(cctx, callback) 579// // Call cancel from callback, or another goroutine. 580// 581// If the service returns a non-retryable error, Receive returns that error after 582// all of the outstanding calls to f have returned. If ctx is done, Receive 583// returns nil after all of the outstanding calls to f have returned and 584// all messages have been acknowledged or have expired. 585// 586// Receive calls f concurrently from multiple goroutines. It is encouraged to 587// process messages synchronously in f, even if that processing is relatively 588// time-consuming; Receive will spawn new goroutines for incoming messages, 589// limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings. 590// 591// The context passed to f will be canceled when ctx is Done or there is a 592// fatal service error. 593// 594// Receive will send an ack deadline extension on message receipt, then 595// automatically extend the ack deadline of all fetched Messages up to the 596// period specified by s.ReceiveSettings.MaxExtension. 597// 598// Each Subscription may have only one invocation of Receive active at a time. 599func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error { 600 s.mu.Lock() 601 if s.receiveActive { 602 s.mu.Unlock() 603 return errReceiveInProgress 604 } 605 s.receiveActive = true 606 s.mu.Unlock() 607 defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() 608 609 maxCount := s.ReceiveSettings.MaxOutstandingMessages 610 if maxCount == 0 { 611 maxCount = DefaultReceiveSettings.MaxOutstandingMessages 612 } 613 maxBytes := s.ReceiveSettings.MaxOutstandingBytes 614 if maxBytes == 0 { 615 maxBytes = DefaultReceiveSettings.MaxOutstandingBytes 616 } 617 maxExt := s.ReceiveSettings.MaxExtension 618 if maxExt == 0 { 619 maxExt = DefaultReceiveSettings.MaxExtension 620 } else if maxExt < 0 { 621 // If MaxExtension is negative, disable automatic extension. 622 maxExt = 0 623 } 624 var numGoroutines int 625 switch { 626 case s.ReceiveSettings.Synchronous: 627 numGoroutines = 1 628 case s.ReceiveSettings.NumGoroutines >= 1: 629 numGoroutines = s.ReceiveSettings.NumGoroutines 630 default: 631 numGoroutines = DefaultReceiveSettings.NumGoroutines 632 } 633 // TODO(jba): add tests that verify that ReceiveSettings are correctly processed. 634 po := &pullOptions{ 635 maxExtension: maxExt, 636 maxPrefetch: trunc32(int64(maxCount)), 637 synchronous: s.ReceiveSettings.Synchronous, 638 } 639 fc := newFlowController(maxCount, maxBytes) 640 641 // Wait for all goroutines started by Receive to return, so instead of an 642 // obscure goroutine leak we have an obvious blocked call to Receive. 643 group, gctx := errgroup.WithContext(ctx) 644 for i := 0; i < numGoroutines; i++ { 645 group.Go(func() error { 646 return s.receive(gctx, po, fc, f) 647 }) 648 } 649 return group.Wait() 650} 651 652func (s *Subscription) receive(ctx context.Context, po *pullOptions, fc *flowController, f func(context.Context, *Message)) error { 653 // Cancel a sub-context when we return, to kick the context-aware callbacks 654 // and the goroutine below. 655 ctx2, cancel := context.WithCancel(ctx) 656 // The iterator does not use the context passed to Receive. If it did, canceling 657 // that context would immediately stop the iterator without waiting for unacked 658 // messages. 659 iter := newMessageIterator(s.c.subc, s.name, po) 660 661 // We cannot use errgroup from Receive here. Receive might already be calling group.Wait, 662 // and group.Wait cannot be called concurrently with group.Go. We give each receive() its 663 // own WaitGroup instead. 664 // Since wg.Add is only called from the main goroutine, wg.Wait is guaranteed 665 // to be called after all Adds. 666 var wg sync.WaitGroup 667 wg.Add(1) 668 go func() { 669 <-ctx2.Done() 670 // Call stop when Receive's context is done. 671 // Stop will block until all outstanding messages have been acknowledged 672 // or there was a fatal service error. 673 iter.stop() 674 wg.Done() 675 }() 676 defer wg.Wait() 677 678 defer cancel() 679 for { 680 var maxToPull int32 // maximum number of messages to pull 681 if po.synchronous { 682 if po.maxPrefetch < 0 { 683 // If there is no limit on the number of messages to pull, use a reasonable default. 684 maxToPull = 1000 685 } else { 686 // Limit the number of messages in memory to MaxOutstandingMessages 687 // (here, po.maxPrefetch). For each message currently in memory, we have 688 // called fc.acquire but not fc.release: this is fc.count(). The next 689 // call to Pull should fetch no more than the difference between these 690 // values. 691 maxToPull = po.maxPrefetch - int32(fc.count()) 692 if maxToPull <= 0 { 693 // Wait for some callbacks to finish. 694 if err := gax.Sleep(ctx, synchronousWaitTime); err != nil { 695 // Return nil if the context is done, not err. 696 return nil 697 } 698 continue 699 } 700 } 701 } 702 msgs, err := iter.receive(maxToPull) 703 if err == io.EOF { 704 return nil 705 } 706 if err != nil { 707 return err 708 } 709 for i, msg := range msgs { 710 msg := msg 711 // TODO(jba): call acquire closer to when the message is allocated. 712 if err := fc.acquire(ctx, len(msg.Data)); err != nil { 713 // TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done. 714 for _, m := range msgs[i:] { 715 m.Nack() 716 } 717 // Return nil if the context is done, not err. 718 return nil 719 } 720 old := msg.doneFunc 721 msgLen := len(msg.Data) 722 msg.doneFunc = func(ackID string, ack bool, receiveTime time.Time) { 723 defer fc.release(msgLen) 724 old(ackID, ack, receiveTime) 725 } 726 wg.Add(1) 727 go func() { 728 defer wg.Done() 729 f(ctx2, msg) 730 }() 731 } 732 } 733} 734 735type pullOptions struct { 736 maxExtension time.Duration 737 maxPrefetch int32 738 // If true, use unary Pull instead of StreamingPull, and never pull more 739 // than maxPrefetch messages. 740 synchronous bool 741} 742