1// Copyright 2016 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package pubsub 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "io" 22 "strings" 23 "sync" 24 "time" 25 26 "cloud.google.com/go/iam" 27 "cloud.google.com/go/internal/optional" 28 "github.com/golang/protobuf/ptypes" 29 durpb "github.com/golang/protobuf/ptypes/duration" 30 gax "github.com/googleapis/gax-go/v2" 31 "golang.org/x/sync/errgroup" 32 pb "google.golang.org/genproto/googleapis/pubsub/v1" 33 fmpb "google.golang.org/genproto/protobuf/field_mask" 34 "google.golang.org/grpc/codes" 35 "google.golang.org/grpc/status" 36) 37 38// Subscription is a reference to a PubSub subscription. 39type Subscription struct { 40 c *Client 41 42 // The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>" 43 name string 44 45 // Settings for pulling messages. Configure these before calling Receive. 46 ReceiveSettings ReceiveSettings 47 48 mu sync.Mutex 49 receiveActive bool 50} 51 52// Subscription creates a reference to a subscription. 53func (c *Client) Subscription(id string) *Subscription { 54 return c.SubscriptionInProject(id, c.projectID) 55} 56 57// SubscriptionInProject creates a reference to a subscription in a given project. 58func (c *Client) SubscriptionInProject(id, projectID string) *Subscription { 59 return &Subscription{ 60 c: c, 61 name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id), 62 } 63} 64 65// String returns the globally unique printable name of the subscription. 66func (s *Subscription) String() string { 67 return s.name 68} 69 70// ID returns the unique identifier of the subscription within its project. 71func (s *Subscription) ID() string { 72 slash := strings.LastIndex(s.name, "/") 73 if slash == -1 { 74 // name is not a fully-qualified name. 75 panic("bad subscription name") 76 } 77 return s.name[slash+1:] 78} 79 80// Subscriptions returns an iterator which returns all of the subscriptions for the client's project. 81func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator { 82 it := c.subc.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{ 83 Project: c.fullyQualifiedProjectName(), 84 }) 85 return &SubscriptionIterator{ 86 c: c, 87 next: func() (string, error) { 88 sub, err := it.Next() 89 if err != nil { 90 return "", err 91 } 92 return sub.Name, nil 93 }, 94 } 95} 96 97// SubscriptionIterator is an iterator that returns a series of subscriptions. 98type SubscriptionIterator struct { 99 c *Client 100 next func() (string, error) 101} 102 103// Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned. 104func (subs *SubscriptionIterator) Next() (*Subscription, error) { 105 subName, err := subs.next() 106 if err != nil { 107 return nil, err 108 } 109 return &Subscription{c: subs.c, name: subName}, nil 110} 111 112// PushConfig contains configuration for subscriptions that operate in push mode. 113type PushConfig struct { 114 // A URL locating the endpoint to which messages should be pushed. 115 Endpoint string 116 117 // Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details. 118 Attributes map[string]string 119 120 // AuthenticationMethod is used by push endpoints to verify the source 121 // of push requests. 122 // It can be used with push endpoints that are private by default to 123 // allow requests only from the Cloud Pub/Sub system, for example. 124 // This field is optional and should be set only by users interested in 125 // authenticated push. 126 AuthenticationMethod AuthenticationMethod 127} 128 129func (pc *PushConfig) toProto() *pb.PushConfig { 130 if pc == nil { 131 return nil 132 } 133 pbCfg := &pb.PushConfig{ 134 Attributes: pc.Attributes, 135 PushEndpoint: pc.Endpoint, 136 } 137 if authMethod := pc.AuthenticationMethod; authMethod != nil { 138 switch am := authMethod.(type) { 139 case *OIDCToken: 140 pbCfg.AuthenticationMethod = am.toProto() 141 default: // TODO: add others here when GAIC adds more definitions. 142 } 143 } 144 return pbCfg 145} 146 147// AuthenticationMethod is used by push points to verify the source of push requests. 148// This interface defines fields that are part of a closed alpha that may not be accessible 149// to all users. 150type AuthenticationMethod interface { 151 isAuthMethod() bool 152} 153 154// OIDCToken allows PushConfigs to be authenticated using 155// the OpenID Connect protocol https://openid.net/connect/ 156type OIDCToken struct { 157 // Audience to be used when generating OIDC token. The audience claim 158 // identifies the recipients that the JWT is intended for. The audience 159 // value is a single case-sensitive string. Having multiple values (array) 160 // for the audience field is not supported. More info about the OIDC JWT 161 // token audience here: https://tools.ietf.org/html/rfc7519#section-4.1.3 162 // Note: if not specified, the Push endpoint URL will be used. 163 Audience string 164 165 // The service account email to be used for generating the OpenID Connect token. 166 // The caller of: 167 // * CreateSubscription 168 // * UpdateSubscription 169 // * ModifyPushConfig 170 // calls must have the iam.serviceAccounts.actAs permission for the service account. 171 // See https://cloud.google.com/iam/docs/understanding-roles#service-accounts-roles. 172 ServiceAccountEmail string 173} 174 175var _ AuthenticationMethod = (*OIDCToken)(nil) 176 177func (oidcToken *OIDCToken) isAuthMethod() bool { return true } 178 179func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ { 180 if oidcToken == nil { 181 return nil 182 } 183 return &pb.PushConfig_OidcToken_{ 184 OidcToken: &pb.PushConfig_OidcToken{ 185 Audience: oidcToken.Audience, 186 ServiceAccountEmail: oidcToken.ServiceAccountEmail, 187 }, 188 } 189} 190 191// SubscriptionConfig describes the configuration of a subscription. 192type SubscriptionConfig struct { 193 Topic *Topic 194 PushConfig PushConfig 195 196 // The default maximum time after a subscriber receives a message before 197 // the subscriber should acknowledge the message. Note: messages which are 198 // obtained via Subscription.Receive need not be acknowledged within this 199 // deadline, as the deadline will be automatically extended. 200 AckDeadline time.Duration 201 202 // Whether to retain acknowledged messages. If true, acknowledged messages 203 // will not be expunged until they fall out of the RetentionDuration window. 204 RetainAckedMessages bool 205 206 // How long to retain messages in backlog, from the time of publish. If 207 // RetainAckedMessages is true, this duration affects the retention of 208 // acknowledged messages, otherwise only unacknowledged messages are retained. 209 // Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes. 210 RetentionDuration time.Duration 211 212 // Expiration policy specifies the conditions for a subscription's expiration. 213 // A subscription is considered active as long as any connected subscriber is 214 // successfully consuming messages from the subscription or is issuing 215 // operations on the subscription. If `expiration_policy` is not set, a 216 // *default policy* with `ttl` of 31 days will be used. The minimum allowed 217 // value for `expiration_policy.ttl` is 1 day. 218 // 219 // Use time.Duration(0) to indicate that the subscription should never expire. 220 ExpirationPolicy optional.Duration 221 222 // The set of labels for the subscription. 223 Labels map[string]string 224 225 // DeadLetterPolicy specifies the conditions for dead lettering messages in 226 // a subscription. If not set, dead lettering is disabled. 227 // 228 // It is EXPERIMENTAL and a part of a closed alpha that may not be 229 // accessible to all users. This field is subject to change or removal 230 // without notice. 231 DeadLetterPolicy *DeadLetterPolicy 232} 233 234func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { 235 var pbPushConfig *pb.PushConfig 236 if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil { 237 pbPushConfig = cfg.PushConfig.toProto() 238 } 239 var retentionDuration *durpb.Duration 240 if cfg.RetentionDuration != 0 { 241 retentionDuration = ptypes.DurationProto(cfg.RetentionDuration) 242 } 243 var pbDeadLetter *pb.DeadLetterPolicy 244 if cfg.DeadLetterPolicy != nil { 245 pbDeadLetter = cfg.DeadLetterPolicy.toProto() 246 } 247 return &pb.Subscription{ 248 Name: name, 249 Topic: cfg.Topic.name, 250 PushConfig: pbPushConfig, 251 AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), 252 RetainAckedMessages: cfg.RetainAckedMessages, 253 MessageRetentionDuration: retentionDuration, 254 Labels: cfg.Labels, 255 ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy), 256 DeadLetterPolicy: pbDeadLetter, 257 } 258} 259 260func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionConfig, error) { 261 rd := time.Hour * 24 * 7 262 var err error 263 if pbSub.MessageRetentionDuration != nil { 264 rd, err = ptypes.Duration(pbSub.MessageRetentionDuration) 265 if err != nil { 266 return SubscriptionConfig{}, err 267 } 268 } 269 var expirationPolicy time.Duration 270 if ttl := pbSub.ExpirationPolicy.GetTtl(); ttl != nil { 271 expirationPolicy, err = ptypes.Duration(ttl) 272 if err != nil { 273 return SubscriptionConfig{}, err 274 } 275 } 276 dlp := protoToDLP(pbSub.DeadLetterPolicy) 277 subC := SubscriptionConfig{ 278 Topic: newTopic(c, pbSub.Topic), 279 AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds), 280 RetainAckedMessages: pbSub.RetainAckedMessages, 281 RetentionDuration: rd, 282 Labels: pbSub.Labels, 283 ExpirationPolicy: expirationPolicy, 284 DeadLetterPolicy: dlp, 285 } 286 pc := protoToPushConfig(pbSub.PushConfig) 287 if pc != nil { 288 subC.PushConfig = *pc 289 } 290 return subC, nil 291} 292 293func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig { 294 if pbPc == nil { 295 return nil 296 } 297 pc := &PushConfig{ 298 Endpoint: pbPc.PushEndpoint, 299 Attributes: pbPc.Attributes, 300 } 301 if am := pbPc.AuthenticationMethod; am != nil { 302 if oidcToken, ok := am.(*pb.PushConfig_OidcToken_); ok && oidcToken != nil && oidcToken.OidcToken != nil { 303 pc.AuthenticationMethod = &OIDCToken{ 304 Audience: oidcToken.OidcToken.GetAudience(), 305 ServiceAccountEmail: oidcToken.OidcToken.GetServiceAccountEmail(), 306 } 307 } 308 } 309 return pc 310} 311 312// DeadLetterPolicy specifies the conditions for dead lettering messages in 313// a subscription. 314// 315// It is EXPERIMENTAL and a part of a closed alpha that may not be 316// accessible to all users. 317type DeadLetterPolicy struct { 318 DeadLetterTopic string 319 MaxDeliveryAttempts int 320} 321 322func (dlp *DeadLetterPolicy) toProto() *pb.DeadLetterPolicy { 323 if dlp == nil { 324 return nil 325 } 326 return &pb.DeadLetterPolicy{ 327 DeadLetterTopic: dlp.DeadLetterTopic, 328 MaxDeliveryAttempts: int32(dlp.MaxDeliveryAttempts), 329 } 330} 331func protoToDLP(pbDLP *pb.DeadLetterPolicy) *DeadLetterPolicy { 332 if pbDLP == nil { 333 return nil 334 } 335 return &DeadLetterPolicy{ 336 DeadLetterTopic: pbDLP.GetDeadLetterTopic(), 337 MaxDeliveryAttempts: int(pbDLP.MaxDeliveryAttempts), 338 } 339} 340 341// ReceiveSettings configure the Receive method. 342// A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings. 343type ReceiveSettings struct { 344 // MaxExtension is the maximum period for which the Subscription should 345 // automatically extend the ack deadline for each message. 346 // 347 // The Subscription will automatically extend the ack deadline of all 348 // fetched Messages up to the duration specified. Automatic deadline 349 // extension beyond the initial receipt may be disabled by specifying a 350 // duration less than 0. 351 MaxExtension time.Duration 352 353 // MaxExtensionPeriod is the maximum duration by which to extend the ack 354 // deadline at a time. The ack deadline will continue to be extended by up 355 // to this duration until MaxExtension is reached. Setting MaxExtensionPeriod 356 // bounds the maximum amount of time before a message redelivery in the 357 // event the subscriber fails to extend the deadline. 358 // 359 // MaxExtensionPeriod configuration can be disabled by specifying a 360 // duration less than (or equal to) 0. 361 MaxExtensionPeriod time.Duration 362 363 // MaxOutstandingMessages is the maximum number of unprocessed messages 364 // (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it 365 // will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages. 366 // If the value is negative, then there will be no limit on the number of 367 // unprocessed messages. 368 MaxOutstandingMessages int 369 370 // MaxOutstandingBytes is the maximum size of unprocessed messages 371 // (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will 372 // be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If 373 // the value is negative, then there will be no limit on the number of bytes 374 // for unprocessed messages. 375 MaxOutstandingBytes int 376 377 // NumGoroutines is the number of goroutines Receive will spawn to pull 378 // messages concurrently. If NumGoroutines is less than 1, it will be treated 379 // as if it were DefaultReceiveSettings.NumGoroutines. 380 // 381 // NumGoroutines does not limit the number of messages that can be processed 382 // concurrently. Even with one goroutine, many messages might be processed at 383 // once, because that goroutine may continually receive messages and invoke the 384 // function passed to Receive on them. To limit the number of messages being 385 // processed concurrently, set MaxOutstandingMessages. 386 NumGoroutines int 387 388 // If Synchronous is true, then no more than MaxOutstandingMessages will be in 389 // memory at one time. (In contrast, when Synchronous is false, more than 390 // MaxOutstandingMessages may have been received from the service and in memory 391 // before being processed.) MaxOutstandingBytes still refers to the total bytes 392 // processed, rather than in memory. NumGoroutines is ignored. 393 // The default is false. 394 Synchronous bool 395} 396 397// For synchronous receive, the time to wait if we are already processing 398// MaxOutstandingMessages. There is no point calling Pull and asking for zero 399// messages, so we pause to allow some message-processing callbacks to finish. 400// 401// The wait time is large enough to avoid consuming significant CPU, but 402// small enough to provide decent throughput. Users who want better 403// throughput should not be using synchronous mode. 404// 405// Waiting might seem like polling, so it's natural to think we could do better by 406// noticing when a callback is finished and immediately calling Pull. But if 407// callbacks finish in quick succession, this will result in frequent Pull RPCs that 408// request a single message, which wastes network bandwidth. Better to wait for a few 409// callbacks to finish, so we make fewer RPCs fetching more messages. 410// 411// This value is unexported so the user doesn't have another knob to think about. Note that 412// it is the same value as the one used for nackTicker, so it matches this client's 413// idea of a duration that is short, but not so short that we perform excessive RPCs. 414const synchronousWaitTime = 100 * time.Millisecond 415 416// This is a var so that tests can change it. 417var minAckDeadline = 10 * time.Second 418 419// DefaultReceiveSettings holds the default values for ReceiveSettings. 420var DefaultReceiveSettings = ReceiveSettings{ 421 MaxExtension: 60 * time.Minute, 422 MaxExtensionPeriod: -1, 423 MaxOutstandingMessages: 1000, 424 MaxOutstandingBytes: 1e9, // 1G 425 NumGoroutines: 1, 426} 427 428// Delete deletes the subscription. 429func (s *Subscription) Delete(ctx context.Context) error { 430 return s.c.subc.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: s.name}) 431} 432 433// Exists reports whether the subscription exists on the server. 434func (s *Subscription) Exists(ctx context.Context) (bool, error) { 435 _, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name}) 436 if err == nil { 437 return true, nil 438 } 439 if status.Code(err) == codes.NotFound { 440 return false, nil 441 } 442 return false, err 443} 444 445// Config fetches the current configuration for the subscription. 446func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) { 447 pbSub, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name}) 448 if err != nil { 449 return SubscriptionConfig{}, err 450 } 451 cfg, err := protoToSubscriptionConfig(pbSub, s.c) 452 if err != nil { 453 return SubscriptionConfig{}, err 454 } 455 return cfg, nil 456} 457 458// SubscriptionConfigToUpdate describes how to update a subscription. 459type SubscriptionConfigToUpdate struct { 460 // If non-nil, the push config is changed. 461 PushConfig *PushConfig 462 463 // If non-zero, the ack deadline is changed. 464 AckDeadline time.Duration 465 466 // If set, RetainAckedMessages is changed. 467 RetainAckedMessages optional.Bool 468 469 // If non-zero, RetentionDuration is changed. 470 RetentionDuration time.Duration 471 472 // If non-zero, Expiration is changed. 473 ExpirationPolicy optional.Duration 474 475 // If non-nil, DeadLetterPolicy is changed. 476 // 477 // It is EXPERIMENTAL and a part of a closed alpha that may not be 478 // accessible to all users. 479 DeadLetterPolicy *DeadLetterPolicy 480 481 // If non-nil, the current set of labels is completely 482 // replaced by the new set. 483 // This field has beta status. It is not subject to the stability guarantee 484 // and may change. 485 Labels map[string]string 486} 487 488// Update changes an existing subscription according to the fields set in cfg. 489// It returns the new SubscriptionConfig. 490// 491// Update returns an error if no fields were modified. 492func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) { 493 req := s.updateRequest(&cfg) 494 if err := cfg.validate(); err != nil { 495 return SubscriptionConfig{}, fmt.Errorf("pubsub: UpdateSubscription %v", err) 496 } 497 if len(req.UpdateMask.Paths) == 0 { 498 return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update") 499 } 500 rpsub, err := s.c.subc.UpdateSubscription(ctx, req) 501 if err != nil { 502 return SubscriptionConfig{}, err 503 } 504 return protoToSubscriptionConfig(rpsub, s.c) 505} 506 507func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.UpdateSubscriptionRequest { 508 psub := &pb.Subscription{Name: s.name} 509 var paths []string 510 if cfg.PushConfig != nil { 511 psub.PushConfig = cfg.PushConfig.toProto() 512 paths = append(paths, "push_config") 513 } 514 if cfg.AckDeadline != 0 { 515 psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds())) 516 paths = append(paths, "ack_deadline_seconds") 517 } 518 if cfg.RetainAckedMessages != nil { 519 psub.RetainAckedMessages = optional.ToBool(cfg.RetainAckedMessages) 520 paths = append(paths, "retain_acked_messages") 521 } 522 if cfg.RetentionDuration != 0 { 523 psub.MessageRetentionDuration = ptypes.DurationProto(cfg.RetentionDuration) 524 paths = append(paths, "message_retention_duration") 525 } 526 if cfg.ExpirationPolicy != nil { 527 psub.ExpirationPolicy = expirationPolicyToProto(cfg.ExpirationPolicy) 528 paths = append(paths, "expiration_policy") 529 } 530 if cfg.DeadLetterPolicy != nil { 531 psub.DeadLetterPolicy = cfg.DeadLetterPolicy.toProto() 532 paths = append(paths, "dead_letter_policy") 533 } 534 if cfg.Labels != nil { 535 psub.Labels = cfg.Labels 536 paths = append(paths, "labels") 537 } 538 return &pb.UpdateSubscriptionRequest{ 539 Subscription: psub, 540 UpdateMask: &fmpb.FieldMask{Paths: paths}, 541 } 542} 543 544const ( 545 // The minimum expiration policy duration is 1 day as per: 546 // https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L606-L607 547 minExpirationPolicy = 24 * time.Hour 548 549 // If an expiration policy is not specified, the default of 31 days is used as per: 550 // https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L605-L606 551 defaultExpirationPolicy = 31 * 24 * time.Hour 552) 553 554func (cfg *SubscriptionConfigToUpdate) validate() error { 555 if cfg == nil || cfg.ExpirationPolicy == nil { 556 return nil 557 } 558 policy, min := optional.ToDuration(cfg.ExpirationPolicy), minExpirationPolicy 559 if policy == 0 || policy >= min { 560 return nil 561 } 562 return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", policy, min) 563} 564 565func expirationPolicyToProto(expirationPolicy optional.Duration) *pb.ExpirationPolicy { 566 if expirationPolicy == nil { 567 return nil 568 } 569 570 dur := optional.ToDuration(expirationPolicy) 571 var ttl *durpb.Duration 572 // As per: 573 // https://godoc.org/google.golang.org/genproto/googleapis/pubsub/v1#ExpirationPolicy.Ttl 574 // if ExpirationPolicy.Ttl is set to nil, the expirationPolicy is toggled to NEVER expire. 575 if dur != 0 { 576 ttl = ptypes.DurationProto(dur) 577 } 578 return &pb.ExpirationPolicy{ 579 Ttl: ttl, 580 } 581} 582 583// IAM returns the subscription's IAM handle. 584func (s *Subscription) IAM() *iam.Handle { 585 return iam.InternalNewHandle(s.c.subc.Connection(), s.name) 586} 587 588// CreateSubscription creates a new subscription on a topic. 589// 590// id is the name of the subscription to create. It must start with a letter, 591// and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), 592// underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It 593// must be between 3 and 255 characters in length, and must not start with 594// "goog". 595// 596// cfg.Topic is the topic from which the subscription should receive messages. It 597// need not belong to the same project as the subscription. This field is required. 598// 599// cfg.AckDeadline is the maximum time after a subscriber receives a message before 600// the subscriber should acknowledge the message. It must be between 10 and 600 601// seconds (inclusive), and is rounded down to the nearest second. If the 602// provided ackDeadline is 0, then the default value of 10 seconds is used. 603// Note: messages which are obtained via Subscription.Receive need not be 604// acknowledged within this deadline, as the deadline will be automatically 605// extended. 606// 607// cfg.PushConfig may be set to configure this subscription for push delivery. 608// 609// If the subscription already exists an error will be returned. 610func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) { 611 if cfg.Topic == nil { 612 return nil, errors.New("pubsub: require non-nil Topic") 613 } 614 if cfg.AckDeadline == 0 { 615 cfg.AckDeadline = 10 * time.Second 616 } 617 if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second { 618 return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d) 619 } 620 621 sub := c.Subscription(id) 622 _, err := c.subc.CreateSubscription(ctx, cfg.toProto(sub.name)) 623 if err != nil { 624 return nil, err 625 } 626 return sub, nil 627} 628 629var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscription") 630 631// Receive calls f with the outstanding messages from the subscription. 632// It blocks until ctx is done, or the service returns a non-retryable error. 633// 634// The standard way to terminate a Receive is to cancel its context: 635// 636// cctx, cancel := context.WithCancel(ctx) 637// err := sub.Receive(cctx, callback) 638// // Call cancel from callback, or another goroutine. 639// 640// If the service returns a non-retryable error, Receive returns that error after 641// all of the outstanding calls to f have returned. If ctx is done, Receive 642// returns nil after all of the outstanding calls to f have returned and 643// all messages have been acknowledged or have expired. 644// 645// Receive calls f concurrently from multiple goroutines. It is encouraged to 646// process messages synchronously in f, even if that processing is relatively 647// time-consuming; Receive will spawn new goroutines for incoming messages, 648// limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings. 649// 650// The context passed to f will be canceled when ctx is Done or there is a 651// fatal service error. 652// 653// Receive will send an ack deadline extension on message receipt, then 654// automatically extend the ack deadline of all fetched Messages up to the 655// period specified by s.ReceiveSettings.MaxExtension. 656// 657// Each Subscription may have only one invocation of Receive active at a time. 658func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error { 659 s.mu.Lock() 660 if s.receiveActive { 661 s.mu.Unlock() 662 return errReceiveInProgress 663 } 664 s.receiveActive = true 665 s.mu.Unlock() 666 defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() 667 668 maxCount := s.ReceiveSettings.MaxOutstandingMessages 669 if maxCount == 0 { 670 maxCount = DefaultReceiveSettings.MaxOutstandingMessages 671 } 672 maxBytes := s.ReceiveSettings.MaxOutstandingBytes 673 if maxBytes == 0 { 674 maxBytes = DefaultReceiveSettings.MaxOutstandingBytes 675 } 676 maxExt := s.ReceiveSettings.MaxExtension 677 if maxExt == 0 { 678 maxExt = DefaultReceiveSettings.MaxExtension 679 } else if maxExt < 0 { 680 // If MaxExtension is negative, disable automatic extension. 681 maxExt = 0 682 } 683 var numGoroutines int 684 switch { 685 case s.ReceiveSettings.Synchronous: 686 numGoroutines = 1 687 case s.ReceiveSettings.NumGoroutines >= 1: 688 numGoroutines = s.ReceiveSettings.NumGoroutines 689 default: 690 numGoroutines = DefaultReceiveSettings.NumGoroutines 691 } 692 // TODO(jba): add tests that verify that ReceiveSettings are correctly processed. 693 po := &pullOptions{ 694 maxExtension: maxExt, 695 maxPrefetch: trunc32(int64(maxCount)), 696 synchronous: s.ReceiveSettings.Synchronous, 697 } 698 fc := newFlowController(maxCount, maxBytes) 699 700 // Wait for all goroutines started by Receive to return, so instead of an 701 // obscure goroutine leak we have an obvious blocked call to Receive. 702 group, gctx := errgroup.WithContext(ctx) 703 for i := 0; i < numGoroutines; i++ { 704 group.Go(func() error { 705 return s.receive(gctx, po, fc, f) 706 }) 707 } 708 return group.Wait() 709} 710 711func (s *Subscription) receive(ctx context.Context, po *pullOptions, fc *flowController, f func(context.Context, *Message)) error { 712 // Cancel a sub-context when we return, to kick the context-aware callbacks 713 // and the goroutine below. 714 ctx2, cancel := context.WithCancel(ctx) 715 // The iterator does not use the context passed to Receive. If it did, canceling 716 // that context would immediately stop the iterator without waiting for unacked 717 // messages. 718 iter := newMessageIterator(s.c.subc, s.name, &s.ReceiveSettings.MaxExtensionPeriod, po) 719 720 // We cannot use errgroup from Receive here. Receive might already be calling group.Wait, 721 // and group.Wait cannot be called concurrently with group.Go. We give each receive() its 722 // own WaitGroup instead. 723 // Since wg.Add is only called from the main goroutine, wg.Wait is guaranteed 724 // to be called after all Adds. 725 var wg sync.WaitGroup 726 wg.Add(1) 727 go func() { 728 <-ctx2.Done() 729 // Call stop when Receive's context is done. 730 // Stop will block until all outstanding messages have been acknowledged 731 // or there was a fatal service error. 732 iter.stop() 733 wg.Done() 734 }() 735 defer wg.Wait() 736 737 defer cancel() 738 for { 739 var maxToPull int32 // maximum number of messages to pull 740 if po.synchronous { 741 if po.maxPrefetch < 0 { 742 // If there is no limit on the number of messages to pull, use a reasonable default. 743 maxToPull = 1000 744 } else { 745 // Limit the number of messages in memory to MaxOutstandingMessages 746 // (here, po.maxPrefetch). For each message currently in memory, we have 747 // called fc.acquire but not fc.release: this is fc.count(). The next 748 // call to Pull should fetch no more than the difference between these 749 // values. 750 maxToPull = po.maxPrefetch - int32(fc.count()) 751 if maxToPull <= 0 { 752 // Wait for some callbacks to finish. 753 if err := gax.Sleep(ctx, synchronousWaitTime); err != nil { 754 // Return nil if the context is done, not err. 755 return nil 756 } 757 continue 758 } 759 } 760 } 761 msgs, err := iter.receive(maxToPull) 762 if err == io.EOF { 763 return nil 764 } 765 if err != nil { 766 return err 767 } 768 for i, msg := range msgs { 769 msg := msg 770 // TODO(jba): call acquire closer to when the message is allocated. 771 if err := fc.acquire(ctx, len(msg.Data)); err != nil { 772 // TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done. 773 for _, m := range msgs[i:] { 774 m.Nack() 775 } 776 // Return nil if the context is done, not err. 777 return nil 778 } 779 old := msg.doneFunc 780 msgLen := len(msg.Data) 781 msg.doneFunc = func(ackID string, ack bool, receiveTime time.Time) { 782 defer fc.release(msgLen) 783 old(ackID, ack, receiveTime) 784 } 785 wg.Add(1) 786 go func() { 787 defer wg.Done() 788 f(ctx2, msg) 789 }() 790 } 791 } 792} 793 794type pullOptions struct { 795 maxExtension time.Duration 796 maxPrefetch int32 797 // If true, use unary Pull instead of StreamingPull, and never pull more 798 // than maxPrefetch messages. 799 synchronous bool 800} 801