// OUTPUTQ.CPP

//==========================================================================; 
//
// THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY
// KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR
// PURPOSE.
//
// Copyright (c) 1992 - 1997 Microsoft Corporation. All Rights Reserved.
//
//--------------------------------------------------------------------------;

/*
outputq.cpp

COutputQueue class

This is a class used by an output pin which may sometimes want
to queue output samples on a separate thread and sometimes
call Receive() directly on the input pin
*/

#include <streams.h>


//
// COutputQueue Constructor :
//
// Determines if a thread is to be created and creates resources
//
// pInputPin - the downstream input pin we're queueing samples to
//
// phr - changed to a failure code if this function fails
// (otherwise unchanged)
//
// bAuto - Ask pInputPin if it can block in Receive by calling
// its ReceiveCanBlock method and create a thread if
// it can block, otherwise not.
//
// bQueue - if bAuto == FALSE then we create a thread if and only
// if bQueue == TRUE
//
// lBatchSize - work in batches of lBatchSize
//
// bBatchExact - Use exact batch sizes so don't send until the
// batch is full or SendAnyway() is called
//
// lListSize - If we create a thread make the list of samples queued
// to the thread have this size cache
//
// dwPriority - If we create a thread set its priority to this
//
COutputQueue::COutputQueue(
    IPin *pInputPin, // Pin to send stuff to
    HRESULT *phr, // 'Return code'
    BOOL bAuto, // Ask pin if queue or not
    BOOL bQueue, // Send through queue
    LONG lBatchSize, // Batch
    BOOL bBatchExact, // Batch exactly to BatchSize
    LONG lListSize,
    DWORD dwPriority
    ) : m_lBatchSize(lBatchSize),
        m_bBatchExact(bBatchExact && (lBatchSize > 1)),
        m_hThread(NULL),
        m_hSem(NULL),
        m_List(NULL),
        m_pPin(pInputPin),
        m_ppSamples(NULL),
        m_lWaiting(0),
        m_pInputPin(NULL),
        m_bSendAnyway(FALSE),
        m_nBatched(0),
        m_bFlushing(FALSE),
        m_bFlushed(TRUE),
        m_bTerminate(FALSE),
        m_hr(S_OK)
{
    ASSERT(m_lBatchSize > 0);

    // The caller may already have recorded a failure - if so do nothing
    // and leave *phr untouched.
    if (FAILED(*phr)) {
        return;
    }

    // The downstream pin must expose IMemInputPin; keep the interface
    // for the lifetime of the queue (released in the destructor).
    *phr = pInputPin->QueryInterface(IID_IMemInputPin, (void **)&m_pInputPin);
    if (FAILED(*phr)) {
        return;
    }

    // In automatic mode the pin's answer overrides bQueue, but only when
    // ReceiveCanBlock() succeeds. Only S_OK selects the queue.
    if (bAuto) {
        const HRESULT hrCanBlock = m_pInputPin->ReceiveCanBlock();
        if (SUCCEEDED(hrCanBlock)) {
            bQueue = (S_OK == hrCanBlock);
        }
    }

    // The batch array is needed in both direct and queued modes
    m_ppSamples = new PMEDIASAMPLE[m_lBatchSize];
    if (NULL == m_ppSamples) {
        *phr = E_OUTOFMEMORY;
        return;
    }

    // Direct mode needs nothing more
    if (!bQueue) {
        DbgLog((LOG_TRACE, 2, TEXT("Calling input pin directly - no thread")));
        return;
    }

    // Queued mode : semaphore, then the sample list, then the thread.
    // Each failure reports through *phr and leaves whatever was already
    // created for the destructor.
    DbgLog((LOG_TRACE, 2, TEXT("Creating thread for output pin")));

    m_hSem = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
    if (NULL == m_hSem) {
        DWORD dwSemError = GetLastError();
        *phr = HRESULT_FROM_WIN32(dwSemError);
        return;
    }

    m_List = new CSampleList(NAME("Sample Queue List"),
                             lListSize,
                             FALSE // No lock
                             );
    if (NULL == m_List) {
        *phr = E_OUTOFMEMORY;
        return;
    }

    DWORD dwWorkerId;
    m_hThread = CreateThread(NULL,
                             0,
                             InitialThreadProc,
                             (LPVOID)this,
                             0,
                             &dwWorkerId);
    if (NULL == m_hThread) {
        DWORD dwThreadError = GetLastError();
        *phr = HRESULT_FROM_WIN32(dwThreadError);
        return;
    }
    SetThreadPriority(m_hThread, dwPriority);
}

//
// COutputQueue Destructor :
//
// Free all resources -
//
// Thread,
// Sample list,
// Batched samples,
// Semaphore,
// Cached IMemInputPin interface
//
COutputQueue::~COutputQueue()
{
    DbgLog((LOG_TRACE, 3, TEXT("COutputQueue::~COutputQueue")));

    if (m_hThread != NULL) {
        {
            // Ask the worker to exit; it frees queued and batched
            // samples itself when it sees m_bTerminate.
            CAutoLock lck(this);
            m_bTerminate = TRUE;
            m_hr = S_FALSE;
            NotifyThread();
        }
        DbgWaitForSingleObject(m_hThread);
        EXECUTE_ASSERT(CloseHandle(m_hThread));

        // The thread frees the samples when asked to terminate

        ASSERT(m_List->GetCount() == 0);
    } else {
        // No worker thread: release whatever is batched (and anything
        // on the list, should it exist) ourselves.
        FreeSamples();
    }

    // The constructor allocates the list before it creates the thread,
    // so the list can exist even when m_hThread is NULL (CreateThread
    // failed). Delete it in both cases so it is not leaked.
    delete m_List;
    m_List = NULL;

    if (m_hSem != NULL) {
        EXECUTE_ASSERT(CloseHandle(m_hSem));
    }
    delete [] m_ppSamples;

    // Release the cached interface only after the worker thread, which
    // calls through it, has finished.
    if (m_pInputPin != NULL) {
        m_pInputPin->Release();
    }
}

//
// Thread entry point : initialise COM for this thread and call the
// real thread proc as a member function
//
// pv - the COutputQueue that created the thread (passed as 'this'
// to CreateThread by the constructor)
//
// Returns whatever ThreadProc() returns
//
DWORD WINAPI COutputQueue::InitialThreadProc(LPVOID pv)
{
    COutputQueue *pSampleQueue = (COutputQueue *)pv;

    // Remember whether COM initialisation worked : CoUninitialize()
    // must only balance a successful CoInitialize() (S_OK or S_FALSE).
    HRESULT hrCoInit = CoInitialize(NULL);

    DWORD dwReturn = pSampleQueue->ThreadProc();

    if (SUCCEEDED(hrCoInit)) {
        CoUninitialize();
    }
    return dwReturn;
}

//
// Thread sending the samples downstream :
//
// When there is nothing to do the thread sets m_lWaiting (while
// holding the critical section) and then waits for m_hSem to be
// set (not holding the critical section)
//
DWORD COutputQueue::ThreadProc()
{
    // Loops until m_bTerminate is seen (returns 0). Each pass either
    // waits on m_hSem or sends one batch and then acts on the control
    // packet (if any) that ended the batch.
    while (TRUE) {
        BOOL bWait = FALSE;
        IMediaSample *pSample;
        LONG lNumberToSend = 0; // Local copy of m_nBatched
        NewSegmentPacket* ppacket = NULL; // Only set for NEW_SEGMENT

        //
        // Get a batch of samples and send it if possible
        // In any case exit the loop if there is a control action
        // requested
        //
        {
            CAutoLock lck(this);
            while (TRUE) {

                if (m_bTerminate) {
                    FreeSamples();
                    return 0;
                }
                if (m_bFlushing) {
                    FreeSamples();
                    SetEvent(m_evFlushComplete);
                }

                // Get a sample off the list

                pSample = m_List->RemoveHead();

                if (pSample != NULL &&
                    !IsSpecialSample(pSample)) {

                    // If its just a regular sample just add it to the batch
                    // and exit the loop if the batch is full

                    m_ppSamples[m_nBatched++] = pSample;
                    if (m_nBatched == m_lBatchSize) {
                        break;
                    }
                } else {

                    // If there was nothing in the queue and there's nothing
                    // to send (either because there's nothing or the batch
                    // isn't full) then prepare to wait

                    if (pSample == NULL &&
                        (m_bBatchExact || m_nBatched == 0)) {

                        // Tell other thread to set the event when there's
                        // something to do

                        ASSERT(m_lWaiting == 0);
                        m_lWaiting++;
                        bWait = TRUE;
                    } else {

                        // We break out of the loop on SEND_PACKET unless
                        // there's nothing to send

                        if (pSample == SEND_PACKET && m_nBatched == 0) {
                            continue;
                        }

                        if (pSample == NEW_SEGMENT) {
                            // now we need the parameters - we are
                            // guaranteed that the next packet contains them
                            ppacket = (NewSegmentPacket *) m_List->RemoveHead();
                            ASSERT(ppacket);
                        }
                        // EOS_PACKET falls through here and we exit the loop
                        // In this way it acts like SEND_PACKET
                    }
                    break;
                }
            }
            if (!bWait) {
                // We look at m_nBatched from the client side so keep
                // it up to date inside the critical section
                lNumberToSend = m_nBatched; // Local copy
                m_nBatched = 0;
            }
        }

        // Wait for some more data

        if (bWait) {
            DbgWaitForSingleObject(m_hSem);
            continue;
        }



        // OK - send it if there's anything to send
        // We DON'T check m_bBatchExact here because either we've got
        // a full batch or we dropped through because we got
        // SEND_PACKET or EOS_PACKET - both of which imply we should
        // flush our batch

        if (lNumberToSend != 0) {
            long nProcessed;
            if (m_hr == S_OK) {
                ASSERT(!m_bFlushed);
                HRESULT hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
                                                          lNumberToSend,
                                                          &nProcessed);
                /* Don't overwrite a flushing state HRESULT */
                CAutoLock lck(this);
                if (m_hr == S_OK) {
                    m_hr = hr;
                }
                ASSERT(!m_bFlushed);
            }

            // The batch holds a reference on every sample whether or
            // not it was delivered - drop them all now
            while (lNumberToSend != 0) {
                m_ppSamples[--lNumberToSend]->Release();
            }
            if (m_hr != S_OK) {

                // In any case wait for more data - S_OK just
                // means there wasn't an error

                DbgLog((LOG_ERROR, 2, TEXT("ReceiveMultiple returned %8.8X"),
                        m_hr));
            }
        }

        // Check for end of stream

        if (pSample == EOS_PACKET) {

            // We don't send even end of stream on if we've previously
            // returned something other than S_OK
            // This is because in that case the pin which returned
            // something other than S_OK should have either sent
            // EndOfStream() or notified the filter graph

            if (m_hr == S_OK) {
                DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
                HRESULT hr = m_pPin->EndOfStream();
                if (FAILED(hr)) {
                    // Pass hr so the %8.8X specifier has an argument
                    DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()"), hr));
                }
            }
        }

        // Data from a new source

        if (pSample == RESET_PACKET) {
            m_hr = S_OK;
            SetEvent(m_evFlushComplete);
        }

        if (pSample == NEW_SEGMENT) {
            // ppacket should always be set here (see the ASSERT above);
            // the check only protects builds where ASSERT compiles away
            if (ppacket != NULL) {
                m_pPin->NewSegment(ppacket->tStart, ppacket->tStop, ppacket->dRate);
            }
            delete ppacket;
        }
    }
}

// Send batched stuff anyway
void COutputQueue::SendAnyway()
{
if (!IsQueued()) {

// m_bSendAnyway is a secret hack parameter to ReceiveMultiple

m_bSendAnyway = TRUE;
LONG nProcessed;
ReceiveMultiple(NULL, 0, &nProcessed);
m_bSendAnyway = FALSE;

} else {
CAutoLock lck(this);
QueueSample(SEND_PACKET);
NotifyThread();
}
}

void
COutputQueue::NewSegment(
    REFERENCE_TIME tStart,
    REFERENCE_TIME tStop,
    double dRate)
{
    // Pass a new-segment notification downstream, in order with the data.
    //
    // tStart, tStop, dRate - forwarded unchanged to the pin's NewSegment()
    //
    // Nothing is sent or queued unless the sticky return code is S_OK.
    if (!IsQueued()) {
        if (S_OK == m_hr) {
            // Push out any partial batch first so the notification
            // stays behind the samples that preceded it
            if (m_bBatchExact) {
                SendAnyway();
            }
            m_pPin->NewSegment(tStart, tStop, dRate);
        }
    } else {
        if (m_hr == S_OK) {
            //
            // we need to queue the new segment to appear in order in the
            // data, but we need to pass parameters to it. Rather than
            // take the hit of wrapping every single sample so we can tell
            // special ones apart, we queue special pointers to indicate
            // special packets, and we guarantee (by holding the
            // critical section) that the packet immediately following a
            // NEW_SEGMENT value is a NewSegmentPacket containing the
            // parameters.
            NewSegmentPacket * ppack = new NewSegmentPacket;
            if (ppack == NULL) {
                return;
            }
            ppack->tStart = tStart;
            ppack->tStop = tStop;
            ppack->dRate = dRate;

            CAutoLock lck(this);

            // Add the marker and its parameter block as a pair, or not
            // at all. QueueSample() is not used here: on an AddTail
            // failure it would call Release() on the parameter block,
            // which is not a COM object, and a marker queued without
            // its parameters would break the guarantee described above.
            if (NULL == m_List->AddTail(NEW_SEGMENT)) {
                delete ppack;
                return;
            }
            if (NULL == m_List->AddTail((IMediaSample *) ppack)) {
                // We hold the lock, so the tail is the marker just added
                m_List->RemoveTail();
                delete ppack;
                return;
            }
            NotifyThread();
        }
    }
}


//
// End of Stream is queued to output device
//
void COutputQueue::EOS()
{
    // Deliver end of stream : directly to the pin when there is no
    // worker thread, otherwise as an EOS_PACKET on the queue.
    // Nothing is delivered once the sticky return code is not S_OK.
    CAutoLock lck(this);
    if (!IsQueued()) {
        // Send any partial batch ahead of the end of stream
        if (m_bBatchExact) {
            SendAnyway();
        }
        if (m_hr == S_OK) {
            DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
            m_bFlushed = FALSE;
            HRESULT hr = m_pPin->EndOfStream();
            if (FAILED(hr)) {
                // Pass hr so the %8.8X specifier has an argument
                DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()"), hr));
            }
        }
    } else {
        if (m_hr == S_OK) {
            m_bFlushed = FALSE;
            QueueSample(EOS_PACKET);
            NotifyThread();
        }
    }
}

//
// Flush all the samples in the queue
//
void COutputQueue::BeginFlush()
{
    if (!IsQueued()) {
        // Direct mode : tell the downstream pin first (avoids
        // deadlocks), then mark ourselves as flushing.
        m_pPin->BeginFlush();

        CAutoLock lck(this);
        m_bFlushing = TRUE;

        // From now on every incoming sample is discarded
        if (S_OK == m_hr) {
            m_hr = S_FALSE;
        }
        return;
    }

    // Queued mode. Blocking further receives is assumed to be done by
    // the filter that owns this queue.
    {
        CAutoLock lck(this);

        // The worker discards all queued data once it sees this flag
        m_bFlushing = TRUE;

        // From now on every incoming sample is discarded
        if (S_OK == m_hr) {
            m_hr = S_FALSE;
        }

        // Nothing has been sent since the last flush, so there is no
        // point in calling downstream again
        if (m_bFlushed) {
            return;
        }

        // Clear the event so that EndFlush() really waits for the
        // worker to finish discarding
        m_evFlushComplete.Reset();

        NotifyThread();
    }

    // Lock released - now pass the flush downstream
    m_pPin->BeginFlush();
}

//
// leave flush mode - pass this downstream
void COutputQueue::EndFlush()
{
    {
        CAutoLock lck(this);
        ASSERT(m_bFlushing);

        // Queued and nothing sent since the last flush : BeginFlush()
        // did not go downstream, so just leave flushing state here.
        if (m_bFlushed && IsQueued()) {
            m_bFlushing = FALSE;
            m_hr = S_OK;
            return;
        }
    }

    // The lock must not be held beyond this point. In queued mode we
    // wait for the worker thread to signal that it has discarded
    // everything, and holding the critical section would deadlock.
    // In direct mode we discard the batch ourselves.
    if (!IsQueued()) {
        FreeSamples();
    } else {
        m_evFlushComplete.Wait();
    }

    // Safe without the lock only because the caller guarantees no
    // samples arrive before EndFlush() returns
    m_bFlushing = FALSE;
    m_bFlushed = TRUE;

    // Tell the downstream pin, then clear the sticky return code
    m_pPin->EndFlush();

    m_hr = S_OK;
}

// COutputQueue::QueueSample
//
// private method to Send a sample to the output queue
// The critical section MUST be held when this is called

void COutputQueue::QueueSample(IMediaSample *pSample)
{
    // Append to the worker's list. If the list cannot take it, a real
    // sample is Release()'d here; special marker values are not
    // samples and are left alone.
    const bool bAppended = (m_List->AddTail(pSample) != NULL);
    if (!bAppended && !IsSpecialSample(pSample)) {
        pSample->Release();
    }
}

//
// COutputQueue::Receive()
//
// Send a single sample by the multiple sample route
// (NOTE - this could be optimized if necessary)
//
// On return the sample will have been Release()'d
//

HRESULT COutputQueue::Receive(IMediaSample *pSample)
{
    // A single sample is a batch of one; the processed count is not
    // reported to the caller.
    LONG cProcessed;
    const HRESULT hrResult = ReceiveMultiple(&pSample, 1, &cProcessed);
    return hrResult;
}

//
// COutputQueue::ReceiveMultiple()
//
// Send a set of samples to the downstream pin
//
// ppSamples - array of samples
// nSamples - how many
// nSamplesProcessed - How many were processed
//
// On return all samples will have been Release()'d
//

HRESULT COutputQueue::ReceiveMultiple (
    IMediaSample **ppSamples,
    long nSamples,
    long *nSamplesProcessed)
{
    // Returns the sticky return code m_hr in direct mode and on the
    // queued discard path, and S_OK when the samples were queued.
    CAutoLock lck(this);
    // Either call directly or queue up the samples

    if (!IsQueued()) {

        // If we already had a bad return code then just return

        if (S_OK != m_hr) {

            // If we've never received anything since the last Flush()
            // and the sticky return code is not S_OK we must be
            // flushing
            // ((!A || B) is equivalent to A implies B)
            ASSERT(!m_bFlushed || m_bFlushing);

            // We're supposed to Release() them anyway!
            *nSamplesProcessed = 0;

            // Log once per call, as the queued branch below does
            DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (direct) : Discarding %d samples code 0x%8.8X"),
                    nSamples, m_hr));
            for (int i = 0; i < nSamples; i++) {
                ppSamples[i]->Release();
            }

            return m_hr;
        }
        //
        // If we're flushing the sticky return code should be S_FALSE
        //
        ASSERT(!m_bFlushing);
        m_bFlushed = FALSE;

        ASSERT(m_nBatched < m_lBatchSize);
        ASSERT(m_nBatched == 0 || m_bBatchExact);

        // Loop processing the samples in batches

        LONG iLost = 0;

        // Declared outside the for statement because it is read again
        // after the loop
        long iDone = 0;
        for (iDone = 0;
             iDone < nSamples || (m_nBatched != 0 && m_bSendAnyway);
            ) {

//pragma message (REMIND("Implement threshold scheme"))
            ASSERT(m_nBatched < m_lBatchSize);
            if (iDone < nSamples) {
                m_ppSamples[m_nBatched++] = ppSamples[iDone++];
            }
            if (m_nBatched == m_lBatchSize ||
                (nSamples == 0 && (m_bSendAnyway || !m_bBatchExact))) {
                LONG nDone;
                DbgLog((LOG_TRACE, 4, TEXT("Batching %d samples"),
                        m_nBatched));

                if (m_hr == S_OK) {
                    m_hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
                                                        m_nBatched,
                                                        &nDone);
                } else {
                    nDone = 0;
                }

                // Samples in this batch that downstream did not process
                iLost += m_nBatched - nDone;
                for (LONG i = 0; i < m_nBatched; i++) {
                    m_ppSamples[i]->Release();
                }
                m_nBatched = 0;
            }
        }
        *nSamplesProcessed = iDone - iLost;
        if (*nSamplesProcessed < 0) {
            *nSamplesProcessed = 0;
        }
        return m_hr;
    } else {
        /* We're sending to our thread */

        if (m_hr != S_OK) {
            *nSamplesProcessed = 0;
            DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (queued) : Discarding %d samples code 0x%8.8X"),
                    nSamples, m_hr));
            for (int i = 0; i < nSamples; i++) {
                ppSamples[i]->Release();
            }
            return m_hr;
        }
        m_bFlushed = FALSE;
        for (long i = 0; i < nSamples; i++) {
            QueueSample(ppSamples[i]);
        }
        *nSamplesProcessed = nSamples;

        // With exact batching only wake the worker once a full batch
        // is available between its current batch and the list
        if (!m_bBatchExact ||
            m_nBatched + m_List->GetCount() >= m_lBatchSize) {
            NotifyThread();
        }
        return S_OK;
    }
}

// Get ready for new data - cancels sticky m_hr
//
// In direct mode m_hr is simply set back to S_OK. In queued mode a
// RESET_PACKET is queued and we block until the worker thread has
// processed it and signalled m_evFlushComplete.
void COutputQueue::Reset()
{
    if (!IsQueued()) {
        m_hr = S_OK;
    } else {
        {
            CAutoLock lck(this);
            QueueSample(RESET_PACKET);
            NotifyThread();
        }

        // Wait with the critical section released : the worker thread
        // takes the same lock before it removes anything from the list,
        // so waiting while holding it would deadlock.
        m_evFlushComplete.Wait();
    }
}

// Remove and Release() all queued and Batched samples
void COutputQueue::FreeSamples()
{
CAutoLock lck(this);
if (IsQueued()) {
while (TRUE) {
IMediaSample *pSample = m_List->RemoveHead();
if (pSample == NULL) {
break;
}
if (!IsSpecialSample(pSample)) {
pSample->Release();
} else {
if (pSample == NEW_SEGMENT) {
// Free NEW_SEGMENT packet
NewSegmentPacket *ppacket =
(NewSegmentPacket *) m_List->RemoveHead();
ASSERT(ppacket != NULL);
delete ppacket;
}
}
}
}
for (int i = 0; i < m_nBatched; i++) {
m_ppSamples[i]->Release();
}
m_nBatched = 0;
}

// Notify the thread if there is something to do
//
// The critical section MUST be held when this is called
void COutputQueue::NotifyThread()
{
    ASSERT(IsQueued());

    // Nothing to do unless the worker has recorded that it is waiting
    if (m_lWaiting == 0) {
        return;
    }

    // Release the semaphore once per recorded wait and clear the count
    ReleaseSemaphore(m_hSem, m_lWaiting, NULL);
    m_lWaiting = 0;
}

// See if there's any work to do
// Returns
// TRUE if there is nothing on the queue and nothing in the batch
// and all data has been sent
// FALSE otherwise
//
BOOL COutputQueue::IsIdle()
{
    CAutoLock lck(this);

    // Busy if a worker thread exists and is not waiting for work
    // (m_lWaiting == 0), or if samples are sitting in the current batch
    const BOOL bWorkerActive = IsQueued() && m_lWaiting == 0;
    if (bWorkerActive || m_nBatched != 0) {
        return FALSE;
    }

    // When idle the work queue is expected to be empty as well
    ASSERT(!IsQueued() || m_List->GetCount() == 0);
    return TRUE;
}