Read from JMS Topic

Updated: Aug 22, 2023

This example shows how to read data from Java Message Service (JMS) topics, enabling integration and communication between distributed systems. This can be used in event-driven microservices, real-time analytics, and communication between loosely coupled components, where JMS topics serve as a reliable and scalable means of transmitting data.


Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.util.List;
import java.util.Properties;

import javax.naming.Context;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.jms.JmsDestinationType;
import com.northconcepts.datapipeline.jms.JmsReader;
import com.northconcepts.datapipeline.jms.JmsSettings;
import com.northconcepts.datapipeline.jms.JmsWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.memory.MemoryWriter;

public class ReadFromJmsTopic {
    
    // run activemq before running the code below
    public static void main(String[] args) throws Exception {
        Record record = new Record();
        record.setField("Team", "Manchester United");
        record.setField("Points", 90);
        
        RecordList recordList = new RecordList(record);
        MemoryReader memoryReader = new MemoryReader(recordList);
        
        Properties props = new Properties();
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616");
        
        DataReader reader = new JmsReader(new JmsSettings("Premier League", JmsDestinationType.TOPIC, props));
        
        MemoryWriter memoryWriter = new MemoryWriter();
        Job asyncJob = new Job(reader, memoryWriter);
        asyncJob.runAsync();
        
        Thread.sleep(5000); // allow JmsReader time to be registered as subscriber
        
        DataWriter writer = new JmsWriter(new JmsSettings("Premier League", JmsDestinationType.TOPIC, props));
        Job.run(memoryReader, writer);
        
        Thread.sleep(5000); // allow JmsReader time to read
        
        asyncJob.cancel();
        
        List records = memoryWriter.getRecordList().getRecords();
        System.out.println("List of Records: " + records);
        
        System.exit(0);
    }
/* output
List of Records: [Record (MODIFIED) {
    0:[Points]:INT=[90]:Integer
    1:[Team]:STRING=[Manchester United]:String
    2:[jms_message_id]:STRING=[ID:asus_k43s-50726-1483620205667-3:1:1:1:1]:String
    3:[jms_timestamp]:LONG=[1483620210454]:Long
    4:[jms_correlation_id_as_bytes]:UNDEFINED=[null]
    5:[jms_correlation_id]:UNDEFINED=[null]
    6:[jms_reply_to]:UNDEFINED=[null]
    7:[jms_destination]:STRING=[topic://Premier League]:String
    8:[jms_delivery_mode]:INT=[2]:Integer
    9:[jms_redelivered]:BOOLEAN=[false]:Boolean
    10:[jms_type]:UNDEFINED=[null]
    11:[jms_expiration]:LONG=[0]:Long
    12:[jms_priority]:INT=[4]:Integer
}
]
*/
}

 

Code Walkthrough

  1. Record is created to persist data in a key-value field.
  2. setField() method is used to add new fields with the specified field name and value. For this example, you are going to write a Record with two fields Teams and Points and value can be added to each field as record.setField("Points", 90);.
  3. RecordList is created to store a list of Record objects in MemoryReader.
  4. MemoryReader is created to obtain records from an in-memory RecordList.
  5. Properties is created to specify the initial context and provider URL of the JMS provider(i.e. ActiveMQ in this example).
  6. JmsReader is created to read records from the specified JMS provider.
  7. JmsSettings specifies the configurations used to connect to a JmsReader.
  8. JmsDestinationType is created to specify a constant for the type of messaging to use. For this example JmsDestinationType.TOPIC is specified to allow JmsReader to use publish/subscribe messaging.
  9. Data are transferred from JmsReader to MemoryWriter via Job.runAsync() method.
  10. The working thread sleeps for some time to allow JmsReader time to be registered as a subscriber.
  11. JmsWriter is created to write records to the specified JMS provider.
  12. Data are transferred from MemoryReader to JmsWriter via Job.run() method. See how to compile and run data pipeline jobs.
  13. The working thread sleeps for some time to allow JmsReader time to read.
  14. Records are obtained from MemoryWriter and printed on the console.

 

Console Output

List of Records: [Record (MODIFIED) {
    0:[Points]:INT=[90]:Integer
    1:[Team]:STRING=[Manchester United]:String
    2:[jms_message_id]:STRING=[ID:asus_k43s-50726-1483620205667-3:1:1:1:1]:String
    3:[jms_timestamp]:LONG=[1483620210454]:Long
    4:[jms_correlation_id_as_bytes]:UNDEFINED=[null]
    5:[jms_correlation_id]:UNDEFINED=[null]
    6:[jms_reply_to]:UNDEFINED=[null]
    7:[jms_destination]:STRING=[topic://Premier League]:String
    8:[jms_delivery_mode]:INT=[2]:Integer
    9:[jms_redelivered]:BOOLEAN=[false]:Boolean
    10:[jms_type]:UNDEFINED=[null]
    11:[jms_expiration]:LONG=[0]:Long
    12:[jms_priority]:INT=[4]:Integer
}
]
Mobile Analytics