1// Package k8s provides pod discovery for Kubernetes. 2package k8s 3 4import ( 5 "fmt" 6 "log" 7 "path/filepath" 8 "strconv" 9 10 "github.com/hashicorp/go-multierror" 11 "github.com/mitchellh/go-homedir" 12 corev1 "k8s.io/api/core/v1" 13 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 14 "k8s.io/client-go/kubernetes" 15 "k8s.io/client-go/rest" 16 "k8s.io/client-go/tools/clientcmd" 17 18 // Register all known auth mechanisms since we might be authenticating 19 // from anywhere. 20 _ "k8s.io/client-go/plugin/pkg/client/auth" 21) 22 23const ( 24 // AnnotationKeyPort is the annotation name of the field that specifies 25 // the port name or number to append to the address. 26 AnnotationKeyPort = "consul.hashicorp.com/auto-join-port" 27) 28 29type Provider struct{} 30 31func (p *Provider) Help() string { 32 return `Kubernetes (K8S): 33 34 provider: "k8s" 35 kubeconfig: Path to the kubeconfig file. 36 namespace: Namespace to search for pods (defaults to "default"). 37 label_selector: Label selector value to filter pods. 38 field_selector: Field selector value to filter pods. 39 host_network: "true" if pod host IP and ports should be used. 40 41 The kubeconfig file value will be searched in the following locations: 42 43 1. Use path from "kubeconfig" option if provided. 44 2. Use path from KUBECONFIG environment variable. 45 3. Use default path of $HOME/.kube/config 46 47 By default, the Pod IP is used to join. The "host_network" option may 48 be set to use the Host IP. No port is used by default. Pods may set 49 an annotation 'hashicorp/consul-auto-join-port' to a named port or 50 an integer value. If the value matches a named port, that port will 51 be used to join. 52 53 Note that if "host_network" is set to true, then only pods that have 54 a HostIP available will be selected. If a port annotation exists, then 55 the port must be exposed via a HostPort as well, otherwise the pod will 56 be ignored. 57` 58} 59 60func (p *Provider) Addrs(args map[string]string, l *log.Logger) ([]string, error) { 61 if args["provider"] != "k8s" { 62 return nil, fmt.Errorf("discover-k8s: invalid provider " + args["provider"]) 63 } 64 65 // Get the configuration. This can come from multiple sources. We first 66 // try kubeconfig it is set directly, then we fall back to in-cluster 67 // auth. Finally, we try the default kubeconfig path. 68 kubeconfig := args["kubeconfig"] 69 if kubeconfig == "" { 70 // If kubeconfig is empty, let's first try the default directory. 71 // This is must faster than trying in-cluster auth so we try this 72 // first. 73 dir, err := homedir.Dir() 74 if err != nil { 75 return nil, fmt.Errorf("discover-k8s: error retrieving home directory: %s", err) 76 } 77 kubeconfig = filepath.Join(dir, ".kube", "config") 78 } 79 80 // First try to get the configuration from the kubeconfig value 81 config, configErr := clientcmd.BuildConfigFromFlags("", kubeconfig) 82 if configErr != nil { 83 configErr = fmt.Errorf("discover-k8s: error loading kubeconfig: %s", configErr) 84 85 // kubeconfig failed, fall back and try in-cluster config. We do 86 // this as the fallback since this makes network connections and 87 // is much slower to fail. 88 var err error 89 config, err = rest.InClusterConfig() 90 if err != nil { 91 return nil, multierror.Append(configErr, fmt.Errorf( 92 "discover-k8s: error loading in-cluster config: %s", err)) 93 } 94 } 95 96 // Initialize the clientset 97 clientset, err := kubernetes.NewForConfig(config) 98 if err != nil { 99 return nil, fmt.Errorf("discover-k8s: error initializing k8s client: %s", err) 100 } 101 102 namespace := args["namespace"] 103 if namespace == "" { 104 namespace = "default" 105 } 106 107 // List all the pods based on the filters we requested 108 pods, err := clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{ 109 LabelSelector: args["label_selector"], 110 FieldSelector: args["field_selector"], 111 }) 112 if err != nil { 113 return nil, fmt.Errorf("discover-k8s: error listing pods: %s", err) 114 } 115 116 return PodAddrs(pods, args, l) 117} 118 119// PodAddrs extracts the addresses from a list of pods. 120// 121// This is a separate method so that we can unit test this without having 122// to setup complicated K8S cluster scenarios. It shouldn't generally be 123// called externally. 124func PodAddrs(pods *corev1.PodList, args map[string]string, l *log.Logger) ([]string, error) { 125 hostNetwork := false 126 if v := args["host_network"]; v != "" { 127 var err error 128 hostNetwork, err = strconv.ParseBool(v) 129 if err != nil { 130 return nil, fmt.Errorf("discover-k8s: host_network must be boolean value: %s", err) 131 } 132 } 133 134 var addrs []string 135PodLoop: 136 for _, pod := range pods.Items { 137 if pod.Status.Phase != corev1.PodRunning { 138 l.Printf("[DEBUG] discover-k8s: ignoring pod %q, not running: %q", 139 pod.Name, pod.Status.Phase) 140 continue 141 } 142 143 // If there is a Ready condition available, we need that to be true. 144 // If no ready condition is set, then we accept this pod regardless. 145 for _, condition := range pod.Status.Conditions { 146 if condition.Type == corev1.PodReady && condition.Status != corev1.ConditionTrue { 147 l.Printf("[DEBUG] discover-k8s: ignoring pod %q, not ready state", pod.Name) 148 continue PodLoop 149 } 150 } 151 152 // Get the IP address that we will join. 153 addr := pod.Status.PodIP 154 if hostNetwork { 155 addr = pod.Status.HostIP 156 } 157 if addr == "" { 158 // This can be empty according to the API docs, so we protect that. 159 l.Printf("[DEBUG] discover-k8s: ignoring pod %q, requested IP is empty", pod.Name) 160 continue 161 } 162 163 // We only use the port if it is specified as an annotation. The 164 // annotation value can be a name or a number. 165 if v := pod.Annotations[AnnotationKeyPort]; v != "" { 166 port, err := podPort(&pod, v, hostNetwork) 167 if err != nil { 168 l.Printf("[DEBUG] discover-k8s: ignoring pod %q, error retrieving port: %s", 169 pod.Name, err) 170 continue 171 } 172 173 addr = fmt.Sprintf("%s:%d", addr, port) 174 } 175 176 addrs = append(addrs, addr) 177 } 178 179 return addrs, nil 180} 181 182// podPort extracts the proper port for the address from the given pod 183// for a non-empty annotation. 184// 185// Pre-condition: annotation is non-empty 186func podPort(pod *corev1.Pod, annotation string, host bool) (int32, error) { 187 // First look for a matching port matching the value of the annotation. 188 for _, container := range pod.Spec.Containers { 189 for _, portDef := range container.Ports { 190 if portDef.Name == annotation { 191 if host { 192 // It is possible for HostPort to be zero, if that is the 193 // case then we ignore this port. 194 if portDef.HostPort == 0 { 195 continue 196 } 197 198 return portDef.HostPort, nil 199 } 200 201 return portDef.ContainerPort, nil 202 } 203 } 204 } 205 206 // Otherwise assume that the port is a numeric value. 207 v, err := strconv.ParseInt(annotation, 0, 32) 208 return int32(v), err 209} 210