1// Copyright 2019 Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package google
16
17import (
18	"bytes"
19	"crypto/tls"
20	"crypto/x509"
21	"encoding/json"
22	"errors"
23	"fmt"
24	"io/ioutil"
25	"net/http"
26	"net/http/httputil"
27	"sync"
28	"time"
29
30	"github.com/golang/protobuf/ptypes/duration"
31
32	"istio.io/istio/security/pkg/stsservice"
33	"istio.io/pkg/log"
34)
35
// Fixed values used throughout the token exchange flow:
//   - httpTimeOutInSec: timeout, in seconds, of the plugin's HTTP client.
//   - maxRequestRetry: maximum number of attempts made for one HTTP request.
//   - cacheHitDivisor: cache-hit debug messages are logged only on every
//     cacheHitDivisor-th hit, to keep the log quiet.
//   - contentType: Content-Type header value of outgoing requests.
//   - scope: default OAuth scope, used when the STS request does not carry one
//     and always used for the access token request.
//   - tokenType: requested/issued token type URN.
//   - federatedToken, accessToken: cache keys and TokenType labels of the two
//     kinds of cached token.
const (
	httpTimeOutInSec = 5
	maxRequestRetry  = 5
	cacheHitDivisor  = 50
	contentType      = "application/json"
	scope            = "https://www.googleapis.com/auth/cloud-platform"
	tokenType        = "urn:ietf:params:oauth:token-type:access_token"
	federatedToken   = "federated token"
	accessToken      = "access token"
)

// Package-level state. The two endpoints are variables rather than constants
// because SetEndpoints overwrites them. accessTokenEndpoint is a format string
// whose single %s verb is filled with the GCP project number.
var (
	pluginLog              = log.RegisterScope("token", "token manager plugin debugging", 0)
	federatedTokenEndpoint = "https://securetoken.googleapis.com/v1/identitybindingtoken"
	accessTokenEndpoint    = "https://iamcredentials.googleapis.com/v1/projects/-/" +
		"serviceAccounts/service-%s@gcp-sa-meshdataplane.iam.gserviceaccount.com:generateAccessToken"
	// default grace period in seconds of an access token. If caching is enabled and token remaining life time is
	// within this period, refresh access token.
	defaultGracePeriod = 300
)
56
// Plugin supports token exchange with Google OAuth 2.0 authorization server.
//
// trustDomain and gkeClusterURL are combined into the audience of the federated
// token request, gcpProjectNumber is substituted into the access token endpoint,
// and enableCache controls whether a previously fetched access token may be
// served from the cache instead of being fetched again.
type Plugin struct {
	httpClient  *http.Client
	trustDomain string
	// tokens is the cache for fetched tokens.
	// map key is token type, map value is tokenInfo.
	tokens           sync.Map
	gcpProjectNumber string
	gkeClusterURL    string
	enableCache      bool

	// mutex guards accessTokenCacheHit, which counts access token cache hits.
	// The counter is reset to zero whenever a new access token is stored.
	mutex               sync.RWMutex
	accessTokenCacheHit uint64
}
72
73// CreateTokenManagerPlugin creates a plugin that fetches token from a Google OAuth 2.0 authorization server.
74func CreateTokenManagerPlugin(trustDomain, gcpProjectNumber, gkeClusterURL string, enableCache bool) (*Plugin, error) {
75	caCertPool, err := x509.SystemCertPool()
76	if err != nil {
77		pluginLog.Errorf("Failed to get SystemCertPool: %v", err)
78		return nil, err
79	}
80	p := &Plugin{
81		httpClient: &http.Client{
82			Timeout: httpTimeOutInSec * time.Second,
83			Transport: &http.Transport{
84				TLSClientConfig: &tls.Config{
85					RootCAs: caCertPool,
86				},
87			},
88		},
89		trustDomain:      trustDomain,
90		gcpProjectNumber: gcpProjectNumber,
91		gkeClusterURL:    gkeClusterURL,
92		enableCache:      enableCache,
93	}
94	return p, nil
95}
96
// federatedTokenResponse is the JSON body returned by the federated token endpoint.
// AccessToken carries the federated token itself; a response with an empty
// AccessToken is treated as a failure by fetchFederatedToken.
type federatedTokenResponse struct {
	AccessToken     string `json:"access_token"`
	IssuedTokenType string `json:"issued_token_type"`
	TokenType       string `json:"token_type"`
	ExpiresIn       int64  `json:"expires_in"` // Expiration time in seconds
}
103
104// GenerateToken takes STS request parameters and fetches token, returns StsResponseParameters in JSON.
105func (p *Plugin) ExchangeToken(parameters stsservice.StsRequestParameters) ([]byte, error) {
106	if tokenSTS, ok := p.useCachedToken(); ok {
107		return tokenSTS, nil
108	}
109	pluginLog.Debugf("Start to fetch token with STS request parameters: %v", parameters)
110	ftResp, err := p.fetchFederatedToken(parameters)
111	if err != nil {
112		return nil, err
113	}
114	atResp, err := p.fetchAccessToken(ftResp)
115	if err != nil {
116		return nil, err
117	}
118	return p.generateSTSResp(atResp)
119}
120
121// useCachedToken checks if there is a cached access token which is not going to expire soon. Returns
122// cached token in STS response or false if token is not available.
123func (p *Plugin) useCachedToken() ([]byte, bool) {
124	if !p.enableCache {
125		return nil, false
126	}
127	v, ok := p.tokens.Load(accessToken)
128	if !ok {
129		return nil, false
130	}
131
132	var cacheHitCount uint64
133	p.mutex.Lock()
134	p.accessTokenCacheHit++
135	cacheHitCount = p.accessTokenCacheHit
136	p.mutex.Unlock()
137
138	token := v.(stsservice.TokenInfo)
139	remainingLife := time.Until(token.ExpireTime)
140	if cacheHitCount%cacheHitDivisor == 0 {
141		pluginLog.Debugf("find a cached access token with remaining lifetime: %s (number of cache hits: %d)",
142			remainingLife.String(), cacheHitCount)
143	}
144	if remainingLife > time.Duration(defaultGracePeriod)*time.Second {
145		expireInSec := int64(remainingLife.Seconds())
146		if tokenSTS, err := p.generateSTSRespInner(token.Token, expireInSec); err == nil {
147			if cacheHitCount%cacheHitDivisor == 0 {
148				pluginLog.Debugf("generated an STS response using a cached access token")
149			}
150			return tokenSTS, true
151		}
152	}
153	return nil, false
154}
155
156// constructFederatedTokenRequest returns an HTTP request for federated token.
157// Example of a federated token request:
158// POST https://securetoken.googleapis.com/v1/identitybindingtoken
159// Content-Type: application/json
160// {
161//    audience: <trust domain>
162//    grantType: urn:ietf:params:oauth:grant-type:token-exchange
163//    requestedTokenType: urn:ietf:params:oauth:token-type:access_token
164//    subjectTokenType: urn:ietf:params:oauth:token-type:jwt
165//    subjectToken: <jwt token>
166//    Scope: https://www.googleapis.com/auth/cloud-platform
167// }
168func (p *Plugin) constructFederatedTokenRequest(parameters stsservice.StsRequestParameters) (*http.Request, error) {
169	reqScope := scope
170	if len(parameters.Scope) != 0 {
171		reqScope = parameters.Scope
172	}
173	aud := fmt.Sprintf("identitynamespace:%s:%s", p.trustDomain, p.gkeClusterURL)
174	query := map[string]string{
175		"audience":           aud,
176		"grantType":          parameters.GrantType,
177		"requestedTokenType": tokenType,
178		"subjectTokenType":   parameters.SubjectTokenType,
179		"subjectToken":       parameters.SubjectToken,
180		"scope":              reqScope,
181	}
182	jsonQuery, err := json.Marshal(query)
183	if err != nil {
184		return nil, fmt.Errorf("failed to marshal query for get federated token request: %+v", err)
185	}
186	req, err := http.NewRequest("POST", federatedTokenEndpoint, bytes.NewBuffer(jsonQuery))
187	if err != nil {
188		return req, fmt.Errorf("failed to create get federated token request: %+v", err)
189	}
190	req.Header.Set("Content-Type", contentType)
191	if pluginLog.DebugEnabled() {
192		dQuery := map[string]string{
193			"audience":           aud,
194			"grantType":          parameters.GrantType,
195			"requestedTokenType": tokenType,
196			"subjectTokenType":   parameters.SubjectTokenType,
197			"subjectToken":       "redacted",
198			"scope":              reqScope,
199		}
200		dJSONQuery, _ := json.Marshal(dQuery)
201		dReq, _ := http.NewRequest("POST", federatedTokenEndpoint, bytes.NewBuffer(dJSONQuery))
202		dReq.Header.Set("Content-Type", contentType)
203		reqDump, _ := httputil.DumpRequest(dReq, true)
204		pluginLog.Debugf("Prepared federated token request: \n%s", string(reqDump))
205	} else {
206		pluginLog.Info("Prepared federated token request")
207	}
208	return req, nil
209}
210
211// fetchFederatedToken exchanges a third-party issued Json Web Token for an OAuth2.0 access token
212// which asserts a third-party identity within an identity namespace.
213func (p *Plugin) fetchFederatedToken(parameters stsservice.StsRequestParameters) (*federatedTokenResponse, error) {
214	respData := &federatedTokenResponse{}
215
216	req, err := p.constructFederatedTokenRequest(parameters)
217	if err != nil {
218		pluginLog.Errorf("failed to create get federated token request: %+v", err)
219		return nil, err
220	}
221	resp, timeElapsed, err := p.sendRequestWithRetry(req)
222	if err != nil {
223		respCode := 0
224		if resp != nil {
225			respCode = resp.StatusCode
226		}
227		pluginLog.Errorf("Failed to exchange federated token (HTTP status %d, total time elapsed %s): %v",
228			respCode, timeElapsed.String(), err)
229		return nil, fmt.Errorf("failed to exchange federated token (HTTP status %d): %v", respCode,
230			err)
231	}
232	// resp should not be nil.
233	defer resp.Body.Close()
234
235	if pluginLog.DebugEnabled() {
236		respDump, _ := httputil.DumpResponse(resp, false)
237		pluginLog.Debugf("Received federated token response after %s: \n%s",
238			timeElapsed.String(), string(respDump))
239	} else {
240		pluginLog.Infof("Received federated token response after %s", timeElapsed.String())
241	}
242
243	body, err := ioutil.ReadAll(resp.Body)
244	if err != nil {
245		pluginLog.Errorf("Failed to read federated token response body: %+v", err)
246		return respData, fmt.Errorf("failed to read federated token response body: %+v", err)
247	}
248	if err := json.Unmarshal(body, respData); err != nil {
249		pluginLog.Errorf("Failed to unmarshal federated token response data: %v", err)
250		return respData, fmt.Errorf("failed to unmarshal federated token response data: %v", err)
251	}
252	if respData.AccessToken == "" {
253		pluginLog.Errora("federated token response does not have access token", string(body))
254		return respData, errors.New("federated token response does not have access token. " + string(body))
255	}
256	pluginLog.Infof("Federated token will expire in %d seconds", respData.ExpiresIn)
257	tokenReceivedTime := time.Now()
258	p.tokens.Store(federatedToken, stsservice.TokenInfo{
259		TokenType:  federatedToken,
260		IssueTime:  tokenReceivedTime,
261		ExpireTime: tokenReceivedTime.Add(time.Duration(respData.ExpiresIn) * time.Second)})
262	return respData, nil
263}
264
265// Send HTTP request every 0.01 seconds until successfully receive response or hit max retry numbers.
266// If response code is 4xx, return immediately without retry.
267func (p *Plugin) sendRequestWithRetry(req *http.Request) (resp *http.Response, elapsedTime time.Duration, err error) {
268	start := time.Now()
269	for i := 0; i < maxRequestRetry; i++ {
270		resp, err = p.httpClient.Do(req)
271		if err != nil {
272			pluginLog.Errorf("failed to send out request: %v (response: %v)", err, resp)
273		}
274		if resp != nil && resp.StatusCode == http.StatusOK {
275			return resp, time.Since(start), err
276		}
277		if resp != nil && resp.StatusCode >= http.StatusBadRequest && resp.StatusCode < http.StatusInternalServerError {
278			return resp, time.Since(start), err
279		}
280		time.Sleep(10 * time.Millisecond)
281	}
282	if resp != nil && resp.StatusCode != http.StatusOK {
283		bodyBytes, _ := ioutil.ReadAll(resp.Body)
284		defer resp.Body.Close()
285		return resp, time.Since(start), fmt.Errorf("HTTP Status %d, body: %s", resp.StatusCode, string(bodyBytes))
286	}
287	return resp, time.Since(start), err
288}
289
// accessTokenRequest is the JSON body of a generateAccessToken request.
// Within this file only Scope and LifeTime are populated; Name and Delegates
// are sent with their zero values.
type accessTokenRequest struct {
	Name      string            `json:"name"` // nolint: structcheck, unused
	Delegates []string          `json:"delegates"`
	Scope     []string          `json:"scope"`
	LifeTime  duration.Duration `json:"lifetime"` // nolint: structcheck, unused
}
296
// accessTokenResponse is the JSON body returned by the generateAccessToken endpoint.
// ExpireTime is parsed as an RFC 3339 timestamp by the consumers in this file; when
// parsing fails they fall back to a lifetime of 3600 seconds.
type accessTokenResponse struct {
	AccessToken string `json:"accessToken"`
	ExpireTime  string `json:"expireTime"`
}
301
302// constructFederatedTokenRequest returns an HTTP request for access token.
303// Example of an access token request:
304// POST https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/
305// service-<GCP project number>@gcp-sa-meshdataplane.iam.gserviceaccount.com:generateAccessToken
306// Content-Type: application/json
307// Authorization: Bearer <federated token>
308// {
309//  "Delegates": [],
310//  "Scope": [
311//      https://www.googleapis.com/auth/cloud-platform
312//  ],
313// }
314func (p *Plugin) constructGenerateAccessTokenRequest(fResp *federatedTokenResponse) (*http.Request, error) {
315	// Request for access token with a lifetime of 3600 seconds.
316	query := accessTokenRequest{
317		LifeTime: duration.Duration{Seconds: 3600},
318	}
319	query.Scope = append(query.Scope, scope)
320
321	jsonQuery, err := json.Marshal(query)
322	if err != nil {
323		return nil, fmt.Errorf("failed to marshal query for get access token request: %+v", err)
324	}
325	endpoint := fmt.Sprintf(accessTokenEndpoint, p.gcpProjectNumber)
326	req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(jsonQuery))
327	if err != nil {
328		return nil, fmt.Errorf("failed to create get access token request: %+v", err)
329	}
330	req.Header.Add("Content-Type", contentType)
331	if pluginLog.DebugEnabled() {
332		reqDump, _ := httputil.DumpRequest(req, true)
333		pluginLog.Debugf("Prepared access token request: \n%s", string(reqDump))
334	} else {
335		pluginLog.Info("Prepared access token request")
336	}
337	req.Header.Add("Authorization", "Bearer "+fResp.AccessToken)
338	return req, nil
339}
340
341func (p *Plugin) fetchAccessToken(federatedToken *federatedTokenResponse) (*accessTokenResponse, error) {
342	respData := &accessTokenResponse{}
343
344	req, err := p.constructGenerateAccessTokenRequest(federatedToken)
345	if err != nil {
346		pluginLog.Errorf("failed to create get access token request: %+v", err)
347		return nil, err
348	}
349	resp, timeElapsed, err := p.sendRequestWithRetry(req)
350	if err != nil {
351		respCode := 0
352		if resp != nil {
353			respCode = resp.StatusCode
354		}
355		pluginLog.Errorf("failed to exchange access token (HTTP status %d, total time elapsed %s): %v",
356			respCode, timeElapsed.String(), err)
357		return respData, fmt.Errorf("failed to exchange access token (HTTP status %d): %v", respCode, err)
358	}
359	defer resp.Body.Close()
360
361	if pluginLog.DebugEnabled() {
362		respDump, _ := httputil.DumpResponse(resp, false)
363		pluginLog.Debugf("Received access token response after %s: \n%s",
364			timeElapsed.String(), string(respDump))
365	} else {
366		pluginLog.Infof("Received access token response after %s", timeElapsed.String())
367	}
368
369	body, err := ioutil.ReadAll(resp.Body)
370	if err != nil {
371		pluginLog.Errorf("Failed to read access token response body: %+v", err)
372		return respData, fmt.Errorf("failed to read access token response body: %+v", err)
373	}
374	if err := json.Unmarshal(body, respData); err != nil {
375		pluginLog.Errorf("Failed to unmarshal access token response data: %v", err)
376		return respData, fmt.Errorf("failed to unmarshal access token response data: %v", err)
377	}
378	if respData.AccessToken == "" {
379		pluginLog.Errora("access token response does not have access token", string(body))
380		return respData, errors.New("access token response does not have access token. " + string(body))
381	}
382	pluginLog.Debug("successfully exchanged an access token")
383	// Store access token
384	// Default token life time is 3600 seconds.
385	tokenExp := time.Now().Add(3600 * time.Second)
386	exp, err := time.Parse(time.RFC3339Nano, respData.ExpireTime)
387	if err != nil {
388		pluginLog.Errorf("Failed to unmarshal timestamp %s from access token response, "+
389			"fall back to use default lifetime (3600 seconds): %v", respData.ExpireTime, err)
390	} else {
391		tokenExp = exp
392	}
393	// Update cache and reset cache hit counter.
394	p.tokens.Store(accessToken, stsservice.TokenInfo{
395		TokenType:  accessToken,
396		IssueTime:  time.Now(),
397		ExpireTime: tokenExp,
398		Token:      respData.AccessToken})
399	p.mutex.Lock()
400	p.accessTokenCacheHit = 0
401	p.mutex.Unlock()
402	return respData, nil
403}
404
405// generateSTSResp takes accessTokenResponse and generates StsResponseParameters in JSON.
406func (p *Plugin) generateSTSResp(atResp *accessTokenResponse) ([]byte, error) {
407	exp, err := time.Parse(time.RFC3339Nano, atResp.ExpireTime)
408	// Default token life time is 3600 seconds
409	var expireInSec int64 = 3600
410	if err != nil {
411		pluginLog.Errorf("Failed to unmarshal timestamp %s from access token response, "+
412			"fall back to use default lifetime (3600 seconds): %v", atResp.ExpireTime, err)
413	} else {
414		expireInSec = int64(time.Until(exp).Seconds())
415	}
416	return p.generateSTSRespInner(atResp.AccessToken, expireInSec)
417}
418
419func (p *Plugin) generateSTSRespInner(token string, expire int64) ([]byte, error) {
420	stsRespParam := stsservice.StsResponseParameters{
421		AccessToken:     token,
422		IssuedTokenType: tokenType,
423		TokenType:       "Bearer",
424		ExpiresIn:       expire,
425	}
426	statusJSON, err := json.MarshalIndent(stsRespParam, "", " ")
427	if pluginLog.DebugEnabled() {
428		stsRespParam.AccessToken = "redacted"
429		pluginLog.Infof("Populated STS response parameters: %+v", stsRespParam)
430	}
431	return statusJSON, err
432}
433
434// DumpTokenStatus dumps all token status in JSON
435func (p *Plugin) DumpPluginStatus() ([]byte, error) {
436	tokenStatus := make([]stsservice.TokenInfo, 0)
437	p.tokens.Range(func(k interface{}, v interface{}) bool {
438		token := v.(stsservice.TokenInfo)
439		tokenStatus = append(tokenStatus, stsservice.TokenInfo{
440			TokenType: token.TokenType, IssueTime: token.IssueTime, ExpireTime: token.ExpireTime})
441		return true
442	})
443	td := stsservice.TokensDump{
444		Tokens: tokenStatus,
445	}
446	statusJSON, err := json.MarshalIndent(td, "", " ")
447	return statusJSON, err
448}
449
450// SetEndpoints changes the endpoints for testing purposes only.
451func (p *Plugin) SetEndpoints(fTokenEndpoint, aTokenEndpoint string) {
452	federatedTokenEndpoint = fTokenEndpoint
453	accessTokenEndpoint = aTokenEndpoint
454}
455
456// ClearCache is only used for testing purposes.
457func (p *Plugin) ClearCache() {
458	p.tokens.Delete(federatedToken)
459	p.tokens.Delete(accessToken)
460}
461