1 /*=========================================================================
2
3 Program: Visualization Toolkit
4
5 Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
6 All rights reserved.
7 See Copyright.txt or http://www.kitware.com/Copyright.htm for details.
8
9 This software is distributed WITHOUT ANY WARRANTY; without even
10 the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
11 PURPOSE. See the above copyright notice for more information.
12
13 =========================================================================*/
14
15 #include "vtkVRCollaborationClient.h"
16
17 #include "vtkCallbackCommand.h"
18 #include "vtkCamera.h"
19 #include "vtkObjectFactory.h"
20 #include "vtkOpenGLAvatar.h"
21 #include "vtkOpenGLRenderer.h"
22 #include "vtkProperty.h"
23 #include "vtkRenderWindowInteractor.h"
24 #include "vtkTextProperty.h"
25 #include "vtkTimerLog.h"
26 #include "vtkTransform.h"
27 #include "vtkVRModel.h"
28 #include "vtkVRRenderWindow.h"
29
30 #include <sstream>
31 #include <zmq.h>
32 namespace
33 {
// Ray length handed to an avatar when one of its rays is shown; multiplied by
// the current scale before use.
const double RAY_LENGTH = 200.0; // in meters
// Time without a head update after which an avatar is treated as idle.
const double AVATAR_TIMEOUT = 10.0; // in seconds
// Delay between heartbeats. Kept integral so the heartbeat arithmetic is
// unchanged, but initialized from an integer literal rather than narrowing 1.0.
const int HEARTBEAT_INTERVAL = 1; // in seconds
// Number of heartbeat intervals without a reply before a reconnect is tried.
const int LIVE_COUNT = 3;

// Palette used for avatar bodies and labels.
// http://colorbrewer2.org/#type=qualitative&scheme=Pastel1&n=9
double AVATAR_COLORS[][3] = {
  { 179 / 255.0, 205 / 255.0, 227 / 255.0 },
  { 204 / 255.0, 235 / 255.0, 197 / 255.0 },
  { 222 / 255.0, 203 / 255.0, 228 / 255.0 },
  { 254 / 255.0, 217 / 255.0, 166 / 255.0 },
  { 255 / 255.0, 255 / 255.0, 204 / 255.0 },
  { 229 / 255.0, 216 / 255.0, 189 / 255.0 },
  { 253 / 255.0, 218 / 255.0, 236 / 255.0 },
  { 242 / 255.0, 242 / 255.0, 242 / 255.0 },
  { 251 / 255.0, 180 / 255.0, 174 / 255.0 },
};

// Number of rows in AVATAR_COLORS.
const int NUM_COLORS = sizeof(AVATAR_COLORS) / sizeof(AVATAR_COLORS[0]);
53
54 // two local helper functions for libzmq
_zmq_string_recv(void * socket,int flags)55 std::string _zmq_string_recv(void* socket, int flags)
56 {
57 zmq_msg_t msg;
58 int rc = zmq_msg_init(&msg);
59 assert(rc == 0);
60 rc = zmq_msg_recv(&msg, socket, flags);
61 assert(rc != -1);
62 return std::string(static_cast<const char*>(zmq_msg_data(&msg)), zmq_msg_size(&msg));
63 }
64
65 // Receives all remaining message parts from socket, does nothing.
_zmq_sock_clear(void * socket)66 void _zmq_sock_clear(void* socket)
67 {
68 int more;
69 size_t more_size = sizeof(more);
70 zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size);
71 while (more)
72 {
73 // Process all parts of the message
74 zmq_msg_t part;
75 int rc = zmq_msg_init(&part);
76 assert(rc == 0);
77 /* Block until a message is available to be received from socket */
78 rc = zmq_msg_recv(&part, socket, 0);
79 assert(rc != -1);
80
81 zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size);
82 }
83 }
84
85 } // end anon namespace
86
// Simple macro to package up arguments to call Log(). x is a chain of values
// joined with <<, which is streamed into a temporary string stream.
// The body is wrapped in do/while(false) so the macro expands to exactly one
// statement: it can be used several times in one scope and inside an unbraced
// if/else. Invoke it with a trailing semicolon.
#define mvLog(verbosity, x)                                                                        \
  do                                                                                               \
  {                                                                                                \
    std::ostringstream mvLogStream;                                                                \
    mvLogStream << x;                                                                              \
    this->Log(verbosity, mvLogStream.str());                                                       \
  } while (false)
92
93 // PIMPL to keep zeromq out of the class interface
94 class vtkVRCollaborationClientInternal
95 {
96 public:
97 void* Context;
98 void* Requester;
99 void* Subscriber;
100 zmq_pollitem_t CollabPollItems[2];
101
vtkVRCollaborationClientInternal()102 vtkVRCollaborationClientInternal()
103 : CollabPollItems{ { nullptr, 0, ZMQ_POLLIN, 0 }, { nullptr, 0, ZMQ_POLLIN, 0 } }
104 {
105 // ceate context
106 this->Context = zmq_ctx_new();
107 assert(this->Context);
108 int rc = zmq_ctx_set(this->Context, ZMQ_IO_THREADS, 1);
109 assert(rc == 0);
110 rc = zmq_ctx_set(this->Context, ZMQ_MAX_SOCKETS, ZMQ_MAX_SOCKETS_DFLT);
111 assert(rc == 0);
112
113 this->Requester = zmq_socket(this->Context, ZMQ_DEALER);
114 this->Subscriber = zmq_socket(this->Context, ZMQ_SUB);
115
116 this->CollabPollItems[0].socket = this->Requester;
117 this->CollabPollItems[1].socket = this->Subscriber;
118 }
119
~vtkVRCollaborationClientInternal()120 ~vtkVRCollaborationClientInternal()
121 {
122 if (this->Requester)
123 {
124 zmq_close(this->Requester);
125 }
126 if (this->Subscriber)
127 {
128 zmq_close(this->Subscriber);
129 }
130 int rc;
131 do
132 {
133 rc = zmq_ctx_term(this->Context);
134 } while (rc == -1 && errno == EINTR);
135 }
136 };
137
138 vtkStandardNewMacro(vtkVRCollaborationClient);
139
vtkVRCollaborationClient()140 vtkVRCollaborationClient::vtkVRCollaborationClient()
141 : Connected(false)
142 , DisplayOwnAvatar(false)
143 , MoveObserver(-1)
144 , Callback(nullptr)
145 , YourLastAvatarUpdateTime(0.0)
146 {
147 this->Internal = new vtkVRCollaborationClientInternal();
148
149 this->CollabPort = 5555;
150 // Position MineView Zeromq, default when none is specified.
151 this->CollabSession = "PMVZ";
152 this->RetryCount = 1; // start in retry state.
153 this->NeedHeartbeat = 0;
154 this->NeedReply = 0;
155 this->PublishAvailable = false; // publish socket not sending yet.
156
157 this->EventCommand = vtkCallbackCommand::New();
158 this->EventCommand->SetClientData(this);
159 this->EventCommand->SetCallback(vtkVRCollaborationClient::EventCallback);
160
161 // setup default scale callback
162 this->ScaleCallback = [this]() {
163 auto ovrrw = vtkVRRenderWindow::SafeDownCast(this->RenderWindow);
164 return ovrrw ? ovrrw->GetPhysicalScale() : 1.0;
165 };
166 }
167
~vtkVRCollaborationClient()168 vtkVRCollaborationClient::~vtkVRCollaborationClient()
169 {
170 this->Disconnect();
171 this->EventCommand->Delete();
172 delete this->Internal;
173 }
174
Log(vtkLogger::Verbosity verbosity,std::string const & msg)175 void vtkVRCollaborationClient::Log(vtkLogger::Verbosity verbosity, std::string const& msg)
176 {
177 if (this->Callback)
178 {
179 this->Callback(msg, verbosity);
180 }
181 else
182 {
183 vtkWarningMacro(<< msg);
184 }
185 }
186
Disconnect()187 void vtkVRCollaborationClient::Disconnect()
188 {
189 if (!this->Connected)
190 {
191 return;
192 }
193
194 mvLog(vtkLogger::VERBOSITY_INFO, "Collab server disconnecting. " << std::endl);
195
196 if (this->Internal->Requester != nullptr)
197 {
198 zmq_close(this->Internal->Requester);
199 }
200 if (this->Internal->Subscriber != nullptr)
201 {
202 zmq_close(this->Internal->Subscriber);
203 }
204 for (auto it : this->AvatarUpdateTime)
205 {
206 this->Renderer->RemoveActor(this->Avatars[it.first]);
207 this->Avatars.erase(it.first);
208 }
209 this->AvatarUpdateTime.clear();
210
211 if (this->MoveObserver >= 0 && this->RenderWindow && this->RenderWindow->GetInteractor())
212 {
213 this->RenderWindow->GetInteractor()->RemoveObserver(this->MoveObserver);
214 this->MoveObserver = -1;
215 }
216 this->Connected = false;
217 this->CollabID.clear();
218 }
219
AddArguments(vtksys::CommandLineArguments & arguments)220 void vtkVRCollaborationClient::AddArguments(vtksys::CommandLineArguments& arguments)
221 {
222 typedef vtksys::CommandLineArguments argT;
223
224 arguments.AddArgument("--collab-server", argT::EQUAL_ARGUMENT, &(this->CollabHost),
225 "(optional) Connect to collaboration server at this hostname");
226 arguments.AddArgument("--collab-port", argT::EQUAL_ARGUMENT, &(this->CollabPort),
227 "(default:5555) Connect to collaboration server at this port");
228 arguments.AddArgument("--collab-session", argT::EQUAL_ARGUMENT, &(this->CollabSession),
229 "Connect to a separate collaboration session - each collaborator should use a matching value");
230 arguments.AddArgument("--collab-name", argT::EQUAL_ARGUMENT, &(this->CollabName),
231 "Name to display over your avatar to other collaborators");
232 this->DisplayOwnAvatar = false;
233 arguments.AddBooleanArgument("--show-my-avatar", &this->DisplayOwnAvatar,
234 "(default false) Show an avatar at my own position.");
235 }
236
Render()237 void vtkVRCollaborationClient::Render()
238 {
239 if (this->Connected)
240 {
241 // if windowed update avatar position based on camera pos
242 if (this->MoveObserver == -1)
243 {
244 this->UpdateAvatarPoseFromCamera();
245 }
246 this->HandleCollabMessage();
247 this->EraseIdleAvatars();
248 }
249 }
250
UpdateAvatarPoseFromCamera()251 void vtkVRCollaborationClient::UpdateAvatarPoseFromCamera()
252 {
253 // act like a Move3D event for the head
254 int idevice = static_cast<int>(vtkEventDataDevice::HeadMountedDisplay);
255 double* pos = this->DevicePoses[idevice].Position.data();
256 double* orient = this->DevicePoses[idevice].Orientation.data();
257 this->HasPoseForDevice[idevice] = true;
258
259 this->Renderer->GetActiveCamera()->GetPosition(pos);
260 auto cori = this->Renderer->GetActiveCamera()->GetOrientationWXYZ();
261
262 // currently have a mismatch between wxyz and euler angles. Convert.
263 this->TempTransform->Identity();
264 this->TempTransform->RotateWXYZ(-cori[0], &cori[1]);
265 // angles need to be rotated 90
266 this->TempTransform->RotateY(90);
267 this->TempTransform->GetOrientation(orient);
268
269 this->SendLatestDevicePoses();
270 }
271
SendLatestDevicePoses()272 void vtkVRCollaborationClient::SendLatestDevicePoses()
273 {
274 // don't send a message if we haven't gotten one during the last
275 // heartbeat. View messages, however, are always sent (queued).
276 if (this->RetryCount > 0)
277 {
278 return;
279 }
280
281 // throttle avatar pose updates
282 double currentTime = vtkTimerLog::GetUniversalTime();
283 if (currentTime - this->YourLastAvatarUpdateTime > 0.02)
284 {
285 // package up the device pose messages
286 std::vector<int32_t> devices;
287 std::vector<double> poses;
288 bool haveHead = false;
289 for (int i = 0; i < vtkEventDataNumberOfDevices; ++i)
290 {
291 if (this->HasPoseForDevice[i])
292 {
293 if (i == static_cast<int>(vtkEventDataDevice::HeadMountedDisplay))
294 {
295 haveHead = true;
296 }
297 devices.push_back(i);
298 poses.insert(
299 poses.end(), this->DevicePoses[i].Position.begin(), this->DevicePoses[i].Position.end());
300 poses.insert(poses.end(), this->DevicePoses[i].Orientation.begin(),
301 this->DevicePoses[i].Orientation.end());
302 }
303 this->HasPoseForDevice[i] = false;
304 }
305
306 // if no data ignore
307 // Don't send hand messages without head data
308 if (devices.size() == 0 || !haveHead)
309 {
310 return;
311 }
312
313 double scale = this->ScaleCallback();
314
315 std::vector<vtkVRCollaborationClient::Argument> args;
316 args.resize(3);
317 args[0].SetInt32Vector(devices.data(), static_cast<int32_t>(devices.size()));
318 args[1].SetDoubleVector(poses.data(), static_cast<int32_t>(poses.size()));
319 args[2].SetDouble(scale);
320 this->YourLastAvatarUpdateTime = currentTime;
321 this->SendAMessage("A", args);
322 }
323 }
324
SendAMessage(std::string const & msgType,std::vector<Argument> const & args)325 void vtkVRCollaborationClient::SendAMessage(
326 std::string const& msgType, std::vector<Argument> const& args)
327 {
328 if (this->CollabID.empty())
329 {
330 return;
331 }
332
333 // send header, our ID, session.
334 zmq_send_const(this->Internal->Requester, "PMVZ", 4, ZMQ_SNDMORE);
335 zmq_send(this->Internal->Requester, this->CollabID.c_str(), this->CollabID.size(), ZMQ_SNDMORE);
336 zmq_send(this->Internal->Requester, this->CollabSession.c_str(), this->CollabSession.size(),
337 ZMQ_SNDMORE);
338 zmq_send(this->Internal->Requester, msgType.c_str(), msgType.size(), ZMQ_SNDMORE);
339
340 // send the number of arguments
341 uint16_t numArgs = static_cast<uint16_t>(args.size());
342 zmq_send(this->Internal->Requester, &numArgs, sizeof(numArgs), ZMQ_SNDMORE);
343
344 // now send the arguments
345 for (int i = 0; i < numArgs; ++i)
346 {
347 auto& arg = args[i];
348
349 // send the arg type
350 uint16_t type = static_cast<uint16_t>(arg.Type);
351 zmq_send(this->Internal->Requester, &type, sizeof(type), ZMQ_SNDMORE);
352
353 // send the arg count (how many in the vector)
354 uint16_t count = static_cast<uint16_t>(arg.Count);
355 zmq_send(this->Internal->Requester, &count, sizeof(count), ZMQ_SNDMORE);
356
357 // finally send the data
358 switch (arg.Type)
359 {
360 case Double:
361 {
362 zmq_send(this->Internal->Requester, arg.Data.get(), sizeof(double) * arg.Count,
363 (i == numArgs - 1 ? 0 : ZMQ_SNDMORE));
364 break;
365 }
366 case Int32:
367 {
368 zmq_send(this->Internal->Requester, arg.Data.get(), sizeof(int32_t) * arg.Count,
369 (i == numArgs - 1 ? 0 : ZMQ_SNDMORE));
370 break;
371 }
372 case String:
373 {
374 zmq_send(this->Internal->Requester, arg.Data.get(), arg.Count,
375 (i == numArgs - 1 ? 0 : ZMQ_SNDMORE));
376 break;
377 }
378 }
379 }
380 }
381
GetString(std::string & result)382 bool vtkVRCollaborationClient::Argument::GetString(std::string& result)
383 {
384 if (this->Type != String || !this->Data)
385 {
386 return false;
387 }
388
389 char* cstr = static_cast<char*>(this->Data.get());
390 // make sure it is terminated
391 cstr[this->Count - 1] = 0;
392 result = std::string(cstr);
393 return true;
394 }
395
SetString(std::string const & in)396 void vtkVRCollaborationClient::Argument::SetString(std::string const& in)
397 {
398 this->Type = String;
399 this->Count = static_cast<uint16_t>(in.size() + 1);
400 char* cdata = static_cast<char*>(malloc(in.size() + 1));
401 this->Data = std::shared_ptr<void>(cdata, free);
402 std::copy(in.begin(), in.end(), cdata);
403 cdata[in.size()] = 0;
404 }
405
GetStringVector(std::vector<std::string> & result)406 bool vtkVRCollaborationClient::Argument::GetStringVector(std::vector<std::string>& result)
407 {
408 if (this->Type != String || !this->Data)
409 {
410 return false;
411 }
412
413 char* cstr = static_cast<char*>(this->Data.get());
414 // make sure it is terminated
415 cstr[this->Count - 1] = 0;
416
417 size_t pos = 0;
418 while (pos < this->Count)
419 {
420 std::string tmp = std::string(cstr + pos);
421 result.push_back(tmp);
422 pos += tmp.size() + 1;
423 }
424
425 return true;
426 }
427
SetStringVector(std::vector<std::string> const & in)428 void vtkVRCollaborationClient::Argument::SetStringVector(std::vector<std::string> const& in)
429 {
430 this->Type = String;
431
432 size_t byteCount = 0;
433 for (size_t i = 0; i < in.size(); i++)
434 {
435 byteCount += in[i].size();
436 byteCount++;
437 }
438 this->Count = static_cast<uint16_t>(byteCount);
439
440 char* cdata = static_cast<char*>(malloc(byteCount));
441 this->Data = std::shared_ptr<void>(cdata, free);
442
443 char* cptr = cdata;
444 for (size_t i = 0; i < in.size(); i++)
445 {
446 std::copy(in[i].begin(), in[i].end(), cptr);
447 cptr += in[i].size();
448 *cptr = 0;
449 cptr++;
450 }
451 }
452
GetDoubleVector(std::vector<double> & result)453 bool vtkVRCollaborationClient::Argument::GetDoubleVector(std::vector<double>& result)
454 {
455 if (this->Type != Double || !this->Data)
456 {
457 return false;
458 }
459
460 result.resize(this->Count);
461 double* dptr = static_cast<double*>(this->Data.get());
462 std::copy(dptr, dptr + this->Count, result.begin());
463 return true;
464 }
465
SetDoubleVector(double const * in,uint16_t size)466 void vtkVRCollaborationClient::Argument::SetDoubleVector(double const* in, uint16_t size)
467 {
468 this->Type = Double;
469 this->Count = size;
470 double* cdata = static_cast<double*>(malloc(sizeof(double) * size));
471 this->Data = std::shared_ptr<void>(cdata, free);
472 std::copy(in, in + size, cdata);
473 }
474
SetDouble(double in)475 void vtkVRCollaborationClient::Argument::SetDouble(double in)
476 {
477 this->Type = Double;
478 this->Count = 1;
479 double* cdata = static_cast<double*>(malloc(sizeof(double)));
480 cdata[0] = in;
481 this->Data = std::shared_ptr<void>(cdata, free);
482 }
483
GetDouble(double & result)484 bool vtkVRCollaborationClient::Argument::GetDouble(double& result)
485 {
486 if (this->Type != Double || !this->Data || this->Count != 1)
487 {
488 return false;
489 }
490
491 double* dptr = static_cast<double*>(this->Data.get());
492 result = *dptr;
493 return true;
494 }
495
GetInt32Vector(std::vector<int32_t> & result)496 bool vtkVRCollaborationClient::Argument::GetInt32Vector(std::vector<int32_t>& result)
497 {
498 if (this->Type != Int32 || !this->Data)
499 {
500 return false;
501 }
502
503 result.resize(this->Count);
504 int32_t* dptr = static_cast<int32_t*>(this->Data.get());
505 std::copy(dptr, dptr + this->Count, result.begin());
506 return true;
507 }
508
SetInt32Vector(int32_t const * in,uint16_t size)509 void vtkVRCollaborationClient::Argument::SetInt32Vector(int32_t const* in, uint16_t size)
510 {
511 this->Type = Int32;
512 this->Count = size;
513 int32_t* cdata = static_cast<int32_t*>(malloc(sizeof(int32_t) * size));
514 this->Data = std::shared_ptr<void>(cdata, free);
515 std::copy(in, in + size, cdata);
516 }
517
SetInt32(int32_t in)518 void vtkVRCollaborationClient::Argument::SetInt32(int32_t in)
519 {
520 this->Type = Int32;
521 this->Count = 1;
522 int32_t* cdata = static_cast<int32_t*>(malloc(sizeof(int32_t)));
523 cdata[0] = in;
524 this->Data = std::shared_ptr<void>(cdata, free);
525 }
526
GetInt32(int32_t & result)527 bool vtkVRCollaborationClient::Argument::GetInt32(int32_t& result)
528 {
529 if (this->Type != Int32 || !this->Data || this->Count != 1)
530 {
531 return false;
532 }
533
534 int32_t* dptr = static_cast<int32_t*>(this->Data.get());
535 result = *dptr;
536 return true;
537 }
538
GetMessageArguments()539 std::vector<vtkVRCollaborationClient::Argument> vtkVRCollaborationClient::GetMessageArguments()
540 {
541 std::vector<Argument> result;
542
543 uint16_t numArgs = 0;
544 zmq_recv(this->Internal->Subscriber, &numArgs, sizeof(numArgs), 0);
545
546 result.resize(numArgs);
547
548 for (int i = 0; i < numArgs; ++i)
549 {
550 Argument& arg = result[i];
551
552 // get the arg type
553 uint16_t argType = Double;
554 zmq_recv(this->Internal->Subscriber, &argType, sizeof(argType), 0);
555 arg.Type = static_cast<ArgumentType>(argType);
556
557 // get the arg count
558 uint16_t argCount = 0;
559 zmq_recv(this->Internal->Subscriber, &argCount, sizeof(argCount), 0);
560 arg.Count = argCount;
561
562 switch (arg.Type)
563 {
564 case Double:
565 {
566 arg.Data = std::shared_ptr<void>(malloc(sizeof(double) * arg.Count), free);
567 auto zresult =
568 zmq_recv(this->Internal->Subscriber, arg.Data.get(), sizeof(double) * arg.Count, 0);
569 if (zresult != sizeof(double) * arg.Count)
570 {
571 vtkErrorMacro("failed to get valid argument");
572 }
573 break;
574 }
575 case Int32:
576 {
577 arg.Data = std::shared_ptr<void>(malloc(sizeof(int32_t) * arg.Count), free);
578 auto zresult =
579 zmq_recv(this->Internal->Subscriber, arg.Data.get(), sizeof(int32_t) * arg.Count, 0);
580 if (zresult != sizeof(int32_t) * arg.Count)
581 {
582 vtkErrorMacro("failed to get valid argument");
583 }
584 break;
585 }
586 case String:
587 {
588 arg.Data = std::shared_ptr<void>(malloc(arg.Count), free);
589 auto zresult = zmq_recv(this->Internal->Subscriber, arg.Data.get(), arg.Count, 0);
590 if (zresult != arg.Count)
591 {
592 vtkErrorMacro("failed to get valid argument");
593 }
594 break;
595 }
596 }
597 }
598
599 return result;
600 }
601
SendAMessage(std::string const & msgType)602 void vtkVRCollaborationClient::SendAMessage(std::string const& msgType)
603 {
604 if (this->CollabID.empty())
605 {
606 return;
607 }
608 // send header, our ID, session.
609 zmq_send_const(this->Internal->Requester, "PMVZ", 4, ZMQ_SNDMORE);
610 zmq_send(this->Internal->Requester, this->CollabID.c_str(), this->CollabID.size(), ZMQ_SNDMORE);
611 zmq_send(this->Internal->Requester, this->CollabSession.c_str(), this->CollabSession.size(),
612 ZMQ_SNDMORE);
613 zmq_send(this->Internal->Requester, msgType.c_str(), msgType.size(), 0);
614 }
615
SendPoseMessage(std::string const & msgType,int index,double pos[3],double dir[3])616 void vtkVRCollaborationClient::SendPoseMessage(
617 std::string const& msgType, int index, double pos[3], double dir[3])
618 {
619 std::vector<vtkVRCollaborationClient::Argument> args;
620 args.resize(3);
621 args[0].SetInt32(index);
622 args[1].SetDoubleVector(pos, 3);
623 args[2].SetDoubleVector(dir, 3);
624 this->SendAMessage(msgType, args);
625 }
626
HandleBroadcastMessage(std::string const & otherID,std::string const & type)627 void vtkVRCollaborationClient::HandleBroadcastMessage(
628 std::string const& otherID, std::string const& type)
629 {
630 if (type == "A")
631 {
632 std::vector<Argument> args = this->GetMessageArguments();
633
634 std::vector<double> poses;
635 std::vector<int32_t> devices;
636 double ascale = 1.0;
637 if (args.size() != 3 || !args[0].GetInt32Vector(devices) || !args[1].GetDoubleVector(poses) ||
638 !args[2].GetDouble(ascale))
639 {
640 mvLog(vtkLogger::VERBOSITY_ERROR,
641 "Incorrect arguments for A (avatar pose) collaboration message" << std::endl);
642 return;
643 }
644
645 // if this update is from us, we ignore it by default.
646 if (otherID != this->CollabID || this->DisplayOwnAvatar)
647 {
648 double scale = this->ScaleCallback();
649 auto avatar = this->GetAvatar(otherID);
650 avatar->SetScale(0.3 * scale);
651
652 bool haveLeft = false;
653 bool haveRight = false;
654 for (size_t i = 0; i < devices.size(); ++i)
655 {
656 vtkEventDataDevice device = static_cast<vtkEventDataDevice>(devices[i]);
657
658 double* updatePos = poses.data() + i * 7;
659 double* updateOrient = poses.data() + i * 7 + 3;
660
661 if (device == vtkEventDataDevice::LeftController)
662 {
663 avatar->SetLeftHandPosition(updatePos);
664 avatar->SetLeftHandOrientation(updateOrient);
665 if (!avatar->GetUseLeftHand())
666 {
667 avatar->UseLeftHandOn();
668 }
669 haveLeft = true;
670 }
671 else if (device == vtkEventDataDevice::RightController)
672 {
673 avatar->SetRightHandPosition(updatePos);
674 avatar->SetRightHandOrientation(updateOrient);
675 if (!avatar->GetUseRightHand())
676 {
677 avatar->UseRightHandOn();
678 }
679 haveRight = true;
680 }
681 else if (device == vtkEventDataDevice::HeadMountedDisplay)
682 {
683 avatar->SetHeadPosition(updatePos);
684 avatar->SetHeadOrientation(updateOrient);
685 }
686 this->AvatarUpdateTime[otherID][static_cast<int>(device)] = vtkTimerLog::GetUniversalTime();
687 }
688
689 // adjust hand positions based on sending avatar scale
690 double adjustment = scale / ascale;
691 auto* headPos = avatar->GetHeadPosition();
692 if (haveRight)
693 {
694 auto* handPos = avatar->GetRightHandPosition();
695 avatar->SetRightHandPosition(headPos[0] + adjustment * (handPos[0] - headPos[0]),
696 headPos[1] + adjustment * (handPos[1] - headPos[1]),
697 headPos[2] + adjustment * (handPos[2] - headPos[2]));
698 }
699 if (haveLeft)
700 {
701 auto* handPos = avatar->GetLeftHandPosition();
702 avatar->SetLeftHandPosition(headPos[0] + adjustment * (handPos[0] - headPos[0]),
703 headPos[1] + adjustment * (handPos[1] - headPos[1]),
704 headPos[2] + adjustment * (handPos[2] - headPos[2]));
705 }
706 }
707
708 // Check if we were idle, and re-send join messages.
709 if (otherID == this->CollabID && this->AvatarIdle(this->CollabID))
710 {
711 mvLog(vtkLogger::VERBOSITY_INFO, "Collab " << otherID << " return from idle " << std::endl);
712
713 std::vector<vtkVRCollaborationClient::Argument> args2;
714 args2.resize(1);
715 args2[0].SetString(this->CollabID);
716 this->SendAMessage("J", args2);
717 }
718 }
719 else if (type == "J")
720 {
721 std::vector<Argument> args = this->GetMessageArguments();
722
723 std::string extraID;
724 if (args.size() != 1 || !args[0].GetString(extraID))
725 {
726 mvLog(vtkLogger::VERBOSITY_ERROR,
727 "Incorrect arguments for J (join) collaboration message" << std::endl);
728 return;
729 }
730
731 // Join message, send our list of views.
732 // if we are idle, don't respond to join messages - send a join when
733 // we are not idle anymore.
734 if (this->AvatarIdle(this->CollabID))
735 {
736 return;
737 }
738 mvLog(vtkLogger::VERBOSITY_INFO, "Collab " << otherID << ", Join" << std::endl);
739 if (!this->CollabName.empty())
740 {
741 std::vector<vtkVRCollaborationClient::Argument> args2;
742 args2.resize(1);
743 args2[0].SetString(this->CollabName);
744 this->SendAMessage("N", args2);
745 }
746 }
747 else if (type == "SR" || type == "HR")
748 {
749 // show/hide a ray
750 std::vector<Argument> args = this->GetMessageArguments();
751 int32_t device;
752 if (args.size() != 1 || !args[0].GetInt32(device))
753 {
754 mvLog(vtkLogger::VERBOSITY_ERROR,
755 "Incorrect arguments for SR/HR (ray) collaboration message" << std::endl);
756 return;
757 }
758
759 bool show = (type == "SR");
760 if (this->Avatars.count(otherID) != 0)
761 {
762 auto avatar = this->GetAvatar(otherID);
763 if (device == static_cast<int>(vtkEventDataDevice::LeftController))
764 {
765 avatar->SetLeftShowRay(show);
766 }
767 else if (device == static_cast<int>(vtkEventDataDevice::RightController))
768 {
769 avatar->SetRightShowRay(show);
770 }
771 double scale = this->ScaleCallback();
772 avatar->SetRayLength(RAY_LENGTH * scale);
773 }
774 }
775 else if (type == "N")
776 {
777 std::vector<Argument> args = this->GetMessageArguments();
778 // Set avatar's name, displayed above head.
779 std::string avatarName;
780 if (args.size() != 1 || !args[0].GetString(avatarName))
781 {
782 mvLog(vtkLogger::VERBOSITY_ERROR,
783 "Incorrect arguments for N (name) collaboration message" << std::endl);
784 return;
785 }
786 mvLog(vtkLogger::VERBOSITY_INFO, "Collab " << otherID << ", Name " << avatarName << std::endl);
787 if (!avatarName.empty() && otherID != this->CollabID)
788 {
789 this->GetAvatar(otherID)->SetLabel(avatarName.c_str());
790 }
791 }
792 }
793
GetAvatar(std::string otherID)794 vtkSmartPointer<vtkOpenGLAvatar> vtkVRCollaborationClient::GetAvatar(std::string otherID)
795 {
796 // if it's from a new collaborator, add an avatar
797 if (this->Avatars.count(otherID) == 0)
798 {
799 mvLog(vtkLogger::VERBOSITY_INFO, "Adding Avatar " << otherID << std::endl);
800 this->Avatars[otherID] = vtkSmartPointer<vtkOpenGLAvatar>::New();
801 auto newAvatar = this->Avatars[otherID];
802 this->Renderer->AddActor(newAvatar);
803 // meters -> ft conversion.
804 double scale = this->ScaleCallback();
805 newAvatar->SetScale(0.3 * scale);
806 newAvatar->SetUpVector(0, 0, 1);
807 size_t colorIndex = this->Avatars.size() - 1;
808 // base the color on the server's index of avatars.
809 try
810 {
811 colorIndex = std::stoi(otherID);
812 }
813 catch (...)
814 {
815 }
816 newAvatar->GetProperty()->SetColor(AVATAR_COLORS[(colorIndex) % NUM_COLORS]);
817 newAvatar->GetLabelTextProperty()->SetColor(AVATAR_COLORS[(colorIndex) % NUM_COLORS]);
818 newAvatar->GetLabelTextProperty()->SetFontSize(16);
819 if (otherID == this->CollabID)
820 {
821 // Display only the hands
822 newAvatar->SetShowHandsOnly(true);
823 auto ovrrw = vtkVRRenderWindow::SafeDownCast(this->RenderWindow);
824 if (ovrrw)
825 {
826 vtkVRModel* cmodel = ovrrw->GetTrackedDeviceModel(vtkEventDataDevice::LeftController);
827 if (cmodel)
828 {
829 cmodel->SetVisibility(false);
830 }
831 cmodel = ovrrw->GetTrackedDeviceModel(vtkEventDataDevice::RightController);
832 if (cmodel)
833 {
834 cmodel->SetVisibility(false);
835 }
836 }
837 }
838 for (int i = 0; i < vtkEventDataNumberOfDevices; ++i)
839 {
840 this->AvatarUpdateTime[otherID][i] = 0;
841 }
842 }
843 return this->Avatars[otherID];
844 }
845
HandleCollabMessage()846 void vtkVRCollaborationClient::HandleCollabMessage()
847 {
848 double currTime = vtkTimerLog::GetUniversalTime();
849 bool receivedMsg = true;
850 do
851 {
852 // timeout is 0, return immediately.
853 zmq_poll(&(this->Internal->CollabPollItems)[0], 2, 0);
854 if (this->Internal->CollabPollItems[0].revents & ZMQ_POLLIN)
855 {
856 // reply on the request-reply (dealer) socket - expect ID or error.
857 std::string reply = _zmq_string_recv(this->Internal->Requester, ZMQ_DONTWAIT);
858 if (reply == "ERROR")
859 {
860 mvLog(vtkLogger::VERBOSITY_ERROR, "Collab server returned error " << std::endl);
861 }
862 else if (reply == "pong")
863 {
864 // update server alive time, below.
865 }
866 else if (reply.empty())
867 {
868 // error, do nothing.
869 mvLog(vtkLogger::VERBOSITY_ERROR, "Error: empty reply " << std::endl);
870 }
871 else
872 {
873 this->CollabID = reply;
874 mvLog(vtkLogger::VERBOSITY_INFO, "Received ID " << this->CollabID << std::endl);
875 this->RetryCount = 0;
876 // ideally send "J" join message here, but pub-sub not ready yet.
877 }
878 }
879
880 // handle broadcast messages
881 //
882 // A - avatar position update
883 // J - New client joined message
884 // N - Client name
885 // SR/HR - show or hide a ray
886 // V - View change
887 // P - New TourStop
888 // VL - ViewList
889 //
890 if (this->Internal->CollabPollItems[1].revents & ZMQ_POLLIN)
891 {
892 std::string sig = _zmq_string_recv(this->Internal->Subscriber, ZMQ_DONTWAIT);
893 if (sig.size())
894 {
895 // verify the signature
896 // we can get bad data, so make sure the first message contains the
897 // correct data before requesting other pieces (which could block
898 // and hang the app if the data was bad)
899 if (sig == this->CollabSession)
900 {
901 // the first sub-msg contains the session string for the subscription
902 // process other avatar updates
903 std::string otherID = _zmq_string_recv(this->Internal->Subscriber, ZMQ_DONTWAIT);
904 std::string type = _zmq_string_recv(this->Internal->Subscriber, ZMQ_DONTWAIT);
905 if (otherID.empty() || type.empty())
906 {
907 // error, ignore
908 mvLog(vtkLogger::VERBOSITY_ERROR, "empty ID or ID " << otherID << ", " << type);
909 _zmq_sock_clear(this->Internal->Subscriber);
910 continue;
911 }
912
913 this->HandleBroadcastMessage(otherID, type);
914 }
915 else
916 {
917 mvLog(vtkLogger::VERBOSITY_ERROR,
918 "Error: mismatched session header with signature of: " << sig);
919 _zmq_sock_clear(this->Internal->Subscriber);
920 }
921
922 // we got a message on the publish socket, see if this is the first one.
923 if (!this->PublishAvailable)
924 {
925 this->PublishAvailable = true;
926 // send join message, to trigger view setup.
927 std::vector<vtkVRCollaborationClient::Argument> args;
928 args.resize(1);
929 args[0].SetString(this->CollabID);
930 this->SendAMessage("J", args);
931 }
932 }
933 else
934 {
935 mvLog(vtkLogger::VERBOSITY_ERROR, "Error: empty session header");
936 _zmq_sock_clear(this->Internal->Subscriber);
937 continue;
938 }
939 }
940
941 receivedMsg = (this->Internal->CollabPollItems[0].revents & ZMQ_POLLIN ||
942 this->Internal->CollabPollItems[1].revents & ZMQ_POLLIN);
943 if (receivedMsg)
944 {
945 // got a message, reset heartbeat.
946 this->NeedHeartbeat = currTime + HEARTBEAT_INTERVAL;
947 this->NeedReply = currTime + HEARTBEAT_INTERVAL * LIVE_COUNT;
948 this->RetryCount = 0;
949 }
950 else if (currTime > this->NeedHeartbeat && !this->CollabID.empty())
951 {
952 // heartbeat only if we have an ID. send ping, expect pong
953 if (this->RetryCount == 0)
954 {
955 this->RetryCount = 1;
956 }
957 zmq_send_const(this->Internal->Requester, "ping", 4, ZMQ_SNDMORE);
958 zmq_send(this->Internal->Requester, this->CollabID.c_str(), this->CollabID.size(), 0);
959 this->NeedHeartbeat = currTime + HEARTBEAT_INTERVAL;
960 }
961
962 // if heartbeat fails multiple times
963 if (currTime > this->NeedReply)
964 {
965 if (this->RetryCount > LIVE_COUNT)
966 {
967 this->NeedReply = currTime + HEARTBEAT_INTERVAL * LIVE_COUNT * this->RetryCount;
968 mvLog(vtkLogger::VERBOSITY_WARNING, "Collab server disconnected, waiting. " << std::endl);
969 }
970 else
971 {
972 mvLog(vtkLogger::VERBOSITY_WARNING,
973 "Collab server not responding, retry " << this->RetryCount << std::endl);
974 ++this->RetryCount;
975 // disconnect and reconnect sockets, clear ID
976 this->Initialize(this->Renderer);
977 }
978 }
979 } while (receivedMsg);
980 }
981
AvatarIdle(std::string id)982 bool vtkVRCollaborationClient::AvatarIdle(std::string id)
983 {
984 double currTime = vtkTimerLog::GetUniversalTime();
985 auto times = this->AvatarUpdateTime[id];
986
987 // if we've never received a head position message, the avatar isn't idle.
988 if (times[0] == 0)
989 {
990 return false;
991 }
992
993 double avatarTime = times[0];
994 // consider ourselves idle slightly before any collaborators do, avoiding races.
995 double timeout = id == this->CollabID ? 0.98 * AVATAR_TIMEOUT : AVATAR_TIMEOUT;
996 return (currTime - avatarTime > timeout);
997 }
998
EraseIdleAvatars()999 void vtkVRCollaborationClient::EraseIdleAvatars()
1000 {
1001 double currTime = vtkTimerLog::GetUniversalTime();
1002 for (auto it : this->AvatarUpdateTime)
1003 {
1004 if (it.second[0] == 0)
1005 {
1006 continue;
1007 }
1008 double avatarTime = it.second[0];
1009 if (currTime - avatarTime > AVATAR_TIMEOUT && it.first != this->CollabID &&
1010 this->Avatars.count(it.first) != 0)
1011 {
1012 mvLog(vtkLogger::VERBOSITY_INFO, "Removing Avatar: " << it.first << std::endl);
1013 this->Renderer->RemoveActor(this->Avatars[it.first]);
1014 this->Avatars.erase(it.first);
1015 this->AvatarUpdateTime.erase(it.first);
1016 // send join message, to trigger view setup.
1017 std::vector<vtkVRCollaborationClient::Argument> args;
1018 args.resize(1);
1019 args[0].SetString(this->CollabID);
1020 this->SendAMessage("J", args);
1021 break;
1022 }
1023
1024 if (this->Avatars.count(it.first) == 0)
1025 {
1026 continue;
1027 }
1028
1029 // see if the hands are idle, or not present at all.
1030 int device = static_cast<int>(vtkEventDataDevice::LeftController);
1031 if (currTime - it.second[device] > AVATAR_TIMEOUT)
1032 {
1033 auto currAvatar = this->Avatars[it.first];
1034 if (currAvatar->GetUseLeftHand())
1035 {
1036 currAvatar->UseLeftHandOff();
1037 }
1038 }
1039 device = static_cast<int>(vtkEventDataDevice::RightController);
1040 if (currTime - it.second[device] > AVATAR_TIMEOUT)
1041 {
1042 auto currAvatar = this->Avatars[it.first];
1043 if (currAvatar->GetUseRightHand())
1044 {
1045 currAvatar->UseRightHandOff();
1046 }
1047 }
1048 }
1049 }
1050
EventCallback(vtkObject *,unsigned long eventID,void * clientdata,void * calldata)1051 void vtkVRCollaborationClient::EventCallback(
1052 vtkObject*, unsigned long eventID, void* clientdata, void* calldata)
1053 {
1054 vtkVRCollaborationClient* self = static_cast<vtkVRCollaborationClient*>(clientdata);
1055
1056 if (eventID == vtkCommand::Move3DEvent)
1057 {
1058 vtkEventData* edata = static_cast<vtkEventData*>(calldata);
1059 vtkEventDataDevice3D* edd = edata->GetAsEventDataDevice3D();
1060 if (!edd)
1061 {
1062 return;
1063 }
1064
1065 auto device = edd->GetDevice();
1066 int idevice = static_cast<int>(device);
1067 if (device == vtkEventDataDevice::LeftController ||
1068 device == vtkEventDataDevice::RightController ||
1069 device == vtkEventDataDevice::HeadMountedDisplay)
1070 {
1071 double* pos = self->DevicePoses[idevice].Position.data();
1072 double* orient = self->DevicePoses[idevice].Orientation.data();
1073 edd->GetWorldPosition(pos);
1074 // empirically, the Oculus sometimes gives nonsense positions
1075 if (fabs(pos[0]) > 1e07)
1076 {
1077 return;
1078 }
1079 double wxyz[4] = { 0 };
1080 edd->GetWorldOrientation(wxyz);
1081
1082 // currently have a mismatch between wxyz and euler angles. Convert.
1083 self->TempTransform->Identity();
1084 self->TempTransform->RotateWXYZ(wxyz[0], &wxyz[1]);
1085 // angles need to be rotated 90
1086 self->TempTransform->RotateY(90);
1087 self->TempTransform->GetOrientation(orient);
1088
1089 // hands are also too far forward in x.
1090 if (device != vtkEventDataDevice::HeadMountedDisplay)
1091 {
1092 double adjust[3] = { -0.15, 0, 0 };
1093 self->TempTransform->TransformPoint(adjust, adjust);
1094 pos[0] += adjust[0];
1095 pos[1] += adjust[1];
1096 pos[2] += adjust[2];
1097 }
1098 self->HasPoseForDevice[idevice] = true;
1099 self->SendLatestDevicePoses();
1100 }
1101 return;
1102 }
1103 }
1104
1105 // disconnect if needed, then connect to server.
1106 // Retry count is set externally.
Initialize(vtkOpenGLRenderer * ren)1107 bool vtkVRCollaborationClient::Initialize(vtkOpenGLRenderer* ren)
1108 {
1109 if (!ren)
1110 {
1111 return false;
1112 }
1113
1114 this->Renderer = ren;
1115 this->RenderWindow = static_cast<vtkOpenGLRenderWindow*>(ren->GetVTKWindow());
1116
1117 if (this->CollabHost.empty())
1118 {
1119 return false;
1120 }
1121
1122 if (this->RetryCount == 1)
1123 {
1124 mvLog(vtkLogger::VERBOSITY_INFO, "Connecting to collaboration server..." << std::endl);
1125 }
1126 std::stringstream ss;
1127 ss << "tcp://" << this->CollabHost << ":" << this->CollabPort;
1128 std::string requesterEndpoint = ss.str();
1129 ss.str(std::string());
1130 ss << "tcp://" << this->CollabHost << ":" << (this->CollabPort + 1);
1131 std::string subscriberEndpoint = ss.str();
1132 if (this->Internal->Requester != nullptr)
1133 {
1134 zmq_close(this->Internal->Requester);
1135 }
1136 if (this->Internal->Subscriber != nullptr)
1137 {
1138 zmq_close(this->Internal->Subscriber);
1139 }
1140 this->Connected = false;
1141 this->Internal->Requester = zmq_socket(this->Internal->Context, ZMQ_DEALER);
1142 this->Internal->Subscriber = zmq_socket(this->Internal->Context, ZMQ_SUB);
1143
1144 zmq_connect(this->Internal->Requester, requesterEndpoint.c_str());
1145 zmq_connect(this->Internal->Subscriber, subscriberEndpoint.c_str());
1146 // Subscribe to messages for our session, subscription required by zmq
1147 // We won't receive messages from other sessions.
1148 zmq_setsockopt(this->Internal->Subscriber, ZMQ_SUBSCRIBE, this->CollabSession.c_str(),
1149 this->CollabSession.size());
1150 // once we close, we want the socket to close immediately, and drop messages.
1151 int linger = 0;
1152 zmq_setsockopt(this->Internal->Requester, ZMQ_LINGER, &linger, sizeof(linger));
1153 this->Internal->CollabPollItems[0].socket = this->Internal->Requester;
1154 this->Internal->CollabPollItems[1].socket = this->Internal->Subscriber;
1155 this->Connected = true;
1156
1157 this->CollabID.clear();
1158 double currTime = vtkTimerLog::GetUniversalTime();
1159 this->NeedHeartbeat = currTime + HEARTBEAT_INTERVAL;
1160 this->NeedReply = currTime + HEARTBEAT_INTERVAL * LIVE_COUNT * this->RetryCount;
1161 this->PublishAvailable = false;
1162 zmq_send_const(this->Internal->Requester, "HelloPMVZ", 9, 0);
1163 // async reply, so get ID in HandleCollabMessage()
1164
1165 // add observer based on VR versus windowed
1166 if (this->RenderWindow->IsA("vtkVRRenderWindow"))
1167 {
1168 if (this->MoveObserver == -1)
1169 {
1170 this->MoveObserver = this->RenderWindow->GetInteractor()->AddObserver(
1171 vtkCommand::Move3DEvent, this->EventCommand, 1.0);
1172 }
1173 }
1174
1175 return true;
1176 }
1177
1178 //------------------------------------------------------------------------------
PrintSelf(ostream & os,vtkIndent indent)1179 void vtkVRCollaborationClient::PrintSelf(ostream& os, vtkIndent indent)
1180 {
1181 this->Superclass::PrintSelf(os, indent);
1182 }
1183