1/* 2Copyright 2016 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package responsewriters 18 19import ( 20 "bytes" 21 "compress/gzip" 22 "encoding/hex" 23 "encoding/json" 24 "errors" 25 "fmt" 26 "io" 27 "io/ioutil" 28 "math/rand" 29 "net/http" 30 "net/http/httptest" 31 "net/url" 32 "os" 33 "reflect" 34 "strconv" 35 "testing" 36 "time" 37 38 v1 "k8s.io/api/core/v1" 39 kerrors "k8s.io/apimachinery/pkg/api/errors" 40 "k8s.io/apimachinery/pkg/runtime" 41 "k8s.io/apimachinery/pkg/runtime/schema" 42 "k8s.io/apimachinery/pkg/util/diff" 43 "k8s.io/apimachinery/pkg/util/uuid" 44 "k8s.io/apiserver/pkg/features" 45 utilfeature "k8s.io/apiserver/pkg/util/feature" 46 featuregatetesting "k8s.io/component-base/featuregate/testing" 47) 48 49const benchmarkSeed = 100 50 51func TestSerializeObjectParallel(t *testing.T) { 52 largePayload := bytes.Repeat([]byte("0123456789abcdef"), defaultGzipThresholdBytes/16+1) 53 type test struct { 54 name string 55 56 compressionEnabled bool 57 58 mediaType string 59 out []byte 60 outErrs []error 61 req *http.Request 62 statusCode int 63 object runtime.Object 64 65 wantCode int 66 wantHeaders http.Header 67 } 68 newTest := func() test { 69 return test{ 70 name: "compress on gzip", 71 compressionEnabled: true, 72 out: largePayload, 73 mediaType: "application/json", 74 req: &http.Request{ 75 Header: http.Header{ 76 "Accept-Encoding": []string{"gzip"}, 77 }, 78 URL: &url.URL{Path: "/path"}, 79 }, 80 wantCode: http.StatusOK, 81 wantHeaders: http.Header{ 82 "Content-Type": []string{"application/json"}, 83 "Content-Encoding": []string{"gzip"}, 84 "Vary": []string{"Accept-Encoding"}, 85 }, 86 } 87 } 88 for i := 0; i < 100; i++ { 89 ctt := newTest() 90 t.Run(ctt.name, func(t *testing.T) { 91 defer func() { 92 if r := recover(); r != nil { 93 t.Fatalf("recovered from err %v", r) 94 } 95 }() 96 t.Parallel() 97 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIResponseCompression, ctt.compressionEnabled)() 98 99 encoder := &fakeEncoder{ 100 buf: ctt.out, 101 errs: ctt.outErrs, 102 } 103 if ctt.statusCode == 0 { 104 ctt.statusCode = http.StatusOK 105 } 106 recorder := &fakeResponseRecorder{ 107 ResponseRecorder: httptest.NewRecorder(), 108 fe: encoder, 109 errorAfterEncoding: true, 110 } 111 SerializeObject(ctt.mediaType, encoder, recorder, ctt.req, ctt.statusCode, ctt.object) 112 result := recorder.Result() 113 if result.StatusCode != ctt.wantCode { 114 t.Fatalf("unexpected code: %v", result.StatusCode) 115 } 116 if !reflect.DeepEqual(result.Header, ctt.wantHeaders) { 117 t.Fatal(diff.ObjectReflectDiff(ctt.wantHeaders, result.Header)) 118 } 119 }) 120 } 121} 122 123func TestSerializeObject(t *testing.T) { 124 smallPayload := []byte("{test-object,test-object}") 125 largePayload := bytes.Repeat([]byte("0123456789abcdef"), defaultGzipThresholdBytes/16+1) 126 tests := []struct { 127 name string 128 129 compressionEnabled bool 130 131 mediaType string 132 out []byte 133 outErrs []error 134 req *http.Request 135 statusCode int 136 object runtime.Object 137 138 wantCode int 139 wantHeaders http.Header 140 wantBody []byte 141 }{ 142 { 143 name: "serialize object", 144 out: smallPayload, 145 req: &http.Request{Header: http.Header{}, URL: &url.URL{Path: "/path"}}, 146 wantCode: http.StatusOK, 147 wantHeaders: http.Header{"Content-Type": []string{""}}, 148 wantBody: smallPayload, 149 }, 150 151 { 152 name: "return content type", 153 out: smallPayload, 154 mediaType: "application/json", 155 req: &http.Request{Header: http.Header{}, URL: &url.URL{Path: "/path"}}, 156 wantCode: http.StatusOK, 157 wantHeaders: http.Header{"Content-Type": []string{"application/json"}}, 158 wantBody: smallPayload, 159 }, 160 161 { 162 name: "return status code", 163 statusCode: http.StatusBadRequest, 164 out: smallPayload, 165 mediaType: "application/json", 166 req: &http.Request{Header: http.Header{}, URL: &url.URL{Path: "/path"}}, 167 wantCode: http.StatusBadRequest, 168 wantHeaders: http.Header{"Content-Type": []string{"application/json"}}, 169 wantBody: smallPayload, 170 }, 171 172 { 173 name: "fail to encode object", 174 out: smallPayload, 175 outErrs: []error{fmt.Errorf("bad")}, 176 mediaType: "application/json", 177 req: &http.Request{Header: http.Header{}, URL: &url.URL{Path: "/path"}}, 178 wantCode: http.StatusInternalServerError, 179 wantHeaders: http.Header{"Content-Type": []string{"application/json"}}, 180 wantBody: smallPayload, 181 }, 182 183 { 184 name: "fail to encode object or status", 185 out: smallPayload, 186 outErrs: []error{fmt.Errorf("bad"), fmt.Errorf("bad2")}, 187 mediaType: "application/json", 188 req: &http.Request{Header: http.Header{}, URL: &url.URL{Path: "/path"}}, 189 wantCode: http.StatusInternalServerError, 190 wantHeaders: http.Header{"Content-Type": []string{"text/plain"}}, 191 wantBody: []byte(": bad"), 192 }, 193 194 { 195 name: "fail to encode object or status with status code", 196 out: smallPayload, 197 outErrs: []error{kerrors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")}, 198 mediaType: "application/json", 199 req: &http.Request{Header: http.Header{}, URL: &url.URL{Path: "/path"}}, 200 statusCode: http.StatusOK, 201 wantCode: http.StatusNotFound, 202 wantHeaders: http.Header{"Content-Type": []string{"text/plain"}}, 203 wantBody: []byte("NotFound: \"test\" not found"), 204 }, 205 206 { 207 name: "fail to encode object or status with status code and keeps previous error", 208 out: smallPayload, 209 outErrs: []error{kerrors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")}, 210 mediaType: "application/json", 211 req: &http.Request{Header: http.Header{}, URL: &url.URL{Path: "/path"}}, 212 statusCode: http.StatusNotAcceptable, 213 wantCode: http.StatusNotAcceptable, 214 wantHeaders: http.Header{"Content-Type": []string{"text/plain"}}, 215 wantBody: []byte("NotFound: \"test\" not found"), 216 }, 217 218 { 219 name: "compression requires feature gate", 220 out: largePayload, 221 mediaType: "application/json", 222 req: &http.Request{ 223 Header: http.Header{ 224 "Accept-Encoding": []string{"gzip"}, 225 }, 226 URL: &url.URL{Path: "/path"}, 227 }, 228 wantCode: http.StatusOK, 229 wantHeaders: http.Header{"Content-Type": []string{"application/json"}}, 230 wantBody: largePayload, 231 }, 232 233 { 234 name: "compress on gzip", 235 compressionEnabled: true, 236 out: largePayload, 237 mediaType: "application/json", 238 req: &http.Request{ 239 Header: http.Header{ 240 "Accept-Encoding": []string{"gzip"}, 241 }, 242 URL: &url.URL{Path: "/path"}, 243 }, 244 wantCode: http.StatusOK, 245 wantHeaders: http.Header{ 246 "Content-Type": []string{"application/json"}, 247 "Content-Encoding": []string{"gzip"}, 248 "Vary": []string{"Accept-Encoding"}, 249 }, 250 wantBody: gzipContent(largePayload, defaultGzipContentEncodingLevel), 251 }, 252 253 { 254 name: "compression is not performed on small objects", 255 compressionEnabled: true, 256 out: smallPayload, 257 mediaType: "application/json", 258 req: &http.Request{ 259 Header: http.Header{ 260 "Accept-Encoding": []string{"gzip"}, 261 }, 262 URL: &url.URL{Path: "/path"}, 263 }, 264 wantCode: http.StatusOK, 265 wantHeaders: http.Header{ 266 "Content-Type": []string{"application/json"}, 267 }, 268 wantBody: smallPayload, 269 }, 270 271 { 272 name: "compress when multiple encodings are requested", 273 compressionEnabled: true, 274 out: largePayload, 275 mediaType: "application/json", 276 req: &http.Request{ 277 Header: http.Header{ 278 "Accept-Encoding": []string{"deflate, , gzip,"}, 279 }, 280 URL: &url.URL{Path: "/path"}, 281 }, 282 wantCode: http.StatusOK, 283 wantHeaders: http.Header{ 284 "Content-Type": []string{"application/json"}, 285 "Content-Encoding": []string{"gzip"}, 286 "Vary": []string{"Accept-Encoding"}, 287 }, 288 wantBody: gzipContent(largePayload, defaultGzipContentEncodingLevel), 289 }, 290 291 { 292 name: "ignore compression on deflate", 293 compressionEnabled: true, 294 out: largePayload, 295 mediaType: "application/json", 296 req: &http.Request{ 297 Header: http.Header{ 298 "Accept-Encoding": []string{"deflate"}, 299 }, 300 URL: &url.URL{Path: "/path"}, 301 }, 302 wantCode: http.StatusOK, 303 wantHeaders: http.Header{ 304 "Content-Type": []string{"application/json"}, 305 }, 306 wantBody: largePayload, 307 }, 308 309 { 310 name: "ignore compression on unrecognized types", 311 compressionEnabled: true, 312 out: largePayload, 313 mediaType: "application/json", 314 req: &http.Request{ 315 Header: http.Header{ 316 "Accept-Encoding": []string{", , other, nothing, what, "}, 317 }, 318 URL: &url.URL{Path: "/path"}, 319 }, 320 wantCode: http.StatusOK, 321 wantHeaders: http.Header{ 322 "Content-Type": []string{"application/json"}, 323 }, 324 wantBody: largePayload, 325 }, 326 327 { 328 name: "errors are compressed", 329 compressionEnabled: true, 330 statusCode: http.StatusInternalServerError, 331 out: smallPayload, 332 outErrs: []error{fmt.Errorf(string(largePayload)), fmt.Errorf("bad2")}, 333 mediaType: "application/json", 334 req: &http.Request{ 335 Header: http.Header{ 336 "Accept-Encoding": []string{"gzip"}, 337 }, 338 URL: &url.URL{Path: "/path"}, 339 }, 340 wantCode: http.StatusInternalServerError, 341 wantHeaders: http.Header{ 342 "Content-Type": []string{"text/plain"}, 343 "Content-Encoding": []string{"gzip"}, 344 "Vary": []string{"Accept-Encoding"}, 345 }, 346 wantBody: gzipContent([]byte(": "+string(largePayload)), defaultGzipContentEncodingLevel), 347 }, 348 } 349 for _, tt := range tests { 350 t.Run(tt.name, func(t *testing.T) { 351 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIResponseCompression, tt.compressionEnabled)() 352 353 encoder := &fakeEncoder{ 354 buf: tt.out, 355 errs: tt.outErrs, 356 } 357 if tt.statusCode == 0 { 358 tt.statusCode = http.StatusOK 359 } 360 recorder := httptest.NewRecorder() 361 SerializeObject(tt.mediaType, encoder, recorder, tt.req, tt.statusCode, tt.object) 362 result := recorder.Result() 363 if result.StatusCode != tt.wantCode { 364 t.Fatalf("unexpected code: %v", result.StatusCode) 365 } 366 if !reflect.DeepEqual(result.Header, tt.wantHeaders) { 367 t.Fatal(diff.ObjectReflectDiff(tt.wantHeaders, result.Header)) 368 } 369 body, _ := ioutil.ReadAll(result.Body) 370 if !bytes.Equal(tt.wantBody, body) { 371 t.Fatalf("wanted:\n%s\ngot:\n%s", hex.Dump(tt.wantBody), hex.Dump(body)) 372 } 373 }) 374 } 375} 376 377func randTime(t *time.Time, r *rand.Rand) { 378 *t = time.Unix(r.Int63n(1000*365*24*60*60), r.Int63()) 379} 380 381func randIP(s *string, r *rand.Rand) { 382 *s = fmt.Sprintf("10.20.%d.%d", r.Int31n(256), r.Int31n(256)) 383} 384 385// randPod changes fields in pod to mimic another pod from the same replicaset. 386// The list fields here has been generated by picking two pods in the same replicaset 387// and checking diff of their jsons. 388func randPod(b *testing.B, pod *v1.Pod, r *rand.Rand) { 389 pod.Name = fmt.Sprintf("%s-%x", pod.GenerateName, r.Int63n(1000)) 390 pod.UID = uuid.NewUUID() 391 pod.ResourceVersion = strconv.Itoa(r.Int()) 392 pod.Spec.NodeName = fmt.Sprintf("some-node-prefix-%x", r.Int63n(1000)) 393 394 randTime(&pod.CreationTimestamp.Time, r) 395 randTime(&pod.Status.StartTime.Time, r) 396 for i := range pod.Status.Conditions { 397 randTime(&pod.Status.Conditions[i].LastTransitionTime.Time, r) 398 } 399 for i := range pod.Status.ContainerStatuses { 400 containerStatus := &pod.Status.ContainerStatuses[i] 401 state := &containerStatus.State 402 if state.Running != nil { 403 randTime(&state.Running.StartedAt.Time, r) 404 } 405 containerStatus.ContainerID = fmt.Sprintf("docker://%x%x%x%x", r.Int63(), r.Int63(), r.Int63(), r.Int63()) 406 } 407 for i := range pod.ManagedFields { 408 randTime(&pod.ManagedFields[i].Time.Time, r) 409 } 410 411 randIP(&pod.Status.HostIP, r) 412 randIP(&pod.Status.PodIP, r) 413} 414 415func benchmarkItems(b *testing.B, file string, n int) *v1.PodList { 416 pod := v1.Pod{} 417 f, err := os.Open(file) 418 if err != nil { 419 b.Fatalf("Failed to open %q: %v", file, err) 420 } 421 defer f.Close() 422 err = json.NewDecoder(f).Decode(&pod) 423 if err != nil { 424 b.Fatalf("Failed to decode %q: %v", file, err) 425 } 426 427 list := &v1.PodList{ 428 Items: make([]v1.Pod, n), 429 } 430 431 r := rand.New(rand.NewSource(benchmarkSeed)) 432 for i := 0; i < n; i++ { 433 list.Items[i] = *pod.DeepCopy() 434 randPod(b, &list.Items[i], r) 435 } 436 return list 437} 438 439func toProtoBuf(b *testing.B, list *v1.PodList) []byte { 440 out, err := list.Marshal() 441 if err != nil { 442 b.Fatalf("Failed to marshal list to protobuf: %v", err) 443 } 444 return out 445} 446 447func toJSON(b *testing.B, list *v1.PodList) []byte { 448 out, err := json.Marshal(list) 449 if err != nil { 450 b.Fatalf("Failed to marshal list to json: %v", err) 451 } 452 return out 453} 454 455func benchmarkSerializeObject(b *testing.B, payload []byte) { 456 input, output := len(payload), len(gzipContent(payload, defaultGzipContentEncodingLevel)) 457 b.Logf("Payload size: %d, expected output size: %d, ratio: %.2f", input, output, float64(output)/float64(input)) 458 459 req := &http.Request{ 460 Header: http.Header{ 461 "Accept-Encoding": []string{"gzip"}, 462 }, 463 URL: &url.URL{Path: "/path"}, 464 } 465 defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, features.APIResponseCompression, true)() 466 467 encoder := &fakeEncoder{ 468 buf: payload, 469 } 470 471 b.ResetTimer() 472 for i := 0; i < b.N; i++ { 473 recorder := httptest.NewRecorder() 474 SerializeObject("application/json", encoder, recorder, req, http.StatusOK, nil /* object */) 475 result := recorder.Result() 476 if result.StatusCode != http.StatusOK { 477 b.Fatalf("incorrect status code: got %v; want: %v", result.StatusCode, http.StatusOK) 478 } 479 } 480} 481 482func BenchmarkSerializeObject1000PodsPB(b *testing.B) { 483 benchmarkSerializeObject(b, toProtoBuf(b, benchmarkItems(b, "testdata/pod.json", 1000))) 484} 485func BenchmarkSerializeObject10000PodsPB(b *testing.B) { 486 benchmarkSerializeObject(b, toProtoBuf(b, benchmarkItems(b, "testdata/pod.json", 10000))) 487} 488func BenchmarkSerializeObject100000PodsPB(b *testing.B) { 489 benchmarkSerializeObject(b, toProtoBuf(b, benchmarkItems(b, "testdata/pod.json", 100000))) 490} 491 492func BenchmarkSerializeObject1000PodsJSON(b *testing.B) { 493 benchmarkSerializeObject(b, toJSON(b, benchmarkItems(b, "testdata/pod.json", 1000))) 494} 495func BenchmarkSerializeObject10000PodsJSON(b *testing.B) { 496 benchmarkSerializeObject(b, toJSON(b, benchmarkItems(b, "testdata/pod.json", 10000))) 497} 498func BenchmarkSerializeObject100000PodsJSON(b *testing.B) { 499 benchmarkSerializeObject(b, toJSON(b, benchmarkItems(b, "testdata/pod.json", 100000))) 500} 501 502type fakeResponseRecorder struct { 503 *httptest.ResponseRecorder 504 fe *fakeEncoder 505 errorAfterEncoding bool 506} 507 508func (frw *fakeResponseRecorder) Write(buf []byte) (int, error) { 509 if frw.errorAfterEncoding && frw.fe.encodeCalled { 510 return 0, errors.New("returning a requested error") 511 } 512 return frw.ResponseRecorder.Write(buf) 513} 514 515type fakeEncoder struct { 516 obj runtime.Object 517 buf []byte 518 errs []error 519 520 encodeCalled bool 521} 522 523func (e *fakeEncoder) Encode(obj runtime.Object, w io.Writer) error { 524 e.obj = obj 525 if len(e.errs) > 0 { 526 err := e.errs[0] 527 e.errs = e.errs[1:] 528 return err 529 } 530 _, err := w.Write(e.buf) 531 e.encodeCalled = true 532 return err 533} 534 535func (e *fakeEncoder) Identifier() runtime.Identifier { 536 return runtime.Identifier("fake") 537} 538 539func gzipContent(data []byte, level int) []byte { 540 buf := &bytes.Buffer{} 541 gw, err := gzip.NewWriterLevel(buf, level) 542 if err != nil { 543 panic(err) 544 } 545 if _, err := gw.Write(data); err != nil { 546 panic(err) 547 } 548 if err := gw.Close(); err != nil { 549 panic(err) 550 } 551 return buf.Bytes() 552} 553