This lab demonstrates the Streaming Queue feature added to MQ in the 9.2.3.0 CD release. Streaming Queues allow you to configure any local or model queue with the name of a second queue. This second queue is referred to as a stream queue.
When messages are put to the original queue, a duplicate copy of each message is also placed on the stream queue. The Streaming Queue feature allows you to create a duplicate stream of messages which can be used for later analysis, logging, or storage without affecting the business applications using the original queue.
In this lab, you will:
We will not demonstrate messages being consumed from either of the queues. Stream queues are regular MQ local queues and getting messages from them is done in the same way as any other MQ queue.
We will not demonstrate using a queue alias that points to a topic as the stream queue, although this is supported. More information on using a queue alias as the stream queue is available in the IBM Documentation.
This lab was written by Matthew Whitehead (mwhitehead@uk.ibm.com).
Streaming Queues in IBM Knowledge Center
Topology overview for stream queues
The topology diagram shows the basic capability of stream queues. A regular local queue that is currently being used by MQ applications can be configured to stream a duplicate of every message put to that queue to a second destination called a stream queue.
The streaming queues feature of IBM® MQ is configured by the administrator on individual queues, and the messages are streamed by the queue manager, not by the application itself.
This means that in almost all cases the application putting messages to the original queue is completely unaware that streaming is taking place. Similarly, the application consuming messages from the original queue is unaware that message streaming has taken place.
The version of the IBM MQ client library does not need upgrading to make use of streaming queues, and the original messages are completely unchanged by the streaming process.
You can configure streaming queues in one of two modes:
Best effort
In this mode, the queue manager considers it more important that delivery of the original message is not affected by delivery of the streamed message. If the original message can be delivered, but the streamed message cannot, the original message is still delivered to its queue. This mode is best suited to applications where it is important for the original business application to remain unaffected by the streaming process.
Must duplicate
In this mode, the queue manager ensures that both the original message and the streamed message are successfully delivered to their queues. If, for some reason, the streamed message cannot be delivered to its queue, for example, because the second queue is full, then the original message is not delivered to its queue either. The putting application receives an error reason code and must try to put the message again.
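The two modes are selected with the STRMQOS attribute of the original queue (BESTEF or MUSTDUP), set alongside the STREAMQ attribute that names the stream queue. The following is a minimal sketch that only prints the MQSC text. It assumes the queue names APPQ and MY.LOG.QUEUE used later in this lab, and the helper name stream_queue_mqsc is hypothetical.

```shell
#!/bin/sh
# Print MQSC that defines a stream queue and an original queue that streams to it.
# Usage: stream_queue_mqsc ORIGINAL_QUEUE STREAM_QUEUE QOS   (QOS is BESTEF or MUSTDUP)
# The function name is hypothetical; it is not part of MQ or of the lab scripts.
stream_queue_mqsc() {
  case "$3" in
    BESTEF|MUSTDUP) ;;
    *) echo "QOS must be BESTEF or MUSTDUP" >&2; return 1 ;;
  esac
  # The stream queue is an ordinary local queue.
  printf 'DEFINE QLOCAL(%s)\n' "$2"
  # The original queue names the stream queue and the quality of service.
  printf 'DEFINE QLOCAL(%s) STREAMQ(%s) STRMQOS(%s)\n' "$1" "$2" "$3"
}

stream_queue_mqsc APPQ MY.LOG.QUEUE BESTEF

# To apply the output, it could be piped into runmqsc, for example:
#   stream_queue_mqsc APPQ MY.LOG.QUEUE MUSTDUP | runmqsc <queue manager name>
```

Printing the commands rather than running them keeps the sketch usable on a machine without MQ installed, and lets you review the text before it reaches a queue manager.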
In most cases, the copy of the message delivered to the second queue is a duplicate of the original message. This includes all of the message descriptor fields, including the message ID and correlation ID. The streamed messages are intended to be very close copies of the original messages, so that they are easier to find and, if necessary, to replay back into another IBM MQ system.
There are some message descriptor fields that are not retained on the streamed message. The following changes are made to the streamed message before it is placed on the second queue:
The expiry of the streamed message is set to MQEI_UNLIMITED, regardless of the expiry of the original message. If CAPEXPRY has been configured on the secondary queue this value is applied to the streamed message.
If any of the following report options are set on the original message, they are not enabled on the streamed message. This is to ensure that no unexpected report messages are delivered to applications that are not designed to receive them:
Due to the near-identical nature of the streamed messages, most of the attributes of the secondary queue have no effect on the message descriptor fields of the streamed message. For example, the DEFPSIST and DEFPRTY attributes of the secondary queue have no effect on the streamed message.
The following exceptions apply to the streamed message:
CAPEXPRY set on the CUSTOM attribute
If the secondary queue has been configured with a CAPEXPRY value in the CUSTOM attribute, this expiry cap is applied to the expiry of the streamed message.
DEFBIND for cluster queues
If the secondary queue is a cluster queue, the streamed message is put using the bind option set in the DEFBIND attribute of the secondary queue.
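As an illustration of the CAPEXPRY exception, the sketch below prints an MQSC command that sets an expiry cap on the stream queue through the CUSTOM attribute. CAPEXPRY is expressed in tenths of a second, so the helper converts from seconds. The helper name cap_expiry_mqsc and the one-hour value are assumptions chosen for the example, not part of the lab.

```shell
#!/bin/sh
# Print MQSC that caps message expiry on a queue through the CUSTOM attribute.
# CAPEXPRY is expressed in tenths of a second, so seconds are multiplied by 10.
# Usage: cap_expiry_mqsc QUEUE SECONDS   (hypothetical helper name)
cap_expiry_mqsc() {
  tenths=$(( $2 * 10 ))
  printf "ALTER QLOCAL(%s) CUSTOM('CAPEXPRY(%s)')\n" "$1" "$tenths"
}

# Example: streamed copies on MY.LOG.QUEUE would expire after at most one hour.
cap_expiry_mqsc MY.LOG.QUEUE 3600
```

Without a cap like this, streamed messages have unlimited expiry and stay on the stream queue until something consumes or clears them.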
Certain configurations are not supported when using streaming queues in IBM® MQ, and these are documented here.
The following list specifies the configurations that are not supported:
Defining STREAMQ on any queue named SYSTEM.*, with the following exceptions:
The streaming queues feature allows a message put to one queue, to be duplicated to a second queue. In most cases the two messages are put to their respective queues under a unit of work.
If the original message was put using MQPMO_SYNCPOINT, the duplicate message is put to the stream queue under the same unit of work that was started for the original put.
If the original was put with MQPMO_NO_SYNCPOINT, a unit of work will be started even though the original put did not request one. This is done for two reasons:
It ensures the duplicate message is not delivered if the original message could not be. The streaming queues feature only delivers messages to stream queues if the original message was also delivered.
There can be a performance improvement from doing both puts inside a unit of work.
The only time the messages are not delivered inside a unit of work is when the original MQPUT is non-persistent with MQPMO_NO_SYNCPOINT, and the STRMQOS attribute of the queue is set to BESTEF (best effort).
The additional put to the stream queue does not count towards the MAXUMSGS limit. In the case of a queue configured with STRMQOS(BESTEF), failure to deliver the duplicate message does not cause the unit of work to be rolled back.
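The unit of work rule above can be summarised as a small decision function. This is only a sketch of the rule as described in this section, not MQ code; the function name and the argument spellings are hypothetical.

```shell
#!/bin/sh
# Report whether the original and streamed puts share a unit of work,
# following the rule described above.
# Usage: put_uses_unit_of_work PERSISTENCE SYNCPOINT QOS
#   PERSISTENCE: persistent | nonpersistent
#   SYNCPOINT:   syncpoint | no_syncpoint
#   QOS:         BESTEF | MUSTDUP
put_uses_unit_of_work() {
  # The single exception: non-persistent, no syncpoint, best effort.
  if [ "$1" = "nonpersistent" ] && [ "$2" = "no_syncpoint" ] && [ "$3" = "BESTEF" ]; then
    echo "no"
  else
    echo "yes"
  fi
}

# Print the outcome for every combination.
for persistence in persistent nonpersistent; do
  for syncpoint in syncpoint no_syncpoint; do
    for qos in BESTEF MUSTDUP; do
      printf '%-14s %-13s %-8s -> %s\n' "$persistence" "$syncpoint" "$qos" \
        "$(put_uses_unit_of_work "$persistence" "$syncpoint" "$qos")"
    done
  done
done
```

Of the eight combinations, only one prints "no", which matches the single exception stated in the text.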
It is possible to stream messages from a local queue to a cluster queue and to stream messages from cluster queue instances to a local queue.
This can be useful if you have a local queue where original messages are delivered, and would like to stream a copy of every message to one or more instances of a cluster queue. This could be to workload balance the processing of the duplicate messages, or simply to have duplicate messages streamed to another queue elsewhere in the cluster.
When streaming messages to a cluster queue, messages are distributed using the cluster workload balancing algorithm. A cluster queue instance is chosen based on the DEFBIND attribute of the cluster queue.
For example, if the cluster queue is configured with DEFBIND(OPEN), an instance of the cluster queue is chosen when the original queue is opened. All duplicate messages go to the same cluster queue instance, until the original queue is reopened by the application.
If the cluster queue is configured with DEFBIND(NOTFIXED), an instance of the cluster queue will be chosen for every MQPUT operation.
You should configure all cluster queue instances with the same value for the DEFBIND attribute.
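The difference between DEFBIND(OPEN) and DEFBIND(NOTFIXED) can be pictured with a toy model. In the sketch below, a simple rotation stands in for the cluster workload balancing algorithm, which in reality takes more factors into account. The function name and the queue manager names QM2 and QM3 are hypothetical.

```shell
#!/bin/sh
# Simplified model of which cluster queue instance receives each streamed message.
# A rotation through the instances stands in for the real workload balancing
# algorithm; this is an illustration, not how MQ is implemented.
# Usage: choose_instances DEFBIND PUT_COUNT INSTANCE...
choose_instances() (
  defbind=$1; puts=$2; shift 2
  n=1
  while [ "$n" -le "$puts" ]; do
    echo "put $n -> $1"
    if [ "$defbind" = "NOTFIXED" ]; then
      # A new choice is made for every put: rotate the instance list.
      first=$1; shift; set -- "$@" "$first"
    fi
    # With OPEN the choice made at open time is kept for every put.
    n=$(( n + 1 ))
  done
)

echo "DEFBIND(OPEN):"
choose_instances OPEN 3 QM2 QM3
echo "DEFBIND(NOTFIXED):"
choose_instances NOTFIXED 3 QM2 QM3
```

With OPEN, all three puts land on the same instance; with NOTFIXED, the instance can change from one put to the next.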
This can be useful if you already send messages to several instances of a cluster queue, and would like a copy of each message to be delivered to a stream queue on the same queue manager as the cluster queue instance.
When the original message is delivered to one of the cluster queue instances, a duplicate message is delivered to the stream queue by the cluster-receiver channel.
This lab was designed to be run on IBM Cloud Pak for Integration (CP4I) 2023.4 on Red Hat OpenShift 4.12 or later.
Entering commands for this lab: some of the commands in this lab are quite long. You will be using a Linux shell such as bash. To avoid a lot of typing, you can copy the commands from this document and run them in the shell.
To create an MQ queue manager, you could use the UI as you did in Lab 1. Most administrators use scripting to define things which novice users would do with the UI.
This lab shows you how to write YAML to create the queue manager, the application queue and its associated stream queue, and the channels.
You should still be logged into the OpenShift environment. If not, click on your username on the top right menu of the OpenShift Console, then click on Copy Login Command.
Click Display Token then copy the token.
Paste the copied token into a terminal window and run the command by hitting enter.
You should be logged into project *
oc project <your namespace>
The examples in this lab use student1. You shouldn’t need to run this command unless your login has timed out. Make sure you are using your own namespace.
Open a new terminal window by double-clicking the icon on the desktop.
Navigate to the MQonCP4I/streamq/deploy directory using the following command:
cd ~/MQonCP4I/streamq/deploy
ls -l
Now run the following command to view the template yaml file.
gedit streamq.yaml_template
Pay particular attention to each kind stanza:
In the QueueManager stanza, notice the name, the type (ephemeral) and references to the ConfigMap and Secret. In the ConfigMap stanza you will see the mqsc commands to define the queues APPQ and MY.LOG.QUEUE. Notice that MY.LOG.QUEUE is the streaming queue for APPQ.
Open a new terminal window and navigate to the deploy directory again with the command:
cd ~/MQonCP4I/streamq/deploy
Run the strm-install.sh script:
./strm-install.sh
The results are displayed showing the objects (mentioned above) which were created.
Open your web browser tab for the Platform Navigator. Refresh the page. The mqxxstrm instance will have a status of Pending until it is completely deployed.
After a couple of minutes the instance is deployed and the status changes to Ready.
In the Platform Navigator, right-click the hyperlink for your instance and open it in a new tab. This will open the MQ Console for your queue manager, but first you must respond to the security warnings if prompted. Click Advanced then Accept the Risk and Continue.
You are then taken to the MQ Console for your queue manager. Recall that you named the queue manager with your student ID prefix and strm - mqxxstrm. The instance name, mqxxstrm, is the same as the queue manager name. Click Manage.
On the Manage page you will see Queues, Events, Applications, and MQ network. Click Queues and you will see APPQ and MY.LOG.QUEUE, which were defined by the mqsc commands in the ConfigMap defined in the YAML.
Verify that there are no messages on these queues. Queue depth should be zero.
Click Applications > App Channels where you will find the channel MQXXSTRMCHL. The channel was also defined by the mqsc commands in the ConfigMap defined in the YAML. Click the ellipsis on the right side then select View Configuration to display or edit the channel properties.
Click the breadcrumb to return to the Manage page.
Notice the View Configuration hyperlink in the top right corner. This takes you to the queue manager properties. Click the link now.
You arrive at the overall queue manager properties to be displayed or edited. Click Security > Channel authentication.
Continue to explore the MQ Console as long as you like.
You can also see that the pods are running from the command line. In your terminal window enter the following command using your student ID in place of xx.
oc get pods | grep mqxx
In a terminal window navigate to /home/student/MQonCP4I/streamq/test directory. You will find three files (and additional files for TLS):
Open them in gedit.
In the ccdt.json file, you need to update the value next to host: with your host name. To get your host name, run the following command in a terminal window:
oc get route | grep mqxxstrm
Replace the xx with your student ID. Your host name should start with mqxxstrm-ibm-mq-qm where xx is your student ID.
Return to the editor and paste the value into the host: field of ccdt.json. Under channel > name: insert the name of your SVRCONN channel (MQxxSTRMCHL). Then input the value of your queue manager in the queueManager: field.
Click Save to save ccdt.json.
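For orientation, the sketch below prints a JSON CCDT with the three values edited above filled in. The lab's actual ccdt.json is not reproduced in this document, so treat the layout as an assumption based on the standard JSON CCDT format. Port 443 and the cipher specification ANY_TLS12_OR_HIGHER are assumed values, the host name ending in apps.example.com is hypothetical, and the helper name ccdt_json is not part of the lab.

```shell
#!/bin/sh
# Print a JSON CCDT with a single client-connection channel entry.
# Usage: ccdt_json HOST CHANNEL QUEUE_MANAGER   (hypothetical helper name)
# Port 443 and the cipher specification are assumptions, not taken from the lab files.
ccdt_json() {
  cat <<EOF
{
  "channel": [
    {
      "name": "$2",
      "clientConnection": {
        "connection": [
          { "host": "$1", "port": 443 }
        ],
        "queueManager": "$3"
      },
      "transmissionSecurity": {
        "cipherSpecification": "ANY_TLS12_OR_HIGHER"
      },
      "type": "clientConnection"
    }
  ]
}
EOF
}

# Hypothetical host name; use the value reported by "oc get route" for your instance.
ccdt_json mqxxstrm-ibm-mq-qm.apps.example.com MQXXSTRMCHL mqxxstrm
```

Compare the printed structure with the ccdt.json shipped in the test directory rather than replacing that file, since the lab's copy may carry additional settings.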
Now edit getMessage.sh and sendMessage.sh. You need to change the same values in each file. In the export statements, change 00 to your student ID. Verify that the paths for MQCCDTURL and MQSSLKEYR are set to /home/student/…. Click the Save button for each file.
In the terminal window in the ./test directory initiate the testing by running the following command:
./sendMessage.sh
The script will then connect to MQ and start sending messages continuously. Leave this window open to keep sending messages.
sendMessage.sh uses the amqsphac sample application shipped with MQ to put some messages to the application queue APPQ.
The script will not put any messages directly to MY.LOG.QUEUE – the queue manager will do that for us.
After a number of messages have been put, end the script with
Return to the browser where the MQ Console is running. Click Manage > Queues. Scroll down if necessary to see all queues. Compare the number of messages for APPQ to the number of messages for its streaming queue MY.LOG.QUEUE. They should be the same.
Note that even though we didn’t put any messages to MY.LOG.QUEUE it has the same number of messages on it as APPQ. The stream queue feature has taken a copy of each message we put to APPQ and streamed it to MY.LOG.QUEUE.
Open another command window, navigate to /home/student/mqoncp4i-master/MQonCP4I/streamq/test directory and run the following command:
./getMessage.sh
When all messages have been read, end the script with
In the MQ Console, click the refresh button and compare the number of messages on both queues again.
getMessage.sh retrieved the messages from the APPQ queue. The messages on the streaming queue MY.LOG.QUEUE remain. This is the expected behavior as the streaming queue is intended to be a log for another eventing application to read.
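The same comparison can be made from the command line with the MQSC command DISPLAY QLOCAL(queue) CURDEPTH. The sketch below shows one way to pull the depth out of that output and compare two queues. The function names are hypothetical, and the input lines are stand-ins in the CURDEPTH(n) form rather than captured output.

```shell
#!/bin/sh
# Extract the number from a CURDEPTH(n) attribute in MQSC display output read from stdin.
curdepth_from_mqsc() {
  sed -n 's/.*CURDEPTH(\([0-9][0-9]*\)).*/\1/p'
}

# Report whether two queues hold the same number of messages.
compare_depths() {
  if [ "$1" -eq "$2" ]; then
    echo "same depth: $1"
  else
    echo "depths differ: $1 vs $2"
  fi
}

# Stand-in input in the CURDEPTH(n) form. On a real system the text would come from
#   echo "DISPLAY QLOCAL(APPQ) CURDEPTH" | runmqsc <queue manager name>
appq_depth=$(echo '   CURDEPTH(0)' | curdepth_from_mqsc)
log_depth=$(echo '   CURDEPTH(20)' | curdepth_from_mqsc)
compare_depths "$appq_depth" "$log_depth"
```

The stand-in values mirror the state after getMessage.sh has run: APPQ is drained while the streamed copies remain on MY.LOG.QUEUE.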
First clear the messages from the stream queue. Still in the MQ Console, click the ellipsis to the right of MY.LOG.QUEUE and select Clear queue.
In the pop-up, click Clear Queue to confirm you want to delete the messages.
Note:
It is important to make sure both queues are empty.
Both queues are now empty.
Click the ellipsis for MY.LOG.QUEUE and select View configuration.
You are going to change the Max Queue Depth for the queue. Click Extended then click the Edit button.
Locate the Max Queue Depth field and change it to 10. Click Save.
Return to the terminal window where you ran the sendMessage.sh command and run the command again. Let the script run until at least twenty messages have been put to APPQ. End the program with
In the MQ Console, navigate to the Manage > Queues display. You will find that APPQ has all twenty messages. But what happened to the stream queue MY.LOG.QUEUE?
The display indicates an error on the stream queue. Its max queue depth is now 10, but 20 messages were put to the application queue, which is 200% of that maximum. Hover over the error icon and it indicates that the number of messages exceeds 100% of max.
Click the ellipsis for APPQ and select View configuration again.
Click Storage then Edit.
Note that the Streaming queue Quality of Service (QOS) is set to Best effort. The BESTEF quality of service is the default value and indicates that MQ should attempt to stream the duplicate messages to the stream queue but that it should deliver the original message to APPQ even if there is a problem delivering the duplicate.
This quality-of-service is best suited to applications where you want to make sure that the original behavior of the application is not affected by the stream queue feature. In this mode a putting application will never receive an error from the MQPUT API call due to an error with the streaming feature.
Stream queues offer a second quality-of-service, MUSTDUP (must duplicate). When MUSTDUP is used, MQ will either deliver both messages, or neither of them. If there is a problem delivering the duplicate to the stream queue then the original message will not be delivered to the application queue and the putting application will receive an error reason code. Let’s change our APPQ to use the MUSTDUP quality of service and see what effect that has.
Click the dropdown for Streaming queue QOS and select Must duplicate. Click Save.
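The console changes made in this part of the lab (clearing the stream queue, lowering its maximum depth, and switching the quality of service) also have MQSC equivalents. The sketch below only prints them; the helper name is hypothetical. In the lab these steps happen at different points, with messages sent in between, so the commands are not meant to be run back to back.

```shell
#!/bin/sh
# Print MQSC equivalents of the console steps: clear the stream queue, lower its
# maximum depth, and switch the original queue to the MUSTDUP quality of service.
# Usage: full_stream_queue_mqsc ORIGINAL_QUEUE STREAM_QUEUE MAX_DEPTH   (hypothetical helper)
full_stream_queue_mqsc() {
  printf 'CLEAR QLOCAL(%s)\n' "$2"
  printf 'ALTER QLOCAL(%s) MAXDEPTH(%s)\n' "$2" "$3"
  printf 'ALTER QLOCAL(%s) STRMQOS(MUSTDUP)\n' "$1"
}

full_stream_queue_mqsc APPQ MY.LOG.QUEUE 10

# Each line could be entered at a runmqsc prompt for the queue manager:
#   runmqsc <queue manager name>
```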
Now we will run the sendMessage.sh script again and see what happens when we try to put more messages to APPQ. Remember that MY.LOG.QUEUE is still full.
amqsphac has received an error, MQRC 2053 (MQRC_Q_FULL). This is not because APPQ is full, but because MY.LOG.QUEUE is full. When MUSTDUP is used any error with the stream queue will be passed back to the application.
If there was a problem with APPQ then the application would receive that error instead and MQ would not attempt to deliver the message to the stream queue.
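The behaviour seen in this exercise can be captured in a toy model of a single put. This is a sketch of the rules described above, not MQ code: it ignores the case where the original queue itself has a problem, and the function name is hypothetical.

```shell
#!/bin/sh
# Toy model of one MQPUT to a queue that streams to a second queue.
# Prints the resulting depths and the reason code the putting application sees.
# Usage: simulate_put QOS ORIGINAL_DEPTH STREAM_DEPTH STREAM_MAXDEPTH
simulate_put() {
  qos=$1; original=$2; stream=$3; stream_max=$4
  if [ "$stream" -lt "$stream_max" ]; then
    # Room on the stream queue: both messages are delivered.
    echo "original=$(( original + 1 )) stream=$(( stream + 1 )) reason=0"
  elif [ "$qos" = "MUSTDUP" ]; then
    # Stream queue full: neither message is delivered; the application sees MQRC_Q_FULL.
    echo "original=$original stream=$stream reason=2053"
  else
    # BESTEF: the original is still delivered and the duplicate is skipped without an error.
    echo "original=$(( original + 1 )) stream=$stream reason=0"
  fi
}

simulate_put BESTEF 19 10 10    # prints original=20 stream=10 reason=0
simulate_put MUSTDUP 20 10 10   # prints original=20 stream=10 reason=2053
```

The first call mirrors the best effort run, where APPQ reached twenty messages while MY.LOG.QUEUE stopped at its maximum depth of ten. The second mirrors the must duplicate run, where the put is rejected.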
Reset the Max queue depth for MY.LOG.QUEUE to the default value of 5000. If you have forgotten how to set the property, review the previous instructions.
You have completed this lab, Streaming Queue for MQ on CP4I. Now continue to lab 2b, the MQ Source connector lab, where you will move MQ messages to Kafka topics.