1// Copyright 2016 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package pubsub 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "runtime" 22 "strings" 23 "sync" 24 "time" 25 26 "cloud.google.com/go/iam" 27 "github.com/golang/protobuf/proto" 28 gax "github.com/googleapis/gax-go/v2" 29 "google.golang.org/api/support/bundler" 30 pb "google.golang.org/genproto/googleapis/pubsub/v1" 31 fmpb "google.golang.org/genproto/protobuf/field_mask" 32 "google.golang.org/grpc" 33 "google.golang.org/grpc/codes" 34) 35 36const ( 37 // MaxPublishRequestCount is the maximum number of messages that can be in 38 // a single publish request, as defined by the PubSub service. 39 MaxPublishRequestCount = 1000 40 41 // MaxPublishRequestBytes is the maximum size of a single publish request 42 // in bytes, as defined by the PubSub service. 43 MaxPublishRequestBytes = 1e7 44) 45 46// ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes. 47var ErrOversizedMessage = bundler.ErrOversizedItem 48 49// Topic is a reference to a PubSub topic. 50// 51// The methods of Topic are safe for use by multiple goroutines. 52type Topic struct { 53 c *Client 54 // The fully qualified identifier for the topic, in the format "projects/<projid>/topics/<name>" 55 name string 56 57 // Settings for publishing messages. All changes must be made before the 58 // first call to Publish. The default is DefaultPublishSettings. 59 PublishSettings PublishSettings 60 61 mu sync.RWMutex 62 stopped bool 63 bundler *bundler.Bundler 64} 65 66// PublishSettings control the bundling of published messages. 67type PublishSettings struct { 68 69 // Publish a non-empty batch after this delay has passed. 70 DelayThreshold time.Duration 71 72 // Publish a batch when it has this many messages. The maximum is 73 // MaxPublishRequestCount. 74 CountThreshold int 75 76 // Publish a batch when its size in bytes reaches this value. 77 ByteThreshold int 78 79 // The number of goroutines that invoke the Publish RPC concurrently. 80 // 81 // Defaults to a multiple of GOMAXPROCS. 82 NumGoroutines int 83 84 // The maximum time that the client will attempt to publish a bundle of messages. 85 Timeout time.Duration 86 87 // The maximum number of bytes that the Bundler will keep in memory before 88 // returning ErrOverflow. 89 // 90 // Defaults to DefaultPublishSettings.BufferedByteLimit. 91 BufferedByteLimit int 92} 93 94// DefaultPublishSettings holds the default values for topics' PublishSettings. 95var DefaultPublishSettings = PublishSettings{ 96 DelayThreshold: 1 * time.Millisecond, 97 CountThreshold: 100, 98 ByteThreshold: 1e6, 99 Timeout: 60 * time.Second, 100 // By default, limit the bundler to 10 times the max message size. The number 10 is 101 // chosen as a reasonable amount of messages in the worst case whilst still 102 // capping the number to a low enough value to not OOM users. 103 BufferedByteLimit: 10 * MaxPublishRequestBytes, 104} 105 106// CreateTopic creates a new topic. 107// The specified topic ID must start with a letter, and contain only letters 108// ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), 109// tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 110// characters in length, and must not start with "goog". 111// If the topic already exists an error will be returned. 112func (c *Client) CreateTopic(ctx context.Context, id string) (*Topic, error) { 113 t := c.Topic(id) 114 _, err := c.pubc.CreateTopic(ctx, &pb.Topic{Name: t.name}) 115 if err != nil { 116 return nil, err 117 } 118 return t, nil 119} 120 121// Topic creates a reference to a topic in the client's project. 122// 123// If a Topic's Publish method is called, it has background goroutines 124// associated with it. Clean them up by calling Topic.Stop. 125// 126// Avoid creating many Topic instances if you use them to publish. 127func (c *Client) Topic(id string) *Topic { 128 return c.TopicInProject(id, c.projectID) 129} 130 131// TopicInProject creates a reference to a topic in the given project. 132// 133// If a Topic's Publish method is called, it has background goroutines 134// associated with it. Clean them up by calling Topic.Stop. 135// 136// Avoid creating many Topic instances if you use them to publish. 137func (c *Client) TopicInProject(id, projectID string) *Topic { 138 return newTopic(c, fmt.Sprintf("projects/%s/topics/%s", projectID, id)) 139} 140 141func newTopic(c *Client, name string) *Topic { 142 return &Topic{ 143 c: c, 144 name: name, 145 PublishSettings: DefaultPublishSettings, 146 } 147} 148 149// TopicConfig describes the configuration of a topic. 150type TopicConfig struct { 151 // The set of labels for the topic. 152 Labels map[string]string 153 // The topic's message storage policy. 154 MessageStoragePolicy MessageStoragePolicy 155} 156 157// TopicConfigToUpdate describes how to update a topic. 158type TopicConfigToUpdate struct { 159 // If non-nil, the current set of labels is completely 160 // replaced by the new set. 161 // This field has beta status. It is not subject to the stability guarantee 162 // and may change. 163 Labels map[string]string 164} 165 166func protoToTopicConfig(pbt *pb.Topic) TopicConfig { 167 return TopicConfig{ 168 Labels: pbt.Labels, 169 MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy), 170 } 171} 172 173// MessageStoragePolicy constrains how messages published to the topic may be stored. It 174// is determined when the topic is created based on the policy configured at 175// the project level. 176type MessageStoragePolicy struct { 177 // The list of GCP regions where messages that are published to the topic may 178 // be persisted in storage. Messages published by publishers running in 179 // non-allowed GCP regions (or running outside of GCP altogether) will be 180 // routed for storage in one of the allowed regions. An empty list indicates a 181 // misconfiguration at the project or organization level, which will result in 182 // all Publish operations failing. 183 AllowedPersistenceRegions []string 184} 185 186func protoToMessageStoragePolicy(msp *pb.MessageStoragePolicy) MessageStoragePolicy { 187 if msp == nil { 188 return MessageStoragePolicy{} 189 } 190 return MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions} 191} 192 193// Config returns the TopicConfig for the topic. 194func (t *Topic) Config(ctx context.Context) (TopicConfig, error) { 195 pbt, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name}) 196 if err != nil { 197 return TopicConfig{}, err 198 } 199 return protoToTopicConfig(pbt), nil 200} 201 202// Update changes an existing topic according to the fields set in cfg. It returns 203// the new TopicConfig. 204// 205// Any call to Update (even with an empty TopicConfigToUpdate) will update the 206// MessageStoragePolicy for the topic from the organization's settings. 207func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error) { 208 req := t.updateRequest(cfg) 209 if len(req.UpdateMask.Paths) == 0 { 210 return TopicConfig{}, errors.New("pubsub: UpdateTopic call with nothing to update") 211 } 212 rpt, err := t.c.pubc.UpdateTopic(ctx, req) 213 if err != nil { 214 return TopicConfig{}, err 215 } 216 return protoToTopicConfig(rpt), nil 217} 218 219func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest { 220 pt := &pb.Topic{Name: t.name} 221 paths := []string{"message_storage_policy"} // always fetch 222 if cfg.Labels != nil { 223 pt.Labels = cfg.Labels 224 paths = append(paths, "labels") 225 } 226 return &pb.UpdateTopicRequest{ 227 Topic: pt, 228 UpdateMask: &fmpb.FieldMask{Paths: paths}, 229 } 230} 231 232// Topics returns an iterator which returns all of the topics for the client's project. 233func (c *Client) Topics(ctx context.Context) *TopicIterator { 234 it := c.pubc.ListTopics(ctx, &pb.ListTopicsRequest{Project: c.fullyQualifiedProjectName()}) 235 return &TopicIterator{ 236 c: c, 237 next: func() (string, error) { 238 topic, err := it.Next() 239 if err != nil { 240 return "", err 241 } 242 return topic.Name, nil 243 }, 244 } 245} 246 247// TopicIterator is an iterator that returns a series of topics. 248type TopicIterator struct { 249 c *Client 250 next func() (string, error) 251} 252 253// Next returns the next topic. If there are no more topics, iterator.Done will be returned. 254func (tps *TopicIterator) Next() (*Topic, error) { 255 topicName, err := tps.next() 256 if err != nil { 257 return nil, err 258 } 259 return newTopic(tps.c, topicName), nil 260} 261 262// ID returns the unique identifier of the topic within its project. 263func (t *Topic) ID() string { 264 slash := strings.LastIndex(t.name, "/") 265 if slash == -1 { 266 // name is not a fully-qualified name. 267 panic("bad topic name") 268 } 269 return t.name[slash+1:] 270} 271 272// String returns the printable globally unique name for the topic. 273func (t *Topic) String() string { 274 return t.name 275} 276 277// Delete deletes the topic. 278func (t *Topic) Delete(ctx context.Context) error { 279 return t.c.pubc.DeleteTopic(ctx, &pb.DeleteTopicRequest{Topic: t.name}) 280} 281 282// Exists reports whether the topic exists on the server. 283func (t *Topic) Exists(ctx context.Context) (bool, error) { 284 if t.name == "_deleted-topic_" { 285 return false, nil 286 } 287 _, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name}) 288 if err == nil { 289 return true, nil 290 } 291 if grpc.Code(err) == codes.NotFound { 292 return false, nil 293 } 294 return false, err 295} 296 297// IAM returns the topic's IAM handle. 298func (t *Topic) IAM() *iam.Handle { 299 return iam.InternalNewHandle(t.c.pubc.Connection(), t.name) 300} 301 302// Subscriptions returns an iterator which returns the subscriptions for this topic. 303// 304// Some of the returned subscriptions may belong to a project other than t. 305func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator { 306 it := t.c.pubc.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{ 307 Topic: t.name, 308 }) 309 return &SubscriptionIterator{ 310 c: t.c, 311 next: it.Next, 312 } 313} 314 315var errTopicStopped = errors.New("pubsub: Stop has been called for this topic") 316 317// Publish publishes msg to the topic asynchronously. Messages are batched and 318// sent according to the topic's PublishSettings. Publish never blocks. 319// 320// Publish returns a non-nil PublishResult which will be ready when the 321// message has been sent (or has failed to be sent) to the server. 322// 323// Publish creates goroutines for batching and sending messages. These goroutines 324// need to be stopped by calling t.Stop(). Once stopped, future calls to Publish 325// will immediately return a PublishResult with an error. 326func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { 327 // TODO(jba): if this turns out to take significant time, try to approximate it. 328 // Or, convert the messages to protos in Publish, instead of in the service. 329 msg.size = proto.Size(&pb.PubsubMessage{ 330 Data: msg.Data, 331 Attributes: msg.Attributes, 332 }) 333 r := &PublishResult{ready: make(chan struct{})} 334 t.initBundler() 335 t.mu.RLock() 336 defer t.mu.RUnlock() 337 // TODO(aboulhosn) [from bcmills] consider changing the semantics of bundler to perform this logic so we don't have to do it here 338 if t.stopped { 339 r.set("", errTopicStopped) 340 return r 341 } 342 343 // TODO(jba) [from bcmills] consider using a shared channel per bundle 344 // (requires Bundler API changes; would reduce allocations) 345 err := t.bundler.Add(&bundledMessage{msg, r}, msg.size) 346 if err != nil { 347 r.set("", err) 348 } 349 return r 350} 351 352// Stop sends all remaining published messages and stop goroutines created for handling 353// publishing. Returns once all outstanding messages have been sent or have 354// failed to be sent. 355func (t *Topic) Stop() { 356 t.mu.Lock() 357 noop := t.stopped || t.bundler == nil 358 t.stopped = true 359 t.mu.Unlock() 360 if noop { 361 return 362 } 363 t.bundler.Flush() 364} 365 366// A PublishResult holds the result from a call to Publish. 367type PublishResult struct { 368 ready chan struct{} 369 serverID string 370 err error 371} 372 373// Ready returns a channel that is closed when the result is ready. 374// When the Ready channel is closed, Get is guaranteed not to block. 375func (r *PublishResult) Ready() <-chan struct{} { return r.ready } 376 377// Get returns the server-generated message ID and/or error result of a Publish call. 378// Get blocks until the Publish call completes or the context is done. 379func (r *PublishResult) Get(ctx context.Context) (serverID string, err error) { 380 // If the result is already ready, return it even if the context is done. 381 select { 382 case <-r.Ready(): 383 return r.serverID, r.err 384 default: 385 } 386 select { 387 case <-ctx.Done(): 388 return "", ctx.Err() 389 case <-r.Ready(): 390 return r.serverID, r.err 391 } 392} 393 394func (r *PublishResult) set(sid string, err error) { 395 r.serverID = sid 396 r.err = err 397 close(r.ready) 398} 399 400type bundledMessage struct { 401 msg *Message 402 res *PublishResult 403} 404 405func (t *Topic) initBundler() { 406 t.mu.RLock() 407 noop := t.stopped || t.bundler != nil 408 t.mu.RUnlock() 409 if noop { 410 return 411 } 412 t.mu.Lock() 413 defer t.mu.Unlock() 414 // Must re-check, since we released the lock. 415 if t.stopped || t.bundler != nil { 416 return 417 } 418 419 timeout := t.PublishSettings.Timeout 420 t.bundler = bundler.NewBundler(&bundledMessage{}, func(items interface{}) { 421 // TODO(jba): use a context detached from the one passed to NewClient. 422 ctx := context.TODO() 423 if timeout != 0 { 424 var cancel func() 425 ctx, cancel = context.WithTimeout(ctx, timeout) 426 defer cancel() 427 } 428 t.publishMessageBundle(ctx, items.([]*bundledMessage)) 429 }) 430 t.bundler.DelayThreshold = t.PublishSettings.DelayThreshold 431 t.bundler.BundleCountThreshold = t.PublishSettings.CountThreshold 432 if t.bundler.BundleCountThreshold > MaxPublishRequestCount { 433 t.bundler.BundleCountThreshold = MaxPublishRequestCount 434 } 435 t.bundler.BundleByteThreshold = t.PublishSettings.ByteThreshold 436 437 bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit 438 if t.PublishSettings.BufferedByteLimit > 0 { 439 bufferedByteLimit = t.PublishSettings.BufferedByteLimit 440 } 441 t.bundler.BufferedByteLimit = bufferedByteLimit 442 443 t.bundler.BundleByteLimit = MaxPublishRequestBytes 444 // Unless overridden, allow many goroutines per CPU to call the Publish RPC concurrently. 445 // The default value was determined via extensive load testing (see the loadtest subdirectory). 446 if t.PublishSettings.NumGoroutines > 0 { 447 t.bundler.HandlerLimit = t.PublishSettings.NumGoroutines 448 } else { 449 t.bundler.HandlerLimit = 25 * runtime.GOMAXPROCS(0) 450 } 451} 452 453func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) { 454 pbMsgs := make([]*pb.PubsubMessage, len(bms)) 455 for i, bm := range bms { 456 pbMsgs[i] = &pb.PubsubMessage{ 457 Data: bm.msg.Data, 458 Attributes: bm.msg.Attributes, 459 } 460 bm.msg = nil // release bm.msg for GC 461 } 462 res, err := t.c.pubc.Publish(ctx, &pb.PublishRequest{ 463 Topic: t.name, 464 Messages: pbMsgs, 465 }, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes))) 466 for i, bm := range bms { 467 if err != nil { 468 bm.res.set("", err) 469 } else { 470 bm.res.set(res.MessageIds[i], nil) 471 } 472 } 473} 474