1package repository
2
3import (
4	"bytes"
5	"context"
6	"errors"
7	"fmt"
8	"io"
9	"os"
10	"os/exec"
11	"path/filepath"
12	"strings"
13
14	"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
15	"gitlab.com/gitlab-org/gitaly/v14/internal/command"
16	"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/remote"
17	"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
18	"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
19	"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
20	"gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
21	"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
22	"gitlab.com/gitlab-org/gitaly/v14/internal/safe"
23	"gitlab.com/gitlab-org/gitaly/v14/internal/tempdir"
24	"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
25	"gitlab.com/gitlab-org/gitaly/v14/streamio"
26	"golang.org/x/sync/errgroup"
27	"google.golang.org/grpc/codes"
28	"google.golang.org/grpc/status"
29)
30
// ErrInvalidSourceRepository is returned when attempting to replicate from an invalid source
// repository. It is a gRPC status error carrying the NotFound code, and callers are expected to
// compare against it with errors.Is.
var ErrInvalidSourceRepository = status.Error(codes.NotFound, "invalid source repository")
33
34func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) (*gitalypb.ReplicateRepositoryResponse, error) {
35	if err := validateReplicateRepository(in); err != nil {
36		return nil, helper.ErrInvalidArgument(err)
37	}
38
39	repoPath, err := s.locator.GetPath(in.GetRepository())
40	if err != nil {
41		return nil, helper.ErrInternal(err)
42	}
43
44	if !storage.IsGitDirectory(repoPath) {
45		if err = s.create(ctx, in, repoPath); err != nil {
46			if errors.Is(err, ErrInvalidSourceRepository) {
47				return nil, ErrInvalidSourceRepository
48			}
49
50			return nil, helper.ErrInternal(err)
51		}
52	}
53
54	// We're not using the context of the errgroup here, as an error
55	// returned by either of the called functions would cancel the
56	// respective other function. Given that we're doing RPC calls in
57	// them, cancellation of the calls would mean that the remote side
58	// may still modify the repository even though the local side has
59	// returned already.
60	g, _ := errgroup.WithContext(ctx)
61	outgoingCtx := metadata.IncomingToOutgoing(ctx)
62
63	syncFuncs := []func(context.Context, *gitalypb.ReplicateRepositoryRequest) error{
64		s.syncGitconfig,
65		s.syncInfoAttributes,
66		s.syncRepository,
67	}
68
69	for _, f := range syncFuncs {
70		f := f // rescoping f
71		g.Go(func() error { return f(outgoingCtx, in) })
72	}
73
74	if err := g.Wait(); err != nil {
75		return nil, helper.ErrInternal(err)
76	}
77
78	return &gitalypb.ReplicateRepositoryResponse{}, nil
79}
80
81func validateReplicateRepository(in *gitalypb.ReplicateRepositoryRequest) error {
82	if in.GetRepository() == nil {
83		return errors.New("repository cannot be empty")
84	}
85
86	if in.GetSource() == nil {
87		return errors.New("source repository cannot be empty")
88	}
89
90	if in.GetRepository().GetRelativePath() != in.GetSource().GetRelativePath() {
91		return errors.New("both source and repository should have the same relative path")
92	}
93
94	if in.GetRepository().GetStorageName() == in.GetSource().GetStorageName() {
95		return errors.New("repository and source have the same storage")
96	}
97
98	return nil
99}
100
101func (s *server) create(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest, repoPath string) error {
102	// if the directory exists, remove it
103	if _, err := os.Stat(repoPath); err == nil {
104		tempDir, err := tempdir.NewWithoutContext(in.GetRepository().GetStorageName(), s.locator)
105		if err != nil {
106			return err
107		}
108
109		if err = os.Rename(repoPath, filepath.Join(tempDir.Path(), filepath.Base(repoPath))); err != nil {
110			return fmt.Errorf("error deleting invalid repo: %v", err)
111		}
112
113		ctxlogrus.Extract(ctx).WithField("repo_path", repoPath).Warn("removed invalid repository")
114	}
115
116	if err := s.createFromSnapshot(ctx, in); err != nil {
117		return fmt.Errorf("could not create repository from snapshot: %w", err)
118	}
119
120	return nil
121}
122
123func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
124	tempRepo, tempDir, err := tempdir.NewRepository(ctx, in.GetRepository().GetStorageName(), s.locator)
125	if err != nil {
126		return fmt.Errorf("create temporary directory: %w", err)
127	}
128
129	if _, err := s.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{
130		Repository: tempRepo,
131	}); err != nil {
132		return fmt.Errorf("create repository: %w", err)
133	}
134
135	repoClient, err := s.newRepoClient(ctx, in.GetSource().GetStorageName())
136	if err != nil {
137		return fmt.Errorf("new client: %w", err)
138	}
139
140	stream, err := repoClient.GetSnapshot(ctx, &gitalypb.GetSnapshotRequest{Repository: in.GetSource()})
141	if err != nil {
142		return fmt.Errorf("get snapshot: %w", err)
143	}
144
145	// We need to catch a possible 'invalid repository' error from GetSnapshot. On an empty read,
146	// BSD tar exits with code 0 so we'd receive the error when waiting for the command. GNU tar on
147	// Linux exits with a non-zero code, which causes Go to return an os.ExitError hiding the original
148	// error reading from stdin. To get access to the error on both Linux and macOS, we read the first
149	// message from the stream here to get access to the possible 'invalid repository' first on both
150	// platforms.
151	firstBytes, err := stream.Recv()
152	if err != nil {
153		if st, ok := status.FromError(err); ok {
154			if st.Code() == codes.NotFound && strings.HasPrefix(st.Message(), "GetRepoPath: not a git repository:") {
155				return ErrInvalidSourceRepository
156			}
157		}
158
159		return fmt.Errorf("first snapshot read: %w", err)
160	}
161
162	snapshotReader := io.MultiReader(
163		bytes.NewReader(firstBytes.GetData()),
164		streamio.NewReader(func() ([]byte, error) {
165			resp, err := stream.Recv()
166			return resp.GetData(), err
167		}),
168	)
169
170	stderr := &bytes.Buffer{}
171	cmd, err := command.New(ctx, exec.Command("tar", "-C", tempDir.Path(), "-xvf", "-"), snapshotReader, nil, stderr)
172	if err != nil {
173		return fmt.Errorf("create tar command: %w", err)
174	}
175
176	if err = cmd.Wait(); err != nil {
177		return fmt.Errorf("wait for tar, stderr: %q, err: %w", stderr, err)
178	}
179
180	targetPath, err := s.locator.GetPath(in.GetRepository())
181	if err != nil {
182		return fmt.Errorf("locate repository: %w", err)
183	}
184
185	if err = os.MkdirAll(filepath.Dir(targetPath), 0o755); err != nil {
186		return fmt.Errorf("create parent directories: %w", err)
187	}
188
189	if err := os.Rename(tempDir.Path(), targetPath); err != nil {
190		return fmt.Errorf("move temporary directory to target path: %w", err)
191	}
192
193	return nil
194}
195
196func (s *server) syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
197	repo := s.localrepo(in.GetRepository())
198
199	if err := remote.FetchInternalRemote(ctx, s.cfg, s.conns, repo, in.GetSource()); err != nil {
200		return fmt.Errorf("fetch internal remote: %w", err)
201	}
202
203	return nil
204}
205
206func (s *server) syncGitconfig(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
207	repoClient, err := s.newRepoClient(ctx, in.GetSource().GetStorageName())
208	if err != nil {
209		return err
210	}
211
212	repoPath, err := s.locator.GetRepoPath(in.GetRepository())
213	if err != nil {
214		return err
215	}
216
217	// At the point of implementing this, the `GetConfig` RPC hasn't been deployed yet and is
218	// thus not available for general use. In theory, we'd have to wait for this release cycle
219	// to finish, and only afterwards would we be able to implement replication of the
220	// gitconfig. In order to allow us to iterate fast, we just try to call `GetConfig()`, but
221	// ignore any errors for the case where the target Gitaly node doesn't support the RPC yet.
222	// TODO: Remove this hack and properly return the error in the next release cycle.
223	if err := func() error {
224		stream, err := repoClient.GetConfig(ctx, &gitalypb.GetConfigRequest{
225			Repository: in.GetSource(),
226		})
227		if err != nil {
228			return err
229		}
230
231		configPath := filepath.Join(repoPath, "config")
232		if err := s.writeFile(ctx, configPath, 0o644, streamio.NewReader(func() ([]byte, error) {
233			resp, err := stream.Recv()
234			return resp.GetData(), err
235		})); err != nil {
236			return err
237		}
238
239		return nil
240	}(); err != nil {
241		ctxlogrus.Extract(ctx).WithError(err).Warn("synchronizing gitconfig failed")
242	}
243
244	return nil
245}
246
247func (s *server) syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
248	repoClient, err := s.newRepoClient(ctx, in.GetSource().GetStorageName())
249	if err != nil {
250		return err
251	}
252
253	repoPath, err := s.locator.GetRepoPath(in.GetRepository())
254	if err != nil {
255		return err
256	}
257
258	stream, err := repoClient.GetInfoAttributes(ctx, &gitalypb.GetInfoAttributesRequest{
259		Repository: in.GetSource(),
260	})
261	if err != nil {
262		return err
263	}
264
265	attributesPath := filepath.Join(repoPath, "info", "attributes")
266	if err := s.writeFile(ctx, attributesPath, attributesFileMode, streamio.NewReader(func() ([]byte, error) {
267		resp, err := stream.Recv()
268		return resp.GetAttributes(), err
269	})); err != nil {
270		return err
271	}
272
273	return nil
274}
275
276func (s *server) writeFile(ctx context.Context, path string, mode os.FileMode, reader io.Reader) (returnedErr error) {
277	parentDir := filepath.Dir(path)
278	if err := os.MkdirAll(parentDir, 0o755); err != nil {
279		return err
280	}
281
282	if featureflag.TxExtendedFileLocking.IsEnabled(ctx) {
283		lockedFile, err := safe.NewLockingFileWriter(path, safe.LockingFileWriterConfig{
284			FileWriterConfig: safe.FileWriterConfig{
285				FileMode: mode,
286			},
287		})
288		if err != nil {
289			return fmt.Errorf("creating file writer: %w", err)
290		}
291		defer func() {
292			if err := lockedFile.Close(); err != nil && returnedErr == nil {
293				returnedErr = fmt.Errorf("closing file writer: %w", err)
294			}
295		}()
296
297		if _, err := io.Copy(lockedFile, reader); err != nil {
298			return fmt.Errorf("writing contents: %w", err)
299		}
300
301		if err := transaction.CommitLockedFile(ctx, s.txManager, lockedFile); err != nil {
302			return fmt.Errorf("committing file: %w", err)
303		}
304	} else {
305		fw, err := safe.NewFileWriter(path)
306		if err != nil {
307			return err
308		}
309		defer fw.Close()
310
311		if _, err := io.Copy(fw, reader); err != nil {
312			return err
313		}
314
315		if err = fw.Commit(); err != nil {
316			return err
317		}
318
319		if err := os.Chmod(path, mode); err != nil {
320			return err
321		}
322	}
323
324	return nil
325}
326
327// newRepoClient creates a new RepositoryClient that talks to the gitaly of the source repository
328func (s *server) newRepoClient(ctx context.Context, storageName string) (gitalypb.RepositoryServiceClient, error) {
329	gitalyServerInfo, err := storage.ExtractGitalyServer(ctx, storageName)
330	if err != nil {
331		return nil, err
332	}
333
334	conn, err := s.conns.Dial(ctx, gitalyServerInfo.Address, gitalyServerInfo.Token)
335	if err != nil {
336		return nil, err
337	}
338
339	return gitalypb.NewRepositoryServiceClient(conn), nil
340}
341