1package agent 2 3/* 4 The agent exposes an IPC mechanism that is used for both controlling 5 Serf as well as providing a fast streaming mechanism for events. This 6 allows other applications to easily leverage Serf as the event layer. 7 8 We additionally make use of the IPC layer to also handle RPC calls from 9 the CLI to unify the code paths. This results in a split Request/Response 10 as well as streaming mode of operation. 11 12 The system is fairly simple, each client opens a TCP connection to the 13 agent. The connection is initialized with a handshake which establishes 14 the protocol version being used. This is to allow for future changes to 15 the protocol. 16 17 Once initialized, clients send commands and wait for responses. Certain 18 commands will cause the client to subscribe to events, and those will be 19 pushed down the socket as they are received. This provides a low-latency 20 mechanism for applications to send and receive events, while also providing 21 a flexible control mechanism for Serf. 22*/ 23 24import ( 25 "bufio" 26 "fmt" 27 "io" 28 "log" 29 "net" 30 "os" 31 "regexp" 32 "strings" 33 "sync" 34 "sync/atomic" 35 "time" 36 37 "github.com/armon/go-metrics" 38 "github.com/hashicorp/go-msgpack/codec" 39 "github.com/hashicorp/logutils" 40 "github.com/hashicorp/serf/coordinate" 41 "github.com/hashicorp/serf/serf" 42) 43 44const ( 45 MinIPCVersion = 1 46 MaxIPCVersion = 1 47) 48 49const ( 50 handshakeCommand = "handshake" 51 eventCommand = "event" 52 forceLeaveCommand = "force-leave" 53 joinCommand = "join" 54 membersCommand = "members" 55 membersFilteredCommand = "members-filtered" 56 streamCommand = "stream" 57 stopCommand = "stop" 58 monitorCommand = "monitor" 59 leaveCommand = "leave" 60 installKeyCommand = "install-key" 61 useKeyCommand = "use-key" 62 removeKeyCommand = "remove-key" 63 listKeysCommand = "list-keys" 64 tagsCommand = "tags" 65 queryCommand = "query" 66 respondCommand = "respond" 67 authCommand = "auth" 68 statsCommand = "stats" 69 getCoordinateCommand = "get-coordinate" 70) 71 72const ( 73 unsupportedCommand = "Unsupported command" 74 unsupportedIPCVersion = "Unsupported IPC version" 75 duplicateHandshake = "Handshake already performed" 76 handshakeRequired = "Handshake required" 77 monitorExists = "Monitor already exists" 78 invalidFilter = "Invalid event filter" 79 streamExists = "Stream with given sequence exists" 80 invalidQueryID = "No pending queries matching ID" 81 authRequired = "Authentication required" 82 invalidAuthToken = "Invalid authentication token" 83) 84 85const ( 86 queryRecordAck = "ack" 87 queryRecordResponse = "response" 88 queryRecordDone = "done" 89) 90 91// Request header is sent before each request 92type requestHeader struct { 93 Command string 94 Seq uint64 95} 96 97// Response header is sent before each response 98type responseHeader struct { 99 Seq uint64 100 Error string 101} 102 103type handshakeRequest struct { 104 Version int32 105} 106 107type authRequest struct { 108 AuthKey string 109} 110 111type coordinateRequest struct { 112 Node string 113} 114 115type coordinateResponse struct { 116 Coord coordinate.Coordinate 117 Ok bool 118} 119 120type eventRequest struct { 121 Name string 122 Payload []byte 123 Coalesce bool 124} 125 126type forceLeaveRequest struct { 127 Node string 128 Prune bool 129} 130 131type joinRequest struct { 132 Existing []string 133 Replay bool 134} 135 136type joinResponse struct { 137 Num int32 138} 139 140type membersFilteredRequest struct { 141 Tags map[string]string 142 Status string 143 Name string 144} 145 146type membersResponse struct { 147 Members []Member 148} 149 150type keyRequest struct { 151 Key string 152} 153 154type keyResponse struct { 155 Messages map[string]string 156 Keys map[string]int 157 NumNodes int 158 NumErr int 159 NumResp int 160} 161 162type monitorRequest struct { 163 LogLevel string 164} 165 166type streamRequest struct { 167 Type string 168} 169 170type stopRequest struct { 171 Stop uint64 172} 173 174type tagsRequest struct { 175 Tags map[string]string 176 DeleteTags []string 177} 178 179type queryRequest struct { 180 FilterNodes []string 181 FilterTags map[string]string 182 RequestAck bool 183 RelayFactor uint8 184 Timeout time.Duration 185 Name string 186 Payload []byte 187} 188 189type respondRequest struct { 190 ID uint64 191 Payload []byte 192} 193 194type queryRecord struct { 195 Type string 196 From string 197 Payload []byte 198} 199 200type logRecord struct { 201 Log string 202} 203 204type userEventRecord struct { 205 Event string 206 LTime serf.LamportTime 207 Name string 208 Payload []byte 209 Coalesce bool 210} 211 212type queryEventRecord struct { 213 Event string 214 ID uint64 // ID is opaque to client, used to respond 215 LTime serf.LamportTime 216 Name string 217 Payload []byte 218} 219 220type Member struct { 221 Name string 222 Addr net.IP 223 Port uint16 224 Tags map[string]string 225 Status string 226 ProtocolMin uint8 227 ProtocolMax uint8 228 ProtocolCur uint8 229 DelegateMin uint8 230 DelegateMax uint8 231 DelegateCur uint8 232} 233 234type memberEventRecord struct { 235 Event string 236 Members []Member 237} 238 239type AgentIPC struct { 240 sync.Mutex 241 agent *Agent 242 authKey string 243 clients map[string]*IPCClient 244 listener net.Listener 245 logger *log.Logger 246 logWriter *logWriter 247 stop bool 248 stopCh chan struct{} 249} 250 251type IPCClient struct { 252 queryID uint64 // Used to increment query IDs 253 name string 254 conn net.Conn 255 reader *bufio.Reader 256 writer *bufio.Writer 257 dec *codec.Decoder 258 enc *codec.Encoder 259 writeLock sync.Mutex 260 version int32 // From the handshake, 0 before 261 logStreamer *logStream 262 eventStreams map[uint64]*eventStream 263 264 pendingQueries map[uint64]*serf.Query 265 queryLock sync.Mutex 266 267 didAuth bool // Did we get an auth token yet? 268} 269 270// send is used to send an object using the MsgPack encoding. send 271// is serialized to prevent write overlaps, while properly buffering. 272func (c *IPCClient) Send(header *responseHeader, obj interface{}) error { 273 c.writeLock.Lock() 274 defer c.writeLock.Unlock() 275 276 if err := c.enc.Encode(header); err != nil { 277 return err 278 } 279 280 if obj != nil { 281 if err := c.enc.Encode(obj); err != nil { 282 return err 283 } 284 } 285 286 if err := c.writer.Flush(); err != nil { 287 return err 288 } 289 290 return nil 291} 292 293func (c *IPCClient) String() string { 294 return fmt.Sprintf("ipc.client: %v", c.conn.RemoteAddr()) 295} 296 297// nextQueryID safely generates a new query ID 298func (c *IPCClient) nextQueryID() uint64 { 299 return atomic.AddUint64(&c.queryID, 1) 300} 301 302// RegisterQuery is used to register a pending query that may 303// get a response. The ID of the query is returned 304func (c *IPCClient) RegisterQuery(q *serf.Query) uint64 { 305 // Generate a unique-per-client ID 306 id := c.nextQueryID() 307 308 // Ensure the query deadline is in the future 309 timeout := q.Deadline().Sub(time.Now()) 310 if timeout < 0 { 311 return id 312 } 313 314 // Register the query 315 c.queryLock.Lock() 316 c.pendingQueries[id] = q 317 c.queryLock.Unlock() 318 319 // Setup a timer to deregister after the timeout 320 time.AfterFunc(timeout, func() { 321 c.queryLock.Lock() 322 delete(c.pendingQueries, id) 323 c.queryLock.Unlock() 324 }) 325 return id 326} 327 328// NewAgentIPC is used to create a new Agent IPC handler 329func NewAgentIPC(agent *Agent, authKey string, listener net.Listener, 330 logOutput io.Writer, logWriter *logWriter) *AgentIPC { 331 if logOutput == nil { 332 logOutput = os.Stderr 333 } 334 ipc := &AgentIPC{ 335 agent: agent, 336 authKey: authKey, 337 clients: make(map[string]*IPCClient), 338 listener: listener, 339 logger: log.New(logOutput, "", log.LstdFlags), 340 logWriter: logWriter, 341 stopCh: make(chan struct{}), 342 } 343 go ipc.listen() 344 return ipc 345} 346 347// Shutdown is used to shutdown the IPC layer 348func (i *AgentIPC) Shutdown() { 349 i.Lock() 350 defer i.Unlock() 351 352 if i.stop { 353 return 354 } 355 356 i.stop = true 357 close(i.stopCh) 358 i.listener.Close() 359 360 // Close the existing connections 361 for _, client := range i.clients { 362 client.conn.Close() 363 } 364} 365 366// listen is a long running routine that listens for new clients 367func (i *AgentIPC) listen() { 368 for { 369 conn, err := i.listener.Accept() 370 if err != nil { 371 if i.stop { 372 return 373 } 374 i.logger.Printf("[ERR] agent.ipc: Failed to accept client: %v", err) 375 continue 376 } 377 i.logger.Printf("[INFO] agent.ipc: Accepted client: %v", conn.RemoteAddr()) 378 metrics.IncrCounter([]string{"agent", "ipc", "accept"}, 1) 379 380 // Wrap the connection in a client 381 client := &IPCClient{ 382 name: conn.RemoteAddr().String(), 383 conn: conn, 384 reader: bufio.NewReader(conn), 385 writer: bufio.NewWriter(conn), 386 eventStreams: make(map[uint64]*eventStream), 387 pendingQueries: make(map[uint64]*serf.Query), 388 } 389 client.dec = codec.NewDecoder(client.reader, 390 &codec.MsgpackHandle{RawToString: true, WriteExt: true}) 391 client.enc = codec.NewEncoder(client.writer, 392 &codec.MsgpackHandle{RawToString: true, WriteExt: true}) 393 394 // Register the client 395 i.Lock() 396 if !i.stop { 397 i.clients[client.name] = client 398 go i.handleClient(client) 399 } else { 400 conn.Close() 401 } 402 i.Unlock() 403 } 404} 405 406// deregisterClient is called to cleanup after a client disconnects 407func (i *AgentIPC) deregisterClient(client *IPCClient) { 408 // Close the socket 409 client.conn.Close() 410 411 // Remove from the clients list 412 i.Lock() 413 delete(i.clients, client.name) 414 i.Unlock() 415 416 // Remove from the log writer 417 if client.logStreamer != nil { 418 i.logWriter.DeregisterHandler(client.logStreamer) 419 client.logStreamer.Stop() 420 } 421 422 // Remove from event handlers 423 for _, es := range client.eventStreams { 424 i.agent.DeregisterEventHandler(es) 425 es.Stop() 426 } 427} 428 429// handleClient is a long running routine that handles a single client 430func (i *AgentIPC) handleClient(client *IPCClient) { 431 defer i.deregisterClient(client) 432 var reqHeader requestHeader 433 for { 434 // Decode the header 435 if err := client.dec.Decode(&reqHeader); err != nil { 436 if !i.stop { 437 // The second part of this if is to block socket 438 // errors from Windows which appear to happen every 439 // time there is an EOF. 440 if err != io.EOF && !strings.Contains(strings.ToLower(err.Error()), "wsarecv") { 441 i.logger.Printf("[ERR] agent.ipc: failed to decode request header: %v", err) 442 } 443 } 444 return 445 } 446 447 // Evaluate the command 448 if err := i.handleRequest(client, &reqHeader); err != nil { 449 i.logger.Printf("[ERR] agent.ipc: Failed to evaluate request: %v", err) 450 return 451 } 452 } 453} 454 455// handleRequest is used to evaluate a single client command 456func (i *AgentIPC) handleRequest(client *IPCClient, reqHeader *requestHeader) error { 457 // Look for a command field 458 command := reqHeader.Command 459 seq := reqHeader.Seq 460 461 // Ensure the handshake is performed before other commands 462 if command != handshakeCommand && client.version == 0 { 463 respHeader := responseHeader{Seq: seq, Error: handshakeRequired} 464 client.Send(&respHeader, nil) 465 return fmt.Errorf(handshakeRequired) 466 } 467 metrics.IncrCounter([]string{"agent", "ipc", "command"}, 1) 468 469 // Ensure the client has authenticated after the handshake if necessary 470 if i.authKey != "" && !client.didAuth && command != authCommand && command != handshakeCommand { 471 i.logger.Printf("[WARN] agent.ipc: Client sending commands before auth") 472 respHeader := responseHeader{Seq: seq, Error: authRequired} 473 client.Send(&respHeader, nil) 474 return nil 475 } 476 477 // Dispatch command specific handlers 478 switch command { 479 case handshakeCommand: 480 return i.handleHandshake(client, seq) 481 482 case authCommand: 483 return i.handleAuth(client, seq) 484 485 case eventCommand: 486 return i.handleEvent(client, seq) 487 488 case membersCommand, membersFilteredCommand: 489 return i.handleMembers(client, command, seq) 490 491 case streamCommand: 492 return i.handleStream(client, seq) 493 494 case monitorCommand: 495 return i.handleMonitor(client, seq) 496 497 case stopCommand: 498 return i.handleStop(client, seq) 499 500 case forceLeaveCommand: 501 return i.handleForceLeave(client, seq) 502 503 case joinCommand: 504 return i.handleJoin(client, seq) 505 506 case leaveCommand: 507 return i.handleLeave(client, seq) 508 509 case installKeyCommand: 510 return i.handleInstallKey(client, seq) 511 512 case useKeyCommand: 513 return i.handleUseKey(client, seq) 514 515 case removeKeyCommand: 516 return i.handleRemoveKey(client, seq) 517 518 case listKeysCommand: 519 return i.handleListKeys(client, seq) 520 521 case tagsCommand: 522 return i.handleTags(client, seq) 523 524 case queryCommand: 525 return i.handleQuery(client, seq) 526 527 case respondCommand: 528 return i.handleRespond(client, seq) 529 530 case statsCommand: 531 return i.handleStats(client, seq) 532 533 case getCoordinateCommand: 534 return i.handleGetCoordinate(client, seq) 535 536 default: 537 respHeader := responseHeader{Seq: seq, Error: unsupportedCommand} 538 client.Send(&respHeader, nil) 539 return fmt.Errorf("command '%s' not recognized", command) 540 } 541} 542 543func (i *AgentIPC) handleHandshake(client *IPCClient, seq uint64) error { 544 var req handshakeRequest 545 if err := client.dec.Decode(&req); err != nil { 546 return fmt.Errorf("decode failed: %v", err) 547 } 548 549 resp := responseHeader{ 550 Seq: seq, 551 Error: "", 552 } 553 554 // Check the version 555 if req.Version < MinIPCVersion || req.Version > MaxIPCVersion { 556 resp.Error = unsupportedIPCVersion 557 } else if client.version != 0 { 558 resp.Error = duplicateHandshake 559 } else { 560 client.version = req.Version 561 } 562 return client.Send(&resp, nil) 563} 564 565func (i *AgentIPC) handleAuth(client *IPCClient, seq uint64) error { 566 var req authRequest 567 if err := client.dec.Decode(&req); err != nil { 568 return fmt.Errorf("decode failed: %v", err) 569 } 570 571 resp := responseHeader{ 572 Seq: seq, 573 Error: "", 574 } 575 576 // Check the token matches 577 if req.AuthKey == i.authKey { 578 client.didAuth = true 579 } else { 580 resp.Error = invalidAuthToken 581 } 582 return client.Send(&resp, nil) 583} 584 585func (i *AgentIPC) handleEvent(client *IPCClient, seq uint64) error { 586 var req eventRequest 587 if err := client.dec.Decode(&req); err != nil { 588 return fmt.Errorf("decode failed: %v", err) 589 } 590 591 // Attempt the send 592 err := i.agent.UserEvent(req.Name, req.Payload, req.Coalesce) 593 594 // Respond 595 resp := responseHeader{ 596 Seq: seq, 597 Error: errToString(err), 598 } 599 return client.Send(&resp, nil) 600} 601 602func (i *AgentIPC) handleForceLeave(client *IPCClient, seq uint64) error { 603 var req forceLeaveRequest 604 if err := client.dec.Decode(&req); err != nil { 605 return fmt.Errorf("decode failed: %v", err) 606 } 607 608 // Attempt leave 609 var err error 610 if req.Prune { 611 err = i.agent.ForceLeavePrune(req.Node) 612 } else { 613 err = i.agent.ForceLeave(req.Node) 614 } 615 616 // Respond 617 resp := responseHeader{ 618 Seq: seq, 619 Error: errToString(err), 620 } 621 return client.Send(&resp, nil) 622} 623 624func (i *AgentIPC) handleJoin(client *IPCClient, seq uint64) error { 625 var req joinRequest 626 if err := client.dec.Decode(&req); err != nil { 627 return fmt.Errorf("decode failed: %v", err) 628 } 629 630 // Attempt the join 631 num, err := i.agent.Join(req.Existing, req.Replay) 632 633 // Respond 634 header := responseHeader{ 635 Seq: seq, 636 Error: errToString(err), 637 } 638 resp := joinResponse{ 639 Num: int32(num), 640 } 641 return client.Send(&header, &resp) 642} 643 644func (i *AgentIPC) handleMembers(client *IPCClient, command string, seq uint64) error { 645 serf := i.agent.Serf() 646 raw := serf.Members() 647 members := make([]Member, 0, len(raw)) 648 649 if command == membersFilteredCommand { 650 var req membersFilteredRequest 651 err := client.dec.Decode(&req) 652 if err != nil { 653 return fmt.Errorf("decode failed: %v", err) 654 } 655 raw, err = i.filterMembers(raw, req.Tags, req.Status, req.Name) 656 if err != nil { 657 return err 658 } 659 } 660 661 for _, m := range raw { 662 sm := Member{ 663 Name: m.Name, 664 Addr: m.Addr, 665 Port: m.Port, 666 Tags: m.Tags, 667 Status: m.Status.String(), 668 ProtocolMin: m.ProtocolMin, 669 ProtocolMax: m.ProtocolMax, 670 ProtocolCur: m.ProtocolCur, 671 DelegateMin: m.DelegateMin, 672 DelegateMax: m.DelegateMax, 673 DelegateCur: m.DelegateCur, 674 } 675 members = append(members, sm) 676 } 677 678 header := responseHeader{ 679 Seq: seq, 680 Error: "", 681 } 682 resp := membersResponse{ 683 Members: members, 684 } 685 return client.Send(&header, &resp) 686} 687 688func (i *AgentIPC) filterMembers(members []serf.Member, tags map[string]string, 689 status string, name string) ([]serf.Member, error) { 690 691 result := make([]serf.Member, 0, len(members)) 692 693 // Pre-compile all the regular expressions 694 tagsRe := make(map[string]*regexp.Regexp) 695 for tag, expr := range tags { 696 re, err := regexp.Compile(fmt.Sprintf("^%s$", expr)) 697 if err != nil { 698 return nil, fmt.Errorf("Failed to compile regex: %v", err) 699 } 700 tagsRe[tag] = re 701 } 702 703 statusRe, err := regexp.Compile(fmt.Sprintf("^%s$", status)) 704 if err != nil { 705 return nil, fmt.Errorf("Failed to compile regex: %v", err) 706 } 707 708 nameRe, err := regexp.Compile(fmt.Sprintf("^%s$", name)) 709 if err != nil { 710 return nil, fmt.Errorf("Failed to compile regex: %v", err) 711 } 712 713OUTER: 714 for _, m := range members { 715 // Check if tags were passed, and if they match 716 for tag := range tags { 717 if !tagsRe[tag].MatchString(m.Tags[tag]) { 718 continue OUTER 719 } 720 } 721 722 // Check if status matches 723 if status != "" && !statusRe.MatchString(m.Status.String()) { 724 continue 725 } 726 727 // Check if node name matches 728 if name != "" && !nameRe.MatchString(m.Name) { 729 continue 730 } 731 732 // Made it past the filters! 733 result = append(result, m) 734 } 735 736 return result, nil 737} 738 739func (i *AgentIPC) handleInstallKey(client *IPCClient, seq uint64) error { 740 var req keyRequest 741 if err := client.dec.Decode(&req); err != nil { 742 return fmt.Errorf("decode failed: %v", err) 743 } 744 745 queryResp, err := i.agent.InstallKey(req.Key) 746 747 header := responseHeader{ 748 Seq: seq, 749 Error: errToString(err), 750 } 751 resp := keyResponse{ 752 Messages: queryResp.Messages, 753 NumNodes: queryResp.NumNodes, 754 NumErr: queryResp.NumErr, 755 NumResp: queryResp.NumResp, 756 } 757 758 return client.Send(&header, &resp) 759} 760 761func (i *AgentIPC) handleUseKey(client *IPCClient, seq uint64) error { 762 var req keyRequest 763 if err := client.dec.Decode(&req); err != nil { 764 return fmt.Errorf("decode failed: %v", err) 765 } 766 767 queryResp, err := i.agent.UseKey(req.Key) 768 769 header := responseHeader{ 770 Seq: seq, 771 Error: errToString(err), 772 } 773 resp := keyResponse{ 774 Messages: queryResp.Messages, 775 NumNodes: queryResp.NumNodes, 776 NumErr: queryResp.NumErr, 777 NumResp: queryResp.NumResp, 778 } 779 780 return client.Send(&header, &resp) 781} 782 783func (i *AgentIPC) handleRemoveKey(client *IPCClient, seq uint64) error { 784 var req keyRequest 785 if err := client.dec.Decode(&req); err != nil { 786 return fmt.Errorf("decode failed: %v", err) 787 } 788 789 queryResp, err := i.agent.RemoveKey(req.Key) 790 791 header := responseHeader{ 792 Seq: seq, 793 Error: errToString(err), 794 } 795 resp := keyResponse{ 796 Messages: queryResp.Messages, 797 NumNodes: queryResp.NumNodes, 798 NumErr: queryResp.NumErr, 799 NumResp: queryResp.NumResp, 800 } 801 802 return client.Send(&header, &resp) 803} 804 805func (i *AgentIPC) handleListKeys(client *IPCClient, seq uint64) error { 806 queryResp, err := i.agent.ListKeys() 807 808 header := responseHeader{ 809 Seq: seq, 810 Error: errToString(err), 811 } 812 resp := keyResponse{ 813 Messages: queryResp.Messages, 814 Keys: queryResp.Keys, 815 NumNodes: queryResp.NumNodes, 816 NumErr: queryResp.NumErr, 817 NumResp: queryResp.NumResp, 818 } 819 820 return client.Send(&header, &resp) 821} 822 823func (i *AgentIPC) handleStream(client *IPCClient, seq uint64) error { 824 var es *eventStream 825 var req streamRequest 826 if err := client.dec.Decode(&req); err != nil { 827 return fmt.Errorf("decode failed: %v", err) 828 } 829 830 resp := responseHeader{ 831 Seq: seq, 832 Error: "", 833 } 834 835 // Create the event filters 836 filters := ParseEventFilter(req.Type) 837 for _, f := range filters { 838 if !f.Valid() { 839 resp.Error = invalidFilter 840 goto SEND 841 } 842 } 843 844 // Check if there is an existing stream 845 if _, ok := client.eventStreams[seq]; ok { 846 resp.Error = streamExists 847 goto SEND 848 } 849 850 // Create an event streamer 851 es = newEventStream(client, filters, seq, i.logger) 852 client.eventStreams[seq] = es 853 854 // Register with the agent. Defer so that we can respond before 855 // registration, avoids any possible race condition 856 defer i.agent.RegisterEventHandler(es) 857 858SEND: 859 return client.Send(&resp, nil) 860} 861 862func (i *AgentIPC) handleMonitor(client *IPCClient, seq uint64) error { 863 var req monitorRequest 864 if err := client.dec.Decode(&req); err != nil { 865 return fmt.Errorf("decode failed: %v", err) 866 } 867 868 resp := responseHeader{ 869 Seq: seq, 870 Error: "", 871 } 872 873 // Upper case the log level 874 req.LogLevel = strings.ToUpper(req.LogLevel) 875 876 // Create a level filter 877 filter := LevelFilter() 878 filter.MinLevel = logutils.LogLevel(req.LogLevel) 879 if !ValidateLevelFilter(filter.MinLevel, filter) { 880 resp.Error = fmt.Sprintf("Unknown log level: %s", filter.MinLevel) 881 goto SEND 882 } 883 884 // Check if there is an existing monitor 885 if client.logStreamer != nil { 886 resp.Error = monitorExists 887 goto SEND 888 } 889 890 // Create a log streamer 891 client.logStreamer = newLogStream(client, filter, seq, i.logger) 892 893 // Register with the log writer. Defer so that we can respond before 894 // registration, avoids any possible race condition 895 defer i.logWriter.RegisterHandler(client.logStreamer) 896 897SEND: 898 return client.Send(&resp, nil) 899} 900 901func (i *AgentIPC) handleStop(client *IPCClient, seq uint64) error { 902 var req stopRequest 903 if err := client.dec.Decode(&req); err != nil { 904 return fmt.Errorf("decode failed: %v", err) 905 } 906 907 // Remove a log monitor if any 908 if client.logStreamer != nil && client.logStreamer.seq == req.Stop { 909 i.logWriter.DeregisterHandler(client.logStreamer) 910 client.logStreamer.Stop() 911 client.logStreamer = nil 912 } 913 914 // Remove an event stream if any 915 if es, ok := client.eventStreams[req.Stop]; ok { 916 i.agent.DeregisterEventHandler(es) 917 es.Stop() 918 delete(client.eventStreams, req.Stop) 919 } 920 921 // Always succeed 922 resp := responseHeader{Seq: seq, Error: ""} 923 return client.Send(&resp, nil) 924} 925 926func (i *AgentIPC) handleLeave(client *IPCClient, seq uint64) error { 927 i.logger.Printf("[INFO] agent.ipc: Graceful leave triggered") 928 929 // Do the leave 930 err := i.agent.Leave() 931 if err != nil { 932 i.logger.Printf("[ERR] agent.ipc: leave failed: %v", err) 933 } 934 resp := responseHeader{Seq: seq, Error: errToString(err)} 935 936 // Send and wait 937 err = client.Send(&resp, nil) 938 939 // Trigger a shutdown! 940 if err := i.agent.Shutdown(); err != nil { 941 i.logger.Printf("[ERR] agent.ipc: shutdown failed: %v", err) 942 } 943 return err 944} 945 946func (i *AgentIPC) handleTags(client *IPCClient, seq uint64) error { 947 var req tagsRequest 948 if err := client.dec.Decode(&req); err != nil { 949 return fmt.Errorf("decode failed: %v", err) 950 } 951 952 tags := make(map[string]string) 953 954 for key, val := range i.agent.SerfConfig().Tags { 955 var delTag bool 956 for _, delkey := range req.DeleteTags { 957 delTag = (delTag || delkey == key) 958 } 959 if !delTag { 960 tags[key] = val 961 } 962 } 963 964 for key, val := range req.Tags { 965 tags[key] = val 966 } 967 968 err := i.agent.SetTags(tags) 969 970 resp := responseHeader{Seq: seq, Error: errToString(err)} 971 return client.Send(&resp, nil) 972} 973 974func (i *AgentIPC) handleQuery(client *IPCClient, seq uint64) error { 975 var req queryRequest 976 if err := client.dec.Decode(&req); err != nil { 977 return fmt.Errorf("decode failed: %v", err) 978 } 979 980 // Setup the query 981 params := serf.QueryParam{ 982 FilterNodes: req.FilterNodes, 983 FilterTags: req.FilterTags, 984 RequestAck: req.RequestAck, 985 RelayFactor: req.RelayFactor, 986 Timeout: req.Timeout, 987 } 988 989 // Start the query 990 queryResp, err := i.agent.Query(req.Name, req.Payload, ¶ms) 991 992 // Stream the query responses 993 if err == nil { 994 qs := newQueryResponseStream(client, seq, i.logger) 995 defer func() { 996 go qs.Stream(queryResp) 997 }() 998 } 999 1000 // Respond 1001 resp := responseHeader{ 1002 Seq: seq, 1003 Error: errToString(err), 1004 } 1005 return client.Send(&resp, nil) 1006} 1007 1008func (i *AgentIPC) handleRespond(client *IPCClient, seq uint64) error { 1009 var req respondRequest 1010 if err := client.dec.Decode(&req); err != nil { 1011 return fmt.Errorf("decode failed: %v", err) 1012 } 1013 1014 // Lookup the query 1015 client.queryLock.Lock() 1016 query, ok := client.pendingQueries[req.ID] 1017 client.queryLock.Unlock() 1018 1019 // Respond if we have a pending query 1020 var err error 1021 if ok { 1022 err = query.Respond(req.Payload) 1023 } else { 1024 err = fmt.Errorf(invalidQueryID) 1025 } 1026 1027 // Respond 1028 resp := responseHeader{ 1029 Seq: seq, 1030 Error: errToString(err), 1031 } 1032 return client.Send(&resp, nil) 1033} 1034 1035// handleStats is used to get various statistics 1036func (i *AgentIPC) handleStats(client *IPCClient, seq uint64) error { 1037 header := responseHeader{ 1038 Seq: seq, 1039 Error: "", 1040 } 1041 resp := i.agent.Stats() 1042 return client.Send(&header, resp) 1043} 1044 1045// handleGetCoordinate is used to get the cached coordinate for a node. 1046func (i *AgentIPC) handleGetCoordinate(client *IPCClient, seq uint64) error { 1047 var req coordinateRequest 1048 if err := client.dec.Decode(&req); err != nil { 1049 return fmt.Errorf("decode failed: %v", err) 1050 } 1051 1052 // Fetch the coordinate. 1053 var result coordinate.Coordinate 1054 coord, ok := i.agent.Serf().GetCachedCoordinate(req.Node) 1055 if ok { 1056 result = *coord 1057 } 1058 1059 // Respond 1060 header := responseHeader{ 1061 Seq: seq, 1062 Error: errToString(nil), 1063 } 1064 resp := coordinateResponse{ 1065 Coord: result, 1066 Ok: ok, 1067 } 1068 return client.Send(&header, &resp) 1069} 1070 1071// Used to convert an error to a string representation 1072func errToString(err error) string { 1073 if err == nil { 1074 return "" 1075 } 1076 return err.Error() 1077} 1078