1package mpb 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "io/ioutil" 9 "log" 10 "strings" 11 "sync" 12 "time" 13 "unicode/utf8" 14 15 "github.com/vbauerster/mpb/v4/decor" 16) 17 18// Filler interface. 19// Bar renders by calling Filler's Fill method. You can literally have 20// any bar kind, by implementing this interface and passing it to the 21// Add method. 22type Filler interface { 23 Fill(w io.Writer, width int, stat *decor.Statistics) 24} 25 26// FillerFunc is function type adapter to convert function into Filler. 27type FillerFunc func(w io.Writer, width int, stat *decor.Statistics) 28 29func (f FillerFunc) Fill(w io.Writer, width int, stat *decor.Statistics) { 30 f(w, width, stat) 31} 32 33// Bar represents a progress Bar. 34type Bar struct { 35 priority int // used by heap 36 index int // used by heap 37 38 extendedLines int 39 toShutdown bool 40 toDrop bool 41 operateState chan func(*bState) 42 frameCh chan io.Reader 43 syncTableCh chan [][]chan int 44 completed chan bool 45 46 // concel is called either by user or on complete event 47 cancel func() 48 // done is closed after cacheState is assigned 49 done chan struct{} 50 // cacheState is populated, right after close(shutdown) 51 cacheState *bState 52 53 arbitraryCurrent struct { 54 sync.Mutex 55 current int64 56 } 57 58 container *Progress 59 dlogger *log.Logger 60 recoveredPanic interface{} 61} 62 63type bState struct { 64 filler Filler 65 extender Filler 66 id int 67 width int 68 total int64 69 current int64 70 trimSpace bool 71 toComplete bool 72 completeFlushed bool 73 noBufBOnComplete bool 74 aDecorators []decor.Decorator 75 pDecorators []decor.Decorator 76 amountReceivers []decor.AmountReceiver 77 shutdownListeners []decor.ShutdownListener 78 bufP, bufB, bufA *bytes.Buffer 79 bufE *bytes.Buffer 80 81 // priority overrides *Bar's priority, if set 82 priority int 83 // dropOnComplete propagates to *Bar 84 dropOnComplete bool 85 // runningBar is a key for *pState.parkedBars 86 runningBar *Bar 87 88 debugOut io.Writer 89} 90 91func newBar(container *Progress, bs *bState) *Bar { 92 93 bs.bufP = bytes.NewBuffer(make([]byte, 0, bs.width)) 94 bs.bufB = bytes.NewBuffer(make([]byte, 0, bs.width)) 95 bs.bufA = bytes.NewBuffer(make([]byte, 0, bs.width)) 96 if bs.extender != nil { 97 bs.bufE = bytes.NewBuffer(make([]byte, 0, bs.width)) 98 } 99 100 logPrefix := fmt.Sprintf("%sbar#%02d ", container.dlogger.Prefix(), bs.id) 101 ctx, cancel := context.WithCancel(container.ctx) 102 bar := &Bar{ 103 container: container, 104 priority: bs.priority, 105 toDrop: bs.dropOnComplete, 106 operateState: make(chan func(*bState)), 107 frameCh: make(chan io.Reader, 1), 108 syncTableCh: make(chan [][]chan int), 109 completed: make(chan bool, 1), 110 done: make(chan struct{}), 111 cancel: cancel, 112 dlogger: log.New(bs.debugOut, logPrefix, log.Lshortfile), 113 } 114 115 go bar.serve(ctx, bs) 116 return bar 117} 118 119// RemoveAllPrependers removes all prepend functions. 120func (b *Bar) RemoveAllPrependers() { 121 select { 122 case b.operateState <- func(s *bState) { s.pDecorators = nil }: 123 case <-b.done: 124 } 125} 126 127// RemoveAllAppenders removes all append functions. 128func (b *Bar) RemoveAllAppenders() { 129 select { 130 case b.operateState <- func(s *bState) { s.aDecorators = nil }: 131 case <-b.done: 132 } 133} 134 135// ProxyReader wraps r with metrics required for progress tracking. 136func (b *Bar) ProxyReader(r io.Reader) io.ReadCloser { 137 if r == nil { 138 return nil 139 } 140 rc, ok := r.(io.ReadCloser) 141 if !ok { 142 rc = ioutil.NopCloser(r) 143 } 144 return &proxyReader{rc, b, time.Now()} 145} 146 147// ID returs id of the bar. 148func (b *Bar) ID() int { 149 result := make(chan int) 150 select { 151 case b.operateState <- func(s *bState) { result <- s.id }: 152 return <-result 153 case <-b.done: 154 return b.cacheState.id 155 } 156} 157 158// Current returns bar's current number, in other words sum of all increments. 159func (b *Bar) Current() int64 { 160 result := make(chan int64) 161 select { 162 case b.operateState <- func(s *bState) { result <- s.current }: 163 return <-result 164 case <-b.done: 165 return b.cacheState.current 166 } 167} 168 169// SetRefill sets refill, if supported by underlying Filler. 170func (b *Bar) SetRefill(amount int64) { 171 b.operateState <- func(s *bState) { 172 if f, ok := s.filler.(interface{ SetRefill(int64) }); ok { 173 f.SetRefill(amount) 174 } 175 } 176} 177 178// SetTotal sets total dynamically. 179// Set complete to true, to trigger bar complete event now. 180func (b *Bar) SetTotal(total int64, complete bool) { 181 select { 182 case b.operateState <- func(s *bState) { 183 s.total = total 184 if complete && !s.toComplete { 185 s.current = s.total 186 s.toComplete = true 187 go b.refreshNowTillShutdown() 188 } 189 }: 190 case <-b.done: 191 } 192} 193 194// SetCurrent sets progress' current to arbitrary amount. 195func (b *Bar) SetCurrent(current int64, wdd ...time.Duration) { 196 if current <= 0 { 197 return 198 } 199 b.arbitraryCurrent.Lock() 200 last := b.arbitraryCurrent.current 201 b.IncrBy(int(current-last), wdd...) 202 b.arbitraryCurrent.current = current 203 b.arbitraryCurrent.Unlock() 204} 205 206// Increment is a shorthand for b.IncrBy(1). 207func (b *Bar) Increment() { 208 b.IncrBy(1) 209} 210 211// IncrBy increments progress bar by amount of n. 212// wdd is optional work duration i.e. time.Since(start), which expected 213// to be provided, if any ewma based decorator is used. 214func (b *Bar) IncrBy(n int, wdd ...time.Duration) { 215 select { 216 case b.operateState <- func(s *bState) { 217 s.current += int64(n) 218 if s.total > 0 && s.current >= s.total { 219 s.current = s.total 220 s.toComplete = true 221 go b.refreshNowTillShutdown() 222 } 223 for _, ar := range s.amountReceivers { 224 ar.NextAmount(n, wdd...) 225 } 226 }: 227 case <-b.done: 228 } 229} 230 231// SetPriority changes bar's order among multiple bars. Zero is highest 232// priority, i.e. bar will be on top. If you don't need to set priority 233// dynamically, better use BarPriority option. 234func (b *Bar) SetPriority(priority int) { 235 select { 236 case <-b.done: 237 default: 238 b.container.setBarPriority(b, priority) 239 } 240} 241 242// Abort interrupts bar's running goroutine. Call this, if you'd like 243// to stop/remove bar before completion event. It has no effect after 244// completion event. If drop is true bar will be removed as well. 245func (b *Bar) Abort(drop bool) { 246 select { 247 case <-b.done: 248 default: 249 if drop { 250 b.container.dropBar(b) 251 } 252 b.cancel() 253 } 254} 255 256// Completed reports whether the bar is in completed state. 257func (b *Bar) Completed() bool { 258 select { 259 case b.operateState <- func(s *bState) { b.completed <- s.toComplete }: 260 return <-b.completed 261 case <-b.done: 262 return true 263 } 264} 265 266func (b *Bar) wSyncTable() [][]chan int { 267 select { 268 case b.operateState <- func(s *bState) { b.syncTableCh <- s.wSyncTable() }: 269 return <-b.syncTableCh 270 case <-b.done: 271 return b.cacheState.wSyncTable() 272 } 273} 274 275func (b *Bar) serve(ctx context.Context, s *bState) { 276 defer b.container.bwg.Done() 277 for { 278 select { 279 case op := <-b.operateState: 280 op(s) 281 case <-ctx.Done(): 282 b.cacheState = s 283 close(b.done) 284 // Notifying decorators about shutdown event 285 for _, sl := range s.shutdownListeners { 286 sl.Shutdown() 287 } 288 return 289 } 290 } 291} 292 293func (b *Bar) render(tw int) { 294 if b.recoveredPanic != nil { 295 b.toShutdown = false 296 b.frameCh <- b.panicToFrame(tw) 297 return 298 } 299 select { 300 case b.operateState <- func(s *bState) { 301 defer func() { 302 // recovering if user defined decorator panics for example 303 if p := recover(); p != nil { 304 b.dlogger.Println(p) 305 b.recoveredPanic = p 306 b.toShutdown = !s.completeFlushed 307 b.frameCh <- b.panicToFrame(tw) 308 } 309 }() 310 311 frame := s.draw(tw) 312 313 if s.extender != nil { 314 s.extender.Fill(s.bufE, tw, newStatistics(s)) 315 b.extendedLines = countLines(s.bufE.Bytes()) 316 frame = io.MultiReader(frame, s.bufE) 317 } 318 319 b.toDrop = s.dropOnComplete 320 b.toShutdown = s.toComplete && !s.completeFlushed 321 s.completeFlushed = s.toComplete 322 323 b.frameCh <- frame 324 }: 325 case <-b.done: 326 s := b.cacheState 327 frame := s.draw(tw) 328 if s.extender != nil { 329 s.extender.Fill(s.bufE, tw, newStatistics(s)) 330 b.extendedLines = countLines(s.bufE.Bytes()) 331 frame = io.MultiReader(frame, s.bufE) 332 } 333 b.frameCh <- frame 334 } 335} 336 337func (b *Bar) panicToFrame(termWidth int) io.Reader { 338 return strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%dv\n", termWidth), b.recoveredPanic)) 339} 340 341func (s *bState) draw(termWidth int) io.Reader { 342 343 stat := newStatistics(s) 344 345 for _, d := range s.pDecorators { 346 s.bufP.WriteString(d.Decor(stat)) 347 } 348 349 for _, d := range s.aDecorators { 350 s.bufA.WriteString(d.Decor(stat)) 351 } 352 353 if s.noBufBOnComplete && s.completeFlushed { 354 s.bufA.WriteByte('\n') 355 return io.MultiReader(s.bufP, s.bufA) 356 } 357 358 prependCount := utf8.RuneCount(s.bufP.Bytes()) 359 appendCount := utf8.RuneCount(s.bufA.Bytes()) 360 361 if !s.trimSpace { 362 // reserve space for edge spaces 363 termWidth -= 2 364 s.bufB.WriteByte(' ') 365 } 366 367 calcWidth := s.width 368 if prependCount+s.width+appendCount > termWidth { 369 calcWidth = termWidth - prependCount - appendCount 370 } 371 s.filler.Fill(s.bufB, calcWidth, stat) 372 373 if !s.trimSpace { 374 s.bufB.WriteByte(' ') 375 } 376 377 s.bufA.WriteByte('\n') 378 return io.MultiReader(s.bufP, s.bufB, s.bufA) 379} 380 381func (s *bState) wSyncTable() [][]chan int { 382 columns := make([]chan int, 0, len(s.pDecorators)+len(s.aDecorators)) 383 var pCount int 384 for _, d := range s.pDecorators { 385 if ch, ok := d.Sync(); ok { 386 columns = append(columns, ch) 387 pCount++ 388 } 389 } 390 var aCount int 391 for _, d := range s.aDecorators { 392 if ch, ok := d.Sync(); ok { 393 columns = append(columns, ch) 394 aCount++ 395 } 396 } 397 table := make([][]chan int, 2) 398 table[0] = columns[0:pCount] 399 table[1] = columns[pCount : pCount+aCount : pCount+aCount] 400 return table 401} 402 403func (b *Bar) refreshNowTillShutdown() { 404 for { 405 select { 406 case b.container.forceRefresh <- time.Now(): 407 case <-b.done: 408 return 409 } 410 } 411} 412 413func (b *Bar) dropOnComplete() { 414 select { 415 case b.operateState <- func(s *bState) { s.dropOnComplete = true }: 416 case <-b.done: 417 } 418} 419 420func newStatistics(s *bState) *decor.Statistics { 421 return &decor.Statistics{ 422 ID: s.id, 423 Completed: s.completeFlushed, 424 Total: s.total, 425 Current: s.current, 426 } 427} 428 429func countLines(b []byte) int { 430 return bytes.Count(b, []byte("\n")) 431} 432