Read and Write Kafka Events
Updated: Nov 21, 2023
This example shows how to read and write Kafka events.
Java Code Listing
package com.northconcepts.datapipeline.examples.kafka; import com.northconcepts.datapipeline.core.DataReader; import com.northconcepts.datapipeline.core.DataWriter; import com.northconcepts.datapipeline.csv.CSVReader; import com.northconcepts.datapipeline.csv.CSVWriter; import com.northconcepts.datapipeline.job.Job; import com.northconcepts.datapipeline.kafka.KafkaReader; import com.northconcepts.datapipeline.kafka.KafkaWriter; import java.io.File; import java.util.Properties; public class ReadAndWriteKafkaEvents { private static String BOOTSTRAP_SERVERS = "localhost:9092,another.host:port"; private static String GROUP_ID = "group_id"; private static String TOPIC = "jewelry"; public static void main(String[] args) { DataReader reader = new CSVReader(new File("data/input/jewelry.csv")) .setAllowMultiLineText(true) .setFieldNamesInFirstRow(true); Properties writerProps = new Properties(); writerProps.put("bootstrap.servers", BOOTSTRAP_SERVERS); DataWriter writer = new KafkaWriter(writerProps, TOPIC); Job.runAsync(reader, writer); Properties readerProps = new Properties(); readerProps.put("bootstrap.servers", BOOTSTRAP_SERVERS); readerProps.put("group.id", GROUP_ID); reader = new KafkaReader(readerProps, TOPIC, 5000L).setKeepPolling(false); writer = new CSVWriter(new File("data/output/jewelry-events.csv")); Job.run(reader, writer); } }
Code Walkthrough
- An instance of CSVReader that reads data from the file
jewelry.csv
is created. - Writer
Properties
is instantiated and the configuration forbootstrap.servers
added. - A KafkaWriter instance is created which takes the writer
Properties
andTOPIC
as arguments. - Job.runAsync() is then used to transfer data from the
reader
to KafkaWriter asynchronously. - Reader
Properties
is instantiated and the configuration forbootstrap.servers
andgroup.id
added. - A KafkaReader instance is created which takes the reader
Properties
,TOPIC
and poll timeout as arguments. .setKeepPolling(false)
is used to specify to only poll for records once.- Job.run() is then used to transfer data from the
reader
to CSVWriter.