1package repository 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "io" 9 "os" 10 "os/exec" 11 "path/filepath" 12 "strings" 13 14 "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" 15 "gitlab.com/gitlab-org/gitaly/v14/internal/command" 16 "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/remote" 17 "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" 18 "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" 19 "gitlab.com/gitlab-org/gitaly/v14/internal/helper" 20 "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" 21 "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" 22 "gitlab.com/gitlab-org/gitaly/v14/internal/safe" 23 "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir" 24 "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" 25 "gitlab.com/gitlab-org/gitaly/v14/streamio" 26 "golang.org/x/sync/errgroup" 27 "google.golang.org/grpc/codes" 28 "google.golang.org/grpc/status" 29) 30 31// ErrInvalidSourceRepository is returned when attempting to replicate from an invalid source repository. 32var ErrInvalidSourceRepository = status.Error(codes.NotFound, "invalid source repository") 33 34func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) (*gitalypb.ReplicateRepositoryResponse, error) { 35 if err := validateReplicateRepository(in); err != nil { 36 return nil, helper.ErrInvalidArgument(err) 37 } 38 39 repoPath, err := s.locator.GetPath(in.GetRepository()) 40 if err != nil { 41 return nil, helper.ErrInternal(err) 42 } 43 44 if !storage.IsGitDirectory(repoPath) { 45 if err = s.create(ctx, in, repoPath); err != nil { 46 if errors.Is(err, ErrInvalidSourceRepository) { 47 return nil, ErrInvalidSourceRepository 48 } 49 50 return nil, helper.ErrInternal(err) 51 } 52 } 53 54 // We're not using the context of the errgroup here, as an error 55 // returned by either of the called functions would cancel the 56 // respective other function. Given that we're doing RPC calls in 57 // them, cancellation of the calls would mean that the remote side 58 // may still modify the repository even though the local side has 59 // returned already. 60 g, _ := errgroup.WithContext(ctx) 61 outgoingCtx := metadata.IncomingToOutgoing(ctx) 62 63 syncFuncs := []func(context.Context, *gitalypb.ReplicateRepositoryRequest) error{ 64 s.syncGitconfig, 65 s.syncInfoAttributes, 66 s.syncRepository, 67 } 68 69 for _, f := range syncFuncs { 70 f := f // rescoping f 71 g.Go(func() error { return f(outgoingCtx, in) }) 72 } 73 74 if err := g.Wait(); err != nil { 75 return nil, helper.ErrInternal(err) 76 } 77 78 return &gitalypb.ReplicateRepositoryResponse{}, nil 79} 80 81func validateReplicateRepository(in *gitalypb.ReplicateRepositoryRequest) error { 82 if in.GetRepository() == nil { 83 return errors.New("repository cannot be empty") 84 } 85 86 if in.GetSource() == nil { 87 return errors.New("source repository cannot be empty") 88 } 89 90 if in.GetRepository().GetRelativePath() != in.GetSource().GetRelativePath() { 91 return errors.New("both source and repository should have the same relative path") 92 } 93 94 if in.GetRepository().GetStorageName() == in.GetSource().GetStorageName() { 95 return errors.New("repository and source have the same storage") 96 } 97 98 return nil 99} 100 101func (s *server) create(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest, repoPath string) error { 102 // if the directory exists, remove it 103 if _, err := os.Stat(repoPath); err == nil { 104 tempDir, err := tempdir.NewWithoutContext(in.GetRepository().GetStorageName(), s.locator) 105 if err != nil { 106 return err 107 } 108 109 if err = os.Rename(repoPath, filepath.Join(tempDir.Path(), filepath.Base(repoPath))); err != nil { 110 return fmt.Errorf("error deleting invalid repo: %v", err) 111 } 112 113 ctxlogrus.Extract(ctx).WithField("repo_path", repoPath).Warn("removed invalid repository") 114 } 115 116 if err := s.createFromSnapshot(ctx, in); err != nil { 117 return fmt.Errorf("could not create repository from snapshot: %w", err) 118 } 119 120 return nil 121} 122 123func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { 124 tempRepo, tempDir, err := tempdir.NewRepository(ctx, in.GetRepository().GetStorageName(), s.locator) 125 if err != nil { 126 return fmt.Errorf("create temporary directory: %w", err) 127 } 128 129 if _, err := s.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{ 130 Repository: tempRepo, 131 }); err != nil { 132 return fmt.Errorf("create repository: %w", err) 133 } 134 135 repoClient, err := s.newRepoClient(ctx, in.GetSource().GetStorageName()) 136 if err != nil { 137 return fmt.Errorf("new client: %w", err) 138 } 139 140 stream, err := repoClient.GetSnapshot(ctx, &gitalypb.GetSnapshotRequest{Repository: in.GetSource()}) 141 if err != nil { 142 return fmt.Errorf("get snapshot: %w", err) 143 } 144 145 // We need to catch a possible 'invalid repository' error from GetSnapshot. On an empty read, 146 // BSD tar exits with code 0 so we'd receive the error when waiting for the command. GNU tar on 147 // Linux exits with a non-zero code, which causes Go to return an os.ExitError hiding the original 148 // error reading from stdin. To get access to the error on both Linux and macOS, we read the first 149 // message from the stream here to get access to the possible 'invalid repository' first on both 150 // platforms. 151 firstBytes, err := stream.Recv() 152 if err != nil { 153 if st, ok := status.FromError(err); ok { 154 if st.Code() == codes.NotFound && strings.HasPrefix(st.Message(), "GetRepoPath: not a git repository:") { 155 return ErrInvalidSourceRepository 156 } 157 } 158 159 return fmt.Errorf("first snapshot read: %w", err) 160 } 161 162 snapshotReader := io.MultiReader( 163 bytes.NewReader(firstBytes.GetData()), 164 streamio.NewReader(func() ([]byte, error) { 165 resp, err := stream.Recv() 166 return resp.GetData(), err 167 }), 168 ) 169 170 stderr := &bytes.Buffer{} 171 cmd, err := command.New(ctx, exec.Command("tar", "-C", tempDir.Path(), "-xvf", "-"), snapshotReader, nil, stderr) 172 if err != nil { 173 return fmt.Errorf("create tar command: %w", err) 174 } 175 176 if err = cmd.Wait(); err != nil { 177 return fmt.Errorf("wait for tar, stderr: %q, err: %w", stderr, err) 178 } 179 180 targetPath, err := s.locator.GetPath(in.GetRepository()) 181 if err != nil { 182 return fmt.Errorf("locate repository: %w", err) 183 } 184 185 if err = os.MkdirAll(filepath.Dir(targetPath), 0o755); err != nil { 186 return fmt.Errorf("create parent directories: %w", err) 187 } 188 189 if err := os.Rename(tempDir.Path(), targetPath); err != nil { 190 return fmt.Errorf("move temporary directory to target path: %w", err) 191 } 192 193 return nil 194} 195 196func (s *server) syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { 197 repo := s.localrepo(in.GetRepository()) 198 199 if err := remote.FetchInternalRemote(ctx, s.cfg, s.conns, repo, in.GetSource()); err != nil { 200 return fmt.Errorf("fetch internal remote: %w", err) 201 } 202 203 return nil 204} 205 206func (s *server) syncGitconfig(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { 207 repoClient, err := s.newRepoClient(ctx, in.GetSource().GetStorageName()) 208 if err != nil { 209 return err 210 } 211 212 repoPath, err := s.locator.GetRepoPath(in.GetRepository()) 213 if err != nil { 214 return err 215 } 216 217 // At the point of implementing this, the `GetConfig` RPC hasn't been deployed yet and is 218 // thus not available for general use. In theory, we'd have to wait for this release cycle 219 // to finish, and only afterwards would we be able to implement replication of the 220 // gitconfig. In order to allow us to iterate fast, we just try to call `GetConfig()`, but 221 // ignore any errors for the case where the target Gitaly node doesn't support the RPC yet. 222 // TODO: Remove this hack and properly return the error in the next release cycle. 223 if err := func() error { 224 stream, err := repoClient.GetConfig(ctx, &gitalypb.GetConfigRequest{ 225 Repository: in.GetSource(), 226 }) 227 if err != nil { 228 return err 229 } 230 231 configPath := filepath.Join(repoPath, "config") 232 if err := s.writeFile(ctx, configPath, 0o644, streamio.NewReader(func() ([]byte, error) { 233 resp, err := stream.Recv() 234 return resp.GetData(), err 235 })); err != nil { 236 return err 237 } 238 239 return nil 240 }(); err != nil { 241 ctxlogrus.Extract(ctx).WithError(err).Warn("synchronizing gitconfig failed") 242 } 243 244 return nil 245} 246 247func (s *server) syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { 248 repoClient, err := s.newRepoClient(ctx, in.GetSource().GetStorageName()) 249 if err != nil { 250 return err 251 } 252 253 repoPath, err := s.locator.GetRepoPath(in.GetRepository()) 254 if err != nil { 255 return err 256 } 257 258 stream, err := repoClient.GetInfoAttributes(ctx, &gitalypb.GetInfoAttributesRequest{ 259 Repository: in.GetSource(), 260 }) 261 if err != nil { 262 return err 263 } 264 265 attributesPath := filepath.Join(repoPath, "info", "attributes") 266 if err := s.writeFile(ctx, attributesPath, attributesFileMode, streamio.NewReader(func() ([]byte, error) { 267 resp, err := stream.Recv() 268 return resp.GetAttributes(), err 269 })); err != nil { 270 return err 271 } 272 273 return nil 274} 275 276func (s *server) writeFile(ctx context.Context, path string, mode os.FileMode, reader io.Reader) (returnedErr error) { 277 parentDir := filepath.Dir(path) 278 if err := os.MkdirAll(parentDir, 0o755); err != nil { 279 return err 280 } 281 282 if featureflag.TxExtendedFileLocking.IsEnabled(ctx) { 283 lockedFile, err := safe.NewLockingFileWriter(path, safe.LockingFileWriterConfig{ 284 FileWriterConfig: safe.FileWriterConfig{ 285 FileMode: mode, 286 }, 287 }) 288 if err != nil { 289 return fmt.Errorf("creating file writer: %w", err) 290 } 291 defer func() { 292 if err := lockedFile.Close(); err != nil && returnedErr == nil { 293 returnedErr = fmt.Errorf("closing file writer: %w", err) 294 } 295 }() 296 297 if _, err := io.Copy(lockedFile, reader); err != nil { 298 return fmt.Errorf("writing contents: %w", err) 299 } 300 301 if err := transaction.CommitLockedFile(ctx, s.txManager, lockedFile); err != nil { 302 return fmt.Errorf("committing file: %w", err) 303 } 304 } else { 305 fw, err := safe.NewFileWriter(path) 306 if err != nil { 307 return err 308 } 309 defer fw.Close() 310 311 if _, err := io.Copy(fw, reader); err != nil { 312 return err 313 } 314 315 if err = fw.Commit(); err != nil { 316 return err 317 } 318 319 if err := os.Chmod(path, mode); err != nil { 320 return err 321 } 322 } 323 324 return nil 325} 326 327// newRepoClient creates a new RepositoryClient that talks to the gitaly of the source repository 328func (s *server) newRepoClient(ctx context.Context, storageName string) (gitalypb.RepositoryServiceClient, error) { 329 gitalyServerInfo, err := storage.ExtractGitalyServer(ctx, storageName) 330 if err != nil { 331 return nil, err 332 } 333 334 conn, err := s.conns.Dial(ctx, gitalyServerInfo.Address, gitalyServerInfo.Token) 335 if err != nil { 336 return nil, err 337 } 338 339 return gitalypb.NewRepositoryServiceClient(conn), nil 340} 341