1// Copyright 2017 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package profiler is a client for the Cloud Profiler service.
16//
17// Usage example:
18//
19//   import "cloud.google.com/go/profiler"
20//   ...
21//   if err := profiler.Start(profiler.Config{Service: "my-service"}); err != nil {
22//       // TODO: Handle error.
23//   }
24//
25// Calling Start will start a goroutine to collect profiles and upload to
26// the profiler server, at the rhythm specified by the server.
27//
// The caller must provide the service name in the config (unless it can be
// taken from the GAE_SERVICE or K_SERVICE environment variable), and may
// provide other information as well. See Config for details.
30//
// Profiler has CPU, heap, allocation and goroutine profiling enabled by
// default. Mutex profiling can be enabled in the config. Note that goroutine
// and mutex profiles are shown as "threads" and "contention" profiles in the
// profiler UI.
35package profiler
36
37import (
38	"bytes"
39	"context"
40	"errors"
41	"fmt"
42	"log"
43	"os"
44	"regexp"
45	"runtime"
46	"runtime/pprof"
47	"strings"
48	"sync"
49	"time"
50
51	gcemd "cloud.google.com/go/compute/metadata"
52	"cloud.google.com/go/internal/version"
53	"github.com/golang/protobuf/proto"
54	"github.com/golang/protobuf/ptypes"
55	"github.com/google/pprof/profile"
56	gax "github.com/googleapis/gax-go/v2"
57	"google.golang.org/api/option"
58	gtransport "google.golang.org/api/transport/grpc"
59	pb "google.golang.org/genproto/googleapis/devtools/cloudprofiler/v2"
60	edpb "google.golang.org/genproto/googleapis/rpc/errdetails"
61	"google.golang.org/grpc"
62	"google.golang.org/grpc/codes"
63	grpcmd "google.golang.org/grpc/metadata"
64	"google.golang.org/grpc/status"
65)
66
67var (
68	config       Config
69	startOnce    allowUntilSuccess
70	mutexEnabled bool
71	logger       *log.Logger
72	// The functions below are stubbed to be overrideable for testing.
73	getProjectID     = gcemd.ProjectID
74	getInstanceName  = gcemd.InstanceName
75	getZone          = gcemd.Zone
76	startCPUProfile  = pprof.StartCPUProfile
77	stopCPUProfile   = pprof.StopCPUProfile
78	writeHeapProfile = pprof.WriteHeapProfile
79	sleep            = gax.Sleep
80	dialGRPC         = gtransport.DialPool
81	onGCE            = gcemd.OnGCE
82	serviceRegexp    = regexp.MustCompile(`^[a-z]([-a-z0-9_.]{0,253}[a-z0-9])?$`)
83
84	// For testing only.
85	// When the profiling loop has exited without error and this channel is
86	// non-nil, "true" will be sent to this channel.
87	profilingDone chan bool
88)
89
// Profiler API endpoint, request metadata key and OAuth scope.
const (
	apiAddress       = "cloudprofiler.googleapis.com:443"
	xGoogAPIMetadata = "x-goog-api-client"
	scope            = "https://www.googleapis.com/auth/monitoring.write"
)

// Label keys attached to the deployment and to uploaded profiles.
const (
	zoneNameLabel = "zone"
	versionLabel  = "version"
	languageLabel = "language"
	instanceLabel = "instance"
)

// Retry policy used when creating profiles.
const (
	initialBackoff = time.Minute
	// maxBackoff caps the wait so the agent recovers within 1 hour.
	maxBackoff = time.Hour
	// backoffMultiplier grows the backoff envelope by this factor on each retry.
	backoffMultiplier = 1.3
	// retryInfoMetadata is the trailing-metadata key that carries a
	// serialized google.rpc.RetryInfo from the server.
	retryInfoMetadata = "google.rpc.retryinfo-bin"
)
105
// Config is the profiler configuration.
type Config struct {
	// Service must be provided to start the profiler. It specifies the name of
	// the service under which the profiled data will be recorded and exposed at
	// the Profiler UI for the project. You can specify an arbitrary string, but
	// see Deployment.target at
	// https://github.com/googleapis/googleapis/blob/master/google/devtools/cloudprofiler/v2/profiler.proto
	// for restrictions. If the parameter is not set, the agent will probe the
	// GAE_SERVICE environment variable (present in Google App Engine) and then
	// the K_SERVICE environment variable, using the first non-empty value.
	// NOTE: The string should be the same across different replicas of
	// your service so that the globally constant profiling rate is
	// maintained. Do not put things like PID or unique pod ID in the name.
	Service string

	// ServiceVersion is an optional field specifying the version of the
	// service. It can be an arbitrary string. Profiler profiles
	// once per minute for each version of each service in each zone.
	// ServiceVersion defaults to the GAE_VERSION environment variable if that
	// is set, otherwise to the K_REVISION environment variable if that is set,
	// or to the empty string otherwise.
	ServiceVersion string

	// DebugLogging enables detailed debug logging from profiler to standard
	// error, prefixed with "Cloud Profiler: ". It defaults to false.
	DebugLogging bool

	// MutexProfiling enables mutex profiling. It defaults to false.
	// Note that mutex profiling is not supported by Go versions older
	// than Go 1.8; Start returns an error if it cannot be enabled.
	MutexProfiling bool

	// When true, collecting the CPU profiles is disabled.
	NoCPUProfiling bool

	// When true, collecting the allocation profiles is disabled.
	NoAllocProfiling bool

	// AllocForceGC forces garbage collection before the collection of each heap
	// profile collected to produce the allocation profile. This increases the
	// accuracy of allocation profiling. It defaults to false.
	AllocForceGC bool

	// When true, collecting the heap profiles is disabled.
	NoHeapProfiling bool

	// When true, collecting the goroutine profiles is disabled.
	NoGoroutineProfiling bool

	// When true, the agent sends all telemetries via OpenCensus exporter, which
	// can be viewed in Cloud Trace and Cloud Monitoring. When false (the
	// default), the API client is created with telemetry disabled.
	EnableOCTelemetry bool

	// ProjectID is the Cloud Console project ID to use instead of the one set by
	// GOOGLE_CLOUD_PROJECT environment variable or read from the VM metadata
	// server.
	//
	// Set this if you are running the agent in your local environment
	// or anywhere else outside of Google Cloud Platform; when not running on
	// Compute Engine, Start fails if no project ID is available.
	ProjectID string

	// APIAddr is the endpoint (host:port) of the profiler agent API that the
	// gRPC connection is dialed to. Defaults to the production environment,
	// overridable for testing.
	APIAddr string

	// Instance is the name of Compute Engine instance the profiler agent runs
	// on. This is normally determined from the Compute Engine metadata server
	// and doesn't need to be initialized. It needs to be set in rare cases where
	// the metadata server is present but is flaky or otherwise misbehaves.
	Instance string

	// Zone is the zone of Compute Engine instance the profiler agent runs
	// on. This is normally determined from the Compute Engine metadata server
	// and doesn't need to be initialized. It needs to be set in rare cases where
	// the metadata server is present but is flaky or otherwise misbehaves.
	Zone string

	// numProfiles is the number of profiles which should be collected before
	// the profile collection loop exits. When numProfiles is 0, profiles will
	// be collected for the duration of the program. For testing only.
	numProfiles int
}
189
190// allowUntilSuccess is an object that will perform action till
191// it succeeds once.
192// This is a modified form of Go's sync.Once
193type allowUntilSuccess struct {
194	m    sync.Mutex
195	done uint32
196}
197
198// do calls function f only if it hasnt returned nil previously.
199// Once f returns nil, do will not call function f any more.
200// This is a modified form of Go's sync.Once.Do
201func (o *allowUntilSuccess) do(f func() error) (err error) {
202	o.m.Lock()
203	defer o.m.Unlock()
204	if o.done == 0 {
205		if err = f(); err == nil {
206			o.done = 1
207		}
208	} else {
209		debugLog("profiler.Start() called again after it was previously called")
210		err = nil
211	}
212	return err
213}
214
215// Start starts a goroutine to collect and upload profiles. The
216// caller must provide the service string in the config. See
217// Config for details. Start should only be called once. Any
218// additional calls will be ignored.
219func Start(cfg Config, options ...option.ClientOption) error {
220	startError := startOnce.do(func() error {
221		return start(cfg, options...)
222	})
223	return startError
224}
225
226func start(cfg Config, options ...option.ClientOption) error {
227	logger = log.New(os.Stderr, "Cloud Profiler: ", log.LstdFlags)
228	if err := initializeConfig(cfg); err != nil {
229		debugLog("failed to initialize config: %v", err)
230		return err
231	}
232	if config.MutexProfiling {
233		if mutexEnabled = enableMutexProfiling(); !mutexEnabled {
234			return fmt.Errorf("mutex profiling is not supported by %s, requires Go 1.8 or later", runtime.Version())
235		}
236	}
237
238	ctx := context.Background()
239
240	opts := []option.ClientOption{
241		option.WithEndpoint(config.APIAddr),
242		option.WithScopes(scope),
243		option.WithUserAgent(fmt.Sprintf("gcloud-go-profiler/%s", version.Repo)),
244	}
245	if !config.EnableOCTelemetry {
246		opts = append(opts, option.WithTelemetryDisabled())
247	}
248	opts = append(opts, options...)
249
250	connPool, err := dialGRPC(ctx, opts...)
251	if err != nil {
252		debugLog("failed to dial GRPC: %v", err)
253		return err
254	}
255
256	a, err := initializeAgent(pb.NewProfilerServiceClient(connPool))
257	if err != nil {
258		debugLog("failed to start the profiling agent: %v", err)
259		return err
260	}
261	go pollProfilerService(withXGoogHeader(ctx), a)
262	return nil
263}
264
265func debugLog(format string, e ...interface{}) {
266	if config.DebugLogging {
267		logger.Printf(format, e...)
268	}
269}
270
// agent polls the profiler server for instructions on behalf of a task,
// and collects and uploads profiles as requested.
//
// Fields:
//   - client: the Cloud Profiler API client used for CreateProfile and
//     UpdateProfile calls.
//   - deployment: sent with every CreateProfile request; its ProjectId also
//     forms the request's Parent ("projects/<id>").
//   - profileLabels: labels attached to every uploaded profile.
//   - profileTypes: the profile types this agent collects; offered to the
//     server in CreateProfile and checked again before collecting.
type agent struct {
	client        pb.ProfilerServiceClient
	deployment    *pb.Deployment
	profileLabels map[string]string
	profileTypes  []pb.ProfileType
}
279
280// abortedBackoffDuration retrieves the retry duration from gRPC trailing
281// metadata, which is set by the profiler server.
282func abortedBackoffDuration(md grpcmd.MD) (time.Duration, error) {
283	elem := md[retryInfoMetadata]
284	if len(elem) <= 0 {
285		return 0, errors.New("no retry info")
286	}
287
288	var retryInfo edpb.RetryInfo
289	if err := proto.Unmarshal([]byte(elem[0]), &retryInfo); err != nil {
290		return 0, err
291	} else if time, err := ptypes.Duration(retryInfo.RetryDelay); err != nil {
292		return 0, err
293	} else {
294		if time < 0 {
295			return 0, errors.New("negative retry duration")
296		}
297		return time, nil
298	}
299}
300
301type retryer struct {
302	backoff gax.Backoff
303	md      *grpcmd.MD
304}
305
306func (r *retryer) Retry(err error) (time.Duration, bool) {
307	st, _ := status.FromError(err)
308	if st != nil && st.Code() == codes.Aborted {
309		dur, err := abortedBackoffDuration(*r.md)
310		if err == nil {
311			return dur, true
312		}
313		debugLog("failed to get backoff duration: %v", err)
314	}
315	return r.backoff.Pause(), true
316}
317
318// createProfile talks to the profiler server to create profile. In
319// case of error, the goroutine will sleep and retry. Sleep duration may
320// be specified by the server. Otherwise it will be an exponentially
321// increasing value, bounded by maxBackoff. Special handling for
322// certificate errors is described below.
323func (a *agent) createProfile(ctx context.Context) *pb.Profile {
324	req := pb.CreateProfileRequest{
325		Parent:      "projects/" + a.deployment.ProjectId,
326		Deployment:  a.deployment,
327		ProfileType: a.profileTypes,
328	}
329
330	var p *pb.Profile
331	md := grpcmd.New(nil)
332
333	gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
334		debugLog("creating a new profile via profiler service")
335		var err error
336		p, err = a.client.CreateProfile(ctx, &req, grpc.Trailer(&md))
337		if err != nil {
338			debugLog("failed to create profile, will retry: %v", err)
339			if strings.Contains(err.Error(), "x509: certificate signed by unknown authority") {
340				// gax.Invoke does not retry missing certificate error. Force a retry by returning
341				// a different error. See https://github.com/googleapis/google-cloud-go/issues/3158.
342				err = errors.New("retry the certificate error")
343			}
344		}
345		return err
346	}, gax.WithRetry(func() gax.Retryer {
347		return &retryer{
348			backoff: gax.Backoff{
349				Initial:    initialBackoff,
350				Max:        maxBackoff,
351				Multiplier: backoffMultiplier,
352			},
353			md: &md,
354		}
355	}))
356
357	debugLog("successfully created profile %v", p.GetProfileType())
358	return p
359}
360
361func (a *agent) profileAndUpload(ctx context.Context, p *pb.Profile) {
362	var prof bytes.Buffer
363	pt := p.GetProfileType()
364
365	ptEnabled := false
366	for _, enabled := range a.profileTypes {
367		if enabled == pt {
368			ptEnabled = true
369			break
370		}
371	}
372
373	if !ptEnabled {
374		debugLog("skipping collection of disabled profile type: %v", pt)
375		return
376	}
377
378	switch pt {
379	case pb.ProfileType_CPU:
380		duration, err := ptypes.Duration(p.Duration)
381		if err != nil {
382			debugLog("failed to get profile duration for CPU profile: %v", err)
383			return
384		}
385		if err := startCPUProfile(&prof); err != nil {
386			debugLog("failed to start CPU profile: %v", err)
387			return
388		}
389		sleep(ctx, duration)
390		stopCPUProfile()
391	case pb.ProfileType_HEAP:
392		if err := heapProfile(&prof); err != nil {
393			debugLog("failed to write heap profile: %v", err)
394			return
395		}
396	case pb.ProfileType_HEAP_ALLOC:
397		duration, err := ptypes.Duration(p.Duration)
398		if err != nil {
399			debugLog("failed to get profile duration for allocation profile: %v", err)
400			return
401		}
402		if err := deltaAllocProfile(ctx, duration, config.AllocForceGC, &prof); err != nil {
403			debugLog("failed to collect allocation profile: %v", err)
404			return
405		}
406	case pb.ProfileType_THREADS:
407		if err := pprof.Lookup("goroutine").WriteTo(&prof, 0); err != nil {
408			debugLog("failed to collect goroutine profile: %v", err)
409			return
410		}
411	case pb.ProfileType_CONTENTION:
412		duration, err := ptypes.Duration(p.Duration)
413		if err != nil {
414			debugLog("failed to get profile duration: %v", err)
415			return
416		}
417		if err := deltaMutexProfile(ctx, duration, &prof); err != nil {
418			debugLog("failed to collect mutex profile: %v", err)
419			return
420		}
421	default:
422		debugLog("unexpected profile type: %v", pt)
423		return
424	}
425
426	p.ProfileBytes = prof.Bytes()
427	p.Labels = a.profileLabels
428	req := pb.UpdateProfileRequest{Profile: p}
429
430	// Upload profile, discard profile in case of error.
431	debugLog("start uploading profile")
432	if _, err := a.client.UpdateProfile(ctx, &req); err != nil {
433		debugLog("failed to upload profile: %v", err)
434	}
435}
436
437// deltaMutexProfile writes mutex profile changes over a time period specified
438// with 'duration' to 'prof'.
439func deltaMutexProfile(ctx context.Context, duration time.Duration, prof *bytes.Buffer) error {
440	if !mutexEnabled {
441		return errors.New("mutex profiling is not enabled")
442	}
443	p0, err := mutexProfile()
444	if err != nil {
445		return err
446	}
447	sleep(ctx, duration)
448	p, err := mutexProfile()
449	if err != nil {
450		return err
451	}
452
453	p0.Scale(-1)
454	p, err = profile.Merge([]*profile.Profile{p0, p})
455	if err != nil {
456		return err
457	}
458
459	return p.Write(prof)
460}
461
462func mutexProfile() (*profile.Profile, error) {
463	p := pprof.Lookup("mutex")
464	if p == nil {
465		return nil, errors.New("mutex profiling is not supported")
466	}
467	var buf bytes.Buffer
468	if err := p.WriteTo(&buf, 0); err != nil {
469		return nil, err
470	}
471	return profile.Parse(&buf)
472}
473
474// withXGoogHeader sets the name and version of the application in
475// the `x-goog-api-client` header passed on each request. Intended for
476// use by Google-written clients.
477func withXGoogHeader(ctx context.Context, keyval ...string) context.Context {
478	kv := append([]string{"gl-go", version.Go(), "gccl", version.Repo}, keyval...)
479	kv = append(kv, "gax", gax.Version, "grpc", grpc.Version)
480
481	md, _ := grpcmd.FromOutgoingContext(ctx)
482	md = md.Copy()
483	md[xGoogAPIMetadata] = []string{gax.XGoogHeader(kv...)}
484	return grpcmd.NewOutgoingContext(ctx, md)
485}
486
487// initializeAgent initializes the profiling agent. It returns an error if
488// profile collection should not be started because collection is disabled
489// for all profile types.
490func initializeAgent(c pb.ProfilerServiceClient) (*agent, error) {
491	labels := map[string]string{languageLabel: "go"}
492	if config.Zone != "" {
493		labels[zoneNameLabel] = config.Zone
494	}
495	if config.ServiceVersion != "" {
496		labels[versionLabel] = config.ServiceVersion
497	}
498	d := &pb.Deployment{
499		ProjectId: config.ProjectID,
500		Target:    config.Service,
501		Labels:    labels,
502	}
503
504	profileLabels := map[string]string{}
505
506	if config.Instance != "" {
507		profileLabels[instanceLabel] = config.Instance
508	}
509
510	var profileTypes []pb.ProfileType
511	if !config.NoCPUProfiling {
512		profileTypes = append(profileTypes, pb.ProfileType_CPU)
513	}
514	if !config.NoHeapProfiling {
515		profileTypes = append(profileTypes, pb.ProfileType_HEAP)
516	}
517	if !config.NoGoroutineProfiling {
518		profileTypes = append(profileTypes, pb.ProfileType_THREADS)
519	}
520	if !config.NoAllocProfiling {
521		profileTypes = append(profileTypes, pb.ProfileType_HEAP_ALLOC)
522	}
523	if mutexEnabled {
524		profileTypes = append(profileTypes, pb.ProfileType_CONTENTION)
525	}
526
527	if len(profileTypes) == 0 {
528		return nil, fmt.Errorf("collection is not enabled for any profile types")
529	}
530
531	return &agent{
532		client:        c,
533		deployment:    d,
534		profileLabels: profileLabels,
535		profileTypes:  profileTypes,
536	}, nil
537}
538
539func initializeConfig(cfg Config) error {
540	config = cfg
541
542	if config.Service == "" {
543		for _, ev := range []string{"GAE_SERVICE", "K_SERVICE"} {
544			if val := os.Getenv(ev); val != "" {
545				config.Service = val
546				break
547			}
548		}
549	}
550	if config.Service == "" {
551		return errors.New("service name must be configured")
552	}
553	if !serviceRegexp.MatchString(config.Service) {
554		return fmt.Errorf("service name %q does not match regular expression %v", config.Service, serviceRegexp)
555	}
556
557	if config.ServiceVersion == "" {
558		for _, ev := range []string{"GAE_VERSION", "K_REVISION"} {
559			if val := os.Getenv(ev); val != "" {
560				config.ServiceVersion = val
561				break
562			}
563		}
564	}
565
566	if projectID := os.Getenv("GOOGLE_CLOUD_PROJECT"); config.ProjectID == "" && projectID != "" {
567		// Cloud Shell and App Engine set this environment variable to the project
568		// ID, so use it if present. In case of App Engine the project ID is also
569		// available from the GCE metadata server, but by using the environment
570		// variable saves one request to the metadata server. The environment
571		// project ID is only used if no project ID is provided in the
572		// configuration.
573		config.ProjectID = projectID
574	}
575	if onGCE() {
576		var err error
577		if config.ProjectID == "" {
578			if config.ProjectID, err = getProjectID(); err != nil {
579				return fmt.Errorf("failed to get the project ID from Compute Engine metadata: %v", err)
580			}
581		}
582
583		if config.Zone == "" {
584			if config.Zone, err = getZone(); err != nil {
585				return fmt.Errorf("failed to get zone from Compute Engine metadata: %v", err)
586			}
587		}
588
589		if config.Instance == "" {
590			if config.Instance, err = getInstanceName(); err != nil {
591				if _, ok := err.(gcemd.NotDefinedError); !ok {
592					return fmt.Errorf("failed to get instance name from Compute Engine metadata: %v", err)
593				}
594				debugLog("failed to get instance name from Compute Engine metadata, will use empty name: %v", err)
595			}
596		}
597	} else {
598		if config.ProjectID == "" {
599			return fmt.Errorf("project ID must be specified in the configuration if running outside of GCP")
600		}
601	}
602
603	if config.APIAddr == "" {
604		config.APIAddr = apiAddress
605	}
606	return nil
607}
608
609// pollProfilerService starts an endless loop to poll the profiler
610// server for instructions, and collects and uploads profiles as
611// requested.
612func pollProfilerService(ctx context.Context, a *agent) {
613	debugLog("Cloud Profiler Go Agent version: %s", version.Repo)
614	debugLog("profiler has started")
615	for i := 0; config.numProfiles == 0 || i < config.numProfiles; i++ {
616		p := a.createProfile(ctx)
617		a.profileAndUpload(ctx, p)
618	}
619
620	if profilingDone != nil {
621		profilingDone <- true
622	}
623}
624