1// Package k8s provides pod discovery for Kubernetes.
2package k8s
3
import (
	"fmt"
	"log"
	"net"
	"path/filepath"
	"strconv"

	"github.com/hashicorp/go-multierror"
	"github.com/mitchellh/go-homedir"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"

	// Register all known auth mechanisms since we might be authenticating
	// from anywhere.
	_ "k8s.io/client-go/plugin/pkg/client/auth"
)
22
// AnnotationKeyPort names the pod annotation whose value selects the port
// that is appended to the discovered address. The value may be either a
// port name or a port number.
const AnnotationKeyPort = "consul.hashicorp.com/auto-join-port"
28
// Provider implements pod discovery for the "k8s" provider. It carries no
// state, so the zero value is ready to use.
type Provider struct{}

// Help returns the usage text for the "k8s" provider: the supported
// configuration keys, how the cluster configuration is located, and how the
// join address of each pod is derived.
//
// The annotation name quoted in the text must stay in sync with
// AnnotationKeyPort, and the configuration lookup order with Addrs.
func (p *Provider) Help() string {
	return `Kubernetes (K8S):

    provider:         "k8s"
    kubeconfig:       Path to the kubeconfig file.
    namespace:        Namespace to search for pods (defaults to "default").
    label_selector:   Label selector value to filter pods.
    field_selector:   Field selector value to filter pods.
    host_network:     "true" if pod host IP and ports should be used.

    The kubeconfig file value will be searched in the following locations:

     1. Use path from "kubeconfig" option if provided.
     2. Use default path of $HOME/.kube/config

    If the kubeconfig file cannot be loaded, the in-cluster configuration
    is used as a fallback.

    By default, the Pod IP is used to join. The "host_network" option may
    be set to use the Host IP. No port is used by default. Pods may set
    an annotation 'consul.hashicorp.com/auto-join-port' to a named port or
    an integer value. If the value matches a named port, that port will
    be used to join.

    Note that if "host_network" is set to true, then only pods that have
    a HostIP available will be selected. If a port annotation exists, then
    the port must be exposed via a HostPort as well, otherwise the pod will
    be ignored.
`
}
59
60func (p *Provider) Addrs(args map[string]string, l *log.Logger) ([]string, error) {
61	if args["provider"] != "k8s" {
62		return nil, fmt.Errorf("discover-k8s: invalid provider " + args["provider"])
63	}
64
65	// Get the configuration. This can come from multiple sources. We first
66	// try kubeconfig it is set directly, then we fall back to in-cluster
67	// auth. Finally, we try the default kubeconfig path.
68	kubeconfig := args["kubeconfig"]
69	if kubeconfig == "" {
70		// If kubeconfig is empty, let's first try the default directory.
71		// This is must faster than trying in-cluster auth so we try this
72		// first.
73		dir, err := homedir.Dir()
74		if err != nil {
75			return nil, fmt.Errorf("discover-k8s: error retrieving home directory: %s", err)
76		}
77		kubeconfig = filepath.Join(dir, ".kube", "config")
78	}
79
80	// First try to get the configuration from the kubeconfig value
81	config, configErr := clientcmd.BuildConfigFromFlags("", kubeconfig)
82	if configErr != nil {
83		configErr = fmt.Errorf("discover-k8s: error loading kubeconfig: %s", configErr)
84
85		// kubeconfig failed, fall back and try in-cluster config. We do
86		// this as the fallback since this makes network connections and
87		// is much slower to fail.
88		var err error
89		config, err = rest.InClusterConfig()
90		if err != nil {
91			return nil, multierror.Append(configErr, fmt.Errorf(
92				"discover-k8s: error loading in-cluster config: %s", err))
93		}
94	}
95
96	// Initialize the clientset
97	clientset, err := kubernetes.NewForConfig(config)
98	if err != nil {
99		return nil, fmt.Errorf("discover-k8s: error initializing k8s client: %s", err)
100	}
101
102	namespace := args["namespace"]
103	if namespace == "" {
104		namespace = "default"
105	}
106
107	// List all the pods based on the filters we requested
108	pods, err := clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{
109		LabelSelector: args["label_selector"],
110		FieldSelector: args["field_selector"],
111	})
112	if err != nil {
113		return nil, fmt.Errorf("discover-k8s: error listing pods: %s", err)
114	}
115
116	return PodAddrs(pods, args, l)
117}
118
119// PodAddrs extracts the addresses from a list of pods.
120//
121// This is a separate method so that we can unit test this without having
122// to setup complicated K8S cluster scenarios. It shouldn't generally be
123// called externally.
124func PodAddrs(pods *corev1.PodList, args map[string]string, l *log.Logger) ([]string, error) {
125	hostNetwork := false
126	if v := args["host_network"]; v != "" {
127		var err error
128		hostNetwork, err = strconv.ParseBool(v)
129		if err != nil {
130			return nil, fmt.Errorf("discover-k8s: host_network must be boolean value: %s", err)
131		}
132	}
133
134	var addrs []string
135PodLoop:
136	for _, pod := range pods.Items {
137		if pod.Status.Phase != corev1.PodRunning {
138			l.Printf("[DEBUG] discover-k8s: ignoring pod %q, not running: %q",
139				pod.Name, pod.Status.Phase)
140			continue
141		}
142
143		// If there is a Ready condition available, we need that to be true.
144		// If no ready condition is set, then we accept this pod regardless.
145		for _, condition := range pod.Status.Conditions {
146			if condition.Type == corev1.PodReady && condition.Status != corev1.ConditionTrue {
147				l.Printf("[DEBUG] discover-k8s: ignoring pod %q, not ready state", pod.Name)
148				continue PodLoop
149			}
150		}
151
152		// Get the IP address that we will join.
153		addr := pod.Status.PodIP
154		if hostNetwork {
155			addr = pod.Status.HostIP
156		}
157		if addr == "" {
158			// This can be empty according to the API docs, so we protect that.
159			l.Printf("[DEBUG] discover-k8s: ignoring pod %q, requested IP is empty", pod.Name)
160			continue
161		}
162
163		// We only use the port if it is specified as an annotation. The
164		// annotation value can be a name or a number.
165		if v := pod.Annotations[AnnotationKeyPort]; v != "" {
166			port, err := podPort(&pod, v, hostNetwork)
167			if err != nil {
168				l.Printf("[DEBUG] discover-k8s: ignoring pod %q, error retrieving port: %s",
169					pod.Name, err)
170				continue
171			}
172
173			addr = fmt.Sprintf("%s:%d", addr, port)
174		}
175
176		addrs = append(addrs, addr)
177	}
178
179	return addrs, nil
180}
181
182// podPort extracts the proper port for the address from the given pod
183// for a non-empty annotation.
184//
185// Pre-condition: annotation is non-empty
186func podPort(pod *corev1.Pod, annotation string, host bool) (int32, error) {
187	// First look for a matching port matching the value of the annotation.
188	for _, container := range pod.Spec.Containers {
189		for _, portDef := range container.Ports {
190			if portDef.Name == annotation {
191				if host {
192					// It is possible for HostPort to be zero, if that is the
193					// case then we ignore this port.
194					if portDef.HostPort == 0 {
195						continue
196					}
197
198					return portDef.HostPort, nil
199				}
200
201				return portDef.ContainerPort, nil
202			}
203		}
204	}
205
206	// Otherwise assume that the port is a numeric value.
207	v, err := strconv.ParseInt(annotation, 0, 32)
208	return int32(v), err
209}
210