1package agent
2
3/*
4 The agent exposes an IPC mechanism that is used for both controlling
5 Serf as well as providing a fast streaming mechanism for events. This
6 allows other applications to easily leverage Serf as the event layer.
7
 We also use the IPC layer to handle RPC calls from the CLI, which unifies
 the code paths. As a result the protocol has both a Request/Response mode
 and a streaming mode of operation.
11
 The system is fairly simple: each client opens a TCP connection to the
13 agent. The connection is initialized with a handshake which establishes
14 the protocol version being used. This is to allow for future changes to
15 the protocol.
16
17 Once initialized, clients send commands and wait for responses. Certain
18 commands will cause the client to subscribe to events, and those will be
19 pushed down the socket as they are received. This provides a low-latency
20 mechanism for applications to send and receive events, while also providing
21 a flexible control mechanism for Serf.
22*/
23
import (
	"bufio"
	"crypto/subtle"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"regexp"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/armon/go-metrics"
	"github.com/hashicorp/go-msgpack/codec"
	"github.com/hashicorp/logutils"
	"github.com/hashicorp/serf/coordinate"
	"github.com/hashicorp/serf/serf"
)
43
// MinIPCVersion and MaxIPCVersion bound the protocol versions a client may
// request in its handshake. A version outside this inclusive range is
// answered with the "Unsupported IPC version" error.
const (
	MinIPCVersion = 1
	MaxIPCVersion = 1
)
48
// Command names a client may place in requestHeader.Command. handleRequest
// dispatches on these values; any other name is rejected as unsupported.
const (
	handshakeCommand       = "handshake"
	eventCommand           = "event"
	forceLeaveCommand      = "force-leave"
	joinCommand            = "join"
	membersCommand         = "members"
	membersFilteredCommand = "members-filtered"
	streamCommand          = "stream"
	stopCommand            = "stop"
	monitorCommand         = "monitor"
	leaveCommand           = "leave"
	installKeyCommand      = "install-key"
	useKeyCommand          = "use-key"
	removeKeyCommand       = "remove-key"
	listKeysCommand        = "list-keys"
	tagsCommand            = "tags"
	queryCommand           = "query"
	respondCommand         = "respond"
	authCommand            = "auth"
	statsCommand           = "stats"
	getCoordinateCommand   = "get-coordinate"
)
71
// Protocol-level error strings that the handlers in this file place in
// responseHeader.Error.
const (
	unsupportedCommand    = "Unsupported command"
	unsupportedIPCVersion = "Unsupported IPC version"
	duplicateHandshake    = "Handshake already performed"
	handshakeRequired     = "Handshake required"
	monitorExists         = "Monitor already exists"
	invalidFilter         = "Invalid event filter"
	streamExists          = "Stream with given sequence exists"
	invalidQueryID        = "No pending queries matching ID"
	authRequired          = "Authentication required"
	invalidAuthToken      = "Invalid authentication token"
)
84
// Record kinds for query results. These are not referenced in this file;
// presumably they are the values carried in queryRecord.Type by the query
// response stream - TODO confirm against the streaming code.
const (
	queryRecordAck      = "ack"
	queryRecordResponse = "response"
	queryRecordDone     = "done"
)
90
// requestHeader is sent before each request. Command selects the handler
// that processes the request and Seq is echoed back in the responseHeader
// that answers it.
type requestHeader struct {
	Command string
	Seq     uint64
}
96
// responseHeader is sent before each response. Seq is the sequence number
// of the request being answered. Error is the empty string on success and
// an error description otherwise.
type responseHeader struct {
	Seq   uint64
	Error string
}
102
// handshakeRequest is the body of the handshake command. Version is the
// protocol version the client wants to speak; it is accepted only when it
// lies within [MinIPCVersion, MaxIPCVersion].
type handshakeRequest struct {
	Version int32
}

// authRequest is the body of the auth command. AuthKey is checked against
// the key the AgentIPC was created with.
type authRequest struct {
	AuthKey string
}
110
// coordinateRequest is the body of the get-coordinate command. Node is the
// name of the node whose cached coordinate is requested.
type coordinateRequest struct {
	Node string
}

// coordinateResponse is the body of the get-coordinate response. Ok reports
// whether a cached coordinate was found for the node; when it is false Coord
// is left at its zero value.
type coordinateResponse struct {
	Coord coordinate.Coordinate
	Ok    bool
}
119
// eventRequest is the body of the event command. Its three fields are passed
// unchanged to Agent.UserEvent.
type eventRequest struct {
	Name     string
	Payload  []byte
	Coalesce bool
}
125
// forceLeaveRequest is the body of the force-leave command. Node names the
// member to remove. When Prune is set the handler calls
// Agent.ForceLeavePrune instead of Agent.ForceLeave.
type forceLeaveRequest struct {
	Node  string
	Prune bool
}
130
// joinRequest is the body of the join command. Existing and Replay are
// passed unchanged to Agent.Join.
type joinRequest struct {
	Existing []string
	Replay   bool
}

// joinResponse is the body of the join response. Num is the count returned
// by Agent.Join, converted to int32.
type joinResponse struct {
	Num int32
}
139
// membersFilteredRequest is the body of the members-filtered command. Each
// value in Tags, as well as Status and Name, is compiled as an anchored
// regular expression and matched against the corresponding member
// attribute. An empty Status or Name disables that particular filter.
type membersFilteredRequest struct {
	Tags   map[string]string
	Status string
	Name   string
}

// membersResponse is the body sent in reply to both the members and the
// members-filtered commands.
type membersResponse struct {
	Members []Member
}
149
// keyRequest is the body of the install-key, use-key and remove-key
// commands. Key is passed unchanged to the corresponding Agent method.
type keyRequest struct {
	Key string
}

// keyResponse is the body returned by the key management commands. Its
// fields are copied from the result of the agent call. Keys is only filled
// in by the list-keys handler.
type keyResponse struct {
	Messages map[string]string
	Keys     map[string]int
	NumNodes int
	NumErr   int
	NumResp  int
}
161
// monitorRequest is the body of the monitor command. LogLevel is upper-cased
// by the handler before it is validated as the minimum level to stream.
type monitorRequest struct {
	LogLevel string
}

// streamRequest is the body of the stream command. Type is handed to
// ParseEventFilter to build the event filters for the stream.
type streamRequest struct {
	Type string
}

// stopRequest is the body of the stop command. Stop is the sequence number
// of the monitor or event stream that should be cancelled.
type stopRequest struct {
	Stop uint64
}
173
// tagsRequest is the body of the tags command. The handler starts from the
// agent's current tags, drops every key listed in DeleteTags, and then
// applies Tags on top (so a key present in both ends up set).
type tagsRequest struct {
	Tags       map[string]string
	DeleteTags []string
}
178
// queryRequest is the body of the query command. FilterNodes, FilterTags,
// RequestAck, RelayFactor and Timeout are copied into a serf.QueryParam;
// Name and Payload are passed to Agent.Query alongside it.
type queryRequest struct {
	FilterNodes []string
	FilterTags  map[string]string
	RequestAck  bool
	RelayFactor uint8
	Timeout     time.Duration
	Name        string
	Payload     []byte
}
188
// respondRequest is the body of the respond command. ID identifies a query
// previously stored in the client's pendingQueries map and Payload is the
// answer handed to that query's Respond method.
type respondRequest struct {
	ID      uint64
	Payload []byte
}
193
// queryRecord describes one record of a query result. It is not constructed
// in this file; presumably Type holds one of the queryRecord* constants and
// the record is emitted by the query response stream - TODO confirm.
type queryRecord struct {
	Type    string
	From    string
	Payload []byte
}
199
// logRecord wraps a single log string. It is not constructed in this file;
// presumably it is what the monitor log stream sends to clients - TODO
// confirm.
type logRecord struct {
	Log string
}
203
// userEventRecord describes a user event together with its Lamport time.
// It is not constructed in this file; presumably it is emitted by event
// streams created through the stream command - TODO confirm.
type userEventRecord struct {
	Event    string
	LTime    serf.LamportTime
	Name     string
	Payload  []byte
	Coalesce bool
}
211
// queryEventRecord describes a query event together with its Lamport time.
// It is not constructed in this file; presumably ID is a value returned by
// IPCClient.RegisterQuery so the client can answer via the respond command
// - TODO confirm.
type queryEventRecord struct {
	Event   string
	ID      uint64 // ID is opaque to client, used to respond
	LTime   serf.LamportTime
	Name    string
	Payload []byte
}
219
// Member is the wire representation of a cluster member. handleMembers
// fills it field by field from a serf.Member; Status holds the string form
// of the member's status rather than the status value itself.
type Member struct {
	Name        string
	Addr        net.IP
	Port        uint16
	Tags        map[string]string
	Status      string
	ProtocolMin uint8
	ProtocolMax uint8
	ProtocolCur uint8
	DelegateMin uint8
	DelegateMax uint8
	DelegateCur uint8
}
233
// memberEventRecord pairs an event name with the members it concerns. It is
// not constructed in this file; presumably it is emitted by event streams
// for membership changes - TODO confirm.
type memberEventRecord struct {
	Event   string
	Members []Member
}
238
// AgentIPC serves the IPC protocol on behalf of an Agent: it accepts
// connections on listener, keeps track of the connected clients and
// dispatches their commands. The embedded Mutex is held while the clients
// map is modified and while Shutdown sets stop.
type AgentIPC struct {
	sync.Mutex
	agent *Agent
	// authKey, when non-empty, must be presented by a client through the
	// auth command before other commands are accepted.
	authKey string
	// clients is keyed by the remote address string of each connection.
	clients   map[string]*IPCClient
	listener  net.Listener
	logger    *log.Logger
	logWriter *logWriter
	// stop is set, and stopCh closed, exactly once by Shutdown.
	stop   bool
	stopCh chan struct{}
}
250
// IPCClient holds the state of a single client connection: the buffered
// reader/writer and MsgPack codecs wrapped around conn, the negotiated
// protocol version, and any streams or pending queries the client owns.
type IPCClient struct {
	queryID uint64 // Used to increment query IDs
	// name is the remote address string of conn.
	name   string
	conn   net.Conn
	reader *bufio.Reader
	writer *bufio.Writer
	dec    *codec.Decoder
	enc    *codec.Encoder
	// writeLock serializes calls to Send.
	writeLock   sync.Mutex
	version     int32 // From the handshake, 0 before
	logStreamer *logStream
	// eventStreams is keyed by the sequence number of the stream command
	// that created each stream.
	eventStreams map[uint64]*eventStream

	// pendingQueries is keyed by the IDs handed out by RegisterQuery and
	// is guarded by queryLock.
	pendingQueries map[uint64]*serf.Query
	queryLock      sync.Mutex

	didAuth bool // Did we get an auth token yet?
}
269
270// send is used to send an object using the MsgPack encoding. send
271// is serialized to prevent write overlaps, while properly buffering.
272func (c *IPCClient) Send(header *responseHeader, obj interface{}) error {
273	c.writeLock.Lock()
274	defer c.writeLock.Unlock()
275
276	if err := c.enc.Encode(header); err != nil {
277		return err
278	}
279
280	if obj != nil {
281		if err := c.enc.Encode(obj); err != nil {
282			return err
283		}
284	}
285
286	if err := c.writer.Flush(); err != nil {
287		return err
288	}
289
290	return nil
291}
292
293func (c *IPCClient) String() string {
294	return fmt.Sprintf("ipc.client: %v", c.conn.RemoteAddr())
295}
296
297// nextQueryID safely generates a new query ID
298func (c *IPCClient) nextQueryID() uint64 {
299	return atomic.AddUint64(&c.queryID, 1)
300}
301
302// RegisterQuery is used to register a pending query that may
303// get a response. The ID of the query is returned
304func (c *IPCClient) RegisterQuery(q *serf.Query) uint64 {
305	// Generate a unique-per-client ID
306	id := c.nextQueryID()
307
308	// Ensure the query deadline is in the future
309	timeout := q.Deadline().Sub(time.Now())
310	if timeout < 0 {
311		return id
312	}
313
314	// Register the query
315	c.queryLock.Lock()
316	c.pendingQueries[id] = q
317	c.queryLock.Unlock()
318
319	// Setup a timer to deregister after the timeout
320	time.AfterFunc(timeout, func() {
321		c.queryLock.Lock()
322		delete(c.pendingQueries, id)
323		c.queryLock.Unlock()
324	})
325	return id
326}
327
328// NewAgentIPC is used to create a new Agent IPC handler
329func NewAgentIPC(agent *Agent, authKey string, listener net.Listener,
330	logOutput io.Writer, logWriter *logWriter) *AgentIPC {
331	if logOutput == nil {
332		logOutput = os.Stderr
333	}
334	ipc := &AgentIPC{
335		agent:     agent,
336		authKey:   authKey,
337		clients:   make(map[string]*IPCClient),
338		listener:  listener,
339		logger:    log.New(logOutput, "", log.LstdFlags),
340		logWriter: logWriter,
341		stopCh:    make(chan struct{}),
342	}
343	go ipc.listen()
344	return ipc
345}
346
347// Shutdown is used to shutdown the IPC layer
348func (i *AgentIPC) Shutdown() {
349	i.Lock()
350	defer i.Unlock()
351
352	if i.stop {
353		return
354	}
355
356	i.stop = true
357	close(i.stopCh)
358	i.listener.Close()
359
360	// Close the existing connections
361	for _, client := range i.clients {
362		client.conn.Close()
363	}
364}
365
366// listen is a long running routine that listens for new clients
367func (i *AgentIPC) listen() {
368	for {
369		conn, err := i.listener.Accept()
370		if err != nil {
371			if i.stop {
372				return
373			}
374			i.logger.Printf("[ERR] agent.ipc: Failed to accept client: %v", err)
375			continue
376		}
377		i.logger.Printf("[INFO] agent.ipc: Accepted client: %v", conn.RemoteAddr())
378		metrics.IncrCounter([]string{"agent", "ipc", "accept"}, 1)
379
380		// Wrap the connection in a client
381		client := &IPCClient{
382			name:           conn.RemoteAddr().String(),
383			conn:           conn,
384			reader:         bufio.NewReader(conn),
385			writer:         bufio.NewWriter(conn),
386			eventStreams:   make(map[uint64]*eventStream),
387			pendingQueries: make(map[uint64]*serf.Query),
388		}
389		client.dec = codec.NewDecoder(client.reader,
390			&codec.MsgpackHandle{RawToString: true, WriteExt: true})
391		client.enc = codec.NewEncoder(client.writer,
392			&codec.MsgpackHandle{RawToString: true, WriteExt: true})
393
394		// Register the client
395		i.Lock()
396		if !i.stop {
397			i.clients[client.name] = client
398			go i.handleClient(client)
399		} else {
400			conn.Close()
401		}
402		i.Unlock()
403	}
404}
405
406// deregisterClient is called to cleanup after a client disconnects
407func (i *AgentIPC) deregisterClient(client *IPCClient) {
408	// Close the socket
409	client.conn.Close()
410
411	// Remove from the clients list
412	i.Lock()
413	delete(i.clients, client.name)
414	i.Unlock()
415
416	// Remove from the log writer
417	if client.logStreamer != nil {
418		i.logWriter.DeregisterHandler(client.logStreamer)
419		client.logStreamer.Stop()
420	}
421
422	// Remove from event handlers
423	for _, es := range client.eventStreams {
424		i.agent.DeregisterEventHandler(es)
425		es.Stop()
426	}
427}
428
429// handleClient is a long running routine that handles a single client
430func (i *AgentIPC) handleClient(client *IPCClient) {
431	defer i.deregisterClient(client)
432	var reqHeader requestHeader
433	for {
434		// Decode the header
435		if err := client.dec.Decode(&reqHeader); err != nil {
436			if !i.stop {
437				// The second part of this if is to block socket
438				// errors from Windows which appear to happen every
439				// time there is an EOF.
440				if err != io.EOF && !strings.Contains(strings.ToLower(err.Error()), "wsarecv") {
441					i.logger.Printf("[ERR] agent.ipc: failed to decode request header: %v", err)
442				}
443			}
444			return
445		}
446
447		// Evaluate the command
448		if err := i.handleRequest(client, &reqHeader); err != nil {
449			i.logger.Printf("[ERR] agent.ipc: Failed to evaluate request: %v", err)
450			return
451		}
452	}
453}
454
455// handleRequest is used to evaluate a single client command
456func (i *AgentIPC) handleRequest(client *IPCClient, reqHeader *requestHeader) error {
457	// Look for a command field
458	command := reqHeader.Command
459	seq := reqHeader.Seq
460
461	// Ensure the handshake is performed before other commands
462	if command != handshakeCommand && client.version == 0 {
463		respHeader := responseHeader{Seq: seq, Error: handshakeRequired}
464		client.Send(&respHeader, nil)
465		return fmt.Errorf(handshakeRequired)
466	}
467	metrics.IncrCounter([]string{"agent", "ipc", "command"}, 1)
468
469	// Ensure the client has authenticated after the handshake if necessary
470	if i.authKey != "" && !client.didAuth && command != authCommand && command != handshakeCommand {
471		i.logger.Printf("[WARN] agent.ipc: Client sending commands before auth")
472		respHeader := responseHeader{Seq: seq, Error: authRequired}
473		client.Send(&respHeader, nil)
474		return nil
475	}
476
477	// Dispatch command specific handlers
478	switch command {
479	case handshakeCommand:
480		return i.handleHandshake(client, seq)
481
482	case authCommand:
483		return i.handleAuth(client, seq)
484
485	case eventCommand:
486		return i.handleEvent(client, seq)
487
488	case membersCommand, membersFilteredCommand:
489		return i.handleMembers(client, command, seq)
490
491	case streamCommand:
492		return i.handleStream(client, seq)
493
494	case monitorCommand:
495		return i.handleMonitor(client, seq)
496
497	case stopCommand:
498		return i.handleStop(client, seq)
499
500	case forceLeaveCommand:
501		return i.handleForceLeave(client, seq)
502
503	case joinCommand:
504		return i.handleJoin(client, seq)
505
506	case leaveCommand:
507		return i.handleLeave(client, seq)
508
509	case installKeyCommand:
510		return i.handleInstallKey(client, seq)
511
512	case useKeyCommand:
513		return i.handleUseKey(client, seq)
514
515	case removeKeyCommand:
516		return i.handleRemoveKey(client, seq)
517
518	case listKeysCommand:
519		return i.handleListKeys(client, seq)
520
521	case tagsCommand:
522		return i.handleTags(client, seq)
523
524	case queryCommand:
525		return i.handleQuery(client, seq)
526
527	case respondCommand:
528		return i.handleRespond(client, seq)
529
530	case statsCommand:
531		return i.handleStats(client, seq)
532
533	case getCoordinateCommand:
534		return i.handleGetCoordinate(client, seq)
535
536	default:
537		respHeader := responseHeader{Seq: seq, Error: unsupportedCommand}
538		client.Send(&respHeader, nil)
539		return fmt.Errorf("command '%s' not recognized", command)
540	}
541}
542
543func (i *AgentIPC) handleHandshake(client *IPCClient, seq uint64) error {
544	var req handshakeRequest
545	if err := client.dec.Decode(&req); err != nil {
546		return fmt.Errorf("decode failed: %v", err)
547	}
548
549	resp := responseHeader{
550		Seq:   seq,
551		Error: "",
552	}
553
554	// Check the version
555	if req.Version < MinIPCVersion || req.Version > MaxIPCVersion {
556		resp.Error = unsupportedIPCVersion
557	} else if client.version != 0 {
558		resp.Error = duplicateHandshake
559	} else {
560		client.version = req.Version
561	}
562	return client.Send(&resp, nil)
563}
564
565func (i *AgentIPC) handleAuth(client *IPCClient, seq uint64) error {
566	var req authRequest
567	if err := client.dec.Decode(&req); err != nil {
568		return fmt.Errorf("decode failed: %v", err)
569	}
570
571	resp := responseHeader{
572		Seq:   seq,
573		Error: "",
574	}
575
576	// Check the token matches
577	if req.AuthKey == i.authKey {
578		client.didAuth = true
579	} else {
580		resp.Error = invalidAuthToken
581	}
582	return client.Send(&resp, nil)
583}
584
585func (i *AgentIPC) handleEvent(client *IPCClient, seq uint64) error {
586	var req eventRequest
587	if err := client.dec.Decode(&req); err != nil {
588		return fmt.Errorf("decode failed: %v", err)
589	}
590
591	// Attempt the send
592	err := i.agent.UserEvent(req.Name, req.Payload, req.Coalesce)
593
594	// Respond
595	resp := responseHeader{
596		Seq:   seq,
597		Error: errToString(err),
598	}
599	return client.Send(&resp, nil)
600}
601
602func (i *AgentIPC) handleForceLeave(client *IPCClient, seq uint64) error {
603	var req forceLeaveRequest
604	if err := client.dec.Decode(&req); err != nil {
605		return fmt.Errorf("decode failed: %v", err)
606	}
607
608	// Attempt leave
609	var err error
610	if req.Prune {
611		err = i.agent.ForceLeavePrune(req.Node)
612	} else {
613		err = i.agent.ForceLeave(req.Node)
614	}
615
616	// Respond
617	resp := responseHeader{
618		Seq:   seq,
619		Error: errToString(err),
620	}
621	return client.Send(&resp, nil)
622}
623
624func (i *AgentIPC) handleJoin(client *IPCClient, seq uint64) error {
625	var req joinRequest
626	if err := client.dec.Decode(&req); err != nil {
627		return fmt.Errorf("decode failed: %v", err)
628	}
629
630	// Attempt the join
631	num, err := i.agent.Join(req.Existing, req.Replay)
632
633	// Respond
634	header := responseHeader{
635		Seq:   seq,
636		Error: errToString(err),
637	}
638	resp := joinResponse{
639		Num: int32(num),
640	}
641	return client.Send(&header, &resp)
642}
643
644func (i *AgentIPC) handleMembers(client *IPCClient, command string, seq uint64) error {
645	serf := i.agent.Serf()
646	raw := serf.Members()
647	members := make([]Member, 0, len(raw))
648
649	if command == membersFilteredCommand {
650		var req membersFilteredRequest
651		err := client.dec.Decode(&req)
652		if err != nil {
653			return fmt.Errorf("decode failed: %v", err)
654		}
655		raw, err = i.filterMembers(raw, req.Tags, req.Status, req.Name)
656		if err != nil {
657			return err
658		}
659	}
660
661	for _, m := range raw {
662		sm := Member{
663			Name:        m.Name,
664			Addr:        m.Addr,
665			Port:        m.Port,
666			Tags:        m.Tags,
667			Status:      m.Status.String(),
668			ProtocolMin: m.ProtocolMin,
669			ProtocolMax: m.ProtocolMax,
670			ProtocolCur: m.ProtocolCur,
671			DelegateMin: m.DelegateMin,
672			DelegateMax: m.DelegateMax,
673			DelegateCur: m.DelegateCur,
674		}
675		members = append(members, sm)
676	}
677
678	header := responseHeader{
679		Seq:   seq,
680		Error: "",
681	}
682	resp := membersResponse{
683		Members: members,
684	}
685	return client.Send(&header, &resp)
686}
687
688func (i *AgentIPC) filterMembers(members []serf.Member, tags map[string]string,
689	status string, name string) ([]serf.Member, error) {
690
691	result := make([]serf.Member, 0, len(members))
692
693	// Pre-compile all the regular expressions
694	tagsRe := make(map[string]*regexp.Regexp)
695	for tag, expr := range tags {
696		re, err := regexp.Compile(fmt.Sprintf("^%s$", expr))
697		if err != nil {
698			return nil, fmt.Errorf("Failed to compile regex: %v", err)
699		}
700		tagsRe[tag] = re
701	}
702
703	statusRe, err := regexp.Compile(fmt.Sprintf("^%s$", status))
704	if err != nil {
705		return nil, fmt.Errorf("Failed to compile regex: %v", err)
706	}
707
708	nameRe, err := regexp.Compile(fmt.Sprintf("^%s$", name))
709	if err != nil {
710		return nil, fmt.Errorf("Failed to compile regex: %v", err)
711	}
712
713OUTER:
714	for _, m := range members {
715		// Check if tags were passed, and if they match
716		for tag := range tags {
717			if !tagsRe[tag].MatchString(m.Tags[tag]) {
718				continue OUTER
719			}
720		}
721
722		// Check if status matches
723		if status != "" && !statusRe.MatchString(m.Status.String()) {
724			continue
725		}
726
727		// Check if node name matches
728		if name != "" && !nameRe.MatchString(m.Name) {
729			continue
730		}
731
732		// Made it past the filters!
733		result = append(result, m)
734	}
735
736	return result, nil
737}
738
739func (i *AgentIPC) handleInstallKey(client *IPCClient, seq uint64) error {
740	var req keyRequest
741	if err := client.dec.Decode(&req); err != nil {
742		return fmt.Errorf("decode failed: %v", err)
743	}
744
745	queryResp, err := i.agent.InstallKey(req.Key)
746
747	header := responseHeader{
748		Seq:   seq,
749		Error: errToString(err),
750	}
751	resp := keyResponse{
752		Messages: queryResp.Messages,
753		NumNodes: queryResp.NumNodes,
754		NumErr:   queryResp.NumErr,
755		NumResp:  queryResp.NumResp,
756	}
757
758	return client.Send(&header, &resp)
759}
760
761func (i *AgentIPC) handleUseKey(client *IPCClient, seq uint64) error {
762	var req keyRequest
763	if err := client.dec.Decode(&req); err != nil {
764		return fmt.Errorf("decode failed: %v", err)
765	}
766
767	queryResp, err := i.agent.UseKey(req.Key)
768
769	header := responseHeader{
770		Seq:   seq,
771		Error: errToString(err),
772	}
773	resp := keyResponse{
774		Messages: queryResp.Messages,
775		NumNodes: queryResp.NumNodes,
776		NumErr:   queryResp.NumErr,
777		NumResp:  queryResp.NumResp,
778	}
779
780	return client.Send(&header, &resp)
781}
782
783func (i *AgentIPC) handleRemoveKey(client *IPCClient, seq uint64) error {
784	var req keyRequest
785	if err := client.dec.Decode(&req); err != nil {
786		return fmt.Errorf("decode failed: %v", err)
787	}
788
789	queryResp, err := i.agent.RemoveKey(req.Key)
790
791	header := responseHeader{
792		Seq:   seq,
793		Error: errToString(err),
794	}
795	resp := keyResponse{
796		Messages: queryResp.Messages,
797		NumNodes: queryResp.NumNodes,
798		NumErr:   queryResp.NumErr,
799		NumResp:  queryResp.NumResp,
800	}
801
802	return client.Send(&header, &resp)
803}
804
805func (i *AgentIPC) handleListKeys(client *IPCClient, seq uint64) error {
806	queryResp, err := i.agent.ListKeys()
807
808	header := responseHeader{
809		Seq:   seq,
810		Error: errToString(err),
811	}
812	resp := keyResponse{
813		Messages: queryResp.Messages,
814		Keys:     queryResp.Keys,
815		NumNodes: queryResp.NumNodes,
816		NumErr:   queryResp.NumErr,
817		NumResp:  queryResp.NumResp,
818	}
819
820	return client.Send(&header, &resp)
821}
822
823func (i *AgentIPC) handleStream(client *IPCClient, seq uint64) error {
824	var es *eventStream
825	var req streamRequest
826	if err := client.dec.Decode(&req); err != nil {
827		return fmt.Errorf("decode failed: %v", err)
828	}
829
830	resp := responseHeader{
831		Seq:   seq,
832		Error: "",
833	}
834
835	// Create the event filters
836	filters := ParseEventFilter(req.Type)
837	for _, f := range filters {
838		if !f.Valid() {
839			resp.Error = invalidFilter
840			goto SEND
841		}
842	}
843
844	// Check if there is an existing stream
845	if _, ok := client.eventStreams[seq]; ok {
846		resp.Error = streamExists
847		goto SEND
848	}
849
850	// Create an event streamer
851	es = newEventStream(client, filters, seq, i.logger)
852	client.eventStreams[seq] = es
853
854	// Register with the agent. Defer so that we can respond before
855	// registration, avoids any possible race condition
856	defer i.agent.RegisterEventHandler(es)
857
858SEND:
859	return client.Send(&resp, nil)
860}
861
862func (i *AgentIPC) handleMonitor(client *IPCClient, seq uint64) error {
863	var req monitorRequest
864	if err := client.dec.Decode(&req); err != nil {
865		return fmt.Errorf("decode failed: %v", err)
866	}
867
868	resp := responseHeader{
869		Seq:   seq,
870		Error: "",
871	}
872
873	// Upper case the log level
874	req.LogLevel = strings.ToUpper(req.LogLevel)
875
876	// Create a level filter
877	filter := LevelFilter()
878	filter.MinLevel = logutils.LogLevel(req.LogLevel)
879	if !ValidateLevelFilter(filter.MinLevel, filter) {
880		resp.Error = fmt.Sprintf("Unknown log level: %s", filter.MinLevel)
881		goto SEND
882	}
883
884	// Check if there is an existing monitor
885	if client.logStreamer != nil {
886		resp.Error = monitorExists
887		goto SEND
888	}
889
890	// Create a log streamer
891	client.logStreamer = newLogStream(client, filter, seq, i.logger)
892
893	// Register with the log writer. Defer so that we can respond before
894	// registration, avoids any possible race condition
895	defer i.logWriter.RegisterHandler(client.logStreamer)
896
897SEND:
898	return client.Send(&resp, nil)
899}
900
901func (i *AgentIPC) handleStop(client *IPCClient, seq uint64) error {
902	var req stopRequest
903	if err := client.dec.Decode(&req); err != nil {
904		return fmt.Errorf("decode failed: %v", err)
905	}
906
907	// Remove a log monitor if any
908	if client.logStreamer != nil && client.logStreamer.seq == req.Stop {
909		i.logWriter.DeregisterHandler(client.logStreamer)
910		client.logStreamer.Stop()
911		client.logStreamer = nil
912	}
913
914	// Remove an event stream if any
915	if es, ok := client.eventStreams[req.Stop]; ok {
916		i.agent.DeregisterEventHandler(es)
917		es.Stop()
918		delete(client.eventStreams, req.Stop)
919	}
920
921	// Always succeed
922	resp := responseHeader{Seq: seq, Error: ""}
923	return client.Send(&resp, nil)
924}
925
926func (i *AgentIPC) handleLeave(client *IPCClient, seq uint64) error {
927	i.logger.Printf("[INFO] agent.ipc: Graceful leave triggered")
928
929	// Do the leave
930	err := i.agent.Leave()
931	if err != nil {
932		i.logger.Printf("[ERR] agent.ipc: leave failed: %v", err)
933	}
934	resp := responseHeader{Seq: seq, Error: errToString(err)}
935
936	// Send and wait
937	err = client.Send(&resp, nil)
938
939	// Trigger a shutdown!
940	if err := i.agent.Shutdown(); err != nil {
941		i.logger.Printf("[ERR] agent.ipc: shutdown failed: %v", err)
942	}
943	return err
944}
945
946func (i *AgentIPC) handleTags(client *IPCClient, seq uint64) error {
947	var req tagsRequest
948	if err := client.dec.Decode(&req); err != nil {
949		return fmt.Errorf("decode failed: %v", err)
950	}
951
952	tags := make(map[string]string)
953
954	for key, val := range i.agent.SerfConfig().Tags {
955		var delTag bool
956		for _, delkey := range req.DeleteTags {
957			delTag = (delTag || delkey == key)
958		}
959		if !delTag {
960			tags[key] = val
961		}
962	}
963
964	for key, val := range req.Tags {
965		tags[key] = val
966	}
967
968	err := i.agent.SetTags(tags)
969
970	resp := responseHeader{Seq: seq, Error: errToString(err)}
971	return client.Send(&resp, nil)
972}
973
974func (i *AgentIPC) handleQuery(client *IPCClient, seq uint64) error {
975	var req queryRequest
976	if err := client.dec.Decode(&req); err != nil {
977		return fmt.Errorf("decode failed: %v", err)
978	}
979
980	// Setup the query
981	params := serf.QueryParam{
982		FilterNodes: req.FilterNodes,
983		FilterTags:  req.FilterTags,
984		RequestAck:  req.RequestAck,
985		RelayFactor: req.RelayFactor,
986		Timeout:     req.Timeout,
987	}
988
989	// Start the query
990	queryResp, err := i.agent.Query(req.Name, req.Payload, &params)
991
992	// Stream the query responses
993	if err == nil {
994		qs := newQueryResponseStream(client, seq, i.logger)
995		defer func() {
996			go qs.Stream(queryResp)
997		}()
998	}
999
1000	// Respond
1001	resp := responseHeader{
1002		Seq:   seq,
1003		Error: errToString(err),
1004	}
1005	return client.Send(&resp, nil)
1006}
1007
1008func (i *AgentIPC) handleRespond(client *IPCClient, seq uint64) error {
1009	var req respondRequest
1010	if err := client.dec.Decode(&req); err != nil {
1011		return fmt.Errorf("decode failed: %v", err)
1012	}
1013
1014	// Lookup the query
1015	client.queryLock.Lock()
1016	query, ok := client.pendingQueries[req.ID]
1017	client.queryLock.Unlock()
1018
1019	// Respond if we have a pending query
1020	var err error
1021	if ok {
1022		err = query.Respond(req.Payload)
1023	} else {
1024		err = fmt.Errorf(invalidQueryID)
1025	}
1026
1027	// Respond
1028	resp := responseHeader{
1029		Seq:   seq,
1030		Error: errToString(err),
1031	}
1032	return client.Send(&resp, nil)
1033}
1034
1035// handleStats is used to get various statistics
1036func (i *AgentIPC) handleStats(client *IPCClient, seq uint64) error {
1037	header := responseHeader{
1038		Seq:   seq,
1039		Error: "",
1040	}
1041	resp := i.agent.Stats()
1042	return client.Send(&header, resp)
1043}
1044
1045// handleGetCoordinate is used to get the cached coordinate for a node.
1046func (i *AgentIPC) handleGetCoordinate(client *IPCClient, seq uint64) error {
1047	var req coordinateRequest
1048	if err := client.dec.Decode(&req); err != nil {
1049		return fmt.Errorf("decode failed: %v", err)
1050	}
1051
1052	// Fetch the coordinate.
1053	var result coordinate.Coordinate
1054	coord, ok := i.agent.Serf().GetCachedCoordinate(req.Node)
1055	if ok {
1056		result = *coord
1057	}
1058
1059	// Respond
1060	header := responseHeader{
1061		Seq:   seq,
1062		Error: errToString(nil),
1063	}
1064	resp := coordinateResponse{
1065		Coord: result,
1066		Ok:    ok,
1067	}
1068	return client.Send(&header, &resp)
1069}
1070
// errToString returns the message of err, or the empty string when err is
// nil. It is used to fill the Error field of response headers.
func errToString(err error) string {
	if err != nil {
		return err.Error()
	}
	return ""
}
1078