/***************************************************************************/ /* */ /* (c) Copyright IBM Corp. 1998 All rights reserved. */ /* */ /* This sample program is owned by International Business Machines */ /* Corporation or one of its subsidiaries ("IBM") and is copyrighted */ /* and licensed, not sold. */ /* */ /* You may copy, modify, and distribute this sample program in any */ /* form without payment to IBM, for any purpose including developing, */ /* using, marketing or distributing programs that include or are */ /* derivative works of the sample program. */ /* */ /* The sample program is provided to you on an "AS IS" basis, without */ /* warranty of any kind. IBM HEREBY EXPRESSLY DISCLAIMS ALL WARRANTIES, */ /* EITHER EXPRESS OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED */ /* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. */ /* Some jurisdictions do not allow for the exclusion or limitation of */ /* implied warranties, so the above limitations or exclusions may not */ /* apply to you. IBM shall not be liable for any damages you suffer as */ /* a result of using, modifying or distributing the sample program or */ /* its derivatives. */ /* */ /* Each copy of any portion of this sample program or any derivative */ /* work, must include the above copyright notice and disclaimer of */ /* warranty. */ /* */ /***************************************************************************/ /* */ /* Program name: mqthrds */ /* */ /* Description: Sample C program that shows the use of threads to perform */ /* MQSeries functions. */ /* */ /***************************************************************************/ /* */ /* Function: */ /* */ /* This program shows the use of multiple threads to perform MQSeries */ /* function calls. It uses the Sun Solaris threading environment. */ /* It has one function that performs MQPUTs or MQGETs based on the */ /* parameters that are passed to it from the main thread. 
Both the */ /* MQPUT and MQGET thread are based on the MQSeries sample programs */ /* amqsput0.c and amqsget0.c. See them for the details on the MQSeries */ /* calls. */ /* */ /***************************************************************************/ /* */ /* Invocation: */ /* */ /* mqthrds has one required parameter: a queue name. It also accepts an */ /* optional parameter of a queue manager name. */ /* */ /* mqthrds queue.name qmgr.name */ /* */ /***************************************************************************/ /* */ /* This program was built and tested on a Sun Solaris 2.5.1 system using */ /* the Sun SPARCompiler C Release 4.0 with MQSeries for Sun Solaris V5.0. */ /* */ /* Server version: */ /* cc -o mqthrds mqthrds.c -mt -lmqm -lmqmcs -lmqmzse -lsocket -lnsl -ldl */ /* */ /* Client version: */ /* cc -o mqthrds mqthrds.c -mt -lmqic -lmqmcs -lsocket -lnsl -ldl */ /* */ /* See the MQSeries Application Programming Guide for more details on */ /* build MQSeries programs. */ /* */ /***************************************************************************/ #include #include #include #include #include /**************************************************************/ /* The following structure is used by the program to pass the */ /* command line parameters to the threads. It contains the */ /* following: */ /* - number of messages to put (hard-coded to 5) */ /* - whether to perform gets or puts */ /* - the queue manager name (from command line) */ /* - the queue name (from command line) */ /**************************************************************/ typedef struct _threadparms { int iLoop; char cFunction[4]; char cQMgr[50]; char cQueue[MQ_Q_NAME_LENGTH]; } THREADPARMS; void *MQThread(void *tparms); void main(int argc, char** argv) { thread_t tidPut; thread_t tidGet; THREADPARMS tParmsPut; THREADPARMS tParmsGet; /********************************************************/ /* Look for the same parameters as amqsput and amqsget. 
*/ /* If there is no queue name, then give an error and */ /* exit. Only the queue manager name is optional. */ /********************************************************/ if (argc < 2) { printf("mqthrds: missing queue name parameter\n"); exit(99); } /****************************************************/ /* Set up to use default queue manager. If one was */ /* entered on command line, use it instead. */ /****************************************************/ tParmsPut.cQMgr[0] = 0; /* default */ tParmsGet.cQMgr[0] = 0; /* default */ if (argc > 2) { strcpy(tParmsPut.cQMgr, argv[2]); strcpy(tParmsGet.cQMgr, argv[2]); } /****************************************************/ /* Get the name of the queue that was entered on */ /* the command line. */ /****************************************************/ strncpy(tParmsPut.cQueue, argv[1], (size_t)MQ_Q_NAME_LENGTH); strncpy(tParmsGet.cQueue, argv[1], (size_t)MQ_Q_NAME_LENGTH); /****************************************************/ /* Set up the rest of the Put thread parameters. */ /****************************************************/ tParmsPut.iLoop = 5; strcpy(tParmsPut.cFunction, "PUT"); thr_create(NULL, NULL, MQThread, (void *)&tParmsPut, NULL, &tidPut); printf("Put thread has an ID of %d\n", tidPut); /****************************************************/ /* Set up the rest of the Get thread parameters. */ /****************************************************/ strcpy(tParmsGet.cFunction, "GET"); thr_create(NULL, NULL, MQThread, (void *)&tParmsGet, NULL, &tidGet); printf("Get thread has an ID of %d\n", tidGet); /****************************************************/ /* Wait for the threads to end */ /****************************************************/ thr_join(tidPut, NULL, NULL); thr_join(tidGet, NULL, NULL); return; } /***************************************************************/ /* The MQThread function performs the MQSeries work. 
It will */
/* either put messages or get messages based on the parameters */
/* that were passed into it from the main thread.              */
/*                                                             */
/*   tparm   - pointer to a THREADPARMS block.  cFunction      */
/*             equal to "PUT" selects the put path; any other  */
/*             value selects the get path.                     */
/*   returns - always NULL; problems are reported on stdout.   */
/***************************************************************/
void *MQThread(void *tparm)
{
  MQOD    od  = {MQOD_DEFAULT};    /* Object Descriptor             */
  MQMD    md  = {MQMD_DEFAULT};    /* Message Descriptor            */
  MQPMO   pmo = {MQPMO_DEFAULT};   /* put message options           */
  MQGMO   gmo = {MQGMO_DEFAULT};   /* get message options           */
  MQLONG  messlen = 0;             /* message length received       */
  MQHCONN Hcon;                    /* connection handle             */
  MQHOBJ  Hobj;                    /* object handle                 */
  MQLONG  O_options;               /* MQOPEN options                */
  MQLONG  C_options;               /* MQCLOSE options               */
  MQLONG  CompCode;                /* completion code               */
  MQLONG  OpenCode;                /* MQOPEN completion code        */
  MQLONG  Reason;                  /* reason code                   */
  MQLONG  CReason;                 /* reason code for MQCONN        */
  MQLONG  buflen;                  /* buffer length                 */
  char    buffer[100];             /* message buffer                */
  int     iLoop;                   /* put iteration counter         */
  int     tid = (int)thr_self();   /* this thread's ID, for output  */
  THREADPARMS *pThreadParms = (THREADPARMS *)tparm;

  /****************************************************************/
  /* Connect to queue manager                                     */
  /****************************************************************/
  MQCONN(pThreadParms->cQMgr, &Hcon, &CompCode, &CReason);

  /* report reason and stop if it failed */
  if (CompCode == MQCC_FAILED)
  {
    printf("Thread %d MQCONN ended with reason code %ld\n",
           tid, (long)CReason);
    return NULL;
  }

  /**********************************************/
  /* Test to see if performing PUT or GET work. */
  /**********************************************/
  if (strcmp(pThreadParms->cFunction, "PUT") == 0)
  {
    /**************************************************************/
    /* Set up the name of the target queue.  ObjectName is a      */
    /* fixed-width field that has no terminating NUL when the     */
    /* name fills it, so it is printed with an explicit           */
    /* precision rather than a bare %s.                           */
    /**************************************************************/
    strncpy(od.ObjectName, pThreadParms->cQueue, (size_t)MQ_Q_NAME_LENGTH);
    printf("Thread %d target queue is %.*s\n",
           tid, (int)MQ_Q_NAME_LENGTH, od.ObjectName);

    /**************************************************************/
    /* Open the target message queue for output                   */
    /**************************************************************/
    O_options = MQOO_OUTPUT               /* open queue for output     */
              | MQOO_FAIL_IF_QUIESCING;   /* but not if MQM stopping   */
    MQOPEN(Hcon, &od, O_options, &Hobj, &OpenCode, &Reason);

    /* report reason, if any; stop if failed */
    if (Reason != MQRC_NONE)
    {
      printf("Thread %d MQOPEN ended with reason code %ld\n",
             tid, (long)Reason);
    }

    if (OpenCode == MQCC_FAILED)
    {
      printf("Thread %d unable to open queue for output\n", tid);
    }
    else
    {
      memcpy(md.Format,           /* character string format */
             MQFMT_STRING, (size_t)MQ_FORMAT_LENGTH);

      /************************************************************/
      /* Loop through the number of iterations specified and      */
      /* write a message to the queue.  The message will contain  */
      /* the thread ID and loop iteration.                        */
      /************************************************************/
      for (iLoop = 0; iLoop < pThreadParms->iLoop; iLoop++)
      {
        /* Fixed text plus two ints always fits in buffer[100]. */
        sprintf(buffer, "Thread %d writing buffer iteration %d",
                tid, iLoop);
        buflen = (MQLONG)strlen(buffer);   /* length without null */

        /**********************************************************/
        /* Put each buffer to the message queue.  MsgId and       */
        /* CorrelId are reset first so each message gets new ones.*/
        /**********************************************************/
        memcpy(md.MsgId, MQMI_NONE, sizeof(md.MsgId));
        memcpy(md.CorrelId, MQCI_NONE, sizeof(md.CorrelId));

        MQPUT(Hcon, Hobj, &md, &pmo, buflen, buffer, &CompCode, &Reason);

        /* report reason, if any */
        if (Reason != MQRC_NONE)
        {
          printf("Thread %d MQPUT ended with reason code %ld\n",
                 tid, (long)Reason);
        }
      }
    }
  }
  else
  {
    /**************************************************************/
    /* Set up the name of the input queue                         */
    /**************************************************************/
    strncpy(od.ObjectName, pThreadParms->cQueue, (size_t)MQ_Q_NAME_LENGTH);

    /**************************************************************/
    /* Open the named message queue for input; exclusive or       */
    /* shared use of the queue is controlled by the queue         */
    /* definition here                                            */
    /**************************************************************/
    O_options = MQOO_INPUT_AS_Q_DEF       /* open queue for input      */
              | MQOO_FAIL_IF_QUIESCING;   /* but not if MQM stopping   */
    MQOPEN(Hcon, &od, O_options, &Hobj, &OpenCode, &Reason);

    /* report reason, if any; stop if failed */
    if (Reason != MQRC_NONE)
    {
      printf("Thread %d MQOPEN ended with reason code %ld\n",
             tid, (long)Reason);
    }

    if (OpenCode == MQCC_FAILED)
    {
      printf("Thread %d unable to open queue for input\n", tid);
    }

    /**************************************************************/
    /* Get messages from the message queue.                       */
    /* Loop until there is a failure; a failed MQOPEN means the   */
    /* loop body never runs.                                      */
    /**************************************************************/
    CompCode = OpenCode;       /* use MQOPEN result for initial test */

    while (CompCode != MQCC_FAILED)
    {
      /* leave one byte of buffer free for the terminator added below */
      buflen = (MQLONG)(sizeof(buffer) - 1);

      gmo.Options = MQGMO_WAIT       /* wait for new messages        */
                  | MQGMO_CONVERT;   /* convert if necessary         */
      gmo.WaitInterval = 15000;      /* 15 second limit for waiting  */

      /************************************************************/
      /* In order to read the messages in sequence, MsgId and     */
      /* CorrelId must have the default value.  MQGET sets them   */
      /* to the values in the message it returns, so they are     */
      /* re-initialized before every call.                        */
      /************************************************************/
      memcpy(md.MsgId, MQMI_NONE, sizeof(md.MsgId));
      memcpy(md.CorrelId, MQCI_NONE, sizeof(md.CorrelId));

      /************************************************************/
      /* MQGET sets Encoding and CodedCharSetId to the values in  */
      /* the message returned, so these fields should be reset to */
      /* the default values before every call, as MQGMO_CONVERT   */
      /* is specified.                                            */
      /************************************************************/
      md.Encoding       = MQENC_NATIVE;
      md.CodedCharSetId = MQCCSI_Q_MGR;

      MQGET(Hcon, Hobj, &md, &gmo, buflen, buffer, &messlen,
            &CompCode, &Reason);

      /* report reason, if any */
      if (Reason == MQRC_NO_MSG_AVAILABLE)
      {
        printf("Thread %d no more messages\n", tid);
      }
      else if (Reason != MQRC_NONE)
      {
        printf("Thread %d MQGET ended with reason code %ld\n",
               tid, (long)Reason);

        /* treat truncated message as a failure for this sample */
        if (Reason == MQRC_TRUNCATED_MSG_FAILED)
        {
          CompCode = MQCC_FAILED;
        }
      }

      /************************************************************/
      /* Display each message received.  The reported length is   */
      /* clamped to the space available before it is used as an   */
      /* index, so the terminator always lands inside buffer.     */
      /************************************************************/
      if (CompCode != MQCC_FAILED)
      {
        if (messlen < 0)
        {
          messlen = 0;
        }
        else if (messlen > buflen)
        {
          messlen = buflen;
        }
        buffer[messlen] = '\0';            /* add terminator */
        printf("Thread %d message <%s>\n", tid, buffer);
      }
    }
  }

  /******************************************************************/
  /* Close the queue (if it was opened)                             */
  /******************************************************************/
  if (OpenCode != MQCC_FAILED)
  {
    C_options = 0;                   /* no close options */
    MQCLOSE(Hcon, &Hobj, C_options, &CompCode, &Reason);

    /* report reason, if any */
    if (Reason != MQRC_NONE)
    {
      printf("Thread %d MQCLOSE ended with reason code %ld\n",
             tid, (long)Reason);
    }
  }

  /******************************************************************/
  /* Disconnect from MQM, unless MQCONN reported that a connection  */
  /* already existed (in which case this function did not make it). */
  /******************************************************************/
  if (CReason != MQRC_ALREADY_CONNECTED)
  {
    MQDISC(&Hcon, &CompCode, &Reason);

    /* report reason, if any */
    if (Reason != MQRC_NONE)
    {
      printf("Thread %d MQDISC ended with reason code %ld\n",
             tid, (long)Reason);
    }
  }

  printf("Thread %d ending....\n", tid);
  return NULL;
}