package distribution // import "github.com/docker/docker/distribution"

import (
	"bufio"
	"compress/gzip"
	"context"
	"fmt"
	"io"

	"github.com/docker/distribution/reference"
	"github.com/docker/docker/distribution/metadata"
	"github.com/docker/docker/pkg/progress"
	"github.com/docker/docker/registry"
	"github.com/sirupsen/logrus"
)

// Pusher is an interface that abstracts pushing for different API versions.
type Pusher interface {
	// Push tries to push the image configured at the creation of Pusher.
	// Push returns a non-nil error if the push did not succeed. The signature
	// carries no separate retry flag: the package-level Push function decides
	// whether to move on to the next configured endpoint by checking whether
	// the returned error is a fallbackError.
	//
	// TODO(tiborvass): have Push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
	Push(ctx context.Context) error
}

// compressionBufSize is the size, in bytes, of the bufio.Writer buffer that
// compress places between the gzip writer and the pipe.
const compressionBufSize = 32768

// NewPusher creates a new Pusher interface that will push to either a v1 or v2
// registry. The endpoint argument contains a Version field that determines
// whether a v1 or v2 pusher will be created. The other parameters are passed
// through to the underlying pusher implementation for use during the actual
// push operation. An error is returned for any other endpoint version.
33func NewPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig) (Pusher, error) { 34 switch endpoint.Version { 35 case registry.APIVersion2: 36 return &v2Pusher{ 37 v2MetadataService: metadata.NewV2MetadataService(imagePushConfig.MetadataStore), 38 ref: ref, 39 endpoint: endpoint, 40 repoInfo: repoInfo, 41 config: imagePushConfig, 42 }, nil 43 case registry.APIVersion1: 44 return &v1Pusher{ 45 v1IDService: metadata.NewV1IDService(imagePushConfig.MetadataStore), 46 ref: ref, 47 endpoint: endpoint, 48 repoInfo: repoInfo, 49 config: imagePushConfig, 50 }, nil 51 } 52 return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL) 53} 54 55// Push initiates a push operation on ref. 56// ref is the specific variant of the image to be pushed. 57// If no tag is provided, all tags will be pushed. 58func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushConfig) error { 59 // FIXME: Allow to interrupt current push when new push of same image is done. 60 61 // Resolve the Repository name from fqn to RepositoryInfo 62 repoInfo, err := imagePushConfig.RegistryService.ResolveRepository(ref) 63 if err != nil { 64 return err 65 } 66 67 endpoints, err := imagePushConfig.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name)) 68 if err != nil { 69 return err 70 } 71 72 progress.Messagef(imagePushConfig.ProgressOutput, "", "The push refers to repository [%s]", repoInfo.Name.Name()) 73 74 associations := imagePushConfig.ReferenceStore.ReferencesByName(repoInfo.Name) 75 if len(associations) == 0 { 76 return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(repoInfo.Name)) 77 } 78 79 var ( 80 lastErr error 81 82 // confirmedV2 is set to true if a push attempt managed to 83 // confirm that it was talking to a v2 registry. This will 84 // prevent fallback to the v1 protocol. 
85 confirmedV2 bool 86 87 // confirmedTLSRegistries is a map indicating which registries 88 // are known to be using TLS. There should never be a plaintext 89 // retry for any of these. 90 confirmedTLSRegistries = make(map[string]struct{}) 91 ) 92 93 for _, endpoint := range endpoints { 94 if imagePushConfig.RequireSchema2 && endpoint.Version == registry.APIVersion1 { 95 continue 96 } 97 if confirmedV2 && endpoint.Version == registry.APIVersion1 { 98 logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL) 99 continue 100 } 101 102 if endpoint.URL.Scheme != "https" { 103 if _, confirmedTLS := confirmedTLSRegistries[endpoint.URL.Host]; confirmedTLS { 104 logrus.Debugf("Skipping non-TLS endpoint %s for host/port that appears to use TLS", endpoint.URL) 105 continue 106 } 107 } 108 109 logrus.Debugf("Trying to push %s to %s %s", repoInfo.Name.Name(), endpoint.URL, endpoint.Version) 110 111 pusher, err := NewPusher(ref, endpoint, repoInfo, imagePushConfig) 112 if err != nil { 113 lastErr = err 114 continue 115 } 116 if err := pusher.Push(ctx); err != nil { 117 // Was this push cancelled? If so, don't try to fall 118 // back. 
119 select { 120 case <-ctx.Done(): 121 default: 122 if fallbackErr, ok := err.(fallbackError); ok { 123 confirmedV2 = confirmedV2 || fallbackErr.confirmedV2 124 if fallbackErr.transportOK && endpoint.URL.Scheme == "https" { 125 confirmedTLSRegistries[endpoint.URL.Host] = struct{}{} 126 } 127 err = fallbackErr.err 128 lastErr = err 129 logrus.Infof("Attempting next endpoint for push after error: %v", err) 130 continue 131 } 132 } 133 134 logrus.Errorf("Not continuing with push after error: %v", err) 135 return err 136 } 137 138 imagePushConfig.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "push") 139 return nil 140 } 141 142 if lastErr == nil { 143 lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.Name.Name()) 144 } 145 return lastErr 146} 147 148// compress returns an io.ReadCloser which will supply a compressed version of 149// the provided Reader. The caller must close the ReadCloser after reading the 150// compressed data. 151// 152// Note that this function returns a reader instead of taking a writer as an 153// argument so that it can be used with httpBlobWriter's ReadFrom method. 154// Using httpBlobWriter's Write method would send a PATCH request for every 155// Write call. 156// 157// The second return value is a channel that gets closed when the goroutine 158// is finished. This allows the caller to make sure the goroutine finishes 159// before it releases any resources connected with the reader that was 160// passed in. 161func compress(in io.Reader) (io.ReadCloser, chan struct{}) { 162 compressionDone := make(chan struct{}) 163 164 pipeReader, pipeWriter := io.Pipe() 165 // Use a bufio.Writer to avoid excessive chunking in HTTP request. 
166 bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize) 167 compressor := gzip.NewWriter(bufWriter) 168 169 go func() { 170 _, err := io.Copy(compressor, in) 171 if err == nil { 172 err = compressor.Close() 173 } 174 if err == nil { 175 err = bufWriter.Flush() 176 } 177 if err != nil { 178 pipeWriter.CloseWithError(err) 179 } else { 180 pipeWriter.Close() 181 } 182 close(compressionDone) 183 }() 184 185 return pipeReader, compressionDone 186} 187