1// Copyright 2014 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package pubsub // import "cloud.google.com/go/pubsub"
16
17import (
18	"context"
19	"fmt"
20	"os"
21	"runtime"
22	"time"
23
24	"cloud.google.com/go/internal/version"
25	vkit "cloud.google.com/go/pubsub/apiv1"
26	"google.golang.org/api/option"
27	"google.golang.org/grpc"
28	"google.golang.org/grpc/keepalive"
29)
30
31const (
32	// ScopePubSub grants permissions to view and manage Pub/Sub
33	// topics and subscriptions.
34	ScopePubSub = "https://www.googleapis.com/auth/pubsub"
35
36	// ScopeCloudPlatform grants permissions to view and manage your data
37	// across Google Cloud Platform services.
38	ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
39
40	maxAckDeadline = 10 * time.Minute
41)
42
43// Client is a Google Pub/Sub client scoped to a single project.
44//
45// Clients should be reused rather than being created as needed.
46// A Client may be shared by multiple goroutines.
47type Client struct {
48	projectID string
49	pubc      *vkit.PublisherClient
50	subc      *vkit.SubscriberClient
51}
52
53// NewClient creates a new PubSub client.
54func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) {
55	var o []option.ClientOption
56	// Environment variables for gcloud emulator:
57	// https://cloud.google.com/sdk/gcloud/reference/beta/emulators/pubsub/
58	if addr := os.Getenv("PUBSUB_EMULATOR_HOST"); addr != "" {
59		conn, err := grpc.Dial(addr, grpc.WithInsecure())
60		if err != nil {
61			return nil, fmt.Errorf("grpc.Dial: %v", err)
62		}
63		o = []option.ClientOption{option.WithGRPCConn(conn)}
64	} else {
65		numConns := runtime.GOMAXPROCS(0)
66		if numConns > 4 {
67			numConns = 4
68		}
69		o = []option.ClientOption{
70			// Create multiple connections to increase throughput.
71			option.WithGRPCConnectionPool(numConns),
72			option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
73				Time: 5 * time.Minute,
74			})),
75		}
76		o = append(o, openCensusOptions()...)
77	}
78	o = append(o, opts...)
79	pubc, err := vkit.NewPublisherClient(ctx, o...)
80	if err != nil {
81		return nil, fmt.Errorf("pubsub: %v", err)
82	}
83	subc, err := vkit.NewSubscriberClient(ctx, option.WithGRPCConn(pubc.Connection()))
84	if err != nil {
85		// Should never happen, since we are passing in the connection.
86		// If it does, we cannot close, because the user may have passed in their
87		// own connection originally.
88		return nil, fmt.Errorf("pubsub: %v", err)
89	}
90	pubc.SetGoogleClientInfo("gccl", version.Repo)
91	return &Client{
92		projectID: projectID,
93		pubc:      pubc,
94		subc:      subc,
95	}, nil
96}
97
98// Close releases any resources held by the client,
99// such as memory and goroutines.
100//
101// If the client is available for the lifetime of the program, then Close need not be
102// called at exit.
103func (c *Client) Close() error {
104	// Return the first error, because the first call closes the connection.
105	err := c.pubc.Close()
106	_ = c.subc.Close()
107	return err
108}
109
110func (c *Client) fullyQualifiedProjectName() string {
111	return fmt.Sprintf("projects/%s", c.projectID)
112}
113