1// Copyright 2016 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package pubsub
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"runtime"
22	"strings"
23	"sync"
24	"time"
25
26	"cloud.google.com/go/iam"
27	"github.com/golang/protobuf/proto"
28	gax "github.com/googleapis/gax-go/v2"
29	"google.golang.org/api/support/bundler"
30	pb "google.golang.org/genproto/googleapis/pubsub/v1"
31	fmpb "google.golang.org/genproto/protobuf/field_mask"
32	"google.golang.org/grpc"
33	"google.golang.org/grpc/codes"
34)
35
// MaxPublishRequestCount is the maximum number of messages that can be in
// a single publish request, as defined by the PubSub service.
const MaxPublishRequestCount = 1000

// MaxPublishRequestBytes is the maximum size of a single publish request
// in bytes, as defined by the PubSub service.
const MaxPublishRequestBytes = 1e7
45
// ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes.
//
// It is the same value as the bundler package's ErrOversizedItem. Publish passes
// errors from the bundler through to the PublishResult unchanged, so this value
// can be compared directly with the error returned by PublishResult.Get.
var ErrOversizedMessage = bundler.ErrOversizedItem
48
// Topic is a reference to a PubSub topic.
//
// The methods of Topic are safe for use by multiple goroutines.
type Topic struct {
	// c is the client through which all RPCs for this topic are made.
	c *Client
	// The fully qualified identifier for the topic, in the format "projects/<projid>/topics/<name>"
	name string

	// Settings for publishing messages. All changes must be made before the
	// first call to Publish. The default is DefaultPublishSettings.
	// (initBundler reads these once, when it creates the bundler.)
	PublishSettings PublishSettings

	// mu guards stopped and bundler. stopped is set by Stop and never cleared.
	// bundler is created lazily by initBundler on the first call to Publish,
	// and stays nil if the topic is never published to or is stopped first.
	mu      sync.RWMutex
	stopped bool
	bundler *bundler.Bundler
}
65
// PublishSettings control the bundling of published messages.
//
// A Topic reads its settings once, when it creates its bundler on the first
// call to Publish; changes made after that have no effect.
type PublishSettings struct {

	// Publish a non-empty batch after this delay has passed.
	DelayThreshold time.Duration

	// Publish a batch when it has this many messages. The maximum is
	// MaxPublishRequestCount; larger values are capped to it.
	CountThreshold int

	// Publish a batch when its size in bytes reaches this value.
	ByteThreshold int

	// The number of goroutines that invoke the Publish RPC concurrently.
	//
	// Defaults to a multiple of GOMAXPROCS (25 * GOMAXPROCS) when zero or
	// negative.
	NumGoroutines int

	// The maximum time that the client will attempt to publish a bundle of messages.
	// A zero value applies no timeout.
	Timeout time.Duration

	// The maximum number of bytes that the Bundler will keep in memory before
	// returning ErrOverflow.
	//
	// Defaults to DefaultPublishSettings.BufferedByteLimit when zero or negative.
	BufferedByteLimit int
}
93
94// DefaultPublishSettings holds the default values for topics' PublishSettings.
95var DefaultPublishSettings = PublishSettings{
96	DelayThreshold: 1 * time.Millisecond,
97	CountThreshold: 100,
98	ByteThreshold:  1e6,
99	Timeout:        60 * time.Second,
100	// By default, limit the bundler to 10 times the max message size. The number 10 is
101	// chosen as a reasonable amount of messages in the worst case whilst still
102	// capping the number to a low enough value to not OOM users.
103	BufferedByteLimit: 10 * MaxPublishRequestBytes,
104}
105
106// CreateTopic creates a new topic.
107// The specified topic ID must start with a letter, and contain only letters
108// ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.),
109// tildes (~), plus (+) or percent signs (%). It must be between 3 and 255
110// characters in length, and must not start with "goog".
111// If the topic already exists an error will be returned.
112func (c *Client) CreateTopic(ctx context.Context, id string) (*Topic, error) {
113	t := c.Topic(id)
114	_, err := c.pubc.CreateTopic(ctx, &pb.Topic{Name: t.name})
115	if err != nil {
116		return nil, err
117	}
118	return t, nil
119}
120
121// Topic creates a reference to a topic in the client's project.
122//
123// If a Topic's Publish method is called, it has background goroutines
124// associated with it. Clean them up by calling Topic.Stop.
125//
126// Avoid creating many Topic instances if you use them to publish.
127func (c *Client) Topic(id string) *Topic {
128	return c.TopicInProject(id, c.projectID)
129}
130
131// TopicInProject creates a reference to a topic in the given project.
132//
133// If a Topic's Publish method is called, it has background goroutines
134// associated with it. Clean them up by calling Topic.Stop.
135//
136// Avoid creating many Topic instances if you use them to publish.
137func (c *Client) TopicInProject(id, projectID string) *Topic {
138	return newTopic(c, fmt.Sprintf("projects/%s/topics/%s", projectID, id))
139}
140
141func newTopic(c *Client, name string) *Topic {
142	return &Topic{
143		c:               c,
144		name:            name,
145		PublishSettings: DefaultPublishSettings,
146	}
147}
148
// TopicConfig describes the configuration of a topic.
type TopicConfig struct {
	// The set of labels for the topic.
	Labels map[string]string
	// The topic's message storage policy, as reported by the server. It has
	// no counterpart in TopicConfigToUpdate.
	MessageStoragePolicy MessageStoragePolicy
}
156
// TopicConfigToUpdate describes how to update a topic.
type TopicConfigToUpdate struct {
	// If non-nil, the current set of labels is completely
	// replaced by the new set (so a non-nil empty map removes all labels).
	// A nil map leaves the labels unchanged.
	// This field has beta status. It is not subject to the stability guarantee
	// and may change.
	Labels map[string]string
}
165
166func protoToTopicConfig(pbt *pb.Topic) TopicConfig {
167	return TopicConfig{
168		Labels:               pbt.Labels,
169		MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy),
170	}
171}
172
// MessageStoragePolicy constrains how messages published to the topic may be stored. It
// is determined when the topic is created based on the policy configured at
// the project level.
//
// When the server reports no policy for a topic, Config returns the zero value.
type MessageStoragePolicy struct {
	// The list of GCP regions where messages that are published to the topic may
	// be persisted in storage. Messages published by publishers running in
	// non-allowed GCP regions (or running outside of GCP altogether) will be
	// routed for storage in one of the allowed regions. An empty list indicates a
	// misconfiguration at the project or organization level, which will result in
	// all Publish operations failing.
	AllowedPersistenceRegions []string
}
185
186func protoToMessageStoragePolicy(msp *pb.MessageStoragePolicy) MessageStoragePolicy {
187	if msp == nil {
188		return MessageStoragePolicy{}
189	}
190	return MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions}
191}
192
193// Config returns the TopicConfig for the topic.
194func (t *Topic) Config(ctx context.Context) (TopicConfig, error) {
195	pbt, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name})
196	if err != nil {
197		return TopicConfig{}, err
198	}
199	return protoToTopicConfig(pbt), nil
200}
201
202// Update changes an existing topic according to the fields set in cfg. It returns
203// the new TopicConfig.
204//
205// Any call to Update (even with an empty TopicConfigToUpdate) will update the
206// MessageStoragePolicy for the topic from the organization's settings.
207func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error) {
208	req := t.updateRequest(cfg)
209	if len(req.UpdateMask.Paths) == 0 {
210		return TopicConfig{}, errors.New("pubsub: UpdateTopic call with nothing to update")
211	}
212	rpt, err := t.c.pubc.UpdateTopic(ctx, req)
213	if err != nil {
214		return TopicConfig{}, err
215	}
216	return protoToTopicConfig(rpt), nil
217}
218
219func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
220	pt := &pb.Topic{Name: t.name}
221	paths := []string{"message_storage_policy"} // always fetch
222	if cfg.Labels != nil {
223		pt.Labels = cfg.Labels
224		paths = append(paths, "labels")
225	}
226	return &pb.UpdateTopicRequest{
227		Topic:      pt,
228		UpdateMask: &fmpb.FieldMask{Paths: paths},
229	}
230}
231
232// Topics returns an iterator which returns all of the topics for the client's project.
233func (c *Client) Topics(ctx context.Context) *TopicIterator {
234	it := c.pubc.ListTopics(ctx, &pb.ListTopicsRequest{Project: c.fullyQualifiedProjectName()})
235	return &TopicIterator{
236		c: c,
237		next: func() (string, error) {
238			topic, err := it.Next()
239			if err != nil {
240				return "", err
241			}
242			return topic.Name, nil
243		},
244	}
245}
246
// TopicIterator is an iterator that returns a series of topics.
type TopicIterator struct {
	// c is attached to every Topic the iterator returns. next yields the fully
	// qualified name of the next topic, or an error (iterator.Done once the
	// listing is exhausted).
	c    *Client
	next func() (string, error)
}
252
253// Next returns the next topic. If there are no more topics, iterator.Done will be returned.
254func (tps *TopicIterator) Next() (*Topic, error) {
255	topicName, err := tps.next()
256	if err != nil {
257		return nil, err
258	}
259	return newTopic(tps.c, topicName), nil
260}
261
262// ID returns the unique identifier of the topic within its project.
263func (t *Topic) ID() string {
264	slash := strings.LastIndex(t.name, "/")
265	if slash == -1 {
266		// name is not a fully-qualified name.
267		panic("bad topic name")
268	}
269	return t.name[slash+1:]
270}
271
272// String returns the printable globally unique name for the topic.
273func (t *Topic) String() string {
274	return t.name
275}
276
277// Delete deletes the topic.
278func (t *Topic) Delete(ctx context.Context) error {
279	return t.c.pubc.DeleteTopic(ctx, &pb.DeleteTopicRequest{Topic: t.name})
280}
281
282// Exists reports whether the topic exists on the server.
283func (t *Topic) Exists(ctx context.Context) (bool, error) {
284	if t.name == "_deleted-topic_" {
285		return false, nil
286	}
287	_, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name})
288	if err == nil {
289		return true, nil
290	}
291	if grpc.Code(err) == codes.NotFound {
292		return false, nil
293	}
294	return false, err
295}
296
297// IAM returns the topic's IAM handle.
298func (t *Topic) IAM() *iam.Handle {
299	return iam.InternalNewHandle(t.c.pubc.Connection(), t.name)
300}
301
302// Subscriptions returns an iterator which returns the subscriptions for this topic.
303//
304// Some of the returned subscriptions may belong to a project other than t.
305func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator {
306	it := t.c.pubc.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{
307		Topic: t.name,
308	})
309	return &SubscriptionIterator{
310		c:    t.c,
311		next: it.Next,
312	}
313}
314
// errTopicStopped is reported through the PublishResult of every Publish call
// made after Stop.
var (
	errTopicStopped = errors.New("pubsub: Stop has been called for this topic")
)
316
317// Publish publishes msg to the topic asynchronously. Messages are batched and
318// sent according to the topic's PublishSettings. Publish never blocks.
319//
320// Publish returns a non-nil PublishResult which will be ready when the
321// message has been sent (or has failed to be sent) to the server.
322//
323// Publish creates goroutines for batching and sending messages. These goroutines
324// need to be stopped by calling t.Stop(). Once stopped, future calls to Publish
325// will immediately return a PublishResult with an error.
326func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
327	// TODO(jba): if this turns out to take significant time, try to approximate it.
328	// Or, convert the messages to protos in Publish, instead of in the service.
329	msg.size = proto.Size(&pb.PubsubMessage{
330		Data:       msg.Data,
331		Attributes: msg.Attributes,
332	})
333	r := &PublishResult{ready: make(chan struct{})}
334	t.initBundler()
335	t.mu.RLock()
336	defer t.mu.RUnlock()
337	// TODO(aboulhosn) [from bcmills] consider changing the semantics of bundler to perform this logic so we don't have to do it here
338	if t.stopped {
339		r.set("", errTopicStopped)
340		return r
341	}
342
343	// TODO(jba) [from bcmills] consider using a shared channel per bundle
344	// (requires Bundler API changes; would reduce allocations)
345	err := t.bundler.Add(&bundledMessage{msg, r}, msg.size)
346	if err != nil {
347		r.set("", err)
348	}
349	return r
350}
351
352// Stop sends all remaining published messages and stop goroutines created for handling
353// publishing. Returns once all outstanding messages have been sent or have
354// failed to be sent.
355func (t *Topic) Stop() {
356	t.mu.Lock()
357	noop := t.stopped || t.bundler == nil
358	t.stopped = true
359	t.mu.Unlock()
360	if noop {
361		return
362	}
363	t.bundler.Flush()
364}
365
// A PublishResult holds the result from a call to Publish.
type PublishResult struct {
	// ready is closed by set after serverID and err have been written; those
	// two fields must not be read until ready is closed.
	ready    chan struct{}
	serverID string
	err      error
}
372
373// Ready returns a channel that is closed when the result is ready.
374// When the Ready channel is closed, Get is guaranteed not to block.
375func (r *PublishResult) Ready() <-chan struct{} { return r.ready }
376
377// Get returns the server-generated message ID and/or error result of a Publish call.
378// Get blocks until the Publish call completes or the context is done.
379func (r *PublishResult) Get(ctx context.Context) (serverID string, err error) {
380	// If the result is already ready, return it even if the context is done.
381	select {
382	case <-r.Ready():
383		return r.serverID, r.err
384	default:
385	}
386	select {
387	case <-ctx.Done():
388		return "", ctx.Err()
389	case <-r.Ready():
390		return r.serverID, r.err
391	}
392}
393
394func (r *PublishResult) set(sid string, err error) {
395	r.serverID = sid
396	r.err = err
397	close(r.ready)
398}
399
// bundledMessage is the item type handed to the bundler: a message paired with
// the PublishResult to resolve once its bundle has been sent.
// publishMessageBundle clears msg after converting it, so that the message can
// be garbage-collected while the RPC is in flight.
type bundledMessage struct {
	msg *Message
	res *PublishResult
}
404
405func (t *Topic) initBundler() {
406	t.mu.RLock()
407	noop := t.stopped || t.bundler != nil
408	t.mu.RUnlock()
409	if noop {
410		return
411	}
412	t.mu.Lock()
413	defer t.mu.Unlock()
414	// Must re-check, since we released the lock.
415	if t.stopped || t.bundler != nil {
416		return
417	}
418
419	timeout := t.PublishSettings.Timeout
420	t.bundler = bundler.NewBundler(&bundledMessage{}, func(items interface{}) {
421		// TODO(jba): use a context detached from the one passed to NewClient.
422		ctx := context.TODO()
423		if timeout != 0 {
424			var cancel func()
425			ctx, cancel = context.WithTimeout(ctx, timeout)
426			defer cancel()
427		}
428		t.publishMessageBundle(ctx, items.([]*bundledMessage))
429	})
430	t.bundler.DelayThreshold = t.PublishSettings.DelayThreshold
431	t.bundler.BundleCountThreshold = t.PublishSettings.CountThreshold
432	if t.bundler.BundleCountThreshold > MaxPublishRequestCount {
433		t.bundler.BundleCountThreshold = MaxPublishRequestCount
434	}
435	t.bundler.BundleByteThreshold = t.PublishSettings.ByteThreshold
436
437	bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit
438	if t.PublishSettings.BufferedByteLimit > 0 {
439		bufferedByteLimit = t.PublishSettings.BufferedByteLimit
440	}
441	t.bundler.BufferedByteLimit = bufferedByteLimit
442
443	t.bundler.BundleByteLimit = MaxPublishRequestBytes
444	// Unless overridden, allow many goroutines per CPU to call the Publish RPC concurrently.
445	// The default value was determined via extensive load testing (see the loadtest subdirectory).
446	if t.PublishSettings.NumGoroutines > 0 {
447		t.bundler.HandlerLimit = t.PublishSettings.NumGoroutines
448	} else {
449		t.bundler.HandlerLimit = 25 * runtime.GOMAXPROCS(0)
450	}
451}
452
453func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) {
454	pbMsgs := make([]*pb.PubsubMessage, len(bms))
455	for i, bm := range bms {
456		pbMsgs[i] = &pb.PubsubMessage{
457			Data:       bm.msg.Data,
458			Attributes: bm.msg.Attributes,
459		}
460		bm.msg = nil // release bm.msg for GC
461	}
462	res, err := t.c.pubc.Publish(ctx, &pb.PublishRequest{
463		Topic:    t.name,
464		Messages: pbMsgs,
465	}, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)))
466	for i, bm := range bms {
467		if err != nil {
468			bm.res.set("", err)
469		} else {
470			bm.res.set(res.MessageIds[i], nil)
471		}
472	}
473}
474