Upsert Records to a Database Using Insert and Update
This example will show you how you can use DataPipline to upsert records to a database Using Insert and Update.
To accomplish this both JdbcUpsertWriter and GenericUpsert will be used.
This example can be easily modified to Upsert Records to MySql or MariaDB or Upsert Records to PostgreSql.
Input CSV file
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating 101,Reeves,Keanu,9315.45,10000.00,1998-1-17,A 312,Butler,Gerard,90.00,1000.00,2003-8-6,B 101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B 312,Pinkett-Smith,Jada,49654.87,100000.00,2006-12-5,A 317,Murray,Bill,789.65,5000.00,2007-2-5,C 317,Murray,Bill,1,5000.00,2007-2-5,A
Java code listing
package com.northconcepts.datapipeline.examples.cookbook; import java.io.File; import java.io.OutputStreamWriter; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import com.northconcepts.datapipeline.core.DataException; import com.northconcepts.datapipeline.core.DataReader; import com.northconcepts.datapipeline.core.DataWriter; import com.northconcepts.datapipeline.csv.CSVReader; import com.northconcepts.datapipeline.csv.CSVWriter; import com.northconcepts.datapipeline.jdbc.JdbcReader; import com.northconcepts.datapipeline.jdbc.JdbcUpsertWriter; import com.northconcepts.datapipeline.jdbc.upsert.GenericUpsert; import com.northconcepts.datapipeline.job.Job; public class GenericUpsertRecords { public static void main(String[] args) { final String DATABASE_DRIVER = "org.hsqldb.jdbcDriver"; final String DATABASE_URL = "jdbc:hsqldb:mem:aname"; final String DATABASE_USERNAME = "sa"; final String DATABASE_PASSWORD = ""; final String DATABASE_TABLE = "CreditBalance"; final String PRIMARY_KEY = "Account"; Connection connection; try { Class.forName(DATABASE_DRIVER); connection = DriverManager.getConnection(DATABASE_URL, DATABASE_USERNAME, DATABASE_PASSWORD); createTable(connection); } catch (Throwable e) { throw DataException.wrap(e); } DataReader reader = new CSVReader(new File("example/data/input/credit-balance-02.csv")).setFieldNamesInFirstRow(true); DataWriter writer = new JdbcUpsertWriter(connection, DATABASE_TABLE, new GenericUpsert().setInsertFirst(true), PRIMARY_KEY); Job.run(reader, writer); reader = new JdbcReader(connection, "Select * from CreditBalance;"); writer = new CSVWriter(new OutputStreamWriter(System.out)); Job.run(reader, writer); } public static void createTable(Connection connection) throws Throwable{ PreparedStatement preparedStatement; String dropTableQuery = "DROP TABLE CreditBalance IF EXISTS;"; String createTableQuery = "CREATE TABLE CreditBalance (" + "Account INTEGER, " + "LastName VARCHAR(256), " + "FirstName VARCHAR(256), " + "Balance DOUBLE, " + "CreditLimit DOUBLE, " + "AccountCreated DATE, " + "Rating CHAR, " + "PRIMARY KEY (Account));"; preparedStatement = connection.prepareStatement(dropTableQuery); preparedStatement.execute(); preparedStatement = connection.prepareStatement(createTableQuery); preparedStatement.execute(); } }
Code Walkthrough
- First you need to feed your database details.
- DriverManager is used to create a connection to the database.
createTable(connection)
creates a tableCreditBalance
.- A CSVReader is created using the file path of the input file
credit-balance-02.csv
. - The
CSVReader.setFieldNamesInFirstRow(true)
method is invoked to specify that the names specified in the first row should be used as field names. - JdbcUpsertWriter will write the records to the database table. For this example
JdbcUpsertWriter
takes four arguments i.e. Connection(connection
), Table Name(CreditBalance
), strategy(GenericUpsert()
) and keyFieldNames(Account
). - Data is transferred from the
reader
towriter
via Job.run() method. - JdbcReader is used to obtain the records from the database using the query
Select * from CreditBalance
. - CSVWriter is used to write the CSV record to the output console
- Job.run(reader, writer) is used to transfer the data from the reader to the writer.
JdbcUpsertWriter
Writes records to a database table using the upsert idiom (attempt to insert, but update if the duplicate key exists). The default strategy used is GenericUpsert, however, several others are provided: MergeUpsert, VariableFieldsUpsert, and MySqlUpsert.
CSVReader
CSVReader is an input reader which can be used to read CSV files.
It is a sub-class of TextReader and inherits the
open and
close among other methods.
The CSVReader.setFieldNamesInFirstRow(true)
method causes the CSVReader to use the names specified in the first row of the
input data as field names. If this method is not invoked, the fields would be named as A1, A2, etc. similar to MS Excel. If those fields names need to be changed, a rename
transformation can be added on top of CSVReader or any other type
(Refer Rename a field for example).
HSQLDB (HyperSQL Database)
HSQLDB is an open-source project written in Java, it represents a relational database. It follows the SQL and JDBC standards and supports SQL features such as stored procedures and triggers. It can be used in the in-memory mode, or it can be configured to use disk storage.
GenericUpsert
An upsert strategy that attempts to either: - "insert" and then "update" if SQLIntegrityConstraintViolationException is thrown or - update and then insert if no records were affected. Sample Generated SQL: Insert SQL: INSERT INTO tableName (col1, col2, col3) VALUES (?, ?, ?) Update SQL: UPDATE tableName SET col1=?, col2=?, col3=? WHERE ID=?
Output
The output will be written to both the console and the CreditBalance
table.
Console Output
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating 101,Reeves,Keanu,9315.45,10000.0,1998-01-17,A 312,Butler,Gerard,90.0,1000.0,2003-08-06,B 101,Hewitt,Jennifer Love,0.0,17000.0,1985-05-25,B 312,Pinkett-Smith,Jada,49654.87,100000.0,2006-12-05,A 317,Murray,Bill,789.65,5000.0,2007-02-05,C 317,Murray,Bill,1.0,5000.0,2007-02-05,A