1// Package k8s provides pod discovery for Kubernetes.
2package k8s
3
import (
	"context"
	"fmt"
	"log"
	"net"
	"path/filepath"
	"strconv"

	"github.com/hashicorp/go-multierror"
	"github.com/mitchellh/go-homedir"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"

	// Register all known auth mechanisms since we might be authenticating
	// from anywhere.
	_ "k8s.io/client-go/plugin/pkg/client/auth"
)
23
const (
	// AnnotationKeyPort is the name of the pod annotation that specifies
	// the port to append to the pod's address. The annotation value may be
	// either the name of a container port or a number.
	AnnotationKeyPort = "consul.hashicorp.com/auto-join-port"
)
29
// Provider implements pod discovery for Kubernetes. It has no fields, so
// the zero value is ready to use.
type Provider struct{}

// Help returns the usage text for the "k8s" provider: the supported
// options, how the client configuration is located, and how the address
// and port of each pod are chosen.
func (p *Provider) Help() string {
	return `Kubernetes (K8S):

    provider:         "k8s"
    kubeconfig:       Path to the kubeconfig file.
    namespace:        Namespace to search for pods (defaults to "default").
    label_selector:   Label selector value to filter pods.
    field_selector:   Field selector value to filter pods.
    host_network:     "true" if pod host IP and ports should be used.

    The kubeconfig file value will be searched in the following locations:

     1. Use path from "kubeconfig" option if provided.
     2. Use default path of $HOME/.kube/config

    If the kubeconfig file cannot be loaded, the in-cluster configuration
    is used instead.

    By default, the Pod IP is used to join. The "host_network" option may
    be set to use the Host IP. No port is used by default. Pods may set
    the annotation 'consul.hashicorp.com/auto-join-port' to a named port
    or an integer value. If the value matches a named port, that port will
    be used to join.

    Note that if "host_network" is set to true, then only pods that have
    a HostIP available will be selected. If a port annotation exists, then
    the port must be exposed via a HostPort as well, otherwise the pod will
    be ignored.
`
}
60
61func (p *Provider) Addrs(args map[string]string, l *log.Logger) ([]string, error) {
62	if args["provider"] != "k8s" {
63		return nil, fmt.Errorf("discover-k8s: invalid provider " + args["provider"])
64	}
65
66	// Get the configuration. This can come from multiple sources. We first
67	// try kubeconfig it is set directly, then we fall back to in-cluster
68	// auth. Finally, we try the default kubeconfig path.
69	kubeconfig := args["kubeconfig"]
70	if kubeconfig == "" {
71		// If kubeconfig is empty, let's first try the default directory.
72		// This is must faster than trying in-cluster auth so we try this
73		// first.
74		dir, err := homedir.Dir()
75		if err != nil {
76			return nil, fmt.Errorf("discover-k8s: error retrieving home directory: %s", err)
77		}
78		kubeconfig = filepath.Join(dir, ".kube", "config")
79	}
80
81	// First try to get the configuration from the kubeconfig value
82	config, configErr := clientcmd.BuildConfigFromFlags("", kubeconfig)
83	if configErr != nil {
84		configErr = fmt.Errorf("discover-k8s: error loading kubeconfig: %s", configErr)
85
86		// kubeconfig failed, fall back and try in-cluster config. We do
87		// this as the fallback since this makes network connections and
88		// is much slower to fail.
89		var err error
90		config, err = rest.InClusterConfig()
91		if err != nil {
92			return nil, multierror.Append(configErr, fmt.Errorf(
93				"discover-k8s: error loading in-cluster config: %s", err))
94		}
95	}
96
97	// Initialize the clientset
98	clientset, err := kubernetes.NewForConfig(config)
99	if err != nil {
100		return nil, fmt.Errorf("discover-k8s: error initializing k8s client: %s", err)
101	}
102
103	namespace := args["namespace"]
104	if namespace == "" {
105		namespace = "default"
106	}
107
108	// List all the pods based on the filters we requested
109	pods, err := clientset.CoreV1().Pods(namespace).List(
110		context.Background(),
111		metav1.ListOptions{
112			LabelSelector: args["label_selector"],
113			FieldSelector: args["field_selector"],
114		})
115	if err != nil {
116		return nil, fmt.Errorf("discover-k8s: error listing pods: %s", err)
117	}
118
119	return PodAddrs(pods, args, l)
120}
121
122// PodAddrs extracts the addresses from a list of pods.
123//
124// This is a separate method so that we can unit test this without having
125// to setup complicated K8S cluster scenarios. It shouldn't generally be
126// called externally.
127func PodAddrs(pods *corev1.PodList, args map[string]string, l *log.Logger) ([]string, error) {
128	hostNetwork := false
129	if v := args["host_network"]; v != "" {
130		var err error
131		hostNetwork, err = strconv.ParseBool(v)
132		if err != nil {
133			return nil, fmt.Errorf("discover-k8s: host_network must be boolean value: %s", err)
134		}
135	}
136
137	var addrs []string
138PodLoop:
139	for _, pod := range pods.Items {
140		if pod.Status.Phase != corev1.PodRunning {
141			l.Printf("[DEBUG] discover-k8s: ignoring pod %q, not running: %q",
142				pod.Name, pod.Status.Phase)
143			continue
144		}
145
146		// If there is a Ready condition available, we need that to be true.
147		// If no ready condition is set, then we accept this pod regardless.
148		for _, condition := range pod.Status.Conditions {
149			if condition.Type == corev1.PodReady && condition.Status != corev1.ConditionTrue {
150				l.Printf("[DEBUG] discover-k8s: ignoring pod %q, not ready state", pod.Name)
151				continue PodLoop
152			}
153		}
154
155		// Get the IP address that we will join.
156		addr := pod.Status.PodIP
157		if hostNetwork {
158			addr = pod.Status.HostIP
159		}
160		if addr == "" {
161			// This can be empty according to the API docs, so we protect that.
162			l.Printf("[DEBUG] discover-k8s: ignoring pod %q, requested IP is empty", pod.Name)
163			continue
164		}
165
166		// We only use the port if it is specified as an annotation. The
167		// annotation value can be a name or a number.
168		if v := pod.Annotations[AnnotationKeyPort]; v != "" {
169			port, err := podPort(&pod, v, hostNetwork)
170			if err != nil {
171				l.Printf("[DEBUG] discover-k8s: ignoring pod %q, error retrieving port: %s",
172					pod.Name, err)
173				continue
174			}
175
176			addr = fmt.Sprintf("%s:%d", addr, port)
177		}
178
179		addrs = append(addrs, addr)
180	}
181
182	return addrs, nil
183}
184
185// podPort extracts the proper port for the address from the given pod
186// for a non-empty annotation.
187//
188// Pre-condition: annotation is non-empty
189func podPort(pod *corev1.Pod, annotation string, host bool) (int32, error) {
190	// First look for a matching port matching the value of the annotation.
191	for _, container := range pod.Spec.Containers {
192		for _, portDef := range container.Ports {
193			if portDef.Name == annotation {
194				if host {
195					// It is possible for HostPort to be zero, if that is the
196					// case then we ignore this port.
197					if portDef.HostPort == 0 {
198						continue
199					}
200
201					return portDef.HostPort, nil
202				}
203
204				return portDef.ContainerPort, nil
205			}
206		}
207	}
208
209	// Otherwise assume that the port is a numeric value.
210	v, err := strconv.ParseInt(annotation, 0, 32)
211	return int32(v), err
212}
213