1package cachetype 2 3import ( 4 "crypto/sha256" 5 "errors" 6 "fmt" 7 "sync" 8 "sync/atomic" 9 "time" 10 11 "github.com/hashicorp/consul/agent/cache" 12 "github.com/hashicorp/consul/agent/connect" 13 "github.com/hashicorp/consul/agent/structs" 14) 15 16// Recommended name for registration. 17const ConnectCALeafName = "connect-ca-leaf" 18 19// ConnectCALeaf supports fetching and generating Connect leaf 20// certificates. 21type ConnectCALeaf struct { 22 caIndex uint64 // Current index for CA roots 23 24 issuedCertsLock sync.RWMutex 25 issuedCerts map[string]*structs.IssuedCert 26 27 RPC RPC // RPC client for remote requests 28 Cache *cache.Cache // Cache that has CA root certs via ConnectCARoot 29} 30 31// issuedKey returns the issuedCerts cache key for a given service and token. We 32// use a hash rather than concatenating strings to provide resilience against 33// user input containing our separator - both service name and token ID can be 34// freely manipulated by user so may contain any delimiter we choose. It also 35// has the benefit of not leaking the ACL token to a new place in memory it 36// might get accidentally dumped etc. 37func issuedKey(service, token string) string { 38 hash := sha256.New() 39 hash.Write([]byte(service)) 40 hash.Write([]byte(token)) 41 return fmt.Sprintf("%x", hash.Sum(nil)) 42} 43 44func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { 45 var result cache.FetchResult 46 47 // Get the correct type 48 reqReal, ok := req.(*ConnectCALeafRequest) 49 if !ok { 50 return result, fmt.Errorf( 51 "Internal cache failure: request wrong type: %T", req) 52 } 53 54 // This channel watches our overall timeout. The other goroutines 55 // launched in this function should end all around the same time so 56 // they clean themselves up. 57 timeoutCh := time.After(opts.Timeout) 58 59 // Kick off the goroutine that waits for new CA roots. The channel buffer 60 // is so that the goroutine doesn't block forever if we return for other 61 // reasons. 62 newRootCACh := make(chan error, 1) 63 go c.waitNewRootCA(reqReal.Datacenter, newRootCACh, opts.Timeout) 64 65 // Generate a cache key to lookup/store the cert. We MUST generate a new cert 66 // per token used to ensure revocation by ACL token is robust. 67 issuedKey := issuedKey(reqReal.Service, reqReal.Token) 68 69 // Get our prior cert (if we had one) and use that to determine our 70 // expiration time. If no cert exists, we expire immediately since we 71 // need to generate. 72 c.issuedCertsLock.RLock() 73 lastCert := c.issuedCerts[issuedKey] 74 c.issuedCertsLock.RUnlock() 75 76 var leafExpiryCh <-chan time.Time 77 if lastCert != nil { 78 // Determine how long we wait until triggering. If we've already 79 // expired, we trigger immediately. 80 if expiryDur := lastCert.ValidBefore.Sub(time.Now()); expiryDur > 0 { 81 leafExpiryCh = time.After(expiryDur - 1*time.Hour) 82 // TODO(mitchellh): 1 hour buffer is hardcoded above 83 84 // We should not depend on the cache package de-duplicating requests for 85 // the same service/token (which is all we care about keying our local 86 // issued cert cache on) since it might later make sense to partition 87 // clients for other reasons too. So if the request has a 0 MinIndex, and 88 // the cached cert is still valid, then the client is expecting an 89 // immediate response and hasn't already seen the cached cert, return it 90 // now. 91 if opts.MinIndex == 0 { 92 result.Value = lastCert 93 result.Index = lastCert.ModifyIndex 94 return result, nil 95 } 96 } 97 } 98 99 if leafExpiryCh == nil { 100 // If the channel is still nil then it means we need to generate 101 // a cert no matter what: we either don't have an existing one or 102 // it is expired. 103 leafExpiryCh = time.After(0) 104 } 105 106 // Block on the events that wake us up. 107 select { 108 case <-timeoutCh: 109 // On a timeout, we just return the empty result and no error. 110 // It isn't an error to timeout, its just the limit of time the 111 // caching system wants us to block for. By returning an empty result 112 // the caching system will ignore. 113 return result, nil 114 115 case err := <-newRootCACh: 116 // A new root CA triggers us to refresh the leaf certificate. 117 // If there was an error while getting the root CA then we return. 118 // Otherwise, we leave the select statement and move to generation. 119 if err != nil { 120 return result, err 121 } 122 123 case <-leafExpiryCh: 124 // The existing leaf certificate is expiring soon, so we generate a 125 // new cert with a healthy overlapping validity period (determined 126 // by the above channel). 127 } 128 129 // Need to lookup RootCAs response to discover trust domain. First just lookup 130 // with no blocking info - this should be a cache hit most of the time. 131 rawRoots, _, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{ 132 Datacenter: reqReal.Datacenter, 133 }) 134 if err != nil { 135 return result, err 136 } 137 roots, ok := rawRoots.(*structs.IndexedCARoots) 138 if !ok { 139 return result, errors.New("invalid RootCA response type") 140 } 141 if roots.TrustDomain == "" { 142 return result, errors.New("cluster has no CA bootstrapped yet") 143 } 144 145 // Build the service ID 146 serviceID := &connect.SpiffeIDService{ 147 Host: roots.TrustDomain, 148 Datacenter: reqReal.Datacenter, 149 Namespace: "default", 150 Service: reqReal.Service, 151 } 152 153 // Create a new private key 154 pk, pkPEM, err := connect.GeneratePrivateKey() 155 if err != nil { 156 return result, err 157 } 158 159 // Create a CSR. 160 csr, err := connect.CreateCSR(serviceID, pk) 161 if err != nil { 162 return result, err 163 } 164 165 // Request signing 166 var reply structs.IssuedCert 167 args := structs.CASignRequest{ 168 WriteRequest: structs.WriteRequest{Token: reqReal.Token}, 169 Datacenter: reqReal.Datacenter, 170 CSR: csr, 171 } 172 if err := c.RPC.RPC("ConnectCA.Sign", &args, &reply); err != nil { 173 return result, err 174 } 175 reply.PrivateKeyPEM = pkPEM 176 177 // Lock the issued certs map so we can insert it. We only insert if 178 // we didn't happen to get a newer one. This should never happen since 179 // the Cache should ensure only one Fetch per service, but we sanity 180 // check just in case. 181 c.issuedCertsLock.Lock() 182 defer c.issuedCertsLock.Unlock() 183 lastCert = c.issuedCerts[issuedKey] 184 if lastCert == nil || lastCert.ModifyIndex < reply.ModifyIndex { 185 if c.issuedCerts == nil { 186 c.issuedCerts = make(map[string]*structs.IssuedCert) 187 } 188 189 c.issuedCerts[issuedKey] = &reply 190 lastCert = &reply 191 } 192 193 result.Value = lastCert 194 result.Index = lastCert.ModifyIndex 195 return result, nil 196} 197 198// waitNewRootCA blocks until a new root CA is available or the timeout is 199// reached (on timeout ErrTimeout is returned on the channel). 200func (c *ConnectCALeaf) waitNewRootCA(datacenter string, ch chan<- error, 201 timeout time.Duration) { 202 // We always want to block on at least an initial value. If this isn't 203 minIndex := atomic.LoadUint64(&c.caIndex) 204 if minIndex == 0 { 205 minIndex = 1 206 } 207 208 // Fetch some new roots. This will block until our MinQueryIndex is 209 // matched or the timeout is reached. 210 rawRoots, _, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{ 211 Datacenter: datacenter, 212 QueryOptions: structs.QueryOptions{ 213 MinQueryIndex: minIndex, 214 MaxQueryTime: timeout, 215 }, 216 }) 217 if err != nil { 218 ch <- err 219 return 220 } 221 222 roots, ok := rawRoots.(*structs.IndexedCARoots) 223 if !ok { 224 // This should never happen but we don't want to even risk a panic 225 ch <- fmt.Errorf( 226 "internal error: CA root cache returned bad type: %T", rawRoots) 227 return 228 } 229 230 // We do a loop here because there can be multiple waitNewRootCA calls 231 // happening simultaneously. Each Fetch kicks off one call. These are 232 // multiplexed through Cache.Get which should ensure we only ever 233 // actually make a single RPC call. However, there is a race to set 234 // the caIndex field so do a basic CAS loop here. 235 for { 236 // We only set our index if its newer than what is previously set. 237 old := atomic.LoadUint64(&c.caIndex) 238 if old == roots.Index || old > roots.Index { 239 break 240 } 241 242 // Set the new index atomically. If the caIndex value changed 243 // in the meantime, retry. 244 if atomic.CompareAndSwapUint64(&c.caIndex, old, roots.Index) { 245 break 246 } 247 } 248 249 // Trigger the channel since we updated. 250 ch <- nil 251} 252 253func (c *ConnectCALeaf) SupportsBlocking() bool { 254 return true 255} 256 257// ConnectCALeafRequest is the cache.Request implementation for the 258// ConnectCALeaf cache type. This is implemented here and not in structs 259// since this is only used for cache-related requests and not forwarded 260// directly to any Consul servers. 261type ConnectCALeafRequest struct { 262 Token string 263 Datacenter string 264 Service string // Service name, not ID 265 MinQueryIndex uint64 266} 267 268func (r *ConnectCALeafRequest) CacheInfo() cache.RequestInfo { 269 return cache.RequestInfo{ 270 Token: r.Token, 271 Key: r.Service, 272 Datacenter: r.Datacenter, 273 MinIndex: r.MinQueryIndex, 274 } 275} 276