1/*
2Copyright 2016 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"context"
21	"errors"
22	"flag"
23	"fmt"
24	"strings"
25	"time"
26
27	"cloud.google.com/go/bigtable/bttest"
28	btopt "cloud.google.com/go/bigtable/internal/option"
29	"cloud.google.com/go/internal/testutil"
30	"google.golang.org/api/option"
31	gtransport "google.golang.org/api/transport/grpc"
32	"google.golang.org/grpc"
33	"google.golang.org/grpc/peer"
34)
35
36var legacyUseProd string
37var integrationConfig IntegrationTestConfig
38
39func init() {
40	c := &integrationConfig
41
42	flag.BoolVar(&c.UseProd, "it.use-prod", false, "Use remote bigtable instead of local emulator")
43	flag.StringVar(&c.AdminEndpoint, "it.admin-endpoint", "", "Admin api host and port")
44	flag.StringVar(&c.DataEndpoint, "it.data-endpoint", "", "Data api host and port")
45	flag.StringVar(&c.Project, "it.project", "", "Project to use for integration test")
46	flag.StringVar(&c.Instance, "it.instance", "", "Bigtable instance to use")
47	flag.StringVar(&c.Cluster, "it.cluster", "", "Bigtable cluster to use")
48	flag.StringVar(&c.Table, "it.table", "", "Bigtable table to create")
49	flag.BoolVar(&c.AttemptDirectPath, "it.attempt-directpath", false, "Attempt DirectPath")
50	flag.BoolVar(&c.DirectPathIPV4Only, "it.directpath-ipv4-only", false, "Run DirectPath on a ipv4-only VM")
51
52	// Backwards compat
53	flag.StringVar(&legacyUseProd, "use_prod", "", `DEPRECATED: if set to "proj,instance,table", run integration test against production`)
54
55}
56
// IntegrationTestConfig contains the parameters used to pick and set up an
// IntegrationEnv for testing. Each field is backed by an -it.* command line
// flag:
//
//   - UseProd: use the remote Bigtable service instead of the local emulator.
//   - AdminEndpoint: host and port of the admin API.
//   - DataEndpoint: host and port of the data API.
//   - Project: project to use for the integration test.
//   - Instance: Bigtable instance to use.
//   - Cluster: Bigtable cluster to use.
//   - Table: Bigtable table to create.
//   - AttemptDirectPath: attempt DirectPath.
//   - DirectPathIPV4Only: run DirectPath on an IPv4-only VM.
type IntegrationTestConfig struct {
	UseProd            bool
	AdminEndpoint      string
	DataEndpoint       string
	Project            string
	Instance           string
	Cluster            string
	Table              string
	AttemptDirectPath  bool
	DirectPathIPV4Only bool
}
69
// IntegrationEnv represents a testing environment.
// The environment can be implemented using production or an emulator.
type IntegrationEnv interface {
	// Config returns the configuration held by the environment.
	Config() IntegrationTestConfig
	// NewAdminClient builds a new admin client for this environment.
	NewAdminClient() (*AdminClient, error)
	// NewInstanceAdminClient will return nil if instance administration is unsupported in this environment.
	NewInstanceAdminClient() (*InstanceAdminClient, error)
	// NewClient builds a new data client for this environment.
	NewClient() (*Client, error)
	// Close cleans up the environment; it may be a no-op.
	Close()
	// Peer returns the peer information held by the environment, or nil when
	// the environment does not track any.
	Peer() *peer.Peer
}
81
82// NewIntegrationEnv creates a new environment based on the command line args
83func NewIntegrationEnv() (IntegrationEnv, error) {
84	c := integrationConfig
85
86	if legacyUseProd != "" {
87		fmt.Println("WARNING: using legacy commandline arg -use_prod, please switch to -it.*")
88		parts := strings.SplitN(legacyUseProd, ",", 3)
89		c.UseProd = true
90		c.Project = parts[0]
91		c.Instance = parts[1]
92		c.Table = parts[2]
93	}
94
95	if integrationConfig.UseProd {
96		return NewProdEnv(c)
97	}
98	return NewEmulatedEnv(c)
99}
100
// EmulatedEnv encapsulates the state of an emulator.
type EmulatedEnv struct {
	// config is the configuration reported by Config.
	config IntegrationTestConfig
	// server is the in-process bttest server that clients connect to.
	server *bttest.Server
}
106
107// NewEmulatedEnv builds and starts the emulator based environment
108func NewEmulatedEnv(config IntegrationTestConfig) (*EmulatedEnv, error) {
109	srv, err := bttest.NewServer("localhost:0", grpc.MaxRecvMsgSize(200<<20), grpc.MaxSendMsgSize(100<<20))
110	if err != nil {
111		return nil, err
112	}
113
114	if config.Project == "" {
115		config.Project = "project"
116	}
117	if config.Instance == "" {
118		config.Instance = "instance"
119	}
120	if config.Table == "" {
121		config.Table = "mytable"
122	}
123	config.AdminEndpoint = srv.Addr
124	config.DataEndpoint = srv.Addr
125
126	env := &EmulatedEnv{
127		config: config,
128		server: srv,
129	}
130	return env, nil
131}
132
// Peer always returns nil: the emulated environment keeps no peer information.
func (e *EmulatedEnv) Peer() *peer.Peer {
	return nil
}
136
// Close stops and cleans up the emulator by closing the underlying bttest
// server.
func (e *EmulatedEnv) Close() {
	e.server.Close()
}
141
// Config returns a copy of the configuration stored in this environment.
func (e *EmulatedEnv) Config() IntegrationTestConfig {
	return e.config
}
146
// headersInterceptor supplies the interceptors and client options that the
// environments in this file attach to the clients they build, so that client
// requests under test can be verified.
// NOTE(review): the exact checks are defined by testutil.DefaultHeadersEnforcer — confirm there.
var headersInterceptor = testutil.DefaultHeadersEnforcer()
148
149// NewAdminClient builds a new connected admin client for this environment
150func (e *EmulatedEnv) NewAdminClient() (*AdminClient, error) {
151	o, err := btopt.DefaultClientOptions(e.server.Addr, e.server.Addr, AdminScope, clientUserAgent)
152	if err != nil {
153		return nil, err
154	}
155	// Add gRPC client interceptors to supply Google client information.
156	//
157	// Inject interceptors from headersInterceptor, since they are used to verify
158	// client requests under test.
159	o = append(o, btopt.ClientInterceptorOptions(
160		headersInterceptor.StreamInterceptors(),
161		headersInterceptor.UnaryInterceptors())...)
162
163	timeout := 20 * time.Second
164	ctx, _ := context.WithTimeout(context.Background(), timeout)
165
166	o = append(o, option.WithGRPCDialOption(grpc.WithBlock()))
167	conn, err := gtransport.DialInsecure(ctx, o...)
168	if err != nil {
169		return nil, err
170	}
171
172	return NewAdminClient(ctx, e.config.Project, e.config.Instance,
173		option.WithGRPCConn(conn))
174}
175
// NewInstanceAdminClient returns nil for the emulated environment since the API is not implemented.
// The error is nil as well, so callers must check the returned client for nil
// before using it.
func (e *EmulatedEnv) NewInstanceAdminClient() (*InstanceAdminClient, error) {
	return nil, nil
}
180
181// NewClient builds a new connected data client for this environment
182func (e *EmulatedEnv) NewClient() (*Client, error) {
183	o, err := btopt.DefaultClientOptions(e.server.Addr, e.server.Addr, Scope, clientUserAgent)
184	if err != nil {
185		return nil, err
186	}
187	// Add gRPC client interceptors to supply Google client information.
188	//
189	// Inject interceptors from headersInterceptor, since they are used to verify
190	// client requests under test.
191	o = append(o, btopt.ClientInterceptorOptions(
192		headersInterceptor.StreamInterceptors(),
193		headersInterceptor.UnaryInterceptors())...)
194
195	timeout := 20 * time.Second
196	ctx, _ := context.WithTimeout(context.Background(), timeout)
197
198	o = append(o, option.WithGRPCDialOption(grpc.WithBlock()))
199	o = append(o, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
200		grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20))))
201	conn, err := gtransport.DialInsecure(ctx, o...)
202	if err != nil {
203		return nil, err
204	}
205	return NewClient(ctx, e.config.Project, e.config.Instance, option.WithGRPCConn(conn))
206}
207
// ProdEnv encapsulates the state necessary to connect to the external Bigtable service.
//
// config is the configuration reported by Config. peerInfo is the value
// returned by Peer; NewClient hands it to grpc.Peer when AttemptDirectPath is
// set.
type ProdEnv struct {
	config   IntegrationTestConfig
	peerInfo *peer.Peer
}
213
214// NewProdEnv builds the environment representation
215func NewProdEnv(config IntegrationTestConfig) (*ProdEnv, error) {
216	if config.Project == "" {
217		return nil, errors.New("Project not set")
218	}
219	if config.Instance == "" {
220		return nil, errors.New("Instance not set")
221	}
222	if config.Table == "" {
223		return nil, errors.New("Table not set")
224	}
225
226	env := &ProdEnv{
227		config:   config,
228		peerInfo: &peer.Peer{},
229	}
230	return env, nil
231}
232
// Peer returns the peer information held by this environment. The same
// pointer is passed to grpc.Peer by NewClient when AttemptDirectPath is set.
func (e *ProdEnv) Peer() *peer.Peer {
	return e.peerInfo
}
236
// Close is a no-op for production environments; the empty method exists so
// that ProdEnv provides the Close method declared by IntegrationEnv.
func (e *ProdEnv) Close() {}
239
// Config returns a copy of the configuration this environment was built with.
func (e *ProdEnv) Config() IntegrationTestConfig {
	return e.config
}
244
245// NewAdminClient builds a new connected admin client for this environment
246func (e *ProdEnv) NewAdminClient() (*AdminClient, error) {
247	clientOpts := headersInterceptor.CallOptions()
248	if endpoint := e.config.AdminEndpoint; endpoint != "" {
249		clientOpts = append(clientOpts, option.WithEndpoint(endpoint))
250	}
251	return NewAdminClient(context.Background(), e.config.Project, e.config.Instance, clientOpts...)
252}
253
254// NewInstanceAdminClient returns a new connected instance admin client for this environment
255func (e *ProdEnv) NewInstanceAdminClient() (*InstanceAdminClient, error) {
256	clientOpts := headersInterceptor.CallOptions()
257	if endpoint := e.config.AdminEndpoint; endpoint != "" {
258		clientOpts = append(clientOpts, option.WithEndpoint(endpoint))
259	}
260	return NewInstanceAdminClient(context.Background(), e.config.Project, clientOpts...)
261}
262
263// NewClient builds a connected data client for this environment
264func (e *ProdEnv) NewClient() (*Client, error) {
265	clientOpts := headersInterceptor.CallOptions()
266	if endpoint := e.config.DataEndpoint; endpoint != "" {
267		clientOpts = append(clientOpts, option.WithEndpoint(endpoint))
268	}
269
270	if e.config.AttemptDirectPath {
271		// For DirectPath tests, we need to add an interceptor to check the peer IP.
272		clientOpts = append(clientOpts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(e.peerInfo))))
273	}
274
275	return NewClient(context.Background(), e.config.Project, e.config.Instance, clientOpts...)
276}
277