// (c) Copyright IBM Corp. 2021
// (c) Copyright Instana Inc. 2020

package instana

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"strconv"
	"sync"
	"time"

	"github.com/instana/go-sensor/acceptor"
	"github.com/instana/go-sensor/autoprofile"
	"github.com/instana/go-sensor/aws"
	"github.com/instana/go-sensor/docker"
)

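// fargateSnapshot aggregates the static metadata that describes the instrumented
// service, its ECS task and the container it runs in.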
type fargateSnapshot struct {
	Service   serverlessSnapshot
	Task      aws.ECSTaskMetadata
	Container aws.ECSContainerMetadata
}

func newFargateSnapshot(pid int, taskMD aws.ECSTaskMetadata, containerMD aws.ECSContainerMetadata) fargateSnapshot {
	return fargateSnapshot{
		Service: serverlessSnapshot{
			EntityID:  ecsEntityID(containerMD),
			Host:      containerMD.TaskARN,
			PID:       pid,
			StartedAt: processStartedAt,
			Container: containerSnapshot{
				ID:    containerMD.DockerID,
				Type:  "docker",
				Image: containerMD.Image,
			},
		},
		Task:      taskMD,
		Container: containerMD,
	}
}

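// newECSTaskPluginPayload populates the ECS task plugin payload from the collected
// Fargate snapshot.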
func newECSTaskPluginPayload(snapshot fargateSnapshot) acceptor.PluginPayload {
	return acceptor.NewECSTaskPluginPayload(snapshot.Task.TaskARN, acceptor.ECSTaskData{
		TaskARN:               snapshot.Task.TaskARN,
		ClusterARN:            snapshot.Container.Cluster,
		AvailabilityZone:      snapshot.Task.AvailabilityZone,
		InstanaZone:           snapshot.Service.Zone,
		TaskDefinition:        snapshot.Task.Family,
		TaskDefinitionVersion: snapshot.Task.Revision,
		DesiredStatus:         snapshot.Task.DesiredStatus,
		KnownStatus:           snapshot.Task.KnownStatus,
		Limits: acceptor.AWSContainerLimits{
			CPU:    snapshot.Container.Limits.CPU,
			Memory: snapshot.Container.Limits.Memory,
		},
		PullStartedAt: snapshot.Task.PullStartedAt,
		PullStoppedAt: snapshot.Task.PullStoppedAt,
		Tags:          snapshot.Service.Tags,
	})
}

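// newECSContainerPluginPayload populates the ECS container plugin payload from the
// container metadata. The runtime is only reported for the instrumented container.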
func newECSContainerPluginPayload(container aws.ECSContainerMetadata, instrumented bool) acceptor.PluginPayload {
	data := acceptor.ECSContainerData{
		Instrumented:          instrumented,
		DockerID:              container.DockerID,
		DockerName:            container.DockerName,
		ContainerName:         container.Name,
		Image:                 container.Image,
		ImageID:               container.ImageID,
		TaskARN:               container.TaskARN,
		TaskDefinition:        container.TaskDefinition,
		TaskDefinitionVersion: container.TaskDefinitionVersion,
		ClusterARN:            container.Cluster,
		DesiredStatus:         container.DesiredStatus,
		KnownStatus:           container.KnownStatus,
		Limits: acceptor.AWSContainerLimits{
			CPU:    container.Limits.CPU,
			Memory: container.Limits.Memory,
		},
		CreatedAt: container.CreatedAt,
		StartedAt: container.StartedAt,
		Type:      container.Type,
	}

	// we only know the runtime for sure for the instrumented container
	if instrumented {
		data.Runtime = "go"
	}

	return acceptor.NewECSContainerPluginPayload(ecsEntityID(container), data)
}

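// newDockerContainerPluginPayload populates the Docker plugin payload for a task
// container, including memory, CPU, network and block I/O stats calculated from
// the previous and current collections.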
func newDockerContainerPluginPayload(
	container aws.ECSContainerMetadata,
	prevStats, currentStats docker.ContainerStats,
	instrumented bool,
) acceptor.PluginPayload {

	var networkMode string
	if len(container.Networks) > 0 {
		networkMode = container.Networks[0].Mode
	}

	data := acceptor.DockerData{
		ID:          container.DockerID,
		CreatedAt:   container.CreatedAt,
		StartedAt:   container.StartedAt,
		Image:       container.Image,
		Labels:      container.ContainerLabels,
		Names:       []string{container.DockerName},
		NetworkMode: networkMode,
		Memory:      acceptor.NewDockerMemoryStatsUpdate(prevStats.Memory, currentStats.Memory),
		CPU:         acceptor.NewDockerCPUStatsDelta(prevStats.CPU, currentStats.CPU),
		Network:     acceptor.NewDockerNetworkAggregatedStatsDelta(prevStats.Networks, currentStats.Networks),
		BlockIO:     acceptor.NewDockerBlockIOStatsDelta(prevStats.BlockIO, currentStats.BlockIO),
	}

	// we only know the command for the instrumented container
	if instrumented {
		data.Command = os.Args[0]
	}

	return acceptor.NewDockerPluginPayload(ecsEntityID(container), data)
}

type metricsPayload struct {
	Plugins []acceptor.PluginPayload `json:"plugins"`
}

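// fargateAgent is a serverless agent that submits metrics and spans collected
// within an AWS Fargate task directly to the configured acceptor endpoint.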
type fargateAgent struct {
	Endpoint string
	Key      string
	PID      int
	Zone     string
	Tags     map[string]interface{}

	snapshot         fargateSnapshot
	lastDockerStats  map[string]docker.ContainerStats
	lastProcessStats processStats

	mu        sync.Mutex
	spanQueue []Span

	runtimeSnapshot *SnapshotCollector
	dockerStats     *ecsDockerStatsCollector
	processStats    *processStatsCollector
	client          *http.Client
	ecs             *aws.ECSMetadataProvider
	logger          LeveledLogger
}

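// newFargateAgent initializes a fargateAgent and starts the background collection
// of the ECS snapshot, Docker container stats and process stats. The agent zone
// and tags are read from the INSTANA_ZONE and INSTANA_TAGS environment variables.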
func newFargateAgent(
	serviceName, acceptorEndpoint, agentKey string,
	client *http.Client,
	mdProvider *aws.ECSMetadataProvider,
	logger LeveledLogger,
) *fargateAgent {

	if logger == nil {
		logger = defaultLogger
	}

	if client == nil {
		client = http.DefaultClient
	}

	logger.Debug("initializing aws fargate agent")

	agent := &fargateAgent{
		Endpoint: acceptorEndpoint,
		Key:      agentKey,
		PID:      os.Getpid(),
		Zone:     os.Getenv("INSTANA_ZONE"),
		Tags:     parseInstanaTags(os.Getenv("INSTANA_TAGS")),
		runtimeSnapshot: &SnapshotCollector{
			CollectionInterval: snapshotCollectionInterval,
			ServiceName:        serviceName,
		},
		dockerStats: &ecsDockerStatsCollector{
			ecs:    mdProvider,
			logger: logger,
		},
		processStats: &processStatsCollector{
			logger: logger,
		},
		client: client,
		ecs:    mdProvider,
		logger: logger,
	}

	go func() {
		for {
			// The ECS task metadata endpoint publishes the complete data (e.g. container.StartedAt)
			// only after a short delay, so keep retrying with an increasing delay until the full
			// snapshot is available
			for i := 0; i < maximumRetries; i++ {
				snapshot, ok := agent.collectSnapshot(context.Background())
				if ok {
					agent.snapshot = snapshot
					break
				}

				time.Sleep(expDelay(i + 1))
			}
			time.Sleep(snapshotCollectionInterval)
		}
	}()
	go agent.dockerStats.Run(context.Background(), time.Second)
	go agent.processStats.Run(context.Background(), time.Second)

	return agent
}

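// Ready returns true once the ECS metadata snapshot has been collected.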
func (a *fargateAgent) Ready() bool { return a.snapshot.Service.EntityID != "" }

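// SendMetrics sends a metrics bundle, along with any enqueued spans, to the /bundle
// endpoint of the acceptor. The last sent Docker and process stats are only advanced
// if the payload has been transmitted successfully.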
func (a *fargateAgent) SendMetrics(data acceptor.Metrics) (err error) {
	dockerStats := a.dockerStats.Collect()
	processStats := a.processStats.Collect()
	defer func() {
		if err == nil {
			// only update the last sent stats if they were transmitted successfully
			// since they are updated on the backend incrementally using received deltas
			a.lastDockerStats = dockerStats
			a.lastProcessStats = processStats
		}
	}()

	payload := struct {
		Metrics metricsPayload `json:"metrics,omitempty"`
		Spans   []Span         `json:"spans,omitempty"`
	}{
		Metrics: metricsPayload{
			Plugins: []acceptor.PluginPayload{
				newECSTaskPluginPayload(a.snapshot),
				newProcessPluginPayload(a.snapshot.Service, a.lastProcessStats, processStats),
				acceptor.NewGoProcessPluginPayload(acceptor.GoProcessData{
					PID:      a.PID,
					Snapshot: a.runtimeSnapshot.Collect(),
					Metrics:  data,
				}),
			},
		},
	}

	for _, container := range a.snapshot.Task.Containers {
		instrumented := ecsEntityID(container) == a.snapshot.Service.EntityID
		payload.Metrics.Plugins = append(
			payload.Metrics.Plugins,
			newECSContainerPluginPayload(container, instrumented),
			newDockerContainerPluginPayload(
				container,
				a.lastDockerStats[container.DockerID],
				dockerStats[container.DockerID],
				instrumented,
			),
		)
	}

	a.mu.Lock()
	if len(a.spanQueue) > 0 {
		payload.Spans = make([]Span, len(a.spanQueue))
		copy(payload.Spans, a.spanQueue)
		a.spanQueue = a.spanQueue[:0]
	}
	a.mu.Unlock()

	buf := bytes.NewBuffer(nil)
	if err := json.NewEncoder(buf).Encode(payload); err != nil {
		return fmt.Errorf("failed to marshal metrics payload: %s", err)
	}

	req, err := http.NewRequest(http.MethodPost, a.Endpoint+"/bundle", buf)
	if err != nil {
		return fmt.Errorf("failed to prepare send metrics request: %s", err)
	}

	req.Header.Set("Content-Type", "application/json")

	return a.sendRequest(req)
}

func (a *fargateAgent) SendEvent(event *EventData) error { return nil }

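// SendSpans enqueues spans to be sent in a bundle with the next metrics submission
// instead of sending them immediately.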
func (a *fargateAgent) SendSpans(spans []Span) error {
	from := newServerlessAgentFromS(a.snapshot.Service.EntityID, "aws")
	for i := range spans {
		spans[i].From = from
	}

	// enqueue the spans to send them in a bundle with metrics instead of sending immediately
	a.mu.Lock()
	a.spanQueue = append(a.spanQueue, spans...)
	a.mu.Unlock()

	return nil
}

func (a *fargateAgent) SendProfiles(profiles []autoprofile.Profile) error { return nil }

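// Flush sends all enqueued spans to the /traces endpoint of the acceptor and clears
// the span queue.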
func (a *fargateAgent) Flush(ctx context.Context) error {
	a.mu.Lock()
	defer a.mu.Unlock()

	// check the queue length while holding the lock to avoid racing with SendSpans()
	if len(a.spanQueue) == 0 {
		return nil
	}

	if !a.Ready() {
		return ErrAgentNotReady
	}

	buf := bytes.NewBuffer(nil)
	if err := json.NewEncoder(buf).Encode(a.spanQueue); err != nil {
		return fmt.Errorf("failed to marshal traces payload: %s", err)
	}
	a.spanQueue = a.spanQueue[:0]

	req, err := http.NewRequest(http.MethodPost, a.Endpoint+"/traces", buf)
	if err != nil {
		return fmt.Errorf("failed to prepare send traces request: %s", err)
	}

	req.Header.Set("Content-Type", "application/json")

	return a.sendRequest(req.WithContext(ctx))
}

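// sendRequest sets the Instana host, key and timestamp headers on the request and
// submits it, logging error responses (status >= 400) instead of returning them.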
func (a *fargateAgent) sendRequest(req *http.Request) error {
	req.Header.Set("X-Instana-Host", a.snapshot.Service.EntityID)
	req.Header.Set("X-Instana-Key", a.Key)
	req.Header.Set("X-Instana-Time", strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10))

	resp, err := a.client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send request to the serverless agent: %s", err)
	}

	defer resp.Body.Close()

	if resp.StatusCode >= http.StatusBadRequest {
		respBody, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			a.logger.Debug("failed to read serverless agent response: ", err)
			return nil
		}

		a.logger.Info("serverless agent has responded with ", resp.Status, ": ", string(respBody))
		return nil
	}

	io.CopyN(ioutil.Discard, resp.Body, 1<<20)

	return nil
}

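// collectSnapshot fetches the ECS task and container metadata concurrently and
// combines them into a fargateSnapshot. It returns false if the metadata is not
// complete yet.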
func (a *fargateAgent) collectSnapshot(ctx context.Context) (fargateSnapshot, bool) {
	var wg sync.WaitGroup

	// fetch task metadata
	wg.Add(1)
	var taskMD aws.ECSTaskMetadata
	go func() {
		defer wg.Done()

		var err error
		taskMD, err = a.ecs.TaskMetadata(ctx)
		if err != nil {
			a.logger.Warn("failed to get task metadata: ", err)
		}
	}()

	// fetch container metadata
	wg.Add(1)
	var containerMD aws.ECSContainerMetadata
	go func() {
		defer wg.Done()

		var err error
		containerMD, err = a.ecs.ContainerMetadata(ctx)
		if err != nil {
			a.logger.Warn("failed to get container metadata: ", err)
		}
	}()

	wg.Wait()

	// ensure that all metadata has been gathered
	if taskMD.TaskARN == "" || containerMD.StartedAt.IsZero() {
		a.logger.Error("snapshot collection failed (the metadata might not be ready yet)")
		return fargateSnapshot{}, false
	}

	snapshot := newFargateSnapshot(a.PID, taskMD, containerMD)
	snapshot.Service.Zone = a.Zone
	snapshot.Service.Tags = a.Tags

	a.logger.Debug("collected snapshot")

	return snapshot, true
}

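// ecsDockerStatsCollector periodically fetches the ECS task stats and keeps the
// latest set of per-container Docker stats.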
type ecsDockerStatsCollector struct {
	ecs interface {
		TaskStats(context.Context) (map[string]docker.ContainerStats, error)
	}
	logger LeveledLogger

	mu    sync.RWMutex
	stats map[string]docker.ContainerStats
}

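// Run fetches the task stats on every collection interval until the context is
// cancelled, giving each fetch at most one interval to complete.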
func (c *ecsDockerStatsCollector) Run(ctx context.Context, collectionInterval time.Duration) {
	timer := time.NewTicker(collectionInterval)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			fetchCtx, cancel := context.WithTimeout(ctx, collectionInterval)
			c.fetchStats(fetchCtx)
			cancel()
		case <-ctx.Done():
			return
		}
	}
}

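// Collect returns the most recently fetched set of per-container stats.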
func (c *ecsDockerStatsCollector) Collect() map[string]docker.ContainerStats {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.stats
}

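// fetchStats retrieves the current task stats. The previously recorded stats are
// kept if the request timed out or was cancelled, and reset if it failed otherwise.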
func (c *ecsDockerStatsCollector) fetchStats(ctx context.Context) {
	stats, err := c.ecs.TaskStats(ctx)
	if err != nil {
		if ctx.Err() != nil {
			// the request either timed out or was cancelled, keep the old value
			c.logger.Debug("failed to retrieve Docker container stats (timed out), skipping")
			return
		}

		// the request failed, reset the recorded stats
		c.logger.Warn("failed to retrieve Docker container stats: ", err)
		stats = nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	c.stats = stats
}

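// ecsEntityID builds the entity ID of a task container from its task ARN and
// container name.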
func ecsEntityID(md aws.ECSContainerMetadata) string {
	return md.TaskARN + "::" + md.Name
}