1/*
2Copyright 2018 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package exec
18
19import (
20	"bytes"
21	"crypto/tls"
22	"crypto/x509"
23	"errors"
24	"fmt"
25	"io"
26	"net"
27	"net/http"
28	"os"
29	"os/exec"
30	"reflect"
31	"strings"
32	"sync"
33	"time"
34
35	"github.com/davecgh/go-spew/spew"
36	"golang.org/x/term"
37
38	"k8s.io/apimachinery/pkg/runtime"
39	"k8s.io/apimachinery/pkg/runtime/schema"
40	"k8s.io/apimachinery/pkg/runtime/serializer"
41	"k8s.io/apimachinery/pkg/util/clock"
42	"k8s.io/client-go/pkg/apis/clientauthentication"
43	"k8s.io/client-go/pkg/apis/clientauthentication/install"
44	clientauthenticationv1 "k8s.io/client-go/pkg/apis/clientauthentication/v1"
45	clientauthenticationv1alpha1 "k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1"
46	clientauthenticationv1beta1 "k8s.io/client-go/pkg/apis/clientauthentication/v1beta1"
47	"k8s.io/client-go/tools/clientcmd/api"
48	"k8s.io/client-go/tools/metrics"
49	"k8s.io/client-go/transport"
50	"k8s.io/client-go/util/connrotation"
51	"k8s.io/klog/v2"
52)
53
const (
	// execInfoEnv names the environment variable through which the encoded
	// ExecCredential request is handed to the plugin process.
	execInfoEnv = "KUBERNETES_EXEC_INFO"

	// installHintVerboseHelp is the generic help text appended to the
	// "executable not found" error when the plugin binary cannot be started.
	installHintVerboseHelp = `

It looks like you are trying to use a client-go credential plugin that is not installed.

To learn more about this feature, consult the documentation available at:
      https://kubernetes.io/docs/reference/access-authn-authz/authentication/#client-go-credential-plugins`
)
61
62var scheme = runtime.NewScheme()
63var codecs = serializer.NewCodecFactory(scheme)
64
65func init() {
66	install.Install(scheme)
67}
68
69var (
70	// Since transports can be constantly re-initialized by programs like kubectl,
71	// keep a cache of initialized authenticators keyed by a hash of their config.
72	globalCache = newCache()
73	// The list of API versions we accept.
74	apiVersions = map[string]schema.GroupVersion{
75		clientauthenticationv1alpha1.SchemeGroupVersion.String(): clientauthenticationv1alpha1.SchemeGroupVersion,
76		clientauthenticationv1beta1.SchemeGroupVersion.String():  clientauthenticationv1beta1.SchemeGroupVersion,
77		clientauthenticationv1.SchemeGroupVersion.String():       clientauthenticationv1.SchemeGroupVersion,
78	}
79)
80
81func newCache() *cache {
82	return &cache{m: make(map[string]*Authenticator)}
83}
84
// spewConfig is the go-spew printer that cacheKey uses to render its inputs
// into a string key.
var spewConfig = &spew.ConfigState{DisableMethods: true, Indent: " "}
86
87func cacheKey(conf *api.ExecConfig, cluster *clientauthentication.Cluster) string {
88	key := struct {
89		conf    *api.ExecConfig
90		cluster *clientauthentication.Cluster
91	}{
92		conf:    conf,
93		cluster: cluster,
94	}
95	return spewConfig.Sprint(key)
96}
97
// cache is a mutex-guarded map from cache key to Authenticator. mu guards m;
// both get and put take it for the duration of the call.
type cache struct {
	mu sync.Mutex
	m  map[string]*Authenticator
}
102
103func (c *cache) get(s string) (*Authenticator, bool) {
104	c.mu.Lock()
105	defer c.mu.Unlock()
106	a, ok := c.m[s]
107	return a, ok
108}
109
110// put inserts an authenticator into the cache. If an authenticator is already
111// associated with the key, the first one is returned instead.
112func (c *cache) put(s string, a *Authenticator) *Authenticator {
113	c.mu.Lock()
114	defer c.mu.Unlock()
115	existing, ok := c.m[s]
116	if ok {
117		return existing
118	}
119	c.m[s] = a
120	return a
121}
122
// sometimes rate limits how often a function f() is called. Specifically, Do()
// will run the provided function f() up to threshold times every interval
// duration.
//
// The window does not slide: it begins at the first Do() call and is restarted,
// with count reset to zero, by the first Do() call made once at least interval
// has elapsed since the window began. mu guards count and window and is held
// while f() runs, so concurrent Do() calls are serialized.
type sometimes struct {
	threshold int
	interval  time.Duration

	clock clock.Clock
	mu    sync.Mutex

	count  int       // times we have called f() in this window
	window time.Time // beginning of current window of length interval
}
136
137func (s *sometimes) Do(f func()) {
138	s.mu.Lock()
139	defer s.mu.Unlock()
140
141	now := s.clock.Now()
142	if s.window.IsZero() {
143		s.window = now
144	}
145
146	// If we are no longer in our saved time window, then we get to reset our run
147	// count back to 0 and start increasing towards the threshold again.
148	if inWindow := now.Sub(s.window) < s.interval; !inWindow {
149		s.window = now
150		s.count = 0
151	}
152
153	// If we have not run the function more than threshold times in this current
154	// time window, we get to run it now!
155	if underThreshold := s.count < s.threshold; underThreshold {
156		s.count++
157		f()
158	}
159}
160
161// GetAuthenticator returns an exec-based plugin for providing client credentials.
162func GetAuthenticator(config *api.ExecConfig, cluster *clientauthentication.Cluster) (*Authenticator, error) {
163	return newAuthenticator(globalCache, term.IsTerminal, config, cluster)
164}
165
166func newAuthenticator(c *cache, isTerminalFunc func(int) bool, config *api.ExecConfig, cluster *clientauthentication.Cluster) (*Authenticator, error) {
167	key := cacheKey(config, cluster)
168	if a, ok := c.get(key); ok {
169		return a, nil
170	}
171
172	gv, ok := apiVersions[config.APIVersion]
173	if !ok {
174		return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion)
175	}
176
177	connTracker := connrotation.NewConnectionTracker()
178	defaultDialer := connrotation.NewDialerWithTracker(
179		(&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
180		connTracker,
181	)
182
183	a := &Authenticator{
184		cmd:                config.Command,
185		args:               config.Args,
186		group:              gv,
187		cluster:            cluster,
188		provideClusterInfo: config.ProvideClusterInfo,
189
190		installHint: config.InstallHint,
191		sometimes: &sometimes{
192			threshold: 10,
193			interval:  time.Hour,
194			clock:     clock.RealClock{},
195		},
196
197		stdin:           os.Stdin,
198		stderr:          os.Stderr,
199		interactiveFunc: func() (bool, error) { return isInteractive(isTerminalFunc, config) },
200		now:             time.Now,
201		environ:         os.Environ,
202
203		defaultDialer: defaultDialer,
204		connTracker:   connTracker,
205	}
206
207	for _, env := range config.Env {
208		a.env = append(a.env, env.Name+"="+env.Value)
209	}
210
211	return c.put(key, a), nil
212}
213
214func isInteractive(isTerminalFunc func(int) bool, config *api.ExecConfig) (bool, error) {
215	var shouldBeInteractive bool
216	switch config.InteractiveMode {
217	case api.NeverExecInteractiveMode:
218		shouldBeInteractive = false
219	case api.IfAvailableExecInteractiveMode:
220		shouldBeInteractive = !config.StdinUnavailable && isTerminalFunc(int(os.Stdin.Fd()))
221	case api.AlwaysExecInteractiveMode:
222		if !isTerminalFunc(int(os.Stdin.Fd())) {
223			return false, errors.New("standard input is not a terminal")
224		}
225		if config.StdinUnavailable {
226			suffix := ""
227			if len(config.StdinUnavailableMessage) > 0 {
228				// only print extra ": <message>" if the user actually specified a message
229				suffix = fmt.Sprintf(": %s", config.StdinUnavailableMessage)
230			}
231			return false, fmt.Errorf("standard input is unavailable%s", suffix)
232		}
233		shouldBeInteractive = true
234	default:
235		return false, fmt.Errorf("unknown interactiveMode: %q", config.InteractiveMode)
236	}
237
238	return shouldBeInteractive, nil
239}
240
// Authenticator is a client credential provider that rotates credentials by executing a plugin.
// The plugin input and output are defined by the API group client.authentication.k8s.io.
//
// Credentials returned by the plugin are cached and reused until they expire
// or until a request made with them is answered with 401 Unauthorized.
type Authenticator struct {
	// Set by the config: the plugin command and its arguments, the API version
	// used to talk to it, extra NAME=VALUE environment entries, and the cluster
	// information, which is only passed to the plugin when provideClusterInfo
	// is true.
	cmd                string
	args               []string
	group              schema.GroupVersion
	env                []string
	cluster            *clientauthentication.Cluster
	provideClusterInfo bool

	// Used to avoid log spew by rate limiting install hint printing. We didn't do
	// this by interval based rate limiting alone since that way may have prevented
	// the install hint from showing up for kubectl users.
	sometimes   *sometimes
	installHint string

	// Stubbable for testing. stdin is only attached to the plugin when it runs
	// interactively; stderr always receives the plugin's standard error.
	stdin           io.Reader
	stderr          io.Writer
	interactiveFunc func() (bool, error)
	now             func() time.Time
	environ         func() []string

	// defaultDialer is used for clients which don't specify a custom dialer
	defaultDialer *connrotation.Dialer
	// connTracker tracks all connections opened that we need to close when rotating a client certificate
	connTracker *connrotation.ConnectionTracker

	// Cached results.
	//
	// The mutex also guards calling the plugin. Since the plugin could be
	// interactive we want to make sure it's only called once.
	//
	// exp is the zero time when the plugin reported no expiration; cached
	// credentials are then never considered expired.
	mu          sync.Mutex
	cachedCreds *credentials
	exp         time.Time
}
278
// credentials is the outcome of one successful plugin run: a bearer token, a
// client certificate, or both. token is empty and cert is nil when the plugin
// did not supply the respective credential.
type credentials struct {
	token string           `datapolicy:"token"`
	cert  *tls.Certificate `datapolicy:"secret-key"`
}
283
284// UpdateTransportConfig updates the transport.Config to use credentials
285// returned by the plugin.
286func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
287	// If a bearer token is present in the request - avoid the GetCert callback when
288	// setting up the transport, as that triggers the exec action if the server is
289	// also configured to allow client certificates for authentication. For requests
290	// like "kubectl get --token (token) pods" we should assume the intention is to
291	// use the provided token for authentication. The same can be said for when the
292	// user specifies basic auth.
293	if c.HasTokenAuth() || c.HasBasicAuth() {
294		return nil
295	}
296
297	c.Wrap(func(rt http.RoundTripper) http.RoundTripper {
298		return &roundTripper{a, rt}
299	})
300
301	if c.TLS.GetCert != nil {
302		return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set")
303	}
304	c.TLS.GetCert = a.cert
305
306	var d *connrotation.Dialer
307	if c.Dial != nil {
308		// if c has a custom dialer, we have to wrap it
309		d = connrotation.NewDialerWithTracker(c.Dial, a.connTracker)
310	} else {
311		d = a.defaultDialer
312	}
313
314	c.Dial = d.DialContext
315
316	return nil
317}
318
// roundTripper is an http.RoundTripper that delegates to base after adding the
// bearer token obtained from a, if any, to requests that carry no
// Authorization header yet.
type roundTripper struct {
	a    *Authenticator
	base http.RoundTripper
}
323
324func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
325	// If a user has already set credentials, use that. This makes commands like
326	// "kubectl get --token (token) pods" work.
327	if req.Header.Get("Authorization") != "" {
328		return r.base.RoundTrip(req)
329	}
330
331	creds, err := r.a.getCreds()
332	if err != nil {
333		return nil, fmt.Errorf("getting credentials: %v", err)
334	}
335	if creds.token != "" {
336		req.Header.Set("Authorization", "Bearer "+creds.token)
337	}
338
339	res, err := r.base.RoundTrip(req)
340	if err != nil {
341		return nil, err
342	}
343	if res.StatusCode == http.StatusUnauthorized {
344		resp := &clientauthentication.Response{
345			Header: res.Header,
346			Code:   int32(res.StatusCode),
347		}
348		if err := r.a.maybeRefreshCreds(creds, resp); err != nil {
349			klog.Errorf("refreshing credentials: %v", err)
350		}
351	}
352	return res, nil
353}
354
355func (a *Authenticator) credsExpired() bool {
356	if a.exp.IsZero() {
357		return false
358	}
359	return a.now().After(a.exp)
360}
361
362func (a *Authenticator) cert() (*tls.Certificate, error) {
363	creds, err := a.getCreds()
364	if err != nil {
365		return nil, err
366	}
367	return creds.cert, nil
368}
369
370func (a *Authenticator) getCreds() (*credentials, error) {
371	a.mu.Lock()
372	defer a.mu.Unlock()
373
374	if a.cachedCreds != nil && !a.credsExpired() {
375		return a.cachedCreds, nil
376	}
377
378	if err := a.refreshCredsLocked(nil); err != nil {
379		return nil, err
380	}
381
382	return a.cachedCreds, nil
383}
384
385// maybeRefreshCreds executes the plugin to force a rotation of the
386// credentials, unless they were rotated already.
387func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error {
388	a.mu.Lock()
389	defer a.mu.Unlock()
390
391	// Since we're not making a new pointer to a.cachedCreds in getCreds, no
392	// need to do deep comparison.
393	if creds != a.cachedCreds {
394		// Credentials already rotated.
395		return nil
396	}
397
398	return a.refreshCredsLocked(r)
399}
400
401// refreshCredsLocked executes the plugin and reads the credentials from
402// stdout. It must be called while holding the Authenticator's mutex.
403func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error {
404	interactive, err := a.interactiveFunc()
405	if err != nil {
406		return fmt.Errorf("exec plugin cannot support interactive mode: %w", err)
407	}
408
409	cred := &clientauthentication.ExecCredential{
410		Spec: clientauthentication.ExecCredentialSpec{
411			Response:    r,
412			Interactive: interactive,
413		},
414	}
415	if a.provideClusterInfo {
416		cred.Spec.Cluster = a.cluster
417	}
418
419	env := append(a.environ(), a.env...)
420	data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred)
421	if err != nil {
422		return fmt.Errorf("encode ExecCredentials: %v", err)
423	}
424	env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data))
425
426	stdout := &bytes.Buffer{}
427	cmd := exec.Command(a.cmd, a.args...)
428	cmd.Env = env
429	cmd.Stderr = a.stderr
430	cmd.Stdout = stdout
431	if interactive {
432		cmd.Stdin = a.stdin
433	}
434
435	err = cmd.Run()
436	incrementCallsMetric(err)
437	if err != nil {
438		return a.wrapCmdRunErrorLocked(err)
439	}
440
441	_, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred)
442	if err != nil {
443		return fmt.Errorf("decoding stdout: %v", err)
444	}
445	if gvk.Group != a.group.Group || gvk.Version != a.group.Version {
446		return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s",
447			a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version})
448	}
449
450	if cred.Status == nil {
451		return fmt.Errorf("exec plugin didn't return a status field")
452	}
453	if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" {
454		return fmt.Errorf("exec plugin didn't return a token or cert/key pair")
455	}
456	if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") {
457		return fmt.Errorf("exec plugin returned only certificate or key, not both")
458	}
459
460	if cred.Status.ExpirationTimestamp != nil {
461		a.exp = cred.Status.ExpirationTimestamp.Time
462	} else {
463		a.exp = time.Time{}
464	}
465
466	newCreds := &credentials{
467		token: cred.Status.Token,
468	}
469	if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" {
470		cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData))
471		if err != nil {
472			return fmt.Errorf("failed parsing client key/certificate: %v", err)
473		}
474
475		// Leaf is initialized to be nil:
476		//  https://golang.org/pkg/crypto/tls/#X509KeyPair
477		// Leaf certificate is the first certificate:
478		//  https://golang.org/pkg/crypto/tls/#Certificate
479		// Populating leaf is useful for quickly accessing the underlying x509
480		// certificate values.
481		cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
482		if err != nil {
483			return fmt.Errorf("failed parsing client leaf certificate: %v", err)
484		}
485		newCreds.cert = &cert
486	}
487
488	oldCreds := a.cachedCreds
489	a.cachedCreds = newCreds
490	// Only close all connections when TLS cert rotates. Token rotation doesn't
491	// need the extra noise.
492	if oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) {
493		// Can be nil if the exec auth plugin only returned token auth.
494		if oldCreds.cert != nil && oldCreds.cert.Leaf != nil {
495			metrics.ClientCertRotationAge.Observe(time.Since(oldCreds.cert.Leaf.NotBefore))
496		}
497		a.connTracker.CloseAll()
498	}
499
500	expiry := time.Time{}
501	if a.cachedCreds.cert != nil && a.cachedCreds.cert.Leaf != nil {
502		expiry = a.cachedCreds.cert.Leaf.NotAfter
503	}
504	expirationMetrics.set(a, expiry)
505	return nil
506}
507
508// wrapCmdRunErrorLocked pulls out the code to construct a helpful error message
509// for when the exec plugin's binary fails to Run().
510//
511// It must be called while holding the Authenticator's mutex.
512func (a *Authenticator) wrapCmdRunErrorLocked(err error) error {
513	switch err.(type) {
514	case *exec.Error: // Binary does not exist (see exec.Error).
515		builder := strings.Builder{}
516		fmt.Fprintf(&builder, "exec: executable %s not found", a.cmd)
517
518		a.sometimes.Do(func() {
519			fmt.Fprint(&builder, installHintVerboseHelp)
520			if a.installHint != "" {
521				fmt.Fprintf(&builder, "\n\n%s", a.installHint)
522			}
523		})
524
525		return errors.New(builder.String())
526
527	case *exec.ExitError: // Binary execution failed (see exec.Cmd.Run()).
528		e := err.(*exec.ExitError)
529		return fmt.Errorf(
530			"exec: executable %s failed with exit code %d",
531			a.cmd,
532			e.ProcessState.ExitCode(),
533		)
534
535	default:
536		return fmt.Errorf("exec: %v", err)
537	}
538}
539