1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Code generated by protoc-gen-go_gapic. DO NOT EDIT. 16 17package pubsub 18 19import ( 20 "context" 21 "fmt" 22 "math" 23 "net/url" 24 "time" 25 26 "github.com/golang/protobuf/proto" 27 gax "github.com/googleapis/gax-go/v2" 28 "google.golang.org/api/iterator" 29 "google.golang.org/api/option" 30 gtransport "google.golang.org/api/transport/grpc" 31 pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1" 32 "google.golang.org/grpc" 33 "google.golang.org/grpc/codes" 34 "google.golang.org/grpc/metadata" 35) 36 37var newPublisherClientHook clientHook 38 39// PublisherCallOptions contains the retry settings for each method of PublisherClient. 40type PublisherCallOptions struct { 41 CreateTopic []gax.CallOption 42 UpdateTopic []gax.CallOption 43 Publish []gax.CallOption 44 GetTopic []gax.CallOption 45 ListTopics []gax.CallOption 46 ListTopicSubscriptions []gax.CallOption 47 ListTopicSnapshots []gax.CallOption 48 DeleteTopic []gax.CallOption 49 DetachSubscription []gax.CallOption 50} 51 52func defaultPublisherClientOptions() []option.ClientOption { 53 return []option.ClientOption{ 54 option.WithEndpoint("pubsub.googleapis.com:443"), 55 option.WithGRPCDialOption(grpc.WithDisableServiceConfig()), 56 option.WithScopes(DefaultAuthScopes()...), 57 option.WithGRPCDialOption(grpc.WithDefaultCallOptions( 58 grpc.MaxCallRecvMsgSize(math.MaxInt32))), 59 } 60} 61 62func defaultPublisherCallOptions() *PublisherCallOptions { 63 return &PublisherCallOptions{ 64 CreateTopic: []gax.CallOption{ 65 gax.WithRetry(func() gax.Retryer { 66 return gax.OnCodes([]codes.Code{ 67 codes.Unavailable, 68 }, gax.Backoff{ 69 Initial: 100 * time.Millisecond, 70 Max: 60000 * time.Millisecond, 71 Multiplier: 1.30, 72 }) 73 }), 74 }, 75 UpdateTopic: []gax.CallOption{ 76 gax.WithRetry(func() gax.Retryer { 77 return gax.OnCodes([]codes.Code{ 78 codes.Unavailable, 79 }, gax.Backoff{ 80 Initial: 100 * time.Millisecond, 81 Max: 60000 * time.Millisecond, 82 Multiplier: 1.30, 83 }) 84 }), 85 }, 86 Publish: []gax.CallOption{ 87 gax.WithRetry(func() gax.Retryer { 88 return gax.OnCodes([]codes.Code{ 89 codes.Aborted, 90 codes.Canceled, 91 codes.Internal, 92 codes.ResourceExhausted, 93 codes.Unknown, 94 codes.Unavailable, 95 codes.DeadlineExceeded, 96 }, gax.Backoff{ 97 Initial: 100 * time.Millisecond, 98 Max: 60000 * time.Millisecond, 99 Multiplier: 1.30, 100 }) 101 }), 102 }, 103 GetTopic: []gax.CallOption{ 104 gax.WithRetry(func() gax.Retryer { 105 return gax.OnCodes([]codes.Code{ 106 codes.Unknown, 107 codes.Aborted, 108 codes.Unavailable, 109 }, gax.Backoff{ 110 Initial: 100 * time.Millisecond, 111 Max: 60000 * time.Millisecond, 112 Multiplier: 1.30, 113 }) 114 }), 115 }, 116 ListTopics: []gax.CallOption{ 117 gax.WithRetry(func() gax.Retryer { 118 return gax.OnCodes([]codes.Code{ 119 codes.Unknown, 120 codes.Aborted, 121 codes.Unavailable, 122 }, gax.Backoff{ 123 Initial: 100 * time.Millisecond, 124 Max: 60000 * time.Millisecond, 125 Multiplier: 1.30, 126 }) 127 }), 128 }, 129 ListTopicSubscriptions: []gax.CallOption{ 130 gax.WithRetry(func() gax.Retryer { 131 return gax.OnCodes([]codes.Code{ 132 codes.Unknown, 133 codes.Aborted, 134 codes.Unavailable, 135 }, gax.Backoff{ 136 Initial: 100 * time.Millisecond, 137 Max: 60000 * time.Millisecond, 138 Multiplier: 1.30, 139 }) 140 }), 141 }, 142 ListTopicSnapshots: []gax.CallOption{ 143 gax.WithRetry(func() gax.Retryer { 144 return gax.OnCodes([]codes.Code{ 145 codes.Unknown, 146 codes.Aborted, 147 codes.Unavailable, 148 }, gax.Backoff{ 149 Initial: 100 * time.Millisecond, 150 Max: 60000 * time.Millisecond, 151 Multiplier: 1.30, 152 }) 153 }), 154 }, 155 DeleteTopic: []gax.CallOption{ 156 gax.WithRetry(func() gax.Retryer { 157 return gax.OnCodes([]codes.Code{ 158 codes.Unavailable, 159 }, gax.Backoff{ 160 Initial: 100 * time.Millisecond, 161 Max: 60000 * time.Millisecond, 162 Multiplier: 1.30, 163 }) 164 }), 165 }, 166 DetachSubscription: []gax.CallOption{ 167 gax.WithRetry(func() gax.Retryer { 168 return gax.OnCodes([]codes.Code{ 169 codes.Unavailable, 170 }, gax.Backoff{ 171 Initial: 100 * time.Millisecond, 172 Max: 60000 * time.Millisecond, 173 Multiplier: 1.30, 174 }) 175 }), 176 }, 177 } 178} 179 180// PublisherClient is a client for interacting with Cloud Pub/Sub API. 181// 182// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls. 183type PublisherClient struct { 184 // Connection pool of gRPC connections to the service. 185 connPool gtransport.ConnPool 186 187 // The gRPC API client. 188 publisherClient pubsubpb.PublisherClient 189 190 // The call options for this service. 191 CallOptions *PublisherCallOptions 192 193 // The x-goog-* metadata to be sent with each request. 194 xGoogMetadata metadata.MD 195} 196 197// NewPublisherClient creates a new publisher client. 198// 199// The service that an application uses to manipulate topics, and to send 200// messages to a topic. 201func NewPublisherClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error) { 202 clientOpts := defaultPublisherClientOptions() 203 204 if newPublisherClientHook != nil { 205 hookOpts, err := newPublisherClientHook(ctx, clientHookParams{}) 206 if err != nil { 207 return nil, err 208 } 209 clientOpts = append(clientOpts, hookOpts...) 210 } 211 212 connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...) 213 if err != nil { 214 return nil, err 215 } 216 c := &PublisherClient{ 217 connPool: connPool, 218 CallOptions: defaultPublisherCallOptions(), 219 220 publisherClient: pubsubpb.NewPublisherClient(connPool), 221 } 222 c.setGoogleClientInfo() 223 224 return c, nil 225} 226 227// Connection returns a connection to the API service. 228// 229// Deprecated. 230func (c *PublisherClient) Connection() *grpc.ClientConn { 231 return c.connPool.Conn() 232} 233 234// Close closes the connection to the API service. The user should invoke this when 235// the client is no longer required. 236func (c *PublisherClient) Close() error { 237 return c.connPool.Close() 238} 239 240// setGoogleClientInfo sets the name and version of the application in 241// the `x-goog-api-client` header passed on each request. Intended for 242// use by Google-written clients. 243func (c *PublisherClient) setGoogleClientInfo(keyval ...string) { 244 kv := append([]string{"gl-go", versionGo()}, keyval...) 245 kv = append(kv, "gapic", versionClient, "gax", gax.Version, "grpc", grpc.Version) 246 c.xGoogMetadata = metadata.Pairs("x-goog-api-client", gax.XGoogHeader(kv...)) 247} 248 249// CreateTopic creates the given topic with the given name. See the 250// resource name rules (at https://cloud.google.com/pubsub/docs/admin#resource_names). 251func (c *PublisherClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error) { 252 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))) 253 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 254 opts = append(c.CallOptions.CreateTopic[0:len(c.CallOptions.CreateTopic):len(c.CallOptions.CreateTopic)], opts...) 255 var resp *pubsubpb.Topic 256 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 257 var err error 258 resp, err = c.publisherClient.CreateTopic(ctx, req, settings.GRPC...) 259 return err 260 }, opts...) 261 if err != nil { 262 return nil, err 263 } 264 return resp, nil 265} 266 267// UpdateTopic updates an existing topic. Note that certain properties of a 268// topic are not modifiable. 269func (c *PublisherClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) { 270 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic.name", url.QueryEscape(req.GetTopic().GetName()))) 271 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 272 opts = append(c.CallOptions.UpdateTopic[0:len(c.CallOptions.UpdateTopic):len(c.CallOptions.UpdateTopic)], opts...) 273 var resp *pubsubpb.Topic 274 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 275 var err error 276 resp, err = c.publisherClient.UpdateTopic(ctx, req, settings.GRPC...) 277 return err 278 }, opts...) 279 if err != nil { 280 return nil, err 281 } 282 return resp, nil 283} 284 285// Publish adds one or more messages to the topic. Returns NOT_FOUND if the topic 286// does not exist. 287func (c *PublisherClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error) { 288 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))) 289 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 290 opts = append(c.CallOptions.Publish[0:len(c.CallOptions.Publish):len(c.CallOptions.Publish)], opts...) 291 var resp *pubsubpb.PublishResponse 292 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 293 var err error 294 resp, err = c.publisherClient.Publish(ctx, req, settings.GRPC...) 295 return err 296 }, opts...) 297 if err != nil { 298 return nil, err 299 } 300 return resp, nil 301} 302 303// GetTopic gets the configuration of a topic. 304func (c *PublisherClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) { 305 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))) 306 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 307 opts = append(c.CallOptions.GetTopic[0:len(c.CallOptions.GetTopic):len(c.CallOptions.GetTopic)], opts...) 308 var resp *pubsubpb.Topic 309 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 310 var err error 311 resp, err = c.publisherClient.GetTopic(ctx, req, settings.GRPC...) 312 return err 313 }, opts...) 314 if err != nil { 315 return nil, err 316 } 317 return resp, nil 318} 319 320// ListTopics lists matching topics. 321func (c *PublisherClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIterator { 322 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "project", url.QueryEscape(req.GetProject()))) 323 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 324 opts = append(c.CallOptions.ListTopics[0:len(c.CallOptions.ListTopics):len(c.CallOptions.ListTopics)], opts...) 325 it := &TopicIterator{} 326 req = proto.Clone(req).(*pubsubpb.ListTopicsRequest) 327 it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Topic, string, error) { 328 var resp *pubsubpb.ListTopicsResponse 329 req.PageToken = pageToken 330 if pageSize > math.MaxInt32 { 331 req.PageSize = math.MaxInt32 332 } else { 333 req.PageSize = int32(pageSize) 334 } 335 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 336 var err error 337 resp, err = c.publisherClient.ListTopics(ctx, req, settings.GRPC...) 338 return err 339 }, opts...) 340 if err != nil { 341 return nil, "", err 342 } 343 344 it.Response = resp 345 return resp.Topics, resp.NextPageToken, nil 346 } 347 fetch := func(pageSize int, pageToken string) (string, error) { 348 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken) 349 if err != nil { 350 return "", err 351 } 352 it.items = append(it.items, items...) 353 return nextPageToken, nil 354 } 355 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf) 356 it.pageInfo.MaxSize = int(req.PageSize) 357 it.pageInfo.Token = req.PageToken 358 return it 359} 360 361// ListTopicSubscriptions lists the names of the attached subscriptions on this topic. 362func (c *PublisherClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator { 363 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))) 364 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 365 opts = append(c.CallOptions.ListTopicSubscriptions[0:len(c.CallOptions.ListTopicSubscriptions):len(c.CallOptions.ListTopicSubscriptions)], opts...) 366 it := &StringIterator{} 367 req = proto.Clone(req).(*pubsubpb.ListTopicSubscriptionsRequest) 368 it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) { 369 var resp *pubsubpb.ListTopicSubscriptionsResponse 370 req.PageToken = pageToken 371 if pageSize > math.MaxInt32 { 372 req.PageSize = math.MaxInt32 373 } else { 374 req.PageSize = int32(pageSize) 375 } 376 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 377 var err error 378 resp, err = c.publisherClient.ListTopicSubscriptions(ctx, req, settings.GRPC...) 379 return err 380 }, opts...) 381 if err != nil { 382 return nil, "", err 383 } 384 385 it.Response = resp 386 return resp.Subscriptions, resp.NextPageToken, nil 387 } 388 fetch := func(pageSize int, pageToken string) (string, error) { 389 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken) 390 if err != nil { 391 return "", err 392 } 393 it.items = append(it.items, items...) 394 return nextPageToken, nil 395 } 396 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf) 397 it.pageInfo.MaxSize = int(req.PageSize) 398 it.pageInfo.Token = req.PageToken 399 return it 400} 401 402// ListTopicSnapshots lists the names of the snapshots on this topic. Snapshots are used in 403// Seek (at https://cloud.google.com/pubsub/docs/replay-overview) 404// operations, which allow 405// you to manage message acknowledgments in bulk. That is, you can set the 406// acknowledgment state of messages in an existing subscription to the state 407// captured by a snapshot. 408func (c *PublisherClient) ListTopicSnapshots(ctx context.Context, req *pubsubpb.ListTopicSnapshotsRequest, opts ...gax.CallOption) *StringIterator { 409 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))) 410 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 411 opts = append(c.CallOptions.ListTopicSnapshots[0:len(c.CallOptions.ListTopicSnapshots):len(c.CallOptions.ListTopicSnapshots)], opts...) 412 it := &StringIterator{} 413 req = proto.Clone(req).(*pubsubpb.ListTopicSnapshotsRequest) 414 it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) { 415 var resp *pubsubpb.ListTopicSnapshotsResponse 416 req.PageToken = pageToken 417 if pageSize > math.MaxInt32 { 418 req.PageSize = math.MaxInt32 419 } else { 420 req.PageSize = int32(pageSize) 421 } 422 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 423 var err error 424 resp, err = c.publisherClient.ListTopicSnapshots(ctx, req, settings.GRPC...) 425 return err 426 }, opts...) 427 if err != nil { 428 return nil, "", err 429 } 430 431 it.Response = resp 432 return resp.Snapshots, resp.NextPageToken, nil 433 } 434 fetch := func(pageSize int, pageToken string) (string, error) { 435 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken) 436 if err != nil { 437 return "", err 438 } 439 it.items = append(it.items, items...) 440 return nextPageToken, nil 441 } 442 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf) 443 it.pageInfo.MaxSize = int(req.PageSize) 444 it.pageInfo.Token = req.PageToken 445 return it 446} 447 448// DeleteTopic deletes the topic with the given name. Returns NOT_FOUND if the topic 449// does not exist. After a topic is deleted, a new topic may be created with 450// the same name; this is an entirely new topic with none of the old 451// configuration or subscriptions. Existing subscriptions to this topic are 452// not deleted, but their topic field is set to _deleted-topic_. 453func (c *PublisherClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error { 454 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))) 455 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 456 opts = append(c.CallOptions.DeleteTopic[0:len(c.CallOptions.DeleteTopic):len(c.CallOptions.DeleteTopic)], opts...) 457 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 458 var err error 459 _, err = c.publisherClient.DeleteTopic(ctx, req, settings.GRPC...) 460 return err 461 }, opts...) 462 return err 463} 464 465// DetachSubscription detaches a subscription from this topic. All messages retained in the 466// subscription are dropped. Subsequent Pull and StreamingPull requests 467// will return FAILED_PRECONDITION. If the subscription is a push 468// subscription, pushes to the endpoint will stop. 469func (c *PublisherClient) DetachSubscription(ctx context.Context, req *pubsubpb.DetachSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.DetachSubscriptionResponse, error) { 470 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(req.GetSubscription()))) 471 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 472 opts = append(c.CallOptions.DetachSubscription[0:len(c.CallOptions.DetachSubscription):len(c.CallOptions.DetachSubscription)], opts...) 473 var resp *pubsubpb.DetachSubscriptionResponse 474 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 475 var err error 476 resp, err = c.publisherClient.DetachSubscription(ctx, req, settings.GRPC...) 477 return err 478 }, opts...) 479 if err != nil { 480 return nil, err 481 } 482 return resp, nil 483} 484 485// StringIterator manages a stream of string. 486type StringIterator struct { 487 items []string 488 pageInfo *iterator.PageInfo 489 nextFunc func() error 490 491 // Response is the raw response for the current page. 492 // It must be cast to the RPC response type. 493 // Calling Next() or InternalFetch() updates this value. 494 Response interface{} 495 496 // InternalFetch is for use by the Google Cloud Libraries only. 497 // It is not part of the stable interface of this package. 498 // 499 // InternalFetch returns results from a single call to the underlying RPC. 500 // The number of results is no greater than pageSize. 501 // If there are no more results, nextPageToken is empty and err is nil. 502 InternalFetch func(pageSize int, pageToken string) (results []string, nextPageToken string, err error) 503} 504 505// PageInfo supports pagination. See the google.golang.org/api/iterator package for details. 506func (it *StringIterator) PageInfo() *iterator.PageInfo { 507 return it.pageInfo 508} 509 510// Next returns the next result. Its second return value is iterator.Done if there are no more 511// results. Once Next returns Done, all subsequent calls will return Done. 512func (it *StringIterator) Next() (string, error) { 513 var item string 514 if err := it.nextFunc(); err != nil { 515 return item, err 516 } 517 item = it.items[0] 518 it.items = it.items[1:] 519 return item, nil 520} 521 522func (it *StringIterator) bufLen() int { 523 return len(it.items) 524} 525 526func (it *StringIterator) takeBuf() interface{} { 527 b := it.items 528 it.items = nil 529 return b 530} 531 532// TopicIterator manages a stream of *pubsubpb.Topic. 533type TopicIterator struct { 534 items []*pubsubpb.Topic 535 pageInfo *iterator.PageInfo 536 nextFunc func() error 537 538 // Response is the raw response for the current page. 539 // It must be cast to the RPC response type. 540 // Calling Next() or InternalFetch() updates this value. 541 Response interface{} 542 543 // InternalFetch is for use by the Google Cloud Libraries only. 544 // It is not part of the stable interface of this package. 545 // 546 // InternalFetch returns results from a single call to the underlying RPC. 547 // The number of results is no greater than pageSize. 548 // If there are no more results, nextPageToken is empty and err is nil. 549 InternalFetch func(pageSize int, pageToken string) (results []*pubsubpb.Topic, nextPageToken string, err error) 550} 551 552// PageInfo supports pagination. See the google.golang.org/api/iterator package for details. 553func (it *TopicIterator) PageInfo() *iterator.PageInfo { 554 return it.pageInfo 555} 556 557// Next returns the next result. Its second return value is iterator.Done if there are no more 558// results. Once Next returns Done, all subsequent calls will return Done. 559func (it *TopicIterator) Next() (*pubsubpb.Topic, error) { 560 var item *pubsubpb.Topic 561 if err := it.nextFunc(); err != nil { 562 return item, err 563 } 564 item = it.items[0] 565 it.items = it.items[1:] 566 return item, nil 567} 568 569func (it *TopicIterator) bufLen() int { 570 return len(it.items) 571} 572 573func (it *TopicIterator) takeBuf() interface{} { 574 b := it.items 575 it.items = nil 576 return b 577} 578