1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <grpc/support/log.h>
25 #include <grpcpp/alarm.h>
26 #include <grpcpp/grpcpp.h>
27 
28 #include "src/core/lib/gprpp/sync.h"
29 #include "src/core/lib/gprpp/thd.h"
30 #include "src/cpp/server/load_reporter/load_reporter.h"
31 
32 namespace grpc {
33 namespace load_reporter {
34 
35 // Async load reporting service. It's mainly responsible for controlling the
36 // procedure of incoming requests. The real business logic is handed off to the
37 // LoadReporter. There should be at most one instance of this service on a
38 // server to avoid spreading the load data into multiple places.
39 class LoadReporterAsyncServiceImpl
40     : public grpc::lb::v1::LoadReporter::AsyncService {
41  public:
42   explicit LoadReporterAsyncServiceImpl(
43       std::unique_ptr<ServerCompletionQueue> cq);
44   ~LoadReporterAsyncServiceImpl() override;
45 
46   // Starts the working thread.
47   void StartThread();
48 
49   // Not copyable nor movable.
50   LoadReporterAsyncServiceImpl(const LoadReporterAsyncServiceImpl&) = delete;
51   LoadReporterAsyncServiceImpl& operator=(const LoadReporterAsyncServiceImpl&) =
52       delete;
53 
54  private:
55   class ReportLoadHandler;
56 
57   // A tag that can be called with a bool argument. It's tailored for
58   // ReportLoadHandler's use. Before being used, it should be constructed with a
59   // method of ReportLoadHandler and a shared pointer to the handler. The
60   // shared pointer will be moved to the invoked function and the function can
61   // only be invoked once. That makes ref counting of the handler easier,
62   // because the shared pointer is not bound to the function and can be gone
63   // once the invoked function returns (if not used any more).
64   class CallableTag {
65    public:
66     using HandlerFunction =
67         std::function<void(std::shared_ptr<ReportLoadHandler>, bool)>;
68 
CallableTag()69     CallableTag() {}
70 
CallableTag(HandlerFunction func,std::shared_ptr<ReportLoadHandler> handler)71     CallableTag(HandlerFunction func,
72                 std::shared_ptr<ReportLoadHandler> handler)
73         : handler_function_(std::move(func)), handler_(std::move(handler)) {
74       GPR_ASSERT(handler_function_ != nullptr);
75       GPR_ASSERT(handler_ != nullptr);
76     }
77 
78     // Runs the tag. This should be called only once. The handler is no longer
79     // owned by this tag after this method is invoked.
80     void Run(bool ok);
81 
82     // Releases and returns the shared pointer to the handler.
ReleaseHandler()83     std::shared_ptr<ReportLoadHandler> ReleaseHandler() {
84       return std::move(handler_);
85     }
86 
87    private:
88     HandlerFunction handler_function_ = nullptr;
89     std::shared_ptr<ReportLoadHandler> handler_;
90   };
91 
92   // Each handler takes care of one load reporting stream. It contains
93   // per-stream data and it will access the members of the parent class (i.e.,
94   // LoadReporterAsyncServiceImpl) for service-wide data (e.g., the load data).
95   class ReportLoadHandler {
96    public:
97     // Instantiates a ReportLoadHandler and requests the next load reporting
98     // call. The handler object will manage its own lifetime, so no action is
99     // needed from the caller any more regarding that object.
100     static void CreateAndStart(ServerCompletionQueue* cq,
101                                LoadReporterAsyncServiceImpl* service,
102                                LoadReporter* load_reporter);
103 
104     // This ctor is public because we want to use std::make_shared<> in
105     // CreateAndStart(). This ctor shouldn't be used elsewhere.
106     ReportLoadHandler(ServerCompletionQueue* cq,
107                       LoadReporterAsyncServiceImpl* service,
108                       LoadReporter* load_reporter);
109 
110    private:
111     // After the handler has a call request delivered, it starts reading the
112     // initial request. Also, a new handler is spawned so that we can keep
113     // servicing future calls.
114     void OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self, bool ok);
115 
116     // The first Read() is expected to succeed, after which the handler starts
117     // sending load reports back to the balancer. The second Read() is
118     // expected to fail, which happens when the balancer half-closes the
119     // stream to signal that it's no longer interested in the load reports. For
120     // the latter case, the handler will then close the stream.
121     void OnReadDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
122 
123     // The report sending operations are sequential as: send report -> send
124     // done, schedule the next send -> waiting for the alarm to fire -> alarm
125     // fires, send report -> ...
126     void SendReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
127     void ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
128 
129     // Called when Finish() is done.
130     void OnFinishDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
131 
132     // Called when AsyncNotifyWhenDone() notifies us.
133     void OnDoneNotified(std::shared_ptr<ReportLoadHandler> self, bool ok);
134 
135     void Shutdown(std::shared_ptr<ReportLoadHandler> self, const char* reason);
136 
137     // The key fields of the stream.
138     std::string lb_id_;
139     std::string load_balanced_hostname_;
140     std::string load_key_;
141     uint64_t load_report_interval_ms_;
142 
143     // The data for RPC communication with the load reportee.
144     ServerContext ctx_;
145     ::grpc::lb::v1::LoadReportRequest request_;
146 
147     // The members passed down from LoadReporterAsyncServiceImpl.
148     ServerCompletionQueue* cq_;
149     LoadReporterAsyncServiceImpl* service_;
150     LoadReporter* load_reporter_;
151     ServerAsyncReaderWriter<::grpc::lb::v1::LoadReportResponse,
152                             ::grpc::lb::v1::LoadReportRequest>
153         stream_;
154 
155     // The status of the RPC progress.
156     enum CallStatus {
157       WAITING_FOR_DELIVERY,
158       DELIVERED,
159       INITIAL_REQUEST_RECEIVED,
160       INITIAL_RESPONSE_SENT,
161       FINISH_CALLED
162     } call_status_;
163     bool shutdown_{false};
164     bool done_notified_{false};
165     bool is_cancelled_{false};
166     CallableTag on_done_notified_;
167     CallableTag on_finish_done_;
168     CallableTag next_inbound_;
169     CallableTag next_outbound_;
170     std::unique_ptr<Alarm> next_report_alarm_;
171   };
172 
173   // Handles the incoming requests and drives the completion queue in a loop.
174   static void Work(void* arg);
175 
176   // Schedules the next data fetching from Census and LB feedback sampling.
177   void ScheduleNextFetchAndSample();
178 
179   // Fetches data from Census and samples LB feedback.
180   void FetchAndSample(bool ok);
181 
182   std::unique_ptr<ServerCompletionQueue> cq_;
183   // To synchronize the operations related to shutdown state of cq_, so that we
184   // don't enqueue new tags into cq_ after it is already shut down.
185   grpc_core::Mutex cq_shutdown_mu_;
186   std::atomic_bool shutdown_{false};
187   std::unique_ptr<::grpc_core::Thread> thread_;
188   std::unique_ptr<LoadReporter> load_reporter_;
189   std::unique_ptr<Alarm> next_fetch_and_sample_alarm_;
190 };
191 
192 }  // namespace load_reporter
193 }  // namespace grpc
194 
195 #endif  // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
196