1 // StreamBinder.cpp
2
3 #include "StdAfx.h"
4
5 #include "StreamBinder.h"
6 #include "../../Common/Defs.h"
7 #include "../../Common/MyCom.h"
8
9 using namespace NWindows;
10 using namespace NSynchronization;
11
12 class CSequentialInStreamForBinder:
13 public ISequentialInStream,
14 public CMyUnknownImp
15 {
16 public:
17 MY_UNKNOWN_IMP
18
19 STDMETHOD(Read)(void *data, UInt32 size, UInt32 *processedSize);
20 private:
21 CStreamBinder *m_StreamBinder;
22 public:
~CSequentialInStreamForBinder()23 ~CSequentialInStreamForBinder() { m_StreamBinder->CloseRead(); }
SetBinder(CStreamBinder * streamBinder)24 void SetBinder(CStreamBinder *streamBinder) { m_StreamBinder = streamBinder; }
25 };
26
Read(void * data,UInt32 size,UInt32 * processedSize)27 STDMETHODIMP CSequentialInStreamForBinder::Read(void *data, UInt32 size, UInt32 *processedSize)
28 { return m_StreamBinder->Read(data, size, processedSize); }
29
30 class CSequentialOutStreamForBinder:
31 public ISequentialOutStream,
32 public CMyUnknownImp
33 {
34 public:
35 MY_UNKNOWN_IMP
36
37 STDMETHOD(Write)(const void *data, UInt32 size, UInt32 *processedSize);
38
39 private:
40 CStreamBinder *m_StreamBinder;
41 public:
~CSequentialOutStreamForBinder()42 ~CSequentialOutStreamForBinder() { m_StreamBinder->CloseWrite(); }
SetBinder(CStreamBinder * streamBinder)43 void SetBinder(CStreamBinder *streamBinder) { m_StreamBinder = streamBinder; }
44 };
45
Write(const void * data,UInt32 size,UInt32 * processedSize)46 STDMETHODIMP CSequentialOutStreamForBinder::Write(const void *data, UInt32 size, UInt32 *processedSize)
47 { return m_StreamBinder->Write(data, size, processedSize); }
48
49
50 //////////////////////////
51 // CStreamBinder
52 // (_thereAreBytesToReadEvent && _bufferSize == 0) means that stream is finished.
53
CreateEvents()54 void CStreamBinder::CreateEvents()
55 {
56 _allBytesAreWritenEvent = new CManualResetEvent(true);
57 _thereAreBytesToReadEvent = new CManualResetEvent(false);
58 _readStreamIsClosedEvent = new CManualResetEvent(false);
59 }
60
ReInit()61 void CStreamBinder::ReInit()
62 {
63 _thereAreBytesToReadEvent->Reset();
64 _readStreamIsClosedEvent->Reset();
65 ProcessedSize = 0;
66 }
67
~CStreamBinder()68 CStreamBinder::~CStreamBinder()
69 {
70 if (_allBytesAreWritenEvent != NULL)
71 delete _allBytesAreWritenEvent;
72 if (_thereAreBytesToReadEvent != NULL)
73 delete _thereAreBytesToReadEvent;
74 if (_readStreamIsClosedEvent != NULL)
75 delete _readStreamIsClosedEvent;
76 }
77
78
79
80
CreateStreams(ISequentialInStream ** inStream,ISequentialOutStream ** outStream)81 void CStreamBinder::CreateStreams(ISequentialInStream **inStream,
82 ISequentialOutStream **outStream)
83 {
84 CSequentialInStreamForBinder *inStreamSpec = new
85 CSequentialInStreamForBinder;
86 CMyComPtr<ISequentialInStream> inStreamLoc(inStreamSpec);
87 inStreamSpec->SetBinder(this);
88 *inStream = inStreamLoc.Detach();
89
90 CSequentialOutStreamForBinder *outStreamSpec = new
91 CSequentialOutStreamForBinder;
92 CMyComPtr<ISequentialOutStream> outStreamLoc(outStreamSpec);
93 outStreamSpec->SetBinder(this);
94 *outStream = outStreamLoc.Detach();
95
96 _buffer = NULL;
97 _bufferSize= 0;
98 ProcessedSize = 0;
99 }
100
Read(void * data,UInt32 size,UInt32 * processedSize)101 HRESULT CStreamBinder::Read(void *data, UInt32 size, UInt32 *processedSize)
102 {
103 UInt32 sizeToRead = size;
104 if (size > 0)
105 {
106 if(!_thereAreBytesToReadEvent->Lock())
107 return E_FAIL;
108 sizeToRead = MyMin(_bufferSize, size);
109 if (_bufferSize > 0)
110 {
111 MoveMemory(data, _buffer, sizeToRead);
112 _buffer = ((const Byte *)_buffer) + sizeToRead;
113 _bufferSize -= sizeToRead;
114 if (_bufferSize == 0)
115 {
116 _thereAreBytesToReadEvent->Reset();
117 _allBytesAreWritenEvent->Set();
118 }
119 }
120 }
121 if (processedSize != NULL)
122 *processedSize = sizeToRead;
123 ProcessedSize += sizeToRead;
124 return S_OK;
125 }
126
CloseRead()127 void CStreamBinder::CloseRead()
128 {
129 _readStreamIsClosedEvent->Set();
130 }
131
Write(const void * data,UInt32 size,UInt32 * processedSize)132 HRESULT CStreamBinder::Write(const void *data, UInt32 size, UInt32 *processedSize)
133 {
134 if (size > 0)
135 {
136 _buffer = data;
137 _bufferSize = size;
138 _allBytesAreWritenEvent->Reset();
139 _thereAreBytesToReadEvent->Set();
140
141 HANDLE events[2];
142 events[0] = *_allBytesAreWritenEvent;
143 events[1] = *_readStreamIsClosedEvent;
144 DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE);
145 if (waitResult != WAIT_OBJECT_0 + 0)
146 {
147 // ReadingWasClosed = true;
148 return E_FAIL;
149 }
150 // if(!_allBytesAreWritenEvent.Lock())
151 // return E_FAIL;
152 }
153 if (processedSize != NULL)
154 *processedSize = size;
155 return S_OK;
156 }
157
CloseWrite()158 void CStreamBinder::CloseWrite()
159 {
160 // _bufferSize must be = 0
161 _thereAreBytesToReadEvent->Set();
162 }
163