1 /******************************************************************************/
2 /* Mednafen - Multi-system Emulator                                           */
3 /******************************************************************************/
4 /* MTStreamReader.h:
5 **  Copyright (C) 2019 Mednafen Team
6 **
7 ** This program is free software; you can redistribute it and/or
8 ** modify it under the terms of the GNU General Public License
9 ** as published by the Free Software Foundation; either version 2
10 ** of the License, or (at your option) any later version.
11 **
12 ** This program is distributed in the hope that it will be useful,
13 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 ** GNU General Public License for more details.
16 **
17 ** You should have received a copy of the GNU General Public License
18 ** along with this program; if not, write to the Free Software Foundation, Inc.,
19 ** 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
20 */
21 
22 #include <mednafen/MThreading.h>
23 #include <mednafen/Stream.h>
24 
25 #ifndef __MDFN_MTSTREAMREADER_H
26 #define __MDFN_MTSTREAMREADER_H
27 
28 namespace Mednafen
29 {
30 //
31 // Prebuffer the entire buffer, then buffer in 1/4 size chunks(1/2 size chunks would create a race condition in get_buffer()).
32 //
33 class MTStreamReader
34 {
35  public:
36 
37  MTStreamReader(const uint64 affinity);
38  ~MTStreamReader();
39 
40  enum { Overbuffer_Count = 256 };
41  enum : size_t { Buffer_Size = 0x10000 }; // 0x100
42  enum : size_t { Buffer_Size_Mask = Buffer_Size - 1 };
43  enum : size_t { Chunk_Size = Buffer_Size / 4 };
44 
get_buffer(size_t count)45  INLINE uint8* get_buffer(size_t count)
46  {
47   //assert(count <= Overbuffer_Count);
48   if(MDFN_UNLIKELY(need_sync))
49   {
50    MThreading::Sem_Wait(ack_sem);
51    //
52    pending_command = Command_NOP;
53    //
54    MThreading::Sem_Post(command_sem);
55    //
56    need_sync = false;
57   }
58 
59   return &buffer[read_pos & Buffer_Size_Mask];
60  }
61 
advance(size_t count)62  INLINE void advance(size_t count)
63  {
64   const size_t prev_pos = read_pos;
65 
66   read_pos += count;
67 
68   if((prev_pos ^ read_pos) & Chunk_Size)
69   {
70    // Wait for command ack, then send read next buffer command to read thread
71    MThreading::Sem_Wait(ack_sem);
72    pending_command = Command_BufferNext;
73    MThreading::Sem_Post(command_sem);
74   }
75  }
76 
77  struct StreamInfo
78  {
79   std::unique_ptr<Stream> stream;
80   uint64 pos;	// [0, size)
81   //
82   uint64 size;
83   uint64 loop_pos; // [0, size)
84  };
85 
86  void add_stream(StreamInfo si);
87  INLINE void set_active_stream(uint32 w, uint64 skip = 0, uint32 pzb = 0)
88  {
89   assert(pzb <= sizeof(buffer));
90   MThreading::Sem_Wait(ack_sem);
91   //
92   pending_command = Command_SetActive;
93   pending_which = w;
94   pending_skip = skip;
95   pending_pzb = pzb;
96 
97   read_pos = 0;
98   write_pos = 0;
99   //
100   MThreading::Sem_Post(command_sem);
101   //
102   need_sync = true;
103  }
104 
105  private:
106 
107  static int read_thread_entry_(void* data);
108  int read_thread_entry(void);
109 
110  void cleanup(void);
111  void zero_into_buffer(uint32 count);
112  void read_into_buffer(uint32 count);
113 
114  enum
115  {
116   Command_SetActive = 0,
117   Command_BufferNext,
118   Command_NOP,
119   Command_Exit
120  };
121 
122  MThreading::Thread* thread = nullptr;
123  MThreading::Sem* command_sem = nullptr;
124  MThreading::Sem* ack_sem = nullptr;
125  uint32 pending_command = 0;
126  uint32 pending_which = 0;
127  uint64 pending_skip = 0;
128  uint32 pending_pzb = 0;
129  bool need_sync = false;
130 
131  std::vector<StreamInfo> streams;
132  StreamInfo* active = nullptr;
133  size_t write_pos = 0;
134  //
135  uint32 read_pos = 0;
136  uint8 buffer[Buffer_Size + Overbuffer_Count];
137 };
138 
139 }
140 #endif
141