1/*
2   Copyright The containerd Authors.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17/*
18Copyright 2016 The Kubernetes Authors.
19
20Licensed under the Apache License, Version 2.0 (the "License");
21you may not use this file except in compliance with the License.
22You may obtain a copy of the License at
23
24    http://www.apache.org/licenses/LICENSE-2.0
25
26Unless required by applicable law or agreed to in writing, software
27distributed under the License is distributed on an "AS IS" BASIS,
28WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29See the License for the specific language governing permissions and
30limitations under the License.
31*/
32
33package streaming
34
35import (
36	"container/list"
37	"crypto/rand"
38	"encoding/base64"
39	"fmt"
40	"math"
41	"sync"
42	"time"
43
44	"k8s.io/apimachinery/pkg/util/clock"
45)
46
var (
	// cacheTTL is how long a token stays valid after it is inserted; Insert adds it
	// to the current time to compute each entry's expiration.
	cacheTTL = time.Minute
	// maxInFlight caps the number of cached (in-flight) requests; Insert rejects new
	// requests once the cache holds this many.
	maxInFlight = 1000
	// tokenLen is the number of characters in the random base64-encoded token that
	// identifies a request.
	tokenLen = 8
)
55
// requestCache caches streaming (exec/attach/port-forward) requests and generates a single-use
// random token for their retrieval. The requestCache is used for building streaming URLs without
// the need to encode every request parameter in the URL.
//
// Entries are removed when they are consumed, and expired entries are garbage collected on
// insertion.
type requestCache struct {
	// clock is used to obtain the current time.
	clock clock.Clock

	// tokens maps each generated token to its list element for fast retrieval.
	tokens map[string]*list.Element
	// ll maintains an age-ordered request list for faster garbage collection of expired requests.
	// New entries are pushed to the front, so the oldest entry is at the back.
	ll *list.List

	// lock is held by Insert and Consume while they access tokens and ll.
	lock sync.Mutex
}
70
// request is the value stored in the cache. It represents an *ExecRequest, *AttachRequest,
// or *PortForwardRequest; the cache itself never inspects it.
type request interface{}
73
// cacheEntry is the value stored in each element of requestCache.ll.
// NOTE: Insert builds it with an unkeyed literal, so the field order must not change.
type cacheEntry struct {
	// token is the key under which this entry is registered in requestCache.tokens.
	token string
	// req is the cached request returned by Consume.
	req request
	// expireTime is the instant after which the entry is considered expired.
	expireTime time.Time
}
79
80func newRequestCache() *requestCache {
81	return &requestCache{
82		clock:  clock.RealClock{},
83		ll:     list.New(),
84		tokens: make(map[string]*list.Element),
85	}
86}
87
88// Insert the given request into the cache and returns the token used for fetching it out.
89func (c *requestCache) Insert(req request) (token string, err error) {
90	c.lock.Lock()
91	defer c.lock.Unlock()
92
93	// Remove expired entries.
94	c.gc()
95	// If the cache is full, reject the request.
96	if c.ll.Len() == maxInFlight {
97		return "", NewErrorTooManyInFlight()
98	}
99	token, err = c.uniqueToken()
100	if err != nil {
101		return "", err
102	}
103	ele := c.ll.PushFront(&cacheEntry{token, req, c.clock.Now().Add(cacheTTL)})
104
105	c.tokens[token] = ele
106	return token, nil
107}
108
109// Consume the token (remove it from the cache) and return the cached request, if found.
110func (c *requestCache) Consume(token string) (req request, found bool) {
111	c.lock.Lock()
112	defer c.lock.Unlock()
113	ele, ok := c.tokens[token]
114	if !ok {
115		return nil, false
116	}
117	c.ll.Remove(ele)
118	delete(c.tokens, token)
119
120	entry := ele.Value.(*cacheEntry)
121	if c.clock.Now().After(entry.expireTime) {
122		// Entry already expired.
123		return nil, false
124	}
125	return entry.req, true
126}
127
128// uniqueToken generates a random URL-safe token and ensures uniqueness.
129func (c *requestCache) uniqueToken() (string, error) {
130	const maxTries = 10
131	// Number of bytes to be tokenLen when base64 encoded.
132	tokenSize := math.Ceil(float64(tokenLen) * 6 / 8)
133	rawToken := make([]byte, int(tokenSize))
134	for i := 0; i < maxTries; i++ {
135		if _, err := rand.Read(rawToken); err != nil {
136			return "", err
137		}
138		encoded := base64.RawURLEncoding.EncodeToString(rawToken)
139		token := encoded[:tokenLen]
140		// If it's unique, return it. Otherwise retry.
141		if _, exists := c.tokens[encoded]; !exists {
142			return token, nil
143		}
144	}
145	return "", fmt.Errorf("failed to generate unique token")
146}
147
148// Must be write-locked prior to calling.
149func (c *requestCache) gc() {
150	now := c.clock.Now()
151	for c.ll.Len() > 0 {
152		oldest := c.ll.Back()
153		entry := oldest.Value.(*cacheEntry)
154		if !now.After(entry.expireTime) {
155			return
156		}
157
158		// Oldest value is expired; remove it.
159		c.ll.Remove(oldest)
160		delete(c.tokens, entry.token)
161	}
162}
163