Method | Description |
Open | Opens a queue specified by pathname or format name for sending or receiving messages |
Close | Closes a queue |
GetTimeSent | Returns the time that the most recently received message was sent to the queue (corresponds to the message's PROPID_M_SENTTIME) |
GetTimeArrived | Returns the time that the most recently received message arrived in the queue (corresponds to the message's PROPID_M_ARRIVEDTIME) |
Send | Sends a message to the queue |
AwaitMessage | Waits for messages to arrive in the queue or for a timeout interval to elapse |
Receive | Gets a message from the queue |
FreeBuffer | Frees a buffer acquired by Receive |
Figure 6 CQueue::Open
//===========================
// CQueue::Open
//===========================
STDMETHODIMP CQueue::Open(LPCWSTR pwszQueueOrFormatName, DWORD dwAccess)
{
Close();
HRESULT hr = S_OK;
// Save input name
m_pwszQueueOrFormatName = _wcsdup(pwszQueueOrFormatName);
// Obtain queue's Format Name
// Input name is already a format name
if (IsFormatName(pwszQueueOrFormatName))
{
// Use caller's name "as is" without conversion to a format name
m_pwszFormatName = _wcsdup(pwszQueueOrFormatName);
}
else// queue name is taken to be a path name
{
// Convert path name to Format Name, using the MQIS
// MSMQ doesn't document a maximum FORMAT_NAME_SIZE, so we
// make a reasonable guess and provide an expanding buffer.
DWORD dwNumChars = FORMAT_NAME_SIZE;
do {
delete [] m_pwszFormatName;
m_pwszFormatName = new WCHAR[dwNumChars];
hr = MQPathNameToFormatName(pwszQueueOrFormatName, m_pwszFormatName, &dwNumChars);
if (hr) ReportHR(hr, "MQPathNameToFormatName");
} while (hr == MQ_ERROR_FORMATNAME_BUFFER_TOO_SMALL);
} // else queue name is a path name
// Now open the queue
if (SUCCEEDED(hr))
{
hr = MQOpenQueue(m_pwszFormatName, dwAccess, MQ_DENY_NONE, &m_qh);
if (hr) ReportHR(hr, "MQOpenQueue");
}
return hr;
}
Figure 7 CQueue::Send
//===========================
// CQueue::Send
//===========================
STDMETHODIMP CQueue::Send(const BYTE * pbBuffer, DWORD dwcbBuffer)
{
_ASSERTE(pbBuffer);
_ASSERTE(m_qh);
const int cProperties = 1; // number of message properties
MSGPROPID rgMsgPropId[cProperties]; // property ids
MQPROPVARIANT rgMqPropVariant[cProperties]; // property variants
HRESULT rghr[cProperties];
MQMSGPROPS MqMsgProps = {cProperties, rgMsgPropId, rgMqPropVariant,
rghr};
rgMsgPropId[0] = PROPID_M_BODY;
rgMqPropVariant[0].vt = VT_UI1 | VT_VECTOR;
rgMqPropVariant[0].caub.pElems = const_cast<BYTE *>(pbBuffer);
rgMqPropVariant[0].caub.cElems = dwcbBuffer;
rghr[0] = S_OK;
// Develop the right value for the pTransaction parameter.
// Note: This code assumes that the queue is a transacted queue.
// We could extract the queue's transactional attribute with
// MQGetQueueProperties(), but that requires MQIS access.
ITransaction * pTransaction =
IsMTSContextAvailable() ? MQ_MTS_TRANSACTION : MQ_SINGLE_MESSAGE;
HRESULT hr = MQSendMessage(m_qh, &MqMsgProps, pTransaction);
if (hr) ReportHR(hr, "MQSendMessage");
return hr;
}
Figure 11 QueueServer
//====================================
// QueueServer main
//===================================
HRESULT _cdecl main()
{
// initialize COM library
HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED);
ReportHR(hr, "CoInitializeEx");
if(SUCCEEDED(hr))
{
// Create new Queue COM Object
CComPtr<IQueue> pIQueue;
hr = CoCreateInstance(CLSID_Queue, NULL, CLSCTX_INPROC_SERVER,
IID_IQueue, (void **) &pIQueue);
ReportHR(hr, "CoCreateInstance CLSID_Queue");
if (SUCCEEDED(hr))
{
// Open input queue
hr = pIQueue->Open(L".\\MSJQueue", MQ_RECEIVE_ACCESS);
ReportHR(hr, "pIQueue->Open");
if (SUCCEEDED(hr))
{
// Create a COM MessageServer object to process
// messages transactionally.
CComPtr<IMessageServer> pIMessageServer;
hr = CoCreateInstance(CLSID_MessageServer, NULL,
CLSCTX_INPROC_SERVER,
IID_IMessageServer,
(void **) &pIMessageServer);
ReportHR(hr, "CoCreateInstance CLSID_MessageServer");
while(SUCCEEDED(hr))
{
// Wait for a message to arrive, or for 60 seconds
hr = pIQueue->AwaitMessage(60000);
if (SUCCEEDED(hr))
{
ReportHR(hr, "pIQueue->AwaitMessage");
// Process the message inside a transaction.
hr = pIMessageServer->DoWork(pIQueue);
ReportHR(hr, "pIMessageServer->DoWork");
}
else if (hr == MQ_ERROR_IO_TIMEOUT)
{
// Treat queue timeout as a retriable warning
_RPTF0(_CRT_WARN, "pIQueue->AwaitMessage timeout\n");
hr = S_OK;
}
else // any unexpected error
ReportHR(hr, "pIQueue->AwaitMessage");
}
}
}
CoUninitialize();
}
return hr;
}
Figure 12 CQueue::AwaitMessage
//===========================
// CQueue::AwaitMessage
//===========================
STDMETHODIMP CQueue::AwaitMessage(DWORD dwTimeOut)
{
_ASSERTE(m_qh);
const int cProperties = 1; // number of message properties
MSGPROPID rgMsgPropId[cProperties]; // property ids
MQPROPVARIANT rgMqPropVariant[cProperties]; // property variants
HRESULT rghr[cProperties]; // status results
MQMSGPROPS MqMsgProps = {cProperties, rgMsgPropId, rgMqPropVariant,
rghr};
rgMsgPropId[0] = PROPID_M_BODY_SIZE;
rgMqPropVariant[0].vt = VT_UI4;
rghr[0] = S_OK;
// Wait for a message to arrive. Fetch just its body size.
// We can't wait within a transaction.
HRESULT hr = MQReceiveMessage(m_qh, // hSource
dwTimeOut, // dwTimeout
MQ_ACTION_PEEK_CURRENT, // dwAction
&MqMsgProps, // pMessageProps
NULL, // lpOverlapped
NULL, // fnReceiveCallback
NULL, // hCursor
MQ_NO_TRANSACTION); // pTransaction
// save message length hint
m_dwcbMessageBody = rgMqPropVariant[0].ulVal;
// Treat a Receive timeout error as a warning.
if (hr == MQ_ERROR_IO_TIMEOUT)
_RPTF0(_CRT_WARN, "CQueue::AwaitMessage MQReceiveMessage MQ_ERROR_IO_TIMEOUT\n");
else if (hr) ReportHR(hr, "CQueue::AwaitMessage MQReceiveMessage");
return hr;
}
Figure 13 CMessageServer::DoWork
STDMETHODIMP CMessageServer::DoWork(IUnknown * punkIQueue)
{
HRESULT hr = E_NOINTERFACE;
CComQIPtr<IQueue, &IID_IQueue> pIQueue(punkIQueue);
if (pIQueue)
{
BYTE * pbBuffer = 0;
DWORD dwcbBuffer = 0;
hr = pIQueue->Receive(&pbBuffer, &dwcbBuffer);
if (hr != S_OK) DisplayHR(hr, "pIQueue->Receive");
if (SUCCEEDED(hr))
{
DWORD dwtimeSent, dwtimeArrived;
pIQueue->GetTimeSent(&dwtimeSent);
pIQueue->GetTimeArrived(&dwtimeArrived);
// Process the input message
hr = ProcessBuffer(pbBuffer, dwcbBuffer, dwtimeSent,
dwtimeArrived);
if (hr != S_OK) DisplayHR(hr, "ProcessBuffer");
// Free the input message buffer
pIQueue->FreeBuffer(pbBuffer);
pbBuffer = 0;
}
}
CComPtr<IObjectContext> pIObjectContext;
HRESULT hrGetContext = GetObjectContext(&pIObjectContext);
if (SUCCEEDED(hrGetContext))
{
// SetComplete or SetAbort based on results
if (SUCCEEDED(hr))
{
HRESULT hrSetComplete = pIObjectContext->SetComplete();
if (hrSetComplete != S_OK) DisplayHR(hrSetComplete, "SetComplete");
}
else
{
HRESULT hrSetAbort = pIObjectContext->SetAbort();
if (hrSetAbort != S_OK) DisplayHR(hrSetAbort, "SetAbort");
}
}
return hr;
}