Our Transfer object has the following interface (called ITransfer):
Method | Parameters | Description |
PutOnQueue | [in] BSTR bstrItem | Take selected item out of database, and put it on queue |
TakeOffQueue | [in] BSTR bstrQueue, [in] BSTR bstrItem | Take selected item off queue and put into database |
It’s also got one property:
Property | Type | Description |
DBName | BSTR | Connection string for database |
Notice, incidentally, that this is a bona fide middle-tier object. There’s no persistent data here; it’s simply a component that gets called into action to carry out a specific job, following which it is destroyed.
Here is the code for the PutOnQueue() method. We start off by using some ADO calls to open up a connection to the database. Notice our old friend the object context making a reappearance. With this, we’ll abort the transaction in case of any serious failure.
STDMETHODIMP CTransfer::PutOnQueue(BSTR bstrItem)
{
   CComPtr<IObjectContext> pContext;
   HRESULT hr = GetObjectContext(&pContext);
   if (FAILED(hr))
      return hr;

   // Create the connection through the context so that it joins our transaction.
   CComPtr<ADOConnection> pConnection;
   hr = pContext->CreateInstance(CLSID_CADOConnection,
                                 IID_IADOConnection,
                                 reinterpret_cast<void**>(&pConnection));
   if (FAILED(hr))
   {
      pContext->SetAbort();
      return hr;
   }

   CComBSTR bstrConnection;
   get_DBName(&bstrConnection);

   hr = pConnection->Open(bstrConnection, NULL, NULL, -1);
   if (FAILED(hr))
   {
      pContext->SetAbort();
      return hr;
   }
Now we find the required record.
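   CComPtr<ADORecordset> pRecordset;
   hr = pContext->CreateInstance(CLSID_CADORecordset,
                                 IID_IADORecordset,
                                 reinterpret_cast<void**>(&pRecordset));
   if (FAILED(hr))
   {
      pContext->SetAbort();
      return hr;
   }

   // Bound the write so that a long item name cannot overrun szCommand.
   // Note that bstrItem is pasted into the SQL text as-is.
   WCHAR szCommand[512];
   if (swprintf(szCommand, 512,
                L"SELECT * FROM checklist WHERE description = '%ls'",
                bstrItem) < 0)
   {
      pContext->SetAbort();
      return E_INVALIDARG;
   }

   CComVariant vCommand(szCommand);
   CComVariant vConnection(pConnection);

   hr = pRecordset->Open(vCommand, vConnection, adOpenKeyset,
                         adLockPessimistic, adCmdText);
   if (FAILED(hr))
   {
      pContext->SetAbort();
      return hr;
   }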
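The SELECT statement above is built by pasting the item name straight into the SQL text, so a name containing an apostrophe would break the query. The following is a minimal sketch of one way to guard against that, written in plain C++ so that it compiles anywhere. The helper names EscapeSqlLiteral() and BuildSelectCommand() are my own inventions for illustration and are not part of the sample; a parameterized ADO Command object would be the more robust route in real code.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: doubles every single quote so that the text can sit
// safely inside a '...' SQL string literal.
std::wstring EscapeSqlLiteral(const std::wstring& text)
{
    std::wstring escaped;
    escaped.reserve(text.size());
    for (wchar_t ch : text)
    {
        escaped += ch;
        if (ch == L'\'')
            escaped += L'\'';   // ' becomes ''
    }
    return escaped;
}

// Hypothetical helper: builds the same SELECT statement as PutOnQueue(),
// but with the item name escaped and no fixed-size buffer to overrun.
std::wstring BuildSelectCommand(const std::wstring& item)
{
    return L"SELECT * FROM checklist WHERE description = '"
         + EscapeSqlLiteral(item) + L"'";
}
```

The resulting string could be handed to CComVariant in place of szCommand, for example CComVariant vCommand(BuildSelectCommand(bstrItem).c_str()), assuming bstrItem is not NULL.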
Now let’s check to see if the Beanie Baby is currently wanted. If it isn’t wanted, that means that we’ve already got it, so there’s no point in offering it up for others to search for. In this case, we’ll abort the transaction:
   CComPtr<ADOFields> pFields;
   pRecordset->get_Fields(&pFields);

   long count;
   pFields->get_Count(&count);

   for (long field = 0; field < count; field++)
   {
      CComVariant vField(field);
      CComPtr<ADOField> pField;
      pFields->get_Item(vField, &pField);

      CComBSTR bstrName;
      pField->get_Name(&bstrName);

      if (wcscmp(bstrName, L"wanted"))
         continue;

      CComVariant vValue;
      pField->get_Value(&vValue);

      if (vValue.boolVal == VARIANT_FALSE)
      {
         pContext->SetAbort();
         return S_OK;
      }
   }
If we get this far, it isn't in our collection, so let’s delete it from our local database. No need to panic about rollback, because we’re part of a transaction here.
   hr = pRecordset->Delete(adAffectCurrent);
   if (FAILED(hr))
   {
      pContext->SetAbort();
      return hr;
   }
OK, time for the message queue stuff. Let’s find our outgoing queue, and open it up for sending.
   CComPtr<IMSMQQueueInfo> pInfo;
   hr = pContext->CreateInstance(CLSID_MSMQQueueInfo,
                                 IID_IMSMQQueueInfo,
                                 reinterpret_cast<void**>(&pInfo));
   if (FAILED(hr))
   {
      pContext->SetAbort();
      return hr;
   }

   // GetComputerName() expects the full buffer size, terminator included.
   TCHAR szMachine[MAX_COMPUTERNAME_LENGTH + 1];
   ULONG length = MAX_COMPUTERNAME_LENGTH + 1;
   GetComputerName(szMachine, &length);

   // The queue path takes the form <machine>\offered.
   CComBSTR bstrMachine(szMachine);
   bstrMachine.Append("\\offered");
   pInfo->put_PathName(bstrMachine);

   CComPtr<IMSMQQueue> pQueue;
   hr = pInfo->Open(MQ_SEND_ACCESS, MQ_DENY_NONE, &pQueue);
   if (FAILED(hr))
   {
      pContext->SetAbort();
      return hr;
   }
Now we create the message:
   CComPtr<IMSMQMessage> pMessage;
   hr = pContext->CreateInstance(CLSID_MSMQMessage,
                                 IID_IMSMQMessage,
                                 reinterpret_cast<void**>(&pMessage));
   if (FAILED(hr))
   {
      pContext->SetAbort();
      return hr;
   }

   pMessage->put_Label(bstrItem);

   VARIANT vTrans;
   V_VT(&vTrans) = VT_UI4;
   V_UI4(&vTrans) = MQ_MTS_TRANSACTION;

   hr = pMessage->Send(pQueue, &vTrans);
Hold on, what’s this? We’re setting the second parameter of Send() to MQ_MTS_TRANSACTION, which tells MSMQ to make the send part of the object’s current MTS transaction. This is really the only thing that is seriously different about this use of the message queuing software.
OK, we’ve reached the point of no return. If the message has successfully been put on the queue, we can commit. Otherwise, let’s abort and let MTS unwind us back to where we were before. (Incidentally, what sort of things can go wrong? Well, maybe we have a disconnected client or the local site server has gone down, and we can’t see the queue configuration any more. Or perhaps we’ve failed to make the queue transactional — this is a neat way of testing whether or not the rollback is working, by the way.)
   if (FAILED(hr))
      pContext->SetAbort();
   else
      pContext->SetComplete();

   return hr;
}
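PutOnQueue() repeats the same "call SetAbort() and return" dance at every failure point, and it is easy to miss one. Here is a hedged sketch of how a small guard object could take care of that. FakeContext is a stand-in I have made up so that the example compiles without the MTS headers, and TransactionGuard and DoWork() are likewise hypothetical names; none of them appear in the sample.

```cpp
#include <cassert>

// Stand-in for IObjectContext (hypothetical): it only counts the votes cast.
struct FakeContext
{
    int aborts = 0;
    int completes = 0;
    void SetAbort()    { ++aborts; }
    void SetComplete() { ++completes; }
};

// Votes to abort when it goes out of scope, unless Commit() was called first.
template <typename Context>
class TransactionGuard
{
public:
    explicit TransactionGuard(Context& context)
        : m_context(context), m_committed(false) {}

    ~TransactionGuard()
    {
        if (!m_committed)
            m_context.SetAbort();
    }

    void Commit()
    {
        m_context.SetComplete();
        m_committed = true;
    }

    TransactionGuard(const TransactionGuard&) = delete;
    TransactionGuard& operator=(const TransactionGuard&) = delete;

private:
    Context& m_context;
    bool m_committed;
};

// Mimics the shape of PutOnQueue(): any early return aborts, success commits.
bool DoWork(FakeContext& context, bool stepSucceeds)
{
    TransactionGuard<FakeContext> guard(context);
    if (!stepSucceeds)
        return false;   // the guard's destructor calls SetAbort()
    guard.Commit();     // calls SetComplete()
    return true;
}
```

With something like this in place, each failure branch shrinks to a plain return hr, and the single Commit() call sits right after the successful Send().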
The TakeOffQueue() method does pretty much the same sort of thing as PutOnQueue(), except the other way around, and I won’t waste valuable trees by including it here. However, I will include the code that handles getting the property DBName:
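STDMETHODIMP CTransfer::get_DBName(BSTR* pVal)
{
   if (pVal == NULL)
      return E_POINTER;

   // GetComputerName() expects the full buffer size, terminator included.
   TCHAR szMachine[MAX_COMPUTERNAME_LENGTH + 1];
   ULONG length = MAX_COMPUTERNAME_LENGTH + 1;
   if (!GetComputerName(szMachine, &length))
      return HRESULT_FROM_WIN32(GetLastError());

   // On return, length holds the number of characters copied.
   for (ULONG iChar = 0; iChar < length; iChar++)
      if (szMachine[iChar] == '-')
         szMachine[iChar] = '_';

   CComBSTR bstrMachine(szMachine);

   // Pass the raw BSTR: a CComBSTR object should not go through "...".
   WCHAR szName[128];
   swprintf(szName, 128, L"FILEDSN=WPSamples;Database=%ls;UID=sa;PWD=;",
            bstrMachine.m_str);

   *pVal = SysAllocString(szName);
   return (*pVal != NULL) ? S_OK : E_OUTOFMEMORY;
}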
This constructs a connection string that we can pass to the ADO Connection Open() method, with the database name taken from the client’s machine name. (Notice that SQL Server doesn’t like hyphens in database names, which is why we swap them for underscores!)
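To make the hyphen substitution concrete, here is a small portable sketch of the same string handling, without the Win32 and COM calls. BuildConnectionString() is a hypothetical name of mine, and the machine name is passed in rather than read from GetComputerName(); the DSN and login details are simply the ones used in the sample.

```cpp
#include <algorithm>
#include <cassert>
#include <string>

// Hypothetical helper: turns a machine name into the connection string used
// by the sample, replacing hyphens with underscores along the way.
std::wstring BuildConnectionString(std::wstring machine)
{
    std::replace(machine.begin(), machine.end(), L'-', L'_');
    return L"FILEDSN=WPSamples;Database=" + machine + L";UID=sa;PWD=;";
}
```

So a machine called DEV-BOX-01 would end up talking to a database called DEV_BOX_01.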
Don’t forget to specify either “Requires a new transaction” or “Requires a transaction” when you insert the component into MTS. Otherwise the object runs outside a transaction, and nothing gets rolled back.
So there we are. That’s what transactional message queue operations look like, and frankly, they really aren’t that much different from ordinary message queue operations.
What else can MTS do for us? I didn’t include it in this example, but it’s an interesting experiment to try sending two or more messages in the same transaction. If you do this, MSMQ is very fussy about preserving the order of the messages, and won’t let you take them off the queue out of sequence.
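If you want a mental picture of what is going on, the toy model below may help. It is emphatically not MSMQ and uses none of its API; ToyQueue and all of its methods are made up for illustration. It shows the general idea only: messages sent inside a transaction are held back until commit, are thrown away on abort, and come off the queue in the order in which they were sent.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Toy in-memory model of a transactional queue (hypothetical, not MSMQ).
class ToyQueue
{
public:
    // Messages sent inside a transaction are staged, not yet visible.
    void Send(const std::string& label) { m_staged.push_back(label); }

    // Commit publishes the staged messages in the order they were sent.
    void Commit()
    {
        m_visible.insert(m_visible.end(), m_staged.begin(), m_staged.end());
        m_staged.clear();
    }

    // Abort throws the staged messages away.
    void Abort() { m_staged.clear(); }

    // Receivers can only take the message at the head of the queue.
    bool Receive(std::string& label)
    {
        if (m_visible.empty())
            return false;
        label = m_visible.front();
        m_visible.pop_front();
        return true;
    }

    std::size_t VisibleCount() const { return m_visible.size(); }

private:
    std::vector<std::string> m_staged;   // sent, awaiting commit or abort
    std::deque<std::string> m_visible;   // committed, ready to be received
};
```

Sending two labels and committing makes both available, first one first; sending and then aborting leaves the queue exactly as it was.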
That’s all I’m going to say about MSMQ and MTS — a pretty powerful twosome, if you ask me. What I’m sure you’re dying to know more about is using MSMQ as a transport layer for DCOM. This is what Microsoft aims to do at some point in the future, in order to provide support for one-way asynchronous methods. However, we’re impatient people, and we want it now. Remember the streaming and marshaling stuff back in Chapter 3? I think it’s about time we finished it.