1/* 2Copyright 2016 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package bigtable 18 19import ( 20 "context" 21 "errors" 22 "flag" 23 "fmt" 24 "strings" 25 "time" 26 27 "cloud.google.com/go/bigtable/bttest" 28 btopt "cloud.google.com/go/bigtable/internal/option" 29 "cloud.google.com/go/internal/testutil" 30 "google.golang.org/api/option" 31 gtransport "google.golang.org/api/transport/grpc" 32 "google.golang.org/grpc" 33 "google.golang.org/grpc/peer" 34) 35 36var legacyUseProd string 37var integrationConfig IntegrationTestConfig 38 39func init() { 40 c := &integrationConfig 41 42 flag.BoolVar(&c.UseProd, "it.use-prod", false, "Use remote bigtable instead of local emulator") 43 flag.StringVar(&c.AdminEndpoint, "it.admin-endpoint", "", "Admin api host and port") 44 flag.StringVar(&c.DataEndpoint, "it.data-endpoint", "", "Data api host and port") 45 flag.StringVar(&c.Project, "it.project", "", "Project to use for integration test") 46 flag.StringVar(&c.Instance, "it.instance", "", "Bigtable instance to use") 47 flag.StringVar(&c.Cluster, "it.cluster", "", "Bigtable cluster to use") 48 flag.StringVar(&c.Table, "it.table", "", "Bigtable table to create") 49 flag.BoolVar(&c.AttemptDirectPath, "it.attempt-directpath", false, "Attempt DirectPath") 50 flag.BoolVar(&c.DirectPathIPV4Only, "it.directpath-ipv4-only", false, "Run DirectPath on a ipv4-only VM") 51 52 // Backwards compat 53 flag.StringVar(&legacyUseProd, "use_prod", "", `DEPRECATED: if set to "proj,instance,table", run integration test against production`) 54 55} 56 57// IntegrationTestConfig contains parameters to pick and setup a IntegrationEnv for testing 58type IntegrationTestConfig struct { 59 UseProd bool 60 AdminEndpoint string 61 DataEndpoint string 62 Project string 63 Instance string 64 Cluster string 65 Table string 66 AttemptDirectPath bool 67 DirectPathIPV4Only bool 68} 69 70// IntegrationEnv represents a testing environment. 71// The environment can be implemented using production or an emulator 72type IntegrationEnv interface { 73 Config() IntegrationTestConfig 74 NewAdminClient() (*AdminClient, error) 75 // NewInstanceAdminClient will return nil if instance administration is unsupported in this environment 76 NewInstanceAdminClient() (*InstanceAdminClient, error) 77 NewClient() (*Client, error) 78 Close() 79 Peer() *peer.Peer 80} 81 82// NewIntegrationEnv creates a new environment based on the command line args 83func NewIntegrationEnv() (IntegrationEnv, error) { 84 c := integrationConfig 85 86 if legacyUseProd != "" { 87 fmt.Println("WARNING: using legacy commandline arg -use_prod, please switch to -it.*") 88 parts := strings.SplitN(legacyUseProd, ",", 3) 89 c.UseProd = true 90 c.Project = parts[0] 91 c.Instance = parts[1] 92 c.Table = parts[2] 93 } 94 95 if integrationConfig.UseProd { 96 return NewProdEnv(c) 97 } 98 return NewEmulatedEnv(c) 99} 100 101// EmulatedEnv encapsulates the state of an emulator 102type EmulatedEnv struct { 103 config IntegrationTestConfig 104 server *bttest.Server 105} 106 107// NewEmulatedEnv builds and starts the emulator based environment 108func NewEmulatedEnv(config IntegrationTestConfig) (*EmulatedEnv, error) { 109 srv, err := bttest.NewServer("localhost:0", grpc.MaxRecvMsgSize(200<<20), grpc.MaxSendMsgSize(100<<20)) 110 if err != nil { 111 return nil, err 112 } 113 114 if config.Project == "" { 115 config.Project = "project" 116 } 117 if config.Instance == "" { 118 config.Instance = "instance" 119 } 120 if config.Table == "" { 121 config.Table = "mytable" 122 } 123 config.AdminEndpoint = srv.Addr 124 config.DataEndpoint = srv.Addr 125 126 env := &EmulatedEnv{ 127 config: config, 128 server: srv, 129 } 130 return env, nil 131} 132 133func (e *EmulatedEnv) Peer() *peer.Peer { 134 return nil 135} 136 137// Close stops & cleans up the emulator 138func (e *EmulatedEnv) Close() { 139 e.server.Close() 140} 141 142// Config gets the config used to build this environment 143func (e *EmulatedEnv) Config() IntegrationTestConfig { 144 return e.config 145} 146 147var headersInterceptor = testutil.DefaultHeadersEnforcer() 148 149// NewAdminClient builds a new connected admin client for this environment 150func (e *EmulatedEnv) NewAdminClient() (*AdminClient, error) { 151 o, err := btopt.DefaultClientOptions(e.server.Addr, e.server.Addr, AdminScope, clientUserAgent) 152 if err != nil { 153 return nil, err 154 } 155 // Add gRPC client interceptors to supply Google client information. 156 // 157 // Inject interceptors from headersInterceptor, since they are used to verify 158 // client requests under test. 159 o = append(o, btopt.ClientInterceptorOptions( 160 headersInterceptor.StreamInterceptors(), 161 headersInterceptor.UnaryInterceptors())...) 162 163 timeout := 20 * time.Second 164 ctx, _ := context.WithTimeout(context.Background(), timeout) 165 166 o = append(o, option.WithGRPCDialOption(grpc.WithBlock())) 167 conn, err := gtransport.DialInsecure(ctx, o...) 168 if err != nil { 169 return nil, err 170 } 171 172 return NewAdminClient(ctx, e.config.Project, e.config.Instance, 173 option.WithGRPCConn(conn)) 174} 175 176// NewInstanceAdminClient returns nil for the emulated environment since the API is not implemented. 177func (e *EmulatedEnv) NewInstanceAdminClient() (*InstanceAdminClient, error) { 178 return nil, nil 179} 180 181// NewClient builds a new connected data client for this environment 182func (e *EmulatedEnv) NewClient() (*Client, error) { 183 o, err := btopt.DefaultClientOptions(e.server.Addr, e.server.Addr, Scope, clientUserAgent) 184 if err != nil { 185 return nil, err 186 } 187 // Add gRPC client interceptors to supply Google client information. 188 // 189 // Inject interceptors from headersInterceptor, since they are used to verify 190 // client requests under test. 191 o = append(o, btopt.ClientInterceptorOptions( 192 headersInterceptor.StreamInterceptors(), 193 headersInterceptor.UnaryInterceptors())...) 194 195 timeout := 20 * time.Second 196 ctx, _ := context.WithTimeout(context.Background(), timeout) 197 198 o = append(o, option.WithGRPCDialOption(grpc.WithBlock())) 199 o = append(o, option.WithGRPCDialOption(grpc.WithDefaultCallOptions( 200 grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)))) 201 conn, err := gtransport.DialInsecure(ctx, o...) 202 if err != nil { 203 return nil, err 204 } 205 return NewClient(ctx, e.config.Project, e.config.Instance, option.WithGRPCConn(conn)) 206} 207 208// ProdEnv encapsulates the state necessary to connect to the external Bigtable service 209type ProdEnv struct { 210 config IntegrationTestConfig 211 peerInfo *peer.Peer 212} 213 214// NewProdEnv builds the environment representation 215func NewProdEnv(config IntegrationTestConfig) (*ProdEnv, error) { 216 if config.Project == "" { 217 return nil, errors.New("Project not set") 218 } 219 if config.Instance == "" { 220 return nil, errors.New("Instance not set") 221 } 222 if config.Table == "" { 223 return nil, errors.New("Table not set") 224 } 225 226 env := &ProdEnv{ 227 config: config, 228 peerInfo: &peer.Peer{}, 229 } 230 return env, nil 231} 232 233func (e *ProdEnv) Peer() *peer.Peer { 234 return e.peerInfo 235} 236 237// Close is a no-op for production environments 238func (e *ProdEnv) Close() {} 239 240// Config gets the config used to build this environment 241func (e *ProdEnv) Config() IntegrationTestConfig { 242 return e.config 243} 244 245// NewAdminClient builds a new connected admin client for this environment 246func (e *ProdEnv) NewAdminClient() (*AdminClient, error) { 247 clientOpts := headersInterceptor.CallOptions() 248 if endpoint := e.config.AdminEndpoint; endpoint != "" { 249 clientOpts = append(clientOpts, option.WithEndpoint(endpoint)) 250 } 251 return NewAdminClient(context.Background(), e.config.Project, e.config.Instance, clientOpts...) 252} 253 254// NewInstanceAdminClient returns a new connected instance admin client for this environment 255func (e *ProdEnv) NewInstanceAdminClient() (*InstanceAdminClient, error) { 256 clientOpts := headersInterceptor.CallOptions() 257 if endpoint := e.config.AdminEndpoint; endpoint != "" { 258 clientOpts = append(clientOpts, option.WithEndpoint(endpoint)) 259 } 260 return NewInstanceAdminClient(context.Background(), e.config.Project, clientOpts...) 261} 262 263// NewClient builds a connected data client for this environment 264func (e *ProdEnv) NewClient() (*Client, error) { 265 clientOpts := headersInterceptor.CallOptions() 266 if endpoint := e.config.DataEndpoint; endpoint != "" { 267 clientOpts = append(clientOpts, option.WithEndpoint(endpoint)) 268 } 269 270 if e.config.AttemptDirectPath { 271 // For DirectPath tests, we need to add an interceptor to check the peer IP. 272 clientOpts = append(clientOpts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(e.peerInfo)))) 273 } 274 275 return NewClient(context.Background(), e.config.Project, e.config.Instance, clientOpts...) 276} 277