1/*
2Copyright 2016 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package responsewriters
18
19import (
20	"bytes"
21	"compress/gzip"
22	"encoding/hex"
23	"encoding/json"
24	"errors"
25	"fmt"
26	"io"
27	"io/ioutil"
28	"math/rand"
29	"net/http"
30	"net/http/httptest"
31	"net/url"
32	"os"
33	"reflect"
34	"strconv"
35	"testing"
36	"time"
37
38	v1 "k8s.io/api/core/v1"
39	kerrors "k8s.io/apimachinery/pkg/api/errors"
40	"k8s.io/apimachinery/pkg/runtime"
41	"k8s.io/apimachinery/pkg/runtime/schema"
42	"k8s.io/apimachinery/pkg/util/diff"
43	"k8s.io/apimachinery/pkg/util/uuid"
44	"k8s.io/apiserver/pkg/features"
45	utilfeature "k8s.io/apiserver/pkg/util/feature"
46	featuregatetesting "k8s.io/component-base/featuregate/testing"
47)
48
49const benchmarkSeed = 100
50
51func TestSerializeObjectParallel(t *testing.T) {
52	largePayload := bytes.Repeat([]byte("0123456789abcdef"), defaultGzipThresholdBytes/16+1)
53	type test struct {
54		name string
55
56		compressionEnabled bool
57
58		mediaType  string
59		out        []byte
60		outErrs    []error
61		req        *http.Request
62		statusCode int
63		object     runtime.Object
64
65		wantCode    int
66		wantHeaders http.Header
67	}
68	newTest := func() test {
69		return test{
70			name:               "compress on gzip",
71			compressionEnabled: true,
72			out:                largePayload,
73			mediaType:          "application/json",
74			req: &http.Request{
75				Header: http.Header{
76					"Accept-Encoding": []string{"gzip"},
77				},
78				URL: &url.URL{Path: "/path"},
79			},
80			wantCode: http.StatusOK,
81			wantHeaders: http.Header{
82				"Content-Type":     []string{"application/json"},
83				"Content-Encoding": []string{"gzip"},
84				"Vary":             []string{"Accept-Encoding"},
85			},
86		}
87	}
88	for i := 0; i < 100; i++ {
89		ctt := newTest()
90		t.Run(ctt.name, func(t *testing.T) {
91			defer func() {
92				if r := recover(); r != nil {
93					t.Fatalf("recovered from err %v", r)
94				}
95			}()
96			t.Parallel()
97			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIResponseCompression, ctt.compressionEnabled)()
98
99			encoder := &fakeEncoder{
100				buf:  ctt.out,
101				errs: ctt.outErrs,
102			}
103			if ctt.statusCode == 0 {
104				ctt.statusCode = http.StatusOK
105			}
106			recorder := &fakeResponseRecorder{
107				ResponseRecorder:   httptest.NewRecorder(),
108				fe:                 encoder,
109				errorAfterEncoding: true,
110			}
111			SerializeObject(ctt.mediaType, encoder, recorder, ctt.req, ctt.statusCode, ctt.object)
112			result := recorder.Result()
113			if result.StatusCode != ctt.wantCode {
114				t.Fatalf("unexpected code: %v", result.StatusCode)
115			}
116			if !reflect.DeepEqual(result.Header, ctt.wantHeaders) {
117				t.Fatal(diff.ObjectReflectDiff(ctt.wantHeaders, result.Header))
118			}
119		})
120	}
121}
122
123func TestSerializeObject(t *testing.T) {
124	smallPayload := []byte("{test-object,test-object}")
125	largePayload := bytes.Repeat([]byte("0123456789abcdef"), defaultGzipThresholdBytes/16+1)
126	tests := []struct {
127		name string
128
129		compressionEnabled bool
130
131		mediaType  string
132		out        []byte
133		outErrs    []error
134		req        *http.Request
135		statusCode int
136		object     runtime.Object
137
138		wantCode    int
139		wantHeaders http.Header
140		wantBody    []byte
141	}{
142		{
143			name:        "serialize object",
144			out:         smallPayload,
145			req:         &http.Request{Header: http.Header{}, URL: &url.URL{Path: "/path"}},
146			wantCode:    http.StatusOK,
147			wantHeaders: http.Header{"Content-Type": []string{""}},
148			wantBody:    smallPayload,
149		},
150
151		{
152			name:        "return content type",
153			out:         smallPayload,
154			mediaType:   "application/json",
155			req:         &http.Request{Header: http.Header{}, URL: &url.URL{Path: "/path"}},
156			wantCode:    http.StatusOK,
157			wantHeaders: http.Header{"Content-Type": []string{"application/json"}},
158			wantBody:    smallPayload,
159		},
160
161		{
162			name:        "return status code",
163			statusCode:  http.StatusBadRequest,
164			out:         smallPayload,
165			mediaType:   "application/json",
166			req:         &http.Request{Header: http.Header{}, URL: &url.URL{Path: "/path"}},
167			wantCode:    http.StatusBadRequest,
168			wantHeaders: http.Header{"Content-Type": []string{"application/json"}},
169			wantBody:    smallPayload,
170		},
171
172		{
173			name:        "fail to encode object",
174			out:         smallPayload,
175			outErrs:     []error{fmt.Errorf("bad")},
176			mediaType:   "application/json",
177			req:         &http.Request{Header: http.Header{}, URL: &url.URL{Path: "/path"}},
178			wantCode:    http.StatusInternalServerError,
179			wantHeaders: http.Header{"Content-Type": []string{"application/json"}},
180			wantBody:    smallPayload,
181		},
182
183		{
184			name:        "fail to encode object or status",
185			out:         smallPayload,
186			outErrs:     []error{fmt.Errorf("bad"), fmt.Errorf("bad2")},
187			mediaType:   "application/json",
188			req:         &http.Request{Header: http.Header{}, URL: &url.URL{Path: "/path"}},
189			wantCode:    http.StatusInternalServerError,
190			wantHeaders: http.Header{"Content-Type": []string{"text/plain"}},
191			wantBody:    []byte(": bad"),
192		},
193
194		{
195			name:        "fail to encode object or status with status code",
196			out:         smallPayload,
197			outErrs:     []error{kerrors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")},
198			mediaType:   "application/json",
199			req:         &http.Request{Header: http.Header{}, URL: &url.URL{Path: "/path"}},
200			statusCode:  http.StatusOK,
201			wantCode:    http.StatusNotFound,
202			wantHeaders: http.Header{"Content-Type": []string{"text/plain"}},
203			wantBody:    []byte("NotFound:  \"test\" not found"),
204		},
205
206		{
207			name:        "fail to encode object or status with status code and keeps previous error",
208			out:         smallPayload,
209			outErrs:     []error{kerrors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")},
210			mediaType:   "application/json",
211			req:         &http.Request{Header: http.Header{}, URL: &url.URL{Path: "/path"}},
212			statusCode:  http.StatusNotAcceptable,
213			wantCode:    http.StatusNotAcceptable,
214			wantHeaders: http.Header{"Content-Type": []string{"text/plain"}},
215			wantBody:    []byte("NotFound:  \"test\" not found"),
216		},
217
218		{
219			name:      "compression requires feature gate",
220			out:       largePayload,
221			mediaType: "application/json",
222			req: &http.Request{
223				Header: http.Header{
224					"Accept-Encoding": []string{"gzip"},
225				},
226				URL: &url.URL{Path: "/path"},
227			},
228			wantCode:    http.StatusOK,
229			wantHeaders: http.Header{"Content-Type": []string{"application/json"}},
230			wantBody:    largePayload,
231		},
232
233		{
234			name:               "compress on gzip",
235			compressionEnabled: true,
236			out:                largePayload,
237			mediaType:          "application/json",
238			req: &http.Request{
239				Header: http.Header{
240					"Accept-Encoding": []string{"gzip"},
241				},
242				URL: &url.URL{Path: "/path"},
243			},
244			wantCode: http.StatusOK,
245			wantHeaders: http.Header{
246				"Content-Type":     []string{"application/json"},
247				"Content-Encoding": []string{"gzip"},
248				"Vary":             []string{"Accept-Encoding"},
249			},
250			wantBody: gzipContent(largePayload, defaultGzipContentEncodingLevel),
251		},
252
253		{
254			name:               "compression is not performed on small objects",
255			compressionEnabled: true,
256			out:                smallPayload,
257			mediaType:          "application/json",
258			req: &http.Request{
259				Header: http.Header{
260					"Accept-Encoding": []string{"gzip"},
261				},
262				URL: &url.URL{Path: "/path"},
263			},
264			wantCode: http.StatusOK,
265			wantHeaders: http.Header{
266				"Content-Type": []string{"application/json"},
267			},
268			wantBody: smallPayload,
269		},
270
271		{
272			name:               "compress when multiple encodings are requested",
273			compressionEnabled: true,
274			out:                largePayload,
275			mediaType:          "application/json",
276			req: &http.Request{
277				Header: http.Header{
278					"Accept-Encoding": []string{"deflate, , gzip,"},
279				},
280				URL: &url.URL{Path: "/path"},
281			},
282			wantCode: http.StatusOK,
283			wantHeaders: http.Header{
284				"Content-Type":     []string{"application/json"},
285				"Content-Encoding": []string{"gzip"},
286				"Vary":             []string{"Accept-Encoding"},
287			},
288			wantBody: gzipContent(largePayload, defaultGzipContentEncodingLevel),
289		},
290
291		{
292			name:               "ignore compression on deflate",
293			compressionEnabled: true,
294			out:                largePayload,
295			mediaType:          "application/json",
296			req: &http.Request{
297				Header: http.Header{
298					"Accept-Encoding": []string{"deflate"},
299				},
300				URL: &url.URL{Path: "/path"},
301			},
302			wantCode: http.StatusOK,
303			wantHeaders: http.Header{
304				"Content-Type": []string{"application/json"},
305			},
306			wantBody: largePayload,
307		},
308
309		{
310			name:               "ignore compression on unrecognized types",
311			compressionEnabled: true,
312			out:                largePayload,
313			mediaType:          "application/json",
314			req: &http.Request{
315				Header: http.Header{
316					"Accept-Encoding": []string{", ,  other, nothing, what, "},
317				},
318				URL: &url.URL{Path: "/path"},
319			},
320			wantCode: http.StatusOK,
321			wantHeaders: http.Header{
322				"Content-Type": []string{"application/json"},
323			},
324			wantBody: largePayload,
325		},
326
327		{
328			name:               "errors are compressed",
329			compressionEnabled: true,
330			statusCode:         http.StatusInternalServerError,
331			out:                smallPayload,
332			outErrs:            []error{fmt.Errorf(string(largePayload)), fmt.Errorf("bad2")},
333			mediaType:          "application/json",
334			req: &http.Request{
335				Header: http.Header{
336					"Accept-Encoding": []string{"gzip"},
337				},
338				URL: &url.URL{Path: "/path"},
339			},
340			wantCode: http.StatusInternalServerError,
341			wantHeaders: http.Header{
342				"Content-Type":     []string{"text/plain"},
343				"Content-Encoding": []string{"gzip"},
344				"Vary":             []string{"Accept-Encoding"},
345			},
346			wantBody: gzipContent([]byte(": "+string(largePayload)), defaultGzipContentEncodingLevel),
347		},
348	}
349	for _, tt := range tests {
350		t.Run(tt.name, func(t *testing.T) {
351			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIResponseCompression, tt.compressionEnabled)()
352
353			encoder := &fakeEncoder{
354				buf:  tt.out,
355				errs: tt.outErrs,
356			}
357			if tt.statusCode == 0 {
358				tt.statusCode = http.StatusOK
359			}
360			recorder := httptest.NewRecorder()
361			SerializeObject(tt.mediaType, encoder, recorder, tt.req, tt.statusCode, tt.object)
362			result := recorder.Result()
363			if result.StatusCode != tt.wantCode {
364				t.Fatalf("unexpected code: %v", result.StatusCode)
365			}
366			if !reflect.DeepEqual(result.Header, tt.wantHeaders) {
367				t.Fatal(diff.ObjectReflectDiff(tt.wantHeaders, result.Header))
368			}
369			body, _ := ioutil.ReadAll(result.Body)
370			if !bytes.Equal(tt.wantBody, body) {
371				t.Fatalf("wanted:\n%s\ngot:\n%s", hex.Dump(tt.wantBody), hex.Dump(body))
372			}
373		})
374	}
375}
376
377func randTime(t *time.Time, r *rand.Rand) {
378	*t = time.Unix(r.Int63n(1000*365*24*60*60), r.Int63())
379}
380
381func randIP(s *string, r *rand.Rand) {
382	*s = fmt.Sprintf("10.20.%d.%d", r.Int31n(256), r.Int31n(256))
383}
384
385// randPod changes fields in pod to mimic another pod from the same replicaset.
386// The list fields here has been generated by picking two pods in the same replicaset
387// and checking diff of their jsons.
388func randPod(b *testing.B, pod *v1.Pod, r *rand.Rand) {
389	pod.Name = fmt.Sprintf("%s-%x", pod.GenerateName, r.Int63n(1000))
390	pod.UID = uuid.NewUUID()
391	pod.ResourceVersion = strconv.Itoa(r.Int())
392	pod.Spec.NodeName = fmt.Sprintf("some-node-prefix-%x", r.Int63n(1000))
393
394	randTime(&pod.CreationTimestamp.Time, r)
395	randTime(&pod.Status.StartTime.Time, r)
396	for i := range pod.Status.Conditions {
397		randTime(&pod.Status.Conditions[i].LastTransitionTime.Time, r)
398	}
399	for i := range pod.Status.ContainerStatuses {
400		containerStatus := &pod.Status.ContainerStatuses[i]
401		state := &containerStatus.State
402		if state.Running != nil {
403			randTime(&state.Running.StartedAt.Time, r)
404		}
405		containerStatus.ContainerID = fmt.Sprintf("docker://%x%x%x%x", r.Int63(), r.Int63(), r.Int63(), r.Int63())
406	}
407	for i := range pod.ManagedFields {
408		randTime(&pod.ManagedFields[i].Time.Time, r)
409	}
410
411	randIP(&pod.Status.HostIP, r)
412	randIP(&pod.Status.PodIP, r)
413}
414
415func benchmarkItems(b *testing.B, file string, n int) *v1.PodList {
416	pod := v1.Pod{}
417	f, err := os.Open(file)
418	if err != nil {
419		b.Fatalf("Failed to open %q: %v", file, err)
420	}
421	defer f.Close()
422	err = json.NewDecoder(f).Decode(&pod)
423	if err != nil {
424		b.Fatalf("Failed to decode %q: %v", file, err)
425	}
426
427	list := &v1.PodList{
428		Items: make([]v1.Pod, n),
429	}
430
431	r := rand.New(rand.NewSource(benchmarkSeed))
432	for i := 0; i < n; i++ {
433		list.Items[i] = *pod.DeepCopy()
434		randPod(b, &list.Items[i], r)
435	}
436	return list
437}
438
439func toProtoBuf(b *testing.B, list *v1.PodList) []byte {
440	out, err := list.Marshal()
441	if err != nil {
442		b.Fatalf("Failed to marshal list to protobuf: %v", err)
443	}
444	return out
445}
446
447func toJSON(b *testing.B, list *v1.PodList) []byte {
448	out, err := json.Marshal(list)
449	if err != nil {
450		b.Fatalf("Failed to marshal list to json: %v", err)
451	}
452	return out
453}
454
455func benchmarkSerializeObject(b *testing.B, payload []byte) {
456	input, output := len(payload), len(gzipContent(payload, defaultGzipContentEncodingLevel))
457	b.Logf("Payload size: %d, expected output size: %d, ratio: %.2f", input, output, float64(output)/float64(input))
458
459	req := &http.Request{
460		Header: http.Header{
461			"Accept-Encoding": []string{"gzip"},
462		},
463		URL: &url.URL{Path: "/path"},
464	}
465	defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, features.APIResponseCompression, true)()
466
467	encoder := &fakeEncoder{
468		buf: payload,
469	}
470
471	b.ResetTimer()
472	for i := 0; i < b.N; i++ {
473		recorder := httptest.NewRecorder()
474		SerializeObject("application/json", encoder, recorder, req, http.StatusOK, nil /* object */)
475		result := recorder.Result()
476		if result.StatusCode != http.StatusOK {
477			b.Fatalf("incorrect status code: got %v;  want: %v", result.StatusCode, http.StatusOK)
478		}
479	}
480}
481
482func BenchmarkSerializeObject1000PodsPB(b *testing.B) {
483	benchmarkSerializeObject(b, toProtoBuf(b, benchmarkItems(b, "testdata/pod.json", 1000)))
484}
485func BenchmarkSerializeObject10000PodsPB(b *testing.B) {
486	benchmarkSerializeObject(b, toProtoBuf(b, benchmarkItems(b, "testdata/pod.json", 10000)))
487}
488func BenchmarkSerializeObject100000PodsPB(b *testing.B) {
489	benchmarkSerializeObject(b, toProtoBuf(b, benchmarkItems(b, "testdata/pod.json", 100000)))
490}
491
492func BenchmarkSerializeObject1000PodsJSON(b *testing.B) {
493	benchmarkSerializeObject(b, toJSON(b, benchmarkItems(b, "testdata/pod.json", 1000)))
494}
495func BenchmarkSerializeObject10000PodsJSON(b *testing.B) {
496	benchmarkSerializeObject(b, toJSON(b, benchmarkItems(b, "testdata/pod.json", 10000)))
497}
498func BenchmarkSerializeObject100000PodsJSON(b *testing.B) {
499	benchmarkSerializeObject(b, toJSON(b, benchmarkItems(b, "testdata/pod.json", 100000)))
500}
501
502type fakeResponseRecorder struct {
503	*httptest.ResponseRecorder
504	fe                 *fakeEncoder
505	errorAfterEncoding bool
506}
507
508func (frw *fakeResponseRecorder) Write(buf []byte) (int, error) {
509	if frw.errorAfterEncoding && frw.fe.encodeCalled {
510		return 0, errors.New("returning a requested error")
511	}
512	return frw.ResponseRecorder.Write(buf)
513}
514
515type fakeEncoder struct {
516	obj  runtime.Object
517	buf  []byte
518	errs []error
519
520	encodeCalled bool
521}
522
523func (e *fakeEncoder) Encode(obj runtime.Object, w io.Writer) error {
524	e.obj = obj
525	if len(e.errs) > 0 {
526		err := e.errs[0]
527		e.errs = e.errs[1:]
528		return err
529	}
530	_, err := w.Write(e.buf)
531	e.encodeCalled = true
532	return err
533}
534
535func (e *fakeEncoder) Identifier() runtime.Identifier {
536	return runtime.Identifier("fake")
537}
538
539func gzipContent(data []byte, level int) []byte {
540	buf := &bytes.Buffer{}
541	gw, err := gzip.NewWriterLevel(buf, level)
542	if err != nil {
543		panic(err)
544	}
545	if _, err := gw.Write(data); err != nil {
546		panic(err)
547	}
548	if err := gw.Close(); err != nil {
549		panic(err)
550	}
551	return buf.Bytes()
552}
553