1// Copyright 2017 Istio Authors. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14// 15 16package fgrpc 17 18import ( 19 "fmt" 20 "net" 21 "os" 22 "time" 23 24 "golang.org/x/net/context" 25 "google.golang.org/grpc" 26 "google.golang.org/grpc/credentials" 27 "google.golang.org/grpc/health" 28 "google.golang.org/grpc/health/grpc_health_v1" 29 "google.golang.org/grpc/reflection" 30 31 "fortio.org/fortio/fnet" 32 "fortio.org/fortio/log" 33 "fortio.org/fortio/stats" 34) 35 36const ( 37 // DefaultHealthServiceName is the default health service name used by fortio. 38 DefaultHealthServiceName = "ping" 39 //Error indicates that something went wrong with healthcheck in grpc 40 Error = "ERROR" 41) 42 43type pingSrv struct { 44} 45 46func (s *pingSrv) Ping(c context.Context, in *PingMessage) (*PingMessage, error) { 47 log.LogVf("Ping called %+v (ctx %+v)", *in, c) 48 out := *in // copy the input including the payload etc 49 out.Ts = time.Now().UnixNano() 50 if in.DelayNanos > 0 { 51 s := time.Duration(in.DelayNanos) 52 log.LogVf("GRPC ping: sleeping for %v", s) 53 time.Sleep(s) 54 } 55 return &out, nil 56} 57 58// PingServer starts a grpc ping (and health) echo server. 59// returns the port being bound (useful when passing "0" as the port to 60// get a dynamic server). Pass the healthServiceName to use for the 61// grpc service name health check (or pass DefaultHealthServiceName) 62// to be marked as SERVING. Pass maxConcurrentStreams > 0 to set that option. 63func PingServer(port, cert, key, healthServiceName string, maxConcurrentStreams uint32) net.Addr { 64 socket, addr := fnet.Listen("grpc '"+healthServiceName+"'", port) 65 if addr == nil { 66 return nil 67 } 68 var grpcOptions []grpc.ServerOption 69 if maxConcurrentStreams > 0 { 70 log.Infof("Setting grpc.MaxConcurrentStreams server to %d", maxConcurrentStreams) 71 grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(maxConcurrentStreams)) 72 } 73 if cert != "" && key != "" { 74 creds, err := credentials.NewServerTLSFromFile(cert, key) 75 if err != nil { 76 log.Fatalf("Invalid TLS credentials: %v\n", err) 77 } 78 log.Infof("Using server certificate %v to construct TLS credentials", cert) 79 log.Infof("Using server key %v to construct TLS credentials", key) 80 grpcOptions = append(grpcOptions, grpc.Creds(creds)) 81 } 82 grpcServer := grpc.NewServer(grpcOptions...) 83 reflection.Register(grpcServer) 84 healthServer := health.NewServer() 85 healthServer.SetServingStatus(healthServiceName, grpc_health_v1.HealthCheckResponse_SERVING) 86 grpc_health_v1.RegisterHealthServer(grpcServer, healthServer) 87 RegisterPingServerServer(grpcServer, &pingSrv{}) 88 go func() { 89 if err := grpcServer.Serve(socket); err != nil { 90 log.Fatalf("failed to start grpc server: %v", err) 91 } 92 }() 93 return addr 94} 95 96// PingServerTCP is PingServer() assuming tcp instead of possible unix domain socket port, returns 97// the numeric port. 98func PingServerTCP(port, cert, key, healthServiceName string, maxConcurrentStreams uint32) int { 99 addr := PingServer(port, cert, key, healthServiceName, maxConcurrentStreams) 100 if addr == nil { 101 return -1 102 } 103 return addr.(*net.TCPAddr).Port 104} 105 106// PingClientCall calls the ping service (presumably running as PingServer on 107// the destination). returns the average round trip in seconds. 108func PingClientCall(serverAddr, cacert string, n int, payload string, delay time.Duration) (float64, error) { 109 o := GRPCRunnerOptions{Destination: serverAddr, CACert: cacert} 110 conn, err := Dial(&o) // somehow this never seem to error out, error comes later 111 if err != nil { 112 return -1, err // error already logged 113 } 114 msg := &PingMessage{Payload: payload, DelayNanos: delay.Nanoseconds()} 115 cli := NewPingServerClient(conn) 116 // Warm up: 117 _, err = cli.Ping(context.Background(), msg) 118 if err != nil { 119 log.Errf("grpc error from Ping0 %v", err) 120 return -1, err 121 } 122 skewHistogram := stats.NewHistogram(-10, 2) 123 rttHistogram := stats.NewHistogram(0, 10) 124 for i := 1; i <= n; i++ { 125 msg.Seq = int64(i) 126 t1a := time.Now().UnixNano() 127 msg.Ts = t1a 128 res1, err := cli.Ping(context.Background(), msg) 129 t2a := time.Now().UnixNano() 130 if err != nil { 131 log.Errf("grpc error from Ping1 iter %d: %v", i, err) 132 return -1, err 133 } 134 t1b := res1.Ts 135 res2, err := cli.Ping(context.Background(), msg) 136 t3a := time.Now().UnixNano() 137 t2b := res2.Ts 138 if err != nil { 139 log.Errf("grpc error from Ping2 iter %d: %v", i, err) 140 return -1, err 141 } 142 rt1 := t2a - t1a 143 rttHistogram.Record(float64(rt1) / 1000.) 144 rt2 := t3a - t2a 145 rttHistogram.Record(float64(rt2) / 1000.) 146 rtR := t2b - t1b 147 rttHistogram.Record(float64(rtR) / 1000.) 148 midR := t1b + (rtR / 2) 149 avgRtt := (rt1 + rt2 + rtR) / 3 150 x := (midR - t2a) 151 log.Infof("Ping RTT %d (avg of %d, %d, %d ns) clock skew %d", 152 avgRtt, rt1, rtR, rt2, x) 153 skewHistogram.Record(float64(x) / 1000.) 154 msg = res2 155 } 156 skewHistogram.Print(os.Stdout, "Clock skew histogram usec", []float64{50}) 157 rttHistogram.Print(os.Stdout, "RTT histogram usec", []float64{50}) 158 return rttHistogram.Avg() / 1e6, nil 159} 160 161// HealthResultMap short cut for the map of results to count. 162type HealthResultMap map[string]int64 163 164// GrpcHealthCheck makes a grpc client call to the standard grpc health check 165// service. 166func GrpcHealthCheck(serverAddr, cacert string, svcname string, n int) (*HealthResultMap, error) { 167 log.Debugf("GrpcHealthCheck for %s svc '%s', %d iterations", serverAddr, svcname, n) 168 o := GRPCRunnerOptions{Destination: serverAddr, CACert: cacert} 169 conn, err := Dial(&o) 170 if err != nil { 171 return nil, err 172 } 173 msg := &grpc_health_v1.HealthCheckRequest{Service: svcname} 174 cli := grpc_health_v1.NewHealthClient(conn) 175 rttHistogram := stats.NewHistogram(0, 10) 176 statuses := make(HealthResultMap) 177 178 for i := 1; i <= n; i++ { 179 start := time.Now() 180 res, err := cli.Check(context.Background(), msg) 181 dur := time.Since(start) 182 log.LogVf("Reply from health check %d: %+v", i, res) 183 if err != nil { 184 log.Errf("grpc error from Check %v", err) 185 return nil, err 186 } 187 statuses[res.Status.String()]++ 188 rttHistogram.Record(dur.Seconds() * 1000000.) 189 } 190 rttHistogram.Print(os.Stdout, "RTT histogram usec", []float64{50}) 191 for k, v := range statuses { 192 fmt.Printf("Health %s : %d\n", k, v) 193 } 194 return &statuses, nil 195} 196