1// Copyright 2017 Istio Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package consul 16 17import ( 18 "fmt" 19 "sync" 20 21 "github.com/hashicorp/consul/api" 22 23 "istio.io/pkg/log" 24 25 "istio.io/istio/pilot/pkg/model" 26 "istio.io/istio/pilot/pkg/serviceregistry" 27 "istio.io/istio/pkg/config/host" 28 "istio.io/istio/pkg/config/labels" 29 "istio.io/istio/pkg/spiffe" 30) 31 32var _ serviceregistry.Instance = &Controller{} 33 34// Controller communicates with Consul and monitors for changes 35type Controller struct { 36 client *api.Client 37 monitor Monitor 38 services map[string]*model.Service //key hostname value service 39 servicesList []*model.Service 40 serviceInstances map[string][]*model.ServiceInstance //key hostname value serviceInstance array 41 cacheMutex sync.Mutex 42 initDone bool 43 clusterID string 44} 45 46// NewController creates a new Consul controller 47func NewController(addr string, clusterID string) (*Controller, error) { 48 conf := api.DefaultConfig() 49 conf.Address = addr 50 51 client, err := api.NewClient(conf) 52 monitor := NewConsulMonitor(client) 53 controller := Controller{ 54 monitor: monitor, 55 client: client, 56 clusterID: clusterID, 57 } 58 59 //Watch the change events to refresh local caches 60 monitor.AppendServiceHandler(controller.ServiceChanged) 61 monitor.AppendInstanceHandler(controller.InstanceChanged) 62 return &controller, err 63} 64 65func (c *Controller) Provider() serviceregistry.ProviderID { 66 return serviceregistry.Consul 67} 68 69func (c *Controller) Cluster() string { 70 return c.clusterID 71} 72 73// Services list declarations of all services in the system 74func (c *Controller) Services() ([]*model.Service, error) { 75 c.cacheMutex.Lock() 76 defer c.cacheMutex.Unlock() 77 78 err := c.initCache() 79 if err != nil { 80 return nil, err 81 } 82 83 return c.servicesList, nil 84} 85 86// GetService retrieves a service by host name if it exists 87func (c *Controller) GetService(hostname host.Name) (*model.Service, error) { 88 c.cacheMutex.Lock() 89 defer c.cacheMutex.Unlock() 90 91 err := c.initCache() 92 if err != nil { 93 return nil, err 94 } 95 96 // Get actual service by name 97 name, err := parseHostname(hostname) 98 if err != nil { 99 log.Infof("parseHostname(%s) => error %v", hostname, err) 100 return nil, err 101 } 102 103 if service, ok := c.services[name]; ok { 104 return service, nil 105 } 106 return nil, nil 107} 108 109// ManagementPorts retrieves set of health check ports by instance IP. 110// This does not apply to Consul service registry, as Consul does not 111// manage the service instances. In future, when we integrate Nomad, we 112// might revisit this function. 113func (c *Controller) ManagementPorts(addr string) model.PortList { 114 return nil 115} 116 117// WorkloadHealthCheckInfo retrieves set of health check info by instance IP. 118// This does not apply to Consul service registry, as Consul does not 119// manage the service instances. In future, when we integrate Nomad, we 120// might revisit this function. 121func (c *Controller) WorkloadHealthCheckInfo(addr string) model.ProbeList { 122 return nil 123} 124 125// InstancesByPort retrieves instances for a service that match 126// any of the supplied labels. All instances match an empty tag list. 127func (c *Controller) InstancesByPort(svc *model.Service, port int, 128 labels labels.Collection) ([]*model.ServiceInstance, error) { 129 c.cacheMutex.Lock() 130 defer c.cacheMutex.Unlock() 131 132 err := c.initCache() 133 if err != nil { 134 return nil, err 135 } 136 137 // Get actual service by name 138 name, err := parseHostname(svc.Hostname) 139 if err != nil { 140 log.Infof("parseHostname(%s) => error %v", svc.Hostname, err) 141 return nil, err 142 } 143 144 if serviceInstances, ok := c.serviceInstances[name]; ok { 145 var instances []*model.ServiceInstance 146 for _, instance := range serviceInstances { 147 if labels.HasSubsetOf(instance.Endpoint.Labels) && portMatch(instance, port) { 148 instances = append(instances, instance) 149 } 150 } 151 152 return instances, nil 153 } 154 return nil, fmt.Errorf("could not find instance of service: %s", name) 155} 156 157// returns true if an instance's port matches with any in the provided list 158func portMatch(instance *model.ServiceInstance, port int) bool { 159 return port == 0 || port == instance.ServicePort.Port 160} 161 162// GetProxyServiceInstances lists service instances co-located with a given proxy 163func (c *Controller) GetProxyServiceInstances(node *model.Proxy) ([]*model.ServiceInstance, error) { 164 c.cacheMutex.Lock() 165 defer c.cacheMutex.Unlock() 166 167 err := c.initCache() 168 if err != nil { 169 return nil, err 170 } 171 172 out := make([]*model.ServiceInstance, 0) 173 for _, instances := range c.serviceInstances { 174 for _, instance := range instances { 175 addr := instance.Endpoint.Address 176 if len(node.IPAddresses) > 0 { 177 for _, ipAddress := range node.IPAddresses { 178 if ipAddress == addr { 179 out = append(out, instance) 180 break 181 } 182 } 183 } 184 } 185 } 186 187 return out, nil 188} 189 190func (c *Controller) GetProxyWorkloadLabels(proxy *model.Proxy) (labels.Collection, error) { 191 c.cacheMutex.Lock() 192 defer c.cacheMutex.Unlock() 193 194 err := c.initCache() 195 if err != nil { 196 return nil, err 197 } 198 199 out := make(labels.Collection, 0) 200 for _, instances := range c.serviceInstances { 201 for _, instance := range instances { 202 addr := instance.Endpoint.Address 203 if len(proxy.IPAddresses) > 0 { 204 for _, ipAddress := range proxy.IPAddresses { 205 if ipAddress == addr { 206 out = append(out, instance.Endpoint.Labels) 207 break 208 } 209 } 210 } 211 } 212 } 213 214 return out, nil 215} 216 217// Run all controllers until a signal is received 218func (c *Controller) Run(stop <-chan struct{}) { 219 c.monitor.Start(stop) 220} 221 222// AppendServiceHandler implements a service catalog operation 223func (c *Controller) AppendServiceHandler(f func(*model.Service, model.Event)) error { 224 c.monitor.AppendServiceHandler(func(instances []*api.CatalogService, event model.Event) error { 225 f(convertService(instances), event) 226 return nil 227 }) 228 return nil 229} 230 231// AppendInstanceHandler implements a service catalog operation 232func (c *Controller) AppendInstanceHandler(f func(*model.ServiceInstance, model.Event)) error { 233 c.monitor.AppendInstanceHandler(func(instance *api.CatalogService, event model.Event) error { 234 f(convertInstance(instance), event) 235 return nil 236 }) 237 return nil 238} 239 240// GetIstioServiceAccounts implements model.ServiceAccounts operation TODO 241func (c *Controller) GetIstioServiceAccounts(svc *model.Service, ports []int) []string { 242 // Need to get service account of service registered with consul 243 // Currently Consul does not have service account or equivalent concept 244 // As a step-1, to enabling istio security in Consul, We assume all the services run in default service account 245 // This will allow all the consul services to do mTLS 246 // Follow - https://goo.gl/Dt11Ct 247 248 return []string{ 249 spiffe.MustGenSpiffeURI("default", "default"), 250 } 251} 252 253func (c *Controller) initCache() error { 254 if c.initDone { 255 return nil 256 } 257 258 c.services = make(map[string]*model.Service) 259 c.serviceInstances = make(map[string][]*model.ServiceInstance) 260 261 // get all services from consul 262 consulServices, err := c.getServices() 263 if err != nil { 264 return err 265 } 266 267 for serviceName := range consulServices { 268 // get endpoints of a service from consul 269 endpoints, err := c.getCatalogService(serviceName, nil) 270 if err != nil { 271 return err 272 } 273 c.services[serviceName] = convertService(endpoints) 274 275 instances := make([]*model.ServiceInstance, len(endpoints)) 276 for i, endpoint := range endpoints { 277 instances[i] = convertInstance(endpoint) 278 } 279 c.serviceInstances[serviceName] = instances 280 } 281 282 c.servicesList = make([]*model.Service, 0, len(c.services)) 283 for _, value := range c.services { 284 c.servicesList = append(c.servicesList, value) 285 } 286 287 c.initDone = true 288 return nil 289} 290 291func (c *Controller) getServices() (map[string][]string, error) { 292 data, _, err := c.client.Catalog().Services(nil) 293 if err != nil { 294 log.Warnf("Could not retrieve services from consul: %v", err) 295 return nil, err 296 } 297 298 return data, nil 299} 300 301// nolint: unparam 302func (c *Controller) getCatalogService(name string, q *api.QueryOptions) ([]*api.CatalogService, error) { 303 endpoints, _, err := c.client.Catalog().Service(name, "", q) 304 if err != nil { 305 log.Warnf("Could not retrieve service catalog from consul: %v", err) 306 return nil, err 307 } 308 309 return endpoints, nil 310} 311 312func (c *Controller) refreshCache() { 313 c.cacheMutex.Lock() 314 defer c.cacheMutex.Unlock() 315 c.initDone = false 316} 317 318func (c *Controller) InstanceChanged(instance *api.CatalogService, event model.Event) error { 319 c.refreshCache() 320 return nil 321} 322 323func (c *Controller) ServiceChanged(instances []*api.CatalogService, event model.Event) error { 324 c.refreshCache() 325 return nil 326} 327