1package client 2 3import ( 4 "bytes" 5 "context" 6 "crypto/tls" 7 "encoding/json" 8 "errors" 9 "fmt" 10 "net/http" 11 "time" 12 13 "github.com/hashicorp/go-cleanhttp" 14 "github.com/hashicorp/go-hclog" 15 "github.com/hashicorp/go-retryablehttp" 16) 17 18var ( 19 // Retry configuration 20 RetryWaitMin = 500 * time.Millisecond 21 RetryWaitMax = 30 * time.Second 22 RetryMax = 10 23 24 // Standard errs 25 ErrNamespaceUnset = errors.New(`"namespace" is unset`) 26 ErrPodNameUnset = errors.New(`"podName" is unset`) 27 ErrNotInCluster = errors.New("unable to load in-cluster configuration, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined") 28) 29 30// New instantiates a Client. The stopCh is used for exiting retry loops 31// when closed. 32func New(logger hclog.Logger, stopCh <-chan struct{}) (*Client, error) { 33 config, err := inClusterConfig() 34 if err != nil { 35 return nil, err 36 } 37 return &Client{ 38 logger: logger, 39 config: config, 40 stopCh: stopCh, 41 }, nil 42} 43 44// Client is a minimal Kubernetes client. We rolled our own because the existing 45// Kubernetes client-go library available externally has a high number of dependencies 46// and we thought it wasn't worth it for only two API calls. If at some point they break 47// the client into smaller modules, or if we add quite a few methods to this client, it may 48// be worthwhile to revisit that decision. 49type Client struct { 50 logger hclog.Logger 51 config *Config 52 stopCh <-chan struct{} 53} 54 55// GetPod gets a pod from the Kubernetes API. 56func (c *Client) GetPod(namespace, podName string) (*Pod, error) { 57 endpoint := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", namespace, podName) 58 method := http.MethodGet 59 60 // Validate that we received required parameters. 
61 if namespace == "" { 62 return nil, ErrNamespaceUnset 63 } 64 if podName == "" { 65 return nil, ErrPodNameUnset 66 } 67 68 req, err := http.NewRequest(method, c.config.Host+endpoint, nil) 69 if err != nil { 70 return nil, err 71 } 72 pod := &Pod{} 73 if err := c.do(req, pod); err != nil { 74 return nil, err 75 } 76 return pod, nil 77} 78 79// PatchPod updates the pod's tags to the given ones. 80// It does so non-destructively, or in other words, without tearing down 81// the pod. 82func (c *Client) PatchPod(namespace, podName string, patches ...*Patch) error { 83 endpoint := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", namespace, podName) 84 method := http.MethodPatch 85 86 // Validate that we received required parameters. 87 if namespace == "" { 88 return ErrNamespaceUnset 89 } 90 if podName == "" { 91 return ErrPodNameUnset 92 } 93 if len(patches) == 0 { 94 // No work to perform. 95 return nil 96 } 97 98 var jsonPatches []map[string]interface{} 99 for _, patch := range patches { 100 if patch.Operation == Unset { 101 return errors.New("patch operation must be set") 102 } 103 jsonPatches = append(jsonPatches, map[string]interface{}{ 104 "op": patch.Operation, 105 "path": patch.Path, 106 "value": patch.Value, 107 }) 108 } 109 body, err := json.Marshal(jsonPatches) 110 if err != nil { 111 return err 112 } 113 req, err := http.NewRequest(method, c.config.Host+endpoint, bytes.NewReader(body)) 114 if err != nil { 115 return err 116 } 117 req.Header.Set("Content-Type", "application/json-patch+json") 118 return c.do(req, nil) 119} 120 121// do executes the given request, retrying if necessary. 122func (c *Client) do(req *http.Request, ptrToReturnObj interface{}) error { 123 // Finish setting up a valid request. 124 retryableReq, err := retryablehttp.FromRequest(req) 125 if err != nil { 126 return err 127 } 128 129 // Build a context that will call the cancelFunc when we receive 130 // a stop from our stopChan. 
This allows us to exit from our retry 131 // loop during a shutdown, rather than hanging. 132 ctx, cancelFunc := context.WithCancel(context.Background()) 133 go func(stopCh <-chan struct{}) { 134 <-stopCh 135 cancelFunc() 136 }(c.stopCh) 137 retryableReq.WithContext(ctx) 138 139 retryableReq.Header.Set("Authorization", "Bearer "+c.config.BearerToken) 140 retryableReq.Header.Set("Accept", "application/json") 141 142 client := &retryablehttp.Client{ 143 HTTPClient: cleanhttp.DefaultClient(), 144 RetryWaitMin: RetryWaitMin, 145 RetryWaitMax: RetryWaitMax, 146 RetryMax: RetryMax, 147 CheckRetry: c.getCheckRetry(req), 148 Backoff: retryablehttp.DefaultBackoff, 149 } 150 client.HTTPClient.Transport = &http.Transport{ 151 TLSClientConfig: &tls.Config{ 152 RootCAs: c.config.CACertPool, 153 }, 154 } 155 156 // Execute and retry the request. This client comes with exponential backoff and 157 // jitter already rolled in. 158 resp, err := client.Do(retryableReq) 159 if err != nil { 160 return err 161 } 162 defer func() { 163 if err := resp.Body.Close(); err != nil { 164 if c.logger.IsWarn() { 165 // Failing to close response bodies can present as a memory leak so it's 166 // important to surface it. 167 c.logger.Warn(fmt.Sprintf("unable to close response body: %s", err)) 168 } 169 } 170 }() 171 172 // If we're not supposed to read out the body, we have nothing further 173 // to do here. 174 if ptrToReturnObj == nil { 175 return nil 176 } 177 178 // Attempt to read out the body into the given return object. 179 return json.NewDecoder(resp.Body).Decode(ptrToReturnObj) 180} 181 182func (c *Client) getCheckRetry(req *http.Request) retryablehttp.CheckRetry { 183 return func(ctx context.Context, resp *http.Response, err error) (bool, error) { 184 if resp == nil { 185 return true, fmt.Errorf("nil response: %s", req.URL.RequestURI()) 186 } 187 switch resp.StatusCode { 188 case 200, 201, 202, 204: 189 // Success. 
190 return false, nil 191 case 401, 403: 192 // Perhaps the token from our bearer token file has been refreshed. 193 config, err := inClusterConfig() 194 if err != nil { 195 return false, err 196 } 197 if config.BearerToken == c.config.BearerToken { 198 // It's the same token. 199 return false, fmt.Errorf("bad status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode)) 200 } 201 c.config = config 202 // Continue to try again, but return the error too in case the caller would rather read it out. 203 return true, fmt.Errorf("bad status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode)) 204 case 404: 205 return false, &ErrNotFound{debuggingInfo: sanitizedDebuggingInfo(req, resp.StatusCode)} 206 case 500, 502, 503, 504: 207 // Could be transient. 208 return true, fmt.Errorf("unexpected status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode)) 209 } 210 // Unexpected. 211 return false, fmt.Errorf("unexpected status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode)) 212 } 213} 214 215type Pod struct { 216 Metadata *Metadata `json:"metadata,omitempty"` 217} 218 219type Metadata struct { 220 Name string `json:"name,omitempty"` 221 222 // This map will be nil if no "labels" key was provided. 223 // It will be populated but have a length of zero if the 224 // key was provided, but no values. 225 Labels map[string]string `json:"labels,omitempty"` 226} 227 228type PatchOperation string 229 230const ( 231 Unset PatchOperation = "unset" 232 Add = "add" 233 Replace = "replace" 234) 235 236type Patch struct { 237 Operation PatchOperation 238 Path string 239 Value interface{} 240} 241 242type ErrNotFound struct { 243 debuggingInfo string 244} 245 246func (e *ErrNotFound) Error() string { 247 return e.debuggingInfo 248} 249 250// sanitizedDebuggingInfo provides a returnable string that can be used for debugging. This is intentionally somewhat vague 251// because we don't want to leak secrets that may be in a request or response body. 
func sanitizedDebuggingInfo(req *http.Request, respStatus int) string {
	// Only the method, URL and status code are reported; headers and bodies
	// are deliberately left out.
	const format = "req method: %s, req url: %s, resp statuscode: %d"
	method, target := req.Method, req.URL
	return fmt.Sprintf(format, method, target, respStatus)
}