1// Package streamcache provides a cache for large blobs (in the order of
2// gigabytes). Because storing gigabytes of data is slow, cache entries
3// can be streamed on the read end before they have finished on the write
4// end. Because storing gigabytes of data is expensive, cache entries
5// have a back pressure mechanism: if the readers don't make progress
6// reading the data, the writers will block. That way our disk can fill
7// up no faster than our readers can read from the cache.
8//
9// The cache has 3 main parts: Cache (in-memory index), filestore (files
10// to store the cached data in because it does not fit in memory), and
11// pipe (coordinated IO to one file between one writer and multiple
// readers). A cache entry consists of a key, a maximum age, a
13// pipe and the error result of the thing writing to the pipe.
14//
15// Eviction
16//
17// There are two eviction goroutines: one for Cache and one for filestore.
18// The Cache eviction goroutine evicts entries after a set amount of time,
19// and deletes their underlying files too. This is safe because Unix file
20// semantics guarantee that readers/writers that are still using those
21// files can keep using them. In addition to evicting known cache
22// entries, we also have a goroutine at the filestore level which
23// performs a directory walk. This will clean up cache files left behind
24// by other processes.
25package streamcache
26
27import (
28	"context"
29	"fmt"
30	"io"
31	"os"
32	"sync"
33	"time"
34
35	"github.com/prometheus/client_golang/prometheus"
36	"github.com/prometheus/client_golang/prometheus/promauto"
37	"github.com/sirupsen/logrus"
38	"gitlab.com/gitlab-org/gitaly/v14/internal/dontpanic"
39	"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
40)
41
// cacheIndexSize is a gauge that reports the number of entries in a
// cache index. It carries a single label, "dir".
var cacheIndexSize = promauto.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "gitaly_streamcache_index_entries",
		Help: "Number of index entries in streamcache",
	},
	[]string{"dir"},
)
49
// Cache is a cache for large byte streams.
type Cache interface {
	// FindOrCreate finds or creates a cache entry for key. If no usable
	// entry exists, the create callback is run to produce the data; it
	// runs asynchronously and created is set to true. The returned stream
	// is used to read the data. Callers must Close() the returned stream
	// to free underlying resources.
	FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error)
	// Stop stops the cleanup goroutines of the cache.
	Stop()
}
59
60var _ = Cache(&TestLoggingCache{})
61
// TestLogEntry records the result of a cache lookup for testing purposes.
type TestLogEntry struct {
	// Key is the key that was looked up.
	Key     string
	// Created is the created flag returned by the lookup.
	Created bool
	// Err is the error returned by the lookup.
	Err     error
}
68
// TestLoggingCache wraps a real Cache and logs all its lookups. This is
// not suitable for production because the log will grow indefinitely.
// Use only for testing.
type TestLoggingCache struct {
	// Cache is the wrapped cache that performs the actual lookups.
	Cache
	// entries is the log of lookups observed so far, in the order they
	// were recorded.
	entries []*TestLogEntry
	// m guards entries.
	m       sync.Mutex
}
77
78// FindOrCreate calls the underlying FindOrCreate method and logs the
79// result.
80func (tlc *TestLoggingCache) FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error) {
81	s, created, err = tlc.Cache.FindOrCreate(key, create)
82
83	tlc.m.Lock()
84	defer tlc.m.Unlock()
85	tlc.entries = append(tlc.entries, &TestLogEntry{Key: key, Created: created, Err: err})
86	return s, created, err
87}
88
89// Entries returns a reference to the log of entries observed so far.
90// This is a reference so the caller should not modify the underlying
91// array or its elements.
92func (tlc *TestLoggingCache) Entries() []*TestLogEntry {
93	tlc.m.Lock()
94	defer tlc.m.Unlock()
95	return tlc.entries
96}
97
98var _ = Cache(NullCache{})
99
// NullCache is a null implementation of Cache. Every lookup is a miss,
// and it uses no storage: the data produced by the create callback is
// passed to the reader through an in-memory pipe.
type NullCache struct{}
103
104// FindOrCreate runs create in a goroutine and lets the caller consume
105// the result via the returned stream. The created flag is always true.
106func (NullCache) FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error) {
107	pr, pw := io.Pipe()
108	w := newWaiter()
109	go func() { w.SetError(runCreate(pw, create)) }()
110	return &Stream{ReadCloser: pr, waiter: w}, true, nil
111}
112
// Stop is a no-op. It exists so that NullCache satisfies the Cache
// interface.
func (NullCache) Stop() {}
115
// cache is the file-backed Cache implementation: an in-memory index of
// entries whose data lives in files.
type cache struct {
	// m guards index.
	m          sync.Mutex
	// maxAge is how long an entry may stay in the index; clean evicts
	// entries that were created longer ago than this.
	maxAge     time.Duration
	// index maps cache keys to their entries.
	index      map[string]*entry
	// createFile creates the file that backs a new entry.
	createFile func() (namedWriteCloser, error)
	// stop is closed by Stop to signal the background goroutines.
	stop       chan struct{}
	// stopOnce ensures stop is closed at most once.
	stopOnce   sync.Once
	// logger receives error messages from background work.
	logger     logrus.FieldLogger
	// dir is used as the "dir" label value of the index size metric.
	dir        string
}
126
127// New returns a new cache instance.
128func New(cfg config.StreamCacheConfig, logger logrus.FieldLogger) Cache {
129	if cfg.Enabled {
130		return newCacheWithSleep(cfg.Dir, cfg.MaxAge.Duration(), time.Sleep, logger)
131	}
132
133	return NullCache{}
134}
135
136func newCacheWithSleep(dir string, maxAge time.Duration, sleep func(time.Duration), logger logrus.FieldLogger) Cache {
137	fs := newFilestore(dir, maxAge, sleep, logger)
138
139	c := &cache{
140		maxAge:     maxAge,
141		index:      make(map[string]*entry),
142		createFile: fs.Create,
143		stop:       make(chan struct{}),
144		logger:     logger,
145		dir:        dir,
146	}
147
148	dontpanic.GoForever(1*time.Minute, func() {
149		sleepLoop(c.stop, c.maxAge, sleep, c.clean)
150	})
151	go func() {
152		<-c.stop
153		fs.Stop()
154	}()
155
156	return c
157}
158
159func (c *cache) Stop() {
160	c.stopOnce.Do(func() { close(c.stop) })
161}
162
163func (c *cache) clean() {
164	c.m.Lock()
165	defer c.m.Unlock()
166
167	var removed []*entry
168	cutoff := time.Now().Add(-c.maxAge)
169	for k, e := range c.index {
170		if e.created.Before(cutoff) {
171			c.delete(k)
172			removed = append(removed, e)
173		}
174	}
175
176	// Batch together file removals in a goroutine, without holding the mutex
177	go func() {
178		for _, e := range removed {
179			if err := e.pipe.RemoveFile(); err != nil && !os.IsNotExist(err) {
180				c.logger.WithError(err).Error("streamcache: remove file evicted from index")
181			}
182		}
183	}()
184}
185
// delete removes key from the index and updates the index size metric.
// It does not lock c.m itself; the callers in this file hold c.m when
// calling it.
func (c *cache) delete(key string) {
	delete(c.index, key)
	c.setIndexSize()
}
190
191func (c *cache) setIndexSize() {
192	cacheIndexSize.WithLabelValues(c.dir).Set(float64(len(c.index)))
193}
194
195func (c *cache) FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error) {
196	c.m.Lock()
197	defer c.m.Unlock()
198
199	if e := c.index[key]; e != nil {
200		if s, err := e.Open(); err == nil {
201			return s, false, nil
202		}
203
204		// In this case err != nil. That is allowed to happen, for instance if
205		// the *filestore cleanup goroutine deleted the file already. But let's
206		// remove the key from the cache to save the next caller the effort of
207		// trying to open this entry.
208		c.delete(key)
209	}
210
211	s, e, err := c.newEntry(key, create)
212	if err != nil {
213		return nil, false, err
214	}
215
216	c.index[key] = e
217	c.setIndexSize()
218
219	return s, true, nil
220}
221
// entry is a single item in the cache index.
type entry struct {
	// key is the cache key this entry was created for.
	key     string
	// cache is the cache that created this entry.
	cache   *cache
	// pipe coordinates IO on the file backing this entry; it is written
	// to by the create callback and readers are opened from it.
	pipe    *pipe
	// created is the time the entry was created; clean compares it
	// against the cache's maxAge.
	created time.Time
	// waiter carries the result of the create callback.
	waiter  *waiter
}
229
// Stream abstracts a stream of bytes (via Read()) plus an error (via
// Wait()). Callers must always call Close() to prevent resource leaks.
type Stream struct {
	// waiter provides the error value returned by Wait.
	waiter *waiter
	// ReadCloser is the source of the stream's bytes.
	io.ReadCloser
}
236
// Wait blocks until the error value of the Stream is available and
// returns it. If ctx is canceled, Wait unblocks and returns early with
// the context's error.
func (s *Stream) Wait(ctx context.Context) error { return s.waiter.Wait(ctx) }
240
241// WriteTo implements io.WriterTo. For some w on some platforms, this
242// uses sendfile to make copying data more efficient.
243func (s *Stream) WriteTo(w io.Writer) (int64, error) {
244	if wt, ok := s.ReadCloser.(io.WriterTo); ok {
245		return wt.WriteTo(w)
246	}
247
248	return io.Copy(w, s.ReadCloser)
249}
250
251func (c *cache) newEntry(key string, create func(io.Writer) error) (_ *Stream, _ *entry, err error) {
252	e := &entry{
253		key:     key,
254		cache:   c,
255		created: time.Now(),
256		waiter:  newWaiter(),
257	}
258
259	// Every entry gets a unique underlying file. We do not want to reuse
260	// existing cache files because we do not know whether they are the
261	// result of a succesfull call to create.
262	//
263	// This may sound like we should be using an anonymous tempfile, but that
264	// would be at odds with the requirement to be able to open and close
265	// multiple instances of the file independently: one for the writer, and
266	// one for each reader.
267	//
268	// So the name of the file is irrelevant, but the file must have _a_
269	// name.
270	f, err := c.createFile()
271	if err != nil {
272		return nil, nil, err
273	}
274	defer func() {
275		if err != nil {
276			f.Close()
277		}
278	}()
279
280	var pr io.ReadCloser
281	pr, e.pipe, err = newPipe(f)
282	if err != nil {
283		return nil, nil, err
284	}
285
286	go func() {
287		err := runCreate(e.pipe, create)
288		e.waiter.SetError(err)
289		if err != nil {
290			c.logger.WithError(err).Error("create cache entry")
291			c.m.Lock()
292			defer c.m.Unlock()
293			c.delete(key)
294		}
295	}()
296
297	return e.wrapReadCloser(pr), e, nil
298}
299
// wrapReadCloser returns a Stream that reads from r and whose Wait method
// reports the result stored in this entry's waiter.
func (e *entry) wrapReadCloser(r io.ReadCloser) *Stream {
	return &Stream{ReadCloser: r, waiter: e.waiter}
}
303
// runCreate calls create with w as its destination and closes w when
// create is done. It returns the error from create if there is one, and
// otherwise the error from closing w. A panic inside create is recovered
// and returned as an error.
func runCreate(w io.WriteCloser, create func(io.Writer) error) (err error) {
	// This function runs in a goroutine, so unlike RPC handlers it is not
	// guarded by a panic catching middleware: an uncaught panic here can
	// crash the whole process. Turn panics into an error instead.
	defer func() {
		if recovered := recover(); recovered != nil {
			err = fmt.Errorf("panic: %v", recovered)
		}
	}()

	// Make sure w gets closed on every path, including errors and panics.
	defer w.Close()

	if createErr := create(w); createErr != nil {
		return createErr
	}

	// On success, close explicitly so that a failure to close is reported
	// to the caller.
	return w.Close()
}
326
327func (e *entry) Open() (*Stream, error) {
328	r, err := e.pipe.OpenReader()
329	return e.wrapReadCloser(r), err
330}
331
// waiter lets one party publish a final error value exactly once, while
// any number of other parties block until that value is available.
type waiter struct {
	done chan struct{} // closed by SetError once err has been stored
	err  error         // the published result; read only after done is closed
	once sync.Once     // makes only the first SetError call take effect
}

// newWaiter returns a waiter whose result has not been set yet.
func newWaiter() *waiter {
	w := new(waiter)
	w.done = make(chan struct{})
	return w
}

// SetError stores err as the result and unblocks all current and future
// Wait calls. Only the first call has any effect; later calls are
// ignored.
func (w *waiter) SetError(err error) {
	publish := func() {
		w.err = err
		close(w.done)
	}
	w.once.Do(publish)
}

// Wait blocks until the result has been set, in which case it returns
// that result, or until ctx is done, in which case it returns ctx.Err().
// If both are already the case, either outcome may be returned.
func (w *waiter) Wait(ctx context.Context) error {
	select {
	case <-w.done:
		return w.err
	case <-ctx.Done():
		return ctx.Err()
	}
}
355
// sleepLoop repeatedly sleeps and then runs callback, until done is
// closed. The sleep duration is period, except that a period that is not
// positive, or that is one minute or longer, is replaced by one minute.
//
// Each round sleeps first and checks done afterwards, so closing done is
// only noticed once the current sleep has returned, and callback never
// runs before the first sleep.
func sleepLoop(done chan struct{}, period time.Duration, sleep func(time.Duration), callback func()) {
	const maxPeriod = time.Minute

	interval := maxPeriod
	if period > 0 && period < maxPeriod {
		interval = period
	}

	// stopped reports, without blocking, whether done has been closed.
	stopped := func() bool {
		select {
		case <-done:
			return true
		default:
			return false
		}
	}

	for sleep(interval); !stopped(); sleep(interval) {
		callback()
	}
}
374