/*****************************************************************************/ /* */ /* (c) Copyright IBM Corp. 2002 All rights reserved. */ /* */ /* This sample program is owned by International Business Machines */ /* Corporation or one of its subsidiaries ("IBM") and is copyrighted */ /* and licensed, not sold. */ /* */ /* You may copy, modify, and distribute this sample program in any */ /* form without payment to IBM, for any purpose including developing, */ /* using, marketing or distributing programs that include or are */ /* derivative works of the sample program. */ /* */ /* The sample program is provided to you on an "AS IS" basis, without */ /* warranty of any kind. IBM HEREBY EXPRESSLY DISCLAIMS ALL WARRANTIES, */ /* EITHER EXPRESS OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED */ /* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. */ /* Some jurisdictions do not allow for the exclusion or limitation of */ /* implied warranties, so the above limitations or exclusions may not */ /* apply to you. IBM shall not be liable for any damages you suffer as */ /* a result of using, modifying or distributing the sample program or */ /* its derivatives. */ /* */ /*****************************************************************************/ /* */ /* Program name: UnSubscribe */ /* */ /* Description: Sample java program to show how orphaned subscription */ /* registrations can be removed from the Pub/Sub broker */ /* (SupportPac ma0c). To get a list of potential orphaned */ /* registrations, use amqspsd supplied with ma0c or sample */ /* program DumpBroker.java */ /* */ /* Function: This program collects user data uniquely identifying the */ /* subscription to remove and then formats and sends the */ /* request to the pub/sub broker. The user data is entered as */ /* parameters or read from a file. */ /* */ /* Prereqs: This sample program requires MQSeries SupportPac ma0c. */ /* */ /* Setup: Create a model definition on the queue manager where the */ /* broker resides using the following definition: */ /* def qmodel(AMQSPSD.PERMDYN.MODEL.QUEUE) replace + */ /* descr('Message Broker Dumper model queue') + */ /* deftype(permdyn) + */ /* share + */ /* defsopt(shared) */ /* */ /* This program is run as follows: */ /* */ /* java -Df=inputfile UnSubscribe queuemanagername */ /* or */ /* java -Dt=topic -Dq=queuename <-Dc=correlid> */ /* <-Ds=streamname> <-Dp=puborsub> <-Dm=qmgrname> */ /* queuemanagername UnSubscribe */ /* */ /* where registration to unsubscribe is uniquely identified */ /* by topic and queuename. Optional parameters are */ /* correlationid (default is no correlid) */ /* streamname (default is SYSTEM.DEFAULT.BROKER.STREAM) */ /* puborsub (default is Subscribers) */ /* qmgrname (default is queuemanagername) */ /* */ /* inputfile is the output from amqspsd or DumpBroker.java */ /* that is redirected to inputfile. The filename can be */ /* edited to remove registrations that should not be */ /* unsubscribed. If an entry is removed from the file, the */ /* complete entry must be removed. For example remove the */ /* RegistrationQMgrName, Qname, UserIdentifier, etc. */ /* If it is the only entry for a topic, remove the entire */ /* topic block. Do not change the order that items appear in */ /* the file. A subscription is only deleted after reading */ /* RegistrationQMgrName, RegistrationQName, */ /* RegistrationCorrelId (optional) and RegistrationOptions. */ /* If any of the 3 required fields is missing, the */ /* subscription is not deregistered. */ /* */ /* queuemanagername is queue manager where broker is running */ /* */ /* Build: */ /* This program has been tested with Sun JDK 1.3 and */ /* MQSeries V5.2 CSD4 on Windows/2000. */ /* The following jar files must be in the classpath: */ /* com.ibm.mq.jar, jta.jar, connector.jar */ /* as well as \java\lib */ /* */ /* Compile it with javac UnSubscribe.java */ /* */ /*****************************************************************************/ import java.io.*; import java.util.*; import com.ibm.mq.*; public class UnSubscribe { /****************************************************************/ /* RFH Defines needed from cmqc.h */ /****************************************************************/ final public static String MQFMT_RF_HEADER = "MQHRF "; final public static byte [] MQRFH_STRUC_ID = {'R','F','H',' '}; final public static byte [] MQFMT_STRING_ARRAY = {'M','Q','S','T','R',' ',' ',' '}; final public static int MQRFH_VERSION_1 = 1; final public static int MQRFH_STRUC_LENGTH_FIXED = 32; final public static int MQRFH_NONE = 0x00000000; final public static int MQRC_UNKNOWN_OBJECT_NAME = 2085; final public static int MQRC_NO_MSG_AVAILABLE = 2033; /****************************************************************/ /* MQPS Defines needed from cmqpsc.h */ /****************************************************************/ final public static String MQPS_COMMAND_B = "MQPSCommand "; final public static String MQPS_REGISTRATION_OPTIONS_B = " MQPSRegOpts "; final public static String MQPS_STREAM_NAME_B = " MQPSStreamName "; final public static String MQPS_TOPIC_B = " MQPSTopic "; final public static String MQPS_Q_MGR_NAME_B = " MQPSQMgrName "; final public static String MQPS_Q_NAME_B = " MQPSQName "; final public static String MQPS_DEREGISTER_SUBSCRIBER = "DeregSub"; /****************************************************************/ /* Class defines */ /****************************************************************/ final public static String modelQueue = "AMQSPSD.PERMDYN.MODEL.QUEUE"; final public static String commandQueue = "SYSTEM.BROKER.CONTROL.QUEUE"; final public static String brokerDefaultStream = "SYSTEM.BROKER.DEFAULT.STREAM"; final public static int shortWait = 3000; final public static int longWait = 300000; final public static int qMgrNameLength = 48; final public static int correlIDLength = 24; /****************************************************************/ /* Instance variables */ /****************************************************************/ private MQQueueManager qMgr; private MQQueue brokerQueue; private MQQueue replyQueue; private boolean firstGet; private int goodCnt; private int badCnt; private void init (String qMgrName) throws MQException { qMgr = new MQQueueManager (qMgrName); brokerQueue = qMgr.accessQueue (commandQueue, MQC.MQOO_OUTPUT, "", "", "mqm"); /****************************************************************/ /* The broker requires that the reply queue can accept */ /* persistent messages so the queue must be static or permanent */ /* dynamic. */ /* Set closeoptions to mqco_delete so the permdyn queue is */ /* deleted after close. */ /****************************************************************/ try { replyQueue = qMgr.accessQueue (modelQueue, MQC.MQOO_INPUT_EXCLUSIVE, "", "BRK_REPLY*", "mqm"); } catch (MQException mq) { if (mq.reasonCode == MQRC_UNKNOWN_OBJECT_NAME) { System.err.println ("\n" + modelQueue + " must be defined for this queue manager\n"); } throw (mq); } replyQueue.closeOptions = MQC.MQCO_DELETE; firstGet = true; goodCnt = 0; badCnt = 0; } private int sendBrokerCommand (String requestStr, String correlID) throws IOException, MQException { MQPutMessageOptions pmo = new MQPutMessageOptions (); MQGetMessageOptions gmo = new MQGetMessageOptions (); MQMessage requestMsg = new MQMessage (); MQMessage replyMsg = new MQMessage (); pmo.options = MQC.MQPMO_NEW_MSG_ID; requestMsg.messageType = MQC.MQMT_REQUEST; requestMsg.expiry = longWait; requestMsg.feedback = MQC.MQFB_NONE; requestMsg.format = MQFMT_RF_HEADER; requestMsg.replyToQueueName = replyQueue.name; requestMsg.encoding = MQC.MQENC_NATIVE; requestMsg.characterSet = MQC.MQCCSI_Q_MGR; /*******************************************************************/ /* Note that the correl id is not added as name/value pair. */ /* If MQPSRegOpts says use correlid, it uses this one in the MQMD */ /*******************************************************************/ if (correlID != null) { requestMsg.correlationId = formatCorrelID (correlID); } /*******************************************************************/ /* Imbed an RF Header into the requestMsg stream */ /*******************************************************************/ requestMsg.write (MQRFH_STRUC_ID); //StructId requestMsg.writeInt (MQRFH_VERSION_1); //RFH Version requestMsg.writeInt (MQRFH_STRUC_LENGTH_FIXED + requestStr.length()); requestMsg.writeInt (MQC.MQENC_NATIVE); //Encoding requestMsg.writeInt (MQC.MQCCSI_INHERIT); //qmgr CCSID requestMsg.write (MQFMT_STRING_ARRAY); //Format requestMsg.writeInt (MQRFH_NONE); //Flags /*******************************************************************/ /* Append the name/values pairs to the message stream */ /*******************************************************************/ requestMsg.writeBytes (requestStr); /****************************************************************/ /* put the command requestMsg on the broker queue and then */ /* receive the reply */ /****************************************************************/ brokerQueue.put (requestMsg, pmo); gmo.options = MQC.MQGMO_WAIT | MQC.MQGMO_CONVERT; gmo.waitInterval = shortWait; replyMsg.messageId = MQC.MQMI_NONE; replyMsg.correlationId = requestMsg.messageId; replyQueue.get (replyMsg, gmo); firstGet = false; String replyText = replyMsg.readString (replyMsg.getMessageLength()); int rc = parseReplyText (replyText); return rc; } public void deRegisterSubscriber (String topic, String streamName, String queueName, String correlID) throws IOException, MQException { System.out.println ("\nDeregister topic: " + topic); System.out.println ("Deregister stream: " + streamName); System.out.println ("Deregister queue: " + queueName); System.out.println ("Deregister correlid: " + new String (formatCorrelID (correlID))); /*******************************************************************/ /* Build the name/value string first to make the length known */ /*******************************************************************/ StringBuffer nameValues = new StringBuffer (MQPS_COMMAND_B + MQPS_DEREGISTER_SUBSCRIBER); nameValues.append (MQPS_TOPIC_B); nameValues.append (topic); if (correlID != null) { nameValues.append (MQPS_REGISTRATION_OPTIONS_B); nameValues.append ("CorrelAsId"); } nameValues.append (MQPS_STREAM_NAME_B); nameValues.append (streamName); nameValues.append (MQPS_Q_NAME_B); nameValues.append (queueName); nameValues.append (MQPS_Q_MGR_NAME_B); nameValues.append (formatQMgr (qMgr.name)); // pad to 16 byte boundary while ( (nameValues.length () % 16) != 0 ) { nameValues.append (" "); } int rc = sendBrokerCommand (nameValues.toString (), correlID); if (rc == 0) { goodCnt++; System.out.println ("Unsubscribe successful."); } else { badCnt++; System.out.print ("Unsubscribe failed for reason "); switch (rc) { case 3071: System.out.println ("STREAM_ERROR"); break; case 3072: System.out.println ("TOPIC_ERROR"); break; case 3073: System.out.println ("NOT_REGISTERED"); break; case 3074: System.out.println ("STREAM_ERROR"); break; case 3075: System.out.println ("TOPIC_ERROR"); break; case 3076: System.out.println ("Q_NAME_ERROR"); break; case 3080: System.out.println ("CORREL_ID_ERROR"); break; case 3081: System.out.println ("NOT_AUTHORIZED"); break; case 3082: System.out.println ("UNKNOWN_STREAM"); break; default: System.out.println ("ReasonCode " + rc + ". Enter mqrc " + rc + " on the command line to get a description."); } } } private void processFile (String inFile) throws IOException, MQException { String topic = null; String qmgr = formatQMgr (qMgr.name); String queue = null; String stream = brokerDefaultStream; String correlID = null; String pubOrSub = new String ("Subscribers"); File inF = new File (inFile); if ( !inF.exists () || !inF.canRead () ) { System.err.println ("Unable to read file " + inFile); return; } BufferedReader inBR = new BufferedReader (new FileReader (inF)); String inString; int line = 0; while ( (inString = inBR.readLine ()) != null ) { line++; try { StringTokenizer tok = new StringTokenizer (inString.trim ()); if ( tok != null && tok.hasMoreTokens () ) { String tag = tok.nextToken (); if ( tag.equals ("Publishers") ) { pubOrSub = "Publishers"; } else if ( tag.equals ("Subscribers") ) { pubOrSub = "Subscribers"; } else if ( tag.equals ("StreamName:") ) { stream = tok.nextToken (); } else if ( tag.equals ("Topic:") ) { // topic may be null for publishers if ( tok.hasMoreTokens () ) { // it may include spaces topic = tok.nextToken ("\r\n"); } } else if ( tag.equals ("RegistrationQMgrName:") ) { qmgr = tok.nextToken (); } else if ( tag.equals ("RegistrationQName:") ) { queue = tok.nextToken (); } else if ( tag.equals ("RegistrationCorrellId:") ) { correlID = tok.nextToken (); } else if ( tag.equals ("RegistrationOptions:") ) { if ( topic != null && qmgr != null && queue != null ) { if ( inString.indexOf ("MQREGO_CORREL_ID_AS_IDENTITY") == -1 ) { correlID = null; } deRegisterSubscriber (topic, stream, queue, correlID); } qmgr = null; queue = null; correlID = null; } } } catch (NoSuchElementException nse) { System.err.println ("Format error in source file on line " + line); } } } public void processDeRegistrations (String [] args) throws IOException { boolean readFile = false; try { /********************************************************/ /* Get the program arguments. */ /* if filename is supplied, get all the info from it */ /* so the others are ignored */ /********************************************************/ String topic = System.getProperty ("t"); String queue = System.getProperty ("q"); String stream = System.getProperty ("s"); String correlID = System.getProperty ("c"); String pubOrSub = System.getProperty ("p"); String inFile = System.getProperty ("f"); if (inFile != null) { readFile = true; System.out.println ("\n\tReading deregister data from file: " + inFile); } else { if (topic == null || queue == null) { System.out.println ("\n\tEither a filename or topic and queue is required."); showSyntax (); return; } } /********************************************************/ /* Turn off reporting that is not controlled by this */ /* application. */ /********************************************************/ MQEnvironment.disableTracing (); MQException.log = null; init (args[0]); if (readFile) { processFile (inFile); } else { if (pubOrSub == null) { pubOrSub = "Subscribers"; } if (stream == null) { stream = brokerDefaultStream; } deRegisterSubscriber(topic, stream, queue, correlID); } System.out.print ("\n\t" + (badCnt + goodCnt) + " Deregistrations attempted. "); System.out.print ("\t" + goodCnt + " successfully removed."); qMgr.disconnect (); } catch (MQException mq) { /********************************************************/ /* All MQ errors are thrown back to this level so the */ /* cleanup (disconnect) can be done in one place */ /********************************************************/ if (firstGet && mq.reasonCode == MQRC_NO_MSG_AVAILABLE) { System.err.println ("\nUnable to communicate with the broker. is it running?\n"); } else { System.err.println ("An MQSeries error occurred. Completion code: " + mq.completionCode + " Reason code: " + mq.reasonCode + "\n\n"); } try { qMgr.disconnect (); } catch (MQException mqe) { } } } private int parseReplyText (String replyMsg) { /********************************************************/ /* The name/value replies follow the reply's RF header. */ /* The names are MQPSCompCode MQPSReason and */ /* MQPSReasonText + request command string */ /********************************************************/ String replyText = replyMsg.substring (MQRFH_STRUC_LENGTH_FIXED); StringTokenizer toks = new StringTokenizer (replyText); for (int idx = 0; idx < 3; idx++) { String arg = toks.nextToken (); } Integer reaCode = new Integer (toks.nextToken ()); return reaCode.intValue (); } private String formatQMgr (String qMgrName) { StringBuffer paddedName = new StringBuffer (" "); paddedName = paddedName.insert (0,qMgrName); paddedName.setLength (qMgrNameLength); return "\"" + paddedName.toString () + "\""; } private byte [] formatCorrelID (String hexCorrelID) { byte [] correlID = {0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0}; if ( hexCorrelID != null && hexCorrelID.length () > 1 ) { for (int cnt=0; cnt < hexCorrelID.length ()-1; cnt=cnt+2) { int nibble1 = Character.getNumericValue (hexCorrelID.charAt( cnt) ) ; int nibble2 = Character.getNumericValue (hexCorrelID.charAt (cnt+1) ) ; byte bChar = (byte) (nibble1*16+nibble2); correlID [cnt/2] = bChar; } } return correlID; } private static void showSyntax () { System.err.println ("\n\tSyntax: java -Df=filename UnSubscribe queueManagerName"); System.err.println ("\tor"); System.err.print ("\tjava -Dt=topic -Dq=queuename <-Dc=correlid> <-Ds=streamname> "); System.err.println ("<-Dp=puborsub> UnSubscribe queueManagerName"); System.err.println ("\n\twhere:"); System.err.println ("\tfilename is a file derived from DumpBroker."); System.err.println ("\ttopic is the topic to unsubscribe."); System.err.println ("\tqueue is the registration queue name."); System.err.println ("\tcorrelid is an optional correlation id in hex."); System.err.println ("\tstreamname is optional stream. Default is " + brokerDefaultStream + "."); System.err.println ("\tpuborsub is Publishers or Subscribers. Default is Subscribers."); } public static void main (String [] args) { UnSubscribe unSubscribe; try { if (args.length != 1) { UnSubscribe.showSyntax (); System.exit(0); } unSubscribe = new UnSubscribe (); unSubscribe.processDeRegistrations (args); System.exit(0); } catch (ArrayIndexOutOfBoundsException abe) { System.err.println ("Syntax: UnSubscribe queueManagerName"); abe.printStackTrace(); } catch (Exception e) { System.err.println (e); e.printStackTrace(); } } }