1package testhelper 2 3import ( 4 "bytes" 5 "fmt" 6 "io" 7 "io/ioutil" 8 "path" 9 "strings" 10 "sync" 11 12 "github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 13 "github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 14 "golang.org/x/net/context" 15 "google.golang.org/grpc" 16 "google.golang.org/grpc/codes" 17 "google.golang.org/grpc/metadata" 18 "google.golang.org/grpc/status" 19 20 "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" 21 "gitlab.com/gitlab-org/labkit/log" 22) 23 24type GitalyTestServer struct { 25 finalMessageCode codes.Code 26 sync.WaitGroup 27 LastIncomingMetadata metadata.MD 28 gitalypb.UnimplementedSmartHTTPServiceServer 29 gitalypb.UnimplementedRepositoryServiceServer 30 gitalypb.UnimplementedBlobServiceServer 31 gitalypb.UnimplementedDiffServiceServer 32} 33 34var ( 35 GitalyInfoRefsResponseMock = strings.Repeat("Mock Gitaly InfoRefsResponse data", 100000) 36 GitalyGetBlobResponseMock = strings.Repeat("Mock Gitaly GetBlobResponse data", 100000) 37 GitalyGetArchiveResponseMock = strings.Repeat("Mock Gitaly GetArchiveResponse data", 100000) 38 GitalyGetDiffResponseMock = strings.Repeat("Mock Gitaly GetDiffResponse data", 100000) 39 GitalyGetPatchResponseMock = strings.Repeat("Mock Gitaly GetPatchResponse data", 100000) 40 41 GitalyGetSnapshotResponseMock = strings.Repeat("Mock Gitaly GetSnapshotResponse data", 100000) 42 43 GitalyReceivePackResponseMock []byte 44 GitalyUploadPackResponseMock []byte 45) 46 47func init() { 48 var err error 49 if GitalyReceivePackResponseMock, err = ioutil.ReadFile(path.Join(RootDir(), "testdata/receive-pack-fixture.txt")); err != nil { 50 log.WithError(err).Fatal("Unable to read pack response") 51 } 52 if GitalyUploadPackResponseMock, err = ioutil.ReadFile(path.Join(RootDir(), "testdata/upload-pack-fixture.txt")); err != nil { 53 log.WithError(err).Fatal("Unable to read pack response") 54 } 55} 56 57func NewGitalyServer(finalMessageCode codes.Code) *GitalyTestServer { 58 return &GitalyTestServer{finalMessageCode: finalMessageCode} 59} 60 61func (s *GitalyTestServer) InfoRefsUploadPack(in *gitalypb.InfoRefsRequest, stream gitalypb.SmartHTTPService_InfoRefsUploadPackServer) error { 62 s.WaitGroup.Add(1) 63 defer s.WaitGroup.Done() 64 65 if err := validateRepository(in.GetRepository()); err != nil { 66 return err 67 } 68 69 fmt.Printf("Result: %+v\n", in) 70 71 marshaler := &jsonpb.Marshaler{} 72 jsonString, err := marshaler.MarshalToString(in) 73 if err != nil { 74 return err 75 } 76 77 data := []byte(strings.Join([]string{ 78 jsonString, 79 "git-upload-pack", 80 GitalyInfoRefsResponseMock, 81 }, "\000")) 82 83 s.LastIncomingMetadata = nil 84 if md, ok := metadata.FromIncomingContext(stream.Context()); ok { 85 s.LastIncomingMetadata = md 86 } 87 88 return s.sendInfoRefs(stream, data) 89} 90 91func (s *GitalyTestServer) InfoRefsReceivePack(in *gitalypb.InfoRefsRequest, stream gitalypb.SmartHTTPService_InfoRefsReceivePackServer) error { 92 s.WaitGroup.Add(1) 93 defer s.WaitGroup.Done() 94 95 if err := validateRepository(in.GetRepository()); err != nil { 96 return err 97 } 98 99 fmt.Printf("Result: %+v\n", in) 100 101 jsonString, err := marshalJSON(in) 102 if err != nil { 103 return err 104 } 105 106 data := []byte(strings.Join([]string{ 107 jsonString, 108 "git-receive-pack", 109 GitalyInfoRefsResponseMock, 110 }, "\000")) 111 112 return s.sendInfoRefs(stream, data) 113} 114 115func marshalJSON(msg proto.Message) (string, error) { 116 marshaler := &jsonpb.Marshaler{} 117 return marshaler.MarshalToString(msg) 118} 119 120type infoRefsSender interface { 121 Send(*gitalypb.InfoRefsResponse) error 122} 123 124func (s *GitalyTestServer) sendInfoRefs(stream infoRefsSender, data []byte) error { 125 nSends, err := sendBytes(data, 100, func(p []byte) error { 126 return stream.Send(&gitalypb.InfoRefsResponse{Data: p}) 127 }) 128 if err != nil { 129 return err 130 } 131 if nSends <= 1 { 132 panic("should have sent more than one message") 133 } 134 135 return s.finalError() 136} 137 138func (s *GitalyTestServer) PostReceivePack(stream gitalypb.SmartHTTPService_PostReceivePackServer) error { 139 s.WaitGroup.Add(1) 140 defer s.WaitGroup.Done() 141 142 req, err := stream.Recv() 143 if err != nil { 144 return err 145 } 146 147 repo := req.GetRepository() 148 if err := validateRepository(repo); err != nil { 149 return err 150 } 151 152 jsonString, err := marshalJSON(req) 153 if err != nil { 154 return err 155 } 156 157 data := []byte(jsonString + "\000") 158 159 // The body of the request starts in the second message 160 for { 161 req, err := stream.Recv() 162 if err != nil { 163 if err != io.EOF { 164 return err 165 } 166 break 167 } 168 169 // We want to echo the request data back 170 data = append(data, req.GetData()...) 171 } 172 173 nSends, _ := sendBytes(data, 100, func(p []byte) error { 174 return stream.Send(&gitalypb.PostReceivePackResponse{Data: p}) 175 }) 176 177 if nSends <= 1 { 178 panic("should have sent more than one message") 179 } 180 181 return s.finalError() 182} 183 184func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackServer) error { 185 s.WaitGroup.Add(1) 186 defer s.WaitGroup.Done() 187 188 req, err := stream.Recv() 189 if err != nil { 190 return err 191 } 192 193 if err := validateRepository(req.GetRepository()); err != nil { 194 return err 195 } 196 197 marshaler := &jsonpb.Marshaler{} 198 jsonBytes := &bytes.Buffer{} 199 if err := marshaler.Marshal(jsonBytes, req); err != nil { 200 return err 201 } 202 203 if err := stream.Send(&gitalypb.PostUploadPackResponse{ 204 Data: append(jsonBytes.Bytes(), 0), 205 }); err != nil { 206 return err 207 } 208 209 nSends := 0 210 // The body of the request starts in the second message. Gitaly streams PostUploadPack responses 211 // as soon as possible without reading the request completely first. We stream messages here 212 // directly back to the client to simulate the streaming of the actual implementation. 213 for { 214 req, err := stream.Recv() 215 if err != nil { 216 if err != io.EOF { 217 return err 218 } 219 break 220 } 221 222 if err := stream.Send(&gitalypb.PostUploadPackResponse{Data: req.GetData()}); err != nil { 223 return err 224 } 225 226 nSends++ 227 } 228 229 if nSends <= 1 { 230 panic("should have sent more than one message") 231 } 232 233 return s.finalError() 234} 235 236// PostUploadPackWithSidechannel should be a part of smarthttp server in real 237// server. In workhorse, setting up a real sidechannel server is troublesome. 238// Therefore, we bring up a sidechannel server with a mock server exported via 239// gitalyclient.TestSidechannelServer. This is the handler for that mock 240// server. 241func PostUploadPackWithSidechannel(srv interface{}, stream grpc.ServerStream, conn io.ReadWriteCloser) error { 242 if method, ok := grpc.Method(stream.Context()); !ok || method != "/gitaly.SmartHTTPService/PostUploadPackWithSidechannel" { 243 return fmt.Errorf("unexpected method: %s", method) 244 } 245 246 var req gitalypb.PostUploadPackWithSidechannelRequest 247 if err := stream.RecvMsg(&req); err != nil { 248 return err 249 } 250 251 if err := validateRepository(req.GetRepository()); err != nil { 252 return err 253 } 254 255 marshaler := &jsonpb.Marshaler{} 256 jsonBytes := &bytes.Buffer{} 257 if err := marshaler.Marshal(jsonBytes, &req); err != nil { 258 return err 259 } 260 261 // Bounce back all data back to the client, plus flushing bytes 262 if _, err := conn.Write(append(jsonBytes.Bytes(), 0)); err != nil { 263 return err 264 } 265 266 if _, err := io.Copy(conn, conn); err != nil { 267 return err 268 } 269 270 return stream.SendMsg(&gitalypb.PostUploadPackWithSidechannelResponse{}) 271} 272 273func (s *GitalyTestServer) CommitIsAncestor(ctx context.Context, in *gitalypb.CommitIsAncestorRequest) (*gitalypb.CommitIsAncestorResponse, error) { 274 return nil, nil 275} 276 277func (s *GitalyTestServer) GetBlob(in *gitalypb.GetBlobRequest, stream gitalypb.BlobService_GetBlobServer) error { 278 s.WaitGroup.Add(1) 279 defer s.WaitGroup.Done() 280 281 if err := validateRepository(in.GetRepository()); err != nil { 282 return err 283 } 284 285 response := &gitalypb.GetBlobResponse{ 286 Oid: in.GetOid(), 287 Size: int64(len(GitalyGetBlobResponseMock)), 288 } 289 nSends, err := sendBytes([]byte(GitalyGetBlobResponseMock), 100, func(p []byte) error { 290 response.Data = p 291 292 if err := stream.Send(response); err != nil { 293 return err 294 } 295 296 // Use a new response so we don't send other fields (Size, ...) over and over 297 response = &gitalypb.GetBlobResponse{} 298 299 return nil 300 }) 301 if err != nil { 302 return err 303 } 304 if nSends <= 1 { 305 panic("should have sent more than one message") 306 } 307 308 return s.finalError() 309} 310 311func (s *GitalyTestServer) GetArchive(in *gitalypb.GetArchiveRequest, stream gitalypb.RepositoryService_GetArchiveServer) error { 312 s.WaitGroup.Add(1) 313 defer s.WaitGroup.Done() 314 315 if err := validateRepository(in.GetRepository()); err != nil { 316 return err 317 } 318 319 nSends, err := sendBytes([]byte(GitalyGetArchiveResponseMock), 100, func(p []byte) error { 320 return stream.Send(&gitalypb.GetArchiveResponse{Data: p}) 321 }) 322 if err != nil { 323 return err 324 } 325 if nSends <= 1 { 326 panic("should have sent more than one message") 327 } 328 329 return s.finalError() 330} 331 332func (s *GitalyTestServer) RawDiff(in *gitalypb.RawDiffRequest, stream gitalypb.DiffService_RawDiffServer) error { 333 nSends, err := sendBytes([]byte(GitalyGetDiffResponseMock), 100, func(p []byte) error { 334 return stream.Send(&gitalypb.RawDiffResponse{ 335 Data: p, 336 }) 337 }) 338 if err != nil { 339 return err 340 } 341 if nSends <= 1 { 342 panic("should have sent more than one message") 343 } 344 345 return s.finalError() 346} 347 348func (s *GitalyTestServer) RawPatch(in *gitalypb.RawPatchRequest, stream gitalypb.DiffService_RawPatchServer) error { 349 s.WaitGroup.Add(1) 350 defer s.WaitGroup.Done() 351 352 if err := validateRepository(in.GetRepository()); err != nil { 353 return err 354 } 355 356 nSends, err := sendBytes([]byte(GitalyGetPatchResponseMock), 100, func(p []byte) error { 357 return stream.Send(&gitalypb.RawPatchResponse{ 358 Data: p, 359 }) 360 }) 361 if err != nil { 362 return err 363 } 364 if nSends <= 1 { 365 panic("should have sent more than one message") 366 } 367 368 return s.finalError() 369} 370 371func (s *GitalyTestServer) GetSnapshot(in *gitalypb.GetSnapshotRequest, stream gitalypb.RepositoryService_GetSnapshotServer) error { 372 s.WaitGroup.Add(1) 373 defer s.WaitGroup.Done() 374 375 if err := validateRepository(in.GetRepository()); err != nil { 376 return err 377 } 378 379 nSends, err := sendBytes([]byte(GitalyGetSnapshotResponseMock), 100, func(p []byte) error { 380 return stream.Send(&gitalypb.GetSnapshotResponse{Data: p}) 381 }) 382 if err != nil { 383 return err 384 } 385 if nSends <= 1 { 386 panic("should have sent more than one message") 387 } 388 389 return s.finalError() 390} 391 392// sendBytes returns the number of times the 'sender' function was called and an error. 393func sendBytes(data []byte, chunkSize int, sender func([]byte) error) (int, error) { 394 i := 0 395 for ; len(data) > 0; i++ { 396 n := chunkSize 397 if n > len(data) { 398 n = len(data) 399 } 400 401 if err := sender(data[:n]); err != nil { 402 return i, err 403 } 404 data = data[n:] 405 } 406 407 return i, nil 408} 409 410func (s *GitalyTestServer) finalError() error { 411 if code := s.finalMessageCode; code != codes.OK { 412 return status.Errorf(code, "error as specified by test") 413 } 414 415 return nil 416} 417 418func validateRepository(repo *gitalypb.Repository) error { 419 if len(repo.GetStorageName()) == 0 { 420 return fmt.Errorf("missing storage_name: %v", repo) 421 } 422 if len(repo.GetRelativePath()) == 0 { 423 return fmt.Errorf("missing relative_path: %v", repo) 424 } 425 return nil 426} 427