1// Package batcher implements the batching resiliency pattern for Go.
2package batcher
3
4import (
5	"sync"
6	"time"
7)
8
9type work struct {
10	param  interface{}
11	future chan error
12}
13
14// Batcher implements the batching resiliency pattern
15type Batcher struct {
16	timeout   time.Duration
17	prefilter func(interface{}) error
18
19	lock   sync.Mutex
20	submit chan *work
21	doWork func([]interface{}) error
22}
23
24// New constructs a new batcher that will batch all calls to Run that occur within
25// `timeout` time before calling doWork just once for the entire batch. The doWork
26// function must be safe to run concurrently with itself as this may occur, especially
27// when the timeout is small.
28func New(timeout time.Duration, doWork func([]interface{}) error) *Batcher {
29	return &Batcher{
30		timeout: timeout,
31		doWork:  doWork,
32	}
33}
34
35// Run runs the work function with the given parameter, possibly
36// including it in a batch with other calls to Run that occur within the
37// specified timeout. It is safe to call Run concurrently on the same batcher.
38func (b *Batcher) Run(param interface{}) error {
39	if b.prefilter != nil {
40		if err := b.prefilter(param); err != nil {
41			return err
42		}
43	}
44
45	if b.timeout == 0 {
46		return b.doWork([]interface{}{param})
47	}
48
49	w := &work{
50		param:  param,
51		future: make(chan error, 1),
52	}
53
54	b.submitWork(w)
55
56	return <-w.future
57}
58
59// Prefilter specifies an optional function that can be used to run initial checks on parameters
60// passed to Run before being added to the batch. If the prefilter returns a non-nil error,
61// that error is returned immediately from Run and the batcher is not invoked. A prefilter
62// cannot safely be specified for a batcher if Run has already been invoked. The filter function
63// specified must be concurrency-safe.
64func (b *Batcher) Prefilter(filter func(interface{}) error) {
65	b.prefilter = filter
66}
67
68func (b *Batcher) submitWork(w *work) {
69	b.lock.Lock()
70	defer b.lock.Unlock()
71
72	if b.submit == nil {
73		b.submit = make(chan *work, 4)
74		go b.batch()
75	}
76
77	b.submit <- w
78}
79
80func (b *Batcher) batch() {
81	var params []interface{}
82	var futures []chan error
83	input := b.submit
84
85	go b.timer()
86
87	for work := range input {
88		params = append(params, work.param)
89		futures = append(futures, work.future)
90	}
91
92	ret := b.doWork(params)
93
94	for _, future := range futures {
95		future <- ret
96		close(future)
97	}
98}
99
100func (b *Batcher) timer() {
101	time.Sleep(b.timeout)
102
103	b.lock.Lock()
104	defer b.lock.Unlock()
105
106	close(b.submit)
107	b.submit = nil
108}
109