1// Copyright 2016 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package internal
16
17import (
18	"bytes"
19	"context"
20	"errors"
21	"fmt"
22	"io"
23	"log"
24	"sync"
25	"testing"
26
27	"google.golang.org/grpc"
28
29	pb "google.golang.org/genproto/googleapis/bytestream"
30)
31
// testName is the resource name used by requests throughout these tests.
const testName = "testName"

// testData is the payload used both as write request data and as the
// contents served by the read handlers.
const testData = "0123456789"
36
37var (
38	setupServerOnce sync.Once
39	server          *Server
40)
41
42func TestNewServerWithInvalidInputs(t *testing.T) {
43	_, err := NewServer(grpc.NewServer(), nil, nil)
44	if err == nil {
45		t.Fatal("NewServer(nil, nil) should not succeed")
46	}
47}
48
49func TestServerWrite(t *testing.T) {
50	testCases := []struct {
51		name              string
52		writeHandler      WriteHandler
53		input             []interface{}
54		writeCount        int
55		allowEmptyCommits bool
56		allowOverwrite    bool
57		wantErr           bool
58		wantResponse      int
59	}{
60		{
61			name:         "empty resource name",
62			writeHandler: &TestWriteHandler{},
63			input: []interface{}{
64				&pb.WriteRequest{
65					FinishWrite: true,
66					Data:        []byte(testData),
67				},
68			},
69			writeCount:   1,
70			wantErr:      true,
71			wantResponse: 0,
72		}, {
73			name:         "Recv returns io.EOF",
74			writeHandler: &TestWriteHandler{},
75			input: []interface{}{
76				io.EOF,
77			},
78			writeCount:   1,
79			wantErr:      false,
80			wantResponse: 0,
81		}, {
82			name:         "Recv returns error, 0 WriteRequests",
83			writeHandler: &TestWriteHandler{},
84			input: []interface{}{
85				errors.New("Recv returns error, 0 WriteRequests"),
86			},
87			writeCount:   1,
88			wantErr:      true,
89			wantResponse: 0,
90		}, {
91			name:         "simple test",
92			writeHandler: &TestWriteHandler{},
93			input: []interface{}{
94				&pb.WriteRequest{
95					ResourceName: testName,
96					WriteOffset:  0,
97					FinishWrite:  true,
98					Data:         []byte(testData),
99				},
100				io.EOF,
101			},
102			writeCount:   1,
103			wantResponse: 1,
104		}, {
105			name:         "Recv returns error, 1 WriteRequests",
106			writeHandler: &TestWriteHandler{},
107			input: []interface{}{
108				&pb.WriteRequest{
109					ResourceName: testName,
110					WriteOffset:  0,
111					FinishWrite:  false,
112					Data:         []byte(testData),
113				},
114				errors.New("Recv returns error, 1 WriteRequests"),
115			},
116			writeCount:   1,
117			wantErr:      true,
118			wantResponse: 0,
119		}, {
120			name:         "attempt to overwrite the same name",
121			writeHandler: &TestWriteHandler{},
122			input: []interface{}{
123				&pb.WriteRequest{
124					ResourceName: testName,
125					WriteOffset:  0,
126					FinishWrite:  true,
127					Data:         []byte(testData),
128				},
129				io.EOF,
130				&pb.WriteRequest{
131					ResourceName: testName,
132					WriteOffset:  0,
133					FinishWrite:  true,
134					Data:         []byte(testData),
135				},
136			},
137			writeCount:   2,
138			wantErr:      true,
139			wantResponse: 1,
140		}, {
141			name:         "overwrite with the same name + AllowOverwrite",
142			writeHandler: &TestWriteHandler{},
143			input: []interface{}{
144				&pb.WriteRequest{
145					ResourceName: testName,
146					WriteOffset:  0,
147					FinishWrite:  true,
148					Data:         []byte(testData),
149				},
150				io.EOF,
151				&pb.WriteRequest{
152					ResourceName: testName,
153					WriteOffset:  0,
154					FinishWrite:  true,
155					Data:         []byte(testData),
156				},
157				io.EOF,
158			},
159			writeCount:     2,
160			allowOverwrite: true,
161			wantResponse:   2,
162		}, {
163			name:         "two WriteRequests - 1st is empty",
164			writeHandler: &TestWriteHandler{},
165			input: []interface{}{
166				&pb.WriteRequest{
167					ResourceName: testName,
168					WriteOffset:  0,
169					FinishWrite:  false,
170					Data:         nil,
171				},
172				&pb.WriteRequest{
173					ResourceName: testName,
174					WriteOffset:  0,
175					FinishWrite:  true,
176					Data:         []byte(testData),
177				},
178				io.EOF,
179			},
180			writeCount:        1,
181			wantResponse:      1,
182			allowEmptyCommits: true,
183		}, {
184			name:         "two WriteRequests - 2nd is empty",
185			writeHandler: &TestWriteHandler{},
186			input: []interface{}{
187				&pb.WriteRequest{
188					ResourceName: testName,
189					WriteOffset:  0,
190					FinishWrite:  false,
191					Data:         []byte(testData),
192				},
193				&pb.WriteRequest{
194					ResourceName: testName,
195					WriteOffset:  int64(len(testData)),
196					FinishWrite:  true,
197					Data:         nil,
198				},
199				io.EOF,
200			},
201			writeCount:   1,
202			wantResponse: 1,
203		}, {
204			name:         "two WriteRequests - all empty",
205			writeHandler: &TestWriteHandler{},
206			input: []interface{}{
207				&pb.WriteRequest{
208					ResourceName: testName,
209					WriteOffset:  0,
210					FinishWrite:  false,
211					Data:         nil,
212				},
213				&pb.WriteRequest{
214					ResourceName: testName,
215					WriteOffset:  0,
216					FinishWrite:  true,
217					Data:         nil,
218				},
219			},
220			writeCount:        1,
221			wantErr:           true,
222			wantResponse:      1,
223			allowEmptyCommits: true,
224		}, {
225			name:         "two WriteRequests - varying offset",
226			writeHandler: &TestWriteHandler{},
227			input: []interface{}{
228				&pb.WriteRequest{
229					ResourceName: testName,
230					WriteOffset:  100,
231					FinishWrite:  false,
232					Data:         []byte(testData),
233				},
234				&pb.WriteRequest{
235					ResourceName: testName,
236					WriteOffset:  100 + int64(len(testData)),
237					FinishWrite:  true,
238					Data:         []byte(testData),
239				},
240				io.EOF,
241			},
242			writeCount:   1,
243			wantResponse: 1,
244		}, {
245			name:         "two WriteRequests - disjoint offset",
246			writeHandler: &TestWriteHandler{},
247			input: []interface{}{
248				&pb.WriteRequest{
249					ResourceName: testName,
250					WriteOffset:  100,
251					FinishWrite:  false,
252					Data:         []byte(testData),
253				},
254				&pb.WriteRequest{
255					ResourceName: testName,
256					WriteOffset:  200,
257					FinishWrite:  true,
258					Data:         []byte(testData),
259				},
260			},
261			writeCount:   1,
262			wantErr:      true,
263			wantResponse: 0,
264		}, {
265			name:         "fails with UngettableWriteHandler",
266			writeHandler: &UngettableWriteHandler{},
267			input: []interface{}{
268				&pb.WriteRequest{
269					ResourceName: testName,
270					WriteOffset:  0,
271					FinishWrite:  true,
272					Data:         []byte(testData),
273				},
274			},
275			writeCount: 1,
276			wantErr:    true,
277		}, {
278			name:         "fails with UnwritableWriteHandler",
279			writeHandler: &UnwritableWriteHandler{},
280			input: []interface{}{
281				&pb.WriteRequest{
282					ResourceName: testName,
283					WriteOffset:  0,
284					FinishWrite:  true,
285					Data:         []byte(testData),
286				},
287			},
288			writeCount: 1,
289			wantErr:    true,
290		}, {
291			name:         "fails with UnclosableWriteHandler",
292			writeHandler: &UnclosableWriteHandler{},
293			input: []interface{}{
294				&pb.WriteRequest{
295					ResourceName: testName,
296					WriteOffset:  0,
297					FinishWrite:  true,
298					Data:         []byte(testData),
299				},
300			},
301			writeCount:   1,
302			wantErr:      true,
303			wantResponse: 1,
304		}, {
305			name:         "fails with nil WriteHandler",
306			writeHandler: nil,
307			input: []interface{}{
308				&pb.WriteRequest{
309					ResourceName: testName,
310					WriteOffset:  0,
311					FinishWrite:  true,
312					Data:         []byte(testData),
313				},
314			},
315			writeCount: 1,
316			wantErr:    true,
317		},
318	}
319
320	ctx := context.Background()
321	for _, tc := range testCases {
322		readHandler := &TestReadHandler{}
323		if tc.writeHandler != nil {
324			readHandler = nil
325		}
326		setupServer(readHandler, tc.writeHandler)
327		server.AllowOverwrite = tc.allowOverwrite
328		var requestCount, responseCount int
329		var err error
330
331		for i := 0; i < tc.writeCount; i++ {
332			err = server.rpc.Write(&fakeWriteServerImpl{
333				ctx: ctx,
334				receiver: func() (*pb.WriteRequest, error) {
335					if requestCount >= len(tc.input) {
336						t.Fatalf("%s: got %d call(s) to Recv, want %d from len(input)", tc.name, requestCount+1, len(tc.input))
337					}
338					v := tc.input[requestCount]
339					requestCount++
340					request, ok := v.(*pb.WriteRequest)
341					if ok {
342						return request, nil
343					}
344					err, ok := v.(error)
345					if !ok {
346						t.Fatalf("%s: unknown input: %v", tc.name, v)
347					}
348					return nil, err
349				},
350				sender: func(response *pb.WriteResponse) error {
351					if !tc.allowEmptyCommits && response.CommittedSize == 0 {
352						t.Fatalf("%s: invalid response: WriteResponse %v", tc.name, response)
353					}
354					responseCount++
355					return nil
356				},
357			})
358			gotErr := (err != nil)
359			if i+1 < tc.writeCount {
360				if gotErr {
361					t.Errorf("%s: Write got err=%v, wantErr=%t, but on Write[%d/%d]. Error should not happen until last call to Write.", tc.name, err, tc.wantErr, i+1, tc.writeCount)
362					break // The t.Errorf conditions below may erroneously fire, pay them no mind.
363				}
364			} else if gotErr != tc.wantErr {
365				t.Errorf("%s: Write got err=%v, wantErr=%t", tc.name, err, tc.wantErr)
366				break // The t.Errorf conditions below may erroneously fire, pay them no mind.
367			}
368		}
369		if requestCount != len(tc.input) {
370			t.Errorf("%s: got %d call(s) to Recv, want %d", tc.name, requestCount, len(tc.input))
371		}
372		if responseCount != tc.wantResponse {
373			t.Errorf("%s: got %d call(s) to SendProto, want %d", tc.name, responseCount, tc.wantResponse)
374		}
375	}
376}
377
378func TestServerWrite_SendAndCloseError(t *testing.T) {
379	const (
380		wantRequest  = 2
381		wantResponse = 1
382	)
383
384	ctx := context.Background()
385	setupServer(nil, &TestWriteHandler{})
386	var requestCount, responseCount int
387
388	err := server.rpc.Write(&fakeWriteServerImpl{
389		ctx: ctx,
390		receiver: func() (*pb.WriteRequest, error) {
391			if requestCount >= wantRequest {
392				t.Fatalf("got %d call(s) to Recv, want %d", requestCount+1, wantRequest)
393			}
394			requestCount++
395			return &pb.WriteRequest{
396				ResourceName: testName,
397				WriteOffset:  0,
398				FinishWrite:  true,
399				Data:         []byte(testData),
400			}, nil
401		},
402		sender: func(response *pb.WriteResponse) error {
403			responseCount++
404			return errors.New("TestServerWrite SendProto error")
405		},
406	})
407	if err == nil {
408		t.Errorf("Write should have failed, but succeeded")
409	}
410	if requestCount != wantRequest {
411		t.Errorf("got %d call(s) to Recv, want %d", requestCount, wantRequest)
412	}
413	if responseCount != wantResponse {
414		t.Errorf("got %d call(s) to SendProto, want %d", responseCount, wantResponse)
415	}
416}
417
418func TestQueryWriteStatus(t *testing.T) {
419	testCases := []struct {
420		name         string
421		existingName string
422		requestName  string
423		wantErr      bool
424	}{
425		{
426			name:         "existing name should work",
427			existingName: testName,
428			requestName:  testName,
429		}, {
430			name:         "missing name should break",
431			existingName: testName,
432			requestName:  "invalidName",
433			wantErr:      true,
434		},
435	}
436
437	ctx := context.Background()
438	for _, tc := range testCases {
439		setupServer(nil, &TestWriteHandler{})
440		server.status[tc.existingName] = &pb.QueryWriteStatusResponse{}
441
442		_, err := server.rpc.QueryWriteStatus(ctx, &pb.QueryWriteStatusRequest{
443			ResourceName: tc.requestName,
444		})
445
446		if gotErr := (err != nil); gotErr != tc.wantErr {
447			t.Errorf("%s: QueryWriteStatus(%q) got err=%v, wantErr=%t", tc.name, tc.requestName, err, tc.wantErr)
448		}
449	}
450}
451
452func TestServerRead(t *testing.T) {
453	testCases := []struct {
454		name         string
455		readHandler  ReadHandler
456		input        *pb.ReadRequest
457		readCount    int
458		wantErr      bool
459		wantResponse []string
460	}{
461		{
462			name:        "empty resource name",
463			readHandler: &TestReadHandler{},
464			input: &pb.ReadRequest{
465				ReadLimit: 1,
466			},
467			readCount:    1,
468			wantErr:      true,
469			wantResponse: []string{},
470		}, {
471			name:        "test ReadLimit=-1",
472			readHandler: &TestReadHandler{buf: testData},
473			input: &pb.ReadRequest{
474				ResourceName: testName,
475				ReadOffset:   0,
476				ReadLimit:    -1,
477			},
478			readCount: 1,
479			wantErr:   true,
480		}, {
481			name:        "test ReadLimit=1",
482			readHandler: &TestReadHandler{buf: testData},
483			input: &pb.ReadRequest{
484				ResourceName: testName,
485				ReadOffset:   0,
486				ReadLimit:    1,
487			},
488			readCount:    1,
489			wantResponse: []string{"0"},
490		}, {
491			name:        "test ReadLimit=2",
492			readHandler: &TestReadHandler{buf: testData},
493			input: &pb.ReadRequest{
494				ResourceName: testName,
495				ReadOffset:   0,
496				ReadLimit:    2,
497			},
498			readCount:    1,
499			wantResponse: []string{"01"},
500		}, {
501			name:        "test ReadOffset=1 ReadLimit=2",
502			readHandler: &TestReadHandler{buf: testData},
503			input: &pb.ReadRequest{
504				ResourceName: testName,
505				ReadOffset:   1,
506				ReadLimit:    2,
507			},
508			readCount:    1,
509			wantResponse: []string{"12"},
510		}, {
511			name:        "test ReadOffset=2 ReadLimit=2",
512			readHandler: &TestReadHandler{buf: testData},
513			input: &pb.ReadRequest{
514				ResourceName: testName,
515				ReadOffset:   2,
516				ReadLimit:    2,
517			},
518			readCount:    1,
519			wantResponse: []string{"23"},
520		}, {
521			name:        "read all testData at exactly the limit",
522			readHandler: &TestReadHandler{buf: testData},
523			input: &pb.ReadRequest{
524				ResourceName: testName,
525				ReadOffset:   0,
526				ReadLimit:    int64(len(testData)),
527			},
528			readCount:    1,
529			wantResponse: []string{"0123456789"},
530		}, {
531			name:        "read all testData",
532			readHandler: &TestReadHandler{buf: testData},
533			input: &pb.ReadRequest{
534				ResourceName: testName,
535				ReadOffset:   0,
536				ReadLimit:    int64(len(testData)) * 2,
537			},
538			readCount:    1,
539			wantResponse: []string{"0123456789"},
540		}, {
541			name:        "read all testData 2 times",
542			readHandler: &TestReadHandler{buf: testData},
543			input: &pb.ReadRequest{
544				ResourceName: testName,
545				ReadOffset:   0,
546				ReadLimit:    int64(len(testData)) * 2,
547			},
548			readCount:    2,
549			wantResponse: []string{"0123456789", "0123456789"},
550		}, {
551			name:        "test ReadLimit=0",
552			readHandler: &TestReadHandler{buf: testData},
553			input: &pb.ReadRequest{
554				ResourceName: testName,
555				ReadOffset:   0,
556				ReadLimit:    0,
557			},
558			readCount:    1,
559			wantResponse: []string{"0123456789"},
560		}, {
561			name:        "test ReadLimit=1000",
562			readHandler: &TestReadHandler{buf: testData},
563			input: &pb.ReadRequest{
564				ResourceName: testName,
565				ReadOffset:   0,
566				ReadLimit:    1000,
567			},
568			readCount:    1,
569			wantResponse: []string{"0123456789"},
570		}, {
571			name:        "fails with UngettableReadHandler",
572			readHandler: &UngettableReadHandler{},
573			input: &pb.ReadRequest{
574				ResourceName: testName,
575				ReadOffset:   0,
576				ReadLimit:    int64(len(testData)),
577			},
578			readCount: 1,
579			wantErr:   true,
580		}, {
581			name:        "fails with UnreadableReadHandler",
582			readHandler: &UnreadableReadHandler{},
583			input: &pb.ReadRequest{
584				ResourceName: testName,
585				ReadOffset:   0,
586				ReadLimit:    int64(len(testData)),
587			},
588			readCount: 1,
589			wantErr:   true,
590		}, {
591			name:        "fails with UnclosableReadHandler",
592			readHandler: &UnclosableReadHandler{buf: testData},
593			input: &pb.ReadRequest{
594				ResourceName: testName,
595				ReadOffset:   0,
596				ReadLimit:    int64(len(testData)) * 2,
597			},
598			readCount:    1,
599			wantErr:      true,
600			wantResponse: []string{"0123456789"},
601		}, {
602			name:        "fails with nil ReadRequest",
603			readHandler: &TestReadHandler{buf: testData},
604			readCount:   1,
605			wantErr:     true,
606		}, {
607			name:        "fails with nil ReadHandler",
608			readHandler: nil,
609			readCount:   1,
610			wantErr:     true,
611		},
612	}
613
614	ctx := context.Background()
615	for _, tc := range testCases {
616		var writeHandler WriteHandler
617		if tc.readHandler == nil {
618			writeHandler = &TestWriteHandler{}
619		}
620		setupServer(tc.readHandler, writeHandler)
621		var responseCount int
622		var err error
623
624		for i := 0; i < tc.readCount; i++ {
625			err = server.rpc.Read(tc.input, &fakeReadServerImpl{
626				ctx: ctx,
627				sender: func(response *pb.ReadResponse) error {
628					if responseCount >= len(tc.wantResponse) {
629						t.Fatalf("%s: got %d call(s) to Send(), want %d", tc.name, responseCount+1, len(tc.wantResponse))
630					}
631					if got, want := string(response.Data), tc.wantResponse[responseCount]; got != want {
632						t.Fatalf("%s: response[%d] got %q, want %q", tc.name, responseCount, got, want)
633					}
634					responseCount++
635					return nil
636				},
637			})
638			gotErr := (err != nil)
639			if i+1 < tc.readCount {
640				if gotErr {
641					t.Errorf("%s: Read got err=%v, wantErr=%t, but on Read[%d/%d]. Error should not happen until last call to Read", tc.name, err, tc.wantErr, i+1, tc.readCount)
642					break
643				}
644			} else if gotErr != tc.wantErr {
645				t.Errorf("%s: Read got err=%v, wantErr=%t", tc.name, err, tc.wantErr)
646				break
647			}
648		}
649		if responseCount != len(tc.wantResponse) {
650			t.Errorf("%s: got %d call(s) to Send, want %d", tc.name, responseCount, len(tc.wantResponse))
651		}
652	}
653}
654
655func TestServerRead_SendError(t *testing.T) {
656	setupServer(&TestReadHandler{buf: testData}, nil)
657
658	err := server.rpc.Read(&pb.ReadRequest{
659		ResourceName: testName,
660		ReadOffset:   0,
661		ReadLimit:    int64(len(testData)) * 2,
662	}, &fakeReadServerImpl{
663		ctx: context.Background(),
664		sender: func(response *pb.ReadResponse) error {
665			if string(response.Data) != testData {
666				t.Fatalf("Send: got %v, want %q", response, testData)
667			}
668			return errors.New("TestServerRead Send() error")
669		},
670	})
671
672	if err == nil {
673		t.Fatal("Read() should have failed, but succeeded")
674	}
675}
676
677type fakeWriteServerImpl struct {
678	pb.ByteStream_WriteServer
679	ctx      context.Context
680	receiver func() (*pb.WriteRequest, error)
681	sender   func(*pb.WriteResponse) error
682}
683
684func (fake *fakeWriteServerImpl) Context() context.Context {
685	return fake.ctx
686}
687
688func (fake *fakeWriteServerImpl) Recv() (*pb.WriteRequest, error) {
689	return fake.receiver()
690}
691
692func (fake *fakeWriteServerImpl) SendMsg(m interface{}) error {
693	return fake.sender(m.(*pb.WriteResponse))
694}
695
696func (fake *fakeWriteServerImpl) SendAndClose(m *pb.WriteResponse) error {
697	fake.sender(m)
698	return nil
699}
700
701type fakeReadServerImpl struct {
702	pb.ByteStream_ReadServer
703	ctx    context.Context
704	sender func(*pb.ReadResponse) error
705}
706
707func (fake *fakeReadServerImpl) Context() context.Context {
708	return fake.ctx
709}
710
711func (fake *fakeReadServerImpl) Send(response *pb.ReadResponse) error {
712	return fake.sender(response)
713}
714
// TestWriteHandler is an in-memory write handler for tests. It binds itself
// to the first name it is asked about and rejects any other name until Close
// is called.
type TestWriteHandler struct {
	buf  bytes.Buffer // destination handed out by GetWriter
	name string       // name currently bound to this handler; "" when unbound
}

// GetWriter returns the handler's buffer as an io.Writer. The first call
// binds the handler to name; subsequent calls with a different name return an
// error. initOffset is ignored.
func (h *TestWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
	switch h.name {
	case "":
		// Unbound: adopt the requested name.
		h.name = name
	case name:
		// Already bound to this name: nothing to update.
	default:
		return nil, fmt.Errorf("writer already has name=%q, now a new name=%q confuses me", h.name, name)
	}
	return &h.buf, nil
}

// Close discards buffered data and unbinds the handler from its name. It
// always returns nil.
func (h *TestWriteHandler) Close(ctx context.Context, name string) error {
	h.buf.Reset()
	h.name = ""
	return nil
}
735
// UngettableWriteHandler is a write handler stub whose GetWriter never
// succeeds.
type UngettableWriteHandler struct{}

// GetWriter always returns a nil writer together with a non-nil error.
func (*UngettableWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
	err := errors.New("UngettableWriteHandler.GetWriter() always fails")
	return nil, err
}

// Close does nothing and reports success.
func (*UngettableWriteHandler) Close(ctx context.Context, name string) error {
	return nil
}
745
// UnwritableWriter is an io.Writer whose Write never succeeds.
type UnwritableWriter struct{}

// Write reports zero bytes written and a non-nil error for every call.
func (*UnwritableWriter) Write(p []byte) (int, error) {
	err := errors.New("UnwritableWriter.Write() always fails")
	return 0, err
}

// UnwritableWriteHandler is a write handler stub that hands out writers
// which reject every write.
type UnwritableWriteHandler struct{}

// GetWriter returns a fresh UnwritableWriter and a nil error.
func (*UnwritableWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
	return new(UnwritableWriter), nil
}

// Close does nothing and reports success.
func (*UnwritableWriteHandler) Close(ctx context.Context, name string) error {
	return nil
}
761
// UnclosableWriter is an io.Writer that accepts and discards everything
// written to it.
type UnclosableWriter struct{}

// Write reports that all of p was written, without storing it.
func (*UnclosableWriter) Write(p []byte) (int, error) {
	n := len(p)
	return n, nil
}

// UnclosableWriteHandler is a write handler stub whose writers work but
// whose Close never succeeds.
type UnclosableWriteHandler struct{}

// GetWriter returns a fresh UnclosableWriter and a nil error.
func (*UnclosableWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
	return new(UnclosableWriter), nil
}

// Close always returns a non-nil error.
func (*UnclosableWriteHandler) Close(ctx context.Context, name string) error {
	err := errors.New("UnclosableWriteHandler.Close() always fails")
	return err
}
777
// TestReadHandler is an in-memory read handler for tests. It serves the
// contents of buf and binds itself to the first name it is asked about,
// rejecting any other name afterwards.
type TestReadHandler struct {
	buf  string // data served to readers
	name string // name bound on first GetReader call; "" when unbound
}

// GetReader returns an io.ReaderAt over a copy of the handler's buf. The
// first call binds the handler to name; calls with a different name return
// an error.
func (h *TestReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
	switch h.name {
	case "":
		// Unbound: adopt the requested name.
		h.name = name
	case name:
		// Already bound to this name: nothing to update.
	default:
		return nil, fmt.Errorf("reader already has name=%q, now a new name=%q confuses me", h.name, name)
	}
	data := []byte(h.buf)
	return bytes.NewReader(data), nil
}

// Close is a no-op that reports success; the bound name is left in place.
func (h *TestReadHandler) Close(ctx context.Context, name string) error {
	return nil
}
797
// UngettableReadHandler is a read handler stub whose GetReader never
// succeeds.
type UngettableReadHandler struct{}

// GetReader always returns a nil reader together with a non-nil error.
func (*UngettableReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
	err := errors.New("UngettableReadHandler.GetReader() always fails")
	return nil, err
}

// Close does nothing and reports success.
func (*UngettableReadHandler) Close(ctx context.Context, name string) error {
	return nil
}
807
// UnreadableReader is an io.ReaderAt whose ReadAt never succeeds.
type UnreadableReader struct{}

// ReadAt reports zero bytes read and a non-nil error for every call.
func (*UnreadableReader) ReadAt(p []byte, offset int64) (int, error) {
	err := errors.New("UnreadableReader.ReadAt() always fails")
	return 0, err
}

// UnreadableReadHandler is a read handler stub that hands out readers which
// fail on every read.
type UnreadableReadHandler struct{}

// GetReader returns a fresh UnreadableReader and a nil error.
func (*UnreadableReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
	return new(UnreadableReader), nil
}

// Close does nothing and reports success.
func (*UnreadableReadHandler) Close(ctx context.Context, name string) error {
	return nil
}
823
// UnclosableReadHandler is a read handler stub that serves buf successfully
// but whose Close never succeeds.
type UnclosableReadHandler struct {
	buf string // data served to readers
}

// GetReader returns an io.ReaderAt over a copy of buf and a nil error.
func (h *UnclosableReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
	data := []byte(h.buf)
	return bytes.NewReader(data), nil
}

// Close always returns a non-nil error whose message includes name.
func (h *UnclosableReadHandler) Close(ctx context.Context, name string) error {
	err := fmt.Errorf("UnclosableReader.Close(%s) always fails", name)
	return err
}
835
836func registerServer() {
837	gsrv := grpc.NewServer()
838	var err error
839	server, err = NewServer(gsrv, &TestReadHandler{}, &TestWriteHandler{})
840	if err != nil {
841		log.Fatalf("NewServer() failed: %v", err)
842	}
843}
844
845func setupServer(readHandler ReadHandler, writeHandler WriteHandler) {
846	setupServerOnce.Do(registerServer)
847	server.status = make(map[string]*pb.QueryWriteStatusResponse)
848	server.readHandler = readHandler
849	server.writeHandler = writeHandler
850}
851