1// Copyright 2018 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package agent
16
17import (
18	"math"
19	"net"
20	"os"
21	"os/exec"
22	"strings"
23
24	"github.com/coreos/etcd/functional/rpcpb"
25	"github.com/coreos/etcd/pkg/proxy"
26
27	"go.uber.org/zap"
28	"google.golang.org/grpc"
29)
30
31// Server implements "rpcpb.TransportServer"
32// and other etcd operations as an agent
33// no need to lock fields since request operations are
34// serialized in tester-side
35type Server struct {
36	grpcServer *grpc.Server
37	lg         *zap.Logger
38
39	network string
40	address string
41	ln      net.Listener
42
43	rpcpb.TransportServer
44	last rpcpb.Operation
45
46	*rpcpb.Member
47	*rpcpb.Tester
48
49	etcdCmd     *exec.Cmd
50	etcdLogFile *os.File
51
52	// forward incoming advertise URLs traffic to listen URLs
53	advertiseClientPortToProxy map[int]proxy.Server
54	advertisePeerPortToProxy   map[int]proxy.Server
55}
56
57// NewServer returns a new agent server.
58func NewServer(
59	lg *zap.Logger,
60	network string,
61	address string,
62) *Server {
63	return &Server{
64		lg:                         lg,
65		network:                    network,
66		address:                    address,
67		last:                       rpcpb.Operation_NOT_STARTED,
68		advertiseClientPortToProxy: make(map[int]proxy.Server),
69		advertisePeerPortToProxy:   make(map[int]proxy.Server),
70	}
71}
72
// Limits applied to the agent's gRPC server in StartServe.
const (
	// maxRequestBytes (1.5 MiB) plus grpcOverheadBytes (512 KiB) forms
	// the maximum receive message size.
	maxRequestBytes   = 1.5 * (1 << 20)
	grpcOverheadBytes = 512 << 10
	// maxStreams is the concurrent stream limit.
	maxStreams = math.MaxUint32
	// maxSendBytes is the maximum send message size.
	maxSendBytes = math.MaxInt32
)
79
80// StartServe starts serving agent server.
81func (srv *Server) StartServe() error {
82	var err error
83	srv.ln, err = net.Listen(srv.network, srv.address)
84	if err != nil {
85		return err
86	}
87
88	var opts []grpc.ServerOption
89	opts = append(opts, grpc.MaxRecvMsgSize(int(maxRequestBytes+grpcOverheadBytes)))
90	opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
91	opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
92	srv.grpcServer = grpc.NewServer(opts...)
93
94	rpcpb.RegisterTransportServer(srv.grpcServer, srv)
95
96	srv.lg.Info(
97		"gRPC server started",
98		zap.String("address", srv.address),
99		zap.String("listener-address", srv.ln.Addr().String()),
100	)
101	err = srv.grpcServer.Serve(srv.ln)
102	if err != nil && strings.Contains(err.Error(), "use of closed network connection") {
103		srv.lg.Info(
104			"gRPC server is shut down",
105			zap.String("address", srv.address),
106			zap.Error(err),
107		)
108	} else {
109		srv.lg.Warn(
110			"gRPC server returned with error",
111			zap.String("address", srv.address),
112			zap.Error(err),
113		)
114	}
115	return err
116}
117
118// Stop stops serving gRPC server.
119func (srv *Server) Stop() {
120	srv.lg.Info("gRPC server stopping", zap.String("address", srv.address))
121	srv.grpcServer.Stop()
122	srv.lg.Info("gRPC server stopped", zap.String("address", srv.address))
123}
124
125// Transport communicates with etcd tester.
126func (srv *Server) Transport(stream rpcpb.Transport_TransportServer) (err error) {
127	errc := make(chan error)
128	go func() {
129		for {
130			var req *rpcpb.Request
131			req, err = stream.Recv()
132			if err != nil {
133				errc <- err
134				// TODO: handle error and retry
135				return
136			}
137			if req.Member != nil {
138				srv.Member = req.Member
139			}
140			if req.Tester != nil {
141				srv.Tester = req.Tester
142			}
143
144			var resp *rpcpb.Response
145			resp, err = srv.handleTesterRequest(req)
146			if err != nil {
147				errc <- err
148				// TODO: handle error and retry
149				return
150			}
151
152			if err = stream.Send(resp); err != nil {
153				errc <- err
154				// TODO: handle error and retry
155				return
156			}
157		}
158	}()
159
160	select {
161	case err = <-errc:
162	case <-stream.Context().Done():
163		err = stream.Context().Err()
164	}
165	return err
166}
167