1// Copyright 2016 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package pubsub 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "io" 22 "strings" 23 "sync" 24 "time" 25 26 "cloud.google.com/go/iam" 27 "cloud.google.com/go/internal/optional" 28 "github.com/golang/protobuf/ptypes" 29 durpb "github.com/golang/protobuf/ptypes/duration" 30 gax "github.com/googleapis/gax-go/v2" 31 "golang.org/x/sync/errgroup" 32 pb "google.golang.org/genproto/googleapis/pubsub/v1" 33 fmpb "google.golang.org/genproto/protobuf/field_mask" 34 "google.golang.org/grpc" 35 "google.golang.org/grpc/codes" 36) 37 38// Subscription is a reference to a PubSub subscription. 39type Subscription struct { 40 c *Client 41 42 // The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>" 43 name string 44 45 // Settings for pulling messages. Configure these before calling Receive. 46 ReceiveSettings ReceiveSettings 47 48 mu sync.Mutex 49 receiveActive bool 50} 51 52// Subscription creates a reference to a subscription. 53func (c *Client) Subscription(id string) *Subscription { 54 return c.SubscriptionInProject(id, c.projectID) 55} 56 57// SubscriptionInProject creates a reference to a subscription in a given project. 
58func (c *Client) SubscriptionInProject(id, projectID string) *Subscription { 59 return &Subscription{ 60 c: c, 61 name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id), 62 } 63} 64 65// String returns the globally unique printable name of the subscription. 66func (s *Subscription) String() string { 67 return s.name 68} 69 70// ID returns the unique identifier of the subscription within its project. 71func (s *Subscription) ID() string { 72 slash := strings.LastIndex(s.name, "/") 73 if slash == -1 { 74 // name is not a fully-qualified name. 75 panic("bad subscription name") 76 } 77 return s.name[slash+1:] 78} 79 80// Subscriptions returns an iterator which returns all of the subscriptions for the client's project. 81func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator { 82 it := c.subc.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{ 83 Project: c.fullyQualifiedProjectName(), 84 }) 85 return &SubscriptionIterator{ 86 c: c, 87 next: func() (string, error) { 88 sub, err := it.Next() 89 if err != nil { 90 return "", err 91 } 92 return sub.Name, nil 93 }, 94 } 95} 96 97// SubscriptionIterator is an iterator that returns a series of subscriptions. 98type SubscriptionIterator struct { 99 c *Client 100 next func() (string, error) 101} 102 103// Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned. 104func (subs *SubscriptionIterator) Next() (*Subscription, error) { 105 subName, err := subs.next() 106 if err != nil { 107 return nil, err 108 } 109 return &Subscription{c: subs.c, name: subName}, nil 110} 111 112// PushConfig contains configuration for subscriptions that operate in push mode. 113type PushConfig struct { 114 // A URL locating the endpoint to which messages should be pushed. 115 Endpoint string 116 117 // Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details. 
118 Attributes map[string]string 119} 120 121func (pc *PushConfig) toProto() *pb.PushConfig { 122 return &pb.PushConfig{ 123 Attributes: pc.Attributes, 124 PushEndpoint: pc.Endpoint, 125 } 126} 127 128// SubscriptionConfig describes the configuration of a subscription. 129type SubscriptionConfig struct { 130 Topic *Topic 131 PushConfig PushConfig 132 133 // The default maximum time after a subscriber receives a message before 134 // the subscriber should acknowledge the message. Note: messages which are 135 // obtained via Subscription.Receive need not be acknowledged within this 136 // deadline, as the deadline will be automatically extended. 137 AckDeadline time.Duration 138 139 // Whether to retain acknowledged messages. If true, acknowledged messages 140 // will not be expunged until they fall out of the RetentionDuration window. 141 RetainAckedMessages bool 142 143 // How long to retain messages in backlog, from the time of publish. If 144 // RetainAckedMessages is true, this duration affects the retention of 145 // acknowledged messages, otherwise only unacknowledged messages are retained. 146 // Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes. 147 RetentionDuration time.Duration 148 149 // Expiration policy specifies the conditions for a subscription's expiration. 150 // A subscription is considered active as long as any connected subscriber is 151 // successfully consuming messages from the subscription or is issuing 152 // operations on the subscription. If `expiration_policy` is not set, a 153 // *default policy* with `ttl` of 31 days will be used. The minimum allowed 154 // value for `expiration_policy.ttl` is 1 day. 155 // 156 // It is EXPERIMENTAL and subject to change or removal without notice. 157 ExpirationPolicy time.Duration 158 159 // The set of labels for the subscription. 
160 Labels map[string]string 161} 162 163func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { 164 var pbPushConfig *pb.PushConfig 165 if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 { 166 pbPushConfig = &pb.PushConfig{ 167 Attributes: cfg.PushConfig.Attributes, 168 PushEndpoint: cfg.PushConfig.Endpoint, 169 } 170 } 171 var retentionDuration *durpb.Duration 172 if cfg.RetentionDuration != 0 { 173 retentionDuration = ptypes.DurationProto(cfg.RetentionDuration) 174 } 175 return &pb.Subscription{ 176 Name: name, 177 Topic: cfg.Topic.name, 178 PushConfig: pbPushConfig, 179 AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), 180 RetainAckedMessages: cfg.RetainAckedMessages, 181 MessageRetentionDuration: retentionDuration, 182 Labels: cfg.Labels, 183 ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy), 184 } 185} 186 187func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionConfig, error) { 188 rd := time.Hour * 24 * 7 189 var err error 190 if pbSub.MessageRetentionDuration != nil { 191 rd, err = ptypes.Duration(pbSub.MessageRetentionDuration) 192 if err != nil { 193 return SubscriptionConfig{}, err 194 } 195 } 196 var expirationPolicy time.Duration 197 if ttl := pbSub.ExpirationPolicy.GetTtl(); ttl != nil { 198 expirationPolicy, err = ptypes.Duration(ttl) 199 if err != nil { 200 return SubscriptionConfig{}, err 201 } 202 } 203 return SubscriptionConfig{ 204 Topic: newTopic(c, pbSub.Topic), 205 AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds), 206 PushConfig: PushConfig{ 207 Endpoint: pbSub.PushConfig.PushEndpoint, 208 Attributes: pbSub.PushConfig.Attributes, 209 }, 210 RetainAckedMessages: pbSub.RetainAckedMessages, 211 RetentionDuration: rd, 212 Labels: pbSub.Labels, 213 ExpirationPolicy: expirationPolicy, 214 }, nil 215} 216 217// ReceiveSettings configure the Receive method. 218// A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings. 
type ReceiveSettings struct {
	// MaxExtension bounds how long the Subscription keeps automatically
	// extending the ack deadline of each message.
	//
	// Ack deadlines of all fetched Messages are extended automatically up to
	// this duration. Specifying a duration less than 0 disables automatic
	// deadline extension beyond the initial receipt.
	MaxExtension time.Duration

	// MaxOutstandingMessages is the maximum number of unprocessed messages
	// (unacknowledged but not yet expired). A value of 0 is treated as
	// DefaultReceiveSettings.MaxOutstandingMessages. A negative value means
	// there is no limit on the number of unprocessed messages.
	MaxOutstandingMessages int

	// MaxOutstandingBytes is the maximum size of unprocessed messages
	// (unacknowledged but not yet expired). A value of 0 is treated as
	// DefaultReceiveSettings.MaxOutstandingBytes. A negative value means
	// there is no limit on the number of bytes for unprocessed messages.
	MaxOutstandingBytes int

	// NumGoroutines is the number of goroutines Receive will spawn to pull
	// messages concurrently. A value less than 1 is treated as
	// DefaultReceiveSettings.NumGoroutines.
	//
	// NumGoroutines does not limit the number of messages that can be
	// processed concurrently. Even with one goroutine, many messages might be
	// processed at once, because that goroutine may continually receive
	// messages and invoke the function passed to Receive on them. To limit
	// the number of messages being processed concurrently, set
	// MaxOutstandingMessages.
	NumGoroutines int

	// If Synchronous is true, then no more than MaxOutstandingMessages will
	// be in memory at one time. (In contrast, when Synchronous is false, more
	// than MaxOutstandingMessages may have been received from the service and
	// in memory before being processed.) MaxOutstandingBytes still refers to
	// the total bytes processed, rather than in memory. NumGoroutines is
	// ignored.
	// The default is false.
	Synchronous bool
}

// synchronousWaitTime is, for synchronous receive, the time to wait when
// MaxOutstandingMessages are already being processed. There is no point
// calling Pull and asking for zero messages, so we pause to let some
// message-processing callbacks finish.
//
// The wait is long enough to avoid consuming significant CPU, but short
// enough to provide decent throughput. Users who want better throughput
// should not be using synchronous mode.
//
// Waiting might look like polling, and it is tempting to instead notice when a
// callback finishes and call Pull immediately. But if callbacks finish in quick
// succession, that results in frequent Pull RPCs that each request a single
// message, which wastes network bandwidth. It is better to wait for a few
// callbacks to finish, so that fewer RPCs fetch more messages.
//
// The value is unexported so the user doesn't have another knob to think
// about. Note that it is the same value as the one used for nackTicker, so it
// matches this client's idea of a duration that is short, but not so short
// that we perform excessive RPCs.
const synchronousWaitTime = 100 * time.Millisecond

// minAckDeadline is a var, not a const, so that tests can change it.
var minAckDeadline = 10 * time.Second

// DefaultReceiveSettings holds the default values for ReceiveSettings.
var DefaultReceiveSettings = ReceiveSettings{
	MaxExtension:           10 * time.Minute,
	MaxOutstandingMessages: 1000,
	MaxOutstandingBytes:    1e9, // 1G
	NumGoroutines:          1,
}

// Delete deletes the subscription.
294func (s *Subscription) Delete(ctx context.Context) error { 295 return s.c.subc.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: s.name}) 296} 297 298// Exists reports whether the subscription exists on the server. 299func (s *Subscription) Exists(ctx context.Context) (bool, error) { 300 _, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name}) 301 if err == nil { 302 return true, nil 303 } 304 if grpc.Code(err) == codes.NotFound { 305 return false, nil 306 } 307 return false, err 308} 309 310// Config fetches the current configuration for the subscription. 311func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) { 312 pbSub, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name}) 313 if err != nil { 314 return SubscriptionConfig{}, err 315 } 316 cfg, err := protoToSubscriptionConfig(pbSub, s.c) 317 if err != nil { 318 return SubscriptionConfig{}, err 319 } 320 return cfg, nil 321} 322 323// SubscriptionConfigToUpdate describes how to update a subscription. 324type SubscriptionConfigToUpdate struct { 325 // If non-nil, the push config is changed. 326 PushConfig *PushConfig 327 328 // If non-zero, the ack deadline is changed. 329 AckDeadline time.Duration 330 331 // If set, RetainAckedMessages is changed. 332 RetainAckedMessages optional.Bool 333 334 // If non-zero, RetentionDuration is changed. 335 RetentionDuration time.Duration 336 337 // If non-zero, Expiration is changed. 338 ExpirationPolicy time.Duration 339 340 // If non-nil, the current set of labels is completely 341 // replaced by the new set. 342 // This field has beta status. It is not subject to the stability guarantee 343 // and may change. 344 Labels map[string]string 345} 346 347// Update changes an existing subscription according to the fields set in cfg. 348// It returns the new SubscriptionConfig. 349// 350// Update returns an error if no fields were modified. 
351func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) { 352 req := s.updateRequest(&cfg) 353 if err := cfg.validate(); err != nil { 354 return SubscriptionConfig{}, fmt.Errorf("pubsub: UpdateSubscription %v", err) 355 } 356 if len(req.UpdateMask.Paths) == 0 { 357 return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update") 358 } 359 rpsub, err := s.c.subc.UpdateSubscription(ctx, req) 360 if err != nil { 361 return SubscriptionConfig{}, err 362 } 363 return protoToSubscriptionConfig(rpsub, s.c) 364} 365 366func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.UpdateSubscriptionRequest { 367 psub := &pb.Subscription{Name: s.name} 368 var paths []string 369 if cfg.PushConfig != nil { 370 psub.PushConfig = cfg.PushConfig.toProto() 371 paths = append(paths, "push_config") 372 } 373 if cfg.AckDeadline != 0 { 374 psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds())) 375 paths = append(paths, "ack_deadline_seconds") 376 } 377 if cfg.RetainAckedMessages != nil { 378 psub.RetainAckedMessages = optional.ToBool(cfg.RetainAckedMessages) 379 paths = append(paths, "retain_acked_messages") 380 } 381 if cfg.RetentionDuration != 0 { 382 psub.MessageRetentionDuration = ptypes.DurationProto(cfg.RetentionDuration) 383 paths = append(paths, "message_retention_duration") 384 } 385 if cfg.ExpirationPolicy != 0 { 386 psub.ExpirationPolicy = expirationPolicyToProto(cfg.ExpirationPolicy) 387 paths = append(paths, "expiration_policy") 388 } 389 if cfg.Labels != nil { 390 psub.Labels = cfg.Labels 391 paths = append(paths, "labels") 392 } 393 return &pb.UpdateSubscriptionRequest{ 394 Subscription: psub, 395 UpdateMask: &fmpb.FieldMask{Paths: paths}, 396 } 397} 398 399const ( 400 // The minimum expiration policy duration is 1 day as per: 401 // 
https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L606-L607 402 minExpirationPolicy = 24 * time.Hour 403 404 // If an expiration policy is not specified, the default of 31 days is used as per: 405 // https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L605-L606 406 defaultExpirationPolicy = 31 * 24 * time.Hour 407) 408 409func (cfg *SubscriptionConfigToUpdate) validate() error { 410 if cfg == nil || cfg.ExpirationPolicy == 0 { 411 return nil 412 } 413 if policy, min := cfg.ExpirationPolicy, minExpirationPolicy; policy < min { 414 return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", policy, min) 415 } 416 return nil 417} 418 419func expirationPolicyToProto(expirationPolicy time.Duration) *pb.ExpirationPolicy { 420 if expirationPolicy == 0 { 421 return nil 422 } 423 return &pb.ExpirationPolicy{ 424 Ttl: ptypes.DurationProto(expirationPolicy), 425 } 426} 427 428// IAM returns the subscription's IAM handle. 429func (s *Subscription) IAM() *iam.Handle { 430 return iam.InternalNewHandle(s.c.subc.Connection(), s.name) 431} 432 433// CreateSubscription creates a new subscription on a topic. 434// 435// id is the name of the subscription to create. It must start with a letter, 436// and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), 437// underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It 438// must be between 3 and 255 characters in length, and must not start with 439// "goog". 440// 441// cfg.Topic is the topic from which the subscription should receive messages. It 442// need not belong to the same project as the subscription. This field is required. 443// 444// cfg.AckDeadline is the maximum time after a subscriber receives a message before 445// the subscriber should acknowledge the message. 
It must be between 10 and 600 446// seconds (inclusive), and is rounded down to the nearest second. If the 447// provided ackDeadline is 0, then the default value of 10 seconds is used. 448// Note: messages which are obtained via Subscription.Receive need not be 449// acknowledged within this deadline, as the deadline will be automatically 450// extended. 451// 452// cfg.PushConfig may be set to configure this subscription for push delivery. 453// 454// If the subscription already exists an error will be returned. 455func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) { 456 if cfg.Topic == nil { 457 return nil, errors.New("pubsub: require non-nil Topic") 458 } 459 if cfg.AckDeadline == 0 { 460 cfg.AckDeadline = 10 * time.Second 461 } 462 if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second { 463 return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d) 464 } 465 466 sub := c.Subscription(id) 467 _, err := c.subc.CreateSubscription(ctx, cfg.toProto(sub.name)) 468 if err != nil { 469 return nil, err 470 } 471 return sub, nil 472} 473 474var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscription") 475 476// Receive calls f with the outstanding messages from the subscription. 477// It blocks until ctx is done, or the service returns a non-retryable error. 478// 479// The standard way to terminate a Receive is to cancel its context: 480// 481// cctx, cancel := context.WithCancel(ctx) 482// err := sub.Receive(cctx, callback) 483// // Call cancel from callback, or another goroutine. 484// 485// If the service returns a non-retryable error, Receive returns that error after 486// all of the outstanding calls to f have returned. If ctx is done, Receive 487// returns nil after all of the outstanding calls to f have returned and 488// all messages have been acknowledged or have expired. 
489// 490// Receive calls f concurrently from multiple goroutines. It is encouraged to 491// process messages synchronously in f, even if that processing is relatively 492// time-consuming; Receive will spawn new goroutines for incoming messages, 493// limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings. 494// 495// The context passed to f will be canceled when ctx is Done or there is a 496// fatal service error. 497// 498// Receive will send an ack deadline extension on message receipt, then 499// automatically extend the ack deadline of all fetched Messages up to the 500// period specified by s.ReceiveSettings.MaxExtension. 501// 502// Each Subscription may have only one invocation of Receive active at a time. 503func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error { 504 s.mu.Lock() 505 if s.receiveActive { 506 s.mu.Unlock() 507 return errReceiveInProgress 508 } 509 s.receiveActive = true 510 s.mu.Unlock() 511 defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() 512 513 maxCount := s.ReceiveSettings.MaxOutstandingMessages 514 if maxCount == 0 { 515 maxCount = DefaultReceiveSettings.MaxOutstandingMessages 516 } 517 maxBytes := s.ReceiveSettings.MaxOutstandingBytes 518 if maxBytes == 0 { 519 maxBytes = DefaultReceiveSettings.MaxOutstandingBytes 520 } 521 maxExt := s.ReceiveSettings.MaxExtension 522 if maxExt == 0 { 523 maxExt = DefaultReceiveSettings.MaxExtension 524 } else if maxExt < 0 { 525 // If MaxExtension is negative, disable automatic extension. 526 maxExt = 0 527 } 528 var numGoroutines int 529 switch { 530 case s.ReceiveSettings.Synchronous: 531 numGoroutines = 1 532 case s.ReceiveSettings.NumGoroutines >= 1: 533 numGoroutines = s.ReceiveSettings.NumGoroutines 534 default: 535 numGoroutines = DefaultReceiveSettings.NumGoroutines 536 } 537 // TODO(jba): add tests that verify that ReceiveSettings are correctly processed. 
538 po := &pullOptions{ 539 maxExtension: maxExt, 540 maxPrefetch: trunc32(int64(maxCount)), 541 synchronous: s.ReceiveSettings.Synchronous, 542 } 543 fc := newFlowController(maxCount, maxBytes) 544 545 // Wait for all goroutines started by Receive to return, so instead of an 546 // obscure goroutine leak we have an obvious blocked call to Receive. 547 group, gctx := errgroup.WithContext(ctx) 548 for i := 0; i < numGoroutines; i++ { 549 group.Go(func() error { 550 return s.receive(gctx, po, fc, f) 551 }) 552 } 553 return group.Wait() 554} 555 556func (s *Subscription) receive(ctx context.Context, po *pullOptions, fc *flowController, f func(context.Context, *Message)) error { 557 // Cancel a sub-context when we return, to kick the context-aware callbacks 558 // and the goroutine below. 559 ctx2, cancel := context.WithCancel(ctx) 560 // The iterator does not use the context passed to Receive. If it did, canceling 561 // that context would immediately stop the iterator without waiting for unacked 562 // messages. 563 iter := newMessageIterator(s.c.subc, s.name, po) 564 565 // We cannot use errgroup from Receive here. Receive might already be calling group.Wait, 566 // and group.Wait cannot be called concurrently with group.Go. We give each receive() its 567 // own WaitGroup instead. 568 // Since wg.Add is only called from the main goroutine, wg.Wait is guaranteed 569 // to be called after all Adds. 570 var wg sync.WaitGroup 571 wg.Add(1) 572 go func() { 573 <-ctx2.Done() 574 // Call stop when Receive's context is done. 575 // Stop will block until all outstanding messages have been acknowledged 576 // or there was a fatal service error. 577 iter.stop() 578 wg.Done() 579 }() 580 defer wg.Wait() 581 582 defer cancel() 583 for { 584 var maxToPull int32 // maximum number of messages to pull 585 if po.synchronous { 586 if po.maxPrefetch < 0 { 587 // If there is no limit on the number of messages to pull, use a reasonable default. 
588 maxToPull = 1000 589 } else { 590 // Limit the number of messages in memory to MaxOutstandingMessages 591 // (here, po.maxPrefetch). For each message currently in memory, we have 592 // called fc.acquire but not fc.release: this is fc.count(). The next 593 // call to Pull should fetch no more than the difference between these 594 // values. 595 maxToPull = po.maxPrefetch - int32(fc.count()) 596 if maxToPull <= 0 { 597 // Wait for some callbacks to finish. 598 if err := gax.Sleep(ctx, synchronousWaitTime); err != nil { 599 // Return nil if the context is done, not err. 600 return nil 601 } 602 continue 603 } 604 } 605 } 606 msgs, err := iter.receive(maxToPull) 607 if err == io.EOF { 608 return nil 609 } 610 if err != nil { 611 return err 612 } 613 for i, msg := range msgs { 614 msg := msg 615 // TODO(jba): call acquire closer to when the message is allocated. 616 if err := fc.acquire(ctx, len(msg.Data)); err != nil { 617 // TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done. 618 for _, m := range msgs[i:] { 619 m.Nack() 620 } 621 // Return nil if the context is done, not err. 622 return nil 623 } 624 old := msg.doneFunc 625 msgLen := len(msg.Data) 626 msg.doneFunc = func(ackID string, ack bool, receiveTime time.Time) { 627 defer fc.release(msgLen) 628 old(ackID, ack, receiveTime) 629 } 630 wg.Add(1) 631 go func() { 632 defer wg.Done() 633 f(ctx2, msg) 634 }() 635 } 636 } 637} 638 639type pullOptions struct { 640 maxExtension time.Duration 641 maxPrefetch int32 642 // If true, use unary Pull instead of StreamingPull, and never pull more 643 // than maxPrefetch messages. 644 synchronous bool 645} 646