1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
22
23 #include <inttypes.h>
24
25 #include "absl/memory/memory.h"
26
27 namespace grpc {
28 namespace load_reporter {
29
Run(bool ok)30 void LoadReporterAsyncServiceImpl::CallableTag::Run(bool ok) {
31 GPR_ASSERT(handler_function_ != nullptr);
32 GPR_ASSERT(handler_ != nullptr);
33 handler_function_(std::move(handler_), ok);
34 }
35
LoadReporterAsyncServiceImpl(std::unique_ptr<ServerCompletionQueue> cq)36 LoadReporterAsyncServiceImpl::LoadReporterAsyncServiceImpl(
37 std::unique_ptr<ServerCompletionQueue> cq)
38 : cq_(std::move(cq)) {
39 thread_ = absl::make_unique<::grpc_core::Thread>("server_load_reporting",
40 Work, this);
41 std::unique_ptr<CpuStatsProvider> cpu_stats_provider = nullptr;
42 #if defined(GPR_LINUX) || defined(GPR_WINDOWS) || defined(GPR_APPLE)
43 cpu_stats_provider = absl::make_unique<CpuStatsProviderDefaultImpl>();
44 #endif
45 load_reporter_ = absl::make_unique<LoadReporter>(
46 kFeedbackSampleWindowSeconds,
47 std::unique_ptr<CensusViewProvider>(new CensusViewProviderDefaultImpl()),
48 std::move(cpu_stats_provider));
49 }
50
~LoadReporterAsyncServiceImpl()51 LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() {
52 // We will reach here after the server starts shutting down.
53 shutdown_ = true;
54 {
55 grpc_core::MutexLock lock(&cq_shutdown_mu_);
56 cq_->Shutdown();
57 }
58 if (next_fetch_and_sample_alarm_ != nullptr) {
59 next_fetch_and_sample_alarm_->Cancel();
60 }
61 thread_->Join();
62 }
63
ScheduleNextFetchAndSample()64 void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() {
65 auto next_fetch_and_sample_time =
66 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
67 gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000,
68 GPR_TIMESPAN));
69 {
70 grpc_core::MutexLock lock(&cq_shutdown_mu_);
71 if (shutdown_) return;
72 // TODO(juanlishen): Improve the Alarm implementation to reuse a single
73 // instance for multiple events.
74 next_fetch_and_sample_alarm_ = absl::make_unique<Alarm>();
75 next_fetch_and_sample_alarm_->Set(cq_.get(), next_fetch_and_sample_time,
76 this);
77 }
78 gpr_log(GPR_DEBUG, "[LRS %p] Next fetch-and-sample scheduled.", this);
79 }
80
FetchAndSample(bool ok)81 void LoadReporterAsyncServiceImpl::FetchAndSample(bool ok) {
82 if (!ok) {
83 gpr_log(GPR_INFO, "[LRS %p] Fetch-and-sample is stopped.", this);
84 return;
85 }
86 gpr_log(GPR_DEBUG, "[LRS %p] Starting a fetch-and-sample...", this);
87 load_reporter_->FetchAndSample();
88 ScheduleNextFetchAndSample();
89 }
90
Work(void * arg)91 void LoadReporterAsyncServiceImpl::Work(void* arg) {
92 LoadReporterAsyncServiceImpl* service =
93 static_cast<LoadReporterAsyncServiceImpl*>(arg);
94 service->FetchAndSample(true /* ok */);
95 // TODO(juanlishen): This is a workaround to wait for the cq to be ready. Need
96 // to figure out why cq is not ready after service starts.
97 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
98 gpr_time_from_seconds(1, GPR_TIMESPAN)));
99 ReportLoadHandler::CreateAndStart(service->cq_.get(), service,
100 service->load_reporter_.get());
101 void* tag;
102 bool ok;
103 while (true) {
104 if (!service->cq_->Next(&tag, &ok)) {
105 // The completion queue is shutting down.
106 GPR_ASSERT(service->shutdown_);
107 break;
108 }
109 if (tag == service) {
110 service->FetchAndSample(ok);
111 } else {
112 auto* next_step = static_cast<CallableTag*>(tag);
113 next_step->Run(ok);
114 }
115 }
116 }
117
StartThread()118 void LoadReporterAsyncServiceImpl::StartThread() { thread_->Start(); }
119
CreateAndStart(ServerCompletionQueue * cq,LoadReporterAsyncServiceImpl * service,LoadReporter * load_reporter)120 void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart(
121 ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
122 LoadReporter* load_reporter) {
123 std::shared_ptr<ReportLoadHandler> handler =
124 std::make_shared<ReportLoadHandler>(cq, service, load_reporter);
125 ReportLoadHandler* p = handler.get();
126 {
127 grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
128 if (service->shutdown_) return;
129 p->on_done_notified_ =
130 CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p,
131 std::placeholders::_1, std::placeholders::_2),
132 handler);
133 p->next_inbound_ =
134 CallableTag(std::bind(&ReportLoadHandler::OnRequestDelivered, p,
135 std::placeholders::_1, std::placeholders::_2),
136 std::move(handler));
137 p->ctx_.AsyncNotifyWhenDone(&p->on_done_notified_);
138 service->RequestReportLoad(&p->ctx_, &p->stream_, cq, cq,
139 &p->next_inbound_);
140 }
141 }
142
ReportLoadHandler(ServerCompletionQueue * cq,LoadReporterAsyncServiceImpl * service,LoadReporter * load_reporter)143 LoadReporterAsyncServiceImpl::ReportLoadHandler::ReportLoadHandler(
144 ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
145 LoadReporter* load_reporter)
146 : cq_(cq),
147 service_(service),
148 load_reporter_(load_reporter),
149 stream_(&ctx_),
150 call_status_(WAITING_FOR_DELIVERY) {}
151
OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self,bool ok)152 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
153 std::shared_ptr<ReportLoadHandler> self, bool ok) {
154 if (ok) {
155 call_status_ = DELIVERED;
156 } else {
157 // AsyncNotifyWhenDone() needs to be called before the call starts, but the
158 // tag will not pop out if the call never starts (
159 // https://github.com/grpc/grpc/issues/10136). So we need to manually
160 // release the ownership of the handler in this case.
161 GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
162 }
163 if (!ok || shutdown_) {
164 // The value of ok being false means that the server is shutting down.
165 Shutdown(std::move(self), "OnRequestDelivered");
166 return;
167 }
168 // Spawn a new handler instance to serve the next new client. Every handler
169 // instance will deallocate itself when it's done.
170 CreateAndStart(cq_, service_, load_reporter_);
171 {
172 grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
173 if (service_->shutdown_) {
174 lock.Release();
175 Shutdown(std::move(self), "OnRequestDelivered");
176 return;
177 }
178 next_inbound_ =
179 CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
180 std::placeholders::_1, std::placeholders::_2),
181 std::move(self));
182 stream_.Read(&request_, &next_inbound_);
183 }
184 // LB ID is unique for each load reporting stream.
185 lb_id_ = load_reporter_->GenerateLbId();
186 gpr_log(GPR_INFO,
187 "[LRS %p] Call request delivered (lb_id_: %s, handler: %p). "
188 "Start reading the initial request...",
189 service_, lb_id_.c_str(), this);
190 }
191
OnReadDone(std::shared_ptr<ReportLoadHandler> self,bool ok)192 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
193 std::shared_ptr<ReportLoadHandler> self, bool ok) {
194 if (!ok || shutdown_) {
195 if (!ok && call_status_ < INITIAL_REQUEST_RECEIVED) {
196 // The client may have half-closed the stream or the stream is broken.
197 gpr_log(GPR_INFO,
198 "[LRS %p] Failed reading the initial request from the stream "
199 "(lb_id_: %s, handler: %p, done_notified: %d, is_cancelled: %d).",
200 service_, lb_id_.c_str(), this, static_cast<int>(done_notified_),
201 static_cast<int>(is_cancelled_));
202 }
203 Shutdown(std::move(self), "OnReadDone");
204 return;
205 }
206 // We only receive one request, which is the initial request.
207 if (call_status_ < INITIAL_REQUEST_RECEIVED) {
208 if (!request_.has_initial_request()) {
209 Shutdown(std::move(self), "OnReadDone+initial_request_not_found");
210 } else {
211 call_status_ = INITIAL_REQUEST_RECEIVED;
212 const auto& initial_request = request_.initial_request();
213 load_balanced_hostname_ = initial_request.load_balanced_hostname();
214 load_key_ = initial_request.load_key();
215 load_reporter_->ReportStreamCreated(load_balanced_hostname_, lb_id_,
216 load_key_);
217 const auto& load_report_interval = initial_request.load_report_interval();
218 load_report_interval_ms_ =
219 static_cast<unsigned long>(load_report_interval.seconds() * 1000 +
220 load_report_interval.nanos() / 1000);
221 gpr_log(GPR_INFO,
222 "[LRS %p] Initial request received. Start load reporting (load "
223 "balanced host: %s, interval: %" PRIu64
224 " ms, lb_id_: %s, handler: %p)...",
225 service_, load_balanced_hostname_.c_str(),
226 load_report_interval_ms_, lb_id_.c_str(), this);
227 SendReport(self, true /* ok */);
228 // Expect this read to fail.
229 {
230 grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
231 if (service_->shutdown_) {
232 lock.Release();
233 Shutdown(std::move(self), "OnReadDone");
234 return;
235 }
236 next_inbound_ =
237 CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
238 std::placeholders::_1, std::placeholders::_2),
239 std::move(self));
240 stream_.Read(&request_, &next_inbound_);
241 }
242 }
243 } else {
244 // Another request received! This violates the spec.
245 gpr_log(GPR_ERROR,
246 "[LRS %p] Another request received (lb_id_: %s, handler: %p).",
247 service_, lb_id_.c_str(), this);
248 Shutdown(std::move(self), "OnReadDone+second_request");
249 }
250 }
251
ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self,bool ok)252 void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport(
253 std::shared_ptr<ReportLoadHandler> self, bool ok) {
254 if (!ok || shutdown_) {
255 Shutdown(std::move(self), "ScheduleNextReport");
256 return;
257 }
258 auto next_report_time = gpr_time_add(
259 gpr_now(GPR_CLOCK_MONOTONIC),
260 gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN));
261 {
262 grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
263 if (service_->shutdown_) {
264 lock.Release();
265 Shutdown(std::move(self), "ScheduleNextReport");
266 return;
267 }
268 next_outbound_ =
269 CallableTag(std::bind(&ReportLoadHandler::SendReport, this,
270 std::placeholders::_1, std::placeholders::_2),
271 std::move(self));
272 // TODO(juanlishen): Improve the Alarm implementation to reuse a single
273 // instance for multiple events.
274 next_report_alarm_ = absl::make_unique<Alarm>();
275 next_report_alarm_->Set(cq_, next_report_time, &next_outbound_);
276 }
277 gpr_log(GPR_DEBUG,
278 "[LRS %p] Next load report scheduled (lb_id_: %s, handler: %p).",
279 service_, lb_id_.c_str(), this);
280 }
281
SendReport(std::shared_ptr<ReportLoadHandler> self,bool ok)282 void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport(
283 std::shared_ptr<ReportLoadHandler> self, bool ok) {
284 if (!ok || shutdown_) {
285 Shutdown(std::move(self), "SendReport");
286 return;
287 }
288 ::grpc::lb::v1::LoadReportResponse response;
289 auto loads = load_reporter_->GenerateLoads(load_balanced_hostname_, lb_id_);
290 response.mutable_load()->Swap(&loads);
291 auto feedback = load_reporter_->GenerateLoadBalancingFeedback();
292 response.mutable_load_balancing_feedback()->Swap(&feedback);
293 if (call_status_ < INITIAL_RESPONSE_SENT) {
294 auto initial_response = response.mutable_initial_response();
295 initial_response->set_load_balancer_id(lb_id_);
296 initial_response->set_implementation_id(
297 ::grpc::lb::v1::InitialLoadReportResponse::CPP);
298 initial_response->set_server_version(kVersion);
299 call_status_ = INITIAL_RESPONSE_SENT;
300 }
301 {
302 grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
303 if (service_->shutdown_) {
304 lock.Release();
305 Shutdown(std::move(self), "SendReport");
306 return;
307 }
308 next_outbound_ =
309 CallableTag(std::bind(&ReportLoadHandler::ScheduleNextReport, this,
310 std::placeholders::_1, std::placeholders::_2),
311 std::move(self));
312 stream_.Write(response, &next_outbound_);
313 gpr_log(GPR_INFO,
314 "[LRS %p] Sending load report (lb_id_: %s, handler: %p, loads "
315 "count: %d)...",
316 service_, lb_id_.c_str(), this, response.load().size());
317 }
318 }
319
OnDoneNotified(std::shared_ptr<ReportLoadHandler> self,bool ok)320 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnDoneNotified(
321 std::shared_ptr<ReportLoadHandler> self, bool ok) {
322 GPR_ASSERT(ok);
323 done_notified_ = true;
324 if (ctx_.IsCancelled()) {
325 is_cancelled_ = true;
326 }
327 gpr_log(GPR_INFO,
328 "[LRS %p] Load reporting call is notified done (handler: %p, "
329 "is_cancelled: %d).",
330 service_, this, static_cast<int>(is_cancelled_));
331 Shutdown(std::move(self), "OnDoneNotified");
332 }
333
Shutdown(std::shared_ptr<ReportLoadHandler> self,const char * reason)334 void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown(
335 std::shared_ptr<ReportLoadHandler> self, const char* reason) {
336 if (!shutdown_) {
337 gpr_log(GPR_INFO,
338 "[LRS %p] Shutting down the handler (lb_id_: %s, handler: %p, "
339 "reason: %s).",
340 service_, lb_id_.c_str(), this, reason);
341 shutdown_ = true;
342 if (call_status_ >= INITIAL_REQUEST_RECEIVED) {
343 load_reporter_->ReportStreamClosed(load_balanced_hostname_, lb_id_);
344 next_report_alarm_->Cancel();
345 }
346 }
347 // OnRequestDelivered() may be called after OnDoneNotified(), so we need to
348 // try to Finish() every time we are in Shutdown().
349 if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) {
350 grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
351 if (!service_->shutdown_) {
352 on_finish_done_ =
353 CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this,
354 std::placeholders::_1, std::placeholders::_2),
355 std::move(self));
356 // TODO(juanlishen): Maybe add a message proto for the client to
357 // explicitly cancel the stream so that we can return OK status in such
358 // cases.
359 stream_.Finish(Status::CANCELLED, &on_finish_done_);
360 call_status_ = FINISH_CALLED;
361 }
362 }
363 }
364
OnFinishDone(std::shared_ptr<ReportLoadHandler>,bool ok)365 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnFinishDone(
366 // NOLINTNEXTLINE(performance-unnecessary-value-param)
367 std::shared_ptr<ReportLoadHandler> /*self*/, bool ok) {
368 if (ok) {
369 gpr_log(GPR_INFO,
370 "[LRS %p] Load reporting finished (lb_id_: %s, handler: %p).",
371 service_, lb_id_.c_str(), this);
372 }
373 }
374
375 } // namespace load_reporter
376 } // namespace grpc
377