1 // StreamBinder.cpp
2
3 #include "StdAfx.h"
4
5 #include "../../Common/MyCom.h"
6
7 #include "StreamBinder.h"
8
9 class CBinderInStream:
10 public ISequentialInStream,
11 public CMyUnknownImp
12 {
13 CStreamBinder *_binder;
14 public:
15 MY_UNKNOWN_IMP1(ISequentialInStream)
16 STDMETHOD(Read)(void *data, UInt32 size, UInt32 *processedSize);
~CBinderInStream()17 ~CBinderInStream() { _binder->CloseRead_CallOnce(); }
CBinderInStream(CStreamBinder * binder)18 CBinderInStream(CStreamBinder *binder): _binder(binder) {}
19 };
20
Read(void * data,UInt32 size,UInt32 * processedSize)21 STDMETHODIMP CBinderInStream::Read(void *data, UInt32 size, UInt32 *processedSize)
22 { return _binder->Read(data, size, processedSize); }
23
24 class CBinderOutStream:
25 public ISequentialOutStream,
26 public CMyUnknownImp
27 {
28 CStreamBinder *_binder;
29 public:
30 MY_UNKNOWN_IMP1(ISequentialOutStream)
31 STDMETHOD(Write)(const void *data, UInt32 size, UInt32 *processedSize);
~CBinderOutStream()32 ~CBinderOutStream() { _binder->CloseWrite(); }
CBinderOutStream(CStreamBinder * binder)33 CBinderOutStream(CStreamBinder *binder): _binder(binder) {}
34 };
35
Write(const void * data,UInt32 size,UInt32 * processedSize)36 STDMETHODIMP CBinderOutStream::Write(const void *data, UInt32 size, UInt32 *processedSize)
37 { return _binder->Write(data, size, processedSize); }
38
39
Event__Create_or_Reset(NWindows::NSynchronization::CAutoResetEvent & event)40 static HRESULT Event__Create_or_Reset(NWindows::NSynchronization::CAutoResetEvent &event)
41 {
42 WRes wres;
43 if (event.IsCreated())
44 wres = event.Reset();
45 else
46 wres = event.Create();
47 return HRESULT_FROM_WIN32(wres);
48 }
49
Create_ReInit()50 HRESULT CStreamBinder::Create_ReInit()
51 {
52 RINOK(Event__Create_or_Reset(_canRead_Event));
53 // RINOK(Event__Create_or_Reset(_canWrite_Event));
54
55 // _canWrite_Semaphore.Close();
56 // we need at least 3 items of maxCount: 1 for normal unlock in Read(), 2 items for unlock in CloseRead_CallOnce()
57 _canWrite_Semaphore.OptCreateInit(0, 3);
58
59 // _readingWasClosed = false;
60 _readingWasClosed2 = false;
61
62 _waitWrite = true;
63 _bufSize = 0;
64 _buf = NULL;
65 ProcessedSize = 0;
66 // WritingWasCut = false;
67 return S_OK;
68 }
69
70
CreateStreams2(CMyComPtr<ISequentialInStream> & inStream,CMyComPtr<ISequentialOutStream> & outStream)71 void CStreamBinder::CreateStreams2(CMyComPtr<ISequentialInStream> &inStream, CMyComPtr<ISequentialOutStream> &outStream)
72 {
73 inStream = new CBinderInStream(this);
74 outStream = new CBinderOutStream(this);
75 }
76
// (_canRead_Event is signaled && _bufSize == 0) means that the stream is finished.
78
Read(void * data,UInt32 size,UInt32 * processedSize)79 HRESULT CStreamBinder::Read(void *data, UInt32 size, UInt32 *processedSize)
80 {
81 if (processedSize)
82 *processedSize = 0;
83 if (size != 0)
84 {
85 if (_waitWrite)
86 {
87 WRes wres = _canRead_Event.Lock();
88 if (wres != 0)
89 return HRESULT_FROM_WIN32(wres);
90 _waitWrite = false;
91 }
92 if (size > _bufSize)
93 size = _bufSize;
94 if (size != 0)
95 {
96 memcpy(data, _buf, size);
97 _buf = ((const Byte *)_buf) + size;
98 ProcessedSize += size;
99 if (processedSize)
100 *processedSize = size;
101 _bufSize -= size;
102
103 /*
104 if (_bufSize == 0), then we have read whole buffer
105 we have two ways here:
106 - if we check (_bufSize == 0) here, we unlock Write only after full data Reading - it reduces the number of syncs
107 - if we don't check (_bufSize == 0) here, we unlock Write after partial data Reading
108 */
109 if (_bufSize == 0)
110 {
111 _waitWrite = true;
112 // _canWrite_Event.Set();
113 _canWrite_Semaphore.Release();
114 }
115 }
116 }
117 return S_OK;
118 }
119
120
Write(const void * data,UInt32 size,UInt32 * processedSize)121 HRESULT CStreamBinder::Write(const void *data, UInt32 size, UInt32 *processedSize)
122 {
123 if (processedSize)
124 *processedSize = 0;
125 if (size == 0)
126 return S_OK;
127
128 if (!_readingWasClosed2)
129 {
130 _buf = data;
131 _bufSize = size;
132 _canRead_Event.Set();
133
134 /*
135 _canWrite_Event.Lock();
136 if (_readingWasClosed)
137 _readingWasClosed2 = true;
138 */
139
140 _canWrite_Semaphore.Lock();
141
142 // _bufSize : is remain size that was not read
143 size -= _bufSize;
144
145 // size : is size of data that was read
146 if (size != 0)
147 {
148 // if some data was read, then we report that size and return
149 if (processedSize)
150 *processedSize = size;
151 return S_OK;
152 }
153 _readingWasClosed2 = true;
154 }
155
156 // WritingWasCut = true;
157 return k_My_HRESULT_WritingWasCut;
158 }
159