Read and Write Kafka Events

Updated: Nov 21, 2023

This example shows how to write records from a CSV file to a Kafka topic and then read those events back into another CSV file.

Java Code Listing

package com.northconcepts.datapipeline.examples.kafka;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.kafka.KafkaReader;
import com.northconcepts.datapipeline.kafka.KafkaWriter;

import java.io.File;
import java.util.Properties;

public class ReadAndWriteKafkaEvents {

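    // Placeholder broker list: replace with your own comma-separated host:port pairs.
    private static final String BOOTSTRAP_SERVERS = "localhost:9092,another.host:port";
    private static final String GROUP_ID = "group_id";
    private static final String TOPIC = "jewelry";

    public static void main(String[] args) {
        // Read the jewelry records from a CSV file whose first row holds the field names.
        DataReader reader = new CSVReader(new File("data/input/jewelry.csv"))
            .setAllowMultiLineText(true)
            .setFieldNamesInFirstRow(true);

        // Writer (producer) settings: the Kafka brokers to connect to.
        Properties writerProps = new Properties();
        writerProps.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        DataWriter writer = new KafkaWriter(writerProps, TOPIC);

        // Publish the CSV records to the topic asynchronously.
        Job.runAsync(reader, writer);

        // Reader (consumer) settings: the brokers plus the consumer group to join.
        Properties readerProps = new Properties();
        readerProps.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        readerProps.put("group.id", GROUP_ID);

        // Poll the topic only once (poll timeout 5000L) instead of polling continuously.
        reader = new KafkaReader(readerProps, TOPIC, 5000L).setKeepPolling(false);
        writer = new CSVWriter(new File("data/output/jewelry-events.csv"));

        // Copy the consumed events into a CSV file, synchronously this time.
        Job.run(reader, writer);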
    }
}
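The listing hands two plain java.util.Properties objects to the Kafka classes. A new consumer group with no committed offsets uses Kafka's standard consumer setting auto.offset.reset, whose default is "latest". Because the writer runs asynchronously, events published before the reader joins the group could therefore be skipped. The following is a minimal sketch of building both configurations. It assumes that KafkaReader forwards its Properties unchanged to the underlying Kafka consumer, which is not confirmed here. The helpers buildWriterProps and buildReaderProps are hypothetical names used only for illustration, and the auto.offset.reset line is an optional addition that is not part of the original example.

```java
import java.util.Properties;

public class KafkaPropsSketch {

    // Same placeholder broker list as the example; replace with real host:port pairs.
    static final String BOOTSTRAP_SERVERS = "localhost:9092,another.host:port";

    // Hypothetical helper: writer (producer) settings, as in the example.
    static Properties buildWriterProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    // Hypothetical helper: reader (consumer) settings, as in the example, plus an
    // optional auto.offset.reset=earliest so that a group with no committed offsets
    // starts from the oldest available record instead of the newest.
    // Assumption: KafkaReader passes these Properties through to the Kafka consumer.
    static Properties buildReaderProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties writerProps = buildWriterProps(BOOTSTRAP_SERVERS);
        Properties readerProps = buildReaderProps(BOOTSTRAP_SERVERS, "group_id");
        System.out.println("writer brokers: " + writerProps.getProperty("bootstrap.servers"));
        System.out.println("reader group: " + readerProps.getProperty("group.id")
                + ", offset reset: " + readerProps.getProperty("auto.offset.reset"));
    }
}
```

Under the stated assumption, the resulting Properties would be passed to new KafkaReader(...) exactly as readerProps is in the listing.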


Code Walkthrough

  1. An instance of CSVReader is created to read data from the file jewelry.csv, with multi-line text allowed and field names taken from the first row.
  2. A writer Properties object is created, and the bootstrap.servers setting is added to it.
  3. A KafkaWriter instance is created, taking the writer Properties and TOPIC as arguments.
  4. Job.runAsync() is then used to transfer data from the CSVReader to the KafkaWriter asynchronously.
  5. A reader Properties object is created, and the bootstrap.servers and group.id settings are added to it.
  6. A KafkaReader instance is created, taking the reader Properties, TOPIC, and the poll timeout (5000L) as arguments.
  7. setKeepPolling(false) specifies that the reader polls for records only once.
  8. Job.run() is then used to transfer data from the KafkaReader to a CSVWriter, which writes the events to data/output/jewelry-events.csv.
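Steps 4, 7, and 8 combine an asynchronous writer with a reader that polls only once. The sketch below models that interaction in memory with java.util.concurrent. It is an analogy and does not use the DataPipeline or Kafka APIs. A LinkedBlockingQueue stands in for the topic, a CompletableFuture stands in for Job.runAsync, and the hypothetical pollOnce method waits up to a timeout for the first record and then takes whatever else is already queued. A single poll only sees records that have arrived by the time it runs. For that reason, this sketch waits for the background writer to finish before polling, which the original example does not do explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollOnceSketch {

    // Hypothetical single poll: waits up to timeoutMs for the first record, then
    // takes every record already queued and returns them as one batch.
    // Records that arrive after this call are not read.
    static List<String> pollOnce(BlockingQueue<String> topic, long timeoutMs)
            throws InterruptedException {
        List<String> batch = new ArrayList<>();
        String first = topic.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (first != null) {
            batch.add(first);
            topic.drainTo(batch);
        }
        return batch;
    }

    public static void main(String[] args) throws Exception {
        // In-memory stand-in for the "jewelry" topic.
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();

        // Background writer, playing the role of Job.runAsync(reader, writer).
        CompletableFuture<Void> writer = CompletableFuture.runAsync(() -> {
            for (int i = 1; i <= 3; i++) {
                topic.add("record-" + i);
            }
        });

        // Wait for the writer so that the single poll can see every record.
        writer.join();

        List<String> batch = pollOnce(topic, 5000L);
        System.out.println(batch); // prints [record-1, record-2, record-3]
    }
}
```

If the writer.join() call is removed, the single poll may return before all three records have been queued. This is the kind of timing gap to keep in mind when pairing an asynchronous writer with a reader that polls only once.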

 

 
