Upsert Records With Multiple JDBC Connections
This example shows you how to asynchronously upsert records to a database table using multiple JDBC connections.
Upsert (a portmanteau of update and insert) is a database operation that will update an existing row if a specified value already exists in a table, and insert a new row otherwise.
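The update-or-insert rule can be sketched in plain Java with a map standing in for the table. This is illustrative only: the class and the `upsert` helper are hypothetical names, not part of DataPipeline, and no database is involved.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory stand-in for a table keyed by Account; not DataPipeline code.
final class UpsertSemanticsSketch {

    // Hypothetical helper: updates the row when the key exists, inserts it otherwise.
    // Returns true when an existing row was updated, false when a new row was inserted.
    static boolean upsert(Map<Integer, String> table, int account, String lastName) {
        return table.put(account, lastName) != null;
    }

    public static void main(String[] args) {
        Map<Integer, String> table = new LinkedHashMap<>();
        System.out.println(upsert(table, 101, "Reeves")); // false: key absent, row inserted
        System.out.println(upsert(table, 101, "Hewitt")); // true: key present, row updated
        System.out.println(table);                        // {101=Hewitt}
    }
}
```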
DataPipeline's JdbcMultiUpsertWriter speeds up an upsert operation by using multiple JdbcUpsertWriter instances, each writing asynchronously on its own thread.
For this example, the MySqlUpsert strategy is configured, but you can also use other upsert strategies supported by DataPipeline, such as GenericUpsert, MergeUpsert, and PostgreSqlUpsert.
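For orientation, a MySQL-native upsert is commonly written as `INSERT ... ON DUPLICATE KEY UPDATE`. The sketch below builds such a statement for a few of the example's columns. It is an assumption about what a MySQL upsert typically looks like, not the SQL that MySqlUpsert actually generates, and `buildUpsertSql` is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: shows the general shape of a MySQL upsert statement.
final class MySqlUpsertSqlSketch {

    // Hypothetical helper: builds a parameterized statement that inserts a row,
    // or updates the non-key columns when the key already exists.
    static String buildUpsertSql(String table, List<String> keys, List<String> columns) {
        String columnList = String.join(", ", columns);
        String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
        String updates = columns.stream()
                .filter(c -> !keys.contains(c))              // key columns are not updated
                .map(c -> c + " = VALUES(" + c + ")")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        System.out.println(buildUpsertSql("CreditBalance",
                Arrays.asList("Account"),
                Arrays.asList("Account", "LastName", "Balance")));
    }
}
```

The statement relies on the table having a primary or unique key on the key column, which matches the PRIMARY KEY (Account) constraint created in this example.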
Input CSV
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1998-1-17,A
312,Butler,Gerard,90.00,1000.00,2003-8-6,B
101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B
312,Pinkett-Smith,Jada,49654.87,100000.00,2006-12-5,A
317,Murray,Bill,789.65,5000.00,2007-2-5,C
317,Murray,Bill,1,5000.00,2007-2-5,A
Notice that the records in the above dataset repeat values in the Account field: 101, 312, and 317 each appear twice.
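As a quick check of the repeated keys, the sketch below counts how often each Account value occurs in the six data rows. The class and method names are hypothetical. Because Account is the upsert key, three distinct values mean the table ends up with three rows.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: counts repeated Account values in the sample data.
final class DuplicateAccountSketch {

    // The six data rows from the input CSV (header omitted).
    static final String[] ROWS = {
        "101,Reeves,Keanu,9315.45,10000.00,1998-1-17,A",
        "312,Butler,Gerard,90.00,1000.00,2003-8-6,B",
        "101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B",
        "312,Pinkett-Smith,Jada,49654.87,100000.00,2006-12-5,A",
        "317,Murray,Bill,789.65,5000.00,2007-2-5,C",
        "317,Murray,Bill,1,5000.00,2007-2-5,A"
    };

    // Counts occurrences of the first CSV field (Account) in each row.
    static Map<String, Integer> countByAccount(String[] rows) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String row : rows) {
            String account = row.substring(0, row.indexOf(','));
            counts.merge(account, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countByAccount(ROWS)); // {101=2, 312=2, 317=2}
    }
}
```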
Java Code Listing
package com.northconcepts.datapipeline;

import com.mysql.cj.jdbc.MysqlDataSource;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.internal.jdbc.JdbcFacade;
import com.northconcepts.datapipeline.jdbc.JdbcConnectionFactory;
import com.northconcepts.datapipeline.jdbc.JdbcMultiUpsertWriter;
import com.northconcepts.datapipeline.jdbc.upsert.MySqlUpsert;
import com.northconcepts.datapipeline.job.Job;

import javax.sql.DataSource;
import java.io.File;

public class UpsertWithMultipleJdbcConnections {

    private static final String SERVER_NAME = "localhost";
    private static final String DATABASE_NAME = "datapipeline"; // create this database if not exists.
    private static final String DATABASE_USERNAME = "etl";
    private static final String DATABASE_PASSWORD = "etl";
    private static final String DATABASE_TABLE = "CreditBalance";
    private static final String PRIMARY_KEY = "Account";

    public static void main(String[] args) throws Throwable {
        MysqlDataSource dataSource = new MysqlDataSource();
        dataSource.setServerName(SERVER_NAME);
        dataSource.setDatabaseName(DATABASE_NAME);
        dataSource.setUser(DATABASE_USERNAME);
        dataSource.setPassword(DATABASE_PASSWORD);

        createTable(dataSource);

        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-02.csv"))
                .setFieldNamesInFirstRow(true);

        // Write to a database using 3 connections.
        DataWriter writer = new JdbcMultiUpsertWriter(dataSource, 3, 1, DATABASE_TABLE, new MySqlUpsert(), PRIMARY_KEY);

        Job.run(reader, writer);
    }

    public static void createTable(DataSource dataSource) {
        JdbcFacade jdbcFacade = new JdbcFacade(JdbcConnectionFactory.wrap(dataSource));

        String dropTableQuery = "DROP TABLE IF EXISTS CreditBalance;";
        String createTableQuery = "CREATE TABLE CreditBalance ("
                + "Account INTEGER, "
                + "LastName VARCHAR(256), "
                + "FirstName VARCHAR(256), "
                + "Balance double precision, "
                + "CreditLimit double precision, "
                + "AccountCreated DATE, "
                + "Rating CHAR, "
                + "PRIMARY KEY (Account)"
                + ")";

        jdbcFacade.execute(dropTableQuery);
        jdbcFacade.execute(createTableQuery);
        jdbcFacade.close();
    }
}
Code Walkthrough
- Create a MysqlDataSource with the specified connection details.
- Create a table named CreditBalance in the connected MySQL database, dropping it first if it already exists.
- Read records from the input CSV file credit-balance-02.csv. setFieldNamesInFirstRow(true) is invoked to specify that the first row of the input CSV file should be used as field names (disabled by default). If this method is not invoked, the fields would be named as A1, A2 by default.
- Create a JdbcMultiUpsertWriter to asynchronously upsert records into the CreditBalance table using 3 database connections.
- Configure the MySqlUpsert strategy, which will be used by the internal JdbcUpsertWriter instances.
- Behind the scenes, JdbcMultiUpsertWriter creates 3 JdbcUpsertWriter instances, each writing asynchronously on a separate thread.
- If no upsert strategy is specified, the GenericUpsert strategy is used by default.
- Use the Account field as the key that determines whether a record already exists in the table. Multiple fields can be used as a key.
- Transfer records from CSVReader to JdbcMultiUpsertWriter via the Job.run() method to populate your database table with records obtained from the CSV file. See how to compile and run data pipeline jobs.
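The idea of several writers working in parallel can be sketched with standard Java concurrency utilities. This is a conceptual stand-in, not DataPipeline's implementation: a ConcurrentHashMap plays the role of the table, a fixed thread pool plays the role of the 3 writer threads, and `upsertAll` is a hypothetical helper. In this sketch, the order in which two records with the same key are written is not guaranteed, so only the set of keys is printed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only: several worker threads upserting into a shared, thread-safe map.
final class MultiWriterSketch {

    // Hypothetical helper: each record is {account, lastName}; every record is
    // upserted by one of the worker threads.
    static Map<Integer, String> upsertAll(List<String[]> records, int workers) {
        Map<Integer, String> table = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (String[] record : records) {
            pool.execute(() -> table.put(Integer.parseInt(record[0]), record[1]));
        }
        pool.shutdown(); // accept no new work, let queued upserts finish
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for writers", e);
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
                new String[] {"101", "Reeves"},
                new String[] {"312", "Butler"},
                new String[] {"101", "Hewitt"},
                new String[] {"312", "Pinkett-Smith"},
                new String[] {"317", "Murray"},
                new String[] {"317", "Murray"});
        Map<Integer, String> table = upsertAll(records, 3);
        System.out.println(new TreeMap<>(table).keySet()); // [101, 312, 317]
    }
}
```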