1// Copyright 2016 CoreOS, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package cmd 16 17import ( 18 "bytes" 19 "encoding/csv" 20 "fmt" 21 "log" 22 "sort" 23 "sync" 24 "time" 25) 26 27type timeSeries struct { 28 timestamp int64 29 avgLatency time.Duration 30 throughPut int64 31} 32 33type TimeSeries []timeSeries 34 35func (t TimeSeries) Swap(i, j int) { t[i], t[j] = t[j], t[i] } 36func (t TimeSeries) Len() int { return len(t) } 37func (t TimeSeries) Less(i, j int) bool { return t[i].timestamp < t[j].timestamp } 38 39type secondPoint struct { 40 totalLatency time.Duration 41 count int64 42} 43 44type secondPoints struct { 45 mu sync.Mutex 46 tm map[int64]secondPoint 47} 48 49func newSecondPoints() *secondPoints { 50 return &secondPoints{tm: make(map[int64]secondPoint)} 51} 52 53func (sp *secondPoints) Add(ts time.Time, lat time.Duration) { 54 sp.mu.Lock() 55 defer sp.mu.Unlock() 56 57 tk := ts.Unix() 58 if v, ok := sp.tm[tk]; !ok { 59 sp.tm[tk] = secondPoint{totalLatency: lat, count: 1} 60 } else { 61 v.totalLatency += lat 62 v.count += 1 63 sp.tm[tk] = v 64 } 65} 66 67func (sp *secondPoints) getTimeSeries() TimeSeries { 68 sp.mu.Lock() 69 defer sp.mu.Unlock() 70 71 tslice := make(TimeSeries, len(sp.tm)) 72 i := 0 73 for k, v := range sp.tm { 74 tslice[i] = timeSeries{ 75 timestamp: k, 76 avgLatency: time.Duration(v.totalLatency) / time.Duration(v.count), 77 throughPut: v.count, 78 } 79 i++ 80 } 81 sort.Sort(tslice) 82 return tslice 83} 84 85func (ts TimeSeries) String() string { 86 buf := new(bytes.Buffer) 87 wr := csv.NewWriter(buf) 88 if err := wr.Write([]string{"unix_ts", "avg_latency", "throughput"}); err != nil { 89 log.Fatal(err) 90 } 91 rows := [][]string{} 92 for i := range ts { 93 row := []string{ 94 fmt.Sprintf("%d", ts[i].timestamp), 95 fmt.Sprintf("%s", ts[i].avgLatency), 96 fmt.Sprintf("%d", ts[i].throughPut), 97 } 98 rows = append(rows, row) 99 } 100 if err := wr.WriteAll(rows); err != nil { 101 log.Fatal(err) 102 } 103 wr.Flush() 104 if err := wr.Error(); err != nil { 105 log.Fatal(err) 106 } 107 return fmt.Sprintf("\nSample in one second (unix latency throughput):\n%s", buf.String()) 108} 109