/*****************************************************************************/ /* */ /* (c) Copyright IBM Corp. 2002 All rights reserved. */ /* */ /* This sample program is owned by International Business Machines */ /* Corporation or one of its subsidiaries ("IBM") and is copyrighted */ /* and licensed, not sold. */ /* */ /* You may copy, modify, and distribute this sample program in any */ /* form without payment to IBM, for any purpose including developing, */ /* using, marketing or distributing programs that include or are */ /* derivative works of the sample program. */ /* */ /* The sample program is provided to you on an "AS IS" basis, without */ /* warranty of any kind. IBM HEREBY EXPRESSLY DISCLAIMS ALL WARRANTIES, */ /* EITHER EXPRESS OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED */ /* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. */ /* Some jurisdictions do not allow for the exclusion or limitation of */ /* implied warranties, so the above limitations or exclusions may not */ /* apply to you. IBM shall not be liable for any damages you suffer as */ /* a result of using, modifying or distributing the sample program or */ /* its derivatives. */ /* */ /*****************************************************************************/ /* */ /* Program name: DumpBroker */ /* */ /* Description: Sample java program to show how pcf messages can be sent */ /* to the MQSeries Pub/Sub broker (SupportPac ma0c) to */ /* query the active list of subscriptions. It is based on */ /* amqspsda.c supplied with ma0c. */ /* NOTE: UnSubscribe.java is provided on this same site as */ /* a sample to aid in removing unwanted subscriptions. */ /* */ /* Function: This program connects to the requested queue manager and */ /* sends pcf messages to the broker to register for the */ /* subscription list for publishers, receive the subscriptions,*/ /* and format the output. This is repeated for the subscribers.*/ /* */ /* Prereqs: This sample program requires MQSeries SupportPacs ma0c */ /* and ms0b. */ /* */ /* Setup: Create a model definition on the queue manager where the */ /* broker resides using the following definition: */ /* def qmodel(AMQSPSD.PERMDYN.MODEL.QUEUE) replace + */ /* descr('Message Broker Dumper model queue') + */ /* deftype(permdyn) + */ /* share + */ /* defsopt(shared) */ /* */ /* This program is run as follows: */ /* */ /* java DumpBroker queuemanagername */ /* */ /* */ /* Build: */ /* This program has been tested with Sun JDK 1.3 and */ /* MQSeries V5.2 CSD4 on Windows/2000. */ /* The following jar files must be in the classpath: */ /* com.ibm.mq.pcf.jar, com.ibm.mq.jar, jta.jar, connector.jar */ /* as well as \java\lib */ /* */ /* Compile it with javac DumpBroker.java */ /* */ /*****************************************************************************/ import java.io.*; import com.ibm.mq.*; import com.ibm.mq.pcf.*; public class DumpBroker { final public static String modelQueue = "AMQSPSD.PERMDYN.MODEL.QUEUE"; final public static String commandQueue = "SYSTEM.BROKER.CONTROL.QUEUE"; final public static String brokerAdminStream = "SYSTEM.BROKER.ADMIN.STREAM"; final public static String brokerDefaultStream = "SYSTEM.BROKER.DEFAULT.STREAM"; final public static int shortWait = 8000; final public static int longWait = 300000; private MQQueueManager qMgr; private MQQueue brokerQueue; private MQQueue replyQueue; private MQMessage message; private boolean firstGet; private void init (String qMgrName) throws MQException { qMgr = new MQQueueManager (qMgrName); brokerQueue = qMgr.accessQueue (commandQueue, MQC.MQOO_OUTPUT, "", "", "mqm"); /****************************************************************/ /* The broker requires that the reply queue can accept */ /* persistent messages so the queue must be static or permanent */ /* dynamic. */ /* Set closeoptions to mqco_delete so the permdyn queue is */ /* deleted after close. */ /****************************************************************/ try { replyQueue = qMgr.accessQueue (modelQueue, MQC.MQOO_INPUT_EXCLUSIVE, "", "PCF_REPLY*", "mqm"); } catch (MQException mq) { if (mq.reasonCode == CMQC.MQRC_UNKNOWN_OBJECT_NAME) { System.err.println ("\n" + modelQueue + " must be defined for this queue manager\n"); } throw (mq); } replyQueue.closeOptions = MQC.MQCO_DELETE; message = new MQMessage (); firstGet = true; } private void setMessageHeader () throws IOException { message.clearMessage (); message.messageType = MQC.MQMT_REQUEST; message.expiry = shortWait; message.feedback = MQC.MQFB_NONE; message.format = MQC.MQFMT_PCF; message.replyToQueueName = replyQueue.name; message.encoding = MQC.MQENC_NATIVE; message.characterSet = MQC.MQCCSI_Q_MGR; } private void sendBrokerCommand (String cmdStr, PCFMessage request, int waitTime) throws IOException, MQException { MQPutMessageOptions pmo = new MQPutMessageOptions (); MQGetMessageOptions gmo = new MQGetMessageOptions (); PCFMessage response = null; MQMessage replyMsg = new MQMessage (); MQCFH cfh; PCFParameter p; /****************************************************************/ /* put the command message on the broker queue and then receive */ /* all replies that may come back. */ /****************************************************************/ brokerQueue.put (message, pmo); gmo.options = MQC.MQGMO_WAIT | MQC.MQGMO_CONVERT; gmo.waitInterval = waitTime; do { replyMsg.messageId = MQC.MQMI_NONE; replyMsg.correlationId = message.messageId; replyQueue.get (replyMsg, gmo); firstGet = false; /*********************************************************/ /* The reply uses a MQCFH header */ /*********************************************************/ cfh = new MQCFH (replyMsg); if (cfh.reason == 0) { for (int i = 0; i < cfh.parameterCount; i++) { /*********************************************************/ /* dump the contents of the reply */ /*********************************************************/ System.out.println (PCFParameter.nextParameter (replyMsg)); } } else { /*********************************************************/ /* Iterate the returned error messages and screen out */ /* and report the non-expected ones */ /*********************************************************/ for (int i = 0; i < cfh.parameterCount; i++) { switch (cfh.reason) { case CMQCFC.MQRCCF_NOT_REGISTERED: case CMQCFC.MQRCCF_COMMAND_FAILED: case CMQCFC.MQRCCF_NO_RETAINED_MSG: PCFParameter.nextParameter (replyMsg); break; default: System.err.println ("PCF error processing " + cmdStr + ":\n" + cfh); System.err.println (PCFParameter.nextParameter (replyMsg)); } } } } while (cfh.control != CMQCFC.MQCFC_LAST); } private void registerSubscriber (String topic, String streamName) throws IOException, MQException { PCFMessage request = new PCFMessage (CMQCFC.MQCMD_REGISTER_SUBSCRIBER); int regOptions = CMQCFC.MQREGO_ANONYMOUS | CMQCFC.MQREGO_PUBLISH_ON_REQUEST_ONLY; request.addParameter (CMQCFC.MQCACF_TOPIC, topic); request.addParameter (CMQCFC.MQIACF_REGISTRATION_OPTIONS, regOptions); request.addParameter (CMQCFC.MQCACF_STREAM_NAME, streamName); request.addParameter (CMQC.MQCA_Q_MGR_NAME, padQManager (qMgr.name)); request.addParameter (CMQC.MQCA_Q_NAME, replyQueue.name); setMessageHeader (); request.write (message); sendBrokerCommand ("Register Subscriber", request, shortWait); } private void deRegisterSubscriber (String topic, String streamName) throws IOException, MQException { PCFMessage request = new PCFMessage (CMQCFC.MQCMD_DEREGISTER_SUBSCRIBER); int regOptions = CMQCFC.MQREGO_NONE; request.addParameter (CMQCFC.MQCACF_TOPIC, topic); request.addParameter (CMQCFC.MQIACF_REGISTRATION_OPTIONS, regOptions); request.addParameter (CMQCFC.MQCACF_STREAM_NAME, streamName); request.addParameter (CMQC.MQCA_Q_MGR_NAME, padQManager (qMgr.name)); request.addParameter (CMQC.MQCA_Q_NAME, replyQueue.name); setMessageHeader (); request.write (message); sendBrokerCommand ("Deregister Subscriber", request, shortWait); } private void requestUpdate (String topic, String streamName) throws IOException, MQException { PCFMessage request = new PCFMessage (CMQCFC.MQCMD_REQUEST_UPDATE); int regOptions = CMQCFC.MQREGO_NONE; request.addParameter (CMQCFC.MQCACF_TOPIC, topic); request.addParameter (CMQCFC.MQIACF_REGISTRATION_OPTIONS, regOptions); request.addParameter (CMQCFC.MQCACF_STREAM_NAME, streamName); request.addParameter (CMQC.MQCA_Q_MGR_NAME, padQManager (qMgr.name)); request.addParameter (CMQC.MQCA_Q_NAME, replyQueue.name); setMessageHeader (); request.write (message); sendBrokerCommand ("Request Update", request, longWait); } private void dumpPubSubData (String topic) throws IOException, MQException { MQGetMessageOptions gmo = new MQGetMessageOptions (); MQMessage replyMsg = new MQMessage (); MQCFH cfh; PCFParameter pcfp; boolean moreWork = true; int replyType = 0; int parameter = 0; gmo.options = MQC.MQGMO_NO_WAIT | MQC.MQGMO_CONVERT; /****************************************************************/ /* As the broker sends reply messages back, continue to get from*/ /* the queue until all are received. There are a number of */ /* expected responses and the formatting is based on the */ /* attribute contained in the PCFParameter of the reply. */ /****************************************************************/ do { replyMsg.messageId = MQC.MQMI_NONE; replyMsg.correlationId = MQC.MQMI_NONE; try { replyQueue.get (replyMsg, gmo); } catch (MQException mq) { if (mq.reasonCode != CMQC.MQRC_NO_MSG_AVAILABLE) { throw (mq); } moreWork = false; } if (moreWork) { cfh = new MQCFH (replyMsg); if (cfh.reason == 0) { for (int i = 0; i < cfh.parameterCount; i++) { pcfp = PCFParameter.nextParameter (replyMsg); parameter = pcfp.getParameter (); replyType = pcfp.getType (); switch (parameter) { case CMQCFC.MQCACF_TOPIC: String shortTopic = pcfp.getStringValue ().substring ( topic.length () ); System.out.println ("Topic: " + shortTopic); break; case CMQCFC.MQCACF_REG_Q_MGR_NAME: System.out.println (" RegistrationQMgrName: " + pcfp.getStringValue ()); break; case CMQCFC.MQCACF_REG_Q_NAME: System.out.println (" RegistrationQName: " + pcfp.getStringValue ()); break; case CMQCFC.MQCACF_REG_CORREL_ID: System.out.println (" RegistrationCorrellId: " + pcfp.getStringValue ()); break; case CMQCFC.MQCACF_REG_USER_ID: System.out.println (" RegistrationUserIdentifier: " + pcfp.getStringValue ()); break; case CMQCFC.MQIACF_REG_REG_OPTIONS: System.out.print (" RegistrationOptions: " + pcfp.getValue () + " : "); String sOptions = pcfp.getStringValue (); Integer iOptions = new Integer (sOptions); int regOptions = iOptions.intValue (); if ( regOptions == CMQCFC.MQREGO_NONE ) System.out.print ("MQREGO_NONE "); if ( (regOptions & CMQCFC.MQREGO_CORREL_ID_AS_IDENTITY) == CMQCFC.MQREGO_CORREL_ID_AS_IDENTITY) System.out.print ("MQREGO_CORREL_ID_AS_IDENTITY "); if ( (regOptions & CMQCFC.MQREGO_ANONYMOUS) == CMQCFC.MQREGO_ANONYMOUS) System.out.print ("MQREGO_ANONYMOUS "); if ( (regOptions & CMQCFC.MQREGO_LOCAL) == CMQCFC.MQREGO_LOCAL) System.out.print ("MQREGO_LOCAL "); if ( (regOptions & CMQCFC.MQREGO_NEW_PUBLICATIONS_ONLY) == CMQCFC.MQREGO_NEW_PUBLICATIONS_ONLY) System.out.print ("MQREGO_NEW_PUBLICATIONS_ONLY "); if ( (regOptions & CMQCFC.MQREGO_DIRECT_REQUESTS) == CMQCFC.MQREGO_DIRECT_REQUESTS) System.out.print ("MQREGO_DIRECT_REQUESTS "); if ( (regOptions & CMQCFC.MQREGO_PUBLISH_ON_REQUEST_ONLY) == CMQCFC.MQREGO_PUBLISH_ON_REQUEST_ONLY) System.out.print ("MQREGO_PUBLISH_ON_REQUEST_ONLY "); if ( (regOptions & CMQCFC.MQREGO_DEREGISTER_ALL) == CMQCFC.MQREGO_DEREGISTER_ALL) System.out.print ("MQREGO_DEREGISTER_ALL "); if ( (regOptions & CMQCFC.MQREGO_INCLUDE_STREAM_NAME) == CMQCFC.MQREGO_INCLUDE_STREAM_NAME) System.out.print ("MQREGO_INCLUDE_STREAM_NAME "); if ( (regOptions & CMQCFC.MQREGO_INFORM_IF_RETAINED) == CMQCFC.MQREGO_INFORM_IF_RETAINED) System.out.print ("MQREGO_INFORM_IF_RETAINED "); System.out.println(); break; case CMQCFC.MQCACF_REG_TIME: System.out.println (" RegistrationTime: " + pcfp.getValue () + "\n"); break; case CMQCFC.MQIACF_BROKER_COUNT: System.out.println (" BrokerCount: " + pcfp.getValue ()); break; case CMQCFC.MQIACF_ANONYMOUS_COUNT: System.out.println (" AnonymousCount: " + pcfp.getValue ()); break; case CMQCFC.MQIACF_APPL_COUNT: System.out.println (" ApplCount: " + pcfp.getValue ()); break; default: break; } } } else { System.out.println ("PCF error:\n" + cfh); for (int i = 0; i < cfh.parameterCount; i++) { System.out.println (PCFParameter.nextParameter (replyMsg)); } } } } while (moreWork); } private void dumpPublisherOrSubscriber (String pubOrSub) throws IOException, MQException { String streamName = brokerDefaultStream; String topic; boolean registered = false; /********************************************************/ /* Send a bogus command to the broker to see if we can */ /* communicate with it. The error is caught in */ /* in SendBrokerCommand */ /********************************************************/ deRegisterSubscriber ("Test", streamName); System.out.println ("\n" + pubOrSub); System.out.println ("--------------"); System.out.println ("Stream Name: " + streamName); topic = new String("MQ/S/" + padQManager (qMgr.name) + "/" + pubOrSub + "/Identities/"); /********************************************************/ /* Subscribe on Identities/*. This produces a list of */ /* active subscriptions. */ /********************************************************/ registerSubscriber (topic + "*", streamName); registered = true; requestUpdate (topic + "*", streamName); dumpPubSubData(topic); if (registered) { deRegisterSubscriber(topic + "*", streamName); registered = false; } } public void processSubscriptions (String qMgrName) throws IOException { try { /********************************************************/ /* Turn off reporting that is not controlled by this */ /* application. */ /********************************************************/ MQEnvironment.disableTracing (); MQException.log = null; init (qMgrName); dumpPublisherOrSubscriber ("Publishers"); dumpPublisherOrSubscriber ("Subscribers"); qMgr.disconnect (); } catch (MQException mq) { /********************************************************/ /* All MQ errors are thrown back to this level so the */ /* cleanup (disconnect) can be done in one place */ /********************************************************/ if (firstGet && mq.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) { System.err.println ("\nUnable to communicate with the broker. is it running?\n"); } else { System.err.println ("An MQSeries error occurred. Completion code: " + mq.completionCode + " Reason code: " + mq.reasonCode + "\n\n"); } try { qMgr.disconnect (); } catch (MQException mqe) { } } } private String padQManager (String qMgrName) { StringBuffer paddedName = new StringBuffer (" "); paddedName = paddedName.insert (0,qMgrName); paddedName.setLength (CMQC.MQ_Q_MGR_NAME_LENGTH); return paddedName.toString (); } public static void main (String [] args) { DumpBroker dumpBroker; try { if (args.length != 1) { System.err.println ("Syntax: dumpBroker queueManagerName"); System.exit(0); } dumpBroker = new DumpBroker (); dumpBroker.processSubscriptions (args[0]); System.exit(0); } catch (ArrayIndexOutOfBoundsException abe) { System.err.println ("Syntax: DumpBroker queueManagerName"); abe.printStackTrace(); } catch (Exception e) { System.err.println (e); e.printStackTrace(); } } }