1// Copyright 2019 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Code generated by gapic-generator. DO NOT EDIT. 16 17package pubsub 18 19import ( 20 "context" 21 "fmt" 22 "math" 23 "time" 24 25 "github.com/golang/protobuf/proto" 26 gax "github.com/googleapis/gax-go/v2" 27 "google.golang.org/api/iterator" 28 "google.golang.org/api/option" 29 "google.golang.org/api/transport" 30 pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1" 31 "google.golang.org/grpc" 32 "google.golang.org/grpc/codes" 33 "google.golang.org/grpc/metadata" 34) 35 36// PublisherCallOptions contains the retry settings for each method of PublisherClient. 37type PublisherCallOptions struct { 38 CreateTopic []gax.CallOption 39 UpdateTopic []gax.CallOption 40 Publish []gax.CallOption 41 GetTopic []gax.CallOption 42 ListTopics []gax.CallOption 43 ListTopicSubscriptions []gax.CallOption 44 DeleteTopic []gax.CallOption 45} 46 47func defaultPublisherClientOptions() []option.ClientOption { 48 return []option.ClientOption{ 49 option.WithEndpoint("pubsub.googleapis.com:443"), 50 option.WithScopes(DefaultAuthScopes()...), 51 } 52} 53 54func defaultPublisherCallOptions() *PublisherCallOptions { 55 retry := map[[2]string][]gax.CallOption{ 56 {"default", "idempotent"}: { 57 gax.WithRetry(func() gax.Retryer { 58 return gax.OnCodes([]codes.Code{ 59 codes.Aborted, 60 codes.Unavailable, 61 codes.Unknown, 62 }, gax.Backoff{ 63 Initial: 100 * time.Millisecond, 64 Max: 60000 * time.Millisecond, 65 Multiplier: 1.3, 66 }) 67 }), 68 }, 69 {"default", "non_idempotent"}: { 70 gax.WithRetry(func() gax.Retryer { 71 return gax.OnCodes([]codes.Code{ 72 codes.Unavailable, 73 }, gax.Backoff{ 74 Initial: 100 * time.Millisecond, 75 Max: 60000 * time.Millisecond, 76 Multiplier: 1.3, 77 }) 78 }), 79 }, 80 {"messaging", "publish"}: { 81 gax.WithRetry(func() gax.Retryer { 82 return gax.OnCodes([]codes.Code{ 83 codes.Aborted, 84 codes.Canceled, 85 codes.DeadlineExceeded, 86 codes.Internal, 87 codes.ResourceExhausted, 88 codes.Unavailable, 89 codes.Unknown, 90 }, gax.Backoff{ 91 Initial: 100 * time.Millisecond, 92 Max: 60000 * time.Millisecond, 93 Multiplier: 1.3, 94 }) 95 }), 96 }, 97 } 98 return &PublisherCallOptions{ 99 CreateTopic: retry[[2]string{"default", "non_idempotent"}], 100 UpdateTopic: retry[[2]string{"default", "non_idempotent"}], 101 Publish: retry[[2]string{"messaging", "publish"}], 102 GetTopic: retry[[2]string{"default", "idempotent"}], 103 ListTopics: retry[[2]string{"default", "idempotent"}], 104 ListTopicSubscriptions: retry[[2]string{"default", "idempotent"}], 105 DeleteTopic: retry[[2]string{"default", "non_idempotent"}], 106 } 107} 108 109// PublisherClient is a client for interacting with Google Cloud Pub/Sub API. 110// 111// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls. 112type PublisherClient struct { 113 // The connection to the service. 114 conn *grpc.ClientConn 115 116 // The gRPC API client. 117 publisherClient pubsubpb.PublisherClient 118 119 // The call options for this service. 120 CallOptions *PublisherCallOptions 121 122 // The x-goog-* metadata to be sent with each request. 123 xGoogMetadata metadata.MD 124} 125 126// NewPublisherClient creates a new publisher client. 127// 128// The service that an application uses to manipulate topics, and to send 129// messages to a topic. 130func NewPublisherClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error) { 131 conn, err := transport.DialGRPC(ctx, append(defaultPublisherClientOptions(), opts...)...) 132 if err != nil { 133 return nil, err 134 } 135 c := &PublisherClient{ 136 conn: conn, 137 CallOptions: defaultPublisherCallOptions(), 138 139 publisherClient: pubsubpb.NewPublisherClient(conn), 140 } 141 c.SetGoogleClientInfo() 142 return c, nil 143} 144 145// Connection returns the client's connection to the API service. 146func (c *PublisherClient) Connection() *grpc.ClientConn { 147 return c.conn 148} 149 150// Close closes the connection to the API service. The user should invoke this when 151// the client is no longer required. 152func (c *PublisherClient) Close() error { 153 return c.conn.Close() 154} 155 156// SetGoogleClientInfo sets the name and version of the application in 157// the `x-goog-api-client` header passed on each request. Intended for 158// use by Google-written clients. 159func (c *PublisherClient) SetGoogleClientInfo(keyval ...string) { 160 kv := append([]string{"gl-go", versionGo()}, keyval...) 161 kv = append(kv, "gapic", versionClient, "gax", gax.Version, "grpc", grpc.Version) 162 c.xGoogMetadata = metadata.Pairs("x-goog-api-client", gax.XGoogHeader(kv...)) 163} 164 165// CreateTopic creates the given topic with the given name. See the 166// <a href="https://cloud.google.com/pubsub/docs/admin#resource_names"> 167// resource name rules</a>. 168func (c *PublisherClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error) { 169 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "name", req.GetName())) 170 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 171 opts = append(c.CallOptions.CreateTopic[0:len(c.CallOptions.CreateTopic):len(c.CallOptions.CreateTopic)], opts...) 172 var resp *pubsubpb.Topic 173 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 174 var err error 175 resp, err = c.publisherClient.CreateTopic(ctx, req, settings.GRPC...) 176 return err 177 }, opts...) 178 if err != nil { 179 return nil, err 180 } 181 return resp, nil 182} 183 184// UpdateTopic updates an existing topic. Note that certain properties of a 185// topic are not modifiable. 186func (c *PublisherClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) { 187 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic.name", req.GetTopic().GetName())) 188 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 189 opts = append(c.CallOptions.UpdateTopic[0:len(c.CallOptions.UpdateTopic):len(c.CallOptions.UpdateTopic)], opts...) 190 var resp *pubsubpb.Topic 191 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 192 var err error 193 resp, err = c.publisherClient.UpdateTopic(ctx, req, settings.GRPC...) 194 return err 195 }, opts...) 196 if err != nil { 197 return nil, err 198 } 199 return resp, nil 200} 201 202// Publish adds one or more messages to the topic. Returns NOT_FOUND if the topic 203// does not exist. 204func (c *PublisherClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error) { 205 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", req.GetTopic())) 206 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 207 opts = append(c.CallOptions.Publish[0:len(c.CallOptions.Publish):len(c.CallOptions.Publish)], opts...) 208 var resp *pubsubpb.PublishResponse 209 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 210 var err error 211 resp, err = c.publisherClient.Publish(ctx, req, settings.GRPC...) 212 return err 213 }, opts...) 214 if err != nil { 215 return nil, err 216 } 217 return resp, nil 218} 219 220// GetTopic gets the configuration of a topic. 221func (c *PublisherClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) { 222 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", req.GetTopic())) 223 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 224 opts = append(c.CallOptions.GetTopic[0:len(c.CallOptions.GetTopic):len(c.CallOptions.GetTopic)], opts...) 225 var resp *pubsubpb.Topic 226 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 227 var err error 228 resp, err = c.publisherClient.GetTopic(ctx, req, settings.GRPC...) 229 return err 230 }, opts...) 231 if err != nil { 232 return nil, err 233 } 234 return resp, nil 235} 236 237// ListTopics lists matching topics. 238func (c *PublisherClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIterator { 239 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "project", req.GetProject())) 240 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 241 opts = append(c.CallOptions.ListTopics[0:len(c.CallOptions.ListTopics):len(c.CallOptions.ListTopics)], opts...) 242 it := &TopicIterator{} 243 req = proto.Clone(req).(*pubsubpb.ListTopicsRequest) 244 it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Topic, string, error) { 245 var resp *pubsubpb.ListTopicsResponse 246 req.PageToken = pageToken 247 if pageSize > math.MaxInt32 { 248 req.PageSize = math.MaxInt32 249 } else { 250 req.PageSize = int32(pageSize) 251 } 252 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 253 var err error 254 resp, err = c.publisherClient.ListTopics(ctx, req, settings.GRPC...) 255 return err 256 }, opts...) 257 if err != nil { 258 return nil, "", err 259 } 260 return resp.Topics, resp.NextPageToken, nil 261 } 262 fetch := func(pageSize int, pageToken string) (string, error) { 263 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken) 264 if err != nil { 265 return "", err 266 } 267 it.items = append(it.items, items...) 268 return nextPageToken, nil 269 } 270 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf) 271 it.pageInfo.MaxSize = int(req.PageSize) 272 it.pageInfo.Token = req.PageToken 273 return it 274} 275 276// ListTopicSubscriptions lists the names of the subscriptions on this topic. 277func (c *PublisherClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator { 278 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", req.GetTopic())) 279 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 280 opts = append(c.CallOptions.ListTopicSubscriptions[0:len(c.CallOptions.ListTopicSubscriptions):len(c.CallOptions.ListTopicSubscriptions)], opts...) 281 it := &StringIterator{} 282 req = proto.Clone(req).(*pubsubpb.ListTopicSubscriptionsRequest) 283 it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) { 284 var resp *pubsubpb.ListTopicSubscriptionsResponse 285 req.PageToken = pageToken 286 if pageSize > math.MaxInt32 { 287 req.PageSize = math.MaxInt32 288 } else { 289 req.PageSize = int32(pageSize) 290 } 291 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 292 var err error 293 resp, err = c.publisherClient.ListTopicSubscriptions(ctx, req, settings.GRPC...) 294 return err 295 }, opts...) 296 if err != nil { 297 return nil, "", err 298 } 299 return resp.Subscriptions, resp.NextPageToken, nil 300 } 301 fetch := func(pageSize int, pageToken string) (string, error) { 302 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken) 303 if err != nil { 304 return "", err 305 } 306 it.items = append(it.items, items...) 307 return nextPageToken, nil 308 } 309 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf) 310 it.pageInfo.MaxSize = int(req.PageSize) 311 it.pageInfo.Token = req.PageToken 312 return it 313} 314 315// DeleteTopic deletes the topic with the given name. Returns NOT_FOUND if the topic 316// does not exist. After a topic is deleted, a new topic may be created with 317// the same name; this is an entirely new topic with none of the old 318// configuration or subscriptions. Existing subscriptions to this topic are 319// not deleted, but their topic field is set to _deleted-topic_. 320func (c *PublisherClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error { 321 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", req.GetTopic())) 322 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 323 opts = append(c.CallOptions.DeleteTopic[0:len(c.CallOptions.DeleteTopic):len(c.CallOptions.DeleteTopic)], opts...) 324 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 325 var err error 326 _, err = c.publisherClient.DeleteTopic(ctx, req, settings.GRPC...) 327 return err 328 }, opts...) 329 return err 330} 331 332// StringIterator manages a stream of string. 333type StringIterator struct { 334 items []string 335 pageInfo *iterator.PageInfo 336 nextFunc func() error 337 338 // InternalFetch is for use by the Google Cloud Libraries only. 339 // It is not part of the stable interface of this package. 340 // 341 // InternalFetch returns results from a single call to the underlying RPC. 342 // The number of results is no greater than pageSize. 343 // If there are no more results, nextPageToken is empty and err is nil. 344 InternalFetch func(pageSize int, pageToken string) (results []string, nextPageToken string, err error) 345} 346 347// PageInfo supports pagination. See the google.golang.org/api/iterator package for details. 348func (it *StringIterator) PageInfo() *iterator.PageInfo { 349 return it.pageInfo 350} 351 352// Next returns the next result. Its second return value is iterator.Done if there are no more 353// results. Once Next returns Done, all subsequent calls will return Done. 354func (it *StringIterator) Next() (string, error) { 355 var item string 356 if err := it.nextFunc(); err != nil { 357 return item, err 358 } 359 item = it.items[0] 360 it.items = it.items[1:] 361 return item, nil 362} 363 364func (it *StringIterator) bufLen() int { 365 return len(it.items) 366} 367 368func (it *StringIterator) takeBuf() interface{} { 369 b := it.items 370 it.items = nil 371 return b 372} 373 374// TopicIterator manages a stream of *pubsubpb.Topic. 375type TopicIterator struct { 376 items []*pubsubpb.Topic 377 pageInfo *iterator.PageInfo 378 nextFunc func() error 379 380 // InternalFetch is for use by the Google Cloud Libraries only. 381 // It is not part of the stable interface of this package. 382 // 383 // InternalFetch returns results from a single call to the underlying RPC. 384 // The number of results is no greater than pageSize. 385 // If there are no more results, nextPageToken is empty and err is nil. 386 InternalFetch func(pageSize int, pageToken string) (results []*pubsubpb.Topic, nextPageToken string, err error) 387} 388 389// PageInfo supports pagination. See the google.golang.org/api/iterator package for details. 390func (it *TopicIterator) PageInfo() *iterator.PageInfo { 391 return it.pageInfo 392} 393 394// Next returns the next result. Its second return value is iterator.Done if there are no more 395// results. Once Next returns Done, all subsequent calls will return Done. 396func (it *TopicIterator) Next() (*pubsubpb.Topic, error) { 397 var item *pubsubpb.Topic 398 if err := it.nextFunc(); err != nil { 399 return item, err 400 } 401 item = it.items[0] 402 it.items = it.items[1:] 403 return item, nil 404} 405 406func (it *TopicIterator) bufLen() int { 407 return len(it.items) 408} 409 410func (it *TopicIterator) takeBuf() interface{} { 411 b := it.items 412 it.items = nil 413 return b 414} 415