1/*
2Copyright 2018 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package exec
18
19import (
20	"bytes"
21	"context"
22	"crypto/tls"
23	"crypto/x509"
24	"errors"
25	"fmt"
26	"io"
27	"net"
28	"net/http"
29	"os"
30	"os/exec"
31	"reflect"
32	"strings"
33	"sync"
34	"time"
35
36	"github.com/davecgh/go-spew/spew"
37	"golang.org/x/crypto/ssh/terminal"
38	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39	"k8s.io/apimachinery/pkg/runtime"
40	"k8s.io/apimachinery/pkg/runtime/schema"
41	"k8s.io/apimachinery/pkg/runtime/serializer"
42	"k8s.io/apimachinery/pkg/util/clock"
43	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
44	"k8s.io/client-go/pkg/apis/clientauthentication"
45	"k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1"
46	"k8s.io/client-go/pkg/apis/clientauthentication/v1beta1"
47	"k8s.io/client-go/tools/clientcmd/api"
48	"k8s.io/client-go/tools/metrics"
49	"k8s.io/client-go/transport"
50	"k8s.io/client-go/util/connrotation"
51	"k8s.io/klog/v2"
52)
53
const (
	// execInfoEnv names the environment variable through which the encoded
	// ExecCredential request is handed to v1alpha1 plugins.
	execInfoEnv = "KUBERNETES_EXEC_INFO"

	// onRotateListWarningLength is the number of registered rotation
	// callbacks above which UpdateTransportConfig logs a warning.
	onRotateListWarningLength = 1000

	// installHintVerboseHelp is appended (subject to rate limiting) to
	// "executable not found" errors to point users at the plugin docs.
	installHintVerboseHelp = `

It looks like you are trying to use a client-go credential plugin that is not installed.

To learn more about this feature, consult the documentation available at:
      https://kubernetes.io/docs/reference/access-authn-authz/authentication/#client-go-credential-plugins`
)
62
63var scheme = runtime.NewScheme()
64var codecs = serializer.NewCodecFactory(scheme)
65
66func init() {
67	v1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
68	utilruntime.Must(v1alpha1.AddToScheme(scheme))
69	utilruntime.Must(v1beta1.AddToScheme(scheme))
70	utilruntime.Must(clientauthentication.AddToScheme(scheme))
71}
72
73var (
74	// Since transports can be constantly re-initialized by programs like kubectl,
75	// keep a cache of initialized authenticators keyed by a hash of their config.
76	globalCache = newCache()
77	// The list of API versions we accept.
78	apiVersions = map[string]schema.GroupVersion{
79		v1alpha1.SchemeGroupVersion.String(): v1alpha1.SchemeGroupVersion,
80		v1beta1.SchemeGroupVersion.String():  v1beta1.SchemeGroupVersion,
81	}
82)
83
84func newCache() *cache {
85	return &cache{m: make(map[string]*Authenticator)}
86}
87
88var spewConfig = &spew.ConfigState{DisableMethods: true, Indent: " "}
89
90func cacheKey(c *api.ExecConfig) string {
91	return spewConfig.Sprint(c)
92}
93
94type cache struct {
95	mu sync.Mutex
96	m  map[string]*Authenticator
97}
98
99func (c *cache) get(s string) (*Authenticator, bool) {
100	c.mu.Lock()
101	defer c.mu.Unlock()
102	a, ok := c.m[s]
103	return a, ok
104}
105
106// put inserts an authenticator into the cache. If an authenticator is already
107// associated with the key, the first one is returned instead.
108func (c *cache) put(s string, a *Authenticator) *Authenticator {
109	c.mu.Lock()
110	defer c.mu.Unlock()
111	existing, ok := c.m[s]
112	if ok {
113		return existing
114	}
115	c.m[s] = a
116	return a
117}
118
119// sometimes rate limits how often a function f() is called. Specifically, Do()
120// will run the provided function f() up to threshold times every interval
121// duration.
122type sometimes struct {
123	threshold int
124	interval  time.Duration
125
126	clock clock.Clock
127	mu    sync.Mutex
128
129	count  int       // times we have called f() in this window
130	window time.Time // beginning of current window of length interval
131}
132
133func (s *sometimes) Do(f func()) {
134	s.mu.Lock()
135	defer s.mu.Unlock()
136
137	now := s.clock.Now()
138	if s.window.IsZero() {
139		s.window = now
140	}
141
142	// If we are no longer in our saved time window, then we get to reset our run
143	// count back to 0 and start increasing towards the threshold again.
144	if inWindow := now.Sub(s.window) < s.interval; !inWindow {
145		s.window = now
146		s.count = 0
147	}
148
149	// If we have not run the function more than threshold times in this current
150	// time window, we get to run it now!
151	if underThreshold := s.count < s.threshold; underThreshold {
152		s.count++
153		f()
154	}
155}
156
157// GetAuthenticator returns an exec-based plugin for providing client credentials.
158func GetAuthenticator(config *api.ExecConfig) (*Authenticator, error) {
159	return newAuthenticator(globalCache, config)
160}
161
162func newAuthenticator(c *cache, config *api.ExecConfig) (*Authenticator, error) {
163	key := cacheKey(config)
164	if a, ok := c.get(key); ok {
165		return a, nil
166	}
167
168	gv, ok := apiVersions[config.APIVersion]
169	if !ok {
170		return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion)
171	}
172
173	a := &Authenticator{
174		cmd:   config.Command,
175		args:  config.Args,
176		group: gv,
177
178		installHint: config.InstallHint,
179		sometimes: &sometimes{
180			threshold: 10,
181			interval:  time.Hour,
182			clock:     clock.RealClock{},
183		},
184
185		stdin:       os.Stdin,
186		stderr:      os.Stderr,
187		interactive: terminal.IsTerminal(int(os.Stdout.Fd())),
188		now:         time.Now,
189		environ:     os.Environ,
190	}
191
192	for _, env := range config.Env {
193		a.env = append(a.env, env.Name+"="+env.Value)
194	}
195
196	return c.put(key, a), nil
197}
198
199// Authenticator is a client credential provider that rotates credentials by executing a plugin.
200// The plugin input and output are defined by the API group client.authentication.k8s.io.
201type Authenticator struct {
202	// Set by the config
203	cmd   string
204	args  []string
205	group schema.GroupVersion
206	env   []string
207
208	// Used to avoid log spew by rate limiting install hint printing. We didn't do
209	// this by interval based rate limiting alone since that way may have prevented
210	// the install hint from showing up for kubectl users.
211	sometimes   *sometimes
212	installHint string
213
214	// Stubbable for testing
215	stdin       io.Reader
216	stderr      io.Writer
217	interactive bool
218	now         func() time.Time
219	environ     func() []string
220
221	// Cached results.
222	//
223	// The mutex also guards calling the plugin. Since the plugin could be
224	// interactive we want to make sure it's only called once.
225	mu          sync.Mutex
226	cachedCreds *credentials
227	exp         time.Time
228
229	onRotateList []func()
230}
231
232type credentials struct {
233	token string
234	cert  *tls.Certificate
235}
236
237// UpdateTransportConfig updates the transport.Config to use credentials
238// returned by the plugin.
239func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
240	// If a bearer token is present in the request - avoid the GetCert callback when
241	// setting up the transport, as that triggers the exec action if the server is
242	// also configured to allow client certificates for authentication. For requests
243	// like "kubectl get --token (token) pods" we should assume the intention is to
244	// use the provided token for authentication.
245	if c.HasTokenAuth() {
246		return nil
247	}
248
249	c.Wrap(func(rt http.RoundTripper) http.RoundTripper {
250		return &roundTripper{a, rt}
251	})
252
253	if c.TLS.GetCert != nil {
254		return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set")
255	}
256	c.TLS.GetCert = a.cert
257
258	var dial func(ctx context.Context, network, addr string) (net.Conn, error)
259	if c.Dial != nil {
260		dial = c.Dial
261	} else {
262		dial = (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext
263	}
264	d := connrotation.NewDialer(dial)
265
266	a.mu.Lock()
267	defer a.mu.Unlock()
268	a.onRotateList = append(a.onRotateList, d.CloseAll)
269	onRotateListLength := len(a.onRotateList)
270	if onRotateListLength > onRotateListWarningLength {
271		klog.Warningf("constructing many client instances from the same exec auth config can cause performance problems during cert rotation and can exhaust available network connections; %d clients constructed calling %q", onRotateListLength, a.cmd)
272	}
273
274	c.Dial = d.DialContext
275
276	return nil
277}
278
279type roundTripper struct {
280	a    *Authenticator
281	base http.RoundTripper
282}
283
284func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
285	// If a user has already set credentials, use that. This makes commands like
286	// "kubectl get --token (token) pods" work.
287	if req.Header.Get("Authorization") != "" {
288		return r.base.RoundTrip(req)
289	}
290
291	creds, err := r.a.getCreds()
292	if err != nil {
293		return nil, fmt.Errorf("getting credentials: %v", err)
294	}
295	if creds.token != "" {
296		req.Header.Set("Authorization", "Bearer "+creds.token)
297	}
298
299	res, err := r.base.RoundTrip(req)
300	if err != nil {
301		return nil, err
302	}
303	if res.StatusCode == http.StatusUnauthorized {
304		resp := &clientauthentication.Response{
305			Header: res.Header,
306			Code:   int32(res.StatusCode),
307		}
308		if err := r.a.maybeRefreshCreds(creds, resp); err != nil {
309			klog.Errorf("refreshing credentials: %v", err)
310		}
311	}
312	return res, nil
313}
314
315func (a *Authenticator) credsExpired() bool {
316	if a.exp.IsZero() {
317		return false
318	}
319	return a.now().After(a.exp)
320}
321
322func (a *Authenticator) cert() (*tls.Certificate, error) {
323	creds, err := a.getCreds()
324	if err != nil {
325		return nil, err
326	}
327	return creds.cert, nil
328}
329
330func (a *Authenticator) getCreds() (*credentials, error) {
331	a.mu.Lock()
332	defer a.mu.Unlock()
333
334	if a.cachedCreds != nil && !a.credsExpired() {
335		return a.cachedCreds, nil
336	}
337
338	if err := a.refreshCredsLocked(nil); err != nil {
339		return nil, err
340	}
341
342	return a.cachedCreds, nil
343}
344
345// maybeRefreshCreds executes the plugin to force a rotation of the
346// credentials, unless they were rotated already.
347func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error {
348	a.mu.Lock()
349	defer a.mu.Unlock()
350
351	// Since we're not making a new pointer to a.cachedCreds in getCreds, no
352	// need to do deep comparison.
353	if creds != a.cachedCreds {
354		// Credentials already rotated.
355		return nil
356	}
357
358	return a.refreshCredsLocked(r)
359}
360
361// refreshCredsLocked executes the plugin and reads the credentials from
362// stdout. It must be called while holding the Authenticator's mutex.
363func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error {
364	cred := &clientauthentication.ExecCredential{
365		Spec: clientauthentication.ExecCredentialSpec{
366			Response:    r,
367			Interactive: a.interactive,
368		},
369	}
370
371	env := append(a.environ(), a.env...)
372	if a.group == v1alpha1.SchemeGroupVersion {
373		// Input spec disabled for beta due to lack of use. Possibly re-enable this later if
374		// someone wants it back.
375		//
376		// See: https://github.com/kubernetes/kubernetes/issues/61796
377		data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred)
378		if err != nil {
379			return fmt.Errorf("encode ExecCredentials: %v", err)
380		}
381		env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data))
382	}
383
384	stdout := &bytes.Buffer{}
385	cmd := exec.Command(a.cmd, a.args...)
386	cmd.Env = env
387	cmd.Stderr = a.stderr
388	cmd.Stdout = stdout
389	if a.interactive {
390		cmd.Stdin = a.stdin
391	}
392
393	if err := cmd.Run(); err != nil {
394		return a.wrapCmdRunErrorLocked(err)
395	}
396
397	_, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred)
398	if err != nil {
399		return fmt.Errorf("decoding stdout: %v", err)
400	}
401	if gvk.Group != a.group.Group || gvk.Version != a.group.Version {
402		return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s",
403			a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version})
404	}
405
406	if cred.Status == nil {
407		return fmt.Errorf("exec plugin didn't return a status field")
408	}
409	if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" {
410		return fmt.Errorf("exec plugin didn't return a token or cert/key pair")
411	}
412	if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") {
413		return fmt.Errorf("exec plugin returned only certificate or key, not both")
414	}
415
416	if cred.Status.ExpirationTimestamp != nil {
417		a.exp = cred.Status.ExpirationTimestamp.Time
418	} else {
419		a.exp = time.Time{}
420	}
421
422	newCreds := &credentials{
423		token: cred.Status.Token,
424	}
425	if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" {
426		cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData))
427		if err != nil {
428			return fmt.Errorf("failed parsing client key/certificate: %v", err)
429		}
430
431		// Leaf is initialized to be nil:
432		//  https://golang.org/pkg/crypto/tls/#X509KeyPair
433		// Leaf certificate is the first certificate:
434		//  https://golang.org/pkg/crypto/tls/#Certificate
435		// Populating leaf is useful for quickly accessing the underlying x509
436		// certificate values.
437		cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
438		if err != nil {
439			return fmt.Errorf("failed parsing client leaf certificate: %v", err)
440		}
441		newCreds.cert = &cert
442	}
443
444	oldCreds := a.cachedCreds
445	a.cachedCreds = newCreds
446	// Only close all connections when TLS cert rotates. Token rotation doesn't
447	// need the extra noise.
448	if oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) {
449		// Can be nil if the exec auth plugin only returned token auth.
450		if oldCreds.cert != nil && oldCreds.cert.Leaf != nil {
451			metrics.ClientCertRotationAge.Observe(time.Now().Sub(oldCreds.cert.Leaf.NotBefore))
452		}
453		for _, onRotate := range a.onRotateList {
454			onRotate()
455		}
456	}
457
458	expiry := time.Time{}
459	if a.cachedCreds.cert != nil && a.cachedCreds.cert.Leaf != nil {
460		expiry = a.cachedCreds.cert.Leaf.NotAfter
461	}
462	expirationMetrics.set(a, expiry)
463	return nil
464}
465
466// wrapCmdRunErrorLocked pulls out the code to construct a helpful error message
467// for when the exec plugin's binary fails to Run().
468//
469// It must be called while holding the Authenticator's mutex.
470func (a *Authenticator) wrapCmdRunErrorLocked(err error) error {
471	switch err.(type) {
472	case *exec.Error: // Binary does not exist (see exec.Error).
473		builder := strings.Builder{}
474		fmt.Fprintf(&builder, "exec: executable %s not found", a.cmd)
475
476		a.sometimes.Do(func() {
477			fmt.Fprint(&builder, installHintVerboseHelp)
478			if a.installHint != "" {
479				fmt.Fprintf(&builder, "\n\n%s", a.installHint)
480			}
481		})
482
483		return errors.New(builder.String())
484
485	case *exec.ExitError: // Binary execution failed (see exec.Cmd.Run()).
486		e := err.(*exec.ExitError)
487		return fmt.Errorf(
488			"exec: executable %s failed with exit code %d",
489			a.cmd,
490			e.ProcessState.ExitCode(),
491		)
492
493	default:
494		return fmt.Errorf("exec: %v", err)
495	}
496}
497