1 // StreamBinder.cpp
2 
3 #include "StdAfx.h"
4 
5 #include "../../Common/MyCom.h"
6 
7 #include "StreamBinder.h"
8 
9 class CBinderInStream:
10   public ISequentialInStream,
11   public CMyUnknownImp
12 {
13   CStreamBinder *_binder;
14 public:
15   MY_UNKNOWN_IMP1(ISequentialInStream)
16   STDMETHOD(Read)(void *data, UInt32 size, UInt32 *processedSize);
~CBinderInStream()17   ~CBinderInStream() { _binder->CloseRead_CallOnce(); }
CBinderInStream(CStreamBinder * binder)18   CBinderInStream(CStreamBinder *binder): _binder(binder) {}
19 };
20 
Read(void * data,UInt32 size,UInt32 * processedSize)21 STDMETHODIMP CBinderInStream::Read(void *data, UInt32 size, UInt32 *processedSize)
22   { return _binder->Read(data, size, processedSize); }
23 
24 class CBinderOutStream:
25   public ISequentialOutStream,
26   public CMyUnknownImp
27 {
28   CStreamBinder *_binder;
29 public:
30   MY_UNKNOWN_IMP1(ISequentialOutStream)
31   STDMETHOD(Write)(const void *data, UInt32 size, UInt32 *processedSize);
~CBinderOutStream()32   ~CBinderOutStream() { _binder->CloseWrite(); }
CBinderOutStream(CStreamBinder * binder)33   CBinderOutStream(CStreamBinder *binder): _binder(binder) {}
34 };
35 
Write(const void * data,UInt32 size,UInt32 * processedSize)36 STDMETHODIMP CBinderOutStream::Write(const void *data, UInt32 size, UInt32 *processedSize)
37   { return _binder->Write(data, size, processedSize); }
38 
39 
Event__Create_or_Reset(NWindows::NSynchronization::CAutoResetEvent & event)40 static HRESULT Event__Create_or_Reset(NWindows::NSynchronization::CAutoResetEvent &event)
41 {
42   WRes wres;
43   if (event.IsCreated())
44     wres = event.Reset();
45   else
46     wres = event.Create();
47   return HRESULT_FROM_WIN32(wres);
48 }
49 
Create_ReInit()50 HRESULT CStreamBinder::Create_ReInit()
51 {
52   RINOK(Event__Create_or_Reset(_canRead_Event));
53   // RINOK(Event__Create_or_Reset(_canWrite_Event));
54 
55   // _canWrite_Semaphore.Close();
56   // we need at least 3 items of maxCount: 1 for normal unlock in Read(), 2 items for unlock in CloseRead_CallOnce()
57   _canWrite_Semaphore.OptCreateInit(0, 3);
58 
59   // _readingWasClosed = false;
60   _readingWasClosed2 = false;
61 
62   _waitWrite = true;
63   _bufSize = 0;
64   _buf = NULL;
65   ProcessedSize = 0;
66   // WritingWasCut = false;
67   return S_OK;
68 }
69 
70 
CreateStreams2(CMyComPtr<ISequentialInStream> & inStream,CMyComPtr<ISequentialOutStream> & outStream)71 void CStreamBinder::CreateStreams2(CMyComPtr<ISequentialInStream> &inStream, CMyComPtr<ISequentialOutStream> &outStream)
72 {
73   inStream = new CBinderInStream(this);
74   outStream = new CBinderOutStream(this);
75 }
76 
77 // (_canRead_Event && _bufSize == 0) means that stream is finished.
78 
Read(void * data,UInt32 size,UInt32 * processedSize)79 HRESULT CStreamBinder::Read(void *data, UInt32 size, UInt32 *processedSize)
80 {
81   if (processedSize)
82     *processedSize = 0;
83   if (size != 0)
84   {
85     if (_waitWrite)
86     {
87       WRes wres = _canRead_Event.Lock();
88       if (wres != 0)
89         return HRESULT_FROM_WIN32(wres);
90       _waitWrite = false;
91     }
92     if (size > _bufSize)
93       size = _bufSize;
94     if (size != 0)
95     {
96       memcpy(data, _buf, size);
97       _buf = ((const Byte *)_buf) + size;
98       ProcessedSize += size;
99       if (processedSize)
100         *processedSize = size;
101       _bufSize -= size;
102 
103       /*
104       if (_bufSize == 0), then we have read whole buffer
105       we have two ways here:
106         - if we       check (_bufSize == 0) here, we unlock Write only after full data Reading - it reduces the number of syncs
107         - if we don't check (_bufSize == 0) here, we unlock Write after partial data Reading
108       */
109       if (_bufSize == 0)
110       {
111         _waitWrite = true;
112         // _canWrite_Event.Set();
113         _canWrite_Semaphore.Release();
114       }
115     }
116   }
117   return S_OK;
118 }
119 
120 
Write(const void * data,UInt32 size,UInt32 * processedSize)121 HRESULT CStreamBinder::Write(const void *data, UInt32 size, UInt32 *processedSize)
122 {
123   if (processedSize)
124     *processedSize = 0;
125   if (size == 0)
126     return S_OK;
127 
128   if (!_readingWasClosed2)
129   {
130     _buf = data;
131     _bufSize = size;
132     _canRead_Event.Set();
133 
134     /*
135     _canWrite_Event.Lock();
136     if (_readingWasClosed)
137       _readingWasClosed2 = true;
138     */
139 
140     _canWrite_Semaphore.Lock();
141 
142     // _bufSize : is remain size that was not read
143     size -= _bufSize;
144 
145     // size : is size of data that was read
146     if (size != 0)
147     {
148       // if some data was read, then we report that size and return
149       if (processedSize)
150         *processedSize = size;
151       return S_OK;
152     }
153     _readingWasClosed2 = true;
154   }
155 
156   // WritingWasCut = true;
157   return k_My_HRESULT_WritingWasCut;
158 }
159