xref: /reactos/dll/directx/wine/quartz/pin.c (revision 3c774903)
1 /*
2  * Generic Implementation of IPin Interface
3  *
4  * Copyright 2003 Robert Shearman
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  */
20 
21 #include "quartz_private.h"
22 #include "pin.h"
23 
24 #include "wine/debug.h"
25 #include "wine/unicode.h"
26 #include "uuids.h"
27 #include "vfwmsgs.h"
28 #include <assert.h>
29 
30 WINE_DEFAULT_DEBUG_CHANNEL(quartz);
31 
/* Round value down to a multiple of boundary using truncating integer division.
 * Both arguments are expanded more than once, so pass side-effect-free expressions. */
#define ALIGNDOWN(value, boundary) (((value) / (boundary)) * (boundary))
/* Round value up to a multiple of boundary by biasing it with (boundary - 1)
 * and then rounding down with ALIGNDOWN. */
#define ALIGNUP(value, boundary) (ALIGNDOWN((value) + (boundary) - 1, (boundary)))
34 
35 typedef HRESULT (*SendPinFunc)( IPin *to, LPVOID arg );
36 
37 /** Helper function, there are a lot of places where the error code is inherited
38  * The following rules apply:
39  *
40  * Return the first received error code (E_NOTIMPL is ignored)
41  * If no errors occur: return the first received non-error-code that isn't S_OK
42  */
43 static HRESULT updatehres( HRESULT original, HRESULT new )
44 {
45     if (FAILED( original ) || new == E_NOTIMPL)
46         return original;
47 
48     if (FAILED( new ) || original == S_OK)
49         return new;
50 
51     return original;
52 }
53 
54 /** Sends a message from a pin further to other, similar pins
55  * fnMiddle is called on each pin found further on the stream.
56  * fnEnd (can be NULL) is called when the message can't be sent any further (this is a renderer or source)
57  *
58  * If the pin given is an input pin, the message will be sent downstream to other input pins
59  * If the pin given is an output pin, the message will be sent upstream to other output pins
60  */
61 static HRESULT SendFurther( IPin *from, SendPinFunc fnMiddle, LPVOID arg, SendPinFunc fnEnd )
62 {
63     PIN_INFO pin_info;
64     ULONG amount = 0;
65     HRESULT hr = S_OK;
66     HRESULT hr_return = S_OK;
67     IEnumPins *enumpins = NULL;
68     BOOL foundend = TRUE;
69     PIN_DIRECTION from_dir;
70 
71     IPin_QueryDirection( from, &from_dir );
72 
73     hr = IPin_QueryInternalConnections( from, NULL, &amount );
74     if (hr != E_NOTIMPL && amount)
75         FIXME("Use QueryInternalConnections!\n");
76 
77     pin_info.pFilter = NULL;
78     hr = IPin_QueryPinInfo( from, &pin_info );
79     if (FAILED(hr))
80         goto out;
81 
82     hr = IBaseFilter_EnumPins( pin_info.pFilter, &enumpins );
83     if (FAILED(hr))
84         goto out;
85 
86     hr = IEnumPins_Reset( enumpins );
87     while (hr == S_OK) {
88         IPin *pin = NULL;
89         hr = IEnumPins_Next( enumpins, 1, &pin, NULL );
90         if (hr == VFW_E_ENUM_OUT_OF_SYNC)
91         {
92             hr = IEnumPins_Reset( enumpins );
93             continue;
94         }
95         if (pin)
96         {
97             PIN_DIRECTION dir;
98 
99             IPin_QueryDirection( pin, &dir );
100             if (dir != from_dir)
101             {
102                 IPin *connected = NULL;
103 
104                 foundend = FALSE;
105                 IPin_ConnectedTo( pin, &connected );
106                 if (connected)
107                 {
108                     HRESULT hr_local;
109 
110                     hr_local = fnMiddle( connected, arg );
111                     hr_return = updatehres( hr_return, hr_local );
112                     IPin_Release(connected);
113                 }
114             }
115             IPin_Release( pin );
116         }
117         else
118         {
119             hr = S_OK;
120             break;
121         }
122     }
123 
124     if (!foundend)
125         hr = hr_return;
126     else if (fnEnd) {
127         HRESULT hr_local;
128 
129         hr_local = fnEnd( from, arg );
130         hr_return = updatehres( hr_return, hr_local );
131     }
132 
133 out:
134     if (enumpins)
135         IEnumPins_Release( enumpins );
136     if (pin_info.pFilter)
137         IBaseFilter_Release( pin_info.pFilter );
138     return hr;
139 }
140 
141 
142 static void Copy_PinInfo(PIN_INFO * pDest, const PIN_INFO * pSrc)
143 {
144     /* Tempting to just do a memcpy, but the name field is
145        128 characters long! We will probably never exceed 10
146        most of the time, so we are better off copying
147        each field manually */
148     strcpyW(pDest->achName, pSrc->achName);
149     pDest->dir = pSrc->dir;
150     pDest->pFilter = pSrc->pFilter;
151 }
152 
153 static HRESULT deliver_endofstream(IPin* pin, LPVOID unused)
154 {
155     return IPin_EndOfStream( pin );
156 }
157 
158 static HRESULT deliver_beginflush(IPin* pin, LPVOID unused)
159 {
160     return IPin_BeginFlush( pin );
161 }
162 
163 static HRESULT deliver_endflush(IPin* pin, LPVOID unused)
164 {
165     return IPin_EndFlush( pin );
166 }
167 
168 typedef struct newsegmentargs
169 {
170     REFERENCE_TIME tStart, tStop;
171     double rate;
172 } newsegmentargs;
173 
174 static HRESULT deliver_newsegment(IPin *pin, LPVOID data)
175 {
176     newsegmentargs *args = data;
177     return IPin_NewSegment(pin, args->tStart, args->tStop, args->rate);
178 }
179 
180 /*** PullPin implementation ***/
181 
182 static HRESULT PullPin_Init(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PULL pSampleProc, LPVOID pUserData,
183                             QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, REQUESTPROC pCustomRequest, STOPPROCESSPROC pDone, LPCRITICAL_SECTION pCritSec, PullPin * pPinImpl)
184 {
185     /* Common attributes */
186     pPinImpl->pin.IPin_iface.lpVtbl = PullPin_Vtbl;
187     pPinImpl->pin.refCount = 1;
188     pPinImpl->pin.pConnectedTo = NULL;
189     pPinImpl->pin.pCritSec = pCritSec;
190     Copy_PinInfo(&pPinImpl->pin.pinInfo, pPinInfo);
191     ZeroMemory(&pPinImpl->pin.mtCurrent, sizeof(AM_MEDIA_TYPE));
192 
193     /* Input pin attributes */
194     pPinImpl->pUserData = pUserData;
195     pPinImpl->fnQueryAccept = pQueryAccept;
196     pPinImpl->fnSampleProc = pSampleProc;
197     pPinImpl->fnCleanProc = pCleanUp;
198     pPinImpl->fnDone = pDone;
199     pPinImpl->fnPreConnect = NULL;
200     pPinImpl->pAlloc = NULL;
201     pPinImpl->prefAlloc = NULL;
202     pPinImpl->pReader = NULL;
203     pPinImpl->hThread = NULL;
204     pPinImpl->hEventStateChanged = CreateEventW(NULL, TRUE, TRUE, NULL);
205     pPinImpl->thread_sleepy = CreateEventW(NULL, FALSE, FALSE, NULL);
206 
207     pPinImpl->rtStart = 0;
208     pPinImpl->rtCurrent = 0;
209     pPinImpl->rtStop = ((LONGLONG)0x7fffffff << 32) | 0xffffffff;
210     pPinImpl->dRate = 1.0;
211     pPinImpl->state = Req_Die;
212     pPinImpl->fnCustomRequest = pCustomRequest;
213     pPinImpl->stop_playback = TRUE;
214 
215     InitializeCriticalSection(&pPinImpl->thread_lock);
216     pPinImpl->thread_lock.DebugInfo->Spare[0] = (DWORD_PTR)( __FILE__ ": PullPin.thread_lock");
217 
218     return S_OK;
219 }
220 
221 HRESULT PullPin_Construct(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo,
222                           SAMPLEPROC_PULL pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept,
223                           CLEANUPPROC pCleanUp, REQUESTPROC pCustomRequest, STOPPROCESSPROC pDone,
224                           LPCRITICAL_SECTION pCritSec, IPin ** ppPin)
225 {
226     PullPin * pPinImpl;
227 
228     *ppPin = NULL;
229 
230     if (pPinInfo->dir != PINDIR_INPUT)
231     {
232         ERR("Pin direction(%x) != PINDIR_INPUT\n", pPinInfo->dir);
233         return E_INVALIDARG;
234     }
235 
236     pPinImpl = CoTaskMemAlloc(sizeof(*pPinImpl));
237 
238     if (!pPinImpl)
239         return E_OUTOFMEMORY;
240 
241     if (SUCCEEDED(PullPin_Init(PullPin_Vtbl, pPinInfo, pSampleProc, pUserData, pQueryAccept, pCleanUp, pCustomRequest, pDone, pCritSec, pPinImpl)))
242     {
243         *ppPin = &pPinImpl->pin.IPin_iface;
244         return S_OK;
245     }
246 
247     CoTaskMemFree(pPinImpl);
248     return E_FAIL;
249 }
250 
251 static HRESULT PullPin_InitProcessing(PullPin * This);
252 
253 HRESULT WINAPI PullPin_ReceiveConnection(IPin * iface, IPin * pReceivePin, const AM_MEDIA_TYPE * pmt)
254 {
255     PIN_DIRECTION pindirReceive;
256     HRESULT hr = S_OK;
257     PullPin *This = impl_PullPin_from_IPin(iface);
258 
259     TRACE("(%p/%p)->(%p, %p)\n", This, iface, pReceivePin, pmt);
260     dump_AM_MEDIA_TYPE(pmt);
261 
262     EnterCriticalSection(This->pin.pCritSec);
263     if (!This->pin.pConnectedTo)
264     {
265         ALLOCATOR_PROPERTIES props;
266 
267         props.cBuffers = 3;
268         props.cbBuffer = 64 * 1024; /* 64 KB */
269         props.cbAlign = 1;
270         props.cbPrefix = 0;
271 
272         if (This->fnQueryAccept(This->pUserData, pmt) != S_OK)
273             hr = VFW_E_TYPE_NOT_ACCEPTED; /* FIXME: shouldn't we just map common errors onto
274                                            * VFW_E_TYPE_NOT_ACCEPTED and pass the value on otherwise? */
275 
276         if (SUCCEEDED(hr))
277         {
278             IPin_QueryDirection(pReceivePin, &pindirReceive);
279 
280             if (pindirReceive != PINDIR_OUTPUT)
281             {
282                 ERR("Can't connect from non-output pin\n");
283                 hr = VFW_E_INVALID_DIRECTION;
284             }
285         }
286 
287         This->pReader = NULL;
288         This->pAlloc = NULL;
289         This->prefAlloc = NULL;
290         if (SUCCEEDED(hr))
291         {
292             hr = IPin_QueryInterface(pReceivePin, &IID_IAsyncReader, (LPVOID *)&This->pReader);
293         }
294 
295         if (SUCCEEDED(hr) && This->fnPreConnect)
296         {
297             hr = This->fnPreConnect(iface, pReceivePin, &props);
298         }
299 
300         /*
301          * Some custom filters (such as the one used by Fallout 3
302          * and Fallout: New Vegas) expect to be passed a non-NULL
303          * preferred allocator.
304          */
305         if (SUCCEEDED(hr))
306         {
307             hr = StdMemAllocator_create(NULL, (LPVOID *) &This->prefAlloc);
308         }
309 
310         if (SUCCEEDED(hr))
311         {
312             hr = IAsyncReader_RequestAllocator(This->pReader, This->prefAlloc, &props, &This->pAlloc);
313         }
314 
315         if (SUCCEEDED(hr))
316         {
317             CopyMediaType(&This->pin.mtCurrent, pmt);
318             This->pin.pConnectedTo = pReceivePin;
319             IPin_AddRef(pReceivePin);
320             hr = IMemAllocator_Commit(This->pAlloc);
321         }
322 
323         if (SUCCEEDED(hr))
324             hr = PullPin_InitProcessing(This);
325 
326         if (FAILED(hr))
327         {
328              if (This->pReader)
329                  IAsyncReader_Release(This->pReader);
330              This->pReader = NULL;
331              if (This->prefAlloc)
332                  IMemAllocator_Release(This->prefAlloc);
333              This->prefAlloc = NULL;
334              if (This->pAlloc)
335                  IMemAllocator_Release(This->pAlloc);
336              This->pAlloc = NULL;
337         }
338     }
339     else
340         hr = VFW_E_ALREADY_CONNECTED;
341     LeaveCriticalSection(This->pin.pCritSec);
342     return hr;
343 }
344 
345 HRESULT WINAPI PullPin_QueryInterface(IPin * iface, REFIID riid, LPVOID * ppv)
346 {
347     PullPin *This = impl_PullPin_from_IPin(iface);
348 
349     TRACE("(%p/%p)->(%s, %p)\n", This, iface, qzdebugstr_guid(riid), ppv);
350 
351     *ppv = NULL;
352 
353     if (IsEqualIID(riid, &IID_IUnknown))
354         *ppv = iface;
355     else if (IsEqualIID(riid, &IID_IPin))
356         *ppv = iface;
357     else if (IsEqualIID(riid, &IID_IMediaSeeking) ||
358              IsEqualIID(riid, &IID_IQualityControl))
359     {
360         return IBaseFilter_QueryInterface(This->pin.pinInfo.pFilter, riid, ppv);
361     }
362 
363     if (*ppv)
364     {
365         IUnknown_AddRef((IUnknown *)(*ppv));
366         return S_OK;
367     }
368 
369     FIXME("No interface for %s!\n", qzdebugstr_guid(riid));
370 
371     return E_NOINTERFACE;
372 }
373 
374 ULONG WINAPI PullPin_Release(IPin *iface)
375 {
376     PullPin *This = impl_PullPin_from_IPin(iface);
377     ULONG refCount = InterlockedDecrement(&This->pin.refCount);
378 
379     TRACE("(%p)->() Release from %d\n", This, refCount + 1);
380 
381     if (!refCount)
382     {
383         WaitForSingleObject(This->hEventStateChanged, INFINITE);
384         assert(!This->hThread);
385 
386         if(This->prefAlloc)
387             IMemAllocator_Release(This->prefAlloc);
388         if(This->pAlloc)
389             IMemAllocator_Release(This->pAlloc);
390         if(This->pReader)
391             IAsyncReader_Release(This->pReader);
392         CloseHandle(This->thread_sleepy);
393         CloseHandle(This->hEventStateChanged);
394         This->thread_lock.DebugInfo->Spare[0] = 0;
395         DeleteCriticalSection(&This->thread_lock);
396         CoTaskMemFree(This);
397         return 0;
398     }
399     return refCount;
400 }
401 
402 static void PullPin_Flush(PullPin *This)
403 {
404     IMediaSample *pSample;
405     TRACE("Flushing!\n");
406 
407     if (This->pReader)
408     {
409         /* Do not allow state to change while flushing */
410         EnterCriticalSection(This->pin.pCritSec);
411 
412         /* Flush outstanding samples */
413         IAsyncReader_BeginFlush(This->pReader);
414 
415         for (;;)
416         {
417             DWORD_PTR dwUser;
418 
419             pSample = NULL;
420             IAsyncReader_WaitForNext(This->pReader, 0, &pSample, &dwUser);
421 
422             if (!pSample)
423                 break;
424 
425             assert(!IMediaSample_GetActualDataLength(pSample));
426 
427             IMediaSample_Release(pSample);
428         }
429 
430         IAsyncReader_EndFlush(This->pReader);
431 
432         LeaveCriticalSection(This->pin.pCritSec);
433     }
434 }
435 
436 static void PullPin_Thread_Process(PullPin *This)
437 {
438     HRESULT hr;
439     IMediaSample * pSample = NULL;
440     ALLOCATOR_PROPERTIES allocProps;
441 
442     hr = IMemAllocator_GetProperties(This->pAlloc, &allocProps);
443 
444     This->cbAlign = allocProps.cbAlign;
445 
446     if (This->rtCurrent < This->rtStart)
447         This->rtCurrent = MEDIATIME_FROM_BYTES(ALIGNDOWN(BYTES_FROM_MEDIATIME(This->rtStart), This->cbAlign));
448 
449     TRACE("Start\n");
450 
451     if (This->rtCurrent >= This->rtStop)
452     {
453         IPin_EndOfStream(&This->pin.IPin_iface);
454         return;
455     }
456 
457     /* There is no sample in our buffer */
458     hr = This->fnCustomRequest(This->pUserData);
459 
460     if (FAILED(hr))
461         ERR("Request error: %x\n", hr);
462 
463     EnterCriticalSection(This->pin.pCritSec);
464     SetEvent(This->hEventStateChanged);
465     LeaveCriticalSection(This->pin.pCritSec);
466 
467     if (SUCCEEDED(hr))
468     do
469     {
470         DWORD_PTR dwUser;
471 
472         TRACE("Process sample\n");
473 
474         pSample = NULL;
475         hr = IAsyncReader_WaitForNext(This->pReader, 10000, &pSample, &dwUser);
476 
477         /* Return an empty sample on error to the implementation in case it does custom parsing, so it knows it's gone */
478         if (SUCCEEDED(hr))
479         {
480             hr = This->fnSampleProc(This->pUserData, pSample, dwUser);
481         }
482         else
483         {
484             if (hr == VFW_E_TIMEOUT)
485             {
486                 if (pSample != NULL)
487                     WARN("Non-NULL sample returned with VFW_E_TIMEOUT.\n");
488                 hr = S_OK;
489             }
490             /* FIXME: Errors are not well handled yet! */
491             else
492                 ERR("Processing error: %x\n", hr);
493         }
494 
495         if (pSample)
496         {
497             IMediaSample_Release(pSample);
498             pSample = NULL;
499         }
500     } while (This->rtCurrent < This->rtStop && hr == S_OK && !This->stop_playback);
501 
502     /*
503      * Sample was rejected, and we are asked to terminate.  When there is more than one buffer
504      * it is possible for a filter to have several queued samples, making it necessary to
505      * release all of these pending samples.
506      */
507     if (This->stop_playback || FAILED(hr))
508     {
509         DWORD_PTR dwUser;
510 
511         do
512         {
513             if (pSample)
514                 IMediaSample_Release(pSample);
515             pSample = NULL;
516             IAsyncReader_WaitForNext(This->pReader, 0, &pSample, &dwUser);
517         } while(pSample);
518     }
519 
520     /* Can't reset state to Sleepy here because that might race, instead PauseProcessing will do that for us
521      * Flush remaining samples
522      */
523     if (This->fnDone)
524         This->fnDone(This->pUserData);
525 
526     TRACE("End: %08x, %d\n", hr, This->stop_playback);
527 }
528 
529 static void PullPin_Thread_Pause(PullPin *This)
530 {
531     PullPin_Flush(This);
532 
533     EnterCriticalSection(This->pin.pCritSec);
534     This->state = Req_Sleepy;
535     SetEvent(This->hEventStateChanged);
536     LeaveCriticalSection(This->pin.pCritSec);
537 }
538 
539 static void  PullPin_Thread_Stop(PullPin *This)
540 {
541     TRACE("(%p)->()\n", This);
542 
543     EnterCriticalSection(This->pin.pCritSec);
544     SetEvent(This->hEventStateChanged);
545     LeaveCriticalSection(This->pin.pCritSec);
546 
547     IBaseFilter_Release(This->pin.pinInfo.pFilter);
548 
549     CoUninitialize();
550     ExitThread(0);
551 }
552 
553 static DWORD WINAPI PullPin_Thread_Main(LPVOID pv)
554 {
555     PullPin *This = pv;
556     CoInitializeEx(NULL, COINIT_MULTITHREADED);
557 
558     PullPin_Flush(This);
559 
560     for (;;)
561     {
562         WaitForSingleObject(This->thread_sleepy, INFINITE);
563 
564         TRACE("State: %d\n", This->state);
565 
566         switch (This->state)
567         {
568         case Req_Die: PullPin_Thread_Stop(This); break;
569         case Req_Run: PullPin_Thread_Process(This); break;
570         case Req_Pause: PullPin_Thread_Pause(This); break;
571         case Req_Sleepy: ERR("Should not be signalled with SLEEPY!\n"); break;
572         default: ERR("Unknown state request: %d\n", This->state); break;
573         }
574     }
575     return 0;
576 }
577 
578 static HRESULT PullPin_InitProcessing(PullPin * This)
579 {
580     HRESULT hr = S_OK;
581 
582     TRACE("(%p)->()\n", This);
583 
584     /* if we are connected */
585     if (This->pAlloc)
586     {
587         DWORD dwThreadId;
588 
589         WaitForSingleObject(This->hEventStateChanged, INFINITE);
590         EnterCriticalSection(This->pin.pCritSec);
591 
592         assert(!This->hThread);
593         assert(This->state == Req_Die);
594         assert(This->stop_playback);
595         assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT);
596         This->state = Req_Sleepy;
597 
598         /* AddRef the filter to make sure it and its pins will be around
599          * as long as the thread */
600         IBaseFilter_AddRef(This->pin.pinInfo.pFilter);
601 
602 
603         This->hThread = CreateThread(NULL, 0, PullPin_Thread_Main, This, 0, &dwThreadId);
604         if (!This->hThread)
605         {
606             hr = HRESULT_FROM_WIN32(GetLastError());
607             IBaseFilter_Release(This->pin.pinInfo.pFilter);
608         }
609 
610         if (SUCCEEDED(hr))
611         {
612             SetEvent(This->hEventStateChanged);
613             /* If assert fails, that means a command was not processed before the thread previously terminated */
614         }
615         LeaveCriticalSection(This->pin.pCritSec);
616     }
617 
618     TRACE(" -- %x\n", hr);
619 
620     return hr;
621 }
622 
623 HRESULT PullPin_StartProcessing(PullPin * This)
624 {
625     /* if we are connected */
626     TRACE("(%p)->()\n", This);
627     if(This->pAlloc)
628     {
629         assert(This->hThread);
630 
631         PullPin_WaitForStateChange(This, INFINITE);
632 
633         assert(This->state == Req_Sleepy);
634 
635         /* Wake up! */
636         assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT);
637         This->state = Req_Run;
638         This->stop_playback = FALSE;
639         ResetEvent(This->hEventStateChanged);
640         SetEvent(This->thread_sleepy);
641     }
642 
643     return S_OK;
644 }
645 
646 HRESULT PullPin_PauseProcessing(PullPin * This)
647 {
648     /* if we are connected */
649     TRACE("(%p)->()\n", This);
650     if(This->pAlloc)
651     {
652         assert(This->hThread);
653 
654         PullPin_WaitForStateChange(This, INFINITE);
655 
656         EnterCriticalSection(This->pin.pCritSec);
657 
658         assert(!This->stop_playback);
659         assert(This->state == Req_Run|| This->state == Req_Sleepy);
660 
661         assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT);
662 
663         This->state = Req_Pause;
664         This->stop_playback = TRUE;
665         ResetEvent(This->hEventStateChanged);
666         SetEvent(This->thread_sleepy);
667 
668         /* Release any outstanding samples */
669         if (This->pReader)
670         {
671             IMediaSample *pSample;
672             DWORD_PTR dwUser;
673 
674             do
675             {
676                 pSample = NULL;
677                 IAsyncReader_WaitForNext(This->pReader, 0, &pSample, &dwUser);
678                 if (pSample)
679                     IMediaSample_Release(pSample);
680             } while(pSample);
681         }
682 
683         LeaveCriticalSection(This->pin.pCritSec);
684     }
685 
686     return S_OK;
687 }
688 
689 static HRESULT PullPin_StopProcessing(PullPin * This)
690 {
691     TRACE("(%p)->()\n", This);
692 
693     /* if we are alive */
694     assert(This->hThread);
695 
696     PullPin_WaitForStateChange(This, INFINITE);
697 
698     assert(This->state == Req_Pause || This->state == Req_Sleepy);
699 
700     This->stop_playback = TRUE;
701     This->state = Req_Die;
702     assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT);
703     ResetEvent(This->hEventStateChanged);
704     SetEvent(This->thread_sleepy);
705     return S_OK;
706 }
707 
708 HRESULT PullPin_WaitForStateChange(PullPin * This, DWORD dwMilliseconds)
709 {
710     if (WaitForSingleObject(This->hEventStateChanged, dwMilliseconds) == WAIT_TIMEOUT)
711         return S_FALSE;
712     return S_OK;
713 }
714 
715 HRESULT WINAPI PullPin_QueryAccept(IPin * iface, const AM_MEDIA_TYPE * pmt)
716 {
717     PullPin *This = impl_PullPin_from_IPin(iface);
718 
719     TRACE("(%p/%p)->(%p)\n", This, iface, pmt);
720 
721     return (This->fnQueryAccept(This->pUserData, pmt) == S_OK ? S_OK : S_FALSE);
722 }
723 
724 HRESULT WINAPI PullPin_EndOfStream(IPin * iface)
725 {
726     PullPin *This = impl_PullPin_from_IPin(iface);
727     HRESULT hr = S_FALSE;
728 
729     TRACE("(%p)->()\n", iface);
730 
731     EnterCriticalSection(This->pin.pCritSec);
732     hr = SendFurther( iface, deliver_endofstream, NULL, NULL );
733     SetEvent(This->hEventStateChanged);
734     LeaveCriticalSection(This->pin.pCritSec);
735 
736     return hr;
737 }
738 
739 HRESULT WINAPI PullPin_BeginFlush(IPin * iface)
740 {
741     PullPin *This = impl_PullPin_from_IPin(iface);
742     TRACE("(%p)->()\n", This);
743 
744     EnterCriticalSection(This->pin.pCritSec);
745     {
746         SendFurther( iface, deliver_beginflush, NULL, NULL );
747     }
748     LeaveCriticalSection(This->pin.pCritSec);
749 
750     EnterCriticalSection(&This->thread_lock);
751     {
752         if (This->pReader)
753             IAsyncReader_BeginFlush(This->pReader);
754         PullPin_WaitForStateChange(This, INFINITE);
755 
756         if (This->hThread && This->state == Req_Run)
757         {
758             PullPin_PauseProcessing(This);
759             PullPin_WaitForStateChange(This, INFINITE);
760         }
761     }
762     LeaveCriticalSection(&This->thread_lock);
763 
764     EnterCriticalSection(This->pin.pCritSec);
765     {
766         This->fnCleanProc(This->pUserData);
767     }
768     LeaveCriticalSection(This->pin.pCritSec);
769 
770     return S_OK;
771 }
772 
773 HRESULT WINAPI PullPin_EndFlush(IPin * iface)
774 {
775     PullPin *This = impl_PullPin_from_IPin(iface);
776 
777     TRACE("(%p)->()\n", iface);
778 
779     /* Send further first: Else a race condition might terminate processing early */
780     EnterCriticalSection(This->pin.pCritSec);
781     SendFurther( iface, deliver_endflush, NULL, NULL );
782     LeaveCriticalSection(This->pin.pCritSec);
783 
784     EnterCriticalSection(&This->thread_lock);
785     {
786         FILTER_STATE state;
787 
788         if (This->pReader)
789             IAsyncReader_EndFlush(This->pReader);
790 
791         IBaseFilter_GetState(This->pin.pinInfo.pFilter, INFINITE, &state);
792 
793         if (state != State_Stopped)
794             PullPin_StartProcessing(This);
795 
796         PullPin_WaitForStateChange(This, INFINITE);
797     }
798     LeaveCriticalSection(&This->thread_lock);
799 
800     return S_OK;
801 }
802 
803 HRESULT WINAPI PullPin_Disconnect(IPin *iface)
804 {
805     HRESULT hr;
806     PullPin *This = impl_PullPin_from_IPin(iface);
807 
808     TRACE("()\n");
809 
810     EnterCriticalSection(This->pin.pCritSec);
811     {
812         if (FAILED(hr = IMemAllocator_Decommit(This->pAlloc)))
813             ERR("Allocator decommit failed with error %x. Possible memory leak\n", hr);
814 
815         if (This->pin.pConnectedTo)
816         {
817             IPin_Release(This->pin.pConnectedTo);
818             This->pin.pConnectedTo = NULL;
819             PullPin_StopProcessing(This);
820 
821             FreeMediaType(&This->pin.mtCurrent);
822             ZeroMemory(&This->pin.mtCurrent, sizeof(This->pin.mtCurrent));
823             hr = S_OK;
824         }
825         else
826             hr = S_FALSE;
827     }
828     LeaveCriticalSection(This->pin.pCritSec);
829 
830     WaitForSingleObject(This->hThread, INFINITE);
831     CloseHandle(This->hThread);
832     This->hThread = NULL;
833 
834     return hr;
835 }
836 
837 HRESULT WINAPI PullPin_NewSegment(IPin * iface, REFERENCE_TIME tStart, REFERENCE_TIME tStop, double dRate)
838 {
839     newsegmentargs args;
840     FIXME("(%p)->(%s, %s, %g) stub\n", iface, wine_dbgstr_longlong(tStart), wine_dbgstr_longlong(tStop), dRate);
841 
842     args.tStart = tStart;
843     args.tStop = tStop;
844     args.rate = dRate;
845 
846     return SendFurther( iface, deliver_newsegment, &args, NULL );
847 }
848