1package agent 2 3import ( 4 "bytes" 5 "log" 6 "net" 7 "os" 8 "testing" 9 "time" 10 11 "github.com/hashicorp/serf/serf" 12) 13 14type MockStreamClient struct { 15 headers []*responseHeader 16 objs []interface{} 17 err error 18} 19 20func (m *MockStreamClient) Send(h *responseHeader, o interface{}) error { 21 m.headers = append(m.headers, h) 22 m.objs = append(m.objs, o) 23 return m.err 24} 25 26func (m *MockStreamClient) RegisterQuery(q *serf.Query) uint64 { 27 return 42 28} 29 30func TestIPCEventStream(t *testing.T) { 31 sc := &MockStreamClient{} 32 filters := ParseEventFilter("user:foobar,member-join,query:deploy") 33 es := newEventStream(sc, filters, 42, log.New(os.Stderr, "", log.LstdFlags)) 34 defer es.Stop() 35 36 es.HandleEvent(serf.UserEvent{ 37 LTime: 123, 38 Name: "foobar", 39 Payload: []byte("test"), 40 Coalesce: true, 41 }) 42 es.HandleEvent(serf.UserEvent{ 43 LTime: 124, 44 Name: "ignore", 45 Payload: []byte("test"), 46 Coalesce: true, 47 }) 48 es.HandleEvent(serf.MemberEvent{ 49 Type: serf.EventMemberJoin, 50 Members: []serf.Member{ 51 serf.Member{ 52 Name: "TestNode", 53 Addr: net.IP([]byte{127, 0, 0, 1}), 54 Port: 12345, 55 Tags: map[string]string{"role": "node"}, 56 Status: serf.StatusAlive, 57 ProtocolMin: 0, 58 ProtocolMax: 0, 59 ProtocolCur: 0, 60 DelegateMin: 0, 61 DelegateMax: 0, 62 DelegateCur: 0, 63 }, 64 }, 65 }) 66 es.HandleEvent(&serf.Query{ 67 LTime: 125, 68 Name: "deploy", 69 Payload: []byte("test"), 70 }) 71 72 time.Sleep(5 * time.Millisecond) 73 74 if len(sc.headers) != 3 { 75 t.Fatalf("expected 2 messages!") 76 } 77 for _, h := range sc.headers { 78 if h.Seq != 42 { 79 t.Fatalf("bad seq") 80 } 81 if h.Error != "" { 82 t.Fatalf("bad err") 83 } 84 } 85 86 obj1 := sc.objs[0].(*userEventRecord) 87 if obj1.Event != "user" { 88 t.Fatalf("bad event: %#v", obj1) 89 } 90 if obj1.LTime != 123 { 91 t.Fatalf("bad event: %#v", obj1) 92 } 93 if obj1.Name != "foobar" { 94 t.Fatalf("bad event: %#v", obj1) 95 } 96 if bytes.Compare(obj1.Payload, []byte("test")) != 0 { 97 t.Fatalf("bad event: %#v", obj1) 98 } 99 if !obj1.Coalesce { 100 t.Fatalf("bad event: %#v", obj1) 101 } 102 103 obj2 := sc.objs[1].(*memberEventRecord) 104 if obj2.Event != "member-join" { 105 t.Fatalf("bad event: %#v", obj2) 106 } 107 mem1 := obj2.Members[0] 108 if mem1.Name != "TestNode" { 109 t.Fatalf("bad member: %#v", mem1) 110 } 111 if bytes.Compare(mem1.Addr, []byte{127, 0, 0, 1}) != 0 { 112 t.Fatalf("bad member: %#v", mem1) 113 } 114 if mem1.Port != 12345 { 115 t.Fatalf("bad member: %#v", mem1) 116 } 117 if mem1.Status != "alive" { 118 t.Fatalf("bad member: %#v", mem1) 119 } 120 if mem1.ProtocolMin != 0 { 121 t.Fatalf("bad member: %#v", mem1) 122 } 123 if mem1.ProtocolMax != 0 { 124 t.Fatalf("bad member: %#v", mem1) 125 } 126 if mem1.ProtocolCur != 0 { 127 t.Fatalf("bad member: %#v", mem1) 128 } 129 if mem1.DelegateMin != 0 { 130 t.Fatalf("bad member: %#v", mem1) 131 } 132 if mem1.DelegateMax != 0 { 133 t.Fatalf("bad member: %#v", mem1) 134 } 135 if mem1.DelegateCur != 0 { 136 t.Fatalf("bad member: %#v", mem1) 137 } 138 139 obj3 := sc.objs[2].(*queryEventRecord) 140 if obj3.Event != "query" { 141 t.Fatalf("bad query: %#v", obj3) 142 } 143 if obj3.ID != 42 { 144 t.Fatalf("bad query: %#v", obj3) 145 } 146 if obj3.LTime != 125 { 147 t.Fatalf("bad query: %#v", obj3) 148 } 149 if obj3.Name != "deploy" { 150 t.Fatalf("bad query: %#v", obj3) 151 } 152 if bytes.Compare(obj3.Payload, []byte("test")) != 0 { 153 t.Fatalf("bad query: %#v", obj3) 154 } 155 156} 157