1// Package k8s provides pod discovery for Kubernetes. 2package k8s 3 4import ( 5 "context" 6 "fmt" 7 "log" 8 "path/filepath" 9 "strconv" 10 11 "github.com/hashicorp/go-multierror" 12 "github.com/mitchellh/go-homedir" 13 corev1 "k8s.io/api/core/v1" 14 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 15 "k8s.io/client-go/kubernetes" 16 "k8s.io/client-go/rest" 17 "k8s.io/client-go/tools/clientcmd" 18 19 // Register all known auth mechanisms since we might be authenticating 20 // from anywhere. 21 _ "k8s.io/client-go/plugin/pkg/client/auth" 22) 23 24const ( 25 // AnnotationKeyPort is the annotation name of the field that specifies 26 // the port name or number to append to the address. 27 AnnotationKeyPort = "consul.hashicorp.com/auto-join-port" 28) 29 30type Provider struct{} 31 32func (p *Provider) Help() string { 33 return `Kubernetes (K8S): 34 35 provider: "k8s" 36 kubeconfig: Path to the kubeconfig file. 37 namespace: Namespace to search for pods (defaults to "default"). 38 label_selector: Label selector value to filter pods. 39 field_selector: Field selector value to filter pods. 40 host_network: "true" if pod host IP and ports should be used. 41 42 The kubeconfig file value will be searched in the following locations: 43 44 1. Use path from "kubeconfig" option if provided. 45 2. Use path from KUBECONFIG environment variable. 46 3. Use default path of $HOME/.kube/config 47 48 By default, the Pod IP is used to join. The "host_network" option may 49 be set to use the Host IP. No port is used by default. Pods may set 50 an annotation 'hashicorp/consul-auto-join-port' to a named port or 51 an integer value. If the value matches a named port, that port will 52 be used to join. 53 54 Note that if "host_network" is set to true, then only pods that have 55 a HostIP available will be selected. If a port annotation exists, then 56 the port must be exposed via a HostPort as well, otherwise the pod will 57 be ignored. 58` 59} 60 61func (p *Provider) Addrs(args map[string]string, l *log.Logger) ([]string, error) { 62 if args["provider"] != "k8s" { 63 return nil, fmt.Errorf("discover-k8s: invalid provider " + args["provider"]) 64 } 65 66 // Get the configuration. This can come from multiple sources. We first 67 // try kubeconfig it is set directly, then we fall back to in-cluster 68 // auth. Finally, we try the default kubeconfig path. 69 kubeconfig := args["kubeconfig"] 70 if kubeconfig == "" { 71 // If kubeconfig is empty, let's first try the default directory. 72 // This is must faster than trying in-cluster auth so we try this 73 // first. 74 dir, err := homedir.Dir() 75 if err != nil { 76 return nil, fmt.Errorf("discover-k8s: error retrieving home directory: %s", err) 77 } 78 kubeconfig = filepath.Join(dir, ".kube", "config") 79 } 80 81 // First try to get the configuration from the kubeconfig value 82 config, configErr := clientcmd.BuildConfigFromFlags("", kubeconfig) 83 if configErr != nil { 84 configErr = fmt.Errorf("discover-k8s: error loading kubeconfig: %s", configErr) 85 86 // kubeconfig failed, fall back and try in-cluster config. We do 87 // this as the fallback since this makes network connections and 88 // is much slower to fail. 89 var err error 90 config, err = rest.InClusterConfig() 91 if err != nil { 92 return nil, multierror.Append(configErr, fmt.Errorf( 93 "discover-k8s: error loading in-cluster config: %s", err)) 94 } 95 } 96 97 // Initialize the clientset 98 clientset, err := kubernetes.NewForConfig(config) 99 if err != nil { 100 return nil, fmt.Errorf("discover-k8s: error initializing k8s client: %s", err) 101 } 102 103 namespace := args["namespace"] 104 if namespace == "" { 105 namespace = "default" 106 } 107 108 // List all the pods based on the filters we requested 109 pods, err := clientset.CoreV1().Pods(namespace).List( 110 context.Background(), 111 metav1.ListOptions{ 112 LabelSelector: args["label_selector"], 113 FieldSelector: args["field_selector"], 114 }) 115 if err != nil { 116 return nil, fmt.Errorf("discover-k8s: error listing pods: %s", err) 117 } 118 119 return PodAddrs(pods, args, l) 120} 121 122// PodAddrs extracts the addresses from a list of pods. 123// 124// This is a separate method so that we can unit test this without having 125// to setup complicated K8S cluster scenarios. It shouldn't generally be 126// called externally. 127func PodAddrs(pods *corev1.PodList, args map[string]string, l *log.Logger) ([]string, error) { 128 hostNetwork := false 129 if v := args["host_network"]; v != "" { 130 var err error 131 hostNetwork, err = strconv.ParseBool(v) 132 if err != nil { 133 return nil, fmt.Errorf("discover-k8s: host_network must be boolean value: %s", err) 134 } 135 } 136 137 var addrs []string 138PodLoop: 139 for _, pod := range pods.Items { 140 if pod.Status.Phase != corev1.PodRunning { 141 l.Printf("[DEBUG] discover-k8s: ignoring pod %q, not running: %q", 142 pod.Name, pod.Status.Phase) 143 continue 144 } 145 146 // If there is a Ready condition available, we need that to be true. 147 // If no ready condition is set, then we accept this pod regardless. 148 for _, condition := range pod.Status.Conditions { 149 if condition.Type == corev1.PodReady && condition.Status != corev1.ConditionTrue { 150 l.Printf("[DEBUG] discover-k8s: ignoring pod %q, not ready state", pod.Name) 151 continue PodLoop 152 } 153 } 154 155 // Get the IP address that we will join. 156 addr := pod.Status.PodIP 157 if hostNetwork { 158 addr = pod.Status.HostIP 159 } 160 if addr == "" { 161 // This can be empty according to the API docs, so we protect that. 162 l.Printf("[DEBUG] discover-k8s: ignoring pod %q, requested IP is empty", pod.Name) 163 continue 164 } 165 166 // We only use the port if it is specified as an annotation. The 167 // annotation value can be a name or a number. 168 if v := pod.Annotations[AnnotationKeyPort]; v != "" { 169 port, err := podPort(&pod, v, hostNetwork) 170 if err != nil { 171 l.Printf("[DEBUG] discover-k8s: ignoring pod %q, error retrieving port: %s", 172 pod.Name, err) 173 continue 174 } 175 176 addr = fmt.Sprintf("%s:%d", addr, port) 177 } 178 179 addrs = append(addrs, addr) 180 } 181 182 return addrs, nil 183} 184 185// podPort extracts the proper port for the address from the given pod 186// for a non-empty annotation. 187// 188// Pre-condition: annotation is non-empty 189func podPort(pod *corev1.Pod, annotation string, host bool) (int32, error) { 190 // First look for a matching port matching the value of the annotation. 191 for _, container := range pod.Spec.Containers { 192 for _, portDef := range container.Ports { 193 if portDef.Name == annotation { 194 if host { 195 // It is possible for HostPort to be zero, if that is the 196 // case then we ignore this port. 197 if portDef.HostPort == 0 { 198 continue 199 } 200 201 return portDef.HostPort, nil 202 } 203 204 return portDef.ContainerPort, nil 205 } 206 } 207 } 208 209 // Otherwise assume that the port is a numeric value. 210 v, err := strconv.ParseInt(annotation, 0, 32) 211 return int32(v), err 212} 213