1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package metainfo 5 6import ( 7 "context" 8 "sync" 9 10 "go.uber.org/zap" 11 12 "storj.io/common/errs2" 13 "storj.io/common/pb" 14 "storj.io/common/rpc/rpcstatus" 15 "storj.io/common/storj" 16 "storj.io/common/useragent" 17 "storj.io/common/uuid" 18 "storj.io/drpc/drpccache" 19 "storj.io/storj/satellite/attribution" 20 "storj.io/storj/satellite/console" 21) 22 23// MaxUserAgentLength is the maximum allowable length of the User Agent. 24const MaxUserAgentLength = 500 25 26// ensureAttribution ensures that the bucketName has the partner information specified by keyInfo partner ID or the header user agent. 27// PartnerID from keyInfo is a value associated with registered user and prevails over header user agent. 28// 29// Assumes that the user has permissions sufficient for authenticating. 30func (endpoint *Endpoint) ensureAttribution(ctx context.Context, header *pb.RequestHeader, keyInfo *console.APIKeyInfo, bucketName []byte) error { 31 if header == nil { 32 return rpcstatus.Error(rpcstatus.InvalidArgument, "header is nil") 33 } 34 if len(header.UserAgent) == 0 && keyInfo.PartnerID.IsZero() && keyInfo.UserAgent == nil { 35 return nil 36 } 37 38 if conncache := drpccache.FromContext(ctx); conncache != nil { 39 cache := conncache.LoadOrCreate(attributionCheckCacheKey{}, 40 func() interface{} { 41 return &attributionCheckCache{} 42 }).(*attributionCheckCache) 43 if !cache.needsCheck(string(bucketName)) { 44 return nil 45 } 46 } 47 48 partnerID := keyInfo.PartnerID 49 userAgent := keyInfo.UserAgent 50 // first check keyInfo (user) attribution 51 if partnerID.IsZero() && userAgent == nil { 52 // otherwise, use header (partner tool) as attribution 53 userAgent = header.UserAgent 54 if userAgent == nil { 55 return nil 56 } 57 } 58 59 userAgent, err := TrimUserAgent(userAgent) 60 if err != nil { 61 return err 62 } 63 64 err = endpoint.tryUpdateBucketAttribution(ctx, header, keyInfo.ProjectID, bucketName, partnerID, userAgent) 65 if errs2.IsRPC(err, rpcstatus.NotFound) || errs2.IsRPC(err, rpcstatus.AlreadyExists) { 66 return nil 67 } 68 return err 69} 70 71// TrimUserAgent returns userAgentBytes that consist of only the product portion of the user agent, and is bounded by 72// the maxUserAgentLength. 73func TrimUserAgent(userAgent []byte) ([]byte, error) { 74 if len(userAgent) == 0 { 75 return userAgent, nil 76 } 77 userAgentEntries, err := useragent.ParseEntries(userAgent) 78 if err != nil { 79 return userAgent, Error.New("error while parsing user agent: %w", err) 80 } 81 // strip comments, libraries, and empty products from the user agent 82 newEntries := userAgentEntries[:0] 83 for _, e := range userAgentEntries { 84 switch product := e.Product; product { 85 case "uplink", "common", "drpc", "": 86 default: 87 e.Comment = "" 88 newEntries = append(newEntries, e) 89 } 90 } 91 userAgent, err = useragent.EncodeEntries(newEntries) 92 if err != nil { 93 return userAgent, Error.New("error while encoding user agent entries: %w", err) 94 } 95 96 // bound the user agent length 97 if len(userAgent) > MaxUserAgentLength && len(newEntries) > 0 { 98 // try to preserve the first entry 99 if (len(newEntries[0].Product) + len(newEntries[0].Version)) <= MaxUserAgentLength { 100 userAgent, err = useragent.EncodeEntries(newEntries[:1]) 101 if err != nil { 102 return userAgent, Error.New("error while encoding first user agent entry: %w", err) 103 } 104 } else { 105 // first entry is too large, truncate 106 userAgent = userAgent[:MaxUserAgentLength] 107 } 108 } 109 return userAgent, nil 110} 111 112func (endpoint *Endpoint) tryUpdateBucketAttribution(ctx context.Context, header *pb.RequestHeader, projectID uuid.UUID, bucketName []byte, partnerID uuid.UUID, userAgent []byte) error { 113 if header == nil { 114 return rpcstatus.Error(rpcstatus.InvalidArgument, "header is nil") 115 } 116 117 // check if attribution is set for given bucket 118 _, err := endpoint.attributions.Get(ctx, projectID, bucketName) 119 if err == nil { 120 // bucket has already an attribution, no need to update 121 return nil 122 } 123 if !attribution.ErrBucketNotAttributed.Has(err) { 124 // try only to set the attribution, when it's missing 125 endpoint.log.Error("error while getting attribution from DB", zap.Error(err)) 126 return rpcstatus.Error(rpcstatus.Internal, err.Error()) 127 } 128 129 empty, err := endpoint.isBucketEmpty(ctx, projectID, bucketName) 130 if err != nil { 131 endpoint.log.Error("internal", zap.Error(err)) 132 return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error()) 133 } 134 if !empty { 135 return rpcstatus.Errorf(rpcstatus.AlreadyExists, "bucket %q is not empty, PartnerID %q cannot be attributed", bucketName, partnerID) 136 } 137 138 // checks if bucket exists before updates it or makes a new entry 139 bucket, err := endpoint.buckets.GetBucket(ctx, bucketName, projectID) 140 if err != nil { 141 if storj.ErrBucketNotFound.Has(err) { 142 return rpcstatus.Errorf(rpcstatus.NotFound, "bucket %q does not exist", bucketName) 143 } 144 endpoint.log.Error("error while getting bucket", zap.ByteString("bucketName", bucketName), zap.Error(err)) 145 return rpcstatus.Error(rpcstatus.Internal, "unable to set bucket attribution") 146 } 147 if !bucket.PartnerID.IsZero() || bucket.UserAgent != nil { 148 return rpcstatus.Errorf(rpcstatus.AlreadyExists, "bucket %q already has attribution, PartnerID %q cannot be attributed", bucketName, partnerID) 149 } 150 151 // update bucket information 152 bucket.PartnerID = partnerID 153 bucket.UserAgent = userAgent 154 _, err = endpoint.buckets.UpdateBucket(ctx, bucket) 155 if err != nil { 156 endpoint.log.Error("error while updating bucket", zap.ByteString("bucketName", bucketName), zap.Error(err)) 157 return rpcstatus.Error(rpcstatus.Internal, "unable to set bucket attribution") 158 } 159 160 // update attribution table 161 _, err = endpoint.attributions.Insert(ctx, &attribution.Info{ 162 ProjectID: projectID, 163 BucketName: bucketName, 164 PartnerID: partnerID, 165 UserAgent: userAgent, 166 }) 167 if err != nil { 168 endpoint.log.Error("error while inserting attribution to DB", zap.Error(err)) 169 return rpcstatus.Error(rpcstatus.Internal, err.Error()) 170 } 171 172 return nil 173} 174 175// maxAttributionCacheSize determines how many buckets attributionCheckCache remembers. 176const maxAttributionCacheSize = 10 177 178// attributionCheckCacheKey is used as a key for the connection cache. 179type attributionCheckCacheKey struct{} 180 181// attributionCheckCache implements a basic lru cache, with a constant size. 182type attributionCheckCache struct { 183 mu sync.Mutex 184 pos int 185 buckets []string 186} 187 188// needsCheck returns true when the bucket should be tested for setting the useragent. 189func (cache *attributionCheckCache) needsCheck(bucket string) bool { 190 cache.mu.Lock() 191 defer cache.mu.Unlock() 192 193 for _, b := range cache.buckets { 194 if b == bucket { 195 return false 196 } 197 } 198 199 if len(cache.buckets) >= maxAttributionCacheSize { 200 cache.pos = (cache.pos + 1) % len(cache.buckets) 201 cache.buckets[cache.pos] = bucket 202 } else { 203 cache.pos = len(cache.buckets) 204 cache.buckets = append(cache.buckets, bucket) 205 } 206 207 return true 208} 209