Join CSV Files
Updated: Dec 23, 2023
This example shows how to merge data, Venn-diagram style, from two CSV files using SQL-style join operations. It integrates and enriches datasets by joining on columns shared between the input CSV files.
DataPipeline supports various types of joins, including INNER, LEFT, RIGHT, and FULL joins, providing flexibility in combining data based on specified conditions. Developers can easily configure join parameters and customize the merging process according to their specific use cases.
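For instance, because the JdbcReader used in the listing below accepts any SQL string, a different join type can be applied simply by changing the query. The following is a minimal sketch, assuming the connectionFactory and the Account and CreditBalance tables created later in this example; the output file name is hypothetical:

    // Hypothetical variant of this example's join step: an INNER JOIN keeps only
    // rows that have a match in both tables, unlike the LEFT JOIN used below.
    // connectionFactory, the table names, and the columns come from the full listing.
    DataReader innerJoined = new JdbcReader(connectionFactory,
            "SELECT * FROM CreditBalance"
            + " INNER JOIN Account ON CreditBalance.Account = Account.AccountNo");
    Job.run(innerJoined, new CSVWriter(new File("example/data/output/inner-joined-csv.csv")));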
DataConverter provides an easy-to-use tool to join CSV files online.
Input CSV Files
user_account.csv
Id,AccountNo,LastName,FirstName,DoB,Country
1,101,Reeves,Keanu,09-02-1964,Canada
2,102,Butler,Gerard,11-13-1969,Scotland
3,103,Hewitt,Jennifer,02-21-1979,United States
4,104,Pinkett-Smith,Jada,09-18-1971,Untied States
5,105,Murray,Bill,09-21-1950,United States
credit-balance-insert-records2.csv
Account,Balance,CreditLimit,Rating
101,9315.45,10000,A
102,90,1000,B
103,0,17000,B
104,49654.87,100000,A
105,789.65,5000,C
Java Code Listing
package com.northconcepts.datapipeline.foundations.examples.jdbc;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaTransformer;
import com.northconcepts.datapipeline.foundations.tools.GenerateEntityFromDataset;
import com.northconcepts.datapipeline.jdbc.JdbcConnectionFactory;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.jdbc.JdbcWriter;
import com.northconcepts.datapipeline.jdbc.sql.select.Select;
import com.northconcepts.datapipeline.job.DataReaderFactory;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.sql.mysql.CreateMySqlDdlFromSchemaDef;
import com.northconcepts.datapipeline.transform.TransformingReader;

import java.io.File;
import java.sql.Connection;
import java.sql.PreparedStatement;

public class JoinCsvFiles {

    private static final File FILE1 = new File("example/data/input/user_account.csv");
    private static final File FILE2 = new File("example/data/input/credit-balance-insert-records2.csv");
    private static final String TABLE1 = "Account";
    private static final String TABLE2 = "CreditBalance";
    public static final String DATABASE_FILE = new File("example/data/output/JoinCSVFiles.h2").getAbsolutePath();

    public static void main(String[] args) throws Throwable {
        DataReaderFactory dataReaderFactory1 = () -> new CSVReader(FILE1).setFieldNamesInFirstRow(true);
        DataReaderFactory dataReaderFactory2 = () -> new CSVReader(FILE2).setFieldNamesInFirstRow(true);

        GenerateEntityFromDataset entityGenerator = new GenerateEntityFromDataset();
        EntityDef entityDef1 = entityGenerator.generateEntity(dataReaderFactory1.createDataReader()).setName(TABLE1);
        EntityDef entityDef2 = entityGenerator.generateEntity(dataReaderFactory2.createDataReader()).setName(TABLE2);
        SchemaDef schemaDef = new SchemaDef().addEntity(entityDef1).addEntity(entityDef2);

        JdbcConnectionFactory connectionFactory = JdbcConnectionFactory.wrap("org.h2.Driver",
                "jdbc:h2:file:" + DATABASE_FILE + ";MODE=MySQL", "sa", "");

        createTables(schemaDef, connectionFactory);

        Job job1 = importFileToDatabase(dataReaderFactory1, entityDef1, connectionFactory, TABLE1);
        Job job2 = importFileToDatabase(dataReaderFactory2, entityDef2, connectionFactory, TABLE2);
        job1.waitUntilFinished();
        job2.waitUntilFinished();

        Select select = new Select("CreditBalance")
                .leftJoin("Account", "CreditBalance.Account=Account.AccountNo");

        DataReader reader = new JdbcReader(connectionFactory, select.getSqlFragment());
        DataWriter writer = new CSVWriter(new File("example/data/output/joined-csv.csv"));
        Job.run(reader, writer);
    }

    private static Job importFileToDatabase(DataReaderFactory dataReaderFactory, EntityDef entityDef,
            JdbcConnectionFactory connectionFactory, String tableName) {
        DataReader reader = dataReaderFactory.createDataReader();
        reader = new TransformingReader(reader).add(new SchemaTransformer(entityDef));
        return Job.runAsync(reader, new JdbcWriter(connectionFactory, tableName));
    }

    private static void createTables(SchemaDef schemaDef, JdbcConnectionFactory jdbcConnectionFactory) throws Throwable {
        CreateMySqlDdlFromSchemaDef ddl = new CreateMySqlDdlFromSchemaDef(schemaDef)
                .setPretty(true)
                .setDropTable(true)
                .setCheckIfTableNotExists(false);

        String sql = ddl.getSqlFragment();
        System.out.println(sql);
        System.out.println("--------------------------");

        try (Connection connection = jdbcConnectionFactory.createConnection()) {
            try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
                preparedStatement.execute();
            }
        }
    }

}
Code Walkthrough
- DataReaderFactory implements the Factory pattern to generate CSVReaders.
- A GenerateEntityFromDataset is instantiated and used to create EntityDefs via its generateEntity() method.
- A SchemaDef is instantiated and the EntityDefs entityDef1 and entityDef2 are added to it.
- JdbcConnectionFactory is instantiated, taking the database connection details as parameters.
- We pass the SchemaDef and the JdbcConnectionFactory to the createTables() method, which performs the following steps:
  - CreateMySqlDdlFromSchemaDef is instantiated with the SchemaDef as a parameter.
  - The SQL is generated using the getSqlFragment() method of CreateMySqlDdlFromSchemaDef.
  - A Connection is obtained using the createConnection() method of JdbcConnectionFactory.
  - A PreparedStatement is created using the prepareStatement() method of Connection, passing in the generated SQL.
  - Finally, we call the execute() method of the PreparedStatement.
- The importFileToDatabase() method, which takes a DataReaderFactory, an EntityDef, a JdbcConnectionFactory, and a table name as parameters, is called for each file. This method does the following:
  - A CSVReader is created by calling the createDataReader() method of DataReaderFactory.
  - The CSVReader is passed as a parameter to instantiate a TransformingReader.
  - A SchemaTransformer, which takes the EntityDef, is instantiated and added to the TransformingReader.
  - A JdbcWriter, which takes the JdbcConnectionFactory and the table name as parameters, is used to write to the database.
  - Data is read and written to the database asynchronously using the runAsync() method of the Job class, and a reference to the Job instance is returned.
- We use waitUntilFinished() from the Job class to make sure that all data from both files has been written to the database.
- A Select instance is created with the CreditBalance table name as a parameter. Its leftJoin() method takes the Account table name and the join condition to combine rows from the two tables.
- We pass the JdbcConnectionFactory and the query string generated by the getSqlFragment() method of the Select class to instantiate a JdbcReader.
- Job.run() is then used to transfer data from the JdbcReader to the CSVWriter (a sketch for spot-checking the output follows this list).
Output File
ACCOUNT,BALANCE,CREDITLIMIT,RATING,ID,ACCOUNTNO,LASTNAME,FIRSTNAME,DOB,COUNTRY
101,9315.45,10000,A,1,101,Reeves,Keanu,1964-09-02,Canada
102,90.0,1000,B,2,102,Butler,Gerard,1969-11-13,Scotland
103,0.0,17000,B,3,103,Hewitt,Jennifer,1979-02-21,United States
104,49654.87,100000,A,4,104,Pinkett-Smith,Jada,1971-09-18,Untied States
105,789.65,5000,C,5,105,Murray,Bill,1950-09-21,United States

Note that the DoB values appear in yyyy-MM-dd form and the balances as decimals because the generated schema assigns date and numeric types that the SchemaTransformer applies during import.