1package stressClient 2 3import ( 4 "log" 5 "strconv" 6 "time" 7 8 influx "github.com/influxdata/influxdb/client/v2" 9) 10 11// reporting.go contains functions to emit tags and points from various parts of stressClient 12// These points are then written to the ("_%v", sf.TestName) database 13 14// These are the tags that stressClient adds to any response points 15func (sc *stressClient) tags(statementID string) map[string]string { 16 tags := map[string]string{ 17 "number_targets": fmtInt(len(sc.addresses)), 18 "precision": sc.precision, 19 "writers": fmtInt(sc.wconc), 20 "readers": fmtInt(sc.qconc), 21 "test_id": sc.testID, 22 "statement_id": statementID, 23 "write_interval": sc.wdelay, 24 "query_interval": sc.qdelay, 25 } 26 return tags 27} 28 29// These are the tags that the StressTest adds to any response points 30func (st *StressTest) tags() map[string]string { 31 tags := map[string]string{ 32 "precision": st.Precision, 33 "batch_size": fmtInt(st.BatchSize), 34 } 35 return tags 36} 37 38// This function makes a *client.Point for reporting on writes 39func (sc *stressClient) writePoint(retries int, statementID string, statusCode int, responseTime time.Duration, addedTags map[string]string, writeBytes int) *influx.Point { 40 41 tags := sumTags(sc.tags(statementID), addedTags) 42 43 fields := map[string]interface{}{ 44 "status_code": statusCode, 45 "response_time_ns": responseTime.Nanoseconds(), 46 "num_bytes": writeBytes, 47 } 48 49 point, err := influx.NewPoint("write", tags, fields, time.Now()) 50 51 if err != nil { 52 log.Fatalf("Error creating write results point\n error: %v\n", err) 53 } 54 55 return point 56} 57 58// This function makes a *client.Point for reporting on queries 59func (sc *stressClient) queryPoint(statementID string, body []byte, statusCode int, responseTime time.Duration, addedTags map[string]string) *influx.Point { 60 61 tags := sumTags(sc.tags(statementID), addedTags) 62 63 fields := map[string]interface{}{ 64 "status_code": statusCode, 65 "num_bytes": len(body), 66 "response_time_ns": responseTime.Nanoseconds(), 67 } 68 69 point, err := influx.NewPoint("query", tags, fields, time.Now()) 70 71 if err != nil { 72 log.Fatalf("Error creating query results point\n error: %v\n", err) 73 } 74 75 return point 76} 77 78// Adds two map[string]string together 79func sumTags(tags1, tags2 map[string]string) map[string]string { 80 tags := make(map[string]string) 81 // Add all tags from first map to return map 82 for k, v := range tags1 { 83 tags[k] = v 84 } 85 // Add all tags from second map to return map 86 for k, v := range tags2 { 87 tags[k] = v 88 } 89 return tags 90} 91 92// Turns an int into a string 93func fmtInt(i int) string { 94 return strconv.FormatInt(int64(i), 10) 95} 96