1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
22 
23 #include <inttypes.h>
24 
25 #include "absl/memory/memory.h"
26 
27 namespace grpc {
28 namespace load_reporter {
29 
Run(bool ok)30 void LoadReporterAsyncServiceImpl::CallableTag::Run(bool ok) {
31   GPR_ASSERT(handler_function_ != nullptr);
32   GPR_ASSERT(handler_ != nullptr);
33   handler_function_(std::move(handler_), ok);
34 }
35 
LoadReporterAsyncServiceImpl(std::unique_ptr<ServerCompletionQueue> cq)36 LoadReporterAsyncServiceImpl::LoadReporterAsyncServiceImpl(
37     std::unique_ptr<ServerCompletionQueue> cq)
38     : cq_(std::move(cq)) {
39   thread_ = absl::make_unique<::grpc_core::Thread>("server_load_reporting",
40                                                    Work, this);
41   std::unique_ptr<CpuStatsProvider> cpu_stats_provider = nullptr;
42 #if defined(GPR_LINUX) || defined(GPR_WINDOWS) || defined(GPR_APPLE)
43   cpu_stats_provider = absl::make_unique<CpuStatsProviderDefaultImpl>();
44 #endif
45   load_reporter_ = absl::make_unique<LoadReporter>(
46       kFeedbackSampleWindowSeconds,
47       std::unique_ptr<CensusViewProvider>(new CensusViewProviderDefaultImpl()),
48       std::move(cpu_stats_provider));
49 }
50 
~LoadReporterAsyncServiceImpl()51 LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() {
52   // We will reach here after the server starts shutting down.
53   shutdown_ = true;
54   {
55     grpc_core::MutexLock lock(&cq_shutdown_mu_);
56     cq_->Shutdown();
57   }
58   if (next_fetch_and_sample_alarm_ != nullptr) {
59     next_fetch_and_sample_alarm_->Cancel();
60   }
61   thread_->Join();
62 }
63 
ScheduleNextFetchAndSample()64 void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() {
65   auto next_fetch_and_sample_time =
66       gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
67                    gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000,
68                                         GPR_TIMESPAN));
69   {
70     grpc_core::MutexLock lock(&cq_shutdown_mu_);
71     if (shutdown_) return;
72     // TODO(juanlishen): Improve the Alarm implementation to reuse a single
73     // instance for multiple events.
74     next_fetch_and_sample_alarm_ = absl::make_unique<Alarm>();
75     next_fetch_and_sample_alarm_->Set(cq_.get(), next_fetch_and_sample_time,
76                                       this);
77   }
78   gpr_log(GPR_DEBUG, "[LRS %p] Next fetch-and-sample scheduled.", this);
79 }
80 
FetchAndSample(bool ok)81 void LoadReporterAsyncServiceImpl::FetchAndSample(bool ok) {
82   if (!ok) {
83     gpr_log(GPR_INFO, "[LRS %p] Fetch-and-sample is stopped.", this);
84     return;
85   }
86   gpr_log(GPR_DEBUG, "[LRS %p] Starting a fetch-and-sample...", this);
87   load_reporter_->FetchAndSample();
88   ScheduleNextFetchAndSample();
89 }
90 
Work(void * arg)91 void LoadReporterAsyncServiceImpl::Work(void* arg) {
92   LoadReporterAsyncServiceImpl* service =
93       static_cast<LoadReporterAsyncServiceImpl*>(arg);
94   service->FetchAndSample(true /* ok */);
95   // TODO(juanlishen): This is a workaround to wait for the cq to be ready. Need
96   // to figure out why cq is not ready after service starts.
97   gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
98                                gpr_time_from_seconds(1, GPR_TIMESPAN)));
99   ReportLoadHandler::CreateAndStart(service->cq_.get(), service,
100                                     service->load_reporter_.get());
101   void* tag;
102   bool ok;
103   while (true) {
104     if (!service->cq_->Next(&tag, &ok)) {
105       // The completion queue is shutting down.
106       GPR_ASSERT(service->shutdown_);
107       break;
108     }
109     if (tag == service) {
110       service->FetchAndSample(ok);
111     } else {
112       auto* next_step = static_cast<CallableTag*>(tag);
113       next_step->Run(ok);
114     }
115   }
116 }
117 
StartThread()118 void LoadReporterAsyncServiceImpl::StartThread() { thread_->Start(); }
119 
CreateAndStart(ServerCompletionQueue * cq,LoadReporterAsyncServiceImpl * service,LoadReporter * load_reporter)120 void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart(
121     ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
122     LoadReporter* load_reporter) {
123   std::shared_ptr<ReportLoadHandler> handler =
124       std::make_shared<ReportLoadHandler>(cq, service, load_reporter);
125   ReportLoadHandler* p = handler.get();
126   {
127     grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
128     if (service->shutdown_) return;
129     p->on_done_notified_ =
130         CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p,
131                               std::placeholders::_1, std::placeholders::_2),
132                     handler);
133     p->next_inbound_ =
134         CallableTag(std::bind(&ReportLoadHandler::OnRequestDelivered, p,
135                               std::placeholders::_1, std::placeholders::_2),
136                     std::move(handler));
137     p->ctx_.AsyncNotifyWhenDone(&p->on_done_notified_);
138     service->RequestReportLoad(&p->ctx_, &p->stream_, cq, cq,
139                                &p->next_inbound_);
140   }
141 }
142 
ReportLoadHandler(ServerCompletionQueue * cq,LoadReporterAsyncServiceImpl * service,LoadReporter * load_reporter)143 LoadReporterAsyncServiceImpl::ReportLoadHandler::ReportLoadHandler(
144     ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
145     LoadReporter* load_reporter)
146     : cq_(cq),
147       service_(service),
148       load_reporter_(load_reporter),
149       stream_(&ctx_),
150       call_status_(WAITING_FOR_DELIVERY) {}
151 
OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self,bool ok)152 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
153     std::shared_ptr<ReportLoadHandler> self, bool ok) {
154   if (ok) {
155     call_status_ = DELIVERED;
156   } else {
157     // AsyncNotifyWhenDone() needs to be called before the call starts, but the
158     // tag will not pop out if the call never starts (
159     // https://github.com/grpc/grpc/issues/10136). So we need to manually
160     // release the ownership of the handler in this case.
161     GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
162   }
163   if (!ok || shutdown_) {
164     // The value of ok being false means that the server is shutting down.
165     Shutdown(std::move(self), "OnRequestDelivered");
166     return;
167   }
168   // Spawn a new handler instance to serve the next new client. Every handler
169   // instance will deallocate itself when it's done.
170   CreateAndStart(cq_, service_, load_reporter_);
171   {
172     grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
173     if (service_->shutdown_) {
174       lock.Release();
175       Shutdown(std::move(self), "OnRequestDelivered");
176       return;
177     }
178     next_inbound_ =
179         CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
180                               std::placeholders::_1, std::placeholders::_2),
181                     std::move(self));
182     stream_.Read(&request_, &next_inbound_);
183   }
184   // LB ID is unique for each load reporting stream.
185   lb_id_ = load_reporter_->GenerateLbId();
186   gpr_log(GPR_INFO,
187           "[LRS %p] Call request delivered (lb_id_: %s, handler: %p). "
188           "Start reading the initial request...",
189           service_, lb_id_.c_str(), this);
190 }
191 
OnReadDone(std::shared_ptr<ReportLoadHandler> self,bool ok)192 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
193     std::shared_ptr<ReportLoadHandler> self, bool ok) {
194   if (!ok || shutdown_) {
195     if (!ok && call_status_ < INITIAL_REQUEST_RECEIVED) {
196       // The client may have half-closed the stream or the stream is broken.
197       gpr_log(GPR_INFO,
198               "[LRS %p] Failed reading the initial request from the stream "
199               "(lb_id_: %s, handler: %p, done_notified: %d, is_cancelled: %d).",
200               service_, lb_id_.c_str(), this, static_cast<int>(done_notified_),
201               static_cast<int>(is_cancelled_));
202     }
203     Shutdown(std::move(self), "OnReadDone");
204     return;
205   }
206   // We only receive one request, which is the initial request.
207   if (call_status_ < INITIAL_REQUEST_RECEIVED) {
208     if (!request_.has_initial_request()) {
209       Shutdown(std::move(self), "OnReadDone+initial_request_not_found");
210     } else {
211       call_status_ = INITIAL_REQUEST_RECEIVED;
212       const auto& initial_request = request_.initial_request();
213       load_balanced_hostname_ = initial_request.load_balanced_hostname();
214       load_key_ = initial_request.load_key();
215       load_reporter_->ReportStreamCreated(load_balanced_hostname_, lb_id_,
216                                           load_key_);
217       const auto& load_report_interval = initial_request.load_report_interval();
218       load_report_interval_ms_ =
219           static_cast<unsigned long>(load_report_interval.seconds() * 1000 +
220                                      load_report_interval.nanos() / 1000);
221       gpr_log(GPR_INFO,
222               "[LRS %p] Initial request received. Start load reporting (load "
223               "balanced host: %s, interval: %" PRIu64
224               " ms, lb_id_: %s, handler: %p)...",
225               service_, load_balanced_hostname_.c_str(),
226               load_report_interval_ms_, lb_id_.c_str(), this);
227       SendReport(self, true /* ok */);
228       // Expect this read to fail.
229       {
230         grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
231         if (service_->shutdown_) {
232           lock.Release();
233           Shutdown(std::move(self), "OnReadDone");
234           return;
235         }
236         next_inbound_ =
237             CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
238                                   std::placeholders::_1, std::placeholders::_2),
239                         std::move(self));
240         stream_.Read(&request_, &next_inbound_);
241       }
242     }
243   } else {
244     // Another request received! This violates the spec.
245     gpr_log(GPR_ERROR,
246             "[LRS %p] Another request received (lb_id_: %s, handler: %p).",
247             service_, lb_id_.c_str(), this);
248     Shutdown(std::move(self), "OnReadDone+second_request");
249   }
250 }
251 
ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self,bool ok)252 void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport(
253     std::shared_ptr<ReportLoadHandler> self, bool ok) {
254   if (!ok || shutdown_) {
255     Shutdown(std::move(self), "ScheduleNextReport");
256     return;
257   }
258   auto next_report_time = gpr_time_add(
259       gpr_now(GPR_CLOCK_MONOTONIC),
260       gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN));
261   {
262     grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
263     if (service_->shutdown_) {
264       lock.Release();
265       Shutdown(std::move(self), "ScheduleNextReport");
266       return;
267     }
268     next_outbound_ =
269         CallableTag(std::bind(&ReportLoadHandler::SendReport, this,
270                               std::placeholders::_1, std::placeholders::_2),
271                     std::move(self));
272     // TODO(juanlishen): Improve the Alarm implementation to reuse a single
273     // instance for multiple events.
274     next_report_alarm_ = absl::make_unique<Alarm>();
275     next_report_alarm_->Set(cq_, next_report_time, &next_outbound_);
276   }
277   gpr_log(GPR_DEBUG,
278           "[LRS %p] Next load report scheduled (lb_id_: %s, handler: %p).",
279           service_, lb_id_.c_str(), this);
280 }
281 
SendReport(std::shared_ptr<ReportLoadHandler> self,bool ok)282 void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport(
283     std::shared_ptr<ReportLoadHandler> self, bool ok) {
284   if (!ok || shutdown_) {
285     Shutdown(std::move(self), "SendReport");
286     return;
287   }
288   ::grpc::lb::v1::LoadReportResponse response;
289   auto loads = load_reporter_->GenerateLoads(load_balanced_hostname_, lb_id_);
290   response.mutable_load()->Swap(&loads);
291   auto feedback = load_reporter_->GenerateLoadBalancingFeedback();
292   response.mutable_load_balancing_feedback()->Swap(&feedback);
293   if (call_status_ < INITIAL_RESPONSE_SENT) {
294     auto initial_response = response.mutable_initial_response();
295     initial_response->set_load_balancer_id(lb_id_);
296     initial_response->set_implementation_id(
297         ::grpc::lb::v1::InitialLoadReportResponse::CPP);
298     initial_response->set_server_version(kVersion);
299     call_status_ = INITIAL_RESPONSE_SENT;
300   }
301   {
302     grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
303     if (service_->shutdown_) {
304       lock.Release();
305       Shutdown(std::move(self), "SendReport");
306       return;
307     }
308     next_outbound_ =
309         CallableTag(std::bind(&ReportLoadHandler::ScheduleNextReport, this,
310                               std::placeholders::_1, std::placeholders::_2),
311                     std::move(self));
312     stream_.Write(response, &next_outbound_);
313     gpr_log(GPR_INFO,
314             "[LRS %p] Sending load report (lb_id_: %s, handler: %p, loads "
315             "count: %d)...",
316             service_, lb_id_.c_str(), this, response.load().size());
317   }
318 }
319 
OnDoneNotified(std::shared_ptr<ReportLoadHandler> self,bool ok)320 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnDoneNotified(
321     std::shared_ptr<ReportLoadHandler> self, bool ok) {
322   GPR_ASSERT(ok);
323   done_notified_ = true;
324   if (ctx_.IsCancelled()) {
325     is_cancelled_ = true;
326   }
327   gpr_log(GPR_INFO,
328           "[LRS %p] Load reporting call is notified done (handler: %p, "
329           "is_cancelled: %d).",
330           service_, this, static_cast<int>(is_cancelled_));
331   Shutdown(std::move(self), "OnDoneNotified");
332 }
333 
Shutdown(std::shared_ptr<ReportLoadHandler> self,const char * reason)334 void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown(
335     std::shared_ptr<ReportLoadHandler> self, const char* reason) {
336   if (!shutdown_) {
337     gpr_log(GPR_INFO,
338             "[LRS %p] Shutting down the handler (lb_id_: %s, handler: %p, "
339             "reason: %s).",
340             service_, lb_id_.c_str(), this, reason);
341     shutdown_ = true;
342     if (call_status_ >= INITIAL_REQUEST_RECEIVED) {
343       load_reporter_->ReportStreamClosed(load_balanced_hostname_, lb_id_);
344       next_report_alarm_->Cancel();
345     }
346   }
347   // OnRequestDelivered() may be called after OnDoneNotified(), so we need to
348   // try to Finish() every time we are in Shutdown().
349   if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) {
350     grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
351     if (!service_->shutdown_) {
352       on_finish_done_ =
353           CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this,
354                                 std::placeholders::_1, std::placeholders::_2),
355                       std::move(self));
356       // TODO(juanlishen): Maybe add a message proto for the client to
357       // explicitly cancel the stream so that we can return OK status in such
358       // cases.
359       stream_.Finish(Status::CANCELLED, &on_finish_done_);
360       call_status_ = FINISH_CALLED;
361     }
362   }
363 }
364 
OnFinishDone(std::shared_ptr<ReportLoadHandler>,bool ok)365 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnFinishDone(
366     // NOLINTNEXTLINE(performance-unnecessary-value-param)
367     std::shared_ptr<ReportLoadHandler> /*self*/, bool ok) {
368   if (ok) {
369     gpr_log(GPR_INFO,
370             "[LRS %p] Load reporting finished (lb_id_: %s, handler: %p).",
371             service_, lb_id_.c_str(), this);
372   }
373 }
374 
375 }  // namespace load_reporter
376 }  // namespace grpc
377