1/*
2Copyright 2018 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package exec
18
19import (
20	"bytes"
21	"crypto/tls"
22	"crypto/x509"
23	"errors"
24	"fmt"
25	"io"
26	"net"
27	"net/http"
28	"os"
29	"os/exec"
30	"reflect"
31	"strings"
32	"sync"
33	"time"
34
35	"github.com/davecgh/go-spew/spew"
36	"golang.org/x/term"
37
38	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39	"k8s.io/apimachinery/pkg/runtime"
40	"k8s.io/apimachinery/pkg/runtime/schema"
41	"k8s.io/apimachinery/pkg/runtime/serializer"
42	"k8s.io/apimachinery/pkg/util/clock"
43	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
44	"k8s.io/client-go/pkg/apis/clientauthentication"
45	"k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1"
46	"k8s.io/client-go/pkg/apis/clientauthentication/v1beta1"
47	"k8s.io/client-go/tools/clientcmd/api"
48	"k8s.io/client-go/tools/metrics"
49	"k8s.io/client-go/transport"
50	"k8s.io/client-go/util/connrotation"
51	"k8s.io/klog/v2"
52)
53
54const execInfoEnv = "KUBERNETES_EXEC_INFO"
55const installHintVerboseHelp = `
56
57It looks like you are trying to use a client-go credential plugin that is not installed.
58
59To learn more about this feature, consult the documentation available at:
60      https://kubernetes.io/docs/reference/access-authn-authz/authentication/#client-go-credential-plugins`
61
62var scheme = runtime.NewScheme()
63var codecs = serializer.NewCodecFactory(scheme)
64
65func init() {
66	v1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
67	utilruntime.Must(v1alpha1.AddToScheme(scheme))
68	utilruntime.Must(v1beta1.AddToScheme(scheme))
69	utilruntime.Must(clientauthentication.AddToScheme(scheme))
70}
71
72var (
73	// Since transports can be constantly re-initialized by programs like kubectl,
74	// keep a cache of initialized authenticators keyed by a hash of their config.
75	globalCache = newCache()
76	// The list of API versions we accept.
77	apiVersions = map[string]schema.GroupVersion{
78		v1alpha1.SchemeGroupVersion.String(): v1alpha1.SchemeGroupVersion,
79		v1beta1.SchemeGroupVersion.String():  v1beta1.SchemeGroupVersion,
80	}
81)
82
83func newCache() *cache {
84	return &cache{m: make(map[string]*Authenticator)}
85}
86
87var spewConfig = &spew.ConfigState{DisableMethods: true, Indent: " "}
88
89func cacheKey(conf *api.ExecConfig, cluster *clientauthentication.Cluster) string {
90	key := struct {
91		conf    *api.ExecConfig
92		cluster *clientauthentication.Cluster
93	}{
94		conf:    conf,
95		cluster: cluster,
96	}
97	return spewConfig.Sprint(key)
98}
99
100type cache struct {
101	mu sync.Mutex
102	m  map[string]*Authenticator
103}
104
105func (c *cache) get(s string) (*Authenticator, bool) {
106	c.mu.Lock()
107	defer c.mu.Unlock()
108	a, ok := c.m[s]
109	return a, ok
110}
111
112// put inserts an authenticator into the cache. If an authenticator is already
113// associated with the key, the first one is returned instead.
114func (c *cache) put(s string, a *Authenticator) *Authenticator {
115	c.mu.Lock()
116	defer c.mu.Unlock()
117	existing, ok := c.m[s]
118	if ok {
119		return existing
120	}
121	c.m[s] = a
122	return a
123}
124
125// sometimes rate limits how often a function f() is called. Specifically, Do()
126// will run the provided function f() up to threshold times every interval
127// duration.
128type sometimes struct {
129	threshold int
130	interval  time.Duration
131
132	clock clock.Clock
133	mu    sync.Mutex
134
135	count  int       // times we have called f() in this window
136	window time.Time // beginning of current window of length interval
137}
138
139func (s *sometimes) Do(f func()) {
140	s.mu.Lock()
141	defer s.mu.Unlock()
142
143	now := s.clock.Now()
144	if s.window.IsZero() {
145		s.window = now
146	}
147
148	// If we are no longer in our saved time window, then we get to reset our run
149	// count back to 0 and start increasing towards the threshold again.
150	if inWindow := now.Sub(s.window) < s.interval; !inWindow {
151		s.window = now
152		s.count = 0
153	}
154
155	// If we have not run the function more than threshold times in this current
156	// time window, we get to run it now!
157	if underThreshold := s.count < s.threshold; underThreshold {
158		s.count++
159		f()
160	}
161}
162
163// GetAuthenticator returns an exec-based plugin for providing client credentials.
164func GetAuthenticator(config *api.ExecConfig, cluster *clientauthentication.Cluster) (*Authenticator, error) {
165	return newAuthenticator(globalCache, config, cluster)
166}
167
168func newAuthenticator(c *cache, config *api.ExecConfig, cluster *clientauthentication.Cluster) (*Authenticator, error) {
169	key := cacheKey(config, cluster)
170	if a, ok := c.get(key); ok {
171		return a, nil
172	}
173
174	gv, ok := apiVersions[config.APIVersion]
175	if !ok {
176		return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion)
177	}
178
179	connTracker := connrotation.NewConnectionTracker()
180	defaultDialer := connrotation.NewDialerWithTracker(
181		(&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
182		connTracker,
183	)
184
185	a := &Authenticator{
186		cmd:                config.Command,
187		args:               config.Args,
188		group:              gv,
189		cluster:            cluster,
190		provideClusterInfo: config.ProvideClusterInfo,
191
192		installHint: config.InstallHint,
193		sometimes: &sometimes{
194			threshold: 10,
195			interval:  time.Hour,
196			clock:     clock.RealClock{},
197		},
198
199		stdin:       os.Stdin,
200		stderr:      os.Stderr,
201		interactive: term.IsTerminal(int(os.Stdin.Fd())),
202		now:         time.Now,
203		environ:     os.Environ,
204
205		defaultDialer: defaultDialer,
206		connTracker:   connTracker,
207	}
208
209	for _, env := range config.Env {
210		a.env = append(a.env, env.Name+"="+env.Value)
211	}
212
213	return c.put(key, a), nil
214}
215
216// Authenticator is a client credential provider that rotates credentials by executing a plugin.
217// The plugin input and output are defined by the API group client.authentication.k8s.io.
218type Authenticator struct {
219	// Set by the config
220	cmd                string
221	args               []string
222	group              schema.GroupVersion
223	env                []string
224	cluster            *clientauthentication.Cluster
225	provideClusterInfo bool
226
227	// Used to avoid log spew by rate limiting install hint printing. We didn't do
228	// this by interval based rate limiting alone since that way may have prevented
229	// the install hint from showing up for kubectl users.
230	sometimes   *sometimes
231	installHint string
232
233	// Stubbable for testing
234	stdin       io.Reader
235	stderr      io.Writer
236	interactive bool
237	now         func() time.Time
238	environ     func() []string
239
240	// defaultDialer is used for clients which don't specify a custom dialer
241	defaultDialer *connrotation.Dialer
242	// connTracker tracks all connections opened that we need to close when rotating a client certificate
243	connTracker *connrotation.ConnectionTracker
244
245	// Cached results.
246	//
247	// The mutex also guards calling the plugin. Since the plugin could be
248	// interactive we want to make sure it's only called once.
249	mu          sync.Mutex
250	cachedCreds *credentials
251	exp         time.Time
252}
253
254type credentials struct {
255	token string           `datapolicy:"token"`
256	cert  *tls.Certificate `datapolicy:"secret-key"`
257}
258
259// UpdateTransportConfig updates the transport.Config to use credentials
260// returned by the plugin.
261func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
262	// If a bearer token is present in the request - avoid the GetCert callback when
263	// setting up the transport, as that triggers the exec action if the server is
264	// also configured to allow client certificates for authentication. For requests
265	// like "kubectl get --token (token) pods" we should assume the intention is to
266	// use the provided token for authentication.
267	if c.HasTokenAuth() {
268		return nil
269	}
270
271	c.Wrap(func(rt http.RoundTripper) http.RoundTripper {
272		return &roundTripper{a, rt}
273	})
274
275	if c.TLS.GetCert != nil {
276		return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set")
277	}
278	c.TLS.GetCert = a.cert
279
280	var d *connrotation.Dialer
281	if c.Dial != nil {
282		// if c has a custom dialer, we have to wrap it
283		d = connrotation.NewDialerWithTracker(c.Dial, a.connTracker)
284	} else {
285		d = a.defaultDialer
286	}
287
288	c.Dial = d.DialContext
289
290	return nil
291}
292
293type roundTripper struct {
294	a    *Authenticator
295	base http.RoundTripper
296}
297
298func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
299	// If a user has already set credentials, use that. This makes commands like
300	// "kubectl get --token (token) pods" work.
301	if req.Header.Get("Authorization") != "" {
302		return r.base.RoundTrip(req)
303	}
304
305	creds, err := r.a.getCreds()
306	if err != nil {
307		return nil, fmt.Errorf("getting credentials: %v", err)
308	}
309	if creds.token != "" {
310		req.Header.Set("Authorization", "Bearer "+creds.token)
311	}
312
313	res, err := r.base.RoundTrip(req)
314	if err != nil {
315		return nil, err
316	}
317	if res.StatusCode == http.StatusUnauthorized {
318		resp := &clientauthentication.Response{
319			Header: res.Header,
320			Code:   int32(res.StatusCode),
321		}
322		if err := r.a.maybeRefreshCreds(creds, resp); err != nil {
323			klog.Errorf("refreshing credentials: %v", err)
324		}
325	}
326	return res, nil
327}
328
329func (a *Authenticator) credsExpired() bool {
330	if a.exp.IsZero() {
331		return false
332	}
333	return a.now().After(a.exp)
334}
335
336func (a *Authenticator) cert() (*tls.Certificate, error) {
337	creds, err := a.getCreds()
338	if err != nil {
339		return nil, err
340	}
341	return creds.cert, nil
342}
343
344func (a *Authenticator) getCreds() (*credentials, error) {
345	a.mu.Lock()
346	defer a.mu.Unlock()
347
348	if a.cachedCreds != nil && !a.credsExpired() {
349		return a.cachedCreds, nil
350	}
351
352	if err := a.refreshCredsLocked(nil); err != nil {
353		return nil, err
354	}
355
356	return a.cachedCreds, nil
357}
358
359// maybeRefreshCreds executes the plugin to force a rotation of the
360// credentials, unless they were rotated already.
361func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error {
362	a.mu.Lock()
363	defer a.mu.Unlock()
364
365	// Since we're not making a new pointer to a.cachedCreds in getCreds, no
366	// need to do deep comparison.
367	if creds != a.cachedCreds {
368		// Credentials already rotated.
369		return nil
370	}
371
372	return a.refreshCredsLocked(r)
373}
374
375// refreshCredsLocked executes the plugin and reads the credentials from
376// stdout. It must be called while holding the Authenticator's mutex.
377func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error {
378	cred := &clientauthentication.ExecCredential{
379		Spec: clientauthentication.ExecCredentialSpec{
380			Response:    r,
381			Interactive: a.interactive,
382		},
383	}
384	if a.provideClusterInfo {
385		cred.Spec.Cluster = a.cluster
386	}
387
388	env := append(a.environ(), a.env...)
389	data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred)
390	if err != nil {
391		return fmt.Errorf("encode ExecCredentials: %v", err)
392	}
393	env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data))
394
395	stdout := &bytes.Buffer{}
396	cmd := exec.Command(a.cmd, a.args...)
397	cmd.Env = env
398	cmd.Stderr = a.stderr
399	cmd.Stdout = stdout
400	if a.interactive {
401		cmd.Stdin = a.stdin
402	}
403
404	err = cmd.Run()
405	incrementCallsMetric(err)
406	if err != nil {
407		return a.wrapCmdRunErrorLocked(err)
408	}
409
410	_, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred)
411	if err != nil {
412		return fmt.Errorf("decoding stdout: %v", err)
413	}
414	if gvk.Group != a.group.Group || gvk.Version != a.group.Version {
415		return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s",
416			a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version})
417	}
418
419	if cred.Status == nil {
420		return fmt.Errorf("exec plugin didn't return a status field")
421	}
422	if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" {
423		return fmt.Errorf("exec plugin didn't return a token or cert/key pair")
424	}
425	if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") {
426		return fmt.Errorf("exec plugin returned only certificate or key, not both")
427	}
428
429	if cred.Status.ExpirationTimestamp != nil {
430		a.exp = cred.Status.ExpirationTimestamp.Time
431	} else {
432		a.exp = time.Time{}
433	}
434
435	newCreds := &credentials{
436		token: cred.Status.Token,
437	}
438	if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" {
439		cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData))
440		if err != nil {
441			return fmt.Errorf("failed parsing client key/certificate: %v", err)
442		}
443
444		// Leaf is initialized to be nil:
445		//  https://golang.org/pkg/crypto/tls/#X509KeyPair
446		// Leaf certificate is the first certificate:
447		//  https://golang.org/pkg/crypto/tls/#Certificate
448		// Populating leaf is useful for quickly accessing the underlying x509
449		// certificate values.
450		cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
451		if err != nil {
452			return fmt.Errorf("failed parsing client leaf certificate: %v", err)
453		}
454		newCreds.cert = &cert
455	}
456
457	oldCreds := a.cachedCreds
458	a.cachedCreds = newCreds
459	// Only close all connections when TLS cert rotates. Token rotation doesn't
460	// need the extra noise.
461	if oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) {
462		// Can be nil if the exec auth plugin only returned token auth.
463		if oldCreds.cert != nil && oldCreds.cert.Leaf != nil {
464			metrics.ClientCertRotationAge.Observe(time.Now().Sub(oldCreds.cert.Leaf.NotBefore))
465		}
466		a.connTracker.CloseAll()
467	}
468
469	expiry := time.Time{}
470	if a.cachedCreds.cert != nil && a.cachedCreds.cert.Leaf != nil {
471		expiry = a.cachedCreds.cert.Leaf.NotAfter
472	}
473	expirationMetrics.set(a, expiry)
474	return nil
475}
476
477// wrapCmdRunErrorLocked pulls out the code to construct a helpful error message
478// for when the exec plugin's binary fails to Run().
479//
480// It must be called while holding the Authenticator's mutex.
481func (a *Authenticator) wrapCmdRunErrorLocked(err error) error {
482	switch err.(type) {
483	case *exec.Error: // Binary does not exist (see exec.Error).
484		builder := strings.Builder{}
485		fmt.Fprintf(&builder, "exec: executable %s not found", a.cmd)
486
487		a.sometimes.Do(func() {
488			fmt.Fprint(&builder, installHintVerboseHelp)
489			if a.installHint != "" {
490				fmt.Fprintf(&builder, "\n\n%s", a.installHint)
491			}
492		})
493
494		return errors.New(builder.String())
495
496	case *exec.ExitError: // Binary execution failed (see exec.Cmd.Run()).
497		e := err.(*exec.ExitError)
498		return fmt.Errorf(
499			"exec: executable %s failed with exit code %d",
500			a.cmd,
501			e.ProcessState.ExitCode(),
502		)
503
504	default:
505		return fmt.Errorf("exec: %v", err)
506	}
507}
508