Lines Matching refs:eq
34 eq := LimitQueue{
41 eq.cond = sync.NewCond(&eq.mu)
42 go eq.run()
43 return &eq
48 func (eq *LimitQueue) Write(event events.Event) error {
49 eq.mu.Lock()
50 defer eq.mu.Unlock()
52 if eq.closed {
56 if eq.limit > 0 && uint64(eq.events.Len()) >= eq.limit {
63 if !eq.fullClosed {
64 eq.fullClosed = true
65 close(eq.full)
70 eq.events.PushBack(event)
71 eq.cond.Signal() // signal waiters
78 func (eq *LimitQueue) Full() chan struct{} {
79 return eq.full
83 func (eq *LimitQueue) Close() error {
84 eq.mu.Lock()
85 defer eq.mu.Unlock()
87 if eq.closed {
92 eq.closed = true
93 eq.cond.Signal() // signal flushes queue
94 eq.cond.Wait() // wait for signal from last flush
95 return eq.dst.Close()
99 func (eq *LimitQueue) run() {
101 event := eq.next()
107 if err := eq.dst.Write(event); err != nil {
117 "sink": eq.dst,
125 func (eq *LimitQueue) Len() int {
126 eq.mu.Lock()
127 defer eq.mu.Unlock()
128 return eq.events.Len()
131 func (eq *LimitQueue) String() string {
132 eq.mu.Lock()
133 defer eq.mu.Unlock()
134 return fmt.Sprintf("%v", eq.events)
140 func (eq *LimitQueue) next() events.Event {
141 eq.mu.Lock()
142 defer eq.mu.Unlock()
144 for eq.events.Len() < 1 {
145 if eq.closed {
146 eq.cond.Broadcast()
150 eq.cond.Wait()
153 front := eq.events.Front()
155 eq.events.Remove(front)