1// Copyright 2016 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package pubsub 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "log" 22 "runtime" 23 "strings" 24 "sync" 25 "time" 26 27 "cloud.google.com/go/iam" 28 "github.com/golang/protobuf/proto" 29 gax "github.com/googleapis/gax-go/v2" 30 "go.opencensus.io/stats" 31 "go.opencensus.io/tag" 32 "google.golang.org/api/support/bundler" 33 pb "google.golang.org/genproto/googleapis/pubsub/v1" 34 fmpb "google.golang.org/genproto/protobuf/field_mask" 35 "google.golang.org/grpc" 36 "google.golang.org/grpc/codes" 37 "google.golang.org/grpc/status" 38) 39 40const ( 41 // MaxPublishRequestCount is the maximum number of messages that can be in 42 // a single publish request, as defined by the PubSub service. 43 MaxPublishRequestCount = 1000 44 45 // MaxPublishRequestBytes is the maximum size of a single publish request 46 // in bytes, as defined by the PubSub service. 47 MaxPublishRequestBytes = 1e7 48) 49 50// ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes. 51var ErrOversizedMessage = bundler.ErrOversizedItem 52 53// Topic is a reference to a PubSub topic. 54// 55// The methods of Topic are safe for use by multiple goroutines. 56type Topic struct { 57 c *Client 58 // The fully qualified identifier for the topic, in the format "projects/<projid>/topics/<name>" 59 name string 60 61 // Settings for publishing messages. All changes must be made before the 62 // first call to Publish. The default is DefaultPublishSettings. 63 PublishSettings PublishSettings 64 65 mu sync.RWMutex 66 stopped bool 67 bundler *bundler.Bundler 68} 69 70// PublishSettings control the bundling of published messages. 71type PublishSettings struct { 72 73 // Publish a non-empty batch after this delay has passed. 74 DelayThreshold time.Duration 75 76 // Publish a batch when it has this many messages. The maximum is 77 // MaxPublishRequestCount. 78 CountThreshold int 79 80 // Publish a batch when its size in bytes reaches this value. 81 ByteThreshold int 82 83 // The number of goroutines that invoke the Publish RPC concurrently. 84 // 85 // Defaults to a multiple of GOMAXPROCS. 86 NumGoroutines int 87 88 // The maximum time that the client will attempt to publish a bundle of messages. 89 Timeout time.Duration 90 91 // The maximum number of bytes that the Bundler will keep in memory before 92 // returning ErrOverflow. 93 // 94 // Defaults to DefaultPublishSettings.BufferedByteLimit. 95 BufferedByteLimit int 96} 97 98// DefaultPublishSettings holds the default values for topics' PublishSettings. 99var DefaultPublishSettings = PublishSettings{ 100 DelayThreshold: 10 * time.Millisecond, 101 CountThreshold: 100, 102 ByteThreshold: 1e6, 103 Timeout: 60 * time.Second, 104 // By default, limit the bundler to 10 times the max message size. The number 10 is 105 // chosen as a reasonable amount of messages in the worst case whilst still 106 // capping the number to a low enough value to not OOM users. 107 BufferedByteLimit: 10 * MaxPublishRequestBytes, 108} 109 110// CreateTopic creates a new topic. 111// 112// The specified topic ID must start with a letter, and contain only letters 113// ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), 114// tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 115// characters in length, and must not start with "goog". For more information, 116// see: https://cloud.google.com/pubsub/docs/admin#resource_names 117// 118// If the topic already exists an error will be returned. 119func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error) { 120 t := c.Topic(topicID) 121 _, err := c.pubc.CreateTopic(ctx, &pb.Topic{Name: t.name}) 122 if err != nil { 123 return nil, err 124 } 125 return t, nil 126} 127 128// CreateTopicWithConfig creates a topic from TopicConfig. 129// 130// The specified topic ID must start with a letter, and contain only letters 131// ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), 132// tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 133// characters in length, and must not start with "goog". For more information, 134// see: https://cloud.google.com/pubsub/docs/admin#resource_names. 135// 136// If the topic already exists, an error will be returned. 137func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error) { 138 t := c.Topic(topicID) 139 _, err := c.pubc.CreateTopic(ctx, &pb.Topic{ 140 Name: t.name, 141 Labels: tc.Labels, 142 MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy), 143 KmsKeyName: tc.KMSKeyName, 144 }) 145 if err != nil { 146 return nil, err 147 } 148 return t, nil 149} 150 151// Topic creates a reference to a topic in the client's project. 152// 153// If a Topic's Publish method is called, it has background goroutines 154// associated with it. Clean them up by calling Topic.Stop. 155// 156// Avoid creating many Topic instances if you use them to publish. 157func (c *Client) Topic(id string) *Topic { 158 return c.TopicInProject(id, c.projectID) 159} 160 161// TopicInProject creates a reference to a topic in the given project. 162// 163// If a Topic's Publish method is called, it has background goroutines 164// associated with it. Clean them up by calling Topic.Stop. 165// 166// Avoid creating many Topic instances if you use them to publish. 167func (c *Client) TopicInProject(id, projectID string) *Topic { 168 return newTopic(c, fmt.Sprintf("projects/%s/topics/%s", projectID, id)) 169} 170 171func newTopic(c *Client, name string) *Topic { 172 return &Topic{ 173 c: c, 174 name: name, 175 PublishSettings: DefaultPublishSettings, 176 } 177} 178 179// TopicConfig describes the configuration of a topic. 180type TopicConfig struct { 181 // The set of labels for the topic. 182 Labels map[string]string 183 184 // The topic's message storage policy. 185 MessageStoragePolicy MessageStoragePolicy 186 187 // The name of the Cloud KMS key to be used to protect access to messages 188 // published to this topic, in the format 189 // "projects/P/locations/L/keyRings/R/cryptoKeys/K". 190 KMSKeyName string 191} 192 193// TopicConfigToUpdate describes how to update a topic. 194type TopicConfigToUpdate struct { 195 // If non-nil, the current set of labels is completely 196 // replaced by the new set. 197 Labels map[string]string 198 199 // If non-nil, the existing policy (containing the list of regions) 200 // is completely replaced by the new policy. 201 // 202 // Use the zero value &MessageStoragePolicy{} to reset the topic back to 203 // using the organization's Resource Location Restriction policy. 204 // 205 // If nil, the policy remains unchanged. 206 // 207 // This field has beta status. It is not subject to the stability guarantee 208 // and may change. 209 MessageStoragePolicy *MessageStoragePolicy 210} 211 212func protoToTopicConfig(pbt *pb.Topic) TopicConfig { 213 return TopicConfig{ 214 Labels: pbt.Labels, 215 MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy), 216 KMSKeyName: pbt.KmsKeyName, 217 } 218} 219 220// MessageStoragePolicy constrains how messages published to the topic may be stored. It 221// is determined when the topic is created based on the policy configured at 222// the project level. 223type MessageStoragePolicy struct { 224 // AllowedPersistenceRegions is the list of GCP regions where messages that are published 225 // to the topic may be persisted in storage. Messages published by publishers running in 226 // non-allowed GCP regions (or running outside of GCP altogether) will be 227 // routed for storage in one of the allowed regions. 228 // 229 // If empty, it indicates a misconfiguration at the project or organization level, which 230 // will result in all Publish operations failing. This field cannot be empty in updates. 231 // 232 // If nil, then the policy is not defined on a topic level. When used in updates, it resets 233 // the regions back to the organization level Resource Location Restriction policy. 234 // 235 // For more information, see 236 // https://cloud.google.com/pubsub/docs/resource-location-restriction#pubsub-storage-locations. 237 AllowedPersistenceRegions []string 238} 239 240func protoToMessageStoragePolicy(msp *pb.MessageStoragePolicy) MessageStoragePolicy { 241 if msp == nil { 242 return MessageStoragePolicy{} 243 } 244 return MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions} 245} 246 247func messageStoragePolicyToProto(msp *MessageStoragePolicy) *pb.MessageStoragePolicy { 248 if msp == nil || msp.AllowedPersistenceRegions == nil { 249 return nil 250 } 251 return &pb.MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions} 252} 253 254// Config returns the TopicConfig for the topic. 255func (t *Topic) Config(ctx context.Context) (TopicConfig, error) { 256 pbt, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name}) 257 if err != nil { 258 return TopicConfig{}, err 259 } 260 return protoToTopicConfig(pbt), nil 261} 262 263// Update changes an existing topic according to the fields set in cfg. It returns 264// the new TopicConfig. 265func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error) { 266 req := t.updateRequest(cfg) 267 if len(req.UpdateMask.Paths) == 0 { 268 return TopicConfig{}, errors.New("pubsub: UpdateTopic call with nothing to update") 269 } 270 rpt, err := t.c.pubc.UpdateTopic(ctx, req) 271 if err != nil { 272 return TopicConfig{}, err 273 } 274 return protoToTopicConfig(rpt), nil 275} 276 277func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest { 278 pt := &pb.Topic{Name: t.name} 279 var paths []string 280 if cfg.Labels != nil { 281 pt.Labels = cfg.Labels 282 paths = append(paths, "labels") 283 } 284 if cfg.MessageStoragePolicy != nil { 285 pt.MessageStoragePolicy = messageStoragePolicyToProto(cfg.MessageStoragePolicy) 286 paths = append(paths, "message_storage_policy") 287 } 288 return &pb.UpdateTopicRequest{ 289 Topic: pt, 290 UpdateMask: &fmpb.FieldMask{Paths: paths}, 291 } 292} 293 294// Topics returns an iterator which returns all of the topics for the client's project. 295func (c *Client) Topics(ctx context.Context) *TopicIterator { 296 it := c.pubc.ListTopics(ctx, &pb.ListTopicsRequest{Project: c.fullyQualifiedProjectName()}) 297 return &TopicIterator{ 298 c: c, 299 next: func() (string, error) { 300 topic, err := it.Next() 301 if err != nil { 302 return "", err 303 } 304 return topic.Name, nil 305 }, 306 } 307} 308 309// TopicIterator is an iterator that returns a series of topics. 310type TopicIterator struct { 311 c *Client 312 next func() (string, error) 313} 314 315// Next returns the next topic. If there are no more topics, iterator.Done will be returned. 316func (tps *TopicIterator) Next() (*Topic, error) { 317 topicName, err := tps.next() 318 if err != nil { 319 return nil, err 320 } 321 return newTopic(tps.c, topicName), nil 322} 323 324// ID returns the unique identifier of the topic within its project. 325func (t *Topic) ID() string { 326 slash := strings.LastIndex(t.name, "/") 327 if slash == -1 { 328 // name is not a fully-qualified name. 329 panic("bad topic name") 330 } 331 return t.name[slash+1:] 332} 333 334// String returns the printable globally unique name for the topic. 335func (t *Topic) String() string { 336 return t.name 337} 338 339// Delete deletes the topic. 340func (t *Topic) Delete(ctx context.Context) error { 341 return t.c.pubc.DeleteTopic(ctx, &pb.DeleteTopicRequest{Topic: t.name}) 342} 343 344// Exists reports whether the topic exists on the server. 345func (t *Topic) Exists(ctx context.Context) (bool, error) { 346 if t.name == "_deleted-topic_" { 347 return false, nil 348 } 349 _, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name}) 350 if err == nil { 351 return true, nil 352 } 353 if status.Code(err) == codes.NotFound { 354 return false, nil 355 } 356 return false, err 357} 358 359// IAM returns the topic's IAM handle. 360func (t *Topic) IAM() *iam.Handle { 361 return iam.InternalNewHandle(t.c.pubc.Connection(), t.name) 362} 363 364// Subscriptions returns an iterator which returns the subscriptions for this topic. 365// 366// Some of the returned subscriptions may belong to a project other than t. 367func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator { 368 it := t.c.pubc.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{ 369 Topic: t.name, 370 }) 371 return &SubscriptionIterator{ 372 c: t.c, 373 next: it.Next, 374 } 375} 376 377var errTopicStopped = errors.New("pubsub: Stop has been called for this topic") 378 379// Publish publishes msg to the topic asynchronously. Messages are batched and 380// sent according to the topic's PublishSettings. Publish never blocks. 381// 382// Publish returns a non-nil PublishResult which will be ready when the 383// message has been sent (or has failed to be sent) to the server. 384// 385// Publish creates goroutines for batching and sending messages. These goroutines 386// need to be stopped by calling t.Stop(). Once stopped, future calls to Publish 387// will immediately return a PublishResult with an error. 388func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { 389 // Use a PublishRequest with only the Messages field to calculate the size 390 // of an individual message. This accurately calculates the size of the 391 // encoded proto message by accounting for the length of an individual 392 // PubSubMessage and Data/Attributes field. 393 // TODO(hongalex): if this turns out to take significant time, try to approximate it. 394 msg.size = proto.Size(&pb.PublishRequest{ 395 Messages: []*pb.PubsubMessage{ 396 { 397 Data: msg.Data, 398 Attributes: msg.Attributes, 399 }, 400 }, 401 }) 402 r := &PublishResult{ready: make(chan struct{})} 403 t.initBundler() 404 t.mu.RLock() 405 defer t.mu.RUnlock() 406 // TODO(aboulhosn) [from bcmills] consider changing the semantics of bundler to perform this logic so we don't have to do it here 407 if t.stopped { 408 r.set("", errTopicStopped) 409 return r 410 } 411 412 // TODO(jba) [from bcmills] consider using a shared channel per bundle 413 // (requires Bundler API changes; would reduce allocations) 414 err := t.bundler.Add(&bundledMessage{msg, r}, msg.size) 415 if err != nil { 416 r.set("", err) 417 } 418 return r 419} 420 421// Stop sends all remaining published messages and stop goroutines created for handling 422// publishing. Returns once all outstanding messages have been sent or have 423// failed to be sent. 424func (t *Topic) Stop() { 425 t.mu.Lock() 426 noop := t.stopped || t.bundler == nil 427 t.stopped = true 428 t.mu.Unlock() 429 if noop { 430 return 431 } 432 t.bundler.Flush() 433} 434 435// A PublishResult holds the result from a call to Publish. 436type PublishResult struct { 437 ready chan struct{} 438 serverID string 439 err error 440} 441 442// Ready returns a channel that is closed when the result is ready. 443// When the Ready channel is closed, Get is guaranteed not to block. 444func (r *PublishResult) Ready() <-chan struct{} { return r.ready } 445 446// Get returns the server-generated message ID and/or error result of a Publish call. 447// Get blocks until the Publish call completes or the context is done. 448func (r *PublishResult) Get(ctx context.Context) (serverID string, err error) { 449 // If the result is already ready, return it even if the context is done. 450 select { 451 case <-r.Ready(): 452 return r.serverID, r.err 453 default: 454 } 455 select { 456 case <-ctx.Done(): 457 return "", ctx.Err() 458 case <-r.Ready(): 459 return r.serverID, r.err 460 } 461} 462 463func (r *PublishResult) set(sid string, err error) { 464 r.serverID = sid 465 r.err = err 466 close(r.ready) 467} 468 469type bundledMessage struct { 470 msg *Message 471 res *PublishResult 472} 473 474func (t *Topic) initBundler() { 475 t.mu.RLock() 476 noop := t.stopped || t.bundler != nil 477 t.mu.RUnlock() 478 if noop { 479 return 480 } 481 t.mu.Lock() 482 defer t.mu.Unlock() 483 // Must re-check, since we released the lock. 484 if t.stopped || t.bundler != nil { 485 return 486 } 487 488 timeout := t.PublishSettings.Timeout 489 t.bundler = bundler.NewBundler(&bundledMessage{}, func(items interface{}) { 490 // TODO(jba): use a context detached from the one passed to NewClient. 491 ctx := context.TODO() 492 if timeout != 0 { 493 var cancel func() 494 ctx, cancel = context.WithTimeout(ctx, timeout) 495 defer cancel() 496 } 497 t.publishMessageBundle(ctx, items.([]*bundledMessage)) 498 }) 499 t.bundler.DelayThreshold = t.PublishSettings.DelayThreshold 500 t.bundler.BundleCountThreshold = t.PublishSettings.CountThreshold 501 if t.bundler.BundleCountThreshold > MaxPublishRequestCount { 502 t.bundler.BundleCountThreshold = MaxPublishRequestCount 503 } 504 t.bundler.BundleByteThreshold = t.PublishSettings.ByteThreshold 505 506 bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit 507 if t.PublishSettings.BufferedByteLimit > 0 { 508 bufferedByteLimit = t.PublishSettings.BufferedByteLimit 509 } 510 t.bundler.BufferedByteLimit = bufferedByteLimit 511 512 // Set the bundler's max size per payload, accounting for topic name's overhead. 513 t.bundler.BundleByteLimit = MaxPublishRequestBytes - calcFieldSizeString(t.name) 514 // Unless overridden, allow many goroutines per CPU to call the Publish RPC concurrently. 515 // The default value was determined via extensive load testing (see the loadtest subdirectory). 516 if t.PublishSettings.NumGoroutines > 0 { 517 t.bundler.HandlerLimit = t.PublishSettings.NumGoroutines 518 } else { 519 t.bundler.HandlerLimit = 25 * runtime.GOMAXPROCS(0) 520 } 521} 522 523func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) { 524 ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name)) 525 if err != nil { 526 log.Printf("pubsub: cannot create context with tag in publishMessageBundle: %v", err) 527 } 528 pbMsgs := make([]*pb.PubsubMessage, len(bms)) 529 for i, bm := range bms { 530 pbMsgs[i] = &pb.PubsubMessage{ 531 Data: bm.msg.Data, 532 Attributes: bm.msg.Attributes, 533 } 534 bm.msg = nil // release bm.msg for GC 535 } 536 start := time.Now() 537 res, err := t.c.pubc.Publish(ctx, &pb.PublishRequest{ 538 Topic: t.name, 539 Messages: pbMsgs, 540 }, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes))) 541 end := time.Now() 542 if err != nil { 543 // Update context with error tag for OpenCensus, 544 // using same stats.Record() call as success case. 545 ctx, _ = tag.New(ctx, tag.Upsert(keyStatus, "ERROR"), 546 tag.Upsert(keyError, err.Error())) 547 } 548 stats.Record(ctx, 549 PublishLatency.M(float64(end.Sub(start)/time.Millisecond)), 550 PublishedMessages.M(int64(len(bms)))) 551 for i, bm := range bms { 552 if err != nil { 553 bm.res.set("", err) 554 } else { 555 bm.res.set(res.MessageIds[i], nil) 556 } 557 } 558} 559