Figure 2    IQueue Interface Methods

Method Description
Open Opens a queue specified by pathname or format name for sending or receiving messages
Close Closes a queue
GetTimeSent Returns the time that the most recently received message was sent to the queue (corresponds to the message's PROPID_M_SENTTIME)
GetTimeArrived Returns the time that the most recently received message arrived in the queue (corresponds to the message's PROPID_M_ARRIVEDTIME)
Send Sends a message to the queue
AwaitMessage Waits for messages to arrive in the queue or for a timeout interval to elapse
Receive Gets a message from the queue
FreeBuffer Frees a buffer acquired by Receive


Figure 6   CQueue::Open

//===========================
// CQueue::Open
//
// Opens the MSMQ queue named by pwszQueueOrFormatName, first closing
// whatever this object currently has open (via Close()).
//
// Parameters:
//   pwszQueueOrFormatName - queue name.  If IsFormatName() accepts
//                           it, it is used unchanged; otherwise it
//                           is treated as a path name and converted
//                           with MQPathNameToFormatName().
//   dwAccess              - access mode, passed to MQOpenQueue().
//
// Returns:
//   E_POINTER      if pwszQueueOrFormatName is NULL,
//   E_OUTOFMEMORY  if the input name could not be duplicated,
//   otherwise the HRESULT from MQPathNameToFormatName() or
//   MQOpenQueue().
//===========================

STDMETHODIMP CQueue::Open(LPCWSTR pwszQueueOrFormatName, DWORD dwAccess)
{
    // Validate before touching any state: _wcsdup() and the MSMQ
    // calls below would dereference a NULL name.
    if (!pwszQueueOrFormatName)
        return E_POINTER;

    Close();

    HRESULT hr = S_OK;

    // Save input name
    m_pwszQueueOrFormatName = _wcsdup(pwszQueueOrFormatName);
    if (!m_pwszQueueOrFormatName)
        return E_OUTOFMEMORY;

    // Obtain queue's Format Name

    // Input name is already a format name
    if (IsFormatName(pwszQueueOrFormatName))
    {
        // Use caller's name "as is" without conversion to a format name.
        //
        // The copy is made with new[] (not _wcsdup/malloc) so that
        // m_pwszFormatName always comes from the same allocator as in
        // the path-name branch below, which releases it with delete [].
        // NOTE(review): Close() is not visible here - confirm that it
        // also releases m_pwszFormatName with delete [].
        const size_t cchName = wcslen(pwszQueueOrFormatName) + 1;
        m_pwszFormatName = new WCHAR[cchName];
        memcpy(m_pwszFormatName, pwszQueueOrFormatName,
               cchName * sizeof(WCHAR));
    }
    else// queue name is taken to be a path name
    {
        // Convert path name to Format Name, using the MQIS

        // MSMQ doesn't document a maximum FORMAT_NAME_SIZE, so we
        // make a reasonable guess and provide an expanding buffer.

        DWORD dwNumChars = FORMAT_NAME_SIZE;

        do {
            delete [] m_pwszFormatName;
            m_pwszFormatName = new WCHAR[dwNumChars];

            // dwNumChars is in/out: when the buffer is too small MSMQ
            // writes the required length back, so the next pass
            // allocates a buffer of the right size.
            hr = MQPathNameToFormatName(pwszQueueOrFormatName, m_pwszFormatName, &dwNumChars);

        } while (hr == MQ_ERROR_FORMATNAME_BUFFER_TOO_SMALL);

        // Report only the final outcome; a too-small buffer is an
        // expected, recoverable condition handled by the loop above.
        if (hr) ReportHR(hr, "MQPathNameToFormatName");

    }    // else queue name is a path name

    // Now open the queue
    if (SUCCEEDED(hr))
    {
        hr = MQOpenQueue(m_pwszFormatName, dwAccess, MQ_DENY_NONE, &m_qh);
        if (hr) ReportHR(hr, "MQOpenQueue");
    }
    return hr;
}

Figure 7    CQueue::Send

//===========================
// CQueue::Send
//
// Sends the dwcbBuffer bytes at pbBuffer as the body (PROPID_M_BODY)
// of a single message on the queue handle m_qh.
//
// Returns the HRESULT from MQSendMessage(); any value other than
// S_OK is also passed to ReportHR().
//===========================

STDMETHODIMP CQueue::Send(const BYTE * pbBuffer, DWORD dwcbBuffer)
{
    _ASSERTE(pbBuffer);
    _ASSERTE(m_qh);

    // The message carries exactly one property: its body.
    const int cProps = 1;

    MSGPROPID      aPropId[cProps];     // property ids
    MQPROPVARIANT  aPropVar[cProps];    // property variants
    HRESULT        aStatus[cProps];     // per-property status

    MQMSGPROPS    msgProps = {cProps, aPropId, aPropVar, aStatus};

    aPropId[0] = PROPID_M_BODY;
    aStatus[0] = S_OK;

    // The body is a counted byte vector aliasing the caller's buffer;
    // the const is cast away only because caub.pElems is non-const.
    MQPROPVARIANT & body = aPropVar[0];
    body.vt          = VT_UI1 | VT_VECTOR;
    body.caub.cElems = dwcbBuffer;
    body.caub.pElems = const_cast<BYTE *>(pbBuffer);

    // Pick the pTransaction argument.
    //
    // Note: This code assumes that the queue is a transacted queue.
    // We could extract the queue's transactional attribute with
    // MQGetQueueProperties(), but that requires MQIS access.

    ITransaction * pXact = MQ_SINGLE_MESSAGE;
    if (IsMTSContextAvailable())
    {
        pXact = MQ_MTS_TRANSACTION;
    }

    const HRESULT hrSend = MQSendMessage(m_qh, &msgProps, pXact);
    if (hrSend != S_OK)
    {
        ReportHR(hrSend, "MQSendMessage");
    }
    return hrSend;
}

Figure 11    QueueServer

//====================================
// QueueServer main
//
// Initializes COM, runs the message loop, then uninitializes COM.
//====================================

// Creates the Queue and MessageServer COM objects, opens the input
// queue and processes messages until an unexpected error occurs.
//
// All COM smart pointers are local to this function so that they are
// released when it returns, i.e. before main() calls
// CoUninitialize().  Returns the HRESULT that ended the loop.
static HRESULT RunQueueServer()
{
    // Create new Queue COM Object

    CComPtr<IQueue> pIQueue;
    HRESULT hr = CoCreateInstance(CLSID_Queue, NULL, CLSCTX_INPROC_SERVER,
                                  IID_IQueue, (void **) &pIQueue);
    ReportHR(hr, "CoCreateInstance CLSID_Queue");
    if (FAILED(hr))
        return hr;

    // Open input queue
    hr = pIQueue->Open(L".\\MSJQueue", MQ_RECEIVE_ACCESS);
    ReportHR(hr, "pIQueue->Open");
    if (FAILED(hr))
        return hr;

    // Create a COM MessageServer object to process
    // messages transactionally.

    CComPtr<IMessageServer> pIMessageServer;
    hr = CoCreateInstance(CLSID_MessageServer, NULL,
                          CLSCTX_INPROC_SERVER,
                          IID_IMessageServer,
                          (void **) &pIMessageServer);
    ReportHR(hr, "CoCreateInstance CLSID_MessageServer");

    while(SUCCEEDED(hr))
    {
        // Wait for a message to arrive, or for 60 seconds
        hr = pIQueue->AwaitMessage(60000);
        if (SUCCEEDED(hr))
        {
            ReportHR(hr, "pIQueue->AwaitMessage");

            // Process the message inside a transaction.

            hr = pIMessageServer->DoWork(pIQueue);
            ReportHR(hr, "pIMessageServer->DoWork");
        }
        else if (hr == MQ_ERROR_IO_TIMEOUT)
        {
            // Treat queue timeout as a retriable warning
            _RPTF0(_CRT_WARN, "pIQueue->AwaitMessage timeout\n");
            hr = S_OK;
        }
        else    // any unexpected error
            ReportHR(hr, "pIQueue->AwaitMessage");
    }
    return hr;
}

HRESULT _cdecl main()
{
    // initialize COM library
    HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED);
    ReportHR(hr, "CoInitializeEx");
    if(SUCCEEDED(hr))
    {
        // Every interface pointer must be released before COM is
        // uninitialized; RunQueueServer() owns them all and has
        // released them by the time it returns.
        hr = RunQueueServer();

        CoUninitialize();
    }
    return hr;
}

Figure 12    CQueue::AwaitMessage

//===========================
// CQueue::AwaitMessage
//
// Peeks at the queue (m_qh) for up to dwTimeOut, fetching only the
// body size of the message found, and stores that size in
// m_dwcbMessageBody as a length hint.  The message is not removed.
//
// Returns the HRESULT from MQReceiveMessage().  A timeout
// (MQ_ERROR_IO_TIMEOUT) is returned to the caller but logged only as
// a warning; on any failure the length hint is reset to 0.
//===========================

STDMETHODIMP CQueue::AwaitMessage(DWORD dwTimeOut)
{
    _ASSERTE(m_qh);

    const int cProperties = 1;    // number of message properties

    MSGPROPID        rgMsgPropId[cProperties];        // property ids
    MQPROPVARIANT    rgMqPropVariant[cProperties];    // property variants
    HRESULT            rghr[cProperties];             // status results

    MQMSGPROPS    MqMsgProps = {cProperties, rgMsgPropId, rgMqPropVariant, 
                                rghr};

    rgMsgPropId[0]            = PROPID_M_BODY_SIZE;
    rgMqPropVariant[0].vt    = VT_UI4;
    rgMqPropVariant[0].ulVal = 0;    // defined value even if nothing is received
    rghr[0]                    = S_OK;

    // Wait for a message to arrive.  Fetch just its body size.
    // We can't wait within a transaction.

    HRESULT hr = MQReceiveMessage(m_qh,                   // hSource
                                  dwTimeOut,              // dwTimeout
                                  MQ_ACTION_PEEK_CURRENT, // dwAction
                                  &MqMsgProps,            // pMessageProps
                                  NULL,                   // lpOverlapped
                                  NULL,                   // fnReceiveCallback
                                  NULL,                   // hCursor
                                  MQ_NO_TRANSACTION);     // pTransaction

    // Save message length hint.  Only a successful peek yields a
    // meaningful size; on timeout or error record 0 rather than
    // whatever happens to be in the variant.
    m_dwcbMessageBody = SUCCEEDED(hr) ? rgMqPropVariant[0].ulVal : 0;

    // Treat a Receive timeout error as a warning.

    if (hr == MQ_ERROR_IO_TIMEOUT)
        _RPTF0(_CRT_WARN, "CQueue::AwaitMessage MQReceiveMessage MQ_ERROR_IO_TIMEOUT\n");
    else if (hr) ReportHR(hr, "CQueue::AwaitMessage MQReceiveMessage");

    return hr; 
}


Figure 13   CMessageServer::DoWork  

//===========================
// CMessageServer::DoWork
//
// Receives one message from the queue object behind punkIQueue,
// hands its body and timestamps to ProcessBuffer(), frees the
// receive buffer, and then votes on the object context:
// SetComplete() if the work succeeded, SetAbort() otherwise.
//
// Returns E_NOINTERFACE if punkIQueue does not yield an IQueue,
// otherwise the HRESULT from Receive() or ProcessBuffer().
//===========================

STDMETHODIMP CMessageServer::DoWork(IUnknown * punkIQueue)
{
    HRESULT hr = E_NOINTERFACE;
    CComQIPtr<IQueue, &IID_IQueue> pIQueue(punkIQueue);
    if (pIQueue)
    {
        BYTE *     pbBuffer = 0;
        DWORD     dwcbBuffer = 0;
        hr = pIQueue->Receive(&pbBuffer, &dwcbBuffer);
        if (hr != S_OK) DisplayHR(hr, "pIQueue->Receive");
        if (SUCCEEDED(hr))
        {
            // Zero-initialize so ProcessBuffer() never sees an
            // indeterminate timestamp if a getter fails.
            DWORD dwtimeSent = 0;
            DWORD dwtimeArrived = 0;

            // A missing timestamp is reported but does not fail the
            // message; processing continues with the zero value.
            HRESULT hrTime = pIQueue->GetTimeSent(&dwtimeSent);
            if (hrTime != S_OK) DisplayHR(hrTime, "pIQueue->GetTimeSent");

            hrTime = pIQueue->GetTimeArrived(&dwtimeArrived);
            if (hrTime != S_OK) DisplayHR(hrTime, "pIQueue->GetTimeArrived");

            // Process the input message

            hr = ProcessBuffer(pbBuffer, dwcbBuffer, dwtimeSent, 
                               dwtimeArrived);

            if (hr != S_OK) DisplayHR(hr, "ProcessBuffer");

            // Free the input message buffer (done whether or not
            // ProcessBuffer succeeded)
            pIQueue->FreeBuffer(pbBuffer);
            pbBuffer = 0;
        }
    }

    CComPtr<IObjectContext> pIObjectContext;
    HRESULT hrGetContext = GetObjectContext(&pIObjectContext);

    // If no object context can be obtained, no vote is cast and hr is
    // returned as is.
    if (SUCCEEDED(hrGetContext))
    {
        // SetComplete or SetAbort based on results
        if (SUCCEEDED(hr))
        {
            HRESULT hrSetComplete = pIObjectContext->SetComplete();
            if (hrSetComplete != S_OK) DisplayHR(hrSetComplete, "SetComplete");
        }
        else
        {
            HRESULT hrSetAbort = pIObjectContext->SetAbort();
            if (hrSetAbort != S_OK) DisplayHR(hrSetAbort, "SetAbort");
        }
    }
    return hr;
}