1package fluent
2
3import (
4	"encoding/json"
5	"io/ioutil"
6	"net"
7	"reflect"
8	"runtime"
9	"testing"
10	"time"
11
12	"github.com/bmizerany/assert"
13)
14
const (
	// RECV_BUF_LEN is the size in bytes of the read buffer allocated by
	// EchoFunc for data received on an accepted connection.
	RECV_BUF_LEN = 1024
)
18
// Conn is an in-memory stand-in for net.Conn that records what the code under
// test writes and which write deadline it sets, so tests can inspect both.
//
// The embedded net.Conn is never initialised by the tests in this file, so
// only the methods overridden below are safe to call.
type Conn struct {
	net.Conn
	buf           []byte    // payload of the most recent Write
	writeDeadline time.Time // last value passed to SetWriteDeadline
}

// Read copies the most recently written payload into b and returns the number
// of bytes actually copied, which is smaller than the payload when b is too
// short to hold it. The payload is not consumed: repeated reads see the same
// data until the next Write.
func (c *Conn) Read(b []byte) (int, error) {
	return copy(b, c.buf), nil
}

// Write stores a private copy of b, replacing any previously stored payload,
// and reports that all of b was written.
func (c *Conn) Write(b []byte) (int, error) {
	c.buf = make([]byte, len(b))
	copy(c.buf, b)
	return len(b), nil
}

// SetWriteDeadline records t so tests can assert on the deadline that was
// requested. It always succeeds.
func (c *Conn) SetWriteDeadline(t time.Time) error {
	c.writeDeadline = t
	return nil
}

// Close is a no-op that always succeeds.
func (c *Conn) Close() error {
	return nil
}
45
46func init() {
47	numProcs := runtime.NumCPU()
48	if numProcs < 2 {
49		numProcs = 2
50	}
51	runtime.GOMAXPROCS(numProcs)
52
53	listener, err := net.Listen("tcp", "0.0.0.0:6666")
54	if err != nil {
55		println("error listening:", err.Error())
56	}
57	go func() {
58		for {
59			conn, err := listener.Accept()
60			if err != nil {
61				println("Error accept:", err.Error())
62				return
63			}
64			go EchoFunc(conn)
65		}
66	}()
67}
68
69func EchoFunc(conn net.Conn) {
70	for {
71		buf := make([]byte, RECV_BUF_LEN)
72		n, err := conn.Read(buf)
73		if err != nil {
74			println("Error reading:", err.Error())
75			return
76		}
77		println("received ", n, " bytes of data =", string(buf))
78	}
79}
80
81func Test_New_itShouldUseDefaultConfigValuesIfNoOtherProvided(t *testing.T) {
82	f, _ := New(Config{})
83	assert.Equal(t, f.Config.FluentPort, defaultPort)
84	assert.Equal(t, f.Config.FluentHost, defaultHost)
85	assert.Equal(t, f.Config.Timeout, defaultTimeout)
86	assert.Equal(t, f.Config.WriteTimeout, defaultWriteTimeout)
87	assert.Equal(t, f.Config.BufferLimit, defaultBufferLimit)
88	assert.Equal(t, f.Config.FluentNetwork, defaultNetwork)
89	assert.Equal(t, f.Config.FluentSocketPath, defaultSocketPath)
90}
91
92func Test_New_itShouldUseUnixDomainSocketIfUnixSocketSpecified(t *testing.T) {
93	if runtime.GOOS == "windows" {
94		t.Skip("windows not supported")
95	}
96	socketFile := "/tmp/fluent-logger-golang.sock"
97	network := "unix"
98	l, err := net.Listen(network, socketFile)
99	if err != nil {
100		t.Error(err)
101		return
102	}
103	defer l.Close()
104
105	f, err := New(Config{
106		FluentNetwork:    network,
107		FluentSocketPath: socketFile})
108	if err != nil {
109		t.Error(err)
110		return
111	}
112	defer f.Close()
113	assert.Equal(t, f.Config.FluentNetwork, network)
114	assert.Equal(t, f.Config.FluentSocketPath, socketFile)
115
116	socketFile = "/tmp/fluent-logger-golang-xxx.sock"
117	network = "unixxxx"
118	fUnknown, err := New(Config{
119		FluentNetwork:    network,
120		FluentSocketPath: socketFile})
121	if _, ok := err.(net.UnknownNetworkError); !ok {
122		t.Errorf("err type: %T", err)
123	}
124	if err == nil {
125		t.Error(err)
126		fUnknown.Close()
127		return
128	}
129}
130
131func Test_New_itShouldUseConfigValuesFromArguments(t *testing.T) {
132	f, _ := New(Config{FluentPort: 6666, FluentHost: "foobarhost"})
133	assert.Equal(t, f.Config.FluentPort, 6666)
134	assert.Equal(t, f.Config.FluentHost, "foobarhost")
135}
136
137func Test_New_itShouldUseConfigValuesFromMashalAsJSONArgument(t *testing.T) {
138	f, _ := New(Config{MarshalAsJSON: true})
139	assert.Equal(t, f.Config.MarshalAsJSON, true)
140}
141
142func Test_send_WritePendingToConn(t *testing.T) {
143	f := &Fluent{Config: Config{}, reconnecting: false}
144
145	conn := &Conn{}
146	f.conn = conn
147
148	msg := "This is test writing."
149	bmsg := []byte(msg)
150	f.pending = append(f.pending, bmsg...)
151
152	err := f.send()
153	if err != nil {
154		t.Error(err)
155	}
156
157	rcv := make([]byte, len(conn.buf))
158	_, err = conn.Read(rcv)
159	if string(rcv) != msg {
160		t.Errorf("got %s, except %s", string(rcv), msg)
161	}
162}
163
164func Test_MarshalAsMsgpack(t *testing.T) {
165	f := &Fluent{Config: Config{}, reconnecting: false}
166
167	conn := &Conn{}
168	f.conn = conn
169
170	tag := "tag"
171	var data = map[string]string{
172		"foo":  "bar",
173		"hoge": "hoge"}
174	tm := time.Unix(1267867237, 0)
175	result, err := f.EncodeData(tag, tm, data)
176
177	if err != nil {
178		t.Error(err)
179	}
180	actual := string(result)
181
182	// map entries are disordered in golang
183	expected1 := "\x94\xA3tag\xD2K\x92\u001Ee\x82\xA3foo\xA3bar\xA4hoge\xA4hoge\xC0"
184	expected2 := "\x94\xA3tag\xD2K\x92\u001Ee\x82\xA4hoge\xA4hoge\xA3foo\xA3bar\xC0"
185	if actual != expected1 && actual != expected2 {
186		t.Errorf("got %x,\n         except %x\n             or %x", actual, expected1, expected2)
187	}
188}
189
190func Test_SubSecondPrecision(t *testing.T) {
191	// Setup the test subject
192	fluent := &Fluent{
193		Config: Config{
194			SubSecondPrecision: true,
195		},
196		reconnecting: false,
197	}
198	fluent.conn = &Conn{}
199
200	// Exercise the test subject
201	timestamp := time.Unix(1267867237, 256)
202	encodedData, err := fluent.EncodeData("tag", timestamp, map[string]string{
203		"foo": "bar",
204	})
205
206	// Assert no encoding errors and that the timestamp has been encoded into
207	// the message as expected.
208	if err != nil {
209		t.Error(err)
210	}
211
212	expected := "\x94\xA3tag\xC7\x08\x00K\x92\u001Ee\x00\x00\x01\x00\x81\xA3foo\xA3bar\xC0"
213	actual := string(encodedData)
214	assert.Equal(t, expected, actual)
215}
216
217func Test_MarshalAsJSON(t *testing.T) {
218	f := &Fluent{Config: Config{MarshalAsJSON: true}, reconnecting: false}
219
220	conn := &Conn{}
221	f.conn = conn
222
223	var data = map[string]string{
224		"foo":  "bar",
225		"hoge": "hoge"}
226	tm := time.Unix(1267867237, 0)
227	result, err := f.EncodeData("tag", tm, data)
228
229	if err != nil {
230		t.Error(err)
231	}
232	// json.Encode marshals map keys in the order, so this expectation is safe
233	expected := `["tag",1267867237,{"foo":"bar","hoge":"hoge"},null]`
234	actual := string(result)
235	if actual != expected {
236		t.Errorf("got %s, except %s", actual, expected)
237	}
238}
239
240func TestJsonConfig(t *testing.T) {
241	b, err := ioutil.ReadFile(`testdata/config.json`)
242	if err != nil {
243		t.Error(err)
244	}
245	var got Config
246	expect := Config{
247		FluentPort:       8888,
248		FluentHost:       "localhost",
249		FluentNetwork:    "tcp",
250		FluentSocketPath: "/var/tmp/fluent.sock",
251		Timeout:          3000,
252		WriteTimeout:     6000,
253		BufferLimit:      200,
254		RetryWait:        5,
255		MaxRetry:         3,
256		TagPrefix:        "fluent",
257		AsyncConnect:     false,
258		MarshalAsJSON:    true,
259	}
260
261	err = json.Unmarshal(b, &got)
262	if err != nil {
263		t.Error(err)
264	}
265
266	if !reflect.DeepEqual(expect, got) {
267		t.Errorf("got %v, except %v", got, expect)
268	}
269}
270
271func TestAsyncConnect(t *testing.T) {
272	type result struct {
273		f   *Fluent
274		err error
275	}
276	ch := make(chan result, 1)
277	go func() {
278		config := Config{
279			FluentPort:   8888,
280			AsyncConnect: true,
281		}
282		f, err := New(config)
283		ch <- result{f: f, err: err}
284	}()
285
286	select {
287	case res := <-ch:
288		if res.err != nil {
289			t.Errorf("fluent.New() failed with %#v", res.err)
290			return
291		}
292		res.f.Close()
293	case <-time.After(time.Millisecond * 500):
294		t.Error("AsyncConnect must not block")
295	}
296}
297
298func Test_PostWithTimeNotTimeOut(t *testing.T) {
299	f, err := New(Config{
300		FluentPort:    6666,
301		AsyncConnect:  false,
302		MarshalAsJSON: true, // easy to check equality
303	})
304	if err != nil {
305		t.Error(err)
306		return
307	}
308
309	var testData = []struct {
310		in  map[string]string
311		out string
312	}{
313		{
314			map[string]string{"foo": "bar"},
315			"[\"tag_name\",1482493046,{\"foo\":\"bar\"},null]",
316		},
317		{
318			map[string]string{"fuga": "bar", "hoge": "fuga"},
319			"[\"tag_name\",1482493046,{\"fuga\":\"bar\",\"hoge\":\"fuga\"},null]",
320		},
321	}
322	for _, tt := range testData {
323		conn := &Conn{}
324		f.conn = conn
325
326		err = f.PostWithTime("tag_name", time.Unix(1482493046, 0), tt.in)
327		if err != nil {
328			t.Errorf("in=%s, err=%s", tt.in, err)
329		}
330
331		rcv := make([]byte, len(conn.buf))
332		_, err = conn.Read(rcv)
333		if string(rcv) != tt.out {
334			t.Errorf("got %s, except %s", string(rcv), tt.out)
335		}
336
337		if !conn.writeDeadline.IsZero() {
338			t.Errorf("got %s, except 0", conn.writeDeadline)
339		}
340	}
341}
342
343func Test_PostMsgpMarshaler(t *testing.T) {
344	f, err := New(Config{
345		FluentPort:    6666,
346		AsyncConnect:  false,
347		MarshalAsJSON: true, // easy to check equality
348	})
349	if err != nil {
350		t.Error(err)
351		return
352	}
353
354	var testData = []struct {
355		in  *TestMessage
356		out string
357	}{
358		{
359			&TestMessage{Foo: "bar"},
360			"[\"tag_name\",1482493046,{\"foo\":\"bar\"},null]",
361		},
362	}
363	for _, tt := range testData {
364		conn := &Conn{}
365		f.conn = conn
366
367		err = f.PostWithTime("tag_name", time.Unix(1482493046, 0), tt.in)
368		if err != nil {
369			t.Errorf("in=%s, err=%s", tt.in, err)
370		}
371
372		rcv := make([]byte, len(conn.buf))
373		_, err = conn.Read(rcv)
374		if string(rcv) != tt.out {
375			t.Errorf("got %s, except %s", string(rcv), tt.out)
376		}
377
378		if !conn.writeDeadline.IsZero() {
379			t.Errorf("got %s, except 0", conn.writeDeadline)
380		}
381	}
382}
383
384func Benchmark_PostWithShortMessage(b *testing.B) {
385	b.StopTimer()
386	f, err := New(Config{})
387	if err != nil {
388		panic(err)
389	}
390
391	b.StartTimer()
392	data := map[string]string{"message": "Hello World"}
393	for i := 0; i < b.N; i++ {
394		if err := f.Post("tag", data); err != nil {
395			panic(err)
396		}
397	}
398}
399
400func Benchmark_PostWithShortMessageMarshalAsJSON(b *testing.B) {
401	b.StopTimer()
402	f, err := New(Config{MarshalAsJSON: true})
403	if err != nil {
404		panic(err)
405	}
406
407	b.StartTimer()
408	data := map[string]string{"message": "Hello World"}
409	for i := 0; i < b.N; i++ {
410		if err := f.Post("tag", data); err != nil {
411			panic(err)
412		}
413	}
414}
415
416func Benchmark_LogWithChunks(b *testing.B) {
417	b.StopTimer()
418	f, err := New(Config{})
419	if err != nil {
420		panic(err)
421	}
422
423	b.StartTimer()
424	data := map[string]string{"msg": "sdfsdsdfdsfdsddddfsdfsdsdfdsfdsddddfsdfsdsdfdsfdsddddfsdfsdsdfdsfdsddddfsdfsdsdfdsfdsddddfsdfsdsdfdsfdsddddfsdfsdsdfdsfdsddddfsdfsdsdfdsfdsddddf"}
425	for i := 0; i < b.N; i++ {
426		if err := f.Post("tag", data); err != nil {
427			panic(err)
428		}
429	}
430}
431
432func Benchmark_PostWithStruct(b *testing.B) {
433	b.StopTimer()
434	f, err := New(Config{})
435	if err != nil {
436		panic(err)
437	}
438
439	b.StartTimer()
440	data := struct {
441		Name string `msg:"msgnamename"`
442	}{
443		"john smith",
444	}
445	for i := 0; i < b.N; i++ {
446		if err := f.Post("tag", data); err != nil {
447			panic(err)
448		}
449	}
450}
451
452func Benchmark_PostWithStructTaggedAsCodec(b *testing.B) {
453	b.StopTimer()
454	f, err := New(Config{})
455	if err != nil {
456		panic(err)
457	}
458
459	b.StartTimer()
460	data := struct {
461		Name string `codec:"codecname"`
462	}{
463		"john smith",
464	}
465	for i := 0; i < b.N; i++ {
466		if err := f.Post("tag", data); err != nil {
467			panic(err)
468		}
469	}
470}
471
472func Benchmark_PostWithStructWithoutTag(b *testing.B) {
473	b.StopTimer()
474	f, err := New(Config{})
475	if err != nil {
476		panic(err)
477	}
478
479	b.StartTimer()
480	data := struct {
481		Name string
482	}{
483		"john smith",
484	}
485	for i := 0; i < b.N; i++ {
486		if err := f.Post("tag", data); err != nil {
487			panic(err)
488		}
489	}
490}
491
492func Benchmark_PostWithMapString(b *testing.B) {
493	b.StopTimer()
494	f, err := New(Config{})
495	if err != nil {
496		panic(err)
497	}
498
499	b.StartTimer()
500	data := map[string]string{
501		"foo": "bar",
502	}
503	for i := 0; i < b.N; i++ {
504		if err := f.Post("tag", data); err != nil {
505			panic(err)
506		}
507	}
508}
509
510func Benchmark_PostWithMsgpMarshaler(b *testing.B) {
511	b.StopTimer()
512	f, err := New(Config{})
513	if err != nil {
514		panic(err)
515	}
516
517	b.StartTimer()
518	data := &TestMessage{Foo: "bar"}
519	for i := 0; i < b.N; i++ {
520		if err := f.Post("tag", data); err != nil {
521			panic(err)
522		}
523	}
524}
525
526func Benchmark_PostWithMapSlice(b *testing.B) {
527	b.StopTimer()
528	f, err := New(Config{})
529	if err != nil {
530		panic(err)
531	}
532
533	b.StartTimer()
534	data := map[string][]int{
535		"foo": {1, 2, 3},
536	}
537	for i := 0; i < b.N; i++ {
538		if err := f.Post("tag", data); err != nil {
539			panic(err)
540		}
541	}
542}
543
544func Benchmark_PostWithMapStringAndTime(b *testing.B) {
545	b.StopTimer()
546	f, err := New(Config{})
547	if err != nil {
548		panic(err)
549	}
550
551	b.StartTimer()
552	data := map[string]string{
553		"foo": "bar",
554	}
555	tm := time.Now()
556	for i := 0; i < b.N; i++ {
557		if err := f.PostWithTime("tag", tm, data); err != nil {
558			panic(err)
559		}
560	}
561}
562