1package testhelper
2
3import (
4	"bytes"
5	"fmt"
6	"io"
7	"io/ioutil"
8	"path"
9	"strings"
10	"sync"
11
12	"github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
13	"github.com/golang/protobuf/proto"  //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
14	"golang.org/x/net/context"
15	"google.golang.org/grpc"
16	"google.golang.org/grpc/codes"
17	"google.golang.org/grpc/metadata"
18	"google.golang.org/grpc/status"
19
20	"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
21	"gitlab.com/gitlab-org/labkit/log"
22)
23
// GitalyTestServer is a mock Gitaly gRPC server for tests. It provides canned
// handlers for a handful of SmartHTTP, Repository, Blob and Diff service RPCs.
type GitalyTestServer struct {
	// finalMessageCode is the status code handlers finish with via finalError;
	// codes.OK makes them return a nil error.
	finalMessageCode codes.Code
	// The embedded WaitGroup tracks in-flight calls: handlers such as GetBlob
	// and RawPatch call Add(1) on entry and Done when they return.
	sync.WaitGroup
	// LastIncomingMetadata holds the incoming gRPC metadata seen by the most
	// recent InfoRefsUploadPack call that passed validation, or nil when that
	// call carried no metadata.
	LastIncomingMetadata metadata.MD
	// The embedded Unimplemented* types presumably supply default
	// implementations for every RPC this mock does not override — confirm
	// against the generated gitalypb code.
	gitalypb.UnimplementedSmartHTTPServiceServer
	gitalypb.UnimplementedRepositoryServiceServer
	gitalypb.UnimplementedBlobServiceServer
	gitalypb.UnimplementedDiffServiceServer
}
33
// Canned response payloads used by the mock handlers. Each string mock is a
// short marker phrase repeated 100000 times.
var (
	// GitalyInfoRefsResponseMock is streamed by both InfoRefs handlers.
	GitalyInfoRefsResponseMock = strings.Repeat("Mock Gitaly InfoRefsResponse data", 100000)
	// GitalyGetBlobResponseMock is streamed by GetBlob.
	GitalyGetBlobResponseMock = strings.Repeat("Mock Gitaly GetBlobResponse data", 100000)
	// GitalyGetArchiveResponseMock is streamed by GetArchive.
	GitalyGetArchiveResponseMock = strings.Repeat("Mock Gitaly GetArchiveResponse data", 100000)
	// GitalyGetDiffResponseMock is streamed by RawDiff.
	GitalyGetDiffResponseMock = strings.Repeat("Mock Gitaly GetDiffResponse data", 100000)
	// GitalyGetPatchResponseMock is streamed by RawPatch.
	GitalyGetPatchResponseMock = strings.Repeat("Mock Gitaly GetPatchResponse data", 100000)

	// GitalyGetSnapshotResponseMock is streamed by GetSnapshot.
	GitalyGetSnapshotResponseMock = strings.Repeat("Mock Gitaly GetSnapshotResponse data", 100000)

	// GitalyReceivePackResponseMock is filled by init with the contents of
	// testdata/receive-pack-fixture.txt.
	GitalyReceivePackResponseMock []byte
	// GitalyUploadPackResponseMock is filled by init with the contents of
	// testdata/upload-pack-fixture.txt.
	GitalyUploadPackResponseMock []byte
)
46
47func init() {
48	var err error
49	if GitalyReceivePackResponseMock, err = ioutil.ReadFile(path.Join(RootDir(), "testdata/receive-pack-fixture.txt")); err != nil {
50		log.WithError(err).Fatal("Unable to read pack response")
51	}
52	if GitalyUploadPackResponseMock, err = ioutil.ReadFile(path.Join(RootDir(), "testdata/upload-pack-fixture.txt")); err != nil {
53		log.WithError(err).Fatal("Unable to read pack response")
54	}
55}
56
57func NewGitalyServer(finalMessageCode codes.Code) *GitalyTestServer {
58	return &GitalyTestServer{finalMessageCode: finalMessageCode}
59}
60
61func (s *GitalyTestServer) InfoRefsUploadPack(in *gitalypb.InfoRefsRequest, stream gitalypb.SmartHTTPService_InfoRefsUploadPackServer) error {
62	s.WaitGroup.Add(1)
63	defer s.WaitGroup.Done()
64
65	if err := validateRepository(in.GetRepository()); err != nil {
66		return err
67	}
68
69	fmt.Printf("Result: %+v\n", in)
70
71	marshaler := &jsonpb.Marshaler{}
72	jsonString, err := marshaler.MarshalToString(in)
73	if err != nil {
74		return err
75	}
76
77	data := []byte(strings.Join([]string{
78		jsonString,
79		"git-upload-pack",
80		GitalyInfoRefsResponseMock,
81	}, "\000"))
82
83	s.LastIncomingMetadata = nil
84	if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
85		s.LastIncomingMetadata = md
86	}
87
88	return s.sendInfoRefs(stream, data)
89}
90
91func (s *GitalyTestServer) InfoRefsReceivePack(in *gitalypb.InfoRefsRequest, stream gitalypb.SmartHTTPService_InfoRefsReceivePackServer) error {
92	s.WaitGroup.Add(1)
93	defer s.WaitGroup.Done()
94
95	if err := validateRepository(in.GetRepository()); err != nil {
96		return err
97	}
98
99	fmt.Printf("Result: %+v\n", in)
100
101	jsonString, err := marshalJSON(in)
102	if err != nil {
103		return err
104	}
105
106	data := []byte(strings.Join([]string{
107		jsonString,
108		"git-receive-pack",
109		GitalyInfoRefsResponseMock,
110	}, "\000"))
111
112	return s.sendInfoRefs(stream, data)
113}
114
115func marshalJSON(msg proto.Message) (string, error) {
116	marshaler := &jsonpb.Marshaler{}
117	return marshaler.MarshalToString(msg)
118}
119
// infoRefsSender is the part of a server stream that sendInfoRefs needs: the
// ability to send InfoRefsResponse messages. InfoRefsUploadPack and
// InfoRefsReceivePack both pass their streams to sendInfoRefs through it.
type infoRefsSender interface {
	// Send transmits one response message on the stream.
	Send(*gitalypb.InfoRefsResponse) error
}
123
124func (s *GitalyTestServer) sendInfoRefs(stream infoRefsSender, data []byte) error {
125	nSends, err := sendBytes(data, 100, func(p []byte) error {
126		return stream.Send(&gitalypb.InfoRefsResponse{Data: p})
127	})
128	if err != nil {
129		return err
130	}
131	if nSends <= 1 {
132		panic("should have sent more than one message")
133	}
134
135	return s.finalError()
136}
137
138func (s *GitalyTestServer) PostReceivePack(stream gitalypb.SmartHTTPService_PostReceivePackServer) error {
139	s.WaitGroup.Add(1)
140	defer s.WaitGroup.Done()
141
142	req, err := stream.Recv()
143	if err != nil {
144		return err
145	}
146
147	repo := req.GetRepository()
148	if err := validateRepository(repo); err != nil {
149		return err
150	}
151
152	jsonString, err := marshalJSON(req)
153	if err != nil {
154		return err
155	}
156
157	data := []byte(jsonString + "\000")
158
159	// The body of the request starts in the second message
160	for {
161		req, err := stream.Recv()
162		if err != nil {
163			if err != io.EOF {
164				return err
165			}
166			break
167		}
168
169		// We want to echo the request data back
170		data = append(data, req.GetData()...)
171	}
172
173	nSends, _ := sendBytes(data, 100, func(p []byte) error {
174		return stream.Send(&gitalypb.PostReceivePackResponse{Data: p})
175	})
176
177	if nSends <= 1 {
178		panic("should have sent more than one message")
179	}
180
181	return s.finalError()
182}
183
184func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackServer) error {
185	s.WaitGroup.Add(1)
186	defer s.WaitGroup.Done()
187
188	req, err := stream.Recv()
189	if err != nil {
190		return err
191	}
192
193	if err := validateRepository(req.GetRepository()); err != nil {
194		return err
195	}
196
197	marshaler := &jsonpb.Marshaler{}
198	jsonBytes := &bytes.Buffer{}
199	if err := marshaler.Marshal(jsonBytes, req); err != nil {
200		return err
201	}
202
203	if err := stream.Send(&gitalypb.PostUploadPackResponse{
204		Data: append(jsonBytes.Bytes(), 0),
205	}); err != nil {
206		return err
207	}
208
209	nSends := 0
210	// The body of the request starts in the second message. Gitaly streams PostUploadPack responses
211	// as soon as possible without reading the request completely first. We stream messages here
212	// directly back to the client to simulate the streaming of the actual implementation.
213	for {
214		req, err := stream.Recv()
215		if err != nil {
216			if err != io.EOF {
217				return err
218			}
219			break
220		}
221
222		if err := stream.Send(&gitalypb.PostUploadPackResponse{Data: req.GetData()}); err != nil {
223			return err
224		}
225
226		nSends++
227	}
228
229	if nSends <= 1 {
230		panic("should have sent more than one message")
231	}
232
233	return s.finalError()
234}
235
236// PostUploadPackWithSidechannel should be a part of smarthttp server in real
237// server. In workhorse, setting up a real sidechannel server is troublesome.
238// Therefore, we bring up a sidechannel server with a mock server exported via
239// gitalyclient.TestSidechannelServer. This is the handler for that mock
240// server.
241func PostUploadPackWithSidechannel(srv interface{}, stream grpc.ServerStream, conn io.ReadWriteCloser) error {
242	if method, ok := grpc.Method(stream.Context()); !ok || method != "/gitaly.SmartHTTPService/PostUploadPackWithSidechannel" {
243		return fmt.Errorf("unexpected method: %s", method)
244	}
245
246	var req gitalypb.PostUploadPackWithSidechannelRequest
247	if err := stream.RecvMsg(&req); err != nil {
248		return err
249	}
250
251	if err := validateRepository(req.GetRepository()); err != nil {
252		return err
253	}
254
255	marshaler := &jsonpb.Marshaler{}
256	jsonBytes := &bytes.Buffer{}
257	if err := marshaler.Marshal(jsonBytes, &req); err != nil {
258		return err
259	}
260
261	// Bounce back all data back to the client, plus flushing bytes
262	if _, err := conn.Write(append(jsonBytes.Bytes(), 0)); err != nil {
263		return err
264	}
265
266	if _, err := io.Copy(conn, conn); err != nil {
267		return err
268	}
269
270	return stream.SendMsg(&gitalypb.PostUploadPackWithSidechannelResponse{})
271}
272
273func (s *GitalyTestServer) CommitIsAncestor(ctx context.Context, in *gitalypb.CommitIsAncestorRequest) (*gitalypb.CommitIsAncestorResponse, error) {
274	return nil, nil
275}
276
277func (s *GitalyTestServer) GetBlob(in *gitalypb.GetBlobRequest, stream gitalypb.BlobService_GetBlobServer) error {
278	s.WaitGroup.Add(1)
279	defer s.WaitGroup.Done()
280
281	if err := validateRepository(in.GetRepository()); err != nil {
282		return err
283	}
284
285	response := &gitalypb.GetBlobResponse{
286		Oid:  in.GetOid(),
287		Size: int64(len(GitalyGetBlobResponseMock)),
288	}
289	nSends, err := sendBytes([]byte(GitalyGetBlobResponseMock), 100, func(p []byte) error {
290		response.Data = p
291
292		if err := stream.Send(response); err != nil {
293			return err
294		}
295
296		// Use a new response so we don't send other fields (Size, ...) over and over
297		response = &gitalypb.GetBlobResponse{}
298
299		return nil
300	})
301	if err != nil {
302		return err
303	}
304	if nSends <= 1 {
305		panic("should have sent more than one message")
306	}
307
308	return s.finalError()
309}
310
311func (s *GitalyTestServer) GetArchive(in *gitalypb.GetArchiveRequest, stream gitalypb.RepositoryService_GetArchiveServer) error {
312	s.WaitGroup.Add(1)
313	defer s.WaitGroup.Done()
314
315	if err := validateRepository(in.GetRepository()); err != nil {
316		return err
317	}
318
319	nSends, err := sendBytes([]byte(GitalyGetArchiveResponseMock), 100, func(p []byte) error {
320		return stream.Send(&gitalypb.GetArchiveResponse{Data: p})
321	})
322	if err != nil {
323		return err
324	}
325	if nSends <= 1 {
326		panic("should have sent more than one message")
327	}
328
329	return s.finalError()
330}
331
332func (s *GitalyTestServer) RawDiff(in *gitalypb.RawDiffRequest, stream gitalypb.DiffService_RawDiffServer) error {
333	nSends, err := sendBytes([]byte(GitalyGetDiffResponseMock), 100, func(p []byte) error {
334		return stream.Send(&gitalypb.RawDiffResponse{
335			Data: p,
336		})
337	})
338	if err != nil {
339		return err
340	}
341	if nSends <= 1 {
342		panic("should have sent more than one message")
343	}
344
345	return s.finalError()
346}
347
348func (s *GitalyTestServer) RawPatch(in *gitalypb.RawPatchRequest, stream gitalypb.DiffService_RawPatchServer) error {
349	s.WaitGroup.Add(1)
350	defer s.WaitGroup.Done()
351
352	if err := validateRepository(in.GetRepository()); err != nil {
353		return err
354	}
355
356	nSends, err := sendBytes([]byte(GitalyGetPatchResponseMock), 100, func(p []byte) error {
357		return stream.Send(&gitalypb.RawPatchResponse{
358			Data: p,
359		})
360	})
361	if err != nil {
362		return err
363	}
364	if nSends <= 1 {
365		panic("should have sent more than one message")
366	}
367
368	return s.finalError()
369}
370
371func (s *GitalyTestServer) GetSnapshot(in *gitalypb.GetSnapshotRequest, stream gitalypb.RepositoryService_GetSnapshotServer) error {
372	s.WaitGroup.Add(1)
373	defer s.WaitGroup.Done()
374
375	if err := validateRepository(in.GetRepository()); err != nil {
376		return err
377	}
378
379	nSends, err := sendBytes([]byte(GitalyGetSnapshotResponseMock), 100, func(p []byte) error {
380		return stream.Send(&gitalypb.GetSnapshotResponse{Data: p})
381	})
382	if err != nil {
383		return err
384	}
385	if nSends <= 1 {
386		panic("should have sent more than one message")
387	}
388
389	return s.finalError()
390}
391
// sendBytes splits data into consecutive chunks of at most chunkSize bytes
// and passes each chunk to sender in order.
//
// It returns the number of chunks sender accepted. If sender fails, sending
// stops and that error is returned together with the number of chunks that
// were delivered successfully before it. Empty data results in (0, nil)
// without sender being called.
//
// chunkSize must be positive when there is data to send. A zero chunk size
// would otherwise never make progress and a negative one would slice out of
// range, so both are rejected with an error before sender is called.
func sendBytes(data []byte, chunkSize int, sender func([]byte) error) (int, error) {
	if len(data) == 0 {
		return 0, nil
	}
	if chunkSize <= 0 {
		return 0, fmt.Errorf("sendBytes: chunk size must be positive, got %d", chunkSize)
	}

	sent := 0
	for len(data) > 0 {
		n := chunkSize
		if n > len(data) {
			n = len(data)
		}

		if err := sender(data[:n]); err != nil {
			return sent, err
		}
		data = data[n:]
		sent++
	}

	return sent, nil
}
409
410func (s *GitalyTestServer) finalError() error {
411	if code := s.finalMessageCode; code != codes.OK {
412		return status.Errorf(code, "error as specified by test")
413	}
414
415	return nil
416}
417
418func validateRepository(repo *gitalypb.Repository) error {
419	if len(repo.GetStorageName()) == 0 {
420		return fmt.Errorf("missing storage_name: %v", repo)
421	}
422	if len(repo.GetRelativePath()) == 0 {
423		return fmt.Errorf("missing relative_path: %v", repo)
424	}
425	return nil
426}
427