1// +build !windows 2 3package statsd 4 5import ( 6 "io/ioutil" 7 "net" 8 "os" 9 "path/filepath" 10 "reflect" 11 "testing" 12 "time" 13 14 "github.com/stretchr/testify/assert" 15 16 "github.com/stretchr/testify/suite" 17) 18 19var dogstatsdTests = []struct { 20 GlobalNamespace string 21 GlobalTags []string 22 Method string 23 Metric string 24 Value interface{} 25 Tags []string 26 Rate float64 27 Expected string 28}{ 29 {"", nil, "Gauge", "test.gauge", 1.0, nil, 1.0, "test.gauge:1|g"}, 30 {"", nil, "Gauge", "test.gauge", 1.0, nil, 0.999999, "test.gauge:1|g|@0.999999"}, 31 {"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA"}, 1.0, "test.gauge:1|g|#tagA"}, 32 {"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA", "tagB"}, 1.0, "test.gauge:1|g|#tagA,tagB"}, 33 {"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA"}, 0.999999, "test.gauge:1|g|@0.999999|#tagA"}, 34 {"", nil, "Count", "test.count", int64(1), []string{"tagA"}, 1.0, "test.count:1|c|#tagA"}, 35 {"", nil, "Count", "test.count", int64(-1), []string{"tagA"}, 1.0, "test.count:-1|c|#tagA"}, 36 {"", nil, "Histogram", "test.histogram", 2.3, []string{"tagA"}, 1.0, "test.histogram:2.3|h|#tagA"}, 37 {"", nil, "Distribution", "test.distribution", 2.3, []string{"tagA"}, 1.0, "test.distribution:2.3|d|#tagA"}, 38 {"", nil, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "test.set:uuid|s|#tagA"}, 39 {"flubber.", nil, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "flubber.test.set:uuid|s|#tagA"}, 40 {"", []string{"tagC"}, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "test.set:uuid|s|#tagC,tagA"}, 41 {"", nil, "Count", "test.count", int64(1), []string{"hello\nworld"}, 1.0, "test.count:1|c|#helloworld"}, 42} 43 44type testUnixgramServer struct { 45 tmpDir string 46 *net.UnixConn 47} 48 49// newUnixgramServer returns a unix:// addr as well as a *net.UnixConn 50// server. Any error will fail the test given in param. 51func newTestUnixgramServer(t *testing.T) *testUnixgramServer { 52 dir, err := ioutil.TempDir("", "socket") 53 if err != nil { 54 t.Fatal(err) 55 } 56 57 addr := filepath.Join(dir, "dsd.socket") 58 59 udsAddr, err := net.ResolveUnixAddr("unixgram", addr) 60 if err != nil { 61 t.Fatal(err) 62 } 63 64 server, err := net.ListenUnixgram("unixgram", udsAddr) 65 if err != nil { 66 t.Fatal(err) 67 } 68 69 return &testUnixgramServer{dir, server} 70} 71 72func (ts *testUnixgramServer) Cleanup() { 73 os.RemoveAll(ts.tmpDir) // clean up 74 ts.Close() 75} 76 77func (ts *testUnixgramServer) AddrString() string { 78 return UnixAddressPrefix + ts.LocalAddr().String() 79} 80 81// UdsTestSuite contains generic tests valid for both UDS implementations 82type UdsTestSuite struct { 83 suite.Suite 84 options []Option 85} 86 87func TestUdsBlocking(t *testing.T) { 88 suite.Run(t, &UdsTestSuite{}) 89} 90 91func (suite *UdsTestSuite) TestClientUDS() { 92 server := newTestUnixgramServer(suite.T()) 93 defer server.Cleanup() 94 95 client, err := New(server.AddrString(), suite.options...) 96 if err != nil { 97 suite.T().Fatal(err) 98 } 99 100 for _, tt := range dogstatsdTests { 101 client.Namespace = tt.GlobalNamespace 102 client.Tags = tt.GlobalTags 103 method := reflect.ValueOf(client).MethodByName(tt.Method) 104 e := method.Call([]reflect.Value{ 105 reflect.ValueOf(tt.Metric), 106 reflect.ValueOf(tt.Value), 107 reflect.ValueOf(tt.Tags), 108 reflect.ValueOf(tt.Rate)})[0] 109 errInter := e.Interface() 110 if errInter != nil { 111 suite.T().Fatal(errInter.(error)) 112 } 113 114 bytes := make([]byte, 1024) 115 n, err := server.Read(bytes) 116 if err != nil { 117 suite.T().Fatal(err) 118 } 119 message := bytes[:n] 120 if string(message) != tt.Expected { 121 suite.T().Errorf("Expected: %s. Actual: %s", tt.Expected, string(message)) 122 } 123 } 124} 125 126func (suite *UdsTestSuite) TestClientUDSConcurrent() { 127 server := newTestUnixgramServer(suite.T()) 128 defer server.Cleanup() 129 130 client, err := New(server.AddrString(), WithMaxMessagesPerPayload(1)) 131 if err != nil { 132 suite.T().Fatal(err) 133 } 134 135 numWrites := 10 136 for i := 0; i < numWrites; i++ { 137 go func() { 138 err := client.Gauge("test.gauge", 1.0, []string{}, 1) 139 if err != nil { 140 suite.T().Errorf("error outputting gauge %s", err) 141 } 142 }() 143 } 144 145 expected := "test.gauge:1|g" 146 var msgs []string 147 for { 148 bytes := make([]byte, 1024) 149 server.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) 150 n, err := server.Read(bytes) 151 if err, ok := err.(net.Error); ok && err.Timeout() { 152 break 153 } 154 if err != nil { 155 suite.T().Fatal(err) 156 } 157 message := string(bytes[:n]) 158 msgs = append(msgs, message) 159 if message != expected { 160 suite.T().Errorf("Got %s, expected %s", message, expected) 161 } 162 } 163 164 if len(msgs) != numWrites { 165 suite.T().Errorf("Got %d writes, expected %d. Data: %v", len(msgs), numWrites, msgs) 166 } 167} 168 169func (suite *UdsTestSuite) TestClientUDSClose() { 170 ts := newTestUnixgramServer(suite.T()) 171 defer ts.Cleanup() 172 173 // close the server ourselves and test code when nobody's listening 174 ts.Close() 175 client, err := New(ts.AddrString()) 176 if err != nil { 177 suite.T().Fatal(err) 178 } 179 180 assert.NotPanics(suite.T(), func() { client.Close() }) 181} 182 183func TestConnectionNotUnset(t *testing.T) { 184 ts := newTestUnixgramServer(t) 185 defer ts.Cleanup() 186 187 writer, err := newUDSWriter(ts.LocalAddr().String()) 188 assert.NoError(t, err) 189 190 _, err = writer.Write([]byte("test.gauge:1|g")) 191 assert.NoError(t, err) 192 assert.NotNil(t, writer.conn) 193} 194