1 /*************************************************************************** 2 * * 3 * LinuxSampler - modular, streaming capable sampler * 4 * * 5 * Copyright (C) 2003, 2004 by Benno Senoner and Christian Schoenebeck * 6 * Copyright (C) 2005 - 2020 Christian Schoenebeck * 7 * Copyright (C) 2009 - 2012 Grigor Iliev * 8 * Copyright (C) 2013 - 2016 Andreas Persson * 9 * * 10 * This program is free software; you can redistribute it and/or modify * 11 * it under the terms of the GNU General Public License as published by * 12 * the Free Software Foundation; either version 2 of the License, or * 13 * (at your option) any later version. * 14 * * 15 * This program is distributed in the hope that it will be useful, * 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * 18 * GNU General Public License for more details. * 19 * * 20 * You should have received a copy of the GNU General Public License * 21 * along with this program; if not, write to the Free Software * 22 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, * 23 * MA 02111-1307 USA * 24 ***************************************************************************/ 25 26 #ifndef __LS_DISKTHREADBASE_H__ 27 #define __LS_DISKTHREADBASE_H__ 28 29 #include <map> 30 31 #include "StreamBase.h" 32 #include "../EngineChannel.h" 33 #include "../InstrumentManagerBase.h" 34 35 #include "../../common/global_private.h" 36 37 #include "../../common/Thread.h" 38 #include "../../common/RingBuffer.h" 39 #include "../../common/atomic.h" 40 41 namespace LinuxSampler { 42 43 int CompareStreamWriteSpace(const void* A, const void* B); 44 45 /** @brief Disk Reader Thread 46 * 47 * The disk reader thread is responsible for periodically refilling 48 * disk streams in parallel to the audio thread's rendering process. 49 * 50 * There is also a function for releasing parts of instruments not 51 * in use anymore (as this is not real time safe, the audio thread 52 * cannot do it directly). 53 */ 54 template <class R /* Resource */, class IM /* Instrument Manager */> 55 class DiskThreadBase : public Thread { 56 private: 57 // Private Types 58 struct create_command_t { 59 Stream::OrderID_t OrderID; 60 Stream::Handle hStream; 61 Stream::reference_t* pStreamRef; 62 R* pRegion; 63 unsigned long SampleOffset; 64 bool DoLoop; 65 }; 66 struct delete_command_t { 67 Stream* pStream; 68 Stream::Handle hStream; 69 Stream::OrderID_t OrderID; 70 bool bNotify; 71 }; 72 struct program_change_command_t { 73 uint32_t Program; 74 EngineChannel* pEngineChannel; 75 }; 76 // Attributes 77 bool IsIdle; 78 uint Streams; 79 RingBuffer<create_command_t,false>* CreationQueue; ///< Contains commands to create streams 80 RingBuffer<delete_command_t,false>* DeletionQueue; ///< Contains commands to delete streams 81 RingBuffer<delete_command_t,false>* GhostQueue; ///< Contains handles to streams that are not used anymore and weren't deletable immediately 82 RingBuffer<Stream::Handle,false> DeletionNotificationQueue; ///< In case the original sender requested a notification for its stream deletion order, this queue will receive the handle of the respective stream once actually be deleted by the disk thread. 83 RingBuffer<R*,false>* DeleteRegionQueue; ///< Contains dimension regions that are not used anymore and should be handed back to the instrument resource manager 84 RingBuffer<program_change_command_t,false> ProgramChangeQueue; ///< Contains requests for MIDI program change 85 unsigned int RefillStreamsPerRun; ///< How many streams should be refilled in each loop run 86 Stream** pStreams; ///< Contains all disk streams (whether used or unused) 87 Stream** pCreatedStreams; ///< This is where the voice (audio thread) picks up it's meanwhile hopefully created disk stream. 88 static Stream* SLOT_RESERVED; ///< This value is used to mark an entry in pCreatedStreams[] as reserved. 89 90 // Methods 91 CreateStream(create_command_t & Command)92 void CreateStream(create_command_t& Command) { 93 // search for unused stream 94 Stream* newstream = NULL; 95 for (int i = Streams - 1; i >= 0; i--) { 96 if (pStreams[i]->GetState() == Stream::state_unused) { 97 newstream = pStreams[i]; 98 break; 99 } 100 } 101 if (!newstream) { 102 std::cerr << "No unused stream found (OrderID:" << Command.OrderID; 103 std::cerr << ") - report if this happens, this is a bug!\n" << std::flush; 104 return; 105 } 106 LaunchStream(newstream, Command.hStream, Command.pStreamRef, Command.pRegion, Command.SampleOffset, Command.DoLoop); 107 dmsg(4,("new Stream launched by disk thread (OrderID:%d,StreamHandle:%d)\n", Command.OrderID, Command.hStream)); 108 if (pCreatedStreams[Command.OrderID] != SLOT_RESERVED) { 109 std::cerr << "DiskThread: Slot " << Command.OrderID << " already occupied! Please report this!\n" << std::flush; 110 newstream->Kill(); 111 return; 112 } 113 pCreatedStreams[Command.OrderID] = newstream; 114 } 115 DeleteStream(delete_command_t & Command)116 void DeleteStream(delete_command_t& Command) { 117 if (Command.pStream) { 118 Command.pStream->Kill(); 119 if (Command.bNotify) DeletionNotificationQueue.push(&Command.hStream); 120 } 121 else { // the stream wasn't created by disk thread or picked up by audio thread yet 122 123 // if stream was created but not picked up yet 124 Stream* pStream = pCreatedStreams[Command.OrderID]; 125 if (pStream && pStream != SLOT_RESERVED) { 126 pStream->Kill(); 127 pCreatedStreams[Command.OrderID] = NULL; // free slot for new order 128 // if original sender requested a notification, let him know now 129 if (Command.bNotify) 130 DeletionNotificationQueue.push(&Command.hStream); 131 return; 132 } 133 134 // the stream was not created yet 135 if (GhostQueue->write_space() > 0) { 136 GhostQueue->push(&Command); 137 } else { // error, queue full 138 if (Command.bNotify) { 139 dmsg(1,("DiskThread: GhostQueue full! (might lead to dead lock with instrument editor!)\n")); 140 } else { 141 dmsg(1,("DiskThread: GhostQueue full!\n")); 142 } 143 } 144 } 145 } 146 RefillStreams()147 void RefillStreams() { 148 // sort the streams by most empty stream 149 qsort(pStreams, Streams, sizeof(Stream*), CompareStreamWriteSpace); 150 151 // refill the most empty streams 152 for (uint i = 0; i < RefillStreamsPerRun; i++) { 153 if (pStreams[i]->GetState() == Stream::state_active) { 154 155 //float filledpercentage = (float) pStreams[i]->GetReadSpace() / 131072.0 * 100.0; 156 //dmsg(("\nbuffer fill: %.1f%\n", filledpercentage)); 157 158 int writespace = pStreams[i]->GetWriteSpaceToEnd(); 159 if (writespace == 0) break; 160 161 int capped_writespace = writespace; 162 // if there is too much buffer space available then cut the read/write 163 // size to CONFIG_STREAM_MAX_REFILL_SIZE which is by default 65536 samples = 256KBytes 164 if (writespace > CONFIG_STREAM_MAX_REFILL_SIZE) capped_writespace = CONFIG_STREAM_MAX_REFILL_SIZE; 165 166 // adjust the amount to read in order to ensure that the buffer wraps correctly 167 int read_amount = pStreams[i]->AdjustWriteSpaceToAvoidBoundary(writespace, capped_writespace); 168 // if we wasn't able to refill one of the stream buffers by more than 169 // CONFIG_STREAM_MIN_REFILL_SIZE we'll send the disk thread to sleep later 170 if (pStreams[i]->ReadAhead(read_amount) > CONFIG_STREAM_MIN_REFILL_SIZE) this->IsIdle = false; 171 } 172 } 173 } 174 CreateHandle()175 Stream::Handle CreateHandle() { 176 static uint32_t counter = 0; 177 if (counter == 0xffffffff) counter = 1; // we use '0' as 'invalid handle' only, so we skip 0 178 else counter++; 179 return counter; 180 } 181 CreateOrderID()182 Stream::OrderID_t CreateOrderID() { 183 static Stream::OrderID_t counter(0); 184 for (int i = 0; i < Streams; i++) { 185 if (counter == Streams) counter = 1; // we use '0' as 'invalid order' only, so we skip 0 186 else counter++; 187 if (!pCreatedStreams[counter]) { 188 pCreatedStreams[counter] = SLOT_RESERVED; // mark this slot as reserved 189 return counter; // found empty slot 190 } 191 } 192 return 0; // no free slot 193 } 194 195 atomic_t ActiveStreamCount; 196 public: 197 // Methods DiskThreadBase(int MaxStreams,uint BufferWrapElements,IM * pInstruments)198 DiskThreadBase(int MaxStreams, uint BufferWrapElements, IM* pInstruments) : 199 Thread(true, false, 1, -2), 200 DeletionNotificationQueue(4*MaxStreams), 201 ProgramChangeQueue(512), 202 pInstruments(pInstruments) 203 { 204 CreationQueue = new RingBuffer<create_command_t,false>(4*MaxStreams); 205 DeletionQueue = new RingBuffer<delete_command_t,false>(4*MaxStreams); 206 GhostQueue = new RingBuffer<delete_command_t,false>(MaxStreams); 207 DeleteRegionQueue = new RingBuffer<R*,false>(4*MaxStreams); 208 pStreams = new Stream*[MaxStreams]; 209 pCreatedStreams = new Stream*[MaxStreams + 1]; 210 Streams = MaxStreams; 211 RefillStreamsPerRun = CONFIG_REFILL_STREAMS_PER_RUN; 212 213 for (int i = 1; i <= MaxStreams; i++) { 214 pCreatedStreams[i] = NULL; 215 } 216 ActiveStreamCountMax = 0; 217 } 218 ~DiskThreadBase()219 virtual ~DiskThreadBase() { 220 for (int i = 0; i < Streams; i++) { 221 if (pStreams[i]) delete pStreams[i]; 222 } 223 if (CreationQueue) delete CreationQueue; 224 if (DeletionQueue) delete DeletionQueue; 225 if (GhostQueue) delete GhostQueue; 226 if (DeleteRegionQueue) delete DeleteRegionQueue; 227 if (pStreams) delete[] pStreams; 228 if (pCreatedStreams) delete[] pCreatedStreams; 229 } 230 231 232 // ######################################################################### 233 // # Foreign Thread Section 234 // # (following code intended to be interface for audio thread) 235 236 /** 237 * Suspend disk thread, kill all active streams, clear all queues and the 238 * pickup array and reset all streams. Call this method to bring everything 239 * in the disk thread to day one. If the disk thread was running, it will be 240 * respawned right after everything was reset. 241 */ Reset()242 void Reset() { 243 bool running = this->IsRunning(); 244 if (running) this->StopThread(); 245 for (int i = 0; i < Streams; i++) { 246 pStreams[i]->Kill(); 247 } 248 for (int i = 1; i <= Streams; i++) { 249 pCreatedStreams[i] = NULL; 250 } 251 GhostQueue->init(); 252 CreationQueue->init(); 253 DeletionQueue->init(); 254 DeletionNotificationQueue.init(); 255 256 // make sure that all DimensionRegions are released 257 while (DeleteRegionQueue->read_space() > 0) { 258 R* pRgn; 259 DeleteRegionQueue->pop(&pRgn); 260 pInstruments->HandBackRegion(pRgn); 261 } 262 DeleteRegionQueue->init(); 263 SetActiveStreamCount(0); 264 ActiveStreamCountMax = 0; 265 if (running) this->StartThread(); // start thread only if it was running before 266 } 267 GetBufferFillBytes()268 String GetBufferFillBytes() { 269 bool activestreams = false; 270 std::stringstream ss; 271 for (uint i = 0; i < this->Streams; i++) { 272 if (pStreams[i]->GetState() == Stream::state_unused) continue; 273 uint bufferfill = pStreams[i]->GetReadSpace() * sizeof(sample_t); 274 uint streamid = (uint) pStreams[i]->GetHandle(); 275 if (!streamid) continue; 276 277 if (activestreams) ss << ",[" << streamid << ']' << bufferfill; 278 else { 279 ss << '[' << streamid << ']' << bufferfill; 280 activestreams = true; 281 } 282 } 283 return ss.str(); 284 } 285 GetBufferFillPercentage()286 String GetBufferFillPercentage() { 287 bool activestreams = false; 288 std::stringstream ss; 289 for (uint i = 0; i < this->Streams; i++) { 290 if (pStreams[i]->GetState() == Stream::state_unused) continue; 291 uint bufferfill = (uint) ((float) pStreams[i]->GetReadSpace() / (float) CONFIG_STREAM_BUFFER_SIZE * 100); 292 uint streamid = (uint) pStreams[i]->GetHandle(); 293 if (!streamid) continue; 294 295 if (activestreams) ss << ",[" << streamid << ']' << bufferfill << '%'; 296 else { 297 ss << '[' << streamid << ']' << bufferfill; 298 activestreams = true; 299 } 300 } 301 return ss.str(); 302 } 303 304 /** 305 * Returns -1 if command queue or pickup pool is full, 0 on success (will be 306 * called by audio thread within the voice class). 307 */ OrderNewStream(Stream::reference_t * pStreamRef,R * pRegion,unsigned long SampleOffset,bool DoLoop)308 int OrderNewStream(Stream::reference_t* pStreamRef, R* pRegion, unsigned long SampleOffset, bool DoLoop) { 309 dmsg(4,("Disk Thread: new stream ordered\n")); 310 if (CreationQueue->write_space() < 1) { 311 dmsg(1,("DiskThread: Order queue full!\n")); 312 return -1; 313 } 314 315 const Stream::OrderID_t newOrder = CreateOrderID(); 316 if (!newOrder) { 317 dmsg(1,("DiskThread: there was no free slot\n")); 318 return -1; // there was no free slot 319 } 320 321 pStreamRef->State = Stream::state_active; 322 pStreamRef->OrderID = newOrder; 323 pStreamRef->hStream = CreateHandle(); 324 pStreamRef->pStream = NULL; // a stream has to be activated by the disk thread first 325 326 create_command_t cmd; 327 cmd.OrderID = pStreamRef->OrderID; 328 cmd.hStream = pStreamRef->hStream; 329 cmd.pStreamRef = pStreamRef; 330 cmd.pRegion = pRegion; 331 cmd.SampleOffset = SampleOffset; 332 cmd.DoLoop = DoLoop; 333 334 CreationQueue->push(&cmd); 335 return 0; 336 } 337 338 /** 339 * Request the disk thread to delete the given disk stream. This method 340 * will return immediately, thus it won't block until the respective voice 341 * was actually deleted. (Called by audio thread within the Voice class). 342 * 343 * @param pStreamRef - stream that shall be deleted 344 * @param bRequestNotification - set to true in case you want to receive a 345 * notification once the stream has actually 346 * been deleted 347 * @returns 0 on success, -1 if command queue is full 348 * @see AskForDeletedStream() 349 */ 350 int OrderDeletionOfStream(Stream::reference_t* pStreamRef, bool bRequestNotification = false) { 351 dmsg(4,("Disk Thread: stream deletion ordered\n")); 352 if (DeletionQueue->write_space() < 1) { 353 dmsg(1,("DiskThread: Deletion queue full!\n")); 354 return -1; 355 } 356 357 delete_command_t cmd; 358 cmd.pStream = pStreamRef->pStream; 359 cmd.hStream = pStreamRef->hStream; 360 cmd.OrderID = pStreamRef->OrderID; 361 cmd.bNotify = bRequestNotification; 362 363 DeletionQueue->push(&cmd); 364 return 0; 365 } 366 367 /** 368 * Tell the disk thread to release a dimension region that belongs 369 * to an instrument which isn't loaded anymore. The disk thread 370 * will hand back the dimension region to the instrument resource 371 * manager. (OrderDeletionOfDimreg is called from the audio thread 372 * when a voice dies.) 373 */ OrderDeletionOfRegion(R * pReg)374 int OrderDeletionOfRegion(R* pReg) { 375 dmsg(4,("Disk Thread: dimreg deletion ordered\n")); 376 if (DeleteRegionQueue->write_space() < 1) { 377 dmsg(1,("DiskThread: DeleteRegion queue full!\n")); 378 return -1; 379 } 380 DeleteRegionQueue->push(&pReg); 381 return 0; 382 } 383 384 /** 385 * Tell the disk thread to do a program change on the specified 386 * EngineChannel. 387 */ OrderProgramChange(uint32_t Program,EngineChannel * pEngineChannel)388 int OrderProgramChange(uint32_t Program, EngineChannel* pEngineChannel) { 389 program_change_command_t cmd; 390 cmd.Program = Program; 391 cmd.pEngineChannel = pEngineChannel; 392 393 dmsg(4,("Disk Thread: program change ordered\n")); 394 if (ProgramChangeQueue.write_space() < 1) { 395 dmsg(1,("DiskThread: ProgramChange queue full!\n")); 396 return -1; 397 } 398 ProgramChangeQueue.push(&cmd); 399 return 0; 400 } 401 402 /** 403 * Returns the pointer to a disk stream if the ordered disk stream 404 * represented by the \a StreamOrderID was already activated by the disk 405 * thread, returns NULL otherwise. If the call was successful, thus if it 406 * returned a valid stream pointer, the caller has to the store that pointer 407 * by himself, because it's not possible to call this method again with the 408 * same used order ID; this method is just intended for picking up an ordered 409 * disk stream. This method will usually be called by the voice class (within 410 * the audio thread). 411 * 412 * @param StreamOrderID - ID previously returned by OrderNewStream() 413 * @returns pointer to created stream object, NULL otherwise 414 */ AskForCreatedStream(Stream::OrderID_t StreamOrderID)415 Stream* AskForCreatedStream(Stream::OrderID_t StreamOrderID) { 416 dmsg(4,("Disk Thread: been asked if stream already created, OrderID=%x ", StreamOrderID)); 417 Stream* pStream = pCreatedStreams[StreamOrderID]; 418 if (pStream && pStream != SLOT_RESERVED) { 419 dmsg(4,("(yes created)\n")); 420 pCreatedStreams[StreamOrderID] = NULL; // free the slot for a new order 421 return pStream; 422 } 423 dmsg(4,("(no not yet created)\n")); 424 return NULL; 425 } 426 427 /** 428 * In case the original sender requested a notification with his stream 429 * deletion order, he can use this method to poll if the respective stream 430 * has actually been deleted. 431 * 432 * @returns handle / identifier of the deleted stream, or 433 * Stream::INVALID_HANDLE if no notification present 434 */ AskForDeletedStream()435 Stream::Handle AskForDeletedStream() { 436 if (DeletionNotificationQueue.read_space()) { 437 Stream::Handle hStream; 438 DeletionNotificationQueue.pop(&hStream); 439 return hStream; 440 } else return Stream::INVALID_HANDLE; // no notification received yet 441 } 442 443 // the number of streams currently in usage 444 // printed on the console the main thread (along with the active voice count) GetActiveStreamCount()445 uint GetActiveStreamCount() { return atomic_read(&ActiveStreamCount); } SetActiveStreamCount(uint Streams)446 void SetActiveStreamCount(uint Streams) { atomic_set(&ActiveStreamCount, Streams); } 447 int ActiveStreamCountMax; 448 449 protected: 450 IM* pInstruments; ///< The instrument resource manager of the engine that is using this disk thread. Used by the dimension region deletion feature. 451 452 // ######################################################################### 453 // # Disk Thread Only Section 454 // # (following code should only be executed by the disk thread) 455 456 // Implementation of virtual method from class Thread Main()457 int Main() { 458 dmsg(3,("Disk thread running\n")); 459 460 #if DEBUG 461 Thread::setNameOfCaller("DiskIO"); 462 #endif 463 464 while (true) { 465 466 TestCancel(); 467 468 IsIdle = true; // will be set to false if a stream got filled 469 470 // prevent disk thread from being cancelled 471 // (e.g. to prevent deadlocks while holding mutex lock(s)) 472 pushCancelable(false); 473 474 // if there are ghost streams, delete them 475 for (int i = 0; i < GhostQueue->read_space(); i++) { //FIXME: unefficient 476 delete_command_t ghostStream; 477 GhostQueue->pop(&ghostStream); 478 bool found = false; 479 for (int i = 0; i < this->Streams; i++) { 480 if (pStreams[i]->GetHandle() == ghostStream.hStream) { 481 pStreams[i]->Kill(); 482 found = true; 483 // if original sender requested a notification, let him know now 484 if (ghostStream.bNotify) 485 DeletionNotificationQueue.push(&ghostStream.hStream); 486 break; 487 } 488 } 489 if (!found) GhostQueue->push(&ghostStream); // put ghost stream handle back to the queue 490 } 491 492 // if there are creation commands, create new streams 493 while (Stream::UnusedStreams > 0 && CreationQueue->read_space() > 0) { 494 create_command_t command; 495 CreationQueue->pop(&command); 496 CreateStream(command); 497 } 498 499 // if there are deletion commands, delete those streams 500 while (Stream::UnusedStreams < Stream::TotalStreams && DeletionQueue->read_space() > 0) { 501 delete_command_t command; 502 DeletionQueue->pop(&command); 503 DeleteStream(command); 504 } 505 506 // release DimensionRegions that belong to instruments 507 // that are no longer loaded 508 while (DeleteRegionQueue->read_space() > 0) { 509 R* pRgn; 510 DeleteRegionQueue->pop(&pRgn); 511 pInstruments->HandBackRegion(pRgn); 512 } 513 514 // perform MIDI program change commands 515 if (ProgramChangeQueue.read_space() > 0) { 516 program_change_command_t cmd; 517 std::map<EngineChannel*,uint32_t> cmds; 518 // skip all old ones, only process the latest program 519 // change command on each engine channel 520 do { 521 ProgramChangeQueue.pop(&cmd); 522 cmds[cmd.pEngineChannel] = cmd.Program; 523 } while (ProgramChangeQueue.read_space() > 0); 524 // now execute those latest program change commands on 525 // their respective engine channel 526 for (std::map<EngineChannel*,uint32_t>::const_iterator it = cmds.begin(); 527 it != cmds.end(); ++it) 528 { 529 it->first->ExecuteProgramChange(it->second); 530 } 531 } 532 533 RefillStreams(); // refill the most empty streams 534 535 int streamsInUsage = 0; 536 for (int i = Streams - 1; i >= 0; i--) { 537 if (pStreams[i]->GetState() != Stream::state_unused) streamsInUsage++; 538 } 539 SetActiveStreamCount(streamsInUsage); 540 if (streamsInUsage > ActiveStreamCountMax) ActiveStreamCountMax = streamsInUsage; 541 542 // now allow disk thread being cancelled again 543 // (since all mutexes are now unlocked and data structures 544 // are at consistent states) 545 popCancelable(); 546 547 // if nothing was done during this iteration (i.e. no 548 // stream buffer filled with data) then sleep for 30ms 549 if (IsIdle) 550 usleep(30000); //NOTE: defined as cancellation point by POSIX 551 } 552 553 return EXIT_FAILURE; 554 } 555 556 virtual Stream* CreateStream(long BufferSize, uint BufferWrapElements) = 0; 557 CreateAllStreams(int MaxStreams,uint BufferWrapElements)558 void CreateAllStreams(int MaxStreams, uint BufferWrapElements) { 559 for (int i = 0; i < MaxStreams; i++) { 560 pStreams[i] = CreateStream(CONFIG_STREAM_BUFFER_SIZE, BufferWrapElements); 561 } 562 } 563 564 virtual void LaunchStream ( 565 Stream* pStream, 566 Stream::Handle hStream, 567 Stream::reference_t* pExportReference, 568 R* pRgn, 569 unsigned long SampleOffset, 570 bool DoLoop 571 ) = 0; 572 573 friend class Stream; 574 }; 575 } // namespace LinuxSampler 576 577 #endif // __LS_DISKTHREADBASE_H__ 578