Read and Write Kafka Events
Updated: Nov 21, 2023
This example shows how to read and write Kafka events.
Java Code Listing
package com.northconcepts.datapipeline.examples.kafka;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.kafka.KafkaReader;
import com.northconcepts.datapipeline.kafka.KafkaWriter;
import java.io.File;
import java.util.Properties;
public class ReadAndWriteKafkaEvents {
private static String BOOTSTRAP_SERVERS = "localhost:9092,another.host:port";
private static String GROUP_ID = "group_id";
private static String TOPIC = "jewelry";
public static void main(String[] args) {
DataReader reader = new CSVReader(new File("data/input/jewelry.csv"))
.setAllowMultiLineText(true)
.setFieldNamesInFirstRow(true);
Properties writerProps = new Properties();
writerProps.put("bootstrap.servers", BOOTSTRAP_SERVERS);
DataWriter writer = new KafkaWriter(writerProps, TOPIC);
Job.runAsync(reader, writer);
Properties readerProps = new Properties();
readerProps.put("bootstrap.servers", BOOTSTRAP_SERVERS);
readerProps.put("group.id", GROUP_ID);
reader = new KafkaReader(readerProps, TOPIC, 5000L).setKeepPolling(false);
writer = new CSVWriter(new File("data/output/jewelry-events.csv"));
Job.run(reader, writer);
}
}
Code Walkthrough
- An instance of CSVReader that reads data from the file
jewelry.csvis created. - Writer
Propertiesis instantiated and the configuration forbootstrap.serversadded. - A KafkaWriter instance is created which takes the writer
PropertiesandTOPICas arguments. - Job.runAsync() is then used to transfer data from the
readerto KafkaWriter asynchronously. - Reader
Propertiesis instantiated and the configuration forbootstrap.serversandgroup.idadded. - A KafkaReader instance is created which takes the reader
Properties,TOPICand poll timeout as arguments. .setKeepPolling(false)is used to specify to only poll for records once.- Job.run() is then used to transfer data from the
readerto CSVWriter.
