Join CSV Files

Updated: Dec 23, 2023
CSV

This example shows how to merge data from two CSV files using SQL-style join operations. Joining on columns shared between the input files makes it easy to integrate and enrich datasets.

DataPipeline supports various types of joins, including INNER, LEFT, RIGHT, and FULL joins, providing flexibility in combining data based on specified conditions. Developers can easily configure join parameters and customize the merging process according to their specific use cases.
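To make the difference between these join types concrete, here is a minimal plain-Java sketch (stdlib only; the maps and helper methods are invented for illustration and are not part of the DataPipeline API). An INNER join keeps only keys present in both tables, while a LEFT join keeps every left-side row and fills unmatched right-side values with null:

```java
import java.util.*;

public class JoinTypesDemo {

    /** INNER join: keep only keys present in both tables. */
    static SortedMap<Integer, String> innerJoin(Map<Integer, Double> left, Map<Integer, String> right) {
        SortedMap<Integer, String> out = new TreeMap<>();
        for (Integer key : left.keySet()) {
            if (right.containsKey(key)) {
                out.put(key, right.get(key));
            }
        }
        return out;
    }

    /** LEFT join: keep every left-side key; unmatched keys map to a null right-side value. */
    static SortedMap<Integer, String> leftJoin(Map<Integer, Double> left, Map<Integer, String> right) {
        SortedMap<Integer, String> out = new TreeMap<>();
        for (Integer key : left.keySet()) {
            out.put(key, right.get(key)); // null when the right table has no matching row
        }
        return out;
    }

    public static void main(String[] args) {
        // Toy tables loosely modeled on this example: balances keyed by account number,
        // last names keyed by account number; 999 has no matching account row.
        Map<Integer, Double> balances = Map.of(101, 9315.45, 102, 90.0, 999, 0.0);
        Map<Integer, String> names = Map.of(101, "Reeves", 102, "Butler", 105, "Murray");

        System.out.println("INNER: " + innerJoin(balances, names)); // {101=Reeves, 102=Butler}
        System.out.println("LEFT:  " + leftJoin(balances, names));  // {101=Reeves, 102=Butler, 999=null}
    }
}
```

RIGHT and FULL joins follow the same pattern with the roles of the two tables swapped or combined.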

DataConverter provides an easy-to-use tool to join CSV files online.


Input CSV Files

user_account.csv

Id,AccountNo,LastName,FirstName,DoB,Country
1,101,Reeves,Keanu,09-02-1964,Canada
2,102,Butler,Gerard,11-13-1969,Scotland
3,103,Hewitt,Jennifer,02-21-1979,United States
4,104,Pinkett-Smith,Jada,09-18-1971,United States
5,105,Murray,Bill,09-21-1950,United States


credit-balance-insert-records2.csv

Account,Balance,CreditLimit,Rating
101,9315.45,10000,A
102,90,1000,B
103,0,17000,B
104,49654.87,100000,A
105,789.65,5000,C
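Both input files carry their field names in the first row, which is why the code listing below calls setFieldNamesInFirstRow(true) on each CSVReader. Conceptually, that convention amounts to something like this stdlib-only sketch (a simplified parser that ignores quoting; the helper is hypothetical, not DataPipeline's implementation):

```java
import java.util.*;

public class HeaderCsvSketch {

    /** Parse simple comma-separated lines, treating the first line as field names. */
    static List<Map<String, String>> parse(List<String> lines) {
        String[] fields = lines.get(0).split(",");
        List<Map<String, String>> records = new ArrayList<>();
        for (String line : lines.subList(1, lines.size())) {
            String[] values = line.split(",");
            Map<String, String> record = new LinkedHashMap<>();
            for (int i = 0; i < fields.length; i++) {
                // Field names from the header become the keys for every record
                record.put(fields[i], i < values.length ? values[i] : null);
            }
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        List<Map<String, String>> rows = parse(List.of(
                "Account,Balance,CreditLimit,Rating",
                "101,9315.45,10000,A"));
        System.out.println(rows.get(0).get("Balance")); // 9315.45
    }
}
```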


Java Code Listing

package com.northconcepts.datapipeline.foundations.examples.jdbc;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaTransformer;
import com.northconcepts.datapipeline.foundations.tools.GenerateEntityFromDataset;
import com.northconcepts.datapipeline.jdbc.JdbcConnectionFactory;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.jdbc.JdbcWriter;
import com.northconcepts.datapipeline.jdbc.sql.select.Select;
import com.northconcepts.datapipeline.job.DataReaderFactory;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.sql.mysql.CreateMySqlDdlFromSchemaDef;
import com.northconcepts.datapipeline.transform.TransformingReader;

import java.io.File;
import java.sql.Connection;
import java.sql.PreparedStatement;

public class JoinCsvFiles {

    private static final File FILE1 = new File("example/data/input/user_account.csv");
    private static final File FILE2 = new File("example/data/input/credit-balance-insert-records2.csv");
    
    private static final String TABLE1 = "Account";
    private static final String TABLE2 = "CreditBalance";
    
    public static final String DATABASE_FILE = new File("example/data/output/JoinCSVFiles.h2").getAbsolutePath();

    
    public static void main(String[] args) throws Throwable {
        
        DataReaderFactory dataReaderFactory1 = () -> new CSVReader(FILE1).setFieldNamesInFirstRow(true);
        DataReaderFactory dataReaderFactory2 = () -> new CSVReader(FILE2).setFieldNamesInFirstRow(true);
        
        GenerateEntityFromDataset entityGenerator = new GenerateEntityFromDataset();
        EntityDef entityDef1 = entityGenerator.generateEntity(dataReaderFactory1.createDataReader()).setName(TABLE1);
        EntityDef entityDef2 = entityGenerator.generateEntity(dataReaderFactory2.createDataReader()).setName(TABLE2);
        SchemaDef schemaDef = new SchemaDef().addEntity(entityDef1).addEntity(entityDef2);
        

        JdbcConnectionFactory connectionFactory = JdbcConnectionFactory.wrap("org.h2.Driver", "jdbc:h2:file:" + DATABASE_FILE + ";MODE=MySQL", "sa", "");
        
        createTables(schemaDef, connectionFactory);

        Job job1 = importFileToDatabase(dataReaderFactory1, entityDef1, connectionFactory, TABLE1);
        Job job2 = importFileToDatabase(dataReaderFactory2, entityDef2, connectionFactory, TABLE2);

        job1.waitUntilFinished();
        job2.waitUntilFinished();

        Select select = new Select("CreditBalance")
            .leftJoin("Account", "CreditBalance.Account=Account.AccountNo")
            ;

        DataReader reader = new JdbcReader(connectionFactory, select.getSqlFragment());
        DataWriter writer = new CSVWriter(new File("example/data/output/joined-csv.csv"));
        Job.run(reader, writer);
    }

    private static Job importFileToDatabase(DataReaderFactory dataReaderFactory, EntityDef entityDef, JdbcConnectionFactory connectionFactory, String tableName) {
        DataReader reader = dataReaderFactory.createDataReader();
        reader = new TransformingReader(reader).add(new SchemaTransformer(entityDef));
        return Job.runAsync(reader, new JdbcWriter(connectionFactory, tableName));
    }

    private static void createTables(SchemaDef schemaDef, JdbcConnectionFactory jdbcConnectionFactory) throws Throwable {
        CreateMySqlDdlFromSchemaDef ddl = new CreateMySqlDdlFromSchemaDef(schemaDef)
            .setPretty(true)
            .setDropTable(true)
            .setCheckIfTableNotExists(false)
            ;

        String sql = ddl.getSqlFragment();
        
        System.out.println(sql);
        System.out.println("--------------------------");
        
        try (Connection connection = jdbcConnectionFactory.createConnection()) {
            try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
                preparedStatement.execute();
            }
        }
    }

}


Code Walkthrough

  1. DataReaderFactory implements the Factory pattern to generate CSVReaders.
  2. A GenerateEntityFromDataset is instantiated and used to create EntityDefs via its generateEntity() method.
  3. A SchemaDef is instantiated and EntityDefs entityDef1 and entityDef2 are added to it.
  4. JdbcConnectionFactory is instantiated, taking the database connection details as parameters.
  5. We pass a SchemaDef and a JdbcConnectionFactory to the createTables() method. In this method, the following steps are done:
    1. CreateMySqlDdlFromSchemaDef is instantiated with the SchemaDef passed as a parameter.
    2. A SQL statement is generated using the getSqlFragment() method of CreateMySqlDdlFromSchemaDef.
    3. A Connection is instantiated using the createConnection() method of JdbcConnectionFactory.
    4. A PreparedStatement is instantiated using the prepareStatement() method of Connection, where we pass the generated SQL as a parameter.
    5. Finally, we call the execute() method of the PreparedStatement.
  6. The importFileToDatabase() method, which takes a DataReaderFactory, EntityDef, JdbcConnectionFactory, and table name as parameters, is called. This method does the following:
    1. An instance of CSVReader is created by calling the createDataReader() method of DataReaderFactory.
    2. We pass the CSVReader as a parameter to instantiate a TransformingReader.
    3. A SchemaTransformer is instantiated with the EntityDef and added to the TransformingReader.
    4. We use a JdbcWriter, which takes a JdbcConnectionFactory and the table name as parameters, to write to our database.
    5. Data is read and written to the database asynchronously using the runAsync() method of the Job class.
    6. A reference to the Job instance is returned.
  7. We use the waitUntilFinished() method of the Job class to make sure that all data from both files has been written to the database.
  8. A Select instance is created that takes the CreditBalance table name as a parameter. We use its leftJoin() method, passing the Account table name and the join condition, to combine rows from the two tables.
  9. We pass a JdbcConnectionFactory and the query String generated by the getSqlFragment() method of the Select class to instantiate a JdbcReader.
  10. Job.run() is then used to transfer data from the JdbcReader to the CSVWriter.
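The LEFT join in the steps above matches each CreditBalance row to the Account row whose AccountNo equals its Account value, appending the matched columns and keeping CreditBalance rows even when no match exists. A minimal plain-Java sketch of that matching (stdlib only, independent of the DataPipeline API; the helper and its signature are invented for illustration and ignore CSV quoting):

```java
import java.util.*;

public class CsvJoinSketch {

    /** Index right-side rows by join key, then append the matching row to each left row. */
    static List<String> leftJoin(List<String> leftRows, int leftKeyCol,
                                 List<String> rightRows, int rightKeyCol) {
        Map<String, String> rightByKey = new HashMap<>();
        for (String row : rightRows) {
            rightByKey.put(row.split(",")[rightKeyCol], row);
        }
        List<String> out = new ArrayList<>();
        for (String row : leftRows) {
            String key = row.split(",")[leftKeyCol];
            String match = rightByKey.get(key);          // null => unmatched; LEFT join keeps the row
            out.add(row + "," + (match != null ? match : ""));
        }
        return out;
    }

    public static void main(String[] args) {
        // Abbreviated rows from the two input files
        List<String> creditBalance = List.of("101,9315.45,10000,A", "102,90,1000,B");
        List<String> account = List.of("1,101,Reeves,Keanu", "2,102,Butler,Gerard");

        // Join on CreditBalance.Account (column 0) = Account.AccountNo (column 1)
        leftJoin(creditBalance, 0, account, 1).forEach(System.out::println);
        // 101,9315.45,10000,A,1,101,Reeves,Keanu
        // 102,90,1000,B,2,102,Butler,Gerard
    }
}
```

In the example itself, this matching is delegated to the H2 database via the generated SELECT statement rather than done in memory.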


Output File

ACCOUNT,BALANCE,CREDITLIMIT,RATING,ID,ACCOUNTNO,LASTNAME,FIRSTNAME,DOB,COUNTRY
101,9315.45,10000,A,1,101,Reeves,Keanu,1964-09-02,Canada
102,90.0,1000,B,2,102,Butler,Gerard,1969-11-13,Scotland
103,0.0,17000,B,3,103,Hewitt,Jennifer,1979-02-21,United States
104,49654.87,100000,A,4,104,Pinkett-Smith,Jada,1971-09-18,United States
105,789.65,5000,C,5,105,Murray,Bill,1950-09-21,United States