1/* 2Copyright 2018 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package exec 18 19import ( 20 "bytes" 21 "context" 22 "crypto/tls" 23 "crypto/x509" 24 "errors" 25 "fmt" 26 "io" 27 "net" 28 "net/http" 29 "os" 30 "os/exec" 31 "reflect" 32 "strings" 33 "sync" 34 "time" 35 36 "github.com/davecgh/go-spew/spew" 37 "golang.org/x/crypto/ssh/terminal" 38 v1 "k8s.io/apimachinery/pkg/apis/meta/v1" 39 "k8s.io/apimachinery/pkg/runtime" 40 "k8s.io/apimachinery/pkg/runtime/schema" 41 "k8s.io/apimachinery/pkg/runtime/serializer" 42 "k8s.io/apimachinery/pkg/util/clock" 43 utilruntime "k8s.io/apimachinery/pkg/util/runtime" 44 "k8s.io/client-go/pkg/apis/clientauthentication" 45 "k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1" 46 "k8s.io/client-go/pkg/apis/clientauthentication/v1beta1" 47 "k8s.io/client-go/tools/clientcmd/api" 48 "k8s.io/client-go/tools/metrics" 49 "k8s.io/client-go/transport" 50 "k8s.io/client-go/util/connrotation" 51 "k8s.io/klog/v2" 52) 53 54const execInfoEnv = "KUBERNETES_EXEC_INFO" 55const onRotateListWarningLength = 1000 56const installHintVerboseHelp = ` 57 58It looks like you are trying to use a client-go credential plugin that is not installed. 59 60To learn more about this feature, consult the documentation available at: 61 https://kubernetes.io/docs/reference/access-authn-authz/authentication/#client-go-credential-plugins` 62 63var scheme = runtime.NewScheme() 64var codecs = serializer.NewCodecFactory(scheme) 65 66func init() { 67 v1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"}) 68 utilruntime.Must(v1alpha1.AddToScheme(scheme)) 69 utilruntime.Must(v1beta1.AddToScheme(scheme)) 70 utilruntime.Must(clientauthentication.AddToScheme(scheme)) 71} 72 73var ( 74 // Since transports can be constantly re-initialized by programs like kubectl, 75 // keep a cache of initialized authenticators keyed by a hash of their config. 76 globalCache = newCache() 77 // The list of API versions we accept. 78 apiVersions = map[string]schema.GroupVersion{ 79 v1alpha1.SchemeGroupVersion.String(): v1alpha1.SchemeGroupVersion, 80 v1beta1.SchemeGroupVersion.String(): v1beta1.SchemeGroupVersion, 81 } 82) 83 84func newCache() *cache { 85 return &cache{m: make(map[string]*Authenticator)} 86} 87 88var spewConfig = &spew.ConfigState{DisableMethods: true, Indent: " "} 89 90func cacheKey(c *api.ExecConfig) string { 91 return spewConfig.Sprint(c) 92} 93 94type cache struct { 95 mu sync.Mutex 96 m map[string]*Authenticator 97} 98 99func (c *cache) get(s string) (*Authenticator, bool) { 100 c.mu.Lock() 101 defer c.mu.Unlock() 102 a, ok := c.m[s] 103 return a, ok 104} 105 106// put inserts an authenticator into the cache. If an authenticator is already 107// associated with the key, the first one is returned instead. 108func (c *cache) put(s string, a *Authenticator) *Authenticator { 109 c.mu.Lock() 110 defer c.mu.Unlock() 111 existing, ok := c.m[s] 112 if ok { 113 return existing 114 } 115 c.m[s] = a 116 return a 117} 118 119// sometimes rate limits how often a function f() is called. Specifically, Do() 120// will run the provided function f() up to threshold times every interval 121// duration. 122type sometimes struct { 123 threshold int 124 interval time.Duration 125 126 clock clock.Clock 127 mu sync.Mutex 128 129 count int // times we have called f() in this window 130 window time.Time // beginning of current window of length interval 131} 132 133func (s *sometimes) Do(f func()) { 134 s.mu.Lock() 135 defer s.mu.Unlock() 136 137 now := s.clock.Now() 138 if s.window.IsZero() { 139 s.window = now 140 } 141 142 // If we are no longer in our saved time window, then we get to reset our run 143 // count back to 0 and start increasing towards the threshold again. 144 if inWindow := now.Sub(s.window) < s.interval; !inWindow { 145 s.window = now 146 s.count = 0 147 } 148 149 // If we have not run the function more than threshold times in this current 150 // time window, we get to run it now! 151 if underThreshold := s.count < s.threshold; underThreshold { 152 s.count++ 153 f() 154 } 155} 156 157// GetAuthenticator returns an exec-based plugin for providing client credentials. 158func GetAuthenticator(config *api.ExecConfig) (*Authenticator, error) { 159 return newAuthenticator(globalCache, config) 160} 161 162func newAuthenticator(c *cache, config *api.ExecConfig) (*Authenticator, error) { 163 key := cacheKey(config) 164 if a, ok := c.get(key); ok { 165 return a, nil 166 } 167 168 gv, ok := apiVersions[config.APIVersion] 169 if !ok { 170 return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion) 171 } 172 173 a := &Authenticator{ 174 cmd: config.Command, 175 args: config.Args, 176 group: gv, 177 178 installHint: config.InstallHint, 179 sometimes: &sometimes{ 180 threshold: 10, 181 interval: time.Hour, 182 clock: clock.RealClock{}, 183 }, 184 185 stdin: os.Stdin, 186 stderr: os.Stderr, 187 interactive: terminal.IsTerminal(int(os.Stdout.Fd())), 188 now: time.Now, 189 environ: os.Environ, 190 } 191 192 for _, env := range config.Env { 193 a.env = append(a.env, env.Name+"="+env.Value) 194 } 195 196 return c.put(key, a), nil 197} 198 199// Authenticator is a client credential provider that rotates credentials by executing a plugin. 200// The plugin input and output are defined by the API group client.authentication.k8s.io. 201type Authenticator struct { 202 // Set by the config 203 cmd string 204 args []string 205 group schema.GroupVersion 206 env []string 207 208 // Used to avoid log spew by rate limiting install hint printing. We didn't do 209 // this by interval based rate limiting alone since that way may have prevented 210 // the install hint from showing up for kubectl users. 211 sometimes *sometimes 212 installHint string 213 214 // Stubbable for testing 215 stdin io.Reader 216 stderr io.Writer 217 interactive bool 218 now func() time.Time 219 environ func() []string 220 221 // Cached results. 222 // 223 // The mutex also guards calling the plugin. Since the plugin could be 224 // interactive we want to make sure it's only called once. 225 mu sync.Mutex 226 cachedCreds *credentials 227 exp time.Time 228 229 onRotateList []func() 230} 231 232type credentials struct { 233 token string 234 cert *tls.Certificate 235} 236 237// UpdateTransportConfig updates the transport.Config to use credentials 238// returned by the plugin. 239func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error { 240 // If a bearer token is present in the request - avoid the GetCert callback when 241 // setting up the transport, as that triggers the exec action if the server is 242 // also configured to allow client certificates for authentication. For requests 243 // like "kubectl get --token (token) pods" we should assume the intention is to 244 // use the provided token for authentication. 245 if c.HasTokenAuth() { 246 return nil 247 } 248 249 c.Wrap(func(rt http.RoundTripper) http.RoundTripper { 250 return &roundTripper{a, rt} 251 }) 252 253 if c.TLS.GetCert != nil { 254 return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set") 255 } 256 c.TLS.GetCert = a.cert 257 258 var dial func(ctx context.Context, network, addr string) (net.Conn, error) 259 if c.Dial != nil { 260 dial = c.Dial 261 } else { 262 dial = (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext 263 } 264 d := connrotation.NewDialer(dial) 265 266 a.mu.Lock() 267 defer a.mu.Unlock() 268 a.onRotateList = append(a.onRotateList, d.CloseAll) 269 onRotateListLength := len(a.onRotateList) 270 if onRotateListLength > onRotateListWarningLength { 271 klog.Warningf("constructing many client instances from the same exec auth config can cause performance problems during cert rotation and can exhaust available network connections; %d clients constructed calling %q", onRotateListLength, a.cmd) 272 } 273 274 c.Dial = d.DialContext 275 276 return nil 277} 278 279type roundTripper struct { 280 a *Authenticator 281 base http.RoundTripper 282} 283 284func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { 285 // If a user has already set credentials, use that. This makes commands like 286 // "kubectl get --token (token) pods" work. 287 if req.Header.Get("Authorization") != "" { 288 return r.base.RoundTrip(req) 289 } 290 291 creds, err := r.a.getCreds() 292 if err != nil { 293 return nil, fmt.Errorf("getting credentials: %v", err) 294 } 295 if creds.token != "" { 296 req.Header.Set("Authorization", "Bearer "+creds.token) 297 } 298 299 res, err := r.base.RoundTrip(req) 300 if err != nil { 301 return nil, err 302 } 303 if res.StatusCode == http.StatusUnauthorized { 304 resp := &clientauthentication.Response{ 305 Header: res.Header, 306 Code: int32(res.StatusCode), 307 } 308 if err := r.a.maybeRefreshCreds(creds, resp); err != nil { 309 klog.Errorf("refreshing credentials: %v", err) 310 } 311 } 312 return res, nil 313} 314 315func (a *Authenticator) credsExpired() bool { 316 if a.exp.IsZero() { 317 return false 318 } 319 return a.now().After(a.exp) 320} 321 322func (a *Authenticator) cert() (*tls.Certificate, error) { 323 creds, err := a.getCreds() 324 if err != nil { 325 return nil, err 326 } 327 return creds.cert, nil 328} 329 330func (a *Authenticator) getCreds() (*credentials, error) { 331 a.mu.Lock() 332 defer a.mu.Unlock() 333 334 if a.cachedCreds != nil && !a.credsExpired() { 335 return a.cachedCreds, nil 336 } 337 338 if err := a.refreshCredsLocked(nil); err != nil { 339 return nil, err 340 } 341 342 return a.cachedCreds, nil 343} 344 345// maybeRefreshCreds executes the plugin to force a rotation of the 346// credentials, unless they were rotated already. 347func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error { 348 a.mu.Lock() 349 defer a.mu.Unlock() 350 351 // Since we're not making a new pointer to a.cachedCreds in getCreds, no 352 // need to do deep comparison. 353 if creds != a.cachedCreds { 354 // Credentials already rotated. 355 return nil 356 } 357 358 return a.refreshCredsLocked(r) 359} 360 361// refreshCredsLocked executes the plugin and reads the credentials from 362// stdout. It must be called while holding the Authenticator's mutex. 363func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error { 364 cred := &clientauthentication.ExecCredential{ 365 Spec: clientauthentication.ExecCredentialSpec{ 366 Response: r, 367 Interactive: a.interactive, 368 }, 369 } 370 371 env := append(a.environ(), a.env...) 372 if a.group == v1alpha1.SchemeGroupVersion { 373 // Input spec disabled for beta due to lack of use. Possibly re-enable this later if 374 // someone wants it back. 375 // 376 // See: https://github.com/kubernetes/kubernetes/issues/61796 377 data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred) 378 if err != nil { 379 return fmt.Errorf("encode ExecCredentials: %v", err) 380 } 381 env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data)) 382 } 383 384 stdout := &bytes.Buffer{} 385 cmd := exec.Command(a.cmd, a.args...) 386 cmd.Env = env 387 cmd.Stderr = a.stderr 388 cmd.Stdout = stdout 389 if a.interactive { 390 cmd.Stdin = a.stdin 391 } 392 393 if err := cmd.Run(); err != nil { 394 return a.wrapCmdRunErrorLocked(err) 395 } 396 397 _, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred) 398 if err != nil { 399 return fmt.Errorf("decoding stdout: %v", err) 400 } 401 if gvk.Group != a.group.Group || gvk.Version != a.group.Version { 402 return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s", 403 a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version}) 404 } 405 406 if cred.Status == nil { 407 return fmt.Errorf("exec plugin didn't return a status field") 408 } 409 if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" { 410 return fmt.Errorf("exec plugin didn't return a token or cert/key pair") 411 } 412 if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") { 413 return fmt.Errorf("exec plugin returned only certificate or key, not both") 414 } 415 416 if cred.Status.ExpirationTimestamp != nil { 417 a.exp = cred.Status.ExpirationTimestamp.Time 418 } else { 419 a.exp = time.Time{} 420 } 421 422 newCreds := &credentials{ 423 token: cred.Status.Token, 424 } 425 if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" { 426 cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData)) 427 if err != nil { 428 return fmt.Errorf("failed parsing client key/certificate: %v", err) 429 } 430 431 // Leaf is initialized to be nil: 432 // https://golang.org/pkg/crypto/tls/#X509KeyPair 433 // Leaf certificate is the first certificate: 434 // https://golang.org/pkg/crypto/tls/#Certificate 435 // Populating leaf is useful for quickly accessing the underlying x509 436 // certificate values. 437 cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0]) 438 if err != nil { 439 return fmt.Errorf("failed parsing client leaf certificate: %v", err) 440 } 441 newCreds.cert = &cert 442 } 443 444 oldCreds := a.cachedCreds 445 a.cachedCreds = newCreds 446 // Only close all connections when TLS cert rotates. Token rotation doesn't 447 // need the extra noise. 448 if oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) { 449 // Can be nil if the exec auth plugin only returned token auth. 450 if oldCreds.cert != nil && oldCreds.cert.Leaf != nil { 451 metrics.ClientCertRotationAge.Observe(time.Now().Sub(oldCreds.cert.Leaf.NotBefore)) 452 } 453 for _, onRotate := range a.onRotateList { 454 onRotate() 455 } 456 } 457 458 expiry := time.Time{} 459 if a.cachedCreds.cert != nil && a.cachedCreds.cert.Leaf != nil { 460 expiry = a.cachedCreds.cert.Leaf.NotAfter 461 } 462 expirationMetrics.set(a, expiry) 463 return nil 464} 465 466// wrapCmdRunErrorLocked pulls out the code to construct a helpful error message 467// for when the exec plugin's binary fails to Run(). 468// 469// It must be called while holding the Authenticator's mutex. 470func (a *Authenticator) wrapCmdRunErrorLocked(err error) error { 471 switch err.(type) { 472 case *exec.Error: // Binary does not exist (see exec.Error). 473 builder := strings.Builder{} 474 fmt.Fprintf(&builder, "exec: executable %s not found", a.cmd) 475 476 a.sometimes.Do(func() { 477 fmt.Fprint(&builder, installHintVerboseHelp) 478 if a.installHint != "" { 479 fmt.Fprintf(&builder, "\n\n%s", a.installHint) 480 } 481 }) 482 483 return errors.New(builder.String()) 484 485 case *exec.ExitError: // Binary execution failed (see exec.Cmd.Run()). 486 e := err.(*exec.ExitError) 487 return fmt.Errorf( 488 "exec: executable %s failed with exit code %d", 489 a.cmd, 490 e.ProcessState.ExitCode(), 491 ) 492 493 default: 494 return fmt.Errorf("exec: %v", err) 495 } 496} 497