package agent

import (
	"fmt"
	"io"
	"log"
	"os"
	"os/exec"
	"regexp"
	"runtime"
	"strings"
	"time"

	"github.com/armon/circbuf"
	"github.com/armon/go-metrics"
	"github.com/hashicorp/serf/serf"
)

const (
	// windows is the runtime.GOOS value that selects the "cmd" shell
	// instead of "/bin/sh" when invoking a script.
	windows = "windows"

	// maxBufSize limits how much data we collect from a handler.
	// This is to prevent Serf's memory from growing to an enormous
	// amount due to a faulty handler.
	maxBufSize = 8 * 1024

	// warnSlow is used to warn about a slow handler invocation
	warnSlow = time.Second
)

// sanitizeTagRegexp matches every character outside [A-Z0-9_]. It is used
// to turn an upper-cased tag name into a valid environment variable suffix.
var sanitizeTagRegexp = regexp.MustCompile(`[^A-Z0-9_]`)

// invokeEventScript will execute the given event script with the given
// event. Depending on the event, the semantics of how data are passed
// are a bit different. For all events, the SERF_EVENT environment
// variable is the type of the event. For user events, the SERF_USER_EVENT
// environment variable is also set, containing the name of the user
// event that was fired.
//
// In all events, data is passed in via stdin to facilitate piping. See
// the various stdin functions below for more information.
42func invokeEventScript(logger *log.Logger, script string, self serf.Member, event serf.Event) error { 43 defer metrics.MeasureSince([]string{"agent", "invoke", script}, time.Now()) 44 output, _ := circbuf.NewBuffer(maxBufSize) 45 46 // Determine the shell invocation based on OS 47 var shell, flag string 48 if runtime.GOOS == windows { 49 shell = "cmd" 50 flag = "/C" 51 } else { 52 shell = "/bin/sh" 53 flag = "-c" 54 } 55 56 cmd := exec.Command(shell, flag, script) 57 cmd.Env = append(os.Environ(), 58 "SERF_EVENT="+event.EventType().String(), 59 "SERF_SELF_NAME="+self.Name, 60 "SERF_SELF_ROLE="+self.Tags["role"], 61 ) 62 cmd.Stderr = output 63 cmd.Stdout = output 64 65 // Add all the tags 66 for name, val := range self.Tags { 67 //http://stackoverflow.com/questions/2821043/allowed-characters-in-linux-environment-variable-names 68 //(http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html for the long version) 69 //says that env var names must be in [A-Z0-9_] and not start with [0-9]. 
70 //we only care about the first part, so convert all chars not in [A-Z0-9_] to _ 71 sanitizedName := sanitizeTagRegexp.ReplaceAllString(strings.ToUpper(name), "_") 72 tag_env := fmt.Sprintf("SERF_TAG_%s=%s", sanitizedName, val) 73 cmd.Env = append(cmd.Env, tag_env) 74 } 75 76 stdin, err := cmd.StdinPipe() 77 if err != nil { 78 return err 79 } 80 81 switch e := event.(type) { 82 case serf.MemberEvent: 83 go memberEventStdin(logger, stdin, &e) 84 case serf.UserEvent: 85 cmd.Env = append(cmd.Env, "SERF_USER_EVENT="+e.Name) 86 cmd.Env = append(cmd.Env, fmt.Sprintf("SERF_USER_LTIME=%d", e.LTime)) 87 go streamPayload(logger, stdin, e.Payload) 88 case *serf.Query: 89 cmd.Env = append(cmd.Env, "SERF_QUERY_NAME="+e.Name) 90 cmd.Env = append(cmd.Env, fmt.Sprintf("SERF_QUERY_LTIME=%d", e.LTime)) 91 go streamPayload(logger, stdin, e.Payload) 92 default: 93 return fmt.Errorf("Unknown event type: %s", event.EventType().String()) 94 } 95 96 // Start a timer to warn about slow handlers 97 slowTimer := time.AfterFunc(warnSlow, func() { 98 logger.Printf("[WARN] agent: Script '%s' slow, execution exceeding %v", 99 script, warnSlow) 100 }) 101 102 if err := cmd.Start(); err != nil { 103 return err 104 } 105 106 // Warn if buffer is overritten 107 if output.TotalWritten() > output.Size() { 108 logger.Printf("[WARN] agent: Script '%s' generated %d bytes of output, truncated to %d", 109 script, output.TotalWritten(), output.Size()) 110 } 111 112 err = cmd.Wait() 113 slowTimer.Stop() 114 logger.Printf("[DEBUG] agent: Event '%s' script output: %s", 115 event.EventType().String(), output.String()) 116 if err != nil { 117 return err 118 } 119 120 // If this is a query and we have output, respond 121 if query, ok := event.(*serf.Query); ok && output.TotalWritten() > 0 { 122 if err := query.Respond(output.Bytes()); err != nil { 123 logger.Printf("[WARN] agent: Failed to respond to query '%s': %s", 124 event.String(), err) 125 } 126 } 127 128 return nil 129} 130 131// eventClean cleans a 
value to be a parameter in an event line. 132func eventClean(v string) string { 133 v = strings.Replace(v, "\t", "\\t", -1) 134 v = strings.Replace(v, "\n", "\\n", -1) 135 return v 136} 137 138// Sends data on stdin for a member event. 139// 140// The format for the data is unix tool friendly, separated by whitespace 141// and newlines. The structure of each line for any member event is: 142// "NAME ADDRESS ROLE TAGS" where the whitespace is actually tabs. 143// The name and role are cleaned so that newlines and tabs are replaced 144// with "\n" and "\t" respectively. 145func memberEventStdin(logger *log.Logger, stdin io.WriteCloser, e *serf.MemberEvent) { 146 defer stdin.Close() 147 for _, member := range e.Members { 148 // Format the tags as tag1=v1,tag2=v2,... 149 var tagPairs []string 150 for name, value := range member.Tags { 151 tagPairs = append(tagPairs, fmt.Sprintf("%s=%s", name, value)) 152 } 153 tags := strings.Join(tagPairs, ",") 154 155 // Send the entire line 156 _, err := stdin.Write([]byte(fmt.Sprintf( 157 "%s\t%s\t%s\t%s\n", 158 eventClean(member.Name), 159 member.Addr.String(), 160 eventClean(member.Tags["role"]), 161 eventClean(tags)))) 162 if err != nil { 163 return 164 } 165 } 166} 167 168// Sends data on stdin for an event. The stdin simply contains the 169// payload (if any). 170// Most shells read implementations need a newline, force it to be there 171func streamPayload(logger *log.Logger, stdin io.WriteCloser, buf []byte) { 172 defer stdin.Close() 173 174 // Append a newline to payload if missing 175 payload := buf 176 if len(payload) > 0 && payload[len(payload)-1] != '\n' { 177 payload = append(payload, '\n') 178 } 179 180 if _, err := stdin.Write(payload); err != nil { 181 logger.Printf("[ERR] Error writing payload: %s", err) 182 return 183 } 184} 185