1 /*=========================================================================
2 
3   Program:   Visualization Toolkit
4 
5   Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
6   All rights reserved.
7   See Copyright.txt or http://www.kitware.com/Copyright.htm for details.
8 
9      This software is distributed WITHOUT ANY WARRANTY; without even
10      the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
11      PURPOSE.  See the above copyright notice for more information.
12 
13 =========================================================================*/
14 
15 #include "vtkVRCollaborationClient.h"
16 
17 #include "vtkCallbackCommand.h"
18 #include "vtkCamera.h"
19 #include "vtkObjectFactory.h"
20 #include "vtkOpenGLAvatar.h"
21 #include "vtkOpenGLRenderer.h"
22 #include "vtkProperty.h"
23 #include "vtkRenderWindowInteractor.h"
24 #include "vtkTextProperty.h"
25 #include "vtkTimerLog.h"
26 #include "vtkTransform.h"
27 #include "vtkVRModel.h"
28 #include "vtkVRRenderWindow.h"
29 
30 #include <sstream>
31 #include <zmq.h>
32 namespace
33 {
constexpr double RAY_LENGTH = 200.0;    // in meters
constexpr double AVATAR_TIMEOUT = 10.0; // in seconds
constexpr int HEARTBEAT_INTERVAL = 1;   // in seconds
// number of heartbeat intervals added to the current time to compute the
// reply deadline
constexpr int LIVE_COUNT = 3;

// Palette used to tint avatars and their labels, one RGB triple per row with
// components in [0, 1].
// http://colorbrewer2.org/#type=qualitative&scheme=Pastel1&n=9
double AVATAR_COLORS[][3] = {
  { 179.0 / 255, 205.0 / 255, 227.0 / 255 },
  { 204.0 / 255, 235.0 / 255, 197.0 / 255 },
  { 222.0 / 255, 203.0 / 255, 228.0 / 255 },
  { 254.0 / 255, 217.0 / 255, 166.0 / 255 },
  { 255.0 / 255, 255.0 / 255, 204.0 / 255 },
  { 229.0 / 255, 216.0 / 255, 189.0 / 255 },
  { 253.0 / 255, 218.0 / 255, 236.0 / 255 },
  { 242.0 / 255, 242.0 / 255, 242.0 / 255 },
  { 251.0 / 255, 180.0 / 255, 174.0 / 255 },
};

// number of rows in AVATAR_COLORS
constexpr int NUM_COLORS = static_cast<int>(sizeof AVATAR_COLORS / sizeof *AVATAR_COLORS);
53 
54 // two local helper functions for libzmq
_zmq_string_recv(void * socket,int flags)55 std::string _zmq_string_recv(void* socket, int flags)
56 {
57   zmq_msg_t msg;
58   int rc = zmq_msg_init(&msg);
59   assert(rc == 0);
60   rc = zmq_msg_recv(&msg, socket, flags);
61   assert(rc != -1);
62   return std::string(static_cast<const char*>(zmq_msg_data(&msg)), zmq_msg_size(&msg));
63 }
64 
65 //  Receives all remaining message parts from socket, does nothing.
_zmq_sock_clear(void * socket)66 void _zmq_sock_clear(void* socket)
67 {
68   int more;
69   size_t more_size = sizeof(more);
70   zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size);
71   while (more)
72   {
73     //  Process all parts of the message
74     zmq_msg_t part;
75     int rc = zmq_msg_init(&part);
76     assert(rc == 0);
77     /* Block until a message is available to be received from socket */
78     rc = zmq_msg_recv(&part, socket, 0);
79     assert(rc != -1);
80 
81     zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size);
82   }
83 }
84 
85 } // end anon namespace
86 
// Simple macro to package up arguments to call Log(): x is a chain of values
// joined with <<, which is streamed into a string and handed to this->Log()
// together with verbosity.
//
// The expansion is wrapped in do { } while (false) so that it behaves as one
// statement (safe inside an unbraced if/else) and so that its temporary
// stream stays private to the expansion; the macro can therefore be used
// several times in the same scope. Invoke it with a trailing semicolon.
#define mvLog(verbosity, x)                                                                        \
  do                                                                                               \
  {                                                                                                \
    std::ostringstream mvLogStream;                                                                \
    mvLogStream << x;                                                                              \
    this->Log(verbosity, mvLogStream.str());                                                       \
  } while (false)
92 
93 // PIMPL to keep zeromq out of the class interface
94 class vtkVRCollaborationClientInternal
95 {
96 public:
97   void* Context;
98   void* Requester;
99   void* Subscriber;
100   zmq_pollitem_t CollabPollItems[2];
101 
vtkVRCollaborationClientInternal()102   vtkVRCollaborationClientInternal()
103     : CollabPollItems{ { nullptr, 0, ZMQ_POLLIN, 0 }, { nullptr, 0, ZMQ_POLLIN, 0 } }
104   {
105     // ceate context
106     this->Context = zmq_ctx_new();
107     assert(this->Context);
108     int rc = zmq_ctx_set(this->Context, ZMQ_IO_THREADS, 1);
109     assert(rc == 0);
110     rc = zmq_ctx_set(this->Context, ZMQ_MAX_SOCKETS, ZMQ_MAX_SOCKETS_DFLT);
111     assert(rc == 0);
112 
113     this->Requester = zmq_socket(this->Context, ZMQ_DEALER);
114     this->Subscriber = zmq_socket(this->Context, ZMQ_SUB);
115 
116     this->CollabPollItems[0].socket = this->Requester;
117     this->CollabPollItems[1].socket = this->Subscriber;
118   }
119 
~vtkVRCollaborationClientInternal()120   ~vtkVRCollaborationClientInternal()
121   {
122     if (this->Requester)
123     {
124       zmq_close(this->Requester);
125     }
126     if (this->Subscriber)
127     {
128       zmq_close(this->Subscriber);
129     }
130     int rc;
131     do
132     {
133       rc = zmq_ctx_term(this->Context);
134     } while (rc == -1 && errno == EINTR);
135   }
136 };
137 
138 vtkStandardNewMacro(vtkVRCollaborationClient);
139 
vtkVRCollaborationClient()140 vtkVRCollaborationClient::vtkVRCollaborationClient()
141   : Connected(false)
142   , DisplayOwnAvatar(false)
143   , MoveObserver(-1)
144   , Callback(nullptr)
145   , YourLastAvatarUpdateTime(0.0)
146 {
147   this->Internal = new vtkVRCollaborationClientInternal();
148 
149   this->CollabPort = 5555;
150   // Position MineView Zeromq, default when none is specified.
151   this->CollabSession = "PMVZ";
152   this->RetryCount = 1; // start in retry state.
153   this->NeedHeartbeat = 0;
154   this->NeedReply = 0;
155   this->PublishAvailable = false; // publish socket not sending yet.
156 
157   this->EventCommand = vtkCallbackCommand::New();
158   this->EventCommand->SetClientData(this);
159   this->EventCommand->SetCallback(vtkVRCollaborationClient::EventCallback);
160 
161   // setup default scale callback
162   this->ScaleCallback = [this]() {
163     auto ovrrw = vtkVRRenderWindow::SafeDownCast(this->RenderWindow);
164     return ovrrw ? ovrrw->GetPhysicalScale() : 1.0;
165   };
166 }
167 
~vtkVRCollaborationClient()168 vtkVRCollaborationClient::~vtkVRCollaborationClient()
169 {
170   this->Disconnect();
171   this->EventCommand->Delete();
172   delete this->Internal;
173 }
174 
Log(vtkLogger::Verbosity verbosity,std::string const & msg)175 void vtkVRCollaborationClient::Log(vtkLogger::Verbosity verbosity, std::string const& msg)
176 {
177   if (this->Callback)
178   {
179     this->Callback(msg, verbosity);
180   }
181   else
182   {
183     vtkWarningMacro(<< msg);
184   }
185 }
186 
Disconnect()187 void vtkVRCollaborationClient::Disconnect()
188 {
189   if (!this->Connected)
190   {
191     return;
192   }
193 
194   mvLog(vtkLogger::VERBOSITY_INFO, "Collab server disconnecting. " << std::endl);
195 
196   if (this->Internal->Requester != nullptr)
197   {
198     zmq_close(this->Internal->Requester);
199   }
200   if (this->Internal->Subscriber != nullptr)
201   {
202     zmq_close(this->Internal->Subscriber);
203   }
204   for (auto it : this->AvatarUpdateTime)
205   {
206     this->Renderer->RemoveActor(this->Avatars[it.first]);
207     this->Avatars.erase(it.first);
208   }
209   this->AvatarUpdateTime.clear();
210 
211   if (this->MoveObserver >= 0 && this->RenderWindow && this->RenderWindow->GetInteractor())
212   {
213     this->RenderWindow->GetInteractor()->RemoveObserver(this->MoveObserver);
214     this->MoveObserver = -1;
215   }
216   this->Connected = false;
217   this->CollabID.clear();
218 }
219 
AddArguments(vtksys::CommandLineArguments & arguments)220 void vtkVRCollaborationClient::AddArguments(vtksys::CommandLineArguments& arguments)
221 {
222   typedef vtksys::CommandLineArguments argT;
223 
224   arguments.AddArgument("--collab-server", argT::EQUAL_ARGUMENT, &(this->CollabHost),
225     "(optional) Connect to collaboration server at this hostname");
226   arguments.AddArgument("--collab-port", argT::EQUAL_ARGUMENT, &(this->CollabPort),
227     "(default:5555) Connect to collaboration server at this port");
228   arguments.AddArgument("--collab-session", argT::EQUAL_ARGUMENT, &(this->CollabSession),
229     "Connect to a separate collaboration session - each collaborator should use a matching value");
230   arguments.AddArgument("--collab-name", argT::EQUAL_ARGUMENT, &(this->CollabName),
231     "Name to display over your avatar to other collaborators");
232   this->DisplayOwnAvatar = false;
233   arguments.AddBooleanArgument("--show-my-avatar", &this->DisplayOwnAvatar,
234     "(default false) Show an avatar at my own position.");
235 }
236 
Render()237 void vtkVRCollaborationClient::Render()
238 {
239   if (this->Connected)
240   {
241     // if windowed update avatar position based on camera pos
242     if (this->MoveObserver == -1)
243     {
244       this->UpdateAvatarPoseFromCamera();
245     }
246     this->HandleCollabMessage();
247     this->EraseIdleAvatars();
248   }
249 }
250 
UpdateAvatarPoseFromCamera()251 void vtkVRCollaborationClient::UpdateAvatarPoseFromCamera()
252 {
253   // act like a Move3D event for the head
254   int idevice = static_cast<int>(vtkEventDataDevice::HeadMountedDisplay);
255   double* pos = this->DevicePoses[idevice].Position.data();
256   double* orient = this->DevicePoses[idevice].Orientation.data();
257   this->HasPoseForDevice[idevice] = true;
258 
259   this->Renderer->GetActiveCamera()->GetPosition(pos);
260   auto cori = this->Renderer->GetActiveCamera()->GetOrientationWXYZ();
261 
262   // currently have a mismatch between wxyz and euler angles. Convert.
263   this->TempTransform->Identity();
264   this->TempTransform->RotateWXYZ(-cori[0], &cori[1]);
265   // angles need to be rotated 90
266   this->TempTransform->RotateY(90);
267   this->TempTransform->GetOrientation(orient);
268 
269   this->SendLatestDevicePoses();
270 }
271 
SendLatestDevicePoses()272 void vtkVRCollaborationClient::SendLatestDevicePoses()
273 {
274   // don't send a message if we haven't gotten one during the last
275   // heartbeat. View messages, however, are always sent (queued).
276   if (this->RetryCount > 0)
277   {
278     return;
279   }
280 
281   // throttle avatar pose updates
282   double currentTime = vtkTimerLog::GetUniversalTime();
283   if (currentTime - this->YourLastAvatarUpdateTime > 0.02)
284   {
285     // package up the device pose messages
286     std::vector<int32_t> devices;
287     std::vector<double> poses;
288     bool haveHead = false;
289     for (int i = 0; i < vtkEventDataNumberOfDevices; ++i)
290     {
291       if (this->HasPoseForDevice[i])
292       {
293         if (i == static_cast<int>(vtkEventDataDevice::HeadMountedDisplay))
294         {
295           haveHead = true;
296         }
297         devices.push_back(i);
298         poses.insert(
299           poses.end(), this->DevicePoses[i].Position.begin(), this->DevicePoses[i].Position.end());
300         poses.insert(poses.end(), this->DevicePoses[i].Orientation.begin(),
301           this->DevicePoses[i].Orientation.end());
302       }
303       this->HasPoseForDevice[i] = false;
304     }
305 
306     // if no data ignore
307     // Don't send hand messages without head data
308     if (devices.size() == 0 || !haveHead)
309     {
310       return;
311     }
312 
313     double scale = this->ScaleCallback();
314 
315     std::vector<vtkVRCollaborationClient::Argument> args;
316     args.resize(3);
317     args[0].SetInt32Vector(devices.data(), static_cast<int32_t>(devices.size()));
318     args[1].SetDoubleVector(poses.data(), static_cast<int32_t>(poses.size()));
319     args[2].SetDouble(scale);
320     this->YourLastAvatarUpdateTime = currentTime;
321     this->SendAMessage("A", args);
322   }
323 }
324 
SendAMessage(std::string const & msgType,std::vector<Argument> const & args)325 void vtkVRCollaborationClient::SendAMessage(
326   std::string const& msgType, std::vector<Argument> const& args)
327 {
328   if (this->CollabID.empty())
329   {
330     return;
331   }
332 
333   // send header, our ID, session.
334   zmq_send_const(this->Internal->Requester, "PMVZ", 4, ZMQ_SNDMORE);
335   zmq_send(this->Internal->Requester, this->CollabID.c_str(), this->CollabID.size(), ZMQ_SNDMORE);
336   zmq_send(this->Internal->Requester, this->CollabSession.c_str(), this->CollabSession.size(),
337     ZMQ_SNDMORE);
338   zmq_send(this->Internal->Requester, msgType.c_str(), msgType.size(), ZMQ_SNDMORE);
339 
340   // send the number of arguments
341   uint16_t numArgs = static_cast<uint16_t>(args.size());
342   zmq_send(this->Internal->Requester, &numArgs, sizeof(numArgs), ZMQ_SNDMORE);
343 
344   // now send the arguments
345   for (int i = 0; i < numArgs; ++i)
346   {
347     auto& arg = args[i];
348 
349     // send the arg type
350     uint16_t type = static_cast<uint16_t>(arg.Type);
351     zmq_send(this->Internal->Requester, &type, sizeof(type), ZMQ_SNDMORE);
352 
353     // send the arg count (how many in the vector)
354     uint16_t count = static_cast<uint16_t>(arg.Count);
355     zmq_send(this->Internal->Requester, &count, sizeof(count), ZMQ_SNDMORE);
356 
357     // finally send the data
358     switch (arg.Type)
359     {
360       case Double:
361       {
362         zmq_send(this->Internal->Requester, arg.Data.get(), sizeof(double) * arg.Count,
363           (i == numArgs - 1 ? 0 : ZMQ_SNDMORE));
364         break;
365       }
366       case Int32:
367       {
368         zmq_send(this->Internal->Requester, arg.Data.get(), sizeof(int32_t) * arg.Count,
369           (i == numArgs - 1 ? 0 : ZMQ_SNDMORE));
370         break;
371       }
372       case String:
373       {
374         zmq_send(this->Internal->Requester, arg.Data.get(), arg.Count,
375           (i == numArgs - 1 ? 0 : ZMQ_SNDMORE));
376         break;
377       }
378     }
379   }
380 }
381 
GetString(std::string & result)382 bool vtkVRCollaborationClient::Argument::GetString(std::string& result)
383 {
384   if (this->Type != String || !this->Data)
385   {
386     return false;
387   }
388 
389   char* cstr = static_cast<char*>(this->Data.get());
390   // make sure it is terminated
391   cstr[this->Count - 1] = 0;
392   result = std::string(cstr);
393   return true;
394 }
395 
SetString(std::string const & in)396 void vtkVRCollaborationClient::Argument::SetString(std::string const& in)
397 {
398   this->Type = String;
399   this->Count = static_cast<uint16_t>(in.size() + 1);
400   char* cdata = static_cast<char*>(malloc(in.size() + 1));
401   this->Data = std::shared_ptr<void>(cdata, free);
402   std::copy(in.begin(), in.end(), cdata);
403   cdata[in.size()] = 0;
404 }
405 
GetStringVector(std::vector<std::string> & result)406 bool vtkVRCollaborationClient::Argument::GetStringVector(std::vector<std::string>& result)
407 {
408   if (this->Type != String || !this->Data)
409   {
410     return false;
411   }
412 
413   char* cstr = static_cast<char*>(this->Data.get());
414   // make sure it is terminated
415   cstr[this->Count - 1] = 0;
416 
417   size_t pos = 0;
418   while (pos < this->Count)
419   {
420     std::string tmp = std::string(cstr + pos);
421     result.push_back(tmp);
422     pos += tmp.size() + 1;
423   }
424 
425   return true;
426 }
427 
SetStringVector(std::vector<std::string> const & in)428 void vtkVRCollaborationClient::Argument::SetStringVector(std::vector<std::string> const& in)
429 {
430   this->Type = String;
431 
432   size_t byteCount = 0;
433   for (size_t i = 0; i < in.size(); i++)
434   {
435     byteCount += in[i].size();
436     byteCount++;
437   }
438   this->Count = static_cast<uint16_t>(byteCount);
439 
440   char* cdata = static_cast<char*>(malloc(byteCount));
441   this->Data = std::shared_ptr<void>(cdata, free);
442 
443   char* cptr = cdata;
444   for (size_t i = 0; i < in.size(); i++)
445   {
446     std::copy(in[i].begin(), in[i].end(), cptr);
447     cptr += in[i].size();
448     *cptr = 0;
449     cptr++;
450   }
451 }
452 
GetDoubleVector(std::vector<double> & result)453 bool vtkVRCollaborationClient::Argument::GetDoubleVector(std::vector<double>& result)
454 {
455   if (this->Type != Double || !this->Data)
456   {
457     return false;
458   }
459 
460   result.resize(this->Count);
461   double* dptr = static_cast<double*>(this->Data.get());
462   std::copy(dptr, dptr + this->Count, result.begin());
463   return true;
464 }
465 
SetDoubleVector(double const * in,uint16_t size)466 void vtkVRCollaborationClient::Argument::SetDoubleVector(double const* in, uint16_t size)
467 {
468   this->Type = Double;
469   this->Count = size;
470   double* cdata = static_cast<double*>(malloc(sizeof(double) * size));
471   this->Data = std::shared_ptr<void>(cdata, free);
472   std::copy(in, in + size, cdata);
473 }
474 
SetDouble(double in)475 void vtkVRCollaborationClient::Argument::SetDouble(double in)
476 {
477   this->Type = Double;
478   this->Count = 1;
479   double* cdata = static_cast<double*>(malloc(sizeof(double)));
480   cdata[0] = in;
481   this->Data = std::shared_ptr<void>(cdata, free);
482 }
483 
GetDouble(double & result)484 bool vtkVRCollaborationClient::Argument::GetDouble(double& result)
485 {
486   if (this->Type != Double || !this->Data || this->Count != 1)
487   {
488     return false;
489   }
490 
491   double* dptr = static_cast<double*>(this->Data.get());
492   result = *dptr;
493   return true;
494 }
495 
GetInt32Vector(std::vector<int32_t> & result)496 bool vtkVRCollaborationClient::Argument::GetInt32Vector(std::vector<int32_t>& result)
497 {
498   if (this->Type != Int32 || !this->Data)
499   {
500     return false;
501   }
502 
503   result.resize(this->Count);
504   int32_t* dptr = static_cast<int32_t*>(this->Data.get());
505   std::copy(dptr, dptr + this->Count, result.begin());
506   return true;
507 }
508 
SetInt32Vector(int32_t const * in,uint16_t size)509 void vtkVRCollaborationClient::Argument::SetInt32Vector(int32_t const* in, uint16_t size)
510 {
511   this->Type = Int32;
512   this->Count = size;
513   int32_t* cdata = static_cast<int32_t*>(malloc(sizeof(int32_t) * size));
514   this->Data = std::shared_ptr<void>(cdata, free);
515   std::copy(in, in + size, cdata);
516 }
517 
SetInt32(int32_t in)518 void vtkVRCollaborationClient::Argument::SetInt32(int32_t in)
519 {
520   this->Type = Int32;
521   this->Count = 1;
522   int32_t* cdata = static_cast<int32_t*>(malloc(sizeof(int32_t)));
523   cdata[0] = in;
524   this->Data = std::shared_ptr<void>(cdata, free);
525 }
526 
GetInt32(int32_t & result)527 bool vtkVRCollaborationClient::Argument::GetInt32(int32_t& result)
528 {
529   if (this->Type != Int32 || !this->Data || this->Count != 1)
530   {
531     return false;
532   }
533 
534   int32_t* dptr = static_cast<int32_t*>(this->Data.get());
535   result = *dptr;
536   return true;
537 }
538 
GetMessageArguments()539 std::vector<vtkVRCollaborationClient::Argument> vtkVRCollaborationClient::GetMessageArguments()
540 {
541   std::vector<Argument> result;
542 
543   uint16_t numArgs = 0;
544   zmq_recv(this->Internal->Subscriber, &numArgs, sizeof(numArgs), 0);
545 
546   result.resize(numArgs);
547 
548   for (int i = 0; i < numArgs; ++i)
549   {
550     Argument& arg = result[i];
551 
552     // get the arg type
553     uint16_t argType = Double;
554     zmq_recv(this->Internal->Subscriber, &argType, sizeof(argType), 0);
555     arg.Type = static_cast<ArgumentType>(argType);
556 
557     // get the arg count
558     uint16_t argCount = 0;
559     zmq_recv(this->Internal->Subscriber, &argCount, sizeof(argCount), 0);
560     arg.Count = argCount;
561 
562     switch (arg.Type)
563     {
564       case Double:
565       {
566         arg.Data = std::shared_ptr<void>(malloc(sizeof(double) * arg.Count), free);
567         auto zresult =
568           zmq_recv(this->Internal->Subscriber, arg.Data.get(), sizeof(double) * arg.Count, 0);
569         if (zresult != sizeof(double) * arg.Count)
570         {
571           vtkErrorMacro("failed to get valid argument");
572         }
573         break;
574       }
575       case Int32:
576       {
577         arg.Data = std::shared_ptr<void>(malloc(sizeof(int32_t) * arg.Count), free);
578         auto zresult =
579           zmq_recv(this->Internal->Subscriber, arg.Data.get(), sizeof(int32_t) * arg.Count, 0);
580         if (zresult != sizeof(int32_t) * arg.Count)
581         {
582           vtkErrorMacro("failed to get valid argument");
583         }
584         break;
585       }
586       case String:
587       {
588         arg.Data = std::shared_ptr<void>(malloc(arg.Count), free);
589         auto zresult = zmq_recv(this->Internal->Subscriber, arg.Data.get(), arg.Count, 0);
590         if (zresult != arg.Count)
591         {
592           vtkErrorMacro("failed to get valid argument");
593         }
594         break;
595       }
596     }
597   }
598 
599   return result;
600 }
601 
SendAMessage(std::string const & msgType)602 void vtkVRCollaborationClient::SendAMessage(std::string const& msgType)
603 {
604   if (this->CollabID.empty())
605   {
606     return;
607   }
608   // send header, our ID, session.
609   zmq_send_const(this->Internal->Requester, "PMVZ", 4, ZMQ_SNDMORE);
610   zmq_send(this->Internal->Requester, this->CollabID.c_str(), this->CollabID.size(), ZMQ_SNDMORE);
611   zmq_send(this->Internal->Requester, this->CollabSession.c_str(), this->CollabSession.size(),
612     ZMQ_SNDMORE);
613   zmq_send(this->Internal->Requester, msgType.c_str(), msgType.size(), 0);
614 }
615 
SendPoseMessage(std::string const & msgType,int index,double pos[3],double dir[3])616 void vtkVRCollaborationClient::SendPoseMessage(
617   std::string const& msgType, int index, double pos[3], double dir[3])
618 {
619   std::vector<vtkVRCollaborationClient::Argument> args;
620   args.resize(3);
621   args[0].SetInt32(index);
622   args[1].SetDoubleVector(pos, 3);
623   args[2].SetDoubleVector(dir, 3);
624   this->SendAMessage(msgType, args);
625 }
626 
HandleBroadcastMessage(std::string const & otherID,std::string const & type)627 void vtkVRCollaborationClient::HandleBroadcastMessage(
628   std::string const& otherID, std::string const& type)
629 {
630   if (type == "A")
631   {
632     std::vector<Argument> args = this->GetMessageArguments();
633 
634     std::vector<double> poses;
635     std::vector<int32_t> devices;
636     double ascale = 1.0;
637     if (args.size() != 3 || !args[0].GetInt32Vector(devices) || !args[1].GetDoubleVector(poses) ||
638       !args[2].GetDouble(ascale))
639     {
640       mvLog(vtkLogger::VERBOSITY_ERROR,
641         "Incorrect arguments for A (avatar pose) collaboration message" << std::endl);
642       return;
643     }
644 
645     // if this update is from us, we ignore it by default.
646     if (otherID != this->CollabID || this->DisplayOwnAvatar)
647     {
648       double scale = this->ScaleCallback();
649       auto avatar = this->GetAvatar(otherID);
650       avatar->SetScale(0.3 * scale);
651 
652       bool haveLeft = false;
653       bool haveRight = false;
654       for (size_t i = 0; i < devices.size(); ++i)
655       {
656         vtkEventDataDevice device = static_cast<vtkEventDataDevice>(devices[i]);
657 
658         double* updatePos = poses.data() + i * 7;
659         double* updateOrient = poses.data() + i * 7 + 3;
660 
661         if (device == vtkEventDataDevice::LeftController)
662         {
663           avatar->SetLeftHandPosition(updatePos);
664           avatar->SetLeftHandOrientation(updateOrient);
665           if (!avatar->GetUseLeftHand())
666           {
667             avatar->UseLeftHandOn();
668           }
669           haveLeft = true;
670         }
671         else if (device == vtkEventDataDevice::RightController)
672         {
673           avatar->SetRightHandPosition(updatePos);
674           avatar->SetRightHandOrientation(updateOrient);
675           if (!avatar->GetUseRightHand())
676           {
677             avatar->UseRightHandOn();
678           }
679           haveRight = true;
680         }
681         else if (device == vtkEventDataDevice::HeadMountedDisplay)
682         {
683           avatar->SetHeadPosition(updatePos);
684           avatar->SetHeadOrientation(updateOrient);
685         }
686         this->AvatarUpdateTime[otherID][static_cast<int>(device)] = vtkTimerLog::GetUniversalTime();
687       }
688 
689       // adjust hand positions based on sending avatar scale
690       double adjustment = scale / ascale;
691       auto* headPos = avatar->GetHeadPosition();
692       if (haveRight)
693       {
694         auto* handPos = avatar->GetRightHandPosition();
695         avatar->SetRightHandPosition(headPos[0] + adjustment * (handPos[0] - headPos[0]),
696           headPos[1] + adjustment * (handPos[1] - headPos[1]),
697           headPos[2] + adjustment * (handPos[2] - headPos[2]));
698       }
699       if (haveLeft)
700       {
701         auto* handPos = avatar->GetLeftHandPosition();
702         avatar->SetLeftHandPosition(headPos[0] + adjustment * (handPos[0] - headPos[0]),
703           headPos[1] + adjustment * (handPos[1] - headPos[1]),
704           headPos[2] + adjustment * (handPos[2] - headPos[2]));
705       }
706     }
707 
708     // Check if we were idle, and re-send join messages.
709     if (otherID == this->CollabID && this->AvatarIdle(this->CollabID))
710     {
711       mvLog(vtkLogger::VERBOSITY_INFO, "Collab " << otherID << " return from idle " << std::endl);
712 
713       std::vector<vtkVRCollaborationClient::Argument> args2;
714       args2.resize(1);
715       args2[0].SetString(this->CollabID);
716       this->SendAMessage("J", args2);
717     }
718   }
719   else if (type == "J")
720   {
721     std::vector<Argument> args = this->GetMessageArguments();
722 
723     std::string extraID;
724     if (args.size() != 1 || !args[0].GetString(extraID))
725     {
726       mvLog(vtkLogger::VERBOSITY_ERROR,
727         "Incorrect arguments for J (join) collaboration message" << std::endl);
728       return;
729     }
730 
731     // Join message, send our list of views.
732     // if we are idle, don't respond to join messages - send a join when
733     // we are not idle anymore.
734     if (this->AvatarIdle(this->CollabID))
735     {
736       return;
737     }
738     mvLog(vtkLogger::VERBOSITY_INFO, "Collab " << otherID << ", Join" << std::endl);
739     if (!this->CollabName.empty())
740     {
741       std::vector<vtkVRCollaborationClient::Argument> args2;
742       args2.resize(1);
743       args2[0].SetString(this->CollabName);
744       this->SendAMessage("N", args2);
745     }
746   }
747   else if (type == "SR" || type == "HR")
748   {
749     // show/hide a ray
750     std::vector<Argument> args = this->GetMessageArguments();
751     int32_t device;
752     if (args.size() != 1 || !args[0].GetInt32(device))
753     {
754       mvLog(vtkLogger::VERBOSITY_ERROR,
755         "Incorrect arguments for SR/HR (ray) collaboration message" << std::endl);
756       return;
757     }
758 
759     bool show = (type == "SR");
760     if (this->Avatars.count(otherID) != 0)
761     {
762       auto avatar = this->GetAvatar(otherID);
763       if (device == static_cast<int>(vtkEventDataDevice::LeftController))
764       {
765         avatar->SetLeftShowRay(show);
766       }
767       else if (device == static_cast<int>(vtkEventDataDevice::RightController))
768       {
769         avatar->SetRightShowRay(show);
770       }
771       double scale = this->ScaleCallback();
772       avatar->SetRayLength(RAY_LENGTH * scale);
773     }
774   }
775   else if (type == "N")
776   {
777     std::vector<Argument> args = this->GetMessageArguments();
778     // Set avatar's name, displayed above head.
779     std::string avatarName;
780     if (args.size() != 1 || !args[0].GetString(avatarName))
781     {
782       mvLog(vtkLogger::VERBOSITY_ERROR,
783         "Incorrect arguments for N (name) collaboration message" << std::endl);
784       return;
785     }
786     mvLog(vtkLogger::VERBOSITY_INFO, "Collab " << otherID << ", Name " << avatarName << std::endl);
787     if (!avatarName.empty() && otherID != this->CollabID)
788     {
789       this->GetAvatar(otherID)->SetLabel(avatarName.c_str());
790     }
791   }
792 }
793 
GetAvatar(std::string otherID)794 vtkSmartPointer<vtkOpenGLAvatar> vtkVRCollaborationClient::GetAvatar(std::string otherID)
795 {
796   // if it's from a new collaborator, add an avatar
797   if (this->Avatars.count(otherID) == 0)
798   {
799     mvLog(vtkLogger::VERBOSITY_INFO, "Adding Avatar " << otherID << std::endl);
800     this->Avatars[otherID] = vtkSmartPointer<vtkOpenGLAvatar>::New();
801     auto newAvatar = this->Avatars[otherID];
802     this->Renderer->AddActor(newAvatar);
803     // meters -> ft conversion.
804     double scale = this->ScaleCallback();
805     newAvatar->SetScale(0.3 * scale);
806     newAvatar->SetUpVector(0, 0, 1);
807     size_t colorIndex = this->Avatars.size() - 1;
808     // base the color on the server's index of avatars.
809     try
810     {
811       colorIndex = std::stoi(otherID);
812     }
813     catch (...)
814     {
815     }
816     newAvatar->GetProperty()->SetColor(AVATAR_COLORS[(colorIndex) % NUM_COLORS]);
817     newAvatar->GetLabelTextProperty()->SetColor(AVATAR_COLORS[(colorIndex) % NUM_COLORS]);
818     newAvatar->GetLabelTextProperty()->SetFontSize(16);
819     if (otherID == this->CollabID)
820     {
821       // Display only the hands
822       newAvatar->SetShowHandsOnly(true);
823       auto ovrrw = vtkVRRenderWindow::SafeDownCast(this->RenderWindow);
824       if (ovrrw)
825       {
826         vtkVRModel* cmodel = ovrrw->GetTrackedDeviceModel(vtkEventDataDevice::LeftController);
827         if (cmodel)
828         {
829           cmodel->SetVisibility(false);
830         }
831         cmodel = ovrrw->GetTrackedDeviceModel(vtkEventDataDevice::RightController);
832         if (cmodel)
833         {
834           cmodel->SetVisibility(false);
835         }
836       }
837     }
838     for (int i = 0; i < vtkEventDataNumberOfDevices; ++i)
839     {
840       this->AvatarUpdateTime[otherID][i] = 0;
841     }
842   }
843   return this->Avatars[otherID];
844 }
845 
HandleCollabMessage()846 void vtkVRCollaborationClient::HandleCollabMessage()
847 {
848   double currTime = vtkTimerLog::GetUniversalTime();
849   bool receivedMsg = true;
850   do
851   {
852     // timeout is 0, return immediately.
853     zmq_poll(&(this->Internal->CollabPollItems)[0], 2, 0);
854     if (this->Internal->CollabPollItems[0].revents & ZMQ_POLLIN)
855     {
856       // reply on the request-reply (dealer) socket - expect ID or error.
857       std::string reply = _zmq_string_recv(this->Internal->Requester, ZMQ_DONTWAIT);
858       if (reply == "ERROR")
859       {
860         mvLog(vtkLogger::VERBOSITY_ERROR, "Collab server returned error " << std::endl);
861       }
862       else if (reply == "pong")
863       {
864         // update server alive time, below.
865       }
866       else if (reply.empty())
867       {
868         // error, do nothing.
869         mvLog(vtkLogger::VERBOSITY_ERROR, "Error: empty reply " << std::endl);
870       }
871       else
872       {
873         this->CollabID = reply;
874         mvLog(vtkLogger::VERBOSITY_INFO, "Received ID " << this->CollabID << std::endl);
875         this->RetryCount = 0;
876         // ideally send "J" join message here, but pub-sub not ready yet.
877       }
878     }
879 
880     // handle broadcast messages
881     //
882     // A - avatar position update
883     // J - New client joined message
884     // N - Client name
885     // SR/HR - show or hide a ray
886     // V - View change
887     // P - New TourStop
888     // VL - ViewList
889     //
890     if (this->Internal->CollabPollItems[1].revents & ZMQ_POLLIN)
891     {
892       std::string sig = _zmq_string_recv(this->Internal->Subscriber, ZMQ_DONTWAIT);
893       if (sig.size())
894       {
895         // verify the signature
896         // we can get bad data, so make sure the first message contains the
897         // correct data before requesting other pieces (which could block
898         // and hang the app if the data was bad)
899         if (sig == this->CollabSession)
900         {
901           // the first sub-msg contains the session string for the subscription
902           //  process other avatar updates
903           std::string otherID = _zmq_string_recv(this->Internal->Subscriber, ZMQ_DONTWAIT);
904           std::string type = _zmq_string_recv(this->Internal->Subscriber, ZMQ_DONTWAIT);
905           if (otherID.empty() || type.empty())
906           {
907             // error, ignore
908             mvLog(vtkLogger::VERBOSITY_ERROR, "empty ID or ID " << otherID << ",  " << type);
909             _zmq_sock_clear(this->Internal->Subscriber);
910             continue;
911           }
912 
913           this->HandleBroadcastMessage(otherID, type);
914         }
915         else
916         {
917           mvLog(vtkLogger::VERBOSITY_ERROR,
918             "Error: mismatched session header with signature of: " << sig);
919           _zmq_sock_clear(this->Internal->Subscriber);
920         }
921 
922         // we got a message on the publish socket, see if this is the first one.
923         if (!this->PublishAvailable)
924         {
925           this->PublishAvailable = true;
926           // send join message, to trigger view setup.
927           std::vector<vtkVRCollaborationClient::Argument> args;
928           args.resize(1);
929           args[0].SetString(this->CollabID);
930           this->SendAMessage("J", args);
931         }
932       }
933       else
934       {
935         mvLog(vtkLogger::VERBOSITY_ERROR, "Error: empty session header");
936         _zmq_sock_clear(this->Internal->Subscriber);
937         continue;
938       }
939     }
940 
941     receivedMsg = (this->Internal->CollabPollItems[0].revents & ZMQ_POLLIN ||
942       this->Internal->CollabPollItems[1].revents & ZMQ_POLLIN);
943     if (receivedMsg)
944     {
945       // got a message, reset heartbeat.
946       this->NeedHeartbeat = currTime + HEARTBEAT_INTERVAL;
947       this->NeedReply = currTime + HEARTBEAT_INTERVAL * LIVE_COUNT;
948       this->RetryCount = 0;
949     }
950     else if (currTime > this->NeedHeartbeat && !this->CollabID.empty())
951     {
952       // heartbeat only if we have an ID. send ping, expect pong
953       if (this->RetryCount == 0)
954       {
955         this->RetryCount = 1;
956       }
957       zmq_send_const(this->Internal->Requester, "ping", 4, ZMQ_SNDMORE);
958       zmq_send(this->Internal->Requester, this->CollabID.c_str(), this->CollabID.size(), 0);
959       this->NeedHeartbeat = currTime + HEARTBEAT_INTERVAL;
960     }
961 
962     // if heartbeat fails multiple times
963     if (currTime > this->NeedReply)
964     {
965       if (this->RetryCount > LIVE_COUNT)
966       {
967         this->NeedReply = currTime + HEARTBEAT_INTERVAL * LIVE_COUNT * this->RetryCount;
968         mvLog(vtkLogger::VERBOSITY_WARNING, "Collab server disconnected, waiting. " << std::endl);
969       }
970       else
971       {
972         mvLog(vtkLogger::VERBOSITY_WARNING,
973           "Collab server not responding, retry " << this->RetryCount << std::endl);
974         ++this->RetryCount;
975         // disconnect and reconnect sockets, clear ID
976         this->Initialize(this->Renderer);
977       }
978     }
979   } while (receivedMsg);
980 }
981 
AvatarIdle(std::string id)982 bool vtkVRCollaborationClient::AvatarIdle(std::string id)
983 {
984   double currTime = vtkTimerLog::GetUniversalTime();
985   auto times = this->AvatarUpdateTime[id];
986 
987   // if we've never received a head position message, the avatar isn't idle.
988   if (times[0] == 0)
989   {
990     return false;
991   }
992 
993   double avatarTime = times[0];
994   // consider ourselves idle slightly before any collaborators do, avoiding races.
995   double timeout = id == this->CollabID ? 0.98 * AVATAR_TIMEOUT : AVATAR_TIMEOUT;
996   return (currTime - avatarTime > timeout);
997 }
998 
EraseIdleAvatars()999 void vtkVRCollaborationClient::EraseIdleAvatars()
1000 {
1001   double currTime = vtkTimerLog::GetUniversalTime();
1002   for (auto it : this->AvatarUpdateTime)
1003   {
1004     if (it.second[0] == 0)
1005     {
1006       continue;
1007     }
1008     double avatarTime = it.second[0];
1009     if (currTime - avatarTime > AVATAR_TIMEOUT && it.first != this->CollabID &&
1010       this->Avatars.count(it.first) != 0)
1011     {
1012       mvLog(vtkLogger::VERBOSITY_INFO, "Removing Avatar: " << it.first << std::endl);
1013       this->Renderer->RemoveActor(this->Avatars[it.first]);
1014       this->Avatars.erase(it.first);
1015       this->AvatarUpdateTime.erase(it.first);
1016       // send join message, to trigger view setup.
1017       std::vector<vtkVRCollaborationClient::Argument> args;
1018       args.resize(1);
1019       args[0].SetString(this->CollabID);
1020       this->SendAMessage("J", args);
1021       break;
1022     }
1023 
1024     if (this->Avatars.count(it.first) == 0)
1025     {
1026       continue;
1027     }
1028 
1029     // see if the hands are idle, or not present at all.
1030     int device = static_cast<int>(vtkEventDataDevice::LeftController);
1031     if (currTime - it.second[device] > AVATAR_TIMEOUT)
1032     {
1033       auto currAvatar = this->Avatars[it.first];
1034       if (currAvatar->GetUseLeftHand())
1035       {
1036         currAvatar->UseLeftHandOff();
1037       }
1038     }
1039     device = static_cast<int>(vtkEventDataDevice::RightController);
1040     if (currTime - it.second[device] > AVATAR_TIMEOUT)
1041     {
1042       auto currAvatar = this->Avatars[it.first];
1043       if (currAvatar->GetUseRightHand())
1044       {
1045         currAvatar->UseRightHandOff();
1046       }
1047     }
1048   }
1049 }
1050 
EventCallback(vtkObject *,unsigned long eventID,void * clientdata,void * calldata)1051 void vtkVRCollaborationClient::EventCallback(
1052   vtkObject*, unsigned long eventID, void* clientdata, void* calldata)
1053 {
1054   vtkVRCollaborationClient* self = static_cast<vtkVRCollaborationClient*>(clientdata);
1055 
1056   if (eventID == vtkCommand::Move3DEvent)
1057   {
1058     vtkEventData* edata = static_cast<vtkEventData*>(calldata);
1059     vtkEventDataDevice3D* edd = edata->GetAsEventDataDevice3D();
1060     if (!edd)
1061     {
1062       return;
1063     }
1064 
1065     auto device = edd->GetDevice();
1066     int idevice = static_cast<int>(device);
1067     if (device == vtkEventDataDevice::LeftController ||
1068       device == vtkEventDataDevice::RightController ||
1069       device == vtkEventDataDevice::HeadMountedDisplay)
1070     {
1071       double* pos = self->DevicePoses[idevice].Position.data();
1072       double* orient = self->DevicePoses[idevice].Orientation.data();
1073       edd->GetWorldPosition(pos);
1074       // empirically, the Oculus sometimes gives nonsense positions
1075       if (fabs(pos[0]) > 1e07)
1076       {
1077         return;
1078       }
1079       double wxyz[4] = { 0 };
1080       edd->GetWorldOrientation(wxyz);
1081 
1082       // currently have a mismatch between wxyz and euler angles. Convert.
1083       self->TempTransform->Identity();
1084       self->TempTransform->RotateWXYZ(wxyz[0], &wxyz[1]);
1085       // angles need to be rotated 90
1086       self->TempTransform->RotateY(90);
1087       self->TempTransform->GetOrientation(orient);
1088 
1089       // hands are also too far forward in x.
1090       if (device != vtkEventDataDevice::HeadMountedDisplay)
1091       {
1092         double adjust[3] = { -0.15, 0, 0 };
1093         self->TempTransform->TransformPoint(adjust, adjust);
1094         pos[0] += adjust[0];
1095         pos[1] += adjust[1];
1096         pos[2] += adjust[2];
1097       }
1098       self->HasPoseForDevice[idevice] = true;
1099       self->SendLatestDevicePoses();
1100     }
1101     return;
1102   }
1103 }
1104 
1105 // disconnect if needed, then connect to server.
1106 // Retry count is set externally.
Initialize(vtkOpenGLRenderer * ren)1107 bool vtkVRCollaborationClient::Initialize(vtkOpenGLRenderer* ren)
1108 {
1109   if (!ren)
1110   {
1111     return false;
1112   }
1113 
1114   this->Renderer = ren;
1115   this->RenderWindow = static_cast<vtkOpenGLRenderWindow*>(ren->GetVTKWindow());
1116 
1117   if (this->CollabHost.empty())
1118   {
1119     return false;
1120   }
1121 
1122   if (this->RetryCount == 1)
1123   {
1124     mvLog(vtkLogger::VERBOSITY_INFO, "Connecting to collaboration server..." << std::endl);
1125   }
1126   std::stringstream ss;
1127   ss << "tcp://" << this->CollabHost << ":" << this->CollabPort;
1128   std::string requesterEndpoint = ss.str();
1129   ss.str(std::string());
1130   ss << "tcp://" << this->CollabHost << ":" << (this->CollabPort + 1);
1131   std::string subscriberEndpoint = ss.str();
1132   if (this->Internal->Requester != nullptr)
1133   {
1134     zmq_close(this->Internal->Requester);
1135   }
1136   if (this->Internal->Subscriber != nullptr)
1137   {
1138     zmq_close(this->Internal->Subscriber);
1139   }
1140   this->Connected = false;
1141   this->Internal->Requester = zmq_socket(this->Internal->Context, ZMQ_DEALER);
1142   this->Internal->Subscriber = zmq_socket(this->Internal->Context, ZMQ_SUB);
1143 
1144   zmq_connect(this->Internal->Requester, requesterEndpoint.c_str());
1145   zmq_connect(this->Internal->Subscriber, subscriberEndpoint.c_str());
1146   // Subscribe to messages for our session, subscription required by zmq
1147   // We won't receive messages from other sessions.
1148   zmq_setsockopt(this->Internal->Subscriber, ZMQ_SUBSCRIBE, this->CollabSession.c_str(),
1149     this->CollabSession.size());
1150   // once we close, we want the socket to close immediately, and drop messages.
1151   int linger = 0;
1152   zmq_setsockopt(this->Internal->Requester, ZMQ_LINGER, &linger, sizeof(linger));
1153   this->Internal->CollabPollItems[0].socket = this->Internal->Requester;
1154   this->Internal->CollabPollItems[1].socket = this->Internal->Subscriber;
1155   this->Connected = true;
1156 
1157   this->CollabID.clear();
1158   double currTime = vtkTimerLog::GetUniversalTime();
1159   this->NeedHeartbeat = currTime + HEARTBEAT_INTERVAL;
1160   this->NeedReply = currTime + HEARTBEAT_INTERVAL * LIVE_COUNT * this->RetryCount;
1161   this->PublishAvailable = false;
1162   zmq_send_const(this->Internal->Requester, "HelloPMVZ", 9, 0);
1163   // async reply, so get ID in HandleCollabMessage()
1164 
1165   // add observer based on VR versus windowed
1166   if (this->RenderWindow->IsA("vtkVRRenderWindow"))
1167   {
1168     if (this->MoveObserver == -1)
1169     {
1170       this->MoveObserver = this->RenderWindow->GetInteractor()->AddObserver(
1171         vtkCommand::Move3DEvent, this->EventCommand, 1.0);
1172     }
1173   }
1174 
1175   return true;
1176 }
1177 
1178 //------------------------------------------------------------------------------
PrintSelf(ostream & os,vtkIndent indent)1179 void vtkVRCollaborationClient::PrintSelf(ostream& os, vtkIndent indent)
1180 {
1181   this->Superclass::PrintSelf(os, indent);
1182 }
1183