1package libkb
2
3import (
4	"fmt"
5	"net/http"
6	"strings"
7	"sync"
8	"time"
9
10	humanize "github.com/dustin/go-humanize"
11	keybase1 "github.com/keybase/client/go/protocol/keybase1"
12	"github.com/keybase/go-framed-msgpack-rpc/rpc"
13	context "golang.org/x/net/context"
14	"golang.org/x/sync/errgroup"
15)
16
17type NetworkStatsJSON struct {
18	Local  []keybase1.InstrumentationStat `json:"local"`
19	Remote []keybase1.InstrumentationStat `json:"remote"`
20}
21
22var internalHosts = map[string]struct{}{
23	DevelServerURI:      {},
24	StagingServerURI:    {},
25	ProductionServerURI: {},
26	ProductionSiteURI:   {},
27}
28
29func InstrumentationTagFromRequest(req *http.Request) string {
30	if req.URL == nil {
31		return ""
32	}
33	host := req.URL.Host
34	path := req.URL.Path
35	if _, ok := internalHosts[fmt.Sprintf("%s://%s", req.URL.Scheme, host)]; ok {
36		host = ""
37		path = strings.TrimPrefix(req.URL.Path, APIURIPathPrefix)
38		path = strings.TrimPrefix(path, "/")
39	}
40	return fmt.Sprintf("%s %s%s", req.Method, host, path)
41}
42
43func AddRPCRecord(tag string, stat keybase1.InstrumentationStat, record rpc.InstrumentationRecord) keybase1.InstrumentationStat {
44	if stat.NumCalls == 0 {
45		stat.Ctime = keybase1.ToTime(time.Now())
46	}
47	if stat.Tag == "" {
48		stat.Tag = tag
49	}
50	stat.Mtime = keybase1.ToTime(time.Now())
51	stat.NumCalls++
52	dur := keybase1.ToDurationMsec(record.Dur)
53	stat.TotalDur += dur
54	if dur > stat.MaxDur {
55		stat.MaxDur = dur
56	}
57	if dur < stat.MinDur || stat.NumCalls == 1 {
58		stat.MinDur = dur
59	}
60
61	stat.TotalSize += record.Size
62	if record.Size > stat.MaxSize {
63		stat.MaxSize = record.Size
64	}
65	if record.Size < stat.MinSize || stat.NumCalls == 1 {
66		stat.MinSize = record.Size
67	}
68
69	stat.AvgDur = stat.TotalDur / keybase1.DurationMsec(stat.NumCalls)
70	stat.AvgSize = stat.TotalSize / int64(stat.NumCalls)
71	return stat
72}
73
74type DiskInstrumentationStorage struct {
75	Contextified
76	sync.Mutex
77	src     keybase1.NetworkSource
78	storage map[string]keybase1.InstrumentationStat
79
80	eg      errgroup.Group
81	stopCh  chan struct{}
82	started bool
83}
84
85var _ rpc.NetworkInstrumenterStorage = (*DiskInstrumentationStorage)(nil)
86
87func NewDiskInstrumentationStorage(g *GlobalContext, src keybase1.NetworkSource) *DiskInstrumentationStorage {
88	return &DiskInstrumentationStorage{
89		Contextified: NewContextified(g),
90		src:          src,
91		storage:      make(map[string]keybase1.InstrumentationStat),
92	}
93}
94
95func (s *DiskInstrumentationStorage) Start(ctx context.Context) {
96	defer s.G().CTrace(ctx, "DiskInstrumentationStorage: Start", nil)()
97	s.Lock()
98	defer s.Unlock()
99	if s.started {
100		return
101	}
102	s.stopCh = make(chan struct{})
103	s.started = true
104	s.eg.Go(func() error { return s.flushLoop(s.stopCh) })
105}
106
107func (s *DiskInstrumentationStorage) Stop(ctx context.Context) chan struct{} {
108	defer s.G().CTrace(ctx, "DiskInstrumentationStorage: Stop", nil)()
109	s.Lock()
110	defer s.Unlock()
111	ch := make(chan struct{})
112	if s.started {
113		close(s.stopCh)
114		s.started = false
115		go func() {
116			if err := s.eg.Wait(); err != nil {
117				s.G().Log.Debug("DiskInstrumentationStorage: flush: unable to wait for shutdown: %v", err)
118			}
119			close(ch)
120		}()
121	} else {
122		close(ch)
123	}
124	return ch
125}
126
127func (s *DiskInstrumentationStorage) flushLoop(stopCh chan struct{}) error {
128	ctx := context.Background()
129	for {
130		select {
131		case <-stopCh:
132			return s.Flush(ctx)
133		case <-time.After(5 * time.Minute):
134			if err := s.Flush(ctx); err != nil {
135				s.G().Log.CDebugf(ctx, "DiskInstrumentationStorage: flushLoop: unable to flush: %v", err)
136			}
137		}
138	}
139}
140
141func (s *DiskInstrumentationStorage) Flush(ctx context.Context) (err error) {
142	s.Lock()
143	storage := s.storage
144	s.storage = make(map[string]keybase1.InstrumentationStat)
145	s.Unlock()
146	return s.flush(ctx, storage)
147}
148
149func (s *DiskInstrumentationStorage) flush(ctx context.Context, storage map[string]keybase1.InstrumentationStat) (err error) {
150	defer s.G().CTrace(ctx, "DiskInstrumentationStorage: flush", &err)()
151	for tag, record := range storage {
152		var existing keybase1.InstrumentationStat
153		found, err := s.G().LocalDb.GetIntoMsgpack(&existing, s.dbKey(tag))
154		if err != nil {
155			return err
156		}
157		// Keep only window of the past month
158		if found && time.Since(existing.Ctime.Time()) <= time.Hour*24*30 {
159			record = existing.AppendStat(record)
160		}
161		if err := s.G().LocalDb.PutObjMsgpack(s.dbKey(tag), nil, record); err != nil {
162			return err
163		}
164	}
165	return nil
166}
167
168func (s *DiskInstrumentationStorage) keyPrefix() string {
169	return fmt.Sprintf("src:%d", s.src)
170}
171
172func (s *DiskInstrumentationStorage) dbKey(tag string) DbKey {
173	return DbKey{
174		Typ: DBNetworkInstrumentation,
175		Key: fmt.Sprintf("%s|%s", s.keyPrefix(), tag),
176	}
177}
178
179func (s *DiskInstrumentationStorage) getAllKeysLocked() (keys []DbKey, err error) {
180	prefix := DbKey{
181		Typ: DBNetworkInstrumentation,
182	}.ToBytes()
183	dbKeys, err := s.G().LocalDb.KeysWithPrefixes(prefix)
184	if err != nil {
185		return nil, fmt.Errorf("could not get KeysWithPrefixes: %v", err)
186	}
187	keys = make([]DbKey, 0, len(dbKeys))
188	for dbKey := range dbKeys {
189		if dbKey.Typ == DBNetworkInstrumentation {
190			keys = append(keys, dbKey)
191		}
192	}
193	return keys, nil
194}
195
196func (s *DiskInstrumentationStorage) GetAll(ctx context.Context) (res []keybase1.InstrumentationStat, err error) {
197	defer s.G().CTrace(ctx, "DiskInstrumentationStorage: GetAll", &err)()
198	s.Lock()
199	defer s.Unlock()
200
201	if err := s.flush(ctx, s.storage); err != nil {
202		return nil, err
203	}
204
205	dbKeys, err := s.getAllKeysLocked()
206	if err != nil {
207		return nil, err
208	}
209	keyPrefix := s.keyPrefix()
210	for _, dbKey := range dbKeys {
211		// ensure key matches expected format
212		keyParts := strings.Split(dbKey.Key, "|")
213		if len(keyParts) < 2 || keyParts[0] != keyPrefix {
214			continue
215		}
216		var record keybase1.InstrumentationStat
217		ok, err := s.G().LocalDb.GetIntoMsgpack(&record, dbKey)
218		if err != nil {
219			return nil, err
220		} else if !ok {
221			continue
222		}
223		res = append(res, record)
224	}
225	return res, nil
226}
227
228func (s *DiskInstrumentationStorage) Stats(ctx context.Context) (res []keybase1.InstrumentationStat, err error) {
229	defer s.G().CTrace(ctx, "DiskInstrumentationStorage: Stats", &err)()
230	return s.GetAll(ctx)
231}
232
// tagLogBlacklist is the set of tags that logRecord neither writes to the
// perf log nor pushes as perf events.
var tagLogBlacklist = map[string]struct{}{
	"Call gregor.1.incoming.ping": {},
}
236
237func (s *DiskInstrumentationStorage) logRecord(ctx context.Context, tag string, record rpc.InstrumentationRecord) {
238	if s.src == keybase1.NetworkSource_LOCAL {
239		return
240	}
241	if _, ok := tagLogBlacklist[tag]; !ok {
242		s.G().PerfLog.CDebugf(ctx, "%s %v %s", tag, record.Dur, humanize.Bytes(uint64(record.Size)))
243		s.G().RuntimeStats.PushPerfEvent(keybase1.PerfEvent{
244			EventType: keybase1.PerfEventType_NETWORK,
245			Message:   tag,
246			Ctime:     keybase1.ToTime(record.Ctime),
247		})
248	}
249}
250
251func (s *DiskInstrumentationStorage) Put(ctx context.Context, tag string, record rpc.InstrumentationRecord) error {
252	s.Lock()
253	defer s.Unlock()
254	s.storage[tag] = AddRPCRecord(tag, s.storage[tag], record)
255	s.logRecord(ctx, tag, record)
256	return nil
257}
258
259func NetworkInstrumenterStorageFromSrc(g *GlobalContext, src keybase1.NetworkSource) rpc.NetworkInstrumenterStorage {
260	switch src {
261	case keybase1.NetworkSource_LOCAL:
262		return g.LocalNetworkInstrumenterStorage
263	case keybase1.NetworkSource_REMOTE:
264		return g.RemoteNetworkInstrumenterStorage
265	default:
266		return rpc.NewDummyInstrumentationStorage()
267	}
268}
269