// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pubsub

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"cloud.google.com/go/iam"
	"cloud.google.com/go/internal/optional"
	"cloud.google.com/go/pubsub/internal/scheduler"
	"github.com/golang/protobuf/ptypes"
	durpb "github.com/golang/protobuf/ptypes/duration"
	gax "github.com/googleapis/gax-go/v2"
	"golang.org/x/sync/errgroup"
	pb "google.golang.org/genproto/googleapis/pubsub/v1"
	fmpb "google.golang.org/genproto/protobuf/field_mask"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// Subscription is a reference to a PubSub subscription.
type Subscription struct {
	// c is the client through which all RPCs for this subscription are issued.
	c *Client

	// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
	name string

	// Settings for pulling messages. Configure these before calling Receive.
	ReceiveSettings ReceiveSettings

	// mu guards receiveActive.
	mu sync.Mutex
	// receiveActive is true while a call to Receive is in progress; it is
	// used to reject a second, concurrent call to Receive.
	receiveActive bool
}

// Subscription creates a reference to a subscription in the client's own
// project. It only builds the reference from id and the client's project ID.
func (c *Client) Subscription(id string) *Subscription {
	return c.SubscriptionInProject(id, c.projectID)
}

// SubscriptionInProject creates a reference to a subscription in a given project.
59func (c *Client) SubscriptionInProject(id, projectID string) *Subscription { 60 return &Subscription{ 61 c: c, 62 name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id), 63 } 64} 65 66// String returns the globally unique printable name of the subscription. 67func (s *Subscription) String() string { 68 return s.name 69} 70 71// ID returns the unique identifier of the subscription within its project. 72func (s *Subscription) ID() string { 73 slash := strings.LastIndex(s.name, "/") 74 if slash == -1 { 75 // name is not a fully-qualified name. 76 panic("bad subscription name") 77 } 78 return s.name[slash+1:] 79} 80 81// Subscriptions returns an iterator which returns all of the subscriptions for the client's project. 82func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator { 83 it := c.subc.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{ 84 Project: c.fullyQualifiedProjectName(), 85 }) 86 return &SubscriptionIterator{ 87 c: c, 88 next: func() (string, error) { 89 sub, err := it.Next() 90 if err != nil { 91 return "", err 92 } 93 return sub.Name, nil 94 }, 95 } 96} 97 98// SubscriptionIterator is an iterator that returns a series of subscriptions. 99type SubscriptionIterator struct { 100 c *Client 101 next func() (string, error) 102} 103 104// Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned. 105func (subs *SubscriptionIterator) Next() (*Subscription, error) { 106 subName, err := subs.next() 107 if err != nil { 108 return nil, err 109 } 110 return &Subscription{c: subs.c, name: subName}, nil 111} 112 113// PushConfig contains configuration for subscriptions that operate in push mode. 114type PushConfig struct { 115 // A URL locating the endpoint to which messages should be pushed. 116 Endpoint string 117 118 // Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details. 
119 Attributes map[string]string 120 121 // AuthenticationMethod is used by push endpoints to verify the source 122 // of push requests. 123 // It can be used with push endpoints that are private by default to 124 // allow requests only from the Cloud Pub/Sub system, for example. 125 // This field is optional and should be set only by users interested in 126 // authenticated push. 127 AuthenticationMethod AuthenticationMethod 128} 129 130func (pc *PushConfig) toProto() *pb.PushConfig { 131 if pc == nil { 132 return nil 133 } 134 pbCfg := &pb.PushConfig{ 135 Attributes: pc.Attributes, 136 PushEndpoint: pc.Endpoint, 137 } 138 if authMethod := pc.AuthenticationMethod; authMethod != nil { 139 switch am := authMethod.(type) { 140 case *OIDCToken: 141 pbCfg.AuthenticationMethod = am.toProto() 142 default: // TODO: add others here when GAIC adds more definitions. 143 } 144 } 145 return pbCfg 146} 147 148// AuthenticationMethod is used by push points to verify the source of push requests. 149// This interface defines fields that are part of a closed alpha that may not be accessible 150// to all users. 151type AuthenticationMethod interface { 152 isAuthMethod() bool 153} 154 155// OIDCToken allows PushConfigs to be authenticated using 156// the OpenID Connect protocol https://openid.net/connect/ 157type OIDCToken struct { 158 // Audience to be used when generating OIDC token. The audience claim 159 // identifies the recipients that the JWT is intended for. The audience 160 // value is a single case-sensitive string. Having multiple values (array) 161 // for the audience field is not supported. More info about the OIDC JWT 162 // token audience here: https://tools.ietf.org/html/rfc7519#section-4.1.3 163 // Note: if not specified, the Push endpoint URL will be used. 164 Audience string 165 166 // The service account email to be used for generating the OpenID Connect token. 
167 // The caller of: 168 // * CreateSubscription 169 // * UpdateSubscription 170 // * ModifyPushConfig 171 // calls must have the iam.serviceAccounts.actAs permission for the service account. 172 // See https://cloud.google.com/iam/docs/understanding-roles#service-accounts-roles. 173 ServiceAccountEmail string 174} 175 176var _ AuthenticationMethod = (*OIDCToken)(nil) 177 178func (oidcToken *OIDCToken) isAuthMethod() bool { return true } 179 180func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ { 181 if oidcToken == nil { 182 return nil 183 } 184 return &pb.PushConfig_OidcToken_{ 185 OidcToken: &pb.PushConfig_OidcToken{ 186 Audience: oidcToken.Audience, 187 ServiceAccountEmail: oidcToken.ServiceAccountEmail, 188 }, 189 } 190} 191 192// SubscriptionConfig describes the configuration of a subscription. 193type SubscriptionConfig struct { 194 Topic *Topic 195 PushConfig PushConfig 196 197 // The default maximum time after a subscriber receives a message before 198 // the subscriber should acknowledge the message. Note: messages which are 199 // obtained via Subscription.Receive need not be acknowledged within this 200 // deadline, as the deadline will be automatically extended. 201 AckDeadline time.Duration 202 203 // Whether to retain acknowledged messages. If true, acknowledged messages 204 // will not be expunged until they fall out of the RetentionDuration window. 205 RetainAckedMessages bool 206 207 // How long to retain messages in backlog, from the time of publish. If 208 // RetainAckedMessages is true, this duration affects the retention of 209 // acknowledged messages, otherwise only unacknowledged messages are retained. 210 // Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes. 211 RetentionDuration time.Duration 212 213 // Expiration policy specifies the conditions for a subscription's expiration. 
214 // A subscription is considered active as long as any connected subscriber is 215 // successfully consuming messages from the subscription or is issuing 216 // operations on the subscription. If `expiration_policy` is not set, a 217 // *default policy* with `ttl` of 31 days will be used. The minimum allowed 218 // value for `expiration_policy.ttl` is 1 day. 219 // 220 // Use time.Duration(0) to indicate that the subscription should never expire. 221 ExpirationPolicy optional.Duration 222 223 // The set of labels for the subscription. 224 Labels map[string]string 225 226 // EnableMessageOrdering enables message ordering. 227 // 228 // It is EXPERIMENTAL and a part of a closed alpha that may not be 229 // accessible to all users. This field is subject to change or removal 230 // without notice. 231 EnableMessageOrdering bool 232 233 // DeadLetterPolicy specifies the conditions for dead lettering messages in 234 // a subscription. If not set, dead lettering is disabled. 235 // 236 // It is EXPERIMENTAL and a part of a closed alpha that may not be 237 // accessible to all users. This field is subject to change or removal 238 // without notice. 
239 DeadLetterPolicy *DeadLetterPolicy 240} 241 242func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { 243 var pbPushConfig *pb.PushConfig 244 if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil { 245 pbPushConfig = cfg.PushConfig.toProto() 246 } 247 var retentionDuration *durpb.Duration 248 if cfg.RetentionDuration != 0 { 249 retentionDuration = ptypes.DurationProto(cfg.RetentionDuration) 250 } 251 var pbDeadLetter *pb.DeadLetterPolicy 252 if cfg.DeadLetterPolicy != nil { 253 pbDeadLetter = cfg.DeadLetterPolicy.toProto() 254 } 255 return &pb.Subscription{ 256 Name: name, 257 Topic: cfg.Topic.name, 258 PushConfig: pbPushConfig, 259 AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), 260 RetainAckedMessages: cfg.RetainAckedMessages, 261 MessageRetentionDuration: retentionDuration, 262 Labels: cfg.Labels, 263 ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy), 264 EnableMessageOrdering: cfg.EnableMessageOrdering, 265 DeadLetterPolicy: pbDeadLetter, 266 } 267} 268 269func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionConfig, error) { 270 rd := time.Hour * 24 * 7 271 var err error 272 if pbSub.MessageRetentionDuration != nil { 273 rd, err = ptypes.Duration(pbSub.MessageRetentionDuration) 274 if err != nil { 275 return SubscriptionConfig{}, err 276 } 277 } 278 var expirationPolicy time.Duration 279 if ttl := pbSub.ExpirationPolicy.GetTtl(); ttl != nil { 280 expirationPolicy, err = ptypes.Duration(ttl) 281 if err != nil { 282 return SubscriptionConfig{}, err 283 } 284 } 285 dlp := protoToDLP(pbSub.DeadLetterPolicy) 286 subC := SubscriptionConfig{ 287 Topic: newTopic(c, pbSub.Topic), 288 AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds), 289 RetainAckedMessages: pbSub.RetainAckedMessages, 290 RetentionDuration: rd, 291 Labels: pbSub.Labels, 292 ExpirationPolicy: expirationPolicy, 293 DeadLetterPolicy: dlp, 294 } 295 pc 
:= protoToPushConfig(pbSub.PushConfig) 296 if pc != nil { 297 subC.PushConfig = *pc 298 } 299 return subC, nil 300} 301 302func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig { 303 if pbPc == nil { 304 return nil 305 } 306 pc := &PushConfig{ 307 Endpoint: pbPc.PushEndpoint, 308 Attributes: pbPc.Attributes, 309 } 310 if am := pbPc.AuthenticationMethod; am != nil { 311 if oidcToken, ok := am.(*pb.PushConfig_OidcToken_); ok && oidcToken != nil && oidcToken.OidcToken != nil { 312 pc.AuthenticationMethod = &OIDCToken{ 313 Audience: oidcToken.OidcToken.GetAudience(), 314 ServiceAccountEmail: oidcToken.OidcToken.GetServiceAccountEmail(), 315 } 316 } 317 } 318 return pc 319} 320 321// DeadLetterPolicy specifies the conditions for dead lettering messages in 322// a subscription. 323// 324// It is EXPERIMENTAL and a part of a closed alpha that may not be 325// accessible to all users. 326type DeadLetterPolicy struct { 327 DeadLetterTopic string 328 MaxDeliveryAttempts int 329} 330 331func (dlp *DeadLetterPolicy) toProto() *pb.DeadLetterPolicy { 332 if dlp == nil || dlp.DeadLetterTopic == "" { 333 return nil 334 } 335 return &pb.DeadLetterPolicy{ 336 DeadLetterTopic: dlp.DeadLetterTopic, 337 MaxDeliveryAttempts: int32(dlp.MaxDeliveryAttempts), 338 } 339} 340func protoToDLP(pbDLP *pb.DeadLetterPolicy) *DeadLetterPolicy { 341 if pbDLP == nil { 342 return nil 343 } 344 return &DeadLetterPolicy{ 345 DeadLetterTopic: pbDLP.GetDeadLetterTopic(), 346 MaxDeliveryAttempts: int(pbDLP.MaxDeliveryAttempts), 347 } 348} 349 350// ReceiveSettings configure the Receive method. 351// A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings. 352type ReceiveSettings struct { 353 // MaxExtension is the maximum period for which the Subscription should 354 // automatically extend the ack deadline for each message. 355 // 356 // The Subscription will automatically extend the ack deadline of all 357 // fetched Messages up to the duration specified. 
Automatic deadline 358 // extension beyond the initial receipt may be disabled by specifying a 359 // duration less than 0. 360 MaxExtension time.Duration 361 362 // MaxExtensionPeriod is the maximum duration by which to extend the ack 363 // deadline at a time. The ack deadline will continue to be extended by up 364 // to this duration until MaxExtension is reached. Setting MaxExtensionPeriod 365 // bounds the maximum amount of time before a message redelivery in the 366 // event the subscriber fails to extend the deadline. 367 // 368 // MaxExtensionPeriod configuration can be disabled by specifying a 369 // duration less than (or equal to) 0. 370 MaxExtensionPeriod time.Duration 371 372 // MaxOutstandingMessages is the maximum number of unprocessed messages 373 // (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it 374 // will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages. 375 // If the value is negative, then there will be no limit on the number of 376 // unprocessed messages. 377 MaxOutstandingMessages int 378 379 // MaxOutstandingBytes is the maximum size of unprocessed messages 380 // (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will 381 // be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If 382 // the value is negative, then there will be no limit on the number of bytes 383 // for unprocessed messages. 384 MaxOutstandingBytes int 385 386 // NumGoroutines is the number of goroutines that each datastructure along 387 // the Receive path will spawn. Adjusting this value adjusts concurrency 388 // along the receive path. 389 // 390 // NumGoroutines defaults to DefaultReceiveSettings.NumGoroutines. 391 // 392 // NumGoroutines does not limit the number of messages that can be processed 393 // concurrently. 
Even with one goroutine, many messages might be processed at 394 // once, because that goroutine may continually receive messages and invoke the 395 // function passed to Receive on them. To limit the number of messages being 396 // processed concurrently, set MaxOutstandingMessages. 397 NumGoroutines int 398 399 // If Synchronous is true, then no more than MaxOutstandingMessages will be in 400 // memory at one time. (In contrast, when Synchronous is false, more than 401 // MaxOutstandingMessages may have been received from the service and in memory 402 // before being processed.) MaxOutstandingBytes still refers to the total bytes 403 // processed, rather than in memory. NumGoroutines is ignored. 404 // The default is false. 405 Synchronous bool 406} 407 408// For synchronous receive, the time to wait if we are already processing 409// MaxOutstandingMessages. There is no point calling Pull and asking for zero 410// messages, so we pause to allow some message-processing callbacks to finish. 411// 412// The wait time is large enough to avoid consuming significant CPU, but 413// small enough to provide decent throughput. Users who want better 414// throughput should not be using synchronous mode. 415// 416// Waiting might seem like polling, so it's natural to think we could do better by 417// noticing when a callback is finished and immediately calling Pull. But if 418// callbacks finish in quick succession, this will result in frequent Pull RPCs that 419// request a single message, which wastes network bandwidth. Better to wait for a few 420// callbacks to finish, so we make fewer RPCs fetching more messages. 421// 422// This value is unexported so the user doesn't have another knob to think about. Note that 423// it is the same value as the one used for nackTicker, so it matches this client's 424// idea of a duration that is short, but not so short that we perform excessive RPCs. 
425const synchronousWaitTime = 100 * time.Millisecond 426 427// This is a var so that tests can change it. 428var minAckDeadline = 10 * time.Second 429 430// DefaultReceiveSettings holds the default values for ReceiveSettings. 431var DefaultReceiveSettings = ReceiveSettings{ 432 MaxExtension: 60 * time.Minute, 433 MaxExtensionPeriod: 0, 434 MaxOutstandingMessages: 1000, 435 MaxOutstandingBytes: 1e9, // 1G 436 NumGoroutines: 10, 437} 438 439// Delete deletes the subscription. 440func (s *Subscription) Delete(ctx context.Context) error { 441 return s.c.subc.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: s.name}) 442} 443 444// Exists reports whether the subscription exists on the server. 445func (s *Subscription) Exists(ctx context.Context) (bool, error) { 446 _, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name}) 447 if err == nil { 448 return true, nil 449 } 450 if status.Code(err) == codes.NotFound { 451 return false, nil 452 } 453 return false, err 454} 455 456// Config fetches the current configuration for the subscription. 457func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) { 458 pbSub, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name}) 459 if err != nil { 460 return SubscriptionConfig{}, err 461 } 462 cfg, err := protoToSubscriptionConfig(pbSub, s.c) 463 if err != nil { 464 return SubscriptionConfig{}, err 465 } 466 return cfg, nil 467} 468 469// SubscriptionConfigToUpdate describes how to update a subscription. 470type SubscriptionConfigToUpdate struct { 471 // If non-nil, the push config is changed. 472 PushConfig *PushConfig 473 474 // If non-zero, the ack deadline is changed. 475 AckDeadline time.Duration 476 477 // If set, RetainAckedMessages is changed. 478 RetainAckedMessages optional.Bool 479 480 // If non-zero, RetentionDuration is changed. 481 RetentionDuration time.Duration 482 483 // If non-zero, Expiration is changed. 
484 ExpirationPolicy optional.Duration 485 486 // If non-nil, DeadLetterPolicy is changed. To remove dead lettering from 487 // a subscription, use the zero value for this struct. 488 // 489 // It is EXPERIMENTAL and a part of a closed alpha that may not be 490 // accessible to all users. 491 DeadLetterPolicy *DeadLetterPolicy 492 493 // If non-nil, the current set of labels is completely 494 // replaced by the new set. 495 // This field has beta status. It is not subject to the stability guarantee 496 // and may change. 497 Labels map[string]string 498} 499 500// Update changes an existing subscription according to the fields set in cfg. 501// It returns the new SubscriptionConfig. 502// 503// Update returns an error if no fields were modified. 504func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) { 505 req := s.updateRequest(&cfg) 506 if err := cfg.validate(); err != nil { 507 return SubscriptionConfig{}, fmt.Errorf("pubsub: UpdateSubscription %v", err) 508 } 509 if len(req.UpdateMask.Paths) == 0 { 510 return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update") 511 } 512 rpsub, err := s.c.subc.UpdateSubscription(ctx, req) 513 if err != nil { 514 return SubscriptionConfig{}, err 515 } 516 return protoToSubscriptionConfig(rpsub, s.c) 517} 518 519func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.UpdateSubscriptionRequest { 520 psub := &pb.Subscription{Name: s.name} 521 var paths []string 522 if cfg.PushConfig != nil { 523 psub.PushConfig = cfg.PushConfig.toProto() 524 paths = append(paths, "push_config") 525 } 526 if cfg.AckDeadline != 0 { 527 psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds())) 528 paths = append(paths, "ack_deadline_seconds") 529 } 530 if cfg.RetainAckedMessages != nil { 531 psub.RetainAckedMessages = optional.ToBool(cfg.RetainAckedMessages) 532 paths = append(paths, "retain_acked_messages") 533 } 534 if 
cfg.RetentionDuration != 0 { 535 psub.MessageRetentionDuration = ptypes.DurationProto(cfg.RetentionDuration) 536 paths = append(paths, "message_retention_duration") 537 } 538 if cfg.ExpirationPolicy != nil { 539 psub.ExpirationPolicy = expirationPolicyToProto(cfg.ExpirationPolicy) 540 paths = append(paths, "expiration_policy") 541 } 542 if cfg.DeadLetterPolicy != nil { 543 psub.DeadLetterPolicy = cfg.DeadLetterPolicy.toProto() 544 paths = append(paths, "dead_letter_policy") 545 } 546 if cfg.Labels != nil { 547 psub.Labels = cfg.Labels 548 paths = append(paths, "labels") 549 } 550 return &pb.UpdateSubscriptionRequest{ 551 Subscription: psub, 552 UpdateMask: &fmpb.FieldMask{Paths: paths}, 553 } 554} 555 556const ( 557 // The minimum expiration policy duration is 1 day as per: 558 // https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L606-L607 559 minExpirationPolicy = 24 * time.Hour 560 561 // If an expiration policy is not specified, the default of 31 days is used as per: 562 // https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L605-L606 563 defaultExpirationPolicy = 31 * 24 * time.Hour 564) 565 566func (cfg *SubscriptionConfigToUpdate) validate() error { 567 if cfg == nil || cfg.ExpirationPolicy == nil { 568 return nil 569 } 570 policy, min := optional.ToDuration(cfg.ExpirationPolicy), minExpirationPolicy 571 if policy == 0 || policy >= min { 572 return nil 573 } 574 return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", policy, min) 575} 576 577func expirationPolicyToProto(expirationPolicy optional.Duration) *pb.ExpirationPolicy { 578 if expirationPolicy == nil { 579 return nil 580 } 581 582 dur := optional.ToDuration(expirationPolicy) 583 var ttl *durpb.Duration 584 // As per: 585 // https://godoc.org/google.golang.org/genproto/googleapis/pubsub/v1#ExpirationPolicy.Ttl 586 // if ExpirationPolicy.Ttl is set to nil, the 
expirationPolicy is toggled to NEVER expire. 587 if dur != 0 { 588 ttl = ptypes.DurationProto(dur) 589 } 590 return &pb.ExpirationPolicy{ 591 Ttl: ttl, 592 } 593} 594 595// IAM returns the subscription's IAM handle. 596func (s *Subscription) IAM() *iam.Handle { 597 return iam.InternalNewHandle(s.c.subc.Connection(), s.name) 598} 599 600// CreateSubscription creates a new subscription on a topic. 601// 602// id is the name of the subscription to create. It must start with a letter, 603// and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), 604// underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It 605// must be between 3 and 255 characters in length, and must not start with 606// "goog". 607// 608// cfg.Topic is the topic from which the subscription should receive messages. It 609// need not belong to the same project as the subscription. This field is required. 610// 611// cfg.AckDeadline is the maximum time after a subscriber receives a message before 612// the subscriber should acknowledge the message. It must be between 10 and 600 613// seconds (inclusive), and is rounded down to the nearest second. If the 614// provided ackDeadline is 0, then the default value of 10 seconds is used. 615// Note: messages which are obtained via Subscription.Receive need not be 616// acknowledged within this deadline, as the deadline will be automatically 617// extended. 618// 619// cfg.PushConfig may be set to configure this subscription for push delivery. 620// 621// If the subscription already exists an error will be returned. 
622func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) { 623 if cfg.Topic == nil { 624 return nil, errors.New("pubsub: require non-nil Topic") 625 } 626 if cfg.AckDeadline == 0 { 627 cfg.AckDeadline = 10 * time.Second 628 } 629 if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second { 630 return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d) 631 } 632 633 sub := c.Subscription(id) 634 _, err := c.subc.CreateSubscription(ctx, cfg.toProto(sub.name)) 635 if err != nil { 636 return nil, err 637 } 638 return sub, nil 639} 640 641var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscription") 642 643// Receive calls f with the outstanding messages from the subscription. 644// It blocks until ctx is done, or the service returns a non-retryable error. 645// 646// The standard way to terminate a Receive is to cancel its context: 647// 648// cctx, cancel := context.WithCancel(ctx) 649// err := sub.Receive(cctx, callback) 650// // Call cancel from callback, or another goroutine. 651// 652// If the service returns a non-retryable error, Receive returns that error after 653// all of the outstanding calls to f have returned. If ctx is done, Receive 654// returns nil after all of the outstanding calls to f have returned and 655// all messages have been acknowledged or have expired. 656// 657// Receive calls f concurrently from multiple goroutines. It is encouraged to 658// process messages synchronously in f, even if that processing is relatively 659// time-consuming; Receive will spawn new goroutines for incoming messages, 660// limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings. 661// 662// The context passed to f will be canceled when ctx is Done or there is a 663// fatal service error. 
664// 665// Receive will send an ack deadline extension on message receipt, then 666// automatically extend the ack deadline of all fetched Messages up to the 667// period specified by s.ReceiveSettings.MaxExtension. 668// 669// Each Subscription may have only one invocation of Receive active at a time. 670func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error { 671 s.mu.Lock() 672 if s.receiveActive { 673 s.mu.Unlock() 674 return errReceiveInProgress 675 } 676 s.receiveActive = true 677 s.mu.Unlock() 678 defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() 679 680 maxCount := s.ReceiveSettings.MaxOutstandingMessages 681 if maxCount == 0 { 682 maxCount = DefaultReceiveSettings.MaxOutstandingMessages 683 } 684 maxBytes := s.ReceiveSettings.MaxOutstandingBytes 685 if maxBytes == 0 { 686 maxBytes = DefaultReceiveSettings.MaxOutstandingBytes 687 } 688 maxExt := s.ReceiveSettings.MaxExtension 689 if maxExt == 0 { 690 maxExt = DefaultReceiveSettings.MaxExtension 691 } else if maxExt < 0 { 692 // If MaxExtension is negative, disable automatic extension. 693 maxExt = 0 694 } 695 var numGoroutines int 696 switch { 697 case s.ReceiveSettings.Synchronous: 698 numGoroutines = 1 699 case s.ReceiveSettings.NumGoroutines >= 1: 700 numGoroutines = s.ReceiveSettings.NumGoroutines 701 default: 702 numGoroutines = DefaultReceiveSettings.NumGoroutines 703 } 704 // TODO(jba): add tests that verify that ReceiveSettings are correctly processed. 705 po := &pullOptions{ 706 maxExtension: maxExt, 707 maxPrefetch: trunc32(int64(maxCount)), 708 synchronous: s.ReceiveSettings.Synchronous, 709 } 710 fc := newFlowController(maxCount, maxBytes) 711 712 sched := scheduler.NewReceiveScheduler(maxCount) 713 714 // Wait for all goroutines started by Receive to return, so instead of an 715 // obscure goroutine leak we have an obvious blocked call to Receive. 
716 group, gctx := errgroup.WithContext(ctx) 717 718 type closeablePair struct { 719 wg *sync.WaitGroup 720 iter *messageIterator 721 } 722 723 var pairs []closeablePair 724 725 // Cancel a sub-context which, when we finish a single receiver, will kick 726 // off the context-aware callbacks and the goroutine below (which stops 727 // all receivers, iterators, and the scheduler). 728 ctx2, cancel2 := context.WithCancel(gctx) 729 defer cancel2() 730 731 for i := 0; i < numGoroutines; i++ { 732 // The iterator does not use the context passed to Receive. If it did, 733 // canceling that context would immediately stop the iterator without 734 // waiting for unacked messages. 735 iter := newMessageIterator(s.c.subc, s.name, &s.ReceiveSettings.MaxExtension, po) 736 737 // We cannot use errgroup from Receive here. Receive might already be 738 // calling group.Wait, and group.Wait cannot be called concurrently with 739 // group.Go. We give each receive() its own WaitGroup instead. 740 // 741 // Since wg.Add is only called from the main goroutine, wg.Wait is 742 // guaranteed to be called after all Adds. 743 var wg sync.WaitGroup 744 wg.Add(1) 745 pairs = append(pairs, closeablePair{wg: &wg, iter: iter}) 746 747 group.Go(func() error { 748 defer wg.Wait() 749 defer cancel2() 750 for { 751 var maxToPull int32 // maximum number of messages to pull 752 if po.synchronous { 753 if po.maxPrefetch < 0 { 754 // If there is no limit on the number of messages to 755 // pull, use a reasonable default. 756 maxToPull = 1000 757 } else { 758 // Limit the number of messages in memory to MaxOutstandingMessages 759 // (here, po.maxPrefetch). For each message currently in memory, we have 760 // called fc.acquire but not fc.release: this is fc.count(). The next 761 // call to Pull should fetch no more than the difference between these 762 // values. 763 maxToPull = po.maxPrefetch - int32(fc.count()) 764 if maxToPull <= 0 { 765 // Wait for some callbacks to finish. 
766 if err := gax.Sleep(ctx, synchronousWaitTime); err != nil { 767 // Return nil if the context is done, not err. 768 return nil 769 } 770 continue 771 } 772 } 773 } 774 msgs, err := iter.receive(maxToPull) 775 if err == io.EOF { 776 return nil 777 } 778 if err != nil { 779 return err 780 } 781 for i, msg := range msgs { 782 msg := msg 783 // TODO(jba): call acquire closer to when the message is allocated. 784 if err := fc.acquire(ctx, len(msg.Data)); err != nil { 785 // TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done. 786 for _, m := range msgs[i:] { 787 m.Nack() 788 } 789 // Return nil if the context is done, not err. 790 return nil 791 } 792 old := msg.doneFunc 793 msgLen := len(msg.Data) 794 msg.doneFunc = func(ackID string, ack bool, receiveTime time.Time) { 795 defer fc.release(msgLen) 796 old(ackID, ack, receiveTime) 797 } 798 wg.Add(1) 799 // TODO(deklerk): Can we have a generic handler at the 800 // constructor level? 801 if err := sched.Add(msg.OrderingKey, msg, func(msg interface{}) { 802 defer wg.Done() 803 f(ctx2, msg.(*Message)) 804 }); err != nil { 805 wg.Done() 806 return err 807 } 808 } 809 } 810 }) 811 } 812 813 go func() { 814 <-ctx2.Done() 815 816 // Wait for all iterators to stop. 817 for _, p := range pairs { 818 p.iter.stop() 819 p.wg.Done() 820 } 821 822 // This _must_ happen after every iterator has stopped, or some 823 // iterator will still have undelivered messages but the scheduler will 824 // already be shut down. 825 sched.Shutdown() 826 }() 827 828 return group.Wait() 829} 830 831type pullOptions struct { 832 maxExtension time.Duration 833 maxPrefetch int32 834 // If true, use unary Pull instead of StreamingPull, and never pull more 835 // than maxPrefetch messages. 836 synchronous bool 837} 838