1package stream 2 3import ( 4 "sync" 5) 6 7func newThreshold(cutoff int) *threshold { 8 t := &threshold{ 9 threshold: cutoff, 10 } 11 t.cond.L = &t.mu 12 return t 13} 14 15type threshold struct { 16 mu sync.Mutex 17 cond sync.Cond 18 19 count int 20 threshold int 21} 22 23// Acquire increments the counter. It will not block. 24func (t *threshold) Acquire() { 25 t.mu.Lock() 26 t.count++ 27 t.mu.Unlock() 28} 29 30// Release decrements the counter. 31func (t *threshold) Release() { 32 t.mu.Lock() 33 if t.count == 0 { 34 panic("negative count") 35 } 36 if t.threshold == t.count { 37 t.cond.Broadcast() 38 } 39 t.count-- 40 t.mu.Unlock() 41} 42 43// Wait waits for the counter to drop below the threshold 44func (t *threshold) Wait() { 45 t.mu.Lock() 46 for t.count >= t.threshold { 47 t.cond.Wait() 48 } 49 t.mu.Unlock() 50} 51