1// Copyright 2016 Google LLC.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package internal
6
7import (
8	"bytes"
9	"context"
10	"errors"
11	"fmt"
12	"io"
13	"log"
14	"sync"
15	"testing"
16
17	"google.golang.org/grpc"
18
19	pb "google.golang.org/genproto/googleapis/bytestream"
20)
21
// Fixtures shared by the tests in this file: testName is the resource name
// placed in read and write requests, and testData is the payload that write
// requests carry and that the read handlers serve.
const (
	testName = "testName"
	testData = "0123456789"
)

// setupServerOnce makes setupServer run registerServer a single time, and
// server is the one Server instance that registerServer creates and that
// every test in this file reconfigures and reuses.
var (
	setupServerOnce sync.Once
	server          *Server
)
31
32func TestNewServerWithInvalidInputs(t *testing.T) {
33	_, err := NewServer(grpc.NewServer(), nil, nil)
34	if err == nil {
35		t.Fatal("NewServer(nil, nil) should not succeed")
36	}
37}
38
39func TestServerWrite(t *testing.T) {
40	testCases := []struct {
41		name              string
42		writeHandler      WriteHandler
43		input             []interface{}
44		writeCount        int
45		allowEmptyCommits bool
46		allowOverwrite    bool
47		wantErr           bool
48		wantResponse      int
49	}{
50		{
51			name:         "empty resource name",
52			writeHandler: &TestWriteHandler{},
53			input: []interface{}{
54				&pb.WriteRequest{
55					FinishWrite: true,
56					Data:        []byte(testData),
57				},
58			},
59			writeCount:   1,
60			wantErr:      true,
61			wantResponse: 0,
62		}, {
63			name:         "Recv returns io.EOF",
64			writeHandler: &TestWriteHandler{},
65			input: []interface{}{
66				io.EOF,
67			},
68			writeCount:   1,
69			wantErr:      false,
70			wantResponse: 0,
71		}, {
72			name:         "Recv returns error, 0 WriteRequests",
73			writeHandler: &TestWriteHandler{},
74			input: []interface{}{
75				errors.New("Recv returns error, 0 WriteRequests"),
76			},
77			writeCount:   1,
78			wantErr:      true,
79			wantResponse: 0,
80		}, {
81			name:         "simple test",
82			writeHandler: &TestWriteHandler{},
83			input: []interface{}{
84				&pb.WriteRequest{
85					ResourceName: testName,
86					WriteOffset:  0,
87					FinishWrite:  true,
88					Data:         []byte(testData),
89				},
90				io.EOF,
91			},
92			writeCount:   1,
93			wantResponse: 1,
94		}, {
95			name:         "Recv returns error, 1 WriteRequests",
96			writeHandler: &TestWriteHandler{},
97			input: []interface{}{
98				&pb.WriteRequest{
99					ResourceName: testName,
100					WriteOffset:  0,
101					FinishWrite:  false,
102					Data:         []byte(testData),
103				},
104				errors.New("Recv returns error, 1 WriteRequests"),
105			},
106			writeCount:   1,
107			wantErr:      true,
108			wantResponse: 0,
109		}, {
110			name:         "attempt to overwrite the same name",
111			writeHandler: &TestWriteHandler{},
112			input: []interface{}{
113				&pb.WriteRequest{
114					ResourceName: testName,
115					WriteOffset:  0,
116					FinishWrite:  true,
117					Data:         []byte(testData),
118				},
119				io.EOF,
120				&pb.WriteRequest{
121					ResourceName: testName,
122					WriteOffset:  0,
123					FinishWrite:  true,
124					Data:         []byte(testData),
125				},
126			},
127			writeCount:   2,
128			wantErr:      true,
129			wantResponse: 1,
130		}, {
131			name:         "overwrite with the same name + AllowOverwrite",
132			writeHandler: &TestWriteHandler{},
133			input: []interface{}{
134				&pb.WriteRequest{
135					ResourceName: testName,
136					WriteOffset:  0,
137					FinishWrite:  true,
138					Data:         []byte(testData),
139				},
140				io.EOF,
141				&pb.WriteRequest{
142					ResourceName: testName,
143					WriteOffset:  0,
144					FinishWrite:  true,
145					Data:         []byte(testData),
146				},
147				io.EOF,
148			},
149			writeCount:     2,
150			allowOverwrite: true,
151			wantResponse:   2,
152		}, {
153			name:         "two WriteRequests - 1st is empty",
154			writeHandler: &TestWriteHandler{},
155			input: []interface{}{
156				&pb.WriteRequest{
157					ResourceName: testName,
158					WriteOffset:  0,
159					FinishWrite:  false,
160					Data:         nil,
161				},
162				&pb.WriteRequest{
163					ResourceName: testName,
164					WriteOffset:  0,
165					FinishWrite:  true,
166					Data:         []byte(testData),
167				},
168				io.EOF,
169			},
170			writeCount:        1,
171			wantResponse:      1,
172			allowEmptyCommits: true,
173		}, {
174			name:         "two WriteRequests - 2nd is empty",
175			writeHandler: &TestWriteHandler{},
176			input: []interface{}{
177				&pb.WriteRequest{
178					ResourceName: testName,
179					WriteOffset:  0,
180					FinishWrite:  false,
181					Data:         []byte(testData),
182				},
183				&pb.WriteRequest{
184					ResourceName: testName,
185					WriteOffset:  int64(len(testData)),
186					FinishWrite:  true,
187					Data:         nil,
188				},
189				io.EOF,
190			},
191			writeCount:   1,
192			wantResponse: 1,
193		}, {
194			name:         "two WriteRequests - all empty",
195			writeHandler: &TestWriteHandler{},
196			input: []interface{}{
197				&pb.WriteRequest{
198					ResourceName: testName,
199					WriteOffset:  0,
200					FinishWrite:  false,
201					Data:         nil,
202				},
203				&pb.WriteRequest{
204					ResourceName: testName,
205					WriteOffset:  0,
206					FinishWrite:  true,
207					Data:         nil,
208				},
209			},
210			writeCount:        1,
211			wantErr:           true,
212			wantResponse:      1,
213			allowEmptyCommits: true,
214		}, {
215			name:         "two WriteRequests - varying offset",
216			writeHandler: &TestWriteHandler{},
217			input: []interface{}{
218				&pb.WriteRequest{
219					ResourceName: testName,
220					WriteOffset:  100,
221					FinishWrite:  false,
222					Data:         []byte(testData),
223				},
224				&pb.WriteRequest{
225					ResourceName: testName,
226					WriteOffset:  100 + int64(len(testData)),
227					FinishWrite:  true,
228					Data:         []byte(testData),
229				},
230				io.EOF,
231			},
232			writeCount:   1,
233			wantResponse: 1,
234		}, {
235			name:         "two WriteRequests - disjoint offset",
236			writeHandler: &TestWriteHandler{},
237			input: []interface{}{
238				&pb.WriteRequest{
239					ResourceName: testName,
240					WriteOffset:  100,
241					FinishWrite:  false,
242					Data:         []byte(testData),
243				},
244				&pb.WriteRequest{
245					ResourceName: testName,
246					WriteOffset:  200,
247					FinishWrite:  true,
248					Data:         []byte(testData),
249				},
250			},
251			writeCount:   1,
252			wantErr:      true,
253			wantResponse: 0,
254		}, {
255			name:         "fails with UngettableWriteHandler",
256			writeHandler: &UngettableWriteHandler{},
257			input: []interface{}{
258				&pb.WriteRequest{
259					ResourceName: testName,
260					WriteOffset:  0,
261					FinishWrite:  true,
262					Data:         []byte(testData),
263				},
264			},
265			writeCount: 1,
266			wantErr:    true,
267		}, {
268			name:         "fails with UnwritableWriteHandler",
269			writeHandler: &UnwritableWriteHandler{},
270			input: []interface{}{
271				&pb.WriteRequest{
272					ResourceName: testName,
273					WriteOffset:  0,
274					FinishWrite:  true,
275					Data:         []byte(testData),
276				},
277			},
278			writeCount: 1,
279			wantErr:    true,
280		}, {
281			name:         "fails with UnclosableWriteHandler",
282			writeHandler: &UnclosableWriteHandler{},
283			input: []interface{}{
284				&pb.WriteRequest{
285					ResourceName: testName,
286					WriteOffset:  0,
287					FinishWrite:  true,
288					Data:         []byte(testData),
289				},
290			},
291			writeCount:   1,
292			wantErr:      true,
293			wantResponse: 1,
294		}, {
295			name:         "fails with nil WriteHandler",
296			writeHandler: nil,
297			input: []interface{}{
298				&pb.WriteRequest{
299					ResourceName: testName,
300					WriteOffset:  0,
301					FinishWrite:  true,
302					Data:         []byte(testData),
303				},
304			},
305			writeCount: 1,
306			wantErr:    true,
307		},
308	}
309
310	ctx := context.Background()
311	for _, tc := range testCases {
312		readHandler := &TestReadHandler{}
313		if tc.writeHandler != nil {
314			readHandler = nil
315		}
316		setupServer(readHandler, tc.writeHandler)
317		server.AllowOverwrite = tc.allowOverwrite
318		var requestCount, responseCount int
319		var err error
320
321		for i := 0; i < tc.writeCount; i++ {
322			err = server.rpc.Write(&fakeWriteServerImpl{
323				ctx: ctx,
324				receiver: func() (*pb.WriteRequest, error) {
325					if requestCount >= len(tc.input) {
326						t.Fatalf("%s: got %d call(s) to Recv, want %d from len(input)", tc.name, requestCount+1, len(tc.input))
327					}
328					v := tc.input[requestCount]
329					requestCount++
330					request, ok := v.(*pb.WriteRequest)
331					if ok {
332						return request, nil
333					}
334					err, ok := v.(error)
335					if !ok {
336						t.Fatalf("%s: unknown input: %v", tc.name, v)
337					}
338					return nil, err
339				},
340				sender: func(response *pb.WriteResponse) error {
341					if !tc.allowEmptyCommits && response.CommittedSize == 0 {
342						t.Fatalf("%s: invalid response: WriteResponse %v", tc.name, response)
343					}
344					responseCount++
345					return nil
346				},
347			})
348			gotErr := (err != nil)
349			if i+1 < tc.writeCount {
350				if gotErr {
351					t.Errorf("%s: Write got err=%v, wantErr=%t, but on Write[%d/%d]. Error should not happen until last call to Write.", tc.name, err, tc.wantErr, i+1, tc.writeCount)
352					break // The t.Errorf conditions below may erroneously fire, pay them no mind.
353				}
354			} else if gotErr != tc.wantErr {
355				t.Errorf("%s: Write got err=%v, wantErr=%t", tc.name, err, tc.wantErr)
356				break // The t.Errorf conditions below may erroneously fire, pay them no mind.
357			}
358		}
359		if requestCount != len(tc.input) {
360			t.Errorf("%s: got %d call(s) to Recv, want %d", tc.name, requestCount, len(tc.input))
361		}
362		if responseCount != tc.wantResponse {
363			t.Errorf("%s: got %d call(s) to SendProto, want %d", tc.name, responseCount, tc.wantResponse)
364		}
365	}
366}
367
368func TestServerWrite_SendAndCloseError(t *testing.T) {
369	const (
370		wantRequest  = 2
371		wantResponse = 1
372	)
373
374	ctx := context.Background()
375	setupServer(nil, &TestWriteHandler{})
376	var requestCount, responseCount int
377
378	err := server.rpc.Write(&fakeWriteServerImpl{
379		ctx: ctx,
380		receiver: func() (*pb.WriteRequest, error) {
381			if requestCount >= wantRequest {
382				t.Fatalf("got %d call(s) to Recv, want %d", requestCount+1, wantRequest)
383			}
384			requestCount++
385			return &pb.WriteRequest{
386				ResourceName: testName,
387				WriteOffset:  0,
388				FinishWrite:  true,
389				Data:         []byte(testData),
390			}, nil
391		},
392		sender: func(response *pb.WriteResponse) error {
393			responseCount++
394			return errors.New("TestServerWrite SendProto error")
395		},
396	})
397	if err == nil {
398		t.Errorf("Write should have failed, but succeeded")
399	}
400	if requestCount != wantRequest {
401		t.Errorf("got %d call(s) to Recv, want %d", requestCount, wantRequest)
402	}
403	if responseCount != wantResponse {
404		t.Errorf("got %d call(s) to SendProto, want %d", responseCount, wantResponse)
405	}
406}
407
408func TestQueryWriteStatus(t *testing.T) {
409	testCases := []struct {
410		name         string
411		existingName string
412		requestName  string
413		wantErr      bool
414	}{
415		{
416			name:         "existing name should work",
417			existingName: testName,
418			requestName:  testName,
419		}, {
420			name:         "missing name should break",
421			existingName: testName,
422			requestName:  "invalidName",
423			wantErr:      true,
424		},
425	}
426
427	ctx := context.Background()
428	for _, tc := range testCases {
429		setupServer(nil, &TestWriteHandler{})
430		server.status[tc.existingName] = &pb.QueryWriteStatusResponse{}
431
432		_, err := server.rpc.QueryWriteStatus(ctx, &pb.QueryWriteStatusRequest{
433			ResourceName: tc.requestName,
434		})
435
436		if gotErr := (err != nil); gotErr != tc.wantErr {
437			t.Errorf("%s: QueryWriteStatus(%q) got err=%v, wantErr=%t", tc.name, tc.requestName, err, tc.wantErr)
438		}
439	}
440}
441
442func TestServerRead(t *testing.T) {
443	testCases := []struct {
444		name         string
445		readHandler  ReadHandler
446		input        *pb.ReadRequest
447		readCount    int
448		wantErr      bool
449		wantResponse []string
450	}{
451		{
452			name:        "empty resource name",
453			readHandler: &TestReadHandler{},
454			input: &pb.ReadRequest{
455				ReadLimit: 1,
456			},
457			readCount:    1,
458			wantErr:      true,
459			wantResponse: []string{},
460		}, {
461			name:        "test ReadLimit=-1",
462			readHandler: &TestReadHandler{buf: testData},
463			input: &pb.ReadRequest{
464				ResourceName: testName,
465				ReadOffset:   0,
466				ReadLimit:    -1,
467			},
468			readCount: 1,
469			wantErr:   true,
470		}, {
471			name:        "test ReadLimit=1",
472			readHandler: &TestReadHandler{buf: testData},
473			input: &pb.ReadRequest{
474				ResourceName: testName,
475				ReadOffset:   0,
476				ReadLimit:    1,
477			},
478			readCount:    1,
479			wantResponse: []string{"0"},
480		}, {
481			name:        "test ReadLimit=2",
482			readHandler: &TestReadHandler{buf: testData},
483			input: &pb.ReadRequest{
484				ResourceName: testName,
485				ReadOffset:   0,
486				ReadLimit:    2,
487			},
488			readCount:    1,
489			wantResponse: []string{"01"},
490		}, {
491			name:        "test ReadOffset=1 ReadLimit=2",
492			readHandler: &TestReadHandler{buf: testData},
493			input: &pb.ReadRequest{
494				ResourceName: testName,
495				ReadOffset:   1,
496				ReadLimit:    2,
497			},
498			readCount:    1,
499			wantResponse: []string{"12"},
500		}, {
501			name:        "test ReadOffset=2 ReadLimit=2",
502			readHandler: &TestReadHandler{buf: testData},
503			input: &pb.ReadRequest{
504				ResourceName: testName,
505				ReadOffset:   2,
506				ReadLimit:    2,
507			},
508			readCount:    1,
509			wantResponse: []string{"23"},
510		}, {
511			name:        "read all testData at exactly the limit",
512			readHandler: &TestReadHandler{buf: testData},
513			input: &pb.ReadRequest{
514				ResourceName: testName,
515				ReadOffset:   0,
516				ReadLimit:    int64(len(testData)),
517			},
518			readCount:    1,
519			wantResponse: []string{"0123456789"},
520		}, {
521			name:        "read all testData",
522			readHandler: &TestReadHandler{buf: testData},
523			input: &pb.ReadRequest{
524				ResourceName: testName,
525				ReadOffset:   0,
526				ReadLimit:    int64(len(testData)) * 2,
527			},
528			readCount:    1,
529			wantResponse: []string{"0123456789"},
530		}, {
531			name:        "read all testData 2 times",
532			readHandler: &TestReadHandler{buf: testData},
533			input: &pb.ReadRequest{
534				ResourceName: testName,
535				ReadOffset:   0,
536				ReadLimit:    int64(len(testData)) * 2,
537			},
538			readCount:    2,
539			wantResponse: []string{"0123456789", "0123456789"},
540		}, {
541			name:        "test ReadLimit=0",
542			readHandler: &TestReadHandler{buf: testData},
543			input: &pb.ReadRequest{
544				ResourceName: testName,
545				ReadOffset:   0,
546				ReadLimit:    0,
547			},
548			readCount:    1,
549			wantResponse: []string{"0123456789"},
550		}, {
551			name:        "test ReadLimit=1000",
552			readHandler: &TestReadHandler{buf: testData},
553			input: &pb.ReadRequest{
554				ResourceName: testName,
555				ReadOffset:   0,
556				ReadLimit:    1000,
557			},
558			readCount:    1,
559			wantResponse: []string{"0123456789"},
560		}, {
561			name:        "fails with UngettableReadHandler",
562			readHandler: &UngettableReadHandler{},
563			input: &pb.ReadRequest{
564				ResourceName: testName,
565				ReadOffset:   0,
566				ReadLimit:    int64(len(testData)),
567			},
568			readCount: 1,
569			wantErr:   true,
570		}, {
571			name:        "fails with UnreadableReadHandler",
572			readHandler: &UnreadableReadHandler{},
573			input: &pb.ReadRequest{
574				ResourceName: testName,
575				ReadOffset:   0,
576				ReadLimit:    int64(len(testData)),
577			},
578			readCount: 1,
579			wantErr:   true,
580		}, {
581			name:        "fails with UnclosableReadHandler",
582			readHandler: &UnclosableReadHandler{buf: testData},
583			input: &pb.ReadRequest{
584				ResourceName: testName,
585				ReadOffset:   0,
586				ReadLimit:    int64(len(testData)) * 2,
587			},
588			readCount:    1,
589			wantErr:      true,
590			wantResponse: []string{"0123456789"},
591		}, {
592			name:        "fails with nil ReadRequest",
593			readHandler: &TestReadHandler{buf: testData},
594			readCount:   1,
595			wantErr:     true,
596		}, {
597			name:        "fails with nil ReadHandler",
598			readHandler: nil,
599			readCount:   1,
600			wantErr:     true,
601		},
602	}
603
604	ctx := context.Background()
605	for _, tc := range testCases {
606		var writeHandler WriteHandler
607		if tc.readHandler == nil {
608			writeHandler = &TestWriteHandler{}
609		}
610		setupServer(tc.readHandler, writeHandler)
611		var responseCount int
612		var err error
613
614		for i := 0; i < tc.readCount; i++ {
615			err = server.rpc.Read(tc.input, &fakeReadServerImpl{
616				ctx: ctx,
617				sender: func(response *pb.ReadResponse) error {
618					if responseCount >= len(tc.wantResponse) {
619						t.Fatalf("%s: got %d call(s) to Send(), want %d", tc.name, responseCount+1, len(tc.wantResponse))
620					}
621					if got, want := string(response.Data), tc.wantResponse[responseCount]; got != want {
622						t.Fatalf("%s: response[%d] got %q, want %q", tc.name, responseCount, got, want)
623					}
624					responseCount++
625					return nil
626				},
627			})
628			gotErr := (err != nil)
629			if i+1 < tc.readCount {
630				if gotErr {
631					t.Errorf("%s: Read got err=%v, wantErr=%t, but on Read[%d/%d]. Error should not happen until last call to Read", tc.name, err, tc.wantErr, i+1, tc.readCount)
632					break
633				}
634			} else if gotErr != tc.wantErr {
635				t.Errorf("%s: Read got err=%v, wantErr=%t", tc.name, err, tc.wantErr)
636				break
637			}
638		}
639		if responseCount != len(tc.wantResponse) {
640			t.Errorf("%s: got %d call(s) to Send, want %d", tc.name, responseCount, len(tc.wantResponse))
641		}
642	}
643}
644
645func TestServerRead_SendError(t *testing.T) {
646	setupServer(&TestReadHandler{buf: testData}, nil)
647
648	err := server.rpc.Read(&pb.ReadRequest{
649		ResourceName: testName,
650		ReadOffset:   0,
651		ReadLimit:    int64(len(testData)) * 2,
652	}, &fakeReadServerImpl{
653		ctx: context.Background(),
654		sender: func(response *pb.ReadResponse) error {
655			if string(response.Data) != testData {
656				t.Fatalf("Send: got %v, want %q", response, testData)
657			}
658			return errors.New("TestServerRead Send() error")
659		},
660	})
661
662	if err == nil {
663		t.Fatal("Read() should have failed, but succeeded")
664	}
665}
666
667type fakeWriteServerImpl struct {
668	pb.ByteStream_WriteServer
669	ctx      context.Context
670	receiver func() (*pb.WriteRequest, error)
671	sender   func(*pb.WriteResponse) error
672}
673
674func (fake *fakeWriteServerImpl) Context() context.Context {
675	return fake.ctx
676}
677
678func (fake *fakeWriteServerImpl) Recv() (*pb.WriteRequest, error) {
679	return fake.receiver()
680}
681
682func (fake *fakeWriteServerImpl) SendMsg(m interface{}) error {
683	return fake.sender(m.(*pb.WriteResponse))
684}
685
686func (fake *fakeWriteServerImpl) SendAndClose(m *pb.WriteResponse) error {
687	fake.sender(m)
688	return nil
689}
690
691type fakeReadServerImpl struct {
692	pb.ByteStream_ReadServer
693	ctx    context.Context
694	sender func(*pb.ReadResponse) error
695}
696
697func (fake *fakeReadServerImpl) Context() context.Context {
698	return fake.ctx
699}
700
701func (fake *fakeReadServerImpl) Send(response *pb.ReadResponse) error {
702	return fake.sender(response)
703}
704
// TestWriteHandler is an in-memory write handler for tests. It accepts
// writes for a single resource name at a time and collects the bytes in buf.
type TestWriteHandler struct {
	buf  bytes.Buffer // bytes.Buffer implements io.Writer
	name string       // This service can handle one name only.
}

// GetWriter returns the handler's buffer as the writer for name. The first
// name seen is remembered; asking for a different name before Close is
// called is an error. initOffset is ignored.
func (w *TestWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
	if w.name != "" && w.name != name {
		return nil, fmt.Errorf("writer already has name=%q, now a new name=%q confuses me", w.name, name)
	}
	// Either no name was bound yet or it is the same name again.
	w.name = name
	return &w.buf, nil
}

// Close forgets the bound name and discards everything written so far. It
// always returns nil.
func (w *TestWriteHandler) Close(ctx context.Context, name string) error {
	w.buf.Reset()
	w.name = ""
	return nil
}
725
// UngettableWriteHandler is a write handler whose GetWriter never succeeds.
type UngettableWriteHandler struct{}

// GetWriter always returns a nil writer and a non-nil error.
func (h *UngettableWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
	err := errors.New("UngettableWriteHandler.GetWriter() always fails")
	return nil, err
}

// Close does nothing and returns nil.
func (h *UngettableWriteHandler) Close(ctx context.Context, name string) error {
	return nil
}
735
// UnwritableWriter is an io.Writer that rejects every write.
type UnwritableWriter struct{}

// Write consumes nothing and always returns an error.
func (u *UnwritableWriter) Write(p []byte) (int, error) {
	err := errors.New("UnwritableWriter.Write() always fails")
	return 0, err
}
741
742type UnwritableWriteHandler struct{}
743
744func (w *UnwritableWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
745	return &UnwritableWriter{}, nil
746}
747
748func (w *UnwritableWriteHandler) Close(ctx context.Context, name string) error {
749	return nil
750}
751
// UnclosableWriter is an io.Writer that accepts and discards all input. It is
// the writer handed out by UnclosableWriteHandler.
type UnclosableWriter struct{}

// Write reports every byte of p as written without storing anything.
func (u *UnclosableWriter) Write(p []byte) (int, error) {
	written := len(p)
	return written, nil
}
757
758type UnclosableWriteHandler struct{}
759
760func (w *UnclosableWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
761	return &UnclosableWriter{}, nil
762}
763
764func (w *UnclosableWriteHandler) Close(ctx context.Context, name string) error {
765	return errors.New("UnclosableWriteHandler.Close() always fails")
766}
767
// TestReadHandler is an in-memory read handler for tests. It serves the
// contents of buf and accepts reads for a single resource name only.
type TestReadHandler struct {
	buf  string
	name string // This service can handle one name only.
}

// GetReader returns an io.ReaderAt over buf for the given name. The first
// name seen is remembered; any later request for a different name is an
// error.
func (r *TestReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
	if r.name != "" && r.name != name {
		return nil, fmt.Errorf("reader already has name=%q, now a new name=%q confuses me", r.name, name)
	}
	// Either no name was bound yet or it is the same name again.
	r.name = name
	data := []byte(r.buf)
	return bytes.NewReader(data), nil
}

// Close is a no-op that returns nil; the bound name is kept.
func (r *TestReadHandler) Close(ctx context.Context, name string) error {
	return nil
}
787
// UngettableReadHandler is a read handler whose GetReader never succeeds.
type UngettableReadHandler struct{}

// GetReader always returns a nil reader and a non-nil error.
func (h *UngettableReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
	err := errors.New("UngettableReadHandler.GetReader() always fails")
	return nil, err
}

// Close does nothing and returns nil.
func (h *UngettableReadHandler) Close(ctx context.Context, name string) error {
	return nil
}
797
// UnreadableReader is an io.ReaderAt that rejects every read.
type UnreadableReader struct{}

// ReadAt reads nothing and always returns an error.
func (u *UnreadableReader) ReadAt(p []byte, offset int64) (int, error) {
	err := errors.New("UnreadableReader.ReadAt() always fails")
	return 0, err
}
803
804type UnreadableReadHandler struct{}
805
806func (r *UnreadableReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
807	return &UnreadableReader{}, nil
808}
809
810func (r *UnreadableReadHandler) Close(ctx context.Context, name string) error {
811	return nil
812}
813
// UnclosableReadHandler is a read handler that serves buf successfully but
// whose Close always fails.
type UnclosableReadHandler struct {
	buf string
}

// GetReader returns an io.ReaderAt over buf; the name is not checked.
func (h *UnclosableReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
	data := []byte(h.buf)
	return bytes.NewReader(data), nil
}

// Close always returns an error that mentions name.
func (h *UnclosableReadHandler) Close(ctx context.Context, name string) error {
	err := fmt.Errorf("UnclosableReader.Close(%s) always fails", name)
	return err
}
825
826func registerServer() {
827	gsrv := grpc.NewServer()
828	var err error
829	server, err = NewServer(gsrv, &TestReadHandler{}, &TestWriteHandler{})
830	if err != nil {
831		log.Fatalf("NewServer() failed: %v", err)
832	}
833}
834
835func setupServer(readHandler ReadHandler, writeHandler WriteHandler) {
836	setupServerOnce.Do(registerServer)
837	server.status = make(map[string]*pb.QueryWriteStatusResponse)
838	server.readHandler = readHandler
839	server.writeHandler = writeHandler
840}
841