1 /*
2 * Generic Implementation of COutputQueue
3 *
4 * Copyright 2011 Aric Stewart, CodeWeavers
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19 */
20
21 #define COBJMACROS
22
23 #include "dshow.h"
24 #include "wine/debug.h"
25 #include "wine/unicode.h"
26 #include "wine/list.h"
27 #include "wine/strmbase.h"
28 #include "uuids.h"
29 #include "vfwmsgs.h"
30 #include <assert.h>
31
32 WINE_DEFAULT_DEBUG_CHANNEL(strmbase);
33
34 enum {SAMPLE_PACKET, EOS_PACKET};
35
36 typedef struct tagQueuedEvent {
37 int type;
38 struct list entry;
39
40 IMediaSample *pSample;
41 } QueuedEvent;
42
OutputQueue_InitialThreadProc(LPVOID data)43 static DWORD WINAPI OutputQueue_InitialThreadProc(LPVOID data)
44 {
45 OutputQueue *This = (OutputQueue *)data;
46 return This->pFuncsTable->pfnThreadProc(This);
47 }
48
/*
 * Discard every event that is still pending in the queue.
 *
 * Each SAMPLE_PACKET entry owns one reference on its sample (taken when
 * the sample was queued), so that reference is dropped here; otherwise
 * samples discarded without being delivered would be leaked.
 * EOS_PACKET entries carry no sample and are simply freed.
 */
static void OutputQueue_FreeSamples(OutputQueue *pOutputQueue)
{
    struct list *cursor, *cursor2;

    LIST_FOR_EACH_SAFE(cursor, cursor2, &pOutputQueue->SampleList)
    {
        QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);

        list_remove(cursor);
        if (qev->type == SAMPLE_PACKET && qev->pSample)
            IMediaSample_Release(qev->pSample);
        HeapFree(GetProcessHeap(), 0, qev);
    }
}
59
OutputQueue_Construct(BaseOutputPin * pInputPin,BOOL bAuto,BOOL bQueue,LONG lBatchSize,BOOL bBatchExact,DWORD dwPriority,const OutputQueueFuncTable * pFuncsTable,OutputQueue ** ppOutputQueue)60 HRESULT WINAPI OutputQueue_Construct(
61 BaseOutputPin *pInputPin,
62 BOOL bAuto,
63 BOOL bQueue,
64 LONG lBatchSize,
65 BOOL bBatchExact,
66 DWORD dwPriority,
67 const OutputQueueFuncTable* pFuncsTable,
68 OutputQueue **ppOutputQueue )
69
70 {
71 BOOL threaded = FALSE;
72 DWORD tid;
73
74 OutputQueue *This;
75
76 if (!pInputPin || !pFuncsTable || !ppOutputQueue)
77 return E_INVALIDARG;
78
79 *ppOutputQueue = HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(OutputQueue));
80 if (!*ppOutputQueue)
81 return E_OUTOFMEMORY;
82
83 This = *ppOutputQueue;
84 This->pFuncsTable = pFuncsTable;
85 This->lBatchSize = lBatchSize;
86 This->bBatchExact = bBatchExact;
87 InitializeCriticalSection(&This->csQueue);
88 This->csQueue.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": OutputQueue.csQueue");
89 list_init(&This->SampleList);
90
91 This->pInputPin = pInputPin;
92 IPin_AddRef(&pInputPin->pin.IPin_iface);
93
94 EnterCriticalSection(&This->csQueue);
95 if (bAuto && pInputPin->pMemInputPin)
96 threaded = IMemInputPin_ReceiveCanBlock(pInputPin->pMemInputPin) == S_OK;
97 else
98 threaded = bQueue;
99
100 if (threaded)
101 {
102 This->hThread = CreateThread(NULL, 0, OutputQueue_InitialThreadProc, This, 0, &tid);
103 if (This->hThread)
104 {
105 SetThreadPriority(This->hThread, dwPriority);
106 This->hProcessQueue = CreateEventW(NULL, 0, 0, NULL);
107 }
108 }
109 LeaveCriticalSection(&This->csQueue);
110
111 return S_OK;
112 }
113
OutputQueue_Destroy(OutputQueue * pOutputQueue)114 HRESULT WINAPI OutputQueue_Destroy(OutputQueue *pOutputQueue)
115 {
116 EnterCriticalSection(&pOutputQueue->csQueue);
117 OutputQueue_FreeSamples(pOutputQueue);
118 pOutputQueue->bTerminate = TRUE;
119 SetEvent(pOutputQueue->hProcessQueue);
120 LeaveCriticalSection(&pOutputQueue->csQueue);
121
122 pOutputQueue->csQueue.DebugInfo->Spare[0] = 0;
123 DeleteCriticalSection(&pOutputQueue->csQueue);
124 CloseHandle(pOutputQueue->hProcessQueue);
125
126 IPin_Release(&pOutputQueue->pInputPin->pin.IPin_iface);
127 HeapFree(GetProcessHeap(),0,pOutputQueue);
128 return S_OK;
129 }
130
OutputQueue_ReceiveMultiple(OutputQueue * pOutputQueue,IMediaSample ** ppSamples,LONG nSamples,LONG * nSamplesProcessed)131 HRESULT WINAPI OutputQueue_ReceiveMultiple(OutputQueue *pOutputQueue, IMediaSample **ppSamples, LONG nSamples, LONG *nSamplesProcessed)
132 {
133 HRESULT hr = S_OK;
134 int i;
135
136 if (!pOutputQueue->pInputPin->pin.pConnectedTo || !pOutputQueue->pInputPin->pMemInputPin)
137 return VFW_E_NOT_CONNECTED;
138
139 if (!pOutputQueue->hThread)
140 {
141 IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin);
142 hr = IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin,ppSamples, nSamples, nSamplesProcessed);
143 IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin);
144 }
145 else
146 {
147 EnterCriticalSection(&pOutputQueue->csQueue);
148 *nSamplesProcessed = 0;
149
150 for (i = 0; i < nSamples; i++)
151 {
152 QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent));
153 if (!qev)
154 {
155 ERR("Out of Memory\n");
156 hr = E_OUTOFMEMORY;
157 break;
158 }
159 qev->type = SAMPLE_PACKET;
160 qev->pSample = ppSamples[i];
161 IMediaSample_AddRef(ppSamples[i]);
162 list_add_tail(&pOutputQueue->SampleList, &qev->entry);
163 (*nSamplesProcessed)++;
164 }
165
166 if (!pOutputQueue->bBatchExact || list_count(&pOutputQueue->SampleList) >= pOutputQueue->lBatchSize)
167 SetEvent(pOutputQueue->hProcessQueue);
168 LeaveCriticalSection(&pOutputQueue->csQueue);
169 }
170 return hr;
171 }
172
OutputQueue_Receive(OutputQueue * pOutputQueue,IMediaSample * pSample)173 HRESULT WINAPI OutputQueue_Receive(OutputQueue *pOutputQueue, IMediaSample *pSample)
174 {
175 LONG processed;
176 return OutputQueue_ReceiveMultiple(pOutputQueue,&pSample,1,&processed);
177 }
178
OutputQueue_SendAnyway(OutputQueue * pOutputQueue)179 VOID WINAPI OutputQueue_SendAnyway(OutputQueue *pOutputQueue)
180 {
181 if (pOutputQueue->hThread)
182 {
183 EnterCriticalSection(&pOutputQueue->csQueue);
184 if (!list_empty(&pOutputQueue->SampleList))
185 {
186 pOutputQueue->bSendAnyway = TRUE;
187 SetEvent(pOutputQueue->hProcessQueue);
188 }
189 LeaveCriticalSection(&pOutputQueue->csQueue);
190 }
191 }
192
OutputQueue_EOS(OutputQueue * pOutputQueue)193 VOID WINAPI OutputQueue_EOS(OutputQueue *pOutputQueue)
194 {
195 EnterCriticalSection(&pOutputQueue->csQueue);
196 if (pOutputQueue->hThread)
197 {
198 QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent));
199 if (!qev)
200 {
201 ERR("Out of Memory\n");
202 LeaveCriticalSection(&pOutputQueue->csQueue);
203 return;
204 }
205 qev->type = EOS_PACKET;
206 qev->pSample = NULL;
207 list_add_tail(&pOutputQueue->SampleList, &qev->entry);
208 }
209 else
210 {
211 IPin* ppin = NULL;
212 IPin_ConnectedTo(&pOutputQueue->pInputPin->pin.IPin_iface, &ppin);
213 if (ppin)
214 {
215 IPin_EndOfStream(ppin);
216 IPin_Release(ppin);
217 }
218 }
219 LeaveCriticalSection(&pOutputQueue->csQueue);
220 /* Covers sending the Event to the worker Thread */
221 OutputQueue_SendAnyway(pOutputQueue);
222 }
223
OutputQueueImpl_ThreadProc(OutputQueue * pOutputQueue)224 DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue)
225 {
226 do
227 {
228 EnterCriticalSection(&pOutputQueue->csQueue);
229 if (!list_empty(&pOutputQueue->SampleList) &&
230 (!pOutputQueue->bBatchExact ||
231 list_count(&pOutputQueue->SampleList) >= pOutputQueue->lBatchSize ||
232 pOutputQueue->bSendAnyway
233 )
234 )
235 {
236 while (!list_empty(&pOutputQueue->SampleList))
237 {
238 IMediaSample **ppSamples;
239 LONG nSamples;
240 LONG nSamplesProcessed;
241 struct list *cursor, *cursor2;
242 int i = 0;
243
244 /* First Pass Process Samples */
245 i = list_count(&pOutputQueue->SampleList);
246 ppSamples = HeapAlloc(GetProcessHeap(),0,sizeof(IMediaSample*) * i);
247 nSamples = 0;
248 LIST_FOR_EACH_SAFE(cursor, cursor2, &pOutputQueue->SampleList)
249 {
250 QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
251 if (qev->type == SAMPLE_PACKET)
252 ppSamples[nSamples++] = qev->pSample;
253 else
254 break;
255 list_remove(cursor);
256 HeapFree(GetProcessHeap(),0,qev);
257 }
258
259 if (pOutputQueue->pInputPin->pin.pConnectedTo && pOutputQueue->pInputPin->pMemInputPin)
260 {
261 IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin);
262 LeaveCriticalSection(&pOutputQueue->csQueue);
263 IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin, ppSamples, nSamples, &nSamplesProcessed);
264 EnterCriticalSection(&pOutputQueue->csQueue);
265 IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin);
266 }
267 for (i = 0; i < nSamples; i++)
268 IMediaSample_Release(ppSamples[i]);
269 HeapFree(GetProcessHeap(),0,ppSamples);
270
271 /* Process Non-Samples */
272 LIST_FOR_EACH_SAFE(cursor, cursor2, &pOutputQueue->SampleList)
273 {
274 QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
275 if (qev->type == EOS_PACKET)
276 {
277 IPin* ppin = NULL;
278 IPin_ConnectedTo(&pOutputQueue->pInputPin->pin.IPin_iface, &ppin);
279 if (ppin)
280 {
281 IPin_EndOfStream(ppin);
282 IPin_Release(ppin);
283 }
284 }
285 else if (qev->type == SAMPLE_PACKET)
286 break;
287 else
288 FIXME("Unhandled Event type %i\n",qev->type);
289 list_remove(cursor);
290 HeapFree(GetProcessHeap(),0,qev);
291 }
292 }
293 pOutputQueue->bSendAnyway = FALSE;
294 }
295 LeaveCriticalSection(&pOutputQueue->csQueue);
296 WaitForSingleObject(pOutputQueue->hProcessQueue, INFINITE);
297 }
298 while (!pOutputQueue->bTerminate);
299 return S_OK;
300 }
301