1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Code generated by protoc-gen-go_gapic. DO NOT EDIT. 16 17package pubsub 18 19import ( 20 "context" 21 "fmt" 22 "math" 23 "net/url" 24 "time" 25 26 "github.com/golang/protobuf/proto" 27 gax "github.com/googleapis/gax-go/v2" 28 "google.golang.org/api/iterator" 29 "google.golang.org/api/option" 30 gtransport "google.golang.org/api/transport/grpc" 31 pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1" 32 "google.golang.org/grpc" 33 "google.golang.org/grpc/codes" 34 "google.golang.org/grpc/metadata" 35) 36 37var newPublisherClientHook clientHook 38 39// PublisherCallOptions contains the retry settings for each method of PublisherClient. 40type PublisherCallOptions struct { 41 CreateTopic []gax.CallOption 42 UpdateTopic []gax.CallOption 43 Publish []gax.CallOption 44 GetTopic []gax.CallOption 45 ListTopics []gax.CallOption 46 ListTopicSubscriptions []gax.CallOption 47 ListTopicSnapshots []gax.CallOption 48 DeleteTopic []gax.CallOption 49} 50 51func defaultPublisherClientOptions() []option.ClientOption { 52 return []option.ClientOption{ 53 option.WithEndpoint("pubsub.googleapis.com:443"), 54 option.WithGRPCDialOption(grpc.WithDisableServiceConfig()), 55 option.WithScopes(DefaultAuthScopes()...), 56 option.WithGRPCDialOption(grpc.WithDefaultCallOptions( 57 grpc.MaxCallRecvMsgSize(math.MaxInt32))), 58 } 59} 60 61func defaultPublisherCallOptions() *PublisherCallOptions { 62 return &PublisherCallOptions{ 63 CreateTopic: []gax.CallOption{ 64 gax.WithRetry(func() gax.Retryer { 65 return gax.OnCodes([]codes.Code{ 66 codes.Unavailable, 67 }, gax.Backoff{ 68 Initial: 100 * time.Millisecond, 69 Max: 60000 * time.Millisecond, 70 Multiplier: 1.30, 71 }) 72 }), 73 }, 74 UpdateTopic: []gax.CallOption{ 75 gax.WithRetry(func() gax.Retryer { 76 return gax.OnCodes([]codes.Code{ 77 codes.Unavailable, 78 }, gax.Backoff{ 79 Initial: 100 * time.Millisecond, 80 Max: 60000 * time.Millisecond, 81 Multiplier: 1.30, 82 }) 83 }), 84 }, 85 Publish: []gax.CallOption{ 86 gax.WithRetry(func() gax.Retryer { 87 return gax.OnCodes([]codes.Code{ 88 codes.Aborted, 89 codes.Canceled, 90 codes.Internal, 91 codes.ResourceExhausted, 92 codes.Unknown, 93 codes.Unavailable, 94 codes.DeadlineExceeded, 95 }, gax.Backoff{ 96 Initial: 100 * time.Millisecond, 97 Max: 60000 * time.Millisecond, 98 Multiplier: 1.30, 99 }) 100 }), 101 }, 102 GetTopic: []gax.CallOption{ 103 gax.WithRetry(func() gax.Retryer { 104 return gax.OnCodes([]codes.Code{ 105 codes.Unknown, 106 codes.Aborted, 107 codes.Unavailable, 108 }, gax.Backoff{ 109 Initial: 100 * time.Millisecond, 110 Max: 60000 * time.Millisecond, 111 Multiplier: 1.30, 112 }) 113 }), 114 }, 115 ListTopics: []gax.CallOption{ 116 gax.WithRetry(func() gax.Retryer { 117 return gax.OnCodes([]codes.Code{ 118 codes.Unknown, 119 codes.Aborted, 120 codes.Unavailable, 121 }, gax.Backoff{ 122 Initial: 100 * time.Millisecond, 123 Max: 60000 * time.Millisecond, 124 Multiplier: 1.30, 125 }) 126 }), 127 }, 128 ListTopicSubscriptions: []gax.CallOption{ 129 gax.WithRetry(func() gax.Retryer { 130 return gax.OnCodes([]codes.Code{ 131 codes.Unknown, 132 codes.Aborted, 133 codes.Unavailable, 134 }, gax.Backoff{ 135 Initial: 100 * time.Millisecond, 136 Max: 60000 * time.Millisecond, 137 Multiplier: 1.30, 138 }) 139 }), 140 }, 141 ListTopicSnapshots: []gax.CallOption{ 142 gax.WithRetry(func() gax.Retryer { 143 return gax.OnCodes([]codes.Code{ 144 codes.Unknown, 145 codes.Aborted, 146 codes.Unavailable, 147 }, gax.Backoff{ 148 Initial: 100 * time.Millisecond, 149 Max: 60000 * time.Millisecond, 150 Multiplier: 1.30, 151 }) 152 }), 153 }, 154 DeleteTopic: []gax.CallOption{ 155 gax.WithRetry(func() gax.Retryer { 156 return gax.OnCodes([]codes.Code{ 157 codes.Unavailable, 158 }, gax.Backoff{ 159 Initial: 100 * time.Millisecond, 160 Max: 60000 * time.Millisecond, 161 Multiplier: 1.30, 162 }) 163 }), 164 }, 165 } 166} 167 168// PublisherClient is a client for interacting with Cloud Pub/Sub API. 169// 170// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls. 171type PublisherClient struct { 172 // Connection pool of gRPC connections to the service. 173 connPool gtransport.ConnPool 174 175 // The gRPC API client. 176 publisherClient pubsubpb.PublisherClient 177 178 // The call options for this service. 179 CallOptions *PublisherCallOptions 180 181 // The x-goog-* metadata to be sent with each request. 182 xGoogMetadata metadata.MD 183} 184 185// NewPublisherClient creates a new publisher client. 186// 187// The service that an application uses to manipulate topics, and to send 188// messages to a topic. 189func NewPublisherClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error) { 190 clientOpts := defaultPublisherClientOptions() 191 192 if newPublisherClientHook != nil { 193 hookOpts, err := newPublisherClientHook(ctx, clientHookParams{}) 194 if err != nil { 195 return nil, err 196 } 197 clientOpts = append(clientOpts, hookOpts...) 198 } 199 200 connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...) 201 if err != nil { 202 return nil, err 203 } 204 c := &PublisherClient{ 205 connPool: connPool, 206 CallOptions: defaultPublisherCallOptions(), 207 208 publisherClient: pubsubpb.NewPublisherClient(connPool), 209 } 210 c.setGoogleClientInfo() 211 212 return c, nil 213} 214 215// Connection returns a connection to the API service. 216// 217// Deprecated. 218func (c *PublisherClient) Connection() *grpc.ClientConn { 219 return c.connPool.Conn() 220} 221 222// Close closes the connection to the API service. The user should invoke this when 223// the client is no longer required. 224func (c *PublisherClient) Close() error { 225 return c.connPool.Close() 226} 227 228// setGoogleClientInfo sets the name and version of the application in 229// the `x-goog-api-client` header passed on each request. Intended for 230// use by Google-written clients. 231func (c *PublisherClient) setGoogleClientInfo(keyval ...string) { 232 kv := append([]string{"gl-go", versionGo()}, keyval...) 233 kv = append(kv, "gapic", versionClient, "gax", gax.Version, "grpc", grpc.Version) 234 c.xGoogMetadata = metadata.Pairs("x-goog-api-client", gax.XGoogHeader(kv...)) 235} 236 237// CreateTopic creates the given topic with the given name. See the 238// resource name rules (at https://cloud.google.com/pubsub/docs/admin#resource_names). 239func (c *PublisherClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error) { 240 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))) 241 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 242 opts = append(c.CallOptions.CreateTopic[0:len(c.CallOptions.CreateTopic):len(c.CallOptions.CreateTopic)], opts...) 243 var resp *pubsubpb.Topic 244 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 245 var err error 246 resp, err = c.publisherClient.CreateTopic(ctx, req, settings.GRPC...) 247 return err 248 }, opts...) 249 if err != nil { 250 return nil, err 251 } 252 return resp, nil 253} 254 255// UpdateTopic updates an existing topic. Note that certain properties of a 256// topic are not modifiable. 257func (c *PublisherClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) { 258 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic.name", url.QueryEscape(req.GetTopic().GetName()))) 259 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 260 opts = append(c.CallOptions.UpdateTopic[0:len(c.CallOptions.UpdateTopic):len(c.CallOptions.UpdateTopic)], opts...) 261 var resp *pubsubpb.Topic 262 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 263 var err error 264 resp, err = c.publisherClient.UpdateTopic(ctx, req, settings.GRPC...) 265 return err 266 }, opts...) 267 if err != nil { 268 return nil, err 269 } 270 return resp, nil 271} 272 273// Publish adds one or more messages to the topic. Returns NOT_FOUND if the topic 274// does not exist. 275func (c *PublisherClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error) { 276 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))) 277 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 278 opts = append(c.CallOptions.Publish[0:len(c.CallOptions.Publish):len(c.CallOptions.Publish)], opts...) 279 var resp *pubsubpb.PublishResponse 280 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 281 var err error 282 resp, err = c.publisherClient.Publish(ctx, req, settings.GRPC...) 283 return err 284 }, opts...) 285 if err != nil { 286 return nil, err 287 } 288 return resp, nil 289} 290 291// GetTopic gets the configuration of a topic. 292func (c *PublisherClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) { 293 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))) 294 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 295 opts = append(c.CallOptions.GetTopic[0:len(c.CallOptions.GetTopic):len(c.CallOptions.GetTopic)], opts...) 296 var resp *pubsubpb.Topic 297 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 298 var err error 299 resp, err = c.publisherClient.GetTopic(ctx, req, settings.GRPC...) 300 return err 301 }, opts...) 302 if err != nil { 303 return nil, err 304 } 305 return resp, nil 306} 307 308// ListTopics lists matching topics. 309func (c *PublisherClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIterator { 310 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "project", url.QueryEscape(req.GetProject()))) 311 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 312 opts = append(c.CallOptions.ListTopics[0:len(c.CallOptions.ListTopics):len(c.CallOptions.ListTopics)], opts...) 313 it := &TopicIterator{} 314 req = proto.Clone(req).(*pubsubpb.ListTopicsRequest) 315 it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Topic, string, error) { 316 var resp *pubsubpb.ListTopicsResponse 317 req.PageToken = pageToken 318 if pageSize > math.MaxInt32 { 319 req.PageSize = math.MaxInt32 320 } else { 321 req.PageSize = int32(pageSize) 322 } 323 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 324 var err error 325 resp, err = c.publisherClient.ListTopics(ctx, req, settings.GRPC...) 326 return err 327 }, opts...) 328 if err != nil { 329 return nil, "", err 330 } 331 332 it.Response = resp 333 return resp.Topics, resp.NextPageToken, nil 334 } 335 fetch := func(pageSize int, pageToken string) (string, error) { 336 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken) 337 if err != nil { 338 return "", err 339 } 340 it.items = append(it.items, items...) 341 return nextPageToken, nil 342 } 343 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf) 344 it.pageInfo.MaxSize = int(req.PageSize) 345 it.pageInfo.Token = req.PageToken 346 return it 347} 348 349// ListTopicSubscriptions lists the names of the subscriptions on this topic. 350func (c *PublisherClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator { 351 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))) 352 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 353 opts = append(c.CallOptions.ListTopicSubscriptions[0:len(c.CallOptions.ListTopicSubscriptions):len(c.CallOptions.ListTopicSubscriptions)], opts...) 354 it := &StringIterator{} 355 req = proto.Clone(req).(*pubsubpb.ListTopicSubscriptionsRequest) 356 it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) { 357 var resp *pubsubpb.ListTopicSubscriptionsResponse 358 req.PageToken = pageToken 359 if pageSize > math.MaxInt32 { 360 req.PageSize = math.MaxInt32 361 } else { 362 req.PageSize = int32(pageSize) 363 } 364 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 365 var err error 366 resp, err = c.publisherClient.ListTopicSubscriptions(ctx, req, settings.GRPC...) 367 return err 368 }, opts...) 369 if err != nil { 370 return nil, "", err 371 } 372 373 it.Response = resp 374 return resp.Subscriptions, resp.NextPageToken, nil 375 } 376 fetch := func(pageSize int, pageToken string) (string, error) { 377 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken) 378 if err != nil { 379 return "", err 380 } 381 it.items = append(it.items, items...) 382 return nextPageToken, nil 383 } 384 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf) 385 it.pageInfo.MaxSize = int(req.PageSize) 386 it.pageInfo.Token = req.PageToken 387 return it 388} 389 390// ListTopicSnapshots lists the names of the snapshots on this topic. Snapshots are used in 391// Seek (at https://cloud.google.com/pubsub/docs/replay-overview) 392// operations, which allow 393// you to manage message acknowledgments in bulk. That is, you can set the 394// acknowledgment state of messages in an existing subscription to the state 395// captured by a snapshot. 396func (c *PublisherClient) ListTopicSnapshots(ctx context.Context, req *pubsubpb.ListTopicSnapshotsRequest, opts ...gax.CallOption) *StringIterator { 397 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))) 398 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 399 opts = append(c.CallOptions.ListTopicSnapshots[0:len(c.CallOptions.ListTopicSnapshots):len(c.CallOptions.ListTopicSnapshots)], opts...) 400 it := &StringIterator{} 401 req = proto.Clone(req).(*pubsubpb.ListTopicSnapshotsRequest) 402 it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) { 403 var resp *pubsubpb.ListTopicSnapshotsResponse 404 req.PageToken = pageToken 405 if pageSize > math.MaxInt32 { 406 req.PageSize = math.MaxInt32 407 } else { 408 req.PageSize = int32(pageSize) 409 } 410 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 411 var err error 412 resp, err = c.publisherClient.ListTopicSnapshots(ctx, req, settings.GRPC...) 413 return err 414 }, opts...) 415 if err != nil { 416 return nil, "", err 417 } 418 419 it.Response = resp 420 return resp.Snapshots, resp.NextPageToken, nil 421 } 422 fetch := func(pageSize int, pageToken string) (string, error) { 423 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken) 424 if err != nil { 425 return "", err 426 } 427 it.items = append(it.items, items...) 428 return nextPageToken, nil 429 } 430 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf) 431 it.pageInfo.MaxSize = int(req.PageSize) 432 it.pageInfo.Token = req.PageToken 433 return it 434} 435 436// DeleteTopic deletes the topic with the given name. Returns NOT_FOUND if the topic 437// does not exist. After a topic is deleted, a new topic may be created with 438// the same name; this is an entirely new topic with none of the old 439// configuration or subscriptions. Existing subscriptions to this topic are 440// not deleted, but their topic field is set to _deleted-topic_. 441func (c *PublisherClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error { 442 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))) 443 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 444 opts = append(c.CallOptions.DeleteTopic[0:len(c.CallOptions.DeleteTopic):len(c.CallOptions.DeleteTopic)], opts...) 445 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 446 var err error 447 _, err = c.publisherClient.DeleteTopic(ctx, req, settings.GRPC...) 448 return err 449 }, opts...) 450 return err 451} 452 453// StringIterator manages a stream of string. 454type StringIterator struct { 455 items []string 456 pageInfo *iterator.PageInfo 457 nextFunc func() error 458 459 // Response is the raw response for the current page. 460 // It must be cast to the RPC response type. 461 // Calling Next() or InternalFetch() updates this value. 462 Response interface{} 463 464 // InternalFetch is for use by the Google Cloud Libraries only. 465 // It is not part of the stable interface of this package. 466 // 467 // InternalFetch returns results from a single call to the underlying RPC. 468 // The number of results is no greater than pageSize. 469 // If there are no more results, nextPageToken is empty and err is nil. 470 InternalFetch func(pageSize int, pageToken string) (results []string, nextPageToken string, err error) 471} 472 473// PageInfo supports pagination. See the google.golang.org/api/iterator package for details. 474func (it *StringIterator) PageInfo() *iterator.PageInfo { 475 return it.pageInfo 476} 477 478// Next returns the next result. Its second return value is iterator.Done if there are no more 479// results. Once Next returns Done, all subsequent calls will return Done. 480func (it *StringIterator) Next() (string, error) { 481 var item string 482 if err := it.nextFunc(); err != nil { 483 return item, err 484 } 485 item = it.items[0] 486 it.items = it.items[1:] 487 return item, nil 488} 489 490func (it *StringIterator) bufLen() int { 491 return len(it.items) 492} 493 494func (it *StringIterator) takeBuf() interface{} { 495 b := it.items 496 it.items = nil 497 return b 498} 499 500// TopicIterator manages a stream of *pubsubpb.Topic. 501type TopicIterator struct { 502 items []*pubsubpb.Topic 503 pageInfo *iterator.PageInfo 504 nextFunc func() error 505 506 // Response is the raw response for the current page. 507 // It must be cast to the RPC response type. 508 // Calling Next() or InternalFetch() updates this value. 509 Response interface{} 510 511 // InternalFetch is for use by the Google Cloud Libraries only. 512 // It is not part of the stable interface of this package. 513 // 514 // InternalFetch returns results from a single call to the underlying RPC. 515 // The number of results is no greater than pageSize. 516 // If there are no more results, nextPageToken is empty and err is nil. 517 InternalFetch func(pageSize int, pageToken string) (results []*pubsubpb.Topic, nextPageToken string, err error) 518} 519 520// PageInfo supports pagination. See the google.golang.org/api/iterator package for details. 521func (it *TopicIterator) PageInfo() *iterator.PageInfo { 522 return it.pageInfo 523} 524 525// Next returns the next result. Its second return value is iterator.Done if there are no more 526// results. Once Next returns Done, all subsequent calls will return Done. 527func (it *TopicIterator) Next() (*pubsubpb.Topic, error) { 528 var item *pubsubpb.Topic 529 if err := it.nextFunc(); err != nil { 530 return item, err 531 } 532 item = it.items[0] 533 it.items = it.items[1:] 534 return item, nil 535} 536 537func (it *TopicIterator) bufLen() int { 538 return len(it.items) 539} 540 541func (it *TopicIterator) takeBuf() interface{} { 542 b := it.items 543 it.items = nil 544 return b 545} 546