1/* 2 Copyright The containerd Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15*/ 16 17/* 18Copyright 2016 The Kubernetes Authors. 19 20Licensed under the Apache License, Version 2.0 (the "License"); 21you may not use this file except in compliance with the License. 22You may obtain a copy of the License at 23 24 http://www.apache.org/licenses/LICENSE-2.0 25 26Unless required by applicable law or agreed to in writing, software 27distributed under the License is distributed on an "AS IS" BASIS, 28WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 29See the License for the specific language governing permissions and 30limitations under the License. 31*/ 32 33package streaming 34 35import ( 36 "container/list" 37 "crypto/rand" 38 "encoding/base64" 39 "fmt" 40 "math" 41 "sync" 42 "time" 43 44 "k8s.io/apimachinery/pkg/util/clock" 45) 46 47var ( 48 // cacheTTL is the timeout after which tokens become invalid. 49 cacheTTL = 1 * time.Minute 50 // maxInFlight is the maximum number of in-flight requests to allow. 51 maxInFlight = 1000 52 // tokenLen is the length of the random base64 encoded token identifying the request. 53 tokenLen = 8 54) 55 56// requestCache caches streaming (exec/attach/port-forward) requests and generates a single-use 57// random token for their retrieval. The requestCache is used for building streaming URLs without 58// the need to encode every request parameter in the URL. 59type requestCache struct { 60 // clock is used to obtain the current time 61 clock clock.Clock 62 63 // tokens maps the generate token to the request for fast retrieval. 64 tokens map[string]*list.Element 65 // ll maintains an age-ordered request list for faster garbage collection of expired requests. 66 ll *list.List 67 68 lock sync.Mutex 69} 70 71// Type representing an *ExecRequest, *AttachRequest, or *PortForwardRequest. 72type request interface{} 73 74type cacheEntry struct { 75 token string 76 req request 77 expireTime time.Time 78} 79 80func newRequestCache() *requestCache { 81 return &requestCache{ 82 clock: clock.RealClock{}, 83 ll: list.New(), 84 tokens: make(map[string]*list.Element), 85 } 86} 87 88// Insert the given request into the cache and returns the token used for fetching it out. 89func (c *requestCache) Insert(req request) (token string, err error) { 90 c.lock.Lock() 91 defer c.lock.Unlock() 92 93 // Remove expired entries. 94 c.gc() 95 // If the cache is full, reject the request. 96 if c.ll.Len() == maxInFlight { 97 return "", NewErrorTooManyInFlight() 98 } 99 token, err = c.uniqueToken() 100 if err != nil { 101 return "", err 102 } 103 ele := c.ll.PushFront(&cacheEntry{token, req, c.clock.Now().Add(cacheTTL)}) 104 105 c.tokens[token] = ele 106 return token, nil 107} 108 109// Consume the token (remove it from the cache) and return the cached request, if found. 110func (c *requestCache) Consume(token string) (req request, found bool) { 111 c.lock.Lock() 112 defer c.lock.Unlock() 113 ele, ok := c.tokens[token] 114 if !ok { 115 return nil, false 116 } 117 c.ll.Remove(ele) 118 delete(c.tokens, token) 119 120 entry := ele.Value.(*cacheEntry) 121 if c.clock.Now().After(entry.expireTime) { 122 // Entry already expired. 123 return nil, false 124 } 125 return entry.req, true 126} 127 128// uniqueToken generates a random URL-safe token and ensures uniqueness. 129func (c *requestCache) uniqueToken() (string, error) { 130 const maxTries = 10 131 // Number of bytes to be tokenLen when base64 encoded. 132 tokenSize := math.Ceil(float64(tokenLen) * 6 / 8) 133 rawToken := make([]byte, int(tokenSize)) 134 for i := 0; i < maxTries; i++ { 135 if _, err := rand.Read(rawToken); err != nil { 136 return "", err 137 } 138 encoded := base64.RawURLEncoding.EncodeToString(rawToken) 139 token := encoded[:tokenLen] 140 // If it's unique, return it. Otherwise retry. 141 if _, exists := c.tokens[encoded]; !exists { 142 return token, nil 143 } 144 } 145 return "", fmt.Errorf("failed to generate unique token") 146} 147 148// Must be write-locked prior to calling. 149func (c *requestCache) gc() { 150 now := c.clock.Now() 151 for c.ll.Len() > 0 { 152 oldest := c.ll.Back() 153 entry := oldest.Value.(*cacheEntry) 154 if !now.After(entry.expireTime) { 155 return 156 } 157 158 // Oldest value is expired; remove it. 159 c.ll.Remove(oldest) 160 delete(c.tokens, entry.token) 161 } 162} 163