Read from JMS Queue
Updated: Jun 16, 2023
In this example, you are going to learn how to read messages from a Message Queue (MQ) using Java Messaging Service (JMS).
Message Queues are used to asynchronously transfer messages across distributed environments.
Here is a demo code showing the implementation.
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;
import java.util.Properties;
import javax.naming.Context;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.jms.JmsDestinationType;
import com.northconcepts.datapipeline.jms.JmsReader;
import com.northconcepts.datapipeline.jms.JmsSettings;
import com.northconcepts.datapipeline.job.Job;
public class ReadFromJmsQueue {
// run activemq before running the code below
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616");
DataReader reader = new JmsReader(new JmsSettings("premierLeague", JmsDestinationType.QUEUE, props))
.setReceiveTimeout(4000L);
Job.run(reader, new StreamWriter(System.out));
}
/* output
-----------------------------------------------
0 - Record (MODIFIED) {
0:[Points]:INT=[90]:Integer
1:[Team]:STRING=[Manchester United]:String
2:[jms_message_id]:STRING=[ID:asus_k43s-50319-1483615879678-1:1:1:1:1]:String
3:[jms_timestamp]:LONG=[1483615880082]:Long
4:[jms_correlation_id_as_bytes]:UNDEFINED=[null]
5:[jms_correlation_id]:UNDEFINED=[null]
6:[jms_reply_to]:UNDEFINED=[null]
7:[jms_destination]:STRING=[queue://premierLeague]:String
8:[jms_delivery_mode]:INT=[2]:Integer
9:[jms_redelivered]:BOOLEAN=[false]:Boolean
10:[jms_type]:UNDEFINED=[null]
11:[jms_expiration]:LONG=[0]:Long
12:[jms_priority]:INT=[4]:Integer
}
-----------------------------------------------
1 records
*/
}
Code Walkthrough
- A
Propertiesobject is initialized with properties such as the context factory for the MQ provider and the URL. - A
DataReaderis used to read from the queue"premierLeague"with the properties set above. setReceiveTimeout(4000L)method is invoked so a message is fetched every 4 seconds.- A Job is run with this
DataReaderand aStreamWriterwhich displays the output on the console.
Console Output
-----------------------------------------------
0 - Record (MODIFIED) {
0:[Points]:INT=[90]:Integer
1:[Team]:STRING=[Manchester United]:String
2:[jms_message_id]:STRING=[ID:asus_k43s-50319-1483615879678-1:1:1:1:1]:String
3:[jms_timestamp]:LONG=[1483615880082]:Long
4:[jms_correlation_id_as_bytes]:UNDEFINED=[null]
5:[jms_correlation_id]:UNDEFINED=[null]
6:[jms_reply_to]:UNDEFINED=[null]
7:[jms_destination]:STRING=[queue://premierLeague]:String
8:[jms_delivery_mode]:INT=[2]:Integer
9:[jms_redelivered]:BOOLEAN=[false]:Boolean
10:[jms_type]:UNDEFINED=[null]
11:[jms_expiration]:LONG=[0]:Long
12:[jms_priority]:INT=[4]:Integer
}
-----------------------------------------------
1 records
