/***************************************************************************/ /* */ /* (c) Copyright IBM Corp. 1999 All rights reserved. */ /* */ /* This sample program is owned by International Business Machines */ /* Corporation or one of its subsidiaries ("IBM") and is copyrighted */ /* and licensed, not sold. */ /* */ /* You may copy, modify, and distribute this sample program in any */ /* form without payment to IBM, for any purpose including developing, */ /* using, marketing or distributing programs that include or are */ /* derivative works of the sample program. */ /* */ /* The sample program is provided to you on an "AS IS" basis, without */ /* warranty of any kind. IBM HEREBY EXPRESSLY DISCLAIMS ALL WARRANTIES, */ /* EITHER EXPRESS OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED */ /* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. */ /* Some jurisdictions do not allow for the exclusion or limitation of */ /* implied warranties, so the above limitations or exclusions may not */ /* apply to you. IBM shall not be liable for any damages you suffer as */ /* a result of using, modifying or distributing the sample program or */ /* its derivatives. */ /* */ /* Each copy of any portion of this sample program or any derivative */ /* work, must include the above copyright notice and disclaimer of */ /* warranty. */ /* */ /***************************************************************************/ /* */ /* Program name: mqthrdnt */ /* */ /* Description: Sample C program that uses multi-threading in an NT */ /* environment. */ /* */ /***************************************************************************/ /* */ /* Function: */ /* */ /* This program allows the user to specify how many threads to start */ /* that will put and get messages to the specified queue on the */ /* specified queue manager. Every other thread will be a put or get */ /* thread (evens are put, odds are get). If enough threads are used, */ /* it can be seen that the get threads will share the work in getting */ /* the put thread messages. */ /* */ /* This program has been tested with the MSVC++ 4.2 compiler: */ /* */ /* cl -MT -Z7 -c -W1 -D_X86=1 -DWIN32 mqthrdnt.c */ /* link -OUT:mqthrdnt.exe mqthrdnt.obj mqm.lib */ /* */ /* Note: Use mqic32.lib instead of mqm.lib if you want a client connect. */ /* */ /***************************************************************************/ /* */ /* mqthrdnt has 5 optional parameters (no spaces between parameter and */ /* value): */ /* -q the name of the queue to use; default is */ /* SYSTEM.DEFAULT.LOCAL.QUEUE */ /* -m the name of the queue manager to use; default is the */ /* default queue manager */ /* -l the number of messages the put threads will put on the */ /* queue; default is 10 */ /* -t the number of threads to start; default is 10 (see */ /* MY_MAX_THREADS */ /* -j whether to start each thread and wait for it to end (S) */ /* or start all threads and wait for all to end (A); */ /* default is A */ /* */ /***************************************************************************/ #include #include #include #include #include #include #include #define MY_MAX_THREADS 10 typedef struct _threadparms { int iLoop; int iValue; char cFunction[4]; char cQMgr[50]; char cQueue[MQ_Q_NAME_LENGTH]; } THREADPARMS; int bQueue = 0; /* assume we don't have a queue */ int bQmgr = 0; /* assume we don't have a qmgr */ int iLoop = 10; /* each thread will loop 10 times */ int iThreads = MY_MAX_THREADS; /* we'll run 10 threads */ int iJoin = 0; /* 0 = start all, 1 = wait on each */ char cQmgr[50]; char cQueue[MQ_Q_NAME_LENGTH+1]; void MQThread(void *tparms); long parseArgs(int argc, char *argv[]); int main(int argc, char** argv) { struct tm *ptr; time_t st; time_t lt; THREADPARMS tMQParms[MY_MAX_THREADS]; int i; unsigned long ThreadIDs[MY_MAX_THREADS]; HANDLE ThreadHandles[MY_MAX_THREADS]; parseArgs(argc, argv); /****************************************************/ /* Capture the start time for reference at the end. */ /****************************************************/ st = time(NULL); /*****************************************************************/ /* For each thread that the user requested, set up the structure */ /* that contains the information for the thread and then start */ /* the thread. */ /*****************************************************************/ for (i=0; i < iThreads; i++) { /******************************************************/ /* Set up the queue manager name if one was specified */ /******************************************************/ tMQParms[i].cQMgr[0] = 0; /* default */ if (bQmgr) { strcpy(tMQParms[i].cQMgr, cQmgr); } /******************************************************/ /* Set up the queue name if one was specified */ /******************************************************/ if (bQueue) { strncpy(tMQParms[i].cQueue, cQueue, (size_t)MQ_Q_NAME_LENGTH); } else { strncpy(tMQParms[i].cQueue, "SYSTEM.DEFAULT.LOCAL.QUEUE", (size_t)MQ_Q_NAME_LENGTH); } /******************************************************/ /* Set up number of loop iterations, which thread */ /******************************************************/ tMQParms[i].iLoop = iLoop; tMQParms[i].iValue = i; /********************************************************/ /* Indicate if this is a PUT (even) or GET (odd) thread */ /********************************************************/ if ( fmod( (double)i, 2.0) == 0.0) { strcpy(tMQParms[i].cFunction, "PUT"); } else { strcpy(tMQParms[i].cFunction, "GET"); } /*********************************************/ /* Start the thread and indicate that we did */ /*********************************************/ ThreadHandles[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)MQThread, (void*)&tMQParms[i], 0, &ThreadIDs[i]); lt = time(NULL); ptr = localtime(<); printf("i=%d, function = %s thread started %s\n", i, tMQParms[i].cFunction, asctime(ptr)); /*********************************************/ /* If requested, wait for this thread to end */ /* before going on to next thread. */ /*********************************************/ if (iJoin == 1) { printf(" Wait for thread %d to end %s\n", i, asctime(ptr)); WaitForSingleObject(ThreadHandles[i], INFINITE); } } /* end for */ /*****************************************************/ /* If requested, wait for all of the threads to end. */ /*****************************************************/ if (iJoin == 0) { lt = time(NULL); ptr = localtime(<); printf("All threads started, wait for all to end %s\n", asctime(ptr)); WaitForMultipleObjects(iThreads, ThreadHandles, TRUE, INFINITE); lt = time(NULL); ptr = localtime(<); printf("All threads have ended %s\n", asctime(ptr)); } /*****************************************************/ /* For reference, log start and end times. */ /*****************************************************/ lt = time(NULL); ptr = localtime(&st); printf("Start time %s", asctime(ptr)); ptr = localtime(<); printf("End Time %s", asctime(ptr)); return(0); } /****************************************************************/ /* This is the actual thread. It is a big if statement around */ /* the function of either a PUT or a GET. The Put thread will */ /* put the number of messages that is passed in. The Get */ /* thread will get messages until there are no more. Note: It */ /* is probable that each Get thread will get a different number */ /* of threads depending on thread swapping. */ /****************************************************************/ void MQThread(void *tparm) { MQOD od = {MQOD_DEFAULT}; /* Object Descriptor */ MQMD md = {MQMD_DEFAULT}; /* Message Descriptor */ MQPMO pmo = {MQPMO_DEFAULT}; /* put message options */ MQGMO gmo = {MQGMO_DEFAULT}; /* get message options */ MQLONG messlen; /* message length received */ char QMName[50]; /* queue manager name */ MQHCONN Hcon; /* connection handle */ MQHOBJ Hobj; /* object handle */ MQLONG O_options; /* MQOPEN options */ MQLONG C_options; /* MQCLOSE options */ MQLONG CompCode; /* completion code */ MQLONG OpenCode; /* MQOPEN completion code */ MQLONG Reason; /* reason code */ MQLONG CReason; /* reason code for MQCONN */ MQLONG buflen; /* buffer length */ char buffer[100]; /* message buffer */ int iLoop; THREADPARMS *pThreadParms; pThreadParms = (THREADPARMS *)tparm; /****************************************************************/ /* Connect to queue manager */ /****************************************************************/ MQCONN(pThreadParms->cQMgr, &Hcon, &CompCode, &CReason); /* report reason and stop if it failed */ if (CompCode == MQCC_FAILED) { printf("Thread %d MQCONN ended with reason code %ld\n", pThreadParms->iValue, CReason); return; } printf("Thread %d function %s\n", pThreadParms->iValue, pThreadParms->cFunction); /**********************************************/ /* Test to see if performing PUT or GET work. */ /**********************************************/ if (strcmp(pThreadParms->cFunction, "PUT") == 0) { /******************************************************************/ /* Set up the name of the target queue */ /******************************************************************/ strncpy(od.ObjectName, pThreadParms->cQueue, (size_t)MQ_Q_NAME_LENGTH); printf("Thread %d target queue is %s\n", pThreadParms->iValue, od.ObjectName); /****************************************************************/ /* Open the target message queue for output */ /****************************************************************/ O_options = MQOO_OUTPUT /* open queue for output */ + MQOO_FAIL_IF_QUIESCING; /* but not if MQM stopping */ MQOPEN(Hcon, &od, O_options, &Hobj, &OpenCode, &Reason); /* report reason, if any; stop if failed */ if (Reason != MQRC_NONE) { printf("Thread %d MQOPEN ended with reason code %ld\n", pThreadParms->iValue, Reason); } if (OpenCode == MQCC_FAILED) { printf("Thread %d unable to open queue for output\n", pThreadParms->iValue); } else { memcpy(md.Format, /* character string format */ MQFMT_STRING, (size_t)MQ_FORMAT_LENGTH); /******************************************************************/ /* Loop through the number of iterations specified and write a */ /* message to the queue. The message will contain the thread ID */ /* and loop iteration. */ /******************************************************************/ for (iLoop=0; iLoop < pThreadParms->iLoop; iLoop++) { sprintf(buffer, "Thread %d writing buffer iteration %d", pThreadParms->iValue, iLoop); buflen = strlen(buffer); /* length without null */ /****************************************************************/ /* Put each buffer to the message queue */ /****************************************************************/ memcpy(md.MsgId, /* reset MsgId to get a new one */ MQMI_NONE, sizeof(md.MsgId) ); memcpy(md.CorrelId, /* reset CorrelId to get a new one */ MQCI_NONE, sizeof(md.CorrelId) ); MQPUT(Hcon, Hobj, &md, &pmo, buflen, buffer, &CompCode, &Reason); /* report reason, if any */ if (Reason != MQRC_NONE) { printf("Thread %d MQPUT ended with reason code %ld\n", pThreadParms->iValue ,Reason); } } } } else { /* get part of the thread */ /******************************************************************/ /* Set up the name of the input queue */ /******************************************************************/ strncpy(od.ObjectName, pThreadParms->cQueue, (size_t)MQ_Q_NAME_LENGTH); /****************************************************************/ /* Open the named message queue for input; exclusive or shared */ /* use of the queue is controlled by the queue definition here */ /****************************************************************/ O_options = MQOO_INPUT_AS_Q_DEF /* open queue for input */ + MQOO_FAIL_IF_QUIESCING; /* but not if MQM stopping */ MQOPEN(Hcon, &od, O_options, &Hobj, &OpenCode, &Reason); /* report reason, if any; stop if failed */ if (Reason != MQRC_NONE) { printf("Thread %d MQOPEN ended with reason code %ld\n", pThreadParms->iValue ,Reason); } if (OpenCode == MQCC_FAILED) { printf("Thread %d unable to open queue for input\n", pThreadParms->iValue); } /****************************************************************/ /* Get messages from the message queue */ /* Loop until there is a failure */ /****************************************************************/ CompCode = OpenCode; /* use MQOPEN result for initial test */ while (CompCode != MQCC_FAILED) { buflen = sizeof(buffer) - 1; /* buffer size available for GET */ gmo.Options = MQGMO_WAIT /* wait for new messages */ + MQGMO_CONVERT;/* convert if necessary */ gmo.WaitInterval = 5000; /* 5 second limit for waiting */ /*************************************************************/ /* In order to read the messages in sequence, MsgId and */ /* CorrelID must have the default value. MQGET sets them */ /* to the values in for message it returns, so re-initialise */ /* them before every call */ /*************************************************************/ memcpy(md.MsgId, MQMI_NONE, sizeof(md.MsgId)); memcpy(md.CorrelId, MQCI_NONE, sizeof(md.CorrelId)); /****************************************************************/ /* MQGET sets Encoding and CodedCharSetId to the values in */ /* the message returned, so these fields should be reset to */ /* the default values before every call, as MQGMO_CONVERT is */ /* specified. */ /****************************************************************/ md.Encoding = MQENC_NATIVE; md.CodedCharSetId = MQCCSI_Q_MGR; MQGET(Hcon, Hobj, &md, &gmo, buflen, buffer, &messlen, &CompCode, &Reason); /* report reason, if any */ if (Reason != MQRC_NONE) { if (Reason == MQRC_NO_MSG_AVAILABLE) { printf("Thread %d no more messages\n", pThreadParms->iValue); } else { printf("Thread %d MQGET ended with reason code %ld\n", pThreadParms->iValue, Reason); /* treat truncated message as a failure for this sample */ if (Reason == MQRC_TRUNCATED_MSG_FAILED) { CompCode = MQCC_FAILED; } } } /****************************************************************/ /* Display each message received */ /****************************************************************/ if (CompCode != MQCC_FAILED) { buffer[messlen] = '\0'; /* add terminator */ printf("Thread %d message <%s>\n", pThreadParms->iValue, buffer); } } } /******************************************************************/ /* Close the target queue (if it was opened) */ /******************************************************************/ if (OpenCode != MQCC_FAILED) { C_options = 0; /* no close options */ MQCLOSE(Hcon, &Hobj, C_options, &CompCode, &Reason); /* report reason, if any */ if (Reason != MQRC_NONE) { printf("Thread %d MQCLOSE ended with reason code %ld\n", pThreadParms->iValue, Reason); } } /******************************************************************/ /* Disconnect from MQM if not already connected */ /******************************************************************/ if (CReason != MQRC_ALREADY_CONNECTED) { MQDISC(&Hcon, &CompCode, &Reason); /* report reason, if any */ if (Reason != MQRC_NONE) { printf("Thread %d MQDISC ended with reason code %ld\n", pThreadParms->iValue, Reason); } } printf(" Thread %d ending....\n", pThreadParms->iValue); return ; } long parseArgs(int argc, char *argv[]) { short i; char cTemp[10]; if ( argc > 1 ) { for (i=1; i < argc; i++ ) { /* is it an option switch */ if ((*argv[i] == '/') || (*argv[i] == '-')) { switch ( *(argv[i] + 1) ) { case 'q': case 'Q': strcpy(cQueue, &argv[i][2]); printf("Queue= %s\n", cQueue); bQueue = 1; break; case 'm': case 'M': strcpy(cQmgr, &argv[i][2]); printf("Qmgr = %s\n", cQmgr); bQmgr = 1; break; case 't': case 'T': iThreads = atoi(&argv[i][2]); if (iThreads > MY_MAX_THREADS) { iThreads = MY_MAX_THREADS; } /* endif */ printf("iThreads = %d\n", iThreads); break; case 'l': case 'L': iLoop = atoi(&argv[i][2]); printf("iLoop = %d\n", iLoop); break; case 'j': case 'J': strcpy(cTemp, &argv[i][2]); if ( (strcmp(cTemp, "S") == 0) || (strcmp(cTemp, "s") == 0) ) { iJoin = 1; printf("iJoin = %d, start one, wait one\n", iJoin); } else { iJoin = 0; printf("iJoin = %d, start all, wait all\n", iJoin); } } /* endswitch */ } /* endif checking current argument */ } /* endfor loop checking each argument */ } /* endif ... no arguments passed in */ return(0); }