1/* 2 * 3 * Copyright 2020 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package client 20 21type watcherInfoWithUpdate struct { 22 wi *watchInfo 23 update interface{} 24 err error 25} 26 27// scheduleCallback should only be called by methods of watchInfo, which checks 28// for watcher states and maintain consistency. 29func (c *clientImpl) scheduleCallback(wi *watchInfo, update interface{}, err error) { 30 c.updateCh.Put(&watcherInfoWithUpdate{ 31 wi: wi, 32 update: update, 33 err: err, 34 }) 35} 36 37func (c *clientImpl) callCallback(wiu *watcherInfoWithUpdate) { 38 c.mu.Lock() 39 // Use a closure to capture the callback and type assertion, to save one 40 // more switch case. 41 // 42 // The callback must be called without c.mu. Otherwise if the callback calls 43 // another watch() inline, it will cause a deadlock. This leaves a small 44 // window that a watcher's callback could be called after the watcher is 45 // canceled, and the user needs to take care of it. 46 var ccb func() 47 switch wiu.wi.rType { 48 case ListenerResource: 49 if s, ok := c.ldsWatchers[wiu.wi.target]; ok && s[wiu.wi] { 50 ccb = func() { wiu.wi.ldsCallback(wiu.update.(ListenerUpdate), wiu.err) } 51 } 52 case RouteConfigResource: 53 if s, ok := c.rdsWatchers[wiu.wi.target]; ok && s[wiu.wi] { 54 ccb = func() { wiu.wi.rdsCallback(wiu.update.(RouteConfigUpdate), wiu.err) } 55 } 56 case ClusterResource: 57 if s, ok := c.cdsWatchers[wiu.wi.target]; ok && s[wiu.wi] { 58 ccb = func() { wiu.wi.cdsCallback(wiu.update.(ClusterUpdate), wiu.err) } 59 } 60 case EndpointsResource: 61 if s, ok := c.edsWatchers[wiu.wi.target]; ok && s[wiu.wi] { 62 ccb = func() { wiu.wi.edsCallback(wiu.update.(EndpointsUpdate), wiu.err) } 63 } 64 } 65 c.mu.Unlock() 66 67 if ccb != nil { 68 ccb() 69 } 70} 71 72// NewListeners is called by the underlying xdsAPIClient when it receives an 73// xDS response. 74// 75// A response can contain multiple resources. They will be parsed and put in a 76// map from resource name to the resource content. 77func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate) { 78 c.mu.Lock() 79 defer c.mu.Unlock() 80 81 for name, update := range updates { 82 if s, ok := c.ldsWatchers[name]; ok { 83 for wi := range s { 84 wi.newUpdate(update) 85 } 86 // Sync cache. 87 c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, update) 88 c.ldsCache[name] = update 89 } 90 } 91 for name := range c.ldsCache { 92 if _, ok := updates[name]; !ok { 93 // If resource exists in cache, but not in the new update, delete it 94 // from cache, and also send an resource not found error to indicate 95 // resource removed. 96 delete(c.ldsCache, name) 97 for wi := range c.ldsWatchers[name] { 98 wi.resourceNotFound() 99 } 100 } 101 } 102 // When LDS resource is removed, we don't delete corresponding RDS cached 103 // data. The RDS watch will be canceled, and cache entry is removed when the 104 // last watch is canceled. 105} 106 107// NewRouteConfigs is called by the underlying xdsAPIClient when it receives an 108// xDS response. 109// 110// A response can contain multiple resources. They will be parsed and put in a 111// map from resource name to the resource content. 112func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate) { 113 c.mu.Lock() 114 defer c.mu.Unlock() 115 116 for name, update := range updates { 117 if s, ok := c.rdsWatchers[name]; ok { 118 for wi := range s { 119 wi.newUpdate(update) 120 } 121 // Sync cache. 122 c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, update) 123 c.rdsCache[name] = update 124 } 125 } 126} 127 128// NewClusters is called by the underlying xdsAPIClient when it receives an xDS 129// response. 130// 131// A response can contain multiple resources. They will be parsed and put in a 132// map from resource name to the resource content. 133func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate) { 134 c.mu.Lock() 135 defer c.mu.Unlock() 136 137 for name, update := range updates { 138 if s, ok := c.cdsWatchers[name]; ok { 139 for wi := range s { 140 wi.newUpdate(update) 141 } 142 // Sync cache. 143 c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, update) 144 c.cdsCache[name] = update 145 } 146 } 147 for name := range c.cdsCache { 148 if _, ok := updates[name]; !ok { 149 // If resource exists in cache, but not in the new update, delete it 150 // from cache, and also send an resource not found error to indicate 151 // resource removed. 152 delete(c.cdsCache, name) 153 for wi := range c.cdsWatchers[name] { 154 wi.resourceNotFound() 155 } 156 } 157 } 158 // When CDS resource is removed, we don't delete corresponding EDS cached 159 // data. The EDS watch will be canceled, and cache entry is removed when the 160 // last watch is canceled. 161} 162 163// NewEndpoints is called by the underlying xdsAPIClient when it receives an 164// xDS response. 165// 166// A response can contain multiple resources. They will be parsed and put in a 167// map from resource name to the resource content. 168func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate) { 169 c.mu.Lock() 170 defer c.mu.Unlock() 171 172 for name, update := range updates { 173 if s, ok := c.edsWatchers[name]; ok { 174 for wi := range s { 175 wi.newUpdate(update) 176 } 177 // Sync cache. 178 c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, update) 179 c.edsCache[name] = update 180 } 181 } 182} 183