// Package streamcache provides a cache for large blobs (in the order of
// gigabytes). Because storing gigabytes of data is slow, cache entries
// can be streamed on the read end before they have finished on the write
// end. Because storing gigabytes of data is expensive, cache entries
// have a back pressure mechanism: if the readers don't make progress
// reading the data, the writers will block. That way our disk can fill
// up no faster than our readers can read from the cache.
//
// The cache has 3 main parts: Cache (in-memory index), filestore (files
// to store the cached data in because it does not fit in memory), and
// pipe (coordinated IO to one file between one writer and multiple
// readers). A cache entry consists of a key, a maximum age, a
// pipe and the error result of the thing writing to the pipe.
//
// Eviction
//
// There are two eviction goroutines: one for Cache and one for filestore.
// The Cache eviction goroutine evicts entries after a set amount of time,
// and deletes their underlying files too. This is safe because Unix file
// semantics guarantee that readers/writers that are still using those
// files can keep using them. In addition to evicting known cache
// entries, we also have a goroutine at the filestore level which
// performs a directory walk. This will clean up cache files left behind
// by other processes.
25package streamcache 26 27import ( 28 "context" 29 "fmt" 30 "io" 31 "os" 32 "sync" 33 "time" 34 35 "github.com/prometheus/client_golang/prometheus" 36 "github.com/prometheus/client_golang/prometheus/promauto" 37 "github.com/sirupsen/logrus" 38 "gitlab.com/gitlab-org/gitaly/v14/internal/dontpanic" 39 "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" 40) 41 42var cacheIndexSize = promauto.NewGaugeVec( 43 prometheus.GaugeOpts{ 44 Name: "gitaly_streamcache_index_entries", 45 Help: "Number of index entries in streamcache", 46 }, 47 []string{"dir"}, 48) 49 50// Cache is a cache for large byte streams. 51type Cache interface { 52 // FindOrCreate finds or creates a cache entry. If the create callback 53 // runs, it will be asynchronous and created is set to true. Callers must 54 // Close() the returned stream to free underlying resources. 55 FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error) 56 // Stop stops the cleanup goroutines of the cache. 57 Stop() 58} 59 60var _ = Cache(&TestLoggingCache{}) 61 62// TestLogEntry records the result of a cache lookup for testing purposes. 63type TestLogEntry struct { 64 Key string 65 Created bool 66 Err error 67} 68 69// TestLoggingCache wraps a real Cache and logs all its lookups. This is 70// not suitable for production because the log will grow indefinitely. 71// Use only for testing. 72type TestLoggingCache struct { 73 Cache 74 entries []*TestLogEntry 75 m sync.Mutex 76} 77 78// FindOrCreate calls the underlying FindOrCreate method and logs the 79// result. 80func (tlc *TestLoggingCache) FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error) { 81 s, created, err = tlc.Cache.FindOrCreate(key, create) 82 83 tlc.m.Lock() 84 defer tlc.m.Unlock() 85 tlc.entries = append(tlc.entries, &TestLogEntry{Key: key, Created: created, Err: err}) 86 return s, created, err 87} 88 89// Entries returns a reference to the log of entries observed so far. 
90// This is a reference so the caller should not modify the underlying 91// array or its elements. 92func (tlc *TestLoggingCache) Entries() []*TestLogEntry { 93 tlc.m.Lock() 94 defer tlc.m.Unlock() 95 return tlc.entries 96} 97 98var _ = Cache(NullCache{}) 99 100// NullCache is a null implementation of Cache. Every lookup is a miss, 101// and it uses no storage. 102type NullCache struct{} 103 104// FindOrCreate runs create in a goroutine and lets the caller consume 105// the result via the returned stream. The created flag is always true. 106func (NullCache) FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error) { 107 pr, pw := io.Pipe() 108 w := newWaiter() 109 go func() { w.SetError(runCreate(pw, create)) }() 110 return &Stream{ReadCloser: pr, waiter: w}, true, nil 111} 112 113// Stop is a no-op. 114func (NullCache) Stop() {} 115 116type cache struct { 117 m sync.Mutex 118 maxAge time.Duration 119 index map[string]*entry 120 createFile func() (namedWriteCloser, error) 121 stop chan struct{} 122 stopOnce sync.Once 123 logger logrus.FieldLogger 124 dir string 125} 126 127// New returns a new cache instance. 
128func New(cfg config.StreamCacheConfig, logger logrus.FieldLogger) Cache { 129 if cfg.Enabled { 130 return newCacheWithSleep(cfg.Dir, cfg.MaxAge.Duration(), time.Sleep, logger) 131 } 132 133 return NullCache{} 134} 135 136func newCacheWithSleep(dir string, maxAge time.Duration, sleep func(time.Duration), logger logrus.FieldLogger) Cache { 137 fs := newFilestore(dir, maxAge, sleep, logger) 138 139 c := &cache{ 140 maxAge: maxAge, 141 index: make(map[string]*entry), 142 createFile: fs.Create, 143 stop: make(chan struct{}), 144 logger: logger, 145 dir: dir, 146 } 147 148 dontpanic.GoForever(1*time.Minute, func() { 149 sleepLoop(c.stop, c.maxAge, sleep, c.clean) 150 }) 151 go func() { 152 <-c.stop 153 fs.Stop() 154 }() 155 156 return c 157} 158 159func (c *cache) Stop() { 160 c.stopOnce.Do(func() { close(c.stop) }) 161} 162 163func (c *cache) clean() { 164 c.m.Lock() 165 defer c.m.Unlock() 166 167 var removed []*entry 168 cutoff := time.Now().Add(-c.maxAge) 169 for k, e := range c.index { 170 if e.created.Before(cutoff) { 171 c.delete(k) 172 removed = append(removed, e) 173 } 174 } 175 176 // Batch together file removals in a goroutine, without holding the mutex 177 go func() { 178 for _, e := range removed { 179 if err := e.pipe.RemoveFile(); err != nil && !os.IsNotExist(err) { 180 c.logger.WithError(err).Error("streamcache: remove file evicted from index") 181 } 182 } 183 }() 184} 185 186func (c *cache) delete(key string) { 187 delete(c.index, key) 188 c.setIndexSize() 189} 190 191func (c *cache) setIndexSize() { 192 cacheIndexSize.WithLabelValues(c.dir).Set(float64(len(c.index))) 193} 194 195func (c *cache) FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error) { 196 c.m.Lock() 197 defer c.m.Unlock() 198 199 if e := c.index[key]; e != nil { 200 if s, err := e.Open(); err == nil { 201 return s, false, nil 202 } 203 204 // In this case err != nil. 
That is allowed to happen, for instance if 205 // the *filestore cleanup goroutine deleted the file already. But let's 206 // remove the key from the cache to save the next caller the effort of 207 // trying to open this entry. 208 c.delete(key) 209 } 210 211 s, e, err := c.newEntry(key, create) 212 if err != nil { 213 return nil, false, err 214 } 215 216 c.index[key] = e 217 c.setIndexSize() 218 219 return s, true, nil 220} 221 222type entry struct { 223 key string 224 cache *cache 225 pipe *pipe 226 created time.Time 227 waiter *waiter 228} 229 230// Stream abstracts a stream of bytes (via Read()) plus an error (via 231// Wait()). Callers must always call Close() to prevent resource leaks. 232type Stream struct { 233 waiter *waiter 234 io.ReadCloser 235} 236 237// Wait returns the error value of the Stream. If ctx is canceled, 238// Wait unblocks and returns early. 239func (s *Stream) Wait(ctx context.Context) error { return s.waiter.Wait(ctx) } 240 241// WriteTo implements io.WriterTo. For some w on some platforms, this 242// uses sendfile to make copying data more efficient. 243func (s *Stream) WriteTo(w io.Writer) (int64, error) { 244 if wt, ok := s.ReadCloser.(io.WriterTo); ok { 245 return wt.WriteTo(w) 246 } 247 248 return io.Copy(w, s.ReadCloser) 249} 250 251func (c *cache) newEntry(key string, create func(io.Writer) error) (_ *Stream, _ *entry, err error) { 252 e := &entry{ 253 key: key, 254 cache: c, 255 created: time.Now(), 256 waiter: newWaiter(), 257 } 258 259 // Every entry gets a unique underlying file. We do not want to reuse 260 // existing cache files because we do not know whether they are the 261 // result of a succesfull call to create. 262 // 263 // This may sound like we should be using an anonymous tempfile, but that 264 // would be at odds with the requirement to be able to open and close 265 // multiple instances of the file independently: one for the writer, and 266 // one for each reader. 
267 // 268 // So the name of the file is irrelevant, but the file must have _a_ 269 // name. 270 f, err := c.createFile() 271 if err != nil { 272 return nil, nil, err 273 } 274 defer func() { 275 if err != nil { 276 f.Close() 277 } 278 }() 279 280 var pr io.ReadCloser 281 pr, e.pipe, err = newPipe(f) 282 if err != nil { 283 return nil, nil, err 284 } 285 286 go func() { 287 err := runCreate(e.pipe, create) 288 e.waiter.SetError(err) 289 if err != nil { 290 c.logger.WithError(err).Error("create cache entry") 291 c.m.Lock() 292 defer c.m.Unlock() 293 c.delete(key) 294 } 295 }() 296 297 return e.wrapReadCloser(pr), e, nil 298} 299 300func (e *entry) wrapReadCloser(r io.ReadCloser) *Stream { 301 return &Stream{ReadCloser: r, waiter: e.waiter} 302} 303 304func runCreate(w io.WriteCloser, create func(io.Writer) error) (err error) { 305 // Catch panics because this function runs in a goroutine. That means that 306 // unlike RPC handlers, which are guarded by a panic catching middleware, 307 // an uncaught panic can crash the whole process. 
308 defer func() { 309 if p := recover(); p != nil { 310 err = fmt.Errorf("panic: %v", p) 311 } 312 }() 313 314 defer w.Close() 315 316 if err := create(w); err != nil { 317 return err 318 } 319 320 if err := w.Close(); err != nil { 321 return err 322 } 323 324 return nil 325} 326 327func (e *entry) Open() (*Stream, error) { 328 r, err := e.pipe.OpenReader() 329 return e.wrapReadCloser(r), err 330} 331 332type waiter struct { 333 done chan struct{} 334 err error 335 once sync.Once 336} 337 338func newWaiter() *waiter { return &waiter{done: make(chan struct{})} } 339 340func (w *waiter) SetError(err error) { 341 w.once.Do(func() { 342 w.err = err 343 close(w.done) 344 }) 345} 346 347func (w *waiter) Wait(ctx context.Context) error { 348 select { 349 case <-ctx.Done(): 350 return ctx.Err() 351 case <-w.done: 352 return w.err 353 } 354} 355 356func sleepLoop(done chan struct{}, period time.Duration, sleep func(time.Duration), callback func()) { 357 const maxPeriod = time.Minute 358 if period <= 0 || period >= maxPeriod { 359 period = maxPeriod 360 } 361 362 for { 363 sleep(period) 364 365 select { 366 case <-done: 367 return 368 default: 369 } 370 371 callback() 372 } 373} 374