1 /*
2  * Generic Implementation of COutputQueue
3  *
4  * Copyright 2011 Aric Stewart, CodeWeavers
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  */
20 
21 #define COBJMACROS
22 
23 #include "dshow.h"
24 #include "wine/debug.h"
25 #include "wine/unicode.h"
26 #include "wine/list.h"
27 #include "wine/strmbase.h"
28 #include "uuids.h"
29 #include "vfwmsgs.h"
30 #include <assert.h>
31 
32 WINE_DEFAULT_DEBUG_CHANNEL(strmbase);
33 
34 enum {SAMPLE_PACKET, EOS_PACKET};
35 
36 typedef struct tagQueuedEvent {
37     int type;
38     struct list entry;
39 
40     IMediaSample *pSample;
41 } QueuedEvent;
42 
43 static DWORD WINAPI OutputQueue_InitialThreadProc(LPVOID data)
44 {
45     OutputQueue *This = (OutputQueue *)data;
46     return This->pFuncsTable->pfnThreadProc(This);
47 }
48 
49 static void OutputQueue_FreeSamples(OutputQueue *pOutputQueue)
50 {
51     struct list *cursor, *cursor2;
52     LIST_FOR_EACH_SAFE(cursor, cursor2, &pOutputQueue->SampleList)
53     {
54         QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
55         list_remove(cursor);
56         HeapFree(GetProcessHeap(),0,qev);
57     }
58 }
59 
60 HRESULT WINAPI OutputQueue_Construct(
61     BaseOutputPin *pInputPin,
62     BOOL bAuto,
63     BOOL bQueue,
64     LONG lBatchSize,
65     BOOL bBatchExact,
66     DWORD dwPriority,
67     const OutputQueueFuncTable* pFuncsTable,
68     OutputQueue **ppOutputQueue )
69 
70 {
71     BOOL threaded = FALSE;
72     DWORD tid;
73 
74     OutputQueue *This;
75 
76     if (!pInputPin || !pFuncsTable || !ppOutputQueue)
77         return E_INVALIDARG;
78 
79     *ppOutputQueue = HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(OutputQueue));
80     if (!*ppOutputQueue)
81         return E_OUTOFMEMORY;
82 
83     This = *ppOutputQueue;
84     This->pFuncsTable = pFuncsTable;
85     This->lBatchSize = lBatchSize;
86     This->bBatchExact = bBatchExact;
87     InitializeCriticalSection(&This->csQueue);
88     This->csQueue.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": OutputQueue.csQueue");
89     list_init(&This->SampleList);
90 
91     This->pInputPin = pInputPin;
92     IPin_AddRef(&pInputPin->pin.IPin_iface);
93 
94     EnterCriticalSection(&This->csQueue);
95     if (bAuto && pInputPin->pMemInputPin)
96         threaded = IMemInputPin_ReceiveCanBlock(pInputPin->pMemInputPin) == S_OK;
97     else
98         threaded = bQueue;
99 
100     if (threaded)
101     {
102         This->hThread = CreateThread(NULL, 0, OutputQueue_InitialThreadProc, This, 0, &tid);
103         if (This->hThread)
104         {
105             SetThreadPriority(This->hThread, dwPriority);
106             This->hProcessQueue = CreateEventW(NULL, 0, 0, NULL);
107         }
108     }
109     LeaveCriticalSection(&This->csQueue);
110 
111     return S_OK;
112 }
113 
114 HRESULT WINAPI OutputQueue_Destroy(OutputQueue *pOutputQueue)
115 {
116     EnterCriticalSection(&pOutputQueue->csQueue);
117     OutputQueue_FreeSamples(pOutputQueue);
118     pOutputQueue->bTerminate = TRUE;
119     SetEvent(pOutputQueue->hProcessQueue);
120     LeaveCriticalSection(&pOutputQueue->csQueue);
121 
122     pOutputQueue->csQueue.DebugInfo->Spare[0] = 0;
123     DeleteCriticalSection(&pOutputQueue->csQueue);
124     CloseHandle(pOutputQueue->hProcessQueue);
125 
126     IPin_Release(&pOutputQueue->pInputPin->pin.IPin_iface);
127     HeapFree(GetProcessHeap(),0,pOutputQueue);
128     return S_OK;
129 }
130 
131 HRESULT WINAPI OutputQueue_ReceiveMultiple(OutputQueue *pOutputQueue, IMediaSample **ppSamples, LONG nSamples, LONG *nSamplesProcessed)
132 {
133     HRESULT hr = S_OK;
134     int i;
135 
136     if (!pOutputQueue->pInputPin->pin.pConnectedTo || !pOutputQueue->pInputPin->pMemInputPin)
137         return VFW_E_NOT_CONNECTED;
138 
139     if (!pOutputQueue->hThread)
140     {
141         IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin);
142         hr = IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin,ppSamples, nSamples, nSamplesProcessed);
143         IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin);
144     }
145     else
146     {
147         EnterCriticalSection(&pOutputQueue->csQueue);
148         *nSamplesProcessed = 0;
149 
150         for (i = 0; i < nSamples; i++)
151         {
152             QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent));
153             if (!qev)
154             {
155                 ERR("Out of Memory\n");
156                 hr = E_OUTOFMEMORY;
157                 break;
158             }
159             qev->type = SAMPLE_PACKET;
160             qev->pSample = ppSamples[i];
161             IMediaSample_AddRef(ppSamples[i]);
162             list_add_tail(&pOutputQueue->SampleList, &qev->entry);
163             (*nSamplesProcessed)++;
164         }
165 
166         if (!pOutputQueue->bBatchExact || list_count(&pOutputQueue->SampleList) >= pOutputQueue->lBatchSize)
167             SetEvent(pOutputQueue->hProcessQueue);
168         LeaveCriticalSection(&pOutputQueue->csQueue);
169     }
170     return hr;
171 }
172 
173 HRESULT WINAPI OutputQueue_Receive(OutputQueue *pOutputQueue, IMediaSample *pSample)
174 {
175     LONG processed;
176     return OutputQueue_ReceiveMultiple(pOutputQueue,&pSample,1,&processed);
177 }
178 
179 VOID WINAPI OutputQueue_SendAnyway(OutputQueue *pOutputQueue)
180 {
181     if (pOutputQueue->hThread)
182     {
183         EnterCriticalSection(&pOutputQueue->csQueue);
184         if (!list_empty(&pOutputQueue->SampleList))
185         {
186             pOutputQueue->bSendAnyway = TRUE;
187             SetEvent(pOutputQueue->hProcessQueue);
188         }
189         LeaveCriticalSection(&pOutputQueue->csQueue);
190     }
191 }
192 
193 VOID WINAPI OutputQueue_EOS(OutputQueue *pOutputQueue)
194 {
195     EnterCriticalSection(&pOutputQueue->csQueue);
196     if (pOutputQueue->hThread)
197     {
198         QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent));
199         if (!qev)
200         {
201             ERR("Out of Memory\n");
202             LeaveCriticalSection(&pOutputQueue->csQueue);
203             return;
204         }
205         qev->type = EOS_PACKET;
206         qev->pSample = NULL;
207         list_add_tail(&pOutputQueue->SampleList, &qev->entry);
208     }
209     else
210     {
211         IPin* ppin = NULL;
212         IPin_ConnectedTo(&pOutputQueue->pInputPin->pin.IPin_iface, &ppin);
213         if (ppin)
214         {
215             IPin_EndOfStream(ppin);
216             IPin_Release(ppin);
217         }
218     }
219     LeaveCriticalSection(&pOutputQueue->csQueue);
220     /* Covers sending the Event to the worker Thread */
221     OutputQueue_SendAnyway(pOutputQueue);
222 }
223 
224 DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue)
225 {
226     do
227     {
228         EnterCriticalSection(&pOutputQueue->csQueue);
229         if (!list_empty(&pOutputQueue->SampleList) &&
230             (!pOutputQueue->bBatchExact ||
231             list_count(&pOutputQueue->SampleList) >= pOutputQueue->lBatchSize ||
232             pOutputQueue->bSendAnyway
233             )
234            )
235         {
236             while (!list_empty(&pOutputQueue->SampleList))
237             {
238                 IMediaSample **ppSamples;
239                 LONG nSamples;
240                 LONG nSamplesProcessed;
241                 struct list *cursor, *cursor2;
242                 int i = 0;
243 
244                 /* First Pass Process Samples */
245                 i = list_count(&pOutputQueue->SampleList);
246                 ppSamples = HeapAlloc(GetProcessHeap(),0,sizeof(IMediaSample*) * i);
247                 nSamples = 0;
248                 LIST_FOR_EACH_SAFE(cursor, cursor2, &pOutputQueue->SampleList)
249                 {
250                     QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
251                     if (qev->type == SAMPLE_PACKET)
252                         ppSamples[nSamples++] = qev->pSample;
253                     else
254                         break;
255                     list_remove(cursor);
256                     HeapFree(GetProcessHeap(),0,qev);
257                 }
258 
259                 if (pOutputQueue->pInputPin->pin.pConnectedTo && pOutputQueue->pInputPin->pMemInputPin)
260                 {
261                     IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin);
262                     LeaveCriticalSection(&pOutputQueue->csQueue);
263                     IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin, ppSamples, nSamples, &nSamplesProcessed);
264                     EnterCriticalSection(&pOutputQueue->csQueue);
265                     IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin);
266                 }
267                 for (i = 0; i < nSamples; i++)
268                     IMediaSample_Release(ppSamples[i]);
269                 HeapFree(GetProcessHeap(),0,ppSamples);
270 
271                 /* Process Non-Samples */
272                 LIST_FOR_EACH_SAFE(cursor, cursor2, &pOutputQueue->SampleList)
273                 {
274                     QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
275                     if (qev->type == EOS_PACKET)
276                     {
277                         IPin* ppin = NULL;
278                         IPin_ConnectedTo(&pOutputQueue->pInputPin->pin.IPin_iface, &ppin);
279                         if (ppin)
280                         {
281                             IPin_EndOfStream(ppin);
282                             IPin_Release(ppin);
283                         }
284                     }
285                     else if (qev->type == SAMPLE_PACKET)
286                         break;
287                     else
288                         FIXME("Unhandled Event type %i\n",qev->type);
289                     list_remove(cursor);
290                     HeapFree(GetProcessHeap(),0,qev);
291                 }
292             }
293             pOutputQueue->bSendAnyway = FALSE;
294         }
295         LeaveCriticalSection(&pOutputQueue->csQueue);
296         WaitForSingleObject(pOutputQueue->hProcessQueue, INFINITE);
297     }
298     while (!pOutputQueue->bTerminate);
299     return S_OK;
300 }
301