1// Copyright 2014 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package pubsub // import "cloud.google.com/go/pubsub" 16 17import ( 18 "context" 19 "fmt" 20 "os" 21 "runtime" 22 "time" 23 24 "cloud.google.com/go/internal/version" 25 vkit "cloud.google.com/go/pubsub/apiv1" 26 "google.golang.org/api/option" 27 "google.golang.org/grpc" 28 "google.golang.org/grpc/keepalive" 29) 30 31const ( 32 // ScopePubSub grants permissions to view and manage Pub/Sub 33 // topics and subscriptions. 34 ScopePubSub = "https://www.googleapis.com/auth/pubsub" 35 36 // ScopeCloudPlatform grants permissions to view and manage your data 37 // across Google Cloud Platform services. 38 ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform" 39 40 prodAddr = "https://pubsub.googleapis.com/" 41 42 maxAckDeadline = 10 * time.Minute 43) 44 45// Client is a Google Pub/Sub client scoped to a single project. 46// 47// Clients should be reused rather than being created as needed. 48// A Client may be shared by multiple goroutines. 49type Client struct { 50 projectID string 51 pubc *vkit.PublisherClient 52 subc *vkit.SubscriberClient 53} 54 55// NewClient creates a new PubSub client. 56func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) { 57 var o []option.ClientOption 58 // Environment variables for gcloud emulator: 59 // https://cloud.google.com/sdk/gcloud/reference/beta/emulators/pubsub/ 60 if addr := os.Getenv("PUBSUB_EMULATOR_HOST"); addr != "" { 61 conn, err := grpc.Dial(addr, grpc.WithInsecure()) 62 if err != nil { 63 return nil, fmt.Errorf("grpc.Dial: %v", err) 64 } 65 o = []option.ClientOption{option.WithGRPCConn(conn)} 66 } else { 67 o = []option.ClientOption{ 68 // Create multiple connections to increase throughput. 69 option.WithGRPCConnectionPool(runtime.GOMAXPROCS(0)), 70 option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{ 71 Time: 5 * time.Minute, 72 })), 73 } 74 o = append(o, openCensusOptions()...) 75 } 76 o = append(o, opts...) 77 pubc, err := vkit.NewPublisherClient(ctx, o...) 78 if err != nil { 79 return nil, fmt.Errorf("pubsub: %v", err) 80 } 81 subc, err := vkit.NewSubscriberClient(ctx, option.WithGRPCConn(pubc.Connection())) 82 if err != nil { 83 // Should never happen, since we are passing in the connection. 84 // If it does, we cannot close, because the user may have passed in their 85 // own connection originally. 86 return nil, fmt.Errorf("pubsub: %v", err) 87 } 88 pubc.SetGoogleClientInfo("gccl", version.Repo) 89 subc.SetGoogleClientInfo("gccl", version.Repo) 90 return &Client{ 91 projectID: projectID, 92 pubc: pubc, 93 subc: subc, 94 }, nil 95} 96 97// Close releases any resources held by the client, 98// such as memory and goroutines. 99// 100// If the client is available for the lifetime of the program, then Close need not be 101// called at exit. 102func (c *Client) Close() error { 103 // Return the first error, because the first call closes the connection. 104 err := c.pubc.Close() 105 _ = c.subc.Close() 106 return err 107} 108 109func (c *Client) fullyQualifiedProjectName() string { 110 return fmt.Sprintf("projects/%s", c.projectID) 111} 112