///grpcPlugin adds a gRPC server to any PyBullet/BulletRobotics
///physics server. You can connect using the PyBullet connect(pybullet.GRPC) method.
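///
///A minimal client-side sketch (assuming PyBullet was built with gRPC
///support; host name and port are whatever the server was started with,
///"localhost:6667" is only an example):
///  import pybullet as p
///  p.connect(p.GRPC, "localhost", 6667)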

#include "grpcPlugin.h"
#include "SharedMemory/SharedMemoryPublic.h"
#include "../b3PluginContext.h"
#include "Bullet3Common/b3AlignedObjectArray.h"
#include "SharedMemory/SharedMemoryCommands.h"
#include "SharedMemory/PhysicsCommandProcessorInterface.h"

#include <stdio.h>
#include <string.h>  //for strlen

#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <thread>

#include <grpc++/grpc++.h>
#include <grpc/support/log.h>
#include "../../../Utils/b3Clock.h"
#include "SharedMemory/grpc/proto/pybullet.grpc.pb.h"
#include "SharedMemory/grpc/ConvertGRPCBullet.h"
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
using pybullet_grpc::PyBulletAPI;
using pybullet_grpc::PyBulletCommand;
using pybullet_grpc::PyBulletStatus;

bool gVerboseNetworkMessagesServer4 = false;

class ServerImpl final
{
public:
	ServerImpl()
	{
	}
	~ServerImpl()
	{
		Exit();
	}

	void Exit()
	{
		if (server_)
		{
			server_->Shutdown();
			m_requestThreadCancelled = true;
			m_requestThread->join();
			delete m_requestThread;
			// Always shutdown the completion queue after the server.
			cq_->Shutdown();
			server_ = 0;
		}
	}

	void Init(PhysicsCommandProcessorInterface* comProc, const std::string& hostNamePort)
	{
		// Listen on the given address without any authentication mechanism.
		m_builder.AddListeningPort(hostNamePort, grpc::InsecureServerCredentials());
		// Register "service_" as the instance through which we'll communicate with
		// clients. In this case it corresponds to an *asynchronous* service.
		m_builder.RegisterService(&service_);
		// Get hold of the completion queue used for the asynchronous communication
		// with the gRPC runtime.
		cq_ = m_builder.AddCompletionQueue();
		// Finally assemble the server.
		server_ = m_builder.BuildAndStart();
		std::cout << "grpcPlugin Bullet Physics GRPC server listening on " << hostNamePort << std::endl;

		// Start the thread that gathers the requests.
		m_requestThreadCancelled = false;
		m_requestThread = new std::thread(&ServerImpl::GatherRequests, this);

		// Seed the first CallData instance so incoming RPCs can be served.
		InitRpcs(comProc);
	}

	// This can be run in multiple threads if needed.
	bool HandleSingleRpc()
	{
		CallData::CallStatus status = CallData::CallStatus::CREATE;
		std::lock_guard<std::mutex> guard(m_queueMutex);
		if (!m_requestQueue.empty())
		{
			void* tag = m_requestQueue.front();
			m_requestQueue.pop_front();
			status = static_cast<CallData*>(tag)->Proceed();
		}

		return status == CallData::CallStatus::TERMINATE;
	}

private:
	// Class encompassing the state and logic needed to serve a request.
	class CallData
	{
	public:
		// Take in the "service" instance (in this case representing an asynchronous
		// server) and the completion queue "cq" used for asynchronous communication
		// with the gRPC runtime.
		CallData(PyBulletAPI::AsyncService* service, ServerCompletionQueue* cq, PhysicsCommandProcessorInterface* comProc)
			: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE), m_finished(false), m_comProc(comProc)
		{
			// Invoke the serving logic right away.
			Proceed();
		}

		enum CallStatus
		{
			CREATE,
			PROCESS,
			FINISH,
			TERMINATE
		};
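
		// State machine: CREATE (request a new SubmitCommand RPC) ->
		// PROCESS (run the command on the physics server and reply with the
		// status) -> FINISH (deallocate this CallData). Proceed() reports
		// TERMINATE instead of FINISH after a TerminateServerCommand, so the
		// caller knows the server should shut down.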

		CallStatus Proceed()
		{
			if (status_ == CREATE)
			{
				// Make this instance progress to the PROCESS state.
				status_ = PROCESS;

				// As part of the initial CREATE state, we *request* that the system
				// start processing SubmitCommand requests. In this request, "this" acts as
				// the tag uniquely identifying the request (so that different CallData
				// instances can serve different requests concurrently); in this case it is
				// the memory address of this CallData instance.
				service_->RequestSubmitCommand(&ctx_, &m_command, &responder_, cq_, cq_,
											   this);
			}
			else if (status_ == PROCESS)
			{
				// Spawn a new CallData instance to serve new clients while we process
				// the one for this CallData. The instance will deallocate itself as
				// part of its FINISH state.
				new CallData(service_, cq_, m_comProc);
				status_ = FINISH;

				// The actual processing.
				SharedMemoryStatus serverStatus;
				b3AlignedObjectArray<char> buffer;
				buffer.resize(SHARED_MEMORY_MAX_STREAM_CHUNK_SIZE);
				SharedMemoryCommand cmd;
				SharedMemoryCommand* cmdPtr = 0;

				m_status.set_statustype(CMD_UNKNOWN_COMMAND_FLUSHED);

				if (m_command.has_checkversioncommand())
				{
					m_status.set_statustype(CMD_CLIENT_COMMAND_COMPLETED);
					m_status.mutable_checkversionstatus()->set_serverversion(SHARED_MEMORY_MAGIC_NUMBER);
				}
				else
				{
					cmdPtr = convertGRPCToBulletCommand(m_command, cmd);

					if (cmdPtr)
					{
						bool hasStatus = m_comProc->processCommand(*cmdPtr, serverStatus, &buffer[0], buffer.size());

						// Poll for the status until it arrives, giving up after a
						// 10 second timeout.
						double timeOutInSeconds = 10;
						b3Clock clock;
						double startTimeSeconds = clock.getTimeInSeconds();
						double curTimeSeconds = clock.getTimeInSeconds();

						while ((!hasStatus) && ((curTimeSeconds - startTimeSeconds) < timeOutInSeconds))
						{
							hasStatus = m_comProc->receiveStatus(serverStatus, &buffer[0], buffer.size());
							curTimeSeconds = clock.getTimeInSeconds();
						}

						if (gVerboseNetworkMessagesServer4)
						{
							//printf("buffer.size = %d\n", buffer.size());
							printf("serverStatus.m_numDataStreamBytes = %d\n", serverStatus.m_numDataStreamBytes);
						}
						if (hasStatus)
						{
							convertStatusToGRPC(serverStatus, &buffer[0], buffer.size(), m_status);
						}
					}

					if (m_command.has_terminateservercommand())
					{
						status_ = TERMINATE;
					}
				}

				// And we are done! Let the gRPC runtime know we've finished, using the
				// memory address of this instance as the uniquely identifying tag for
				// the event.
				responder_.Finish(m_status, Status::OK, this);
			}
			else
			{
				GPR_ASSERT(status_ == FINISH);
				// Once in the FINISH state, deallocate ourselves (CallData).
				CallData::CallStatus tmpStatus = status_;
				delete this;
				return tmpStatus;
			}
			return status_;
		}

	private:
		// The means of communication with the gRPC runtime for an asynchronous
		// server.
		PyBulletAPI::AsyncService* service_;
		// The producer-consumer queue for asynchronous server notifications.
		ServerCompletionQueue* cq_;
		// Context for the rpc, allowing to tweak aspects of it such as the use
		// of compression, authentication, as well as to send metadata back to the
		// client.
		ServerContext ctx_;

		// What we get from the client.
		PyBulletCommand m_command;
		// What we send back to the client.
		PyBulletStatus m_status;

		// The means to get back to the client.
		ServerAsyncResponseWriter<PyBulletStatus> responder_;

		// A tiny state machine with the states defined in CallStatus.
		CallStatus status_;  // The current serving state.

		bool m_finished;

		PhysicsCommandProcessorInterface* m_comProc;  //physics server command processor
	};

	// Create the first CallData instance that waits for an incoming RPC.
	void InitRpcs(PhysicsCommandProcessorInterface* comProc)
	{
		// Spawn a new CallData instance to serve new clients.
		new CallData(&service_, cq_.get(), comProc);
	}

	ServerBuilder m_builder;
	std::unique_ptr<ServerCompletionQueue> cq_;
	PyBulletAPI::AsyncService service_;
	std::unique_ptr<Server> server_;

	// Mutex to protect access to the request queue variables (m_requestQueue,
	// m_requestThread, m_requestThreadCancelled).
	std::mutex m_queueMutex;

	// List of outstanding request tags.
	std::list<void*> m_requestQueue;

	// Whether or not the gathering thread is cancelled.
	bool m_requestThreadCancelled;

	// Thread to gather requests from the completion queue.
	std::thread* m_requestThread;

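	// Runs on m_requestThread: drains completion-queue events into
	// m_requestQueue so the physics thread can serve them one at a time
	// via HandleSingleRpc instead of blocking on the queue itself.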
	void GatherRequests()
	{
		void* tag;  // uniquely identifies a request.
		bool ok;

		while (!m_requestThreadCancelled)
		{
			// Read the next event from the completion queue, if any. The event
			// is uniquely identified by its tag, which in this case is the
			// memory address of a CallData instance. AsyncNext is called with
			// an immediate deadline, so it returns TIMEOUT right away when no
			// event is pending, which lets this loop notice
			// m_requestThreadCancelled. Its return value tells us whether we
			// got an event, timed out, or cq_ is shutting down.
			grpc::CompletionQueue::NextStatus nextStatus = cq_->AsyncNext(&tag, &ok, gpr_now(GPR_CLOCK_MONOTONIC));
			if (nextStatus == grpc::CompletionQueue::NextStatus::GOT_EVENT)
			{
				GPR_ASSERT(ok);
				std::lock_guard<std::mutex> guard(m_queueMutex);
				m_requestQueue.push_back(tag);
			}
		}
	}
};

struct grpcMyClass
{
	int m_testData;

	ServerImpl m_grpcServer;
	bool m_grpcInitialized;
	bool m_grpcTerminated;

	grpcMyClass()
		: m_testData(42),
		  m_grpcInitialized(false),
		  m_grpcTerminated(false)
	{
	}
	virtual ~grpcMyClass()
	{
	}
};

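// Plugin entry points. The Bullet plugin loader looks these functions up
// by name (using the "_grpcPlugin" postfix), so the signatures must match
// the b3Plugin API.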
B3_SHARED_API int initPlugin_grpcPlugin(struct b3PluginContext* context)
{
	grpcMyClass* obj = new grpcMyClass();
	context->m_userPointer = obj;

	return SHARED_MEMORY_MAGIC_NUMBER;
}

B3_SHARED_API int preTickPluginCallback_grpcPlugin(struct b3PluginContext* context)
{
	// GRPC server messages are processed in processClientCommands_grpcPlugin
	// and executePluginCommand_grpcPlugin, not here.
	return 0;
}

// Called by the physics server loop; pumps at most one pending RPC per call.
B3_SHARED_API int processClientCommands_grpcPlugin(struct b3PluginContext* context)
{
	grpcMyClass* obj = (grpcMyClass*)context->m_userPointer;

	if (obj->m_grpcInitialized && !obj->m_grpcTerminated)
	{
		obj->m_grpcTerminated = obj->m_grpcServer.HandleSingleRpc();
	}

	obj->m_testData++;
	return 0;
}

B3_SHARED_API int postTickPluginCallback_grpcPlugin(struct b3PluginContext* context)
{
	grpcMyClass* obj = (grpcMyClass*)context->m_userPointer;
	obj->m_testData++;
	return 0;
}

B3_SHARED_API int executePluginCommand_grpcPlugin(struct b3PluginContext* context, const struct b3PluginArguments* arguments)
{
	///3 cases:
	/// 1: send a non-empty "hostName:port" string to start the GRPC server
	/// 2: send an integer n to call HandleSingleRpc n times
	/// 3: send nothing to terminate the GRPC server
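	///
	/// A hypothetical PyBullet session driving these cases (plugin name,
	/// postfix, and port are illustrative, not guaranteed):
	///   plugin = p.loadPlugin("grpcPlugin", "_grpcPlugin")
	///   p.executePluginCommand(plugin, "localhost:6667")  # case 1: start server
	///   p.executePluginCommand(plugin, intArgs=[100])     # case 2: handle up to 100 RPCs
	///   p.executePluginCommand(plugin)                    # case 3: terminate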

	grpcMyClass* obj = (grpcMyClass*)context->m_userPointer;

	if (strlen(arguments->m_text))
	{
		if (!obj->m_grpcInitialized && context->m_rpcCommandProcessorInterface)
		{
			obj->m_grpcServer.Init(context->m_rpcCommandProcessorInterface, arguments->m_text);
		}
		obj->m_grpcInitialized = true;
	}
	else
	{
		if (arguments->m_numInts > 0)
		{
			for (int i = 0; i < arguments->m_ints[0]; i++)
			{
				if (obj->m_grpcInitialized && !obj->m_grpcTerminated)
				{
					obj->m_grpcTerminated = obj->m_grpcServer.HandleSingleRpc();
				}
			}
		}
		else
		{
			obj->m_grpcServer.Exit();
			obj->m_grpcInitialized = false;
		}
	}

	return 0;
}

B3_SHARED_API void exitPlugin_grpcPlugin(struct b3PluginContext* context)
{
	grpcMyClass* obj = (grpcMyClass*)context->m_userPointer;
	obj->m_grpcServer.Exit();
	delete obj;
	context->m_userPointer = 0;
}