1package distribution // import "github.com/docker/docker/distribution"
2
3import (
4	"bufio"
5	"compress/gzip"
6	"context"
7	"fmt"
8	"io"
9
10	"github.com/docker/distribution/reference"
11	"github.com/docker/docker/distribution/metadata"
12	"github.com/docker/docker/pkg/progress"
13	"github.com/docker/docker/registry"
14	"github.com/sirupsen/logrus"
15)
16
// Pusher is an interface that abstracts pushing for different API versions.
// NewPusher returns the implementation matching an endpoint's API version.
type Pusher interface {
	// Push tries to push the image configured at the creation of Pusher.
	// It returns a non-nil error on failure. There is no separate retry
	// flag: the package-level Push function decides whether to move on to
	// the next configured endpoint by checking whether the returned error
	// is a fallbackError.
	//
	// TODO(tiborvass): have Push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
	Push(ctx context.Context) error
}
25
// compressionBufSize is the size, in bytes, of the bufio.Writer that compress
// places between the gzip writer and the pipe.
const compressionBufSize = 32768
27
28// NewPusher creates a new Pusher interface that will push to either a v1 or v2
29// registry. The endpoint argument contains a Version field that determines
30// whether a v1 or v2 pusher will be created. The other parameters are passed
31// through to the underlying pusher implementation for use during the actual
32// push operation.
33func NewPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig) (Pusher, error) {
34	switch endpoint.Version {
35	case registry.APIVersion2:
36		return &v2Pusher{
37			v2MetadataService: metadata.NewV2MetadataService(imagePushConfig.MetadataStore),
38			ref:               ref,
39			endpoint:          endpoint,
40			repoInfo:          repoInfo,
41			config:            imagePushConfig,
42		}, nil
43	case registry.APIVersion1:
44		return &v1Pusher{
45			v1IDService: metadata.NewV1IDService(imagePushConfig.MetadataStore),
46			ref:         ref,
47			endpoint:    endpoint,
48			repoInfo:    repoInfo,
49			config:      imagePushConfig,
50		}, nil
51	}
52	return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
53}
54
55// Push initiates a push operation on ref.
56// ref is the specific variant of the image to be pushed.
57// If no tag is provided, all tags will be pushed.
58func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushConfig) error {
59	// FIXME: Allow to interrupt current push when new push of same image is done.
60
61	// Resolve the Repository name from fqn to RepositoryInfo
62	repoInfo, err := imagePushConfig.RegistryService.ResolveRepository(ref)
63	if err != nil {
64		return err
65	}
66
67	endpoints, err := imagePushConfig.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name))
68	if err != nil {
69		return err
70	}
71
72	progress.Messagef(imagePushConfig.ProgressOutput, "", "The push refers to repository [%s]", repoInfo.Name.Name())
73
74	associations := imagePushConfig.ReferenceStore.ReferencesByName(repoInfo.Name)
75	if len(associations) == 0 {
76		return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(repoInfo.Name))
77	}
78
79	var (
80		lastErr error
81
82		// confirmedV2 is set to true if a push attempt managed to
83		// confirm that it was talking to a v2 registry. This will
84		// prevent fallback to the v1 protocol.
85		confirmedV2 bool
86
87		// confirmedTLSRegistries is a map indicating which registries
88		// are known to be using TLS. There should never be a plaintext
89		// retry for any of these.
90		confirmedTLSRegistries = make(map[string]struct{})
91	)
92
93	for _, endpoint := range endpoints {
94		if imagePushConfig.RequireSchema2 && endpoint.Version == registry.APIVersion1 {
95			continue
96		}
97		if confirmedV2 && endpoint.Version == registry.APIVersion1 {
98			logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL)
99			continue
100		}
101
102		if endpoint.URL.Scheme != "https" {
103			if _, confirmedTLS := confirmedTLSRegistries[endpoint.URL.Host]; confirmedTLS {
104				logrus.Debugf("Skipping non-TLS endpoint %s for host/port that appears to use TLS", endpoint.URL)
105				continue
106			}
107		}
108
109		logrus.Debugf("Trying to push %s to %s %s", repoInfo.Name.Name(), endpoint.URL, endpoint.Version)
110
111		pusher, err := NewPusher(ref, endpoint, repoInfo, imagePushConfig)
112		if err != nil {
113			lastErr = err
114			continue
115		}
116		if err := pusher.Push(ctx); err != nil {
117			// Was this push cancelled? If so, don't try to fall
118			// back.
119			select {
120			case <-ctx.Done():
121			default:
122				if fallbackErr, ok := err.(fallbackError); ok {
123					confirmedV2 = confirmedV2 || fallbackErr.confirmedV2
124					if fallbackErr.transportOK && endpoint.URL.Scheme == "https" {
125						confirmedTLSRegistries[endpoint.URL.Host] = struct{}{}
126					}
127					err = fallbackErr.err
128					lastErr = err
129					logrus.Infof("Attempting next endpoint for push after error: %v", err)
130					continue
131				}
132			}
133
134			logrus.Errorf("Not continuing with push after error: %v", err)
135			return err
136		}
137
138		imagePushConfig.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "push")
139		return nil
140	}
141
142	if lastErr == nil {
143		lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.Name.Name())
144	}
145	return lastErr
146}
147
148// compress returns an io.ReadCloser which will supply a compressed version of
149// the provided Reader. The caller must close the ReadCloser after reading the
150// compressed data.
151//
152// Note that this function returns a reader instead of taking a writer as an
153// argument so that it can be used with httpBlobWriter's ReadFrom method.
154// Using httpBlobWriter's Write method would send a PATCH request for every
155// Write call.
156//
157// The second return value is a channel that gets closed when the goroutine
158// is finished. This allows the caller to make sure the goroutine finishes
159// before it releases any resources connected with the reader that was
160// passed in.
161func compress(in io.Reader) (io.ReadCloser, chan struct{}) {
162	compressionDone := make(chan struct{})
163
164	pipeReader, pipeWriter := io.Pipe()
165	// Use a bufio.Writer to avoid excessive chunking in HTTP request.
166	bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize)
167	compressor := gzip.NewWriter(bufWriter)
168
169	go func() {
170		_, err := io.Copy(compressor, in)
171		if err == nil {
172			err = compressor.Close()
173		}
174		if err == nil {
175			err = bufWriter.Flush()
176		}
177		if err != nil {
178			pipeWriter.CloseWithError(err)
179		} else {
180			pipeWriter.Close()
181		}
182		close(compressionDone)
183	}()
184
185	return pipeReader, compressionDone
186}
187