1package queue
2
3import (
4	"container/list"
5	"fmt"
6	"sync"
7
8	"github.com/docker/go-events"
9	"github.com/sirupsen/logrus"
10)
11
12// ErrQueueFull is returned by a Write operation when that Write causes the
13// queue to reach its size limit.
14var ErrQueueFull = fmt.Errorf("queue closed due to size limit")
15
16// LimitQueue accepts all messages into a queue for asynchronous consumption by
17// a sink until an upper limit of messages is reached. When that limit is
18// reached, the entire Queue is Closed. It is thread safe but the
19// sink must be reliable or events will be dropped.
20// If a size of 0 is provided, the LimitQueue is considered limitless.
21type LimitQueue struct {
22	dst        events.Sink
23	events     *list.List
24	limit      uint64
25	cond       *sync.Cond
26	mu         sync.Mutex
27	closed     bool
28	full       chan struct{}
29	fullClosed bool
30}
31
32// NewLimitQueue returns a queue to the provided Sink dst.
33func NewLimitQueue(dst events.Sink, limit uint64) *LimitQueue {
34	eq := LimitQueue{
35		dst:    dst,
36		events: list.New(),
37		limit:  limit,
38		full:   make(chan struct{}),
39	}
40
41	eq.cond = sync.NewCond(&eq.mu)
42	go eq.run()
43	return &eq
44}
45
46// Write accepts the events into the queue, only failing if the queue has
47// been closed or has reached its size limit.
48func (eq *LimitQueue) Write(event events.Event) error {
49	eq.mu.Lock()
50	defer eq.mu.Unlock()
51
52	if eq.closed {
53		return events.ErrSinkClosed
54	}
55
56	if eq.limit > 0 && uint64(eq.events.Len()) >= eq.limit {
57		// If the limit has been reached, don't write the event to the queue,
58		// and close the Full channel. This notifies listeners that the queue
59		// is now full, but the sink is still permitted to consume events. It's
60		// the responsibility of the listener to decide whether they want to
61		// live with dropped events or whether they want to Close() the
62		// LimitQueue
63		if !eq.fullClosed {
64			eq.fullClosed = true
65			close(eq.full)
66		}
67		return ErrQueueFull
68	}
69
70	eq.events.PushBack(event)
71	eq.cond.Signal() // signal waiters
72
73	return nil
74}
75
76// Full returns a channel that is closed when the queue becomes full for the
77// first time.
78func (eq *LimitQueue) Full() chan struct{} {
79	return eq.full
80}
81
82// Close shuts down the event queue, flushing all events
83func (eq *LimitQueue) Close() error {
84	eq.mu.Lock()
85	defer eq.mu.Unlock()
86
87	if eq.closed {
88		return nil
89	}
90
91	// set the closed flag
92	eq.closed = true
93	eq.cond.Signal() // signal flushes queue
94	eq.cond.Wait()   // wait for signal from last flush
95	return eq.dst.Close()
96}
97
98// run is the main goroutine to flush events to the target sink.
99func (eq *LimitQueue) run() {
100	for {
101		event := eq.next()
102
103		if event == nil {
104			return // nil block means event queue is closed.
105		}
106
107		if err := eq.dst.Write(event); err != nil {
108			// TODO(aaronl): Dropping events could be bad depending
109			// on the application. We should have a way of
110			// communicating this condition. However, logging
111			// at a log level above debug may not be appropriate.
112			// Eventually, go-events should not use logrus at all,
113			// and should bubble up conditions like this through
114			// error values.
115			logrus.WithFields(logrus.Fields{
116				"event": event,
117				"sink":  eq.dst,
118			}).WithError(err).Debug("eventqueue: dropped event")
119		}
120	}
121}
122
123// Len returns the number of items that are currently stored in the queue and
124// not consumed by its sink.
125func (eq *LimitQueue) Len() int {
126	eq.mu.Lock()
127	defer eq.mu.Unlock()
128	return eq.events.Len()
129}
130
131func (eq *LimitQueue) String() string {
132	eq.mu.Lock()
133	defer eq.mu.Unlock()
134	return fmt.Sprintf("%v", eq.events)
135}
136
137// next encompasses the critical section of the run loop. When the queue is
138// empty, it will block on the condition. If new data arrives, it will wake
139// and return a block. When closed, a nil slice will be returned.
140func (eq *LimitQueue) next() events.Event {
141	eq.mu.Lock()
142	defer eq.mu.Unlock()
143
144	for eq.events.Len() < 1 {
145		if eq.closed {
146			eq.cond.Broadcast()
147			return nil
148		}
149
150		eq.cond.Wait()
151	}
152
153	front := eq.events.Front()
154	block := front.Value.(events.Event)
155	eq.events.Remove(front)
156
157	return block
158}
159