1package client
2
3import (
4	"bytes"
5	"context"
6	"crypto/tls"
7	"encoding/json"
8	"errors"
9	"fmt"
10	"net/http"
11	"time"
12
13	"github.com/hashicorp/go-cleanhttp"
14	"github.com/hashicorp/go-hclog"
15	"github.com/hashicorp/go-retryablehttp"
16)
17
// Retry tuning handed to retryablehttp by Client.do. These are variables
// rather than constants, so they can be changed before requests are made.
var (
	// RetryWaitMin is the shortest wait between attempts.
	RetryWaitMin = 500 * time.Millisecond

	// RetryWaitMax is the longest wait between attempts.
	RetryWaitMax = 30 * time.Second

	// RetryMax is passed to retryablehttp as its maximum number of retries.
	RetryMax = 10
)

// Sentinel errors returned by this package.
var (
	// ErrNamespaceUnset is returned when a required namespace argument is empty.
	ErrNamespaceUnset = errors.New(`"namespace" is unset`)

	// ErrPodNameUnset is returned when a required pod name argument is empty.
	ErrPodNameUnset = errors.New(`"podName" is unset`)

	// ErrNotInCluster reports that the in-cluster configuration could not be
	// loaded because the Kubernetes service environment variables are missing.
	ErrNotInCluster = errors.New("unable to load in-cluster configuration, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined")
)
29
30// New instantiates a Client. The stopCh is used for exiting retry loops
31// when closed.
32func New(logger hclog.Logger, stopCh <-chan struct{}) (*Client, error) {
33	config, err := inClusterConfig()
34	if err != nil {
35		return nil, err
36	}
37	return &Client{
38		logger: logger,
39		config: config,
40		stopCh: stopCh,
41	}, nil
42}
43
44// Client is a minimal Kubernetes client. We rolled our own because the existing
45// Kubernetes client-go library available externally has a high number of dependencies
46// and we thought it wasn't worth it for only two API calls. If at some point they break
47// the client into smaller modules, or if we add quite a few methods to this client, it may
48// be worthwhile to revisit that decision.
49type Client struct {
50	logger hclog.Logger
51	config *Config
52	stopCh <-chan struct{}
53}
54
55// GetPod gets a pod from the Kubernetes API.
56func (c *Client) GetPod(namespace, podName string) (*Pod, error) {
57	endpoint := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", namespace, podName)
58	method := http.MethodGet
59
60	// Validate that we received required parameters.
61	if namespace == "" {
62		return nil, ErrNamespaceUnset
63	}
64	if podName == "" {
65		return nil, ErrPodNameUnset
66	}
67
68	req, err := http.NewRequest(method, c.config.Host+endpoint, nil)
69	if err != nil {
70		return nil, err
71	}
72	pod := &Pod{}
73	if err := c.do(req, pod); err != nil {
74		return nil, err
75	}
76	return pod, nil
77}
78
79// PatchPod updates the pod's tags to the given ones.
80// It does so non-destructively, or in other words, without tearing down
81// the pod.
82func (c *Client) PatchPod(namespace, podName string, patches ...*Patch) error {
83	endpoint := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", namespace, podName)
84	method := http.MethodPatch
85
86	// Validate that we received required parameters.
87	if namespace == "" {
88		return ErrNamespaceUnset
89	}
90	if podName == "" {
91		return ErrPodNameUnset
92	}
93	if len(patches) == 0 {
94		// No work to perform.
95		return nil
96	}
97
98	var jsonPatches []map[string]interface{}
99	for _, patch := range patches {
100		if patch.Operation == Unset {
101			return errors.New("patch operation must be set")
102		}
103		jsonPatches = append(jsonPatches, map[string]interface{}{
104			"op":    patch.Operation,
105			"path":  patch.Path,
106			"value": patch.Value,
107		})
108	}
109	body, err := json.Marshal(jsonPatches)
110	if err != nil {
111		return err
112	}
113	req, err := http.NewRequest(method, c.config.Host+endpoint, bytes.NewReader(body))
114	if err != nil {
115		return err
116	}
117	req.Header.Set("Content-Type", "application/json-patch+json")
118	return c.do(req, nil)
119}
120
121// do executes the given request, retrying if necessary.
122func (c *Client) do(req *http.Request, ptrToReturnObj interface{}) error {
123	// Finish setting up a valid request.
124	retryableReq, err := retryablehttp.FromRequest(req)
125	if err != nil {
126		return err
127	}
128
129	// Build a context that will call the cancelFunc when we receive
130	// a stop from our stopChan. This allows us to exit from our retry
131	// loop during a shutdown, rather than hanging.
132	ctx, cancelFunc := context.WithCancel(context.Background())
133	go func(stopCh <-chan struct{}) {
134		<-stopCh
135		cancelFunc()
136	}(c.stopCh)
137	retryableReq.WithContext(ctx)
138
139	retryableReq.Header.Set("Authorization", "Bearer "+c.config.BearerToken)
140	retryableReq.Header.Set("Accept", "application/json")
141
142	client := &retryablehttp.Client{
143		HTTPClient:   cleanhttp.DefaultClient(),
144		RetryWaitMin: RetryWaitMin,
145		RetryWaitMax: RetryWaitMax,
146		RetryMax:     RetryMax,
147		CheckRetry:   c.getCheckRetry(req),
148		Backoff:      retryablehttp.DefaultBackoff,
149	}
150	client.HTTPClient.Transport = &http.Transport{
151		TLSClientConfig: &tls.Config{
152			RootCAs: c.config.CACertPool,
153		},
154	}
155
156	// Execute and retry the request. This client comes with exponential backoff and
157	// jitter already rolled in.
158	resp, err := client.Do(retryableReq)
159	if err != nil {
160		return err
161	}
162	defer func() {
163		if err := resp.Body.Close(); err != nil {
164			if c.logger.IsWarn() {
165				// Failing to close response bodies can present as a memory leak so it's
166				// important to surface it.
167				c.logger.Warn(fmt.Sprintf("unable to close response body: %s", err))
168			}
169		}
170	}()
171
172	// If we're not supposed to read out the body, we have nothing further
173	// to do here.
174	if ptrToReturnObj == nil {
175		return nil
176	}
177
178	// Attempt to read out the body into the given return object.
179	return json.NewDecoder(resp.Body).Decode(ptrToReturnObj)
180}
181
182func (c *Client) getCheckRetry(req *http.Request) retryablehttp.CheckRetry {
183	return func(ctx context.Context, resp *http.Response, err error) (bool, error) {
184		if resp == nil {
185			return true, fmt.Errorf("nil response: %s", req.URL.RequestURI())
186		}
187		switch resp.StatusCode {
188		case 200, 201, 202, 204:
189			// Success.
190			return false, nil
191		case 401, 403:
192			// Perhaps the token from our bearer token file has been refreshed.
193			config, err := inClusterConfig()
194			if err != nil {
195				return false, err
196			}
197			if config.BearerToken == c.config.BearerToken {
198				// It's the same token.
199				return false, fmt.Errorf("bad status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode))
200			}
201			c.config = config
202			// Continue to try again, but return the error too in case the caller would rather read it out.
203			return true, fmt.Errorf("bad status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode))
204		case 404:
205			return false, &ErrNotFound{debuggingInfo: sanitizedDebuggingInfo(req, resp.StatusCode)}
206		case 500, 502, 503, 504:
207			// Could be transient.
208			return true, fmt.Errorf("unexpected status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode))
209		}
210		// Unexpected.
211		return false, fmt.Errorf("unexpected status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode))
212	}
213}
214
215type Pod struct {
216	Metadata *Metadata `json:"metadata,omitempty"`
217}
218
// Metadata is the subset of a pod's object metadata used by this client.
type Metadata struct {
	// Name is the object's name.
	Name string `json:"name,omitempty"`

	// Labels is nil when the JSON has no "labels" key, and a non-nil map of
	// length zero when the key is present but holds no entries.
	Labels map[string]string `json:"labels,omitempty"`
}
227
// PatchOperation is the "op" value of a single JSON Patch operation. Note
// that its zero value is "", which is distinct from Unset.
type PatchOperation string

// Every constant is given the PatchOperation type explicitly: in a Go const
// block, a spec with its own initializer does not inherit the previous
// spec's type, so writing only `Add = "add"` would make Add an untyped
// string constant.
const (
	// Unset is a placeholder operation that PatchPod rejects.
	Unset PatchOperation = "unset"

	// Add is the JSON Patch "add" operation.
	Add PatchOperation = "add"

	// Replace is the JSON Patch "replace" operation.
	Replace PatchOperation = "replace"
)
235
236type Patch struct {
237	Operation PatchOperation
238	Path      string
239	Value     interface{}
240}
241
// ErrNotFound is returned when the Kubernetes API answers with 404 Not Found.
// Its message carries sanitized request details for debugging.
type ErrNotFound struct {
	debuggingInfo string
}

// Error implements the error interface by returning the stored debugging details.
func (e *ErrNotFound) Error() string {
	msg := e.debuggingInfo
	return msg
}
249
// sanitizedDebuggingInfo summarizes a request/response pair for error
// messages using only the request method, the request URL and the response
// status code. Headers and bodies are deliberately left out because they may
// carry secrets, such as the bearer token.
func sanitizedDebuggingInfo(req *http.Request, respStatus int) string {
	const layout = "req method: %s, req url: %s, resp statuscode: %d"
	return fmt.Sprintf(layout, req.Method, req.URL, respStatus)
}
255