1 
2 ///grpcPlugin add a GRPC server to any PyBullet/BulletRobotics
3 ///physics server. You can connect using PyBullet connect.GRPC method
4 
5 #include "grpcPlugin.h"
6 #include "SharedMemory/SharedMemoryPublic.h"
7 #include "../b3PluginContext.h"
8 #include "Bullet3Common/b3AlignedObjectArray.h"
9 #include "SharedMemory/SharedMemoryCommands.h"
10 #include "SharedMemory/PhysicsCommandProcessorInterface.h"
11 
12 #include <stdio.h>
13 
14 #include <mutex>
15 #include <thread>
16 
17 #include <grpc++/grpc++.h>
18 #include <grpc/support/log.h>
19 #include "../../../Utils/b3Clock.h"
20 #include "SharedMemory/grpc/proto/pybullet.grpc.pb.h"
21 #include "SharedMemory/grpc/ConvertGRPCBullet.h"
22 using grpc::Server;
23 using grpc::ServerAsyncResponseWriter;
24 using grpc::ServerBuilder;
25 using grpc::ServerCompletionQueue;
26 using grpc::ServerContext;
27 using grpc::Status;
28 using pybullet_grpc::PyBulletAPI;
29 using pybullet_grpc::PyBulletCommand;
30 using pybullet_grpc::PyBulletStatus;
31 
32 bool gVerboseNetworkMessagesServer4 = false;
33 
34 class ServerImpl final
35 {
36 public:
ServerImpl()37 	ServerImpl()
38 	{
39 	}
~ServerImpl()40 	~ServerImpl()
41 	{
42 		Exit();
43 	}
44 
Exit()45 	void Exit()
46 	{
47 		if (server_)
48 		{
49 			server_->Shutdown();
50 			m_requestThreadCancelled = true;
51 			m_requestThread->join();
52 			delete m_requestThread;
53 			// Always shutdown the completion queue after the server.
54 			cq_->Shutdown();
55 			server_ = 0;
56 		}
57 	}
58 
Init(PhysicsCommandProcessorInterface * comProc,const std::string & hostNamePort)59 	void Init(PhysicsCommandProcessorInterface* comProc, const std::string& hostNamePort)
60 	{
61 		// Listen on the given address without any authentication mechanism.
62 		m_builder.AddListeningPort(hostNamePort, grpc::InsecureServerCredentials());
63 		// Register "service_" as the instance through which we'll communicate with
64 		// clients. In this case it corresponds to an *asynchronous* service.
65 		m_builder.RegisterService(&service_);
66 		// Get hold of the completion queue used for the asynchronous communication
67 		// with the gRPC runtime.
68 		cq_ = m_builder.AddCompletionQueue();
69 		// Finally assemble the server.
70 		server_ = m_builder.BuildAndStart();
71 		std::cout << "grpcPlugin Bullet Physics GRPC server listening on " << hostNamePort << std::endl;
72 
73 		//Start the thread to gather the requests.
74 		m_requestThreadCancelled = false;
75 		m_requestThread = new std::thread(&ServerImpl::GatherRequests, this);
76 
77 		// Proceed to the server's main loop.
78 		InitRpcs(comProc);
79 	}
80 
81 	// This can be run in multiple threads if needed.
HandleSingleRpc()82 	bool HandleSingleRpc()
83 	{
84 		CallData::CallStatus status = CallData::CallStatus::CREATE;
85 		std::lock_guard<std::mutex> guard(m_queueMutex);
86     if (!m_requestQueue.empty()) {
87       void* tag = m_requestQueue.front();
88       m_requestQueue.pop_front();
89       status = static_cast<CallData*>(tag)->Proceed();
90     }
91 
92 		return status == CallData::CallStatus::TERMINATE;
93 	}
94 
95 private:
96 	// Class encompasing the state and logic needed to serve a request.
97 	class CallData
98 	{
99 	public:
100 		// Take in the "service" instance (in this case representing an asynchronous
101 		// server) and the completion queue "cq" used for asynchronous communication
102 		// with the gRPC runtime.
CallData(PyBulletAPI::AsyncService * service,ServerCompletionQueue * cq,PhysicsCommandProcessorInterface * comProc)103 		CallData(PyBulletAPI::AsyncService* service, ServerCompletionQueue* cq, PhysicsCommandProcessorInterface* comProc)
104 			: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE), m_finished(false), m_comProc(comProc)
105 		{
106 			// Invoke the serving logic right away.
107 			Proceed();
108 		}
109 
110 		enum CallStatus
111 		{
112 			CREATE,
113 			PROCESS,
114 			FINISH,
115 			TERMINATE
116 		};
117 
Proceed()118 		CallStatus Proceed()
119 		{
120 			if (status_ == CREATE)
121 			{
122 				// Make this instance progress to the PROCESS state.
123 				status_ = PROCESS;
124 
125 				// As part of the initial CREATE state, we *request* that the system
126 				// start processing SayHello requests. In this request, "this" acts are
127 				// the tag uniquely identifying the request (so that different CallData
128 				// instances can serve different requests concurrently), in this case
129 				// the memory address of this CallData instance.
130 
131 				service_->RequestSubmitCommand(&ctx_, &m_command, &responder_, cq_, cq_,
132 											   this);
133 			}
134 			else if (status_ == PROCESS)
135 			{
136 				// Spawn a new CallData instance to serve new clients while we process
137 				// the one for this CallData. The instance will deallocate itself as
138 				// part of its FINISH state.
139 				new CallData(service_, cq_, m_comProc);
140 				status_ = FINISH;
141 
142 				std::string replyString;
143 				// The actual processing.
144 
145 				SharedMemoryStatus serverStatus;
146 				b3AlignedObjectArray<char> buffer;
147 				buffer.resize(SHARED_MEMORY_MAX_STREAM_CHUNK_SIZE);
148 				SharedMemoryCommand cmd;
149 				SharedMemoryCommand* cmdPtr = 0;
150 
151 				m_status.set_statustype(CMD_UNKNOWN_COMMAND_FLUSHED);
152 
153 				if (m_command.has_checkversioncommand())
154 				{
155 					m_status.set_statustype(CMD_CLIENT_COMMAND_COMPLETED);
156 					m_status.mutable_checkversionstatus()->set_serverversion(SHARED_MEMORY_MAGIC_NUMBER);
157 				}
158 				else
159 				{
160 					cmdPtr = convertGRPCToBulletCommand(m_command, cmd);
161 
162 					if (cmdPtr)
163 					{
164 						bool hasStatus = m_comProc->processCommand(*cmdPtr, serverStatus, &buffer[0], buffer.size());
165 
166 						double timeOutInSeconds = 10;
167 						b3Clock clock;
168 						double startTimeSeconds = clock.getTimeInSeconds();
169 						double curTimeSeconds = clock.getTimeInSeconds();
170 
171 						while ((!hasStatus) && ((curTimeSeconds - startTimeSeconds) < timeOutInSeconds))
172 						{
173 							hasStatus = m_comProc->receiveStatus(serverStatus, &buffer[0], buffer.size());
174 							curTimeSeconds = clock.getTimeInSeconds();
175 						}
176 
177 						if (gVerboseNetworkMessagesServer4)
178 						{
179 							//printf("buffer.size = %d\n", buffer.size());
180 							printf("serverStatus.m_numDataStreamBytes = %d\n", serverStatus.m_numDataStreamBytes);
181 						}
182 						if (hasStatus)
183 						{
184 							b3AlignedObjectArray<unsigned char> packetData;
185 							unsigned char* statBytes = (unsigned char*)&serverStatus;
186 
187 							convertStatusToGRPC(serverStatus, &buffer[0], buffer.size(), m_status);
188 						}
189 					}
190 
191 					if (m_command.has_terminateservercommand())
192 					{
193 						status_ = TERMINATE;
194 					}
195 				}
196 
197 				// And we are done! Let the gRPC runtime know we've finished, using the
198 				// memory address of this instance as the uniquely identifying tag for
199 				// the event.
200 
201 				responder_.Finish(m_status, Status::OK, this);
202 			}
203 			else
204 			{
205 				GPR_ASSERT(status_ == FINISH);
206 				// Once in the FINISH state, deallocate ourselves (CallData).
207 				CallData::CallStatus tmpStatus = status_;
208 				delete this;
209 				return tmpStatus;
210 			}
211 			return status_;
212 		}
213 
214 	private:
215 		// The means of communication with the gRPC runtime for an asynchronous
216 		// server.
217 		PyBulletAPI::AsyncService* service_;
218 		// The producer-consumer queue where for asynchronous server notifications.
219 		ServerCompletionQueue* cq_;
220 		// Context for the rpc, allowing to tweak aspects of it such as the use
221 		// of compression, authentication, as well as to send metadata back to the
222 		// client.
223 		ServerContext ctx_;
224 
225 		// What we get from the client.
226 		PyBulletCommand m_command;
227 		// What we send back to the client.
228 		PyBulletStatus m_status;
229 
230 		// The means to get back to the client.
231 		ServerAsyncResponseWriter<PyBulletStatus> responder_;
232 
233 		// Let's implement a tiny state machine with the following states.
234 
235 		CallStatus status_;  // The current serving state.
236 
237 		bool m_finished;
238 
239 		PhysicsCommandProcessorInterface* m_comProc;  //physics server command processor
240 	};
241 
242 	// This can be run in multiple threads if needed.
InitRpcs(PhysicsCommandProcessorInterface * comProc)243 	void InitRpcs(PhysicsCommandProcessorInterface* comProc)
244 	{
245 		// Spawn a new CallData instance to serve new clients.
246 		new CallData(&service_, cq_.get(), comProc);
247 	}
248 
249 	ServerBuilder m_builder;
250 	std::unique_ptr<ServerCompletionQueue> cq_;
251 	PyBulletAPI::AsyncService service_;
252 	std::unique_ptr<Server> server_;
253 
254 	// Mutex to protect access to the request queue variables (m_requestQueue,
255 	// m_requestThread, m_requestThreadCancelled).
256 	std::mutex m_queueMutex;
257 
258 	// List of outstanding request tags.
259 	std::list<void*> m_requestQueue;
260 
261 	// Whether or not the gathering thread is cancelled.
262 	bool m_requestThreadCancelled;
263 
264 	// Thread to gather requests from the completion queue.
265 	std::thread* m_requestThread;
266 
GatherRequests()267 	void GatherRequests() {
268 		void* tag;  // uniquely identifies a request.
269 		bool ok;
270 
271 		while(!m_requestThreadCancelled) {
272 			// Block waiting to read the next event from the completion queue. The
273 			// event is uniquely identified by its tag, which in this case is the
274 			// memory address of a CallData instance.
275 			// The return value of Next should always be checked. This return value
276 			// tells us whether there is any kind of event or cq_ is shutting down.
277 			grpc::CompletionQueue::NextStatus nextStatus = cq_->AsyncNext(&tag, &ok, gpr_now(GPR_CLOCK_MONOTONIC));
278 			if (nextStatus == grpc::CompletionQueue::NextStatus::GOT_EVENT)
279 			{
280 				GPR_ASSERT(ok);
281 				std::lock_guard<std::mutex> guard(m_queueMutex);
282 				m_requestQueue.push_back(tag);
283 			}
284 		}
285 	}
286 };
287 
288 struct grpcMyClass
289 {
290 	int m_testData;
291 
292 	ServerImpl m_grpcServer;
293 	bool m_grpcInitialized;
294 	bool m_grpcTerminated;
295 
grpcMyClassgrpcMyClass296 	grpcMyClass()
297 		: m_testData(42),
298 		  m_grpcInitialized(false),
299 		  m_grpcTerminated(false)
300 	{
301 	}
~grpcMyClassgrpcMyClass302 	virtual ~grpcMyClass()
303 	{
304 	}
305 };
306 
initPlugin_grpcPlugin(struct b3PluginContext * context)307 B3_SHARED_API int initPlugin_grpcPlugin(struct b3PluginContext* context)
308 {
309 	grpcMyClass* obj = new grpcMyClass();
310 	context->m_userPointer = obj;
311 
312 	return SHARED_MEMORY_MAGIC_NUMBER;
313 }
314 
preTickPluginCallback_grpcPlugin(struct b3PluginContext * context)315 B3_SHARED_API int preTickPluginCallback_grpcPlugin(struct b3PluginContext* context)
316 {
317 	//process grpc server messages
318 	return 0;
319 }
320 
processClientCommands_grpcPlugin(struct b3PluginContext * context)321 B3_SHARED_API int processClientCommands_grpcPlugin(struct b3PluginContext* context)
322 {
323 	grpcMyClass* obj = (grpcMyClass*)context->m_userPointer;
324 
325 	if (obj->m_grpcInitialized && !obj->m_grpcTerminated)
326 	{
327 		obj->m_grpcTerminated = obj->m_grpcServer.HandleSingleRpc();
328 	}
329 
330 	obj->m_testData++;
331 	return 0;
332 }
333 
postTickPluginCallback_grpcPlugin(struct b3PluginContext * context)334 B3_SHARED_API int postTickPluginCallback_grpcPlugin(struct b3PluginContext* context)
335 {
336 	grpcMyClass* obj = (grpcMyClass*)context->m_userPointer;
337 	obj->m_testData++;
338 	return 0;
339 }
340 
executePluginCommand_grpcPlugin(struct b3PluginContext * context,const struct b3PluginArguments * arguments)341 B3_SHARED_API int executePluginCommand_grpcPlugin(struct b3PluginContext* context, const struct b3PluginArguments* arguments)
342 {
343 	///3 cases:
344 	/// 1: send a non-empty string to start the GRPC server
345 	/// 2: send some integer n, to call n times to HandleSingleRpc
346 	/// 3: send nothing to terminate the GRPC server
347 
348 	grpcMyClass* obj = (grpcMyClass*)context->m_userPointer;
349 
350 	if (strlen(arguments->m_text))
351 	{
352 		if (!obj->m_grpcInitialized && context->m_rpcCommandProcessorInterface)
353 		{
354 			obj->m_grpcServer.Init(context->m_rpcCommandProcessorInterface, arguments->m_text);
355 		}
356 		obj->m_grpcInitialized = true;
357 	}
358 	else
359 	{
360 		if (arguments->m_numInts > 0)
361 		{
362 			for (int i = 0; i < arguments->m_ints[0]; i++)
363 			{
364 				if (obj->m_grpcInitialized && !obj->m_grpcTerminated)
365 				{
366 					obj->m_grpcTerminated = obj->m_grpcServer.HandleSingleRpc();
367 				}
368 			}
369 		}
370 		else
371 		{
372 			obj->m_grpcServer.Exit();
373 			obj->m_grpcInitialized = false;
374 		}
375 	}
376 
377 	return 0;
378 }
379 
exitPlugin_grpcPlugin(struct b3PluginContext * context)380 B3_SHARED_API void exitPlugin_grpcPlugin(struct b3PluginContext* context)
381 {
382 	grpcMyClass* obj = (grpcMyClass*)context->m_userPointer;
383 	obj->m_grpcServer.Exit();
384 	delete obj;
385 	context->m_userPointer = 0;
386 }
387