1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package stubserver is a stubbable implementation of
20// google.golang.org/grpc/test/grpc_testing for testing purposes.
21package stubserver
22
23import (
24	"context"
25	"fmt"
26	"net"
27	"time"
28
29	"google.golang.org/grpc"
30	"google.golang.org/grpc/connectivity"
31	"google.golang.org/grpc/resolver"
32	"google.golang.org/grpc/resolver/manual"
33	"google.golang.org/grpc/serviceconfig"
34
35	testpb "google.golang.org/grpc/test/grpc_testing"
36)
37
38// StubServer is a server that is easy to customize within individual test
39// cases.
40type StubServer struct {
41	// Guarantees we satisfy this interface; panics if unimplemented methods are called.
42	testpb.TestServiceServer
43
44	// Customizable implementations of server handlers.
45	EmptyCallF      func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
46	UnaryCallF      func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
47	FullDuplexCallF func(stream testpb.TestService_FullDuplexCallServer) error
48
49	// A client connected to this service the test may use.  Created in Start().
50	Client testpb.TestServiceClient
51	CC     *grpc.ClientConn
52	S      *grpc.Server
53
54	// Parameters for Listen and Dial. Defaults will be used if these are empty
55	// before Start.
56	Network string
57	Address string
58	Target  string
59
60	cleanups []func() // Lambdas executed in Stop(); populated by Start().
61
62	// Set automatically if Target == ""
63	R *manual.Resolver
64}
65
66// EmptyCall is the handler for testpb.EmptyCall
67func (ss *StubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
68	return ss.EmptyCallF(ctx, in)
69}
70
71// UnaryCall is the handler for testpb.UnaryCall
72func (ss *StubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
73	return ss.UnaryCallF(ctx, in)
74}
75
76// FullDuplexCall is the handler for testpb.FullDuplexCall
77func (ss *StubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
78	return ss.FullDuplexCallF(stream)
79}
80
81// Start starts the server and creates a client connected to it.
82func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error {
83	if ss.Network == "" {
84		ss.Network = "tcp"
85	}
86	if ss.Address == "" {
87		ss.Address = "localhost:0"
88	}
89	if ss.Target == "" {
90		ss.R = manual.NewBuilderWithScheme("whatever")
91	}
92
93	lis, err := net.Listen(ss.Network, ss.Address)
94	if err != nil {
95		return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err)
96	}
97	ss.Address = lis.Addr().String()
98	ss.cleanups = append(ss.cleanups, func() { lis.Close() })
99
100	s := grpc.NewServer(sopts...)
101	testpb.RegisterTestServiceServer(s, ss)
102	go s.Serve(lis)
103	ss.cleanups = append(ss.cleanups, s.Stop)
104	ss.S = s
105
106	opts := append([]grpc.DialOption{grpc.WithInsecure()}, dopts...)
107	if ss.R != nil {
108		ss.Target = ss.R.Scheme() + ":///" + ss.Address
109		opts = append(opts, grpc.WithResolvers(ss.R))
110	}
111
112	cc, err := grpc.Dial(ss.Target, opts...)
113	if err != nil {
114		return fmt.Errorf("grpc.Dial(%q) = %v", ss.Target, err)
115	}
116	ss.CC = cc
117	if ss.R != nil {
118		ss.R.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}})
119	}
120	if err := waitForReady(cc); err != nil {
121		return err
122	}
123
124	ss.cleanups = append(ss.cleanups, func() { cc.Close() })
125
126	ss.Client = testpb.NewTestServiceClient(cc)
127	return nil
128}
129
130// NewServiceConfig applies sc to ss.Client using the resolver (if present).
131func (ss *StubServer) NewServiceConfig(sc string) {
132	if ss.R != nil {
133		ss.R.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}, ServiceConfig: parseCfg(ss.R, sc)})
134	}
135}
136
137func waitForReady(cc *grpc.ClientConn) error {
138	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
139	defer cancel()
140	for {
141		s := cc.GetState()
142		if s == connectivity.Ready {
143			return nil
144		}
145		if !cc.WaitForStateChange(ctx, s) {
146			// ctx got timeout or canceled.
147			return ctx.Err()
148		}
149	}
150}
151
152// Stop stops ss and cleans up all resources it consumed.
153func (ss *StubServer) Stop() {
154	for i := len(ss.cleanups) - 1; i >= 0; i-- {
155		ss.cleanups[i]()
156	}
157}
158
159func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
160	g := r.CC.ParseServiceConfig(s)
161	if g.Err != nil {
162		panic(fmt.Sprintf("Error parsing config %q: %v", s, g.Err))
163	}
164	return g
165}
166