1// Copyright 2017 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package profiler
16
17import (
18	"bytes"
19	"compress/gzip"
20	"context"
21	"errors"
22	"fmt"
23	"io"
24	"log"
25	"math/rand"
26	"os"
27	"runtime"
28	"strings"
29	"sync"
30	"testing"
31	"time"
32
33	gcemd "cloud.google.com/go/compute/metadata"
34	"cloud.google.com/go/internal/testutil"
35	"cloud.google.com/go/profiler/mocks"
36	"cloud.google.com/go/profiler/testdata"
37	"github.com/golang/mock/gomock"
38	"github.com/golang/protobuf/proto"
39	"github.com/golang/protobuf/ptypes"
40	"github.com/google/pprof/profile"
41	gax "github.com/googleapis/gax-go/v2"
42	"google.golang.org/api/option"
43	gtransport "google.golang.org/api/transport/grpc"
44	pb "google.golang.org/genproto/googleapis/devtools/cloudprofiler/v2"
45	edpb "google.golang.org/genproto/googleapis/rpc/errdetails"
46	"google.golang.org/grpc"
47	"google.golang.org/grpc/codes"
48	grpcmd "google.golang.org/grpc/metadata"
49	"google.golang.org/grpc/status"
50)
51
// Fixture values shared by the tests in this file.
const (
	// Identity of the fake deployment and instance the tests run against.
	testProjectID  = "test-project-ID"
	testInstance   = "test-instance"
	testZone       = "test-zone"
	testService    = "test-service"
	testSvcVersion = "test-service-version"

	// testProfileDuration is the duration attached to the CPU profile handed
	// out by fakeProfilerServer.CreateProfile.
	testProfileDuration = 10 * time.Second
	// testProfileCollectionTimeout bounds how long TestAgentWithServer waits
	// for profile collection to finish.
	testProfileCollectionTimeout = 15 * time.Second
)
61
62func createTestDeployment() *pb.Deployment {
63	labels := map[string]string{
64		zoneNameLabel: testZone,
65		versionLabel:  testSvcVersion,
66	}
67	return &pb.Deployment{
68		ProjectId: testProjectID,
69		Target:    testService,
70		Labels:    labels,
71	}
72}
73
74func createTestAgent(psc pb.ProfilerServiceClient) *agent {
75	return &agent{
76		client:        psc,
77		deployment:    createTestDeployment(),
78		profileLabels: map[string]string{instanceLabel: testInstance},
79		profileTypes:  []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_HEAP, pb.ProfileType_HEAP_ALLOC, pb.ProfileType_THREADS},
80	}
81}
82
83func createTrailers(dur time.Duration) map[string]string {
84	b, _ := proto.Marshal(&edpb.RetryInfo{
85		RetryDelay: ptypes.DurationProto(dur),
86	})
87	return map[string]string{
88		retryInfoMetadata: string(b),
89	}
90}
91
92func TestCreateProfile(t *testing.T) {
93	ctx := context.Background()
94	ctrl := gomock.NewController(t)
95	defer ctrl.Finish()
96	mpc := mocks.NewMockProfilerServiceClient(ctrl)
97	a := createTestAgent(mpc)
98	p := &pb.Profile{Name: "test_profile"}
99	wantRequest := pb.CreateProfileRequest{
100		Parent:      "projects/" + a.deployment.ProjectId,
101		Deployment:  a.deployment,
102		ProfileType: a.profileTypes,
103	}
104
105	mpc.EXPECT().CreateProfile(ctx, gomock.Eq(&wantRequest), gomock.Any()).Times(1).Return(p, nil)
106
107	gotP := a.createProfile(ctx)
108
109	if !testutil.Equal(gotP, p) {
110		t.Errorf("CreateProfile() got wrong profile, got %v, want %v", gotP, p)
111	}
112}
113
114func TestProfileAndUpload(t *testing.T) {
115	oldStartCPUProfile, oldStopCPUProfile, oldWriteHeapProfile, oldSleep := startCPUProfile, stopCPUProfile, writeHeapProfile, sleep
116	defer func() {
117		startCPUProfile, stopCPUProfile, writeHeapProfile, sleep = oldStartCPUProfile, oldStopCPUProfile, oldWriteHeapProfile, oldSleep
118	}()
119
120	ctx := context.Background()
121	ctrl := gomock.NewController(t)
122	defer ctrl.Finish()
123
124	var heapCollected1, heapCollected2, heapUploaded, allocUploaded bytes.Buffer
125	testdata.HeapProfileCollected1.Write(&heapCollected1)
126	testdata.HeapProfileCollected2.Write(&heapCollected2)
127	testdata.HeapProfileUploaded.Write(&heapUploaded)
128	testdata.AllocProfileUploaded.Write(&allocUploaded)
129	callCount := 0
130	writeTwoHeapFunc := func(w io.Writer) error {
131		callCount++
132		if callCount%2 == 1 {
133			w.Write(heapCollected1.Bytes())
134			return nil
135		}
136		w.Write(heapCollected2.Bytes())
137		return nil
138	}
139
140	errFunc := func(io.Writer) error { return errors.New("") }
141	testDuration := time.Second * 5
142	tests := []struct {
143		profileType           pb.ProfileType
144		duration              *time.Duration
145		startCPUProfileFunc   func(io.Writer) error
146		writeHeapProfileFunc  func(io.Writer) error
147		deltaMutexProfileFunc func(io.Writer) error
148		wantBytes             []byte
149	}{
150		{
151			profileType: pb.ProfileType_CPU,
152			duration:    &testDuration,
153			startCPUProfileFunc: func(w io.Writer) error {
154				w.Write([]byte{1})
155				return nil
156			},
157			writeHeapProfileFunc: errFunc,
158			wantBytes:            []byte{1},
159		},
160		{
161			profileType:          pb.ProfileType_CPU,
162			startCPUProfileFunc:  errFunc,
163			writeHeapProfileFunc: errFunc,
164		},
165		{
166			profileType: pb.ProfileType_CPU,
167			duration:    &testDuration,
168			startCPUProfileFunc: func(w io.Writer) error {
169				w.Write([]byte{2})
170				return nil
171			},
172			writeHeapProfileFunc: func(w io.Writer) error {
173				w.Write([]byte{3})
174				return nil
175			},
176			wantBytes: []byte{2},
177		},
178		{
179			profileType:         pb.ProfileType_HEAP,
180			startCPUProfileFunc: errFunc,
181			writeHeapProfileFunc: func(w io.Writer) error {
182				w.Write(heapCollected1.Bytes())
183				return nil
184			},
185			wantBytes: heapUploaded.Bytes(),
186		},
187		{
188			profileType:          pb.ProfileType_HEAP_ALLOC,
189			startCPUProfileFunc:  errFunc,
190			writeHeapProfileFunc: writeTwoHeapFunc,
191			duration:             &testDuration,
192			wantBytes:            allocUploaded.Bytes(),
193		},
194		{
195			profileType:          pb.ProfileType_HEAP,
196			startCPUProfileFunc:  errFunc,
197			writeHeapProfileFunc: errFunc,
198		},
199		{
200			profileType: pb.ProfileType_HEAP,
201			startCPUProfileFunc: func(w io.Writer) error {
202				w.Write([]byte{5})
203				return nil
204			},
205			writeHeapProfileFunc: func(w io.Writer) error {
206				w.Write(heapCollected1.Bytes())
207				return nil
208			},
209			wantBytes: heapUploaded.Bytes(),
210		},
211		{
212			profileType: pb.ProfileType_PROFILE_TYPE_UNSPECIFIED,
213			startCPUProfileFunc: func(w io.Writer) error {
214				w.Write([]byte{7})
215				return nil
216			},
217			writeHeapProfileFunc: func(w io.Writer) error {
218				w.Write(heapCollected1.Bytes())
219				return nil
220			},
221		},
222		{
223			profileType:           pb.ProfileType_CONTENTION,
224			deltaMutexProfileFunc: errFunc,
225		},
226	}
227
228	for _, tt := range tests {
229		mpc := mocks.NewMockProfilerServiceClient(ctrl)
230		a := createTestAgent(mpc)
231		startCPUProfile = tt.startCPUProfileFunc
232		stopCPUProfile = func() {}
233		writeHeapProfile = tt.writeHeapProfileFunc
234		var gotSleep *time.Duration
235		sleep = func(ctx context.Context, d time.Duration) error {
236			gotSleep = &d
237			return nil
238		}
239		p := &pb.Profile{ProfileType: tt.profileType}
240		if tt.duration != nil {
241			p.Duration = ptypes.DurationProto(*tt.duration)
242		}
243		if tt.wantBytes != nil {
244			wantProfile := &pb.Profile{
245				ProfileType:  p.ProfileType,
246				Duration:     p.Duration,
247				ProfileBytes: tt.wantBytes,
248				Labels:       a.profileLabels,
249			}
250			wantRequest := pb.UpdateProfileRequest{
251				Profile: wantProfile,
252			}
253			mpc.EXPECT().UpdateProfile(ctx, gomock.Eq(&wantRequest)).Times(1)
254		} else {
255			mpc.EXPECT().UpdateProfile(gomock.Any(), gomock.Any()).MaxTimes(0)
256		}
257
258		a.profileAndUpload(ctx, p)
259
260		if tt.duration == nil {
261			if gotSleep != nil {
262				t.Errorf("profileAndUpload(%v) slept for: %v, want no sleep", p, gotSleep)
263			}
264		} else {
265			if gotSleep == nil {
266				t.Errorf("profileAndUpload(%v) didn't sleep, want sleep for: %v", p, tt.duration)
267			} else if *gotSleep != *tt.duration {
268				t.Errorf("profileAndUpload(%v) slept for wrong duration, got: %v, want: %v", p, gotSleep, tt.duration)
269			}
270		}
271	}
272}
273
274func TestRetry(t *testing.T) {
275	normalDuration := time.Second * 3
276	negativeDuration := time.Second * -3
277
278	tests := []struct {
279		trailers  map[string]string
280		wantPause *time.Duration
281	}{
282		{
283			createTrailers(normalDuration),
284			&normalDuration,
285		},
286		{
287			createTrailers(negativeDuration),
288			nil,
289		},
290		{
291			map[string]string{retryInfoMetadata: "wrong format"},
292			nil,
293		},
294		{
295			map[string]string{},
296			nil,
297		},
298	}
299
300	for _, tt := range tests {
301		md := grpcmd.New(tt.trailers)
302		r := &retryer{
303			backoff: gax.Backoff{
304				Initial:    initialBackoff,
305				Max:        maxBackoff,
306				Multiplier: backoffMultiplier,
307			},
308			md: &md,
309		}
310
311		pause, shouldRetry := r.Retry(status.Error(codes.Aborted, ""))
312
313		if !shouldRetry {
314			t.Error("retryer.Retry() returned shouldRetry false, want true")
315		}
316
317		if tt.wantPause != nil {
318			if pause != *tt.wantPause {
319				t.Errorf("retryer.Retry() returned wrong pause, got: %v, want: %v", pause, tt.wantPause)
320			}
321		} else {
322			if pause > initialBackoff {
323				t.Errorf("retryer.Retry() returned wrong pause, got: %v, want: < %v", pause, initialBackoff)
324			}
325		}
326	}
327
328	md := grpcmd.New(nil)
329	r := &retryer{
330		backoff: gax.Backoff{
331			Initial:    initialBackoff,
332			Max:        maxBackoff,
333			Multiplier: backoffMultiplier,
334		},
335		md: &md,
336	}
337	for i := 0; i < 100; i++ {
338		pause, shouldRetry := r.Retry(errors.New(""))
339		if !shouldRetry {
340			t.Errorf("retryer.Retry() called %v times, returned shouldRetry false, want true", i)
341		}
342		if pause > maxBackoff {
343			t.Errorf("retryer.Retry() called %v times, returned wrong pause, got: %v, want: < %v", i, pause, maxBackoff)
344		}
345	}
346}
347
348func TestWithXGoogHeader(t *testing.T) {
349	ctx := withXGoogHeader(context.Background())
350	md, _ := grpcmd.FromOutgoingContext(ctx)
351
352	if xg := md[xGoogAPIMetadata]; len(xg) == 0 {
353		t.Errorf("withXGoogHeader() sets empty xGoogHeader")
354	} else {
355		if !strings.Contains(xg[0], "gl-go/") {
356			t.Errorf("withXGoogHeader() got: %v, want gl-go key", xg[0])
357		}
358		if !strings.Contains(xg[0], "gccl/") {
359			t.Errorf("withXGoogHeader() got: %v, want gccl key", xg[0])
360		}
361		if !strings.Contains(xg[0], "gax/") {
362			t.Errorf("withXGoogHeader() got: %v, want gax key", xg[0])
363		}
364		if !strings.Contains(xg[0], "grpc/") {
365			t.Errorf("withXGoogHeader() got: %v, want grpc key", xg[0])
366		}
367	}
368}
369
370func TestInitializeAgent(t *testing.T) {
371	oldConfig, oldMutexEnabled := config, mutexEnabled
372	defer func() {
373		config, mutexEnabled = oldConfig, oldMutexEnabled
374	}()
375
376	for _, tt := range []struct {
377		config               Config
378		enableMutex          bool
379		wantErr              bool
380		wantProfileTypes     []pb.ProfileType
381		wantDeploymentLabels map[string]string
382		wantProfileLabels    map[string]string
383	}{
384		{
385			config:               Config{ServiceVersion: testSvcVersion, Zone: testZone},
386			wantProfileTypes:     []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_HEAP, pb.ProfileType_THREADS, pb.ProfileType_HEAP_ALLOC},
387			wantDeploymentLabels: map[string]string{zoneNameLabel: testZone, versionLabel: testSvcVersion, languageLabel: "go"},
388			wantProfileLabels:    map[string]string{},
389		},
390		{
391			config:               Config{Zone: testZone},
392			wantProfileTypes:     []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_HEAP, pb.ProfileType_THREADS, pb.ProfileType_HEAP_ALLOC},
393			wantDeploymentLabels: map[string]string{zoneNameLabel: testZone, languageLabel: "go"},
394			wantProfileLabels:    map[string]string{},
395		},
396		{
397			config:               Config{ServiceVersion: testSvcVersion},
398			wantProfileTypes:     []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_HEAP, pb.ProfileType_THREADS, pb.ProfileType_HEAP_ALLOC},
399			wantDeploymentLabels: map[string]string{versionLabel: testSvcVersion, languageLabel: "go"},
400			wantProfileLabels:    map[string]string{},
401		},
402		{
403			config:               Config{Instance: testInstance},
404			wantProfileTypes:     []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_HEAP, pb.ProfileType_THREADS, pb.ProfileType_HEAP_ALLOC},
405			wantDeploymentLabels: map[string]string{languageLabel: "go"},
406			wantProfileLabels:    map[string]string{instanceLabel: testInstance},
407		},
408		{
409			config:               Config{Instance: testInstance},
410			enableMutex:          true,
411			wantProfileTypes:     []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_HEAP, pb.ProfileType_THREADS, pb.ProfileType_HEAP_ALLOC, pb.ProfileType_CONTENTION},
412			wantDeploymentLabels: map[string]string{languageLabel: "go"},
413			wantProfileLabels:    map[string]string{instanceLabel: testInstance},
414		},
415		{
416			config:               Config{NoHeapProfiling: true},
417			wantProfileTypes:     []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_THREADS, pb.ProfileType_HEAP_ALLOC},
418			wantDeploymentLabels: map[string]string{languageLabel: "go"},
419			wantProfileLabels:    map[string]string{},
420		},
421		{
422			config:               Config{NoHeapProfiling: true, NoGoroutineProfiling: true, NoAllocProfiling: true},
423			wantProfileTypes:     []pb.ProfileType{pb.ProfileType_CPU},
424			wantDeploymentLabels: map[string]string{languageLabel: "go"},
425			wantProfileLabels:    map[string]string{},
426		},
427		{
428			config:               Config{NoCPUProfiling: true},
429			wantProfileTypes:     []pb.ProfileType{pb.ProfileType_HEAP, pb.ProfileType_THREADS, pb.ProfileType_HEAP_ALLOC},
430			wantDeploymentLabels: map[string]string{languageLabel: "go"},
431			wantProfileLabels:    map[string]string{},
432		},
433		{
434			config:  Config{NoCPUProfiling: true, NoHeapProfiling: true, NoGoroutineProfiling: true, NoAllocProfiling: true},
435			wantErr: true,
436		},
437	} {
438
439		config = tt.config
440		config.ProjectID = testProjectID
441		config.Service = testService
442		mutexEnabled = tt.enableMutex
443		a, err := initializeAgent(nil)
444		if err != nil {
445			if !tt.wantErr {
446				t.Fatalf("initializeAgent() got error: %v, want no error", err)
447			}
448			continue
449		}
450
451		wantDeployment := &pb.Deployment{
452			ProjectId: testProjectID,
453			Target:    testService,
454			Labels:    tt.wantDeploymentLabels,
455		}
456		if !testutil.Equal(a.deployment, wantDeployment) {
457			t.Errorf("initializeAgent() got deployment: %v, want %v", a.deployment, wantDeployment)
458		}
459		if !testutil.Equal(a.profileLabels, tt.wantProfileLabels) {
460			t.Errorf("initializeAgent() got profile labels: %v, want %v", a.profileLabels, tt.wantProfileLabels)
461		}
462		if !testutil.Equal(a.profileTypes, tt.wantProfileTypes) {
463			t.Errorf("initializeAgent() got profile types: %v, want %v", a.profileTypes, tt.wantProfileTypes)
464		}
465	}
466}
467
468func TestInitializeConfig(t *testing.T) {
469	oldConfig, oldGAEService, oldGAEVersion, oldKnativeService, oldKnativeVersion, oldEnvProjectID, oldGetProjectID, oldGetInstanceName, oldGetZone, oldOnGCE := config, os.Getenv("GAE_SERVICE"), os.Getenv("GAE_VERSION"), os.Getenv("K_SERVICE"), os.Getenv("K_REVISION"), os.Getenv("GOOGLE_CLOUD_PROJECT"), getProjectID, getInstanceName, getZone, onGCE
470	defer func() {
471		config, getProjectID, getInstanceName, getZone, onGCE = oldConfig, oldGetProjectID, oldGetInstanceName, oldGetZone, oldOnGCE
472		if err := os.Setenv("GAE_SERVICE", oldGAEService); err != nil {
473			t.Fatal(err)
474		}
475		if err := os.Setenv("GAE_VERSION", oldGAEVersion); err != nil {
476			t.Fatal(err)
477		}
478		if err := os.Setenv("K_SERVICE", oldKnativeService); err != nil {
479			t.Fatal(err)
480		}
481		if err := os.Setenv("K_REVISION", oldKnativeVersion); err != nil {
482			t.Fatal(err)
483		}
484		if err := os.Setenv("GOOGLE_CLOUD_PROJECT", oldEnvProjectID); err != nil {
485			t.Fatal(err)
486		}
487	}()
488	const (
489		testGAEService     = "test-gae-service"
490		testGAEVersion     = "test-gae-version"
491		testKnativeService = "test-knative-service"
492		testKnativeVersion = "test-knative-version"
493		testGCEProjectID   = "test-gce-project-id"
494		testEnvProjectID   = "test-env-project-id"
495	)
496	for _, tt := range []struct {
497		desc            string
498		config          Config
499		wantConfig      Config
500		wantErrorString string
501		onGAE           bool
502		onKnative       bool
503		onGCE           bool
504		envProjectID    bool
505	}{
506		{
507			"accepts service name",
508			Config{Service: testService},
509			Config{Service: testService, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
510			"",
511			false,
512			false,
513			true,
514			false,
515		},
516		{
517			"env project overrides GCE project",
518			Config{Service: testService},
519			Config{Service: testService, ProjectID: testEnvProjectID, Zone: testZone, Instance: testInstance},
520			"",
521			false,
522			false,
523			true,
524			true,
525		},
526		{
527			"requires service name",
528			Config{},
529			Config{},
530			"service name must be configured",
531			false,
532			false,
533			true,
534			false,
535		},
536		{
537			"requires valid service name",
538			Config{Service: "Service"},
539			Config{Service: "Service"},
540			"service name \"Service\" does not match regular expression ^[a-z]([-a-z0-9_.]{0,253}[a-z0-9])?$",
541			false,
542			false,
543			true,
544			false,
545		},
546		{
547			"accepts service name from config and service version from GAE",
548			Config{Service: testService},
549			Config{Service: testService, ServiceVersion: testGAEVersion, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
550			"",
551			true,
552			false,
553			true,
554			false,
555		},
556		{
557			"reads both service name and version from GAE env vars",
558			Config{},
559			Config{Service: testGAEService, ServiceVersion: testGAEVersion, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
560			"",
561			true,
562			false,
563			true,
564			false,
565		},
566		{
567			"reads both service name and version from Knative env vars",
568			Config{},
569			Config{Service: testKnativeService, ServiceVersion: testKnativeVersion, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
570			"",
571			false,
572			true,
573			true,
574			false,
575		},
576		{
577			"accepts service version from config",
578			Config{Service: testService, ServiceVersion: testSvcVersion},
579			Config{Service: testService, ServiceVersion: testSvcVersion, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
580			"",
581			false,
582			false,
583			true,
584			false,
585		},
586		{
587			"configured version has priority over GAE-provided version",
588			Config{Service: testService, ServiceVersion: testSvcVersion},
589			Config{Service: testService, ServiceVersion: testSvcVersion, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
590			"",
591			true,
592			false,
593			true,
594			false,
595		},
596		{
597			"configured version has priority over Knative-provided version",
598			Config{Service: testService, ServiceVersion: testSvcVersion},
599			Config{Service: testService, ServiceVersion: testSvcVersion, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
600			"",
601			false,
602			true,
603			true,
604			false,
605		},
606		{
607			"GAE version has priority over Knative-provided version",
608			Config{},
609			Config{Service: testGAEService, ServiceVersion: testGAEVersion, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
610			"",
611			true,
612			true,
613			true,
614			false,
615		},
616		{
617			"configured project ID has priority over metadata-provided project ID",
618			Config{Service: testService, ProjectID: testProjectID},
619			Config{Service: testService, ProjectID: testProjectID, Zone: testZone, Instance: testInstance},
620			"",
621			false,
622			false,
623			true,
624			false,
625		},
626		{
627			"configured project ID has priority over environment project ID",
628			Config{Service: testService, ProjectID: testProjectID},
629			Config{Service: testService, ProjectID: testProjectID},
630			"",
631			false,
632			false,
633			false,
634			true,
635		},
636		{
637			"requires project ID if not on GCE",
638			Config{Service: testService},
639			Config{Service: testService},
640			"project ID must be specified in the configuration if running outside of GCP",
641			false,
642			false,
643			false,
644			false,
645		},
646		{
647			"configured zone has priority over metadata-provided zone",
648			Config{Service: testService, ProjectID: testProjectID, Zone: testZone + "-override"},
649			Config{Service: testService, ProjectID: testProjectID, Zone: testZone + "-override", Instance: testInstance},
650			"",
651			false,
652			false,
653			true,
654			false,
655		},
656		{
657			"configured instance has priority over metadata-provided instance",
658			Config{Service: testService, ProjectID: testProjectID, Instance: testInstance + "-override"},
659			Config{Service: testService, ProjectID: testProjectID, Zone: testZone, Instance: testInstance + "-override"},
660			"",
661			false,
662			false,
663			true,
664			false,
665		},
666	} {
667		t.Logf("Running test: %s", tt.desc)
668		gaeEnvService, gaeEnvVersion := "", ""
669		if tt.onGAE {
670			gaeEnvService, gaeEnvVersion = testGAEService, testGAEVersion
671		}
672		if err := os.Setenv("GAE_SERVICE", gaeEnvService); err != nil {
673			t.Fatal(err)
674		}
675		if err := os.Setenv("GAE_VERSION", gaeEnvVersion); err != nil {
676			t.Fatal(err)
677		}
678		knEnvService, knEnvVersion := "", ""
679		if tt.onKnative {
680			knEnvService, knEnvVersion = testKnativeService, testKnativeVersion
681		}
682		if err := os.Setenv("K_SERVICE", knEnvService); err != nil {
683			t.Fatal(err)
684		}
685		if err := os.Setenv("K_REVISION", knEnvVersion); err != nil {
686			t.Fatal(err)
687		}
688		if tt.onGCE {
689			onGCE = func() bool { return true }
690			getProjectID = func() (string, error) { return testGCEProjectID, nil }
691			getZone = func() (string, error) { return testZone, nil }
692			getInstanceName = func() (string, error) { return testInstance, nil }
693		} else {
694			onGCE = func() bool { return false }
695			getProjectID = func() (string, error) { return "", fmt.Errorf("test get project id error") }
696			getZone = func() (string, error) { return "", fmt.Errorf("test get zone error") }
697			getInstanceName = func() (string, error) { return "", fmt.Errorf("test get instance error") }
698		}
699		envProjectID := ""
700		if tt.envProjectID {
701			envProjectID = testEnvProjectID
702		}
703		if err := os.Setenv("GOOGLE_CLOUD_PROJECT", envProjectID); err != nil {
704			t.Fatal(err)
705		}
706
707		errorString := ""
708		if err := initializeConfig(tt.config); err != nil {
709			errorString = err.Error()
710		}
711
712		if !strings.Contains(errorString, tt.wantErrorString) {
713			t.Errorf("initializeConfig(%v) got error: %v, want contain %v", tt.config, errorString, tt.wantErrorString)
714		}
715		if tt.wantErrorString == "" {
716			tt.wantConfig.APIAddr = apiAddress
717		}
718		if config != tt.wantConfig {
719			t.Errorf("initializeConfig(%v) got: %v, want %v", tt.config, config, tt.wantConfig)
720		}
721	}
722
723	for _, tt := range []struct {
724		desc              string
725		wantErr           bool
726		getProjectIDError error
727		getZoneError      error
728		getInstanceError  error
729	}{
730		{
731			desc:              "metadata returns error for project ID",
732			wantErr:           true,
733			getProjectIDError: errors.New("fake get project ID error"),
734		},
735		{
736			desc:         "metadata returns error for zone",
737			wantErr:      true,
738			getZoneError: errors.New("fake get zone error"),
739		},
740		{
741			desc:             "metadata returns error for instance",
742			wantErr:          true,
743			getInstanceError: errors.New("fake get instance error"),
744		},
745		{
746			desc:             "metadata returns NotDefinedError for instance",
747			getInstanceError: gcemd.NotDefinedError("fake GCE metadata NotDefinedError error"),
748		},
749	} {
750		onGCE = func() bool { return true }
751		getProjectID = func() (string, error) { return testGCEProjectID, tt.getProjectIDError }
752		getZone = func() (string, error) { return testZone, tt.getZoneError }
753		getInstanceName = func() (string, error) { return testInstance, tt.getInstanceError }
754
755		if err := initializeConfig(Config{Service: testService}); (err != nil) != tt.wantErr {
756			t.Errorf("%s: initializeConfig() got error: %v, want error %t", tt.desc, err, tt.wantErr)
757		}
758	}
759}
760
// fakeProfilerServer is an in-memory stand-in for the profiler service used
// by TestAgentWithServer.
type fakeProfilerServer struct {
	// count is the number of CreateProfile calls received so far.
	count int
	// gotProfiles holds the most recently uploaded profile bytes, keyed by
	// "CPU" or "HEAP".
	gotProfiles map[string][]byte
}
765
766func (fs *fakeProfilerServer) CreateProfile(ctx context.Context, in *pb.CreateProfileRequest) (*pb.Profile, error) {
767	fs.count++
768	switch fs.count % 2 {
769	case 1:
770		return &pb.Profile{Name: "testCPU", ProfileType: pb.ProfileType_CPU, Duration: ptypes.DurationProto(testProfileDuration)}, nil
771	default:
772		return &pb.Profile{Name: "testHeap", ProfileType: pb.ProfileType_HEAP}, nil
773	}
774}
775
776func (fs *fakeProfilerServer) UpdateProfile(ctx context.Context, in *pb.UpdateProfileRequest) (*pb.Profile, error) {
777	switch in.Profile.ProfileType {
778	case pb.ProfileType_CPU:
779		fs.gotProfiles["CPU"] = in.Profile.ProfileBytes
780	case pb.ProfileType_HEAP:
781		fs.gotProfiles["HEAP"] = in.Profile.ProfileBytes
782	}
783	return in.Profile, nil
784}
785
786func (fs *fakeProfilerServer) CreateOfflineProfile(_ context.Context, _ *pb.CreateOfflineProfileRequest) (*pb.Profile, error) {
787	return nil, status.Error(codes.Unimplemented, "")
788}
789
790func profileeLoop(quit chan bool) {
791	data := make([]byte, 10*1024*1024)
792	rand.Read(data)
793	for {
794		select {
795		case <-quit:
796			return
797		default:
798			profileeWork(data)
799		}
800	}
801}
802
// profileeWork gzip-compresses data into a throwaway in-memory buffer. A
// failure at any step (write, flush, close) is logged; after a failed write
// or flush the remaining steps are skipped.
func profileeWork(data []byte) {
	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)

	_, err := zw.Write(data)
	if err != nil {
		log.Println("failed to write to gzip stream", err)
		return
	}

	err = zw.Flush()
	if err != nil {
		log.Println("failed to flush to gzip stream", err)
		return
	}

	err = zw.Close()
	if err != nil {
		log.Println("failed to close gzip stream", err)
	}
}
818
819func validateProfile(rawData []byte, wantFunctionName string) error {
820	p, err := profile.ParseData(rawData)
821	if err != nil {
822		return fmt.Errorf("ParseData failed: %v", err)
823	}
824
825	if len(p.Sample) == 0 {
826		return fmt.Errorf("profile contains zero samples: %v", p)
827	}
828
829	if len(p.Location) == 0 {
830		return fmt.Errorf("profile contains zero locations: %v", p)
831	}
832
833	if len(p.Function) == 0 {
834		return fmt.Errorf("profile contains zero functions: %v", p)
835	}
836
837	for _, l := range p.Location {
838		if len(l.Line) > 0 && l.Line[0].Function != nil && strings.Contains(l.Line[0].Function.Name, wantFunctionName) {
839			return nil
840		}
841	}
842	return fmt.Errorf("wanted function name %s not found in the profile", wantFunctionName)
843}
844
845func TestDeltaMutexProfile(t *testing.T) {
846	oldMutexEnabled, oldMaxProcs := mutexEnabled, runtime.GOMAXPROCS(10)
847	defer func() {
848		mutexEnabled = oldMutexEnabled
849		runtime.GOMAXPROCS(oldMaxProcs)
850	}()
851	if mutexEnabled = enableMutexProfiling(); !mutexEnabled {
852		t.Skip("Go too old - mutex profiling not supported.")
853	}
854
855	hog(time.Second, mutexHog)
856	go func() {
857		hog(2*time.Second, backgroundHog)
858	}()
859
860	var prof bytes.Buffer
861	if err := deltaMutexProfile(context.Background(), time.Second, &prof); err != nil {
862		t.Fatalf("deltaMutexProfile() got error: %v", err)
863	}
864	p, err := profile.Parse(&prof)
865	if err != nil {
866		t.Fatalf("profile.Parse() got error: %v", err)
867	}
868
869	if s := sum(p, "mutexHog"); s != 0 {
870		t.Errorf("mutexHog found in the delta mutex profile (sum=%d):\n%s", s, p)
871	}
872	if s := sum(p, "backgroundHog"); s <= 0 {
873		t.Errorf("backgroundHog not in the delta mutex profile (sum=%d):\n%s", s, p)
874	}
875}
876
877// sum returns the sum of all mutex counts from the samples whose
878// stacks include the specified function name.
879func sum(p *profile.Profile, fname string) int64 {
880	locIDs := map[*profile.Location]bool{}
881	for _, loc := range p.Location {
882		for _, l := range loc.Line {
883			if strings.Contains(l.Function.Name, fname) {
884				locIDs[loc] = true
885				break
886			}
887		}
888	}
889	var s int64
890	for _, sample := range p.Sample {
891		for _, loc := range sample.Location {
892			if locIDs[loc] {
893				s += sample.Value[0]
894				break
895			}
896		}
897	}
898	return s
899}
900
// mutexHog repeatedly acquires mu1 and then mu2, yielding the processor in
// between, until dt has elapsed since start. Both mutexes are released
// when it returns. The loop body is kept directly in this function so that
// its name appears in stack traces.
func mutexHog(mu1, mu2 *sync.Mutex, start time.Time, dt time.Duration) {
	for {
		if time.Since(start) >= dt {
			return
		}
		mu1.Lock()
		runtime.Gosched()
		mu2.Lock()
		mu1.Unlock()
		mu2.Unlock()
	}
}
910
// backgroundHog is identical to mutexHog. We keep them separate
// in order to distinguish them with function names in the stack trace.
//
// It repeatedly acquires mu1 and then mu2, yielding the processor in between,
// until dt has elapsed since start.
func backgroundHog(mu1, mu2 *sync.Mutex, start time.Time, dt time.Duration) {
	for {
		if time.Since(start) >= dt {
			return
		}
		mu1.Lock()
		runtime.Gosched()
		mu2.Lock()
		mu1.Unlock()
		mu2.Unlock()
	}
}
922
// hog runs hogger concurrently in ten goroutines and waits for all of them
// to return. Every goroutine receives the same pair of mutexes, the same
// start time (taken when hog is entered) and the same duration dt.
func hog(dt time.Duration, hogger func(mu1, mu2 *sync.Mutex, start time.Time, dt time.Duration)) {
	const workers = 10

	start := time.Now()
	var (
		first, second sync.Mutex
		pending       sync.WaitGroup
	)
	for n := 0; n < workers; n++ {
		pending.Add(1)
		go func() {
			defer pending.Done()
			hogger(&first, &second, start, dt)
		}()
	}
	pending.Wait()
}
937
938func TestAgentWithServer(t *testing.T) {
939	oldDialGRPC, oldConfig, oldProfilingDone := dialGRPC, config, profilingDone
940	defer func() {
941		dialGRPC, config, profilingDone = oldDialGRPC, oldConfig, oldProfilingDone
942	}()
943
944	profilingDone = make(chan bool)
945
946	srv, err := testutil.NewServer()
947	if err != nil {
948		t.Fatalf("testutil.NewServer(): %v", err)
949	}
950	fakeServer := &fakeProfilerServer{gotProfiles: map[string][]byte{}}
951	pb.RegisterProfilerServiceServer(srv.Gsrv, fakeServer)
952	srv.Start()
953
954	dialGRPC = func(ctx context.Context, opts ...option.ClientOption) (gtransport.ConnPool, error) {
955		conn, err := gtransport.DialInsecure(ctx, opts...)
956		if err != nil {
957			return nil, err
958		}
959		return testConnPool{conn}, nil
960	}
961
962	quitProfilee := make(chan bool)
963	go profileeLoop(quitProfilee)
964
965	if err := Start(Config{
966		Service:     testService,
967		ProjectID:   testProjectID,
968		APIAddr:     srv.Addr,
969		Instance:    testInstance,
970		Zone:        testZone,
971		numProfiles: 2,
972	}); err != nil {
973		t.Fatalf("Start(): %v", err)
974	}
975
976	select {
977	case <-profilingDone:
978	case <-time.After(testProfileCollectionTimeout):
979		t.Errorf("got timeout after %v, want profile collection done", testProfileCollectionTimeout)
980	}
981	quitProfilee <- true
982
983	for _, pType := range []string{"CPU", "HEAP"} {
984		if profile, ok := fakeServer.gotProfiles[pType]; !ok {
985			t.Errorf("fakeServer.gotProfiles[%s] got no profile, want profile", pType)
986		} else if err := validateProfile(profile, "profilee"); err != nil {
987			t.Errorf("validateProfile(%s) got error: %v", pType, err)
988		}
989	}
990}
991
992// testConnPool is a gtransport.ConnPool used for testing.
993type testConnPool struct{ *grpc.ClientConn }
994
995func (p testConnPool) Num() int               { return 1 }
996func (p testConnPool) Conn() *grpc.ClientConn { return p.ClientConn }
997