// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package events

import (
	"errors"
	"sort"
	"strings"
	"sync"
	"time"

	info "github.com/google/cadvisor/info/v1"
	"github.com/google/cadvisor/utils"

	"github.com/golang/glog"
)

// byTimestamp is a slice of events that sorts in ascending Timestamp order.
type byTimestamp []*info.Event

// Len, Swap and Less implement sort.Interface for byTimestamp.
func (e byTimestamp) Len() int {
	return len(e)
}

func (e byTimestamp) Swap(i, j int) {
	e[i], e[j] = e[j], e[i]
}

// Less reports whether event i happened strictly before event j.
func (e byTimestamp) Less(i, j int) bool {
	return e[i].Timestamp.Before(e[j].Timestamp)
}

// EventChannel pairs a watch id with the channel on which the events of
// that watch are delivered.
type EventChannel struct {
	// Watch ID. Can be used by the caller to request cancellation of watch events.
	watchId int
	// Channel on which the caller can receive watch events.
	channel chan *info.Event
}

// Request holds a set of parameters by which Event objects may be screened.
// The caller may want events that occurred within a specific timeframe
// or of a certain type, which may be specified in the *Request object
// they pass to an EventManager function.
type Request struct {
	// Events falling before StartTime do not satisfy the request. StartTime
	// must be left blank in calls to WatchEvents.
	StartTime time.Time
	// Events falling after EndTime do not satisfy the request. EndTime
	// must be left blank in calls to WatchEvents.
	EndTime time.Time
	// EventType is a map that specifies the type(s) of events wanted.
	EventType map[info.EventType]bool
	// MaxEventsReturned allows the caller to put a limit on how many
	// events to receive. If there are more events than MaxEventsReturned
	// then the most chronologically recent events in the time period
	// specified are returned. Must be >= 1.
	MaxEventsReturned int
	// ContainerName is the absolute container name for which the event occurred.
	ContainerName string
	// If IncludeSubcontainers is false, only events occurring in the specific
	// container, and not the subcontainers, will be returned.
	IncludeSubcontainers bool
}

// EventManager is implemented by events. It provides two ways to monitor
// events and one way to add events.
type EventManager interface {
	// WatchEvents() allows a caller to register for receiving events based on the specified request.
	// On successful registration, an EventChannel object is returned.
	WatchEvents(request *Request) (*EventChannel, error)
	// GetEvents() returns all detected events based on the filters specified in request.
	GetEvents(request *Request) ([]*info.Event, error)
	// AddEvent allows the caller to add an event to an EventManager
	// object.
	AddEvent(e *info.Event) error
	// StopWatch cancels a previously requested watch event.
	StopWatch(watch_id int)
}

// events provides an implementation for the EventManager interface.
type events struct {
	// eventStore holds the events by event type.
	eventStore map[info.EventType]*utils.TimedStore
	// map of registered watchers keyed by watch id.
	watchers map[int]*watch
	// lock guarding the eventStore.
	eventsLock sync.RWMutex
	// lock guarding watchers.
	watcherLock sync.RWMutex
	// last allocated watch id.
	lastId int
	// Event storage policy.
	storagePolicy StoragePolicy
}

// watch is created by a call to WatchEvents() and is then stored in the
// events.watchers map under its watch id. When AddEvent() finds an event that
// satisfies the request parameter of a watch object in events.watchers,
// it will send that event out over the watch object's channel. The caller that
// called WatchEvents will receive the event over the channel returned by
// WatchEvents.
type watch struct {
	// request parameters passed in by the caller of WatchEvents()
	request *Request
	// a channel used to send event back to the caller.
	eventChannel *EventChannel
}

// NewEventChannel returns an EventChannel carrying the given watch id and a
// channel buffered for 10 events.
func NewEventChannel(watchId int) *EventChannel {
	return &EventChannel{
		watchId: watchId,
		channel: make(chan *info.Event, 10),
	}
}

// StoragePolicy specifies how many events to store.
// MaxAge is the max duration for which to keep events.
// MaxNumEvents is the max number of events to keep (-1 for no limit).
type StoragePolicy struct {
	// Default limits, used if a per-event limit is not set.
	DefaultMaxAge       time.Duration
	DefaultMaxNumEvents int

	// Per-event type limits.
	PerTypeMaxAge       map[info.EventType]time.Duration
	PerTypeMaxNumEvents map[info.EventType]int
}

// DefaultStoragePolicy returns a policy with default limits of 24 hours and
// 100000 events, and with empty per-type override maps.
func DefaultStoragePolicy() StoragePolicy {
	return StoragePolicy{
		DefaultMaxAge:       24 * time.Hour,
		DefaultMaxNumEvents: 100000,
		PerTypeMaxAge:       make(map[info.EventType]time.Duration),
		PerTypeMaxNumEvents: make(map[info.EventType]int),
	}
}

// NewEventManager returns a pointer to an initialized events object.
151func NewEventManager(storagePolicy StoragePolicy) *events { 152 return &events{ 153 eventStore: make(map[info.EventType]*utils.TimedStore, 0), 154 watchers: make(map[int]*watch), 155 storagePolicy: storagePolicy, 156 } 157} 158 159// returns a pointer to an initialized Request object 160func NewRequest() *Request { 161 return &Request{ 162 EventType: map[info.EventType]bool{}, 163 IncludeSubcontainers: false, 164 MaxEventsReturned: 10, 165 } 166} 167 168// returns a pointer to an initialized watch object 169func newWatch(request *Request, eventChannel *EventChannel) *watch { 170 return &watch{ 171 request: request, 172 eventChannel: eventChannel, 173 } 174} 175 176func (self *EventChannel) GetChannel() chan *info.Event { 177 return self.channel 178} 179 180func (self *EventChannel) GetWatchId() int { 181 return self.watchId 182} 183 184// sorts and returns up to the last MaxEventsReturned chronological elements 185func getMaxEventsReturned(request *Request, eSlice []*info.Event) []*info.Event { 186 sort.Sort(byTimestamp(eSlice)) 187 n := request.MaxEventsReturned 188 if n >= len(eSlice) || n <= 0 { 189 return eSlice 190 } 191 return eSlice[len(eSlice)-n:] 192} 193 194// If the request wants all subcontainers, this returns if the request's 195// container path is a prefix of the event container path. 
Otherwise, 196// it checks that the container paths of the event and request are 197// equivalent 198func checkIfIsSubcontainer(request *Request, event *info.Event) bool { 199 if request.IncludeSubcontainers == true { 200 return request.ContainerName == "/" || strings.HasPrefix(event.ContainerName+"/", request.ContainerName+"/") 201 } 202 return event.ContainerName == request.ContainerName 203} 204 205// determines if an event occurs within the time set in the request object and is the right type 206func checkIfEventSatisfiesRequest(request *Request, event *info.Event) bool { 207 startTime := request.StartTime 208 endTime := request.EndTime 209 eventTime := event.Timestamp 210 if !startTime.IsZero() { 211 if startTime.After(eventTime) { 212 return false 213 } 214 } 215 if !endTime.IsZero() { 216 if endTime.Before(eventTime) { 217 return false 218 } 219 } 220 if !request.EventType[event.EventType] { 221 return false 222 } 223 if request.ContainerName != "" { 224 return checkIfIsSubcontainer(request, event) 225 } 226 return true 227} 228 229// method of Events object that screens Event objects found in the eventStore 230// attribute and if they fit the parameters passed by the Request object, 231// adds it to a slice of *Event objects that is returned. If both MaxEventsReturned 232// and StartTime/EndTime are specified in the request object, then only 233// up to the most recent MaxEventsReturned events in that time range are returned. 
234func (self *events) GetEvents(request *Request) ([]*info.Event, error) { 235 returnEventList := []*info.Event{} 236 self.eventsLock.RLock() 237 defer self.eventsLock.RUnlock() 238 for eventType, fetch := range request.EventType { 239 if !fetch { 240 continue 241 } 242 evs, ok := self.eventStore[eventType] 243 if !ok { 244 continue 245 } 246 247 res := evs.InTimeRange(request.StartTime, request.EndTime, request.MaxEventsReturned) 248 for _, in := range res { 249 e := in.(*info.Event) 250 if checkIfEventSatisfiesRequest(request, e) { 251 returnEventList = append(returnEventList, e) 252 } 253 } 254 } 255 returnEventList = getMaxEventsReturned(request, returnEventList) 256 return returnEventList, nil 257} 258 259// method of Events object that maintains an *Event channel passed by the user. 260// When an event is added by AddEvents that satisfies the parameters in the passed 261// Request object it is fed to the channel. The StartTime and EndTime of the watch 262// request should be uninitialized because the purpose is to watch indefinitely 263// for events that will happen in the future 264func (self *events) WatchEvents(request *Request) (*EventChannel, error) { 265 if !request.StartTime.IsZero() || !request.EndTime.IsZero() { 266 return nil, errors.New( 267 "for a call to watch, request.StartTime and request.EndTime must be uninitialized") 268 } 269 self.watcherLock.Lock() 270 defer self.watcherLock.Unlock() 271 new_id := self.lastId + 1 272 returnEventChannel := NewEventChannel(new_id) 273 newWatcher := newWatch(request, returnEventChannel) 274 self.watchers[new_id] = newWatcher 275 self.lastId = new_id 276 return returnEventChannel, nil 277} 278 279// helper function to update the event manager's eventStore 280func (self *events) updateEventStore(e *info.Event) { 281 self.eventsLock.Lock() 282 defer self.eventsLock.Unlock() 283 if _, ok := self.eventStore[e.EventType]; !ok { 284 maxAge := self.storagePolicy.DefaultMaxAge 285 maxNumEvents := 
self.storagePolicy.DefaultMaxNumEvents 286 if age, ok := self.storagePolicy.PerTypeMaxAge[e.EventType]; ok { 287 maxAge = age 288 } 289 if numEvents, ok := self.storagePolicy.PerTypeMaxNumEvents[e.EventType]; ok { 290 maxNumEvents = numEvents 291 } 292 293 self.eventStore[e.EventType] = utils.NewTimedStore(maxAge, maxNumEvents) 294 } 295 self.eventStore[e.EventType].Add(e.Timestamp, e) 296} 297 298func (self *events) findValidWatchers(e *info.Event) []*watch { 299 watchesToSend := make([]*watch, 0) 300 for _, watcher := range self.watchers { 301 watchRequest := watcher.request 302 if checkIfEventSatisfiesRequest(watchRequest, e) { 303 watchesToSend = append(watchesToSend, watcher) 304 } 305 } 306 return watchesToSend 307} 308 309// method of Events object that adds the argument Event object to the 310// eventStore. It also feeds the event to a set of watch channels 311// held by the manager if it satisfies the request keys of the channels 312func (self *events) AddEvent(e *info.Event) error { 313 self.updateEventStore(e) 314 self.watcherLock.RLock() 315 defer self.watcherLock.RUnlock() 316 watchesToSend := self.findValidWatchers(e) 317 for _, watchObject := range watchesToSend { 318 watchObject.eventChannel.GetChannel() <- e 319 } 320 glog.V(4).Infof("Added event %v", e) 321 return nil 322} 323 324// Removes a watch instance from the EventManager's watchers map 325func (self *events) StopWatch(watchId int) { 326 self.watcherLock.Lock() 327 defer self.watcherLock.Unlock() 328 _, ok := self.watchers[watchId] 329 if !ok { 330 glog.Errorf("Could not find watcher instance %v", watchId) 331 } 332 close(self.watchers[watchId].eventChannel.GetChannel()) 333 delete(self.watchers, watchId) 334} 335