1// Copyright 2014 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package pubsub // import "cloud.google.com/go/pubsub"
16
17import (
18	"context"
19	"fmt"
20	"os"
21	"runtime"
22	"time"
23
24	"cloud.google.com/go/internal/version"
25	vkit "cloud.google.com/go/pubsub/apiv1"
26	"google.golang.org/api/option"
27	"google.golang.org/grpc"
28	"google.golang.org/grpc/keepalive"
29)
30
31const (
32	// ScopePubSub grants permissions to view and manage Pub/Sub
33	// topics and subscriptions.
34	ScopePubSub = "https://www.googleapis.com/auth/pubsub"
35
36	// ScopeCloudPlatform grants permissions to view and manage your data
37	// across Google Cloud Platform services.
38	ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
39
40	prodAddr = "https://pubsub.googleapis.com/"
41
42	maxAckDeadline = 10 * time.Minute
43)
44
45// Client is a Google Pub/Sub client scoped to a single project.
46//
47// Clients should be reused rather than being created as needed.
48// A Client may be shared by multiple goroutines.
49type Client struct {
50	projectID string
51	pubc      *vkit.PublisherClient
52	subc      *vkit.SubscriberClient
53}
54
55// NewClient creates a new PubSub client.
56func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) {
57	var o []option.ClientOption
58	// Environment variables for gcloud emulator:
59	// https://cloud.google.com/sdk/gcloud/reference/beta/emulators/pubsub/
60	if addr := os.Getenv("PUBSUB_EMULATOR_HOST"); addr != "" {
61		conn, err := grpc.Dial(addr, grpc.WithInsecure())
62		if err != nil {
63			return nil, fmt.Errorf("grpc.Dial: %v", err)
64		}
65		o = []option.ClientOption{option.WithGRPCConn(conn)}
66	} else {
67		o = []option.ClientOption{
68			// Create multiple connections to increase throughput.
69			option.WithGRPCConnectionPool(runtime.GOMAXPROCS(0)),
70			option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
71				Time: 5 * time.Minute,
72			})),
73		}
74		o = append(o, openCensusOptions()...)
75	}
76	o = append(o, opts...)
77	pubc, err := vkit.NewPublisherClient(ctx, o...)
78	if err != nil {
79		return nil, fmt.Errorf("pubsub: %v", err)
80	}
81	subc, err := vkit.NewSubscriberClient(ctx, option.WithGRPCConn(pubc.Connection()))
82	if err != nil {
83		// Should never happen, since we are passing in the connection.
84		// If it does, we cannot close, because the user may have passed in their
85		// own connection originally.
86		return nil, fmt.Errorf("pubsub: %v", err)
87	}
88	pubc.SetGoogleClientInfo("gccl", version.Repo)
89	subc.SetGoogleClientInfo("gccl", version.Repo)
90	return &Client{
91		projectID: projectID,
92		pubc:      pubc,
93		subc:      subc,
94	}, nil
95}
96
97// Close releases any resources held by the client,
98// such as memory and goroutines.
99//
100// If the client is available for the lifetime of the program, then Close need not be
101// called at exit.
102func (c *Client) Close() error {
103	// Return the first error, because the first call closes the connection.
104	err := c.pubc.Close()
105	_ = c.subc.Close()
106	return err
107}
108
109func (c *Client) fullyQualifiedProjectName() string {
110	return fmt.Sprintf("projects/%s", c.projectID)
111}
112