1package queue 2 3import ( 4 "container/list" 5 "fmt" 6 "sync" 7 8 "github.com/docker/go-events" 9 "github.com/sirupsen/logrus" 10) 11 12// ErrQueueFull is returned by a Write operation when that Write causes the 13// queue to reach its size limit. 14var ErrQueueFull = fmt.Errorf("queue closed due to size limit") 15 16// LimitQueue accepts all messages into a queue for asynchronous consumption by 17// a sink until an upper limit of messages is reached. When that limit is 18// reached, the entire Queue is Closed. It is thread safe but the 19// sink must be reliable or events will be dropped. 20// If a size of 0 is provided, the LimitQueue is considered limitless. 21type LimitQueue struct { 22 dst events.Sink 23 events *list.List 24 limit uint64 25 cond *sync.Cond 26 mu sync.Mutex 27 closed bool 28 full chan struct{} 29 fullClosed bool 30} 31 32// NewLimitQueue returns a queue to the provided Sink dst. 33func NewLimitQueue(dst events.Sink, limit uint64) *LimitQueue { 34 eq := LimitQueue{ 35 dst: dst, 36 events: list.New(), 37 limit: limit, 38 full: make(chan struct{}), 39 } 40 41 eq.cond = sync.NewCond(&eq.mu) 42 go eq.run() 43 return &eq 44} 45 46// Write accepts the events into the queue, only failing if the queue has 47// been closed or has reached its size limit. 48func (eq *LimitQueue) Write(event events.Event) error { 49 eq.mu.Lock() 50 defer eq.mu.Unlock() 51 52 if eq.closed { 53 return events.ErrSinkClosed 54 } 55 56 if eq.limit > 0 && uint64(eq.events.Len()) >= eq.limit { 57 // If the limit has been reached, don't write the event to the queue, 58 // and close the Full channel. This notifies listeners that the queue 59 // is now full, but the sink is still permitted to consume events. It's 60 // the responsibility of the listener to decide whether they want to 61 // live with dropped events or whether they want to Close() the 62 // LimitQueue 63 if !eq.fullClosed { 64 eq.fullClosed = true 65 close(eq.full) 66 } 67 return ErrQueueFull 68 } 69 70 eq.events.PushBack(event) 71 eq.cond.Signal() // signal waiters 72 73 return nil 74} 75 76// Full returns a channel that is closed when the queue becomes full for the 77// first time. 78func (eq *LimitQueue) Full() chan struct{} { 79 return eq.full 80} 81 82// Close shuts down the event queue, flushing all events 83func (eq *LimitQueue) Close() error { 84 eq.mu.Lock() 85 defer eq.mu.Unlock() 86 87 if eq.closed { 88 return nil 89 } 90 91 // set the closed flag 92 eq.closed = true 93 eq.cond.Signal() // signal flushes queue 94 eq.cond.Wait() // wait for signal from last flush 95 return eq.dst.Close() 96} 97 98// run is the main goroutine to flush events to the target sink. 99func (eq *LimitQueue) run() { 100 for { 101 event := eq.next() 102 103 if event == nil { 104 return // nil block means event queue is closed. 105 } 106 107 if err := eq.dst.Write(event); err != nil { 108 // TODO(aaronl): Dropping events could be bad depending 109 // on the application. We should have a way of 110 // communicating this condition. However, logging 111 // at a log level above debug may not be appropriate. 112 // Eventually, go-events should not use logrus at all, 113 // and should bubble up conditions like this through 114 // error values. 115 logrus.WithFields(logrus.Fields{ 116 "event": event, 117 "sink": eq.dst, 118 }).WithError(err).Debug("eventqueue: dropped event") 119 } 120 } 121} 122 123// Len returns the number of items that are currently stored in the queue and 124// not consumed by its sink. 125func (eq *LimitQueue) Len() int { 126 eq.mu.Lock() 127 defer eq.mu.Unlock() 128 return eq.events.Len() 129} 130 131func (eq *LimitQueue) String() string { 132 eq.mu.Lock() 133 defer eq.mu.Unlock() 134 return fmt.Sprintf("%v", eq.events) 135} 136 137// next encompasses the critical section of the run loop. When the queue is 138// empty, it will block on the condition. If new data arrives, it will wake 139// and return a block. When closed, a nil slice will be returned. 140func (eq *LimitQueue) next() events.Event { 141 eq.mu.Lock() 142 defer eq.mu.Unlock() 143 144 for eq.events.Len() < 1 { 145 if eq.closed { 146 eq.cond.Broadcast() 147 return nil 148 } 149 150 eq.cond.Wait() 151 } 152 153 front := eq.events.Front() 154 block := front.Value.(events.Event) 155 eq.events.Remove(front) 156 157 return block 158} 159