1package agent
2
3import (
4	"fmt"
5	"regexp"
6
7	"github.com/hashicorp/consul/agent/structs"
8	"github.com/hashicorp/go-uuid"
9)
10
const (
	// userEventMaxVersion is the maximum protocol version we understand.
	// It is stamped onto every event fired by this agent, and received
	// events carrying a higher version are logged with a warning.
	userEventMaxVersion = 1

	// remoteExecName is the event name for a remote exec command. Events
	// with this name are routed to the remote exec handler (or ignored when
	// remote exec is disabled) instead of being stored in the event buffer.
	remoteExecName = "_rexec"
)
18
// UserEvent is used to parameterize a user event. The caller supplies Name
// and, optionally, Payload and the filters; ID and Version are filled in
// when the event is fired and LTime when it is received.
type UserEvent struct {
	// ID of the user event. Automatically generated.
	ID string

	// Name of the event. Must be non-empty to pass validation.
	Name string `codec:"n"`

	// Optional payload
	Payload []byte `codec:"p,omitempty"`

	// NodeFilter is a regular expression to filter on nodes. A receiving
	// agent matches it against its own configured node name.
	NodeFilter string `codec:"nf,omitempty"`

	// ServiceFilter is a regular expression to filter on services. A
	// receiving agent matches it against the names of its local services.
	ServiceFilter string `codec:"sf,omitempty"`

	// TagFilter is a regular expression to filter on tags of a service,
	// must be provided with ServiceFilter. It is only evaluated against the
	// tags of services that already matched ServiceFilter.
	TagFilter string `codec:"tf,omitempty"`

	// Version of the user event. Automatically generated; set to
	// userEventMaxVersion when the event is fired.
	Version int `codec:"v"`

	// LTime is the lamport time. Automatically generated; copied from the
	// received event in handleEvents.
	// NOTE(review): the "-" codec tag presumably keeps this field out of
	// the encoded payload - confirm against the codec library.
	LTime uint64 `codec:"-"`
}
46
47// validateUserEventParams is used to sanity check the inputs
48func validateUserEventParams(params *UserEvent) error {
49	// Validate the inputs
50	if params.Name == "" {
51		return fmt.Errorf("User event missing name")
52	}
53	if params.TagFilter != "" && params.ServiceFilter == "" {
54		return fmt.Errorf("Cannot provide tag filter without service filter")
55	}
56	if params.NodeFilter != "" {
57		if _, err := regexp.Compile(params.NodeFilter); err != nil {
58			return fmt.Errorf("Invalid node filter: %v", err)
59		}
60	}
61	if params.ServiceFilter != "" {
62		if _, err := regexp.Compile(params.ServiceFilter); err != nil {
63			return fmt.Errorf("Invalid service filter: %v", err)
64		}
65	}
66	if params.TagFilter != "" {
67		if _, err := regexp.Compile(params.TagFilter); err != nil {
68			return fmt.Errorf("Invalid tag filter: %v", err)
69		}
70	}
71	return nil
72}
73
74// UserEvent is used to fire an event via the Serf layer on the LAN
75func (a *Agent) UserEvent(dc, token string, params *UserEvent) error {
76	// Validate the params
77	if err := validateUserEventParams(params); err != nil {
78		return err
79	}
80
81	// Format message
82	var err error
83	if params.ID, err = uuid.GenerateUUID(); err != nil {
84		return fmt.Errorf("UUID generation failed: %v", err)
85	}
86	params.Version = userEventMaxVersion
87	payload, err := encodeMsgPack(&params)
88	if err != nil {
89		return fmt.Errorf("UserEvent encoding failed: %v", err)
90	}
91
92	// Service the event fire over RPC. This ensures that we authorize
93	// the request against the token first.
94	args := structs.EventFireRequest{
95		Datacenter:   dc,
96		Name:         params.Name,
97		Payload:      payload,
98		QueryOptions: structs.QueryOptions{Token: token},
99	}
100
101	// Any server can process in the remote DC, since the
102	// gossip will take over anyways
103	args.AllowStale = true
104	var out structs.EventFireResponse
105	return a.RPC("Internal.EventFire", &args, &out)
106}
107
108// handleEvents is used to process incoming user events
109func (a *Agent) handleEvents() {
110	for {
111		select {
112		case e := <-a.eventCh:
113			// Decode the event
114			msg := new(UserEvent)
115			if err := decodeMsgPack(e.Payload, msg); err != nil {
116				a.logger.Printf("[ERR] agent: Failed to decode event: %v", err)
117				continue
118			}
119			msg.LTime = uint64(e.LTime)
120
121			// Skip if we don't pass filtering
122			if !a.shouldProcessUserEvent(msg) {
123				continue
124			}
125
126			// Ingest the event
127			a.ingestUserEvent(msg)
128
129		case <-a.shutdownCh:
130			return
131		}
132	}
133}
134
135// shouldProcessUserEvent checks if an event makes it through our filters
136func (a *Agent) shouldProcessUserEvent(msg *UserEvent) bool {
137	// Check the version
138	if msg.Version > userEventMaxVersion {
139		a.logger.Printf("[WARN] agent: Event version %d may have unsupported features (%s)",
140			msg.Version, msg.Name)
141	}
142
143	// Apply the filters
144	if msg.NodeFilter != "" {
145		re, err := regexp.Compile(msg.NodeFilter)
146		if err != nil {
147			a.logger.Printf("[ERR] agent: Failed to parse node filter '%s' for event '%s': %v",
148				msg.NodeFilter, msg.Name, err)
149			return false
150		}
151		if !re.MatchString(a.config.NodeName) {
152			return false
153		}
154	}
155
156	if msg.ServiceFilter != "" {
157		re, err := regexp.Compile(msg.ServiceFilter)
158		if err != nil {
159			a.logger.Printf("[ERR] agent: Failed to parse service filter '%s' for event '%s': %v",
160				msg.ServiceFilter, msg.Name, err)
161			return false
162		}
163
164		var tagRe *regexp.Regexp
165		if msg.TagFilter != "" {
166			re, err := regexp.Compile(msg.TagFilter)
167			if err != nil {
168				a.logger.Printf("[ERR] agent: Failed to parse tag filter '%s' for event '%s': %v",
169					msg.TagFilter, msg.Name, err)
170				return false
171			}
172			tagRe = re
173		}
174
175		// Scan for a match
176		services := a.State.Services()
177		found := false
178	OUTER:
179		for name, info := range services {
180			// Check the service name
181			if !re.MatchString(name) {
182				continue
183			}
184			if tagRe == nil {
185				found = true
186				break
187			}
188
189			// Look for a matching tag
190			for _, tag := range info.Tags {
191				if !tagRe.MatchString(tag) {
192					continue
193				}
194				found = true
195				break OUTER
196			}
197		}
198
199		// No matching services
200		if !found {
201			return false
202		}
203	}
204	return true
205}
206
207// ingestUserEvent is used to process an event that passes filtering
208func (a *Agent) ingestUserEvent(msg *UserEvent) {
209	// Special handling for internal events
210	switch msg.Name {
211	case remoteExecName:
212		if a.config.DisableRemoteExec {
213			a.logger.Printf("[INFO] agent: ignoring remote exec event (%s), disabled.", msg.ID)
214		} else {
215			go a.handleRemoteExec(msg)
216		}
217		return
218	default:
219		a.logger.Printf("[DEBUG] agent: new event: %s (%s)", msg.Name, msg.ID)
220	}
221
222	a.eventLock.Lock()
223	defer func() {
224		a.eventLock.Unlock()
225		a.eventNotify.Notify()
226	}()
227
228	idx := a.eventIndex
229	a.eventBuf[idx] = msg
230	a.eventIndex = (idx + 1) % len(a.eventBuf)
231}
232
233// UserEvents is used to return a slice of the most recent
234// user events.
235func (a *Agent) UserEvents() []*UserEvent {
236	n := len(a.eventBuf)
237	out := make([]*UserEvent, n)
238	a.eventLock.RLock()
239	defer a.eventLock.RUnlock()
240
241	// Check if the buffer is full
242	if a.eventBuf[a.eventIndex] != nil {
243		if a.eventIndex == 0 {
244			copy(out, a.eventBuf)
245		} else {
246			copy(out, a.eventBuf[a.eventIndex:])
247			copy(out[n-a.eventIndex:], a.eventBuf[:a.eventIndex])
248		}
249	} else {
250		// We haven't filled the buffer yet
251		copy(out, a.eventBuf[:a.eventIndex])
252		out = out[:a.eventIndex]
253	}
254	return out
255}
256
257// LastUserEvent is used to return the last user event.
258// This will return nil if there is no recent event.
259func (a *Agent) LastUserEvent() *UserEvent {
260	a.eventLock.RLock()
261	defer a.eventLock.RUnlock()
262	n := len(a.eventBuf)
263	idx := (((a.eventIndex - 1) % n) + n) % n
264	return a.eventBuf[idx]
265}
266