1/* 2Copyright 2018 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package exec 18 19import ( 20 "bytes" 21 "crypto/tls" 22 "crypto/x509" 23 "errors" 24 "fmt" 25 "io" 26 "net" 27 "net/http" 28 "os" 29 "os/exec" 30 "reflect" 31 "strings" 32 "sync" 33 "time" 34 35 "github.com/davecgh/go-spew/spew" 36 "golang.org/x/term" 37 38 v1 "k8s.io/apimachinery/pkg/apis/meta/v1" 39 "k8s.io/apimachinery/pkg/runtime" 40 "k8s.io/apimachinery/pkg/runtime/schema" 41 "k8s.io/apimachinery/pkg/runtime/serializer" 42 "k8s.io/apimachinery/pkg/util/clock" 43 utilruntime "k8s.io/apimachinery/pkg/util/runtime" 44 "k8s.io/client-go/pkg/apis/clientauthentication" 45 "k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1" 46 "k8s.io/client-go/pkg/apis/clientauthentication/v1beta1" 47 "k8s.io/client-go/tools/clientcmd/api" 48 "k8s.io/client-go/tools/metrics" 49 "k8s.io/client-go/transport" 50 "k8s.io/client-go/util/connrotation" 51 "k8s.io/klog/v2" 52) 53 54const execInfoEnv = "KUBERNETES_EXEC_INFO" 55const installHintVerboseHelp = ` 56 57It looks like you are trying to use a client-go credential plugin that is not installed. 58 59To learn more about this feature, consult the documentation available at: 60 https://kubernetes.io/docs/reference/access-authn-authz/authentication/#client-go-credential-plugins` 61 62var scheme = runtime.NewScheme() 63var codecs = serializer.NewCodecFactory(scheme) 64 65func init() { 66 v1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"}) 67 utilruntime.Must(v1alpha1.AddToScheme(scheme)) 68 utilruntime.Must(v1beta1.AddToScheme(scheme)) 69 utilruntime.Must(clientauthentication.AddToScheme(scheme)) 70} 71 72var ( 73 // Since transports can be constantly re-initialized by programs like kubectl, 74 // keep a cache of initialized authenticators keyed by a hash of their config. 75 globalCache = newCache() 76 // The list of API versions we accept. 77 apiVersions = map[string]schema.GroupVersion{ 78 v1alpha1.SchemeGroupVersion.String(): v1alpha1.SchemeGroupVersion, 79 v1beta1.SchemeGroupVersion.String(): v1beta1.SchemeGroupVersion, 80 } 81) 82 83func newCache() *cache { 84 return &cache{m: make(map[string]*Authenticator)} 85} 86 87var spewConfig = &spew.ConfigState{DisableMethods: true, Indent: " "} 88 89func cacheKey(conf *api.ExecConfig, cluster *clientauthentication.Cluster) string { 90 key := struct { 91 conf *api.ExecConfig 92 cluster *clientauthentication.Cluster 93 }{ 94 conf: conf, 95 cluster: cluster, 96 } 97 return spewConfig.Sprint(key) 98} 99 100type cache struct { 101 mu sync.Mutex 102 m map[string]*Authenticator 103} 104 105func (c *cache) get(s string) (*Authenticator, bool) { 106 c.mu.Lock() 107 defer c.mu.Unlock() 108 a, ok := c.m[s] 109 return a, ok 110} 111 112// put inserts an authenticator into the cache. If an authenticator is already 113// associated with the key, the first one is returned instead. 114func (c *cache) put(s string, a *Authenticator) *Authenticator { 115 c.mu.Lock() 116 defer c.mu.Unlock() 117 existing, ok := c.m[s] 118 if ok { 119 return existing 120 } 121 c.m[s] = a 122 return a 123} 124 125// sometimes rate limits how often a function f() is called. Specifically, Do() 126// will run the provided function f() up to threshold times every interval 127// duration. 128type sometimes struct { 129 threshold int 130 interval time.Duration 131 132 clock clock.Clock 133 mu sync.Mutex 134 135 count int // times we have called f() in this window 136 window time.Time // beginning of current window of length interval 137} 138 139func (s *sometimes) Do(f func()) { 140 s.mu.Lock() 141 defer s.mu.Unlock() 142 143 now := s.clock.Now() 144 if s.window.IsZero() { 145 s.window = now 146 } 147 148 // If we are no longer in our saved time window, then we get to reset our run 149 // count back to 0 and start increasing towards the threshold again. 150 if inWindow := now.Sub(s.window) < s.interval; !inWindow { 151 s.window = now 152 s.count = 0 153 } 154 155 // If we have not run the function more than threshold times in this current 156 // time window, we get to run it now! 157 if underThreshold := s.count < s.threshold; underThreshold { 158 s.count++ 159 f() 160 } 161} 162 163// GetAuthenticator returns an exec-based plugin for providing client credentials. 164func GetAuthenticator(config *api.ExecConfig, cluster *clientauthentication.Cluster) (*Authenticator, error) { 165 return newAuthenticator(globalCache, config, cluster) 166} 167 168func newAuthenticator(c *cache, config *api.ExecConfig, cluster *clientauthentication.Cluster) (*Authenticator, error) { 169 key := cacheKey(config, cluster) 170 if a, ok := c.get(key); ok { 171 return a, nil 172 } 173 174 gv, ok := apiVersions[config.APIVersion] 175 if !ok { 176 return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion) 177 } 178 179 connTracker := connrotation.NewConnectionTracker() 180 defaultDialer := connrotation.NewDialerWithTracker( 181 (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext, 182 connTracker, 183 ) 184 185 a := &Authenticator{ 186 cmd: config.Command, 187 args: config.Args, 188 group: gv, 189 cluster: cluster, 190 provideClusterInfo: config.ProvideClusterInfo, 191 192 installHint: config.InstallHint, 193 sometimes: &sometimes{ 194 threshold: 10, 195 interval: time.Hour, 196 clock: clock.RealClock{}, 197 }, 198 199 stdin: os.Stdin, 200 stderr: os.Stderr, 201 interactive: term.IsTerminal(int(os.Stdin.Fd())), 202 now: time.Now, 203 environ: os.Environ, 204 205 defaultDialer: defaultDialer, 206 connTracker: connTracker, 207 } 208 209 for _, env := range config.Env { 210 a.env = append(a.env, env.Name+"="+env.Value) 211 } 212 213 return c.put(key, a), nil 214} 215 216// Authenticator is a client credential provider that rotates credentials by executing a plugin. 217// The plugin input and output are defined by the API group client.authentication.k8s.io. 218type Authenticator struct { 219 // Set by the config 220 cmd string 221 args []string 222 group schema.GroupVersion 223 env []string 224 cluster *clientauthentication.Cluster 225 provideClusterInfo bool 226 227 // Used to avoid log spew by rate limiting install hint printing. We didn't do 228 // this by interval based rate limiting alone since that way may have prevented 229 // the install hint from showing up for kubectl users. 230 sometimes *sometimes 231 installHint string 232 233 // Stubbable for testing 234 stdin io.Reader 235 stderr io.Writer 236 interactive bool 237 now func() time.Time 238 environ func() []string 239 240 // defaultDialer is used for clients which don't specify a custom dialer 241 defaultDialer *connrotation.Dialer 242 // connTracker tracks all connections opened that we need to close when rotating a client certificate 243 connTracker *connrotation.ConnectionTracker 244 245 // Cached results. 246 // 247 // The mutex also guards calling the plugin. Since the plugin could be 248 // interactive we want to make sure it's only called once. 249 mu sync.Mutex 250 cachedCreds *credentials 251 exp time.Time 252} 253 254type credentials struct { 255 token string `datapolicy:"token"` 256 cert *tls.Certificate `datapolicy:"secret-key"` 257} 258 259// UpdateTransportConfig updates the transport.Config to use credentials 260// returned by the plugin. 261func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error { 262 // If a bearer token is present in the request - avoid the GetCert callback when 263 // setting up the transport, as that triggers the exec action if the server is 264 // also configured to allow client certificates for authentication. For requests 265 // like "kubectl get --token (token) pods" we should assume the intention is to 266 // use the provided token for authentication. 267 if c.HasTokenAuth() { 268 return nil 269 } 270 271 c.Wrap(func(rt http.RoundTripper) http.RoundTripper { 272 return &roundTripper{a, rt} 273 }) 274 275 if c.TLS.GetCert != nil { 276 return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set") 277 } 278 c.TLS.GetCert = a.cert 279 280 var d *connrotation.Dialer 281 if c.Dial != nil { 282 // if c has a custom dialer, we have to wrap it 283 d = connrotation.NewDialerWithTracker(c.Dial, a.connTracker) 284 } else { 285 d = a.defaultDialer 286 } 287 288 c.Dial = d.DialContext 289 290 return nil 291} 292 293type roundTripper struct { 294 a *Authenticator 295 base http.RoundTripper 296} 297 298func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { 299 // If a user has already set credentials, use that. This makes commands like 300 // "kubectl get --token (token) pods" work. 301 if req.Header.Get("Authorization") != "" { 302 return r.base.RoundTrip(req) 303 } 304 305 creds, err := r.a.getCreds() 306 if err != nil { 307 return nil, fmt.Errorf("getting credentials: %v", err) 308 } 309 if creds.token != "" { 310 req.Header.Set("Authorization", "Bearer "+creds.token) 311 } 312 313 res, err := r.base.RoundTrip(req) 314 if err != nil { 315 return nil, err 316 } 317 if res.StatusCode == http.StatusUnauthorized { 318 resp := &clientauthentication.Response{ 319 Header: res.Header, 320 Code: int32(res.StatusCode), 321 } 322 if err := r.a.maybeRefreshCreds(creds, resp); err != nil { 323 klog.Errorf("refreshing credentials: %v", err) 324 } 325 } 326 return res, nil 327} 328 329func (a *Authenticator) credsExpired() bool { 330 if a.exp.IsZero() { 331 return false 332 } 333 return a.now().After(a.exp) 334} 335 336func (a *Authenticator) cert() (*tls.Certificate, error) { 337 creds, err := a.getCreds() 338 if err != nil { 339 return nil, err 340 } 341 return creds.cert, nil 342} 343 344func (a *Authenticator) getCreds() (*credentials, error) { 345 a.mu.Lock() 346 defer a.mu.Unlock() 347 348 if a.cachedCreds != nil && !a.credsExpired() { 349 return a.cachedCreds, nil 350 } 351 352 if err := a.refreshCredsLocked(nil); err != nil { 353 return nil, err 354 } 355 356 return a.cachedCreds, nil 357} 358 359// maybeRefreshCreds executes the plugin to force a rotation of the 360// credentials, unless they were rotated already. 361func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error { 362 a.mu.Lock() 363 defer a.mu.Unlock() 364 365 // Since we're not making a new pointer to a.cachedCreds in getCreds, no 366 // need to do deep comparison. 367 if creds != a.cachedCreds { 368 // Credentials already rotated. 369 return nil 370 } 371 372 return a.refreshCredsLocked(r) 373} 374 375// refreshCredsLocked executes the plugin and reads the credentials from 376// stdout. It must be called while holding the Authenticator's mutex. 377func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error { 378 cred := &clientauthentication.ExecCredential{ 379 Spec: clientauthentication.ExecCredentialSpec{ 380 Response: r, 381 Interactive: a.interactive, 382 }, 383 } 384 if a.provideClusterInfo { 385 cred.Spec.Cluster = a.cluster 386 } 387 388 env := append(a.environ(), a.env...) 389 data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred) 390 if err != nil { 391 return fmt.Errorf("encode ExecCredentials: %v", err) 392 } 393 env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data)) 394 395 stdout := &bytes.Buffer{} 396 cmd := exec.Command(a.cmd, a.args...) 397 cmd.Env = env 398 cmd.Stderr = a.stderr 399 cmd.Stdout = stdout 400 if a.interactive { 401 cmd.Stdin = a.stdin 402 } 403 404 err = cmd.Run() 405 incrementCallsMetric(err) 406 if err != nil { 407 return a.wrapCmdRunErrorLocked(err) 408 } 409 410 _, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred) 411 if err != nil { 412 return fmt.Errorf("decoding stdout: %v", err) 413 } 414 if gvk.Group != a.group.Group || gvk.Version != a.group.Version { 415 return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s", 416 a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version}) 417 } 418 419 if cred.Status == nil { 420 return fmt.Errorf("exec plugin didn't return a status field") 421 } 422 if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" { 423 return fmt.Errorf("exec plugin didn't return a token or cert/key pair") 424 } 425 if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") { 426 return fmt.Errorf("exec plugin returned only certificate or key, not both") 427 } 428 429 if cred.Status.ExpirationTimestamp != nil { 430 a.exp = cred.Status.ExpirationTimestamp.Time 431 } else { 432 a.exp = time.Time{} 433 } 434 435 newCreds := &credentials{ 436 token: cred.Status.Token, 437 } 438 if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" { 439 cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData)) 440 if err != nil { 441 return fmt.Errorf("failed parsing client key/certificate: %v", err) 442 } 443 444 // Leaf is initialized to be nil: 445 // https://golang.org/pkg/crypto/tls/#X509KeyPair 446 // Leaf certificate is the first certificate: 447 // https://golang.org/pkg/crypto/tls/#Certificate 448 // Populating leaf is useful for quickly accessing the underlying x509 449 // certificate values. 450 cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0]) 451 if err != nil { 452 return fmt.Errorf("failed parsing client leaf certificate: %v", err) 453 } 454 newCreds.cert = &cert 455 } 456 457 oldCreds := a.cachedCreds 458 a.cachedCreds = newCreds 459 // Only close all connections when TLS cert rotates. Token rotation doesn't 460 // need the extra noise. 461 if oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) { 462 // Can be nil if the exec auth plugin only returned token auth. 463 if oldCreds.cert != nil && oldCreds.cert.Leaf != nil { 464 metrics.ClientCertRotationAge.Observe(time.Now().Sub(oldCreds.cert.Leaf.NotBefore)) 465 } 466 a.connTracker.CloseAll() 467 } 468 469 expiry := time.Time{} 470 if a.cachedCreds.cert != nil && a.cachedCreds.cert.Leaf != nil { 471 expiry = a.cachedCreds.cert.Leaf.NotAfter 472 } 473 expirationMetrics.set(a, expiry) 474 return nil 475} 476 477// wrapCmdRunErrorLocked pulls out the code to construct a helpful error message 478// for when the exec plugin's binary fails to Run(). 479// 480// It must be called while holding the Authenticator's mutex. 481func (a *Authenticator) wrapCmdRunErrorLocked(err error) error { 482 switch err.(type) { 483 case *exec.Error: // Binary does not exist (see exec.Error). 484 builder := strings.Builder{} 485 fmt.Fprintf(&builder, "exec: executable %s not found", a.cmd) 486 487 a.sometimes.Do(func() { 488 fmt.Fprint(&builder, installHintVerboseHelp) 489 if a.installHint != "" { 490 fmt.Fprintf(&builder, "\n\n%s", a.installHint) 491 } 492 }) 493 494 return errors.New(builder.String()) 495 496 case *exec.ExitError: // Binary execution failed (see exec.Cmd.Run()). 497 e := err.(*exec.ExitError) 498 return fmt.Errorf( 499 "exec: executable %s failed with exit code %d", 500 a.cmd, 501 e.ProcessState.ExitCode(), 502 ) 503 504 default: 505 return fmt.Errorf("exec: %v", err) 506 } 507} 508