1 // Copyright 2017 Google Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/bigtable/row_reader.h"
16 #include "google/cloud/bigtable/table.h"
17 #include "google/cloud/bigtable/testing/mock_read_rows_reader.h"
18 #include "google/cloud/bigtable/testing/table_test_fixture.h"
19 #include "google/cloud/internal/api_client_header.h"
20 #include "google/cloud/internal/throw_delegate.h"
21 #include "google/cloud/testing_util/assert_ok.h"
22 #include "google/cloud/testing_util/capture_log_lines_backend.h"
23 #include "google/cloud/testing_util/validate_metadata.h"
24 #include "absl/memory/memory.h"
25 #include <gmock/gmock.h>
26 #include <deque>
27 #include <initializer_list>
28 
29 using ::google::bigtable::v2::ReadRowsRequest;
30 using ::google::bigtable::v2::ReadRowsResponse_CellChunk;
31 using ::google::cloud::testing_util::IsContextMDValid;
32 using ::testing::_;
33 using ::testing::Contains;
34 using ::testing::DoAll;
35 using ::testing::Eq;
36 using ::testing::HasSubstr;
37 using ::testing::Matcher;
38 using ::testing::Not;
39 using ::testing::Property;
40 using ::testing::Return;
41 using ::testing::SetArgPointee;
42 
43 namespace bigtable = google::cloud::bigtable;
44 using bigtable::Row;
45 using bigtable::testing::MockReadRowsReader;
46 
47 namespace {
48 class ReadRowsParserMock : public bigtable::internal::ReadRowsParser {
49  public:
50   MOCK_METHOD2(HandleChunkHook,
51                void(ReadRowsResponse_CellChunk chunk, grpc::Status& status));
HandleChunk(ReadRowsResponse_CellChunk chunk,grpc::Status & status)52   void HandleChunk(ReadRowsResponse_CellChunk chunk,
53                    grpc::Status& status) override {
54     HandleChunkHook(chunk, status);
55   }
56 
57   MOCK_METHOD1(HandleEndOfStreamHook, void(grpc::Status& status));
HandleEndOfStream(grpc::Status & status)58   void HandleEndOfStream(grpc::Status& status) override {
59     HandleEndOfStreamHook(status);
60   }
61 
HasNext() const62   bool HasNext() const override { return !rows_.empty(); }
63 
Next(grpc::Status &)64   Row Next(grpc::Status&) override {
65     Row row = rows_.front();
66     rows_.pop_front();
67     return row;
68   }
69 
SetRows(std::initializer_list<std::string> l)70   void SetRows(std::initializer_list<std::string> l) {
71     std::transform(l.begin(), l.end(), std::back_inserter(rows_),
72                    [](std::string const& s) -> Row {
73                      return Row(s, std::vector<bigtable::Cell>());
74                    });
75   }
76 
77  private:
78   std::deque<Row> rows_;
79 };
80 
81 // Returns a preconfigured set of parsers, so expectations can be set on each.
82 class ReadRowsParserMockFactory
83     : public bigtable::internal::ReadRowsParserFactory {
84   using ParserPtr = std::unique_ptr<bigtable::internal::ReadRowsParser>;
85 
86  public:
AddParser(ParserPtr parser)87   void AddParser(ParserPtr parser) { parsers_.emplace_back(std::move(parser)); }
88 
89   // We only need a hook here because MOCK_METHOD0 would not add the
90   // 'override' keyword that a compiler warning expects for Create().
91   MOCK_METHOD0(CreateHook, void());
Create()92   ParserPtr Create() override {
93     CreateHook();
94     if (parsers_.empty()) {
95       return ParserPtr(new bigtable::internal::ReadRowsParser);
96     }
97     ParserPtr parser = std::move(parsers_.front());
98     parsers_.pop_front();
99     return parser;
100   }
101 
102  private:
103   std::deque<ParserPtr> parsers_;
104 };
105 
106 class RetryPolicyMock : public bigtable::RPCRetryPolicy {
107  public:
108   RetryPolicyMock() = default;
clone() const109   std::unique_ptr<RPCRetryPolicy> clone() const override {
110     google::cloud::internal::ThrowRuntimeError("Mocks cannot be copied.");
111   }
112 
113   MOCK_CONST_METHOD1(SetupHook, void(grpc::ClientContext&));
Setup(grpc::ClientContext & context) const114   void Setup(grpc::ClientContext& context) const override {
115     SetupHook(context);
116   }
117 
118   MOCK_METHOD1(OnFailureHook, bool(grpc::Status const& status));
OnFailure(grpc::Status const & status)119   bool OnFailure(grpc::Status const& status) override {
120     return OnFailureHook(status);
121   }
OnFailure(google::cloud::Status const &)122   bool OnFailure(google::cloud::Status const&) override { return true; }
123 };
124 
125 class BackoffPolicyMock : public bigtable::RPCBackoffPolicy {
126  public:
127   BackoffPolicyMock() = default;
clone() const128   std::unique_ptr<RPCBackoffPolicy> clone() const override {
129     google::cloud::internal::ThrowRuntimeError("Mocks cannot be copied.");
130   }
Setup(grpc::ClientContext &) const131   void Setup(grpc::ClientContext&) const override {}
132   MOCK_METHOD1(OnCompletionHook,
133                std::chrono::milliseconds(grpc::Status const& s));
OnCompletion(grpc::Status const & s)134   std::chrono::milliseconds OnCompletion(grpc::Status const& s) override {
135     return OnCompletionHook(s);
136   }
OnCompletion(google::cloud::Status const &)137   std::chrono::milliseconds OnCompletion(
138       google::cloud::Status const&) override {
139     return std::chrono::milliseconds(0);
140   }
141 };
142 
143 // Match the number of expected row keys in a request in EXPECT_CALL
RequestWithRowKeysCount(int n)144 Matcher<const ReadRowsRequest&> RequestWithRowKeysCount(int n) {
145   return Property(
146       &ReadRowsRequest::rows,
147       Property(&google::bigtable::v2::RowSet::row_keys_size, Eq(n)));
148 }
149 
150 // Match the row limit in a request
RequestWithRowsLimit(std::int64_t n)151 Matcher<const ReadRowsRequest&> RequestWithRowsLimit(std::int64_t n) {
152   return Property(&ReadRowsRequest::rows_limit, Eq(n));
153 }
154 
155 }  // anonymous namespace
156 
157 class RowReaderTest : public bigtable::testing::TableTestFixture {
158  public:
RowReaderTest()159   RowReaderTest()
160       : retry_policy_(new RetryPolicyMock),
161         backoff_policy_(new BackoffPolicyMock),
162         metadata_update_policy_(kTableName,
163                                 bigtable::MetadataParamTypes::TABLE_NAME),
164         parser_factory_(new ReadRowsParserMockFactory) {}
165 
166   std::unique_ptr<RetryPolicyMock> retry_policy_;
167   std::unique_ptr<BackoffPolicyMock> backoff_policy_;
168   bigtable::MetadataUpdatePolicy metadata_update_policy_;
169   std::unique_ptr<ReadRowsParserMockFactory> parser_factory_;
170 };
171 
// A stream that ends immediately with an OK status yields begin() == end().
TEST_F(RowReaderTest, EmptyReaderHasNoRows) {
  // The raw pointer is wrapped in a unique_ptr by ReadRows.
  auto* mock_stream =
      new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  EXPECT_CALL(*client_, ReadRows(_, _))
      .WillOnce(mock_stream->MakeMockReturner());
  EXPECT_CALL(*mock_stream, Read(_)).WillOnce(Return(false));
  EXPECT_CALL(*mock_stream, Finish()).WillOnce(Return(grpc::Status::OK));

  bigtable::RowReader reader(
      client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
      bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
      std::move(backoff_policy_), metadata_update_policy_,
      std::move(parser_factory_));

  auto first = reader.begin();
  EXPECT_EQ(first, reader.end());
}
187 
// Reads a single canned row ("r1") from a stream that then ends cleanly, and
// checks that the parser is told about the end of the stream exactly once.
TEST_F(RowReaderTest, ReadOneRow) {
  // wrapped in unique_ptr by ReadRows
  auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  auto parser = absl::make_unique<ReadRowsParserMock>();
  parser->SetRows({"r1"});
  EXPECT_CALL(*parser, HandleEndOfStreamHook(_)).Times(1);
  {
    testing::InSequence s;
    EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
  }

  parser_factory_->AddParser(std::move(parser));
  bigtable::RowReader reader(
      client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
      bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
      std::move(backoff_policy_), metadata_update_policy_,
      std::move(parser_factory_));

  auto it = reader.begin();
  // Fatal on purpose: the iterator is dereferenced right below.
  ASSERT_NE(it, reader.end());
  ASSERT_STATUS_OK(*it);
  EXPECT_EQ((*it)->row_key(), "r1");
  EXPECT_EQ(++it, reader.end());
}
215 
// Same as ReadOneRow, but the reader is created with an app profile id. The
// ReadRows() action verifies the context metadata and that the request
// carries that app profile id.
TEST_F(RowReaderTest, ReadOneRowAppProfileId) {
  // wrapped in unique_ptr by ReadRows
  auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  auto parser = absl::make_unique<ReadRowsParserMock>();
  parser->SetRows({"r1"});
  EXPECT_CALL(*parser, HandleEndOfStreamHook(_)).Times(1);
  {
    testing::InSequence s;
    std::string expected_id = "test-id";
    // Both captures are by value, so the action does not refer to locals of
    // this scope when it eventually runs.
    EXPECT_CALL(*client_, ReadRows(_, _))
        .WillOnce([expected_id, stream](grpc::ClientContext* context,
                                        ReadRowsRequest const& req) {
          EXPECT_STATUS_OK(
              IsContextMDValid(*context, "google.bigtable.v2.Bigtable.ReadRows",
                               google::cloud::internal::ApiClientHeader()));
          EXPECT_EQ(expected_id, req.app_profile_id());
          return stream->AsUniqueMocked();
        });
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
  }

  parser_factory_->AddParser(std::move(parser));
  bigtable::RowReader reader(
      client_, "test-id", "", bigtable::RowSet(),
      bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
      std::move(retry_policy_), std::move(backoff_policy_),
      metadata_update_policy_, std::move(parser_factory_));

  auto it = reader.begin();
  // Fatal on purpose: the iterator is dereferenced right below.
  ASSERT_NE(it, reader.end());
  ASSERT_STATUS_OK(*it);
  EXPECT_EQ((*it)->row_key(), "r1");
  EXPECT_EQ(++it, reader.end());
}
252 
// Checks the iterator's post-increment: the returned copy still refers to the
// row that was current, while the incremented iterator reaches end().
TEST_F(RowReaderTest, ReadOneRowIteratorPostincrement) {
  // wrapped in unique_ptr by ReadRows
  auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  auto parser = absl::make_unique<ReadRowsParserMock>();
  parser->SetRows({"r1"});
  EXPECT_CALL(*parser, HandleEndOfStreamHook(_)).Times(1);
  {
    testing::InSequence s;
    EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
  }

  parser_factory_->AddParser(std::move(parser));
  bigtable::RowReader reader(
      client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
      bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
      std::move(backoff_policy_), metadata_update_policy_,
      std::move(parser_factory_));

  auto it = reader.begin();
  // Fatal on purpose: the iterator is incremented and dereferenced below.
  ASSERT_NE(it, reader.end());
  // This postincrement is what we are testing
  auto it2 = it++;
  ASSERT_STATUS_OK(*it2);
  EXPECT_EQ((*it2)->row_key(), "r1");
  EXPECT_EQ(it, reader.end());
}
282 
// Abandons the iteration after the first row. The expectations on the mock
// stream (further Read() calls and a Finish()) verify that the stream is
// still finalized.
TEST_F(RowReaderTest, ReadOneOfTwoRowsClosesStream) {
  // wrapped in unique_ptr by ReadRows
  auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  auto parser = absl::make_unique<ReadRowsParserMock>();
  parser->SetRows({"r1"});
  {
    testing::InSequence s;
    EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
  }

  parser_factory_->AddParser(std::move(parser));
  bigtable::RowReader reader(
      client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
      bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
      std::move(backoff_policy_), metadata_update_policy_,
      std::move(parser_factory_));

  auto it = reader.begin();
  // Fatal on purpose: the iterator is dereferenced right below.
  ASSERT_NE(it, reader.end());
  ASSERT_STATUS_OK(*it);
  EXPECT_EQ((*it)->row_key(), "r1");
  EXPECT_NE(it, reader.end());
  // Do not finish the iteration.  We still expect the stream to be finalized,
  // and the previously setup expectations on the mock `stream` check that.
}
313 
// The first stream fails with INTERNAL before producing data. With the retry
// policy accepting the failure, a second stream is opened and delivers "r1".
TEST_F(RowReaderTest, FailedStreamIsRetried) {
  // wrapped in unique_ptr by ReadRows
  auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  auto parser = absl::make_unique<ReadRowsParserMock>();
  parser->SetRows({"r1"});
  {
    testing::InSequence s;
    EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish())
        .WillOnce(Return(grpc::Status(grpc::StatusCode::INTERNAL, "retry")));

    EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(true));
    EXPECT_CALL(*backoff_policy_, OnCompletionHook(_))
        .WillOnce(Return(std::chrono::milliseconds(0)));

    // the stub will free it
    auto* stream_retry =
        new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
    EXPECT_CALL(*client_, ReadRows(_, _))
        .WillOnce(stream_retry->MakeMockReturner());
    EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream_retry, Finish()).WillOnce(Return(grpc::Status::OK));
  }

  parser_factory_->AddParser(std::move(parser));
  bigtable::RowReader reader(
      client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
      bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
      std::move(backoff_policy_), metadata_update_policy_,
      std::move(parser_factory_));

  auto it = reader.begin();
  // Fatal on purpose: the iterator is dereferenced right below.
  ASSERT_NE(it, reader.end());
  ASSERT_STATUS_OK(*it);
  EXPECT_EQ((*it)->row_key(), "r1");
  EXPECT_EQ(++it, reader.end());
}
353 
// The stream fails with INTERNAL and the retry policy rejects the failure.
// The backoff policy must not be consulted, and begin() yields an element
// holding an error instead of a row.
TEST_F(RowReaderTest, FailedStreamWithNoRetryThrowsNoExcept) {
  // wrapped in unique_ptr by ReadRows
  auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  auto parser = absl::make_unique<ReadRowsParserMock>();
  {
    testing::InSequence s;
    EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish())
        .WillOnce(Return(grpc::Status(grpc::StatusCode::INTERNAL, "retry")));

    EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(false));
    EXPECT_CALL(*backoff_policy_, OnCompletionHook(_)).Times(0);
  }

  parser_factory_->AddParser(std::move(parser));
  bigtable::RowReader reader(
      client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
      bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
      std::move(backoff_policy_), metadata_update_policy_,
      std::move(parser_factory_));

  auto it = reader.begin();
  // Fatal on purpose: the iterator is dereferenced right below.
  ASSERT_NE(it, reader.end());
  EXPECT_FALSE(*it);
}
380 
// Requests rows "r1" and "r2", receives "r1", and then the stream fails. The
// retried request must carry only one row key.
TEST_F(RowReaderTest, FailedStreamRetriesSkipAlreadyReadRows) {
  // wrapped in unique_ptr by ReadRows
  auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  auto parser = absl::make_unique<ReadRowsParserMock>();
  parser->SetRows({"r1"});
  {
    testing::InSequence s;
    // For sanity, check we have two rows in the initial request
    EXPECT_CALL(*client_, ReadRows(_, RequestWithRowKeysCount(2)))
        .WillOnce(stream->MakeMockReturner());

    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish())
        .WillOnce(Return(grpc::Status(grpc::StatusCode::INTERNAL, "retry")));

    EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(true));
    EXPECT_CALL(*backoff_policy_, OnCompletionHook(_))
        .WillOnce(Return(std::chrono::milliseconds(0)));

    // the stub will free it
    auto* stream_retry =
        new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
    // First row should be removed from the retried request, leaving one row
    EXPECT_CALL(*client_, ReadRows(_, RequestWithRowKeysCount(1)))
        .WillOnce(stream_retry->MakeMockReturner());
    EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream_retry, Finish()).WillOnce(Return(grpc::Status::OK));
  }

  parser_factory_->AddParser(std::move(parser));
  bigtable::RowReader reader(
      client_, "", bigtable::RowSet("r1", "r2"),
      bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
      std::move(retry_policy_), std::move(backoff_policy_),
      metadata_update_policy_, std::move(parser_factory_));

  auto it = reader.begin();
  // Fatal on purpose: the iterator is dereferenced right below.
  ASSERT_NE(it, reader.end());
  ASSERT_STATUS_OK(*it);
  EXPECT_EQ((*it)->row_key(), "r1");
  EXPECT_EQ(++it, reader.end());
}
424 
// The parser reports an INTERNAL error while handling a chunk. With the retry
// policy accepting the failure, a second stream is opened and delivers "r1".
TEST_F(RowReaderTest, FailedParseIsRetried) {
  // wrapped in unique_ptr by ReadRows
  auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  auto parser = absl::make_unique<ReadRowsParserMock>();
  parser->SetRows({"r1"});
  // A response holding a single empty chunk, so the parser gets one call.
  auto response = bigtable::testing::ReadRowsResponseFromString("chunks {}");
  {
    testing::InSequence s;
    EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
    EXPECT_CALL(*stream, Read(_))
        .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)));
    EXPECT_CALL(*parser, HandleChunkHook(_, _))
        .WillOnce(testing::SetArgReferee<1>(
            grpc::Status(grpc::StatusCode::INTERNAL, "parser exception")));

    EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(true));
    EXPECT_CALL(*backoff_policy_, OnCompletionHook(_))
        .WillOnce(Return(std::chrono::milliseconds(0)));

    // the stub will free it
    auto* stream_retry =
        new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
    EXPECT_CALL(*client_, ReadRows(_, _))
        .WillOnce(stream_retry->MakeMockReturner());
    EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream_retry, Finish()).WillOnce(Return(grpc::Status::OK));
  }

  parser_factory_->AddParser(std::move(parser));
  bigtable::RowReader reader(
      client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
      bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
      std::move(backoff_policy_), metadata_update_policy_,
      std::move(parser_factory_));

  auto it = reader.begin();
  // Fatal on purpose: the iterator is dereferenced right below.
  ASSERT_NE(it, reader.end());
  ASSERT_STATUS_OK(*it);
  EXPECT_EQ((*it)->row_key(), "r1");
  EXPECT_EQ(++it, reader.end());
}
467 
// Requests rows "r1" and "r2" and receives "r1"; the stream ends OK but the
// parser reports an INTERNAL error at end of stream. The retried request must
// carry only one row key.
TEST_F(RowReaderTest, FailedParseRetriesSkipAlreadyReadRows) {
  // wrapped in unique_ptr by ReadRows
  auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  auto parser = absl::make_unique<ReadRowsParserMock>();
  parser->SetRows({"r1"});
  {
    testing::InSequence s;
    // For sanity, check we have two rows in the initial request
    EXPECT_CALL(*client_, ReadRows(_, RequestWithRowKeysCount(2)))
        .WillOnce(stream->MakeMockReturner());

    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
    EXPECT_CALL(*parser, HandleEndOfStreamHook(_))
        .WillOnce(testing::SetArgReferee<0>(
            grpc::Status(grpc::StatusCode::INTERNAL, "InternalError")));

    EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(true));
    EXPECT_CALL(*backoff_policy_, OnCompletionHook(_))
        .WillOnce(Return(std::chrono::milliseconds(0)));

    // the stub will free it
    auto* stream_retry =
        new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
    // First row should be removed from the retried request, leaving one row
    EXPECT_CALL(*client_, ReadRows(_, RequestWithRowKeysCount(1)))
        .WillOnce(stream_retry->MakeMockReturner());
    EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream_retry, Finish()).WillOnce(Return(grpc::Status::OK));
  }

  parser_factory_->AddParser(std::move(parser));
  bigtable::RowReader reader(
      client_, "", bigtable::RowSet("r1", "r2"),
      bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
      std::move(retry_policy_), std::move(backoff_policy_),
      metadata_update_policy_, std::move(parser_factory_));

  auto it = reader.begin();
  // Fatal on purpose: the iterator is dereferenced right below.
  ASSERT_NE(it, reader.end());
  ASSERT_STATUS_OK(*it);
  EXPECT_EQ((*it)->row_key(), "r1");
  EXPECT_EQ(++it, reader.end());
}
514 
// The stream ends OK but the parser reports an INTERNAL error at end of
// stream, and the retry policy rejects it. The backoff policy must not be
// consulted, and begin() yields an element holding an error.
TEST_F(RowReaderTest, FailedParseWithNoRetryThrowsNoExcept) {
  // wrapped in unique_ptr by ReadRows
  auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  auto parser = absl::make_unique<ReadRowsParserMock>();
  {
    testing::InSequence s;

    EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
    EXPECT_CALL(*parser, HandleEndOfStreamHook(_))
        .WillOnce(testing::SetArgReferee<0>(
            grpc::Status(grpc::StatusCode::INTERNAL, "InternalError")));
    EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(false));
    EXPECT_CALL(*backoff_policy_, OnCompletionHook(_)).Times(0);
  }

  parser_factory_->AddParser(std::move(parser));
  bigtable::RowReader reader(
      client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
      bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
      std::move(backoff_policy_), metadata_update_policy_,
      std::move(parser_factory_));

  auto it = reader.begin();
  // Fatal on purpose: the iterator is dereferenced right below.
  ASSERT_NE(it, reader.end());
  EXPECT_FALSE(*it);
}
544 
// Requests the closed range ["r1", "r2"], receives "r2", and then the stream
// fails. No second ReadRows() call is expected and the iteration ends
// without surfacing the error.
TEST_F(RowReaderTest, FailedStreamWithAllRequiedRowsSeenShouldNotRetry) {
  // wrapped in unique_ptr by ReadRows
  auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  auto parser = absl::make_unique<ReadRowsParserMock>();
  parser->SetRows({"r2"});
  {
    testing::InSequence s;
    EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());

    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish())
        .WillOnce(Return(grpc::Status(grpc::StatusCode::INTERNAL,
                                      "this exception must be ignored")));

    // Note there is no expectation of a new connection, because the
    // set of rows to read should become empty after reading "r2" and
    // intersecting the requested ["r1", "r2"] with ("r2", "") for the
    // retry.
  }

  parser_factory_->AddParser(std::move(parser));
  bigtable::RowReader reader(
      client_, "", bigtable::RowSet(bigtable::RowRange::Closed("r1", "r2")),
      bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
      std::move(retry_policy_), std::move(backoff_policy_),
      metadata_update_policy_, std::move(parser_factory_));

  auto it = reader.begin();
  // Fatal on purpose: the iterator is dereferenced right below.
  ASSERT_NE(it, reader.end());
  ASSERT_STATUS_OK(*it);
  EXPECT_EQ((*it)->row_key(), "r2");
  EXPECT_EQ(++it, reader.end());
}
579 
// The rows limit given to the RowReader (442) must appear in the request.
TEST_F(RowReaderTest, RowLimitIsSent) {
  // The raw pointer is wrapped in a unique_ptr by ReadRows.
  auto* mock_stream =
      new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  EXPECT_CALL(*client_, ReadRows(_, RequestWithRowsLimit(442)))
      .WillOnce(mock_stream->MakeMockReturner());
  EXPECT_CALL(*mock_stream, Read(_)).WillOnce(Return(false));
  EXPECT_CALL(*mock_stream, Finish()).WillOnce(Return(grpc::Status::OK));

  bigtable::RowReader reader(
      client_, "", bigtable::RowSet(), 442, bigtable::Filter::PassAllFilter(),
      std::move(retry_policy_), std::move(backoff_policy_),
      metadata_update_policy_, std::move(parser_factory_));

  // The stream is empty, so the first iterator is already the end.
  auto first = reader.begin();
  EXPECT_EQ(first, reader.end());
}
596 
// With a rows limit of 42, one row is received before the stream fails. The
// retried request must carry a rows limit of 41.
TEST_F(RowReaderTest, RowLimitIsDecreasedOnRetry) {
  // wrapped in unique_ptr by ReadRows
  auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  auto parser = absl::make_unique<ReadRowsParserMock>();
  parser->SetRows({"r1"});
  {
    testing::InSequence s;
    EXPECT_CALL(*client_, ReadRows(_, RequestWithRowsLimit(42)))
        .WillOnce(stream->MakeMockReturner());

    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish())
        .WillOnce(Return(grpc::Status(grpc::StatusCode::INTERNAL, "retry")));

    EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(true));
    EXPECT_CALL(*backoff_policy_, OnCompletionHook(_))
        .WillOnce(Return(std::chrono::milliseconds(0)));

    // the stub will free it
    auto* stream_retry =
        new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
    // 41 instead of 42
    EXPECT_CALL(*client_, ReadRows(_, RequestWithRowsLimit(41)))
        .WillOnce(stream_retry->MakeMockReturner());
    EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream_retry, Finish()).WillOnce(Return(grpc::Status::OK));
  }

  parser_factory_->AddParser(std::move(parser));
  bigtable::RowReader reader(
      client_, "", bigtable::RowSet(), 42, bigtable::Filter::PassAllFilter(),
      std::move(retry_policy_), std::move(backoff_policy_),
      metadata_update_policy_, std::move(parser_factory_));

  auto it = reader.begin();
  // Fatal on purpose: the iterator is dereferenced right below.
  ASSERT_NE(it, reader.end());
  ASSERT_STATUS_OK(*it);
  EXPECT_EQ((*it)->row_key(), "r1");
  EXPECT_EQ(++it, reader.end());
}
638 
// With a rows limit of 1, one row is received before the stream fails. No
// second ReadRows() call is expected and the iteration ends without
// surfacing the error.
TEST_F(RowReaderTest, RowLimitIsNotDecreasedToZero) {
  // wrapped in unique_ptr by ReadRows
  auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  auto parser = absl::make_unique<ReadRowsParserMock>();
  parser->SetRows({"r1"});
  {
    testing::InSequence s;
    EXPECT_CALL(*client_, ReadRows(_, RequestWithRowsLimit(1)))
        .WillOnce(stream->MakeMockReturner());

    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish())
        .WillOnce(Return(grpc::Status(grpc::StatusCode::INTERNAL,
                                      "this exception must be ignored")));

    // Note there is no expectation of a new connection, because the
    // row limit reaches zero.
  }

  parser_factory_->AddParser(std::move(parser));
  bigtable::RowReader reader(
      client_, "", bigtable::RowSet(), 1, bigtable::Filter::PassAllFilter(),
      std::move(retry_policy_), std::move(backoff_policy_),
      metadata_update_policy_, std::move(parser_factory_));

  auto it = reader.begin();
  // Fatal on purpose: the iterator is dereferenced right below.
  ASSERT_NE(it, reader.end());
  ASSERT_STATUS_OK(*it);
  EXPECT_EQ((*it)->row_key(), "r1");
  EXPECT_EQ(++it, reader.end());
}
671 
// Cancels the reader after the first row was read. The expectations on the
// mock stream verify that it is still read to the end and finished, and a
// later begin() yields an element holding an error.
TEST_F(RowReaderTest, BeginThrowsAfterCancelClosesStreamNoExcept) {
  auto parser = absl::make_unique<ReadRowsParserMock>();
  parser->SetRows({"r1"});
  // wrapped in unique_ptr by ReadRows
  auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  {
    testing::InSequence s;
    EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
  }

  parser_factory_->AddParser(std::move(parser));
  bigtable::RowReader reader(
      client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
      bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
      std::move(backoff_policy_), metadata_update_policy_,
      std::move(parser_factory_));

  auto it = reader.begin();
  // Fatal on purpose: the iterator is dereferenced right below.
  ASSERT_NE(it, reader.end());
  ASSERT_STATUS_OK(*it);
  EXPECT_EQ((*it)->row_key(), "r1");
  EXPECT_NE(it, reader.end());
  // Manually cancel the call.
  reader.Cancel();
  it = reader.begin();
  // Fatal again: the new iterator is dereferenced right below.
  ASSERT_NE(it, reader.end());
  EXPECT_FALSE(*it);
}
704 
// Cancels the reader before any stream exists. begin() must yield an element
// holding an error, and destroying the reader afterwards must not log the
// "error status was not retrieved" message.
TEST_F(RowReaderTest, BeginThrowsAfterImmediateCancelNoExcept) {
  // Capture log output for the duration of the test.
  auto log_backend =
      std::make_shared<google::cloud::testing_util::CaptureLogLinesBackend>();
  auto backend_id = google::cloud::LogSink::Instance().AddBackend(log_backend);

  // Heap-allocated so the reader can be destroyed explicitly further down.
  std::unique_ptr<bigtable::RowReader> reader(new bigtable::RowReader(
      client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
      bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
      std::move(backoff_policy_), metadata_update_policy_,
      std::move(parser_factory_)));
  // Manually cancel the call before a stream was created.
  reader->Cancel();
  reader->begin();

  auto first = reader->begin();
  EXPECT_NE(first, reader->end());
  EXPECT_FALSE(*first);

  // Delete the reader and verify no log is produced because we handled the
  // error.
  reader.reset();

  google::cloud::LogSink::Instance().RemoveBackend(backend_id);

  EXPECT_THAT(
      log_backend->ClearLogLines(),
      Not(Contains(HasSubstr(
          "RowReader has an error, and the error status was not retrieved"))));
}
734 
// Constructing and destroying a RowReader, without iterating, must be inert.
TEST_F(RowReaderTest, RowReaderConstructorDoesNotCallRpc) {
  // No parser may be created, because parsers are per-connection and
  // non-reusable; and no RPC may be started either.
  EXPECT_CALL(*parser_factory_, CreateHook()).Times(0);
  EXPECT_CALL(*client_, ReadRows(_, _)).Times(0);

  bigtable::RowReader reader(
      client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
      bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
      std::move(backoff_policy_), metadata_update_policy_,
      std::move(parser_factory_));
}
748 
// Every retry should use a new ClientContext object. The retry policy's
// Setup() hook is expected twice, and each call must see a context at a
// different address than the previous one.
TEST_F(RowReaderTest, FailedStreamRetryNewContext) {
  // wrapped in unique_ptr by ReadRows
  auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
  auto parser = absl::make_unique<ReadRowsParserMock>();
  parser->SetRows({"r1"});

  void* previous_context = nullptr;
  EXPECT_CALL(*retry_policy_, SetupHook(_))
      .Times(2)
      .WillRepeatedly([&previous_context](grpc::ClientContext& context) {
        // This is a big hack, we want to make sure the context is new,
        // but there is no easy way to check that, so we compare addresses.
        EXPECT_NE(previous_context, &context);
        previous_context = &context;
      });

  {
    testing::InSequence s;
    EXPECT_CALL(*client_, ReadRows(_, _)).WillOnce(stream->MakeMockReturner());
    EXPECT_CALL(*stream, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish())
        .WillOnce(Return(grpc::Status(grpc::StatusCode::INTERNAL, "retry")));

    EXPECT_CALL(*retry_policy_, OnFailureHook(_)).WillOnce(Return(true));
    EXPECT_CALL(*backoff_policy_, OnCompletionHook(_))
        .WillOnce(Return(std::chrono::milliseconds(0)));

    // the stub will free it
    auto* stream_retry =
        new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
    EXPECT_CALL(*client_, ReadRows(_, _))
        .WillOnce(stream_retry->MakeMockReturner());
    EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(true));
    EXPECT_CALL(*stream_retry, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*stream_retry, Finish()).WillOnce(Return(grpc::Status::OK));
  }

  parser_factory_->AddParser(std::move(parser));
  bigtable::RowReader reader(
      client_, "", bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
      bigtable::Filter::PassAllFilter(), std::move(retry_policy_),
      std::move(backoff_policy_), metadata_update_policy_,
      std::move(parser_factory_));

  auto it = reader.begin();
  // Fatal on purpose: the iterator is dereferenced right below.
  ASSERT_NE(it, reader.end());
  ASSERT_STATUS_OK(*it);
  EXPECT_EQ((*it)->row_key(), "r1");
  EXPECT_EQ(++it, reader.end());
}
800