1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "google/cloud/bigtable/completion_queue.h"
16 #include "google/cloud/bigtable/version.h"
17 #include "google/cloud/testing_util/assert_ok.h"
18 #include "absl/memory/memory.h"
19 #include <google/bigtable/v2/bigtable.grpc.pb.h>
20 #include <gmock/gmock.h>
21 #include <future>
22
23 namespace google {
24 namespace cloud {
25 namespace bigtable {
26 inline namespace BIGTABLE_CLIENT_NS {
27 namespace {
28
29 namespace btproto = google::bigtable::v2;
30
31 /**
32 * Implement a single streaming read RPC to test the wrappers.
33 */
34 class BulkApplyImpl final : public google::bigtable::v2::Bigtable::Service {
35 public:
36 BulkApplyImpl() = default;
37
MutateRows(grpc::ServerContext * context,google::bigtable::v2::MutateRowsRequest const * request,grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse> * writer)38 grpc::Status MutateRows(
39 grpc::ServerContext* context,
40 google::bigtable::v2::MutateRowsRequest const* request,
41 grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>* writer)
42 override {
43 std::unique_lock<std::mutex> lk(mu_);
44 if (!callback_) {
45 return grpc::Status::OK;
46 }
47 Callback cb;
48 cb.swap(callback_);
49 lk.unlock();
50 return cb(context, request, writer);
51 }
52
53 using Callback = std::function<grpc::Status(
54 grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
55 grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>*)>;
56
SetCallback(Callback callback)57 void SetCallback(Callback callback) {
58 std::unique_lock<std::mutex> lk(mu_);
59 callback_ = std::move(callback);
60 }
61
62 private:
63 std::mutex mu_;
64 Callback callback_;
65 };
66
67 /**
68 * This test starts a server in a separate thread, and then executes against
69 * that server. We want to test the wrappers end-to-end, particularly with
70 * respect to error handling, and cancellation.
71 */
72 class AsyncReadStreamTest : public ::testing::Test {
73 protected:
SetUp()74 void SetUp() override {
75 int port;
76 std::string server_address("[::]:0");
77 builder_.AddListeningPort(server_address, grpc::InsecureServerCredentials(),
78 &port);
79 builder_.RegisterService(&impl_);
80 server_ = builder_.BuildAndStart();
81 server_thread_ = std::thread([this]() { server_->Wait(); });
82
83 std::shared_ptr<grpc::Channel> channel =
84 grpc::CreateChannel("localhost:" + std::to_string(port),
85 grpc::InsecureChannelCredentials());
86 stub_ = google::bigtable::v2::Bigtable::NewStub(channel);
87
88 cq_thread_ = std::thread([this] { cq_.Run(); });
89 }
90
TearDown()91 void TearDown() override {
92 cq_.Shutdown();
93 if (cq_thread_.joinable()) {
94 cq_thread_.join();
95 }
96 WaitForServerShutdown();
97 }
98
WaitForServerShutdown()99 void WaitForServerShutdown() {
100 server_->Shutdown();
101 if (server_thread_.joinable()) {
102 server_thread_.join();
103 }
104 }
105
WriteOne(grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse> * writer,int index)106 static void WriteOne(
107 grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>* writer,
108 int index) {
109 google::bigtable::v2::MutateRowsResponse response;
110 response.add_entries()->set_index(index);
111 writer->Write(response, grpc::WriteOptions().set_write_through());
112 }
113
WriteLast(grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse> * writer,int index)114 static void WriteLast(
115 grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>* writer,
116 int index) {
117 google::bigtable::v2::MutateRowsResponse response;
118 response.add_entries()->set_index(index);
119 writer->Write(response,
120 grpc::WriteOptions().set_write_through().set_last_message());
121 }
122
123 BulkApplyImpl impl_;
124 grpc::ServerBuilder builder_;
125 std::unique_ptr<grpc::Server> server_;
126 std::thread server_thread_;
127 std::unique_ptr<google::bigtable::v2::Bigtable::StubInterface> stub_;
128
129 CompletionQueue cq_;
130 std::thread cq_thread_;
131 };
132
133 // A synchronization primitive to block a thread until it is allowed to
134 // continue.
135 class SimpleBarrier {
136 public:
137 SimpleBarrier() = default;
138
Wait()139 void Wait() { impl_.get_future().get(); }
Lift()140 void Lift() { impl_.set_value(); }
141
142 private:
143 promise<void> impl_;
144 };
145
146 struct HandlerResult {
147 std::vector<btproto::MutateRowsResponse> reads;
148 Status status;
149 SimpleBarrier done;
150 };
151
152 /// @test Verify that completion queues correctly validate asynchronous
153 /// streaming read RPC callables.
TEST_F(AsyncReadStreamTest,MetaFunctions)154 TEST_F(AsyncReadStreamTest, MetaFunctions) {
155 auto async_call = [this](grpc::ClientContext* context,
156 btproto::MutateRowsRequest const& request,
157 grpc::CompletionQueue* cq) {
158 return stub_->PrepareAsyncMutateRows(context, request, cq);
159 };
160 static_assert(
161 std::is_same<
162 btproto::MutateRowsResponse,
163 google::cloud::internal::AsyncStreamingReadResponseType<
164 decltype(async_call), btproto::MutateRowsRequest>::type>::value,
165 "Unexpected type for AsyncStreamingReadResponseType<>");
166 }
167
168 /// @test Verify that AsyncReadStream works even if the server does not exist.
TEST_F(AsyncReadStreamTest,CannotConnect)169 TEST_F(AsyncReadStreamTest, CannotConnect) {
170 std::shared_ptr<grpc::Channel> channel =
171 grpc::CreateChannel("localhost:1", grpc::InsecureChannelCredentials());
172 std::unique_ptr<google::bigtable::v2::Bigtable::StubInterface> stub =
173 google::bigtable::v2::Bigtable::NewStub(channel);
174
175 btproto::MutateRowsRequest request;
176 auto context = absl::make_unique<grpc::ClientContext>();
177 HandlerResult result;
178 cq_.MakeStreamingReadRpc(
179 [&stub](grpc::ClientContext* context,
180 btproto::MutateRowsRequest const& request,
181 grpc::CompletionQueue* cq) {
182 return stub->PrepareAsyncMutateRows(context, request, cq);
183 },
184 request, std::move(context),
185 [&result](btproto::MutateRowsResponse r) {
186 result.reads.emplace_back(std::move(r));
187 return make_ready_future(true);
188 },
189 [&result](Status s) {
190 result.status = std::move(s);
191 result.done.Lift();
192 });
193
194 result.done.Wait();
195 EXPECT_TRUE(result.reads.empty());
196 EXPECT_EQ(StatusCode::kUnavailable, result.status.code());
197 }
198
199 /// @test Verify that the AsyncReadStream handles an empty stream.
TEST_F(AsyncReadStreamTest,Empty)200 TEST_F(AsyncReadStreamTest, Empty) {
201 btproto::MutateRowsRequest request;
202 auto context = absl::make_unique<grpc::ClientContext>();
203 HandlerResult result;
204 cq_.MakeStreamingReadRpc(
205 [this](grpc::ClientContext* context,
206 btproto::MutateRowsRequest const& request,
207 grpc::CompletionQueue* cq) {
208 return stub_->PrepareAsyncMutateRows(context, request, cq);
209 },
210 request, std::move(context),
211 [&result](btproto::MutateRowsResponse r) {
212 result.reads.emplace_back(std::move(r));
213 return make_ready_future(true);
214 },
215 [&result](Status s) {
216 result.status = std::move(s);
217 result.done.Lift();
218 });
219
220 result.done.Wait();
221 EXPECT_TRUE(result.reads.empty());
222 EXPECT_STATUS_OK(result.status);
223 }
224
225 /// @test Verify that the AsyncReadStream handles an error in a empty stream.
TEST_F(AsyncReadStreamTest,FailImmediately)226 TEST_F(AsyncReadStreamTest, FailImmediately) {
227 impl_.SetCallback(
228 [](grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
229 grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>*) {
230 return grpc::Status(grpc::StatusCode::PERMISSION_DENIED, "uh oh");
231 });
232
233 btproto::MutateRowsRequest request;
234 auto context = absl::make_unique<grpc::ClientContext>();
235 HandlerResult result;
236 cq_.MakeStreamingReadRpc(
237 [this](grpc::ClientContext* context,
238 btproto::MutateRowsRequest const& request,
239 grpc::CompletionQueue* cq) {
240 return stub_->PrepareAsyncMutateRows(context, request, cq);
241 },
242 request, std::move(context),
243 [&result](btproto::MutateRowsResponse r) {
244 result.reads.emplace_back(std::move(r));
245 return make_ready_future(true);
246 },
247 [&result](Status s) {
248 result.status = std::move(s);
249 result.done.Lift();
250 });
251
252 result.done.Wait();
253 EXPECT_TRUE(result.reads.empty());
254 EXPECT_EQ(StatusCode::kPermissionDenied, result.status.code());
255 }
256
257 /// @test Verify that the AsyncReadStream handles a stream with 3 elements.
TEST_F(AsyncReadStreamTest,Return3)258 TEST_F(AsyncReadStreamTest, Return3) {
259 impl_.SetCallback(
260 [](grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
261 grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>* writer) {
262 WriteOne(writer, 0);
263 WriteOne(writer, 1);
264 WriteLast(writer, 2);
265 return grpc::Status::OK;
266 });
267
268 btproto::MutateRowsRequest request;
269 auto context = absl::make_unique<grpc::ClientContext>();
270 HandlerResult result;
271 cq_.MakeStreamingReadRpc(
272 [this](grpc::ClientContext* context,
273 btproto::MutateRowsRequest const& request,
274 grpc::CompletionQueue* cq) {
275 return stub_->PrepareAsyncMutateRows(context, request, cq);
276 },
277 request, std::move(context),
278 [&result](btproto::MutateRowsResponse r) {
279 result.reads.emplace_back(std::move(r));
280 return make_ready_future(true);
281 },
282 [&result](Status s) {
283 result.status = std::move(s);
284 result.done.Lift();
285 });
286
287 result.done.Wait();
288 EXPECT_STATUS_OK(result.status);
289 ASSERT_EQ(3U, result.reads.size());
290 for (int i = 0; i != 3; ++i) {
291 SCOPED_TRACE("Running iteration: " + std::to_string(i));
292 ASSERT_EQ(1, result.reads[i].entries_size());
293 EXPECT_EQ(i, result.reads[i].entries(0).index());
294 }
295 }
296
297 /// @test Verify that the AsyncReadStream detect errors reported by the server.
TEST_F(AsyncReadStreamTest,Return3ThenFail)298 TEST_F(AsyncReadStreamTest, Return3ThenFail) {
299 // Very rarely (in the CI builds, with high load), all 3 responses and the
300 // error message are coalesced into a single message from the server, and then
301 // the OnRead() calls do not happen. We need to explicitly synchronize the
302 // client and server threads.
303 SimpleBarrier server_barrier;
304 impl_.SetCallback(
305 [&server_barrier](
306 grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
307 grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>*
308 writer) {
309 WriteOne(writer, 0);
310 WriteOne(writer, 1);
311 // Cannot use WriteLast() because that blocks until the status is
312 // returned, and we want to pause in `server_barrier` to ensure all
313 // messages are received.
314 WriteOne(writer, 2);
315 // Block until the client has received the responses.
316 server_barrier.Wait();
317 return grpc::Status(grpc::StatusCode::INTERNAL, "bad luck");
318 });
319
320 btproto::MutateRowsRequest request;
321 auto context = absl::make_unique<grpc::ClientContext>();
322 HandlerResult result;
323 cq_.MakeStreamingReadRpc(
324 [this](grpc::ClientContext* context,
325 btproto::MutateRowsRequest const& request,
326 grpc::CompletionQueue* cq) {
327 return stub_->PrepareAsyncMutateRows(context, request, cq);
328 },
329 request, std::move(context),
330 [&result, &server_barrier](btproto::MutateRowsResponse r) {
331 result.reads.emplace_back(std::move(r));
332 if (result.reads.size() == 3) {
333 server_barrier.Lift();
334 }
335 return make_ready_future(true);
336 },
337 [&result](Status s) {
338 result.status = std::move(s);
339 result.done.Lift();
340 });
341
342 result.done.Wait();
343 ASSERT_EQ(3U, result.reads.size());
344 for (int i = 0; i != 3; ++i) {
345 SCOPED_TRACE("Running iteration: " + std::to_string(i));
346 ASSERT_EQ(1, result.reads[i].entries_size());
347 EXPECT_EQ(i, result.reads[i].entries(0).index());
348 }
349 EXPECT_EQ(StatusCode::kInternal, result.status.code());
350 }
351
352 /// @test Verify that the AsyncReadStream wrappers work even if the server does
353 /// not explicitly signal end-of-stream.
TEST_F(AsyncReadStreamTest,Return3NoLast)354 TEST_F(AsyncReadStreamTest, Return3NoLast) {
355 impl_.SetCallback(
356 [](grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
357 grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>* writer) {
358 WriteOne(writer, 0);
359 WriteOne(writer, 1);
360 WriteOne(writer, 2);
361 return grpc::Status::OK;
362 });
363
364 btproto::MutateRowsRequest request;
365 auto context = absl::make_unique<grpc::ClientContext>();
366 HandlerResult result;
367 cq_.MakeStreamingReadRpc(
368 [this](grpc::ClientContext* context,
369 btproto::MutateRowsRequest const& request,
370 grpc::CompletionQueue* cq) {
371 return stub_->PrepareAsyncMutateRows(context, request, cq);
372 },
373 request, std::move(context),
374 [&result](btproto::MutateRowsResponse r) {
375 result.reads.emplace_back(std::move(r));
376 return make_ready_future(true);
377 },
378 [&result](Status s) {
379 result.status = std::move(s);
380 result.done.Lift();
381 });
382
383 result.done.Wait();
384 ASSERT_EQ(3U, result.reads.size());
385 ASSERT_STATUS_OK(result.status);
386 for (int i = 0; i != 3; ++i) {
387 SCOPED_TRACE("Running iteration: " + std::to_string(i));
388 ASSERT_EQ(1, result.reads[i].entries_size());
389 EXPECT_EQ(i, result.reads[i].entries(0).index());
390 }
391 }
392
393 /// @test Verify that the AsyncReadStream wrappers work even if the last Read()
394 /// blocks for a bit.
TEST_F(AsyncReadStreamTest,Return3LastIsBlocked)395 TEST_F(AsyncReadStreamTest, Return3LastIsBlocked) {
396 SimpleBarrier client_barrier;
397 SimpleBarrier server_barrier;
398 impl_.SetCallback(
399 [&client_barrier, &server_barrier](
400 grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
401 grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>*
402 writer) {
403 WriteOne(writer, 0);
404 WriteOne(writer, 1);
405 WriteOne(writer, 2);
406 client_barrier.Lift();
407 server_barrier.Wait();
408 return grpc::Status::OK;
409 });
410
411 HandlerResult result;
412 auto on_read = [&client_barrier, &server_barrier,
413 &result](btproto::MutateRowsResponse r) {
414 result.reads.emplace_back(std::move(r));
415 if (result.reads.size() == 3) {
416 client_barrier.Wait();
417 server_barrier.Lift();
418 }
419 return make_ready_future(true);
420 };
421
422 btproto::MutateRowsRequest request;
423 auto context = absl::make_unique<grpc::ClientContext>();
424 cq_.MakeStreamingReadRpc(
425 [this](grpc::ClientContext* context,
426 btproto::MutateRowsRequest const& request,
427 grpc::CompletionQueue* cq) {
428 return stub_->PrepareAsyncMutateRows(context, request, cq);
429 },
430 request, std::move(context), on_read,
431 [&result](Status s) {
432 result.status = std::move(s);
433 result.done.Lift();
434 });
435
436 result.done.Wait();
437 ASSERT_EQ(3U, result.reads.size());
438 ASSERT_STATUS_OK(result.status);
439 for (int i = 0; i != 3; ++i) {
440 SCOPED_TRACE("Running iteration: " + std::to_string(i));
441 ASSERT_EQ(1, result.reads[i].entries_size());
442 EXPECT_EQ(i, result.reads[i].entries(0).index());
443 }
444 }
445
446 /// @test Verify that AsyncReadStream::Cancel() works in the middle of a Read().
TEST_F(AsyncReadStreamTest,CancelWhileBlocked)447 TEST_F(AsyncReadStreamTest, CancelWhileBlocked) {
448 SimpleBarrier client_barrier;
449 SimpleBarrier server_barrier;
450 impl_.SetCallback(
451 [&client_barrier, &server_barrier](
452 grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
453 grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>*
454 writer) {
455 WriteOne(writer, 0);
456 WriteOne(writer, 1);
457 client_barrier.Lift();
458 server_barrier.Wait();
459 WriteOne(writer, 2);
460 return grpc::Status::OK;
461 });
462
463 HandlerResult result;
464 auto on_read = [&client_barrier, &result](btproto::MutateRowsResponse r) {
465 result.reads.emplace_back(std::move(r));
466 if (result.reads.size() == 2) {
467 client_barrier.Wait();
468 return make_ready_future(false);
469 }
470 return make_ready_future(true);
471 };
472
473 btproto::MutateRowsRequest request;
474 auto context = absl::make_unique<grpc::ClientContext>();
475 cq_.MakeStreamingReadRpc(
476 [this](grpc::ClientContext* context,
477 btproto::MutateRowsRequest const& request,
478 grpc::CompletionQueue* cq) {
479 return stub_->PrepareAsyncMutateRows(context, request, cq);
480 },
481 request, std::move(context), on_read,
482 [&result](Status s) {
483 result.status = std::move(s);
484 result.done.Lift();
485 });
486
487 // The server remains blocked until the stream finishes, therefore, the only
488 // way this actually unblocks is if the Cancel() succeeds.
489 result.done.Wait();
490 ASSERT_EQ(2U, result.reads.size());
491 EXPECT_EQ(StatusCode::kCancelled, result.status.code());
492 for (int i = 0; i != 2; ++i) {
493 SCOPED_TRACE("Running iteration: " + std::to_string(i));
494 ASSERT_EQ(1, result.reads[i].entries_size());
495 EXPECT_EQ(i, result.reads[i].entries(0).index());
496 }
497
498 // The barriers go out of scope when this function exits, but the server may
499 // still be using them, so wait for the server to shutdown before leaving the
500 // scope.
501 server_barrier.Lift();
502 WaitForServerShutdown();
503 }
504
505 /// @test Verify that AsyncReadStream works when one calls Cancel() more than
506 /// once.
TEST_F(AsyncReadStreamTest,DoubleCancel)507 TEST_F(AsyncReadStreamTest, DoubleCancel) {
508 SimpleBarrier server_sent_responses_barrier;
509 SimpleBarrier cancel_done_server_barrier;
510 impl_.SetCallback(
511 [&server_sent_responses_barrier, &cancel_done_server_barrier](
512 grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
513 grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>*
514 writer) {
515 WriteOne(writer, 0);
516 WriteOne(writer, 1);
517 server_sent_responses_barrier.Lift();
518 cancel_done_server_barrier.Wait();
519 WriteOne(writer, 2);
520 return grpc::Status::OK;
521 });
522
523 HandlerResult result;
524 SimpleBarrier read_received_barrier;
525 SimpleBarrier cancel_done_read_barrier;
526 auto on_read = [&read_received_barrier, &cancel_done_read_barrier,
527 &result](btproto::MutateRowsResponse r) {
528 result.reads.emplace_back(std::move(r));
529 if (result.reads.size() == 2) {
530 read_received_barrier.Lift();
531 cancel_done_read_barrier.Wait();
532 }
533 return make_ready_future(true);
534 };
535
536 btproto::MutateRowsRequest request;
537 auto context = absl::make_unique<grpc::ClientContext>();
538 auto op = cq_.MakeStreamingReadRpc(
539 [this](grpc::ClientContext* context,
540 btproto::MutateRowsRequest const& request,
541 grpc::CompletionQueue* cq) {
542 return stub_->PrepareAsyncMutateRows(context, request, cq);
543 },
544 request, std::move(context), on_read,
545 [&result](Status s) {
546 result.status = std::move(s);
547 result.done.Lift();
548 });
549
550 server_sent_responses_barrier.Wait();
551 read_received_barrier.Wait();
552 op->Cancel();
553 op->Cancel();
554 cancel_done_server_barrier.Lift();
555 cancel_done_read_barrier.Lift();
556
557 // The server remains blocked until the stream finishes, therefore, the only
558 // way this actually unblocks is if the Cancel() succeeds.
559 result.done.Wait();
560 ASSERT_EQ(2U, result.reads.size());
561 EXPECT_EQ(StatusCode::kCancelled, result.status.code());
562 for (int i = 0; i != 2; ++i) {
563 SCOPED_TRACE("Running iteration: " + std::to_string(i));
564 ASSERT_EQ(1, result.reads[i].entries_size());
565 EXPECT_EQ(i, result.reads[i].entries(0).index());
566 }
567
568 // The barriers go out of scope when this function exits, but the server may
569 // still be using them, so wait for the server to shutdown before leaving the
570 // scope.
571 WaitForServerShutdown();
572 }
573
574 /// @test Verify that AsyncReadStream works when one Cancels() before reading.
TEST_F(AsyncReadStreamTest,CancelBeforeRead)575 TEST_F(AsyncReadStreamTest, CancelBeforeRead) {
576 SimpleBarrier server_started_barrier;
577 SimpleBarrier cancel_done_server_barrier;
578 impl_.SetCallback(
579 [&server_started_barrier, &cancel_done_server_barrier](
580 grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
581 grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>*
582 writer) {
583 server_started_barrier.Lift();
584 WriteOne(writer, 0);
585 WriteOne(writer, 1);
586 WriteOne(writer, 2);
587 cancel_done_server_barrier.Wait();
588 return grpc::Status::OK;
589 });
590
591 HandlerResult result;
592 btproto::MutateRowsRequest request;
593 auto context = absl::make_unique<grpc::ClientContext>();
594 auto op = cq_.MakeStreamingReadRpc(
595 [this](grpc::ClientContext* context,
596 btproto::MutateRowsRequest const& request,
597 grpc::CompletionQueue* cq) {
598 return stub_->PrepareAsyncMutateRows(context, request, cq);
599 },
600 request, std::move(context),
601 [&result](btproto::MutateRowsResponse r) {
602 result.reads.emplace_back(std::move(r));
603 return make_ready_future(true);
604 },
605 [&result](Status s) {
606 result.status = std::move(s);
607 result.done.Lift();
608 });
609
610 server_started_barrier.Wait();
611 op->Cancel();
612
613 // The server remains blocked until the stream finishes, therefore, the only
614 // way this actually unblocks is if the Cancel() succeeds.
615 result.done.Wait();
616 // There is no guarantee on how many messages will be received before the
617 // cancel succeeds, but we certainly expect fewer messages than we sent.
618 EXPECT_LE(result.reads.size(), 3U);
619 EXPECT_EQ(StatusCode::kCancelled, result.status.code());
620
621 // The barriers go out of scope when this function exits, but the server may
622 // still be using them, so wait for the server to shutdown before leaving the
623 // scope.
624 cancel_done_server_barrier.Lift();
625 WaitForServerShutdown();
626 }
627
628 /// @test Verify that AsyncReadStream works even if Cancel() is misused.
TEST_F(AsyncReadStreamTest,CancelAfterFinish)629 TEST_F(AsyncReadStreamTest, CancelAfterFinish) {
630 impl_.SetCallback(
631 [](grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
632 grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>* writer) {
633 WriteOne(writer, 0);
634 WriteOne(writer, 1);
635 WriteLast(writer, 2);
636 return grpc::Status::OK;
637 });
638
639 btproto::MutateRowsRequest request;
640 auto context = absl::make_unique<grpc::ClientContext>();
641 HandlerResult result;
642 SimpleBarrier on_finish_stop_before_cancel;
643 SimpleBarrier on_finish_continue_after_cancel;
644 auto on_finish = [&result, &on_finish_stop_before_cancel,
645 &on_finish_continue_after_cancel](Status s) {
646 result.status = std::move(s);
647 on_finish_stop_before_cancel.Lift();
648 on_finish_continue_after_cancel.Wait();
649 result.done.Lift();
650 };
651 auto op = cq_.MakeStreamingReadRpc(
652 [this](grpc::ClientContext* context,
653 btproto::MutateRowsRequest const& request,
654 grpc::CompletionQueue* cq) {
655 return stub_->PrepareAsyncMutateRows(context, request, cq);
656 },
657 request, std::move(context),
658 [&result](btproto::MutateRowsResponse r) {
659 result.reads.emplace_back(std::move(r));
660 return make_ready_future(true);
661 },
662 on_finish);
663
664 // Call Cancel() while the on_finish() callback is running.
665 on_finish_stop_before_cancel.Wait();
666 op->Cancel();
667 on_finish_continue_after_cancel.Lift();
668
669 result.done.Wait();
670 EXPECT_STATUS_OK(result.status);
671 ASSERT_EQ(3U, result.reads.size());
672 for (int i = 0; i != 3; ++i) {
673 SCOPED_TRACE("Running iteration: " + std::to_string(i));
674 ASSERT_EQ(1, result.reads[i].entries_size());
675 EXPECT_EQ(i, result.reads[i].entries(0).index());
676 }
677 }
678
679 /// @test Verify that AsyncReadStream works when returning false from OnRead().
TEST_F(AsyncReadStreamTest,DiscardAfterReturningFalse)680 TEST_F(AsyncReadStreamTest, DiscardAfterReturningFalse) {
681 impl_.SetCallback(
682 [](grpc::ServerContext*, google::bigtable::v2::MutateRowsRequest const*,
683 grpc::ServerWriter<google::bigtable::v2::MutateRowsResponse>* writer) {
684 for (int i = 0; i != 10; ++i) {
685 WriteOne(writer, i);
686 }
687 WriteLast(writer, 10);
688 return grpc::Status::OK;
689 });
690
691 btproto::MutateRowsRequest request;
692 auto context = absl::make_unique<grpc::ClientContext>();
693 HandlerResult result;
694 auto op = cq_.MakeStreamingReadRpc(
695 [this](grpc::ClientContext* context,
696 btproto::MutateRowsRequest const& request,
697 grpc::CompletionQueue* cq) {
698 return stub_->PrepareAsyncMutateRows(context, request, cq);
699 },
700 request, std::move(context),
701 [&result](btproto::MutateRowsResponse r) {
702 result.reads.emplace_back(std::move(r));
703 // Cancel on *every* request, we do not expect additional calls after
704 // the first one.
705 return make_ready_future(false);
706 },
707 [&result](Status s) {
708 result.status = std::move(s);
709 result.done.Lift();
710 });
711
712 result.done.Wait();
713 ASSERT_EQ(StatusCode::kCancelled, result.status.code());
714 ASSERT_EQ(1U, result.reads.size());
715 EXPECT_EQ(1, result.reads[0].entries_size());
716 EXPECT_EQ(0, result.reads[0].entries(0).index());
717 }
718
719 } // namespace
720 } // namespace BIGTABLE_CLIENT_NS
721 } // namespace bigtable
722 } // namespace cloud
723 } // namespace google
724