1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/bigtable/completion_queue.h"
16 #include "google/cloud/bigtable/version.h"
17 #include "google/cloud/testing_util/assert_ok.h"
18 #include "absl/memory/memory.h"
19 #include <google/bigtable/v2/bigtable.grpc.pb.h>
20 #include <gmock/gmock.h>
21 #include <future>
22 
23 namespace google {
24 namespace cloud {
25 namespace bigtable {
26 inline namespace BIGTABLE_CLIENT_NS {
27 namespace {
28 
29 namespace btproto = google::bigtable::v2;
30 
31 /**
32  * Implement a single streaming read RPC to test the wrappers.
33  */
34 class BulkApplyImpl final : public google::bigtable::v2::Bigtable::Service {
35  public:
36   BulkApplyImpl() = default;
37 
MutateRows(grpc::ServerContext * context,google::bigtable::v2::MutateRowsRequest const * request,grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse> * writer)38   grpc::Status MutateRows(
39       grpc::ServerContext* context,
40       google::bigtable::v2::MutateRowsRequest const* request,
41       grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>* writer)
42       override {
43     std::unique_lock<std::mutex> lk(mu_);
44     if (!callback_) {
45       return grpc::Status::OK;
46     }
47     Callback cb;
48     cb.swap(callback_);
49     lk.unlock();
50     return cb(context, request, writer);
51   }
52 
53   using Callback = std::function<grpc::Status(
54       grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
55       grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>*)>;
56 
SetCallback(Callback callback)57   void SetCallback(Callback callback) {
58     std::unique_lock<std::mutex> lk(mu_);
59     callback_ = std::move(callback);
60   }
61 
62  private:
63   std::mutex mu_;
64   Callback callback_;
65 };
66 
67 /**
68  * This test starts a server in a separate thread, and then executes against
69  * that server. We want to test the wrappers end-to-end, particularly with
70  * respect to error handling, and cancellation.
71  */
72 class AsyncReadStreamTest : public ::testing::Test {
73  protected:
SetUp()74   void SetUp() override {
75     int port;
76     std::string server_address("[::]:0");
77     builder_.AddListeningPort(server_address, grpc::InsecureServerCredentials(),
78                               &port);
79     builder_.RegisterService(&impl_);
80     server_ = builder_.BuildAndStart();
81     server_thread_ = std::thread([this]() { server_->Wait(); });
82 
83     std::shared_ptr<grpc::Channel> channel =
84         grpc::CreateChannel("localhost:" + std::to_string(port),
85                             grpc::InsecureChannelCredentials());
86     stub_ = google::bigtable::v2::Bigtable::NewStub(channel);
87 
88     cq_thread_ = std::thread([this] { cq_.Run(); });
89   }
90 
TearDown()91   void TearDown() override {
92     cq_.Shutdown();
93     if (cq_thread_.joinable()) {
94       cq_thread_.join();
95     }
96     WaitForServerShutdown();
97   }
98 
WaitForServerShutdown()99   void WaitForServerShutdown() {
100     server_->Shutdown();
101     if (server_thread_.joinable()) {
102       server_thread_.join();
103     }
104   }
105 
WriteOne(grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse> * writer,int index)106   static void WriteOne(
107       grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>* writer,
108       int index) {
109     google::bigtable::v2::MutateRowsResponse response;
110     response.add_entries()->set_index(index);
111     writer->Write(response, grpc::WriteOptions().set_write_through());
112   }
113 
WriteLast(grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse> * writer,int index)114   static void WriteLast(
115       grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>* writer,
116       int index) {
117     google::bigtable::v2::MutateRowsResponse response;
118     response.add_entries()->set_index(index);
119     writer->Write(response,
120                   grpc::WriteOptions().set_write_through().set_last_message());
121   }
122 
123   BulkApplyImpl impl_;
124   grpc::ServerBuilder builder_;
125   std::unique_ptr<grpc::Server> server_;
126   std::thread server_thread_;
127   std::unique_ptr<google::bigtable::v2::Bigtable::StubInterface> stub_;
128 
129   CompletionQueue cq_;
130   std::thread cq_thread_;
131 };
132 
133 // A synchronization primitive to block a thread until it is allowed to
134 // continue.
135 class SimpleBarrier {
136  public:
137   SimpleBarrier() = default;
138 
Wait()139   void Wait() { impl_.get_future().get(); }
Lift()140   void Lift() { impl_.set_value(); }
141 
142  private:
143   promise<void> impl_;
144 };
145 
146 struct HandlerResult {
147   std::vector<btproto::MutateRowsResponse> reads;
148   Status status;
149   SimpleBarrier done;
150 };
151 
152 /// @test Verify that completion queues correctly validate asynchronous
153 /// streaming read RPC callables.
TEST_F(AsyncReadStreamTest,MetaFunctions)154 TEST_F(AsyncReadStreamTest, MetaFunctions) {
155   auto async_call = [this](grpc::ClientContext* context,
156                            btproto::MutateRowsRequest const& request,
157                            grpc::CompletionQueue* cq) {
158     return stub_->PrepareAsyncMutateRows(context, request, cq);
159   };
160   static_assert(
161       std::is_same<
162           btproto::MutateRowsResponse,
163           google::cloud::internal::AsyncStreamingReadResponseType<
164               decltype(async_call), btproto::MutateRowsRequest>::type>::value,
165       "Unexpected type for AsyncStreamingReadResponseType<>");
166 }
167 
168 /// @test Verify that AsyncReadStream works even if the server does not exist.
TEST_F(AsyncReadStreamTest,CannotConnect)169 TEST_F(AsyncReadStreamTest, CannotConnect) {
170   std::shared_ptr<grpc::Channel> channel =
171       grpc::CreateChannel("localhost:1", grpc::InsecureChannelCredentials());
172   std::unique_ptr<google::bigtable::v2::Bigtable::StubInterface> stub =
173       google::bigtable::v2::Bigtable::NewStub(channel);
174 
175   btproto::MutateRowsRequest request;
176   auto context = absl::make_unique<grpc::ClientContext>();
177   HandlerResult result;
178   cq_.MakeStreamingReadRpc(
179       [&stub](grpc::ClientContext* context,
180               btproto::MutateRowsRequest const& request,
181               grpc::CompletionQueue* cq) {
182         return stub->PrepareAsyncMutateRows(context, request, cq);
183       },
184       request, std::move(context),
185       [&result](btproto::MutateRowsResponse r) {
186         result.reads.emplace_back(std::move(r));
187         return make_ready_future(true);
188       },
189       [&result](Status s) {
190         result.status = std::move(s);
191         result.done.Lift();
192       });
193 
194   result.done.Wait();
195   EXPECT_TRUE(result.reads.empty());
196   EXPECT_EQ(StatusCode::kUnavailable, result.status.code());
197 }
198 
199 /// @test Verify that the AsyncReadStream handles an empty stream.
TEST_F(AsyncReadStreamTest,Empty)200 TEST_F(AsyncReadStreamTest, Empty) {
201   btproto::MutateRowsRequest request;
202   auto context = absl::make_unique<grpc::ClientContext>();
203   HandlerResult result;
204   cq_.MakeStreamingReadRpc(
205       [this](grpc::ClientContext* context,
206              btproto::MutateRowsRequest const& request,
207              grpc::CompletionQueue* cq) {
208         return stub_->PrepareAsyncMutateRows(context, request, cq);
209       },
210       request, std::move(context),
211       [&result](btproto::MutateRowsResponse r) {
212         result.reads.emplace_back(std::move(r));
213         return make_ready_future(true);
214       },
215       [&result](Status s) {
216         result.status = std::move(s);
217         result.done.Lift();
218       });
219 
220   result.done.Wait();
221   EXPECT_TRUE(result.reads.empty());
222   EXPECT_STATUS_OK(result.status);
223 }
224 
225 /// @test Verify that the AsyncReadStream handles an error in a empty stream.
TEST_F(AsyncReadStreamTest,FailImmediately)226 TEST_F(AsyncReadStreamTest, FailImmediately) {
227   impl_.SetCallback(
228       [](grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
229          grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>*) {
230         return grpc::Status(grpc::StatusCode::PERMISSION_DENIED, "uh oh");
231       });
232 
233   btproto::MutateRowsRequest request;
234   auto context = absl::make_unique<grpc::ClientContext>();
235   HandlerResult result;
236   cq_.MakeStreamingReadRpc(
237       [this](grpc::ClientContext* context,
238              btproto::MutateRowsRequest const& request,
239              grpc::CompletionQueue* cq) {
240         return stub_->PrepareAsyncMutateRows(context, request, cq);
241       },
242       request, std::move(context),
243       [&result](btproto::MutateRowsResponse r) {
244         result.reads.emplace_back(std::move(r));
245         return make_ready_future(true);
246       },
247       [&result](Status s) {
248         result.status = std::move(s);
249         result.done.Lift();
250       });
251 
252   result.done.Wait();
253   EXPECT_TRUE(result.reads.empty());
254   EXPECT_EQ(StatusCode::kPermissionDenied, result.status.code());
255 }
256 
257 /// @test Verify that the AsyncReadStream handles a stream with 3 elements.
TEST_F(AsyncReadStreamTest,Return3)258 TEST_F(AsyncReadStreamTest, Return3) {
259   impl_.SetCallback(
260       [](grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
261          grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>* writer) {
262         WriteOne(writer, 0);
263         WriteOne(writer, 1);
264         WriteLast(writer, 2);
265         return grpc::Status::OK;
266       });
267 
268   btproto::MutateRowsRequest request;
269   auto context = absl::make_unique<grpc::ClientContext>();
270   HandlerResult result;
271   cq_.MakeStreamingReadRpc(
272       [this](grpc::ClientContext* context,
273              btproto::MutateRowsRequest const& request,
274              grpc::CompletionQueue* cq) {
275         return stub_->PrepareAsyncMutateRows(context, request, cq);
276       },
277       request, std::move(context),
278       [&result](btproto::MutateRowsResponse r) {
279         result.reads.emplace_back(std::move(r));
280         return make_ready_future(true);
281       },
282       [&result](Status s) {
283         result.status = std::move(s);
284         result.done.Lift();
285       });
286 
287   result.done.Wait();
288   EXPECT_STATUS_OK(result.status);
289   ASSERT_EQ(3U, result.reads.size());
290   for (int i = 0; i != 3; ++i) {
291     SCOPED_TRACE("Running iteration: " + std::to_string(i));
292     ASSERT_EQ(1, result.reads[i].entries_size());
293     EXPECT_EQ(i, result.reads[i].entries(0).index());
294   }
295 }
296 
297 /// @test Verify that the AsyncReadStream detect errors reported by the server.
TEST_F(AsyncReadStreamTest,Return3ThenFail)298 TEST_F(AsyncReadStreamTest, Return3ThenFail) {
299   // Very rarely (in the CI builds, with high load), all 3 responses and the
300   // error message are coalesced into a single message from the server, and then
301   // the OnRead() calls do not happen. We need to explicitly synchronize the
302   // client and server threads.
303   SimpleBarrier server_barrier;
304   impl_.SetCallback(
305       [&server_barrier](
306           grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
307           grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>*
308               writer) {
309         WriteOne(writer, 0);
310         WriteOne(writer, 1);
311         // Cannot use WriteLast() because that blocks until the status is
312         // returned, and we want to pause in `server_barrier` to ensure all
313         // messages are received.
314         WriteOne(writer, 2);
315         // Block until the client has received the responses.
316         server_barrier.Wait();
317         return grpc::Status(grpc::StatusCode::INTERNAL, "bad luck");
318       });
319 
320   btproto::MutateRowsRequest request;
321   auto context = absl::make_unique<grpc::ClientContext>();
322   HandlerResult result;
323   cq_.MakeStreamingReadRpc(
324       [this](grpc::ClientContext* context,
325              btproto::MutateRowsRequest const& request,
326              grpc::CompletionQueue* cq) {
327         return stub_->PrepareAsyncMutateRows(context, request, cq);
328       },
329       request, std::move(context),
330       [&result, &server_barrier](btproto::MutateRowsResponse r) {
331         result.reads.emplace_back(std::move(r));
332         if (result.reads.size() == 3) {
333           server_barrier.Lift();
334         }
335         return make_ready_future(true);
336       },
337       [&result](Status s) {
338         result.status = std::move(s);
339         result.done.Lift();
340       });
341 
342   result.done.Wait();
343   ASSERT_EQ(3U, result.reads.size());
344   for (int i = 0; i != 3; ++i) {
345     SCOPED_TRACE("Running iteration: " + std::to_string(i));
346     ASSERT_EQ(1, result.reads[i].entries_size());
347     EXPECT_EQ(i, result.reads[i].entries(0).index());
348   }
349   EXPECT_EQ(StatusCode::kInternal, result.status.code());
350 }
351 
352 /// @test Verify that the AsyncReadStream wrappers work even if the server does
353 /// not explicitly signal end-of-stream.
TEST_F(AsyncReadStreamTest,Return3NoLast)354 TEST_F(AsyncReadStreamTest, Return3NoLast) {
355   impl_.SetCallback(
356       [](grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
357          grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>* writer) {
358         WriteOne(writer, 0);
359         WriteOne(writer, 1);
360         WriteOne(writer, 2);
361         return grpc::Status::OK;
362       });
363 
364   btproto::MutateRowsRequest request;
365   auto context = absl::make_unique<grpc::ClientContext>();
366   HandlerResult result;
367   cq_.MakeStreamingReadRpc(
368       [this](grpc::ClientContext* context,
369              btproto::MutateRowsRequest const& request,
370              grpc::CompletionQueue* cq) {
371         return stub_->PrepareAsyncMutateRows(context, request, cq);
372       },
373       request, std::move(context),
374       [&result](btproto::MutateRowsResponse r) {
375         result.reads.emplace_back(std::move(r));
376         return make_ready_future(true);
377       },
378       [&result](Status s) {
379         result.status = std::move(s);
380         result.done.Lift();
381       });
382 
383   result.done.Wait();
384   ASSERT_EQ(3U, result.reads.size());
385   ASSERT_STATUS_OK(result.status);
386   for (int i = 0; i != 3; ++i) {
387     SCOPED_TRACE("Running iteration: " + std::to_string(i));
388     ASSERT_EQ(1, result.reads[i].entries_size());
389     EXPECT_EQ(i, result.reads[i].entries(0).index());
390   }
391 }
392 
393 /// @test Verify that the AsyncReadStream wrappers work even if the last Read()
394 /// blocks for a bit.
TEST_F(AsyncReadStreamTest,Return3LastIsBlocked)395 TEST_F(AsyncReadStreamTest, Return3LastIsBlocked) {
396   SimpleBarrier client_barrier;
397   SimpleBarrier server_barrier;
398   impl_.SetCallback(
399       [&client_barrier, &server_barrier](
400           grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
401           grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>*
402               writer) {
403         WriteOne(writer, 0);
404         WriteOne(writer, 1);
405         WriteOne(writer, 2);
406         client_barrier.Lift();
407         server_barrier.Wait();
408         return grpc::Status::OK;
409       });
410 
411   HandlerResult result;
412   auto on_read = [&client_barrier, &server_barrier,
413                   &result](btproto::MutateRowsResponse r) {
414     result.reads.emplace_back(std::move(r));
415     if (result.reads.size() == 3) {
416       client_barrier.Wait();
417       server_barrier.Lift();
418     }
419     return make_ready_future(true);
420   };
421 
422   btproto::MutateRowsRequest request;
423   auto context = absl::make_unique<grpc::ClientContext>();
424   cq_.MakeStreamingReadRpc(
425       [this](grpc::ClientContext* context,
426              btproto::MutateRowsRequest const& request,
427              grpc::CompletionQueue* cq) {
428         return stub_->PrepareAsyncMutateRows(context, request, cq);
429       },
430       request, std::move(context), on_read,
431       [&result](Status s) {
432         result.status = std::move(s);
433         result.done.Lift();
434       });
435 
436   result.done.Wait();
437   ASSERT_EQ(3U, result.reads.size());
438   ASSERT_STATUS_OK(result.status);
439   for (int i = 0; i != 3; ++i) {
440     SCOPED_TRACE("Running iteration: " + std::to_string(i));
441     ASSERT_EQ(1, result.reads[i].entries_size());
442     EXPECT_EQ(i, result.reads[i].entries(0).index());
443   }
444 }
445 
446 /// @test Verify that AsyncReadStream::Cancel() works in the middle of a Read().
TEST_F(AsyncReadStreamTest,CancelWhileBlocked)447 TEST_F(AsyncReadStreamTest, CancelWhileBlocked) {
448   SimpleBarrier client_barrier;
449   SimpleBarrier server_barrier;
450   impl_.SetCallback(
451       [&client_barrier, &server_barrier](
452           grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
453           grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>*
454               writer) {
455         WriteOne(writer, 0);
456         WriteOne(writer, 1);
457         client_barrier.Lift();
458         server_barrier.Wait();
459         WriteOne(writer, 2);
460         return grpc::Status::OK;
461       });
462 
463   HandlerResult result;
464   auto on_read = [&client_barrier, &result](btproto::MutateRowsResponse r) {
465     result.reads.emplace_back(std::move(r));
466     if (result.reads.size() == 2) {
467       client_barrier.Wait();
468       return make_ready_future(false);
469     }
470     return make_ready_future(true);
471   };
472 
473   btproto::MutateRowsRequest request;
474   auto context = absl::make_unique<grpc::ClientContext>();
475   cq_.MakeStreamingReadRpc(
476       [this](grpc::ClientContext* context,
477              btproto::MutateRowsRequest const& request,
478              grpc::CompletionQueue* cq) {
479         return stub_->PrepareAsyncMutateRows(context, request, cq);
480       },
481       request, std::move(context), on_read,
482       [&result](Status s) {
483         result.status = std::move(s);
484         result.done.Lift();
485       });
486 
487   // The server remains blocked until the stream finishes, therefore, the only
488   // way this actually unblocks is if the Cancel() succeeds.
489   result.done.Wait();
490   ASSERT_EQ(2U, result.reads.size());
491   EXPECT_EQ(StatusCode::kCancelled, result.status.code());
492   for (int i = 0; i != 2; ++i) {
493     SCOPED_TRACE("Running iteration: " + std::to_string(i));
494     ASSERT_EQ(1, result.reads[i].entries_size());
495     EXPECT_EQ(i, result.reads[i].entries(0).index());
496   }
497 
498   // The barriers go out of scope when this function exits, but the server may
499   // still be using them, so wait for the server to shutdown before leaving the
500   // scope.
501   server_barrier.Lift();
502   WaitForServerShutdown();
503 }
504 
505 /// @test Verify that AsyncReadStream works when one calls Cancel() more than
506 /// once.
TEST_F(AsyncReadStreamTest,DoubleCancel)507 TEST_F(AsyncReadStreamTest, DoubleCancel) {
508   SimpleBarrier server_sent_responses_barrier;
509   SimpleBarrier cancel_done_server_barrier;
510   impl_.SetCallback(
511       [&server_sent_responses_barrier, &cancel_done_server_barrier](
512           grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
513           grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>*
514               writer) {
515         WriteOne(writer, 0);
516         WriteOne(writer, 1);
517         server_sent_responses_barrier.Lift();
518         cancel_done_server_barrier.Wait();
519         WriteOne(writer, 2);
520         return grpc::Status::OK;
521       });
522 
523   HandlerResult result;
524   SimpleBarrier read_received_barrier;
525   SimpleBarrier cancel_done_read_barrier;
526   auto on_read = [&read_received_barrier, &cancel_done_read_barrier,
527                   &result](btproto::MutateRowsResponse r) {
528     result.reads.emplace_back(std::move(r));
529     if (result.reads.size() == 2) {
530       read_received_barrier.Lift();
531       cancel_done_read_barrier.Wait();
532     }
533     return make_ready_future(true);
534   };
535 
536   btproto::MutateRowsRequest request;
537   auto context = absl::make_unique<grpc::ClientContext>();
538   auto op = cq_.MakeStreamingReadRpc(
539       [this](grpc::ClientContext* context,
540              btproto::MutateRowsRequest const& request,
541              grpc::CompletionQueue* cq) {
542         return stub_->PrepareAsyncMutateRows(context, request, cq);
543       },
544       request, std::move(context), on_read,
545       [&result](Status s) {
546         result.status = std::move(s);
547         result.done.Lift();
548       });
549 
550   server_sent_responses_barrier.Wait();
551   read_received_barrier.Wait();
552   op->Cancel();
553   op->Cancel();
554   cancel_done_server_barrier.Lift();
555   cancel_done_read_barrier.Lift();
556 
557   // The server remains blocked until the stream finishes, therefore, the only
558   // way this actually unblocks is if the Cancel() succeeds.
559   result.done.Wait();
560   ASSERT_EQ(2U, result.reads.size());
561   EXPECT_EQ(StatusCode::kCancelled, result.status.code());
562   for (int i = 0; i != 2; ++i) {
563     SCOPED_TRACE("Running iteration: " + std::to_string(i));
564     ASSERT_EQ(1, result.reads[i].entries_size());
565     EXPECT_EQ(i, result.reads[i].entries(0).index());
566   }
567 
568   // The barriers go out of scope when this function exits, but the server may
569   // still be using them, so wait for the server to shutdown before leaving the
570   // scope.
571   WaitForServerShutdown();
572 }
573 
574 /// @test Verify that AsyncReadStream works when one Cancels() before reading.
TEST_F(AsyncReadStreamTest,CancelBeforeRead)575 TEST_F(AsyncReadStreamTest, CancelBeforeRead) {
576   SimpleBarrier server_started_barrier;
577   SimpleBarrier cancel_done_server_barrier;
578   impl_.SetCallback(
579       [&server_started_barrier, &cancel_done_server_barrier](
580           grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
581           grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>*
582               writer) {
583         server_started_barrier.Lift();
584         WriteOne(writer, 0);
585         WriteOne(writer, 1);
586         WriteOne(writer, 2);
587         cancel_done_server_barrier.Wait();
588         return grpc::Status::OK;
589       });
590 
591   HandlerResult result;
592   btproto::MutateRowsRequest request;
593   auto context = absl::make_unique<grpc::ClientContext>();
594   auto op = cq_.MakeStreamingReadRpc(
595       [this](grpc::ClientContext* context,
596              btproto::MutateRowsRequest const& request,
597              grpc::CompletionQueue* cq) {
598         return stub_->PrepareAsyncMutateRows(context, request, cq);
599       },
600       request, std::move(context),
601       [&result](btproto::MutateRowsResponse r) {
602         result.reads.emplace_back(std::move(r));
603         return make_ready_future(true);
604       },
605       [&result](Status s) {
606         result.status = std::move(s);
607         result.done.Lift();
608       });
609 
610   server_started_barrier.Wait();
611   op->Cancel();
612 
613   // The server remains blocked until the stream finishes, therefore, the only
614   // way this actually unblocks is if the Cancel() succeeds.
615   result.done.Wait();
616   // There is no guarantee on how many messages will be received before the
617   // cancel succeeds, but we certainly expect fewer messages than we sent.
618   EXPECT_LE(result.reads.size(), 3U);
619   EXPECT_EQ(StatusCode::kCancelled, result.status.code());
620 
621   // The barriers go out of scope when this function exits, but the server may
622   // still be using them, so wait for the server to shutdown before leaving the
623   // scope.
624   cancel_done_server_barrier.Lift();
625   WaitForServerShutdown();
626 }
627 
628 /// @test Verify that AsyncReadStream works even if Cancel() is misused.
TEST_F(AsyncReadStreamTest,CancelAfterFinish)629 TEST_F(AsyncReadStreamTest, CancelAfterFinish) {
630   impl_.SetCallback(
631       [](grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
632          grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>* writer) {
633         WriteOne(writer, 0);
634         WriteOne(writer, 1);
635         WriteLast(writer, 2);
636         return grpc::Status::OK;
637       });
638 
639   btproto::MutateRowsRequest request;
640   auto context = absl::make_unique<grpc::ClientContext>();
641   HandlerResult result;
642   SimpleBarrier on_finish_stop_before_cancel;
643   SimpleBarrier on_finish_continue_after_cancel;
644   auto on_finish = [&result, &on_finish_stop_before_cancel,
645                     &on_finish_continue_after_cancel](Status s) {
646     result.status = std::move(s);
647     on_finish_stop_before_cancel.Lift();
648     on_finish_continue_after_cancel.Wait();
649     result.done.Lift();
650   };
651   auto op = cq_.MakeStreamingReadRpc(
652       [this](grpc::ClientContext* context,
653              btproto::MutateRowsRequest const& request,
654              grpc::CompletionQueue* cq) {
655         return stub_->PrepareAsyncMutateRows(context, request, cq);
656       },
657       request, std::move(context),
658       [&result](btproto::MutateRowsResponse r) {
659         result.reads.emplace_back(std::move(r));
660         return make_ready_future(true);
661       },
662       on_finish);
663 
664   // Call Cancel() while the on_finish() callback is running.
665   on_finish_stop_before_cancel.Wait();
666   op->Cancel();
667   on_finish_continue_after_cancel.Lift();
668 
669   result.done.Wait();
670   EXPECT_STATUS_OK(result.status);
671   ASSERT_EQ(3U, result.reads.size());
672   for (int i = 0; i != 3; ++i) {
673     SCOPED_TRACE("Running iteration: " + std::to_string(i));
674     ASSERT_EQ(1, result.reads[i].entries_size());
675     EXPECT_EQ(i, result.reads[i].entries(0).index());
676   }
677 }
678 
679 /// @test Verify that AsyncReadStream works when returning false from OnRead().
TEST_F(AsyncReadStreamTest,DiscardAfterReturningFalse)680 TEST_F(AsyncReadStreamTest, DiscardAfterReturningFalse) {
681   impl_.SetCallback(
682       [](grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
683          grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>* writer) {
684         for (int i = 0; i != 10; ++i) {
685           WriteOne(writer, i);
686         }
687         WriteLast(writer, 10);
688         return grpc::Status::OK;
689       });
690 
691   btproto::MutateRowsRequest request;
692   auto context = absl::make_unique<grpc::ClientContext>();
693   HandlerResult result;
694   auto op = cq_.MakeStreamingReadRpc(
695       [this](grpc::ClientContext* context,
696              btproto::MutateRowsRequest const& request,
697              grpc::CompletionQueue* cq) {
698         return stub_->PrepareAsyncMutateRows(context, request, cq);
699       },
700       request, std::move(context),
701       [&result](btproto::MutateRowsResponse r) {
702         result.reads.emplace_back(std::move(r));
703         // Cancel on *every* request, we do not expect additional calls after
704         // the first one.
705         return make_ready_future(false);
706       },
707       [&result](Status s) {
708         result.status = std::move(s);
709         result.done.Lift();
710       });
711 
712   result.done.Wait();
713   ASSERT_EQ(StatusCode::kCancelled, result.status.code());
714   ASSERT_EQ(1U, result.reads.size());
715   EXPECT_EQ(1, result.reads[0].entries_size());
716   EXPECT_EQ(0, result.reads[0].entries(0).index());
717 }
718 
719 }  // namespace
720 }  // namespace BIGTABLE_CLIENT_NS
721 }  // namespace bigtable
722 }  // namespace cloud
723 }  // namespace google
724