1// Copyright 2017, OpenCensus Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14// 15 16package zpages 17 18import ( 19 "fmt" 20 "io" 21 "log" 22 "math" 23 "net/http" 24 "sort" 25 "sync" 26 "text/tabwriter" 27 "time" 28 29 "go.opencensus.io/plugin/ocgrpc" 30 "go.opencensus.io/stats/view" 31) 32 33const bytesPerKb = 1024 34 35var ( 36 programStartTime = time.Now() 37 mu sync.Mutex // protects snaps 38 snaps = make(map[methodKey]*statSnapshot) 39 40 // viewType lists the views we are interested in for RPC stats. 41 // A view's map value indicates whether that view contains data for received 42 // RPCs. 43 viewType = map[*view.View]bool{ 44 ocgrpc.ClientCompletedRPCsView: false, 45 ocgrpc.ClientSentBytesPerRPCView: false, 46 ocgrpc.ClientSentMessagesPerRPCView: false, 47 ocgrpc.ClientReceivedBytesPerRPCView: false, 48 ocgrpc.ClientReceivedMessagesPerRPCView: false, 49 ocgrpc.ClientRoundtripLatencyView: false, 50 ocgrpc.ServerCompletedRPCsView: true, 51 ocgrpc.ServerReceivedBytesPerRPCView: true, 52 ocgrpc.ServerReceivedMessagesPerRPCView: true, 53 ocgrpc.ServerSentBytesPerRPCView: true, 54 ocgrpc.ServerSentMessagesPerRPCView: true, 55 ocgrpc.ServerLatencyView: true, 56 } 57) 58 59func registerRPCViews() { 60 views := make([]*view.View, 0, len(viewType)) 61 for v := range viewType { 62 views = append(views, v) 63 } 64 if err := view.Register(views...); err != nil { 65 log.Printf("error subscribing to views: %v", err) 66 } 67 view.RegisterExporter(snapExporter{}) 68} 69 70func rpczHandler(w http.ResponseWriter, r *http.Request) { 71 w.Header().Set("Content-Type", "text/html; charset=utf-8") 72 WriteHTMLRpczPage(w) 73} 74 75// WriteHTMLRpczPage writes an HTML document to w containing per-method RPC stats. 76func WriteHTMLRpczPage(w io.Writer) { 77 if err := headerTemplate.Execute(w, headerData{Title: "RPC Stats"}); err != nil { 78 log.Printf("zpages: executing template: %v", err) 79 } 80 WriteHTMLRpczSummary(w) 81 if err := footerTemplate.Execute(w, nil); err != nil { 82 log.Printf("zpages: executing template: %v", err) 83 } 84} 85 86// WriteHTMLRpczSummary writes HTML to w containing per-method RPC stats. 87// 88// It includes neither a header nor footer, so you can embed this data in other pages. 89func WriteHTMLRpczSummary(w io.Writer) { 90 mu.Lock() 91 if err := statsTemplate.Execute(w, getStatsPage()); err != nil { 92 log.Printf("zpages: executing template: %v", err) 93 } 94 mu.Unlock() 95} 96 97// WriteTextRpczPage writes formatted text to w containing per-method RPC stats. 98func WriteTextRpczPage(w io.Writer) { 99 mu.Lock() 100 defer mu.Unlock() 101 page := getStatsPage() 102 103 for i, sg := range page.StatGroups { 104 switch i { 105 case 0: 106 fmt.Fprint(w, "Sent:\n") 107 case 1: 108 fmt.Fprint(w, "\nReceived:\n") 109 } 110 tw := tabwriter.NewWriter(w, 6, 8, 1, ' ', 0) 111 fmt.Fprint(tw, "Method\tCount\t\t\tAvgLat\t\t\tMaxLat\t\t\tRate\t\t\tIn (MiB/s)\t\t\tOut (MiB/s)\t\t\tErrors\t\t\n") 112 fmt.Fprint(tw, "\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\n") 113 for _, s := range sg.Snapshots { 114 fmt.Fprintf(tw, "%s\t%d\t%d\t%d\t%v\t%v\t%v\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%d\t%d\t%d\n", 115 s.Method, 116 s.CountMinute, 117 s.CountHour, 118 s.CountTotal, 119 s.AvgLatencyMinute, 120 s.AvgLatencyHour, 121 s.AvgLatencyTotal, 122 s.RPCRateMinute, 123 s.RPCRateHour, 124 s.RPCRateTotal, 125 s.InputRateMinute/bytesPerKb, 126 s.InputRateHour/bytesPerKb, 127 s.InputRateTotal/bytesPerKb, 128 s.OutputRateMinute/bytesPerKb, 129 s.OutputRateHour/bytesPerKb, 130 s.OutputRateTotal/bytesPerKb, 131 s.ErrorsMinute, 132 s.ErrorsHour, 133 s.ErrorsTotal) 134 } 135 tw.Flush() 136 } 137} 138 139// headerData contains data for the header template. 140type headerData struct { 141 Title string 142} 143 144// statsPage aggregates stats on the page for 'sent' and 'received' categories 145type statsPage struct { 146 StatGroups []*statGroup 147} 148 149// statGroup aggregates snapshots for a directional category 150type statGroup struct { 151 Direction string 152 Snapshots []*statSnapshot 153} 154 155func (s *statGroup) Len() int { 156 return len(s.Snapshots) 157} 158 159func (s *statGroup) Swap(i, j int) { 160 s.Snapshots[i], s.Snapshots[j] = s.Snapshots[j], s.Snapshots[i] 161} 162 163func (s *statGroup) Less(i, j int) bool { 164 return s.Snapshots[i].Method < s.Snapshots[j].Method 165} 166 167// statSnapshot holds the data items that are presented in a single row of RPC 168// stat information. 169type statSnapshot struct { 170 // TODO: compute hour/minute values from cumulative 171 Method string 172 Received bool 173 CountMinute uint64 174 CountHour uint64 175 CountTotal uint64 176 AvgLatencyMinute time.Duration 177 AvgLatencyHour time.Duration 178 AvgLatencyTotal time.Duration 179 RPCRateMinute float64 180 RPCRateHour float64 181 RPCRateTotal float64 182 InputRateMinute float64 183 InputRateHour float64 184 InputRateTotal float64 185 OutputRateMinute float64 186 OutputRateHour float64 187 OutputRateTotal float64 188 ErrorsMinute uint64 189 ErrorsHour uint64 190 ErrorsTotal uint64 191} 192 193type methodKey struct { 194 method string 195 received bool 196} 197 198type snapExporter struct{} 199 200func (s snapExporter) ExportView(vd *view.Data) { 201 received, ok := viewType[vd.View] 202 if !ok { 203 return 204 } 205 if len(vd.Rows) == 0 { 206 return 207 } 208 ageSec := float64(time.Since(programStartTime)) / float64(time.Second) 209 210 computeRate := func(maxSec, x float64) float64 { 211 dur := ageSec 212 if maxSec > 0 && dur > maxSec { 213 dur = maxSec 214 } 215 return x / dur 216 } 217 218 convertTime := func(ms float64) time.Duration { 219 if math.IsInf(ms, 0) || math.IsNaN(ms) { 220 return 0 221 } 222 return time.Duration(float64(time.Millisecond) * ms) 223 } 224 225 haveResetErrors := make(map[string]struct{}) 226 227 mu.Lock() 228 defer mu.Unlock() 229 for _, row := range vd.Rows { 230 var method string 231 for _, tag := range row.Tags { 232 if tag.Key == ocgrpc.KeyClientMethod || tag.Key == ocgrpc.KeyServerMethod { 233 method = tag.Value 234 break 235 } 236 } 237 238 key := methodKey{method: method, received: received} 239 s := snaps[key] 240 if s == nil { 241 s = &statSnapshot{Method: method, Received: received} 242 snaps[key] = s 243 } 244 245 var ( 246 sum float64 247 count float64 248 ) 249 switch v := row.Data.(type) { 250 case *view.CountData: 251 sum = float64(v.Value) 252 count = float64(v.Value) 253 case *view.DistributionData: 254 sum = v.Sum() 255 count = float64(v.Count) 256 case *view.SumData: 257 sum = v.Value 258 count = v.Value 259 } 260 261 // Update field of s corresponding to the view. 262 switch vd.View { 263 case ocgrpc.ClientCompletedRPCsView: 264 if _, ok := haveResetErrors[method]; !ok { 265 haveResetErrors[method] = struct{}{} 266 s.ErrorsTotal = 0 267 } 268 for _, tag := range row.Tags { 269 if tag.Key == ocgrpc.KeyClientStatus && tag.Value != "OK" { 270 s.ErrorsTotal += uint64(count) 271 } 272 } 273 274 case ocgrpc.ClientRoundtripLatencyView: 275 s.AvgLatencyTotal = convertTime(sum / count) 276 277 case ocgrpc.ClientSentBytesPerRPCView: 278 s.OutputRateTotal = computeRate(0, sum) 279 280 case ocgrpc.ClientReceivedBytesPerRPCView: 281 s.InputRateTotal = computeRate(0, sum) 282 283 case ocgrpc.ClientSentMessagesPerRPCView: 284 s.CountTotal = uint64(count) 285 s.RPCRateTotal = computeRate(0, count) 286 287 case ocgrpc.ClientReceivedMessagesPerRPCView: 288 // currently unused 289 290 case ocgrpc.ServerCompletedRPCsView: 291 if _, ok := haveResetErrors[method]; !ok { 292 haveResetErrors[method] = struct{}{} 293 s.ErrorsTotal = 0 294 } 295 for _, tag := range row.Tags { 296 if tag.Key == ocgrpc.KeyServerStatus && tag.Value != "OK" { 297 s.ErrorsTotal += uint64(count) 298 } 299 } 300 301 case ocgrpc.ServerLatencyView: 302 s.AvgLatencyTotal = convertTime(sum / count) 303 304 case ocgrpc.ServerSentBytesPerRPCView: 305 s.OutputRateTotal = computeRate(0, sum) 306 307 case ocgrpc.ServerReceivedMessagesPerRPCView: 308 s.CountTotal = uint64(count) 309 s.RPCRateTotal = computeRate(0, count) 310 311 case ocgrpc.ServerSentMessagesPerRPCView: 312 // currently unused 313 } 314 } 315} 316 317func getStatsPage() *statsPage { 318 sentStats := statGroup{Direction: "Sent"} 319 receivedStats := statGroup{Direction: "Received"} 320 for key, sg := range snaps { 321 if key.received { 322 receivedStats.Snapshots = append(receivedStats.Snapshots, sg) 323 } else { 324 sentStats.Snapshots = append(sentStats.Snapshots, sg) 325 } 326 } 327 sort.Sort(&sentStats) 328 sort.Sort(&receivedStats) 329 330 return &statsPage{ 331 StatGroups: []*statGroup{&sentStats, &receivedStats}, 332 } 333} 334