1// Copyright 2016 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package internal 16 17import ( 18 "bytes" 19 "context" 20 "errors" 21 "fmt" 22 "io" 23 "log" 24 "sync" 25 "testing" 26 27 "google.golang.org/grpc" 28 29 pb "google.golang.org/genproto/googleapis/bytestream" 30) 31 32const ( 33 testName = "testName" 34 testData = "0123456789" 35) 36 37var ( 38 setupServerOnce sync.Once 39 server *Server 40) 41 42func TestNewServerWithInvalidInputs(t *testing.T) { 43 _, err := NewServer(grpc.NewServer(), nil, nil) 44 if err == nil { 45 t.Fatal("NewServer(nil, nil) should not succeed") 46 } 47} 48 49func TestServerWrite(t *testing.T) { 50 testCases := []struct { 51 name string 52 writeHandler WriteHandler 53 input []interface{} 54 writeCount int 55 allowEmptyCommits bool 56 allowOverwrite bool 57 wantErr bool 58 wantResponse int 59 }{ 60 { 61 name: "empty resource name", 62 writeHandler: &TestWriteHandler{}, 63 input: []interface{}{ 64 &pb.WriteRequest{ 65 FinishWrite: true, 66 Data: []byte(testData), 67 }, 68 }, 69 writeCount: 1, 70 wantErr: true, 71 wantResponse: 0, 72 }, { 73 name: "Recv returns io.EOF", 74 writeHandler: &TestWriteHandler{}, 75 input: []interface{}{ 76 io.EOF, 77 }, 78 writeCount: 1, 79 wantErr: false, 80 wantResponse: 0, 81 }, { 82 name: "Recv returns error, 0 WriteRequests", 83 writeHandler: &TestWriteHandler{}, 84 input: []interface{}{ 85 errors.New("Recv returns error, 0 WriteRequests"), 86 }, 87 writeCount: 1, 88 wantErr: true, 89 wantResponse: 0, 90 }, { 91 name: "simple test", 92 writeHandler: &TestWriteHandler{}, 93 input: []interface{}{ 94 &pb.WriteRequest{ 95 ResourceName: testName, 96 WriteOffset: 0, 97 FinishWrite: true, 98 Data: []byte(testData), 99 }, 100 io.EOF, 101 }, 102 writeCount: 1, 103 wantResponse: 1, 104 }, { 105 name: "Recv returns error, 1 WriteRequests", 106 writeHandler: &TestWriteHandler{}, 107 input: []interface{}{ 108 &pb.WriteRequest{ 109 ResourceName: testName, 110 WriteOffset: 0, 111 FinishWrite: false, 112 Data: []byte(testData), 113 }, 114 errors.New("Recv returns error, 1 WriteRequests"), 115 }, 116 writeCount: 1, 117 wantErr: true, 118 wantResponse: 0, 119 }, { 120 name: "attempt to overwrite the same name", 121 writeHandler: &TestWriteHandler{}, 122 input: []interface{}{ 123 &pb.WriteRequest{ 124 ResourceName: testName, 125 WriteOffset: 0, 126 FinishWrite: true, 127 Data: []byte(testData), 128 }, 129 io.EOF, 130 &pb.WriteRequest{ 131 ResourceName: testName, 132 WriteOffset: 0, 133 FinishWrite: true, 134 Data: []byte(testData), 135 }, 136 }, 137 writeCount: 2, 138 wantErr: true, 139 wantResponse: 1, 140 }, { 141 name: "overwrite with the same name + AllowOverwrite", 142 writeHandler: &TestWriteHandler{}, 143 input: []interface{}{ 144 &pb.WriteRequest{ 145 ResourceName: testName, 146 WriteOffset: 0, 147 FinishWrite: true, 148 Data: []byte(testData), 149 }, 150 io.EOF, 151 &pb.WriteRequest{ 152 ResourceName: testName, 153 WriteOffset: 0, 154 FinishWrite: true, 155 Data: []byte(testData), 156 }, 157 io.EOF, 158 }, 159 writeCount: 2, 160 allowOverwrite: true, 161 wantResponse: 2, 162 }, { 163 name: "two WriteRequests - 1st is empty", 164 writeHandler: &TestWriteHandler{}, 165 input: []interface{}{ 166 &pb.WriteRequest{ 167 ResourceName: testName, 168 WriteOffset: 0, 169 FinishWrite: false, 170 Data: nil, 171 }, 172 &pb.WriteRequest{ 173 ResourceName: testName, 174 WriteOffset: 0, 175 FinishWrite: true, 176 Data: []byte(testData), 177 }, 178 io.EOF, 179 }, 180 writeCount: 1, 181 wantResponse: 1, 182 allowEmptyCommits: true, 183 }, { 184 name: "two WriteRequests - 2nd is empty", 185 writeHandler: &TestWriteHandler{}, 186 input: []interface{}{ 187 &pb.WriteRequest{ 188 ResourceName: testName, 189 WriteOffset: 0, 190 FinishWrite: false, 191 Data: []byte(testData), 192 }, 193 &pb.WriteRequest{ 194 ResourceName: testName, 195 WriteOffset: int64(len(testData)), 196 FinishWrite: true, 197 Data: nil, 198 }, 199 io.EOF, 200 }, 201 writeCount: 1, 202 wantResponse: 1, 203 }, { 204 name: "two WriteRequests - all empty", 205 writeHandler: &TestWriteHandler{}, 206 input: []interface{}{ 207 &pb.WriteRequest{ 208 ResourceName: testName, 209 WriteOffset: 0, 210 FinishWrite: false, 211 Data: nil, 212 }, 213 &pb.WriteRequest{ 214 ResourceName: testName, 215 WriteOffset: 0, 216 FinishWrite: true, 217 Data: nil, 218 }, 219 }, 220 writeCount: 1, 221 wantErr: true, 222 wantResponse: 1, 223 allowEmptyCommits: true, 224 }, { 225 name: "two WriteRequests - varying offset", 226 writeHandler: &TestWriteHandler{}, 227 input: []interface{}{ 228 &pb.WriteRequest{ 229 ResourceName: testName, 230 WriteOffset: 100, 231 FinishWrite: false, 232 Data: []byte(testData), 233 }, 234 &pb.WriteRequest{ 235 ResourceName: testName, 236 WriteOffset: 100 + int64(len(testData)), 237 FinishWrite: true, 238 Data: []byte(testData), 239 }, 240 io.EOF, 241 }, 242 writeCount: 1, 243 wantResponse: 1, 244 }, { 245 name: "two WriteRequests - disjoint offset", 246 writeHandler: &TestWriteHandler{}, 247 input: []interface{}{ 248 &pb.WriteRequest{ 249 ResourceName: testName, 250 WriteOffset: 100, 251 FinishWrite: false, 252 Data: []byte(testData), 253 }, 254 &pb.WriteRequest{ 255 ResourceName: testName, 256 WriteOffset: 200, 257 FinishWrite: true, 258 Data: []byte(testData), 259 }, 260 }, 261 writeCount: 1, 262 wantErr: true, 263 wantResponse: 0, 264 }, { 265 name: "fails with UngettableWriteHandler", 266 writeHandler: &UngettableWriteHandler{}, 267 input: []interface{}{ 268 &pb.WriteRequest{ 269 ResourceName: testName, 270 WriteOffset: 0, 271 FinishWrite: true, 272 Data: []byte(testData), 273 }, 274 }, 275 writeCount: 1, 276 wantErr: true, 277 }, { 278 name: "fails with UnwritableWriteHandler", 279 writeHandler: &UnwritableWriteHandler{}, 280 input: []interface{}{ 281 &pb.WriteRequest{ 282 ResourceName: testName, 283 WriteOffset: 0, 284 FinishWrite: true, 285 Data: []byte(testData), 286 }, 287 }, 288 writeCount: 1, 289 wantErr: true, 290 }, { 291 name: "fails with UnclosableWriteHandler", 292 writeHandler: &UnclosableWriteHandler{}, 293 input: []interface{}{ 294 &pb.WriteRequest{ 295 ResourceName: testName, 296 WriteOffset: 0, 297 FinishWrite: true, 298 Data: []byte(testData), 299 }, 300 }, 301 writeCount: 1, 302 wantErr: true, 303 wantResponse: 1, 304 }, { 305 name: "fails with nil WriteHandler", 306 writeHandler: nil, 307 input: []interface{}{ 308 &pb.WriteRequest{ 309 ResourceName: testName, 310 WriteOffset: 0, 311 FinishWrite: true, 312 Data: []byte(testData), 313 }, 314 }, 315 writeCount: 1, 316 wantErr: true, 317 }, 318 } 319 320 ctx := context.Background() 321 for _, tc := range testCases { 322 readHandler := &TestReadHandler{} 323 if tc.writeHandler != nil { 324 readHandler = nil 325 } 326 setupServer(readHandler, tc.writeHandler) 327 server.AllowOverwrite = tc.allowOverwrite 328 var requestCount, responseCount int 329 var err error 330 331 for i := 0; i < tc.writeCount; i++ { 332 err = server.rpc.Write(&fakeWriteServerImpl{ 333 ctx: ctx, 334 receiver: func() (*pb.WriteRequest, error) { 335 if requestCount >= len(tc.input) { 336 t.Fatalf("%s: got %d call(s) to Recv, want %d from len(input)", tc.name, requestCount+1, len(tc.input)) 337 } 338 v := tc.input[requestCount] 339 requestCount++ 340 request, ok := v.(*pb.WriteRequest) 341 if ok { 342 return request, nil 343 } 344 err, ok := v.(error) 345 if !ok { 346 t.Fatalf("%s: unknown input: %v", tc.name, v) 347 } 348 return nil, err 349 }, 350 sender: func(response *pb.WriteResponse) error { 351 if !tc.allowEmptyCommits && response.CommittedSize == 0 { 352 t.Fatalf("%s: invalid response: WriteResponse %v", tc.name, response) 353 } 354 responseCount++ 355 return nil 356 }, 357 }) 358 gotErr := (err != nil) 359 if i+1 < tc.writeCount { 360 if gotErr { 361 t.Errorf("%s: Write got err=%v, wantErr=%t, but on Write[%d/%d]. Error should not happen until last call to Write.", tc.name, err, tc.wantErr, i+1, tc.writeCount) 362 break // The t.Errorf conditions below may erroneously fire, pay them no mind. 363 } 364 } else if gotErr != tc.wantErr { 365 t.Errorf("%s: Write got err=%v, wantErr=%t", tc.name, err, tc.wantErr) 366 break // The t.Errorf conditions below may erroneously fire, pay them no mind. 367 } 368 } 369 if requestCount != len(tc.input) { 370 t.Errorf("%s: got %d call(s) to Recv, want %d", tc.name, requestCount, len(tc.input)) 371 } 372 if responseCount != tc.wantResponse { 373 t.Errorf("%s: got %d call(s) to SendProto, want %d", tc.name, responseCount, tc.wantResponse) 374 } 375 } 376} 377 378func TestServerWrite_SendAndCloseError(t *testing.T) { 379 const ( 380 wantRequest = 2 381 wantResponse = 1 382 ) 383 384 ctx := context.Background() 385 setupServer(nil, &TestWriteHandler{}) 386 var requestCount, responseCount int 387 388 err := server.rpc.Write(&fakeWriteServerImpl{ 389 ctx: ctx, 390 receiver: func() (*pb.WriteRequest, error) { 391 if requestCount >= wantRequest { 392 t.Fatalf("got %d call(s) to Recv, want %d", requestCount+1, wantRequest) 393 } 394 requestCount++ 395 return &pb.WriteRequest{ 396 ResourceName: testName, 397 WriteOffset: 0, 398 FinishWrite: true, 399 Data: []byte(testData), 400 }, nil 401 }, 402 sender: func(response *pb.WriteResponse) error { 403 responseCount++ 404 return errors.New("TestServerWrite SendProto error") 405 }, 406 }) 407 if err == nil { 408 t.Errorf("Write should have failed, but succeeded") 409 } 410 if requestCount != wantRequest { 411 t.Errorf("got %d call(s) to Recv, want %d", requestCount, wantRequest) 412 } 413 if responseCount != wantResponse { 414 t.Errorf("got %d call(s) to SendProto, want %d", responseCount, wantResponse) 415 } 416} 417 418func TestQueryWriteStatus(t *testing.T) { 419 testCases := []struct { 420 name string 421 existingName string 422 requestName string 423 wantErr bool 424 }{ 425 { 426 name: "existing name should work", 427 existingName: testName, 428 requestName: testName, 429 }, { 430 name: "missing name should break", 431 existingName: testName, 432 requestName: "invalidName", 433 wantErr: true, 434 }, 435 } 436 437 ctx := context.Background() 438 for _, tc := range testCases { 439 setupServer(nil, &TestWriteHandler{}) 440 server.status[tc.existingName] = &pb.QueryWriteStatusResponse{} 441 442 _, err := server.rpc.QueryWriteStatus(ctx, &pb.QueryWriteStatusRequest{ 443 ResourceName: tc.requestName, 444 }) 445 446 if gotErr := (err != nil); gotErr != tc.wantErr { 447 t.Errorf("%s: QueryWriteStatus(%q) got err=%v, wantErr=%t", tc.name, tc.requestName, err, tc.wantErr) 448 } 449 } 450} 451 452func TestServerRead(t *testing.T) { 453 testCases := []struct { 454 name string 455 readHandler ReadHandler 456 input *pb.ReadRequest 457 readCount int 458 wantErr bool 459 wantResponse []string 460 }{ 461 { 462 name: "empty resource name", 463 readHandler: &TestReadHandler{}, 464 input: &pb.ReadRequest{ 465 ReadLimit: 1, 466 }, 467 readCount: 1, 468 wantErr: true, 469 wantResponse: []string{}, 470 }, { 471 name: "test ReadLimit=-1", 472 readHandler: &TestReadHandler{buf: testData}, 473 input: &pb.ReadRequest{ 474 ResourceName: testName, 475 ReadOffset: 0, 476 ReadLimit: -1, 477 }, 478 readCount: 1, 479 wantErr: true, 480 }, { 481 name: "test ReadLimit=1", 482 readHandler: &TestReadHandler{buf: testData}, 483 input: &pb.ReadRequest{ 484 ResourceName: testName, 485 ReadOffset: 0, 486 ReadLimit: 1, 487 }, 488 readCount: 1, 489 wantResponse: []string{"0"}, 490 }, { 491 name: "test ReadLimit=2", 492 readHandler: &TestReadHandler{buf: testData}, 493 input: &pb.ReadRequest{ 494 ResourceName: testName, 495 ReadOffset: 0, 496 ReadLimit: 2, 497 }, 498 readCount: 1, 499 wantResponse: []string{"01"}, 500 }, { 501 name: "test ReadOffset=1 ReadLimit=2", 502 readHandler: &TestReadHandler{buf: testData}, 503 input: &pb.ReadRequest{ 504 ResourceName: testName, 505 ReadOffset: 1, 506 ReadLimit: 2, 507 }, 508 readCount: 1, 509 wantResponse: []string{"12"}, 510 }, { 511 name: "test ReadOffset=2 ReadLimit=2", 512 readHandler: &TestReadHandler{buf: testData}, 513 input: &pb.ReadRequest{ 514 ResourceName: testName, 515 ReadOffset: 2, 516 ReadLimit: 2, 517 }, 518 readCount: 1, 519 wantResponse: []string{"23"}, 520 }, { 521 name: "read all testData at exactly the limit", 522 readHandler: &TestReadHandler{buf: testData}, 523 input: &pb.ReadRequest{ 524 ResourceName: testName, 525 ReadOffset: 0, 526 ReadLimit: int64(len(testData)), 527 }, 528 readCount: 1, 529 wantResponse: []string{"0123456789"}, 530 }, { 531 name: "read all testData", 532 readHandler: &TestReadHandler{buf: testData}, 533 input: &pb.ReadRequest{ 534 ResourceName: testName, 535 ReadOffset: 0, 536 ReadLimit: int64(len(testData)) * 2, 537 }, 538 readCount: 1, 539 wantResponse: []string{"0123456789"}, 540 }, { 541 name: "read all testData 2 times", 542 readHandler: &TestReadHandler{buf: testData}, 543 input: &pb.ReadRequest{ 544 ResourceName: testName, 545 ReadOffset: 0, 546 ReadLimit: int64(len(testData)) * 2, 547 }, 548 readCount: 2, 549 wantResponse: []string{"0123456789", "0123456789"}, 550 }, { 551 name: "test ReadLimit=0", 552 readHandler: &TestReadHandler{buf: testData}, 553 input: &pb.ReadRequest{ 554 ResourceName: testName, 555 ReadOffset: 0, 556 ReadLimit: 0, 557 }, 558 readCount: 1, 559 wantResponse: []string{"0123456789"}, 560 }, { 561 name: "test ReadLimit=1000", 562 readHandler: &TestReadHandler{buf: testData}, 563 input: &pb.ReadRequest{ 564 ResourceName: testName, 565 ReadOffset: 0, 566 ReadLimit: 1000, 567 }, 568 readCount: 1, 569 wantResponse: []string{"0123456789"}, 570 }, { 571 name: "fails with UngettableReadHandler", 572 readHandler: &UngettableReadHandler{}, 573 input: &pb.ReadRequest{ 574 ResourceName: testName, 575 ReadOffset: 0, 576 ReadLimit: int64(len(testData)), 577 }, 578 readCount: 1, 579 wantErr: true, 580 }, { 581 name: "fails with UnreadableReadHandler", 582 readHandler: &UnreadableReadHandler{}, 583 input: &pb.ReadRequest{ 584 ResourceName: testName, 585 ReadOffset: 0, 586 ReadLimit: int64(len(testData)), 587 }, 588 readCount: 1, 589 wantErr: true, 590 }, { 591 name: "fails with UnclosableReadHandler", 592 readHandler: &UnclosableReadHandler{buf: testData}, 593 input: &pb.ReadRequest{ 594 ResourceName: testName, 595 ReadOffset: 0, 596 ReadLimit: int64(len(testData)) * 2, 597 }, 598 readCount: 1, 599 wantErr: true, 600 wantResponse: []string{"0123456789"}, 601 }, { 602 name: "fails with nil ReadRequest", 603 readHandler: &TestReadHandler{buf: testData}, 604 readCount: 1, 605 wantErr: true, 606 }, { 607 name: "fails with nil ReadHandler", 608 readHandler: nil, 609 readCount: 1, 610 wantErr: true, 611 }, 612 } 613 614 ctx := context.Background() 615 for _, tc := range testCases { 616 var writeHandler WriteHandler 617 if tc.readHandler == nil { 618 writeHandler = &TestWriteHandler{} 619 } 620 setupServer(tc.readHandler, writeHandler) 621 var responseCount int 622 var err error 623 624 for i := 0; i < tc.readCount; i++ { 625 err = server.rpc.Read(tc.input, &fakeReadServerImpl{ 626 ctx: ctx, 627 sender: func(response *pb.ReadResponse) error { 628 if responseCount >= len(tc.wantResponse) { 629 t.Fatalf("%s: got %d call(s) to Send(), want %d", tc.name, responseCount+1, len(tc.wantResponse)) 630 } 631 if got, want := string(response.Data), tc.wantResponse[responseCount]; got != want { 632 t.Fatalf("%s: response[%d] got %q, want %q", tc.name, responseCount, got, want) 633 } 634 responseCount++ 635 return nil 636 }, 637 }) 638 gotErr := (err != nil) 639 if i+1 < tc.readCount { 640 if gotErr { 641 t.Errorf("%s: Read got err=%v, wantErr=%t, but on Read[%d/%d]. Error should not happen until last call to Read", tc.name, err, tc.wantErr, i+1, tc.readCount) 642 break 643 } 644 } else if gotErr != tc.wantErr { 645 t.Errorf("%s: Read got err=%v, wantErr=%t", tc.name, err, tc.wantErr) 646 break 647 } 648 } 649 if responseCount != len(tc.wantResponse) { 650 t.Errorf("%s: got %d call(s) to Send, want %d", tc.name, responseCount, len(tc.wantResponse)) 651 } 652 } 653} 654 655func TestServerRead_SendError(t *testing.T) { 656 setupServer(&TestReadHandler{buf: testData}, nil) 657 658 err := server.rpc.Read(&pb.ReadRequest{ 659 ResourceName: testName, 660 ReadOffset: 0, 661 ReadLimit: int64(len(testData)) * 2, 662 }, &fakeReadServerImpl{ 663 ctx: context.Background(), 664 sender: func(response *pb.ReadResponse) error { 665 if string(response.Data) != testData { 666 t.Fatalf("Send: got %v, want %q", response, testData) 667 } 668 return errors.New("TestServerRead Send() error") 669 }, 670 }) 671 672 if err == nil { 673 t.Fatal("Read() should have failed, but succeeded") 674 } 675} 676 677type fakeWriteServerImpl struct { 678 pb.ByteStream_WriteServer 679 ctx context.Context 680 receiver func() (*pb.WriteRequest, error) 681 sender func(*pb.WriteResponse) error 682} 683 684func (fake *fakeWriteServerImpl) Context() context.Context { 685 return fake.ctx 686} 687 688func (fake *fakeWriteServerImpl) Recv() (*pb.WriteRequest, error) { 689 return fake.receiver() 690} 691 692func (fake *fakeWriteServerImpl) SendMsg(m interface{}) error { 693 return fake.sender(m.(*pb.WriteResponse)) 694} 695 696func (fake *fakeWriteServerImpl) SendAndClose(m *pb.WriteResponse) error { 697 fake.sender(m) 698 return nil 699} 700 701type fakeReadServerImpl struct { 702 pb.ByteStream_ReadServer 703 ctx context.Context 704 sender func(*pb.ReadResponse) error 705} 706 707func (fake *fakeReadServerImpl) Context() context.Context { 708 return fake.ctx 709} 710 711func (fake *fakeReadServerImpl) Send(response *pb.ReadResponse) error { 712 return fake.sender(response) 713} 714 715type TestWriteHandler struct { 716 buf bytes.Buffer // bytes.Buffer implements io.Writer 717 name string // This service can handle one name only. 718} 719 720func (w *TestWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) { 721 if w.name == "" { 722 w.name = name 723 } else if w.name != name { 724 return nil, fmt.Errorf("writer already has name=%q, now a new name=%q confuses me", w.name, name) 725 } 726 // initOffset is ignored. 727 return &w.buf, nil 728} 729 730func (w *TestWriteHandler) Close(ctx context.Context, name string) error { 731 w.name = "" 732 w.buf.Reset() 733 return nil 734} 735 736type UngettableWriteHandler struct{} 737 738func (w *UngettableWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) { 739 return nil, errors.New("UngettableWriteHandler.GetWriter() always fails") 740} 741 742func (w *UngettableWriteHandler) Close(ctx context.Context, name string) error { 743 return nil 744} 745 746type UnwritableWriter struct{} 747 748func (w *UnwritableWriter) Write(p []byte) (int, error) { 749 return 0, errors.New("UnwritableWriter.Write() always fails") 750} 751 752type UnwritableWriteHandler struct{} 753 754func (w *UnwritableWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) { 755 return &UnwritableWriter{}, nil 756} 757 758func (w *UnwritableWriteHandler) Close(ctx context.Context, name string) error { 759 return nil 760} 761 762type UnclosableWriter struct{} 763 764func (w *UnclosableWriter) Write(p []byte) (int, error) { 765 return len(p), nil 766} 767 768type UnclosableWriteHandler struct{} 769 770func (w *UnclosableWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) { 771 return &UnclosableWriter{}, nil 772} 773 774func (w *UnclosableWriteHandler) Close(ctx context.Context, name string) error { 775 return errors.New("UnclosableWriteHandler.Close() always fails") 776} 777 778type TestReadHandler struct { 779 buf string 780 name string // This service can handle one name only. 781} 782 783// GetWriter() returns an io.ReaderAt to accept reads from the given name. 784func (r *TestReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) { 785 if r.name == "" { 786 r.name = name 787 } else if r.name != name { 788 return nil, fmt.Errorf("reader already has name=%q, now a new name=%q confuses me", r.name, name) 789 } 790 return bytes.NewReader([]byte(r.buf)), nil 791} 792 793// Close does nothing. 794func (r *TestReadHandler) Close(ctx context.Context, name string) error { 795 return nil 796} 797 798type UngettableReadHandler struct{} 799 800func (r *UngettableReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) { 801 return nil, errors.New("UngettableReadHandler.GetReader() always fails") 802} 803 804func (r *UngettableReadHandler) Close(ctx context.Context, name string) error { 805 return nil 806} 807 808type UnreadableReader struct{} 809 810func (r *UnreadableReader) ReadAt(p []byte, offset int64) (int, error) { 811 return 0, errors.New("UnreadableReader.ReadAt() always fails") 812} 813 814type UnreadableReadHandler struct{} 815 816func (r *UnreadableReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) { 817 return &UnreadableReader{}, nil 818} 819 820func (r *UnreadableReadHandler) Close(ctx context.Context, name string) error { 821 return nil 822} 823 824type UnclosableReadHandler struct { 825 buf string 826} 827 828func (r *UnclosableReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) { 829 return bytes.NewReader([]byte(r.buf)), nil 830} 831 832func (r *UnclosableReadHandler) Close(ctx context.Context, name string) error { 833 return fmt.Errorf("UnclosableReader.Close(%s) always fails", name) 834} 835 836func registerServer() { 837 gsrv := grpc.NewServer() 838 var err error 839 server, err = NewServer(gsrv, &TestReadHandler{}, &TestWriteHandler{}) 840 if err != nil { 841 log.Fatalf("NewServer() failed: %v", err) 842 } 843} 844 845func setupServer(readHandler ReadHandler, writeHandler WriteHandler) { 846 setupServerOnce.Do(registerServer) 847 server.status = make(map[string]*pb.QueryWriteStatusResponse) 848 server.readHandler = readHandler 849 server.writeHandler = writeHandler 850} 851