1package stressClient
2
3import (
4	"log"
5	"strconv"
6	"time"
7
8	influx "github.com/influxdata/influxdb/client/v2"
9)
10
// reporting.go contains functions that emit tags and points from various parts of stressClient.
// These points are then written to the database named by the format ("_%v", sf.TestName).
13
14// These are the tags that stressClient adds to any response points
15func (sc *stressClient) tags(statementID string) map[string]string {
16	tags := map[string]string{
17		"number_targets": fmtInt(len(sc.addresses)),
18		"precision":      sc.precision,
19		"writers":        fmtInt(sc.wconc),
20		"readers":        fmtInt(sc.qconc),
21		"test_id":        sc.testID,
22		"statement_id":   statementID,
23		"write_interval": sc.wdelay,
24		"query_interval": sc.qdelay,
25	}
26	return tags
27}
28
29// These are the tags that the StressTest adds to any response points
30func (st *StressTest) tags() map[string]string {
31	tags := map[string]string{
32		"precision":  st.Precision,
33		"batch_size": fmtInt(st.BatchSize),
34	}
35	return tags
36}
37
38// This function makes a *client.Point for reporting on writes
39func (sc *stressClient) writePoint(retries int, statementID string, statusCode int, responseTime time.Duration, addedTags map[string]string, writeBytes int) *influx.Point {
40
41	tags := sumTags(sc.tags(statementID), addedTags)
42
43	fields := map[string]interface{}{
44		"status_code":      statusCode,
45		"response_time_ns": responseTime.Nanoseconds(),
46		"num_bytes":        writeBytes,
47	}
48
49	point, err := influx.NewPoint("write", tags, fields, time.Now())
50
51	if err != nil {
52		log.Fatalf("Error creating write results point\n  error: %v\n", err)
53	}
54
55	return point
56}
57
58// This function makes a *client.Point for reporting on queries
59func (sc *stressClient) queryPoint(statementID string, body []byte, statusCode int, responseTime time.Duration, addedTags map[string]string) *influx.Point {
60
61	tags := sumTags(sc.tags(statementID), addedTags)
62
63	fields := map[string]interface{}{
64		"status_code":      statusCode,
65		"num_bytes":        len(body),
66		"response_time_ns": responseTime.Nanoseconds(),
67	}
68
69	point, err := influx.NewPoint("query", tags, fields, time.Now())
70
71	if err != nil {
72		log.Fatalf("Error creating query results point\n  error: %v\n", err)
73	}
74
75	return point
76}
77
// sumTags merges two tag maps into a freshly allocated map and returns it.
// Entries from tags1 are copied first and entries from tags2 second, so a key
// present in both ends up with the value from tags2. Neither input is
// modified, and the result is non-nil even when both inputs are nil.
func sumTags(tags1, tags2 map[string]string) map[string]string {
	merged := map[string]string{}
	// Copy in argument order so that later sources override earlier ones.
	for _, src := range []map[string]string{tags1, tags2} {
		for key, val := range src {
			merged[key] = val
		}
	}
	return merged
}
91
// fmtInt returns the base-10 string representation of i.
func fmtInt(i int) string {
	return strconv.Itoa(i)
}
96