Read from JMS Topic
Updated: Aug 22, 2023
This example shows how to read records from a Java Message Service (JMS) topic using JmsReader. Topics use publish/subscribe messaging, which suits event-driven microservices, real-time analytics, and communication between loosely coupled components in distributed systems.
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.util.List;
import java.util.Properties;

import javax.naming.Context;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.jms.JmsDestinationType;
import com.northconcepts.datapipeline.jms.JmsReader;
import com.northconcepts.datapipeline.jms.JmsSettings;
import com.northconcepts.datapipeline.jms.JmsWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.memory.MemoryWriter;

public class ReadFromJmsTopic {

    // run activemq before running the code below
    public static void main(String[] args) throws Exception {
        Record record = new Record();
        record.setField("Team", "Manchester United");
        record.setField("Points", 90);

        RecordList recordList = new RecordList(record);
        MemoryReader memoryReader = new MemoryReader(recordList);

        Properties props = new Properties();
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        props.setProperty(Context.PROVIDER_URL, "tcp://localhost:61616");

        DataReader reader = new JmsReader(new JmsSettings("Premier League", JmsDestinationType.TOPIC, props));
        MemoryWriter memoryWriter = new MemoryWriter();

        Job asyncJob = new Job(reader, memoryWriter);
        asyncJob.runAsync();

        Thread.sleep(5000); // allow JmsReader time to be registered as subscriber

        DataWriter writer = new JmsWriter(new JmsSettings("Premier League", JmsDestinationType.TOPIC, props));
        Job.run(memoryReader, writer);

        Thread.sleep(5000); // allow JmsReader time to read
        asyncJob.cancel();

        List records = memoryWriter.getRecordList().getRecords();
        System.out.println("List of Records: " + records);

        System.exit(0);
    }

    /* output
    List of Records: [Record (MODIFIED) {
        0:[Points]:INT=[90]:Integer
        1:[Team]:STRING=[Manchester United]:String
        2:[jms_message_id]:STRING=[ID:asus_k43s-50726-1483620205667-3:1:1:1:1]:String
        3:[jms_timestamp]:LONG=[1483620210454]:Long
        4:[jms_correlation_id_as_bytes]:UNDEFINED=[null]
        5:[jms_correlation_id]:UNDEFINED=[null]
        6:[jms_reply_to]:UNDEFINED=[null]
        7:[jms_destination]:STRING=[topic://Premier League]:String
        8:[jms_delivery_mode]:INT=[2]:Integer
        9:[jms_redelivered]:BOOLEAN=[false]:Boolean
        10:[jms_type]:UNDEFINED=[null]
        11:[jms_expiration]:LONG=[0]:Long
        12:[jms_priority]:INT=[4]:Integer
    }
    ]
    */
}
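The listing hard-codes the JNDI settings for a local ActiveMQ broker. As a minimal sketch, and only one possible arrangement, the same Properties object could be built by a small helper that lets the broker URL be overridden with a system property. The helper name jndiProperties and the property name jms.broker.url are hypothetical and not part of DataPipeline or ActiveMQ; the factory class and default URL are the ones used in the listing.

```java
import java.util.Properties;
import javax.naming.Context;

public class JndiPropertiesSketch {

    // Hypothetical system property name, chosen for this sketch only.
    static final String BROKER_URL_PROPERTY = "jms.broker.url";

    // Builds the same JNDI properties as the listing, with an overridable broker URL.
    static Properties jndiProperties() {
        Properties props = new Properties();
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY,
                "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        props.setProperty(Context.PROVIDER_URL,
                System.getProperty(BROKER_URL_PROPERTY, "tcp://localhost:61616"));
        return props;
    }

    public static void main(String[] args) {
        Properties props = jndiProperties();
        System.out.println(props.getProperty(Context.INITIAL_CONTEXT_FACTORY));
        System.out.println(props.getProperty(Context.PROVIDER_URL));
    }
}
```

The returned Properties could then be passed to JmsSettings in place of props, for both the reader and the writer, so that the two always point at the same broker.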
Code Walkthrough
- Record is created to hold data as named fields. The setField() method adds a field with the specified name and value. This example writes a Record with two fields, Team and Points; a value is added to a field with a call such as record.setField("Points", 90);.
- RecordList is created to store the list of Record objects passed to MemoryReader.
- MemoryReader is created to read records from an in-memory RecordList.
- Properties is created to specify the initial context factory and provider URL of the JMS provider (ActiveMQ in this example).
- JmsReader is created to read records from the specified JMS provider.
- JmsSettings specifies the configuration used to connect to the JMS provider: the destination name, the destination type, and the JNDI properties.
- JmsDestinationType specifies the type of messaging to use. This example uses JmsDestinationType.TOPIC so that JmsReader uses publish/subscribe messaging.
- Data are transferred from JmsReader to MemoryWriter via the Job.runAsync() method.
- The main thread sleeps for five seconds to allow JmsReader time to be registered as a subscriber.
- JmsWriter is created to write records to the specified JMS provider.
- Data are transferred from MemoryReader to JmsWriter via the Job.run() method. See how to compile and run data pipeline jobs.
- The main thread sleeps for another five seconds to allow JmsReader time to read.
- The asynchronous job is cancelled, and the records are obtained from MemoryWriter and printed on the console.
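The first Thread.sleep() in the listing matters because an ordinary (non-durable) topic subscriber only receives messages published after it has subscribed. Assuming JmsReader subscribes that way, which the comment in the listing suggests, a record written before the reader is registered would never reach it. The following is a minimal in-memory sketch of that ordering in plain Java. It does not use JMS or DataPipeline, and the InMemoryTopic class is hypothetical, written only to illustrate the behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class TopicOrderingSketch {

    // Hypothetical stand-in for a topic: delivers each message only to current subscribers.
    static class InMemoryTopic {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    // Publishes one message before subscribing and one after; returns what the subscriber saw.
    static List<String> run() {
        InMemoryTopic topic = new InMemoryTopic();
        List<String> received = new ArrayList<>();

        topic.publish("published before subscribing"); // no subscriber yet, so it is dropped
        topic.subscribe(received::add);
        topic.publish("published after subscribing");  // delivered to the subscriber

        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [published after subscribing]
    }
}
```

A fixed five-second sleep is a simple way to order the two steps in a demo. It is a timing assumption rather than a guarantee, so a slow broker connection could still cause the record to be missed.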
Console Output
List of Records: [Record (MODIFIED) {
0:[Points]:INT=[90]:Integer
1:[Team]:STRING=[Manchester United]:String
2:[jms_message_id]:STRING=[ID:asus_k43s-50726-1483620205667-3:1:1:1:1]:String
3:[jms_timestamp]:LONG=[1483620210454]:Long
4:[jms_correlation_id_as_bytes]:UNDEFINED=[null]
5:[jms_correlation_id]:UNDEFINED=[null]
6:[jms_reply_to]:UNDEFINED=[null]
7:[jms_destination]:STRING=[topic://Premier League]:String
8:[jms_delivery_mode]:INT=[2]:Integer
9:[jms_redelivered]:BOOLEAN=[false]:Boolean
10:[jms_type]:UNDEFINED=[null]
11:[jms_expiration]:LONG=[0]:Long
12:[jms_priority]:INT=[4]:Integer
}
]
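The jms_* fields in the output carry the standard JMS message headers alongside the two fields that were written. As a small sketch in plain Java, the values shown above can be interpreted as follows. The constants are declared locally so the example runs without a JMS library; they mirror the values defined by javax.jms.DeliveryMode and javax.jms.Message. The class and method names are hypothetical.

```java
import java.time.Instant;

public class JmsHeaderSketch {

    // Local copies of constants from javax.jms.DeliveryMode and javax.jms.Message.
    static final int NON_PERSISTENT = 1;
    static final int PERSISTENT = 2;
    static final int DEFAULT_PRIORITY = 4;

    // Maps the jms_delivery_mode value to its JMS name.
    static String deliveryModeName(int mode) {
        switch (mode) {
            case NON_PERSISTENT: return "NON_PERSISTENT";
            case PERSISTENT: return "PERSISTENT";
            default: return "UNKNOWN(" + mode + ")";
        }
    }

    // jms_timestamp is expressed in milliseconds since the Unix epoch.
    static Instant sentAt(long jmsTimestamp) {
        return Instant.ofEpochMilli(jmsTimestamp);
    }

    // A jms_expiration of 0 means the message never expires.
    static boolean neverExpires(long jmsExpiration) {
        return jmsExpiration == 0L;
    }

    public static void main(String[] args) {
        System.out.println(sentAt(1483620210454L)); // 2017-01-05T12:43:30.454Z
        System.out.println(deliveryModeName(2));    // PERSISTENT
        System.out.println(neverExpires(0L));       // true
        System.out.println(4 == DEFAULT_PRIORITY);  // true
    }
}
```

Read this way, the record in the output was sent as a persistent message at the default priority with no expiry. The UNDEFINED entries are optional headers that the sender did not set.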
