1/* 2Copyright 2015 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17// Package streaming implements encoder and decoder for streams 18// of runtime.Objects over io.Writer/Readers. 19package streaming 20 21import ( 22 "bytes" 23 "fmt" 24 "io" 25 26 "k8s.io/apimachinery/pkg/runtime" 27 "k8s.io/apimachinery/pkg/runtime/schema" 28) 29 30// Encoder is a runtime.Encoder on a stream. 31type Encoder interface { 32 // Encode will write the provided object to the stream or return an error. It obeys the same 33 // contract as runtime.VersionedEncoder. 34 Encode(obj runtime.Object) error 35} 36 37// Decoder is a runtime.Decoder from a stream. 38type Decoder interface { 39 // Decode will return io.EOF when no more objects are available. 40 Decode(defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) 41 // Close closes the underlying stream. 42 Close() error 43} 44 45// Serializer is a factory for creating encoders and decoders that work over streams. 46type Serializer interface { 47 NewEncoder(w io.Writer) Encoder 48 NewDecoder(r io.ReadCloser) Decoder 49} 50 51type decoder struct { 52 reader io.ReadCloser 53 decoder runtime.Decoder 54 buf []byte 55 maxBytes int 56 resetRead bool 57} 58 59// NewDecoder creates a streaming decoder that reads object chunks from r and decodes them with d. 60// The reader is expected to return ErrShortRead if the provided buffer is not large enough to read 61// an entire object. 62func NewDecoder(r io.ReadCloser, d runtime.Decoder) Decoder { 63 return &decoder{ 64 reader: r, 65 decoder: d, 66 buf: make([]byte, 1024), 67 maxBytes: 16 * 1024 * 1024, 68 } 69} 70 71var ErrObjectTooLarge = fmt.Errorf("object to decode was longer than maximum allowed size") 72 73// Decode reads the next object from the stream and decodes it. 74func (d *decoder) Decode(defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { 75 base := 0 76 for { 77 n, err := d.reader.Read(d.buf[base:]) 78 if err == io.ErrShortBuffer { 79 if n == 0 { 80 return nil, nil, fmt.Errorf("got short buffer with n=0, base=%d, cap=%d", base, cap(d.buf)) 81 } 82 if d.resetRead { 83 continue 84 } 85 // double the buffer size up to maxBytes 86 if len(d.buf) < d.maxBytes { 87 base += n 88 d.buf = append(d.buf, make([]byte, len(d.buf))...) 89 continue 90 } 91 // must read the rest of the frame (until we stop getting ErrShortBuffer) 92 d.resetRead = true 93 base = 0 94 return nil, nil, ErrObjectTooLarge 95 } 96 if err != nil { 97 return nil, nil, err 98 } 99 if d.resetRead { 100 // now that we have drained the large read, continue 101 d.resetRead = false 102 continue 103 } 104 base += n 105 break 106 } 107 return d.decoder.Decode(d.buf[:base], defaults, into) 108} 109 110func (d *decoder) Close() error { 111 return d.reader.Close() 112} 113 114type encoder struct { 115 writer io.Writer 116 encoder runtime.Encoder 117 buf *bytes.Buffer 118} 119 120// NewEncoder returns a new streaming encoder. 121func NewEncoder(w io.Writer, e runtime.Encoder) Encoder { 122 return &encoder{ 123 writer: w, 124 encoder: e, 125 buf: &bytes.Buffer{}, 126 } 127} 128 129// Encode writes the provided object to the nested writer. 130func (e *encoder) Encode(obj runtime.Object) error { 131 if err := e.encoder.Encode(obj, e.buf); err != nil { 132 return err 133 } 134 _, err := e.writer.Write(e.buf.Bytes()) 135 e.buf.Reset() 136 return err 137} 138