1// Copyright 2016 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Package bundler supports bundling (batching) of items. Bundling amortizes an 16// action with fixed costs over multiple items. For example, if an API provides 17// an RPC that accepts a list of items as input, but clients would prefer 18// adding items one at a time, then a Bundler can accept individual items from 19// the client and bundle many of them into a single RPC. 20// 21// This package is experimental and subject to change without notice. 22package bundler 23 24import ( 25 "context" 26 "errors" 27 "math" 28 "reflect" 29 "sync" 30 "time" 31 32 "golang.org/x/sync/semaphore" 33) 34 35const ( 36 DefaultDelayThreshold = time.Second 37 DefaultBundleCountThreshold = 10 38 DefaultBundleByteThreshold = 1e6 // 1M 39 DefaultBufferedByteLimit = 1e9 // 1G 40) 41 42var ( 43 // ErrOverflow indicates that Bundler's stored bytes exceeds its BufferedByteLimit. 44 ErrOverflow = errors.New("bundler reached buffered byte limit") 45 46 // ErrOversizedItem indicates that an item's size exceeds the maximum bundle size. 47 ErrOversizedItem = errors.New("item size exceeds bundle byte limit") 48) 49 50// A Bundler collects items added to it into a bundle until the bundle 51// exceeds a given size, then calls a user-provided function to handle the bundle. 52type Bundler struct { 53 // Starting from the time that the first message is added to a bundle, once 54 // this delay has passed, handle the bundle. The default is DefaultDelayThreshold. 55 DelayThreshold time.Duration 56 57 // Once a bundle has this many items, handle the bundle. Since only one 58 // item at a time is added to a bundle, no bundle will exceed this 59 // threshold, so it also serves as a limit. The default is 60 // DefaultBundleCountThreshold. 61 BundleCountThreshold int 62 63 // Once the number of bytes in current bundle reaches this threshold, handle 64 // the bundle. The default is DefaultBundleByteThreshold. This triggers handling, 65 // but does not cap the total size of a bundle. 66 BundleByteThreshold int 67 68 // The maximum size of a bundle, in bytes. Zero means unlimited. 69 BundleByteLimit int 70 71 // The maximum number of bytes that the Bundler will keep in memory before 72 // returning ErrOverflow. The default is DefaultBufferedByteLimit. 73 BufferedByteLimit int 74 75 // The maximum number of handler invocations that can be running at once. 76 // The default is 1. 77 HandlerLimit int 78 79 handler func(interface{}) // called to handle a bundle 80 itemSliceZero reflect.Value // nil (zero value) for slice of items 81 flushTimer *time.Timer // implements DelayThreshold 82 83 mu sync.Mutex 84 sem *semaphore.Weighted // enforces BufferedByteLimit 85 semOnce sync.Once 86 curBundle bundle // incoming items added to this bundle 87 88 // Each bundle is assigned a unique ticket that determines the order in which the 89 // handler is called. The ticket is assigned with mu locked, but waiting for tickets 90 // to be handled is done via mu2 and cond, below. 91 nextTicket uint64 // next ticket to be assigned 92 93 mu2 sync.Mutex 94 cond *sync.Cond 95 nextHandled uint64 // next ticket to be handled 96 97 // In this implementation, active uses space proportional to HandlerLimit, and 98 // waitUntilAllHandled takes time proportional to HandlerLimit each time an acquire 99 // or release occurs, so large values of HandlerLimit max may cause performance 100 // issues. 101 active map[uint64]bool // tickets of bundles actively being handled 102} 103 104type bundle struct { 105 items reflect.Value // slice of item type 106 size int // size in bytes of all items 107} 108 109// NewBundler creates a new Bundler. 110// 111// itemExample is a value of the type that will be bundled. For example, if you 112// want to create bundles of *Entry, you could pass &Entry{} for itemExample. 113// 114// handler is a function that will be called on each bundle. If itemExample is 115// of type T, the argument to handler is of type []T. handler is always called 116// sequentially for each bundle, and never in parallel. 117// 118// Configure the Bundler by setting its thresholds and limits before calling 119// any of its methods. 120func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler { 121 b := &Bundler{ 122 DelayThreshold: DefaultDelayThreshold, 123 BundleCountThreshold: DefaultBundleCountThreshold, 124 BundleByteThreshold: DefaultBundleByteThreshold, 125 BufferedByteLimit: DefaultBufferedByteLimit, 126 HandlerLimit: 1, 127 128 handler: handler, 129 itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))), 130 active: map[uint64]bool{}, 131 } 132 b.curBundle.items = b.itemSliceZero 133 b.cond = sync.NewCond(&b.mu2) 134 return b 135} 136 137func (b *Bundler) initSemaphores() { 138 // Create the semaphores lazily, because the user may set limits 139 // after NewBundler. 140 b.semOnce.Do(func() { 141 b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit)) 142 }) 143} 144 145// Add adds item to the current bundle. It marks the bundle for handling and 146// starts a new one if any of the thresholds or limits are exceeded. 147// 148// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then 149// the item can never be handled. Add returns ErrOversizedItem in this case. 150// 151// If adding the item would exceed the maximum memory allowed 152// (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for 153// memory, Add returns ErrOverflow. 154// 155// Add never blocks. 156func (b *Bundler) Add(item interface{}, size int) error { 157 // If this item exceeds the maximum size of a bundle, 158 // we can never send it. 159 if b.BundleByteLimit > 0 && size > b.BundleByteLimit { 160 return ErrOversizedItem 161 } 162 // If adding this item would exceed our allotted memory 163 // footprint, we can't accept it. 164 // (TryAcquire also returns false if anything is waiting on the semaphore, 165 // so calls to Add and AddWait shouldn't be mixed.) 166 b.initSemaphores() 167 if !b.sem.TryAcquire(int64(size)) { 168 return ErrOverflow 169 } 170 b.add(item, size) 171 return nil 172} 173 174// add adds item to the current bundle. It marks the bundle for handling and 175// starts a new one if any of the thresholds or limits are exceeded. 176func (b *Bundler) add(item interface{}, size int) { 177 b.mu.Lock() 178 defer b.mu.Unlock() 179 180 // If adding this item to the current bundle would cause it to exceed the 181 // maximum bundle size, close the current bundle and start a new one. 182 if b.BundleByteLimit > 0 && b.curBundle.size+size > b.BundleByteLimit { 183 b.startFlushLocked() 184 } 185 // Add the item. 186 b.curBundle.items = reflect.Append(b.curBundle.items, reflect.ValueOf(item)) 187 b.curBundle.size += size 188 189 // Start a timer to flush the item if one isn't already running. 190 // startFlushLocked clears the timer and closes the bundle at the same time, 191 // so we only allocate a new timer for the first item in each bundle. 192 // (We could try to call Reset on the timer instead, but that would add a lot 193 // of complexity to the code just to save one small allocation.) 194 if b.flushTimer == nil { 195 b.flushTimer = time.AfterFunc(b.DelayThreshold, b.Flush) 196 } 197 198 // If the current bundle equals the count threshold, close it. 199 if b.curBundle.items.Len() == b.BundleCountThreshold { 200 b.startFlushLocked() 201 } 202 // If the current bundle equals or exceeds the byte threshold, close it. 203 if b.curBundle.size >= b.BundleByteThreshold { 204 b.startFlushLocked() 205 } 206} 207 208// AddWait adds item to the current bundle. It marks the bundle for handling and 209// starts a new one if any of the thresholds or limits are exceeded. 210// 211// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then 212// the item can never be handled. AddWait returns ErrOversizedItem in this case. 213// 214// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit), 215// AddWait blocks until space is available or ctx is done. 216// 217// Calls to Add and AddWait should not be mixed on the same Bundler. 218func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error { 219 // If this item exceeds the maximum size of a bundle, 220 // we can never send it. 221 if b.BundleByteLimit > 0 && size > b.BundleByteLimit { 222 return ErrOversizedItem 223 } 224 // If adding this item would exceed our allotted memory footprint, block 225 // until space is available. The semaphore is FIFO, so there will be no 226 // starvation. 227 b.initSemaphores() 228 if err := b.sem.Acquire(ctx, int64(size)); err != nil { 229 return err 230 } 231 // Here, we've reserved space for item. Other goroutines can call AddWait 232 // and even acquire space, but no one can take away our reservation 233 // (assuming sem.Release is used correctly). So there is no race condition 234 // resulting from locking the mutex after sem.Acquire returns. 235 b.add(item, size) 236 return nil 237} 238 239// Flush invokes the handler for all remaining items in the Bundler and waits 240// for it to return. 241func (b *Bundler) Flush() { 242 b.mu.Lock() 243 b.startFlushLocked() 244 // Here, all bundles with tickets < b.nextTicket are 245 // either finished or active. Those are the ones 246 // we want to wait for. 247 t := b.nextTicket 248 b.mu.Unlock() 249 b.initSemaphores() 250 b.waitUntilAllHandled(t) 251} 252 253func (b *Bundler) startFlushLocked() { 254 if b.flushTimer != nil { 255 b.flushTimer.Stop() 256 b.flushTimer = nil 257 } 258 if b.curBundle.items.Len() == 0 { 259 return 260 } 261 // Here, both semaphores must have been initialized. 262 bun := b.curBundle 263 b.curBundle = bundle{items: b.itemSliceZero} 264 ticket := b.nextTicket 265 b.nextTicket++ 266 go func() { 267 defer func() { 268 b.sem.Release(int64(bun.size)) 269 b.release(ticket) 270 }() 271 b.acquire(ticket) 272 b.handler(bun.items.Interface()) 273 }() 274} 275 276// acquire blocks until ticket is the next to be served, then returns. In order for N 277// acquire calls to return, the tickets must be in the range [0, N). A ticket must 278// not be presented to acquire more than once. 279func (b *Bundler) acquire(ticket uint64) { 280 b.mu2.Lock() 281 defer b.mu2.Unlock() 282 if ticket < b.nextHandled { 283 panic("bundler: acquire: arg too small") 284 } 285 for !(ticket == b.nextHandled && len(b.active) < b.HandlerLimit) { 286 b.cond.Wait() 287 } 288 // Here, 289 // ticket == b.nextHandled: the caller is the next one to be handled; 290 // and len(b.active) < b.HandlerLimit: there is space available. 291 b.active[ticket] = true 292 b.nextHandled++ 293 // Broadcast, not Signal: although at most one acquire waiter can make progress, 294 // there might be waiters in waitUntilAllHandled. 295 b.cond.Broadcast() 296} 297 298// If a ticket is used for a call to acquire, it must later be passed to release. A 299// ticket must not be presented to release more than once. 300func (b *Bundler) release(ticket uint64) { 301 b.mu2.Lock() 302 defer b.mu2.Unlock() 303 if !b.active[ticket] { 304 panic("bundler: release: not an active ticket") 305 } 306 delete(b.active, ticket) 307 b.cond.Broadcast() 308} 309 310// waitUntilAllHandled blocks until all tickets < n have called release, meaning 311// all bundles with tickets < n have been handled. 312func (b *Bundler) waitUntilAllHandled(n uint64) { 313 // Proof of correctness of this function. 314 // "N is acquired" means acquire(N) has returned. 315 // "N is released" means release(N) has returned. 316 // 1. If N is acquired, N-1 is acquired. 317 // Follows from the loop test in acquire, and the fact 318 // that nextHandled is incremented by 1. 319 // 2. If nextHandled >= N, then N-1 is acquired. 320 // Because we only increment nextHandled to N after N-1 is acquired. 321 // 3. If nextHandled >= N, then all n < N is acquired. 322 // Follows from #1 and #2. 323 // 4. If N is acquired and N is not in active, then N is released. 324 // Because we put N in active before acquire returns, and only 325 // remove it when it is released. 326 // Let min(active) be the smallest member of active, or infinity if active is empty. 327 // 5. If nextHandled >= N and N <= min(active), then all n < N is released. 328 // From nextHandled >= N and #3, all n < N is acquired. 329 // N <= min(active) implies n < min(active) for all n < N. So all n < N is not in active. 330 // So from #4, all n < N is released. 331 // The loop test below is the antecedent of #5. 332 b.mu2.Lock() 333 defer b.mu2.Unlock() 334 for !(b.nextHandled >= n && n <= min(b.active)) { 335 b.cond.Wait() 336 } 337} 338 339// min returns the minimum value of the set s, or the largest uint64 if 340// s is empty. 341func min(s map[uint64]bool) uint64 { 342 var m uint64 = math.MaxUint64 343 for n := range s { 344 if n < m { 345 m = n 346 } 347 } 348 return m 349} 350