1package statsd
2
3import (
4	"io/ioutil"
5	"net"
6	"os"
7	"path/filepath"
8	"reflect"
9	"testing"
10	"time"
11
12	"github.com/stretchr/testify/assert"
13
14	"github.com/stretchr/testify/suite"
15)
16
// dogstatsdTests is the table of client calls shared by the tests in this
// file. Each entry describes one metric submission and the exact datagram
// that is expected to come out of the client for it.
var dogstatsdTests = []struct {
	GlobalNamespace string      // assigned to client.Namespace before the call
	GlobalTags      []string    // assigned to client.Tags before the call
	Method          string      // name of the client method, resolved by reflection
	Metric          string      // first argument of the call
	Value           interface{} // second argument of the call
	Tags            []string    // third argument of the call
	Rate            float64     // fourth argument of the call
	Expected        string      // payload the server must read
}{
	// Gauges, with and without tags and sampling rate.
	{Method: "Gauge", Metric: "test.gauge", Value: 1.0, Rate: 1.0, Expected: "test.gauge:1|g"},
	{Method: "Gauge", Metric: "test.gauge", Value: 1.0, Rate: 0.999999, Expected: "test.gauge:1|g|@0.999999"},
	{Method: "Gauge", Metric: "test.gauge", Value: 1.0, Tags: []string{"tagA"}, Rate: 1.0, Expected: "test.gauge:1|g|#tagA"},
	{Method: "Gauge", Metric: "test.gauge", Value: 1.0, Tags: []string{"tagA", "tagB"}, Rate: 1.0, Expected: "test.gauge:1|g|#tagA,tagB"},
	{Method: "Gauge", Metric: "test.gauge", Value: 1.0, Tags: []string{"tagA"}, Rate: 0.999999, Expected: "test.gauge:1|g|@0.999999|#tagA"},

	// Counters, positive and negative.
	{Method: "Count", Metric: "test.count", Value: int64(1), Tags: []string{"tagA"}, Rate: 1.0, Expected: "test.count:1|c|#tagA"},
	{Method: "Count", Metric: "test.count", Value: int64(-1), Tags: []string{"tagA"}, Rate: 1.0, Expected: "test.count:-1|c|#tagA"},

	// Histogram and distribution.
	{Method: "Histogram", Metric: "test.histogram", Value: 2.3, Tags: []string{"tagA"}, Rate: 1.0, Expected: "test.histogram:2.3|h|#tagA"},
	{Method: "Distribution", Metric: "test.distribution", Value: 2.3, Tags: []string{"tagA"}, Rate: 1.0, Expected: "test.distribution:2.3|d|#tagA"},

	// Sets, including a global namespace and global tags.
	{Method: "Set", Metric: "test.set", Value: "uuid", Tags: []string{"tagA"}, Rate: 1.0, Expected: "test.set:uuid|s|#tagA"},
	{GlobalNamespace: "flubber.", Method: "Set", Metric: "test.set", Value: "uuid", Tags: []string{"tagA"}, Rate: 1.0, Expected: "flubber.test.set:uuid|s|#tagA"},
	{GlobalTags: []string{"tagC"}, Method: "Set", Metric: "test.set", Value: "uuid", Tags: []string{"tagA"}, Rate: 1.0, Expected: "test.set:uuid|s|#tagC,tagA"},

	// A newline inside a tag is expected to be dropped from the payload.
	{Method: "Count", Metric: "test.count", Value: int64(1), Tags: []string{"hello\nworld"}, Rate: 1.0, Expected: "test.count:1|c|#helloworld"},
}
41
// testUnixgramServer is a unixgram socket used as the receiving end in the
// tests, bundled with the temporary directory that holds its socket file.
// The field order matters: newTestUnixgramServer builds the value with a
// positional literal.
type testUnixgramServer struct {
	// tmpDir is the directory removed by Cleanup.
	tmpDir string
	// UnixConn is embedded so that Read, Close, LocalAddr and
	// SetReadDeadline can be called directly on the server.
	*net.UnixConn
}
46
47// newUnixgramServer returns a unix:// addr as well as a *net.UnixConn
48// server. Any error will fail the test given in param.
49func newTestUnixgramServer(t *testing.T) *testUnixgramServer {
50	dir, err := ioutil.TempDir("", "socket")
51	if err != nil {
52		t.Fatal(err)
53	}
54
55	addr := filepath.Join(dir, "dsd.socket")
56
57	udsAddr, err := net.ResolveUnixAddr("unixgram", addr)
58	if err != nil {
59		t.Fatal(err)
60	}
61
62	server, err := net.ListenUnixgram("unixgram", udsAddr)
63	if err != nil {
64		t.Fatal(err)
65	}
66
67	return &testUnixgramServer{dir, server}
68}
69
70func (ts *testUnixgramServer) Cleanup() {
71	os.RemoveAll(ts.tmpDir) // clean up
72	ts.Close()
73}
74
75func (ts *testUnixgramServer) AddrString() string {
76	return UnixAddressPrefix + ts.LocalAddr().String()
77}
78
// UdsTestSuite contains generic tests valid for both UDS implementations.
type UdsTestSuite struct {
	suite.Suite
	// options are the extra client options that TestClientUDS passes to New;
	// a nil slice means no extra options.
	options []Option
}
84
85func TestUdsBlocking(t *testing.T) {
86	suite.Run(t, &UdsTestSuite{})
87}
88
89func (suite *UdsTestSuite) TestClientUDS() {
90	server := newTestUnixgramServer(suite.T())
91	defer server.Cleanup()
92
93	client, err := New(server.AddrString(), suite.options...)
94	if err != nil {
95		suite.T().Fatal(err)
96	}
97
98	for _, tt := range dogstatsdTests {
99		client.Namespace = tt.GlobalNamespace
100		client.Tags = tt.GlobalTags
101		method := reflect.ValueOf(client).MethodByName(tt.Method)
102		e := method.Call([]reflect.Value{
103			reflect.ValueOf(tt.Metric),
104			reflect.ValueOf(tt.Value),
105			reflect.ValueOf(tt.Tags),
106			reflect.ValueOf(tt.Rate)})[0]
107		errInter := e.Interface()
108		if errInter != nil {
109			suite.T().Fatal(errInter.(error))
110		}
111
112		bytes := make([]byte, 1024)
113		n, err := server.Read(bytes)
114		if err != nil {
115			suite.T().Fatal(err)
116		}
117		message := bytes[:n]
118		if string(message) != tt.Expected {
119			suite.T().Errorf("Expected: %s. Actual: %s", tt.Expected, string(message))
120		}
121	}
122}
123
124func (suite *UdsTestSuite) TestClientUDSConcurrent() {
125	server := newTestUnixgramServer(suite.T())
126	defer server.Cleanup()
127
128	client, err := New(server.AddrString(), WithMaxMessagesPerPayload(1))
129	if err != nil {
130		suite.T().Fatal(err)
131	}
132
133	numWrites := 10
134	for i := 0; i < numWrites; i++ {
135		go func() {
136			err := client.Gauge("test.gauge", 1.0, []string{}, 1)
137			if err != nil {
138				suite.T().Errorf("error outputting gauge %s", err)
139			}
140		}()
141	}
142
143	expected := "test.gauge:1|g"
144	var msgs []string
145	for {
146		bytes := make([]byte, 1024)
147		server.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
148		n, err := server.Read(bytes)
149		if err, ok := err.(net.Error); ok && err.Timeout() {
150			break
151		}
152		if err != nil {
153			suite.T().Fatal(err)
154		}
155		message := string(bytes[:n])
156		msgs = append(msgs, message)
157		if message != expected {
158			suite.T().Errorf("Got %s, expected %s", message, expected)
159		}
160	}
161
162	if len(msgs) != numWrites {
163		suite.T().Errorf("Got %d writes, expected %d. Data: %v", len(msgs), numWrites, msgs)
164	}
165}
166
167func (suite *UdsTestSuite) TestClientUDSClose() {
168	ts := newTestUnixgramServer(suite.T())
169	defer ts.Cleanup()
170
171	// close the server ourselves and test code when nobody's listening
172	ts.Close()
173	client, err := New(ts.AddrString())
174	if err != nil {
175		suite.T().Fatal(err)
176	}
177
178	assert.NotPanics(suite.T(), func() { client.Close() })
179}
180