Write to JMS Topic

Updated: Jun 8, 2023

This example shows you how to write records to JMS( Java Message Service) using Data Pipeline's JmsWriter.

JMS is a message-oriented middleware (MOM) which supports both messaging models: point-to-point (queuing) and publish-subscribe. It defines a common enterprise messaging API that is designed to be easily and efficiently supported by a wide range of enterprise messaging products.

Java code listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.util.Properties;

import javax.naming.Context;

import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.jms.JmsDestinationType;
import com.northconcepts.datapipeline.jms.JmsSettings;
import com.northconcepts.datapipeline.jms.JmsWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;

public class WriteToJmsTopic {
    
    // run activemq before running the code below
    public static void main(String[] args) {
        Record record = new Record();
        record.setField("Team", "Manchester United");
        record.setField("Points", 90);
        
        RecordList recordList = new RecordList(record);
        MemoryReader memoryReader = new MemoryReader(recordList);
        
        Properties props = new Properties();
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616");
        
        DataWriter writer = new JmsWriter(new JmsSettings("Premier League", JmsDestinationType.TOPIC, props));
        Job.run(memoryReader, writer);
    }

}

Code Walkthrough

  1. Record is created to persist data in a key-value field.
  2. setField() method is used to add new fields with the specified field name and value. For this example you are going to write a Record with two fields Teams and Points and value can be added to each field as record.setField("Points", 90);.
  3. RecordList is created to store a list of Record objects in MemoryReader.
  4. MemoryReader is created to obtain records from an in-memory RecordList.
  5. Properties is created to specify the initial context and provider URL of the JMS provider(i.e ActiveMQ in this example).
  6. JmsWriter is created to write records to the specified JMS provider.
  7. JmsSettings specifies the configurations used to connect to a JmsWriter.
  8. JmsDestinationType is created to specify a constant for the type of messaging to use. For this example JmsDestinationType.TOPIC is specified to allow JmsWriter to use publish/subscribe messaging.
  9. Data are transferred from MemoryReader to JmsWriter via Job.run() method. See how to compile and run data pipeline jobs.

JmsWriter

Writes records to a Java Message Service (JMS) provider. It extends DataWriter and it's constructor takes JmsSettings object as a parameter.

JmsSettings

Specifies the configuration used to connect to a JmsReader or JmsWriter. It extends DataObject class and it's constructor takes instance name, instance of JmsDestinationType and an optional instance of Properties as a parameter.

JmsDestinationType

An enum class which contains constants for the type of messaging to use. Data Pipeline supports two kinds of destination QUEUE (which is used in point-to-point messaging) and TOPIC (which is used in publish/subscribe messaging.).

Record

Record class holds persistent data in key-value fields as it flows through the pipeline. A method setField() in this class creates a new field as key-value pair by taking field name and a value as a parameter. If you don't' want to add values to fields upon creation, you can create an empty field with filed name and type using setFieldNull() method.

RecordList

As the name suggests it is used to store a list of Record objects in memory. It implements Iterable so you can perform operations similar to Java Collection to this object.

MemoryReader

Obtains records from an in-memory RecordList. It extends DataReader class and can be created with RecordList object(optional).

Mobile Analytics