1// Copyright 2017 Istio Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package fgrpc // import "fortio.org/fortio/fgrpc" 16 17import ( 18 "context" 19 "fmt" 20 "net" 21 "os" 22 "runtime" 23 "runtime/pprof" 24 "time" 25 26 "google.golang.org/grpc" 27 "google.golang.org/grpc/credentials" 28 "google.golang.org/grpc/health/grpc_health_v1" 29 30 "strings" 31 32 "fortio.org/fortio/fnet" 33 "fortio.org/fortio/log" 34 "fortio.org/fortio/periodic" 35) 36 37// Dial dials grpc using insecure or tls transport security when serverAddr 38// has prefixHTTPS or cert is provided. If override is set to a non empty string, 39// it will override the virtual host name of authority in requests. 40func Dial(o *GRPCRunnerOptions) (conn *grpc.ClientConn, err error) { 41 var opts []grpc.DialOption 42 switch { 43 case o.CACert != "": 44 var creds credentials.TransportCredentials 45 creds, err = credentials.NewClientTLSFromFile(o.CACert, o.CertOverride) 46 if err != nil { 47 log.Errf("Invalid TLS credentials: %v\n", err) 48 return nil, err 49 } 50 log.Infof("Using CA certificate %v to construct TLS credentials", o.CACert) 51 opts = append(opts, grpc.WithTransportCredentials(creds)) 52 case strings.HasPrefix(o.Destination, fnet.PrefixHTTPS): 53 creds := credentials.NewTLS(nil) 54 opts = append(opts, grpc.WithTransportCredentials(creds)) 55 default: 56 opts = append(opts, grpc.WithInsecure()) 57 } 58 serverAddr := grpcDestination(o.Destination) 59 if o.UnixDomainSocket != "" { 60 log.Warnf("Using domain socket %v instead of %v for grpc connection", o.UnixDomainSocket, serverAddr) 61 opts = append(opts, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { 62 return net.DialTimeout(fnet.UnixDomainSocket, o.UnixDomainSocket, timeout) 63 })) 64 } 65 conn, err = grpc.Dial(serverAddr, opts...) 66 if err != nil { 67 log.Errf("failed to connect to %s with certificate %s and override %s: %v", serverAddr, o.CACert, o.CertOverride, err) 68 } 69 return conn, err 70} 71 72// TODO: refactor common parts between http and grpc runners 73 74// GRPCRunnerResults is the aggregated result of an GRPCRunner. 75// Also is the internal type used per thread/goroutine. 76type GRPCRunnerResults struct { 77 periodic.RunnerResults 78 clientH grpc_health_v1.HealthClient 79 reqH grpc_health_v1.HealthCheckRequest 80 clientP PingServerClient 81 reqP PingMessage 82 RetCodes HealthResultMap 83 Destination string 84 Streams int 85 Ping bool 86} 87 88// Run exercises GRPC health check or ping at the target QPS. 89// To be set as the Function in RunnerOptions. 90func (grpcstate *GRPCRunnerResults) Run(t int) { 91 log.Debugf("Calling in %d", t) 92 var err error 93 var res interface{} 94 status := grpc_health_v1.HealthCheckResponse_SERVING 95 if grpcstate.Ping { 96 res, err = grpcstate.clientP.Ping(context.Background(), &grpcstate.reqP) 97 } else { 98 var r *grpc_health_v1.HealthCheckResponse 99 r, err = grpcstate.clientH.Check(context.Background(), &grpcstate.reqH) 100 if r != nil { 101 status = r.Status 102 res = r 103 } 104 } 105 log.Debugf("For %d (ping=%v) got %v %v", t, grpcstate.Ping, err, res) 106 if err != nil { 107 log.Warnf("Error making grpc call: %v", err) 108 grpcstate.RetCodes[Error]++ 109 } else { 110 grpcstate.RetCodes[status.String()]++ 111 } 112} 113 114// GRPCRunnerOptions includes the base RunnerOptions plus http specific 115// options. 116type GRPCRunnerOptions struct { 117 periodic.RunnerOptions 118 Destination string 119 Service string // Service to be checked when using grpc health check 120 Profiler string // file to save profiles to. defaults to no profiling 121 Payload string // Payload to be sent for grpc ping service 122 Streams int // number of streams. total go routines and data streams will be streams*numthreads. 123 Delay time.Duration // Delay to be sent when using grpc ping service 124 CACert string // Path to CA certificate for grpc TLS 125 CertOverride string // Override the cert virtual host of authority for testing 126 AllowInitialErrors bool // whether initial errors don't cause an abort 127 UsePing bool // use our own Ping proto for grpc load instead of standard health check one. 128 UnixDomainSocket string // unix domain socket path to use for physical connection instead of Destination 129} 130 131// RunGRPCTest runs an http test and returns the aggregated stats. 132func RunGRPCTest(o *GRPCRunnerOptions) (*GRPCRunnerResults, error) { 133 if o.Streams < 1 { 134 o.Streams = 1 135 } 136 if o.NumThreads < 1 { 137 // sort of todo, this redoing some of periodic normalize (but we can't use normalize which does too much) 138 o.NumThreads = periodic.DefaultRunnerOptions.NumThreads 139 } 140 if o.UsePing { 141 o.RunType = "GRPC Ping" 142 if o.Delay > 0 { 143 o.RunType += fmt.Sprintf(" Delay=%v", o.Delay) 144 } 145 } else { 146 o.RunType = "GRPC Health" 147 } 148 pll := len(o.Payload) 149 if pll > 0 { 150 o.RunType += fmt.Sprintf(" PayloadLength=%d", pll) 151 } 152 log.Infof("Starting %s test for %s with %d*%d threads at %.1f qps", o.RunType, o.Destination, o.Streams, o.NumThreads, o.QPS) 153 o.NumThreads *= o.Streams 154 r := periodic.NewPeriodicRunner(&o.RunnerOptions) 155 defer r.Options().Abort() 156 numThreads := r.Options().NumThreads // may change 157 total := GRPCRunnerResults{ 158 RetCodes: make(HealthResultMap), 159 Destination: o.Destination, 160 Streams: o.Streams, 161 Ping: o.UsePing, 162 } 163 grpcstate := make([]GRPCRunnerResults, numThreads) 164 out := r.Options().Out // Important as the default value is set from nil to stdout inside NewPeriodicRunner 165 var conn *grpc.ClientConn 166 var err error 167 ts := time.Now().UnixNano() 168 for i := 0; i < numThreads; i++ { 169 r.Options().Runners[i] = &grpcstate[i] 170 if (i % o.Streams) == 0 { 171 conn, err = Dial(o) 172 if err != nil { 173 log.Errf("Error in grpc dial for %s %v", o.Destination, err) 174 return nil, err 175 } 176 } else { 177 log.Debugf("Reusing previous client connection for %d", i) 178 } 179 grpcstate[i].Ping = o.UsePing 180 var err error 181 if o.UsePing { 182 grpcstate[i].clientP = NewPingServerClient(conn) 183 if grpcstate[i].clientP == nil { 184 return nil, fmt.Errorf("unable to create ping client %d for %s", i, o.Destination) 185 } 186 grpcstate[i].reqP = PingMessage{Payload: o.Payload, DelayNanos: o.Delay.Nanoseconds(), Seq: int64(i), Ts: ts} 187 if o.Exactly <= 0 { 188 _, err = grpcstate[i].clientP.Ping(context.Background(), &grpcstate[i].reqP) 189 } 190 } else { 191 grpcstate[i].clientH = grpc_health_v1.NewHealthClient(conn) 192 if grpcstate[i].clientH == nil { 193 return nil, fmt.Errorf("unable to create health client %d for %s", i, o.Destination) 194 } 195 grpcstate[i].reqH = grpc_health_v1.HealthCheckRequest{Service: o.Service} 196 if o.Exactly <= 0 { 197 _, err = grpcstate[i].clientH.Check(context.Background(), &grpcstate[i].reqH) 198 } 199 } 200 if !o.AllowInitialErrors && err != nil { 201 log.Errf("Error in first grpc call (ping = %v) for %s: %v", o.UsePing, o.Destination, err) 202 return nil, err 203 } 204 // Setup the stats for each 'thread' 205 grpcstate[i].RetCodes = make(HealthResultMap) 206 } 207 208 if o.Profiler != "" { 209 fc, err := os.Create(o.Profiler + ".cpu") 210 if err != nil { 211 log.Critf("Unable to create .cpu profile: %v", err) 212 return nil, err 213 } 214 pprof.StartCPUProfile(fc) //nolint: gas,errcheck 215 } 216 total.RunnerResults = r.Run() 217 if o.Profiler != "" { 218 pprof.StopCPUProfile() 219 fm, err := os.Create(o.Profiler + ".mem") 220 if err != nil { 221 log.Critf("Unable to create .mem profile: %v", err) 222 return nil, err 223 } 224 runtime.GC() // get up-to-date statistics 225 pprof.WriteHeapProfile(fm) // nolint:gas,errcheck 226 fm.Close() // nolint:gas,errcheck 227 fmt.Printf("Wrote profile data to %s.{cpu|mem}\n", o.Profiler) 228 } 229 // Numthreads may have reduced 230 numThreads = r.Options().NumThreads 231 keys := []string{} 232 for i := 0; i < numThreads; i++ { 233 // Q: is there some copying each time stats[i] is used? 234 for k := range grpcstate[i].RetCodes { 235 if _, exists := total.RetCodes[k]; !exists { 236 keys = append(keys, k) 237 } 238 total.RetCodes[k] += grpcstate[i].RetCodes[k] 239 } 240 // TODO: if grpc client needs 'cleanup'/Close like http one, do it on original NumThreads 241 } 242 // Cleanup state: 243 r.Options().ReleaseRunners() 244 which := "Health" 245 if o.UsePing { 246 which = "Ping" 247 } 248 for _, k := range keys { 249 _, _ = fmt.Fprintf(out, "%s %s : %d\n", which, k, total.RetCodes[k]) 250 } 251 return &total, nil 252} 253 254// grpcDestination parses dest and returns dest:port based on dest being 255// a hostname, IP address, hostname:port, or ip:port. The original dest is 256// returned if dest is an invalid hostname or invalid IP address. An http/https 257// prefix is removed from dest if one exists and the port number is set to 258// StandardHTTPPort for http, StandardHTTPSPort for https, or DefaultGRPCPort 259// if http, https, or :port is not specified in dest. 260// TODO: change/fix this (NormalizePort and more) 261func grpcDestination(dest string) (parsedDest string) { 262 var port string 263 // strip any unintentional http/https scheme prefixes from dest 264 // and set the port number. 265 switch { 266 case strings.HasPrefix(dest, fnet.PrefixHTTP): 267 parsedDest = strings.TrimSuffix(strings.Replace(dest, fnet.PrefixHTTP, "", 1), "/") 268 port = fnet.StandardHTTPPort 269 log.Infof("stripping http scheme. grpc destination: %v: grpc port: %s", 270 parsedDest, port) 271 case strings.HasPrefix(dest, fnet.PrefixHTTPS): 272 parsedDest = strings.TrimSuffix(strings.Replace(dest, fnet.PrefixHTTPS, "", 1), "/") 273 port = fnet.StandardHTTPSPort 274 log.Infof("stripping https scheme. grpc destination: %v. grpc port: %s", 275 parsedDest, port) 276 default: 277 parsedDest = dest 278 port = fnet.DefaultGRPCPort 279 } 280 if _, _, err := net.SplitHostPort(parsedDest); err == nil { 281 return parsedDest 282 } 283 if ip := net.ParseIP(parsedDest); ip != nil { 284 switch { 285 case ip.To4() != nil: 286 parsedDest = ip.String() + fnet.NormalizePort(port) 287 return parsedDest 288 case ip.To16() != nil: 289 parsedDest = "[" + ip.String() + "]" + fnet.NormalizePort(port) 290 return parsedDest 291 } 292 } 293 // parsedDest is in the form of a domain name, 294 // append ":port" and return. 295 parsedDest += fnet.NormalizePort(port) 296 return parsedDest 297} 298