1package peco
2
3import (
4	"time"
5
6	"github.com/lestrrat/go-pdebug"
7	"github.com/peco/peco/pipeline"
8	"github.com/pkg/errors"
9	"golang.org/x/net/context"
10)
11
12func NewFilteredBuffer(src Buffer, page, perPage int) *FilteredBuffer {
13	fb := FilteredBuffer{
14		src: src,
15	}
16
17	s := perPage * (page - 1)
18	if s > src.Size() {
19		return &fb
20	}
21
22	selection := make([]int, 0, src.Size())
23	e := s + perPage
24	if e >= src.Size() {
25		e = src.Size()
26	}
27
28	for i := s; i < e; i++ {
29		selection = append(selection, i)
30	}
31	fb.selection = selection
32
33	return &fb
34}
35
36func (flb *FilteredBuffer) Append(l Line) (Line, error) {
37	return l, nil
38}
39
40// LineAt returns the line at index `i`. Note that the i-th element
41// in this filtered buffer may actually correspond to a totally
42// different line number in the source buffer.
43func (flb FilteredBuffer) LineAt(i int) (Line, error) {
44	if i >= len(flb.selection) {
45		return nil, errors.Errorf("specified index %d is out of range", len(flb.selection))
46	}
47	return flb.src.LineAt(flb.selection[i])
48}
49
50// Size returns the number of lines in the buffer
51func (flb FilteredBuffer) Size() int {
52	return len(flb.selection)
53}
54
55func NewMemoryBuffer() *MemoryBuffer {
56	mb := &MemoryBuffer{}
57	mb.Reset()
58	return mb
59}
60
61func (mb *MemoryBuffer) Append(l Line) {
62	mb.mutex.Lock()
63	defer mb.mutex.Unlock()
64	bufferAppend(&mb.lines, l)
65}
66
67func bufferAppend(lines *[]Line, l Line) {
68	*lines = append(*lines, l)
69}
70
71func (mb *MemoryBuffer) Size() int {
72	mb.mutex.RLock()
73	defer mb.mutex.RUnlock()
74	return bufferSize(mb.lines)
75}
76
77func bufferSize(lines []Line) int {
78	return len(lines)
79}
80
81func (mb *MemoryBuffer) Reset() {
82	mb.mutex.Lock()
83	defer mb.mutex.Unlock()
84	if pdebug.Enabled {
85		g := pdebug.Marker("MemoryBuffer.Reset")
86		defer g.End()
87	}
88	mb.done = make(chan struct{})
89	mb.lines = []Line(nil)
90}
91
92func (mb *MemoryBuffer) Done() <-chan struct{} {
93	mb.mutex.RLock()
94	defer mb.mutex.RUnlock()
95	return mb.done
96}
97
98func (mb *MemoryBuffer) Accept(ctx context.Context, in chan interface{}, _ pipeline.OutputChannel) {
99	if pdebug.Enabled {
100		g := pdebug.Marker("MemoryBuffer.Accept")
101		defer g.End()
102	}
103	defer func() {
104		mb.mutex.Lock()
105		close(mb.done)
106		mb.mutex.Unlock()
107	}()
108
109	start := time.Now()
110	for {
111		select {
112		case <-ctx.Done():
113			if pdebug.Enabled {
114				pdebug.Printf("MemoryBuffer received context done")
115			}
116			return
117		case v := <-in:
118			switch v.(type) {
119			case error:
120				if pipeline.IsEndMark(v.(error)) {
121					if pdebug.Enabled {
122						pdebug.Printf("MemoryBuffer received end mark (read %d lines, %s since starting accept loop)", len(mb.lines), time.Since(start).String())
123					}
124					return
125				}
126			case Line:
127				mb.mutex.Lock()
128				mb.lines = append(mb.lines, v.(Line))
129				mb.mutex.Unlock()
130			}
131		}
132	}
133}
134
135func (mb *MemoryBuffer) LineAt(n int) (Line, error) {
136	mb.mutex.RLock()
137	defer mb.mutex.RUnlock()
138	return bufferLineAt(mb.lines, n)
139}
140
141func bufferLineAt(lines []Line, n int) (Line, error) {
142	if s := len(lines); s <= 0 || n >= s {
143		return nil, errors.New("empty buffer")
144	}
145
146	return lines[n], nil
147}
148