1package backup 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "io" 8 "os" 9 "path/filepath" 10 "strings" 11 12 "gitlab.com/gitlab-org/gitaly/v14/client" 13 "gitlab.com/gitlab-org/gitaly/v14/internal/storage" 14 "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" 15 "gitlab.com/gitlab-org/gitaly/v14/streamio" 16 "google.golang.org/grpc/codes" 17 "google.golang.org/grpc/status" 18) 19 20// ErrSkipped means the repository was skipped because there was nothing to backup 21var ErrSkipped = errors.New("repository skipped") 22 23// Filesystem strategy for creating and restoring backups 24type Filesystem struct { 25 path string 26 conns *client.Pool 27} 28 29// NewFilesystem creates a new Filesystem strategy 30func NewFilesystem(path string) *Filesystem { 31 return &Filesystem{ 32 path: path, 33 conns: client.NewPool(), 34 } 35} 36 37// CreateRequest is the request to create a backup 38type CreateRequest struct { 39 Server storage.ServerInfo 40 Repository *gitalypb.Repository 41} 42 43// Create creates a repository backup on a local filesystem 44func (fs *Filesystem) Create(ctx context.Context, req *CreateRequest) error { 45 if isEmpty, err := fs.isEmpty(ctx, req.Server, req.Repository); err != nil { 46 return fmt.Errorf("filesystem: %w", err) 47 } else if isEmpty { 48 return ErrSkipped 49 } 50 51 backupPath := strings.TrimSuffix(filepath.Join(fs.path, req.Repository.RelativePath), ".git") 52 bundlePath := backupPath + ".bundle" 53 customHooksPath := filepath.Join(backupPath, "custom_hooks.tar") 54 55 if err := os.MkdirAll(backupPath, 0700); err != nil { 56 return fmt.Errorf("filesystem: %w", err) 57 } 58 if err := fs.writeBundle(ctx, bundlePath, req.Server, req.Repository); err != nil { 59 return fmt.Errorf("filesystem: write bundle: %w", err) 60 } 61 if err := fs.writeCustomHooks(ctx, customHooksPath, req.Server, req.Repository); err != nil { 62 return fmt.Errorf("filesystem: write custom hooks: %w", err) 63 } 64 65 return nil 66} 67 68// RestoreRequest is the request to restore from a backup 69type RestoreRequest struct { 70 Server storage.ServerInfo 71 Repository *gitalypb.Repository 72 AlwaysCreate bool 73} 74 75// Restore restores a repository from a backup on a local filesystem 76func (fs *Filesystem) Restore(ctx context.Context, req *RestoreRequest) error { 77 backupPath := strings.TrimSuffix(filepath.Join(fs.path, req.Repository.RelativePath), ".git") 78 bundlePath := backupPath + ".bundle" 79 customHooksPath := filepath.Join(backupPath, "custom_hooks.tar") 80 81 if err := fs.removeRepository(ctx, req.Server, req.Repository); err != nil { 82 return fmt.Errorf("filesystem: %w", err) 83 } 84 if err := fs.restoreBundle(ctx, bundlePath, req.Server, req.Repository); err != nil { 85 // For compatibility with existing backups we need to always create the 86 // repository even if there's no bundle for project repositories 87 // (not wiki or snippet repositories). Gitaly does not know which 88 // repository is which type so here we accept a parameter to tell us 89 // to employ this behaviour. 90 if req.AlwaysCreate && errors.Is(err, ErrSkipped) { 91 if err := fs.createRepository(ctx, req.Server, req.Repository); err != nil { 92 return fmt.Errorf("filesystem: %w", err) 93 } 94 } else { 95 return fmt.Errorf("filesystem: %w", err) 96 } 97 } 98 if err := fs.restoreCustomHooks(ctx, customHooksPath, req.Server, req.Repository); err != nil { 99 return fmt.Errorf("filesystem: %w", err) 100 } 101 return nil 102} 103 104func (fs *Filesystem) isEmpty(ctx context.Context, server storage.ServerInfo, repo *gitalypb.Repository) (bool, error) { 105 repoClient, err := fs.newRepoClient(ctx, server) 106 if err != nil { 107 return false, fmt.Errorf("isEmpty: %w", err) 108 } 109 hasLocalBranches, err := repoClient.HasLocalBranches(ctx, &gitalypb.HasLocalBranchesRequest{Repository: repo}) 110 switch { 111 case status.Code(err) == codes.NotFound: 112 return true, nil 113 case err != nil: 114 return false, fmt.Errorf("isEmpty: %w", err) 115 } 116 return !hasLocalBranches.GetValue(), nil 117} 118 119func (fs *Filesystem) removeRepository(ctx context.Context, server storage.ServerInfo, repo *gitalypb.Repository) error { 120 repoClient, err := fs.newRepoClient(ctx, server) 121 if err != nil { 122 return fmt.Errorf("remove repository: %w", err) 123 } 124 if _, err := repoClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: repo}); err != nil { 125 return fmt.Errorf("remove repository: %w", err) 126 } 127 return nil 128} 129 130func (fs *Filesystem) createRepository(ctx context.Context, server storage.ServerInfo, repo *gitalypb.Repository) error { 131 repoClient, err := fs.newRepoClient(ctx, server) 132 if err != nil { 133 return fmt.Errorf("create repository: %w", err) 134 } 135 if _, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo}); err != nil { 136 return fmt.Errorf("create repository: %w", err) 137 } 138 return nil 139} 140 141func (fs *Filesystem) writeBundle(ctx context.Context, path string, server storage.ServerInfo, repo *gitalypb.Repository) error { 142 repoClient, err := fs.newRepoClient(ctx, server) 143 if err != nil { 144 return err 145 } 146 stream, err := repoClient.CreateBundle(ctx, &gitalypb.CreateBundleRequest{Repository: repo}) 147 if err != nil { 148 return err 149 } 150 bundle := streamio.NewReader(func() ([]byte, error) { 151 resp, err := stream.Recv() 152 return resp.GetData(), err 153 }) 154 return writeFile(path, bundle) 155} 156 157func (fs *Filesystem) restoreBundle(ctx context.Context, path string, server storage.ServerInfo, repo *gitalypb.Repository) error { 158 f, err := os.Open(path) 159 if err != nil { 160 if os.IsNotExist(err) { 161 return fmt.Errorf("%w: bundle does not exist: %q", ErrSkipped, path) 162 } 163 return fmt.Errorf("restore bundle: %w", err) 164 } 165 defer f.Close() 166 167 repoClient, err := fs.newRepoClient(ctx, server) 168 if err != nil { 169 return fmt.Errorf("restore bundle: %q: %w", path, err) 170 } 171 stream, err := repoClient.CreateRepositoryFromBundle(ctx) 172 if err != nil { 173 return fmt.Errorf("restore bundle: %q: %w", path, err) 174 } 175 request := &gitalypb.CreateRepositoryFromBundleRequest{Repository: repo} 176 bundle := streamio.NewWriter(func(p []byte) error { 177 request.Data = p 178 if err := stream.Send(request); err != nil { 179 return err 180 } 181 182 // Only set `Repository` on the first `Send` of the stream 183 request = &gitalypb.CreateRepositoryFromBundleRequest{} 184 185 return nil 186 }) 187 if _, err := io.Copy(bundle, f); err != nil { 188 return fmt.Errorf("restore bundle: %q: %w", path, err) 189 } 190 if _, err = stream.CloseAndRecv(); err != nil { 191 return fmt.Errorf("restore bundle: %q: %w", path, err) 192 } 193 return nil 194} 195 196func (fs *Filesystem) writeCustomHooks(ctx context.Context, path string, server storage.ServerInfo, repo *gitalypb.Repository) error { 197 repoClient, err := fs.newRepoClient(ctx, server) 198 if err != nil { 199 return err 200 } 201 stream, err := repoClient.BackupCustomHooks(ctx, &gitalypb.BackupCustomHooksRequest{Repository: repo}) 202 if err != nil { 203 return err 204 } 205 hooks := streamio.NewReader(func() ([]byte, error) { 206 resp, err := stream.Recv() 207 return resp.GetData(), err 208 }) 209 if err := writeFile(path, hooks); err != nil { 210 return err 211 } 212 return nil 213} 214 215func (fs *Filesystem) restoreCustomHooks(ctx context.Context, path string, server storage.ServerInfo, repo *gitalypb.Repository) error { 216 f, err := os.Open(path) 217 if err != nil { 218 if os.IsNotExist(err) { 219 return nil 220 } 221 return fmt.Errorf("restore custom hooks: %w", err) 222 } 223 defer f.Close() 224 225 repoClient, err := fs.newRepoClient(ctx, server) 226 if err != nil { 227 return fmt.Errorf("restore custom hooks, %q: %w", path, err) 228 } 229 stream, err := repoClient.RestoreCustomHooks(ctx) 230 if err != nil { 231 return fmt.Errorf("restore custom hooks, %q: %w", path, err) 232 } 233 234 request := &gitalypb.RestoreCustomHooksRequest{Repository: repo} 235 bundle := streamio.NewWriter(func(p []byte) error { 236 request.Data = p 237 if err := stream.Send(request); err != nil { 238 return err 239 } 240 241 // Only set `Repository` on the first `Send` of the stream 242 request = &gitalypb.RestoreCustomHooksRequest{} 243 244 return nil 245 }) 246 if _, err := io.Copy(bundle, f); err != nil { 247 return fmt.Errorf("restore custom hooks, %q: %w", path, err) 248 } 249 if _, err = stream.CloseAndRecv(); err != nil { 250 return fmt.Errorf("restore custom hooks, %q: %w", path, err) 251 } 252 return nil 253} 254 255func (fs *Filesystem) newRepoClient(ctx context.Context, server storage.ServerInfo) (gitalypb.RepositoryServiceClient, error) { 256 conn, err := fs.conns.Dial(ctx, server.Address, server.Token) 257 if err != nil { 258 return nil, err 259 } 260 261 return gitalypb.NewRepositoryServiceClient(conn), nil 262} 263 264func writeFile(path string, r io.Reader) (returnErr error) { 265 f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) 266 if err != nil { 267 return fmt.Errorf("write file: %w", err) 268 } 269 defer func() { 270 if err := f.Close(); err != nil && returnErr == nil { 271 returnErr = fmt.Errorf("write file: %w", err) 272 } 273 }() 274 size, err := io.Copy(f, r) 275 if err != nil { 276 return fmt.Errorf("write file %q: %w", path, err) 277 } 278 if size == 0 { 279 // If the file is empty means that we received an empty stream, we delete the file 280 if err := os.Remove(path); err != nil { 281 return fmt.Errorf("write file: %w", err) 282 } 283 return nil 284 } 285 return nil 286} 287