1// Copyright (C) 2019 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package metainfo
5
6import (
7	"context"
8	"sync"
9
10	"go.uber.org/zap"
11
12	"storj.io/common/errs2"
13	"storj.io/common/pb"
14	"storj.io/common/rpc/rpcstatus"
15	"storj.io/common/storj"
16	"storj.io/common/useragent"
17	"storj.io/common/uuid"
18	"storj.io/drpc/drpccache"
19	"storj.io/storj/satellite/attribution"
20	"storj.io/storj/satellite/console"
21)
22
23// MaxUserAgentLength is the maximum allowable length of the User Agent.
24const MaxUserAgentLength = 500
25
26// ensureAttribution ensures that the bucketName has the partner information specified by keyInfo partner ID or the header user agent.
27// PartnerID from keyInfo is a value associated with registered user and prevails over header user agent.
28//
29// Assumes that the user has permissions sufficient for authenticating.
30func (endpoint *Endpoint) ensureAttribution(ctx context.Context, header *pb.RequestHeader, keyInfo *console.APIKeyInfo, bucketName []byte) error {
31	if header == nil {
32		return rpcstatus.Error(rpcstatus.InvalidArgument, "header is nil")
33	}
34	if len(header.UserAgent) == 0 && keyInfo.PartnerID.IsZero() && keyInfo.UserAgent == nil {
35		return nil
36	}
37
38	if conncache := drpccache.FromContext(ctx); conncache != nil {
39		cache := conncache.LoadOrCreate(attributionCheckCacheKey{},
40			func() interface{} {
41				return &attributionCheckCache{}
42			}).(*attributionCheckCache)
43		if !cache.needsCheck(string(bucketName)) {
44			return nil
45		}
46	}
47
48	partnerID := keyInfo.PartnerID
49	userAgent := keyInfo.UserAgent
50	// first check keyInfo (user) attribution
51	if partnerID.IsZero() && userAgent == nil {
52		// otherwise, use header (partner tool) as attribution
53		userAgent = header.UserAgent
54		if userAgent == nil {
55			return nil
56		}
57	}
58
59	userAgent, err := TrimUserAgent(userAgent)
60	if err != nil {
61		return err
62	}
63
64	err = endpoint.tryUpdateBucketAttribution(ctx, header, keyInfo.ProjectID, bucketName, partnerID, userAgent)
65	if errs2.IsRPC(err, rpcstatus.NotFound) || errs2.IsRPC(err, rpcstatus.AlreadyExists) {
66		return nil
67	}
68	return err
69}
70
71// TrimUserAgent returns userAgentBytes that consist of only the product portion of the user agent, and is bounded by
72// the maxUserAgentLength.
73func TrimUserAgent(userAgent []byte) ([]byte, error) {
74	if len(userAgent) == 0 {
75		return userAgent, nil
76	}
77	userAgentEntries, err := useragent.ParseEntries(userAgent)
78	if err != nil {
79		return userAgent, Error.New("error while parsing user agent: %w", err)
80	}
81	// strip comments, libraries, and empty products from the user agent
82	newEntries := userAgentEntries[:0]
83	for _, e := range userAgentEntries {
84		switch product := e.Product; product {
85		case "uplink", "common", "drpc", "":
86		default:
87			e.Comment = ""
88			newEntries = append(newEntries, e)
89		}
90	}
91	userAgent, err = useragent.EncodeEntries(newEntries)
92	if err != nil {
93		return userAgent, Error.New("error while encoding user agent entries: %w", err)
94	}
95
96	// bound the user agent length
97	if len(userAgent) > MaxUserAgentLength && len(newEntries) > 0 {
98		// try to preserve the first entry
99		if (len(newEntries[0].Product) + len(newEntries[0].Version)) <= MaxUserAgentLength {
100			userAgent, err = useragent.EncodeEntries(newEntries[:1])
101			if err != nil {
102				return userAgent, Error.New("error while encoding first user agent entry: %w", err)
103			}
104		} else {
105			// first entry is too large, truncate
106			userAgent = userAgent[:MaxUserAgentLength]
107		}
108	}
109	return userAgent, nil
110}
111
112func (endpoint *Endpoint) tryUpdateBucketAttribution(ctx context.Context, header *pb.RequestHeader, projectID uuid.UUID, bucketName []byte, partnerID uuid.UUID, userAgent []byte) error {
113	if header == nil {
114		return rpcstatus.Error(rpcstatus.InvalidArgument, "header is nil")
115	}
116
117	// check if attribution is set for given bucket
118	_, err := endpoint.attributions.Get(ctx, projectID, bucketName)
119	if err == nil {
120		// bucket has already an attribution, no need to update
121		return nil
122	}
123	if !attribution.ErrBucketNotAttributed.Has(err) {
124		// try only to set the attribution, when it's missing
125		endpoint.log.Error("error while getting attribution from DB", zap.Error(err))
126		return rpcstatus.Error(rpcstatus.Internal, err.Error())
127	}
128
129	empty, err := endpoint.isBucketEmpty(ctx, projectID, bucketName)
130	if err != nil {
131		endpoint.log.Error("internal", zap.Error(err))
132		return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
133	}
134	if !empty {
135		return rpcstatus.Errorf(rpcstatus.AlreadyExists, "bucket %q is not empty, PartnerID %q cannot be attributed", bucketName, partnerID)
136	}
137
138	// checks if bucket exists before updates it or makes a new entry
139	bucket, err := endpoint.buckets.GetBucket(ctx, bucketName, projectID)
140	if err != nil {
141		if storj.ErrBucketNotFound.Has(err) {
142			return rpcstatus.Errorf(rpcstatus.NotFound, "bucket %q does not exist", bucketName)
143		}
144		endpoint.log.Error("error while getting bucket", zap.ByteString("bucketName", bucketName), zap.Error(err))
145		return rpcstatus.Error(rpcstatus.Internal, "unable to set bucket attribution")
146	}
147	if !bucket.PartnerID.IsZero() || bucket.UserAgent != nil {
148		return rpcstatus.Errorf(rpcstatus.AlreadyExists, "bucket %q already has attribution, PartnerID %q cannot be attributed", bucketName, partnerID)
149	}
150
151	// update bucket information
152	bucket.PartnerID = partnerID
153	bucket.UserAgent = userAgent
154	_, err = endpoint.buckets.UpdateBucket(ctx, bucket)
155	if err != nil {
156		endpoint.log.Error("error while updating bucket", zap.ByteString("bucketName", bucketName), zap.Error(err))
157		return rpcstatus.Error(rpcstatus.Internal, "unable to set bucket attribution")
158	}
159
160	// update attribution table
161	_, err = endpoint.attributions.Insert(ctx, &attribution.Info{
162		ProjectID:  projectID,
163		BucketName: bucketName,
164		PartnerID:  partnerID,
165		UserAgent:  userAgent,
166	})
167	if err != nil {
168		endpoint.log.Error("error while inserting attribution to DB", zap.Error(err))
169		return rpcstatus.Error(rpcstatus.Internal, err.Error())
170	}
171
172	return nil
173}
174
175// maxAttributionCacheSize determines how many buckets attributionCheckCache remembers.
176const maxAttributionCacheSize = 10
177
178// attributionCheckCacheKey is used as a key for the connection cache.
179type attributionCheckCacheKey struct{}
180
181// attributionCheckCache implements a basic lru cache, with a constant size.
182type attributionCheckCache struct {
183	mu      sync.Mutex
184	pos     int
185	buckets []string
186}
187
188// needsCheck returns true when the bucket should be tested for setting the useragent.
189func (cache *attributionCheckCache) needsCheck(bucket string) bool {
190	cache.mu.Lock()
191	defer cache.mu.Unlock()
192
193	for _, b := range cache.buckets {
194		if b == bucket {
195			return false
196		}
197	}
198
199	if len(cache.buckets) >= maxAttributionCacheSize {
200		cache.pos = (cache.pos + 1) % len(cache.buckets)
201		cache.buckets[cache.pos] = bucket
202	} else {
203		cache.pos = len(cache.buckets)
204		cache.buckets = append(cache.buckets, bucket)
205	}
206
207	return true
208}
209