1// Copyright 2016 Keybase Inc. All rights reserved.
2// Use of this source code is governed by a BSD
3// license that can be found in the LICENSE file.
4package libkbfs
5
6import (
7	"errors"
8	"sync"
9	"testing"
10	"time"
11
12	"github.com/keybase/client/go/kbfs/data"
13	"github.com/keybase/client/go/kbfs/env"
14	"github.com/keybase/client/go/kbfs/kbfscodec"
15	"github.com/keybase/client/go/kbfs/kbfscrypto"
16	"github.com/keybase/client/go/kbfs/libkey"
17	"github.com/stretchr/testify/require"
18	"golang.org/x/net/context"
19)
20
// blockReturner contains a block value to copy into requested blocks, and a
// channel to synchronize on with the worker.
type blockReturner struct {
	// block is the canned block copied into the caller's block on success.
	block data.Block
	// continueCh is received on by getBlock; sending nil completes the
	// request successfully, sending an error fails it.
	continueCh chan error
	// startCh is sent on by getBlock to signal that a worker has picked
	// up the request.
	startCh chan struct{}
}
28
// fakeBlockGetter allows specifying and obtaining fake blocks.
type fakeBlockGetter struct {
	// mtx guards blockMap.
	mtx sync.RWMutex
	// blockMap holds the canned responses, keyed by pointer.
	blockMap map[data.BlockPointer]blockReturner
	codec    kbfscodec.Codec
	// respectCancel makes getBlock return early when the request
	// context is canceled.
	respectCancel bool
}
36
37// newFakeBlockGetter returns a fakeBlockGetter.
38func newFakeBlockGetter(respectCancel bool) *fakeBlockGetter {
39	return &fakeBlockGetter{
40		blockMap:      make(map[data.BlockPointer]blockReturner),
41		codec:         kbfscodec.NewMsgpack(),
42		respectCancel: respectCancel,
43	}
44}
45
46// setBlockToReturn sets the block that will be returned for a given
47// BlockPointer. Returns a writeable channel that getBlock will wait on, to
48// allow synchronization of tests.
49func (bg *fakeBlockGetter) setBlockToReturn(blockPtr data.BlockPointer,
50	block data.Block) (startCh <-chan struct{}, continueCh chan<- error) {
51	bg.mtx.Lock()
52	defer bg.mtx.Unlock()
53	sCh, cCh := make(chan struct{}), make(chan error)
54	bg.blockMap[blockPtr] = blockReturner{
55		block:      block,
56		startCh:    sCh,
57		continueCh: cCh,
58	}
59	return sCh, cCh
60}
61
// getBlock implements the interface for realBlockGetter.
//
// It looks up blockPtr in the fake block map and then synchronizes with
// the test harness: each loop iteration either signals on startCh
// (telling the test a worker has picked up the request) or receives from
// continueCh (the test telling us to finish, optionally with an error to
// return instead of the block).
func (bg *fakeBlockGetter) getBlock(
	ctx context.Context, kmd libkey.KeyMetadata, blockPtr data.BlockPointer,
	block data.Block, _ DiskBlockCacheType) error {
	bg.mtx.RLock()
	defer bg.mtx.RUnlock()
	source, ok := bg.blockMap[blockPtr]
	if !ok {
		return errors.New("Block doesn't exist in fake block map")
	}
	cancelCh := make(chan struct{})
	if bg.respectCancel {
		// NOTE(review): this goroutine blocks until ctx is done, so a
		// caller that never cancels its context keeps it alive for the
		// process lifetime. Acceptable for tests, but worth confirming.
		go func() {
			<-ctx.Done()
			close(cancelCh)
		}()
	}
	// Wait until the caller tells us to continue
	for {
		select {
		case source.startCh <- struct{}{}:
			// A test observed the start signal; keep looping until it
			// tells us how to finish via continueCh (or cancels).
		case err := <-source.continueCh:
			if err != nil {
				return err
			}
			// Copy the canned block into the caller's block.
			block.Set(source.block)
			return nil
		case <-cancelCh:
			// Only reachable when respectCancel is true.
			return ctx.Err()
		}
	}
}
94
95func (bg *fakeBlockGetter) assembleBlock(ctx context.Context,
96	kmd libkey.KeyMetadata, ptr data.BlockPointer, block data.Block, buf []byte,
97	serverHalf kbfscrypto.BlockCryptKeyServerHalf) error {
98	bg.mtx.RLock()
99	defer bg.mtx.RUnlock()
100	source, ok := bg.blockMap[ptr]
101	if !ok {
102		return errors.New("Block doesn't exist in fake block map")
103	}
104	block.Set(source.block)
105	return nil
106}
107
// assembleBlockLocal delegates to assembleBlock; the fake getter makes
// no distinction between local and remote block assembly.
func (bg *fakeBlockGetter) assembleBlockLocal(ctx context.Context,
	kmd libkey.KeyMetadata, ptr data.BlockPointer, block data.Block, buf []byte,
	serverHalf kbfscrypto.BlockCryptKeyServerHalf) error {
	return bg.assembleBlock(ctx, kmd, ptr, block, buf, serverHalf)
}
113
114func TestBlockRetrievalWorkerBasic(t *testing.T) {
115	t.Log("Test the basic ability of a worker to return a block.")
116	bg := newFakeBlockGetter(false)
117	q := newBlockRetrievalQueue(
118		0, 1, 0, newTestBlockRetrievalConfig(t, bg, nil),
119		env.EmptyAppStateUpdater{})
120	require.NotNil(t, q)
121	defer endBlockRetrievalQueueTest(t, q)
122
123	ptr1 := makeRandomBlockPointer(t)
124	block1 := makeFakeFileBlock(t, false)
125	_, continueCh1 := bg.setBlockToReturn(ptr1, block1)
126
127	block := &data.FileBlock{}
128	ch := q.Request(
129		context.Background(), 1, makeKMD(), ptr1, block,
130		data.NoCacheEntry, BlockRequestSolo)
131	continueCh1 <- nil
132	err := <-ch
133	require.NoError(t, err)
134	require.Equal(t, block1, block)
135}
136
137func TestBlockRetrievalWorkerBasicSoloCached(t *testing.T) {
138	t.Log("Test the worker fetching and caching a solo block.")
139	bg := newFakeBlockGetter(false)
140	q := newBlockRetrievalQueue(
141		0, 1, 0, newTestBlockRetrievalConfig(t, bg, nil),
142		env.EmptyAppStateUpdater{})
143	require.NotNil(t, q)
144	defer endBlockRetrievalQueueTest(t, q)
145
146	ptr1 := makeRandomBlockPointer(t)
147	block1 := makeFakeFileBlock(t, false)
148	_, continueCh1 := bg.setBlockToReturn(ptr1, block1)
149
150	block := &data.FileBlock{}
151	ch := q.Request(
152		context.Background(), 1, makeKMD(), ptr1, block, data.TransientEntry,
153		BlockRequestSolo)
154	continueCh1 <- nil
155	err := <-ch
156	require.NoError(t, err)
157
158	_, err = q.config.BlockCache().Get(ptr1)
159	require.NoError(t, err)
160}
161
// TestBlockRetrievalWorkerMultipleWorkers verifies that two workers can
// serve two different block requests concurrently, and that requests can
// complete in any order independent of submission order.
func TestBlockRetrievalWorkerMultipleWorkers(t *testing.T) {
	t.Log("Test the ability of multiple workers to retrieve concurrently.")
	bg := newFakeBlockGetter(false)
	// 2 on-demand workers, no prefetch workers.
	q := newBlockRetrievalQueue(
		2, 0, 0, newTestBlockRetrievalConfig(t, bg, nil),
		env.EmptyAppStateUpdater{})
	require.NotNil(t, q)
	defer endBlockRetrievalQueueTest(t, q)

	ptr1, ptr2 := makeRandomBlockPointer(t), makeRandomBlockPointer(t)
	block1, block2 := makeFakeFileBlock(t, false), makeFakeFileBlock(t, false)
	_, continueCh1 := bg.setBlockToReturn(ptr1, block1)
	_, continueCh2 := bg.setBlockToReturn(ptr2, block2)

	t.Log("Make 2 requests for 2 different blocks")
	// Both requests share one destination block; each completion
	// overwrites it, so it is checked immediately after each request.
	block := &data.FileBlock{}
	// Set the base priority to be above the default on-demand
	// fetching, so that the pre-prefetch request for a block doesn't
	// override the other blocks' requests.
	basePriority := defaultOnDemandRequestPriority + 1
	req1Ch := q.Request(
		context.Background(), basePriority, makeKMD(), ptr1, block,
		data.NoCacheEntry, BlockRequestSolo)
	req2Ch := q.Request(
		context.Background(), basePriority, makeKMD(), ptr2, block,
		data.NoCacheEntry, BlockRequestSolo)

	t.Log("Allow the second request to complete before the first")
	continueCh2 <- nil
	err := <-req2Ch
	require.NoError(t, err)
	require.Equal(t, block2, block)

	t.Log("Make another request for ptr2")
	// ptr1's worker is still blocked, proving the second worker
	// handles this new request independently.
	req2Ch = q.Request(
		context.Background(), basePriority, makeKMD(), ptr2, block,
		data.NoCacheEntry, BlockRequestSolo)
	continueCh2 <- nil
	err = <-req2Ch
	require.NoError(t, err)
	require.Equal(t, block2, block)

	t.Log("Complete the ptr1 request")
	continueCh1 <- nil
	err = <-req1Ch
	require.NoError(t, err)
	require.Equal(t, block1, block)
}
210
// TestBlockRetrievalWorkerWithQueue verifies queue behavior with a single
// worker: later requests queue behind the in-flight one, higher-priority
// requests jump the queue, and duplicate requests for the same pointer are
// coalesced so one retrieval satisfies both.
func TestBlockRetrievalWorkerWithQueue(t *testing.T) {
	t.Log("Test the ability of a worker and queue to work correctly together.")
	bg := newFakeBlockGetter(false)
	// 1 on-demand worker, so all but one request must queue.
	q := newBlockRetrievalQueue(
		1, 0, 0, newTestBlockRetrievalConfig(t, bg, nil),
		env.EmptyAppStateUpdater{})
	require.NotNil(t, q)
	defer endBlockRetrievalQueueTest(t, q)

	ptr1, ptr2, ptr3 := makeRandomBlockPointer(t), makeRandomBlockPointer(t),
		makeRandomBlockPointer(t)
	block1, block2, block3 := makeFakeFileBlock(t, false),
		makeFakeFileBlock(t, false), makeFakeFileBlock(t, false)
	startCh1, continueCh1 := bg.setBlockToReturn(ptr1, block1)
	_, continueCh2 := bg.setBlockToReturn(ptr2, block2)
	_, continueCh3 := bg.setBlockToReturn(ptr3, block3)

	t.Log("Make 3 retrievals for 3 different blocks. All retrievals after " +
		"the first should be queued.")
	block := &data.FileBlock{}
	testBlock1 := &data.FileBlock{}
	testBlock2 := &data.FileBlock{}
	// Set the base priority to be above the default on-demand
	// fetching, so that the pre-prefetch request for a block doesn't
	// override the other blocks' requests.
	basePriority := defaultOnDemandRequestPriority + 1
	req1Ch := q.Request(
		context.Background(), basePriority, makeKMD(), ptr1,
		block, data.NoCacheEntry, BlockRequestSolo)
	req2Ch := q.Request(
		context.Background(), basePriority, makeKMD(), ptr2,
		block, data.NoCacheEntry, BlockRequestSolo)
	req3Ch := q.Request(
		context.Background(), basePriority, makeKMD(), ptr3, testBlock1,
		data.NoCacheEntry, BlockRequestSolo)
	// Ensure the worker picks up the first request
	<-startCh1
	t.Log("Make a high priority request for the third block, which should " +
		"complete next.")
	// This duplicates the ptr3 request, so both req3Ch and req4Ch
	// should be satisfied by the single ptr3 retrieval below.
	req4Ch := q.Request(
		context.Background(), basePriority+1, makeKMD(), ptr3, testBlock2,
		data.NoCacheEntry, BlockRequestSolo)

	t.Log("Allow the ptr1 retrieval to complete.")
	continueCh1 <- nil
	err := <-req1Ch
	require.NoError(t, err)
	require.Equal(t, block1, block)

	t.Log("Allow the ptr3 retrieval to complete. Both waiting requests " +
		"should complete.")
	// Only one send on continueCh3 is needed: the two requests were
	// coalesced into one underlying getBlock call.
	continueCh3 <- nil
	err1 := <-req3Ch
	err2 := <-req4Ch
	require.NoError(t, err1)
	require.NoError(t, err2)
	require.Equal(t, block3, testBlock1)
	require.Equal(t, block3, testBlock2)

	t.Log("Complete the ptr2 retrieval.")
	continueCh2 <- nil
	err = <-req2Ch
	require.NoError(t, err)
	require.Equal(t, block2, block)
}
276
277func TestBlockRetrievalWorkerCancel(t *testing.T) {
278	t.Log("Test the ability of a worker to handle a request cancelation.")
279	bg := newFakeBlockGetter(true)
280	q := newBlockRetrievalQueue(
281		0, 1, 0, newTestBlockRetrievalConfig(t, bg, nil),
282		env.EmptyAppStateUpdater{})
283	require.NotNil(t, q)
284	defer endBlockRetrievalQueueTest(t, q)
285
286	ptr1 := makeRandomBlockPointer(t)
287	block1 := makeFakeFileBlock(t, false)
288	// Don't need continueCh here.
289	_, _ = bg.setBlockToReturn(ptr1, block1)
290
291	block := &data.FileBlock{}
292	ctx, cancel := context.WithCancel(context.Background())
293	cancel()
294	ch := q.Request(
295		ctx, 1, makeKMD(), ptr1, block, data.NoCacheEntry, BlockRequestSolo)
296	err := <-ch
297	require.EqualError(t, err, context.Canceled.Error())
298}
299
// TestBlockRetrievalWorkerShutdown verifies that a shut-down worker stops
// servicing requests and that calling Shutdown a second time does not
// block.
func TestBlockRetrievalWorkerShutdown(t *testing.T) {
	t.Log("Test that worker shutdown works.")
	bg := newFakeBlockGetter(false)
	q := newBlockRetrievalQueue(
		1, 0, 0, newTestBlockRetrievalConfig(t, bg, nil),
		env.EmptyAppStateUpdater{})
	require.NotNil(t, q)
	defer endBlockRetrievalQueueTest(t, q)

	w := q.workers[0]
	require.NotNil(t, w)

	ptr1 := makeRandomBlockPointer(t)
	block1 := makeFakeFileBlock(t, false)
	_, continueCh := bg.setBlockToReturn(ptr1, block1)

	// Shut the only worker down before making the request.
	w.Shutdown()
	block := &data.FileBlock{}
	ctx, cancel := context.WithCancel(context.Background())
	// Ensure the context loop is stopped so the test doesn't leak goroutines
	defer cancel()
	ch := q.Request(
		ctx, 1, makeKMD(), ptr1, block, data.NoCacheEntry, BlockRequestSolo)
	// Non-blocking select: with the worker down, neither a result nor a
	// getter handshake should be ready, so only `default` can fire.
	shutdown := false
	select {
	case <-ch:
		t.Fatal("Expected not to retrieve a result from the Request.")
	case continueCh <- nil:
		t.Fatal("Expected the block getter not to be receiving.")
	default:
		shutdown = true
	}
	require.True(t, shutdown)

	// Ensure the test completes in a reasonable time.
	// A second Shutdown must return promptly (i.e. be idempotent).
	timer := time.NewTimer(10 * time.Second)
	doneCh := make(chan struct{})
	go func() {
		w.Shutdown()
		close(doneCh)
	}()
	select {
	case <-timer.C:
		t.Fatal("Expected another Shutdown not to block.")
	case <-doneCh:
	}
}
347
348func TestBlockRetrievalWorkerPrefetchedPriorityElevation(t *testing.T) {
349	t.Log("Test that we can escalate the priority of a request and it " +
350		"correctly switches workers.")
351	bg := newFakeBlockGetter(false)
352	q := newBlockRetrievalQueue(
353		1, 1, 0, newTestBlockRetrievalConfig(t, bg, nil),
354		env.EmptyAppStateUpdater{})
355	require.NotNil(t, q)
356	defer endBlockRetrievalQueueTest(t, q)
357
358	t.Log("Setup source blocks")
359	ptr1, ptr2 := makeRandomBlockPointer(t), makeRandomBlockPointer(t)
360	block1, block2 := makeFakeFileBlock(t, false), makeFakeFileBlock(t, false)
361	_, continueCh1 := bg.setBlockToReturn(ptr1, block1)
362	_, continueCh2 := bg.setBlockToReturn(ptr2, block2)
363
364	t.Log("Make a low-priority request. This will get to the worker.")
365	testBlock1 := &data.FileBlock{}
366	req1Ch := q.Request(
367		context.Background(), 1, makeKMD(), ptr1, testBlock1,
368		data.NoCacheEntry, BlockRequestSolo)
369
370	t.Log("Make another low-priority request. This will block.")
371	testBlock2 := &data.FileBlock{}
372	req2Ch := q.Request(
373		context.Background(), 1, makeKMD(), ptr2, testBlock2,
374		data.NoCacheEntry, BlockRequestSolo)
375
376	t.Log("Make an on-demand request for the same block as the blocked " +
377		"request.")
378	testBlock3 := &data.FileBlock{}
379	req3Ch := q.Request(
380		context.Background(), defaultOnDemandRequestPriority,
381		makeKMD(), ptr2, testBlock3, data.NoCacheEntry, BlockRequestSolo)
382
383	t.Log("Release the requests for the second block first. " +
384		"Since the prefetch worker is still blocked, this confirms that the " +
385		"escalation to an on-demand worker was successful.")
386	continueCh2 <- nil
387	err := <-req3Ch
388	require.NoError(t, err)
389	require.Equal(t, testBlock3, block2)
390	err = <-req2Ch
391	require.NoError(t, err)
392	require.Equal(t, testBlock2, block2)
393
394	t.Log("Allow the initial ptr1 request to complete.")
395	continueCh1 <- nil
396	err = <-req1Ch
397	require.NoError(t, err)
398	require.Equal(t, testBlock1, block1)
399}
400
// TestBlockRetrievalWorkerStopIfFull verifies that a request with the
// stop-if-full prefetch action fails with DiskCacheTooFullForBlockError
// when the disk cache is at its limit, while a plain solo request still
// succeeds.
func TestBlockRetrievalWorkerStopIfFull(t *testing.T) {
	ctx, cancel := context.WithTimeout(
		context.Background(), individualTestTimeout)
	defer cancel()
	dbc, dbcConfig := initDiskBlockCacheTest(t)
	defer dbc.Shutdown(ctx)

	bg := newFakeBlockGetter(false)
	q := newBlockRetrievalQueue(
		1, 1, 0, newTestBlockRetrievalConfig(t, bg, dbc),
		env.EmptyAppStateUpdater{})
	require.NotNil(t, q)
	// Disable prefetching so only the explicit requests below run.
	<-q.TogglePrefetcher(false, nil, nil)
	defer endBlockRetrievalQueueTest(t, q)

	ptr := makeRandomBlockPointer(t)
	syncCache := dbc.syncCache
	workingCache := dbc.workingSetCache

	t.Log("Set the cache maximum bytes to the current total.")
	// With limits equal to current usage, the disk cache is "full".
	syncBytes, workingBytes := testGetDiskCacheBytes(syncCache, workingCache)
	limiter := dbcConfig.DiskLimiter().(*backpressureDiskLimiter)
	setLimiterLimits(limiter, syncBytes, workingBytes)

	t.Log("Request with stop-if-full, when full")
	testBlock := &data.FileBlock{}
	req := q.Request(
		ctx, 1, makeKMD(), ptr, testBlock, data.NoCacheEntry,
		BlockRequestPrefetchUntilFull)
	select {
	case err := <-req:
		require.IsType(t, DiskCacheTooFullForBlockError{}, err)
	case <-ctx.Done():
		require.FailNow(t, ctx.Err().Error())
	}

	t.Log("Request without stop-if-full, when full")
	// A solo request ignores cache fullness; register a block so the
	// getter can serve it.
	block := makeFakeFileBlock(t, false)
	startCh, continueCh := bg.setBlockToReturn(ptr, block)
	req = q.Request(
		ctx, 1, makeKMD(), ptr, testBlock, data.NoCacheEntry,
		BlockRequestSolo)
	<-startCh
	continueCh <- nil
	select {
	case err := <-req:
		require.NoError(t, err)
	case <-ctx.Done():
		require.FailNow(t, ctx.Err().Error())
	}
}
452