1// Copyright 2016 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package pubsub
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"log"
22	"runtime"
23	"strings"
24	"sync"
25	"time"
26
27	"cloud.google.com/go/iam"
28	"github.com/golang/protobuf/proto"
29	gax "github.com/googleapis/gax-go/v2"
30	"go.opencensus.io/stats"
31	"go.opencensus.io/tag"
32	"google.golang.org/api/support/bundler"
33	pb "google.golang.org/genproto/googleapis/pubsub/v1"
34	fmpb "google.golang.org/genproto/protobuf/field_mask"
35	"google.golang.org/grpc"
36	"google.golang.org/grpc/codes"
37	"google.golang.org/grpc/status"
38)
39
// Limits the PubSub service places on a single publish request.
const (
	// MaxPublishRequestBytes is the maximum size of a single publish request
	// in bytes, as defined by the PubSub service.
	MaxPublishRequestBytes = 1e7

	// MaxPublishRequestCount is the maximum number of messages that can be in
	// a single publish request, as defined by the PubSub service.
	MaxPublishRequestCount = 1000
)
49
50// ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes.
51var ErrOversizedMessage = bundler.ErrOversizedItem
52
// Topic is a reference to a PubSub topic.
//
// The methods of Topic are safe for use by multiple goroutines.
//
// A Topic that has been used to Publish owns a bundler with background
// goroutines; release them with Stop.
type Topic struct {
	// c is the client through which every RPC for this topic is issued.
	c *Client
	// The fully qualified identifier for the topic, in the format "projects/<projid>/topics/<name>"
	name string

	// Settings for publishing messages. All changes must be made before the
	// first call to Publish. The default is DefaultPublishSettings.
	// (They are read once, when the bundler is lazily created by Publish.)
	PublishSettings PublishSettings

	// mu guards stopped and bundler. bundler is created lazily on the first
	// Publish and is never created once stopped is set.
	mu      sync.RWMutex
	stopped bool
	bundler *bundler.Bundler
}
69
// PublishSettings control the bundling of published messages.
//
// A Topic reads these values once, when its first Publish call creates the
// bundler. Changes made after that point have no effect.
type PublishSettings struct {

	// Publish a non-empty batch after this delay has passed.
	DelayThreshold time.Duration

	// Publish a batch when it has this many messages. The maximum is
	// MaxPublishRequestCount; larger values are clamped to it.
	CountThreshold int

	// Publish a batch when its size in bytes reaches this value.
	ByteThreshold int

	// The number of goroutines that invoke the Publish RPC concurrently.
	//
	// Defaults to a multiple of GOMAXPROCS (25 * GOMAXPROCS) when zero or
	// negative.
	NumGoroutines int

	// The maximum time that the client will attempt to publish a bundle of messages.
	// Zero means no timeout is applied.
	Timeout time.Duration

	// The maximum number of bytes that the Bundler will keep in memory before
	// returning ErrOverflow. Publish then reports that error in its
	// PublishResult.
	//
	// Defaults to DefaultPublishSettings.BufferedByteLimit when zero or negative.
	BufferedByteLimit int
}
97
98// DefaultPublishSettings holds the default values for topics' PublishSettings.
99var DefaultPublishSettings = PublishSettings{
100	DelayThreshold: 10 * time.Millisecond,
101	CountThreshold: 100,
102	ByteThreshold:  1e6,
103	Timeout:        60 * time.Second,
104	// By default, limit the bundler to 10 times the max message size. The number 10 is
105	// chosen as a reasonable amount of messages in the worst case whilst still
106	// capping the number to a low enough value to not OOM users.
107	BufferedByteLimit: 10 * MaxPublishRequestBytes,
108}
109
110// CreateTopic creates a new topic.
111//
112// The specified topic ID must start with a letter, and contain only letters
113// ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.),
114// tildes (~), plus (+) or percent signs (%). It must be between 3 and 255
115// characters in length, and must not start with "goog". For more information,
116// see: https://cloud.google.com/pubsub/docs/admin#resource_names
117//
118// If the topic already exists an error will be returned.
119func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error) {
120	t := c.Topic(topicID)
121	_, err := c.pubc.CreateTopic(ctx, &pb.Topic{Name: t.name})
122	if err != nil {
123		return nil, err
124	}
125	return t, nil
126}
127
128// CreateTopicWithConfig creates a topic from TopicConfig.
129//
130// The specified topic ID must start with a letter, and contain only letters
131// ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.),
132// tildes (~), plus (+) or percent signs (%). It must be between 3 and 255
133// characters in length, and must not start with "goog". For more information,
134// see: https://cloud.google.com/pubsub/docs/admin#resource_names.
135//
136// If the topic already exists, an error will be returned.
137func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error) {
138	t := c.Topic(topicID)
139	_, err := c.pubc.CreateTopic(ctx, &pb.Topic{
140		Name:                 t.name,
141		Labels:               tc.Labels,
142		MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy),
143		KmsKeyName:           tc.KMSKeyName,
144	})
145	if err != nil {
146		return nil, err
147	}
148	return t, nil
149}
150
151// Topic creates a reference to a topic in the client's project.
152//
153// If a Topic's Publish method is called, it has background goroutines
154// associated with it. Clean them up by calling Topic.Stop.
155//
156// Avoid creating many Topic instances if you use them to publish.
157func (c *Client) Topic(id string) *Topic {
158	return c.TopicInProject(id, c.projectID)
159}
160
161// TopicInProject creates a reference to a topic in the given project.
162//
163// If a Topic's Publish method is called, it has background goroutines
164// associated with it. Clean them up by calling Topic.Stop.
165//
166// Avoid creating many Topic instances if you use them to publish.
167func (c *Client) TopicInProject(id, projectID string) *Topic {
168	return newTopic(c, fmt.Sprintf("projects/%s/topics/%s", projectID, id))
169}
170
171func newTopic(c *Client, name string) *Topic {
172	return &Topic{
173		c:               c,
174		name:            name,
175		PublishSettings: DefaultPublishSettings,
176	}
177}
178
// TopicConfig describes the configuration of a topic.
//
// It is returned by Topic.Config and Topic.Update, and is accepted by
// Client.CreateTopicWithConfig.
type TopicConfig struct {
	// The set of labels for the topic.
	Labels map[string]string

	// The topic's message storage policy.
	MessageStoragePolicy MessageStoragePolicy

	// The name of the Cloud KMS key to be used to protect access to messages
	// published to this topic, in the format
	// "projects/P/locations/L/keyRings/R/cryptoKeys/K".
	// It is set at creation time; TopicConfigToUpdate has no field for it.
	KMSKeyName string
}
192
// TopicConfigToUpdate describes how to update a topic.
//
// Only non-nil fields are included in the update; if every field is nil,
// Topic.Update returns an error without contacting the server.
type TopicConfigToUpdate struct {
	// If non-nil, the current set of labels is completely
	// replaced by the new set. If nil, the labels remain unchanged.
	Labels map[string]string

	// If non-nil, the existing policy (containing the list of regions)
	// is completely replaced by the new policy.
	//
	// Use the zero value &MessageStoragePolicy{} to reset the topic back to
	// using the organization's Resource Location Restriction policy.
	//
	// If nil, the policy remains unchanged.
	//
	// This field has beta status. It is not subject to the stability guarantee
	// and may change.
	MessageStoragePolicy *MessageStoragePolicy
}
211
212func protoToTopicConfig(pbt *pb.Topic) TopicConfig {
213	return TopicConfig{
214		Labels:               pbt.Labels,
215		MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy),
216		KMSKeyName:           pbt.KmsKeyName,
217	}
218}
219
// MessageStoragePolicy constrains how messages published to the topic may be stored. It
// is determined when the topic is created based on the policy configured at
// the project level.
type MessageStoragePolicy struct {
	// AllowedPersistenceRegions is the list of GCP regions where messages that are published
	// to the topic may be persisted in storage. Messages published by publishers running in
	// non-allowed GCP regions (or running outside of GCP altogether) will be
	// routed for storage in one of the allowed regions.
	//
	// If empty, it indicates a misconfiguration at the project or organization level, which
	// will result in all Publish operations failing. This field cannot be empty in updates.
	//
	// If nil, then the policy is not defined on a topic level. When used in updates, it resets
	// the regions back to the organization level Resource Location Restriction policy.
	// A nil slice is sent to the server as "no policy", and a missing policy
	// from the server is reported here as nil.
	//
	// For more information, see
	// https://cloud.google.com/pubsub/docs/resource-location-restriction#pubsub-storage-locations.
	AllowedPersistenceRegions []string
}
239
240func protoToMessageStoragePolicy(msp *pb.MessageStoragePolicy) MessageStoragePolicy {
241	if msp == nil {
242		return MessageStoragePolicy{}
243	}
244	return MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions}
245}
246
247func messageStoragePolicyToProto(msp *MessageStoragePolicy) *pb.MessageStoragePolicy {
248	if msp == nil || msp.AllowedPersistenceRegions == nil {
249		return nil
250	}
251	return &pb.MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions}
252}
253
254// Config returns the TopicConfig for the topic.
255func (t *Topic) Config(ctx context.Context) (TopicConfig, error) {
256	pbt, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name})
257	if err != nil {
258		return TopicConfig{}, err
259	}
260	return protoToTopicConfig(pbt), nil
261}
262
263// Update changes an existing topic according to the fields set in cfg. It returns
264// the new TopicConfig.
265func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error) {
266	req := t.updateRequest(cfg)
267	if len(req.UpdateMask.Paths) == 0 {
268		return TopicConfig{}, errors.New("pubsub: UpdateTopic call with nothing to update")
269	}
270	rpt, err := t.c.pubc.UpdateTopic(ctx, req)
271	if err != nil {
272		return TopicConfig{}, err
273	}
274	return protoToTopicConfig(rpt), nil
275}
276
277func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
278	pt := &pb.Topic{Name: t.name}
279	var paths []string
280	if cfg.Labels != nil {
281		pt.Labels = cfg.Labels
282		paths = append(paths, "labels")
283	}
284	if cfg.MessageStoragePolicy != nil {
285		pt.MessageStoragePolicy = messageStoragePolicyToProto(cfg.MessageStoragePolicy)
286		paths = append(paths, "message_storage_policy")
287	}
288	return &pb.UpdateTopicRequest{
289		Topic:      pt,
290		UpdateMask: &fmpb.FieldMask{Paths: paths},
291	}
292}
293
294// Topics returns an iterator which returns all of the topics for the client's project.
295func (c *Client) Topics(ctx context.Context) *TopicIterator {
296	it := c.pubc.ListTopics(ctx, &pb.ListTopicsRequest{Project: c.fullyQualifiedProjectName()})
297	return &TopicIterator{
298		c: c,
299		next: func() (string, error) {
300			topic, err := it.Next()
301			if err != nil {
302				return "", err
303			}
304			return topic.Name, nil
305		},
306	}
307}
308
// TopicIterator is an iterator that returns a series of topics.
//
// c is the client the returned Topics are bound to; next yields the fully
// qualified name of the next topic, or an error (iterator.Done at the end).
type TopicIterator struct {
	c    *Client
	next func() (string, error)
}
314
315// Next returns the next topic. If there are no more topics, iterator.Done will be returned.
316func (tps *TopicIterator) Next() (*Topic, error) {
317	topicName, err := tps.next()
318	if err != nil {
319		return nil, err
320	}
321	return newTopic(tps.c, topicName), nil
322}
323
324// ID returns the unique identifier of the topic within its project.
325func (t *Topic) ID() string {
326	slash := strings.LastIndex(t.name, "/")
327	if slash == -1 {
328		// name is not a fully-qualified name.
329		panic("bad topic name")
330	}
331	return t.name[slash+1:]
332}
333
334// String returns the printable globally unique name for the topic.
335func (t *Topic) String() string {
336	return t.name
337}
338
339// Delete deletes the topic.
340func (t *Topic) Delete(ctx context.Context) error {
341	return t.c.pubc.DeleteTopic(ctx, &pb.DeleteTopicRequest{Topic: t.name})
342}
343
344// Exists reports whether the topic exists on the server.
345func (t *Topic) Exists(ctx context.Context) (bool, error) {
346	if t.name == "_deleted-topic_" {
347		return false, nil
348	}
349	_, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name})
350	if err == nil {
351		return true, nil
352	}
353	if status.Code(err) == codes.NotFound {
354		return false, nil
355	}
356	return false, err
357}
358
359// IAM returns the topic's IAM handle.
360func (t *Topic) IAM() *iam.Handle {
361	return iam.InternalNewHandle(t.c.pubc.Connection(), t.name)
362}
363
364// Subscriptions returns an iterator which returns the subscriptions for this topic.
365//
366// Some of the returned subscriptions may belong to a project other than t.
367func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator {
368	it := t.c.pubc.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{
369		Topic: t.name,
370	})
371	return &SubscriptionIterator{
372		c:    t.c,
373		next: it.Next,
374	}
375}
376
// errTopicStopped is the error that Publish puts in the PublishResult of any
// message published after Stop has been called.
var (
	errTopicStopped = errors.New("pubsub: Stop has been called for this topic")
)
378
379// Publish publishes msg to the topic asynchronously. Messages are batched and
380// sent according to the topic's PublishSettings. Publish never blocks.
381//
382// Publish returns a non-nil PublishResult which will be ready when the
383// message has been sent (or has failed to be sent) to the server.
384//
385// Publish creates goroutines for batching and sending messages. These goroutines
386// need to be stopped by calling t.Stop(). Once stopped, future calls to Publish
387// will immediately return a PublishResult with an error.
388func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
389	// Use a PublishRequest with only the Messages field to calculate the size
390	// of an individual message. This accurately calculates the size of the
391	// encoded proto message by accounting for the length of an individual
392	// PubSubMessage and Data/Attributes field.
393	// TODO(hongalex): if this turns out to take significant time, try to approximate it.
394	msg.size = proto.Size(&pb.PublishRequest{
395		Messages: []*pb.PubsubMessage{
396			{
397				Data:       msg.Data,
398				Attributes: msg.Attributes,
399			},
400		},
401	})
402	r := &PublishResult{ready: make(chan struct{})}
403	t.initBundler()
404	t.mu.RLock()
405	defer t.mu.RUnlock()
406	// TODO(aboulhosn) [from bcmills] consider changing the semantics of bundler to perform this logic so we don't have to do it here
407	if t.stopped {
408		r.set("", errTopicStopped)
409		return r
410	}
411
412	// TODO(jba) [from bcmills] consider using a shared channel per bundle
413	// (requires Bundler API changes; would reduce allocations)
414	err := t.bundler.Add(&bundledMessage{msg, r}, msg.size)
415	if err != nil {
416		r.set("", err)
417	}
418	return r
419}
420
421// Stop sends all remaining published messages and stop goroutines created for handling
422// publishing. Returns once all outstanding messages have been sent or have
423// failed to be sent.
424func (t *Topic) Stop() {
425	t.mu.Lock()
426	noop := t.stopped || t.bundler == nil
427	t.stopped = true
428	t.mu.Unlock()
429	if noop {
430		return
431	}
432	t.bundler.Flush()
433}
434
// A PublishResult holds the result from a call to Publish.
//
// Call Get to obtain the outcome, or wait on Ready.
type PublishResult struct {
	// ready is closed by set after serverID and err have been assigned.
	ready    chan struct{}
	serverID string
	err      error
}
441
442// Ready returns a channel that is closed when the result is ready.
443// When the Ready channel is closed, Get is guaranteed not to block.
444func (r *PublishResult) Ready() <-chan struct{} { return r.ready }
445
446// Get returns the server-generated message ID and/or error result of a Publish call.
447// Get blocks until the Publish call completes or the context is done.
448func (r *PublishResult) Get(ctx context.Context) (serverID string, err error) {
449	// If the result is already ready, return it even if the context is done.
450	select {
451	case <-r.Ready():
452		return r.serverID, r.err
453	default:
454	}
455	select {
456	case <-ctx.Done():
457		return "", ctx.Err()
458	case <-r.Ready():
459		return r.serverID, r.err
460	}
461}
462
463func (r *PublishResult) set(sid string, err error) {
464	r.serverID = sid
465	r.err = err
466	close(r.ready)
467}
468
// bundledMessage is the item type handed to the topic's bundler. It pairs a
// message passed to Publish with the PublishResult that publishMessageBundle
// completes once the bundle containing the message has been sent.
type bundledMessage struct {
	msg *Message
	res *PublishResult
}
473
474func (t *Topic) initBundler() {
475	t.mu.RLock()
476	noop := t.stopped || t.bundler != nil
477	t.mu.RUnlock()
478	if noop {
479		return
480	}
481	t.mu.Lock()
482	defer t.mu.Unlock()
483	// Must re-check, since we released the lock.
484	if t.stopped || t.bundler != nil {
485		return
486	}
487
488	timeout := t.PublishSettings.Timeout
489	t.bundler = bundler.NewBundler(&bundledMessage{}, func(items interface{}) {
490		// TODO(jba): use a context detached from the one passed to NewClient.
491		ctx := context.TODO()
492		if timeout != 0 {
493			var cancel func()
494			ctx, cancel = context.WithTimeout(ctx, timeout)
495			defer cancel()
496		}
497		t.publishMessageBundle(ctx, items.([]*bundledMessage))
498	})
499	t.bundler.DelayThreshold = t.PublishSettings.DelayThreshold
500	t.bundler.BundleCountThreshold = t.PublishSettings.CountThreshold
501	if t.bundler.BundleCountThreshold > MaxPublishRequestCount {
502		t.bundler.BundleCountThreshold = MaxPublishRequestCount
503	}
504	t.bundler.BundleByteThreshold = t.PublishSettings.ByteThreshold
505
506	bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit
507	if t.PublishSettings.BufferedByteLimit > 0 {
508		bufferedByteLimit = t.PublishSettings.BufferedByteLimit
509	}
510	t.bundler.BufferedByteLimit = bufferedByteLimit
511
512	// Set the bundler's max size per payload, accounting for topic name's overhead.
513	t.bundler.BundleByteLimit = MaxPublishRequestBytes - calcFieldSizeString(t.name)
514	// Unless overridden, allow many goroutines per CPU to call the Publish RPC concurrently.
515	// The default value was determined via extensive load testing (see the loadtest subdirectory).
516	if t.PublishSettings.NumGoroutines > 0 {
517		t.bundler.HandlerLimit = t.PublishSettings.NumGoroutines
518	} else {
519		t.bundler.HandlerLimit = 25 * runtime.GOMAXPROCS(0)
520	}
521}
522
523func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) {
524	ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
525	if err != nil {
526		log.Printf("pubsub: cannot create context with tag in publishMessageBundle: %v", err)
527	}
528	pbMsgs := make([]*pb.PubsubMessage, len(bms))
529	for i, bm := range bms {
530		pbMsgs[i] = &pb.PubsubMessage{
531			Data:       bm.msg.Data,
532			Attributes: bm.msg.Attributes,
533		}
534		bm.msg = nil // release bm.msg for GC
535	}
536	start := time.Now()
537	res, err := t.c.pubc.Publish(ctx, &pb.PublishRequest{
538		Topic:    t.name,
539		Messages: pbMsgs,
540	}, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)))
541	end := time.Now()
542	if err != nil {
543		// Update context with error tag for OpenCensus,
544		// using same stats.Record() call as success case.
545		ctx, _ = tag.New(ctx, tag.Upsert(keyStatus, "ERROR"),
546			tag.Upsert(keyError, err.Error()))
547	}
548	stats.Record(ctx,
549		PublishLatency.M(float64(end.Sub(start)/time.Millisecond)),
550		PublishedMessages.M(int64(len(bms))))
551	for i, bm := range bms {
552		if err != nil {
553			bm.res.set("", err)
554		} else {
555			bm.res.set(res.MessageIds[i], nil)
556		}
557	}
558}
559