1package agent
2
3import (
4	"fmt"
5	"io"
6	"log"
7	"os"
8	"os/exec"
9	"regexp"
10	"runtime"
11	"strings"
12	"time"
13
14	"github.com/armon/circbuf"
15	"github.com/armon/go-metrics"
16	"github.com/hashicorp/serf/serf"
17)
18
19const (
20	windows = "windows"
21
22	// maxBufSize limits how much data we collect from a handler.
23	// This is to prevent Serf's memory from growing to an enormous
24	// amount due to a faulty handler.
25	maxBufSize = 8 * 1024
26
27	// warnSlow is used to warn about a slow handler invocation
28	warnSlow = time.Second
29)
30
31var sanitizeTagRegexp = regexp.MustCompile(`[^A-Z0-9_]`)
32
33// invokeEventScript will execute the given event script with the given
34// event. Depending on the event, the semantics of how data are passed
35// are a bit different. For all events, the SERF_EVENT environmental
36// variable is the type of the event. For user events, the SERF_USER_EVENT
37// environmental variable is also set, containing the name of the user
38// event that was fired.
39//
40// In all events, data is passed in via stdin to facilitate piping. See
41// the various stdin functions below for more information.
42func invokeEventScript(logger *log.Logger, script string, self serf.Member, event serf.Event) error {
43	defer metrics.MeasureSince([]string{"agent", "invoke", script}, time.Now())
44	output, _ := circbuf.NewBuffer(maxBufSize)
45
46	// Determine the shell invocation based on OS
47	var shell, flag string
48	if runtime.GOOS == windows {
49		shell = "cmd"
50		flag = "/C"
51	} else {
52		shell = "/bin/sh"
53		flag = "-c"
54	}
55
56	cmd := exec.Command(shell, flag, script)
57	cmd.Env = append(os.Environ(),
58		"SERF_EVENT="+event.EventType().String(),
59		"SERF_SELF_NAME="+self.Name,
60		"SERF_SELF_ROLE="+self.Tags["role"],
61	)
62	cmd.Stderr = output
63	cmd.Stdout = output
64
65	// Add all the tags
66	for name, val := range self.Tags {
67		//http://stackoverflow.com/questions/2821043/allowed-characters-in-linux-environment-variable-names
68		//(http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html for the long version)
69		//says that env var names must be in [A-Z0-9_] and not start with [0-9].
70		//we only care about the first part, so convert all chars not in [A-Z0-9_] to _
71		sanitizedName := sanitizeTagRegexp.ReplaceAllString(strings.ToUpper(name), "_")
72		tag_env := fmt.Sprintf("SERF_TAG_%s=%s", sanitizedName, val)
73		cmd.Env = append(cmd.Env, tag_env)
74	}
75
76	stdin, err := cmd.StdinPipe()
77	if err != nil {
78		return err
79	}
80
81	switch e := event.(type) {
82	case serf.MemberEvent:
83		go memberEventStdin(logger, stdin, &e)
84	case serf.UserEvent:
85		cmd.Env = append(cmd.Env, "SERF_USER_EVENT="+e.Name)
86		cmd.Env = append(cmd.Env, fmt.Sprintf("SERF_USER_LTIME=%d", e.LTime))
87		go streamPayload(logger, stdin, e.Payload)
88	case *serf.Query:
89		cmd.Env = append(cmd.Env, "SERF_QUERY_NAME="+e.Name)
90		cmd.Env = append(cmd.Env, fmt.Sprintf("SERF_QUERY_LTIME=%d", e.LTime))
91		go streamPayload(logger, stdin, e.Payload)
92	default:
93		return fmt.Errorf("Unknown event type: %s", event.EventType().String())
94	}
95
96	// Start a timer to warn about slow handlers
97	slowTimer := time.AfterFunc(warnSlow, func() {
98		logger.Printf("[WARN] agent: Script '%s' slow, execution exceeding %v",
99			script, warnSlow)
100	})
101
102	if err := cmd.Start(); err != nil {
103		return err
104	}
105
106	// Warn if buffer is overritten
107	if output.TotalWritten() > output.Size() {
108		logger.Printf("[WARN] agent: Script '%s' generated %d bytes of output, truncated to %d",
109			script, output.TotalWritten(), output.Size())
110	}
111
112	err = cmd.Wait()
113	slowTimer.Stop()
114	logger.Printf("[DEBUG] agent: Event '%s' script output: %s",
115		event.EventType().String(), output.String())
116	if err != nil {
117		return err
118	}
119
120	// If this is a query and we have output, respond
121	if query, ok := event.(*serf.Query); ok && output.TotalWritten() > 0 {
122		if err := query.Respond(output.Bytes()); err != nil {
123			logger.Printf("[WARN] agent: Failed to respond to query '%s': %s",
124				event.String(), err)
125		}
126	}
127
128	return nil
129}
130
131// eventClean cleans a value to be a parameter in an event line.
132func eventClean(v string) string {
133	v = strings.Replace(v, "\t", "\\t", -1)
134	v = strings.Replace(v, "\n", "\\n", -1)
135	return v
136}
137
138// Sends data on stdin for a member event.
139//
140// The format for the data is unix tool friendly, separated by whitespace
141// and newlines. The structure of each line for any member event is:
142// "NAME    ADDRESS    ROLE    TAGS" where the whitespace is actually tabs.
143// The name and role are cleaned so that newlines and tabs are replaced
144// with "\n" and "\t" respectively.
145func memberEventStdin(logger *log.Logger, stdin io.WriteCloser, e *serf.MemberEvent) {
146	defer stdin.Close()
147	for _, member := range e.Members {
148		// Format the tags as tag1=v1,tag2=v2,...
149		var tagPairs []string
150		for name, value := range member.Tags {
151			tagPairs = append(tagPairs, fmt.Sprintf("%s=%s", name, value))
152		}
153		tags := strings.Join(tagPairs, ",")
154
155		// Send the entire line
156		_, err := stdin.Write([]byte(fmt.Sprintf(
157			"%s\t%s\t%s\t%s\n",
158			eventClean(member.Name),
159			member.Addr.String(),
160			eventClean(member.Tags["role"]),
161			eventClean(tags))))
162		if err != nil {
163			return
164		}
165	}
166}
167
168// Sends data on stdin for an event. The stdin simply contains the
169// payload (if any).
170// Most shells read implementations need a newline, force it to be there
171func streamPayload(logger *log.Logger, stdin io.WriteCloser, buf []byte) {
172	defer stdin.Close()
173
174	// Append a newline to payload if missing
175	payload := buf
176	if len(payload) > 0 && payload[len(payload)-1] != '\n' {
177		payload = append(payload, '\n')
178	}
179
180	if _, err := stdin.Write(payload); err != nil {
181		logger.Printf("[ERR] Error writing payload: %s", err)
182		return
183	}
184}
185