1// Package batcher implements the batching resiliency pattern for Go. 2package batcher 3 4import ( 5 "sync" 6 "time" 7) 8 9type work struct { 10 param interface{} 11 future chan error 12} 13 14// Batcher implements the batching resiliency pattern 15type Batcher struct { 16 timeout time.Duration 17 prefilter func(interface{}) error 18 19 lock sync.Mutex 20 submit chan *work 21 doWork func([]interface{}) error 22} 23 24// New constructs a new batcher that will batch all calls to Run that occur within 25// `timeout` time before calling doWork just once for the entire batch. The doWork 26// function must be safe to run concurrently with itself as this may occur, especially 27// when the timeout is small. 28func New(timeout time.Duration, doWork func([]interface{}) error) *Batcher { 29 return &Batcher{ 30 timeout: timeout, 31 doWork: doWork, 32 } 33} 34 35// Run runs the work function with the given parameter, possibly 36// including it in a batch with other calls to Run that occur within the 37// specified timeout. It is safe to call Run concurrently on the same batcher. 38func (b *Batcher) Run(param interface{}) error { 39 if b.prefilter != nil { 40 if err := b.prefilter(param); err != nil { 41 return err 42 } 43 } 44 45 if b.timeout == 0 { 46 return b.doWork([]interface{}{param}) 47 } 48 49 w := &work{ 50 param: param, 51 future: make(chan error, 1), 52 } 53 54 b.submitWork(w) 55 56 return <-w.future 57} 58 59// Prefilter specifies an optional function that can be used to run initial checks on parameters 60// passed to Run before being added to the batch. If the prefilter returns a non-nil error, 61// that error is returned immediately from Run and the batcher is not invoked. A prefilter 62// cannot safely be specified for a batcher if Run has already been invoked. The filter function 63// specified must be concurrency-safe. 64func (b *Batcher) Prefilter(filter func(interface{}) error) { 65 b.prefilter = filter 66} 67 68func (b *Batcher) submitWork(w *work) { 69 b.lock.Lock() 70 defer b.lock.Unlock() 71 72 if b.submit == nil { 73 b.submit = make(chan *work, 4) 74 go b.batch() 75 } 76 77 b.submit <- w 78} 79 80func (b *Batcher) batch() { 81 var params []interface{} 82 var futures []chan error 83 input := b.submit 84 85 go b.timer() 86 87 for work := range input { 88 params = append(params, work.param) 89 futures = append(futures, work.future) 90 } 91 92 ret := b.doWork(params) 93 94 for _, future := range futures { 95 future <- ret 96 close(future) 97 } 98} 99 100func (b *Batcher) timer() { 101 time.Sleep(b.timeout) 102 103 b.lock.Lock() 104 defer b.lock.Unlock() 105 106 close(b.submit) 107 b.submit = nil 108} 109