Upsert Records With Multiple JDBC Connections
This example shows you how to asynchronously upsert records to a database table using multiple JDBC connections.
Upsert (a portmanteau of update and insert) is a database operation that updates an existing row when a row with the specified key value already exists in the table, and inserts a new row otherwise.
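To make these semantics concrete, here is a minimal in-memory sketch in plain Java, not DataPipeline code. A map stands in for the table and the Account value plays the role of the key; the class and method names (UpsertSemantics, InMemoryTable, upsert) are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory illustration of upsert semantics; not DataPipeline code.
public class UpsertSemantics {

    // A map stands in for a table whose primary key is Account.
    public static class InMemoryTable {
        private final Map<Integer, String> rows = new LinkedHashMap<>();

        // Updates the row if the key already exists, inserts it otherwise.
        // Returns which of the two operations happened.
        public String upsert(int account, String lastName) {
            boolean exists = rows.containsKey(account);
            rows.put(account, lastName);
            return exists ? "UPDATE" : "INSERT";
        }

        public String get(int account) {
            return rows.get(account);
        }

        public int size() {
            return rows.size();
        }

        @Override
        public String toString() {
            return rows.toString();
        }
    }

    public static void main(String[] args) {
        // Account and LastName values in the order they appear in the input CSV.
        int[] accounts = {101, 312, 101, 312, 317, 317};
        String[] lastNames = {"Reeves", "Butler", "Hewitt", "Pinkett-Smith", "Murray", "Murray"};

        InMemoryTable table = new InMemoryTable();
        for (int i = 0; i < accounts.length; i++) {
            System.out.println(table.upsert(accounts[i], lastNames[i]) + " " + accounts[i]);
        }
        System.out.println(table); // one row per distinct Account
    }
}
```

Run over the Account values from the input CSV, each account is reported as INSERT on its first occurrence and UPDATE on its second, leaving three rows.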
DataPipeline's JdbcMultiUpsertWriter lets you speed up an upsert by using multiple JdbcUpsertWriter instances, each writing asynchronously on its own thread.
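This page does not describe how JdbcMultiUpsertWriter distributes records among its writers. The sketch below shows one plausible approach, assumed purely for illustration: route each record to a writer by hashing its key, then let each writer apply its share on its own thread. A ConcurrentHashMap stands in for the database table, and PartitionedUpsertSketch, Row and upsertInParallel are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of parallel upserts with several writer threads.
// ASSUMPTION: records are routed to writers by key hash; this is not
// necessarily how JdbcMultiUpsertWriter distributes records.
public class PartitionedUpsertSketch {

    public static final class Row {
        final int account;
        final String lastName;

        public Row(int account, String lastName) {
            this.account = account;
            this.lastName = lastName;
        }
    }

    public static Map<Integer, String> upsertInParallel(List<Row> rows, int writers) {
        // Stand-in for the target table, safe for concurrent access.
        Map<Integer, String> table = new ConcurrentHashMap<>();

        // Partition by key so all rows sharing a key go to the same writer,
        // which then applies them in their original order.
        List<List<Row>> partitions = new ArrayList<>();
        for (int i = 0; i < writers; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Row row : rows) {
            partitions.get(Math.floorMod(Integer.hashCode(row.account), writers)).add(row);
        }

        // One thread per writer; each upserts its own partition sequentially.
        ExecutorService pool = Executors.newFixedThreadPool(writers);
        for (List<Row> partition : partitions) {
            pool.submit(() -> {
                for (Row row : partition) {
                    table.put(row.account, row.lastName); // insert or overwrite
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("writers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for writers", e);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row(101, "Reeves"), new Row(312, "Butler"),
                new Row(101, "Hewitt"), new Row(312, "Pinkett-Smith"),
                new Row(317, "Murray"), new Row(317, "Murray"));
        System.out.println(upsertInParallel(rows, 3));
    }
}
```

Routing by key keeps every record for a given Account on the same thread, so a later value for that key is always applied after an earlier one. Spreading records round-robin instead could let two threads race on the same row.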
This example configures the MySqlUpsert strategy, but you can also use the other upsert strategies DataPipeline supports, such as GenericUpsert, MergeUpsert and PostgreSqlUpsert.
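In MySQL the usual single-statement upsert is INSERT ... ON DUPLICATE KEY UPDATE. The helper below builds such a statement so you can see its shape; it is a hypothetical sketch, and whether MySqlUpsert emits exactly this SQL is an assumption. MySQL detects the duplicate through a PRIMARY KEY or UNIQUE index, so the target table needs one on the key column (the example table declares PRIMARY KEY (Account)). MySQL 8.0.20 and later deprecate VALUES() in this clause in favour of a row alias, although it still works.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper showing the shape of a MySQL upsert statement.
// ASSUMPTION: standard MySQL syntax, not necessarily the exact SQL
// that DataPipeline's MySqlUpsert generates.
public class MySqlUpsertSqlSketch {

    public static String build(String table, List<String> columns, List<String> keyColumns) {
        String columnList = String.join(", ", columns);
        String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", "));

        // Key columns identify the row, so only the remaining columns are updated.
        List<String> updated = columns.stream()
                .filter(c -> !keyColumns.contains(c))
                .collect(Collectors.toList());
        if (updated.isEmpty()) {
            throw new IllegalArgumentException("at least one non-key column is required");
        }
        String assignments = updated.stream()
                .map(c -> c + " = VALUES(" + c + ")") // VALUES(c): value from the attempted insert
                .collect(Collectors.joining(", "));

        return "INSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + assignments;
    }

    public static void main(String[] args) {
        System.out.println(build("CreditBalance",
                List.of("Account", "LastName", "FirstName", "Balance",
                        "CreditLimit", "AccountCreated", "Rating"),
                List.of("Account")));
    }
}
```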
Input CSV
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1998-1-17,A
312,Butler,Gerard,90.00,1000.00,2003-8-6,B
101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B
312,Pinkett-Smith,Jada,49654.87,100000.00,2006-12-5,A
317,Murray,Bill,789.65,5000.00,2007-2-5,C
317,Murray,Bill,1,5000.00,2007-2-5,A
Notice that records in the dataset above repeat values in the Account field: 101, 312 and 317 each appear twice.
Java Code Listing
package com.northconcepts.datapipeline;

import com.mysql.cj.jdbc.MysqlDataSource;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.internal.jdbc.JdbcFacade;
import com.northconcepts.datapipeline.jdbc.JdbcConnectionFactory;
import com.northconcepts.datapipeline.jdbc.JdbcMultiUpsertWriter;
import com.northconcepts.datapipeline.jdbc.upsert.MySqlUpsert;
import com.northconcepts.datapipeline.job.Job;

import javax.sql.DataSource;
import java.io.File;

public class UpsertWithMultipleJdbcConnections {

    private static final String SERVER_NAME = "localhost";
    private static final String DATABASE_NAME = "datapipeline"; // Create this database if it does not exist.
    private static final String DATABASE_USERNAME = "etl";
    private static final String DATABASE_PASSWORD = "etl";
    private static final String DATABASE_TABLE = "CreditBalance";
    private static final String PRIMARY_KEY = "Account";

    public static void main(String[] args) throws Throwable {
        MysqlDataSource dataSource = new MysqlDataSource();
        dataSource.setServerName(SERVER_NAME);
        dataSource.setDatabaseName(DATABASE_NAME);
        dataSource.setUser(DATABASE_USERNAME);
        dataSource.setPassword(DATABASE_PASSWORD);

        createTable(dataSource);

        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-02.csv"))
                .setFieldNamesInFirstRow(true);

        // Write to the database using 3 connections.
        DataWriter writer = new JdbcMultiUpsertWriter(dataSource, 3, 1, DATABASE_TABLE, new MySqlUpsert(), PRIMARY_KEY);

        Job.run(reader, writer);
    }

    public static void createTable(DataSource dataSource) {
        JdbcFacade jdbcFacade = new JdbcFacade(JdbcConnectionFactory.wrap(dataSource));

        String dropTableQuery = "DROP TABLE IF EXISTS CreditBalance;";
        String createTableQuery = "CREATE TABLE CreditBalance ("
                + "Account INTEGER, "
                + "LastName VARCHAR(256), "
                + "FirstName VARCHAR(256), "
                + "Balance double precision, "
                + "CreditLimit double precision, "
                + "AccountCreated DATE, "
                + "Rating CHAR, "
                + "PRIMARY KEY (Account)"
                + ")";

        jdbcFacade.execute(dropTableQuery);
        jdbcFacade.execute(createTableQuery);
        jdbcFacade.close();
    }
}
Code Walkthrough
- Create MysqlDataSource with the specified connection details.
- Create a table named CreditBalance in the connected MySQL database, dropping it first if it already exists.
- Read records from the input CSV file credit-balance-02.csv. setFieldNamesInFirstRow(true) is invoked to use the first row of the input CSV file as field names (disabled by default). If this method is not invoked, the fields are named A1, A2 and so on by default.
- Create JdbcMultiUpsertWriter to asynchronously upsert records into the CreditBalance table using 3 database connections.
  - Configure the MySqlUpsert strategy, which the internal JdbcUpsertWriter instances will use.
  - Behind the scenes, JdbcMultiUpsertWriter creates 3 JdbcUpsertWriter instances, each writing asynchronously on a separate thread.
  - If no upsert strategy is specified, the GenericUpsert strategy is used by default.
- Use the Account field as the key to determine whether a record already exists in the table. Multiple fields can be used as a key.
- Transfer records from CSVReader to JdbcMultiUpsertWriter via the Job.run() method to populate your database table with the records obtained from the CSV file. See how to compile and run data pipeline jobs.
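Two of the steps above, taking field names from the header row and matching records on one or more key fields, can be sketched in plain Java. This is a hypothetical in-memory illustration, not how DataPipeline implements them: the first CSV line supplies the field names, each key field name is resolved to a column, and a record whose key values match an earlier one replaces it. The CSV parsing is deliberately naive (no quoted fields), and CompositeKeyUpsertSketch and upsertCsv are made-up names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory illustration of header-row field names and
// (possibly composite) upsert keys; not DataPipeline code.
public class CompositeKeyUpsertSketch {

    // Returns the resulting "table": key values -> last row seen with those key values.
    public static Map<List<String>, String[]> upsertCsv(String csv, String... keyFields) {
        String[] lines = csv.strip().split("\n");

        // The first line supplies the field names.
        List<String> header = Arrays.asList(lines[0].split(","));

        // Resolve each key field name to its column position.
        int[] keyColumns = new int[keyFields.length];
        for (int k = 0; k < keyFields.length; k++) {
            keyColumns[k] = header.indexOf(keyFields[k]);
            if (keyColumns[k] < 0) {
                throw new IllegalArgumentException("unknown key field: " + keyFields[k]);
            }
        }

        Map<List<String>, String[]> table = new LinkedHashMap<>();
        for (int i = 1; i < lines.length; i++) {
            String[] values = lines[i].split(","); // naive: assumes no quoted commas
            List<String> key = new ArrayList<>();
            for (int column : keyColumns) {
                key.add(values[column]);
            }
            table.put(key, values); // existing key: update; new key: insert
        }
        return table;
    }

    public static void main(String[] args) {
        String csv = "Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating\n"
                + "101,Reeves,Keanu,9315.45,10000.00,1998-1-17,A\n"
                + "312,Butler,Gerard,90.00,1000.00,2003-8-6,B\n"
                + "101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B\n"
                + "312,Pinkett-Smith,Jada,49654.87,100000.00,2006-12-5,A\n"
                + "317,Murray,Bill,789.65,5000.00,2007-2-5,C\n"
                + "317,Murray,Bill,1,5000.00,2007-2-5,A\n";
        System.out.println(upsertCsv(csv, "Account").size());             // 3
        System.out.println(upsertCsv(csv, "Account", "LastName").size()); // 5
    }
}
```

With Account as the key, the six input rows collapse to three. With Account and LastName as a composite key they collapse to five, because only the two Murray rows share both values.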
