1// Copyright 2016 Google Inc. All Rights Reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package pubsub
16
17import (
18	"errors"
19	"fmt"
20	"io"
21	"strings"
22	"sync"
23	"time"
24
25	"cloud.google.com/go/iam"
26	"golang.org/x/net/context"
27	"golang.org/x/sync/errgroup"
28	"google.golang.org/grpc"
29	"google.golang.org/grpc/codes"
30)
31
32// Subscription is a reference to a PubSub subscription.
33type Subscription struct {
34	s service
35
36	// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
37	name string
38
39	// Settings for pulling messages. Configure these before calling Receive.
40	ReceiveSettings ReceiveSettings
41
42	mu            sync.Mutex
43	receiveActive bool
44}
45
46// Subscription creates a reference to a subscription.
47func (c *Client) Subscription(id string) *Subscription {
48	return newSubscription(c.s, fmt.Sprintf("projects/%s/subscriptions/%s", c.projectID, id))
49}
50
51func newSubscription(s service, name string) *Subscription {
52	return &Subscription{
53		s:    s,
54		name: name,
55	}
56}
57
58// String returns the globally unique printable name of the subscription.
59func (s *Subscription) String() string {
60	return s.name
61}
62
63// ID returns the unique identifier of the subscription within its project.
64func (s *Subscription) ID() string {
65	slash := strings.LastIndex(s.name, "/")
66	if slash == -1 {
67		// name is not a fully-qualified name.
68		panic("bad subscription name")
69	}
70	return s.name[slash+1:]
71}
72
73// Subscriptions returns an iterator which returns all of the subscriptions for the client's project.
74func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator {
75	return &SubscriptionIterator{
76		s:    c.s,
77		next: c.s.listProjectSubscriptions(ctx, c.fullyQualifiedProjectName()),
78	}
79}
80
81// SubscriptionIterator is an iterator that returns a series of subscriptions.
82type SubscriptionIterator struct {
83	s    service
84	next nextStringFunc
85}
86
87// Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned.
88func (subs *SubscriptionIterator) Next() (*Subscription, error) {
89	subName, err := subs.next()
90	if err != nil {
91		return nil, err
92	}
93	return newSubscription(subs.s, subName), nil
94}
95
96// PushConfig contains configuration for subscriptions that operate in push mode.
97type PushConfig struct {
98	// A URL locating the endpoint to which messages should be pushed.
99	Endpoint string
100
101	// Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details.
102	Attributes map[string]string
103}
104
105// Subscription config contains the configuration of a subscription.
106type SubscriptionConfig struct {
107	Topic      *Topic
108	PushConfig PushConfig
109
110	// The default maximum time after a subscriber receives a message before
111	// the subscriber should acknowledge the message. Note: messages which are
112	// obtained via Subscription.Receive need not be acknowledged within this
113	// deadline, as the deadline will be automatically extended.
114	AckDeadline time.Duration
115
116	// Whether to retain acknowledged messages. If true, acknowledged messages
117	// will not be expunged until they fall out of the RetentionDuration window.
118	retainAckedMessages bool
119
120	// How long to retain messages in backlog, from the time of publish. If RetainAckedMessages is true,
121	// this duration affects the retention of acknowledged messages,
122	// otherwise only unacknowledged messages are retained.
123	// Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes.
124	retentionDuration time.Duration
125}
126
127// ReceiveSettings configure the Receive method.
128// A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
129type ReceiveSettings struct {
130	// MaxExtension is the maximum period for which the Subscription should
131	// automatically extend the ack deadline for each message.
132	//
133	// The Subscription will automatically extend the ack deadline of all
134	// fetched Messages for the duration specified. Automatic deadline
135	// extension may be disabled by specifying a duration less than 1.
136	MaxExtension time.Duration
137
138	// MaxOutstandingMessages is the maximum number of unprocessed messages
139	// (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it
140	// will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages.
141	// If the value is negative, then there will be no limit on the number of
142	// unprocessed messages.
143	MaxOutstandingMessages int
144
145	// MaxOutstandingBytes is the maximum size of unprocessed messages
146	// (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will
147	// be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If
148	// the value is negative, then there will be no limit on the number of bytes
149	// for unprocessed messages.
150	MaxOutstandingBytes int
151
152	// NumGoroutines is the number of goroutines Receive will spawn to pull
153	// messages concurrently. If NumGoroutines is less than 1, it will be treated
154	// as if it were DefaultReceiveSettings.NumGoroutines.
155	//
156	// NumGoroutines does not limit the number of messages that can be processed
157	// concurrently. Even with one goroutine, many messages might be processed at
158	// once, because that goroutine may continually receive messages and invoke the
159	// function passed to Receive on them. To limit the number of messages being
160	// processed concurrently, set MaxOutstandingMessages.
161	NumGoroutines int
162}
163
164// DefaultReceiveSettings holds the default values for ReceiveSettings.
165var DefaultReceiveSettings = ReceiveSettings{
166	MaxExtension:           10 * time.Minute,
167	MaxOutstandingMessages: 1000,
168	MaxOutstandingBytes:    1e9, // 1G
169	NumGoroutines:          1,
170}
171
172// Delete deletes the subscription.
173func (s *Subscription) Delete(ctx context.Context) error {
174	return s.s.deleteSubscription(ctx, s.name)
175}
176
177// Exists reports whether the subscription exists on the server.
178func (s *Subscription) Exists(ctx context.Context) (bool, error) {
179	return s.s.subscriptionExists(ctx, s.name)
180}
181
182// Config fetches the current configuration for the subscription.
183func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) {
184	conf, topicName, err := s.s.getSubscriptionConfig(ctx, s.name)
185	if err != nil {
186		return SubscriptionConfig{}, err
187	}
188	conf.Topic = &Topic{
189		s:    s.s,
190		name: topicName,
191	}
192	return conf, nil
193}
194
195// SubscriptionConfigToUpdate describes how to update a subscription.
196type SubscriptionConfigToUpdate struct {
197	// If non-nil, the push config is changed.
198	PushConfig *PushConfig
199}
200
201// Update changes an existing subscription according to the fields set in cfg.
202// It returns the new SubscriptionConfig.
203//
204// Update returns an error if no fields were modified.
205func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) {
206	if cfg.PushConfig == nil {
207		return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update")
208	}
209	if err := s.s.modifyPushConfig(ctx, s.name, *cfg.PushConfig); err != nil {
210		return SubscriptionConfig{}, err
211	}
212	return s.Config(ctx)
213}
214
215func (s *Subscription) IAM() *iam.Handle {
216	return s.s.iamHandle(s.name)
217}
218
219// CreateSubscription creates a new subscription on a topic.
220//
221// id is the name of the subscription to create. It must start with a letter,
222// and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-),
223// underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It
224// must be between 3 and 255 characters in length, and must not start with
225// "goog".
226//
227// cfg.Topic is the topic from which the subscription should receive messages. It
228// need not belong to the same project as the subscription. This field is required.
229//
230// cfg.AckDeadline is the maximum time after a subscriber receives a message before
231// the subscriber should acknowledge the message. It must be between 10 and 600
232// seconds (inclusive), and is rounded down to the nearest second. If the
233// provided ackDeadline is 0, then the default value of 10 seconds is used.
234// Note: messages which are obtained via Subscription.Receive need not be
235// acknowledged within this deadline, as the deadline will be automatically
236// extended.
237//
238// cfg.PushConfig may be set to configure this subscription for push delivery.
239//
240// If the subscription already exists an error will be returned.
241func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) {
242	if cfg.Topic == nil {
243		return nil, errors.New("pubsub: require non-nil Topic")
244	}
245	if cfg.AckDeadline == 0 {
246		cfg.AckDeadline = 10 * time.Second
247	}
248	if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second {
249		return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d)
250	}
251
252	sub := c.Subscription(id)
253	err := c.s.createSubscription(ctx, sub.name, cfg)
254	return sub, err
255}
256
257var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscription")
258
259// Receive calls f with the outstanding messages from the subscription.
260// It blocks until ctx is done, or the service returns a non-retryable error.
261//
262// The standard way to terminate a Receive is to cancel its context:
263//
264//   cctx, cancel := context.WithCancel(ctx)
265//   err := sub.Receive(cctx, callback)
266//   // Call cancel from callback, or another goroutine.
267//
268// If the service returns a non-retryable error, Receive returns that error after
269// all of the outstanding calls to f have returned. If ctx is done, Receive
270// returns nil after all of the outstanding calls to f have returned and
271// all messages have been acknowledged or have expired.
272//
273// Receive calls f concurrently from multiple goroutines. It is encouraged to
274// process messages synchronously in f, even if that processing is relatively
275// time-consuming; Receive will spawn new goroutines for incoming messages,
276// limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings.
277//
278// The context passed to f will be canceled when ctx is Done or there is a
279// fatal service error.
280//
281// Receive will automatically extend the ack deadline of all fetched Messages for the
282// period specified by s.ReceiveSettings.MaxExtension.
283//
284// Each Subscription may have only one invocation of Receive active at a time.
285func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error {
286	s.mu.Lock()
287	if s.receiveActive {
288		s.mu.Unlock()
289		return errReceiveInProgress
290	}
291	s.receiveActive = true
292	s.mu.Unlock()
293	defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()
294
295	config, err := s.Config(ctx)
296	if err != nil {
297		if grpc.Code(err) == codes.Canceled {
298			return nil
299		}
300		return err
301	}
302	maxCount := s.ReceiveSettings.MaxOutstandingMessages
303	if maxCount == 0 {
304		maxCount = DefaultReceiveSettings.MaxOutstandingMessages
305	}
306	maxBytes := s.ReceiveSettings.MaxOutstandingBytes
307	if maxBytes == 0 {
308		maxBytes = DefaultReceiveSettings.MaxOutstandingBytes
309	}
310	maxExt := s.ReceiveSettings.MaxExtension
311	if maxExt == 0 {
312		maxExt = DefaultReceiveSettings.MaxExtension
313	} else if maxExt < 0 {
314		// If MaxExtension is negative, disable automatic extension.
315		maxExt = 0
316	}
317	numGoroutines := s.ReceiveSettings.NumGoroutines
318	if numGoroutines < 1 {
319		numGoroutines = DefaultReceiveSettings.NumGoroutines
320	}
321	// TODO(jba): add tests that verify that ReceiveSettings are correctly processed.
322	po := &pullOptions{
323		maxExtension: maxExt,
324		maxPrefetch:  trunc32(int64(maxCount)),
325		ackDeadline:  config.AckDeadline,
326	}
327	fc := newFlowController(maxCount, maxBytes)
328
329	// Wait for all goroutines started by Receive to return, so instead of an
330	// obscure goroutine leak we have an obvious blocked call to Receive.
331	group, gctx := errgroup.WithContext(ctx)
332	for i := 0; i < numGoroutines; i++ {
333		group.Go(func() error {
334			return s.receive(gctx, po, fc, f)
335		})
336	}
337	return group.Wait()
338}
339
340func (s *Subscription) receive(ctx context.Context, po *pullOptions, fc *flowController, f func(context.Context, *Message)) error {
341	// Cancel a sub-context when we return, to kick the context-aware callbacks
342	// and the goroutine below.
343	ctx2, cancel := context.WithCancel(ctx)
344	// Call stop when Receive's context is done.
345	// Stop will block until all outstanding messages have been acknowledged
346	// or there was a fatal service error.
347	// The iterator does not use the context passed to Receive. If it did, canceling
348	// that context would immediately stop the iterator without waiting for unacked
349	// messages.
350	iter := newMessageIterator(context.Background(), s.s, s.name, po)
351
352	// We cannot use errgroup from Receive here. Receive might already be calling group.Wait,
353	// and group.Wait cannot be called concurrently with group.Go. We give each receive() its
354	// own WaitGroup instead.
355	// Since wg.Add is only called from the main goroutine, wg.Wait is guaranteed
356	// to be called after all Adds.
357	var wg sync.WaitGroup
358	wg.Add(1)
359	go func() {
360		<-ctx2.Done()
361		iter.stop()
362		wg.Done()
363	}()
364	defer wg.Wait()
365
366	defer cancel()
367	for {
368		msgs, err := iter.receive()
369		if err == io.EOF {
370			return nil
371		}
372		if err != nil {
373			return err
374		}
375		for i, msg := range msgs {
376			msg := msg
377			// TODO(jba): call acquire closer to when the message is allocated.
378			if err := fc.acquire(ctx, len(msg.Data)); err != nil {
379				// TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.
380				for _, m := range msgs[i:] {
381					m.Nack()
382				}
383				return nil
384			}
385			wg.Add(1)
386			go func() {
387				// TODO(jba): call release when the message is available for GC.
388				// This considers the message to be released when
389				// f is finished, but f may ack early or not at all.
390				defer wg.Done()
391				defer fc.release(len(msg.Data))
392				f(ctx2, msg)
393			}()
394		}
395	}
396}
397
398// TODO(jba): remove when we delete messageIterator.
399type pullOptions struct {
400	maxExtension time.Duration
401	maxPrefetch  int32
402	// ackDeadline is the default ack deadline for the subscription. Not
403	// configurable.
404	ackDeadline time.Duration
405}
406