1// Copyright 2016 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package pubsub 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "io" 22 "strings" 23 "sync" 24 "time" 25 26 "cloud.google.com/go/iam" 27 "cloud.google.com/go/internal/optional" 28 "cloud.google.com/go/pubsub/internal/scheduler" 29 "github.com/golang/protobuf/ptypes" 30 durpb "github.com/golang/protobuf/ptypes/duration" 31 gax "github.com/googleapis/gax-go/v2" 32 "golang.org/x/sync/errgroup" 33 pb "google.golang.org/genproto/googleapis/pubsub/v1" 34 fmpb "google.golang.org/genproto/protobuf/field_mask" 35 "google.golang.org/grpc/codes" 36 "google.golang.org/grpc/status" 37) 38 39// Subscription is a reference to a PubSub subscription. 40type Subscription struct { 41 c *Client 42 43 // The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>" 44 name string 45 46 // Settings for pulling messages. Configure these before calling Receive. 47 ReceiveSettings ReceiveSettings 48 49 mu sync.Mutex 50 receiveActive bool 51} 52 53// Subscription creates a reference to a subscription. 54func (c *Client) Subscription(id string) *Subscription { 55 return c.SubscriptionInProject(id, c.projectID) 56} 57 58// SubscriptionInProject creates a reference to a subscription in a given project. 59func (c *Client) SubscriptionInProject(id, projectID string) *Subscription { 60 return &Subscription{ 61 c: c, 62 name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id), 63 } 64} 65 66// String returns the globally unique printable name of the subscription. 67func (s *Subscription) String() string { 68 return s.name 69} 70 71// ID returns the unique identifier of the subscription within its project. 72func (s *Subscription) ID() string { 73 slash := strings.LastIndex(s.name, "/") 74 if slash == -1 { 75 // name is not a fully-qualified name. 76 panic("bad subscription name") 77 } 78 return s.name[slash+1:] 79} 80 81// Subscriptions returns an iterator which returns all of the subscriptions for the client's project. 82func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator { 83 it := c.subc.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{ 84 Project: c.fullyQualifiedProjectName(), 85 }) 86 return &SubscriptionIterator{ 87 c: c, 88 next: func() (string, error) { 89 sub, err := it.Next() 90 if err != nil { 91 return "", err 92 } 93 return sub.Name, nil 94 }, 95 } 96} 97 98// SubscriptionIterator is an iterator that returns a series of subscriptions. 99type SubscriptionIterator struct { 100 c *Client 101 next func() (string, error) 102} 103 104// Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned. 105func (subs *SubscriptionIterator) Next() (*Subscription, error) { 106 subName, err := subs.next() 107 if err != nil { 108 return nil, err 109 } 110 return &Subscription{c: subs.c, name: subName}, nil 111} 112 113// PushConfig contains configuration for subscriptions that operate in push mode. 114type PushConfig struct { 115 // A URL locating the endpoint to which messages should be pushed. 116 Endpoint string 117 118 // Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details. 119 Attributes map[string]string 120 121 // AuthenticationMethod is used by push endpoints to verify the source 122 // of push requests. 123 // It can be used with push endpoints that are private by default to 124 // allow requests only from the Cloud Pub/Sub system, for example. 125 // This field is optional and should be set only by users interested in 126 // authenticated push. 127 AuthenticationMethod AuthenticationMethod 128} 129 130func (pc *PushConfig) toProto() *pb.PushConfig { 131 if pc == nil { 132 return nil 133 } 134 pbCfg := &pb.PushConfig{ 135 Attributes: pc.Attributes, 136 PushEndpoint: pc.Endpoint, 137 } 138 if authMethod := pc.AuthenticationMethod; authMethod != nil { 139 switch am := authMethod.(type) { 140 case *OIDCToken: 141 pbCfg.AuthenticationMethod = am.toProto() 142 default: // TODO: add others here when GAIC adds more definitions. 143 } 144 } 145 return pbCfg 146} 147 148// AuthenticationMethod is used by push points to verify the source of push requests. 149// This interface defines fields that are part of a closed alpha that may not be accessible 150// to all users. 151type AuthenticationMethod interface { 152 isAuthMethod() bool 153} 154 155// OIDCToken allows PushConfigs to be authenticated using 156// the OpenID Connect protocol https://openid.net/connect/ 157type OIDCToken struct { 158 // Audience to be used when generating OIDC token. The audience claim 159 // identifies the recipients that the JWT is intended for. The audience 160 // value is a single case-sensitive string. Having multiple values (array) 161 // for the audience field is not supported. More info about the OIDC JWT 162 // token audience here: https://tools.ietf.org/html/rfc7519#section-4.1.3 163 // Note: if not specified, the Push endpoint URL will be used. 164 Audience string 165 166 // The service account email to be used for generating the OpenID Connect token. 167 // The caller of: 168 // * CreateSubscription 169 // * UpdateSubscription 170 // * ModifyPushConfig 171 // calls must have the iam.serviceAccounts.actAs permission for the service account. 172 // See https://cloud.google.com/iam/docs/understanding-roles#service-accounts-roles. 173 ServiceAccountEmail string 174} 175 176var _ AuthenticationMethod = (*OIDCToken)(nil) 177 178func (oidcToken *OIDCToken) isAuthMethod() bool { return true } 179 180func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ { 181 if oidcToken == nil { 182 return nil 183 } 184 return &pb.PushConfig_OidcToken_{ 185 OidcToken: &pb.PushConfig_OidcToken{ 186 Audience: oidcToken.Audience, 187 ServiceAccountEmail: oidcToken.ServiceAccountEmail, 188 }, 189 } 190} 191 192// SubscriptionConfig describes the configuration of a subscription. 193type SubscriptionConfig struct { 194 Topic *Topic 195 PushConfig PushConfig 196 197 // The default maximum time after a subscriber receives a message before 198 // the subscriber should acknowledge the message. Note: messages which are 199 // obtained via Subscription.Receive need not be acknowledged within this 200 // deadline, as the deadline will be automatically extended. 201 AckDeadline time.Duration 202 203 // Whether to retain acknowledged messages. If true, acknowledged messages 204 // will not be expunged until they fall out of the RetentionDuration window. 205 RetainAckedMessages bool 206 207 // How long to retain messages in backlog, from the time of publish. If 208 // RetainAckedMessages is true, this duration affects the retention of 209 // acknowledged messages, otherwise only unacknowledged messages are retained. 210 // Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes. 211 RetentionDuration time.Duration 212 213 // Expiration policy specifies the conditions for a subscription's expiration. 214 // A subscription is considered active as long as any connected subscriber is 215 // successfully consuming messages from the subscription or is issuing 216 // operations on the subscription. If `expiration_policy` is not set, a 217 // *default policy* with `ttl` of 31 days will be used. The minimum allowed 218 // value for `expiration_policy.ttl` is 1 day. 219 // 220 // Use time.Duration(0) to indicate that the subscription should never expire. 221 ExpirationPolicy optional.Duration 222 223 // The set of labels for the subscription. 224 Labels map[string]string 225 226 // EnableMessageOrdering enables message ordering. 227 // 228 // It is EXPERIMENTAL and a part of a closed alpha that may not be 229 // accessible to all users. This field is subject to change or removal 230 // without notice. 231 EnableMessageOrdering bool 232 233 // DeadLetterPolicy specifies the conditions for dead lettering messages in 234 // a subscription. If not set, dead lettering is disabled. 235 DeadLetterPolicy *DeadLetterPolicy 236 237 // Filter is an expression written in the Cloud Pub/Sub filter language. If 238 // non-empty, then only `PubsubMessage`s whose `attributes` field matches the 239 // filter are delivered on this subscription. If empty, then no messages are 240 // filtered out. Cannot be changed after the subscription is created. 241 // 242 // It is EXPERIMENTAL and a part of a closed alpha that may not be 243 // accessible to all users. This field is subject to change or removal 244 // without notice. 245 Filter string 246 247 // RetryPolicy specifies how Cloud Pub/Sub retries message delivery. 248 RetryPolicy *RetryPolicy 249 250 // Detached indicates whether the subscription is detached from its topic. 251 // Detached subscriptions don't receive messages from their topic and don't 252 // retain any backlog. `Pull` and `StreamingPull` requests will return 253 // FAILED_PRECONDITION. If the subscription is a push subscription, pushes to 254 // the endpoint will not be made. 255 Detached bool 256} 257 258func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { 259 var pbPushConfig *pb.PushConfig 260 if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil { 261 pbPushConfig = cfg.PushConfig.toProto() 262 } 263 var retentionDuration *durpb.Duration 264 if cfg.RetentionDuration != 0 { 265 retentionDuration = ptypes.DurationProto(cfg.RetentionDuration) 266 } 267 var pbDeadLetter *pb.DeadLetterPolicy 268 if cfg.DeadLetterPolicy != nil { 269 pbDeadLetter = cfg.DeadLetterPolicy.toProto() 270 } 271 var pbRetryPolicy *pb.RetryPolicy 272 if cfg.RetryPolicy != nil { 273 pbRetryPolicy = cfg.RetryPolicy.toProto() 274 } 275 return &pb.Subscription{ 276 Name: name, 277 Topic: cfg.Topic.name, 278 PushConfig: pbPushConfig, 279 AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), 280 RetainAckedMessages: cfg.RetainAckedMessages, 281 MessageRetentionDuration: retentionDuration, 282 Labels: cfg.Labels, 283 ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy), 284 EnableMessageOrdering: cfg.EnableMessageOrdering, 285 DeadLetterPolicy: pbDeadLetter, 286 Filter: cfg.Filter, 287 RetryPolicy: pbRetryPolicy, 288 Detached: cfg.Detached, 289 } 290} 291 292func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionConfig, error) { 293 rd := time.Hour * 24 * 7 294 var err error 295 if pbSub.MessageRetentionDuration != nil { 296 rd, err = ptypes.Duration(pbSub.MessageRetentionDuration) 297 if err != nil { 298 return SubscriptionConfig{}, err 299 } 300 } 301 var expirationPolicy time.Duration 302 if ttl := pbSub.ExpirationPolicy.GetTtl(); ttl != nil { 303 expirationPolicy, err = ptypes.Duration(ttl) 304 if err != nil { 305 return SubscriptionConfig{}, err 306 } 307 } 308 dlp := protoToDLP(pbSub.DeadLetterPolicy) 309 rp := protoToRetryPolicy(pbSub.RetryPolicy) 310 subC := SubscriptionConfig{ 311 Topic: newTopic(c, pbSub.Topic), 312 AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds), 313 RetainAckedMessages: pbSub.RetainAckedMessages, 314 RetentionDuration: rd, 315 Labels: pbSub.Labels, 316 ExpirationPolicy: expirationPolicy, 317 DeadLetterPolicy: dlp, 318 Filter: pbSub.Filter, 319 RetryPolicy: rp, 320 Detached: pbSub.Detached, 321 } 322 pc := protoToPushConfig(pbSub.PushConfig) 323 if pc != nil { 324 subC.PushConfig = *pc 325 } 326 return subC, nil 327} 328 329func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig { 330 if pbPc == nil { 331 return nil 332 } 333 pc := &PushConfig{ 334 Endpoint: pbPc.PushEndpoint, 335 Attributes: pbPc.Attributes, 336 } 337 if am := pbPc.AuthenticationMethod; am != nil { 338 if oidcToken, ok := am.(*pb.PushConfig_OidcToken_); ok && oidcToken != nil && oidcToken.OidcToken != nil { 339 pc.AuthenticationMethod = &OIDCToken{ 340 Audience: oidcToken.OidcToken.GetAudience(), 341 ServiceAccountEmail: oidcToken.OidcToken.GetServiceAccountEmail(), 342 } 343 } 344 } 345 return pc 346} 347 348// DeadLetterPolicy specifies the conditions for dead lettering messages in 349// a subscription. 350type DeadLetterPolicy struct { 351 DeadLetterTopic string 352 MaxDeliveryAttempts int 353} 354 355func (dlp *DeadLetterPolicy) toProto() *pb.DeadLetterPolicy { 356 if dlp == nil || dlp.DeadLetterTopic == "" { 357 return nil 358 } 359 return &pb.DeadLetterPolicy{ 360 DeadLetterTopic: dlp.DeadLetterTopic, 361 MaxDeliveryAttempts: int32(dlp.MaxDeliveryAttempts), 362 } 363} 364func protoToDLP(pbDLP *pb.DeadLetterPolicy) *DeadLetterPolicy { 365 if pbDLP == nil { 366 return nil 367 } 368 return &DeadLetterPolicy{ 369 DeadLetterTopic: pbDLP.GetDeadLetterTopic(), 370 MaxDeliveryAttempts: int(pbDLP.MaxDeliveryAttempts), 371 } 372} 373 374// RetryPolicy specifies how Cloud Pub/Sub retries message delivery. 375// 376// Retry delay will be exponential based on provided minimum and maximum 377// backoffs. https://en.wikipedia.org/wiki/Exponential_backoff. 378// 379// RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded 380// events for a given message. 381// 382// Retry Policy is implemented on a best effort basis. At times, the delay 383// between consecutive deliveries may not match the configuration. That is, 384// delay can be more or less than configured backoff. 385type RetryPolicy struct { 386 // MinimumBackoff is the minimum delay between consecutive deliveries of a 387 // given message. Value should be between 0 and 600 seconds. Defaults to 10 seconds. 388 MinimumBackoff optional.Duration 389 // MaximumBackoff is the maximum delay between consecutive deliveries of a 390 // given message. Value should be between 0 and 600 seconds. Defaults to 10 seconds. 391 MaximumBackoff optional.Duration 392} 393 394func (rp *RetryPolicy) toProto() *pb.RetryPolicy { 395 if rp == nil { 396 return nil 397 } 398 // If RetryPolicy is the empty struct, take this as an instruction 399 // to remove RetryPolicy from the subscription. 400 if rp.MinimumBackoff == nil && rp.MaximumBackoff == nil { 401 return nil 402 } 403 404 // Initialize minDur and maxDur to be negative, such that if the conversion from an 405 // optional fails, RetryPolicy won't be updated in the proto as it will remain nil. 406 var minDur time.Duration = -1 407 var maxDur time.Duration = -1 408 if rp.MinimumBackoff != nil { 409 minDur = optional.ToDuration(rp.MinimumBackoff) 410 } 411 if rp.MaximumBackoff != nil { 412 maxDur = optional.ToDuration(rp.MaximumBackoff) 413 } 414 415 var minDurPB, maxDurPB *durpb.Duration 416 if minDur > 0 { 417 minDurPB = ptypes.DurationProto(minDur) 418 } 419 if maxDur > 0 { 420 maxDurPB = ptypes.DurationProto(maxDur) 421 } 422 423 return &pb.RetryPolicy{ 424 MinimumBackoff: minDurPB, 425 MaximumBackoff: maxDurPB, 426 } 427} 428 429func protoToRetryPolicy(rp *pb.RetryPolicy) *RetryPolicy { 430 if rp == nil { 431 return nil 432 } 433 var minBackoff, maxBackoff time.Duration 434 var err error 435 if rp.MinimumBackoff != nil { 436 minBackoff, err = ptypes.Duration(rp.MinimumBackoff) 437 if err != nil { 438 return nil 439 } 440 } 441 if rp.MaximumBackoff != nil { 442 maxBackoff, err = ptypes.Duration(rp.MaximumBackoff) 443 if err != nil { 444 return nil 445 } 446 } 447 448 retryPolicy := &RetryPolicy{ 449 MinimumBackoff: minBackoff, 450 MaximumBackoff: maxBackoff, 451 } 452 return retryPolicy 453} 454 455// ReceiveSettings configure the Receive method. 456// A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings. 457type ReceiveSettings struct { 458 // MaxExtension is the maximum period for which the Subscription should 459 // automatically extend the ack deadline for each message. 460 // 461 // The Subscription will automatically extend the ack deadline of all 462 // fetched Messages up to the duration specified. Automatic deadline 463 // extension beyond the initial receipt may be disabled by specifying a 464 // duration less than 0. 465 MaxExtension time.Duration 466 467 // MaxExtensionPeriod is the maximum duration by which to extend the ack 468 // deadline at a time. The ack deadline will continue to be extended by up 469 // to this duration until MaxExtension is reached. Setting MaxExtensionPeriod 470 // bounds the maximum amount of time before a message redelivery in the 471 // event the subscriber fails to extend the deadline. 472 // 473 // MaxExtensionPeriod configuration can be disabled by specifying a 474 // duration less than (or equal to) 0. 475 MaxExtensionPeriod time.Duration 476 477 // MaxOutstandingMessages is the maximum number of unprocessed messages 478 // (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it 479 // will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages. 480 // If the value is negative, then there will be no limit on the number of 481 // unprocessed messages. 482 MaxOutstandingMessages int 483 484 // MaxOutstandingBytes is the maximum size of unprocessed messages 485 // (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will 486 // be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If 487 // the value is negative, then there will be no limit on the number of bytes 488 // for unprocessed messages. 489 MaxOutstandingBytes int 490 491 // NumGoroutines is the number of goroutines that each datastructure along 492 // the Receive path will spawn. Adjusting this value adjusts concurrency 493 // along the receive path. 494 // 495 // NumGoroutines defaults to DefaultReceiveSettings.NumGoroutines. 496 // 497 // NumGoroutines does not limit the number of messages that can be processed 498 // concurrently. Even with one goroutine, many messages might be processed at 499 // once, because that goroutine may continually receive messages and invoke the 500 // function passed to Receive on them. To limit the number of messages being 501 // processed concurrently, set MaxOutstandingMessages. 502 NumGoroutines int 503 504 // If Synchronous is true, then no more than MaxOutstandingMessages will be in 505 // memory at one time. (In contrast, when Synchronous is false, more than 506 // MaxOutstandingMessages may have been received from the service and in memory 507 // before being processed.) MaxOutstandingBytes still refers to the total bytes 508 // processed, rather than in memory. NumGoroutines is ignored. 509 // The default is false. 510 Synchronous bool 511} 512 513// For synchronous receive, the time to wait if we are already processing 514// MaxOutstandingMessages. There is no point calling Pull and asking for zero 515// messages, so we pause to allow some message-processing callbacks to finish. 516// 517// The wait time is large enough to avoid consuming significant CPU, but 518// small enough to provide decent throughput. Users who want better 519// throughput should not be using synchronous mode. 520// 521// Waiting might seem like polling, so it's natural to think we could do better by 522// noticing when a callback is finished and immediately calling Pull. But if 523// callbacks finish in quick succession, this will result in frequent Pull RPCs that 524// request a single message, which wastes network bandwidth. Better to wait for a few 525// callbacks to finish, so we make fewer RPCs fetching more messages. 526// 527// This value is unexported so the user doesn't have another knob to think about. Note that 528// it is the same value as the one used for nackTicker, so it matches this client's 529// idea of a duration that is short, but not so short that we perform excessive RPCs. 530const synchronousWaitTime = 100 * time.Millisecond 531 532// This is a var so that tests can change it. 533var minAckDeadline = 10 * time.Second 534 535// DefaultReceiveSettings holds the default values for ReceiveSettings. 536var DefaultReceiveSettings = ReceiveSettings{ 537 MaxExtension: 60 * time.Minute, 538 MaxExtensionPeriod: 0, 539 MaxOutstandingMessages: 1000, 540 MaxOutstandingBytes: 1e9, // 1G 541 NumGoroutines: 10, 542} 543 544// Delete deletes the subscription. 545func (s *Subscription) Delete(ctx context.Context) error { 546 return s.c.subc.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: s.name}) 547} 548 549// Exists reports whether the subscription exists on the server. 550func (s *Subscription) Exists(ctx context.Context) (bool, error) { 551 _, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name}) 552 if err == nil { 553 return true, nil 554 } 555 if status.Code(err) == codes.NotFound { 556 return false, nil 557 } 558 return false, err 559} 560 561// Config fetches the current configuration for the subscription. 562func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) { 563 pbSub, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name}) 564 if err != nil { 565 return SubscriptionConfig{}, err 566 } 567 cfg, err := protoToSubscriptionConfig(pbSub, s.c) 568 if err != nil { 569 return SubscriptionConfig{}, err 570 } 571 return cfg, nil 572} 573 574// SubscriptionConfigToUpdate describes how to update a subscription. 575type SubscriptionConfigToUpdate struct { 576 // If non-nil, the push config is changed. 577 PushConfig *PushConfig 578 579 // If non-zero, the ack deadline is changed. 580 AckDeadline time.Duration 581 582 // If set, RetainAckedMessages is changed. 583 RetainAckedMessages optional.Bool 584 585 // If non-zero, RetentionDuration is changed. 586 RetentionDuration time.Duration 587 588 // If non-zero, Expiration is changed. 589 ExpirationPolicy optional.Duration 590 591 // If non-nil, DeadLetterPolicy is changed. To remove dead lettering from 592 // a subscription, use the zero value for this struct. 593 DeadLetterPolicy *DeadLetterPolicy 594 595 // If non-nil, the current set of labels is completely 596 // replaced by the new set. 597 // This field has beta status. It is not subject to the stability guarantee 598 // and may change. 599 Labels map[string]string 600 601 // If non-nil, RetryPolicy is changed. To remove an existing retry policy 602 // (to redeliver messages as soon as possible) use a pointer to the zero value 603 // for this struct. 604 RetryPolicy *RetryPolicy 605} 606 607// Update changes an existing subscription according to the fields set in cfg. 608// It returns the new SubscriptionConfig. 609// 610// Update returns an error if no fields were modified. 611func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) { 612 req := s.updateRequest(&cfg) 613 if err := cfg.validate(); err != nil { 614 return SubscriptionConfig{}, fmt.Errorf("pubsub: UpdateSubscription %v", err) 615 } 616 if len(req.UpdateMask.Paths) == 0 { 617 return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update") 618 } 619 rpsub, err := s.c.subc.UpdateSubscription(ctx, req) 620 if err != nil { 621 return SubscriptionConfig{}, err 622 } 623 return protoToSubscriptionConfig(rpsub, s.c) 624} 625 626func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.UpdateSubscriptionRequest { 627 psub := &pb.Subscription{Name: s.name} 628 var paths []string 629 if cfg.PushConfig != nil { 630 psub.PushConfig = cfg.PushConfig.toProto() 631 paths = append(paths, "push_config") 632 } 633 if cfg.AckDeadline != 0 { 634 psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds())) 635 paths = append(paths, "ack_deadline_seconds") 636 } 637 if cfg.RetainAckedMessages != nil { 638 psub.RetainAckedMessages = optional.ToBool(cfg.RetainAckedMessages) 639 paths = append(paths, "retain_acked_messages") 640 } 641 if cfg.RetentionDuration != 0 { 642 psub.MessageRetentionDuration = ptypes.DurationProto(cfg.RetentionDuration) 643 paths = append(paths, "message_retention_duration") 644 } 645 if cfg.ExpirationPolicy != nil { 646 psub.ExpirationPolicy = expirationPolicyToProto(cfg.ExpirationPolicy) 647 paths = append(paths, "expiration_policy") 648 } 649 if cfg.DeadLetterPolicy != nil { 650 psub.DeadLetterPolicy = cfg.DeadLetterPolicy.toProto() 651 paths = append(paths, "dead_letter_policy") 652 } 653 if cfg.Labels != nil { 654 psub.Labels = cfg.Labels 655 paths = append(paths, "labels") 656 } 657 if cfg.RetryPolicy != nil { 658 psub.RetryPolicy = cfg.RetryPolicy.toProto() 659 paths = append(paths, "retry_policy") 660 } 661 return &pb.UpdateSubscriptionRequest{ 662 Subscription: psub, 663 UpdateMask: &fmpb.FieldMask{Paths: paths}, 664 } 665} 666 667const ( 668 // The minimum expiration policy duration is 1 day as per: 669 // https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L606-L607 670 minExpirationPolicy = 24 * time.Hour 671 672 // If an expiration policy is not specified, the default of 31 days is used as per: 673 // https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L605-L606 674 defaultExpirationPolicy = 31 * 24 * time.Hour 675) 676 677func (cfg *SubscriptionConfigToUpdate) validate() error { 678 if cfg == nil || cfg.ExpirationPolicy == nil { 679 return nil 680 } 681 expPolicy, min := optional.ToDuration(cfg.ExpirationPolicy), minExpirationPolicy 682 if expPolicy != 0 && expPolicy < min { 683 return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", expPolicy, min) 684 } 685 return nil 686} 687 688func expirationPolicyToProto(expirationPolicy optional.Duration) *pb.ExpirationPolicy { 689 if expirationPolicy == nil { 690 return nil 691 } 692 693 dur := optional.ToDuration(expirationPolicy) 694 var ttl *durpb.Duration 695 // As per: 696 // https://godoc.org/google.golang.org/genproto/googleapis/pubsub/v1#ExpirationPolicy.Ttl 697 // if ExpirationPolicy.Ttl is set to nil, the expirationPolicy is toggled to NEVER expire. 698 if dur != 0 { 699 ttl = ptypes.DurationProto(dur) 700 } 701 return &pb.ExpirationPolicy{ 702 Ttl: ttl, 703 } 704} 705 706// IAM returns the subscription's IAM handle. 707func (s *Subscription) IAM() *iam.Handle { 708 return iam.InternalNewHandle(s.c.subc.Connection(), s.name) 709} 710 711// CreateSubscription creates a new subscription on a topic. 712// 713// id is the name of the subscription to create. It must start with a letter, 714// and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), 715// underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It 716// must be between 3 and 255 characters in length, and must not start with 717// "goog". 718// 719// cfg.Topic is the topic from which the subscription should receive messages. It 720// need not belong to the same project as the subscription. This field is required. 721// 722// cfg.AckDeadline is the maximum time after a subscriber receives a message before 723// the subscriber should acknowledge the message. It must be between 10 and 600 724// seconds (inclusive), and is rounded down to the nearest second. If the 725// provided ackDeadline is 0, then the default value of 10 seconds is used. 726// Note: messages which are obtained via Subscription.Receive need not be 727// acknowledged within this deadline, as the deadline will be automatically 728// extended. 729// 730// cfg.PushConfig may be set to configure this subscription for push delivery. 731// 732// If the subscription already exists an error will be returned. 733func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) { 734 if cfg.Topic == nil { 735 return nil, errors.New("pubsub: require non-nil Topic") 736 } 737 if cfg.AckDeadline == 0 { 738 cfg.AckDeadline = 10 * time.Second 739 } 740 if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second { 741 return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d) 742 } 743 744 sub := c.Subscription(id) 745 _, err := c.subc.CreateSubscription(ctx, cfg.toProto(sub.name)) 746 if err != nil { 747 return nil, err 748 } 749 return sub, nil 750} 751 752var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscription") 753 754// Receive calls f with the outstanding messages from the subscription. 755// It blocks until ctx is done, or the service returns a non-retryable error. 756// 757// The standard way to terminate a Receive is to cancel its context: 758// 759// cctx, cancel := context.WithCancel(ctx) 760// err := sub.Receive(cctx, callback) 761// // Call cancel from callback, or another goroutine. 762// 763// If the service returns a non-retryable error, Receive returns that error after 764// all of the outstanding calls to f have returned. If ctx is done, Receive 765// returns nil after all of the outstanding calls to f have returned and 766// all messages have been acknowledged or have expired. 767// 768// Receive calls f concurrently from multiple goroutines. It is encouraged to 769// process messages synchronously in f, even if that processing is relatively 770// time-consuming; Receive will spawn new goroutines for incoming messages, 771// limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings. 772// 773// The context passed to f will be canceled when ctx is Done or there is a 774// fatal service error. 775// 776// Receive will send an ack deadline extension on message receipt, then 777// automatically extend the ack deadline of all fetched Messages up to the 778// period specified by s.ReceiveSettings.MaxExtension. 779// 780// Each Subscription may have only one invocation of Receive active at a time. 781func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error { 782 s.mu.Lock() 783 if s.receiveActive { 784 s.mu.Unlock() 785 return errReceiveInProgress 786 } 787 s.receiveActive = true 788 s.mu.Unlock() 789 defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() 790 791 maxCount := s.ReceiveSettings.MaxOutstandingMessages 792 if maxCount == 0 { 793 maxCount = DefaultReceiveSettings.MaxOutstandingMessages 794 } 795 maxBytes := s.ReceiveSettings.MaxOutstandingBytes 796 if maxBytes == 0 { 797 maxBytes = DefaultReceiveSettings.MaxOutstandingBytes 798 } 799 maxExt := s.ReceiveSettings.MaxExtension 800 if maxExt == 0 { 801 maxExt = DefaultReceiveSettings.MaxExtension 802 } else if maxExt < 0 { 803 // If MaxExtension is negative, disable automatic extension. 804 maxExt = 0 805 } 806 var numGoroutines int 807 switch { 808 case s.ReceiveSettings.Synchronous: 809 numGoroutines = 1 810 case s.ReceiveSettings.NumGoroutines >= 1: 811 numGoroutines = s.ReceiveSettings.NumGoroutines 812 default: 813 numGoroutines = DefaultReceiveSettings.NumGoroutines 814 } 815 // TODO(jba): add tests that verify that ReceiveSettings are correctly processed. 816 po := &pullOptions{ 817 maxExtension: maxExt, 818 maxPrefetch: trunc32(int64(maxCount)), 819 synchronous: s.ReceiveSettings.Synchronous, 820 } 821 fc := newFlowController(maxCount, maxBytes) 822 823 sched := scheduler.NewReceiveScheduler(maxCount) 824 825 // Wait for all goroutines started by Receive to return, so instead of an 826 // obscure goroutine leak we have an obvious blocked call to Receive. 827 group, gctx := errgroup.WithContext(ctx) 828 829 type closeablePair struct { 830 wg *sync.WaitGroup 831 iter *messageIterator 832 } 833 834 var pairs []closeablePair 835 836 // Cancel a sub-context which, when we finish a single receiver, will kick 837 // off the context-aware callbacks and the goroutine below (which stops 838 // all receivers, iterators, and the scheduler). 839 ctx2, cancel2 := context.WithCancel(gctx) 840 defer cancel2() 841 842 for i := 0; i < numGoroutines; i++ { 843 // The iterator does not use the context passed to Receive. If it did, 844 // canceling that context would immediately stop the iterator without 845 // waiting for unacked messages. 846 iter := newMessageIterator(s.c.subc, s.name, &s.ReceiveSettings.MaxExtension, po) 847 848 // We cannot use errgroup from Receive here. Receive might already be 849 // calling group.Wait, and group.Wait cannot be called concurrently with 850 // group.Go. We give each receive() its own WaitGroup instead. 851 // 852 // Since wg.Add is only called from the main goroutine, wg.Wait is 853 // guaranteed to be called after all Adds. 854 var wg sync.WaitGroup 855 wg.Add(1) 856 pairs = append(pairs, closeablePair{wg: &wg, iter: iter}) 857 858 group.Go(func() error { 859 defer wg.Wait() 860 defer cancel2() 861 for { 862 var maxToPull int32 // maximum number of messages to pull 863 if po.synchronous { 864 if po.maxPrefetch < 0 { 865 // If there is no limit on the number of messages to 866 // pull, use a reasonable default. 867 maxToPull = 1000 868 } else { 869 // Limit the number of messages in memory to MaxOutstandingMessages 870 // (here, po.maxPrefetch). For each message currently in memory, we have 871 // called fc.acquire but not fc.release: this is fc.count(). The next 872 // call to Pull should fetch no more than the difference between these 873 // values. 874 maxToPull = po.maxPrefetch - int32(fc.count()) 875 if maxToPull <= 0 { 876 // Wait for some callbacks to finish. 877 if err := gax.Sleep(ctx, synchronousWaitTime); err != nil { 878 // Return nil if the context is done, not err. 879 return nil 880 } 881 continue 882 } 883 } 884 } 885 msgs, err := iter.receive(maxToPull) 886 if err == io.EOF { 887 return nil 888 } 889 if err != nil { 890 return err 891 } 892 for i, msg := range msgs { 893 msg := msg 894 // TODO(jba): call acquire closer to when the message is allocated. 895 if err := fc.acquire(ctx, len(msg.Data)); err != nil { 896 // TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done. 897 for _, m := range msgs[i:] { 898 m.Nack() 899 } 900 // Return nil if the context is done, not err. 901 return nil 902 } 903 old := msg.doneFunc 904 msgLen := len(msg.Data) 905 msg.doneFunc = func(ackID string, ack bool, receiveTime time.Time) { 906 defer fc.release(msgLen) 907 old(ackID, ack, receiveTime) 908 } 909 wg.Add(1) 910 // TODO(deklerk): Can we have a generic handler at the 911 // constructor level? 912 if err := sched.Add(msg.OrderingKey, msg, func(msg interface{}) { 913 defer wg.Done() 914 f(ctx2, msg.(*Message)) 915 }); err != nil { 916 wg.Done() 917 return err 918 } 919 } 920 } 921 }) 922 } 923 924 go func() { 925 <-ctx2.Done() 926 927 // Wait for all iterators to stop. 928 for _, p := range pairs { 929 p.iter.stop() 930 p.wg.Done() 931 } 932 933 // This _must_ happen after every iterator has stopped, or some 934 // iterator will still have undelivered messages but the scheduler will 935 // already be shut down. 936 sched.Shutdown() 937 }() 938 939 return group.Wait() 940} 941 942type pullOptions struct { 943 maxExtension time.Duration 944 maxPrefetch int32 945 // If true, use unary Pull instead of StreamingPull, and never pull more 946 // than maxPrefetch messages. 947 synchronous bool 948} 949