1 // Copyright 2020 Google LLC 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #include "google/cloud/testing_util/fake_completion_queue_impl.h" 16 17 namespace google { 18 namespace cloud { 19 inline namespace GOOGLE_CLOUD_CPP_NS { 20 namespace testing_util { 21 namespace { 22 class FakeAsyncTimer : public internal::AsyncGrpcOperation { 23 public: FakeAsyncTimer(std::chrono::system_clock::time_point deadline)24 explicit FakeAsyncTimer(std::chrono::system_clock::time_point deadline) 25 : deadline_(deadline) {} 26 GetFuture()27 future<StatusOr<std::chrono::system_clock::time_point>> GetFuture() { 28 return promise_.get_future(); 29 } 30 Cancel()31 void Cancel() override {} 32 Notify(bool ok)33 bool Notify(bool ok) override { 34 if (!ok) { 35 promise_.set_value(Status(StatusCode::kCancelled, "timer canceled")); 36 } else { 37 promise_.set_value(deadline_); 38 } 39 return true; 40 } 41 42 private: 43 std::chrono::system_clock::time_point const deadline_; 44 promise<StatusOr<std::chrono::system_clock::time_point>> promise_; 45 }; 46 47 class FakeAsyncFunction : public internal::AsyncGrpcOperation { 48 public: FakeAsyncFunction(std::unique_ptr<internal::RunAsyncBase> fun)49 explicit FakeAsyncFunction(std::unique_ptr<internal::RunAsyncBase> fun) 50 : function_(std::move(fun)) {} 51 Cancel()52 void Cancel() override {} 53 54 private: Notify(bool ok)55 bool Notify(bool ok) override { 56 auto f = std::move(function_); 57 if (!ok) return true; 58 f->exec(); 59 return true; 60 } 61 62 std::unique_ptr<internal::RunAsyncBase> function_; 63 }; 64 65 } // namespace 66 Run()67void FakeCompletionQueueImpl::Run() { 68 std::unique_lock<std::mutex> lk(mu_); 69 cv_.wait(lk, [&] { return shutdown_ && pending_ops_.empty(); }); 70 } 71 Shutdown()72void FakeCompletionQueueImpl::Shutdown() { 73 std::unique_lock<std::mutex> lk(mu_); 74 shutdown_ = true; 75 while (!pending_ops_.empty()) { 76 auto op = std::move(pending_ops_.back()); 77 pending_ops_.pop_back(); 78 lk.unlock(); 79 op->Notify(false); 80 lk.lock(); 81 } 82 cv_.notify_all(); 83 } 84 CancelAll()85void FakeCompletionQueueImpl::CancelAll() { 86 auto ops = [this] { 87 std::unique_lock<std::mutex> lk(mu_); 88 return pending_ops_; 89 }(); 90 for (auto& op : ops) op->Cancel(); 91 } 92 93 future<StatusOr<std::chrono::system_clock::time_point>> MakeDeadlineTimer(std::chrono::system_clock::time_point deadline)94FakeCompletionQueueImpl::MakeDeadlineTimer( 95 std::chrono::system_clock::time_point deadline) { 96 auto op = std::make_shared<FakeAsyncTimer>(deadline); 97 std::unique_lock<std::mutex> lk(mu_); 98 if (shutdown_) { 99 lk.unlock(); 100 op->Notify(/*ok=*/false); 101 return op->GetFuture(); 102 } 103 pending_ops_.push_back(op); 104 return op->GetFuture(); 105 } 106 107 future<StatusOr<std::chrono::system_clock::time_point>> MakeRelativeTimer(std::chrono::nanoseconds duration)108FakeCompletionQueueImpl::MakeRelativeTimer(std::chrono::nanoseconds duration) { 109 using std::chrono::system_clock; 110 auto const d = std::chrono::duration_cast<system_clock::duration>(duration); 111 return MakeDeadlineTimer(system_clock::now() + d); 112 } 113 RunAsync(std::unique_ptr<internal::RunAsyncBase> function)114void FakeCompletionQueueImpl::RunAsync( 115 std::unique_ptr<internal::RunAsyncBase> function) { 116 auto op = std::make_shared<FakeAsyncFunction>(std::move(function)); 117 std::unique_lock<std::mutex> lk(mu_); 118 if (shutdown_) { 119 return; 120 } 121 pending_ops_.push_back(op); 122 } 123 StartOperation(std::shared_ptr<internal::AsyncGrpcOperation> op,absl::FunctionRef<void (void *)> start)124void FakeCompletionQueueImpl::StartOperation( 125 std::shared_ptr<internal::AsyncGrpcOperation> op, 126 absl::FunctionRef<void(void*)> start) { 127 std::unique_lock<std::mutex> lk(mu_); 128 if (shutdown_) { 129 lk.unlock(); 130 op->Notify(/*ok=*/false); 131 return; 132 } 133 pending_ops_.push_back(op); 134 start(op.get()); 135 } 136 SimulateCompletion(bool ok)137void FakeCompletionQueueImpl::SimulateCompletion(bool ok) { 138 auto ops = [this] { 139 std::unique_lock<std::mutex> lk(mu_); 140 std::vector<std::shared_ptr<internal::AsyncGrpcOperation>> ops; 141 ops.swap(pending_ops_); 142 return ops; 143 }(); 144 145 for (auto& op : ops) { 146 op->Notify(ok); 147 } 148 } 149 150 } // namespace testing_util 151 } // namespace GOOGLE_CLOUD_CPP_NS 152 } // namespace cloud 153 } // namespace google 154