1// Copyright 2017 Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package consul
16
17import (
18	"fmt"
19	"sync"
20
21	"github.com/hashicorp/consul/api"
22
23	"istio.io/pkg/log"
24
25	"istio.io/istio/pilot/pkg/model"
26	"istio.io/istio/pilot/pkg/serviceregistry"
27	"istio.io/istio/pkg/config/host"
28	"istio.io/istio/pkg/config/labels"
29	"istio.io/istio/pkg/spiffe"
30)
31
32var _ serviceregistry.Instance = &Controller{}
33
34// Controller communicates with Consul and monitors for changes
35type Controller struct {
36	client           *api.Client
37	monitor          Monitor
38	services         map[string]*model.Service //key hostname value service
39	servicesList     []*model.Service
40	serviceInstances map[string][]*model.ServiceInstance //key hostname value serviceInstance array
41	cacheMutex       sync.Mutex
42	initDone         bool
43	clusterID        string
44}
45
46// NewController creates a new Consul controller
47func NewController(addr string, clusterID string) (*Controller, error) {
48	conf := api.DefaultConfig()
49	conf.Address = addr
50
51	client, err := api.NewClient(conf)
52	monitor := NewConsulMonitor(client)
53	controller := Controller{
54		monitor:   monitor,
55		client:    client,
56		clusterID: clusterID,
57	}
58
59	//Watch the change events to refresh local caches
60	monitor.AppendServiceHandler(controller.ServiceChanged)
61	monitor.AppendInstanceHandler(controller.InstanceChanged)
62	return &controller, err
63}
64
65func (c *Controller) Provider() serviceregistry.ProviderID {
66	return serviceregistry.Consul
67}
68
69func (c *Controller) Cluster() string {
70	return c.clusterID
71}
72
73// Services list declarations of all services in the system
74func (c *Controller) Services() ([]*model.Service, error) {
75	c.cacheMutex.Lock()
76	defer c.cacheMutex.Unlock()
77
78	err := c.initCache()
79	if err != nil {
80		return nil, err
81	}
82
83	return c.servicesList, nil
84}
85
86// GetService retrieves a service by host name if it exists
87func (c *Controller) GetService(hostname host.Name) (*model.Service, error) {
88	c.cacheMutex.Lock()
89	defer c.cacheMutex.Unlock()
90
91	err := c.initCache()
92	if err != nil {
93		return nil, err
94	}
95
96	// Get actual service by name
97	name, err := parseHostname(hostname)
98	if err != nil {
99		log.Infof("parseHostname(%s) => error %v", hostname, err)
100		return nil, err
101	}
102
103	if service, ok := c.services[name]; ok {
104		return service, nil
105	}
106	return nil, nil
107}
108
109// ManagementPorts retrieves set of health check ports by instance IP.
110// This does not apply to Consul service registry, as Consul does not
111// manage the service instances. In future, when we integrate Nomad, we
112// might revisit this function.
113func (c *Controller) ManagementPorts(addr string) model.PortList {
114	return nil
115}
116
117// WorkloadHealthCheckInfo retrieves set of health check info by instance IP.
118// This does not apply to Consul service registry, as Consul does not
119// manage the service instances. In future, when we integrate Nomad, we
120// might revisit this function.
121func (c *Controller) WorkloadHealthCheckInfo(addr string) model.ProbeList {
122	return nil
123}
124
125// InstancesByPort retrieves instances for a service that match
126// any of the supplied labels. All instances match an empty tag list.
127func (c *Controller) InstancesByPort(svc *model.Service, port int,
128	labels labels.Collection) ([]*model.ServiceInstance, error) {
129	c.cacheMutex.Lock()
130	defer c.cacheMutex.Unlock()
131
132	err := c.initCache()
133	if err != nil {
134		return nil, err
135	}
136
137	// Get actual service by name
138	name, err := parseHostname(svc.Hostname)
139	if err != nil {
140		log.Infof("parseHostname(%s) => error %v", svc.Hostname, err)
141		return nil, err
142	}
143
144	if serviceInstances, ok := c.serviceInstances[name]; ok {
145		var instances []*model.ServiceInstance
146		for _, instance := range serviceInstances {
147			if labels.HasSubsetOf(instance.Endpoint.Labels) && portMatch(instance, port) {
148				instances = append(instances, instance)
149			}
150		}
151
152		return instances, nil
153	}
154	return nil, fmt.Errorf("could not find instance of service: %s", name)
155}
156
157// returns true if an instance's port matches with any in the provided list
158func portMatch(instance *model.ServiceInstance, port int) bool {
159	return port == 0 || port == instance.ServicePort.Port
160}
161
162// GetProxyServiceInstances lists service instances co-located with a given proxy
163func (c *Controller) GetProxyServiceInstances(node *model.Proxy) ([]*model.ServiceInstance, error) {
164	c.cacheMutex.Lock()
165	defer c.cacheMutex.Unlock()
166
167	err := c.initCache()
168	if err != nil {
169		return nil, err
170	}
171
172	out := make([]*model.ServiceInstance, 0)
173	for _, instances := range c.serviceInstances {
174		for _, instance := range instances {
175			addr := instance.Endpoint.Address
176			if len(node.IPAddresses) > 0 {
177				for _, ipAddress := range node.IPAddresses {
178					if ipAddress == addr {
179						out = append(out, instance)
180						break
181					}
182				}
183			}
184		}
185	}
186
187	return out, nil
188}
189
190func (c *Controller) GetProxyWorkloadLabels(proxy *model.Proxy) (labels.Collection, error) {
191	c.cacheMutex.Lock()
192	defer c.cacheMutex.Unlock()
193
194	err := c.initCache()
195	if err != nil {
196		return nil, err
197	}
198
199	out := make(labels.Collection, 0)
200	for _, instances := range c.serviceInstances {
201		for _, instance := range instances {
202			addr := instance.Endpoint.Address
203			if len(proxy.IPAddresses) > 0 {
204				for _, ipAddress := range proxy.IPAddresses {
205					if ipAddress == addr {
206						out = append(out, instance.Endpoint.Labels)
207						break
208					}
209				}
210			}
211		}
212	}
213
214	return out, nil
215}
216
217// Run all controllers until a signal is received
218func (c *Controller) Run(stop <-chan struct{}) {
219	c.monitor.Start(stop)
220}
221
222// AppendServiceHandler implements a service catalog operation
223func (c *Controller) AppendServiceHandler(f func(*model.Service, model.Event)) error {
224	c.monitor.AppendServiceHandler(func(instances []*api.CatalogService, event model.Event) error {
225		f(convertService(instances), event)
226		return nil
227	})
228	return nil
229}
230
231// AppendInstanceHandler implements a service catalog operation
232func (c *Controller) AppendInstanceHandler(f func(*model.ServiceInstance, model.Event)) error {
233	c.monitor.AppendInstanceHandler(func(instance *api.CatalogService, event model.Event) error {
234		f(convertInstance(instance), event)
235		return nil
236	})
237	return nil
238}
239
240// GetIstioServiceAccounts implements model.ServiceAccounts operation TODO
241func (c *Controller) GetIstioServiceAccounts(svc *model.Service, ports []int) []string {
242	// Need to get service account of service registered with consul
243	// Currently Consul does not have service account or equivalent concept
244	// As a step-1, to enabling istio security in Consul, We assume all the services run in default service account
245	// This will allow all the consul services to do mTLS
246	// Follow - https://goo.gl/Dt11Ct
247
248	return []string{
249		spiffe.MustGenSpiffeURI("default", "default"),
250	}
251}
252
253func (c *Controller) initCache() error {
254	if c.initDone {
255		return nil
256	}
257
258	c.services = make(map[string]*model.Service)
259	c.serviceInstances = make(map[string][]*model.ServiceInstance)
260
261	// get all services from consul
262	consulServices, err := c.getServices()
263	if err != nil {
264		return err
265	}
266
267	for serviceName := range consulServices {
268		// get endpoints of a service from consul
269		endpoints, err := c.getCatalogService(serviceName, nil)
270		if err != nil {
271			return err
272		}
273		c.services[serviceName] = convertService(endpoints)
274
275		instances := make([]*model.ServiceInstance, len(endpoints))
276		for i, endpoint := range endpoints {
277			instances[i] = convertInstance(endpoint)
278		}
279		c.serviceInstances[serviceName] = instances
280	}
281
282	c.servicesList = make([]*model.Service, 0, len(c.services))
283	for _, value := range c.services {
284		c.servicesList = append(c.servicesList, value)
285	}
286
287	c.initDone = true
288	return nil
289}
290
291func (c *Controller) getServices() (map[string][]string, error) {
292	data, _, err := c.client.Catalog().Services(nil)
293	if err != nil {
294		log.Warnf("Could not retrieve services from consul: %v", err)
295		return nil, err
296	}
297
298	return data, nil
299}
300
301// nolint: unparam
302func (c *Controller) getCatalogService(name string, q *api.QueryOptions) ([]*api.CatalogService, error) {
303	endpoints, _, err := c.client.Catalog().Service(name, "", q)
304	if err != nil {
305		log.Warnf("Could not retrieve service catalog from consul: %v", err)
306		return nil, err
307	}
308
309	return endpoints, nil
310}
311
312func (c *Controller) refreshCache() {
313	c.cacheMutex.Lock()
314	defer c.cacheMutex.Unlock()
315	c.initDone = false
316}
317
318func (c *Controller) InstanceChanged(instance *api.CatalogService, event model.Event) error {
319	c.refreshCache()
320	return nil
321}
322
323func (c *Controller) ServiceChanged(instances []*api.CatalogService, event model.Event) error {
324	c.refreshCache()
325	return nil
326}
327