// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
// Package bundler supports bundling (batching) of items. Bundling amortizes an
// action with fixed costs over multiple items. For example, if an API provides
// an RPC that accepts a list of items as input, but clients would prefer
// adding items one at a time, then a Bundler can accept individual items from
// the client and bundle many of them into a single RPC.
//
// This package is experimental and subject to change without notice.
22package bundler
23
24import (
25	"context"
26	"errors"
27	"math"
28	"reflect"
29	"sync"
30	"time"
31
32	"golang.org/x/sync/semaphore"
33)
34
// Default values that NewBundler assigns to the corresponding Bundler fields.
const (
	// DefaultDelayThreshold is the default value of Bundler.DelayThreshold.
	DefaultDelayThreshold = time.Second

	// DefaultBundleCountThreshold is the default value of
	// Bundler.BundleCountThreshold.
	DefaultBundleCountThreshold = 10

	// DefaultBundleByteThreshold is the default value of
	// Bundler.BundleByteThreshold, in bytes.
	DefaultBundleByteThreshold = 1e6 // 1M

	// DefaultBufferedByteLimit is the default value of
	// Bundler.BufferedByteLimit, in bytes.
	DefaultBufferedByteLimit = 1e9 // 1G
)
41
// Sentinel errors returned by Add and AddWait.
var (
	// ErrOverflow indicates that accepting an item would take the Bundler's
	// buffered bytes past its BufferedByteLimit.
	ErrOverflow = errors.New("bundler reached buffered byte limit")

	// ErrOversizedItem indicates that an item's size exceeds the maximum
	// bundle size (Bundler.BundleByteLimit).
	ErrOversizedItem = errors.New("item size exceeds bundle byte limit")
)
49
50// A Bundler collects items added to it into a bundle until the bundle
51// exceeds a given size, then calls a user-provided function to handle the bundle.
52type Bundler struct {
53	// Starting from the time that the first message is added to a bundle, once
54	// this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
55	DelayThreshold time.Duration
56
57	// Once a bundle has this many items, handle the bundle. Since only one
58	// item at a time is added to a bundle, no bundle will exceed this
59	// threshold, so it also serves as a limit. The default is
60	// DefaultBundleCountThreshold.
61	BundleCountThreshold int
62
63	// Once the number of bytes in current bundle reaches this threshold, handle
64	// the bundle. The default is DefaultBundleByteThreshold. This triggers handling,
65	// but does not cap the total size of a bundle.
66	BundleByteThreshold int
67
68	// The maximum size of a bundle, in bytes. Zero means unlimited.
69	BundleByteLimit int
70
71	// The maximum number of bytes that the Bundler will keep in memory before
72	// returning ErrOverflow. The default is DefaultBufferedByteLimit.
73	BufferedByteLimit int
74
75	// The maximum number of handler invocations that can be running at once.
76	// The default is 1.
77	HandlerLimit int
78
79	handler       func(interface{}) // called to handle a bundle
80	itemSliceZero reflect.Value     // nil (zero value) for slice of items
81	flushTimer    *time.Timer       // implements DelayThreshold
82
83	mu        sync.Mutex
84	sem       *semaphore.Weighted // enforces BufferedByteLimit
85	semOnce   sync.Once
86	curBundle bundle // incoming items added to this bundle
87
88	// Each bundle is assigned a unique ticket that determines the order in which the
89	// handler is called. The ticket is assigned with mu locked, but waiting for tickets
90	// to be handled is done via mu2 and cond, below.
91	nextTicket uint64 // next ticket to be assigned
92
93	mu2         sync.Mutex
94	cond        *sync.Cond
95	nextHandled uint64 // next ticket to be handled
96
97	// In this implementation, active uses space proportional to HandlerLimit, and
98	// waitUntilAllHandled takes time proportional to HandlerLimit each time an acquire
99	// or release occurs, so large values of HandlerLimit max may cause performance
100	// issues.
101	active map[uint64]bool // tickets of bundles actively being handled
102}
103
// bundle is a batch of items that is handed to the handler as a unit,
// together with the total size the callers declared for those items.
type bundle struct {
	items reflect.Value // slice of item type
	size  int           // size in bytes of all items
}
108
109// NewBundler creates a new Bundler.
110//
111// itemExample is a value of the type that will be bundled. For example, if you
112// want to create bundles of *Entry, you could pass &Entry{} for itemExample.
113//
114// handler is a function that will be called on each bundle. If itemExample is
115// of type T, the argument to handler is of type []T. handler is always called
116// sequentially for each bundle, and never in parallel.
117//
118// Configure the Bundler by setting its thresholds and limits before calling
119// any of its methods.
120func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
121	b := &Bundler{
122		DelayThreshold:       DefaultDelayThreshold,
123		BundleCountThreshold: DefaultBundleCountThreshold,
124		BundleByteThreshold:  DefaultBundleByteThreshold,
125		BufferedByteLimit:    DefaultBufferedByteLimit,
126		HandlerLimit:         1,
127
128		handler:       handler,
129		itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
130		active:        map[uint64]bool{},
131	}
132	b.curBundle.items = b.itemSliceZero
133	b.cond = sync.NewCond(&b.mu2)
134	return b
135}
136
137func (b *Bundler) initSemaphores() {
138	// Create the semaphores lazily, because the user may set limits
139	// after NewBundler.
140	b.semOnce.Do(func() {
141		b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit))
142	})
143}
144
145// Add adds item to the current bundle. It marks the bundle for handling and
146// starts a new one if any of the thresholds or limits are exceeded.
147//
148// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
149// the item can never be handled. Add returns ErrOversizedItem in this case.
150//
151// If adding the item would exceed the maximum memory allowed
152// (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for
153// memory, Add returns ErrOverflow.
154//
155// Add never blocks.
156func (b *Bundler) Add(item interface{}, size int) error {
157	// If this item exceeds the maximum size of a bundle,
158	// we can never send it.
159	if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
160		return ErrOversizedItem
161	}
162	// If adding this item would exceed our allotted memory
163	// footprint, we can't accept it.
164	// (TryAcquire also returns false if anything is waiting on the semaphore,
165	// so calls to Add and AddWait shouldn't be mixed.)
166	b.initSemaphores()
167	if !b.sem.TryAcquire(int64(size)) {
168		return ErrOverflow
169	}
170	b.add(item, size)
171	return nil
172}
173
174// add adds item to the current bundle. It marks the bundle for handling and
175// starts a new one if any of the thresholds or limits are exceeded.
176func (b *Bundler) add(item interface{}, size int) {
177	b.mu.Lock()
178	defer b.mu.Unlock()
179
180	// If adding this item to the current bundle would cause it to exceed the
181	// maximum bundle size, close the current bundle and start a new one.
182	if b.BundleByteLimit > 0 && b.curBundle.size+size > b.BundleByteLimit {
183		b.startFlushLocked()
184	}
185	// Add the item.
186	b.curBundle.items = reflect.Append(b.curBundle.items, reflect.ValueOf(item))
187	b.curBundle.size += size
188
189	// Start a timer to flush the item if one isn't already running.
190	// startFlushLocked clears the timer and closes the bundle at the same time,
191	// so we only allocate a new timer for the first item in each bundle.
192	// (We could try to call Reset on the timer instead, but that would add a lot
193	// of complexity to the code just to save one small allocation.)
194	if b.flushTimer == nil {
195		b.flushTimer = time.AfterFunc(b.DelayThreshold, b.Flush)
196	}
197
198	// If the current bundle equals the count threshold, close it.
199	if b.curBundle.items.Len() == b.BundleCountThreshold {
200		b.startFlushLocked()
201	}
202	// If the current bundle equals or exceeds the byte threshold, close it.
203	if b.curBundle.size >= b.BundleByteThreshold {
204		b.startFlushLocked()
205	}
206}
207
208// AddWait adds item to the current bundle. It marks the bundle for handling and
209// starts a new one if any of the thresholds or limits are exceeded.
210//
211// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
212// the item can never be handled. AddWait returns ErrOversizedItem in this case.
213//
214// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
215// AddWait blocks until space is available or ctx is done.
216//
217// Calls to Add and AddWait should not be mixed on the same Bundler.
218func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error {
219	// If this item exceeds the maximum size of a bundle,
220	// we can never send it.
221	if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
222		return ErrOversizedItem
223	}
224	// If adding this item would exceed our allotted memory footprint, block
225	// until space is available. The semaphore is FIFO, so there will be no
226	// starvation.
227	b.initSemaphores()
228	if err := b.sem.Acquire(ctx, int64(size)); err != nil {
229		return err
230	}
231	// Here, we've reserved space for item. Other goroutines can call AddWait
232	// and even acquire space, but no one can take away our reservation
233	// (assuming sem.Release is used correctly). So there is no race condition
234	// resulting from locking the mutex after sem.Acquire returns.
235	b.add(item, size)
236	return nil
237}
238
239// Flush invokes the handler for all remaining items in the Bundler and waits
240// for it to return.
241func (b *Bundler) Flush() {
242	b.mu.Lock()
243	b.startFlushLocked()
244	// Here, all bundles with tickets < b.nextTicket are
245	// either finished or active. Those are the ones
246	// we want to wait for.
247	t := b.nextTicket
248	b.mu.Unlock()
249	b.initSemaphores()
250	b.waitUntilAllHandled(t)
251}
252
253func (b *Bundler) startFlushLocked() {
254	if b.flushTimer != nil {
255		b.flushTimer.Stop()
256		b.flushTimer = nil
257	}
258	if b.curBundle.items.Len() == 0 {
259		return
260	}
261	// Here, both semaphores must have been initialized.
262	bun := b.curBundle
263	b.curBundle = bundle{items: b.itemSliceZero}
264	ticket := b.nextTicket
265	b.nextTicket++
266	go func() {
267		defer func() {
268			b.sem.Release(int64(bun.size))
269			b.release(ticket)
270		}()
271		b.acquire(ticket)
272		b.handler(bun.items.Interface())
273	}()
274}
275
276// acquire blocks until ticket is the next to be served, then returns. In order for N
277// acquire calls to return, the tickets must be in the range [0, N). A ticket must
278// not be presented to acquire more than once.
279func (b *Bundler) acquire(ticket uint64) {
280	b.mu2.Lock()
281	defer b.mu2.Unlock()
282	if ticket < b.nextHandled {
283		panic("bundler: acquire: arg too small")
284	}
285	for !(ticket == b.nextHandled && len(b.active) < b.HandlerLimit) {
286		b.cond.Wait()
287	}
288	// Here,
289	// ticket == b.nextHandled: the caller is the next one to be handled;
290	// and len(b.active) < b.HandlerLimit: there is space available.
291	b.active[ticket] = true
292	b.nextHandled++
293	// Broadcast, not Signal: although at most one acquire waiter can make progress,
294	// there might be waiters in waitUntilAllHandled.
295	b.cond.Broadcast()
296}
297
298// If a ticket is used for a call to acquire, it must later be passed to release. A
299// ticket must not be presented to release more than once.
300func (b *Bundler) release(ticket uint64) {
301	b.mu2.Lock()
302	defer b.mu2.Unlock()
303	if !b.active[ticket] {
304		panic("bundler: release: not an active ticket")
305	}
306	delete(b.active, ticket)
307	b.cond.Broadcast()
308}
309
310// waitUntilAllHandled blocks until all tickets < n have called release, meaning
311// all bundles with tickets < n have been handled.
312func (b *Bundler) waitUntilAllHandled(n uint64) {
313	// Proof of correctness of this function.
314	// "N is acquired" means acquire(N) has returned.
315	// "N is released" means release(N) has returned.
316	// 1. If N is acquired, N-1 is acquired.
317	//    Follows from the loop test in acquire, and the fact
318	//    that nextHandled is incremented by 1.
319	// 2. If nextHandled >= N, then N-1 is acquired.
320	//    Because we only increment nextHandled to N after N-1 is acquired.
321	// 3. If nextHandled >= N, then all n < N is acquired.
322	//    Follows from #1 and #2.
323	// 4. If N is acquired and N is not in active, then N is released.
324	//    Because we put N in active before acquire returns, and only
325	//    remove it when it is released.
326	// Let min(active) be the smallest member of active, or infinity if active is empty.
327	// 5. If nextHandled >= N and N <= min(active), then all n < N is released.
328	//    From nextHandled >= N and #3, all n < N is acquired.
329	//    N <= min(active) implies n < min(active) for all n < N. So all n < N is not in active.
330	//    So from #4, all n < N is released.
331	// The loop test below is the antecedent of #5.
332	b.mu2.Lock()
333	defer b.mu2.Unlock()
334	for !(b.nextHandled >= n && n <= min(b.active)) {
335		b.cond.Wait()
336	}
337}
338
// min returns the minimum value of the set s, or the largest uint64 if
// s is empty. Every key of s counts as a member, whatever bool it maps to.
func min(s map[uint64]bool) uint64 {
	var m uint64 = math.MaxUint64
	for n := range s {
		if n < m {
			m = n
		}
	}
	return m
}
350