1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package main
20
21import (
22	"encoding/binary"
23	"encoding/json"
24	"fmt"
25	"os"
26	"sort"
27	"strings"
28
29	ppb "google.golang.org/grpc/profiling/proto"
30)
31
// jsonNode represents a single event in trace-viewer's (catapult's) JSON
// trace format; the JSON tags are the keys trace-viewer expects. Timestamps
// are the values produced by catapultNs (nanoseconds relative to a base
// timestamp). Phase values used in this file are "B"/"E" (begin/end),
// "s"/"f" (flow start/finish), and "i" (instant).
type jsonNode struct {
	Name      string  `json:"name"` // Display name of the event.
	Cat       string  `json:"cat"`  // Comma-separated list of event categories.
	ID        string  `json:"id"`   // ID used to associate related events (e.g. both ends of a flow).
	Cname     string  `json:"cname"` // Predefined catapult colour label; see hashCname.
	Phase     string  `json:"ph"`   // Event phase/type.
	Timestamp float64 `json:"ts"`   // Event timestamp, from catapultNs.
	PID       string  `json:"pid"`  // Process row the event is drawn on.
	TID       string  `json:"tid"`  // Thread row (goroutine ID) within the process row.
}
42
// hashCname maps a timer tag to one of catapult's predefined colour labels.
// Catapult does not allow specifying colours manually; a 20-odd predefined
// labels are used (that don't make much sense outside the context of
// Chromium). See this for more details:
//
// https://github.com/catapult-project/catapult/blob/bef344f7017fc9e04f7049d0f58af6d9ce9f4ab6/tracing/tracing/base/color_scheme.html#L29
//
// The cases are evaluated in order, so e.g. a tag containing both
// "transport" and "header" gets the transport colour.
func hashCname(tag string) string {
	switch {
	case strings.Contains(tag, "encoding"):
		return "rail_response"
	case strings.Contains(tag, "compression"):
		return "cq_build_passed"
	case strings.Contains(tag, "transport"):
		if strings.Contains(tag, "blocking") {
			return "rail_animation"
		}
		return "good"
	case strings.Contains(tag, "header"):
		return "cq_build_attempt_failed"
	case tag == "/", strings.Contains(tag, "flow"), strings.Contains(tag, "tmp"):
		return "heap_dump_stack_frame"
	default:
		// Unrecognised tags get no explicit colour.
		return ""
	}
}
78
79// filterCounter identifies the counter-th instance of a timer of the type
80// `filter` within a Stat. This, in conjunction with the counter data structure
81// defined below, is used to draw flows between linked loopy writer/reader
82// events with application goroutine events in trace-viewer. This is possible
83// because enqueues and dequeues are ordered -- that is, the first dequeue must
84// be dequeueing the first enqueue operation.
85func filterCounter(stat *ppb.Stat, filter string, counter int) int {
86	localCounter := 0
87	for i := 0; i < len(stat.Timers); i++ {
88		if stat.Timers[i].Tags == filter {
89			if localCounter == counter {
90				return i
91			}
92			localCounter++
93		}
94	}
95
96	return -1
97}
98
// counter is a state object used to store and retrieve the number of timers
// of a particular type that have been seen so far.
type counter struct {
	c map[string]int
}

// newCounter returns an empty counter ready for use.
func newCounter() *counter {
	return &counter{c: map[string]int{}}
}

// GetAndInc returns the number of times s has been seen before, then
// records this occurrence (i.e. post-increment semantics).
func (c *counter) GetAndInc(s string) int {
	seen := c.c[s]
	c.c[s] = seen + 1
	return seen
}
114
// catapultNs flattens a (seconds, nanoseconds) pair into a single nanosecond
// count, returned as the float64 that jsonNode's Timestamp field requires.
func catapultNs(sec int64, nsec int32) float64 {
	totalNs := sec*1000000000 + int64(nsec)
	return float64(totalNs)
}
118
// streamStatsCatapultJSONSingle processes a single proto Stat object to return
// an array of jsonNodes in trace-viewer's format. All timestamps are emitted
// relative to the (baseSec, baseNsec) reference instant.
func streamStatsCatapultJSONSingle(stat *ppb.Stat, baseSec int64, baseNsec int32) []jsonNode {
	// A stat with no timers produces no events.
	if len(stat.Timers) == 0 {
		return nil
	}

	// The stat's metadata encodes the connection counter in its first eight
	// bytes and the stream ID in the next four, both big-endian.
	connectionCounter := binary.BigEndian.Uint64(stat.Metadata[0:8])
	streamID := binary.BigEndian.Uint32(stat.Metadata[8:12])
	// opid identifies this stream's operation; it serves as both the process
	// row and the event ID for application-goroutine events below.
	opid := fmt.Sprintf("/%s/%d/%d", stat.Tags, connectionCounter, streamID)

	// Discover the goroutine IDs of the loopy reader and loopy writer from
	// the first timer tagged with each; the loop stops early once both are
	// found.
	var loopyReaderGoID, loopyWriterGoID int64
	for i := 0; i < len(stat.Timers) && (loopyReaderGoID == 0 || loopyWriterGoID == 0); i++ {
		if strings.Contains(stat.Timers[i].Tags, "/loopyReader") {
			loopyReaderGoID = stat.Timers[i].GoId
		} else if strings.Contains(stat.Timers[i].Tags, "/loopyWriter") {
			loopyWriterGoID = stat.Timers[i].GoId
		}
	}

	// Per-tag occurrence counters for loopy reader (lrc) and loopy writer
	// (lwc) timers; used with filterCounter to pair the k-th occurrence of a
	// source timer with the k-th occurrence of its linked timer when drawing
	// flow arrows.
	lrc, lwc := newCounter(), newCounter()

	var result []jsonNode
	// Emit placeholder instant events (phase "i") at timestamp 0 on the
	// loopy reader/writer rows; the "Tmp" names and hashCname("tmp") colour
	// suggest these exist only to make those rows appear in trace-viewer --
	// TODO confirm.
	result = append(result,
		jsonNode{
			Name:      "loopyReaderTmp",
			ID:        opid,
			Cname:     hashCname("tmp"),
			Phase:     "i",
			Timestamp: 0,
			PID:       fmt.Sprintf("/%s/%d/loopyReader", stat.Tags, connectionCounter),
			TID:       fmt.Sprintf("%d", loopyReaderGoID),
		},
		jsonNode{
			Name:      "loopyWriterTmp",
			ID:        opid,
			Cname:     hashCname("tmp"),
			Phase:     "i",
			Timestamp: 0,
			PID:       fmt.Sprintf("/%s/%d/loopyWriter", stat.Tags, connectionCounter),
			TID:       fmt.Sprintf("%d", loopyWriterGoID),
		},
	)

	// For every timer: emit a begin/end ("B"/"E") pair, plus, for loopy
	// reader/writer timers that link to another timer, a flow start/finish
	// ("s"/"f") pair connecting the two.
	for i := 0; i < len(stat.Timers); i++ {
		categories := stat.Tags
		// By default, attribute the event to the stream's own row.
		pid, tid := opid, fmt.Sprintf("%d", stat.Timers[i].GoId)

		if stat.Timers[i].GoId == loopyReaderGoID {
			// Loopy reader timers are drawn on a per-connection row instead.
			pid, tid = fmt.Sprintf("/%s/%d/loopyReader", stat.Tags, connectionCounter), fmt.Sprintf("%d", stat.Timers[i].GoId)

			// Flows out of the loopy reader: find the downstream timer the
			// arrow should end at, if this timer's tag participates in one.
			var flowEndID int
			var flowEndPID, flowEndTID string
			switch stat.Timers[i].Tags {
			case "/http2/recv/header":
				// k-th header receive links to the k-th stream-level header
				// receive on the application goroutine.
				flowEndID = filterCounter(stat, "/grpc/stream/recv/header", lrc.GetAndInc("/http2/recv/header"))
				if flowEndID != -1 {
					flowEndPID = opid
					flowEndTID = fmt.Sprintf("%d", stat.Timers[flowEndID].GoId)
				} else {
					logger.Infof("cannot find %s/grpc/stream/recv/header for %s/http2/recv/header", opid, opid)
				}
			case "/http2/recv/dataFrame/loopyReader":
				// k-th data frame receive links to the k-th decompress on the
				// application goroutine.
				flowEndID = filterCounter(stat, "/recvAndDecompress", lrc.GetAndInc("/http2/recv/dataFrame/loopyReader"))
				if flowEndID != -1 {
					flowEndPID = opid
					flowEndTID = fmt.Sprintf("%d", stat.Timers[flowEndID].GoId)
				} else {
					logger.Infof("cannot find %s/recvAndDecompress for %s/http2/recv/dataFrame/loopyReader", opid, opid)
				}
			default:
				// Not a flow-originating timer.
				flowEndID = -1
			}

			if flowEndID != -1 {
				// The flow ID ties the "s" and "f" halves together; it
				// encodes both endpoints' tags and positions so each arrow
				// gets a distinct ID.
				flowID := fmt.Sprintf("lrc begin:/%d%s end:/%d%s begin:(%d, %s, %s) end:(%d, %s, %s)", connectionCounter, stat.Timers[i].Tags, connectionCounter, stat.Timers[flowEndID].Tags, i, pid, tid, flowEndID, flowEndPID, flowEndTID)
				result = append(result,
					jsonNode{
						Name:      fmt.Sprintf("%s/flow", opid),
						Cat:       categories + ",flow",
						ID:        flowID,
						Cname:     hashCname("flow"),
						Phase:     "s",
						// The arrow starts when the loopy reader timer ends...
						Timestamp: catapultNs(stat.Timers[i].EndSec-baseSec, stat.Timers[i].EndNsec-baseNsec),
						PID:       pid,
						TID:       tid,
					},
					jsonNode{
						Name:      fmt.Sprintf("%s/flow", opid),
						Cat:       categories + ",flow",
						ID:        flowID,
						Cname:     hashCname("flow"),
						Phase:     "f",
						// ...and finishes when the downstream timer begins.
						Timestamp: catapultNs(stat.Timers[flowEndID].BeginSec-baseSec, stat.Timers[flowEndID].BeginNsec-baseNsec),
						PID:       flowEndPID,
						TID:       flowEndTID,
					},
				)
			}
		} else if stat.Timers[i].GoId == loopyWriterGoID {
			// Loopy writer timers likewise get a per-connection row.
			pid, tid = fmt.Sprintf("/%s/%d/loopyWriter", stat.Tags, connectionCounter), fmt.Sprintf("%d", stat.Timers[i].GoId)

			// Flows into the loopy writer: find the upstream timer the arrow
			// should begin at, if this timer's tag participates in one.
			var flowBeginID int
			var flowBeginPID, flowBeginTID string
			switch stat.Timers[i].Tags {
			case "/http2/recv/header/loopyWriter/registerOutStream":
				// Linked from the k-th header receive on the loopy reader.
				flowBeginID = filterCounter(stat, "/http2/recv/header", lwc.GetAndInc("/http2/recv/header/loopyWriter/registerOutStream"))
				flowBeginPID = fmt.Sprintf("/%s/%d/loopyReader", stat.Tags, connectionCounter)
				flowBeginTID = fmt.Sprintf("%d", loopyReaderGoID)
			case "/http2/send/dataFrame/loopyWriter/preprocess":
				// Linked from the k-th transport enqueue on the application
				// goroutine.
				flowBeginID = filterCounter(stat, "/transport/enqueue", lwc.GetAndInc("/http2/send/dataFrame/loopyWriter/preprocess"))
				if flowBeginID != -1 {
					flowBeginPID = opid
					flowBeginTID = fmt.Sprintf("%d", stat.Timers[flowBeginID].GoId)
				} else {
					logger.Infof("cannot find /%d/transport/enqueue for /%d/http2/send/dataFrame/loopyWriter/preprocess", connectionCounter, connectionCounter)
				}
			default:
				// Not a flow-terminating timer.
				flowBeginID = -1
			}

			if flowBeginID != -1 {
				// Same distinct-ID scheme as the loopy reader flows above.
				flowID := fmt.Sprintf("lwc begin:/%d%s end:/%d%s begin:(%d, %s, %s) end:(%d, %s, %s)", connectionCounter, stat.Timers[flowBeginID].Tags, connectionCounter, stat.Timers[i].Tags, flowBeginID, flowBeginPID, flowBeginTID, i, pid, tid)
				result = append(result,
					jsonNode{
						Name:      fmt.Sprintf("/%s/%d/%d/flow", stat.Tags, connectionCounter, streamID),
						Cat:       categories + ",flow",
						ID:        flowID,
						Cname:     hashCname("flow"),
						Phase:     "s",
						// The arrow starts when the upstream timer ends...
						Timestamp: catapultNs(stat.Timers[flowBeginID].EndSec-baseSec, stat.Timers[flowBeginID].EndNsec-baseNsec),
						PID:       flowBeginPID,
						TID:       flowBeginTID,
					},
					jsonNode{
						Name:      fmt.Sprintf("/%s/%d/%d/flow", stat.Tags, connectionCounter, streamID),
						Cat:       categories + ",flow",
						ID:        flowID,
						Cname:     hashCname("flow"),
						Phase:     "f",
						// ...and finishes when this loopy writer timer begins.
						Timestamp: catapultNs(stat.Timers[i].BeginSec-baseSec, stat.Timers[i].BeginNsec-baseNsec),
						PID:       pid,
						TID:       tid,
					},
				)
			}
		}

		// Unconditionally emit the begin/end pair for this timer itself.
		result = append(result,
			jsonNode{
				Name:      fmt.Sprintf("%s%s", opid, stat.Timers[i].Tags),
				Cat:       categories,
				ID:        opid,
				Cname:     hashCname(stat.Timers[i].Tags),
				Phase:     "B",
				Timestamp: catapultNs(stat.Timers[i].BeginSec-baseSec, stat.Timers[i].BeginNsec-baseNsec),
				PID:       pid,
				TID:       tid,
			},
			jsonNode{
				Name:      fmt.Sprintf("%s%s", opid, stat.Timers[i].Tags),
				Cat:       categories,
				ID:        opid,
				Cname:     hashCname(stat.Timers[i].Tags),
				Phase:     "E",
				Timestamp: catapultNs(stat.Timers[i].EndSec-baseSec, stat.Timers[i].EndNsec-baseNsec),
				PID:       pid,
				TID:       tid,
			},
		)
	}

	return result
}
293
294// timerBeginIsBefore compares two proto Timer objects to determine if the
295// first comes before the second chronologically.
296func timerBeginIsBefore(ti *ppb.Timer, tj *ppb.Timer) bool {
297	if ti.BeginSec == tj.BeginSec {
298		return ti.BeginNsec < tj.BeginNsec
299	}
300	return ti.BeginSec < tj.BeginSec
301}
302
303// streamStatsCatapulJSON receives a *snapshot and the name of a JSON file to
304// write to. The grpc-go profiling snapshot is processed and converted to a
305// JSON format that can be understood by trace-viewer.
306func streamStatsCatapultJSON(s *snapshot, streamStatsCatapultJSONFileName string) (err error) {
307	logger.Infof("calculating stream stats filters")
308	filterArray := strings.Split(*flagStreamStatsFilter, ",")
309	filter := make(map[string]bool)
310	for _, f := range filterArray {
311		filter[f] = true
312	}
313
314	logger.Infof("filter stream stats for %s", *flagStreamStatsFilter)
315	var streamStats []*ppb.Stat
316	for _, stat := range s.StreamStats {
317		if _, ok := filter[stat.Tags]; ok {
318			streamStats = append(streamStats, stat)
319		}
320	}
321
322	logger.Infof("sorting timers within all stats")
323	for id := range streamStats {
324		sort.Slice(streamStats[id].Timers, func(i, j int) bool {
325			return timerBeginIsBefore(streamStats[id].Timers[i], streamStats[id].Timers[j])
326		})
327	}
328
329	logger.Infof("sorting stream stats")
330	sort.Slice(streamStats, func(i, j int) bool {
331		if len(streamStats[j].Timers) == 0 {
332			return true
333		} else if len(streamStats[i].Timers) == 0 {
334			return false
335		}
336		pi := binary.BigEndian.Uint64(streamStats[i].Metadata[0:8])
337		pj := binary.BigEndian.Uint64(streamStats[j].Metadata[0:8])
338		if pi == pj {
339			return timerBeginIsBefore(streamStats[i].Timers[0], streamStats[j].Timers[0])
340		}
341
342		return pi < pj
343	})
344
345	// Clip the last stat as it's from the /Profiling/GetStreamStats call that we
346	// made to retrieve the stats themselves. This likely happened millions of
347	// nanoseconds after the last stream we want to profile, so it'd just make
348	// the catapult graph less readable.
349	if len(streamStats) > 0 {
350		streamStats = streamStats[:len(streamStats)-1]
351	}
352
353	// All timestamps use the earliest timestamp available as the reference.
354	logger.Infof("calculating the earliest timestamp across all timers")
355	var base *ppb.Timer
356	for _, stat := range streamStats {
357		for _, timer := range stat.Timers {
358			if base == nil || timerBeginIsBefore(base, timer) {
359				base = timer
360			}
361		}
362	}
363
364	logger.Infof("converting %d stats to catapult JSON format", len(streamStats))
365	var jsonNodes []jsonNode
366	for _, stat := range streamStats {
367		jsonNodes = append(jsonNodes, streamStatsCatapultJSONSingle(stat, base.BeginSec, base.BeginNsec)...)
368	}
369
370	logger.Infof("marshalling catapult JSON")
371	b, err := json.Marshal(jsonNodes)
372	if err != nil {
373		logger.Errorf("cannot marshal JSON: %v", err)
374		return err
375	}
376
377	logger.Infof("creating catapult JSON file")
378	streamStatsCatapultJSONFile, err := os.Create(streamStatsCatapultJSONFileName)
379	if err != nil {
380		logger.Errorf("cannot create file %s: %v", streamStatsCatapultJSONFileName, err)
381		return err
382	}
383	defer streamStatsCatapultJSONFile.Close()
384
385	logger.Infof("writing catapult JSON to disk")
386	_, err = streamStatsCatapultJSONFile.Write(b)
387	if err != nil {
388		logger.Errorf("cannot write marshalled JSON: %v", err)
389		return err
390	}
391
392	logger.Infof("successfully wrote catapult JSON file %s", streamStatsCatapultJSONFileName)
393	return nil
394}
395