1 // Copyright 2017 Google Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "google/cloud/bigtable/row_reader.h"
16 #include "google/cloud/bigtable/table.h"
17 #include "google/cloud/bigtable/testing/mock_read_rows_reader.h"
18 #include "google/cloud/bigtable/testing/table_test_fixture.h"
19 #include "google/cloud/internal/api_client_header.h"
20 #include "google/cloud/internal/throw_delegate.h"
21 #include "google/cloud/testing_util/assert_ok.h"
22 #include "google/cloud/testing_util/capture_log_lines_backend.h"
23 #include "google/cloud/testing_util/validate_metadata.h"
24 #include "absl/memory/memory.h"
25 #include <gmock/gmock.h>
26 #include <deque>
27 #include <initializer_list>
28
29 using ::google::bigtable::v2::ReadRowsRequest;
30 using ::google::bigtable::v2::ReadRowsResponse_CellChunk;
31 using ::google::cloud::testing_util::IsContextMDValid;
32 using ::testing::_;
33 using ::testing::Contains;
34 using ::testing::DoAll;
35 using ::testing::Eq;
36 using ::testing::HasSubstr;
37 using ::testing::Matcher;
38 using ::testing::Not;
39 using ::testing::Property;
40 using ::testing::Return;
41 using ::testing::SetArgPointee;
42
43 namespace bigtable = google::cloud::bigtable;
44 using bigtable::Row;
45 using bigtable::testing::MockReadRowsReader;
46
47 namespace {
48 class ReadRowsParserMock : public bigtable::internal::ReadRowsParser {
49 public:
50 MOCK_METHOD2(HandleChunkHook,
51 void(ReadRowsResponse_CellChunk chunk, grpc::Status& status));
HandleChunk(ReadRowsResponse_CellChunk chunk,grpc::Status & status)52 void HandleChunk(ReadRowsResponse_CellChunk chunk,
53 grpc::Status& status) override {
54 HandleChunkHook(chunk, status);
55 }
56
57 MOCK_METHOD1(HandleEndOfStreamHook, void(grpc::Status& status));
HandleEndOfStream(grpc::Status & status)58 void HandleEndOfStream(grpc::Status& status) override {
59 HandleEndOfStreamHook(status);
60 }
61
HasNext() const62 bool HasNext() const override { return !rows_.empty(); }
63
Next(grpc::Status &)64 Row Next(grpc::Status&) override {
65 Row row = rows_.front();
66 rows_.pop_front();
67 return row;
68 }
69
SetRows(std::initializer_list<std::string> l)70 void SetRows(std::initializer_list<std::string> l) {
71 std::transform(l.begin(), l.end(), std::back_inserter(rows_),
72 [](std::string const& s) -> Row {
73 return Row(s, std::vector<bigtable::Cell>());
74 });
75 }
76
77 private:
78 std::deque<Row> rows_;
79 };
80
81 // Returns a preconfigured set of parsers, so expectations can be set on each.
82 class ReadRowsParserMockFactory
83 : public bigtable::internal::ReadRowsParserFactory {
84 using ParserPtr = std::unique_ptr<bigtable::internal::ReadRowsParser>;
85
86 public:
AddParser(ParserPtr parser)87 void AddParser(ParserPtr parser) { parsers_.emplace_back(std::move(parser)); }
88
89 // We only need a hook here because MOCK_METHOD0 would not add the
90 // 'override' keyword that a compiler warning expects for Create().
91 MOCK_METHOD0(CreateHook, void());
Create()92 ParserPtr Create() override {
93 CreateHook();
94 if (parsers_.empty()) {
95 return ParserPtr(new bigtable::internal::ReadRowsParser);
96 }
97 ParserPtr parser = std::move(parsers_.front());
98 parsers_.pop_front();
99 return parser;
100 }
101
102 private:
103 std::deque<ParserPtr> parsers_;
104 };
105
106 class RetryPolicyMock : public bigtable::RPCRetryPolicy {
107 public:
108 RetryPolicyMock() = default;
clone() const109 std::unique_ptr<RPCRetryPolicy> clone() const override {
110 google::cloud::internal::ThrowRuntimeError("Mocks cannot be copied.");
111 }
112
113 MOCK_CONST_METHOD1(SetupHook, void(grpc::ClientContext&));
Setup(grpc::ClientContext & context) const114 void Setup(grpc::ClientContext& context) const override {
115 SetupHook(context);
116 }
117
118 MOCK_METHOD1(OnFailureHook, bool(grpc::Status const& status));
OnFailure(grpc::Status const & status)119 bool OnFailure(grpc::Status const& status) override {
120 return OnFailureHook(status);
121 }
OnFailure(google::cloud::Status const &)122 bool OnFailure(google::cloud::Status const&) override { return true; }
123 };
124
125 class BackoffPolicyMock : public bigtable::RPCBackoffPolicy {
126 public:
127 BackoffPolicyMock() = default;
clone() const128 std::unique_ptr<RPCBackoffPolicy> clone() const override {
129 google::cloud::internal::ThrowRuntimeError("Mocks cannot be copied.");
130 }
Setup(grpc::ClientContext &) const131 void Setup(grpc::ClientContext&) const override {}
132 MOCK_METHOD1(OnCompletionHook,
133 std::chrono::milliseconds(grpc::Status const& s));
OnCompletion(grpc::Status const & s)134 std::chrono::milliseconds OnCompletion(grpc::Status const& s) override {
135 return OnCompletionHook(s);
136 }
OnCompletion(google::cloud::Status const &)137 std::chrono::milliseconds OnCompletion(
138 google::cloud::Status const&) override {
139 return std::chrono::milliseconds(0);
140 }
141 };
142
143 // Match the number of expected row keys in a request in EXPECT_CALL
RequestWithRowKeysCount(int n)144 Matcher<const ReadRowsRequest&> RequestWithRowKeysCount(int n) {
145 return Property(
146 &ReadRowsRequest::rows,
147 Property(&google::bigtable::v2::RowSet::row_keys_size, Eq(n)));
148 }
149
150 // Match the row limit in a request
RequestWithRowsLimit(std::int64_t n)151 Matcher<const ReadRowsRequest&> RequestWithRowsLimit(std::int64_t n) {
152 return Property(&ReadRowsRequest::rows_limit, Eq(n));
153 }
154
155 } // anonymous namespace
156
157 class RowReaderTest : public bigtable::testing::TableTestFixture {
158 public:
RowReaderTest()159 RowReaderTest()
160 : retry_policy_(new RetryPolicyMock),
161 backoff_policy_(new BackoffPolicyMock),
162 metadata_update_policy_(kTableName,
163 bigtable::MetadataParamTypes::TABLE_NAME),
164 parser_factory_(new ReadRowsParserMockFactory) {}
165
166 std::unique_ptr<RetryPolicyMock> retry_policy_;
167 std::unique_ptr<BackoffPolicyMock> backoff_policy_;
168 bigtable::MetadataUpdatePolicy metadata_update_policy_;
169 std::unique_ptr<ReadRowsParserMockFactory> parser_factory_;
170 };
171
TEST_F(RowReaderTest,EmptyReaderHasNoRows)172 TEST_F(RowReaderTest, EmptyReaderHasNoRows) {
173 // wrapped in unique_ptr by ReadRows
174 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
175 EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
176 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
177 EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
178
179 bigtable::RowReader reader(
180 client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
181 bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
182 std::move(backoff_policy_), metadata_update_policy_,
183 std::move(parser_factory_));
184
185 EXPECT_EQ(reader.begin(), reader.end());
186 }
187
TEST_F(RowReaderTest,ReadOneRow)188 TEST_F(RowReaderTest, ReadOneRow) {
189 // wrapped in unique_ptr by ReadRows
190 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
191 auto parser = absl::make_unique<ReadRowsParserMock>();
192 parser->SetRows({"r1"});
193 EXPECT_CALL(*parser, HandleEndOfStreamHook(_)).Times(1);
194 {
195 testing::InSequence s;
196 EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
197 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
198 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
199 EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
200 }
201
202 parser_factory_->AddParser(std::move(parser));
203 bigtable::RowReader reader(
204 client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
205 bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
206 std::move(backoff_policy_), metadata_update_policy_,
207 std::move(parser_factory_));
208
209 auto it = reader.begin();
210 EXPECT_NE(it, reader.end());
211 ASSERT_STATUS_OK(*it);
212 EXPECT_EQ((*it)->row_key(), "r1");
213 EXPECT_EQ(++it, reader.end());
214 }
215
TEST_F(RowReaderTest,ReadOneRowAppProfileId)216 TEST_F(RowReaderTest, ReadOneRowAppProfileId) {
217 // wrapped in unique_ptr by ReadRows
218 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
219 auto parser = absl::make_unique<ReadRowsParserMock>();
220 parser->SetRows({"r1"});
221 EXPECT_CALL(*parser, HandleEndOfStreamHook(_)).Times(1);
222 {
223 testing::InSequence s;
224 std::string expected_id = "test-id";
225 EXPECT_CALL(*client_, ReadRows(_, _))
226 .WillOnce([expected_id, &stream](grpc::ClientContext* context,
227 ReadRowsRequest const& req) {
228 EXPECT_STATUS_OK(
229 IsContextMDValid(*context, "google.bigtable.v2.Bigtable.ReadRows",
230 google::cloud::internal::ApiClientHeader()));
231 EXPECT_EQ(expected_id, req.app_profile_id());
232 return stream->AsUniqueMocked();
233 });
234 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
235 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
236 EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
237 }
238
239 parser_factory_->AddParser(std::move(parser));
240 bigtable::RowReader reader(
241 client_, "test-id", "", bigtable::RowSet(),
242 bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
243 std::move(retry_policy_), std::move(backoff_policy_),
244 metadata_update_policy_, std::move(parser_factory_));
245
246 auto it = reader.begin();
247 EXPECT_NE(it, reader.end());
248 ASSERT_STATUS_OK(*it);
249 EXPECT_EQ((*it)->row_key(), "r1");
250 EXPECT_EQ(++it, reader.end());
251 }
252
TEST_F(RowReaderTest,ReadOneRowIteratorPostincrement)253 TEST_F(RowReaderTest, ReadOneRowIteratorPostincrement) {
254 // wrapped in unique_ptr by ReadRows
255 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
256 auto parser = absl::make_unique<ReadRowsParserMock>();
257 parser->SetRows({"r1"});
258 EXPECT_CALL(*parser, HandleEndOfStreamHook(_)).Times(1);
259 {
260 testing::InSequence s;
261 EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
262 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
263 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
264 EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
265 }
266
267 parser_factory_->AddParser(std::move(parser));
268 bigtable::RowReader reader(
269 client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
270 bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
271 std::move(backoff_policy_), metadata_update_policy_,
272 std::move(parser_factory_));
273
274 auto it = reader.begin();
275 EXPECT_NE(it, reader.end());
276 // This postincrement is what we are testing
277 auto it2 = it++;
278 ASSERT_STATUS_OK(*it2);
279 EXPECT_EQ((*it2)->row_key(), "r1");
280 EXPECT_EQ(it, reader.end());
281 }
282
TEST_F(RowReaderTest,ReadOneOfTwoRowsClosesStream)283 TEST_F(RowReaderTest, ReadOneOfTwoRowsClosesStream) {
284 // wrapped in unique_ptr by ReadRows
285 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
286 auto parser = absl::make_unique<ReadRowsParserMock>();
287 parser->SetRows({"r1"});
288 {
289 testing::InSequence s;
290 EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
291 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
292 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
293 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
294 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
295 EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
296 }
297
298 parser_factory_->AddParser(std::move(parser));
299 bigtable::RowReader reader(
300 client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
301 bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
302 std::move(backoff_policy_), metadata_update_policy_,
303 std::move(parser_factory_));
304
305 auto it = reader.begin();
306 EXPECT_NE(it, reader.end());
307 ASSERT_STATUS_OK(*it);
308 EXPECT_EQ((*it)->row_key(), "r1");
309 EXPECT_NE(it, reader.end());
310 // Do not finish the iteration. We still expect the stream to be finalized,
311 // and the previously setup expectations on the mock `stream` check that.
312 }
313
TEST_F(RowReaderTest,FailedStreamIsRetried)314 TEST_F(RowReaderTest, FailedStreamIsRetried) {
315 // wrapped in unique_ptr by ReadRows
316 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
317 auto parser = absl::make_unique<ReadRowsParserMock>();
318 parser->SetRows({"r1"});
319 {
320 testing::InSequence s;
321 EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
322 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
323 EXPECT_CALL(*stream, Finish())
324 .WillOnce(Return(grpc::Status(grpc::StatusCode::INTERNAL, "retry")));
325
326 EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(true));
327 EXPECT_CALL(*backoff_policy_, OnCompletionHook(_))
328 .WillOnce(Return(std::chrono::milliseconds(0)));
329
330 // the stub will free it
331 auto* stream_retry =
332 new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
333 EXPECT_CALL(*client_, ReadRows(_, _))
334 .WillOnce(stream_retry->MakeMockReturner());
335 EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(true));
336 EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(false));
337 EXPECT_CALL(*stream_retry, Finish()).WillOnce(Return(grpc::Status::OK));
338 }
339
340 parser_factory_->AddParser(std::move(parser));
341 bigtable::RowReader reader(
342 client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
343 bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
344 std::move(backoff_policy_), metadata_update_policy_,
345 std::move(parser_factory_));
346
347 auto it = reader.begin();
348 EXPECT_NE(it, reader.end());
349 ASSERT_STATUS_OK(*it);
350 EXPECT_EQ((*it)->row_key(), "r1");
351 EXPECT_EQ(++it, reader.end());
352 }
353
TEST_F(RowReaderTest,FailedStreamWithNoRetryThrowsNoExcept)354 TEST_F(RowReaderTest, FailedStreamWithNoRetryThrowsNoExcept) {
355 // wrapped in unique_ptr by ReadRows
356 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
357 auto parser = absl::make_unique<ReadRowsParserMock>();
358 {
359 testing::InSequence s;
360 EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
361 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
362 EXPECT_CALL(*stream, Finish())
363 .WillOnce(Return(grpc::Status(grpc::StatusCode::INTERNAL, "retry")));
364
365 EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(false));
366 EXPECT_CALL(*backoff_policy_, OnCompletionHook(_)).Times(0);
367 }
368
369 parser_factory_->AddParser(std::move(parser));
370 bigtable::RowReader reader(
371 client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
372 bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
373 std::move(backoff_policy_), metadata_update_policy_,
374 std::move(parser_factory_));
375
376 auto it = reader.begin();
377 EXPECT_NE(it, reader.end());
378 EXPECT_FALSE(*it);
379 }
380
TEST_F(RowReaderTest,FailedStreamRetriesSkipAlreadyReadRows)381 TEST_F(RowReaderTest, FailedStreamRetriesSkipAlreadyReadRows) {
382 // wrapped in unique_ptr by ReadRows
383 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
384 auto parser = absl::make_unique<ReadRowsParserMock>();
385 parser->SetRows({"r1"});
386 {
387 testing::InSequence s;
388 // For sanity, check we have two rows in the initial request
389 EXPECT_CALL(*client_, ReadRows(_, RequestWithRowKeysCount(2)))
390 .WillOnce(stream->MakeMockReturner());
391
392 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
393 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
394 EXPECT_CALL(*stream, Finish())
395 .WillOnce(Return(grpc::Status(grpc::StatusCode::INTERNAL, "retry")));
396
397 EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(true));
398 EXPECT_CALL(*backoff_policy_, OnCompletionHook(_))
399 .WillOnce(Return(std::chrono::milliseconds(0)));
400
401 // the stub will free it
402 auto* stream_retry =
403 new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
404 // First row should be removed from the retried request, leaving one row
405 EXPECT_CALL(*client_, ReadRows(_, RequestWithRowKeysCount(1)))
406 .WillOnce(stream_retry->MakeMockReturner());
407 EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(false));
408 EXPECT_CALL(*stream_retry, Finish()).WillOnce(Return(grpc::Status::OK));
409 }
410
411 parser_factory_->AddParser(std::move(parser));
412 bigtable::RowReader reader(
413 client_, "", bigtable::RowSet("r1", "r2"),
414 bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
415 std::move(retry_policy_), std::move(backoff_policy_),
416 metadata_update_policy_, std::move(parser_factory_));
417
418 auto it = reader.begin();
419 EXPECT_NE(it, reader.end());
420 ASSERT_STATUS_OK(*it);
421 EXPECT_EQ((*it)->row_key(), "r1");
422 EXPECT_EQ(++it, reader.end());
423 }
424
TEST_F(RowReaderTest,FailedParseIsRetried)425 TEST_F(RowReaderTest, FailedParseIsRetried) {
426 // wrapped in unique_ptr by ReadRows
427 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
428 auto parser = absl::make_unique<ReadRowsParserMock>();
429 parser->SetRows({"r1"});
430 auto response = bigtable::testing::ReadRowsResponseFromString("chunks {}");
431 {
432 testing::InSequence s;
433 EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
434 EXPECT_CALL(*stream, Read(_))
435 .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)));
436 EXPECT_CALL(*parser, HandleChunkHook(_, _))
437 .WillOnce(testing::SetArgReferee<1>(
438 grpc::Status(grpc::StatusCode::INTERNAL, "parser exception")));
439
440 EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(true));
441 EXPECT_CALL(*backoff_policy_, OnCompletionHook(_))
442 .WillOnce(Return(std::chrono::milliseconds(0)));
443
444 // the stub will free it
445 auto* stream_retry =
446 new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
447 EXPECT_CALL(*client_, ReadRows(_, _))
448 .WillOnce(stream_retry->MakeMockReturner());
449 EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(true));
450 EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(false));
451 EXPECT_CALL(*stream_retry, Finish()).WillOnce(Return(grpc::Status::OK));
452 }
453
454 parser_factory_->AddParser(std::move(parser));
455 bigtable::RowReader reader(
456 client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
457 bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
458 std::move(backoff_policy_), metadata_update_policy_,
459 std::move(parser_factory_));
460
461 auto it = reader.begin();
462 EXPECT_NE(it, reader.end());
463 ASSERT_STATUS_OK(*it);
464 EXPECT_EQ((*it)->row_key(), "r1");
465 EXPECT_EQ(++it, reader.end());
466 }
467
TEST_F(RowReaderTest,FailedParseRetriesSkipAlreadyReadRows)468 TEST_F(RowReaderTest, FailedParseRetriesSkipAlreadyReadRows) {
469 // wrapped in unique_ptr by ReadRows
470 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
471 auto parser = absl::make_unique<ReadRowsParserMock>();
472 parser->SetRows({"r1"});
473 {
474 testing::InSequence s;
475 // For sanity, check we have two rows in the initial request
476 EXPECT_CALL(*client_, ReadRows(_, RequestWithRowKeysCount(2)))
477 .WillOnce(stream->MakeMockReturner());
478
479 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
480 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
481 EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
482 grpc::Status status;
483 EXPECT_CALL(*parser, HandleEndOfStreamHook(_))
484 .WillOnce(testing::SetArgReferee<0>(
485 grpc::Status(grpc::StatusCode::INTERNAL, "InternalError")));
486
487 EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(true));
488 EXPECT_CALL(*backoff_policy_, OnCompletionHook(_))
489 .WillOnce(Return(std::chrono::milliseconds(0)));
490
491 // the stub will free it
492 auto* stream_retry =
493 new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
494 // First row should be removed from the retried request, leaving one row
495 EXPECT_CALL(*client_, ReadRows(_, RequestWithRowKeysCount(1)))
496 .WillOnce(stream_retry->MakeMockReturner());
497 EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(false));
498 EXPECT_CALL(*stream_retry, Finish()).WillOnce(Return(grpc::Status::OK));
499 }
500
501 parser_factory_->AddParser(std::move(parser));
502 bigtable::RowReader reader(
503 client_, "", bigtable::RowSet("r1", "r2"),
504 bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
505 std::move(retry_policy_), std::move(backoff_policy_),
506 metadata_update_policy_, std::move(parser_factory_));
507
508 auto it = reader.begin();
509 EXPECT_NE(it, reader.end());
510 ASSERT_STATUS_OK(*it);
511 EXPECT_EQ((*it)->row_key(), "r1");
512 EXPECT_EQ(++it, reader.end());
513 }
514
TEST_F(RowReaderTest,FailedParseWithNoRetryThrowsNoExcept)515 TEST_F(RowReaderTest, FailedParseWithNoRetryThrowsNoExcept) {
516 // wrapped in unique_ptr by ReadRows
517 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
518 auto parser = absl::make_unique<ReadRowsParserMock>();
519 {
520 testing::InSequence s;
521
522 EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
523 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
524 EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
525 grpc::Status status;
526 EXPECT_CALL(*parser, HandleEndOfStreamHook(_))
527 .WillOnce(testing::SetArgReferee<0>(
528 grpc::Status(grpc::StatusCode::INTERNAL, "InternalError")));
529 EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(false));
530 EXPECT_CALL(*backoff_policy_, OnCompletionHook(_)).Times(0);
531 }
532
533 parser_factory_->AddParser(std::move(parser));
534 bigtable::RowReader reader(
535 client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
536 bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
537 std::move(backoff_policy_), metadata_update_policy_,
538 std::move(parser_factory_));
539
540 auto it = reader.begin();
541 EXPECT_NE(it, reader.end());
542 EXPECT_FALSE(*it);
543 }
544
TEST_F(RowReaderTest,FailedStreamWithAllRequiedRowsSeenShouldNotRetry)545 TEST_F(RowReaderTest, FailedStreamWithAllRequiedRowsSeenShouldNotRetry) {
546 // wrapped in unique_ptr by ReadRows
547 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
548 auto parser = absl::make_unique<ReadRowsParserMock>();
549 parser->SetRows({"r2"});
550 {
551 testing::InSequence s;
552 EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
553
554 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
555 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
556 EXPECT_CALL(*stream, Finish())
557 .WillOnce(Return(grpc::Status(grpc::StatusCode::INTERNAL,
558 "this exception must be ignored")));
559
560 // Note there is no expectation of a new connection, because the
561 // set of rows to read should become empty after reading "r2" and
562 // intersecting the requested ["r1", "r2"] with ("r2", "") for the
563 // retry.
564 }
565
566 parser_factory_->AddParser(std::move(parser));
567 bigtable::RowReader reader(
568 client_, "", bigtable::RowSet(bigtable::RowRange::Closed("r1", "r2")),
569 bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
570 std::move(retry_policy_), std::move(backoff_policy_),
571 metadata_update_policy_, std::move(parser_factory_));
572
573 auto it = reader.begin();
574 EXPECT_NE(it, reader.end());
575 ASSERT_STATUS_OK(*it);
576 EXPECT_EQ((*it)->row_key(), "r2");
577 EXPECT_EQ(++it, reader.end());
578 }
579
TEST_F(RowReaderTest,RowLimitIsSent)580 TEST_F(RowReaderTest, RowLimitIsSent) {
581 // wrapped in unique_ptr by ReadRows
582 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
583 EXPECT_CALL(*client_, ReadRows(_, RequestWithRowsLimit(442)))
584 .WillOnce(stream->MakeMockReturner());
585 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
586 EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
587
588 bigtable::RowReader reader(
589 client_, "", bigtable::RowSet(), 442, bigtable::Filter::PassAllFilter(),
590 std::move(retry_policy_), std::move(backoff_policy_),
591 metadata_update_policy_, std::move(parser_factory_));
592
593 auto it = reader.begin();
594 EXPECT_EQ(it, reader.end());
595 }
596
TEST_F(RowReaderTest,RowLimitIsDecreasedOnRetry)597 TEST_F(RowReaderTest, RowLimitIsDecreasedOnRetry) {
598 // wrapped in unique_ptr by ReadRows
599 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
600 auto parser = absl::make_unique<ReadRowsParserMock>();
601 parser->SetRows({"r1"});
602 {
603 testing::InSequence s;
604 EXPECT_CALL(*client_, ReadRows(_, RequestWithRowsLimit(42)))
605 .WillOnce(stream->MakeMockReturner());
606
607 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
608 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
609 EXPECT_CALL(*stream, Finish())
610 .WillOnce(Return(grpc::Status(grpc::StatusCode::INTERNAL, "retry")));
611
612 EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(true));
613 EXPECT_CALL(*backoff_policy_, OnCompletionHook(_))
614 .WillOnce(Return(std::chrono::milliseconds(0)));
615
616 // the stub will free it
617 auto* stream_retry =
618 new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
619 // 41 instead of 42
620 EXPECT_CALL(*client_, ReadRows(_, RequestWithRowsLimit(41)))
621 .WillOnce(stream_retry->MakeMockReturner());
622 EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(false));
623 EXPECT_CALL(*stream_retry, Finish()).WillOnce(Return(grpc::Status::OK));
624 }
625
626 parser_factory_->AddParser(std::move(parser));
627 bigtable::RowReader reader(
628 client_, "", bigtable::RowSet(), 42, bigtable::Filter::PassAllFilter(),
629 std::move(retry_policy_), std::move(backoff_policy_),
630 metadata_update_policy_, std::move(parser_factory_));
631
632 auto it = reader.begin();
633 EXPECT_NE(it, reader.end());
634 ASSERT_STATUS_OK(*it);
635 EXPECT_EQ((*it)->row_key(), "r1");
636 EXPECT_EQ(++it, reader.end());
637 }
638
TEST_F(RowReaderTest,RowLimitIsNotDecreasedToZero)639 TEST_F(RowReaderTest, RowLimitIsNotDecreasedToZero) {
640 // wrapped in unique_ptr by ReadRows
641 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
642 auto parser = absl::make_unique<ReadRowsParserMock>();
643 parser->SetRows({"r1"});
644 {
645 testing::InSequence s;
646 EXPECT_CALL(*client_, ReadRows(_, RequestWithRowsLimit(1)))
647 .WillOnce(stream->MakeMockReturner());
648
649 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
650 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
651 EXPECT_CALL(*stream, Finish())
652 .WillOnce(Return(grpc::Status(grpc::StatusCode::INTERNAL,
653 "this exception must be ignored")));
654
655 // Note there is no expectation of a new connection, because the
656 // row limit reaches zero.
657 }
658
659 parser_factory_->AddParser(std::move(parser));
660 bigtable::RowReader reader(
661 client_, "", bigtable::RowSet(), 1, bigtable::Filter::PassAllFilter(),
662 std::move(retry_policy_), std::move(backoff_policy_),
663 metadata_update_policy_, std::move(parser_factory_));
664
665 auto it = reader.begin();
666 EXPECT_NE(it, reader.end());
667 ASSERT_STATUS_OK(*it);
668 EXPECT_EQ((*it)->row_key(), "r1");
669 EXPECT_EQ(++it, reader.end());
670 }
671
TEST_F(RowReaderTest,BeginThrowsAfterCancelClosesStreamNoExcept)672 TEST_F(RowReaderTest, BeginThrowsAfterCancelClosesStreamNoExcept) {
673 auto parser = absl::make_unique<ReadRowsParserMock>();
674 parser->SetRows({"r1"});
675 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
676 {
677 testing::InSequence s;
678 EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
679 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
680 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
681 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
682 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
683 EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
684 }
685
686 parser_factory_->AddParser(std::move(parser));
687 bigtable::RowReader reader(
688 client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
689 bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
690 std::move(backoff_policy_), metadata_update_policy_,
691 std::move(parser_factory_));
692
693 auto it = reader.begin();
694 EXPECT_NE(it, reader.end());
695 ASSERT_STATUS_OK(*it);
696 EXPECT_EQ((*it)->row_key(), "r1");
697 EXPECT_NE(it, reader.end());
698 // Manually cancel the call.
699 reader.Cancel();
700 it = reader.begin();
701 EXPECT_NE(it, reader.end());
702 EXPECT_FALSE(*it);
703 }
704
TEST_F(RowReaderTest,BeginThrowsAfterImmediateCancelNoExcept)705 TEST_F(RowReaderTest, BeginThrowsAfterImmediateCancelNoExcept) {
706 auto backend =
707 std::make_shared<google::cloud::testing_util::CaptureLogLinesBackend>();
708 auto id = google::cloud::LogSink::Instance().AddBackend(backend);
709
710 std::unique_ptr<bigtable::RowReader> reader(new bigtable::RowReader(
711 client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
712 bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
713 std::move(backoff_policy_), metadata_update_policy_,
714 std::move(parser_factory_)));
715 // Manually cancel the call before a stream was created.
716 reader->Cancel();
717 reader->begin();
718
719 auto it = reader->begin();
720 EXPECT_NE(it, reader->end());
721 EXPECT_FALSE(*it);
722
723 // Delete the reader and verify no log is produced because we handled the
724 // error.
725 reader.reset();
726
727 google::cloud::LogSink::Instance().RemoveBackend(id);
728
729 EXPECT_THAT(
730 backend->ClearLogLines(),
731 Not(Contains(HasSubstr(
732 "RowReader has an error, and the error status was not retrieved"))));
733 }
734
TEST_F(RowReaderTest,RowReaderConstructorDoesNotCallRpc)735 TEST_F(RowReaderTest, RowReaderConstructorDoesNotCallRpc) {
736 // The RowReader constructor/destructor by themselves should not
737 // invoke the RPC or create parsers (the latter restriction because
738 // parsers are per-connection and non-reusable).
739 EXPECT_CALL(*client_, ReadRows(_, _)).Times(0);
740 EXPECT_CALL(*parser_factory_, CreateHook()).Times(0);
741
742 bigtable::RowReader reader(
743 client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
744 bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
745 std::move(backoff_policy_), metadata_update_policy_,
746 std::move(parser_factory_));
747 }
748
TEST_F(RowReaderTest,FailedStreamRetryNewContext)749 TEST_F(RowReaderTest, FailedStreamRetryNewContext) {
750 // Every retry should use a new ClientContext object.
751 // wrapped in unique_ptr by ReadRows
752 auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
753 auto parser = absl::make_unique<ReadRowsParserMock>();
754 parser->SetRows({"r1"});
755
756 void* previous_context = nullptr;
757 EXPECT_CALL(*retry_policy_, SetupHook(_))
758 .Times(2)
759 .WillRepeatedly([&previous_context](grpc::ClientContext& context) {
760 // This is a big hack, we want to make sure the context is new,
761 // but there is no easy way to check that, so we compare addresses.
762 EXPECT_NE(previous_context, &context);
763 previous_context = &context;
764 });
765
766 {
767 testing::InSequence s;
768 EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
769 EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
770 EXPECT_CALL(*stream, Finish())
771 .WillOnce(Return(grpc::Status(grpc::StatusCode::INTERNAL, "retry")));
772
773 EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(true));
774 EXPECT_CALL(*backoff_policy_, OnCompletionHook(_))
775 .WillOnce(Return(std::chrono::milliseconds(0)));
776
777 // the stub will free it
778 auto* stream_retry =
779 new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
780 EXPECT_CALL(*client_, ReadRows(_, _))
781 .WillOnce(stream_retry->MakeMockReturner());
782 EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(true));
783 EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(false));
784 EXPECT_CALL(*stream_retry, Finish()).WillOnce(Return(grpc::Status::OK));
785 }
786
787 parser_factory_->AddParser(std::move(parser));
788 bigtable::RowReader reader(
789 client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
790 bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
791 std::move(backoff_policy_), metadata_update_policy_,
792 std::move(parser_factory_));
793
794 auto it = reader.begin();
795 EXPECT_NE(it, reader.end());
796 ASSERT_STATUS_OK(*it);
797 EXPECT_EQ((*it)->row_key(), "r1");
798 EXPECT_EQ(++it, reader.end());
799 }
800