1// Copyright 2019 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Code generated by gapic-generator. DO NOT EDIT. 16 17package pubsub 18 19import ( 20 "context" 21 "fmt" 22 "math" 23 "net/url" 24 "time" 25 26 "github.com/golang/protobuf/proto" 27 gax "github.com/googleapis/gax-go/v2" 28 "google.golang.org/api/iterator" 29 "google.golang.org/api/option" 30 "google.golang.org/api/transport" 31 pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1" 32 "google.golang.org/grpc" 33 "google.golang.org/grpc/codes" 34 "google.golang.org/grpc/metadata" 35) 36 37// PublisherCallOptions contains the retry settings for each method of PublisherClient. 38type PublisherCallOptions struct { 39 CreateTopic []gax.CallOption 40 UpdateTopic []gax.CallOption 41 Publish []gax.CallOption 42 GetTopic []gax.CallOption 43 ListTopics []gax.CallOption 44 ListTopicSubscriptions []gax.CallOption 45 DeleteTopic []gax.CallOption 46} 47 48func defaultPublisherClientOptions() []option.ClientOption { 49 return []option.ClientOption{ 50 option.WithEndpoint("pubsub.googleapis.com:443"), 51 option.WithScopes(DefaultAuthScopes()...), 52 option.WithGRPCDialOption(grpc.WithDefaultCallOptions( 53 grpc.MaxCallRecvMsgSize(math.MaxInt32))), 54 } 55} 56 57func defaultPublisherCallOptions() *PublisherCallOptions { 58 retry := map[[2]string][]gax.CallOption{ 59 {"default", "idempotent"}: { 60 gax.WithRetry(func() gax.Retryer { 61 return gax.OnCodes([]codes.Code{ 62 codes.Aborted, 63 codes.Unavailable, 64 codes.Unknown, 65 }, gax.Backoff{ 66 Initial: 100 * time.Millisecond, 67 Max: 60000 * time.Millisecond, 68 Multiplier: 1.3, 69 }) 70 }), 71 }, 72 {"default", "non_idempotent"}: { 73 gax.WithRetry(func() gax.Retryer { 74 return gax.OnCodes([]codes.Code{ 75 codes.Unavailable, 76 }, gax.Backoff{ 77 Initial: 100 * time.Millisecond, 78 Max: 60000 * time.Millisecond, 79 Multiplier: 1.3, 80 }) 81 }), 82 }, 83 {"messaging", "publish"}: { 84 gax.WithRetry(func() gax.Retryer { 85 return gax.OnCodes([]codes.Code{ 86 codes.Aborted, 87 codes.Canceled, 88 codes.DeadlineExceeded, 89 codes.Internal, 90 codes.ResourceExhausted, 91 codes.Unavailable, 92 codes.Unknown, 93 }, gax.Backoff{ 94 Initial: 100 * time.Millisecond, 95 Max: 60000 * time.Millisecond, 96 Multiplier: 1.3, 97 }) 98 }), 99 }, 100 } 101 return &PublisherCallOptions{ 102 CreateTopic: retry[[2]string{"default", "non_idempotent"}], 103 UpdateTopic: retry[[2]string{"default", "non_idempotent"}], 104 Publish: retry[[2]string{"messaging", "publish"}], 105 GetTopic: retry[[2]string{"default", "idempotent"}], 106 ListTopics: retry[[2]string{"default", "idempotent"}], 107 ListTopicSubscriptions: retry[[2]string{"default", "idempotent"}], 108 DeleteTopic: retry[[2]string{"default", "non_idempotent"}], 109 } 110} 111 112// PublisherClient is a client for interacting with Google Cloud Pub/Sub API. 113// 114// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls. 115type PublisherClient struct { 116 // The connection to the service. 117 conn *grpc.ClientConn 118 119 // The gRPC API client. 120 publisherClient pubsubpb.PublisherClient 121 122 // The call options for this service. 123 CallOptions *PublisherCallOptions 124 125 // The x-goog-* metadata to be sent with each request. 126 xGoogMetadata metadata.MD 127} 128 129// NewPublisherClient creates a new publisher client. 130// 131// The service that an application uses to manipulate topics, and to send 132// messages to a topic. 133func NewPublisherClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error) { 134 conn, err := transport.DialGRPC(ctx, append(defaultPublisherClientOptions(), opts...)...) 135 if err != nil { 136 return nil, err 137 } 138 c := &PublisherClient{ 139 conn: conn, 140 CallOptions: defaultPublisherCallOptions(), 141 142 publisherClient: pubsubpb.NewPublisherClient(conn), 143 } 144 c.SetGoogleClientInfo() 145 return c, nil 146} 147 148// Connection returns the client's connection to the API service. 149func (c *PublisherClient) Connection() *grpc.ClientConn { 150 return c.conn 151} 152 153// Close closes the connection to the API service. The user should invoke this when 154// the client is no longer required. 155func (c *PublisherClient) Close() error { 156 return c.conn.Close() 157} 158 159// SetGoogleClientInfo sets the name and version of the application in 160// the `x-goog-api-client` header passed on each request. Intended for 161// use by Google-written clients. 162func (c *PublisherClient) SetGoogleClientInfo(keyval ...string) { 163 kv := append([]string{"gl-go", versionGo()}, keyval...) 164 kv = append(kv, "gapic", versionClient, "gax", gax.Version, "grpc", grpc.Version) 165 c.xGoogMetadata = metadata.Pairs("x-goog-api-client", gax.XGoogHeader(kv...)) 166} 167 168// CreateTopic creates the given topic with the given name. See the 169// <a href="https://cloud.google.com/pubsub/docs/admin#resource_names"> 170// resource name rules</a>. 171func (c *PublisherClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error) { 172 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))) 173 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 174 opts = append(c.CallOptions.CreateTopic[0:len(c.CallOptions.CreateTopic):len(c.CallOptions.CreateTopic)], opts...) 175 var resp *pubsubpb.Topic 176 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 177 var err error 178 resp, err = c.publisherClient.CreateTopic(ctx, req, settings.GRPC...) 179 return err 180 }, opts...) 181 if err != nil { 182 return nil, err 183 } 184 return resp, nil 185} 186 187// UpdateTopic updates an existing topic. Note that certain properties of a 188// topic are not modifiable. 189func (c *PublisherClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) { 190 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic.name", url.QueryEscape(req.GetTopic().GetName()))) 191 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 192 opts = append(c.CallOptions.UpdateTopic[0:len(c.CallOptions.UpdateTopic):len(c.CallOptions.UpdateTopic)], opts...) 193 var resp *pubsubpb.Topic 194 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 195 var err error 196 resp, err = c.publisherClient.UpdateTopic(ctx, req, settings.GRPC...) 197 return err 198 }, opts...) 199 if err != nil { 200 return nil, err 201 } 202 return resp, nil 203} 204 205// Publish adds one or more messages to the topic. Returns NOT_FOUND if the topic 206// does not exist. 207func (c *PublisherClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error) { 208 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))) 209 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 210 opts = append(c.CallOptions.Publish[0:len(c.CallOptions.Publish):len(c.CallOptions.Publish)], opts...) 211 var resp *pubsubpb.PublishResponse 212 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 213 var err error 214 resp, err = c.publisherClient.Publish(ctx, req, settings.GRPC...) 215 return err 216 }, opts...) 217 if err != nil { 218 return nil, err 219 } 220 return resp, nil 221} 222 223// GetTopic gets the configuration of a topic. 224func (c *PublisherClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) { 225 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))) 226 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 227 opts = append(c.CallOptions.GetTopic[0:len(c.CallOptions.GetTopic):len(c.CallOptions.GetTopic)], opts...) 228 var resp *pubsubpb.Topic 229 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 230 var err error 231 resp, err = c.publisherClient.GetTopic(ctx, req, settings.GRPC...) 232 return err 233 }, opts...) 234 if err != nil { 235 return nil, err 236 } 237 return resp, nil 238} 239 240// ListTopics lists matching topics. 241func (c *PublisherClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIterator { 242 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "project", url.QueryEscape(req.GetProject()))) 243 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 244 opts = append(c.CallOptions.ListTopics[0:len(c.CallOptions.ListTopics):len(c.CallOptions.ListTopics)], opts...) 245 it := &TopicIterator{} 246 req = proto.Clone(req).(*pubsubpb.ListTopicsRequest) 247 it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Topic, string, error) { 248 var resp *pubsubpb.ListTopicsResponse 249 req.PageToken = pageToken 250 if pageSize > math.MaxInt32 { 251 req.PageSize = math.MaxInt32 252 } else { 253 req.PageSize = int32(pageSize) 254 } 255 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 256 var err error 257 resp, err = c.publisherClient.ListTopics(ctx, req, settings.GRPC...) 258 return err 259 }, opts...) 260 if err != nil { 261 return nil, "", err 262 } 263 return resp.Topics, resp.NextPageToken, nil 264 } 265 fetch := func(pageSize int, pageToken string) (string, error) { 266 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken) 267 if err != nil { 268 return "", err 269 } 270 it.items = append(it.items, items...) 271 return nextPageToken, nil 272 } 273 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf) 274 it.pageInfo.MaxSize = int(req.PageSize) 275 it.pageInfo.Token = req.PageToken 276 return it 277} 278 279// ListTopicSubscriptions lists the names of the subscriptions on this topic. 280func (c *PublisherClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator { 281 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))) 282 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 283 opts = append(c.CallOptions.ListTopicSubscriptions[0:len(c.CallOptions.ListTopicSubscriptions):len(c.CallOptions.ListTopicSubscriptions)], opts...) 284 it := &StringIterator{} 285 req = proto.Clone(req).(*pubsubpb.ListTopicSubscriptionsRequest) 286 it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) { 287 var resp *pubsubpb.ListTopicSubscriptionsResponse 288 req.PageToken = pageToken 289 if pageSize > math.MaxInt32 { 290 req.PageSize = math.MaxInt32 291 } else { 292 req.PageSize = int32(pageSize) 293 } 294 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 295 var err error 296 resp, err = c.publisherClient.ListTopicSubscriptions(ctx, req, settings.GRPC...) 297 return err 298 }, opts...) 299 if err != nil { 300 return nil, "", err 301 } 302 return resp.Subscriptions, resp.NextPageToken, nil 303 } 304 fetch := func(pageSize int, pageToken string) (string, error) { 305 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken) 306 if err != nil { 307 return "", err 308 } 309 it.items = append(it.items, items...) 310 return nextPageToken, nil 311 } 312 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf) 313 it.pageInfo.MaxSize = int(req.PageSize) 314 it.pageInfo.Token = req.PageToken 315 return it 316} 317 318// DeleteTopic deletes the topic with the given name. Returns NOT_FOUND if the topic 319// does not exist. After a topic is deleted, a new topic may be created with 320// the same name; this is an entirely new topic with none of the old 321// configuration or subscriptions. Existing subscriptions to this topic are 322// not deleted, but their topic field is set to _deleted-topic_. 323func (c *PublisherClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error { 324 md := metadata.Pairs("x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))) 325 ctx = insertMetadata(ctx, c.xGoogMetadata, md) 326 opts = append(c.CallOptions.DeleteTopic[0:len(c.CallOptions.DeleteTopic):len(c.CallOptions.DeleteTopic)], opts...) 327 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error { 328 var err error 329 _, err = c.publisherClient.DeleteTopic(ctx, req, settings.GRPC...) 330 return err 331 }, opts...) 332 return err 333} 334 335// StringIterator manages a stream of string. 336type StringIterator struct { 337 items []string 338 pageInfo *iterator.PageInfo 339 nextFunc func() error 340 341 // InternalFetch is for use by the Google Cloud Libraries only. 342 // It is not part of the stable interface of this package. 343 // 344 // InternalFetch returns results from a single call to the underlying RPC. 345 // The number of results is no greater than pageSize. 346 // If there are no more results, nextPageToken is empty and err is nil. 347 InternalFetch func(pageSize int, pageToken string) (results []string, nextPageToken string, err error) 348} 349 350// PageInfo supports pagination. See the google.golang.org/api/iterator package for details. 351func (it *StringIterator) PageInfo() *iterator.PageInfo { 352 return it.pageInfo 353} 354 355// Next returns the next result. Its second return value is iterator.Done if there are no more 356// results. Once Next returns Done, all subsequent calls will return Done. 357func (it *StringIterator) Next() (string, error) { 358 var item string 359 if err := it.nextFunc(); err != nil { 360 return item, err 361 } 362 item = it.items[0] 363 it.items = it.items[1:] 364 return item, nil 365} 366 367func (it *StringIterator) bufLen() int { 368 return len(it.items) 369} 370 371func (it *StringIterator) takeBuf() interface{} { 372 b := it.items 373 it.items = nil 374 return b 375} 376 377// TopicIterator manages a stream of *pubsubpb.Topic. 378type TopicIterator struct { 379 items []*pubsubpb.Topic 380 pageInfo *iterator.PageInfo 381 nextFunc func() error 382 383 // InternalFetch is for use by the Google Cloud Libraries only. 384 // It is not part of the stable interface of this package. 385 // 386 // InternalFetch returns results from a single call to the underlying RPC. 387 // The number of results is no greater than pageSize. 388 // If there are no more results, nextPageToken is empty and err is nil. 389 InternalFetch func(pageSize int, pageToken string) (results []*pubsubpb.Topic, nextPageToken string, err error) 390} 391 392// PageInfo supports pagination. See the google.golang.org/api/iterator package for details. 393func (it *TopicIterator) PageInfo() *iterator.PageInfo { 394 return it.pageInfo 395} 396 397// Next returns the next result. Its second return value is iterator.Done if there are no more 398// results. Once Next returns Done, all subsequent calls will return Done. 399func (it *TopicIterator) Next() (*pubsubpb.Topic, error) { 400 var item *pubsubpb.Topic 401 if err := it.nextFunc(); err != nil { 402 return item, err 403 } 404 item = it.items[0] 405 it.items = it.items[1:] 406 return item, nil 407} 408 409func (it *TopicIterator) bufLen() int { 410 return len(it.items) 411} 412 413func (it *TopicIterator) takeBuf() interface{} { 414 b := it.items 415 it.items = nil 416 return b 417} 418