1// Copyright 2017, OpenCensus Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16package zpages
17
18import (
19	"fmt"
20	"io"
21	"log"
22	"math"
23	"net/http"
24	"sort"
25	"sync"
26	"text/tabwriter"
27	"time"
28
29	"go.opencensus.io/plugin/ocgrpc"
30	"go.opencensus.io/stats/view"
31)
32
// bytesPerKb is the number of bytes in one kibibyte. WriteTextRpczPage divides
// the input and output byte rates by it before printing them.
const bytesPerKb = 1024
34
var (
	// programStartTime is captured when this package's variables are
	// initialized; ExportView averages cumulative rates over the time elapsed
	// since then.
	//
	// snaps maps each (method, direction) pair to its snapshot. ExportView
	// writes it while holding mu, and the page writers in this file hold mu
	// while they read it through getStatsPage.
	programStartTime = time.Now()
	mu               sync.Mutex // protects snaps
	snaps            = make(map[methodKey]*statSnapshot)

	// viewType lists the views we are interested in for RPC stats.
	// A view's map value indicates whether that view contains data for received
	// RPCs: the Server* views map to true and the Client* views map to false.
	// registerRPCViews registers every key of this map, and ExportView ignores
	// data for any view that is not a key.
	viewType = map[*view.View]bool{
		ocgrpc.ClientCompletedRPCsView:          false,
		ocgrpc.ClientSentBytesPerRPCView:        false,
		ocgrpc.ClientSentMessagesPerRPCView:     false,
		ocgrpc.ClientReceivedBytesPerRPCView:    false,
		ocgrpc.ClientReceivedMessagesPerRPCView: false,
		ocgrpc.ClientRoundtripLatencyView:       false,
		ocgrpc.ServerCompletedRPCsView:          true,
		ocgrpc.ServerReceivedBytesPerRPCView:    true,
		ocgrpc.ServerReceivedMessagesPerRPCView: true,
		ocgrpc.ServerSentBytesPerRPCView:        true,
		ocgrpc.ServerSentMessagesPerRPCView:     true,
		ocgrpc.ServerLatencyView:                true,
	}
)
58
59func registerRPCViews() {
60	views := make([]*view.View, 0, len(viewType))
61	for v := range viewType {
62		views = append(views, v)
63	}
64	if err := view.Register(views...); err != nil {
65		log.Printf("error subscribing to views: %v", err)
66	}
67	view.RegisterExporter(snapExporter{})
68}
69
70func rpczHandler(w http.ResponseWriter, r *http.Request) {
71	w.Header().Set("Content-Type", "text/html; charset=utf-8")
72	WriteHTMLRpczPage(w)
73}
74
75// WriteHTMLRpczPage writes an HTML document to w containing per-method RPC stats.
76func WriteHTMLRpczPage(w io.Writer) {
77	if err := headerTemplate.Execute(w, headerData{Title: "RPC Stats"}); err != nil {
78		log.Printf("zpages: executing template: %v", err)
79	}
80	WriteHTMLRpczSummary(w)
81	if err := footerTemplate.Execute(w, nil); err != nil {
82		log.Printf("zpages: executing template: %v", err)
83	}
84}
85
86// WriteHTMLRpczSummary writes HTML to w containing per-method RPC stats.
87//
88// It includes neither a header nor footer, so you can embed this data in other pages.
89func WriteHTMLRpczSummary(w io.Writer) {
90	mu.Lock()
91	if err := statsTemplate.Execute(w, getStatsPage()); err != nil {
92		log.Printf("zpages: executing template: %v", err)
93	}
94	mu.Unlock()
95}
96
97// WriteTextRpczPage writes formatted text to w containing per-method RPC stats.
98func WriteTextRpczPage(w io.Writer) {
99	mu.Lock()
100	defer mu.Unlock()
101	page := getStatsPage()
102
103	for i, sg := range page.StatGroups {
104		switch i {
105		case 0:
106			fmt.Fprint(w, "Sent:\n")
107		case 1:
108			fmt.Fprint(w, "\nReceived:\n")
109		}
110		tw := tabwriter.NewWriter(w, 6, 8, 1, ' ', 0)
111		fmt.Fprint(tw, "Method\tCount\t\t\tAvgLat\t\t\tMaxLat\t\t\tRate\t\t\tIn (MiB/s)\t\t\tOut (MiB/s)\t\t\tErrors\t\t\n")
112		fmt.Fprint(tw, "\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\n")
113		for _, s := range sg.Snapshots {
114			fmt.Fprintf(tw, "%s\t%d\t%d\t%d\t%v\t%v\t%v\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%d\t%d\t%d\n",
115				s.Method,
116				s.CountMinute,
117				s.CountHour,
118				s.CountTotal,
119				s.AvgLatencyMinute,
120				s.AvgLatencyHour,
121				s.AvgLatencyTotal,
122				s.RPCRateMinute,
123				s.RPCRateHour,
124				s.RPCRateTotal,
125				s.InputRateMinute/bytesPerKb,
126				s.InputRateHour/bytesPerKb,
127				s.InputRateTotal/bytesPerKb,
128				s.OutputRateMinute/bytesPerKb,
129				s.OutputRateHour/bytesPerKb,
130				s.OutputRateTotal/bytesPerKb,
131				s.ErrorsMinute,
132				s.ErrorsHour,
133				s.ErrorsTotal)
134		}
135		tw.Flush()
136	}
137}
138
// headerData contains data for the header template.
type headerData struct {
	// Title is the page title; WriteHTMLRpczPage sets it to "RPC Stats".
	Title string
}
143
// statsPage aggregates stats on the page for 'sent' and 'received' categories.
type statsPage struct {
	// StatGroups is filled by getStatsPage with the sent group first and the
	// received group second; WriteTextRpczPage picks its section headings by
	// that position.
	StatGroups []*statGroup
}
148
// statGroup aggregates snapshots for a directional category.
//
// Its Len, Swap and Less methods let it be passed to sort.Sort, which orders
// the snapshots by method name.
type statGroup struct {
	// Direction is "Sent" or "Received" as set by getStatsPage.
	Direction string
	// Snapshots holds one entry per method for this direction.
	Snapshots []*statSnapshot
}
154
155func (s *statGroup) Len() int {
156	return len(s.Snapshots)
157}
158
159func (s *statGroup) Swap(i, j int) {
160	s.Snapshots[i], s.Snapshots[j] = s.Snapshots[j], s.Snapshots[i]
161}
162
163func (s *statGroup) Less(i, j int) bool {
164	return s.Snapshots[i].Method < s.Snapshots[j].Method
165}
166
// statSnapshot is one row of the RPC stats page: the figures reported for a
// single method in a single direction, each in a minute, hour and total
// variant.
type statSnapshot struct {
	// TODO: compute hour/minute values from cumulative

	// Method is the RPC method name; Received is true for RPCs received by
	// this process and false for RPCs it sent.
	Method   string
	Received bool

	// Number of RPCs.
	CountMinute, CountHour, CountTotal uint64

	// Average latency per RPC.
	AvgLatencyMinute, AvgLatencyHour, AvgLatencyTotal time.Duration

	// RPCs per second.
	RPCRateMinute, RPCRateHour, RPCRateTotal float64

	// Received payload per second.
	InputRateMinute, InputRateHour, InputRateTotal float64

	// Sent payload per second.
	OutputRateMinute, OutputRateHour, OutputRateTotal float64

	// Number of RPCs that finished with a status other than "OK".
	ErrorsMinute, ErrorsHour, ErrorsTotal uint64
}
192
// methodKey is the key type of the snaps map: it identifies a snapshot by
// method name and direction.
type methodKey struct {
	method   string
	received bool // true for received RPCs, false for sent RPCs (see viewType)
}
197
// snapExporter is the exporter that registerRPCViews installs with
// view.RegisterExporter. It carries no state of its own; its ExportView method
// records view data in the package-level snaps map.
type snapExporter struct{}
199
200func (s snapExporter) ExportView(vd *view.Data) {
201	received, ok := viewType[vd.View]
202	if !ok {
203		return
204	}
205	if len(vd.Rows) == 0 {
206		return
207	}
208	ageSec := float64(time.Since(programStartTime)) / float64(time.Second)
209
210	computeRate := func(maxSec, x float64) float64 {
211		dur := ageSec
212		if maxSec > 0 && dur > maxSec {
213			dur = maxSec
214		}
215		return x / dur
216	}
217
218	convertTime := func(ms float64) time.Duration {
219		if math.IsInf(ms, 0) || math.IsNaN(ms) {
220			return 0
221		}
222		return time.Duration(float64(time.Millisecond) * ms)
223	}
224
225	haveResetErrors := make(map[string]struct{})
226
227	mu.Lock()
228	defer mu.Unlock()
229	for _, row := range vd.Rows {
230		var method string
231		for _, tag := range row.Tags {
232			if tag.Key == ocgrpc.KeyClientMethod || tag.Key == ocgrpc.KeyServerMethod {
233				method = tag.Value
234				break
235			}
236		}
237
238		key := methodKey{method: method, received: received}
239		s := snaps[key]
240		if s == nil {
241			s = &statSnapshot{Method: method, Received: received}
242			snaps[key] = s
243		}
244
245		var (
246			sum   float64
247			count float64
248		)
249		switch v := row.Data.(type) {
250		case *view.CountData:
251			sum = float64(v.Value)
252			count = float64(v.Value)
253		case *view.DistributionData:
254			sum = v.Sum()
255			count = float64(v.Count)
256		case *view.SumData:
257			sum = v.Value
258			count = v.Value
259		}
260
261		// Update field of s corresponding to the view.
262		switch vd.View {
263		case ocgrpc.ClientCompletedRPCsView:
264			if _, ok := haveResetErrors[method]; !ok {
265				haveResetErrors[method] = struct{}{}
266				s.ErrorsTotal = 0
267			}
268			for _, tag := range row.Tags {
269				if tag.Key == ocgrpc.KeyClientStatus && tag.Value != "OK" {
270					s.ErrorsTotal += uint64(count)
271				}
272			}
273
274		case ocgrpc.ClientRoundtripLatencyView:
275			s.AvgLatencyTotal = convertTime(sum / count)
276
277		case ocgrpc.ClientSentBytesPerRPCView:
278			s.OutputRateTotal = computeRate(0, sum)
279
280		case ocgrpc.ClientReceivedBytesPerRPCView:
281			s.InputRateTotal = computeRate(0, sum)
282
283		case ocgrpc.ClientSentMessagesPerRPCView:
284			s.CountTotal = uint64(count)
285			s.RPCRateTotal = computeRate(0, count)
286
287		case ocgrpc.ClientReceivedMessagesPerRPCView:
288			// currently unused
289
290		case ocgrpc.ServerCompletedRPCsView:
291			if _, ok := haveResetErrors[method]; !ok {
292				haveResetErrors[method] = struct{}{}
293				s.ErrorsTotal = 0
294			}
295			for _, tag := range row.Tags {
296				if tag.Key == ocgrpc.KeyServerStatus && tag.Value != "OK" {
297					s.ErrorsTotal += uint64(count)
298				}
299			}
300
301		case ocgrpc.ServerLatencyView:
302			s.AvgLatencyTotal = convertTime(sum / count)
303
304		case ocgrpc.ServerSentBytesPerRPCView:
305			s.OutputRateTotal = computeRate(0, sum)
306
307		case ocgrpc.ServerReceivedMessagesPerRPCView:
308			s.CountTotal = uint64(count)
309			s.RPCRateTotal = computeRate(0, count)
310
311		case ocgrpc.ServerSentMessagesPerRPCView:
312			// currently unused
313		}
314	}
315}
316
317func getStatsPage() *statsPage {
318	sentStats := statGroup{Direction: "Sent"}
319	receivedStats := statGroup{Direction: "Received"}
320	for key, sg := range snaps {
321		if key.received {
322			receivedStats.Snapshots = append(receivedStats.Snapshots, sg)
323		} else {
324			sentStats.Snapshots = append(sentStats.Snapshots, sg)
325		}
326	}
327	sort.Sort(&sentStats)
328	sort.Sort(&receivedStats)
329
330	return &statsPage{
331		StatGroups: []*statGroup{&sentStats, &receivedStats},
332	}
333}
334