1// Copyright 2017 Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package fgrpc // import "fortio.org/fortio/fgrpc"
16
17import (
18	"context"
19	"fmt"
20	"net"
21	"os"
22	"runtime"
23	"runtime/pprof"
24	"time"
25
26	"google.golang.org/grpc"
27	"google.golang.org/grpc/credentials"
28	"google.golang.org/grpc/health/grpc_health_v1"
29
30	"strings"
31
32	"fortio.org/fortio/fnet"
33	"fortio.org/fortio/log"
34	"fortio.org/fortio/periodic"
35)
36
37// Dial dials grpc using insecure or tls transport security when serverAddr
38// has prefixHTTPS or cert is provided. If override is set to a non empty string,
39// it will override the virtual host name of authority in requests.
40func Dial(o *GRPCRunnerOptions) (conn *grpc.ClientConn, err error) {
41	var opts []grpc.DialOption
42	switch {
43	case o.CACert != "":
44		var creds credentials.TransportCredentials
45		creds, err = credentials.NewClientTLSFromFile(o.CACert, o.CertOverride)
46		if err != nil {
47			log.Errf("Invalid TLS credentials: %v\n", err)
48			return nil, err
49		}
50		log.Infof("Using CA certificate %v to construct TLS credentials", o.CACert)
51		opts = append(opts, grpc.WithTransportCredentials(creds))
52	case strings.HasPrefix(o.Destination, fnet.PrefixHTTPS):
53		creds := credentials.NewTLS(nil)
54		opts = append(opts, grpc.WithTransportCredentials(creds))
55	default:
56		opts = append(opts, grpc.WithInsecure())
57	}
58	serverAddr := grpcDestination(o.Destination)
59	if o.UnixDomainSocket != "" {
60		log.Warnf("Using domain socket %v instead of %v for grpc connection", o.UnixDomainSocket, serverAddr)
61		opts = append(opts, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
62			return net.DialTimeout(fnet.UnixDomainSocket, o.UnixDomainSocket, timeout)
63		}))
64	}
65	conn, err = grpc.Dial(serverAddr, opts...)
66	if err != nil {
67		log.Errf("failed to connect to %s with certificate %s and override %s: %v", serverAddr, o.CACert, o.CertOverride, err)
68	}
69	return conn, err
70}
71
72// TODO: refactor common parts between http and grpc runners
73
74// GRPCRunnerResults is the aggregated result of an GRPCRunner.
75// Also is the internal type used per thread/goroutine.
76type GRPCRunnerResults struct {
77	periodic.RunnerResults
78	clientH     grpc_health_v1.HealthClient
79	reqH        grpc_health_v1.HealthCheckRequest
80	clientP     PingServerClient
81	reqP        PingMessage
82	RetCodes    HealthResultMap
83	Destination string
84	Streams     int
85	Ping        bool
86}
87
88// Run exercises GRPC health check or ping at the target QPS.
89// To be set as the Function in RunnerOptions.
90func (grpcstate *GRPCRunnerResults) Run(t int) {
91	log.Debugf("Calling in %d", t)
92	var err error
93	var res interface{}
94	status := grpc_health_v1.HealthCheckResponse_SERVING
95	if grpcstate.Ping {
96		res, err = grpcstate.clientP.Ping(context.Background(), &grpcstate.reqP)
97	} else {
98		var r *grpc_health_v1.HealthCheckResponse
99		r, err = grpcstate.clientH.Check(context.Background(), &grpcstate.reqH)
100		if r != nil {
101			status = r.Status
102			res = r
103		}
104	}
105	log.Debugf("For %d (ping=%v) got %v %v", t, grpcstate.Ping, err, res)
106	if err != nil {
107		log.Warnf("Error making grpc call: %v", err)
108		grpcstate.RetCodes[Error]++
109	} else {
110		grpcstate.RetCodes[status.String()]++
111	}
112}
113
114// GRPCRunnerOptions includes the base RunnerOptions plus http specific
115// options.
116type GRPCRunnerOptions struct {
117	periodic.RunnerOptions
118	Destination        string
119	Service            string        // Service to be checked when using grpc health check
120	Profiler           string        // file to save profiles to. defaults to no profiling
121	Payload            string        // Payload to be sent for grpc ping service
122	Streams            int           // number of streams. total go routines and data streams will be streams*numthreads.
123	Delay              time.Duration // Delay to be sent when using grpc ping service
124	CACert             string        // Path to CA certificate for grpc TLS
125	CertOverride       string        // Override the cert virtual host of authority for testing
126	AllowInitialErrors bool          // whether initial errors don't cause an abort
127	UsePing            bool          // use our own Ping proto for grpc load instead of standard health check one.
128	UnixDomainSocket   string        // unix domain socket path to use for physical connection instead of Destination
129}
130
131// RunGRPCTest runs an http test and returns the aggregated stats.
132func RunGRPCTest(o *GRPCRunnerOptions) (*GRPCRunnerResults, error) {
133	if o.Streams < 1 {
134		o.Streams = 1
135	}
136	if o.NumThreads < 1 {
137		// sort of todo, this redoing some of periodic normalize (but we can't use normalize which does too much)
138		o.NumThreads = periodic.DefaultRunnerOptions.NumThreads
139	}
140	if o.UsePing {
141		o.RunType = "GRPC Ping"
142		if o.Delay > 0 {
143			o.RunType += fmt.Sprintf(" Delay=%v", o.Delay)
144		}
145	} else {
146		o.RunType = "GRPC Health"
147	}
148	pll := len(o.Payload)
149	if pll > 0 {
150		o.RunType += fmt.Sprintf(" PayloadLength=%d", pll)
151	}
152	log.Infof("Starting %s test for %s with %d*%d threads at %.1f qps", o.RunType, o.Destination, o.Streams, o.NumThreads, o.QPS)
153	o.NumThreads *= o.Streams
154	r := periodic.NewPeriodicRunner(&o.RunnerOptions)
155	defer r.Options().Abort()
156	numThreads := r.Options().NumThreads // may change
157	total := GRPCRunnerResults{
158		RetCodes:    make(HealthResultMap),
159		Destination: o.Destination,
160		Streams:     o.Streams,
161		Ping:        o.UsePing,
162	}
163	grpcstate := make([]GRPCRunnerResults, numThreads)
164	out := r.Options().Out // Important as the default value is set from nil to stdout inside NewPeriodicRunner
165	var conn *grpc.ClientConn
166	var err error
167	ts := time.Now().UnixNano()
168	for i := 0; i < numThreads; i++ {
169		r.Options().Runners[i] = &grpcstate[i]
170		if (i % o.Streams) == 0 {
171			conn, err = Dial(o)
172			if err != nil {
173				log.Errf("Error in grpc dial for %s %v", o.Destination, err)
174				return nil, err
175			}
176		} else {
177			log.Debugf("Reusing previous client connection for %d", i)
178		}
179		grpcstate[i].Ping = o.UsePing
180		var err error
181		if o.UsePing {
182			grpcstate[i].clientP = NewPingServerClient(conn)
183			if grpcstate[i].clientP == nil {
184				return nil, fmt.Errorf("unable to create ping client %d for %s", i, o.Destination)
185			}
186			grpcstate[i].reqP = PingMessage{Payload: o.Payload, DelayNanos: o.Delay.Nanoseconds(), Seq: int64(i), Ts: ts}
187			if o.Exactly <= 0 {
188				_, err = grpcstate[i].clientP.Ping(context.Background(), &grpcstate[i].reqP)
189			}
190		} else {
191			grpcstate[i].clientH = grpc_health_v1.NewHealthClient(conn)
192			if grpcstate[i].clientH == nil {
193				return nil, fmt.Errorf("unable to create health client %d for %s", i, o.Destination)
194			}
195			grpcstate[i].reqH = grpc_health_v1.HealthCheckRequest{Service: o.Service}
196			if o.Exactly <= 0 {
197				_, err = grpcstate[i].clientH.Check(context.Background(), &grpcstate[i].reqH)
198			}
199		}
200		if !o.AllowInitialErrors && err != nil {
201			log.Errf("Error in first grpc call (ping = %v) for %s: %v", o.UsePing, o.Destination, err)
202			return nil, err
203		}
204		// Setup the stats for each 'thread'
205		grpcstate[i].RetCodes = make(HealthResultMap)
206	}
207
208	if o.Profiler != "" {
209		fc, err := os.Create(o.Profiler + ".cpu")
210		if err != nil {
211			log.Critf("Unable to create .cpu profile: %v", err)
212			return nil, err
213		}
214		pprof.StartCPUProfile(fc) //nolint: gas,errcheck
215	}
216	total.RunnerResults = r.Run()
217	if o.Profiler != "" {
218		pprof.StopCPUProfile()
219		fm, err := os.Create(o.Profiler + ".mem")
220		if err != nil {
221			log.Critf("Unable to create .mem profile: %v", err)
222			return nil, err
223		}
224		runtime.GC()               // get up-to-date statistics
225		pprof.WriteHeapProfile(fm) // nolint:gas,errcheck
226		fm.Close()                 // nolint:gas,errcheck
227		fmt.Printf("Wrote profile data to %s.{cpu|mem}\n", o.Profiler)
228	}
229	// Numthreads may have reduced
230	numThreads = r.Options().NumThreads
231	keys := []string{}
232	for i := 0; i < numThreads; i++ {
233		// Q: is there some copying each time stats[i] is used?
234		for k := range grpcstate[i].RetCodes {
235			if _, exists := total.RetCodes[k]; !exists {
236				keys = append(keys, k)
237			}
238			total.RetCodes[k] += grpcstate[i].RetCodes[k]
239		}
240		// TODO: if grpc client needs 'cleanup'/Close like http one, do it on original NumThreads
241	}
242	// Cleanup state:
243	r.Options().ReleaseRunners()
244	which := "Health"
245	if o.UsePing {
246		which = "Ping"
247	}
248	for _, k := range keys {
249		_, _ = fmt.Fprintf(out, "%s %s : %d\n", which, k, total.RetCodes[k])
250	}
251	return &total, nil
252}
253
254// grpcDestination parses dest and returns dest:port based on dest being
255// a hostname, IP address, hostname:port, or ip:port. The original dest is
256// returned if dest is an invalid hostname or invalid IP address. An http/https
257// prefix is removed from dest if one exists and the port number is set to
258// StandardHTTPPort for http, StandardHTTPSPort for https, or DefaultGRPCPort
259// if http, https, or :port is not specified in dest.
260// TODO: change/fix this (NormalizePort and more)
261func grpcDestination(dest string) (parsedDest string) {
262	var port string
263	// strip any unintentional http/https scheme prefixes from dest
264	// and set the port number.
265	switch {
266	case strings.HasPrefix(dest, fnet.PrefixHTTP):
267		parsedDest = strings.TrimSuffix(strings.Replace(dest, fnet.PrefixHTTP, "", 1), "/")
268		port = fnet.StandardHTTPPort
269		log.Infof("stripping http scheme. grpc destination: %v: grpc port: %s",
270			parsedDest, port)
271	case strings.HasPrefix(dest, fnet.PrefixHTTPS):
272		parsedDest = strings.TrimSuffix(strings.Replace(dest, fnet.PrefixHTTPS, "", 1), "/")
273		port = fnet.StandardHTTPSPort
274		log.Infof("stripping https scheme. grpc destination: %v. grpc port: %s",
275			parsedDest, port)
276	default:
277		parsedDest = dest
278		port = fnet.DefaultGRPCPort
279	}
280	if _, _, err := net.SplitHostPort(parsedDest); err == nil {
281		return parsedDest
282	}
283	if ip := net.ParseIP(parsedDest); ip != nil {
284		switch {
285		case ip.To4() != nil:
286			parsedDest = ip.String() + fnet.NormalizePort(port)
287			return parsedDest
288		case ip.To16() != nil:
289			parsedDest = "[" + ip.String() + "]" + fnet.NormalizePort(port)
290			return parsedDest
291		}
292	}
293	// parsedDest is in the form of a domain name,
294	// append ":port" and return.
295	parsedDest += fnet.NormalizePort(port)
296	return parsedDest
297}
298