Read From A Collection

Data Pipeline is not limited to reading from files or databases; it can also treat in-memory Java collections as a data source. This is particularly useful when your application has already generated or loaded the data and you want to process it through a pipeline without first writing it to a temporary file.

This example demonstrates how to use CollectionReader to read from a List of custom Java objects. It uses a lambda expression to define the logic for converting each object in the collection into a Data Pipeline Record.
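The conversion lambda is just an ordinary function from one User to a set of named fields, so it can be written, named, and checked on its own before it is handed to a reader. The sketch below shows that idea in plain Java. It is an illustration, not Data Pipeline code: a LinkedHashMap stands in for a Record (it keeps fields in insertion order), and the names UserToRecordSketch and TO_RECORD are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class UserToRecordSketch {

    // Minimal stand-in for the User POJO from the example
    static class User {
        final int id;
        final String username;
        final String email;
        final boolean active;

        User(int id, String username, String email, boolean active) {
            this.id = id;
            this.username = username;
            this.email = email;
            this.active = active;
        }
    }

    // Hypothetical named conversion function; a LinkedHashMap stands in
    // for a Data Pipeline Record and preserves field order.
    static final Function<User, Map<String, Object>> TO_RECORD = user -> {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("user_id", user.id);
        fields.put("username", user.username);
        fields.put("email_address", user.email);
        fields.put("active", user.active);
        return fields;
    };

    public static void main(String[] args) {
        Map<String, Object> fields =
                TO_RECORD.apply(new User(1, "john.doe", "john.doe@example.com", true));
        // prints {user_id=1, username=john.doe, email_address=john.doe@example.com, active=true}
        System.out.println(fields);
    }
}
```

Keeping the mapping in a named function makes it easy to unit-test the field names and values separately from the pipeline that uses them.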

Java Code

package com.northconcepts.datapipeline.examples.cookbook;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.CollectionReader;

import java.util.ArrayList;
import java.util.List;

public class ReadFromACollection {

    public static class User {
        private final int id;
        private final String username;
        private final String email;
        private final boolean isActive;

        public User(int id, String username, String email, boolean isActive) {
            this.id = id;
            this.username = username;
            this.email = email;
            this.isActive = isActive;
        }

        public int getId() {
            return id;
        }

        public String getUsername() {
            return username;
        }

        public String getEmail() {
            return email;
        }

        public boolean isActive() {
            return isActive;
        }
    }

    public static void main(String[] args) {
        // Build the in-memory collection that acts as the data source.
        // The element type must be declared so the lambda below can call User getters.
        List<User> users = new ArrayList<>();
        users.add(new User(1, "john.doe", "john.doe@example.com", true));
        users.add(new User(2, "jane.smith", "jane.smith@example.com", false));
        users.add(new User(3, "admin", "admin@example.com", true));

        // Convert each User into a Record as the reader walks the list
        DataReader reader = new CollectionReader<>(users, user -> {
            Record record = new Record();
            record.setField("user_id", user.getId());
            record.setField("username", user.getUsername());
            record.setField("email_address", user.getEmail());
            record.setField("active", user.isActive());
            return record;
        });

        // Print each record to the console
        DataWriter writer = new StreamWriter(System.out);

        // Transfer every record from the reader to the writer
        Job.run(reader, writer);
    }
}

Code Walkthrough

  1. A simple Plain Old Java Object (POJO), User, is defined to model our data. An ArrayList of User objects is created and populated. This collection will serve as the source for our data pipeline.
  2. A CollectionReader is instantiated with two arguments: the users collection and a lambda expression.
  3. The lambda expression user -> { ... } provides the logic to convert each User object into a Record. Inside the lambda, a new Record is created, and its fields (user_id, username, email_address, and active) are populated from the properties of the User object.
  4. A StreamWriter is created to write the records to the console (System.out).
  5. Finally, Job.run(reader, writer) executes the pipeline, transferring data from the CollectionReader to the StreamWriter. The reader iterates through the list, converts each User object to a Record, and passes it to the writer, which then prints it to the console.
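The read, convert, and write cycle described in the steps above can be modeled in plain Java to make the control flow concrete. This is a simplified sketch under stated assumptions, not how Data Pipeline is implemented: MappingReader and run are hypothetical names, a LinkedHashMap again stands in for a Record, a Consumer plays the role of the writer, and the source is a list of usernames rather than User objects to keep it short.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class CollectionPipelineSketch {

    // Hypothetical reader: walks a collection and converts one element per read() call
    static class MappingReader<T> {
        private final Iterator<T> source;
        private final Function<T, Map<String, Object>> mapper;

        MappingReader(Iterable<T> collection, Function<T, Map<String, Object>> mapper) {
            this.source = collection.iterator();
            this.mapper = mapper;
        }

        // Returns the next converted record, or null once the collection is exhausted
        Map<String, Object> read() {
            return source.hasNext() ? mapper.apply(source.next()) : null;
        }
    }

    // Hypothetical job runner: pulls records until the reader is empty,
    // hands each one to the writer, and returns how many were transferred
    static <T> int run(MappingReader<T> reader, Consumer<Map<String, Object>> writer) {
        int count = 0;
        Map<String, Object> record;
        while ((record = reader.read()) != null) {
            writer.accept(record);
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> usernames = List.of("john.doe", "jane.smith", "admin");
        MappingReader<String> reader = new MappingReader<>(usernames, name -> {
            Map<String, Object> fields = new LinkedHashMap<>();
            fields.put("username", name);
            return fields;
        });
        int count = run(reader, System.out::println);
        // prints {username=john.doe}, {username=jane.smith}, {username=admin} on
        // separate lines, then "3 records"
        System.out.println(count + " records");
    }
}
```

Because each element is converted only when the runner asks for the next record, the mapping work happens one item at a time as data flows to the writer.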

Console Output

-----------------------------------------------
0 - Record {
    0:[user_id]:INTEGER=[1]:Integer
    1:[username]:STRING=[john.doe]:String
    2:[email_address]:STRING=[john.doe@example.com]:String
    3:[active]:BOOLEAN=[true]:Boolean
}

-----------------------------------------------
1 - Record {
    0:[user_id]:INTEGER=[2]:Integer
    1:[username]:STRING=[jane.smith]:String
    2:[email_address]:STRING=[jane.smith@example.com]:String
    3:[active]:BOOLEAN=[false]:Boolean
}

-----------------------------------------------
2 - Record {
    0:[user_id]:INTEGER=[3]:Integer
    1:[username]:STRING=[admin]:String
    2:[email_address]:STRING=[admin@example.com]:String
    3:[active]:BOOLEAN=[true]:Boolean
}

-----------------------------------------------
3 records