// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package agent

import (
	"math"
	"net"
	"os"
	"os/exec"
	"strings"

	"github.com/coreos/etcd/functional/rpcpb"
	"github.com/coreos/etcd/pkg/proxy"

	"go.uber.org/zap"
	"google.golang.org/grpc"
)

// Server implements "rpcpb.TransportServer" and performs other etcd
// operations as an agent.
//
// Its fields are not protected by a lock, because request operations are
// serialized on the tester side.
type Server struct {
	// grpcServer is created by StartServe and shut down by Stop;
	// lg receives the server's log messages.
	grpcServer *grpc.Server
	lg         *zap.Logger

	// network and address are passed to net.Listen by StartServe,
	// and ln is the listener it returns.
	network string
	address string
	ln      net.Listener

	// TransportServer is embedded as an interface value; NewServer leaves it
	// unset, and Transport is implemented directly on *Server.
	rpcpb.TransportServer
	// last starts as rpcpb.Operation_NOT_STARTED.
	// NOTE(review): presumably the most recently handled operation; confirm
	// against the request handler.
	last rpcpb.Operation

	// Member and Tester are replaced by Transport whenever an incoming
	// request carries a non-nil value for them.
	*rpcpb.Member
	*rpcpb.Tester

	// NOTE(review): presumably the etcd process managed by this agent and
	// the file its output is written to; confirm against the code that
	// starts etcd.
	etcdCmd     *exec.Cmd
	etcdLogFile *os.File

	// forward incoming advertise URLs traffic to listen URLs
	advertiseClientPortToProxy map[int]proxy.Server
	advertisePeerPortToProxy   map[int]proxy.Server
}

// NewServer returns a new agent server.
58func NewServer( 59 lg *zap.Logger, 60 network string, 61 address string, 62) *Server { 63 return &Server{ 64 lg: lg, 65 network: network, 66 address: address, 67 last: rpcpb.Operation_NOT_STARTED, 68 advertiseClientPortToProxy: make(map[int]proxy.Server), 69 advertisePeerPortToProxy: make(map[int]proxy.Server), 70 } 71} 72 73const ( 74 maxRequestBytes = 1.5 * 1024 * 1024 75 grpcOverheadBytes = 512 * 1024 76 maxStreams = math.MaxUint32 77 maxSendBytes = math.MaxInt32 78) 79 80// StartServe starts serving agent server. 81func (srv *Server) StartServe() error { 82 var err error 83 srv.ln, err = net.Listen(srv.network, srv.address) 84 if err != nil { 85 return err 86 } 87 88 var opts []grpc.ServerOption 89 opts = append(opts, grpc.MaxRecvMsgSize(int(maxRequestBytes+grpcOverheadBytes))) 90 opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes)) 91 opts = append(opts, grpc.MaxConcurrentStreams(maxStreams)) 92 srv.grpcServer = grpc.NewServer(opts...) 93 94 rpcpb.RegisterTransportServer(srv.grpcServer, srv) 95 96 srv.lg.Info( 97 "gRPC server started", 98 zap.String("address", srv.address), 99 zap.String("listener-address", srv.ln.Addr().String()), 100 ) 101 err = srv.grpcServer.Serve(srv.ln) 102 if err != nil && strings.Contains(err.Error(), "use of closed network connection") { 103 srv.lg.Info( 104 "gRPC server is shut down", 105 zap.String("address", srv.address), 106 zap.Error(err), 107 ) 108 } else { 109 srv.lg.Warn( 110 "gRPC server returned with error", 111 zap.String("address", srv.address), 112 zap.Error(err), 113 ) 114 } 115 return err 116} 117 118// Stop stops serving gRPC server. 119func (srv *Server) Stop() { 120 srv.lg.Info("gRPC server stopping", zap.String("address", srv.address)) 121 srv.grpcServer.Stop() 122 srv.lg.Info("gRPC server stopped", zap.String("address", srv.address)) 123} 124 125// Transport communicates with etcd tester. 
126func (srv *Server) Transport(stream rpcpb.Transport_TransportServer) (err error) { 127 errc := make(chan error) 128 go func() { 129 for { 130 var req *rpcpb.Request 131 req, err = stream.Recv() 132 if err != nil { 133 errc <- err 134 // TODO: handle error and retry 135 return 136 } 137 if req.Member != nil { 138 srv.Member = req.Member 139 } 140 if req.Tester != nil { 141 srv.Tester = req.Tester 142 } 143 144 var resp *rpcpb.Response 145 resp, err = srv.handleTesterRequest(req) 146 if err != nil { 147 errc <- err 148 // TODO: handle error and retry 149 return 150 } 151 152 if err = stream.Send(resp); err != nil { 153 errc <- err 154 // TODO: handle error and retry 155 return 156 } 157 } 158 }() 159 160 select { 161 case err = <-errc: 162 case <-stream.Context().Done(): 163 err = stream.Context().Err() 164 } 165 return err 166} 167