1package agent
2
3import (
4	"fmt"
5	"log"
6
7	"github.com/hashicorp/serf/serf"
8)
9
// streamClient is the minimal set of client operations that eventStream
// relies on to deliver events.
type streamClient interface {
	// Send is invoked once per streamed event with a response header and
	// the event record that accompanies it. A non-nil error makes the
	// streaming goroutine log the failure and exit.
	Send(*responseHeader, interface{}) error
	// RegisterQuery is invoked for every query event before it is sent; the
	// value it returns is placed in the ID field of the outgoing query record.
	RegisterQuery(*serf.Query) uint64
}
14
// eventStream is used to stream events to a client over IPC.
//
// Events accepted by HandleEvent are queued on eventCh and drained by the
// stream goroutine that newEventStream starts.
type eventStream struct {
	// client receives every event record that is streamed.
	client  streamClient
	// eventCh queues events between HandleEvent (sender) and stream
	// (receiver); Stop closes it.
	eventCh chan serf.Event
	// filters decide which events are queued: an event is forwarded only if
	// at least one filter's Invoke returns true.
	filters []EventFilter
	// logger receives warnings about dropped events and streaming errors.
	logger  *log.Logger
	// seq is copied into the Seq field of every response header sent.
	seq     uint64
}
23
24func newEventStream(client streamClient, filters []EventFilter, seq uint64, logger *log.Logger) *eventStream {
25	es := &eventStream{
26		client:  client,
27		eventCh: make(chan serf.Event, 512),
28		filters: filters,
29		logger:  logger,
30		seq:     seq,
31	}
32	go es.stream()
33	return es
34}
35
36func (es *eventStream) HandleEvent(e serf.Event) {
37	// Check the event
38	for _, f := range es.filters {
39		if f.Invoke(e) {
40			goto HANDLE
41		}
42	}
43	return
44
45	// Do a non-blocking send
46HANDLE:
47	select {
48	case es.eventCh <- e:
49	default:
50		es.logger.Printf("[WARN] agent.ipc: Dropping event to %v", es.client)
51	}
52}
53
// Stop closes the event channel, which ends the range loop in stream once
// any already-queued events have been received.
//
// NOTE(review): because the channel is closed here, calling Stop more than
// once, or calling HandleEvent with a matching event after Stop, would panic
// (close of / send on a closed channel). Callers appear to be responsible for
// ordering these calls; confirm against the call sites.
func (es *eventStream) Stop() {
	close(es.eventCh)
}
57
58func (es *eventStream) stream() {
59	var err error
60	for event := range es.eventCh {
61		switch e := event.(type) {
62		case serf.MemberEvent:
63			err = es.sendMemberEvent(e)
64		case serf.UserEvent:
65			err = es.sendUserEvent(e)
66		case *serf.Query:
67			err = es.sendQuery(e)
68		default:
69			err = fmt.Errorf("Unknown event type: %s", event.EventType().String())
70		}
71		if err != nil {
72			es.logger.Printf("[ERR] agent.ipc: Failed to stream event to %v: %v",
73				es.client, err)
74			return
75		}
76	}
77}
78
79// sendMemberEvent is used to send a single member event
80func (es *eventStream) sendMemberEvent(me serf.MemberEvent) error {
81	members := make([]Member, 0, len(me.Members))
82	for _, m := range me.Members {
83		sm := Member{
84			Name:        m.Name,
85			Addr:        m.Addr,
86			Port:        m.Port,
87			Tags:        m.Tags,
88			Status:      m.Status.String(),
89			ProtocolMin: m.ProtocolMin,
90			ProtocolMax: m.ProtocolMax,
91			ProtocolCur: m.ProtocolCur,
92			DelegateMin: m.DelegateMin,
93			DelegateMax: m.DelegateMax,
94			DelegateCur: m.DelegateCur,
95		}
96		members = append(members, sm)
97	}
98
99	header := responseHeader{
100		Seq:   es.seq,
101		Error: "",
102	}
103	rec := memberEventRecord{
104		Event:   me.String(),
105		Members: members,
106	}
107	return es.client.Send(&header, &rec)
108}
109
110// sendUserEvent is used to send a single user event
111func (es *eventStream) sendUserEvent(ue serf.UserEvent) error {
112	header := responseHeader{
113		Seq:   es.seq,
114		Error: "",
115	}
116	rec := userEventRecord{
117		Event:    ue.EventType().String(),
118		LTime:    ue.LTime,
119		Name:     ue.Name,
120		Payload:  ue.Payload,
121		Coalesce: ue.Coalesce,
122	}
123	return es.client.Send(&header, &rec)
124}
125
126// sendQuery is used to send a single query event
127func (es *eventStream) sendQuery(q *serf.Query) error {
128	id := es.client.RegisterQuery(q)
129
130	header := responseHeader{
131		Seq:   es.seq,
132		Error: "",
133	}
134	rec := queryEventRecord{
135		Event:   q.EventType().String(),
136		ID:      id,
137		LTime:   q.LTime,
138		Name:    q.Name,
139		Payload: q.Payload,
140	}
141	return es.client.Send(&header, &rec)
142}
143