1/* 2Copyright 2018 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package exec 18 19import ( 20 "bytes" 21 "crypto/tls" 22 "crypto/x509" 23 "errors" 24 "fmt" 25 "io" 26 "net" 27 "net/http" 28 "os" 29 "os/exec" 30 "reflect" 31 "strings" 32 "sync" 33 "time" 34 35 "github.com/davecgh/go-spew/spew" 36 "golang.org/x/term" 37 38 "k8s.io/apimachinery/pkg/runtime" 39 "k8s.io/apimachinery/pkg/runtime/schema" 40 "k8s.io/apimachinery/pkg/runtime/serializer" 41 "k8s.io/apimachinery/pkg/util/clock" 42 "k8s.io/client-go/pkg/apis/clientauthentication" 43 "k8s.io/client-go/pkg/apis/clientauthentication/install" 44 clientauthenticationv1 "k8s.io/client-go/pkg/apis/clientauthentication/v1" 45 clientauthenticationv1alpha1 "k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1" 46 clientauthenticationv1beta1 "k8s.io/client-go/pkg/apis/clientauthentication/v1beta1" 47 "k8s.io/client-go/tools/clientcmd/api" 48 "k8s.io/client-go/tools/metrics" 49 "k8s.io/client-go/transport" 50 "k8s.io/client-go/util/connrotation" 51 "k8s.io/klog/v2" 52) 53 54const execInfoEnv = "KUBERNETES_EXEC_INFO" 55const installHintVerboseHelp = ` 56 57It looks like you are trying to use a client-go credential plugin that is not installed. 58 59To learn more about this feature, consult the documentation available at: 60 https://kubernetes.io/docs/reference/access-authn-authz/authentication/#client-go-credential-plugins` 61 62var scheme = runtime.NewScheme() 63var codecs = serializer.NewCodecFactory(scheme) 64 65func init() { 66 install.Install(scheme) 67} 68 69var ( 70 // Since transports can be constantly re-initialized by programs like kubectl, 71 // keep a cache of initialized authenticators keyed by a hash of their config. 72 globalCache = newCache() 73 // The list of API versions we accept. 74 apiVersions = map[string]schema.GroupVersion{ 75 clientauthenticationv1alpha1.SchemeGroupVersion.String(): clientauthenticationv1alpha1.SchemeGroupVersion, 76 clientauthenticationv1beta1.SchemeGroupVersion.String(): clientauthenticationv1beta1.SchemeGroupVersion, 77 clientauthenticationv1.SchemeGroupVersion.String(): clientauthenticationv1.SchemeGroupVersion, 78 } 79) 80 81func newCache() *cache { 82 return &cache{m: make(map[string]*Authenticator)} 83} 84 85var spewConfig = &spew.ConfigState{DisableMethods: true, Indent: " "} 86 87func cacheKey(conf *api.ExecConfig, cluster *clientauthentication.Cluster) string { 88 key := struct { 89 conf *api.ExecConfig 90 cluster *clientauthentication.Cluster 91 }{ 92 conf: conf, 93 cluster: cluster, 94 } 95 return spewConfig.Sprint(key) 96} 97 98type cache struct { 99 mu sync.Mutex 100 m map[string]*Authenticator 101} 102 103func (c *cache) get(s string) (*Authenticator, bool) { 104 c.mu.Lock() 105 defer c.mu.Unlock() 106 a, ok := c.m[s] 107 return a, ok 108} 109 110// put inserts an authenticator into the cache. If an authenticator is already 111// associated with the key, the first one is returned instead. 112func (c *cache) put(s string, a *Authenticator) *Authenticator { 113 c.mu.Lock() 114 defer c.mu.Unlock() 115 existing, ok := c.m[s] 116 if ok { 117 return existing 118 } 119 c.m[s] = a 120 return a 121} 122 123// sometimes rate limits how often a function f() is called. Specifically, Do() 124// will run the provided function f() up to threshold times every interval 125// duration. 126type sometimes struct { 127 threshold int 128 interval time.Duration 129 130 clock clock.Clock 131 mu sync.Mutex 132 133 count int // times we have called f() in this window 134 window time.Time // beginning of current window of length interval 135} 136 137func (s *sometimes) Do(f func()) { 138 s.mu.Lock() 139 defer s.mu.Unlock() 140 141 now := s.clock.Now() 142 if s.window.IsZero() { 143 s.window = now 144 } 145 146 // If we are no longer in our saved time window, then we get to reset our run 147 // count back to 0 and start increasing towards the threshold again. 148 if inWindow := now.Sub(s.window) < s.interval; !inWindow { 149 s.window = now 150 s.count = 0 151 } 152 153 // If we have not run the function more than threshold times in this current 154 // time window, we get to run it now! 155 if underThreshold := s.count < s.threshold; underThreshold { 156 s.count++ 157 f() 158 } 159} 160 161// GetAuthenticator returns an exec-based plugin for providing client credentials. 162func GetAuthenticator(config *api.ExecConfig, cluster *clientauthentication.Cluster) (*Authenticator, error) { 163 return newAuthenticator(globalCache, term.IsTerminal, config, cluster) 164} 165 166func newAuthenticator(c *cache, isTerminalFunc func(int) bool, config *api.ExecConfig, cluster *clientauthentication.Cluster) (*Authenticator, error) { 167 key := cacheKey(config, cluster) 168 if a, ok := c.get(key); ok { 169 return a, nil 170 } 171 172 gv, ok := apiVersions[config.APIVersion] 173 if !ok { 174 return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion) 175 } 176 177 connTracker := connrotation.NewConnectionTracker() 178 defaultDialer := connrotation.NewDialerWithTracker( 179 (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext, 180 connTracker, 181 ) 182 183 a := &Authenticator{ 184 cmd: config.Command, 185 args: config.Args, 186 group: gv, 187 cluster: cluster, 188 provideClusterInfo: config.ProvideClusterInfo, 189 190 installHint: config.InstallHint, 191 sometimes: &sometimes{ 192 threshold: 10, 193 interval: time.Hour, 194 clock: clock.RealClock{}, 195 }, 196 197 stdin: os.Stdin, 198 stderr: os.Stderr, 199 interactiveFunc: func() (bool, error) { return isInteractive(isTerminalFunc, config) }, 200 now: time.Now, 201 environ: os.Environ, 202 203 defaultDialer: defaultDialer, 204 connTracker: connTracker, 205 } 206 207 for _, env := range config.Env { 208 a.env = append(a.env, env.Name+"="+env.Value) 209 } 210 211 return c.put(key, a), nil 212} 213 214func isInteractive(isTerminalFunc func(int) bool, config *api.ExecConfig) (bool, error) { 215 var shouldBeInteractive bool 216 switch config.InteractiveMode { 217 case api.NeverExecInteractiveMode: 218 shouldBeInteractive = false 219 case api.IfAvailableExecInteractiveMode: 220 shouldBeInteractive = !config.StdinUnavailable && isTerminalFunc(int(os.Stdin.Fd())) 221 case api.AlwaysExecInteractiveMode: 222 if !isTerminalFunc(int(os.Stdin.Fd())) { 223 return false, errors.New("standard input is not a terminal") 224 } 225 if config.StdinUnavailable { 226 suffix := "" 227 if len(config.StdinUnavailableMessage) > 0 { 228 // only print extra ": <message>" if the user actually specified a message 229 suffix = fmt.Sprintf(": %s", config.StdinUnavailableMessage) 230 } 231 return false, fmt.Errorf("standard input is unavailable%s", suffix) 232 } 233 shouldBeInteractive = true 234 default: 235 return false, fmt.Errorf("unknown interactiveMode: %q", config.InteractiveMode) 236 } 237 238 return shouldBeInteractive, nil 239} 240 241// Authenticator is a client credential provider that rotates credentials by executing a plugin. 242// The plugin input and output are defined by the API group client.authentication.k8s.io. 243type Authenticator struct { 244 // Set by the config 245 cmd string 246 args []string 247 group schema.GroupVersion 248 env []string 249 cluster *clientauthentication.Cluster 250 provideClusterInfo bool 251 252 // Used to avoid log spew by rate limiting install hint printing. We didn't do 253 // this by interval based rate limiting alone since that way may have prevented 254 // the install hint from showing up for kubectl users. 255 sometimes *sometimes 256 installHint string 257 258 // Stubbable for testing 259 stdin io.Reader 260 stderr io.Writer 261 interactiveFunc func() (bool, error) 262 now func() time.Time 263 environ func() []string 264 265 // defaultDialer is used for clients which don't specify a custom dialer 266 defaultDialer *connrotation.Dialer 267 // connTracker tracks all connections opened that we need to close when rotating a client certificate 268 connTracker *connrotation.ConnectionTracker 269 270 // Cached results. 271 // 272 // The mutex also guards calling the plugin. Since the plugin could be 273 // interactive we want to make sure it's only called once. 274 mu sync.Mutex 275 cachedCreds *credentials 276 exp time.Time 277} 278 279type credentials struct { 280 token string `datapolicy:"token"` 281 cert *tls.Certificate `datapolicy:"secret-key"` 282} 283 284// UpdateTransportConfig updates the transport.Config to use credentials 285// returned by the plugin. 286func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error { 287 // If a bearer token is present in the request - avoid the GetCert callback when 288 // setting up the transport, as that triggers the exec action if the server is 289 // also configured to allow client certificates for authentication. For requests 290 // like "kubectl get --token (token) pods" we should assume the intention is to 291 // use the provided token for authentication. The same can be said for when the 292 // user specifies basic auth. 293 if c.HasTokenAuth() || c.HasBasicAuth() { 294 return nil 295 } 296 297 c.Wrap(func(rt http.RoundTripper) http.RoundTripper { 298 return &roundTripper{a, rt} 299 }) 300 301 if c.TLS.GetCert != nil { 302 return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set") 303 } 304 c.TLS.GetCert = a.cert 305 306 var d *connrotation.Dialer 307 if c.Dial != nil { 308 // if c has a custom dialer, we have to wrap it 309 d = connrotation.NewDialerWithTracker(c.Dial, a.connTracker) 310 } else { 311 d = a.defaultDialer 312 } 313 314 c.Dial = d.DialContext 315 316 return nil 317} 318 319type roundTripper struct { 320 a *Authenticator 321 base http.RoundTripper 322} 323 324func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { 325 // If a user has already set credentials, use that. This makes commands like 326 // "kubectl get --token (token) pods" work. 327 if req.Header.Get("Authorization") != "" { 328 return r.base.RoundTrip(req) 329 } 330 331 creds, err := r.a.getCreds() 332 if err != nil { 333 return nil, fmt.Errorf("getting credentials: %v", err) 334 } 335 if creds.token != "" { 336 req.Header.Set("Authorization", "Bearer "+creds.token) 337 } 338 339 res, err := r.base.RoundTrip(req) 340 if err != nil { 341 return nil, err 342 } 343 if res.StatusCode == http.StatusUnauthorized { 344 resp := &clientauthentication.Response{ 345 Header: res.Header, 346 Code: int32(res.StatusCode), 347 } 348 if err := r.a.maybeRefreshCreds(creds, resp); err != nil { 349 klog.Errorf("refreshing credentials: %v", err) 350 } 351 } 352 return res, nil 353} 354 355func (a *Authenticator) credsExpired() bool { 356 if a.exp.IsZero() { 357 return false 358 } 359 return a.now().After(a.exp) 360} 361 362func (a *Authenticator) cert() (*tls.Certificate, error) { 363 creds, err := a.getCreds() 364 if err != nil { 365 return nil, err 366 } 367 return creds.cert, nil 368} 369 370func (a *Authenticator) getCreds() (*credentials, error) { 371 a.mu.Lock() 372 defer a.mu.Unlock() 373 374 if a.cachedCreds != nil && !a.credsExpired() { 375 return a.cachedCreds, nil 376 } 377 378 if err := a.refreshCredsLocked(nil); err != nil { 379 return nil, err 380 } 381 382 return a.cachedCreds, nil 383} 384 385// maybeRefreshCreds executes the plugin to force a rotation of the 386// credentials, unless they were rotated already. 387func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error { 388 a.mu.Lock() 389 defer a.mu.Unlock() 390 391 // Since we're not making a new pointer to a.cachedCreds in getCreds, no 392 // need to do deep comparison. 393 if creds != a.cachedCreds { 394 // Credentials already rotated. 395 return nil 396 } 397 398 return a.refreshCredsLocked(r) 399} 400 401// refreshCredsLocked executes the plugin and reads the credentials from 402// stdout. It must be called while holding the Authenticator's mutex. 403func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error { 404 interactive, err := a.interactiveFunc() 405 if err != nil { 406 return fmt.Errorf("exec plugin cannot support interactive mode: %w", err) 407 } 408 409 cred := &clientauthentication.ExecCredential{ 410 Spec: clientauthentication.ExecCredentialSpec{ 411 Response: r, 412 Interactive: interactive, 413 }, 414 } 415 if a.provideClusterInfo { 416 cred.Spec.Cluster = a.cluster 417 } 418 419 env := append(a.environ(), a.env...) 420 data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred) 421 if err != nil { 422 return fmt.Errorf("encode ExecCredentials: %v", err) 423 } 424 env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data)) 425 426 stdout := &bytes.Buffer{} 427 cmd := exec.Command(a.cmd, a.args...) 428 cmd.Env = env 429 cmd.Stderr = a.stderr 430 cmd.Stdout = stdout 431 if interactive { 432 cmd.Stdin = a.stdin 433 } 434 435 err = cmd.Run() 436 incrementCallsMetric(err) 437 if err != nil { 438 return a.wrapCmdRunErrorLocked(err) 439 } 440 441 _, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred) 442 if err != nil { 443 return fmt.Errorf("decoding stdout: %v", err) 444 } 445 if gvk.Group != a.group.Group || gvk.Version != a.group.Version { 446 return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s", 447 a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version}) 448 } 449 450 if cred.Status == nil { 451 return fmt.Errorf("exec plugin didn't return a status field") 452 } 453 if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" { 454 return fmt.Errorf("exec plugin didn't return a token or cert/key pair") 455 } 456 if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") { 457 return fmt.Errorf("exec plugin returned only certificate or key, not both") 458 } 459 460 if cred.Status.ExpirationTimestamp != nil { 461 a.exp = cred.Status.ExpirationTimestamp.Time 462 } else { 463 a.exp = time.Time{} 464 } 465 466 newCreds := &credentials{ 467 token: cred.Status.Token, 468 } 469 if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" { 470 cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData)) 471 if err != nil { 472 return fmt.Errorf("failed parsing client key/certificate: %v", err) 473 } 474 475 // Leaf is initialized to be nil: 476 // https://golang.org/pkg/crypto/tls/#X509KeyPair 477 // Leaf certificate is the first certificate: 478 // https://golang.org/pkg/crypto/tls/#Certificate 479 // Populating leaf is useful for quickly accessing the underlying x509 480 // certificate values. 481 cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0]) 482 if err != nil { 483 return fmt.Errorf("failed parsing client leaf certificate: %v", err) 484 } 485 newCreds.cert = &cert 486 } 487 488 oldCreds := a.cachedCreds 489 a.cachedCreds = newCreds 490 // Only close all connections when TLS cert rotates. Token rotation doesn't 491 // need the extra noise. 492 if oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) { 493 // Can be nil if the exec auth plugin only returned token auth. 494 if oldCreds.cert != nil && oldCreds.cert.Leaf != nil { 495 metrics.ClientCertRotationAge.Observe(time.Since(oldCreds.cert.Leaf.NotBefore)) 496 } 497 a.connTracker.CloseAll() 498 } 499 500 expiry := time.Time{} 501 if a.cachedCreds.cert != nil && a.cachedCreds.cert.Leaf != nil { 502 expiry = a.cachedCreds.cert.Leaf.NotAfter 503 } 504 expirationMetrics.set(a, expiry) 505 return nil 506} 507 508// wrapCmdRunErrorLocked pulls out the code to construct a helpful error message 509// for when the exec plugin's binary fails to Run(). 510// 511// It must be called while holding the Authenticator's mutex. 512func (a *Authenticator) wrapCmdRunErrorLocked(err error) error { 513 switch err.(type) { 514 case *exec.Error: // Binary does not exist (see exec.Error). 515 builder := strings.Builder{} 516 fmt.Fprintf(&builder, "exec: executable %s not found", a.cmd) 517 518 a.sometimes.Do(func() { 519 fmt.Fprint(&builder, installHintVerboseHelp) 520 if a.installHint != "" { 521 fmt.Fprintf(&builder, "\n\n%s", a.installHint) 522 } 523 }) 524 525 return errors.New(builder.String()) 526 527 case *exec.ExitError: // Binary execution failed (see exec.Cmd.Run()). 528 e := err.(*exec.ExitError) 529 return fmt.Errorf( 530 "exec: executable %s failed with exit code %d", 531 a.cmd, 532 e.ProcessState.ExitCode(), 533 ) 534 535 default: 536 return fmt.Errorf("exec: %v", err) 537 } 538} 539