1// Copyright 2016 CoreOS, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package cmd
16
17import (
18	"bytes"
19	"encoding/csv"
20	"fmt"
21	"log"
22	"sort"
23	"sync"
24	"time"
25)
26
// timeSeries is one per-second sample: the Unix second it belongs to, the
// average latency computed for that second, and the number of observations
// counted in that second.
type timeSeries struct {
	timestamp  int64
	avgLatency time.Duration
	throughPut int64
}

// TimeSeries is a list of per-second samples. Its Len, Less and Swap methods
// satisfy sort.Interface and order samples by ascending timestamp.
type TimeSeries []timeSeries

// Len reports the number of samples in the series.
func (ts TimeSeries) Len() int {
	return len(ts)
}

// Less reports whether sample i has a strictly earlier timestamp than
// sample j.
func (ts TimeSeries) Less(i, j int) bool {
	return ts[i].timestamp < ts[j].timestamp
}

// Swap exchanges samples i and j in place.
func (ts TimeSeries) Swap(i, j int) {
	ts[j], ts[i] = ts[i], ts[j]
}
38
// secondPoint accumulates the latencies recorded for a single Unix second.
type secondPoint struct {
	// totalLatency is the sum of every latency added for the second.
	totalLatency time.Duration
	// count is how many latencies were added for the second.
	count int64
}

// secondPoints buckets latency observations by Unix second. The methods in
// this file lock mu before reading or writing tm.
type secondPoints struct {
	mu sync.Mutex
	// tm maps a Unix timestamp in seconds to the totals for that second.
	tm map[int64]secondPoint
}

// newSecondPoints returns an empty secondPoints with its map allocated.
func newSecondPoints() *secondPoints {
	sp := new(secondPoints)
	sp.tm = make(map[int64]secondPoint)
	return sp
}

// Add records one latency observation lat in the bucket for the Unix second
// of ts, creating that bucket the first time the second is seen.
func (sp *secondPoints) Add(ts time.Time, lat time.Duration) {
	sp.mu.Lock()
	defer sp.mu.Unlock()

	sec := ts.Unix()
	// A missing key yields the zero secondPoint, so a fresh bucket and an
	// existing one are updated the same way.
	pt := sp.tm[sec]
	pt.totalLatency += lat
	pt.count++
	sp.tm[sec] = pt
}
66
67func (sp *secondPoints) getTimeSeries() TimeSeries {
68	sp.mu.Lock()
69	defer sp.mu.Unlock()
70
71	tslice := make(TimeSeries, len(sp.tm))
72	i := 0
73	for k, v := range sp.tm {
74		tslice[i] = timeSeries{
75			timestamp:  k,
76			avgLatency: time.Duration(v.totalLatency) / time.Duration(v.count),
77			throughPut: v.count,
78		}
79		i++
80	}
81	sort.Sort(tslice)
82	return tslice
83}
84
85func (ts TimeSeries) String() string {
86	buf := new(bytes.Buffer)
87	wr := csv.NewWriter(buf)
88	if err := wr.Write([]string{"unix_ts", "avg_latency", "throughput"}); err != nil {
89		log.Fatal(err)
90	}
91	rows := [][]string{}
92	for i := range ts {
93		row := []string{
94			fmt.Sprintf("%d", ts[i].timestamp),
95			fmt.Sprintf("%s", ts[i].avgLatency),
96			fmt.Sprintf("%d", ts[i].throughPut),
97		}
98		rows = append(rows, row)
99	}
100	if err := wr.WriteAll(rows); err != nil {
101		log.Fatal(err)
102	}
103	wr.Flush()
104	if err := wr.Error(); err != nil {
105		log.Fatal(err)
106	}
107	return fmt.Sprintf("\nSample in one second (unix latency throughput):\n%s", buf.String())
108}
109