1 /* 2 * Generic Implementation of COutputQueue 3 * 4 * Copyright 2011 Aric Stewart, CodeWeavers 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2.1 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public 17 * License along with this library; if not, write to the Free Software 18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA 19 */ 20 21 #include "strmbase_private.h" 22 23 #include <wine/list.h> 24 25 enum {SAMPLE_PACKET, EOS_PACKET}; 26 27 typedef struct tagQueuedEvent { 28 int type; 29 struct list entry; 30 31 IMediaSample *pSample; 32 } QueuedEvent; 33 34 static DWORD WINAPI OutputQueue_InitialThreadProc(LPVOID data) 35 { 36 OutputQueue *This = (OutputQueue *)data; 37 return This->pFuncsTable->pfnThreadProc(This); 38 } 39 40 static void OutputQueue_FreeSamples(OutputQueue *pOutputQueue) 41 { 42 struct list *cursor, *cursor2; 43 LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList) 44 { 45 QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry); 46 list_remove(cursor); 47 HeapFree(GetProcessHeap(),0,qev); 48 } 49 } 50 51 HRESULT WINAPI OutputQueue_Construct( 52 BaseOutputPin *pInputPin, 53 BOOL bAuto, 54 BOOL bQueue, 55 LONG lBatchSize, 56 BOOL bBatchExact, 57 DWORD dwPriority, 58 const OutputQueueFuncTable* pFuncsTable, 59 OutputQueue **ppOutputQueue ) 60 61 { 62 BOOL threaded = FALSE; 63 DWORD tid; 64 65 OutputQueue *This; 66 67 if (!pInputPin || !pFuncsTable || !ppOutputQueue) 68 return E_INVALIDARG; 69 70 *ppOutputQueue = HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(OutputQueue)); 71 if (!*ppOutputQueue) 72 return E_OUTOFMEMORY; 73 74 This = *ppOutputQueue; 75 This->pFuncsTable = pFuncsTable; 76 This->lBatchSize = lBatchSize; 77 This->bBatchExact = bBatchExact; 78 InitializeCriticalSection(&This->csQueue); 79 This->csQueue.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": OutputQueue.csQueue"); 80 This->SampleList = HeapAlloc(GetProcessHeap(),0,sizeof(struct list)); 81 if (!This->SampleList) 82 { 83 OutputQueue_Destroy(This); 84 *ppOutputQueue = NULL; 85 return E_OUTOFMEMORY; 86 } 87 list_init(This->SampleList); 88 89 This->pInputPin = pInputPin; 90 IPin_AddRef(&pInputPin->pin.IPin_iface); 91 92 EnterCriticalSection(&This->csQueue); 93 if (bAuto && pInputPin->pMemInputPin) 94 threaded = IMemInputPin_ReceiveCanBlock(pInputPin->pMemInputPin) == S_OK; 95 else 96 threaded = bQueue; 97 98 if (threaded) 99 { 100 This->hThread = CreateThread(NULL, 0, OutputQueue_InitialThreadProc, This, 0, &tid); 101 if (This->hThread) 102 { 103 SetThreadPriority(This->hThread, dwPriority); 104 This->hProcessQueue = CreateEventW(NULL, 0, 0, NULL); 105 } 106 } 107 LeaveCriticalSection(&This->csQueue); 108 109 return S_OK; 110 } 111 112 HRESULT WINAPI OutputQueue_Destroy(OutputQueue *pOutputQueue) 113 { 114 EnterCriticalSection(&pOutputQueue->csQueue); 115 OutputQueue_FreeSamples(pOutputQueue); 116 pOutputQueue->bTerminate = TRUE; 117 SetEvent(pOutputQueue->hProcessQueue); 118 LeaveCriticalSection(&pOutputQueue->csQueue); 119 120 pOutputQueue->csQueue.DebugInfo->Spare[0] = 0; 121 DeleteCriticalSection(&pOutputQueue->csQueue); 122 CloseHandle(pOutputQueue->hProcessQueue); 123 124 HeapFree(GetProcessHeap(),0,pOutputQueue->SampleList); 125 126 IPin_Release(&pOutputQueue->pInputPin->pin.IPin_iface); 127 HeapFree(GetProcessHeap(),0,pOutputQueue); 128 return S_OK; 129 } 130 131 HRESULT WINAPI OutputQueue_ReceiveMultiple(OutputQueue *pOutputQueue, IMediaSample **ppSamples, LONG nSamples, LONG *nSamplesProcessed) 132 { 133 HRESULT hr = S_OK; 134 int i; 135 136 if (!pOutputQueue->pInputPin->pin.pConnectedTo || !pOutputQueue->pInputPin->pMemInputPin) 137 return VFW_E_NOT_CONNECTED; 138 139 if (!pOutputQueue->hThread) 140 { 141 IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin); 142 hr = IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin,ppSamples, nSamples, nSamplesProcessed); 143 IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin); 144 } 145 else 146 { 147 EnterCriticalSection(&pOutputQueue->csQueue); 148 *nSamplesProcessed = 0; 149 150 for (i = 0; i < nSamples; i++) 151 { 152 QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent)); 153 if (!qev) 154 { 155 ERR("Out of Memory\n"); 156 hr = E_OUTOFMEMORY; 157 break; 158 } 159 qev->type = SAMPLE_PACKET; 160 qev->pSample = ppSamples[i]; 161 IMediaSample_AddRef(ppSamples[i]); 162 list_add_tail(pOutputQueue->SampleList, &qev->entry); 163 (*nSamplesProcessed)++; 164 } 165 166 if (!pOutputQueue->bBatchExact || list_count(pOutputQueue->SampleList) >= pOutputQueue->lBatchSize) 167 SetEvent(pOutputQueue->hProcessQueue); 168 LeaveCriticalSection(&pOutputQueue->csQueue); 169 } 170 return hr; 171 } 172 173 HRESULT WINAPI OutputQueue_Receive(OutputQueue *pOutputQueue, IMediaSample *pSample) 174 { 175 LONG processed; 176 return OutputQueue_ReceiveMultiple(pOutputQueue,&pSample,1,&processed); 177 } 178 179 VOID WINAPI OutputQueue_SendAnyway(OutputQueue *pOutputQueue) 180 { 181 if (pOutputQueue->hThread) 182 { 183 EnterCriticalSection(&pOutputQueue->csQueue); 184 if (!list_empty(pOutputQueue->SampleList)) 185 { 186 pOutputQueue->bSendAnyway = TRUE; 187 SetEvent(pOutputQueue->hProcessQueue); 188 } 189 LeaveCriticalSection(&pOutputQueue->csQueue); 190 } 191 } 192 193 VOID WINAPI OutputQueue_EOS(OutputQueue *pOutputQueue) 194 { 195 EnterCriticalSection(&pOutputQueue->csQueue); 196 if (pOutputQueue->hThread) 197 { 198 QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent)); 199 if (!qev) 200 { 201 ERR("Out of Memory\n"); 202 LeaveCriticalSection(&pOutputQueue->csQueue); 203 return; 204 } 205 qev->type = EOS_PACKET; 206 qev->pSample = NULL; 207 list_add_tail(pOutputQueue->SampleList, &qev->entry); 208 } 209 else 210 { 211 IPin* ppin = NULL; 212 IPin_ConnectedTo(&pOutputQueue->pInputPin->pin.IPin_iface, &ppin); 213 if (ppin) 214 { 215 IPin_EndOfStream(ppin); 216 IPin_Release(ppin); 217 } 218 } 219 LeaveCriticalSection(&pOutputQueue->csQueue); 220 /* Covers sending the Event to the worker Thread */ 221 OutputQueue_SendAnyway(pOutputQueue); 222 } 223 224 DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue) 225 { 226 do 227 { 228 EnterCriticalSection(&pOutputQueue->csQueue); 229 if (!list_empty(pOutputQueue->SampleList) && 230 (!pOutputQueue->bBatchExact || 231 list_count(pOutputQueue->SampleList) >= pOutputQueue->lBatchSize || 232 pOutputQueue->bSendAnyway 233 ) 234 ) 235 { 236 while (!list_empty(pOutputQueue->SampleList)) 237 { 238 IMediaSample **ppSamples; 239 LONG nSamples; 240 LONG nSamplesProcessed; 241 struct list *cursor, *cursor2; 242 int i = 0; 243 244 /* First Pass Process Samples */ 245 i = list_count(pOutputQueue->SampleList); 246 ppSamples = HeapAlloc(GetProcessHeap(),0,sizeof(IMediaSample*) * i); 247 nSamples = 0; 248 LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList) 249 { 250 QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry); 251 if (qev->type == SAMPLE_PACKET) 252 ppSamples[nSamples++] = qev->pSample; 253 else 254 break; 255 list_remove(cursor); 256 HeapFree(GetProcessHeap(),0,qev); 257 } 258 259 if (pOutputQueue->pInputPin->pin.pConnectedTo && pOutputQueue->pInputPin->pMemInputPin) 260 { 261 IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin); 262 LeaveCriticalSection(&pOutputQueue->csQueue); 263 IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin, ppSamples, nSamples, &nSamplesProcessed); 264 EnterCriticalSection(&pOutputQueue->csQueue); 265 IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin); 266 } 267 for (i = 0; i < nSamples; i++) 268 IMediaSample_Release(ppSamples[i]); 269 HeapFree(GetProcessHeap(),0,ppSamples); 270 271 /* Process Non-Samples */ 272 LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList) 273 { 274 QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry); 275 if (qev->type == EOS_PACKET) 276 { 277 IPin* ppin = NULL; 278 IPin_ConnectedTo(&pOutputQueue->pInputPin->pin.IPin_iface, &ppin); 279 if (ppin) 280 { 281 IPin_EndOfStream(ppin); 282 IPin_Release(ppin); 283 } 284 } 285 else if (qev->type == SAMPLE_PACKET) 286 break; 287 else 288 FIXME("Unhandled Event type %i\n",qev->type); 289 list_remove(cursor); 290 HeapFree(GetProcessHeap(),0,qev); 291 } 292 } 293 pOutputQueue->bSendAnyway = FALSE; 294 } 295 LeaveCriticalSection(&pOutputQueue->csQueue); 296 WaitForSingleObject(pOutputQueue->hProcessQueue, INFINITE); 297 } 298 while (!pOutputQueue->bTerminate); 299 return S_OK; 300 } 301