// Copyright 2014 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pubsub // import "cloud.google.com/go/pubsub"

import (
	"context"
	"fmt"
	"os"
	"runtime"
	"time"

	"cloud.google.com/go/internal/version"
	vkit "cloud.google.com/go/pubsub/apiv1"
	"google.golang.org/api/option"
	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

const (
	// ScopePubSub grants permissions to view and manage Pub/Sub
	// topics and subscriptions.
	ScopePubSub = "https://www.googleapis.com/auth/pubsub"

	// ScopeCloudPlatform grants permissions to view and manage your data
	// across Google Cloud Platform services.
	ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"

	// maxAckDeadline is ten minutes.
	// NOTE(review): presumably the largest ack deadline this package will
	// use; it is not referenced in this part of the file, so confirm
	// against its users.
	maxAckDeadline = 10 * time.Minute
)

// Client is a Google Pub/Sub client scoped to a single project.
//
// Clients should be reused rather than being created as needed.
// A Client may be shared by multiple goroutines.
type Client struct {
	// projectID is the ID of the project this client is scoped to; it is
	// used to build the "projects/<id>" resource name.
	projectID string
	// pubc is the underlying publisher API client created by NewClient.
	pubc *vkit.PublisherClient
	// subc is the underlying subscriber API client. NewClient creates it on
	// the same connection as pubc.
	subc *vkit.SubscriberClient
}

// NewClient creates a new PubSub client.
54func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) { 55 var o []option.ClientOption 56 // Environment variables for gcloud emulator: 57 // https://cloud.google.com/sdk/gcloud/reference/beta/emulators/pubsub/ 58 if addr := os.Getenv("PUBSUB_EMULATOR_HOST"); addr != "" { 59 conn, err := grpc.Dial(addr, grpc.WithInsecure()) 60 if err != nil { 61 return nil, fmt.Errorf("grpc.Dial: %v", err) 62 } 63 o = []option.ClientOption{option.WithGRPCConn(conn)} 64 } else { 65 numConns := runtime.GOMAXPROCS(0) 66 if numConns > 4 { 67 numConns = 4 68 } 69 o = []option.ClientOption{ 70 // Create multiple connections to increase throughput. 71 option.WithGRPCConnectionPool(numConns), 72 option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{ 73 Time: 5 * time.Minute, 74 })), 75 } 76 o = append(o, openCensusOptions()...) 77 } 78 o = append(o, opts...) 79 pubc, err := vkit.NewPublisherClient(ctx, o...) 80 if err != nil { 81 return nil, fmt.Errorf("pubsub: %v", err) 82 } 83 subc, err := vkit.NewSubscriberClient(ctx, option.WithGRPCConn(pubc.Connection())) 84 if err != nil { 85 // Should never happen, since we are passing in the connection. 86 // If it does, we cannot close, because the user may have passed in their 87 // own connection originally. 88 return nil, fmt.Errorf("pubsub: %v", err) 89 } 90 pubc.SetGoogleClientInfo("gccl", version.Repo) 91 return &Client{ 92 projectID: projectID, 93 pubc: pubc, 94 subc: subc, 95 }, nil 96} 97 98// Close releases any resources held by the client, 99// such as memory and goroutines. 100// 101// If the client is available for the lifetime of the program, then Close need not be 102// called at exit. 103func (c *Client) Close() error { 104 // Return the first error, because the first call closes the connection. 
105 err := c.pubc.Close() 106 _ = c.subc.Close() 107 return err 108} 109 110func (c *Client) fullyQualifiedProjectName() string { 111 return fmt.Sprintf("projects/%s", c.projectID) 112} 113