1/* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "fmt" 23 "strings" 24 25 "google.golang.org/grpc/grpclog" 26 "google.golang.org/grpc/internal/channelz" 27 "google.golang.org/grpc/resolver" 28) 29 30// ccResolverWrapper is a wrapper on top of cc for resolvers. 31// It implements resolver.ClientConnection interface. 32type ccResolverWrapper struct { 33 cc *ClientConn 34 resolver resolver.Resolver 35 addrCh chan []resolver.Address 36 scCh chan string 37 done chan struct{} 38 lastAddressesCount int 39} 40 41// split2 returns the values from strings.SplitN(s, sep, 2). 42// If sep is not found, it returns ("", s, false) instead. 43func split2(s, sep string) (string, string, bool) { 44 spl := strings.SplitN(s, sep, 2) 45 if len(spl) < 2 { 46 return "", "", false 47 } 48 return spl[0], spl[1], true 49} 50 51// parseTarget splits target into a struct containing scheme, authority and 52// endpoint. 53// 54// If target is not a valid scheme://authority/endpoint, it returns {Endpoint: 55// target}. 56func parseTarget(target string) (ret resolver.Target) { 57 var ok bool 58 ret.Scheme, ret.Endpoint, ok = split2(target, "://") 59 if !ok { 60 return resolver.Target{Endpoint: target} 61 } 62 ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/") 63 if !ok { 64 return resolver.Target{Endpoint: target} 65 } 66 return ret 67} 68 69// newCCResolverWrapper parses cc.target for scheme and gets the resolver 70// builder for this scheme and builds the resolver. The monitoring goroutine 71// for it is not started yet and can be created by calling start(). 72// 73// If withResolverBuilder dial option is set, the specified resolver will be 74// used instead. 75func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) { 76 rb := cc.dopts.resolverBuilder 77 if rb == nil { 78 return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme) 79 } 80 81 ccr := &ccResolverWrapper{ 82 cc: cc, 83 addrCh: make(chan []resolver.Address, 1), 84 scCh: make(chan string, 1), 85 done: make(chan struct{}), 86 } 87 88 var err error 89 ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig}) 90 if err != nil { 91 return nil, err 92 } 93 return ccr, nil 94} 95 96func (ccr *ccResolverWrapper) start() { 97 go ccr.watcher() 98} 99 100// watcher processes address updates and service config updates sequentially. 101// Otherwise, we need to resolve possible races between address and service 102// config (e.g. they specify different balancer types). 103func (ccr *ccResolverWrapper) watcher() { 104 for { 105 select { 106 case <-ccr.done: 107 return 108 default: 109 } 110 111 select { 112 case addrs := <-ccr.addrCh: 113 select { 114 case <-ccr.done: 115 return 116 default: 117 } 118 grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs) 119 if channelz.IsOn() { 120 ccr.addChannelzTraceEvent(addrs) 121 } 122 ccr.cc.handleResolvedAddrs(addrs, nil) 123 case sc := <-ccr.scCh: 124 select { 125 case <-ccr.done: 126 return 127 default: 128 } 129 grpclog.Infof("ccResolverWrapper: got new service config: %v", sc) 130 ccr.cc.handleServiceConfig(sc) 131 case <-ccr.done: 132 return 133 } 134 } 135} 136 137func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) { 138 ccr.resolver.ResolveNow(o) 139} 140 141func (ccr *ccResolverWrapper) close() { 142 ccr.resolver.Close() 143 close(ccr.done) 144} 145 146// NewAddress is called by the resolver implemenetion to send addresses to gRPC. 147func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { 148 select { 149 case <-ccr.addrCh: 150 default: 151 } 152 ccr.addrCh <- addrs 153} 154 155// NewServiceConfig is called by the resolver implemenetion to send service 156// configs to gRPC. 157func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { 158 select { 159 case <-ccr.scCh: 160 default: 161 } 162 ccr.scCh <- sc 163} 164 165func (ccr *ccResolverWrapper) addChannelzTraceEvent(addrs []resolver.Address) { 166 if len(addrs) == 0 && ccr.lastAddressesCount != 0 { 167 channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{ 168 Desc: "Resolver returns an empty address list", 169 Severity: channelz.CtWarning, 170 }) 171 } else if len(addrs) != 0 && ccr.lastAddressesCount == 0 { 172 var s string 173 for i, a := range addrs { 174 if a.ServerName != "" { 175 s += a.Addr + "(" + a.ServerName + ")" 176 } else { 177 s += a.Addr 178 } 179 if i != len(addrs)-1 { 180 s += " " 181 } 182 } 183 channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{ 184 Desc: fmt.Sprintf("Resolver returns a non-empty address list (previous one was empty) %q", s), 185 Severity: channelz.CtINFO, 186 }) 187 } 188 ccr.lastAddressesCount = len(addrs) 189} 190