1// Copyright 2016 Google LLC. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package internal 6 7import ( 8 "bytes" 9 "context" 10 "errors" 11 "fmt" 12 "io" 13 "log" 14 "sync" 15 "testing" 16 17 "google.golang.org/grpc" 18 19 pb "google.golang.org/genproto/googleapis/bytestream" 20) 21 22const ( 23 testName = "testName" 24 testData = "0123456789" 25) 26 27var ( 28 setupServerOnce sync.Once 29 server *Server 30) 31 32func TestNewServerWithInvalidInputs(t *testing.T) { 33 _, err := NewServer(grpc.NewServer(), nil, nil) 34 if err == nil { 35 t.Fatal("NewServer(nil, nil) should not succeed") 36 } 37} 38 39func TestServerWrite(t *testing.T) { 40 testCases := []struct { 41 name string 42 writeHandler WriteHandler 43 input []interface{} 44 writeCount int 45 allowEmptyCommits bool 46 allowOverwrite bool 47 wantErr bool 48 wantResponse int 49 }{ 50 { 51 name: "empty resource name", 52 writeHandler: &TestWriteHandler{}, 53 input: []interface{}{ 54 &pb.WriteRequest{ 55 FinishWrite: true, 56 Data: []byte(testData), 57 }, 58 }, 59 writeCount: 1, 60 wantErr: true, 61 wantResponse: 0, 62 }, { 63 name: "Recv returns io.EOF", 64 writeHandler: &TestWriteHandler{}, 65 input: []interface{}{ 66 io.EOF, 67 }, 68 writeCount: 1, 69 wantErr: false, 70 wantResponse: 0, 71 }, { 72 name: "Recv returns error, 0 WriteRequests", 73 writeHandler: &TestWriteHandler{}, 74 input: []interface{}{ 75 errors.New("Recv returns error, 0 WriteRequests"), 76 }, 77 writeCount: 1, 78 wantErr: true, 79 wantResponse: 0, 80 }, { 81 name: "simple test", 82 writeHandler: &TestWriteHandler{}, 83 input: []interface{}{ 84 &pb.WriteRequest{ 85 ResourceName: testName, 86 WriteOffset: 0, 87 FinishWrite: true, 88 Data: []byte(testData), 89 }, 90 io.EOF, 91 }, 92 writeCount: 1, 93 wantResponse: 1, 94 }, { 95 name: "Recv returns error, 1 WriteRequests", 96 writeHandler: &TestWriteHandler{}, 97 input: []interface{}{ 98 &pb.WriteRequest{ 99 ResourceName: testName, 100 WriteOffset: 0, 101 FinishWrite: false, 102 Data: []byte(testData), 103 }, 104 errors.New("Recv returns error, 1 WriteRequests"), 105 }, 106 writeCount: 1, 107 wantErr: true, 108 wantResponse: 0, 109 }, { 110 name: "attempt to overwrite the same name", 111 writeHandler: &TestWriteHandler{}, 112 input: []interface{}{ 113 &pb.WriteRequest{ 114 ResourceName: testName, 115 WriteOffset: 0, 116 FinishWrite: true, 117 Data: []byte(testData), 118 }, 119 io.EOF, 120 &pb.WriteRequest{ 121 ResourceName: testName, 122 WriteOffset: 0, 123 FinishWrite: true, 124 Data: []byte(testData), 125 }, 126 }, 127 writeCount: 2, 128 wantErr: true, 129 wantResponse: 1, 130 }, { 131 name: "overwrite with the same name + AllowOverwrite", 132 writeHandler: &TestWriteHandler{}, 133 input: []interface{}{ 134 &pb.WriteRequest{ 135 ResourceName: testName, 136 WriteOffset: 0, 137 FinishWrite: true, 138 Data: []byte(testData), 139 }, 140 io.EOF, 141 &pb.WriteRequest{ 142 ResourceName: testName, 143 WriteOffset: 0, 144 FinishWrite: true, 145 Data: []byte(testData), 146 }, 147 io.EOF, 148 }, 149 writeCount: 2, 150 allowOverwrite: true, 151 wantResponse: 2, 152 }, { 153 name: "two WriteRequests - 1st is empty", 154 writeHandler: &TestWriteHandler{}, 155 input: []interface{}{ 156 &pb.WriteRequest{ 157 ResourceName: testName, 158 WriteOffset: 0, 159 FinishWrite: false, 160 Data: nil, 161 }, 162 &pb.WriteRequest{ 163 ResourceName: testName, 164 WriteOffset: 0, 165 FinishWrite: true, 166 Data: []byte(testData), 167 }, 168 io.EOF, 169 }, 170 writeCount: 1, 171 wantResponse: 1, 172 allowEmptyCommits: true, 173 }, { 174 name: "two WriteRequests - 2nd is empty", 175 writeHandler: &TestWriteHandler{}, 176 input: []interface{}{ 177 &pb.WriteRequest{ 178 ResourceName: testName, 179 WriteOffset: 0, 180 FinishWrite: false, 181 Data: []byte(testData), 182 }, 183 &pb.WriteRequest{ 184 ResourceName: testName, 185 WriteOffset: int64(len(testData)), 186 FinishWrite: true, 187 Data: nil, 188 }, 189 io.EOF, 190 }, 191 writeCount: 1, 192 wantResponse: 1, 193 }, { 194 name: "two WriteRequests - all empty", 195 writeHandler: &TestWriteHandler{}, 196 input: []interface{}{ 197 &pb.WriteRequest{ 198 ResourceName: testName, 199 WriteOffset: 0, 200 FinishWrite: false, 201 Data: nil, 202 }, 203 &pb.WriteRequest{ 204 ResourceName: testName, 205 WriteOffset: 0, 206 FinishWrite: true, 207 Data: nil, 208 }, 209 }, 210 writeCount: 1, 211 wantErr: true, 212 wantResponse: 1, 213 allowEmptyCommits: true, 214 }, { 215 name: "two WriteRequests - varying offset", 216 writeHandler: &TestWriteHandler{}, 217 input: []interface{}{ 218 &pb.WriteRequest{ 219 ResourceName: testName, 220 WriteOffset: 100, 221 FinishWrite: false, 222 Data: []byte(testData), 223 }, 224 &pb.WriteRequest{ 225 ResourceName: testName, 226 WriteOffset: 100 + int64(len(testData)), 227 FinishWrite: true, 228 Data: []byte(testData), 229 }, 230 io.EOF, 231 }, 232 writeCount: 1, 233 wantResponse: 1, 234 }, { 235 name: "two WriteRequests - disjoint offset", 236 writeHandler: &TestWriteHandler{}, 237 input: []interface{}{ 238 &pb.WriteRequest{ 239 ResourceName: testName, 240 WriteOffset: 100, 241 FinishWrite: false, 242 Data: []byte(testData), 243 }, 244 &pb.WriteRequest{ 245 ResourceName: testName, 246 WriteOffset: 200, 247 FinishWrite: true, 248 Data: []byte(testData), 249 }, 250 }, 251 writeCount: 1, 252 wantErr: true, 253 wantResponse: 0, 254 }, { 255 name: "fails with UngettableWriteHandler", 256 writeHandler: &UngettableWriteHandler{}, 257 input: []interface{}{ 258 &pb.WriteRequest{ 259 ResourceName: testName, 260 WriteOffset: 0, 261 FinishWrite: true, 262 Data: []byte(testData), 263 }, 264 }, 265 writeCount: 1, 266 wantErr: true, 267 }, { 268 name: "fails with UnwritableWriteHandler", 269 writeHandler: &UnwritableWriteHandler{}, 270 input: []interface{}{ 271 &pb.WriteRequest{ 272 ResourceName: testName, 273 WriteOffset: 0, 274 FinishWrite: true, 275 Data: []byte(testData), 276 }, 277 }, 278 writeCount: 1, 279 wantErr: true, 280 }, { 281 name: "fails with UnclosableWriteHandler", 282 writeHandler: &UnclosableWriteHandler{}, 283 input: []interface{}{ 284 &pb.WriteRequest{ 285 ResourceName: testName, 286 WriteOffset: 0, 287 FinishWrite: true, 288 Data: []byte(testData), 289 }, 290 }, 291 writeCount: 1, 292 wantErr: true, 293 wantResponse: 1, 294 }, { 295 name: "fails with nil WriteHandler", 296 writeHandler: nil, 297 input: []interface{}{ 298 &pb.WriteRequest{ 299 ResourceName: testName, 300 WriteOffset: 0, 301 FinishWrite: true, 302 Data: []byte(testData), 303 }, 304 }, 305 writeCount: 1, 306 wantErr: true, 307 }, 308 } 309 310 ctx := context.Background() 311 for _, tc := range testCases { 312 readHandler := &TestReadHandler{} 313 if tc.writeHandler != nil { 314 readHandler = nil 315 } 316 setupServer(readHandler, tc.writeHandler) 317 server.AllowOverwrite = tc.allowOverwrite 318 var requestCount, responseCount int 319 var err error 320 321 for i := 0; i < tc.writeCount; i++ { 322 err = server.rpc.Write(&fakeWriteServerImpl{ 323 ctx: ctx, 324 receiver: func() (*pb.WriteRequest, error) { 325 if requestCount >= len(tc.input) { 326 t.Fatalf("%s: got %d call(s) to Recv, want %d from len(input)", tc.name, requestCount+1, len(tc.input)) 327 } 328 v := tc.input[requestCount] 329 requestCount++ 330 request, ok := v.(*pb.WriteRequest) 331 if ok { 332 return request, nil 333 } 334 err, ok := v.(error) 335 if !ok { 336 t.Fatalf("%s: unknown input: %v", tc.name, v) 337 } 338 return nil, err 339 }, 340 sender: func(response *pb.WriteResponse) error { 341 if !tc.allowEmptyCommits && response.CommittedSize == 0 { 342 t.Fatalf("%s: invalid response: WriteResponse %v", tc.name, response) 343 } 344 responseCount++ 345 return nil 346 }, 347 }) 348 gotErr := (err != nil) 349 if i+1 < tc.writeCount { 350 if gotErr { 351 t.Errorf("%s: Write got err=%v, wantErr=%t, but on Write[%d/%d]. Error should not happen until last call to Write.", tc.name, err, tc.wantErr, i+1, tc.writeCount) 352 break // The t.Errorf conditions below may erroneously fire, pay them no mind. 353 } 354 } else if gotErr != tc.wantErr { 355 t.Errorf("%s: Write got err=%v, wantErr=%t", tc.name, err, tc.wantErr) 356 break // The t.Errorf conditions below may erroneously fire, pay them no mind. 357 } 358 } 359 if requestCount != len(tc.input) { 360 t.Errorf("%s: got %d call(s) to Recv, want %d", tc.name, requestCount, len(tc.input)) 361 } 362 if responseCount != tc.wantResponse { 363 t.Errorf("%s: got %d call(s) to SendProto, want %d", tc.name, responseCount, tc.wantResponse) 364 } 365 } 366} 367 368func TestServerWrite_SendAndCloseError(t *testing.T) { 369 const ( 370 wantRequest = 2 371 wantResponse = 1 372 ) 373 374 ctx := context.Background() 375 setupServer(nil, &TestWriteHandler{}) 376 var requestCount, responseCount int 377 378 err := server.rpc.Write(&fakeWriteServerImpl{ 379 ctx: ctx, 380 receiver: func() (*pb.WriteRequest, error) { 381 if requestCount >= wantRequest { 382 t.Fatalf("got %d call(s) to Recv, want %d", requestCount+1, wantRequest) 383 } 384 requestCount++ 385 return &pb.WriteRequest{ 386 ResourceName: testName, 387 WriteOffset: 0, 388 FinishWrite: true, 389 Data: []byte(testData), 390 }, nil 391 }, 392 sender: func(response *pb.WriteResponse) error { 393 responseCount++ 394 return errors.New("TestServerWrite SendProto error") 395 }, 396 }) 397 if err == nil { 398 t.Errorf("Write should have failed, but succeeded") 399 } 400 if requestCount != wantRequest { 401 t.Errorf("got %d call(s) to Recv, want %d", requestCount, wantRequest) 402 } 403 if responseCount != wantResponse { 404 t.Errorf("got %d call(s) to SendProto, want %d", responseCount, wantResponse) 405 } 406} 407 408func TestQueryWriteStatus(t *testing.T) { 409 testCases := []struct { 410 name string 411 existingName string 412 requestName string 413 wantErr bool 414 }{ 415 { 416 name: "existing name should work", 417 existingName: testName, 418 requestName: testName, 419 }, { 420 name: "missing name should break", 421 existingName: testName, 422 requestName: "invalidName", 423 wantErr: true, 424 }, 425 } 426 427 ctx := context.Background() 428 for _, tc := range testCases { 429 setupServer(nil, &TestWriteHandler{}) 430 server.status[tc.existingName] = &pb.QueryWriteStatusResponse{} 431 432 _, err := server.rpc.QueryWriteStatus(ctx, &pb.QueryWriteStatusRequest{ 433 ResourceName: tc.requestName, 434 }) 435 436 if gotErr := (err != nil); gotErr != tc.wantErr { 437 t.Errorf("%s: QueryWriteStatus(%q) got err=%v, wantErr=%t", tc.name, tc.requestName, err, tc.wantErr) 438 } 439 } 440} 441 442func TestServerRead(t *testing.T) { 443 testCases := []struct { 444 name string 445 readHandler ReadHandler 446 input *pb.ReadRequest 447 readCount int 448 wantErr bool 449 wantResponse []string 450 }{ 451 { 452 name: "empty resource name", 453 readHandler: &TestReadHandler{}, 454 input: &pb.ReadRequest{ 455 ReadLimit: 1, 456 }, 457 readCount: 1, 458 wantErr: true, 459 wantResponse: []string{}, 460 }, { 461 name: "test ReadLimit=-1", 462 readHandler: &TestReadHandler{buf: testData}, 463 input: &pb.ReadRequest{ 464 ResourceName: testName, 465 ReadOffset: 0, 466 ReadLimit: -1, 467 }, 468 readCount: 1, 469 wantErr: true, 470 }, { 471 name: "test ReadLimit=1", 472 readHandler: &TestReadHandler{buf: testData}, 473 input: &pb.ReadRequest{ 474 ResourceName: testName, 475 ReadOffset: 0, 476 ReadLimit: 1, 477 }, 478 readCount: 1, 479 wantResponse: []string{"0"}, 480 }, { 481 name: "test ReadLimit=2", 482 readHandler: &TestReadHandler{buf: testData}, 483 input: &pb.ReadRequest{ 484 ResourceName: testName, 485 ReadOffset: 0, 486 ReadLimit: 2, 487 }, 488 readCount: 1, 489 wantResponse: []string{"01"}, 490 }, { 491 name: "test ReadOffset=1 ReadLimit=2", 492 readHandler: &TestReadHandler{buf: testData}, 493 input: &pb.ReadRequest{ 494 ResourceName: testName, 495 ReadOffset: 1, 496 ReadLimit: 2, 497 }, 498 readCount: 1, 499 wantResponse: []string{"12"}, 500 }, { 501 name: "test ReadOffset=2 ReadLimit=2", 502 readHandler: &TestReadHandler{buf: testData}, 503 input: &pb.ReadRequest{ 504 ResourceName: testName, 505 ReadOffset: 2, 506 ReadLimit: 2, 507 }, 508 readCount: 1, 509 wantResponse: []string{"23"}, 510 }, { 511 name: "read all testData at exactly the limit", 512 readHandler: &TestReadHandler{buf: testData}, 513 input: &pb.ReadRequest{ 514 ResourceName: testName, 515 ReadOffset: 0, 516 ReadLimit: int64(len(testData)), 517 }, 518 readCount: 1, 519 wantResponse: []string{"0123456789"}, 520 }, { 521 name: "read all testData", 522 readHandler: &TestReadHandler{buf: testData}, 523 input: &pb.ReadRequest{ 524 ResourceName: testName, 525 ReadOffset: 0, 526 ReadLimit: int64(len(testData)) * 2, 527 }, 528 readCount: 1, 529 wantResponse: []string{"0123456789"}, 530 }, { 531 name: "read all testData 2 times", 532 readHandler: &TestReadHandler{buf: testData}, 533 input: &pb.ReadRequest{ 534 ResourceName: testName, 535 ReadOffset: 0, 536 ReadLimit: int64(len(testData)) * 2, 537 }, 538 readCount: 2, 539 wantResponse: []string{"0123456789", "0123456789"}, 540 }, { 541 name: "test ReadLimit=0", 542 readHandler: &TestReadHandler{buf: testData}, 543 input: &pb.ReadRequest{ 544 ResourceName: testName, 545 ReadOffset: 0, 546 ReadLimit: 0, 547 }, 548 readCount: 1, 549 wantResponse: []string{"0123456789"}, 550 }, { 551 name: "test ReadLimit=1000", 552 readHandler: &TestReadHandler{buf: testData}, 553 input: &pb.ReadRequest{ 554 ResourceName: testName, 555 ReadOffset: 0, 556 ReadLimit: 1000, 557 }, 558 readCount: 1, 559 wantResponse: []string{"0123456789"}, 560 }, { 561 name: "fails with UngettableReadHandler", 562 readHandler: &UngettableReadHandler{}, 563 input: &pb.ReadRequest{ 564 ResourceName: testName, 565 ReadOffset: 0, 566 ReadLimit: int64(len(testData)), 567 }, 568 readCount: 1, 569 wantErr: true, 570 }, { 571 name: "fails with UnreadableReadHandler", 572 readHandler: &UnreadableReadHandler{}, 573 input: &pb.ReadRequest{ 574 ResourceName: testName, 575 ReadOffset: 0, 576 ReadLimit: int64(len(testData)), 577 }, 578 readCount: 1, 579 wantErr: true, 580 }, { 581 name: "fails with UnclosableReadHandler", 582 readHandler: &UnclosableReadHandler{buf: testData}, 583 input: &pb.ReadRequest{ 584 ResourceName: testName, 585 ReadOffset: 0, 586 ReadLimit: int64(len(testData)) * 2, 587 }, 588 readCount: 1, 589 wantErr: true, 590 wantResponse: []string{"0123456789"}, 591 }, { 592 name: "fails with nil ReadRequest", 593 readHandler: &TestReadHandler{buf: testData}, 594 readCount: 1, 595 wantErr: true, 596 }, { 597 name: "fails with nil ReadHandler", 598 readHandler: nil, 599 readCount: 1, 600 wantErr: true, 601 }, 602 } 603 604 ctx := context.Background() 605 for _, tc := range testCases { 606 var writeHandler WriteHandler 607 if tc.readHandler == nil { 608 writeHandler = &TestWriteHandler{} 609 } 610 setupServer(tc.readHandler, writeHandler) 611 var responseCount int 612 var err error 613 614 for i := 0; i < tc.readCount; i++ { 615 err = server.rpc.Read(tc.input, &fakeReadServerImpl{ 616 ctx: ctx, 617 sender: func(response *pb.ReadResponse) error { 618 if responseCount >= len(tc.wantResponse) { 619 t.Fatalf("%s: got %d call(s) to Send(), want %d", tc.name, responseCount+1, len(tc.wantResponse)) 620 } 621 if got, want := string(response.Data), tc.wantResponse[responseCount]; got != want { 622 t.Fatalf("%s: response[%d] got %q, want %q", tc.name, responseCount, got, want) 623 } 624 responseCount++ 625 return nil 626 }, 627 }) 628 gotErr := (err != nil) 629 if i+1 < tc.readCount { 630 if gotErr { 631 t.Errorf("%s: Read got err=%v, wantErr=%t, but on Read[%d/%d]. Error should not happen until last call to Read", tc.name, err, tc.wantErr, i+1, tc.readCount) 632 break 633 } 634 } else if gotErr != tc.wantErr { 635 t.Errorf("%s: Read got err=%v, wantErr=%t", tc.name, err, tc.wantErr) 636 break 637 } 638 } 639 if responseCount != len(tc.wantResponse) { 640 t.Errorf("%s: got %d call(s) to Send, want %d", tc.name, responseCount, len(tc.wantResponse)) 641 } 642 } 643} 644 645func TestServerRead_SendError(t *testing.T) { 646 setupServer(&TestReadHandler{buf: testData}, nil) 647 648 err := server.rpc.Read(&pb.ReadRequest{ 649 ResourceName: testName, 650 ReadOffset: 0, 651 ReadLimit: int64(len(testData)) * 2, 652 }, &fakeReadServerImpl{ 653 ctx: context.Background(), 654 sender: func(response *pb.ReadResponse) error { 655 if string(response.Data) != testData { 656 t.Fatalf("Send: got %v, want %q", response, testData) 657 } 658 return errors.New("TestServerRead Send() error") 659 }, 660 }) 661 662 if err == nil { 663 t.Fatal("Read() should have failed, but succeeded") 664 } 665} 666 667type fakeWriteServerImpl struct { 668 pb.ByteStream_WriteServer 669 ctx context.Context 670 receiver func() (*pb.WriteRequest, error) 671 sender func(*pb.WriteResponse) error 672} 673 674func (fake *fakeWriteServerImpl) Context() context.Context { 675 return fake.ctx 676} 677 678func (fake *fakeWriteServerImpl) Recv() (*pb.WriteRequest, error) { 679 return fake.receiver() 680} 681 682func (fake *fakeWriteServerImpl) SendMsg(m interface{}) error { 683 return fake.sender(m.(*pb.WriteResponse)) 684} 685 686func (fake *fakeWriteServerImpl) SendAndClose(m *pb.WriteResponse) error { 687 fake.sender(m) 688 return nil 689} 690 691type fakeReadServerImpl struct { 692 pb.ByteStream_ReadServer 693 ctx context.Context 694 sender func(*pb.ReadResponse) error 695} 696 697func (fake *fakeReadServerImpl) Context() context.Context { 698 return fake.ctx 699} 700 701func (fake *fakeReadServerImpl) Send(response *pb.ReadResponse) error { 702 return fake.sender(response) 703} 704 705type TestWriteHandler struct { 706 buf bytes.Buffer // bytes.Buffer implements io.Writer 707 name string // This service can handle one name only. 708} 709 710func (w *TestWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) { 711 if w.name == "" { 712 w.name = name 713 } else if w.name != name { 714 return nil, fmt.Errorf("writer already has name=%q, now a new name=%q confuses me", w.name, name) 715 } 716 // initOffset is ignored. 717 return &w.buf, nil 718} 719 720func (w *TestWriteHandler) Close(ctx context.Context, name string) error { 721 w.name = "" 722 w.buf.Reset() 723 return nil 724} 725 726type UngettableWriteHandler struct{} 727 728func (w *UngettableWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) { 729 return nil, errors.New("UngettableWriteHandler.GetWriter() always fails") 730} 731 732func (w *UngettableWriteHandler) Close(ctx context.Context, name string) error { 733 return nil 734} 735 736type UnwritableWriter struct{} 737 738func (w *UnwritableWriter) Write(p []byte) (int, error) { 739 return 0, errors.New("UnwritableWriter.Write() always fails") 740} 741 742type UnwritableWriteHandler struct{} 743 744func (w *UnwritableWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) { 745 return &UnwritableWriter{}, nil 746} 747 748func (w *UnwritableWriteHandler) Close(ctx context.Context, name string) error { 749 return nil 750} 751 752type UnclosableWriter struct{} 753 754func (w *UnclosableWriter) Write(p []byte) (int, error) { 755 return len(p), nil 756} 757 758type UnclosableWriteHandler struct{} 759 760func (w *UnclosableWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) { 761 return &UnclosableWriter{}, nil 762} 763 764func (w *UnclosableWriteHandler) Close(ctx context.Context, name string) error { 765 return errors.New("UnclosableWriteHandler.Close() always fails") 766} 767 768type TestReadHandler struct { 769 buf string 770 name string // This service can handle one name only. 771} 772 773// GetWriter() returns an io.ReaderAt to accept reads from the given name. 774func (r *TestReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) { 775 if r.name == "" { 776 r.name = name 777 } else if r.name != name { 778 return nil, fmt.Errorf("reader already has name=%q, now a new name=%q confuses me", r.name, name) 779 } 780 return bytes.NewReader([]byte(r.buf)), nil 781} 782 783// Close does nothing. 784func (r *TestReadHandler) Close(ctx context.Context, name string) error { 785 return nil 786} 787 788type UngettableReadHandler struct{} 789 790func (r *UngettableReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) { 791 return nil, errors.New("UngettableReadHandler.GetReader() always fails") 792} 793 794func (r *UngettableReadHandler) Close(ctx context.Context, name string) error { 795 return nil 796} 797 798type UnreadableReader struct{} 799 800func (r *UnreadableReader) ReadAt(p []byte, offset int64) (int, error) { 801 return 0, errors.New("UnreadableReader.ReadAt() always fails") 802} 803 804type UnreadableReadHandler struct{} 805 806func (r *UnreadableReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) { 807 return &UnreadableReader{}, nil 808} 809 810func (r *UnreadableReadHandler) Close(ctx context.Context, name string) error { 811 return nil 812} 813 814type UnclosableReadHandler struct { 815 buf string 816} 817 818func (r *UnclosableReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) { 819 return bytes.NewReader([]byte(r.buf)), nil 820} 821 822func (r *UnclosableReadHandler) Close(ctx context.Context, name string) error { 823 return fmt.Errorf("UnclosableReader.Close(%s) always fails", name) 824} 825 826func registerServer() { 827 gsrv := grpc.NewServer() 828 var err error 829 server, err = NewServer(gsrv, &TestReadHandler{}, &TestWriteHandler{}) 830 if err != nil { 831 log.Fatalf("NewServer() failed: %v", err) 832 } 833} 834 835func setupServer(readHandler ReadHandler, writeHandler WriteHandler) { 836 setupServerOnce.Do(registerServer) 837 server.status = make(map[string]*pb.QueryWriteStatusResponse) 838 server.readHandler = readHandler 839 server.writeHandler = writeHandler 840} 841