1// Copyright 2016 Keybase Inc. All rights reserved.
2// Use of this source code is governed by a BSD
3// license that can be found in the LICENSE file.
4
5package libkbfs
6
7import (
8	"github.com/keybase/client/go/kbfs/data"
9	"github.com/keybase/client/go/kbfs/kbfsblock"
10	"github.com/keybase/client/go/kbfs/kbfscodec"
11	"github.com/keybase/client/go/kbfs/kbfscrypto"
12	"github.com/keybase/client/go/kbfs/libkey"
13	"github.com/keybase/client/go/kbfs/tlf"
14	"github.com/pkg/errors"
15	"golang.org/x/net/context"
16	"golang.org/x/sync/errgroup"
17)
18
19func isRecoverableBlockError(err error) bool {
20	_, isArchiveError := err.(kbfsblock.ServerErrorBlockArchived)
21	_, isDeleteError := err.(kbfsblock.ServerErrorBlockDeleted)
22	_, isRefError := err.(kbfsblock.ServerErrorBlockNonExistent)
23	_, isMaxExceededError := err.(kbfsblock.ServerErrorMaxRefExceeded)
24	return isArchiveError || isDeleteError || isRefError || isMaxExceededError
25}
26
27// putBlockToServer either puts the full block to the block server, or
28// just adds a reference, depending on the refnonce in blockPtr.
29func putBlockToServer(
30	ctx context.Context, bserv BlockServer, tlfID tlf.ID,
31	blockPtr data.BlockPointer, readyBlockData data.ReadyBlockData,
32	cacheType DiskBlockCacheType) error {
33	var err error
34	if blockPtr.RefNonce == kbfsblock.ZeroRefNonce {
35		err = bserv.Put(ctx, tlfID, blockPtr.ID, blockPtr.Context,
36			readyBlockData.Buf, readyBlockData.ServerHalf, cacheType)
37	} else {
38		// non-zero block refnonce means this is a new reference to an
39		// existing block.
40		err = bserv.AddBlockReference(ctx, tlfID, blockPtr.ID,
41			blockPtr.Context)
42	}
43	return err
44}
45
46// PutBlockCheckLimitErrs is a thin wrapper around putBlockToServer (which
47// calls either bserver.Put or bserver.AddBlockReference) that reports
48// quota and disk limit errors.
49func PutBlockCheckLimitErrs(ctx context.Context, bserv BlockServer,
50	reporter Reporter, tlfID tlf.ID, blockPtr data.BlockPointer,
51	readyBlockData data.ReadyBlockData, tlfName tlf.CanonicalName,
52	cacheType DiskBlockCacheType) error {
53	err := putBlockToServer(
54		ctx, bserv, tlfID, blockPtr, readyBlockData, cacheType)
55	switch typedErr := errors.Cause(err).(type) {
56	case kbfsblock.ServerErrorOverQuota:
57		if !typedErr.Throttled {
58			// Report the error, but since it's not throttled the Put
59			// actually succeeded, so return nil back to the caller.
60			reporter.ReportErr(ctx, tlfName, tlfID.Type(),
61				WriteMode, OverQuotaWarning{typedErr.Usage, typedErr.Limit})
62			return nil
63		}
64	case *ErrDiskLimitTimeout:
65		// Report this here in case the put is happening in a
66		// background goroutine (via `SyncAll` perhaps) and wouldn't
67		// otherwise be reported.  Mark the error as unreportable to
68		// avoid the upper FS layer reporting it twice, if this block
69		// put is the result of a foreground fsync.
70		reporter.ReportErr(
71			ctx, tlfName, tlfID.Type(), WriteMode, err)
72		typedErr.reportable = false
73		return err
74	}
75	return err
76}
77
78func doOneBlockPut(ctx context.Context, bserv BlockServer, reporter Reporter,
79	tlfID tlf.ID, tlfName tlf.CanonicalName, ptr data.BlockPointer,
80	bps blockPutState, blocksToRemoveChan chan data.BlockPointer,
81	cacheType DiskBlockCacheType) error {
82	readyBlockData, err := bps.getReadyBlockData(ctx, ptr)
83	if err != nil {
84		return err
85	}
86	err = PutBlockCheckLimitErrs(
87		ctx, bserv, reporter, tlfID, ptr, readyBlockData, tlfName, cacheType)
88	if err == nil {
89		err = bps.synced(ptr)
90	}
91	if err != nil && isRecoverableBlockError(err) {
92		block, blockErr := bps.GetBlock(ctx, ptr)
93		if blockErr == nil {
94			fblock, ok := block.(*data.FileBlock)
95			if ok && !fblock.IsInd {
96				blocksToRemoveChan <- ptr
97			}
98		}
99	}
100
101	return err
102}
103
// doBlockPuts writes all the pending block puts to the cache and
// server. If the err returned by this function satisfies
// isRecoverableBlockError(err), the caller should retry its entire
// operation, starting from when the MD successor was created.
//
// The puts are issued in parallel by up to maxParallelBlockPuts
// worker goroutines managed by an errgroup; the function returns
// only after every worker has exited.  Each worker stops at its
// first failed put, and the error reported is the one eg.Wait
// returns.
//
// Returns a slice of block pointers that resulted in recoverable
// errors and should be removed by the caller from any saved state.
// That slice is only populated when the returned error is itself
// recoverable; for each such pointer the corresponding entries are
// also deleted from bcache on a best-effort basis (failures are
// logged as warnings, not returned).
func doBlockPuts(ctx context.Context, bserv BlockServer, bcache data.BlockCache,
	reporter Reporter, log, deferLog traceLogger, tlfID tlf.ID,
	tlfName tlf.CanonicalName, bps blockPutState,
	cacheType DiskBlockCacheType) (blocksToRemove []data.BlockPointer, err error) {
	blockCount := bps.numBlocks()
	log.LazyTrace(ctx, "doBlockPuts with %d blocks", blockCount)
	// The deferred trace reads the named result `err`, so it logs the
	// final error value this function returns.
	defer func() {
		deferLog.LazyTrace(ctx, "doBlockPuts with %d blocks (err=%v)", blockCount, err)
	}()

	eg, groupCtx := errgroup.WithContext(ctx)

	// Work queue of pointers to put.  It is buffered to blockCount so
	// the producer loop below can enqueue without blocking on the
	// workers (assuming bps.Ptrs() yields no more than
	// bps.numBlocks() entries -- TODO confirm).
	blocks := make(chan data.BlockPointer, blockCount)

	// Use one worker per block, capped at maxParallelBlockPuts.
	numWorkers := blockCount
	if numWorkers > maxParallelBlockPuts {
		numWorkers = maxParallelBlockPuts
	}
	// A channel to list any blocks that have been archived or
	// deleted.  Any of these will result in an error, so the maximum
	// we'll get is the same as the number of workers.
	blocksToRemoveChan := make(chan data.BlockPointer, numWorkers)

	// Each worker pulls pointers off `blocks` until the channel is
	// drained and closed, or until one of its puts fails, at which
	// point it exits with that error.
	worker := func() error {
		for ptr := range blocks {
			err := doOneBlockPut(groupCtx, bserv, reporter, tlfID,
				tlfName, ptr, bps, blocksToRemoveChan, cacheType)
			if err != nil {
				return err
			}
		}
		return nil
	}
	for i := 0; i < numWorkers; i++ {
		eg.Go(worker)
	}

	// Enqueue every pointer, then close the queue so that workers
	// terminate once it has been drained.
	for _, ptr := range bps.Ptrs() {
		blocks <- ptr
	}
	close(blocks)

	// All senders on blocksToRemoveChan are workers, so it is safe to
	// close it once eg.Wait has returned.
	err = eg.Wait()
	close(blocksToRemoveChan)
	if isRecoverableBlockError(err) {
		// eg.Wait above has already waited for all the outstanding
		// puts to finish, to amortize the work of re-doing the put.
		// Now drain the pointers the workers flagged.
		for ptr := range blocksToRemoveChan {
			// Let the caller know which blocks shouldn't be
			// retried.
			blocksToRemove = append(blocksToRemove, ptr)
			// The `err` variables declared below are scoped to their
			// `if` statements and shadow the named result, so cache
			// cleanup failures never change the returned error.
			if block, err := bps.GetBlock(ctx, ptr); err == nil {
				if fblock, ok := block.(*data.FileBlock); ok {
					// Remove each problematic block from the cache so
					// the redo can just make a new block instead.
					if err := bcache.DeleteKnownPtr(tlfID, fblock); err != nil {
						log.CWarningf(
							ctx, "Couldn't delete ptr for a block: %v", err)
					}
				}
			}
			if err := bcache.DeleteTransient(ptr.ID, tlfID); err != nil {
				log.CWarningf(ctx, "Couldn't delete block: %v", err)
			}
		}
	}
	return blocksToRemove, err
}
179
180func doAssembleBlock(
181	ctx context.Context, keyGetter blockKeyGetter, codec kbfscodec.Codec,
182	cryptoPure cryptoPure, kmd libkey.KeyMetadata, blockPtr data.BlockPointer,
183	block data.Block, buf []byte,
184	blockServerHalf kbfscrypto.BlockCryptKeyServerHalf) error {
185	tlfCryptKey, err := keyGetter.GetTLFCryptKeyForBlockDecryption(
186		ctx, kmd, blockPtr)
187	if err != nil {
188		return err
189	}
190
191	var encryptedBlock kbfscrypto.EncryptedBlock
192	err = codec.Decode(buf, &encryptedBlock)
193	if err != nil {
194		return err
195	}
196
197	if idType, blockType :=
198		blockPtr.ID.HashType(),
199		encryptedBlock.Version.ToHashType(); idType != blockType {
200		return errors.Errorf(
201			"Block ID %s and encrypted block disagree on encryption method "+
202				"(block ID: %s, encrypted block: %s)",
203			blockPtr.ID, idType, blockType)
204	}
205
206	// decrypt the block
207	err = cryptoPure.DecryptBlock(
208		encryptedBlock, tlfCryptKey, blockServerHalf, block)
209	if err != nil {
210		return err
211	}
212
213	block.SetEncodedSize(uint32(len(buf)))
214	return nil
215}
216
217func assembleBlockLocal(
218	ctx context.Context, keyGetter blockKeyGetter, codec kbfscodec.Codec,
219	cryptoPure cryptoPure, kmd libkey.KeyMetadata, blockPtr data.BlockPointer,
220	block data.Block, buf []byte,
221	blockServerHalf kbfscrypto.BlockCryptKeyServerHalf) error {
222	// This call only verifies the block ID if we're not running
223	// production mode, for performance reasons.
224	if err := verifyLocalBlockIDMaybe(buf, blockPtr.ID); err != nil {
225		return err
226	}
227
228	return doAssembleBlock(
229		ctx, keyGetter, codec, cryptoPure, kmd, blockPtr, block, buf,
230		blockServerHalf)
231}
232
233func assembleBlock(
234	ctx context.Context, keyGetter blockKeyGetter, codec kbfscodec.Codec,
235	cryptoPure cryptoPure, kmd libkey.KeyMetadata, blockPtr data.BlockPointer,
236	block data.Block, buf []byte,
237	blockServerHalf kbfscrypto.BlockCryptKeyServerHalf) error {
238	if err := kbfsblock.VerifyID(buf, blockPtr.ID); err != nil {
239		return err
240	}
241
242	return doAssembleBlock(
243		ctx, keyGetter, codec, cryptoPure, kmd, blockPtr, block, buf,
244		blockServerHalf)
245}
246