1package agent
2
3import (
4	"bytes"
5	"log"
6	"net"
7	"os"
8	"testing"
9	"time"
10
11	"github.com/hashicorp/serf/serf"
12)
13
14type MockStreamClient struct {
15	headers []*responseHeader
16	objs    []interface{}
17	err     error
18}
19
20func (m *MockStreamClient) Send(h *responseHeader, o interface{}) error {
21	m.headers = append(m.headers, h)
22	m.objs = append(m.objs, o)
23	return m.err
24}
25
26func (m *MockStreamClient) RegisterQuery(q *serf.Query) uint64 {
27	return 42
28}
29
30func TestIPCEventStream(t *testing.T) {
31	sc := &MockStreamClient{}
32	filters := ParseEventFilter("user:foobar,member-join,query:deploy")
33	es := newEventStream(sc, filters, 42, log.New(os.Stderr, "", log.LstdFlags))
34	defer es.Stop()
35
36	es.HandleEvent(serf.UserEvent{
37		LTime:    123,
38		Name:     "foobar",
39		Payload:  []byte("test"),
40		Coalesce: true,
41	})
42	es.HandleEvent(serf.UserEvent{
43		LTime:    124,
44		Name:     "ignore",
45		Payload:  []byte("test"),
46		Coalesce: true,
47	})
48	es.HandleEvent(serf.MemberEvent{
49		Type: serf.EventMemberJoin,
50		Members: []serf.Member{
51			serf.Member{
52				Name:        "TestNode",
53				Addr:        net.IP([]byte{127, 0, 0, 1}),
54				Port:        12345,
55				Tags:        map[string]string{"role": "node"},
56				Status:      serf.StatusAlive,
57				ProtocolMin: 0,
58				ProtocolMax: 0,
59				ProtocolCur: 0,
60				DelegateMin: 0,
61				DelegateMax: 0,
62				DelegateCur: 0,
63			},
64		},
65	})
66	es.HandleEvent(&serf.Query{
67		LTime:   125,
68		Name:    "deploy",
69		Payload: []byte("test"),
70	})
71
72	time.Sleep(5 * time.Millisecond)
73
74	if len(sc.headers) != 3 {
75		t.Fatalf("expected 2 messages!")
76	}
77	for _, h := range sc.headers {
78		if h.Seq != 42 {
79			t.Fatalf("bad seq")
80		}
81		if h.Error != "" {
82			t.Fatalf("bad err")
83		}
84	}
85
86	obj1 := sc.objs[0].(*userEventRecord)
87	if obj1.Event != "user" {
88		t.Fatalf("bad event: %#v", obj1)
89	}
90	if obj1.LTime != 123 {
91		t.Fatalf("bad event: %#v", obj1)
92	}
93	if obj1.Name != "foobar" {
94		t.Fatalf("bad event: %#v", obj1)
95	}
96	if bytes.Compare(obj1.Payload, []byte("test")) != 0 {
97		t.Fatalf("bad event: %#v", obj1)
98	}
99	if !obj1.Coalesce {
100		t.Fatalf("bad event: %#v", obj1)
101	}
102
103	obj2 := sc.objs[1].(*memberEventRecord)
104	if obj2.Event != "member-join" {
105		t.Fatalf("bad event: %#v", obj2)
106	}
107	mem1 := obj2.Members[0]
108	if mem1.Name != "TestNode" {
109		t.Fatalf("bad member: %#v", mem1)
110	}
111	if bytes.Compare(mem1.Addr, []byte{127, 0, 0, 1}) != 0 {
112		t.Fatalf("bad member: %#v", mem1)
113	}
114	if mem1.Port != 12345 {
115		t.Fatalf("bad member: %#v", mem1)
116	}
117	if mem1.Status != "alive" {
118		t.Fatalf("bad member: %#v", mem1)
119	}
120	if mem1.ProtocolMin != 0 {
121		t.Fatalf("bad member: %#v", mem1)
122	}
123	if mem1.ProtocolMax != 0 {
124		t.Fatalf("bad member: %#v", mem1)
125	}
126	if mem1.ProtocolCur != 0 {
127		t.Fatalf("bad member: %#v", mem1)
128	}
129	if mem1.DelegateMin != 0 {
130		t.Fatalf("bad member: %#v", mem1)
131	}
132	if mem1.DelegateMax != 0 {
133		t.Fatalf("bad member: %#v", mem1)
134	}
135	if mem1.DelegateCur != 0 {
136		t.Fatalf("bad member: %#v", mem1)
137	}
138
139	obj3 := sc.objs[2].(*queryEventRecord)
140	if obj3.Event != "query" {
141		t.Fatalf("bad query: %#v", obj3)
142	}
143	if obj3.ID != 42 {
144		t.Fatalf("bad query: %#v", obj3)
145	}
146	if obj3.LTime != 125 {
147		t.Fatalf("bad query: %#v", obj3)
148	}
149	if obj3.Name != "deploy" {
150		t.Fatalf("bad query: %#v", obj3)
151	}
152	if bytes.Compare(obj3.Payload, []byte("test")) != 0 {
153		t.Fatalf("bad query: %#v", obj3)
154	}
155
156}
157