Upsert Records With Multiple JDBC Connections

This example shows you how to asynchronously upsert records to a database table using multiple JDBC connections.

Upsert (a portmanteau of update and insert) is a database operation that updates an existing row when a row with the specified key value already exists in the table, and inserts a new row otherwise.
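To make the update-or-insert rule concrete, here is a minimal in-memory sketch in plain Java. It is not DataPipeline code. The Map stands in for a database table keyed by its primary key, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal in-memory model of upsert semantics: the Map stands in for a table
// keyed by its primary key. Hypothetical names; not DataPipeline code.
public class UpsertSemantics {

    // Updates the row if the key exists, inserts it otherwise;
    // returns which operation was performed.
    static <K, V> String upsert(Map<K, V> table, K key, V row) {
        String operation = table.containsKey(key) ? "UPDATE" : "INSERT";
        table.put(key, row); // put() overwrites an existing entry or adds a new one
        return operation;
    }

    public static void main(String[] args) {
        Map<Integer, String> table = new LinkedHashMap<>();
        System.out.println(upsert(table, 101, "Reeves, Keanu"));         // INSERT
        System.out.println(upsert(table, 101, "Hewitt, Jennifer Love")); // UPDATE
        System.out.println(table); // {101=Hewitt, Jennifer Love}
    }
}
```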

DataPipeline's JdbcMultiUpsertWriter lets you speed up an upsert by spreading the work across multiple JdbcUpsertWriter instances, each writing asynchronously on its own thread.
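The general idea of several writers working in parallel can be sketched in plain Java with an ExecutorService. This sketch is an illustrative assumption and does not describe how JdbcMultiUpsertWriter distributes records internally. Rows are partitioned by a hash of the key, so records with the same key always go to the same worker and are applied in input order. A ConcurrentHashMap stands in for the database table, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: several workers upsert into a shared "table"
// (a ConcurrentHashMap), with rows partitioned by key. Not DataPipeline code.
public class ParallelUpsertSketch {

    // A minimal row: primary key (Account) plus one payload column.
    static final class Row {
        final int account;
        final String lastName;

        Row(int account, String lastName) {
            this.account = account;
            this.lastName = lastName;
        }
    }

    // Assumption for this sketch: route each row by key hash so equal keys
    // always reach the same worker and keep their input order.
    static Map<Integer, String> upsertInParallel(List<Row> rows, int writers)
            throws Exception {
        List<List<Row>> partitions = new ArrayList<>();
        for (int i = 0; i < writers; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Row row : rows) {
            int slot = Math.floorMod(Integer.hashCode(row.account), writers);
            partitions.get(slot).add(row);
        }

        Map<Integer, String> table = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(writers);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (List<Row> partition : partitions) {
                // Each worker inserts new keys and overwrites existing ones.
                pending.add(pool.submit(() -> {
                    for (Row row : partition) {
                        table.put(row.account, row.lastName);
                    }
                }));
            }
            for (Future<?> f : pending) {
                f.get(); // wait for the worker; rethrows any failure it hit
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        }
        return table;
    }

    public static void main(String[] args) throws Exception {
        List<Row> rows = List.of(
                new Row(101, "Reeves"), new Row(312, "Butler"),
                new Row(101, "Hewitt"), new Row(312, "Pinkett-Smith"),
                new Row(317, "Murray"), new Row(317, "Murray"));
        Map<Integer, String> table = upsertInParallel(rows, 3);
        // prints "Hewitt, Pinkett-Smith, Murray"
        System.out.println(table.get(101) + ", " + table.get(312) + ", " + table.get(317));
    }
}
```

Routing equal keys to the same worker is one way to stop two threads from racing to update the same row. Whether DataPipeline does something similar is not stated on this page.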

This example configures the MySqlUpsert strategy, but you can also use the other upsert strategies that DataPipeline supports, such as GenericUpsert, MergeUpsert, and PostgreSqlUpsert, among others.
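Upsert strategies mainly differ in the SQL dialect they target. The sketch below builds the native upsert statement for MySQL (INSERT ... ON DUPLICATE KEY UPDATE) and for PostgreSQL (INSERT ... ON CONFLICT ... DO UPDATE). The helper names are hypothetical, and the SQL that DataPipeline's MySqlUpsert and PostgreSqlUpsert classes actually generate may differ. The MySQL form uses VALUES(col). MySQL 8.0.20 deprecated that syntax in favor of a row alias but still accepts it.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical SQL builders showing each dialect's native upsert syntax.
// DataPipeline's MySqlUpsert / PostgreSqlUpsert may generate different SQL.
public class UpsertSqlDialects {

    // Shared "INSERT INTO t (a, b) VALUES (?, ?)" prefix with JDBC placeholders.
    static String insertPrefix(String table, List<String> columns) {
        String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + String.join(", ", columns) + ")"
                + " VALUES (" + placeholders + ")";
    }

    // MySQL: fires on a collision with the primary key or ANY unique index;
    // the key column is only excluded from the update list here.
    static String mySql(String table, List<String> columns, String key) {
        String updates = columns.stream()
                .filter(c -> !c.equals(key))
                .map(c -> c + " = VALUES(" + c + ")")
                .collect(Collectors.joining(", "));
        return insertPrefix(table, columns) + " ON DUPLICATE KEY UPDATE " + updates;
    }

    // PostgreSQL: the conflict target is named explicitly; EXCLUDED holds
    // the row that was proposed for insertion.
    static String postgreSql(String table, List<String> columns, String key) {
        String updates = columns.stream()
                .filter(c -> !c.equals(key))
                .map(c -> c + " = EXCLUDED." + c)
                .collect(Collectors.joining(", "));
        return insertPrefix(table, columns)
                + " ON CONFLICT (" + key + ") DO UPDATE SET " + updates;
    }

    public static void main(String[] args) {
        List<String> cols = List.of("Account", "Balance", "Rating");
        System.out.println(mySql("CreditBalance", cols, "Account"));
        System.out.println(postgreSql("CreditBalance", cols, "Account"));
    }
}
```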

Input CSV

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1998-1-17,A
312,Butler,Gerard,90.00,1000.00,2003-8-6,B
101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B
312,Pinkett-Smith,Jada,49654.87,100000.00,2006-12-5,A
317,Murray,Bill,789.65,5000.00,2007-2-5,C
317,Murray,Bill,1,5000.00,2007-2-5,A

Notice that several records in the dataset above share the same Account value: 101, 312, and 317 each appear twice.
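Account is the table's primary key, so the six input records can produce at most three rows. A quick way to see which keys repeat is to count them. This plain-Java sketch uses a hypothetical class name. It assumes a simple CSV with no quoted commas, which holds for the file above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Counts how often each Account value (first CSV column) appears.
// Naive split: assumes no quoted fields containing commas, true for this file.
public class DuplicateKeyCheck {

    static Map<String, Integer> countKeys(List<String> csvLines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : csvLines.subList(1, csvLines.size())) { // skip header row
            String account = line.split(",", 2)[0];
            counts.merge(account, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating",
                "101,Reeves,Keanu,9315.45,10000.00,1998-1-17,A",
                "312,Butler,Gerard,90.00,1000.00,2003-8-6,B",
                "101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B",
                "312,Pinkett-Smith,Jada,49654.87,100000.00,2006-12-5,A",
                "317,Murray,Bill,789.65,5000.00,2007-2-5,C",
                "317,Murray,Bill,1,5000.00,2007-2-5,A");
        System.out.println(countKeys(lines)); // prints {101=2, 312=2, 317=2}
    }
}
```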

Java Code Listing

package com.northconcepts.datapipeline;

import com.mysql.cj.jdbc.MysqlDataSource;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.internal.jdbc.JdbcFacade;
import com.northconcepts.datapipeline.jdbc.JdbcConnectionFactory;
import com.northconcepts.datapipeline.jdbc.JdbcMultiUpsertWriter;
import com.northconcepts.datapipeline.jdbc.upsert.MySqlUpsert;
import com.northconcepts.datapipeline.job.Job;

import javax.sql.DataSource;
import java.io.File;

public class UpsertWithMultipleJdbcConnections {

    private static final String SERVER_NAME = "localhost";
    private static final String DATABASE_NAME = "datapipeline"; // Create this database if it does not exist.
    private static final String DATABASE_USERNAME = "etl";
    private static final String DATABASE_PASSWORD = "etl";
    private static final String DATABASE_TABLE = "CreditBalance";
    private static final String PRIMARY_KEY = "Account";

    public static void main(String[] args) throws Throwable {
        MysqlDataSource dataSource = new MysqlDataSource();

        dataSource.setServerName(SERVER_NAME);
        dataSource.setDatabaseName(DATABASE_NAME);
        dataSource.setUser(DATABASE_USERNAME);
        dataSource.setPassword(DATABASE_PASSWORD);

        createTable(dataSource);

        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-02.csv")).setFieldNamesInFirstRow(true);
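        // Write to the database using 3 connections, each on its own thread.
        // The next argument (1) is assumed here to be the batch size; check the JdbcMultiUpsertWriter Javadoc.
        DataWriter writer = new JdbcMultiUpsertWriter(dataSource, 3, 1, DATABASE_TABLE, new MySqlUpsert(), PRIMARY_KEY);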

        Job.run(reader, writer);
    }

    public static void createTable(DataSource dataSource) {
        JdbcFacade jdbcFacade = new JdbcFacade(JdbcConnectionFactory.wrap(dataSource));
        try {
            // Use the same table and key constants as the writer so they stay in sync.
            String dropTableQuery = "DROP TABLE IF EXISTS " + DATABASE_TABLE;
            String createTableQuery = "CREATE TABLE " + DATABASE_TABLE + " ("
                    + "Account INTEGER, "
                    + "LastName VARCHAR(256), "
                    + "FirstName VARCHAR(256), "
                    + "Balance double precision, "
                    + "CreditLimit double precision, "
                    + "AccountCreated DATE, "
                    + "Rating CHAR, "
                    + "PRIMARY KEY (" + PRIMARY_KEY + ")"
                    + ")";

            jdbcFacade.execute(dropTableQuery);
            jdbcFacade.execute(createTableQuery);
        } finally {
            // Release the connection even if one of the statements fails.
            jdbcFacade.close();
        }
    }
}

Code Walkthrough

  1. Create a MysqlDataSource with the specified connection details.
  2. Create a table named CreditBalance in the connected MySQL database, dropping it first if it already exists.
  3. Read records from the input CSV file credit-balance-02.csv.
  4. Invoke setFieldNamesInFirstRow(true) to use the first row of the input CSV file as field names (this is disabled by default). Without this call, the fields would be named A1, A2, and so on.
  5. Create a JdbcMultiUpsertWriter to asynchronously upsert records into the CreditBalance table using 3 database connections.
  6. Configure the MySqlUpsert strategy, which the internal JdbcUpsertWriter instances use.
  7. Behind the scenes, JdbcMultiUpsertWriter creates 3 JdbcUpsertWriter instances, each writing asynchronously on a separate thread.
  8. If no upsert strategy is specified, the GenericUpsert strategy is used by default.
  9. Use the Account field as the key that determines whether a record already exists in the table. Multiple fields can also be used as a key.
  10. Transfer records from the CSVReader to the JdbcMultiUpsertWriter via the Job.run() method to populate the database table with the records read from the CSV file. See how to compile and run data pipeline jobs.
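Item 9 mentions that a key can span multiple fields. The in-memory sketch below shows how the choice of key columns changes which records collide. It uses hypothetical names and is not the DataPipeline API. Keyed on Account alone, the six rows collapse to three. Keyed on Account and LastName, only the two Bill Murray rows collide, which leaves five. For a database-side upsert, the table would typically also need a primary key or unique index covering the same columns.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of upserting on a single or composite key.
// Hypothetical names; not the DataPipeline API.
public class CompositeKeySketch {

    // Builds the key from the given column positions; rows with equal keys overwrite.
    static Map<List<String>, String[]> upsertAll(List<String[]> rows, int... keyColumns) {
        Map<List<String>, String[]> table = new LinkedHashMap<>();
        for (String[] row : rows) {
            String[] keyParts = new String[keyColumns.length];
            for (int i = 0; i < keyColumns.length; i++) {
                keyParts[i] = row[keyColumns[i]];
            }
            // Arrays.asList gives element-wise equals/hashCode, so it works as a map key.
            table.put(Arrays.asList(keyParts), row);
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
                new String[] {"101", "Reeves", "Keanu"},
                new String[] {"312", "Butler", "Gerard"},
                new String[] {"101", "Hewitt", "Jennifer Love"},
                new String[] {"312", "Pinkett-Smith", "Jada"},
                new String[] {"317", "Murray", "Bill"},
                new String[] {"317", "Murray", "Bill"});
        System.out.println(upsertAll(rows, 0).size());    // 3: key = Account
        System.out.println(upsertAll(rows, 0, 1).size()); // 5: key = Account + LastName
    }
}
```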

 
