1/*
2   Copyright The containerd Authors.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17package compression
18
19import (
20	"bufio"
21	"bytes"
22	"compress/gzip"
23	"context"
24	"fmt"
25	"io"
26	"os"
27	"os/exec"
28	"strconv"
29	"sync"
30
31	"github.com/containerd/containerd/log"
32)
33
// Compression identifies the compression algorithm applied to a stream.
type Compression int

const (
	// Uncompressed marks a stream that carries no compression.
	Uncompressed = Compression(iota)
	// Gzip marks a stream compressed with the gzip algorithm.
	Gzip
)
45
// disablePigzEnv is the environment variable consulted by detectPigz to
// decide whether the external unpigz binary may be used.
const disablePigzEnv = "CONTAINERD_DISABLE_PIGZ"

var (
	// initPigz makes sure the unpigz lookup runs only once.
	initPigz sync.Once
	// unpigzPath is the resolved unpigz binary; empty means "use Go's gzip".
	unpigzPath string

	// bufioReader32KPool recycles 32 KiB bufio readers between streams.
	bufioReader32KPool = &sync.Pool{
		New: func() interface{} {
			const size = 32 << 10
			return bufio.NewReaderSize(nil, size)
		},
	}
)
58
59// DecompressReadCloser include the stream after decompress and the compress method detected.
60type DecompressReadCloser interface {
61	io.ReadCloser
62	// GetCompression returns the compress method which is used before decompressing
63	GetCompression() Compression
64}
65
66type readCloserWrapper struct {
67	io.Reader
68	compression Compression
69	closer      func() error
70}
71
72func (r *readCloserWrapper) Close() error {
73	if r.closer != nil {
74		return r.closer()
75	}
76	return nil
77}
78
79func (r *readCloserWrapper) GetCompression() Compression {
80	return r.compression
81}
82
// writeCloserWrapper turns a plain io.Writer into an io.WriteCloser by pairing
// it with an optional hook to run on Close.
type writeCloserWrapper struct {
	io.Writer
	closer func() error
}

// Close runs the close hook, if one was set, and returns its error so that
// callers learn about a failed close instead of having it silently dropped.
// Without a hook, Close is a no-op that returns nil.
func (w *writeCloserWrapper) Close() error {
	if w.closer == nil {
		return nil
	}
	return w.closer()
}
94
95type bufferedReader struct {
96	buf *bufio.Reader
97}
98
99func newBufferedReader(r io.Reader) *bufferedReader {
100	buf := bufioReader32KPool.Get().(*bufio.Reader)
101	buf.Reset(r)
102	return &bufferedReader{buf}
103}
104
105func (r *bufferedReader) Read(p []byte) (n int, err error) {
106	if r.buf == nil {
107		return 0, io.EOF
108	}
109	n, err = r.buf.Read(p)
110	if err == io.EOF {
111		r.buf.Reset(nil)
112		bufioReader32KPool.Put(r.buf)
113		r.buf = nil
114	}
115	return
116}
117
118func (r *bufferedReader) Peek(n int) ([]byte, error) {
119	if r.buf == nil {
120		return nil, io.EOF
121	}
122	return r.buf.Peek(n)
123}
124
125// DetectCompression detects the compression algorithm of the source.
126func DetectCompression(source []byte) Compression {
127	for compression, m := range map[Compression][]byte{
128		Gzip: {0x1F, 0x8B, 0x08},
129	} {
130		if len(source) < len(m) {
131			// Len too short
132			continue
133		}
134		if bytes.Equal(m, source[:len(m)]) {
135			return compression
136		}
137	}
138	return Uncompressed
139}
140
141// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
142func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
143	buf := newBufferedReader(archive)
144	bs, err := buf.Peek(10)
145	if err != nil && err != io.EOF {
146		// Note: we'll ignore any io.EOF error because there are some odd
147		// cases where the layer.tar file will be empty (zero bytes) and
148		// that results in an io.EOF from the Peek() call. So, in those
149		// cases we'll just treat it as a non-compressed stream and
150		// that means just create an empty layer.
151		// See Issue docker/docker#18170
152		return nil, err
153	}
154
155	switch compression := DetectCompression(bs); compression {
156	case Uncompressed:
157		return &readCloserWrapper{
158			Reader:      buf,
159			compression: compression,
160		}, nil
161	case Gzip:
162		ctx, cancel := context.WithCancel(context.Background())
163		gzReader, err := gzipDecompress(ctx, buf)
164		if err != nil {
165			cancel()
166			return nil, err
167		}
168
169		return &readCloserWrapper{
170			Reader:      gzReader,
171			compression: compression,
172			closer: func() error {
173				cancel()
174				return gzReader.Close()
175			},
176		}, nil
177
178	default:
179		return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
180	}
181}
182
183// CompressStream compresses the dest with specified compression algorithm.
184func CompressStream(dest io.Writer, compression Compression) (io.WriteCloser, error) {
185	switch compression {
186	case Uncompressed:
187		return &writeCloserWrapper{dest, nil}, nil
188	case Gzip:
189		return gzip.NewWriter(dest), nil
190	default:
191		return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
192	}
193}
194
195// Extension returns the extension of a file that uses the specified compression algorithm.
196func (compression *Compression) Extension() string {
197	switch *compression {
198	case Gzip:
199		return "gz"
200	}
201	return ""
202}
203
204func gzipDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) {
205	initPigz.Do(func() {
206		if unpigzPath = detectPigz(); unpigzPath != "" {
207			log.L.Debug("using pigz for decompression")
208		}
209	})
210
211	if unpigzPath == "" {
212		return gzip.NewReader(buf)
213	}
214
215	return cmdStream(exec.CommandContext(ctx, unpigzPath, "-d", "-c"), buf)
216}
217
// cmdStream starts cmd with in as its standard input and returns a reader over
// the command's standard output. If the command cannot be started the error is
// returned directly. If it later exits with an error, reads from the returned
// stream fail with that error followed by whatever the command wrote to
// standard error.
func cmdStream(cmd *exec.Cmd, in io.Reader) (io.ReadCloser, error) {
	var stderr bytes.Buffer
	pr, pw := io.Pipe()

	cmd.Stdin, cmd.Stdout, cmd.Stderr = in, pw, &stderr

	if err := cmd.Start(); err != nil {
		return nil, err
	}

	// Close the write side once the command is done so readers see either
	// EOF or the command's failure.
	go func() {
		waitErr := cmd.Wait()
		if waitErr == nil {
			pw.Close()
			return
		}
		pw.CloseWithError(fmt.Errorf("%s: %s", waitErr, stderr.String()))
	}()

	return pr, nil
}
241
242func detectPigz() string {
243	path, err := exec.LookPath("unpigz")
244	if err != nil {
245		log.L.WithError(err).Debug("unpigz not found, falling back to go gzip")
246		return ""
247	}
248
249	// Check if pigz disabled via CONTAINERD_DISABLE_PIGZ env variable
250	value := os.Getenv(disablePigzEnv)
251	if value == "" {
252		return path
253	}
254
255	disable, err := strconv.ParseBool(value)
256	if err != nil {
257		log.L.WithError(err).Warnf("could not parse %s: %s", disablePigzEnv, value)
258		return path
259	}
260
261	if disable {
262		return ""
263	}
264
265	return path
266}
267