1package peco 2 3import ( 4 "time" 5 6 "github.com/lestrrat/go-pdebug" 7 "github.com/peco/peco/pipeline" 8 "github.com/pkg/errors" 9 "golang.org/x/net/context" 10) 11 12func NewFilteredBuffer(src Buffer, page, perPage int) *FilteredBuffer { 13 fb := FilteredBuffer{ 14 src: src, 15 } 16 17 s := perPage * (page - 1) 18 if s > src.Size() { 19 return &fb 20 } 21 22 selection := make([]int, 0, src.Size()) 23 e := s + perPage 24 if e >= src.Size() { 25 e = src.Size() 26 } 27 28 for i := s; i < e; i++ { 29 selection = append(selection, i) 30 } 31 fb.selection = selection 32 33 return &fb 34} 35 36func (flb *FilteredBuffer) Append(l Line) (Line, error) { 37 return l, nil 38} 39 40// LineAt returns the line at index `i`. Note that the i-th element 41// in this filtered buffer may actually correspond to a totally 42// different line number in the source buffer. 43func (flb FilteredBuffer) LineAt(i int) (Line, error) { 44 if i >= len(flb.selection) { 45 return nil, errors.Errorf("specified index %d is out of range", len(flb.selection)) 46 } 47 return flb.src.LineAt(flb.selection[i]) 48} 49 50// Size returns the number of lines in the buffer 51func (flb FilteredBuffer) Size() int { 52 return len(flb.selection) 53} 54 55func NewMemoryBuffer() *MemoryBuffer { 56 mb := &MemoryBuffer{} 57 mb.Reset() 58 return mb 59} 60 61func (mb *MemoryBuffer) Append(l Line) { 62 mb.mutex.Lock() 63 defer mb.mutex.Unlock() 64 bufferAppend(&mb.lines, l) 65} 66 67func bufferAppend(lines *[]Line, l Line) { 68 *lines = append(*lines, l) 69} 70 71func (mb *MemoryBuffer) Size() int { 72 mb.mutex.RLock() 73 defer mb.mutex.RUnlock() 74 return bufferSize(mb.lines) 75} 76 77func bufferSize(lines []Line) int { 78 return len(lines) 79} 80 81func (mb *MemoryBuffer) Reset() { 82 mb.mutex.Lock() 83 defer mb.mutex.Unlock() 84 if pdebug.Enabled { 85 g := pdebug.Marker("MemoryBuffer.Reset") 86 defer g.End() 87 } 88 mb.done = make(chan struct{}) 89 mb.lines = []Line(nil) 90} 91 92func (mb *MemoryBuffer) Done() <-chan struct{} { 93 mb.mutex.RLock() 94 defer mb.mutex.RUnlock() 95 return mb.done 96} 97 98func (mb *MemoryBuffer) Accept(ctx context.Context, in chan interface{}, _ pipeline.OutputChannel) { 99 if pdebug.Enabled { 100 g := pdebug.Marker("MemoryBuffer.Accept") 101 defer g.End() 102 } 103 defer func() { 104 mb.mutex.Lock() 105 close(mb.done) 106 mb.mutex.Unlock() 107 }() 108 109 start := time.Now() 110 for { 111 select { 112 case <-ctx.Done(): 113 if pdebug.Enabled { 114 pdebug.Printf("MemoryBuffer received context done") 115 } 116 return 117 case v := <-in: 118 switch v.(type) { 119 case error: 120 if pipeline.IsEndMark(v.(error)) { 121 if pdebug.Enabled { 122 pdebug.Printf("MemoryBuffer received end mark (read %d lines, %s since starting accept loop)", len(mb.lines), time.Since(start).String()) 123 } 124 return 125 } 126 case Line: 127 mb.mutex.Lock() 128 mb.lines = append(mb.lines, v.(Line)) 129 mb.mutex.Unlock() 130 } 131 } 132 } 133} 134 135func (mb *MemoryBuffer) LineAt(n int) (Line, error) { 136 mb.mutex.RLock() 137 defer mb.mutex.RUnlock() 138 return bufferLineAt(mb.lines, n) 139} 140 141func bufferLineAt(lines []Line, n int) (Line, error) { 142 if s := len(lines); s <= 0 || n >= s { 143 return nil, errors.New("empty buffer") 144 } 145 146 return lines[n], nil 147} 148