1// Copyright 2015 Google Inc. All Rights Reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package events
16
17import (
18	"errors"
19	"sort"
20	"strings"
21	"sync"
22	"time"
23
24	info "github.com/google/cadvisor/info/v1"
25	"github.com/google/cadvisor/utils"
26
27	"github.com/golang/glog"
28)
29
30type byTimestamp []*info.Event
31
32// functions necessary to implement the sort interface on the Events struct
33func (e byTimestamp) Len() int {
34	return len(e)
35}
36
37func (e byTimestamp) Swap(i, j int) {
38	e[i], e[j] = e[j], e[i]
39}
40
41func (e byTimestamp) Less(i, j int) bool {
42	return e[i].Timestamp.Before(e[j].Timestamp)
43}
44
// EventChannel pairs the channel on which watch events are delivered with the
// id of the watch that feeds it.
type EventChannel struct {
	// Watch ID. Can be used by the caller to request cancellation of watch
	// events (see StopWatch).
	watchId int
	// Channel on which the caller can receive watch events. It is closed by
	// StopWatch when the watch is cancelled.
	channel chan *info.Event
}
51
// Request holds a set of parameters by which Event objects may be screened.
// The caller may want events that occurred within a specific timeframe
// or of a certain type, which may be specified in the *Request object
// they pass to an EventManager function.
type Request struct {
	// Events falling before StartTime do not satisfy the request. A zero
	// StartTime means there is no lower bound. StartTime must be left blank
	// in calls to WatchEvents.
	StartTime time.Time
	// Events falling after EndTime do not satisfy the request. A zero EndTime
	// means there is no upper bound. EndTime must be left blank in calls to
	// WatchEvents.
	EndTime time.Time
	// EventType specifies the type(s) of events wanted. Only event types
	// that map to true satisfy the request.
	EventType map[info.EventType]bool
	// MaxEventsReturned allows the caller to put a limit on how many
	// events to receive. If there are more events than MaxEventsReturned
	// then the most chronologically recent events in the time period
	// specified are returned. Must be >= 1.
	MaxEventsReturned int
	// ContainerName is the absolute container name for which the event
	// occurred. An empty name disables filtering by container.
	ContainerName string
	// If IncludeSubcontainers is false, only events occurring in the specific
	// container, and not the subcontainers, will be returned.
	IncludeSubcontainers bool
}
76
// EventManager is implemented by events. It provides two ways to monitor
// events (querying stored events with GetEvents and streaming new ones with
// WatchEvents), one way to add events, and a way to cancel a watch.
type EventManager interface {
	// WatchEvents allows a caller to register for receiving events based on
	// the specified request. On successful registration, an EventChannel
	// object is returned.
	WatchEvents(request *Request) (*EventChannel, error)
	// GetEvents returns all detected events based on the filters specified
	// in request.
	GetEvents(request *Request) ([]*info.Event, error)
	// AddEvent allows the caller to add an event to an EventManager
	// object.
	AddEvent(e *info.Event) error
	// StopWatch cancels a previously requested watch, identified by the
	// watch id of the EventChannel returned from WatchEvents.
	StopWatch(watch_id int)
}
91
// events provides an implementation for the EventManager interface.
// Stored events and registered watchers are guarded by separate locks.
type events struct {
	// eventStore holds the events by event type. A store for a type is
	// created the first time an event of that type is added.
	eventStore map[info.EventType]*utils.TimedStore
	// map of registered watchers keyed by watch id.
	watchers map[int]*watch
	// lock guarding the eventStore.
	eventsLock sync.RWMutex
	// lock guarding watchers.
	watcherLock sync.RWMutex
	// last allocated watch id. New watches get lastId + 1.
	lastId int
	// Event storage policy, consulted when a per-type store is created.
	storagePolicy StoragePolicy
}
107
// watch is created by a call to WatchEvents() and stored in the events
// manager's watchers map, keyed by watch id. When AddEvent() finds an event
// that satisfies the request parameter of a watch object in events.watchers,
// it sends that event out over the watch object's channel. The caller of
// WatchEvents receives the event over the channel of the EventChannel that
// WatchEvents returned.
type watch struct {
	// request parameters passed in by the caller of WatchEvents()
	request *Request
	// a channel used to send event back to the caller.
	eventChannel *EventChannel
}
120
121func NewEventChannel(watchId int) *EventChannel {
122	return &EventChannel{
123		watchId: watchId,
124		channel: make(chan *info.Event, 10),
125	}
126}
127
// StoragePolicy specifies how many events to store.
// MaxAge is the max duration for which to keep events.
// MaxNumEvents is the max number of events to keep (-1 for no limit).
type StoragePolicy struct {
	// Default limits, used if a per-event type limit is not set.
	DefaultMaxAge       time.Duration
	DefaultMaxNumEvents int

	// Per-event type limits. An entry here overrides the corresponding
	// default for that event type.
	PerTypeMaxAge       map[info.EventType]time.Duration
	PerTypeMaxNumEvents map[info.EventType]int
}
140
141func DefaultStoragePolicy() StoragePolicy {
142	return StoragePolicy{
143		DefaultMaxAge:       24 * time.Hour,
144		DefaultMaxNumEvents: 100000,
145		PerTypeMaxAge:       make(map[info.EventType]time.Duration),
146		PerTypeMaxNumEvents: make(map[info.EventType]int),
147	}
148}
149
150// returns a pointer to an initialized Events object.
151func NewEventManager(storagePolicy StoragePolicy) *events {
152	return &events{
153		eventStore:    make(map[info.EventType]*utils.TimedStore, 0),
154		watchers:      make(map[int]*watch),
155		storagePolicy: storagePolicy,
156	}
157}
158
159// returns a pointer to an initialized Request object
160func NewRequest() *Request {
161	return &Request{
162		EventType:            map[info.EventType]bool{},
163		IncludeSubcontainers: false,
164		MaxEventsReturned:    10,
165	}
166}
167
168// returns a pointer to an initialized watch object
169func newWatch(request *Request, eventChannel *EventChannel) *watch {
170	return &watch{
171		request:      request,
172		eventChannel: eventChannel,
173	}
174}
175
176func (self *EventChannel) GetChannel() chan *info.Event {
177	return self.channel
178}
179
180func (self *EventChannel) GetWatchId() int {
181	return self.watchId
182}
183
184// sorts and returns up to the last MaxEventsReturned chronological elements
185func getMaxEventsReturned(request *Request, eSlice []*info.Event) []*info.Event {
186	sort.Sort(byTimestamp(eSlice))
187	n := request.MaxEventsReturned
188	if n >= len(eSlice) || n <= 0 {
189		return eSlice
190	}
191	return eSlice[len(eSlice)-n:]
192}
193
194// If the request wants all subcontainers, this returns if the request's
195// container path is a prefix of the event container path.  Otherwise,
196// it checks that the container paths of the event and request are
197// equivalent
198func checkIfIsSubcontainer(request *Request, event *info.Event) bool {
199	if request.IncludeSubcontainers == true {
200		return request.ContainerName == "/" || strings.HasPrefix(event.ContainerName+"/", request.ContainerName+"/")
201	}
202	return event.ContainerName == request.ContainerName
203}
204
205// determines if an event occurs within the time set in the request object and is the right type
206func checkIfEventSatisfiesRequest(request *Request, event *info.Event) bool {
207	startTime := request.StartTime
208	endTime := request.EndTime
209	eventTime := event.Timestamp
210	if !startTime.IsZero() {
211		if startTime.After(eventTime) {
212			return false
213		}
214	}
215	if !endTime.IsZero() {
216		if endTime.Before(eventTime) {
217			return false
218		}
219	}
220	if !request.EventType[event.EventType] {
221		return false
222	}
223	if request.ContainerName != "" {
224		return checkIfIsSubcontainer(request, event)
225	}
226	return true
227}
228
229// method of Events object that screens Event objects found in the eventStore
230// attribute and if they fit the parameters passed by the Request object,
231// adds it to a slice of *Event objects that is returned. If both MaxEventsReturned
232// and StartTime/EndTime are specified in the request object, then only
233// up to the most recent MaxEventsReturned events in that time range are returned.
234func (self *events) GetEvents(request *Request) ([]*info.Event, error) {
235	returnEventList := []*info.Event{}
236	self.eventsLock.RLock()
237	defer self.eventsLock.RUnlock()
238	for eventType, fetch := range request.EventType {
239		if !fetch {
240			continue
241		}
242		evs, ok := self.eventStore[eventType]
243		if !ok {
244			continue
245		}
246
247		res := evs.InTimeRange(request.StartTime, request.EndTime, request.MaxEventsReturned)
248		for _, in := range res {
249			e := in.(*info.Event)
250			if checkIfEventSatisfiesRequest(request, e) {
251				returnEventList = append(returnEventList, e)
252			}
253		}
254	}
255	returnEventList = getMaxEventsReturned(request, returnEventList)
256	return returnEventList, nil
257}
258
259// method of Events object that maintains an *Event channel passed by the user.
260// When an event is added by AddEvents that satisfies the parameters in the passed
261// Request object it is fed to the channel. The StartTime and EndTime of the watch
262// request should be uninitialized because the purpose is to watch indefinitely
263// for events that will happen in the future
264func (self *events) WatchEvents(request *Request) (*EventChannel, error) {
265	if !request.StartTime.IsZero() || !request.EndTime.IsZero() {
266		return nil, errors.New(
267			"for a call to watch, request.StartTime and request.EndTime must be uninitialized")
268	}
269	self.watcherLock.Lock()
270	defer self.watcherLock.Unlock()
271	new_id := self.lastId + 1
272	returnEventChannel := NewEventChannel(new_id)
273	newWatcher := newWatch(request, returnEventChannel)
274	self.watchers[new_id] = newWatcher
275	self.lastId = new_id
276	return returnEventChannel, nil
277}
278
279// helper function to update the event manager's eventStore
280func (self *events) updateEventStore(e *info.Event) {
281	self.eventsLock.Lock()
282	defer self.eventsLock.Unlock()
283	if _, ok := self.eventStore[e.EventType]; !ok {
284		maxAge := self.storagePolicy.DefaultMaxAge
285		maxNumEvents := self.storagePolicy.DefaultMaxNumEvents
286		if age, ok := self.storagePolicy.PerTypeMaxAge[e.EventType]; ok {
287			maxAge = age
288		}
289		if numEvents, ok := self.storagePolicy.PerTypeMaxNumEvents[e.EventType]; ok {
290			maxNumEvents = numEvents
291		}
292
293		self.eventStore[e.EventType] = utils.NewTimedStore(maxAge, maxNumEvents)
294	}
295	self.eventStore[e.EventType].Add(e.Timestamp, e)
296}
297
298func (self *events) findValidWatchers(e *info.Event) []*watch {
299	watchesToSend := make([]*watch, 0)
300	for _, watcher := range self.watchers {
301		watchRequest := watcher.request
302		if checkIfEventSatisfiesRequest(watchRequest, e) {
303			watchesToSend = append(watchesToSend, watcher)
304		}
305	}
306	return watchesToSend
307}
308
309// method of Events object that adds the argument Event object to the
310// eventStore. It also feeds the event to a set of watch channels
311// held by the manager if it satisfies the request keys of the channels
312func (self *events) AddEvent(e *info.Event) error {
313	self.updateEventStore(e)
314	self.watcherLock.RLock()
315	defer self.watcherLock.RUnlock()
316	watchesToSend := self.findValidWatchers(e)
317	for _, watchObject := range watchesToSend {
318		watchObject.eventChannel.GetChannel() <- e
319	}
320	glog.V(4).Infof("Added event %v", e)
321	return nil
322}
323
324// Removes a watch instance from the EventManager's watchers map
325func (self *events) StopWatch(watchId int) {
326	self.watcherLock.Lock()
327	defer self.watcherLock.Unlock()
328	_, ok := self.watchers[watchId]
329	if !ok {
330		glog.Errorf("Could not find watcher instance %v", watchId)
331	}
332	close(self.watchers[watchId].eventChannel.GetChannel())
333	delete(self.watchers, watchId)
334}
335