1/* 2 * 3 * Copyright 2020 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19// Package stubserver is a stubbable implementation of 20// google.golang.org/grpc/test/grpc_testing for testing purposes. 21package stubserver 22 23import ( 24 "context" 25 "fmt" 26 "net" 27 "time" 28 29 "google.golang.org/grpc" 30 "google.golang.org/grpc/connectivity" 31 "google.golang.org/grpc/resolver" 32 "google.golang.org/grpc/resolver/manual" 33 "google.golang.org/grpc/serviceconfig" 34 35 testpb "google.golang.org/grpc/test/grpc_testing" 36) 37 38// StubServer is a server that is easy to customize within individual test 39// cases. 40type StubServer struct { 41 // Guarantees we satisfy this interface; panics if unimplemented methods are called. 42 testpb.TestServiceServer 43 44 // Customizable implementations of server handlers. 45 EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) 46 UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) 47 FullDuplexCallF func(stream testpb.TestService_FullDuplexCallServer) error 48 49 // A client connected to this service the test may use. Created in Start(). 50 Client testpb.TestServiceClient 51 CC *grpc.ClientConn 52 S *grpc.Server 53 54 // Parameters for Listen and Dial. Defaults will be used if these are empty 55 // before Start. 56 Network string 57 Address string 58 Target string 59 60 cleanups []func() // Lambdas executed in Stop(); populated by Start(). 61 62 // Set automatically if Target == "" 63 R *manual.Resolver 64} 65 66// EmptyCall is the handler for testpb.EmptyCall 67func (ss *StubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 68 return ss.EmptyCallF(ctx, in) 69} 70 71// UnaryCall is the handler for testpb.UnaryCall 72func (ss *StubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 73 return ss.UnaryCallF(ctx, in) 74} 75 76// FullDuplexCall is the handler for testpb.FullDuplexCall 77func (ss *StubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { 78 return ss.FullDuplexCallF(stream) 79} 80 81// Start starts the server and creates a client connected to it. 82func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error { 83 if ss.Network == "" { 84 ss.Network = "tcp" 85 } 86 if ss.Address == "" { 87 ss.Address = "localhost:0" 88 } 89 if ss.Target == "" { 90 ss.R = manual.NewBuilderWithScheme("whatever") 91 } 92 93 lis, err := net.Listen(ss.Network, ss.Address) 94 if err != nil { 95 return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err) 96 } 97 ss.Address = lis.Addr().String() 98 ss.cleanups = append(ss.cleanups, func() { lis.Close() }) 99 100 s := grpc.NewServer(sopts...) 101 testpb.RegisterTestServiceServer(s, ss) 102 go s.Serve(lis) 103 ss.cleanups = append(ss.cleanups, s.Stop) 104 ss.S = s 105 106 opts := append([]grpc.DialOption{grpc.WithInsecure()}, dopts...) 107 if ss.R != nil { 108 ss.Target = ss.R.Scheme() + ":///" + ss.Address 109 opts = append(opts, grpc.WithResolvers(ss.R)) 110 } 111 112 cc, err := grpc.Dial(ss.Target, opts...) 113 if err != nil { 114 return fmt.Errorf("grpc.Dial(%q) = %v", ss.Target, err) 115 } 116 ss.CC = cc 117 if ss.R != nil { 118 ss.R.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}}) 119 } 120 if err := waitForReady(cc); err != nil { 121 return err 122 } 123 124 ss.cleanups = append(ss.cleanups, func() { cc.Close() }) 125 126 ss.Client = testpb.NewTestServiceClient(cc) 127 return nil 128} 129 130// NewServiceConfig applies sc to ss.Client using the resolver (if present). 131func (ss *StubServer) NewServiceConfig(sc string) { 132 if ss.R != nil { 133 ss.R.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}, ServiceConfig: parseCfg(ss.R, sc)}) 134 } 135} 136 137func waitForReady(cc *grpc.ClientConn) error { 138 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 139 defer cancel() 140 for { 141 s := cc.GetState() 142 if s == connectivity.Ready { 143 return nil 144 } 145 if !cc.WaitForStateChange(ctx, s) { 146 // ctx got timeout or canceled. 147 return ctx.Err() 148 } 149 } 150} 151 152// Stop stops ss and cleans up all resources it consumed. 153func (ss *StubServer) Stop() { 154 for i := len(ss.cleanups) - 1; i >= 0; i-- { 155 ss.cleanups[i]() 156 } 157} 158 159func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult { 160 g := r.CC.ParseServiceConfig(s) 161 if g.Err != nil { 162 panic(fmt.Sprintf("Error parsing config %q: %v", s, g.Err)) 163 } 164 return g 165} 166