Write to a Database Using Merge Upsert Strategy with Batch
This example shows you how to upsert records as a batch to a database using Data Pipeline's JdbcUpsertWriter and MergeUpsert.
Upsert (a portmanteau of update and insert) is a database operation that updates an existing row if a specified value already exists in a table, and inserts a new row if it doesn't. Data Pipeline provides several upsert strategies, such as GenericUpsert, MergeUpsert, MySqlUpsert, PostgreSqlUpsert, and others.
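As a rough illustration of the update-or-insert decision described above, here is a minimal sketch that uses an in-memory map as a stand-in for a table keyed by account number. The class and method names (UpsertSemanticsSketch, upsert) are hypothetical and are not part of Data Pipeline; a real upsert runs inside the database.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a table keyed by Account; not Data Pipeline code.
class UpsertSemanticsSketch {

    // Updates the row if the key already exists, inserts it otherwise.
    // Returns "UPDATE" or "INSERT" so the caller can see which path ran.
    static String upsert(Map<Integer, Double> table, int account, double balance) {
        boolean exists = table.containsKey(account);
        table.put(account, balance);
        return exists ? "UPDATE" : "INSERT";
    }

    public static void main(String[] args) {
        Map<Integer, Double> creditBalance = new LinkedHashMap<>();
        System.out.println(upsert(creditBalance, 101, 250.0)); // key 101 is new
        System.out.println(upsert(creditBalance, 101, 300.0)); // key 101 already exists
        System.out.println(creditBalance);
    }
}
```

The second call overwrites the balance for the same key instead of adding a second row, which is the behavior an upsert is meant to guarantee.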
Batch upserts are a useful technique when inserting or updating large numbers of rows into a database at once. Rather than inserting or updating each row individually, we can send a batch to the database at once, avoiding lots of individual network message round-trips and other per-statement inefficiencies.
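To make the saving concrete, the following sketch splits a list of records into fixed-size chunks and compares the number of sends. It assumes a simplified model in which each batch costs one round-trip; the class and method names (BatchingSketch, chunk) are hypothetical and do not reflect how JdbcUpsertWriter is implemented internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of batching; not Data Pipeline code.
class BatchingSketch {

    // Splits records into consecutive chunks of at most batchSize elements.
    static <T> List<List<T>> chunk(List<T> records, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < records.size(); start += batchSize) {
            int end = Math.min(start + batchSize, records.size());
            batches.add(new ArrayList<>(records.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 1; i <= 250; i++) {
            records.add(i);
        }
        List<List<Integer>> batches = chunk(records, 100);
        // Simplified model: one round-trip per statement vs. one per batch.
        System.out.println("Sends without batching: " + records.size());
        System.out.println("Sends with batches of 100: " + batches.size());
    }
}
```

With 250 records and a batch size of 100, the final batch holds only the remaining 50 records, so a batch size need not divide the record count evenly.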
Java Code listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;
import java.io.OutputStreamWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import com.northconcepts.datapipeline.core.DataException;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.jdbc.JdbcUpsertWriter;
import com.northconcepts.datapipeline.jdbc.upsert.MergeUpsert;
import com.northconcepts.datapipeline.job.Job;

public class WriteToDatabaseUsingMergeUpsertWithBatch {

    public static void main(String[] args) {
        final String DATABASE_DRIVER = "org.hsqldb.jdbcDriver";
        final String DATABASE_URL = "jdbc:hsqldb:mem:aname";
        final String DATABASE_USERNAME = "sa";
        final String DATABASE_PASSWORD = "";
        final String DATABASE_TABLE = "CreditBalance";
        final String PRIMARY_KEY = "Account";

        Connection connection;

        // Register the driver, open the connection, and (re)create the target table
        try {
            Class.forName(DATABASE_DRIVER);
            connection = DriverManager.getConnection(DATABASE_URL, DATABASE_USERNAME, DATABASE_PASSWORD);
            createTable(connection);
        } catch (Throwable e) {
            throw DataException.wrap(e);
        }

        // Upsert the CSV records into the table in batches of 100
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-upsert-records.csv"))
                .setFieldNamesInFirstRow(true);

        DataWriter writer = new JdbcUpsertWriter(connection, DATABASE_TABLE, new MergeUpsert().setDebug(true), PRIMARY_KEY)
                .setBatchSize(100);

        Job.run(reader, writer);

        // Read the table back and print it to the console as CSV
        reader = new JdbcReader(connection, "Select * from CreditBalance;");
        writer = new CSVWriter(new OutputStreamWriter(System.out));

        Job.run(reader, writer);
    }

    public static void createTable(Connection connection) throws Throwable {
        PreparedStatement preparedStatement;
        String dropTableQuery = "DROP TABLE CreditBalance IF EXISTS;";
        String createTableQuery = "CREATE TABLE CreditBalance ("
                + "Account INTEGER, "
                + "LastName VARCHAR(256), "
                + "FirstName VARCHAR(256), "
                + "Balance DOUBLE, "
                + "CreditLimit DOUBLE, "
                + "AccountCreated DATE, "
                + "Rating CHAR, "
                + "PRIMARY KEY (Account));";

        preparedStatement = connection.prepareStatement(dropTableQuery);
        preparedStatement.execute();

        preparedStatement = connection.prepareStatement(createTableQuery);
        preparedStatement.execute();
    }

}
Code walkthrough
- First, the database connection parameters are specified. Class.forName() is used to register a database driver; in this example, org.hsqldb.jdbcDriver is registered.
- The createTable() method creates the CreditBalance table in your database (dropping it first if it already exists).
- A CSVReader is created for the input file credit-balance-upsert-records.csv. setFieldNamesInFirstRow(true) is invoked so that the names in the first row are used as field names (disabled by default). If this method is not invoked, the fields are named A1, A2 by default.
- A JdbcUpsertWriter is created to upsert records to the database using the MergeUpsert strategy. The key field name PRIMARY_KEY passed to the constructor indicates that the Account column is used as the key to determine whether a record already exists in the table.
- setDebug(true) makes MergeUpsert log the generated SQL (disabled by default).
- setBatchSize(100) sets the number of records to chunk together in batch upserts to 100.
- Data are transferred from the CSVReader to the JdbcUpsertWriter via the Job.run() method, populating or updating your database table with the records from the CSV file.
- A JdbcReader is created to obtain records from the specified database.
- A CSVWriter is created to write records to a CSV stream.
- Data are transferred from the JdbcReader to the CSVWriter via the Job.run() method.

See how to compile and run data pipeline jobs.
JdbcUpsertWriter
Writes records to a database table using the upsert idiom (attempt to insert, but update if a duplicate key exists). The default strategy is GenericUpsert; however, several others are provided: MergeUpsert, VariableFieldsUpsert, MySqlUpsert.
MergeUpsert
A batchable upsert strategy that relies on the SQL MERGE statement. The setDebug() method allows MergeUpsert to log the generated SQL (disabled by default). For this example, the generated SQL will look something like this:
MERGE INTO CreditBalance target
USING (VALUES (?)) source(Account)
ON target.Account=source.Account
WHEN NOT MATCHED THEN INSERT (col1, col2, col3) VALUES (?, ?, ?)
WHEN MATCHED THEN UPDATE SET col1=?, col2=?, col3=?;
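To show how a statement of this shape can be assembled from a table name, a key column, and a column list, here is a minimal sketch. It only mimics the approximate SQL shown above; the class and method names (MergeSqlSketch, buildMerge) are hypothetical, and this is not the code or the exact SQL that MergeUpsert uses.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical string builder that mimics the shape of the SQL above; not MergeUpsert's implementation.
class MergeSqlSketch {

    static String buildMerge(String table, String keyColumn, List<String> columns) {
        StringJoiner insertColumns = new StringJoiner(", ");
        StringJoiner insertValues = new StringJoiner(", ");
        StringJoiner updateAssignments = new StringJoiner(", ");
        for (String column : columns) {
            insertColumns.add(column);            // col1, col2, ...
            insertValues.add("?");                // one placeholder per column
            updateAssignments.add(column + "=?"); // col1=?, col2=?, ...
        }
        return "MERGE INTO " + table + " target"
                + " USING (VALUES (?)) source(" + keyColumn + ")"
                + " ON target." + keyColumn + "=source." + keyColumn
                + " WHEN NOT MATCHED THEN INSERT (" + insertColumns + ") VALUES (" + insertValues + ")"
                + " WHEN MATCHED THEN UPDATE SET " + updateAssignments + ";";
    }

    public static void main(String[] args) {
        System.out.println(buildMerge("CreditBalance", "Account",
                Arrays.asList("col1", "col2", "col3")));
    }
}
```

Because every value is a ? placeholder, the same statement text can be reused for each record in a batch, with only the bound parameters changing.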
JdbcReader
Obtains records from a database query. It extends the DataReader class, and its constructor takes a Connection object and a query string as parameters. The setBatchSize() method can be used to set the number of records to chunk together in batch updates.
CSVReader
Obtains records from a Comma Separated Value (CSV) or delimited stream. It extends the TextReader class and can be created using a File or Reader object. Passing true to the setFieldNamesInFirstRow() method enables the CSVReader to use the names in the first row of the input data as field names.
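The effect of treating the first row as field names can be sketched with a naive parser. This is an illustration only: it splits on commas without any quoting or escaping support, the class and method names (HeaderRowSketch, parse) are hypothetical, and the fallback names A1, A2 simply follow the default naming mentioned in the walkthrough rather than CSVReader's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, naive CSV handling for illustration; not CSVReader's implementation.
class HeaderRowSketch {

    static List<Map<String, String>> parse(List<String> lines, boolean fieldNamesInFirstRow) {
        List<Map<String, String>> records = new ArrayList<>();
        String[] names = null;
        int firstDataLine = 0;
        if (fieldNamesInFirstRow && !lines.isEmpty()) {
            names = lines.get(0).split(","); // first row supplies the field names
            firstDataLine = 1;               // and is not treated as data
        }
        for (int i = firstDataLine; i < lines.size(); i++) {
            String[] values = lines.get(i).split(",");
            Map<String, String> record = new LinkedHashMap<>();
            for (int j = 0; j < values.length; j++) {
                // Fall back to positional names (A1, A2, ...) when no header name is available
                String name = (names != null && j < names.length) ? names[j] : "A" + (j + 1);
                record.put(name, values[j]);
            }
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Account,Balance", "101,250.0");
        System.out.println(parse(lines, true));  // header row becomes field names
        System.out.println(parse(lines, false)); // header row is read as a data record
    }
}
```

Without the flag, the header line is read as an ordinary record, which is why the example enables it before upserting: the Account key field has to be resolvable by name.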
CSVWriter
Writes records to a Comma Separated Value (CSV) stream. It extends LinedTextWriter, and its constructor takes a File or a Writer object as a parameter.
Console Output
Obtained records will be printed on the console.