1// Copyright 2017 Keybase Inc. All rights reserved.
2// Use of this source code is governed by a BSD
3// license that can be found in the LICENSE file.
4
5package libkbfs
6
7import (
8	"fmt"
9	"sync"
10	"time"
11
12	"github.com/keybase/client/go/kbfs/kbfsblock"
13	"github.com/keybase/client/go/libkb"
14	"github.com/keybase/client/go/logger"
15	"github.com/keybase/client/go/protocol/keybase1"
16	"github.com/pkg/errors"
17	"golang.org/x/net/context"
18)
19
// quotaServerTimeoutWhenCached bounds how long we wait on the server
// for fresh quota usage once a value has already been read from the
// disk cache.
const quotaServerTimeoutWhenCached = time.Second / 2
26
// ECQUCtxTagKey is the context key type under which the unique ID of
// an ECQU background operation is stored.
type ECQUCtxTagKey struct{}
29
// ECQUID tags the contexts that EventuallyConsistentQuotaUsage creates
// for its background RPCs: whenever a background goroutine has to be
// spawned to fetch quota info, it runs under a fresh context carrying
// this tag. The same string also serves as the prefix of the logger
// module name used by EventuallyConsistentQuotaUsage.
const ECQUID = "ECQU"
36
// cachedQuotaUsage is the in-memory snapshot of the most recently
// cached quota info: the time it was cached, plus usage, archive and
// limit byte counts for both regular and git blocks.
type cachedQuotaUsage struct {
	// timestamp is when this snapshot was stored.
	timestamp time.Time

	// Byte counts for regular blocks.
	usageBytes, archiveBytes, limitBytes int64

	// Byte counts for git blocks.
	gitUsageBytes, gitArchiveBytes, gitLimitBytes int64
}
46
47// EventuallyConsistentQuotaUsage keeps tracks of quota usage, in a way user of
48// which can choose to accept stale data to reduce calls into block servers.
49type EventuallyConsistentQuotaUsage struct {
50	config  Config
51	log     logger.Logger
52	tid     keybase1.TeamID
53	fetcher *fetchDecider
54
55	mu      sync.RWMutex
56	cached  cachedQuotaUsage
57	bgFetch bool
58}
59
60// QuotaUsageLogModule makes a log module for a quota usage log.
61func QuotaUsageLogModule(suffix string) string {
62	return fmt.Sprintf("%s - %s", ECQUID, suffix)
63}
64
65// NewEventuallyConsistentQuotaUsage creates a new
66// EventuallyConsistentQuotaUsage object.
67func NewEventuallyConsistentQuotaUsage(
68	config Config, log logger.Logger,
69	vlog *libkb.VDebugLog) *EventuallyConsistentQuotaUsage {
70	q := &EventuallyConsistentQuotaUsage{
71		config: config,
72		log:    log,
73	}
74	q.fetcher = newFetchDecider(
75		q.log, vlog, q.getAndCache, ECQUCtxTagKey{}, ECQUID, q.config)
76	return q
77}
78
79// NewEventuallyConsistentTeamQuotaUsage creates a new
80// EventuallyConsistentQuotaUsage object.
81func NewEventuallyConsistentTeamQuotaUsage(
82	config Config, tid keybase1.TeamID,
83	log logger.Logger, vlog *libkb.VDebugLog) *EventuallyConsistentQuotaUsage {
84	q := NewEventuallyConsistentQuotaUsage(config, log, vlog)
85	q.tid = tid
86	return q
87}
88
89func (q *EventuallyConsistentQuotaUsage) getCached() cachedQuotaUsage {
90	q.mu.RLock()
91	defer q.mu.RUnlock()
92	return q.cached
93}
94
95func (q *EventuallyConsistentQuotaUsage) getID(
96	ctx context.Context) (keybase1.UserOrTeamID, error) {
97	if q.tid.IsNil() {
98		session, err := q.config.KBPKI().GetCurrentSession(ctx)
99		if err != nil {
100			return keybase1.UserOrTeamID(""), err
101		}
102		return session.UID.AsUserOrTeam(), nil
103	}
104	return q.tid.AsUserOrTeam(), nil
105}
106
107func (q *EventuallyConsistentQuotaUsage) cache(
108	ctx context.Context, quotaInfo *kbfsblock.QuotaInfo, doCacheToDisk bool) {
109	id, err := q.getID(ctx)
110	if err != nil {
111		q.log.CDebugf(ctx, "Can't get ID: %+v", err)
112		return
113	}
114
115	q.mu.Lock()
116	defer q.mu.Unlock()
117	q.cached.limitBytes = quotaInfo.Limit
118	q.cached.gitLimitBytes = quotaInfo.GitLimit
119	if quotaInfo.Total != nil {
120		q.cached.usageBytes = quotaInfo.Total.Bytes[kbfsblock.UsageWrite]
121		q.cached.archiveBytes = quotaInfo.Total.Bytes[kbfsblock.UsageArchive]
122		q.cached.gitUsageBytes = quotaInfo.Total.Bytes[kbfsblock.UsageGitWrite]
123		q.cached.gitArchiveBytes =
124			quotaInfo.Total.Bytes[kbfsblock.UsageGitArchive]
125	} else {
126		q.cached.usageBytes = 0
127	}
128	q.cached.timestamp = q.config.Clock().Now()
129
130	dqc := q.config.DiskQuotaCache()
131	if !doCacheToDisk || dqc == nil {
132		return
133	}
134
135	err = dqc.Put(ctx, id, *quotaInfo)
136	if err != nil {
137		q.log.CDebugf(ctx, "Can't cache quota for %s: %+v", id, err)
138	}
139}
140
141func (q *EventuallyConsistentQuotaUsage) fetch(ctx context.Context) (
142	quotaInfo *kbfsblock.QuotaInfo, err error) {
143	bserver := q.config.BlockServer()
144	for i := 0; bserver == nil; i++ {
145		// This is possible if a login event comes in during
146		// initialization.
147		if i == 0 {
148			q.log.CDebugf(ctx, "Waiting for bserver")
149		}
150		time.Sleep(100 * time.Millisecond)
151		bserver = q.config.BlockServer()
152	}
153	if q.tid.IsNil() {
154		return bserver.GetUserQuotaInfo(ctx)
155	}
156	return bserver.GetTeamQuotaInfo(ctx, q.tid)
157}
158
159func (q *EventuallyConsistentQuotaUsage) doBackgroundFetch() {
160	doFetch := func() bool {
161		q.mu.Lock()
162		defer q.mu.Unlock()
163		if q.bgFetch {
164			return false
165		}
166		q.bgFetch = true
167		return true
168	}()
169	if !doFetch {
170		return
171	}
172
173	defer func() {
174		q.mu.Lock()
175		defer q.mu.Unlock()
176		q.bgFetch = false
177	}()
178
179	ctx := CtxWithRandomIDReplayable(
180		context.Background(), ECQUCtxTagKey{}, ECQUID, q.log)
181	q.log.CDebugf(ctx, "Running background quota fetch, without a timeout")
182
183	quotaInfo, err := q.fetch(ctx)
184	if err != nil {
185		q.log.CDebugf(ctx, "Unable to fetch quota in background: %+v", err)
186		return
187	}
188	q.cache(ctx, quotaInfo, true)
189}
190
191func (q *EventuallyConsistentQuotaUsage) getAndCache(
192	ctx context.Context) (err error) {
193	defer func() {
194		q.log.CDebugf(ctx, "getAndCache: error=%v", err)
195	}()
196
197	// Try pulling the quota from the disk cache.  If it exists, still
198	// try the servers, but give it a short timeout.
199	var quotaInfoFromCache *kbfsblock.QuotaInfo
200	id, err := q.getID(ctx)
201	if err != nil {
202		return err
203	}
204	getCtx := ctx
205	dqc := q.config.DiskQuotaCache()
206	if dqc != nil {
207		qi, err := dqc.Get(ctx, id)
208		if err == nil {
209			q.log.CDebugf(ctx, "Read quota for %s from disk cache", id)
210			quotaInfoFromCache = &qi
211			var cancel context.CancelFunc
212			getCtx, cancel = context.WithTimeout(
213				ctx, quotaServerTimeoutWhenCached)
214			defer cancel()
215		}
216	}
217
218	quotaInfo, err := q.fetch(getCtx)
219	doCacheToDisk := dqc != nil
220	switch err {
221	case nil:
222	case context.DeadlineExceeded:
223		go q.doBackgroundFetch()
224		if quotaInfoFromCache != nil {
225			q.log.CDebugf(ctx, "Can't contact server; using cached quota")
226			quotaInfo = quotaInfoFromCache
227			doCacheToDisk = false
228		} else {
229			return err
230		}
231	default:
232		return err
233	}
234
235	q.cache(ctx, quotaInfo, doCacheToDisk)
236	return nil
237}
238
239// Get returns KBFS bytes used and limit for user, for the current
240// default block type. To help avoid having too frequent calls into
241// bserver, caller can provide a positive tolerance, to accept stale
242// LimitBytes and UsageBytes data. If tolerance is 0 or negative, this
243// always makes a blocking RPC to bserver and return latest quota
244// usage.
245//
246// 1) If the age of cached data is more than blockTolerance, a blocking RPC is
247// issued and the function only returns after RPC finishes, with the newest
248// data from RPC. The RPC causes cached data to be refreshed as well.
249// 2) Otherwise, if the age of cached data is more than bgTolerance,
250// a background RPC is spawned to refresh cached data, and the stale
251// data is returned immediately.
252// 3) Otherwise, the cached stale data is returned immediately.
253func (q *EventuallyConsistentQuotaUsage) Get(
254	ctx context.Context, bgTolerance, blockTolerance time.Duration) (
255	timestamp time.Time, usageBytes, archiveBytes, limitBytes int64,
256	err error) {
257	c := q.getCached()
258	err = q.fetcher.Do(ctx, bgTolerance, blockTolerance, c.timestamp)
259	if err != nil {
260		return time.Time{}, -1, -1, -1, err
261	}
262
263	c = q.getCached()
264	switch q.config.DefaultBlockType() {
265	case keybase1.BlockType_DATA:
266		return c.timestamp, c.usageBytes, c.archiveBytes, c.limitBytes, nil
267	case keybase1.BlockType_GIT:
268		return c.timestamp, c.gitUsageBytes, c.gitArchiveBytes,
269			c.gitLimitBytes, nil
270	default:
271		return time.Time{}, -1, -1, -1, errors.Errorf(
272			"Unknown default block type: %d", q.config.DefaultBlockType())
273	}
274}
275
276// GetAllTypes is the same as Get, except it returns usage and limits
277// for all block types.
278func (q *EventuallyConsistentQuotaUsage) GetAllTypes(
279	ctx context.Context, bgTolerance, blockTolerance time.Duration) (
280	timestamp time.Time,
281	usageBytes, archiveBytes, limitBytes,
282	gitUsageBytes, gitArchiveBytes, gitLimitBytes int64, err error) {
283	c := q.getCached()
284	err = q.fetcher.Do(ctx, bgTolerance, blockTolerance, c.timestamp)
285	if err != nil {
286		return time.Time{}, -1, -1, -1, -1, -1, -1, err
287	}
288
289	c = q.getCached()
290	return c.timestamp,
291		c.usageBytes, c.archiveBytes, c.limitBytes,
292		c.gitUsageBytes, c.gitArchiveBytes, c.gitLimitBytes, nil
293}
294