1// (c) Copyright IBM Corp. 2021
2// (c) Copyright Instana Inc. 2020
3
4package instana
5
6import (
7	"context"
8	"errors"
9	"os"
10	"os/user"
11	"strconv"
12	"sync"
13	"time"
14
15	"github.com/instana/go-sensor/acceptor"
16	"github.com/instana/go-sensor/process"
17)
18
// ErrAgentNotReady is the sentinel error returned when an attempt is made to communicate
// with an agent before the client announcement process is done.
var ErrAgentNotReady = errors.New("agent not ready")
22
// containerSnapshot holds the identifying attributes of a container: its ID, its type
// and its image, in that order.
type containerSnapshot struct {
	ID, Type, Image string
}
28
29type serverlessSnapshot struct {
30	EntityID  string
31	Host      string
32	PID       int
33	Container containerSnapshot
34	StartedAt time.Time
35	Zone      string
36	Tags      map[string]interface{}
37}
38
39func newProcessPluginPayload(snapshot serverlessSnapshot, prevStats, currentStats processStats) acceptor.PluginPayload {
40	var currUser, currGroup string
41	if u, err := user.Current(); err == nil {
42		currUser = u.Username
43
44		if g, err := user.LookupGroupId(u.Gid); err == nil {
45			currGroup = g.Name
46		}
47	}
48
49	env := getProcessEnv()
50	for k := range env {
51		if k == "INSTANA_AGENT_KEY" {
52			continue
53		}
54
55		if sensor.options.Tracer.Secrets.Match(k) {
56			env[k] = "<redacted>"
57		}
58	}
59
60	return acceptor.NewProcessPluginPayload(strconv.Itoa(snapshot.PID), acceptor.ProcessData{
61		PID:           snapshot.PID,
62		Exec:          os.Args[0],
63		Args:          os.Args[1:],
64		Env:           env,
65		User:          currUser,
66		Group:         currGroup,
67		ContainerID:   snapshot.Container.ID,
68		ContainerType: snapshot.Container.Type,
69		Start:         snapshot.StartedAt.UnixNano() / int64(time.Millisecond),
70		HostName:      snapshot.Host,
71		HostPID:       snapshot.PID,
72		CPU:           acceptor.NewProcessCPUStatsDelta(prevStats.CPU, currentStats.CPU, currentStats.Tick-prevStats.Tick),
73		Memory:        acceptor.NewProcessMemoryStatsUpdate(prevStats.Memory, currentStats.Memory),
74		OpenFiles:     acceptor.NewProcessOpenFilesStatsUpdate(prevStats.Limits, currentStats.Limits),
75	})
76}
77
// processStats is a single reading of the process resource usage.
//
// CPU and Tick are obtained together from process.Stats().CPU(); the difference between the
// Tick values of two readings is what newProcessPluginPayload passes along with the CPU
// stats to calculate the CPU usage delta. Memory comes from process.Stats().Memory() and
// Limits from process.Stats().Limits(), the latter being used to report open files stats.
type processStats struct {
	Tick   int
	CPU    process.CPUStats
	Memory process.MemStats
	Limits process.ResourceLimits
}
84
// processStatsCollector periodically reads the process stats and keeps the most recent
// reading available via Collect.
//
// logger receives debug messages about readings that failed or timed out, and mu guards
// access to stats, which holds the latest stored reading.
type processStatsCollector struct {
	logger LeveledLogger
	mu     sync.RWMutex
	stats  processStats
}
90
91func (c *processStatsCollector) Run(ctx context.Context, collectionInterval time.Duration) {
92	timer := time.NewTicker(collectionInterval)
93	defer timer.Stop()
94
95	for {
96		select {
97		case <-timer.C:
98			fetchCtx, cancel := context.WithTimeout(ctx, collectionInterval)
99			c.fetchStats(fetchCtx)
100			cancel()
101		case <-ctx.Done():
102			return
103		}
104	}
105}
106
107func (c *processStatsCollector) Collect() processStats {
108	c.mu.RLock()
109	defer c.mu.RUnlock()
110
111	return c.stats
112}
113
114func (c *processStatsCollector) fetchStats(ctx context.Context) {
115	stats := c.Collect()
116
117	var wg sync.WaitGroup
118	wg.Add(3)
119
120	done := make(chan struct{})
121	go func() {
122		wg.Wait()
123		close(done)
124	}()
125
126	go func() {
127		defer wg.Done()
128
129		st, tick, err := process.Stats().CPU()
130		if err != nil {
131			c.logger.Debug("failed to read process CPU stats, skipping: ", err)
132			return
133		}
134
135		stats.CPU, stats.Tick = st, tick
136	}()
137
138	go func() {
139		defer wg.Done()
140
141		st, err := process.Stats().Memory()
142		if err != nil {
143			c.logger.Debug("failed to read process memory stats, skipping: ", err)
144			return
145		}
146
147		stats.Memory = st
148	}()
149
150	go func() {
151		defer wg.Done()
152
153		st, err := process.Stats().Limits()
154		if err != nil {
155			c.logger.Debug("failed to read process open files stats, skipping: ", err)
156			return
157		}
158
159		stats.Limits = st
160	}()
161
162	select {
163	case <-done:
164		break
165	case <-ctx.Done():
166		c.logger.Debug("failed to obtain process stats (timed out)")
167		return // context has been cancelled, skip this update
168	}
169
170	c.mu.Lock()
171	c.stats = stats
172	defer c.mu.Unlock()
173}
174