1// +build !windows
2
3package statsd
4
5import (
6	"io/ioutil"
7	"net"
8	"os"
9	"path/filepath"
10	"reflect"
11	"testing"
12	"time"
13
14	"github.com/stretchr/testify/assert"
15
16	"github.com/stretchr/testify/suite"
17)
18
19var dogstatsdTests = []struct {
20	GlobalNamespace string
21	GlobalTags      []string
22	Method          string
23	Metric          string
24	Value           interface{}
25	Tags            []string
26	Rate            float64
27	Expected        string
28}{
29	{"", nil, "Gauge", "test.gauge", 1.0, nil, 1.0, "test.gauge:1|g"},
30	{"", nil, "Gauge", "test.gauge", 1.0, nil, 0.999999, "test.gauge:1|g|@0.999999"},
31	{"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA"}, 1.0, "test.gauge:1|g|#tagA"},
32	{"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA", "tagB"}, 1.0, "test.gauge:1|g|#tagA,tagB"},
33	{"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA"}, 0.999999, "test.gauge:1|g|@0.999999|#tagA"},
34	{"", nil, "Count", "test.count", int64(1), []string{"tagA"}, 1.0, "test.count:1|c|#tagA"},
35	{"", nil, "Count", "test.count", int64(-1), []string{"tagA"}, 1.0, "test.count:-1|c|#tagA"},
36	{"", nil, "Histogram", "test.histogram", 2.3, []string{"tagA"}, 1.0, "test.histogram:2.3|h|#tagA"},
37	{"", nil, "Distribution", "test.distribution", 2.3, []string{"tagA"}, 1.0, "test.distribution:2.3|d|#tagA"},
38	{"", nil, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "test.set:uuid|s|#tagA"},
39	{"flubber.", nil, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "flubber.test.set:uuid|s|#tagA"},
40	{"", []string{"tagC"}, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "test.set:uuid|s|#tagC,tagA"},
41	{"", nil, "Count", "test.count", int64(1), []string{"hello\nworld"}, 1.0, "test.count:1|c|#helloworld"},
42}
43
44type testUnixgramServer struct {
45	tmpDir string
46	*net.UnixConn
47}
48
49// newUnixgramServer returns a unix:// addr as well as a *net.UnixConn
50// server. Any error will fail the test given in param.
51func newTestUnixgramServer(t *testing.T) *testUnixgramServer {
52	dir, err := ioutil.TempDir("", "socket")
53	if err != nil {
54		t.Fatal(err)
55	}
56
57	addr := filepath.Join(dir, "dsd.socket")
58
59	udsAddr, err := net.ResolveUnixAddr("unixgram", addr)
60	if err != nil {
61		t.Fatal(err)
62	}
63
64	server, err := net.ListenUnixgram("unixgram", udsAddr)
65	if err != nil {
66		t.Fatal(err)
67	}
68
69	return &testUnixgramServer{dir, server}
70}
71
72func (ts *testUnixgramServer) Cleanup() {
73	os.RemoveAll(ts.tmpDir) // clean up
74	ts.Close()
75}
76
77func (ts *testUnixgramServer) AddrString() string {
78	return UnixAddressPrefix + ts.LocalAddr().String()
79}
80
81// UdsTestSuite contains generic tests valid for both UDS implementations
82type UdsTestSuite struct {
83	suite.Suite
84	options []Option
85}
86
87func TestUdsBlocking(t *testing.T) {
88	suite.Run(t, &UdsTestSuite{})
89}
90
91func (suite *UdsTestSuite) TestClientUDS() {
92	server := newTestUnixgramServer(suite.T())
93	defer server.Cleanup()
94
95	client, err := New(server.AddrString(), suite.options...)
96	if err != nil {
97		suite.T().Fatal(err)
98	}
99
100	for _, tt := range dogstatsdTests {
101		client.Namespace = tt.GlobalNamespace
102		client.Tags = tt.GlobalTags
103		method := reflect.ValueOf(client).MethodByName(tt.Method)
104		e := method.Call([]reflect.Value{
105			reflect.ValueOf(tt.Metric),
106			reflect.ValueOf(tt.Value),
107			reflect.ValueOf(tt.Tags),
108			reflect.ValueOf(tt.Rate)})[0]
109		errInter := e.Interface()
110		if errInter != nil {
111			suite.T().Fatal(errInter.(error))
112		}
113
114		bytes := make([]byte, 1024)
115		n, err := server.Read(bytes)
116		if err != nil {
117			suite.T().Fatal(err)
118		}
119		message := bytes[:n]
120		if string(message) != tt.Expected {
121			suite.T().Errorf("Expected: %s. Actual: %s", tt.Expected, string(message))
122		}
123	}
124}
125
126func (suite *UdsTestSuite) TestClientUDSConcurrent() {
127	server := newTestUnixgramServer(suite.T())
128	defer server.Cleanup()
129
130	client, err := New(server.AddrString(), WithMaxMessagesPerPayload(1))
131	if err != nil {
132		suite.T().Fatal(err)
133	}
134
135	numWrites := 10
136	for i := 0; i < numWrites; i++ {
137		go func() {
138			err := client.Gauge("test.gauge", 1.0, []string{}, 1)
139			if err != nil {
140				suite.T().Errorf("error outputting gauge %s", err)
141			}
142		}()
143	}
144
145	expected := "test.gauge:1|g"
146	var msgs []string
147	for {
148		bytes := make([]byte, 1024)
149		server.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
150		n, err := server.Read(bytes)
151		if err, ok := err.(net.Error); ok && err.Timeout() {
152			break
153		}
154		if err != nil {
155			suite.T().Fatal(err)
156		}
157		message := string(bytes[:n])
158		msgs = append(msgs, message)
159		if message != expected {
160			suite.T().Errorf("Got %s, expected %s", message, expected)
161		}
162	}
163
164	if len(msgs) != numWrites {
165		suite.T().Errorf("Got %d writes, expected %d. Data: %v", len(msgs), numWrites, msgs)
166	}
167}
168
169func (suite *UdsTestSuite) TestClientUDSClose() {
170	ts := newTestUnixgramServer(suite.T())
171	defer ts.Cleanup()
172
173	// close the server ourselves and test code when nobody's listening
174	ts.Close()
175	client, err := New(ts.AddrString())
176	if err != nil {
177		suite.T().Fatal(err)
178	}
179
180	assert.NotPanics(suite.T(), func() { client.Close() })
181}
182
183func TestConnectionNotUnset(t *testing.T) {
184	ts := newTestUnixgramServer(t)
185	defer ts.Cleanup()
186
187	writer, err := newUDSWriter(ts.LocalAddr().String())
188	assert.NoError(t, err)
189
190	_, err = writer.Write([]byte("test.gauge:1|g"))
191	assert.NoError(t, err)
192	assert.NotNil(t, writer.conn)
193}
194