1package agent 2 3import ( 4 "fmt" 5 "log" 6 7 "github.com/hashicorp/serf/serf" 8) 9 10type streamClient interface { 11 Send(*responseHeader, interface{}) error 12 RegisterQuery(*serf.Query) uint64 13} 14 15// eventStream is used to stream events to a client over IPC 16type eventStream struct { 17 client streamClient 18 eventCh chan serf.Event 19 filters []EventFilter 20 logger *log.Logger 21 seq uint64 22} 23 24func newEventStream(client streamClient, filters []EventFilter, seq uint64, logger *log.Logger) *eventStream { 25 es := &eventStream{ 26 client: client, 27 eventCh: make(chan serf.Event, 512), 28 filters: filters, 29 logger: logger, 30 seq: seq, 31 } 32 go es.stream() 33 return es 34} 35 36func (es *eventStream) HandleEvent(e serf.Event) { 37 // Check the event 38 for _, f := range es.filters { 39 if f.Invoke(e) { 40 goto HANDLE 41 } 42 } 43 return 44 45 // Do a non-blocking send 46HANDLE: 47 select { 48 case es.eventCh <- e: 49 default: 50 es.logger.Printf("[WARN] agent.ipc: Dropping event to %v", es.client) 51 } 52} 53 54func (es *eventStream) Stop() { 55 close(es.eventCh) 56} 57 58func (es *eventStream) stream() { 59 var err error 60 for event := range es.eventCh { 61 switch e := event.(type) { 62 case serf.MemberEvent: 63 err = es.sendMemberEvent(e) 64 case serf.UserEvent: 65 err = es.sendUserEvent(e) 66 case *serf.Query: 67 err = es.sendQuery(e) 68 default: 69 err = fmt.Errorf("Unknown event type: %s", event.EventType().String()) 70 } 71 if err != nil { 72 es.logger.Printf("[ERR] agent.ipc: Failed to stream event to %v: %v", 73 es.client, err) 74 return 75 } 76 } 77} 78 79// sendMemberEvent is used to send a single member event 80func (es *eventStream) sendMemberEvent(me serf.MemberEvent) error { 81 members := make([]Member, 0, len(me.Members)) 82 for _, m := range me.Members { 83 sm := Member{ 84 Name: m.Name, 85 Addr: m.Addr, 86 Port: m.Port, 87 Tags: m.Tags, 88 Status: m.Status.String(), 89 ProtocolMin: m.ProtocolMin, 90 ProtocolMax: m.ProtocolMax, 91 ProtocolCur: m.ProtocolCur, 92 DelegateMin: m.DelegateMin, 93 DelegateMax: m.DelegateMax, 94 DelegateCur: m.DelegateCur, 95 } 96 members = append(members, sm) 97 } 98 99 header := responseHeader{ 100 Seq: es.seq, 101 Error: "", 102 } 103 rec := memberEventRecord{ 104 Event: me.String(), 105 Members: members, 106 } 107 return es.client.Send(&header, &rec) 108} 109 110// sendUserEvent is used to send a single user event 111func (es *eventStream) sendUserEvent(ue serf.UserEvent) error { 112 header := responseHeader{ 113 Seq: es.seq, 114 Error: "", 115 } 116 rec := userEventRecord{ 117 Event: ue.EventType().String(), 118 LTime: ue.LTime, 119 Name: ue.Name, 120 Payload: ue.Payload, 121 Coalesce: ue.Coalesce, 122 } 123 return es.client.Send(&header, &rec) 124} 125 126// sendQuery is used to send a single query event 127func (es *eventStream) sendQuery(q *serf.Query) error { 128 id := es.client.RegisterQuery(q) 129 130 header := responseHeader{ 131 Seq: es.seq, 132 Error: "", 133 } 134 rec := queryEventRecord{ 135 Event: q.EventType().String(), 136 ID: id, 137 LTime: q.LTime, 138 Name: q.Name, 139 Payload: q.Payload, 140 } 141 return es.client.Send(&header, &rec) 142} 143