1 /* 2 * Generic Implementation of COutputQueue 3 * 4 * Copyright 2011 Aric Stewart, CodeWeavers 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2.1 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public 17 * License along with this library; if not, write to the Free Software 18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA 19 */ 20 21 #define COBJMACROS 22 23 #include "dshow.h" 24 #include "wine/debug.h" 25 #include "wine/unicode.h" 26 #include "wine/list.h" 27 #include "wine/strmbase.h" 28 #include "uuids.h" 29 #include "vfwmsgs.h" 30 #include <assert.h> 31 32 WINE_DEFAULT_DEBUG_CHANNEL(strmbase); 33 34 enum {SAMPLE_PACKET, EOS_PACKET}; 35 36 typedef struct tagQueuedEvent { 37 int type; 38 struct list entry; 39 40 IMediaSample *pSample; 41 } QueuedEvent; 42 43 static DWORD WINAPI OutputQueue_InitialThreadProc(LPVOID data) 44 { 45 OutputQueue *This = (OutputQueue *)data; 46 return This->pFuncsTable->pfnThreadProc(This); 47 } 48 49 static void OutputQueue_FreeSamples(OutputQueue *pOutputQueue) 50 { 51 struct list *cursor, *cursor2; 52 LIST_FOR_EACH_SAFE(cursor, cursor2, &pOutputQueue->SampleList) 53 { 54 QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry); 55 list_remove(cursor); 56 HeapFree(GetProcessHeap(),0,qev); 57 } 58 } 59 60 HRESULT WINAPI OutputQueue_Construct( 61 BaseOutputPin *pInputPin, 62 BOOL bAuto, 63 BOOL bQueue, 64 LONG lBatchSize, 65 BOOL bBatchExact, 66 DWORD dwPriority, 67 const OutputQueueFuncTable* pFuncsTable, 68 OutputQueue **ppOutputQueue ) 69 70 { 71 BOOL threaded = FALSE; 72 DWORD tid; 73 74 OutputQueue *This; 75 76 if (!pInputPin || !pFuncsTable || !ppOutputQueue) 77 return E_INVALIDARG; 78 79 *ppOutputQueue = HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(OutputQueue)); 80 if (!*ppOutputQueue) 81 return E_OUTOFMEMORY; 82 83 This = *ppOutputQueue; 84 This->pFuncsTable = pFuncsTable; 85 This->lBatchSize = lBatchSize; 86 This->bBatchExact = bBatchExact; 87 InitializeCriticalSection(&This->csQueue); 88 This->csQueue.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": OutputQueue.csQueue"); 89 list_init(&This->SampleList); 90 91 This->pInputPin = pInputPin; 92 IPin_AddRef(&pInputPin->pin.IPin_iface); 93 94 EnterCriticalSection(&This->csQueue); 95 if (bAuto && pInputPin->pMemInputPin) 96 threaded = IMemInputPin_ReceiveCanBlock(pInputPin->pMemInputPin) == S_OK; 97 else 98 threaded = bQueue; 99 100 if (threaded) 101 { 102 This->hThread = CreateThread(NULL, 0, OutputQueue_InitialThreadProc, This, 0, &tid); 103 if (This->hThread) 104 { 105 SetThreadPriority(This->hThread, dwPriority); 106 This->hProcessQueue = CreateEventW(NULL, 0, 0, NULL); 107 } 108 } 109 LeaveCriticalSection(&This->csQueue); 110 111 return S_OK; 112 } 113 114 HRESULT WINAPI OutputQueue_Destroy(OutputQueue *pOutputQueue) 115 { 116 EnterCriticalSection(&pOutputQueue->csQueue); 117 OutputQueue_FreeSamples(pOutputQueue); 118 pOutputQueue->bTerminate = TRUE; 119 SetEvent(pOutputQueue->hProcessQueue); 120 LeaveCriticalSection(&pOutputQueue->csQueue); 121 122 pOutputQueue->csQueue.DebugInfo->Spare[0] = 0; 123 DeleteCriticalSection(&pOutputQueue->csQueue); 124 CloseHandle(pOutputQueue->hProcessQueue); 125 126 IPin_Release(&pOutputQueue->pInputPin->pin.IPin_iface); 127 HeapFree(GetProcessHeap(),0,pOutputQueue); 128 return S_OK; 129 } 130 131 HRESULT WINAPI OutputQueue_ReceiveMultiple(OutputQueue *pOutputQueue, IMediaSample **ppSamples, LONG nSamples, LONG *nSamplesProcessed) 132 { 133 HRESULT hr = S_OK; 134 int i; 135 136 if (!pOutputQueue->pInputPin->pin.pConnectedTo || !pOutputQueue->pInputPin->pMemInputPin) 137 return VFW_E_NOT_CONNECTED; 138 139 if (!pOutputQueue->hThread) 140 { 141 IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin); 142 hr = IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin,ppSamples, nSamples, nSamplesProcessed); 143 IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin); 144 } 145 else 146 { 147 EnterCriticalSection(&pOutputQueue->csQueue); 148 *nSamplesProcessed = 0; 149 150 for (i = 0; i < nSamples; i++) 151 { 152 QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent)); 153 if (!qev) 154 { 155 ERR("Out of Memory\n"); 156 hr = E_OUTOFMEMORY; 157 break; 158 } 159 qev->type = SAMPLE_PACKET; 160 qev->pSample = ppSamples[i]; 161 IMediaSample_AddRef(ppSamples[i]); 162 list_add_tail(&pOutputQueue->SampleList, &qev->entry); 163 (*nSamplesProcessed)++; 164 } 165 166 if (!pOutputQueue->bBatchExact || list_count(&pOutputQueue->SampleList) >= pOutputQueue->lBatchSize) 167 SetEvent(pOutputQueue->hProcessQueue); 168 LeaveCriticalSection(&pOutputQueue->csQueue); 169 } 170 return hr; 171 } 172 173 HRESULT WINAPI OutputQueue_Receive(OutputQueue *pOutputQueue, IMediaSample *pSample) 174 { 175 LONG processed; 176 return OutputQueue_ReceiveMultiple(pOutputQueue,&pSample,1,&processed); 177 } 178 179 VOID WINAPI OutputQueue_SendAnyway(OutputQueue *pOutputQueue) 180 { 181 if (pOutputQueue->hThread) 182 { 183 EnterCriticalSection(&pOutputQueue->csQueue); 184 if (!list_empty(&pOutputQueue->SampleList)) 185 { 186 pOutputQueue->bSendAnyway = TRUE; 187 SetEvent(pOutputQueue->hProcessQueue); 188 } 189 LeaveCriticalSection(&pOutputQueue->csQueue); 190 } 191 } 192 193 VOID WINAPI OutputQueue_EOS(OutputQueue *pOutputQueue) 194 { 195 EnterCriticalSection(&pOutputQueue->csQueue); 196 if (pOutputQueue->hThread) 197 { 198 QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent)); 199 if (!qev) 200 { 201 ERR("Out of Memory\n"); 202 LeaveCriticalSection(&pOutputQueue->csQueue); 203 return; 204 } 205 qev->type = EOS_PACKET; 206 qev->pSample = NULL; 207 list_add_tail(&pOutputQueue->SampleList, &qev->entry); 208 } 209 else 210 { 211 IPin* ppin = NULL; 212 IPin_ConnectedTo(&pOutputQueue->pInputPin->pin.IPin_iface, &ppin); 213 if (ppin) 214 { 215 IPin_EndOfStream(ppin); 216 IPin_Release(ppin); 217 } 218 } 219 LeaveCriticalSection(&pOutputQueue->csQueue); 220 /* Covers sending the Event to the worker Thread */ 221 OutputQueue_SendAnyway(pOutputQueue); 222 } 223 224 DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue) 225 { 226 do 227 { 228 EnterCriticalSection(&pOutputQueue->csQueue); 229 if (!list_empty(&pOutputQueue->SampleList) && 230 (!pOutputQueue->bBatchExact || 231 list_count(&pOutputQueue->SampleList) >= pOutputQueue->lBatchSize || 232 pOutputQueue->bSendAnyway 233 ) 234 ) 235 { 236 while (!list_empty(&pOutputQueue->SampleList)) 237 { 238 IMediaSample **ppSamples; 239 LONG nSamples; 240 LONG nSamplesProcessed; 241 struct list *cursor, *cursor2; 242 int i = 0; 243 244 /* First Pass Process Samples */ 245 i = list_count(&pOutputQueue->SampleList); 246 ppSamples = HeapAlloc(GetProcessHeap(),0,sizeof(IMediaSample*) * i); 247 nSamples = 0; 248 LIST_FOR_EACH_SAFE(cursor, cursor2, &pOutputQueue->SampleList) 249 { 250 QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry); 251 if (qev->type == SAMPLE_PACKET) 252 ppSamples[nSamples++] = qev->pSample; 253 else 254 break; 255 list_remove(cursor); 256 HeapFree(GetProcessHeap(),0,qev); 257 } 258 259 if (pOutputQueue->pInputPin->pin.pConnectedTo && pOutputQueue->pInputPin->pMemInputPin) 260 { 261 IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin); 262 LeaveCriticalSection(&pOutputQueue->csQueue); 263 IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin, ppSamples, nSamples, &nSamplesProcessed); 264 EnterCriticalSection(&pOutputQueue->csQueue); 265 IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin); 266 } 267 for (i = 0; i < nSamples; i++) 268 IMediaSample_Release(ppSamples[i]); 269 HeapFree(GetProcessHeap(),0,ppSamples); 270 271 /* Process Non-Samples */ 272 LIST_FOR_EACH_SAFE(cursor, cursor2, &pOutputQueue->SampleList) 273 { 274 QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry); 275 if (qev->type == EOS_PACKET) 276 { 277 IPin* ppin = NULL; 278 IPin_ConnectedTo(&pOutputQueue->pInputPin->pin.IPin_iface, &ppin); 279 if (ppin) 280 { 281 IPin_EndOfStream(ppin); 282 IPin_Release(ppin); 283 } 284 } 285 else if (qev->type == SAMPLE_PACKET) 286 break; 287 else 288 FIXME("Unhandled Event type %i\n",qev->type); 289 list_remove(cursor); 290 HeapFree(GetProcessHeap(),0,qev); 291 } 292 } 293 pOutputQueue->bSendAnyway = FALSE; 294 } 295 LeaveCriticalSection(&pOutputQueue->csQueue); 296 WaitForSingleObject(pOutputQueue->hProcessQueue, INFINITE); 297 } 298 while (!pOutputQueue->bTerminate); 299 return S_OK; 300 } 301