1// Copyright 2016 Google Inc. All Rights Reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package pubsub 16 17import ( 18 "errors" 19 "fmt" 20 "io" 21 "strings" 22 "sync" 23 "time" 24 25 "cloud.google.com/go/iam" 26 "golang.org/x/net/context" 27 "golang.org/x/sync/errgroup" 28 "google.golang.org/grpc" 29 "google.golang.org/grpc/codes" 30) 31 32// Subscription is a reference to a PubSub subscription. 33type Subscription struct { 34 s service 35 36 // The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>" 37 name string 38 39 // Settings for pulling messages. Configure these before calling Receive. 40 ReceiveSettings ReceiveSettings 41 42 mu sync.Mutex 43 receiveActive bool 44} 45 46// Subscription creates a reference to a subscription. 47func (c *Client) Subscription(id string) *Subscription { 48 return newSubscription(c.s, fmt.Sprintf("projects/%s/subscriptions/%s", c.projectID, id)) 49} 50 51func newSubscription(s service, name string) *Subscription { 52 return &Subscription{ 53 s: s, 54 name: name, 55 } 56} 57 58// String returns the globally unique printable name of the subscription. 59func (s *Subscription) String() string { 60 return s.name 61} 62 63// ID returns the unique identifier of the subscription within its project. 64func (s *Subscription) ID() string { 65 slash := strings.LastIndex(s.name, "/") 66 if slash == -1 { 67 // name is not a fully-qualified name. 68 panic("bad subscription name") 69 } 70 return s.name[slash+1:] 71} 72 73// Subscriptions returns an iterator which returns all of the subscriptions for the client's project. 74func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator { 75 return &SubscriptionIterator{ 76 s: c.s, 77 next: c.s.listProjectSubscriptions(ctx, c.fullyQualifiedProjectName()), 78 } 79} 80 81// SubscriptionIterator is an iterator that returns a series of subscriptions. 82type SubscriptionIterator struct { 83 s service 84 next nextStringFunc 85} 86 87// Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned. 88func (subs *SubscriptionIterator) Next() (*Subscription, error) { 89 subName, err := subs.next() 90 if err != nil { 91 return nil, err 92 } 93 return newSubscription(subs.s, subName), nil 94} 95 96// PushConfig contains configuration for subscriptions that operate in push mode. 97type PushConfig struct { 98 // A URL locating the endpoint to which messages should be pushed. 99 Endpoint string 100 101 // Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details. 102 Attributes map[string]string 103} 104 105// Subscription config contains the configuration of a subscription. 106type SubscriptionConfig struct { 107 Topic *Topic 108 PushConfig PushConfig 109 110 // The default maximum time after a subscriber receives a message before 111 // the subscriber should acknowledge the message. Note: messages which are 112 // obtained via Subscription.Receive need not be acknowledged within this 113 // deadline, as the deadline will be automatically extended. 114 AckDeadline time.Duration 115 116 // Whether to retain acknowledged messages. If true, acknowledged messages 117 // will not be expunged until they fall out of the RetentionDuration window. 118 retainAckedMessages bool 119 120 // How long to retain messages in backlog, from the time of publish. If RetainAckedMessages is true, 121 // this duration affects the retention of acknowledged messages, 122 // otherwise only unacknowledged messages are retained. 123 // Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes. 124 retentionDuration time.Duration 125} 126 127// ReceiveSettings configure the Receive method. 128// A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings. 129type ReceiveSettings struct { 130 // MaxExtension is the maximum period for which the Subscription should 131 // automatically extend the ack deadline for each message. 132 // 133 // The Subscription will automatically extend the ack deadline of all 134 // fetched Messages for the duration specified. Automatic deadline 135 // extension may be disabled by specifying a duration less than 1. 136 MaxExtension time.Duration 137 138 // MaxOutstandingMessages is the maximum number of unprocessed messages 139 // (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it 140 // will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages. 141 // If the value is negative, then there will be no limit on the number of 142 // unprocessed messages. 143 MaxOutstandingMessages int 144 145 // MaxOutstandingBytes is the maximum size of unprocessed messages 146 // (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will 147 // be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If 148 // the value is negative, then there will be no limit on the number of bytes 149 // for unprocessed messages. 150 MaxOutstandingBytes int 151 152 // NumGoroutines is the number of goroutines Receive will spawn to pull 153 // messages concurrently. If NumGoroutines is less than 1, it will be treated 154 // as if it were DefaultReceiveSettings.NumGoroutines. 155 // 156 // NumGoroutines does not limit the number of messages that can be processed 157 // concurrently. Even with one goroutine, many messages might be processed at 158 // once, because that goroutine may continually receive messages and invoke the 159 // function passed to Receive on them. To limit the number of messages being 160 // processed concurrently, set MaxOutstandingMessages. 161 NumGoroutines int 162} 163 164// DefaultReceiveSettings holds the default values for ReceiveSettings. 165var DefaultReceiveSettings = ReceiveSettings{ 166 MaxExtension: 10 * time.Minute, 167 MaxOutstandingMessages: 1000, 168 MaxOutstandingBytes: 1e9, // 1G 169 NumGoroutines: 1, 170} 171 172// Delete deletes the subscription. 173func (s *Subscription) Delete(ctx context.Context) error { 174 return s.s.deleteSubscription(ctx, s.name) 175} 176 177// Exists reports whether the subscription exists on the server. 178func (s *Subscription) Exists(ctx context.Context) (bool, error) { 179 return s.s.subscriptionExists(ctx, s.name) 180} 181 182// Config fetches the current configuration for the subscription. 183func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) { 184 conf, topicName, err := s.s.getSubscriptionConfig(ctx, s.name) 185 if err != nil { 186 return SubscriptionConfig{}, err 187 } 188 conf.Topic = &Topic{ 189 s: s.s, 190 name: topicName, 191 } 192 return conf, nil 193} 194 195// SubscriptionConfigToUpdate describes how to update a subscription. 196type SubscriptionConfigToUpdate struct { 197 // If non-nil, the push config is changed. 198 PushConfig *PushConfig 199} 200 201// Update changes an existing subscription according to the fields set in cfg. 202// It returns the new SubscriptionConfig. 203// 204// Update returns an error if no fields were modified. 205func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) { 206 if cfg.PushConfig == nil { 207 return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update") 208 } 209 if err := s.s.modifyPushConfig(ctx, s.name, *cfg.PushConfig); err != nil { 210 return SubscriptionConfig{}, err 211 } 212 return s.Config(ctx) 213} 214 215func (s *Subscription) IAM() *iam.Handle { 216 return s.s.iamHandle(s.name) 217} 218 219// CreateSubscription creates a new subscription on a topic. 220// 221// id is the name of the subscription to create. It must start with a letter, 222// and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), 223// underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It 224// must be between 3 and 255 characters in length, and must not start with 225// "goog". 226// 227// cfg.Topic is the topic from which the subscription should receive messages. It 228// need not belong to the same project as the subscription. This field is required. 229// 230// cfg.AckDeadline is the maximum time after a subscriber receives a message before 231// the subscriber should acknowledge the message. It must be between 10 and 600 232// seconds (inclusive), and is rounded down to the nearest second. If the 233// provided ackDeadline is 0, then the default value of 10 seconds is used. 234// Note: messages which are obtained via Subscription.Receive need not be 235// acknowledged within this deadline, as the deadline will be automatically 236// extended. 237// 238// cfg.PushConfig may be set to configure this subscription for push delivery. 239// 240// If the subscription already exists an error will be returned. 241func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) { 242 if cfg.Topic == nil { 243 return nil, errors.New("pubsub: require non-nil Topic") 244 } 245 if cfg.AckDeadline == 0 { 246 cfg.AckDeadline = 10 * time.Second 247 } 248 if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second { 249 return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d) 250 } 251 252 sub := c.Subscription(id) 253 err := c.s.createSubscription(ctx, sub.name, cfg) 254 return sub, err 255} 256 257var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscription") 258 259// Receive calls f with the outstanding messages from the subscription. 260// It blocks until ctx is done, or the service returns a non-retryable error. 261// 262// The standard way to terminate a Receive is to cancel its context: 263// 264// cctx, cancel := context.WithCancel(ctx) 265// err := sub.Receive(cctx, callback) 266// // Call cancel from callback, or another goroutine. 267// 268// If the service returns a non-retryable error, Receive returns that error after 269// all of the outstanding calls to f have returned. If ctx is done, Receive 270// returns nil after all of the outstanding calls to f have returned and 271// all messages have been acknowledged or have expired. 272// 273// Receive calls f concurrently from multiple goroutines. It is encouraged to 274// process messages synchronously in f, even if that processing is relatively 275// time-consuming; Receive will spawn new goroutines for incoming messages, 276// limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings. 277// 278// The context passed to f will be canceled when ctx is Done or there is a 279// fatal service error. 280// 281// Receive will automatically extend the ack deadline of all fetched Messages for the 282// period specified by s.ReceiveSettings.MaxExtension. 283// 284// Each Subscription may have only one invocation of Receive active at a time. 285func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error { 286 s.mu.Lock() 287 if s.receiveActive { 288 s.mu.Unlock() 289 return errReceiveInProgress 290 } 291 s.receiveActive = true 292 s.mu.Unlock() 293 defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() 294 295 config, err := s.Config(ctx) 296 if err != nil { 297 if grpc.Code(err) == codes.Canceled { 298 return nil 299 } 300 return err 301 } 302 maxCount := s.ReceiveSettings.MaxOutstandingMessages 303 if maxCount == 0 { 304 maxCount = DefaultReceiveSettings.MaxOutstandingMessages 305 } 306 maxBytes := s.ReceiveSettings.MaxOutstandingBytes 307 if maxBytes == 0 { 308 maxBytes = DefaultReceiveSettings.MaxOutstandingBytes 309 } 310 maxExt := s.ReceiveSettings.MaxExtension 311 if maxExt == 0 { 312 maxExt = DefaultReceiveSettings.MaxExtension 313 } else if maxExt < 0 { 314 // If MaxExtension is negative, disable automatic extension. 315 maxExt = 0 316 } 317 numGoroutines := s.ReceiveSettings.NumGoroutines 318 if numGoroutines < 1 { 319 numGoroutines = DefaultReceiveSettings.NumGoroutines 320 } 321 // TODO(jba): add tests that verify that ReceiveSettings are correctly processed. 322 po := &pullOptions{ 323 maxExtension: maxExt, 324 maxPrefetch: trunc32(int64(maxCount)), 325 ackDeadline: config.AckDeadline, 326 } 327 fc := newFlowController(maxCount, maxBytes) 328 329 // Wait for all goroutines started by Receive to return, so instead of an 330 // obscure goroutine leak we have an obvious blocked call to Receive. 331 group, gctx := errgroup.WithContext(ctx) 332 for i := 0; i < numGoroutines; i++ { 333 group.Go(func() error { 334 return s.receive(gctx, po, fc, f) 335 }) 336 } 337 return group.Wait() 338} 339 340func (s *Subscription) receive(ctx context.Context, po *pullOptions, fc *flowController, f func(context.Context, *Message)) error { 341 // Cancel a sub-context when we return, to kick the context-aware callbacks 342 // and the goroutine below. 343 ctx2, cancel := context.WithCancel(ctx) 344 // Call stop when Receive's context is done. 345 // Stop will block until all outstanding messages have been acknowledged 346 // or there was a fatal service error. 347 // The iterator does not use the context passed to Receive. If it did, canceling 348 // that context would immediately stop the iterator without waiting for unacked 349 // messages. 350 iter := newMessageIterator(context.Background(), s.s, s.name, po) 351 352 // We cannot use errgroup from Receive here. Receive might already be calling group.Wait, 353 // and group.Wait cannot be called concurrently with group.Go. We give each receive() its 354 // own WaitGroup instead. 355 // Since wg.Add is only called from the main goroutine, wg.Wait is guaranteed 356 // to be called after all Adds. 357 var wg sync.WaitGroup 358 wg.Add(1) 359 go func() { 360 <-ctx2.Done() 361 iter.stop() 362 wg.Done() 363 }() 364 defer wg.Wait() 365 366 defer cancel() 367 for { 368 msgs, err := iter.receive() 369 if err == io.EOF { 370 return nil 371 } 372 if err != nil { 373 return err 374 } 375 for i, msg := range msgs { 376 msg := msg 377 // TODO(jba): call acquire closer to when the message is allocated. 378 if err := fc.acquire(ctx, len(msg.Data)); err != nil { 379 // TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done. 380 for _, m := range msgs[i:] { 381 m.Nack() 382 } 383 return nil 384 } 385 wg.Add(1) 386 go func() { 387 // TODO(jba): call release when the message is available for GC. 388 // This considers the message to be released when 389 // f is finished, but f may ack early or not at all. 390 defer wg.Done() 391 defer fc.release(len(msg.Data)) 392 f(ctx2, msg) 393 }() 394 } 395 } 396} 397 398// TODO(jba): remove when we delete messageIterator. 399type pullOptions struct { 400 maxExtension time.Duration 401 maxPrefetch int32 402 // ackDeadline is the default ack deadline for the subscription. Not 403 // configurable. 404 ackDeadline time.Duration 405} 406