1/* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "fmt" 23 "sync" 24 "time" 25 26 "google.golang.org/grpc/balancer" 27 "google.golang.org/grpc/connectivity" 28 "google.golang.org/grpc/resolver" 29) 30 31// The parent ClientConn should re-resolve when grpclb loses connection to the 32// remote balancer. When the ClientConn inside grpclb gets a TransientFailure, 33// it calls lbManualResolver.ResolveNow(), which calls parent ClientConn's 34// ResolveNow, and eventually results in re-resolve happening in parent 35// ClientConn's resolver (DNS for example). 36// 37// parent 38// ClientConn 39// +-----------------------------------------------------------------+ 40// | parent +---------------------------------+ | 41// | DNS ClientConn | grpclb | | 42// | resolver balancerWrapper | | | 43// | + + | grpclb grpclb | | 44// | | | | ManualResolver ClientConn | | 45// | | | | + + | | 46// | | | | | | Transient | | 47// | | | | | | Failure | | 48// | | | | | <--------- | | | 49// | | | <--------------- | ResolveNow | | | 50// | | <--------- | ResolveNow | | | | | 51// | | ResolveNow | | | | | | 52// | | | | | | | | 53// | + + | + + | | 54// | +---------------------------------+ | 55// +-----------------------------------------------------------------+ 56 57// lbManualResolver is used by the ClientConn inside grpclb. It's a manual 58// resolver with a special ResolveNow() function. 59// 60// When ResolveNow() is called, it calls ResolveNow() on the parent ClientConn, 61// so when grpclb client lose contact with remote balancers, the parent 62// ClientConn's resolver will re-resolve. 63type lbManualResolver struct { 64 scheme string 65 ccr resolver.ClientConn 66 67 ccb balancer.ClientConn 68} 69 70func (r *lbManualResolver) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) { 71 r.ccr = cc 72 return r, nil 73} 74 75func (r *lbManualResolver) Scheme() string { 76 return r.scheme 77} 78 79// ResolveNow calls resolveNow on the parent ClientConn. 80func (r *lbManualResolver) ResolveNow(o resolver.ResolveNowOption) { 81 r.ccb.ResolveNow(o) 82} 83 84// Close is a noop for Resolver. 85func (*lbManualResolver) Close() {} 86 87// NewAddress calls cc.NewAddress. 88func (r *lbManualResolver) NewAddress(addrs []resolver.Address) { 89 r.ccr.NewAddress(addrs) 90} 91 92// NewServiceConfig calls cc.NewServiceConfig. 93func (r *lbManualResolver) NewServiceConfig(sc string) { 94 r.ccr.NewServiceConfig(sc) 95} 96 97const subConnCacheTime = time.Second * 10 98 99// lbCacheClientConn is a wrapper balancer.ClientConn with a SubConn cache. 100// SubConns will be kept in cache for subConnCacheTime before being removed. 101// 102// Its new and remove methods are updated to do cache first. 103type lbCacheClientConn struct { 104 cc balancer.ClientConn 105 timeout time.Duration 106 107 mu sync.Mutex 108 // subConnCache only keeps subConns that are being deleted. 109 subConnCache map[resolver.Address]*subConnCacheEntry 110 subConnToAddr map[balancer.SubConn]resolver.Address 111} 112 113type subConnCacheEntry struct { 114 sc balancer.SubConn 115 116 cancel func() 117 abortDeleting bool 118} 119 120func newLBCacheClientConn(cc balancer.ClientConn) *lbCacheClientConn { 121 return &lbCacheClientConn{ 122 cc: cc, 123 timeout: subConnCacheTime, 124 subConnCache: make(map[resolver.Address]*subConnCacheEntry), 125 subConnToAddr: make(map[balancer.SubConn]resolver.Address), 126 } 127} 128 129func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 130 if len(addrs) != 1 { 131 return nil, fmt.Errorf("grpclb calling NewSubConn with addrs of length %v", len(addrs)) 132 } 133 addrWithoutMD := addrs[0] 134 addrWithoutMD.Metadata = nil 135 136 ccc.mu.Lock() 137 defer ccc.mu.Unlock() 138 if entry, ok := ccc.subConnCache[addrWithoutMD]; ok { 139 // If entry is in subConnCache, the SubConn was being deleted. 140 // cancel function will never be nil. 141 entry.cancel() 142 delete(ccc.subConnCache, addrWithoutMD) 143 return entry.sc, nil 144 } 145 146 scNew, err := ccc.cc.NewSubConn(addrs, opts) 147 if err != nil { 148 return nil, err 149 } 150 151 ccc.subConnToAddr[scNew] = addrWithoutMD 152 return scNew, nil 153} 154 155func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) { 156 ccc.mu.Lock() 157 defer ccc.mu.Unlock() 158 addr, ok := ccc.subConnToAddr[sc] 159 if !ok { 160 return 161 } 162 163 if entry, ok := ccc.subConnCache[addr]; ok { 164 if entry.sc != sc { 165 // This could happen if NewSubConn was called multiple times for the 166 // same address, and those SubConns are all removed. We remove sc 167 // immediately here. 168 delete(ccc.subConnToAddr, sc) 169 ccc.cc.RemoveSubConn(sc) 170 } 171 return 172 } 173 174 entry := &subConnCacheEntry{ 175 sc: sc, 176 } 177 ccc.subConnCache[addr] = entry 178 179 timer := time.AfterFunc(ccc.timeout, func() { 180 ccc.mu.Lock() 181 if entry.abortDeleting { 182 return 183 } 184 ccc.cc.RemoveSubConn(sc) 185 delete(ccc.subConnToAddr, sc) 186 delete(ccc.subConnCache, addr) 187 ccc.mu.Unlock() 188 }) 189 entry.cancel = func() { 190 if !timer.Stop() { 191 // If stop was not successful, the timer has fired (this can only 192 // happen in a race). But the deleting function is blocked on ccc.mu 193 // because the mutex was held by the caller of this function. 194 // 195 // Set abortDeleting to true to abort the deleting function. When 196 // the lock is released, the deleting function will acquire the 197 // lock, check the value of abortDeleting and return. 198 entry.abortDeleting = true 199 } 200 } 201} 202 203func (ccc *lbCacheClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) { 204 ccc.cc.UpdateBalancerState(s, p) 205} 206 207func (ccc *lbCacheClientConn) close() { 208 ccc.mu.Lock() 209 // Only cancel all existing timers. There's no need to remove SubConns. 210 for _, entry := range ccc.subConnCache { 211 entry.cancel() 212 } 213 ccc.mu.Unlock() 214} 215