//==========================================================================;
//
// THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY
// KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR
// PURPOSE.
//
// Copyright (c) 1992 - 1997 Microsoft Corporation. All Rights Reserved.
//
//--------------------------------------------------------------------------;
// implementation of CPullPin class - pulls data from IAsyncReader
#include <streams.h>
#include "pullpin.h"
CPullPin::CPullPin()
: m_pReader(NULL),
m_pAlloc(NULL),
m_State(TM_Exit)
{
}
CPullPin::~CPullPin()
{
Disconnect();
}
// returns S_OK if successfully connected to an IAsyncReader interface
// from this object
// Optional allocator should be proposed as a preferred allocator if
// necessary
HRESULT
CPullPin::Connect(IUnknown* pUnk, IMemAllocator* pAlloc, BOOL bSync)
{
CAutoLock lock(&m_AccessLock);
if (m_pReader) {
return VFW_E_ALREADY_CONNECTED;
}
HRESULT hr = pUnk->QueryInterface(IID_IAsyncReader, (void**)&m_pReader);
if (FAILED(hr)) {
return(hr);
}
hr = DecideAllocator(pAlloc, NULL);
if (FAILED(hr)) {
Disconnect();
return hr;
}
LONGLONG llTotal, llAvail;
hr = m_pReader->Length(&llTotal, &llAvail);
if (FAILED(hr)) {
Disconnect();
return hr;
}
// convert from file position to reference time
m_tDuration = llTotal * UNITS;
m_tStop = m_tDuration;
m_tStart = 0;
m_bSync = bSync;
return S_OK;
}
// disconnect any connection made in Connect
HRESULT
CPullPin::Disconnect()
{
CAutoLock lock(&m_AccessLock);
StopThread();
if (m_pReader) {
m_pReader->Release();
m_pReader = NULL;
}
if (m_pAlloc) {
m_pAlloc->Release();
m_pAlloc = NULL;
}
return S_OK;
}
// agree an allocator using RequestAllocator - optional
// props param specifies your requirements (non-zero fields).
// returns an error code if fail to match requirements.
// optional IMemAllocator interface is offered as a preferred allocator
// but no error occurs if it can't be met.
HRESULT
CPullPin::DecideAllocator(
IMemAllocator * pAlloc,
ALLOCATOR_PROPERTIES * pProps)
{
ALLOCATOR_PROPERTIES *pRequest;
ALLOCATOR_PROPERTIES Request;
if (pProps == NULL) {
Request.cBuffers = 3;
Request.cbBuffer = 64*1024;
Request.cbAlign = 0;
Request.cbPrefix = 0;
pRequest = &Request;
} else {
pRequest = pProps;
}
HRESULT hr = m_pReader->RequestAllocator(
pAlloc,
pRequest,
&m_pAlloc);
return hr;
}
// start pulling data
HRESULT
CPullPin::Active(void)
{
ASSERT(!ThreadExists());
return StartThread();
}
// stop pulling data
HRESULT
CPullPin::Inactive(void)
{
StopThread();
return S_OK;
}
HRESULT
CPullPin::Seek(REFERENCE_TIME tStart, REFERENCE_TIME tStop)
{
CAutoLock lock(&m_AccessLock);
ThreadMsg AtStart = m_State;
if (AtStart == TM_Start) {
BeginFlush();
PauseThread();
EndFlush();
}
m_tStart = tStart;
m_tStop = tStop;
HRESULT hr = S_OK;
if (AtStart == TM_Start) {
hr = StartThread();
}
return hr;
}
HRESULT
CPullPin::Duration(REFERENCE_TIME* ptDuration)
{
*ptDuration = m_tDuration;
return S_OK;
}
HRESULT
CPullPin::StartThread()
{
CAutoLock lock(&m_AccessLock);
if (!m_pAlloc || !m_pReader) {
return E_UNEXPECTED;
}
HRESULT hr;
if (!ThreadExists()) {
// commit allocator
hr = m_pAlloc->Commit();
if (FAILED(hr)) {
return hr;
}
// start thread
if (!Create()) {
return E_FAIL;
}
}
m_State = TM_Start;
hr = (HRESULT) CallWorker(m_State);
return hr;
}
HRESULT
CPullPin::PauseThread()
{
CAutoLock lock(&m_AccessLock);
if (!ThreadExists()) {
return E_UNEXPECTED;
}
// need to flush to ensure the thread is not blocked
// in WaitForNext
HRESULT hr = m_pReader->BeginFlush();
if (FAILED(hr)) {
return hr;
}
m_State = TM_Pause;
hr = CallWorker(TM_Pause);
m_pReader->EndFlush();
return hr;
}
HRESULT
CPullPin::StopThread()
{
CAutoLock lock(&m_AccessLock);
if (!ThreadExists()) {
return S_FALSE;
}
// need to flush to ensure the thread is not blocked
// in WaitForNext
HRESULT hr = m_pReader->BeginFlush();
if (FAILED(hr)) {
return hr;
}
m_State = TM_Exit;
hr = CallWorker(TM_Exit);
m_pReader->EndFlush();
// wait for thread to completely exit
Close();
// decommit allocator
if (m_pAlloc) {
m_pAlloc->Decommit();
}
return S_OK;
}
DWORD
CPullPin::ThreadProc(void)
{
while(1) {
DWORD cmd = GetRequest();
switch(cmd) {
case TM_Exit:
Reply(S_OK);
return 0;
case TM_Pause:
// we are paused already
Reply(S_OK);
break;
case TM_Start:
Reply(S_OK);
Process();
break;
}
// at this point, there should be no outstanding requests on the
// upstream filter.
// We should force begin/endflush to ensure that this is true.
// !!!Note that we may currently be inside a BeginFlush/EndFlush pair
// on another thread, but the premature EndFlush will do no harm now
// that we are idle.
m_pReader->BeginFlush();
CleanupCancelled();
m_pReader->EndFlush();
}
}
HRESULT
CPullPin::QueueSample(
REFERENCE_TIME& tCurrent,
REFERENCE_TIME tAlignStop,
BOOL bDiscontinuity
)
{
IMediaSample* pSample;
HRESULT hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
if (FAILED(hr)) {
return hr;
}
LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
if (tStopThis > tAlignStop) {
tStopThis = tAlignStop;
}
pSample->SetTime(&tCurrent, &tStopThis);
tCurrent = tStopThis;
pSample->SetDiscontinuity(bDiscontinuity);
hr = m_pReader->Request(
pSample,
0);
if (FAILED(hr)) {
pSample->Release();
CleanupCancelled();
OnError(hr);
}
return hr;
}
HRESULT
CPullPin::CollectAndDeliver(
REFERENCE_TIME tStart,
REFERENCE_TIME tStop)
{
IMediaSample* pSample = NULL; // better be sure pSample is set
DWORD dwUnused;
HRESULT hr = m_pReader->WaitForNext(
INFINITE,
&pSample,
&dwUnused);
if (FAILED(hr)) {
if (pSample) {
pSample->Release();
}
} else {
hr = DeliverSample(pSample, tStart, tStop);
}
if (FAILED(hr)) {
CleanupCancelled();
OnError(hr);
}
return hr;
}
HRESULT
CPullPin::DeliverSample(
IMediaSample* pSample,
REFERENCE_TIME tStart,
REFERENCE_TIME tStop
)
{
// fix up sample if past actual stop (for sector alignment)
REFERENCE_TIME t1, t2;
pSample->GetTime(&t1, &t2);
if (t2 > tStop) {
t2 = tStop;
}
// adjust times to be relative to (aligned) start time
t1 -= tStart;
t2 -= tStart;
pSample->SetTime(&t1, &t2);
HRESULT hr = Receive(pSample);
pSample->Release();
return hr;
}
void
CPullPin::Process(void)
{
// is there anything to do?
if (m_tStop <= m_tStart) {
EndOfStream();
return;
}
BOOL bDiscontinuity = TRUE;
// if there is more than one sample at the allocator,
// then try to queue 2 at once in order to overlap.
// -- get buffer count and required alignment
ALLOCATOR_PROPERTIES Actual;
HRESULT hr = m_pAlloc->GetProperties(&Actual);
// align the start position downwards
REFERENCE_TIME tStart = AlignDown(m_tStart / UNITS, Actual.cbAlign) * UNITS;
REFERENCE_TIME tCurrent = tStart;
REFERENCE_TIME tStop = m_tStop;
if (tStop > m_tDuration) {
tStop = m_tDuration;
}
// align the stop position - may be past stop, but that
// doesn't matter
REFERENCE_TIME tAlignStop = AlignUp(tStop / UNITS, Actual.cbAlign) * UNITS;
DWORD dwRequest;
if (!m_bSync) {
// Break out of the loop either if we get to the end or we're asked
// to do something else
while (tCurrent < tAlignStop) {
// Break out without calling EndOfStream if we're asked to
// do something different
if (CheckRequest(&dwRequest)) {
return;
}
// queue a first sample
if (Actual.cBuffers > 1) {
hr = QueueSample(tCurrent, tAlignStop, TRUE);
bDiscontinuity = FALSE;
if (FAILED(hr)) {
return;
}
}
// loop queueing second and waiting for first..
while (tCurrent < tAlignStop) {
hr = QueueSample(tCurrent, tAlignStop, bDiscontinuity);
bDiscontinuity = FALSE;
if (FAILED(hr)) {
return;
}
hr = CollectAndDeliver(tStart, tStop);
if (S_OK != hr) {
// stop if error, or if downstream filter said
// to stop.
return;
}
}
if (Actual.cBuffers > 1) {
hr = CollectAndDeliver(tStart, tStop);
if (FAILED(hr)) {
return;
}
}
}
} else {
// sync version of above loop
while (tCurrent < tAlignStop) {
// Break out without calling EndOfStream if we're asked to
// do something different
if (CheckRequest(&dwRequest)) {
return;
}
IMediaSample* pSample;
hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
if (FAILED(hr)) {
OnError(hr);
return;
}
LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
if (tStopThis > tAlignStop) {
tStopThis = tAlignStop;
}
pSample->SetTime(&tCurrent, &tStopThis);
tCurrent = tStopThis;
if (bDiscontinuity) {
pSample->SetDiscontinuity(TRUE);
bDiscontinuity = FALSE;
}
hr = m_pReader->SyncReadAligned(pSample);
if (FAILED(hr)) {
pSample->Release();
OnError(hr);
return;
}
hr = DeliverSample(pSample, tStart, tStop);
if (hr != S_OK) {
if (FAILED(hr)) {
OnError(hr);
}
return;
}
}
}
EndOfStream();
}
// after a flush, cancelled i/o will be waiting for collection
// and release
void
CPullPin::CleanupCancelled(void)
{
while (1) {
IMediaSample * pSample;
DWORD dwUnused;
HRESULT hr = m_pReader->WaitForNext(
0, // no wait
&pSample,
&dwUnused);
if(pSample) {
pSample->Release();
} else {
// no more samples
return;
}
}
}