1 /***************************************************/
2 /*! \class Messager
3     \brief STK input control message parser.
4 
5     This class reads and parses control messages from a variety of
6     sources, such as a scorefile, MIDI port, socket connection, or
7     stdin.  MIDI messages are retrieved using the RtMidi class.  All
8     other input sources (scorefile, socket, or stdin) are assumed to
9     provide SKINI formatted messages.  This class can be compiled with
10     generic, non-realtime support, in which case only scorefile
11     reading is possible.
12 
13     The various \e realtime message acquisition mechanisms (from MIDI,
14     socket, or stdin) take place asynchronously, filling the message
15     queue.  A call to popMessage() will pop the next available control
16     message from the queue and return it via the referenced Message
17     structure.  When a \e non-realtime scorefile is set, it is not
18     possible to start reading realtime input messages (from MIDI,
19     socket, or stdin).  Likewise, it is not possible to read from a
20     scorefile when a realtime input mechanism is running.
21 
22     When MIDI input is started, input is also automatically read from
23     stdin.  This allows for program termination via the terminal
24     window.  An __SK_Exit_ message is pushed onto the stack whenever
25     an "exit" or "Exit" message is received from stdin or when all
26     socket connections close and no stdin thread is running.
27 
28     This class is primarily for use in STK example programs but it is
29     generic enough to work in many other contexts.
30 
31     by Perry R. Cook and Gary P. Scavone, 1995--2021.
32 */
33 /***************************************************/
34 
35 #include "Messager.h"
36 #include <iostream>
37 #include <algorithm>
38 #include "SKINImsg.h"
39 
40 namespace stk {
41 
42 #if defined(__STK_REALTIME__)
43 
44 extern "C" THREAD_RETURN THREAD_TYPE stdinHandler(void * ptr);
45 extern "C" THREAD_RETURN THREAD_TYPE socketHandler(void * ptr);
46 
47 #endif // __STK_REALTIME__
48 
49 typedef int MessagerSourceType;
50 MessagerSourceType STK_FILE   = 0x1;
51 MessagerSourceType STK_MIDI   = 0x2;
52 MessagerSourceType STK_STDIN   = 0x4;
53 MessagerSourceType STK_SOCKET = 0x8;
54 
Messager()55 Messager :: Messager()
56 {
57   data_.sources = 0;
58   data_.queueLimit = DEFAULT_QUEUE_LIMIT;
59 #if defined(__STK_REALTIME__)
60   data_.socket = 0;
61   data_.midi = 0;
62 #endif
63 }
64 
~Messager()65 Messager :: ~Messager()
66 {
67   // Clear the queue in case any thread is waiting on its limit.
68 #if defined(__STK_REALTIME__)
69   data_.mutex.lock();
70 #endif
71   while ( data_.queue.size() ) data_.queue.pop();
72   data_.sources = 0;
73 
74 #if defined(__STK_REALTIME__)
75   data_.mutex.unlock();
76   if ( data_.socket ) {
77     socketThread_.wait();
78     delete data_.socket;
79   }
80 
81   if ( data_.midi ) delete data_.midi;
82 #endif
83 }
84 
setScoreFile(const char * filename)85 bool Messager :: setScoreFile( const char* filename )
86 {
87   if ( data_.sources ) {
88     if ( data_.sources == STK_FILE ) {
89       oStream_ << "Messager::setScoreFile: already reading a scorefile!";
90       handleError( StkError::WARNING );
91     }
92     else {
93       oStream_ << "Messager::setScoreFile: already reading realtime control input ... cannot do scorefile input too!";
94       handleError( StkError::WARNING );
95     }
96     return false;
97   }
98 
99   if ( !data_.skini.setFile( filename ) ) return false;
100   data_.sources = STK_FILE;
101   return true;
102 }
103 
popMessage(Skini::Message & message)104 void Messager :: popMessage( Skini::Message& message )
105 {
106   if ( data_.sources == STK_FILE ) { // scorefile input
107     if ( !data_.skini.nextMessage( message ) )
108       message.type = __SK_Exit_;
109     return;
110   }
111 
112   if ( data_.queue.size() == 0 ) {
113     // An empty (or invalid) message is indicated by a type = 0.
114     message.type = 0;
115     return;
116   }
117 
118   // Copy queued message to the message pointer structure and then "pop" it.
119 #if defined(__STK_REALTIME__)
120   data_.mutex.lock();
121 #endif
122   message = data_.queue.front();
123   data_.queue.pop();
124 #if defined(__STK_REALTIME__)
125   data_.mutex.unlock();
126 #endif
127 }
128 
pushMessage(Skini::Message & message)129 void Messager :: pushMessage( Skini::Message& message )
130 {
131 #if defined(__STK_REALTIME__)
132   data_.mutex.lock();
133 #endif
134   data_.queue.push( message );
135 #if defined(__STK_REALTIME__)
136   data_.mutex.unlock();
137 #endif
138 }
139 
140 #if defined(__STK_REALTIME__)
141 
startStdInput()142 bool Messager :: startStdInput()
143 {
144   if ( data_.sources == STK_FILE ) {
145     oStream_ << "Messager::startStdInput: already reading a scorefile ... cannot do realtime control input too!";
146     handleError( StkError::WARNING );
147     return false;
148   }
149 
150   if ( data_.sources & STK_STDIN ) {
151     oStream_ << "Messager::startStdInput: stdin input thread already started.";
152     handleError( StkError::WARNING );
153     return false;
154   }
155 
156   // Start the stdin input thread.
157   if ( !stdinThread_.start( (THREAD_FUNCTION)&stdinHandler, &data_ ) ) {
158     oStream_ << "Messager::startStdInput: unable to start stdin input thread!";
159     handleError( StkError::WARNING );
160     return false;
161   }
162   data_.sources |= STK_STDIN;
163   return true;
164 }
165 
stdinHandler(void * ptr)166 THREAD_RETURN THREAD_TYPE stdinHandler(void *ptr)
167 {
168   Messager::MessagerData *data = (Messager::MessagerData *) ptr;
169   Skini::Message message;
170 
171   std::string line;
172   while ( !std::getline( std::cin, line).eof() ) {
173     if ( line.empty() ) continue;
174     if ( line.compare(0, 4, "Exit") == 0 || line.compare(0, 4, "exit") == 0 )
175       break;
176 
177     data->mutex.lock();
178     if ( data->skini.parseString( line, message ) )
179       data->queue.push( message );
180     data->mutex.unlock();
181 
182     while ( data->queue.size() >= data->queueLimit ) Stk::sleep( 50 );
183   }
184 
185   // We assume here that if someone types an "exit" message in the
186   // terminal window, all processing should stop.
187   message.type = __SK_Exit_;
188   data->queue.push( message );
189   data->sources &= ~STK_STDIN;
190 
191   return NULL;
192 }
193 
midiHandler(double timeStamp,std::vector<unsigned char> * bytes,void * ptr)194 void midiHandler( double timeStamp, std::vector<unsigned char> *bytes, void *ptr )
195 {
196   if ( bytes->size() < 2 ) return;
197 
198   // Parse the MIDI bytes ... only keep MIDI channel messages.
199   if ( bytes->at(0) > 239 ) return;
200 
201   Messager::MessagerData *data = (Messager::MessagerData *) ptr;
202   Skini::Message message;
203 
204   message.type = bytes->at(0) & 0xF0;
205   message.channel = bytes->at(0) & 0x0F;
206   message.time = 0.0; // realtime messages should have delta time = 0.0
207   message.intValues[0] = bytes->at(1);
208   message.floatValues[0] = (StkFloat) message.intValues[0];
209   if ( ( message.type != 0xC0 ) && ( message.type != 0xD0 ) ) {
210     if ( bytes->size() < 3 ) return;
211     message.intValues[1] = bytes->at(2);
212     if ( message.type == 0xE0 ) { // combine pithbend into single "14-bit" value
213       message.intValues[0] += message.intValues[1] <<= 7;
214       message.floatValues[0] = (StkFloat) message.intValues[0];
215       message.intValues[1] = 0;
216     }
217     else
218       message.floatValues[1] = (StkFloat) message.intValues[1];
219   }
220 
221   while ( data->queue.size() >= data->queueLimit ) Stk::sleep( 50 );
222 
223   data->mutex.lock();
224   data->queue.push( message );
225   data->mutex.unlock();
226 }
227 
startMidiInput(int port)228 bool Messager :: startMidiInput( int port )
229 {
230   if ( data_.sources == STK_FILE ) {
231     oStream_ << "Messager::startMidiInput: already reading a scorefile ... cannot do realtime control input too!";
232     handleError( StkError::WARNING );
233     return false;
234   }
235 
236   if ( data_.sources & STK_MIDI ) {
237     oStream_ << "Messager::startMidiInput: MIDI input already started.";
238     handleError( StkError::WARNING );
239     return false;
240   }
241 
242   // First start the stdin input thread if it isn't already running
243   // (to allow the user to exit).
244   if ( !( data_.sources & STK_STDIN ) ) {
245     if ( this->startStdInput() == false ) {
246       oStream_ << "Messager::startMidiInput: unable to start input from stdin.";
247       handleError( StkError::WARNING );
248       return false;
249     }
250   }
251 
252   try {
253     data_.midi = new RtMidiIn();
254     data_.midi->setCallback( &midiHandler, (void *) &data_ );
255     if ( port == -1 ) data_.midi->openVirtualPort();
256     else data_.midi->openPort( (unsigned int)port );
257   }
258   catch ( RtMidiError &error ) {
259     oStream_ << "Messager::startMidiInput: error creating RtMidiIn instance (" << error.getMessage() << ").";
260     handleError( StkError::WARNING );
261     return false;
262   }
263 
264   data_.sources |= STK_MIDI;
265   return true;
266 }
267 
startSocketInput(int port)268 bool Messager :: startSocketInput( int port )
269 {
270   if ( data_.sources == STK_FILE ) {
271     oStream_ << "Messager::startSocketInput: already reading a scorefile ... cannot do realtime control input too!";
272     handleError( StkError::WARNING );
273     return false;
274   }
275 
276   if ( data_.sources & STK_SOCKET ) {
277     oStream_ << "Messager::startSocketInput: socket input thread already started.";
278     handleError( StkError::WARNING );
279     return false;
280   }
281 
282   // Create the socket server.
283   try {
284     data_.socket = new TcpServer( port );
285   }
286   catch ( StkError& ) {
287     return false;
288   }
289 
290   oStream_ << "Socket server listening for connection(s) on port " << port << "...";
291   handleError( StkError::STATUS );
292 
293   // Initialize socket descriptor information.
294   FD_ZERO(&data_.mask);
295   int fd = data_.socket->id();
296   FD_SET( fd, &data_.mask );
297   data_.fd.push_back( fd );
298 
299   // Start the socket thread.
300   if ( !socketThread_.start( (THREAD_FUNCTION)&socketHandler, &data_ ) ) {
301     oStream_ << "Messager::startSocketInput: unable to start socket input thread!";
302     handleError( StkError::WARNING );
303     return false;
304   }
305 
306   data_.sources |= STK_SOCKET;
307   return true;
308 }
309 
310 #if (defined(__OS_IRIX__) || defined(__OS_LINUX__) || defined(__OS_MACOSX__))
311   #include <sys/time.h>
312   #include <errno.h>
313 #endif
314 
socketHandler(void * ptr)315 THREAD_RETURN THREAD_TYPE socketHandler(void *ptr)
316 {
317   Messager::MessagerData *data = (Messager::MessagerData *) ptr;
318   Skini::Message message;
319   std::vector<int>& fd = data->fd;
320 
321   struct timeval timeout;
322   fd_set rmask;
323   int newfd;
324   unsigned int i;
325   const int bufferSize = 1024;
326   char buffer[bufferSize];
327   int index = 0, bytesRead = 0;
328   std::string line;
329   std::vector<int> fdclose;
330 
331   while ( data->sources & STK_SOCKET ) {
332 
333     // Use select function to periodically poll socket desriptors.
334     rmask = data->mask;
335     timeout.tv_sec = 0; timeout.tv_usec = 50000; // 50 milliseconds
336     if ( select( fd.back()+1, &rmask, (fd_set *)0, (fd_set *)0, &timeout ) <= 0 ) continue;
337 
338     // A file descriptor is set.  Check if there's a new socket connection available.
339     if ( FD_ISSET( data->socket->id(), &rmask ) ) {
340 
341       // Accept and service new connection.
342       newfd = data->socket->accept();
343       if ( newfd >= 0 ) {
344         std::cout << "New socket connection made.\n" << std::endl;
345 
346         // Set the socket to non-blocking mode.
347         Socket::setBlocking( newfd, false );
348 
349         // Save the descriptor and update the masks.
350         fd.push_back( newfd );
351         std::sort( fd.begin(), data->fd.end() );
352         FD_SET( newfd, &data->mask );
353         FD_CLR( data->socket->id(), &rmask );
354       }
355       else
356         std::cerr << "Messager: Couldn't accept connection request!\n";
357     }
358 
359     // Check the other descriptors.
360     for ( i=0; i<fd.size(); i++ ) {
361 
362       if ( !FD_ISSET( fd[i], &rmask ) ) continue;
363 
364       // This connection has data.  Read and parse it.
365       bytesRead = 0;
366       index = 0;
367 #if ( defined(__OS_IRIX__) || defined(__OS_LINUX__) || defined(__OS_MACOSX__) )
368       errno = 0;
369       while (bytesRead != -1 && errno != EAGAIN) {
370 #elif defined(__OS_WINDOWS__)
371       while (bytesRead != SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK) {
372 #endif
373 
374         while ( index < bytesRead ) {
375           line += buffer[index];
376           if ( buffer[index++] == '\n' ) {
377             data->mutex.lock();
378             if ( line.compare(0, 4, "Exit") == 0 || line.compare(0, 4, "exit") == 0 ) {
379               // Ignore this line and assume the connection will be
380               // closed on a subsequent read call.
381               ;
382             }
383             else if ( data->skini.parseString( line, message ) )
384               data->queue.push( message );
385             data->mutex.unlock();
386             line.erase();
387           }
388         }
389         index = 0;
390 
391         bytesRead = Socket::readBuffer(fd[i], buffer, bufferSize, 0);
392         if (bytesRead == 0) {
393           // This socket connection closed.
394           FD_CLR( fd[i], &data->mask );
395           Socket::close( fd[i] );
396           fdclose.push_back( fd[i] );
397         }
398       }
399     }
400 
401     // Now remove descriptors for closed connections.
402     for ( i=0; i<fdclose.size(); i++ ) {
403       for ( unsigned int j=0; j<fd.size(); j++ ) {
404         if ( fd[j] == fdclose[i] ) {
405           fd.erase( fd.begin() + j );
406           break;
407         }
408       }
409 
410       // Check to see whether all connections are closed.  Note that
411       // the server descriptor will always remain.
412       if ( fd.size() == 1 ) {
413         data->sources &= ~STK_SOCKET;
414         if ( data->sources & STK_MIDI )
415           std::cout << "MIDI input still running ... type 'exit<cr>' to quit.\n" << std::endl;
416         else if ( !(data->sources & STK_STDIN) ) {
417           // No stdin thread running, so quit now.
418           message.type = __SK_Exit_;
419           data->queue.push( message );
420         }
421       }
422       fdclose.clear();
423     }
424 
425     // Wait until we're below the queue limit.
426     while ( data->queue.size() >= data->queueLimit ) Stk::sleep( 50 );
427   }
428 
429   return NULL;
430 }
431 
432 #endif
433 
434 } // stk namespace
435 
436