1// Copyright 2019 Istio Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package google 16 17import ( 18 "bytes" 19 "crypto/tls" 20 "crypto/x509" 21 "encoding/json" 22 "errors" 23 "fmt" 24 "io/ioutil" 25 "net/http" 26 "net/http/httputil" 27 "sync" 28 "time" 29 30 "github.com/golang/protobuf/ptypes/duration" 31 32 "istio.io/istio/security/pkg/stsservice" 33 "istio.io/pkg/log" 34) 35 36const ( 37 httpTimeOutInSec = 5 38 maxRequestRetry = 5 39 cacheHitDivisor = 50 40 contentType = "application/json" 41 scope = "https://www.googleapis.com/auth/cloud-platform" 42 tokenType = "urn:ietf:params:oauth:token-type:access_token" 43 federatedToken = "federated token" 44 accessToken = "access token" 45) 46 47var ( 48 pluginLog = log.RegisterScope("token", "token manager plugin debugging", 0) 49 federatedTokenEndpoint = "https://securetoken.googleapis.com/v1/identitybindingtoken" 50 accessTokenEndpoint = "https://iamcredentials.googleapis.com/v1/projects/-/" + 51 "serviceAccounts/service-%s@gcp-sa-meshdataplane.iam.gserviceaccount.com:generateAccessToken" 52 // default grace period in seconds of an access token. If caching is enabled and token remaining life time is 53 // within this period, refresh access token. 54 defaultGracePeriod = 300 55) 56 57// Plugin supports token exchange with Google OAuth 2.0 authorization server. 58type Plugin struct { 59 httpClient *http.Client 60 trustDomain string 61 // tokens is the cache for fetched tokens. 62 // map key is token type, map value is tokenInfo. 63 tokens sync.Map 64 gcpProjectNumber string 65 gkeClusterURL string 66 enableCache bool 67 68 // Counts numbers of access token cache hits. 69 mutex sync.RWMutex 70 accessTokenCacheHit uint64 71} 72 73// CreateTokenManagerPlugin creates a plugin that fetches token from a Google OAuth 2.0 authorization server. 74func CreateTokenManagerPlugin(trustDomain, gcpProjectNumber, gkeClusterURL string, enableCache bool) (*Plugin, error) { 75 caCertPool, err := x509.SystemCertPool() 76 if err != nil { 77 pluginLog.Errorf("Failed to get SystemCertPool: %v", err) 78 return nil, err 79 } 80 p := &Plugin{ 81 httpClient: &http.Client{ 82 Timeout: httpTimeOutInSec * time.Second, 83 Transport: &http.Transport{ 84 TLSClientConfig: &tls.Config{ 85 RootCAs: caCertPool, 86 }, 87 }, 88 }, 89 trustDomain: trustDomain, 90 gcpProjectNumber: gcpProjectNumber, 91 gkeClusterURL: gkeClusterURL, 92 enableCache: enableCache, 93 } 94 return p, nil 95} 96 97type federatedTokenResponse struct { 98 AccessToken string `json:"access_token"` 99 IssuedTokenType string `json:"issued_token_type"` 100 TokenType string `json:"token_type"` 101 ExpiresIn int64 `json:"expires_in"` // Expiration time in seconds 102} 103 104// GenerateToken takes STS request parameters and fetches token, returns StsResponseParameters in JSON. 105func (p *Plugin) ExchangeToken(parameters stsservice.StsRequestParameters) ([]byte, error) { 106 if tokenSTS, ok := p.useCachedToken(); ok { 107 return tokenSTS, nil 108 } 109 pluginLog.Debugf("Start to fetch token with STS request parameters: %v", parameters) 110 ftResp, err := p.fetchFederatedToken(parameters) 111 if err != nil { 112 return nil, err 113 } 114 atResp, err := p.fetchAccessToken(ftResp) 115 if err != nil { 116 return nil, err 117 } 118 return p.generateSTSResp(atResp) 119} 120 121// useCachedToken checks if there is a cached access token which is not going to expire soon. Returns 122// cached token in STS response or false if token is not available. 123func (p *Plugin) useCachedToken() ([]byte, bool) { 124 if !p.enableCache { 125 return nil, false 126 } 127 v, ok := p.tokens.Load(accessToken) 128 if !ok { 129 return nil, false 130 } 131 132 var cacheHitCount uint64 133 p.mutex.Lock() 134 p.accessTokenCacheHit++ 135 cacheHitCount = p.accessTokenCacheHit 136 p.mutex.Unlock() 137 138 token := v.(stsservice.TokenInfo) 139 remainingLife := time.Until(token.ExpireTime) 140 if cacheHitCount%cacheHitDivisor == 0 { 141 pluginLog.Debugf("find a cached access token with remaining lifetime: %s (number of cache hits: %d)", 142 remainingLife.String(), cacheHitCount) 143 } 144 if remainingLife > time.Duration(defaultGracePeriod)*time.Second { 145 expireInSec := int64(remainingLife.Seconds()) 146 if tokenSTS, err := p.generateSTSRespInner(token.Token, expireInSec); err == nil { 147 if cacheHitCount%cacheHitDivisor == 0 { 148 pluginLog.Debugf("generated an STS response using a cached access token") 149 } 150 return tokenSTS, true 151 } 152 } 153 return nil, false 154} 155 156// constructFederatedTokenRequest returns an HTTP request for federated token. 157// Example of a federated token request: 158// POST https://securetoken.googleapis.com/v1/identitybindingtoken 159// Content-Type: application/json 160// { 161// audience: <trust domain> 162// grantType: urn:ietf:params:oauth:grant-type:token-exchange 163// requestedTokenType: urn:ietf:params:oauth:token-type:access_token 164// subjectTokenType: urn:ietf:params:oauth:token-type:jwt 165// subjectToken: <jwt token> 166// Scope: https://www.googleapis.com/auth/cloud-platform 167// } 168func (p *Plugin) constructFederatedTokenRequest(parameters stsservice.StsRequestParameters) (*http.Request, error) { 169 reqScope := scope 170 if len(parameters.Scope) != 0 { 171 reqScope = parameters.Scope 172 } 173 aud := fmt.Sprintf("identitynamespace:%s:%s", p.trustDomain, p.gkeClusterURL) 174 query := map[string]string{ 175 "audience": aud, 176 "grantType": parameters.GrantType, 177 "requestedTokenType": tokenType, 178 "subjectTokenType": parameters.SubjectTokenType, 179 "subjectToken": parameters.SubjectToken, 180 "scope": reqScope, 181 } 182 jsonQuery, err := json.Marshal(query) 183 if err != nil { 184 return nil, fmt.Errorf("failed to marshal query for get federated token request: %+v", err) 185 } 186 req, err := http.NewRequest("POST", federatedTokenEndpoint, bytes.NewBuffer(jsonQuery)) 187 if err != nil { 188 return req, fmt.Errorf("failed to create get federated token request: %+v", err) 189 } 190 req.Header.Set("Content-Type", contentType) 191 if pluginLog.DebugEnabled() { 192 dQuery := map[string]string{ 193 "audience": aud, 194 "grantType": parameters.GrantType, 195 "requestedTokenType": tokenType, 196 "subjectTokenType": parameters.SubjectTokenType, 197 "subjectToken": "redacted", 198 "scope": reqScope, 199 } 200 dJSONQuery, _ := json.Marshal(dQuery) 201 dReq, _ := http.NewRequest("POST", federatedTokenEndpoint, bytes.NewBuffer(dJSONQuery)) 202 dReq.Header.Set("Content-Type", contentType) 203 reqDump, _ := httputil.DumpRequest(dReq, true) 204 pluginLog.Debugf("Prepared federated token request: \n%s", string(reqDump)) 205 } else { 206 pluginLog.Info("Prepared federated token request") 207 } 208 return req, nil 209} 210 211// fetchFederatedToken exchanges a third-party issued Json Web Token for an OAuth2.0 access token 212// which asserts a third-party identity within an identity namespace. 213func (p *Plugin) fetchFederatedToken(parameters stsservice.StsRequestParameters) (*federatedTokenResponse, error) { 214 respData := &federatedTokenResponse{} 215 216 req, err := p.constructFederatedTokenRequest(parameters) 217 if err != nil { 218 pluginLog.Errorf("failed to create get federated token request: %+v", err) 219 return nil, err 220 } 221 resp, timeElapsed, err := p.sendRequestWithRetry(req) 222 if err != nil { 223 respCode := 0 224 if resp != nil { 225 respCode = resp.StatusCode 226 } 227 pluginLog.Errorf("Failed to exchange federated token (HTTP status %d, total time elapsed %s): %v", 228 respCode, timeElapsed.String(), err) 229 return nil, fmt.Errorf("failed to exchange federated token (HTTP status %d): %v", respCode, 230 err) 231 } 232 // resp should not be nil. 233 defer resp.Body.Close() 234 235 if pluginLog.DebugEnabled() { 236 respDump, _ := httputil.DumpResponse(resp, false) 237 pluginLog.Debugf("Received federated token response after %s: \n%s", 238 timeElapsed.String(), string(respDump)) 239 } else { 240 pluginLog.Infof("Received federated token response after %s", timeElapsed.String()) 241 } 242 243 body, err := ioutil.ReadAll(resp.Body) 244 if err != nil { 245 pluginLog.Errorf("Failed to read federated token response body: %+v", err) 246 return respData, fmt.Errorf("failed to read federated token response body: %+v", err) 247 } 248 if err := json.Unmarshal(body, respData); err != nil { 249 pluginLog.Errorf("Failed to unmarshal federated token response data: %v", err) 250 return respData, fmt.Errorf("failed to unmarshal federated token response data: %v", err) 251 } 252 if respData.AccessToken == "" { 253 pluginLog.Errora("federated token response does not have access token", string(body)) 254 return respData, errors.New("federated token response does not have access token. " + string(body)) 255 } 256 pluginLog.Infof("Federated token will expire in %d seconds", respData.ExpiresIn) 257 tokenReceivedTime := time.Now() 258 p.tokens.Store(federatedToken, stsservice.TokenInfo{ 259 TokenType: federatedToken, 260 IssueTime: tokenReceivedTime, 261 ExpireTime: tokenReceivedTime.Add(time.Duration(respData.ExpiresIn) * time.Second)}) 262 return respData, nil 263} 264 265// Send HTTP request every 0.01 seconds until successfully receive response or hit max retry numbers. 266// If response code is 4xx, return immediately without retry. 267func (p *Plugin) sendRequestWithRetry(req *http.Request) (resp *http.Response, elapsedTime time.Duration, err error) { 268 start := time.Now() 269 for i := 0; i < maxRequestRetry; i++ { 270 resp, err = p.httpClient.Do(req) 271 if err != nil { 272 pluginLog.Errorf("failed to send out request: %v (response: %v)", err, resp) 273 } 274 if resp != nil && resp.StatusCode == http.StatusOK { 275 return resp, time.Since(start), err 276 } 277 if resp != nil && resp.StatusCode >= http.StatusBadRequest && resp.StatusCode < http.StatusInternalServerError { 278 return resp, time.Since(start), err 279 } 280 time.Sleep(10 * time.Millisecond) 281 } 282 if resp != nil && resp.StatusCode != http.StatusOK { 283 bodyBytes, _ := ioutil.ReadAll(resp.Body) 284 defer resp.Body.Close() 285 return resp, time.Since(start), fmt.Errorf("HTTP Status %d, body: %s", resp.StatusCode, string(bodyBytes)) 286 } 287 return resp, time.Since(start), err 288} 289 290type accessTokenRequest struct { 291 Name string `json:"name"` // nolint: structcheck, unused 292 Delegates []string `json:"delegates"` 293 Scope []string `json:"scope"` 294 LifeTime duration.Duration `json:"lifetime"` // nolint: structcheck, unused 295} 296 297type accessTokenResponse struct { 298 AccessToken string `json:"accessToken"` 299 ExpireTime string `json:"expireTime"` 300} 301 302// constructFederatedTokenRequest returns an HTTP request for access token. 303// Example of an access token request: 304// POST https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/ 305// service-<GCP project number>@gcp-sa-meshdataplane.iam.gserviceaccount.com:generateAccessToken 306// Content-Type: application/json 307// Authorization: Bearer <federated token> 308// { 309// "Delegates": [], 310// "Scope": [ 311// https://www.googleapis.com/auth/cloud-platform 312// ], 313// } 314func (p *Plugin) constructGenerateAccessTokenRequest(fResp *federatedTokenResponse) (*http.Request, error) { 315 // Request for access token with a lifetime of 3600 seconds. 316 query := accessTokenRequest{ 317 LifeTime: duration.Duration{Seconds: 3600}, 318 } 319 query.Scope = append(query.Scope, scope) 320 321 jsonQuery, err := json.Marshal(query) 322 if err != nil { 323 return nil, fmt.Errorf("failed to marshal query for get access token request: %+v", err) 324 } 325 endpoint := fmt.Sprintf(accessTokenEndpoint, p.gcpProjectNumber) 326 req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(jsonQuery)) 327 if err != nil { 328 return nil, fmt.Errorf("failed to create get access token request: %+v", err) 329 } 330 req.Header.Add("Content-Type", contentType) 331 if pluginLog.DebugEnabled() { 332 reqDump, _ := httputil.DumpRequest(req, true) 333 pluginLog.Debugf("Prepared access token request: \n%s", string(reqDump)) 334 } else { 335 pluginLog.Info("Prepared access token request") 336 } 337 req.Header.Add("Authorization", "Bearer "+fResp.AccessToken) 338 return req, nil 339} 340 341func (p *Plugin) fetchAccessToken(federatedToken *federatedTokenResponse) (*accessTokenResponse, error) { 342 respData := &accessTokenResponse{} 343 344 req, err := p.constructGenerateAccessTokenRequest(federatedToken) 345 if err != nil { 346 pluginLog.Errorf("failed to create get access token request: %+v", err) 347 return nil, err 348 } 349 resp, timeElapsed, err := p.sendRequestWithRetry(req) 350 if err != nil { 351 respCode := 0 352 if resp != nil { 353 respCode = resp.StatusCode 354 } 355 pluginLog.Errorf("failed to exchange access token (HTTP status %d, total time elapsed %s): %v", 356 respCode, timeElapsed.String(), err) 357 return respData, fmt.Errorf("failed to exchange access token (HTTP status %d): %v", respCode, err) 358 } 359 defer resp.Body.Close() 360 361 if pluginLog.DebugEnabled() { 362 respDump, _ := httputil.DumpResponse(resp, false) 363 pluginLog.Debugf("Received access token response after %s: \n%s", 364 timeElapsed.String(), string(respDump)) 365 } else { 366 pluginLog.Infof("Received access token response after %s", timeElapsed.String()) 367 } 368 369 body, err := ioutil.ReadAll(resp.Body) 370 if err != nil { 371 pluginLog.Errorf("Failed to read access token response body: %+v", err) 372 return respData, fmt.Errorf("failed to read access token response body: %+v", err) 373 } 374 if err := json.Unmarshal(body, respData); err != nil { 375 pluginLog.Errorf("Failed to unmarshal access token response data: %v", err) 376 return respData, fmt.Errorf("failed to unmarshal access token response data: %v", err) 377 } 378 if respData.AccessToken == "" { 379 pluginLog.Errora("access token response does not have access token", string(body)) 380 return respData, errors.New("access token response does not have access token. " + string(body)) 381 } 382 pluginLog.Debug("successfully exchanged an access token") 383 // Store access token 384 // Default token life time is 3600 seconds. 385 tokenExp := time.Now().Add(3600 * time.Second) 386 exp, err := time.Parse(time.RFC3339Nano, respData.ExpireTime) 387 if err != nil { 388 pluginLog.Errorf("Failed to unmarshal timestamp %s from access token response, "+ 389 "fall back to use default lifetime (3600 seconds): %v", respData.ExpireTime, err) 390 } else { 391 tokenExp = exp 392 } 393 // Update cache and reset cache hit counter. 394 p.tokens.Store(accessToken, stsservice.TokenInfo{ 395 TokenType: accessToken, 396 IssueTime: time.Now(), 397 ExpireTime: tokenExp, 398 Token: respData.AccessToken}) 399 p.mutex.Lock() 400 p.accessTokenCacheHit = 0 401 p.mutex.Unlock() 402 return respData, nil 403} 404 405// generateSTSResp takes accessTokenResponse and generates StsResponseParameters in JSON. 406func (p *Plugin) generateSTSResp(atResp *accessTokenResponse) ([]byte, error) { 407 exp, err := time.Parse(time.RFC3339Nano, atResp.ExpireTime) 408 // Default token life time is 3600 seconds 409 var expireInSec int64 = 3600 410 if err != nil { 411 pluginLog.Errorf("Failed to unmarshal timestamp %s from access token response, "+ 412 "fall back to use default lifetime (3600 seconds): %v", atResp.ExpireTime, err) 413 } else { 414 expireInSec = int64(time.Until(exp).Seconds()) 415 } 416 return p.generateSTSRespInner(atResp.AccessToken, expireInSec) 417} 418 419func (p *Plugin) generateSTSRespInner(token string, expire int64) ([]byte, error) { 420 stsRespParam := stsservice.StsResponseParameters{ 421 AccessToken: token, 422 IssuedTokenType: tokenType, 423 TokenType: "Bearer", 424 ExpiresIn: expire, 425 } 426 statusJSON, err := json.MarshalIndent(stsRespParam, "", " ") 427 if pluginLog.DebugEnabled() { 428 stsRespParam.AccessToken = "redacted" 429 pluginLog.Infof("Populated STS response parameters: %+v", stsRespParam) 430 } 431 return statusJSON, err 432} 433 434// DumpTokenStatus dumps all token status in JSON 435func (p *Plugin) DumpPluginStatus() ([]byte, error) { 436 tokenStatus := make([]stsservice.TokenInfo, 0) 437 p.tokens.Range(func(k interface{}, v interface{}) bool { 438 token := v.(stsservice.TokenInfo) 439 tokenStatus = append(tokenStatus, stsservice.TokenInfo{ 440 TokenType: token.TokenType, IssueTime: token.IssueTime, ExpireTime: token.ExpireTime}) 441 return true 442 }) 443 td := stsservice.TokensDump{ 444 Tokens: tokenStatus, 445 } 446 statusJSON, err := json.MarshalIndent(td, "", " ") 447 return statusJSON, err 448} 449 450// SetEndpoints changes the endpoints for testing purposes only. 451func (p *Plugin) SetEndpoints(fTokenEndpoint, aTokenEndpoint string) { 452 federatedTokenEndpoint = fTokenEndpoint 453 accessTokenEndpoint = aTokenEndpoint 454} 455 456// ClearCache is only used for testing purposes. 457func (p *Plugin) ClearCache() { 458 p.tokens.Delete(federatedToken) 459 p.tokens.Delete(accessToken) 460} 461