1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/testing_util/fake_completion_queue_impl.h"
16 
17 namespace google {
18 namespace cloud {
19 inline namespace GOOGLE_CLOUD_CPP_NS {
20 namespace testing_util {
21 namespace {
22 class FakeAsyncTimer : public internal::AsyncGrpcOperation {
23  public:
FakeAsyncTimer(std::chrono::system_clock::time_point deadline)24   explicit FakeAsyncTimer(std::chrono::system_clock::time_point deadline)
25       : deadline_(deadline) {}
26 
GetFuture()27   future<StatusOr<std::chrono::system_clock::time_point>> GetFuture() {
28     return promise_.get_future();
29   }
30 
Cancel()31   void Cancel() override {}
32 
Notify(bool ok)33   bool Notify(bool ok) override {
34     if (!ok) {
35       promise_.set_value(Status(StatusCode::kCancelled, "timer canceled"));
36     } else {
37       promise_.set_value(deadline_);
38     }
39     return true;
40   }
41 
42  private:
43   std::chrono::system_clock::time_point const deadline_;
44   promise<StatusOr<std::chrono::system_clock::time_point>> promise_;
45 };
46 
47 class FakeAsyncFunction : public internal::AsyncGrpcOperation {
48  public:
FakeAsyncFunction(std::unique_ptr<internal::RunAsyncBase> fun)49   explicit FakeAsyncFunction(std::unique_ptr<internal::RunAsyncBase> fun)
50       : function_(std::move(fun)) {}
51 
Cancel()52   void Cancel() override {}
53 
54  private:
Notify(bool ok)55   bool Notify(bool ok) override {
56     auto f = std::move(function_);
57     if (!ok) return true;
58     f->exec();
59     return true;
60   }
61 
62   std::unique_ptr<internal::RunAsyncBase> function_;
63 };
64 
65 }  // namespace
66 
Run()67 void FakeCompletionQueueImpl::Run() {
68   std::unique_lock<std::mutex> lk(mu_);
69   cv_.wait(lk, [&] { return shutdown_ && pending_ops_.empty(); });
70 }
71 
Shutdown()72 void FakeCompletionQueueImpl::Shutdown() {
73   std::unique_lock<std::mutex> lk(mu_);
74   shutdown_ = true;
75   while (!pending_ops_.empty()) {
76     auto op = std::move(pending_ops_.back());
77     pending_ops_.pop_back();
78     lk.unlock();
79     op->Notify(false);
80     lk.lock();
81   }
82   cv_.notify_all();
83 }
84 
CancelAll()85 void FakeCompletionQueueImpl::CancelAll() {
86   auto ops = [this] {
87     std::unique_lock<std::mutex> lk(mu_);
88     return pending_ops_;
89   }();
90   for (auto& op : ops) op->Cancel();
91 }
92 
93 future<StatusOr<std::chrono::system_clock::time_point>>
MakeDeadlineTimer(std::chrono::system_clock::time_point deadline)94 FakeCompletionQueueImpl::MakeDeadlineTimer(
95     std::chrono::system_clock::time_point deadline) {
96   auto op = std::make_shared<FakeAsyncTimer>(deadline);
97   std::unique_lock<std::mutex> lk(mu_);
98   if (shutdown_) {
99     lk.unlock();
100     op->Notify(/*ok=*/false);
101     return op->GetFuture();
102   }
103   pending_ops_.push_back(op);
104   return op->GetFuture();
105 }
106 
107 future<StatusOr<std::chrono::system_clock::time_point>>
MakeRelativeTimer(std::chrono::nanoseconds duration)108 FakeCompletionQueueImpl::MakeRelativeTimer(std::chrono::nanoseconds duration) {
109   using std::chrono::system_clock;
110   auto const d = std::chrono::duration_cast<system_clock::duration>(duration);
111   return MakeDeadlineTimer(system_clock::now() + d);
112 }
113 
RunAsync(std::unique_ptr<internal::RunAsyncBase> function)114 void FakeCompletionQueueImpl::RunAsync(
115     std::unique_ptr<internal::RunAsyncBase> function) {
116   auto op = std::make_shared<FakeAsyncFunction>(std::move(function));
117   std::unique_lock<std::mutex> lk(mu_);
118   if (shutdown_) {
119     return;
120   }
121   pending_ops_.push_back(op);
122 }
123 
StartOperation(std::shared_ptr<internal::AsyncGrpcOperation> op,absl::FunctionRef<void (void *)> start)124 void FakeCompletionQueueImpl::StartOperation(
125     std::shared_ptr<internal::AsyncGrpcOperation> op,
126     absl::FunctionRef<void(void*)> start) {
127   std::unique_lock<std::mutex> lk(mu_);
128   if (shutdown_) {
129     lk.unlock();
130     op->Notify(/*ok=*/false);
131     return;
132   }
133   pending_ops_.push_back(op);
134   start(op.get());
135 }
136 
SimulateCompletion(bool ok)137 void FakeCompletionQueueImpl::SimulateCompletion(bool ok) {
138   auto ops = [this] {
139     std::unique_lock<std::mutex> lk(mu_);
140     std::vector<std::shared_ptr<internal::AsyncGrpcOperation>> ops;
141     ops.swap(pending_ops_);
142     return ops;
143   }();
144 
145   for (auto& op : ops) {
146     op->Notify(ok);
147   }
148 }
149 
150 }  // namespace testing_util
151 }  // namespace GOOGLE_CLOUD_CPP_NS
152 }  // namespace cloud
153 }  // namespace google
154