1package statsd 2 3import ( 4 "io/ioutil" 5 "net" 6 "os" 7 "path/filepath" 8 "reflect" 9 "testing" 10 "time" 11 12 "github.com/stretchr/testify/assert" 13 14 "github.com/stretchr/testify/suite" 15) 16 17var dogstatsdTests = []struct { 18 GlobalNamespace string 19 GlobalTags []string 20 Method string 21 Metric string 22 Value interface{} 23 Tags []string 24 Rate float64 25 Expected string 26}{ 27 {"", nil, "Gauge", "test.gauge", 1.0, nil, 1.0, "test.gauge:1|g"}, 28 {"", nil, "Gauge", "test.gauge", 1.0, nil, 0.999999, "test.gauge:1|g|@0.999999"}, 29 {"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA"}, 1.0, "test.gauge:1|g|#tagA"}, 30 {"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA", "tagB"}, 1.0, "test.gauge:1|g|#tagA,tagB"}, 31 {"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA"}, 0.999999, "test.gauge:1|g|@0.999999|#tagA"}, 32 {"", nil, "Count", "test.count", int64(1), []string{"tagA"}, 1.0, "test.count:1|c|#tagA"}, 33 {"", nil, "Count", "test.count", int64(-1), []string{"tagA"}, 1.0, "test.count:-1|c|#tagA"}, 34 {"", nil, "Histogram", "test.histogram", 2.3, []string{"tagA"}, 1.0, "test.histogram:2.3|h|#tagA"}, 35 {"", nil, "Distribution", "test.distribution", 2.3, []string{"tagA"}, 1.0, "test.distribution:2.3|d|#tagA"}, 36 {"", nil, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "test.set:uuid|s|#tagA"}, 37 {"flubber.", nil, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "flubber.test.set:uuid|s|#tagA"}, 38 {"", []string{"tagC"}, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "test.set:uuid|s|#tagC,tagA"}, 39 {"", nil, "Count", "test.count", int64(1), []string{"hello\nworld"}, 1.0, "test.count:1|c|#helloworld"}, 40} 41 42type testUnixgramServer struct { 43 tmpDir string 44 *net.UnixConn 45} 46 47// newUnixgramServer returns a unix:// addr as well as a *net.UnixConn 48// server. Any error will fail the test given in param. 49func newTestUnixgramServer(t *testing.T) *testUnixgramServer { 50 dir, err := ioutil.TempDir("", "socket") 51 if err != nil { 52 t.Fatal(err) 53 } 54 55 addr := filepath.Join(dir, "dsd.socket") 56 57 udsAddr, err := net.ResolveUnixAddr("unixgram", addr) 58 if err != nil { 59 t.Fatal(err) 60 } 61 62 server, err := net.ListenUnixgram("unixgram", udsAddr) 63 if err != nil { 64 t.Fatal(err) 65 } 66 67 return &testUnixgramServer{dir, server} 68} 69 70func (ts *testUnixgramServer) Cleanup() { 71 os.RemoveAll(ts.tmpDir) // clean up 72 ts.Close() 73} 74 75func (ts *testUnixgramServer) AddrString() string { 76 return UnixAddressPrefix + ts.LocalAddr().String() 77} 78 79// UdsTestSuite contains generic tests valid for both UDS implementations 80type UdsTestSuite struct { 81 suite.Suite 82 options []Option 83} 84 85func TestUdsBlocking(t *testing.T) { 86 suite.Run(t, &UdsTestSuite{}) 87} 88 89func (suite *UdsTestSuite) TestClientUDS() { 90 server := newTestUnixgramServer(suite.T()) 91 defer server.Cleanup() 92 93 client, err := New(server.AddrString(), suite.options...) 94 if err != nil { 95 suite.T().Fatal(err) 96 } 97 98 for _, tt := range dogstatsdTests { 99 client.Namespace = tt.GlobalNamespace 100 client.Tags = tt.GlobalTags 101 method := reflect.ValueOf(client).MethodByName(tt.Method) 102 e := method.Call([]reflect.Value{ 103 reflect.ValueOf(tt.Metric), 104 reflect.ValueOf(tt.Value), 105 reflect.ValueOf(tt.Tags), 106 reflect.ValueOf(tt.Rate)})[0] 107 errInter := e.Interface() 108 if errInter != nil { 109 suite.T().Fatal(errInter.(error)) 110 } 111 112 bytes := make([]byte, 1024) 113 n, err := server.Read(bytes) 114 if err != nil { 115 suite.T().Fatal(err) 116 } 117 message := bytes[:n] 118 if string(message) != tt.Expected { 119 suite.T().Errorf("Expected: %s. Actual: %s", tt.Expected, string(message)) 120 } 121 } 122} 123 124func (suite *UdsTestSuite) TestClientUDSConcurrent() { 125 server := newTestUnixgramServer(suite.T()) 126 defer server.Cleanup() 127 128 client, err := New(server.AddrString(), WithMaxMessagesPerPayload(1)) 129 if err != nil { 130 suite.T().Fatal(err) 131 } 132 133 numWrites := 10 134 for i := 0; i < numWrites; i++ { 135 go func() { 136 err := client.Gauge("test.gauge", 1.0, []string{}, 1) 137 if err != nil { 138 suite.T().Errorf("error outputting gauge %s", err) 139 } 140 }() 141 } 142 143 expected := "test.gauge:1|g" 144 var msgs []string 145 for { 146 bytes := make([]byte, 1024) 147 server.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) 148 n, err := server.Read(bytes) 149 if err, ok := err.(net.Error); ok && err.Timeout() { 150 break 151 } 152 if err != nil { 153 suite.T().Fatal(err) 154 } 155 message := string(bytes[:n]) 156 msgs = append(msgs, message) 157 if message != expected { 158 suite.T().Errorf("Got %s, expected %s", message, expected) 159 } 160 } 161 162 if len(msgs) != numWrites { 163 suite.T().Errorf("Got %d writes, expected %d. Data: %v", len(msgs), numWrites, msgs) 164 } 165} 166 167func (suite *UdsTestSuite) TestClientUDSClose() { 168 ts := newTestUnixgramServer(suite.T()) 169 defer ts.Cleanup() 170 171 // close the server ourselves and test code when nobody's listening 172 ts.Close() 173 client, err := New(ts.AddrString()) 174 if err != nil { 175 suite.T().Fatal(err) 176 } 177 178 assert.NotPanics(suite.T(), func() { client.Close() }) 179} 180