1package pubsub // import "github.com/docker/docker/pkg/pubsub"
2
3import (
4	"sync"
5	"time"
6)
7
8var wgPool = sync.Pool{New: func() interface{} { return new(sync.WaitGroup) }}
9
10// NewPublisher creates a new pub/sub publisher to broadcast messages.
11// The duration is used as the send timeout as to not block the publisher publishing
12// messages to other clients if one client is slow or unresponsive.
13// The buffer is used when creating new channels for subscribers.
14func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
15	return &Publisher{
16		buffer:      buffer,
17		timeout:     publishTimeout,
18		subscribers: make(map[subscriber]topicFunc),
19	}
20}
21
22type subscriber chan interface{}
23type topicFunc func(v interface{}) bool
24
25// Publisher is basic pub/sub structure. Allows to send events and subscribe
26// to them. Can be safely used from multiple goroutines.
27type Publisher struct {
28	m           sync.RWMutex
29	buffer      int
30	timeout     time.Duration
31	subscribers map[subscriber]topicFunc
32}
33
34// Len returns the number of subscribers for the publisher
35func (p *Publisher) Len() int {
36	p.m.RLock()
37	i := len(p.subscribers)
38	p.m.RUnlock()
39	return i
40}
41
42// Subscribe adds a new subscriber to the publisher returning the channel.
43func (p *Publisher) Subscribe() chan interface{} {
44	return p.SubscribeTopic(nil)
45}
46
47// SubscribeTopic adds a new subscriber that filters messages sent by a topic.
48func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
49	ch := make(chan interface{}, p.buffer)
50	p.m.Lock()
51	p.subscribers[ch] = topic
52	p.m.Unlock()
53	return ch
54}
55
56// SubscribeTopicWithBuffer adds a new subscriber that filters messages sent by a topic.
57// The returned channel has a buffer of the specified size.
58func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} {
59	ch := make(chan interface{}, buffer)
60	p.m.Lock()
61	p.subscribers[ch] = topic
62	p.m.Unlock()
63	return ch
64}
65
66// Evict removes the specified subscriber from receiving any more messages.
67func (p *Publisher) Evict(sub chan interface{}) {
68	p.m.Lock()
69	delete(p.subscribers, sub)
70	close(sub)
71	p.m.Unlock()
72}
73
74// Publish sends the data in v to all subscribers currently registered with the publisher.
75func (p *Publisher) Publish(v interface{}) {
76	p.m.RLock()
77	if len(p.subscribers) == 0 {
78		p.m.RUnlock()
79		return
80	}
81
82	wg := wgPool.Get().(*sync.WaitGroup)
83	for sub, topic := range p.subscribers {
84		wg.Add(1)
85		go p.sendTopic(sub, topic, v, wg)
86	}
87	wg.Wait()
88	wgPool.Put(wg)
89	p.m.RUnlock()
90}
91
92// Close closes the channels to all subscribers registered with the publisher.
93func (p *Publisher) Close() {
94	p.m.Lock()
95	for sub := range p.subscribers {
96		delete(p.subscribers, sub)
97		close(sub)
98	}
99	p.m.Unlock()
100}
101
102func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
103	defer wg.Done()
104	if topic != nil && !topic(v) {
105		return
106	}
107
108	// send under a select as to not block if the receiver is unavailable
109	if p.timeout > 0 {
110		select {
111		case sub <- v:
112		case <-time.After(p.timeout):
113		}
114		return
115	}
116
117	select {
118	case sub <- v:
119	default:
120	}
121}
122