1// Copyright 2017 Istio Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//    http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16package fgrpc
17
18import (
19	"fmt"
20	"net"
21	"os"
22	"time"
23
24	"golang.org/x/net/context"
25	"google.golang.org/grpc"
26	"google.golang.org/grpc/credentials"
27	"google.golang.org/grpc/health"
28	"google.golang.org/grpc/health/grpc_health_v1"
29	"google.golang.org/grpc/reflection"
30
31	"fortio.org/fortio/fnet"
32	"fortio.org/fortio/log"
33	"fortio.org/fortio/stats"
34)
35
36const (
37	// DefaultHealthServiceName is the default health service name used by fortio.
38	DefaultHealthServiceName = "ping"
39	//Error indicates that something went wrong with healthcheck in grpc
40	Error = "ERROR"
41)
42
43type pingSrv struct {
44}
45
46func (s *pingSrv) Ping(c context.Context, in *PingMessage) (*PingMessage, error) {
47	log.LogVf("Ping called %+v (ctx %+v)", *in, c)
48	out := *in // copy the input including the payload etc
49	out.Ts = time.Now().UnixNano()
50	if in.DelayNanos > 0 {
51		s := time.Duration(in.DelayNanos)
52		log.LogVf("GRPC ping: sleeping for %v", s)
53		time.Sleep(s)
54	}
55	return &out, nil
56}
57
58// PingServer starts a grpc ping (and health) echo server.
59// returns the port being bound (useful when passing "0" as the port to
60// get a dynamic server). Pass the healthServiceName to use for the
61// grpc service name health check (or pass DefaultHealthServiceName)
62// to be marked as SERVING. Pass maxConcurrentStreams > 0 to set that option.
63func PingServer(port, cert, key, healthServiceName string, maxConcurrentStreams uint32) net.Addr {
64	socket, addr := fnet.Listen("grpc '"+healthServiceName+"'", port)
65	if addr == nil {
66		return nil
67	}
68	var grpcOptions []grpc.ServerOption
69	if maxConcurrentStreams > 0 {
70		log.Infof("Setting grpc.MaxConcurrentStreams server to %d", maxConcurrentStreams)
71		grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(maxConcurrentStreams))
72	}
73	if cert != "" && key != "" {
74		creds, err := credentials.NewServerTLSFromFile(cert, key)
75		if err != nil {
76			log.Fatalf("Invalid TLS credentials: %v\n", err)
77		}
78		log.Infof("Using server certificate %v to construct TLS credentials", cert)
79		log.Infof("Using server key %v to construct TLS credentials", key)
80		grpcOptions = append(grpcOptions, grpc.Creds(creds))
81	}
82	grpcServer := grpc.NewServer(grpcOptions...)
83	reflection.Register(grpcServer)
84	healthServer := health.NewServer()
85	healthServer.SetServingStatus(healthServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
86	grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
87	RegisterPingServerServer(grpcServer, &pingSrv{})
88	go func() {
89		if err := grpcServer.Serve(socket); err != nil {
90			log.Fatalf("failed to start grpc server: %v", err)
91		}
92	}()
93	return addr
94}
95
96// PingServerTCP is PingServer() assuming tcp instead of possible unix domain socket port, returns
97// the numeric port.
98func PingServerTCP(port, cert, key, healthServiceName string, maxConcurrentStreams uint32) int {
99	addr := PingServer(port, cert, key, healthServiceName, maxConcurrentStreams)
100	if addr == nil {
101		return -1
102	}
103	return addr.(*net.TCPAddr).Port
104}
105
106// PingClientCall calls the ping service (presumably running as PingServer on
107// the destination). returns the average round trip in seconds.
108func PingClientCall(serverAddr, cacert string, n int, payload string, delay time.Duration) (float64, error) {
109	o := GRPCRunnerOptions{Destination: serverAddr, CACert: cacert}
110	conn, err := Dial(&o) // somehow this never seem to error out, error comes later
111	if err != nil {
112		return -1, err // error already logged
113	}
114	msg := &PingMessage{Payload: payload, DelayNanos: delay.Nanoseconds()}
115	cli := NewPingServerClient(conn)
116	// Warm up:
117	_, err = cli.Ping(context.Background(), msg)
118	if err != nil {
119		log.Errf("grpc error from Ping0 %v", err)
120		return -1, err
121	}
122	skewHistogram := stats.NewHistogram(-10, 2)
123	rttHistogram := stats.NewHistogram(0, 10)
124	for i := 1; i <= n; i++ {
125		msg.Seq = int64(i)
126		t1a := time.Now().UnixNano()
127		msg.Ts = t1a
128		res1, err := cli.Ping(context.Background(), msg)
129		t2a := time.Now().UnixNano()
130		if err != nil {
131			log.Errf("grpc error from Ping1 iter %d: %v", i, err)
132			return -1, err
133		}
134		t1b := res1.Ts
135		res2, err := cli.Ping(context.Background(), msg)
136		t3a := time.Now().UnixNano()
137		t2b := res2.Ts
138		if err != nil {
139			log.Errf("grpc error from Ping2 iter %d: %v", i, err)
140			return -1, err
141		}
142		rt1 := t2a - t1a
143		rttHistogram.Record(float64(rt1) / 1000.)
144		rt2 := t3a - t2a
145		rttHistogram.Record(float64(rt2) / 1000.)
146		rtR := t2b - t1b
147		rttHistogram.Record(float64(rtR) / 1000.)
148		midR := t1b + (rtR / 2)
149		avgRtt := (rt1 + rt2 + rtR) / 3
150		x := (midR - t2a)
151		log.Infof("Ping RTT %d (avg of %d, %d, %d ns) clock skew %d",
152			avgRtt, rt1, rtR, rt2, x)
153		skewHistogram.Record(float64(x) / 1000.)
154		msg = res2
155	}
156	skewHistogram.Print(os.Stdout, "Clock skew histogram usec", []float64{50})
157	rttHistogram.Print(os.Stdout, "RTT histogram usec", []float64{50})
158	return rttHistogram.Avg() / 1e6, nil
159}
160
161// HealthResultMap short cut for the map of results to count.
162type HealthResultMap map[string]int64
163
164// GrpcHealthCheck makes a grpc client call to the standard grpc health check
165// service.
166func GrpcHealthCheck(serverAddr, cacert string, svcname string, n int) (*HealthResultMap, error) {
167	log.Debugf("GrpcHealthCheck for %s svc '%s', %d iterations", serverAddr, svcname, n)
168	o := GRPCRunnerOptions{Destination: serverAddr, CACert: cacert}
169	conn, err := Dial(&o)
170	if err != nil {
171		return nil, err
172	}
173	msg := &grpc_health_v1.HealthCheckRequest{Service: svcname}
174	cli := grpc_health_v1.NewHealthClient(conn)
175	rttHistogram := stats.NewHistogram(0, 10)
176	statuses := make(HealthResultMap)
177
178	for i := 1; i <= n; i++ {
179		start := time.Now()
180		res, err := cli.Check(context.Background(), msg)
181		dur := time.Since(start)
182		log.LogVf("Reply from health check %d: %+v", i, res)
183		if err != nil {
184			log.Errf("grpc error from Check %v", err)
185			return nil, err
186		}
187		statuses[res.Status.String()]++
188		rttHistogram.Record(dur.Seconds() * 1000000.)
189	}
190	rttHistogram.Print(os.Stdout, "RTT histogram usec", []float64{50})
191	for k, v := range statuses {
192		fmt.Printf("Health %s : %d\n", k, v)
193	}
194	return &statuses, nil
195}
196