1package eventstream
2
3import (
4	"bytes"
5	"encoding/hex"
6	"io/ioutil"
7	"os"
8	"reflect"
9	"testing"
10)
11
12func TestWriteEncodedFromDecoded(t *testing.T) {
13	cases, err := readPositiveTests("testdata")
14	if err != nil {
15		t.Fatalf("failed to load positive tests, %v", err)
16	}
17
18	for _, c := range cases {
19		f, err := ioutil.TempFile(os.TempDir(), "encoded_positive_"+c.Name)
20		if err != nil {
21			t.Fatalf("failed to open %q, %v", c.Name, err)
22		}
23
24		encoder := NewEncoder(f)
25
26		msg := c.Decoded.Message()
27		if err := encoder.Encode(msg); err != nil {
28			t.Errorf("failed to encode %q, %v", c.Name, err)
29		}
30
31		if err = f.Close(); err != nil {
32			t.Errorf("expected %v, got %v", "no error", err)
33		}
34		if err = os.Remove(f.Name()); err != nil {
35			t.Errorf("expected %v, got %v", "no error", err)
36		}
37	}
38}
39
40func TestDecoder_Decode(t *testing.T) {
41	cases, err := readPositiveTests("testdata")
42	if err != nil {
43		t.Fatalf("failed to load positive tests, %v", err)
44	}
45
46	for _, c := range cases {
47		decoder := NewDecoder(bytes.NewBuffer(c.Encoded))
48
49		msg, err := decoder.Decode(nil)
50		if err != nil {
51			t.Fatalf("%s, expect no decode error, got %v", c.Name, err)
52		}
53
54		raw, err := msg.rawMessage() // rawMessage will fail if payload read CRC fails
55		if err != nil {
56			t.Fatalf("%s, failed to get raw decoded message %v", c.Name, err)
57		}
58
59		if e, a := c.Decoded.Length, raw.Length; e != a {
60			t.Errorf("%s, expect %v length, got %v", c.Name, e, a)
61		}
62		if e, a := c.Decoded.HeadersLen, raw.HeadersLen; e != a {
63			t.Errorf("%s, expect %v HeadersLen, got %v", c.Name, e, a)
64		}
65		if e, a := c.Decoded.PreludeCRC, raw.PreludeCRC; e != a {
66			t.Errorf("%s, expect %v PreludeCRC, got %v", c.Name, e, a)
67		}
68		if e, a := Headers(c.Decoded.Headers), msg.Headers; !reflect.DeepEqual(e, a) {
69			t.Errorf("%s, expect %v headers, got %v", c.Name, e, a)
70		}
71		if e, a := c.Decoded.Payload, raw.Payload; !bytes.Equal(e, a) {
72			t.Errorf("%s, expect %v payload, got %v", c.Name, e, a)
73		}
74		if e, a := c.Decoded.CRC, raw.CRC; e != a {
75			t.Errorf("%s, expect %v CRC, got %v", c.Name, e, a)
76		}
77	}
78}
79
80func TestDecoder_Decode_Negative(t *testing.T) {
81	cases, err := readNegativeTests("testdata")
82	if err != nil {
83		t.Fatalf("failed to load negative tests, %v", err)
84	}
85
86	for _, c := range cases {
87		decoder := NewDecoder(bytes.NewBuffer(c.Encoded))
88
89		msg, err := decoder.Decode(nil)
90		if err == nil {
91			rawMsg, rawMsgErr := msg.rawMessage()
92			t.Fatalf("%s, expect error, got none, %s,\n%s\n%#v, %v\n", c.Name,
93				c.Err, hex.Dump(c.Encoded), rawMsg, rawMsgErr)
94		}
95	}
96}
97
// testEncodedMsg is a single encoded message (61 bytes) shared by the
// multi-message decode test and the benchmarks in this file.
//
// The grouping below is an annotation aid only. It follows the field names
// checked by the decode tests in this file (Length, HeadersLen, PreludeCRC,
// headers, payload, CRC); confirm against the decoder before relying on it.
var testEncodedMsg = []byte{
	// Total length: 61, which matches len(testEncodedMsg).
	0, 0, 0, 61,
	// Headers length: 32.
	0, 0, 0, 32,
	// Presumably the prelude CRC.
	7, 253, 131, 150,
	// Header name length (12) followed by the ASCII bytes of "content-type".
	12,
	99, 111, 110, 116, 101, 110, 116, 45, 116, 121, 112, 101,
	// Presumably the header value type tag.
	7,
	// Header value length (16) followed by the ASCII bytes of
	// "application/json".
	0, 16,
	97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110,
	// Payload: the 13 ASCII bytes of {'foo':'bar'}.
	123, 39, 102, 111, 111, 39, 58, 39, 98, 97, 114, 39, 125,
	// Presumably the trailing message CRC.
	141, 156, 8, 177,
}
99
100func TestDecoder_DecodeMultipleMessages(t *testing.T) {
101	const (
102		expectMsgCount   = 10
103		expectPayloadLen = 13
104	)
105
106	r := bytes.NewBuffer(nil)
107	for i := 0; i < expectMsgCount; i++ {
108		r.Write(testEncodedMsg)
109	}
110
111	decoder := NewDecoder(r)
112
113	var err error
114	var msg Message
115	var count int
116	for {
117		msg, err = decoder.Decode(nil)
118		if err != nil {
119			break
120		}
121		count++
122
123		if e, a := expectPayloadLen, len(msg.Payload); e != a {
124			t.Errorf("expect %v payload len, got %v", e, a)
125		}
126
127		if e, a := []byte(`{'foo':'bar'}`), msg.Payload; !bytes.Equal(e, a) {
128			t.Errorf("expect %v payload, got %v", e, a)
129		}
130	}
131
132	type causer interface {
133		Cause() error
134	}
135	if err != nil && count != expectMsgCount {
136		t.Fatalf("expect, no error, got %v", err)
137	}
138
139	if e, a := expectMsgCount, count; e != a {
140		t.Errorf("expect %v messages read, got %v", e, a)
141	}
142}
143
144func BenchmarkDecode(b *testing.B) {
145	r := bytes.NewReader(testEncodedMsg)
146	decoder := NewDecoder(r)
147	payloadBuf := make([]byte, 0, 5*1024)
148	b.ResetTimer()
149
150	for i := 0; i < b.N; i++ {
151		msg, err := decoder.Decode(payloadBuf)
152		if err != nil {
153			b.Fatal(err)
154		}
155
156		// Release the payload buffer
157		payloadBuf = msg.Payload[0:0]
158		r.Seek(0, 0)
159	}
160}
161
162func BenchmarkDecode_NoPayloadBuf(b *testing.B) {
163	r := bytes.NewReader(testEncodedMsg)
164	decoder := NewDecoder(r)
165	b.ResetTimer()
166
167	for i := 0; i < b.N; i++ {
168		_, err := decoder.Decode(nil)
169		if err != nil {
170			b.Fatal(err)
171		}
172		r.Seek(0, 0)
173	}
174}
175