1package stream
2
3import (
4	"sync"
5)
6
7func newThreshold(cutoff int) *threshold {
8	t := &threshold{
9		threshold: cutoff,
10	}
11	t.cond.L = &t.mu
12	return t
13}
14
15type threshold struct {
16	mu   sync.Mutex
17	cond sync.Cond
18
19	count     int
20	threshold int
21}
22
23// Acquire increments the counter. It will not block.
24func (t *threshold) Acquire() {
25	t.mu.Lock()
26	t.count++
27	t.mu.Unlock()
28}
29
30// Release decrements the counter.
31func (t *threshold) Release() {
32	t.mu.Lock()
33	if t.count == 0 {
34		panic("negative count")
35	}
36	if t.threshold == t.count {
37		t.cond.Broadcast()
38	}
39	t.count--
40	t.mu.Unlock()
41}
42
43// Wait waits for the counter to drop below the threshold
44func (t *threshold) Wait() {
45	t.mu.Lock()
46	for t.count >= t.threshold {
47		t.cond.Wait()
48	}
49	t.mu.Unlock()
50}
51