1package events // import "github.com/docker/docker/daemon/events"
2
3import (
4	"sync"
5	"time"
6
7	eventtypes "github.com/docker/docker/api/types/events"
8	"github.com/docker/docker/pkg/pubsub"
9)
10
const (
	// eventsLimit is the capacity of the in-memory history slice that New
	// allocates; PublishMessage drops the oldest entry once it is full.
	eventsLimit = 256
	// bufferSize is handed to pubsub.NewPublisher in New.
	// NOTE(review): presumably the per-subscriber channel buffer — confirm
	// against pkg/pubsub.
	bufferSize  = 1024
)
15
// Events is a pubsub channel for events generated by the engine. It also
// keeps a bounded history of recently published messages so that new
// subscribers can be handed events that were emitted before they subscribed.
type Events struct {
	// mu guards events; it is also held while a subscriber is registered so
	// the history snapshot and the subscription are taken together.
	mu     sync.Mutex
	// events is the history of published messages, oldest first.
	events []eventtypes.Message
	// pub fans published messages out to subscribers.
	pub    *pubsub.Publisher
}
22
23// New returns new *Events instance
24func New() *Events {
25	return &Events{
26		events: make([]eventtypes.Message, 0, eventsLimit),
27		pub:    pubsub.NewPublisher(100*time.Millisecond, bufferSize),
28	}
29}
30
31// Subscribe adds new listener to events, returns slice of 256 stored
32// last events, a channel in which you can expect new events (in form
33// of interface{}, so you need type assertion), and a function to call
34// to stop the stream of events.
35func (e *Events) Subscribe() ([]eventtypes.Message, chan interface{}, func()) {
36	eventSubscribers.Inc()
37	e.mu.Lock()
38	current := make([]eventtypes.Message, len(e.events))
39	copy(current, e.events)
40	l := e.pub.Subscribe()
41	e.mu.Unlock()
42
43	cancel := func() {
44		e.Evict(l)
45	}
46	return current, l, cancel
47}
48
49// SubscribeTopic adds new listener to events, returns slice of 256 stored
50// last events, a channel in which you can expect new events (in form
51// of interface{}, so you need type assertion).
52func (e *Events) SubscribeTopic(since, until time.Time, ef *Filter) ([]eventtypes.Message, chan interface{}) {
53	eventSubscribers.Inc()
54	e.mu.Lock()
55
56	var topic func(m interface{}) bool
57	if ef != nil && ef.filter.Len() > 0 {
58		topic = func(m interface{}) bool { return ef.Include(m.(eventtypes.Message)) }
59	}
60
61	buffered := e.loadBufferedEvents(since, until, topic)
62
63	var ch chan interface{}
64	if topic != nil {
65		ch = e.pub.SubscribeTopic(topic)
66	} else {
67		// Subscribe to all events if there are no filters
68		ch = e.pub.Subscribe()
69	}
70
71	e.mu.Unlock()
72	return buffered, ch
73}
74
75// Evict evicts listener from pubsub
76func (e *Events) Evict(l chan interface{}) {
77	eventSubscribers.Dec()
78	e.pub.Evict(l)
79}
80
81// Log creates a local scope message and publishes it
82func (e *Events) Log(action, eventType string, actor eventtypes.Actor) {
83	now := time.Now().UTC()
84	jm := eventtypes.Message{
85		Action:   action,
86		Type:     eventType,
87		Actor:    actor,
88		Scope:    "local",
89		Time:     now.Unix(),
90		TimeNano: now.UnixNano(),
91	}
92
93	// fill deprecated fields for container and images
94	switch eventType {
95	case eventtypes.ContainerEventType:
96		jm.ID = actor.ID
97		jm.Status = action
98		jm.From = actor.Attributes["image"]
99	case eventtypes.ImageEventType:
100		jm.ID = actor.ID
101		jm.Status = action
102	}
103
104	e.PublishMessage(jm)
105}
106
107// PublishMessage broadcasts event to listeners. Each listener has 100 milliseconds to
108// receive the event or it will be skipped.
109func (e *Events) PublishMessage(jm eventtypes.Message) {
110	eventsCounter.Inc()
111
112	e.mu.Lock()
113	if len(e.events) == cap(e.events) {
114		// discard oldest event
115		copy(e.events, e.events[1:])
116		e.events[len(e.events)-1] = jm
117	} else {
118		e.events = append(e.events, jm)
119	}
120	e.mu.Unlock()
121	e.pub.Publish(jm)
122}
123
124// SubscribersCount returns number of event listeners
125func (e *Events) SubscribersCount() int {
126	return e.pub.Len()
127}
128
129// loadBufferedEvents iterates over the cached events in the buffer
130// and returns those that were emitted between two specific dates.
131// It uses `time.Unix(seconds, nanoseconds)` to generate valid dates with those arguments.
132// It filters those buffered messages with a topic function if it's not nil, otherwise it adds all messages.
133func (e *Events) loadBufferedEvents(since, until time.Time, topic func(interface{}) bool) []eventtypes.Message {
134	var buffered []eventtypes.Message
135	if since.IsZero() && until.IsZero() {
136		return buffered
137	}
138
139	var sinceNanoUnix int64
140	if !since.IsZero() {
141		sinceNanoUnix = since.UnixNano()
142	}
143
144	var untilNanoUnix int64
145	if !until.IsZero() {
146		untilNanoUnix = until.UnixNano()
147	}
148
149	for i := len(e.events) - 1; i >= 0; i-- {
150		ev := e.events[i]
151
152		if ev.TimeNano < sinceNanoUnix {
153			break
154		}
155
156		if untilNanoUnix > 0 && ev.TimeNano > untilNanoUnix {
157			continue
158		}
159
160		if topic == nil || topic(ev) {
161			buffered = append([]eventtypes.Message{ev}, buffered...)
162		}
163	}
164	return buffered
165}
166