/***************************************************************************/ /* */ /* (c) Copyright IBM Corp. 2006 All rights reserved. */ /* */ /* This sample program is owned by International Business Machines */ /* Corporation or one of its subsidiaries ("IBM") and is copyrighted */ /* and licensed, not sold. */ /* */ /* You may copy, modify, and distribute this sample program in any */ /* form without payment to IBM, for any purpose including developing, */ /* using, marketing or distributing programs that include or are */ /* derivative works of the sample program. */ /* */ /* The sample program is provided to you on an "AS IS" basis, without */ /* warranty of any kind. IBM HEREBY EXPRESSLY DISCLAIMS ALL WARRANTIES, */ /* EITHER EXPRESS OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED */ /* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. */ /* Some jurisdictions do not allow for the exclusion or limitation of */ /* implied warranties, so the above limitations or exclusions may not */ /* apply to you. IBM shall not be liable for any damages you suffer as */ /* a result of using, modifying or distributing the sample program or */ /* its derivatives. */ /* */ /***************************************************************************/ /* */ /* Program name: CorrelExample.java */ /* */ /* Description: Sample java program that shows use of correlation IDs in */ /* message selectors. */ /* */ /***************************************************************************/ /* */ /* Function: */ /* */ /* This program will show the use of correlation IDs in message */ /* selectors. It has three cases: */ /* */ /* a) JMS - JMS. use strings for the correlation ID. */ /* */ /* b) JMS <-> legacy. JMS application sends a request message to a legacy */ /* application, which sends a reply with the original message ID copied */ /* to the correlation ID. Uses 'opaque' IDs provided by the vendor. */ /* */ /* c) legacy -> JMS. 
When the correlation ID originates in some other way */ /* than by copying the sent message, we have to construct a selector as a */ /* hex string prefixed by ID: */ /* */ /* To support case b), this class spins off a Thread to act as a server */ /* application which copies the messageID to the correlationID and posts */ /* the message back to the queue. */ /* */ /* The program is run without any parameters, and should produce output */ /* similar to the following: */ /* */ /* $ java CorrelExample */ /* put placeholder message on queue, messageID is ID:414d5120636172646875 */ /* 2e716d312020c5bbc03e20000501 */ /* */ /* */ /* Case a) arbitrary strings */ /* sent message, messageID=ID:414d51206361726468752e716d312020c5bbc03e20000502 */ /* Creating a receiver using filter: JMSCorrelationID = 'This is an arbit */ /* rary and rather long JMS String correlation ID' */ /* got correct message */ /* */ /* */ /* Case b) correlID set to messageID of request */ /* sent message, messageID=ID:414d51206361726468752e716d312020c5bbc03e20000503 */ /* Creating a receiver using filter: JMSCorrelationID = 'ID:414d512063617 */ /* 26468752e716d312020c5bbc03e20000503' */ /* Server started */ /* got correct message */ /* waiting for server to quit */ /* Server finished */ /* */ /* */ /* Case c) byte values */ /* sent message, messageID=ID:414d51206361726468752e716d312020c5bbc03e20000504 */ /* creating receiver with filter: JMSCorrelationID = 'ID:01051b64f9000000 */ /* 00000000000000000000000000000000' */ /* got correct message */ /* */ /* checking for placeholder message */ /* got placeholder message */ /* */ /***************************************************************************/ /* */ /* This program will connect to the default queue manager. If it is being */ /* used on a machine without a default queue manager, the line: */ /* */ /* // ((MQQueueConnectionFactory)qcf).setQueueManager("my.qmgr"); */ /* */ /* needs to be uncommented and the correct queue manager name specified. 
*/
/*                                                                         */
/* This program has been tested with WebSphere MQ v6.0.1.0 and Java 1.5.0. */
/*                                                                         */
/***************************************************************************/

import javax.jms.*;
import com.ibm.mq.jms.*;
import com.ibm.mq.MQC;

/**
 * Demonstrates selecting messages by JMSCorrelationID in three situations:
 * a) an arbitrary string correlation ID, b) a correlation ID copied from the
 * request's message ID by a server, and c) a correlation ID supplied as raw
 * bytes.
 *
 * <p>An instance of this class is also the server thread used by case b): it
 * reads requests from {@link #requestQ} and replies to each request's
 * JMSReplyTo queue, using the request's message ID as the reply's
 * correlation ID.
 */
public class CorrelExample extends Thread {

    /** Number of bytes a byte-form correlation ID is padded to in a selector. */
    private static final int CORREL_ID_LENGTH = 24;

    /** Queue the server thread reads requests from. */
    Queue requestQ;

    /** Connection shared by the main thread and the server thread. */
    QueueConnection con;

    /**
     * Flag for simple server shutdown mechanism. It is written by the main
     * thread and polled by the server thread, so it must be volatile for the
     * update to be reliably visible.
     */
    volatile boolean going = true;

    /**
     * Creates a server thread (not yet started).
     *
     * @param q queue on which the server receives requests
     * @param c connection from which the server creates its own session
     */
    public CorrelExample(Queue q, QueueConnection c) {
        requestQ = q;
        con = c;
    }

    /**
     * Runs the three correlation ID cases against a temporary queue and then
     * checks that the placeholder message (which has no correlation ID) was
     * left untouched by the selectors.
     *
     * @param args not used
     */
    static public void main(String[] args) {
        QueueConnection con = null;
        CorrelExample server = null;
        String filter;
        QueueReceiver receiver;
        TextMessage m1;
        Message m2;
        String messageID;

        try {
            // Connection factory would usually be looked up from JNDI, but for
            // convenience we will construct one in-line. The default settings
            // will connect via bindings to the default queue manager. You can
            // customise the qcf for other configurations.
            QueueConnectionFactory qcf = new MQQueueConnectionFactory();
            // ((MQQueueConnectionFactory)qcf).setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
            // ((MQQueueConnectionFactory)qcf).setChannel("SYSTEM.DEF.SVRCONN");
            // ((MQQueueConnectionFactory)qcf).setPort(1414);
            // ((MQQueueConnectionFactory)qcf).setHostName("hostname");
            // ((MQQueueConnectionFactory)qcf).setQueueManager("my.qmgr");

            // create and start the connection
            con = qcf.createQueueConnection();
            con.start();

            // Create a session
            QueueSession session = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

            // Create a temporary queue
            Queue queue = session.createTemporaryQueue();

            // create a Sender for sending messages (no fixed destination)
            QueueSender sender = session.createSender(null);

            // Put a placeholder message on the queue, with nothing set for
            // correlID. This gives us something that we shouldn't receive if the
            // selectors work correctly, and proves that the implementation isn't
            // simply returning the 1st message it finds each time.
            m1 = session.createTextMessage("placeholder message");
            sender.send(queue, m1);
            // remember the messageID
            String placeHolderMID = m1.getJMSMessageID();
            System.out.println("put placeholder message on queue, messageID is " + placeHolderMID);

            // a) JMS - JMS, arbitrary string correlIDs
            System.out.println("\n\nCase a) arbitrary strings");
            m1 = session.createTextMessage("test message, case a)");
            String correl = "This is an arbitrary and rather long JMS String correlation ID";
            m1.setJMSCorrelationID(correl);
            sender.send(queue, m1);
            // remember the messageID
            messageID = m1.getJMSMessageID();
            System.out.println("sent message, messageID=" + messageID);

            // Create a SQL selector for the above correlation ID. Note that for JMS
            // string correlation IDs, you do not need an ID: prefix.
            filter = "JMSCorrelationID = '" + correl + "'";

            // Create a receiver using this filter
            System.out.println("Creating a receiver using filter: " + filter);
            receiver = session.createReceiver(queue, filter);

            // attempt to get the message and verify it by message ID
            m2 = receiver.receive(1000);
            checkMessageId(m2, messageID, "got correct message");

            // finished with this receiver
            receiver.close();

            // Case b) JMS application sends to legacy server which copies message id
            // to correlation id of the response message.
            System.out.println("\n\nCase b) correlID set to messageID of request");

            // For this case we need to start another thread to act as the server
            // application. It will receive our sent message and produce a response
            // message using the messageID of the request message as the correlation
            // ID. This is a common design pattern for server applications.

            // We will use another temporary queue for sending requests
            Queue requestQ = session.createTemporaryQueue();
            server = new CorrelExample(requestQ, con);
            server.start();

            m1 = session.createTextMessage("test message, case b)");
            // tell the server where to send the response
            m1.setJMSReplyTo(queue);
            sender.send(requestQ, m1);
            // remember the messageID
            messageID = m1.getJMSMessageID();
            System.out.println("sent message, messageID=" + messageID);

            // Create the SQL selector using the messageID of the sent message as the
            // correlationID
            filter = "JMSCorrelationID = '" + messageID + "'";

            // Create a receiver using this filter
            System.out.println("Creating a receiver using filter: " + filter);
            receiver = session.createReceiver(queue, filter);

            // attempt to get the message
            m2 = receiver.receive(10000);
            if (m2 == null) {
                System.out.println("ERROR: no message received");
            } else if (m2.getBooleanProperty("reply")) {
                // In this case we can't check the messageIDs, because the reply
                // message will have a different ID to the one we sent. Instead we
                // check for a boolean property 'reply' which is set by the server
                // thread.
                System.out.println("got correct message");
            } else {
                System.out.println("ERROR: got wrong message: " + m2);
            }

            // finished with this receiver
            receiver.close();

            // shutdown the server
            stopServer(server);

            // Case c) CorrelationID as bytes, used when correlation ID is set to arbitrary
            // values by some external application
            System.out.println("\n\nCase c) byte values");
            m1 = session.createTextMessage("test message, case c)");
            byte[] correlBytes = { 1, 5, 27, 100, -7 };
            m1.setJMSCorrelationIDAsBytes(correlBytes);
            sender.send(queue, m1);
            // remember the messageID
            messageID = m1.getJMSMessageID();
            System.out.println("sent message, messageID=" + messageID);

            // To build a filter that can cope with a correlID set by an external
            // legacy App we need to use the ID: form, write 2 character hex
            // representations of the bytes and pad to 24 bytes.
            filter = buildBytesSelector(correlBytes);

            System.out.println("creating receiver with filter: " + filter);
            receiver = session.createReceiver(queue, filter);

            m2 = receiver.receive(1000);
            checkMessageId(m2, messageID, "got correct message");

            receiver.close();

            // To finish off, get the placeholder message back again
            System.out.println("\nchecking for placeholder message");
            receiver = session.createReceiver(queue);
            m2 = receiver.receive(1000);
            checkMessageId(m2, placeHolderMID, "got placeholder message");

            receiver.close();
        } catch (JMSException je) {
            reportJmsException("caught ", je);
        } finally {
            // If we failed part-way through case b) the server thread is still
            // polling; ask it to stop before the connection goes away.
            if (server != null && server.isAlive()) {
                stopServer(server);
            }
            if (con != null) {
                try {
                    con.close();
                } catch (JMSException je) {
                    // Nothing more can be done at this point, but do not hide it.
                    System.out.println("failed to close connection: " + je);
                }
            }
        }
    }

    /**
     * Prints whether a received message is the expected one, judged by
     * comparing message IDs.
     *
     * @param received the message returned by receive(), or null on timeout
     * @param expectedId message ID of the message that should have been received
     * @param successText text printed when the IDs match
     * @throws JMSException if the message ID cannot be read
     */
    private static void checkMessageId(Message received, String expectedId, String successText)
            throws JMSException {
        if (received == null) {
            System.out.println("ERROR: no message received");
            return;
        }
        String rcvdMID = received.getJMSMessageID();
        if (rcvdMID != null && rcvdMID.equals(expectedId)) {
            System.out.println(successText);
        } else {
            System.out.println("ERROR: got wrong message: " + received);
        }
    }

    /**
     * Builds a selector matching a correlation ID given as bytes. The bytes
     * are written as two lower-case hex digits each after an "ID:" prefix and
     * zero-padded on the right to {@link #CORREL_ID_LENGTH} bytes; bytes
     * beyond that length are not included.
     *
     * @param correlBytes the correlation ID bytes
     * @return selector of the form {@code JMSCorrelationID = 'ID:<48 hex digits>'}
     */
    private static String buildBytesSelector(byte[] correlBytes) {
        StringBuilder fBuf = new StringBuilder("JMSCorrelationID = 'ID:");
        for (int i = 0; i < CORREL_ID_LENGTH; i++) {
            if (i >= correlBytes.length) {
                fBuf.append("00");
            } else {
                // mask to 0..255 so negative bytes do not sign-extend to 8 hex digits
                String hexStr = Integer.toHexString(correlBytes[i] & 0xFF);
                // small values produce 1 char results
                if (hexStr.length() < 2) {
                    fBuf.append("0");
                }
                fBuf.append(hexStr);
            }
        }
        fBuf.append("'");
        return fBuf.toString();
    }

    /**
     * Asks the server thread to stop and waits for it to finish. If the
     * waiting thread is interrupted, its interrupt status is restored.
     *
     * @param server the running server thread
     */
    private static void stopServer(CorrelExample server) {
        server.going = false;
        System.out.println("waiting for server to quit");
        try {
            server.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Prints a JMSException and, when present, its linked exception.
     *
     * @param prefix text printed in front of the exception
     * @param je the exception to report
     */
    private static void reportJmsException(String prefix, JMSException je) {
        System.out.println(prefix + je);
        Exception le = je.getLinkedException();
        if (le != null) {
            System.out.println("linked exception " + le);
        }
    }

    /**
     * Implementation of Thread.run
     *
     * This method acts as a server for case b) of the tests. Until
     * {@link #going} is cleared it polls the request queue with a one second
     * timeout and answers every request with a reply whose correlation ID is
     * the request's message ID.
     */
    @Override
    public void run() {
        QueueSession session = null;
        try {
            // Create a session
            session = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

            // create a receiver to accept requests
            QueueReceiver receiver = session.createReceiver(requestQ);

            // create a Sender for sending replies
            QueueSender sender = session.createSender(null);

            System.out.println("Server started");

            while (going) {
                Message msg = receiver.receive(1000);
                if (msg == null) {
                    continue;
                }

                Destination replyTo = msg.getJMSReplyTo();
                if (!(replyTo instanceof Queue)) {
                    // nowhere to send a reply to; skip rather than fail the server
                    System.out.println("server ignoring request without a reply queue: " + msg);
                    continue;
                }

                // get the messageID for use in the correlationID of the reply
                String msgID = msg.getJMSMessageID();

                TextMessage reply = session.createTextMessage("reply message");
                reply.setJMSCorrelationID(msgID);
                // the main thread uses the following property to check it has got
                // the correct message
                reply.setBooleanProperty("reply", true);

                sender.send((Queue) replyTo, reply);
            }

            // close session (sender & receiver will be closed automatically)
            session.close();
            session = null;
            System.out.println("Server finished");
        } catch (JMSException je) {
            reportJmsException("server thread failed with ", je);
        } finally {
            // only reached with a live session when the loop failed
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException je) {
                    System.out.println("server failed to close session: " + je);
                }
            }
        }
    }
}