xref: /reactos/dll/directx/wine/quartz/pin.c (revision 64daf542)
1 /*
2  * Generic Implementation of IPin Interface
3  *
4  * Copyright 2003 Robert Shearman
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  */
20 
21 #include "quartz_private.h"
22 
/* Round an integer value down to the closest multiple of boundary
 * (boundary must be non-zero). */
#define ALIGNDOWN(value, boundary) ((value) / (boundary) * (boundary))
/* Round an integer value up to the closest multiple of boundary. */
#define ALIGNUP(value, boundary) (ALIGNDOWN((value) + (boundary) - 1, (boundary)))
25 
26 typedef HRESULT (*SendPinFunc)( IPin *to, LPVOID arg );
27 
28 /** Helper function, there are a lot of places where the error code is inherited
29  * The following rules apply:
30  *
31  * Return the first received error code (E_NOTIMPL is ignored)
32  * If no errors occur: return the first received non-error-code that isn't S_OK
33  */
34 static HRESULT updatehres( HRESULT original, HRESULT new )
35 {
36     if (FAILED( original ) || new == E_NOTIMPL)
37         return original;
38 
39     if (FAILED( new ) || original == S_OK)
40         return new;
41 
42     return original;
43 }
44 
45 /** Sends a message from a pin further to other, similar pins
46  * fnMiddle is called on each pin found further on the stream.
47  * fnEnd (can be NULL) is called when the message can't be sent any further (this is a renderer or source)
48  *
49  * If the pin given is an input pin, the message will be sent downstream to other input pins
50  * If the pin given is an output pin, the message will be sent upstream to other output pins
51  */
52 static HRESULT SendFurther( IPin *from, SendPinFunc fnMiddle, LPVOID arg, SendPinFunc fnEnd )
53 {
54     PIN_INFO pin_info;
55     ULONG amount = 0;
56     HRESULT hr = S_OK;
57     HRESULT hr_return = S_OK;
58     IEnumPins *enumpins = NULL;
59     BOOL foundend = TRUE;
60     PIN_DIRECTION from_dir;
61 
62     IPin_QueryDirection( from, &from_dir );
63 
64     hr = IPin_QueryInternalConnections( from, NULL, &amount );
65     if (hr != E_NOTIMPL && amount)
66         FIXME("Use QueryInternalConnections!\n");
67 
68     pin_info.pFilter = NULL;
69     hr = IPin_QueryPinInfo( from, &pin_info );
70     if (FAILED(hr))
71         goto out;
72 
73     hr = IBaseFilter_EnumPins( pin_info.pFilter, &enumpins );
74     if (FAILED(hr))
75         goto out;
76 
77     hr = IEnumPins_Reset( enumpins );
78     while (hr == S_OK) {
79         IPin *pin = NULL;
80         hr = IEnumPins_Next( enumpins, 1, &pin, NULL );
81         if (hr == VFW_E_ENUM_OUT_OF_SYNC)
82         {
83             hr = IEnumPins_Reset( enumpins );
84             continue;
85         }
86         if (pin)
87         {
88             PIN_DIRECTION dir;
89 
90             IPin_QueryDirection( pin, &dir );
91             if (dir != from_dir)
92             {
93                 IPin *connected = NULL;
94 
95                 foundend = FALSE;
96                 IPin_ConnectedTo( pin, &connected );
97                 if (connected)
98                 {
99                     HRESULT hr_local;
100 
101                     hr_local = fnMiddle( connected, arg );
102                     hr_return = updatehres( hr_return, hr_local );
103                     IPin_Release(connected);
104                 }
105             }
106             IPin_Release( pin );
107         }
108         else
109         {
110             hr = S_OK;
111             break;
112         }
113     }
114 
115     if (!foundend)
116         hr = hr_return;
117     else if (fnEnd) {
118         HRESULT hr_local;
119 
120         hr_local = fnEnd( from, arg );
121         hr_return = updatehres( hr_return, hr_local );
122     }
123 
124 out:
125     if (enumpins)
126         IEnumPins_Release( enumpins );
127     if (pin_info.pFilter)
128         IBaseFilter_Release( pin_info.pFilter );
129     return hr;
130 }
131 
132 
133 static void Copy_PinInfo(PIN_INFO * pDest, const PIN_INFO * pSrc)
134 {
135     /* Tempting to just do a memcpy, but the name field is
136        128 characters long! We will probably never exceed 10
137        most of the time, so we are better off copying
138        each field manually */
139     strcpyW(pDest->achName, pSrc->achName);
140     pDest->dir = pSrc->dir;
141     pDest->pFilter = pSrc->pFilter;
142 }
143 
144 static HRESULT deliver_endofstream(IPin* pin, LPVOID unused)
145 {
146     return IPin_EndOfStream( pin );
147 }
148 
149 static HRESULT deliver_beginflush(IPin* pin, LPVOID unused)
150 {
151     return IPin_BeginFlush( pin );
152 }
153 
154 static HRESULT deliver_endflush(IPin* pin, LPVOID unused)
155 {
156     return IPin_EndFlush( pin );
157 }
158 
159 typedef struct newsegmentargs
160 {
161     REFERENCE_TIME tStart, tStop;
162     double rate;
163 } newsegmentargs;
164 
165 static HRESULT deliver_newsegment(IPin *pin, LPVOID data)
166 {
167     newsegmentargs *args = data;
168     return IPin_NewSegment(pin, args->tStart, args->tStop, args->rate);
169 }
170 
171 /*** PullPin implementation ***/
172 
173 static HRESULT PullPin_Init(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PULL pSampleProc, LPVOID pUserData,
174                             QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, REQUESTPROC pCustomRequest, STOPPROCESSPROC pDone, LPCRITICAL_SECTION pCritSec, PullPin * pPinImpl)
175 {
176     /* Common attributes */
177     pPinImpl->pin.IPin_iface.lpVtbl = PullPin_Vtbl;
178     pPinImpl->pin.refCount = 1;
179     pPinImpl->pin.pConnectedTo = NULL;
180     pPinImpl->pin.pCritSec = pCritSec;
181     Copy_PinInfo(&pPinImpl->pin.pinInfo, pPinInfo);
182     ZeroMemory(&pPinImpl->pin.mtCurrent, sizeof(AM_MEDIA_TYPE));
183 
184     /* Input pin attributes */
185     pPinImpl->pUserData = pUserData;
186     pPinImpl->fnQueryAccept = pQueryAccept;
187     pPinImpl->fnSampleProc = pSampleProc;
188     pPinImpl->fnCleanProc = pCleanUp;
189     pPinImpl->fnDone = pDone;
190     pPinImpl->fnPreConnect = NULL;
191     pPinImpl->pAlloc = NULL;
192     pPinImpl->prefAlloc = NULL;
193     pPinImpl->pReader = NULL;
194     pPinImpl->hThread = NULL;
195     pPinImpl->hEventStateChanged = CreateEventW(NULL, TRUE, TRUE, NULL);
196     pPinImpl->thread_sleepy = CreateEventW(NULL, FALSE, FALSE, NULL);
197 
198     pPinImpl->rtStart = 0;
199     pPinImpl->rtCurrent = 0;
200     pPinImpl->rtStop = ((LONGLONG)0x7fffffff << 32) | 0xffffffff;
201     pPinImpl->dRate = 1.0;
202     pPinImpl->state = Req_Die;
203     pPinImpl->fnCustomRequest = pCustomRequest;
204     pPinImpl->stop_playback = TRUE;
205 
206     InitializeCriticalSection(&pPinImpl->thread_lock);
207     pPinImpl->thread_lock.DebugInfo->Spare[0] = (DWORD_PTR)( __FILE__ ": PullPin.thread_lock");
208 
209     return S_OK;
210 }
211 
212 HRESULT PullPin_Construct(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo,
213                           SAMPLEPROC_PULL pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept,
214                           CLEANUPPROC pCleanUp, REQUESTPROC pCustomRequest, STOPPROCESSPROC pDone,
215                           LPCRITICAL_SECTION pCritSec, IPin ** ppPin)
216 {
217     PullPin * pPinImpl;
218 
219     *ppPin = NULL;
220 
221     if (pPinInfo->dir != PINDIR_INPUT)
222     {
223         ERR("Pin direction(%x) != PINDIR_INPUT\n", pPinInfo->dir);
224         return E_INVALIDARG;
225     }
226 
227     pPinImpl = CoTaskMemAlloc(sizeof(*pPinImpl));
228 
229     if (!pPinImpl)
230         return E_OUTOFMEMORY;
231 
232     if (SUCCEEDED(PullPin_Init(PullPin_Vtbl, pPinInfo, pSampleProc, pUserData, pQueryAccept, pCleanUp, pCustomRequest, pDone, pCritSec, pPinImpl)))
233     {
234         *ppPin = &pPinImpl->pin.IPin_iface;
235         return S_OK;
236     }
237 
238     CoTaskMemFree(pPinImpl);
239     return E_FAIL;
240 }
241 
242 static HRESULT PullPin_InitProcessing(PullPin * This);
243 
244 HRESULT WINAPI PullPin_ReceiveConnection(IPin * iface, IPin * pReceivePin, const AM_MEDIA_TYPE * pmt)
245 {
246     PIN_DIRECTION pindirReceive;
247     HRESULT hr = S_OK;
248     PullPin *This = impl_PullPin_from_IPin(iface);
249 
250     TRACE("(%p/%p)->(%p, %p)\n", This, iface, pReceivePin, pmt);
251     dump_AM_MEDIA_TYPE(pmt);
252 
253     EnterCriticalSection(This->pin.pCritSec);
254     if (!This->pin.pConnectedTo)
255     {
256         ALLOCATOR_PROPERTIES props;
257 
258         props.cBuffers = 3;
259         props.cbBuffer = 64 * 1024; /* 64 KB */
260         props.cbAlign = 1;
261         props.cbPrefix = 0;
262 
263         if (This->fnQueryAccept(This->pUserData, pmt) != S_OK)
264             hr = VFW_E_TYPE_NOT_ACCEPTED; /* FIXME: shouldn't we just map common errors onto
265                                            * VFW_E_TYPE_NOT_ACCEPTED and pass the value on otherwise? */
266 
267         if (SUCCEEDED(hr))
268         {
269             IPin_QueryDirection(pReceivePin, &pindirReceive);
270 
271             if (pindirReceive != PINDIR_OUTPUT)
272             {
273                 ERR("Can't connect from non-output pin\n");
274                 hr = VFW_E_INVALID_DIRECTION;
275             }
276         }
277 
278         This->pReader = NULL;
279         This->pAlloc = NULL;
280         This->prefAlloc = NULL;
281         if (SUCCEEDED(hr))
282         {
283             hr = IPin_QueryInterface(pReceivePin, &IID_IAsyncReader, (LPVOID *)&This->pReader);
284         }
285 
286         if (SUCCEEDED(hr) && This->fnPreConnect)
287         {
288             hr = This->fnPreConnect(iface, pReceivePin, &props);
289         }
290 
291         /*
292          * Some custom filters (such as the one used by Fallout 3
293          * and Fallout: New Vegas) expect to be passed a non-NULL
294          * preferred allocator.
295          */
296         if (SUCCEEDED(hr))
297         {
298             hr = StdMemAllocator_create(NULL, (LPVOID *) &This->prefAlloc);
299         }
300 
301         if (SUCCEEDED(hr))
302         {
303             hr = IAsyncReader_RequestAllocator(This->pReader, This->prefAlloc, &props, &This->pAlloc);
304         }
305 
306         if (SUCCEEDED(hr))
307         {
308             CopyMediaType(&This->pin.mtCurrent, pmt);
309             This->pin.pConnectedTo = pReceivePin;
310             IPin_AddRef(pReceivePin);
311             hr = IMemAllocator_Commit(This->pAlloc);
312         }
313 
314         if (SUCCEEDED(hr))
315             hr = PullPin_InitProcessing(This);
316 
317         if (FAILED(hr))
318         {
319              if (This->pReader)
320                  IAsyncReader_Release(This->pReader);
321              This->pReader = NULL;
322              if (This->prefAlloc)
323                  IMemAllocator_Release(This->prefAlloc);
324              This->prefAlloc = NULL;
325              if (This->pAlloc)
326                  IMemAllocator_Release(This->pAlloc);
327              This->pAlloc = NULL;
328         }
329     }
330     else
331         hr = VFW_E_ALREADY_CONNECTED;
332     LeaveCriticalSection(This->pin.pCritSec);
333     return hr;
334 }
335 
336 HRESULT WINAPI PullPin_QueryInterface(IPin * iface, REFIID riid, LPVOID * ppv)
337 {
338     PullPin *This = impl_PullPin_from_IPin(iface);
339 
340     TRACE("(%p/%p)->(%s, %p)\n", This, iface, qzdebugstr_guid(riid), ppv);
341 
342     *ppv = NULL;
343 
344     if (IsEqualIID(riid, &IID_IUnknown))
345         *ppv = iface;
346     else if (IsEqualIID(riid, &IID_IPin))
347         *ppv = iface;
348     else if (IsEqualIID(riid, &IID_IMediaSeeking) ||
349              IsEqualIID(riid, &IID_IQualityControl))
350     {
351         return IBaseFilter_QueryInterface(This->pin.pinInfo.pFilter, riid, ppv);
352     }
353 
354     if (*ppv)
355     {
356         IUnknown_AddRef((IUnknown *)(*ppv));
357         return S_OK;
358     }
359 
360     FIXME("No interface for %s!\n", qzdebugstr_guid(riid));
361 
362     return E_NOINTERFACE;
363 }
364 
365 ULONG WINAPI PullPin_Release(IPin *iface)
366 {
367     PullPin *This = impl_PullPin_from_IPin(iface);
368     ULONG refCount = InterlockedDecrement(&This->pin.refCount);
369 
370     TRACE("(%p)->() Release from %d\n", This, refCount + 1);
371 
372     if (!refCount)
373     {
374         WaitForSingleObject(This->hEventStateChanged, INFINITE);
375         assert(!This->hThread);
376 
377         if(This->prefAlloc)
378             IMemAllocator_Release(This->prefAlloc);
379         if(This->pAlloc)
380             IMemAllocator_Release(This->pAlloc);
381         if(This->pReader)
382             IAsyncReader_Release(This->pReader);
383         CloseHandle(This->thread_sleepy);
384         CloseHandle(This->hEventStateChanged);
385         This->thread_lock.DebugInfo->Spare[0] = 0;
386         DeleteCriticalSection(&This->thread_lock);
387         CoTaskMemFree(This);
388         return 0;
389     }
390     return refCount;
391 }
392 
393 static void PullPin_Flush(PullPin *This)
394 {
395     IMediaSample *pSample;
396     TRACE("Flushing!\n");
397 
398     if (This->pReader)
399     {
400         /* Do not allow state to change while flushing */
401         EnterCriticalSection(This->pin.pCritSec);
402 
403         /* Flush outstanding samples */
404         IAsyncReader_BeginFlush(This->pReader);
405 
406         for (;;)
407         {
408             DWORD_PTR dwUser;
409 
410             pSample = NULL;
411             IAsyncReader_WaitForNext(This->pReader, 0, &pSample, &dwUser);
412 
413             if (!pSample)
414                 break;
415 
416             assert(!IMediaSample_GetActualDataLength(pSample));
417 
418             IMediaSample_Release(pSample);
419         }
420 
421         IAsyncReader_EndFlush(This->pReader);
422 
423         LeaveCriticalSection(This->pin.pCritSec);
424     }
425 }
426 
427 static void PullPin_Thread_Process(PullPin *This)
428 {
429     HRESULT hr;
430     IMediaSample * pSample = NULL;
431     ALLOCATOR_PROPERTIES allocProps;
432 
433     hr = IMemAllocator_GetProperties(This->pAlloc, &allocProps);
434 
435     This->cbAlign = allocProps.cbAlign;
436 
437     if (This->rtCurrent < This->rtStart)
438         This->rtCurrent = MEDIATIME_FROM_BYTES(ALIGNDOWN(BYTES_FROM_MEDIATIME(This->rtStart), This->cbAlign));
439 
440     TRACE("Start\n");
441 
442     if (This->rtCurrent >= This->rtStop)
443     {
444         IPin_EndOfStream(&This->pin.IPin_iface);
445         return;
446     }
447 
448     /* There is no sample in our buffer */
449     hr = This->fnCustomRequest(This->pUserData);
450 
451     if (FAILED(hr))
452         ERR("Request error: %x\n", hr);
453 
454     EnterCriticalSection(This->pin.pCritSec);
455     SetEvent(This->hEventStateChanged);
456     LeaveCriticalSection(This->pin.pCritSec);
457 
458     if (SUCCEEDED(hr))
459     do
460     {
461         DWORD_PTR dwUser;
462 
463         TRACE("Process sample\n");
464 
465         pSample = NULL;
466         hr = IAsyncReader_WaitForNext(This->pReader, 10000, &pSample, &dwUser);
467 
468         /* Return an empty sample on error to the implementation in case it does custom parsing, so it knows it's gone */
469         if (SUCCEEDED(hr))
470         {
471             hr = This->fnSampleProc(This->pUserData, pSample, dwUser);
472         }
473         else
474         {
475             if (hr == VFW_E_TIMEOUT)
476             {
477                 if (pSample != NULL)
478                     WARN("Non-NULL sample returned with VFW_E_TIMEOUT.\n");
479                 hr = S_OK;
480             }
481             /* FIXME: Errors are not well handled yet! */
482             else
483                 ERR("Processing error: %x\n", hr);
484         }
485 
486         if (pSample)
487         {
488             IMediaSample_Release(pSample);
489             pSample = NULL;
490         }
491     } while (This->rtCurrent < This->rtStop && hr == S_OK && !This->stop_playback);
492 
493     /*
494      * Sample was rejected, and we are asked to terminate.  When there is more than one buffer
495      * it is possible for a filter to have several queued samples, making it necessary to
496      * release all of these pending samples.
497      */
498     if (This->stop_playback || FAILED(hr))
499     {
500         DWORD_PTR dwUser;
501 
502         do
503         {
504             if (pSample)
505                 IMediaSample_Release(pSample);
506             pSample = NULL;
507             IAsyncReader_WaitForNext(This->pReader, 0, &pSample, &dwUser);
508         } while(pSample);
509     }
510 
511     /* Can't reset state to Sleepy here because that might race, instead PauseProcessing will do that for us
512      * Flush remaining samples
513      */
514     if (This->fnDone)
515         This->fnDone(This->pUserData);
516 
517     TRACE("End: %08x, %d\n", hr, This->stop_playback);
518 }
519 
520 static void PullPin_Thread_Pause(PullPin *This)
521 {
522     PullPin_Flush(This);
523 
524     EnterCriticalSection(This->pin.pCritSec);
525     This->state = Req_Sleepy;
526     SetEvent(This->hEventStateChanged);
527     LeaveCriticalSection(This->pin.pCritSec);
528 }
529 
530 static void  PullPin_Thread_Stop(PullPin *This)
531 {
532     TRACE("(%p)->()\n", This);
533 
534     EnterCriticalSection(This->pin.pCritSec);
535     {
536         CloseHandle(This->hThread);
537         This->hThread = NULL;
538         SetEvent(This->hEventStateChanged);
539     }
540     LeaveCriticalSection(This->pin.pCritSec);
541 
542     IBaseFilter_Release(This->pin.pinInfo.pFilter);
543 
544     CoUninitialize();
545     ExitThread(0);
546 }
547 
548 static DWORD WINAPI PullPin_Thread_Main(LPVOID pv)
549 {
550     PullPin *This = pv;
551     CoInitializeEx(NULL, COINIT_MULTITHREADED);
552 
553     PullPin_Flush(This);
554 
555     for (;;)
556     {
557         WaitForSingleObject(This->thread_sleepy, INFINITE);
558 
559         TRACE("State: %d\n", This->state);
560 
561         switch (This->state)
562         {
563         case Req_Die: PullPin_Thread_Stop(This); break;
564         case Req_Run: PullPin_Thread_Process(This); break;
565         case Req_Pause: PullPin_Thread_Pause(This); break;
566         case Req_Sleepy: ERR("Should not be signalled with SLEEPY!\n"); break;
567         default: ERR("Unknown state request: %d\n", This->state); break;
568         }
569     }
570     return 0;
571 }
572 
573 static HRESULT PullPin_InitProcessing(PullPin * This)
574 {
575     HRESULT hr = S_OK;
576 
577     TRACE("(%p)->()\n", This);
578 
579     /* if we are connected */
580     if (This->pAlloc)
581     {
582         DWORD dwThreadId;
583 
584         WaitForSingleObject(This->hEventStateChanged, INFINITE);
585         EnterCriticalSection(This->pin.pCritSec);
586 
587         assert(!This->hThread);
588         assert(This->state == Req_Die);
589         assert(This->stop_playback);
590         assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT);
591         This->state = Req_Sleepy;
592 
593         /* AddRef the filter to make sure it and its pins will be around
594          * as long as the thread */
595         IBaseFilter_AddRef(This->pin.pinInfo.pFilter);
596 
597 
598         This->hThread = CreateThread(NULL, 0, PullPin_Thread_Main, This, 0, &dwThreadId);
599         if (!This->hThread)
600         {
601             hr = HRESULT_FROM_WIN32(GetLastError());
602             IBaseFilter_Release(This->pin.pinInfo.pFilter);
603         }
604 
605         if (SUCCEEDED(hr))
606         {
607             SetEvent(This->hEventStateChanged);
608             /* If assert fails, that means a command was not processed before the thread previously terminated */
609         }
610         LeaveCriticalSection(This->pin.pCritSec);
611     }
612 
613     TRACE(" -- %x\n", hr);
614 
615     return hr;
616 }
617 
618 HRESULT PullPin_StartProcessing(PullPin * This)
619 {
620     /* if we are connected */
621     TRACE("(%p)->()\n", This);
622     if(This->pAlloc)
623     {
624         assert(This->hThread);
625 
626         PullPin_WaitForStateChange(This, INFINITE);
627 
628         assert(This->state == Req_Sleepy);
629 
630         /* Wake up! */
631         assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT);
632         This->state = Req_Run;
633         This->stop_playback = FALSE;
634         ResetEvent(This->hEventStateChanged);
635         SetEvent(This->thread_sleepy);
636     }
637 
638     return S_OK;
639 }
640 
641 HRESULT PullPin_PauseProcessing(PullPin * This)
642 {
643     /* if we are connected */
644     TRACE("(%p)->()\n", This);
645     if(This->pAlloc)
646     {
647         assert(This->hThread);
648 
649         PullPin_WaitForStateChange(This, INFINITE);
650 
651         EnterCriticalSection(This->pin.pCritSec);
652 
653         assert(!This->stop_playback);
654         assert(This->state == Req_Run|| This->state == Req_Sleepy);
655 
656         assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT);
657 
658         This->state = Req_Pause;
659         This->stop_playback = TRUE;
660         ResetEvent(This->hEventStateChanged);
661         SetEvent(This->thread_sleepy);
662 
663         /* Release any outstanding samples */
664         if (This->pReader)
665         {
666             IMediaSample *pSample;
667             DWORD_PTR dwUser;
668 
669             do
670             {
671                 pSample = NULL;
672                 IAsyncReader_WaitForNext(This->pReader, 0, &pSample, &dwUser);
673                 if (pSample)
674                     IMediaSample_Release(pSample);
675             } while(pSample);
676         }
677 
678         LeaveCriticalSection(This->pin.pCritSec);
679     }
680 
681     return S_OK;
682 }
683 
684 static HRESULT PullPin_StopProcessing(PullPin * This)
685 {
686     TRACE("(%p)->()\n", This);
687 
688     /* if we are alive */
689     assert(This->hThread);
690 
691     PullPin_WaitForStateChange(This, INFINITE);
692 
693     assert(This->state == Req_Pause || This->state == Req_Sleepy);
694 
695     This->stop_playback = TRUE;
696     This->state = Req_Die;
697     assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT);
698     ResetEvent(This->hEventStateChanged);
699     SetEvent(This->thread_sleepy);
700     return S_OK;
701 }
702 
703 HRESULT PullPin_WaitForStateChange(PullPin * This, DWORD dwMilliseconds)
704 {
705     if (WaitForSingleObject(This->hEventStateChanged, dwMilliseconds) == WAIT_TIMEOUT)
706         return S_FALSE;
707     return S_OK;
708 }
709 
710 HRESULT WINAPI PullPin_QueryAccept(IPin * iface, const AM_MEDIA_TYPE * pmt)
711 {
712     PullPin *This = impl_PullPin_from_IPin(iface);
713 
714     TRACE("(%p/%p)->(%p)\n", This, iface, pmt);
715 
716     return (This->fnQueryAccept(This->pUserData, pmt) == S_OK ? S_OK : S_FALSE);
717 }
718 
719 HRESULT WINAPI PullPin_EndOfStream(IPin * iface)
720 {
721     PullPin *This = impl_PullPin_from_IPin(iface);
722     HRESULT hr = S_FALSE;
723 
724     TRACE("(%p)->()\n", iface);
725 
726     EnterCriticalSection(This->pin.pCritSec);
727     hr = SendFurther( iface, deliver_endofstream, NULL, NULL );
728     SetEvent(This->hEventStateChanged);
729     LeaveCriticalSection(This->pin.pCritSec);
730 
731     return hr;
732 }
733 
734 HRESULT WINAPI PullPin_BeginFlush(IPin * iface)
735 {
736     PullPin *This = impl_PullPin_from_IPin(iface);
737     TRACE("(%p)->()\n", This);
738 
739     EnterCriticalSection(This->pin.pCritSec);
740     {
741         SendFurther( iface, deliver_beginflush, NULL, NULL );
742     }
743     LeaveCriticalSection(This->pin.pCritSec);
744 
745     EnterCriticalSection(&This->thread_lock);
746     {
747         if (This->pReader)
748             IAsyncReader_BeginFlush(This->pReader);
749         PullPin_WaitForStateChange(This, INFINITE);
750 
751         if (This->hThread && This->state == Req_Run)
752         {
753             PullPin_PauseProcessing(This);
754             PullPin_WaitForStateChange(This, INFINITE);
755         }
756     }
757     LeaveCriticalSection(&This->thread_lock);
758 
759     EnterCriticalSection(This->pin.pCritSec);
760     {
761         This->fnCleanProc(This->pUserData);
762     }
763     LeaveCriticalSection(This->pin.pCritSec);
764 
765     return S_OK;
766 }
767 
768 HRESULT WINAPI PullPin_EndFlush(IPin * iface)
769 {
770     PullPin *This = impl_PullPin_from_IPin(iface);
771 
772     TRACE("(%p)->()\n", iface);
773 
774     /* Send further first: Else a race condition might terminate processing early */
775     EnterCriticalSection(This->pin.pCritSec);
776     SendFurther( iface, deliver_endflush, NULL, NULL );
777     LeaveCriticalSection(This->pin.pCritSec);
778 
779     EnterCriticalSection(&This->thread_lock);
780     {
781         FILTER_STATE state;
782 
783         if (This->pReader)
784             IAsyncReader_EndFlush(This->pReader);
785 
786         IBaseFilter_GetState(This->pin.pinInfo.pFilter, INFINITE, &state);
787 
788         if (state != State_Stopped)
789             PullPin_StartProcessing(This);
790 
791         PullPin_WaitForStateChange(This, INFINITE);
792     }
793     LeaveCriticalSection(&This->thread_lock);
794 
795     return S_OK;
796 }
797 
798 HRESULT WINAPI PullPin_Disconnect(IPin *iface)
799 {
800     HRESULT hr;
801     PullPin *This = impl_PullPin_from_IPin(iface);
802 
803     TRACE("()\n");
804 
805     EnterCriticalSection(This->pin.pCritSec);
806     {
807         if (FAILED(hr = IMemAllocator_Decommit(This->pAlloc)))
808             ERR("Allocator decommit failed with error %x. Possible memory leak\n", hr);
809 
810         if (This->pin.pConnectedTo)
811         {
812             IPin_Release(This->pin.pConnectedTo);
813             This->pin.pConnectedTo = NULL;
814             PullPin_StopProcessing(This);
815 
816             FreeMediaType(&This->pin.mtCurrent);
817             ZeroMemory(&This->pin.mtCurrent, sizeof(This->pin.mtCurrent));
818             hr = S_OK;
819         }
820         else
821             hr = S_FALSE;
822     }
823     LeaveCriticalSection(This->pin.pCritSec);
824 
825     return hr;
826 }
827 
828 HRESULT WINAPI PullPin_NewSegment(IPin * iface, REFERENCE_TIME tStart, REFERENCE_TIME tStop, double dRate)
829 {
830     newsegmentargs args;
831     FIXME("(%p)->(%s, %s, %g) stub\n", iface, wine_dbgstr_longlong(tStart), wine_dbgstr_longlong(tStop), dRate);
832 
833     args.tStart = tStart;
834     args.tStop = tStop;
835     args.rate = dRate;
836 
837     return SendFurther( iface, deliver_newsegment, &args, NULL );
838 }
839