Write to JMS Topic
This example shows you how to write records to JMS( Java Message Service) using Data Pipeline's JmsWriter.
JMS is a message-oriented middleware (MOM) which supports both messaging models: point-to-point (queuing) and publish-subscribe. It defines a common enterprise messaging API that is designed to be easily and efficiently supported by a wide range of enterprise messaging products.
Java code listing
package com.northconcepts.datapipeline.examples.cookbook; import java.util.Properties; import javax.naming.Context; import com.northconcepts.datapipeline.core.DataWriter; import com.northconcepts.datapipeline.core.Record; import com.northconcepts.datapipeline.core.RecordList; import com.northconcepts.datapipeline.jms.JmsDestinationType; import com.northconcepts.datapipeline.jms.JmsSettings; import com.northconcepts.datapipeline.jms.JmsWriter; import com.northconcepts.datapipeline.job.Job; import com.northconcepts.datapipeline.memory.MemoryReader; public class WriteToJmsTopic { // run activemq before running the code below public static void main(String[] args) { Record record = new Record(); record.setField("Team", "Manchester United"); record.setField("Points", 90); RecordList recordList = new RecordList(record); MemoryReader memoryReader = new MemoryReader(recordList); Properties props = new Properties(); props.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory"); props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616"); DataWriter writer = new JmsWriter(new JmsSettings("Premier League", JmsDestinationType.TOPIC, props)); Job.run(memoryReader, writer); } }
Code Walkthrough
- Record is created to persist data in a key-value field.
setField()
method is used to add new fields with the specified field name and value. For this example you are going to write a Record with two fieldsTeams
andPoints
and value can be added to each field asrecord.setField("Points", 90);
.- RecordList is created to store a list of
Record
objects inMemoryReader
. - MemoryReader is created to obtain records from an in-memory
RecordList
. - Properties is created to specify the initial context and provider URL of the JMS provider(i.e ActiveMQ in this example).
JmsWriter
is created to write records to the specified JMS provider.- JmsSettings specifies the configurations used to connect to a
JmsWriter
. - JmsDestinationType is created to specify a constant for the type of messaging to use. For this example
JmsDestinationType.TOPIC
is specified to allowJmsWriter
to use publish/subscribe messaging. - Data are transferred from
MemoryReader
toJmsWriter
via Job.run() method. See how to compile and run data pipeline jobs.
JmsWriter
Writes records to a Java Message Service (JMS) provider. It extends DataWriter and it's constructor takes JmsSettings object as a parameter.
JmsSettings
Specifies the configuration used to connect to a JmsReader or JmsWriter. It extends DataObject class and it's constructor takes instance name, instance of JmsDestinationType
and an optional instance of Properties
as a parameter.
JmsDestinationType
An enum class which contains constants for the type of messaging to use. Data Pipeline supports two kinds of destination QUEUE
(which is used in point-to-point messaging) and TOPIC
(which is used in publish/subscribe messaging.).
Record
Record class holds persistent data in key-value fields as it flows through the pipeline. A method setField()
in this class creates a new field as key-value pair by taking field name and a value as a parameter. If you don't' want to add values to fields upon creation, you can create an empty field with filed name and type using setFieldNull()
method.
RecordList
As the name suggests it is used to store a list of Record objects in memory. It implements Iterable so you can perform operations similar to Java Collection to this object.
MemoryReader
Obtains records from an in-memory RecordList. It extends DataReader class and can be created with RecordList
object(optional).