Upsert Variable Field Records

Updated: Jan 22, 2023

This example shows you how to upsert records to a database using Data Pipeline's JdbcUpsertWriter and VariableFieldsUpsert strategy.

Upsert (a portmanteau of update and insert) is a database operation that updates an existing row if a specified value already exists in a table and inserts a new row if it does not. Data Pipeline provides several other upsert strategies, including GenericUpsert, MergeUpsert, MySqlUpsert, and PostgreSqlUpsert.

Java Code listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;
import java.io.OutputStreamWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.jdbc.JdbcUpsertWriter;
import com.northconcepts.datapipeline.jdbc.upsert.GenericUpsert;
import com.northconcepts.datapipeline.jdbc.upsert.IUpsert;
import com.northconcepts.datapipeline.jdbc.upsert.VariableFieldsUpsert;
import com.northconcepts.datapipeline.job.Job;

public class UpsertVariableFieldRecords {

	public static void main(String[] args) throws Throwable {

		final String DATABASE_DRIVER = "org.hsqldb.jdbcDriver";
		final String DATABASE_URL = "jdbc:hsqldb:mem:aname";
		final String DATABASE_USERNAME = "sa";
		final String DATABASE_PASSWORD = "";
		final String DATABASE_TABLE = "CreditBalance";
		final String PRIMARY_KEY = "Account";

		Class.forName(DATABASE_DRIVER);
		Connection connection = DriverManager.getConnection(DATABASE_URL, DATABASE_USERNAME, DATABASE_PASSWORD);

		createTable(connection);

		VariableFieldsUpsert upsert = new VariableFieldsUpsert(new VariableFieldsUpsert.IUpsertFactory() {
			@Override
			public IUpsert createUpsert(JdbcUpsertWriter writer, Record record) {
				return new GenericUpsert().setInsertFirst(false);
			}
		}).setDebug(true);

		DataReader reader = new CSVReader(new File("example/data/input/credit-balance-02.csv")).setFieldNamesInFirstRow(true);
		DataWriter writer = new JdbcUpsertWriter(connection, DATABASE_TABLE, upsert, PRIMARY_KEY);

		Job.run(reader, writer);

		reader = new JdbcReader(connection, "Select * from CreditBalance;");
		writer = new CSVWriter(new OutputStreamWriter(System.out));

		Job.run(reader, writer);
	}

	public static void createTable(Connection connection) throws Throwable {
		String dropTableQuery = "DROP TABLE CreditBalance IF EXISTS;";
		String createTableQuery = "CREATE TABLE CreditBalance ("
			+ "Account INTEGER, "
			+ "LastName VARCHAR(256), "
			+ "FirstName VARCHAR(256), "
			+ "Balance DOUBLE, "
			+ "CreditLimit DOUBLE, "
			+ "AccountCreated DATE, "
			+ "Rating CHAR, "
			+ "PRIMARY KEY (Account));";

		// Drop any existing table, closing the statement once it has run.
		try (PreparedStatement dropStatement = connection.prepareStatement(dropTableQuery)) {
			dropStatement.execute();
		}

		// Create the table that the upsert will write to.
		try (PreparedStatement createStatement = connection.prepareStatement(createTableQuery)) {
			createStatement.execute();
		}
	}
}

Code walkthrough

  1. First, database connection parameters are specified.
  2. Class.forName() is used to load and register the database driver. In this example, org.hsqldb.jdbcDriver is registered.
  3. The createTable() method creates the CreditBalance table in the database (dropping it first if it already exists).
  4. VariableFieldsUpsert is created to wrap another strategy so that records with a variable set of fields are supported. The GenericUpsert strategy is wrapped in this example.
  5. VariableFieldsUpsert.IUpsertFactory is passed to the VariableFieldsUpsert constructor to specify which upsert strategy will be used. The anonymous class implements createUpsert() from VariableFieldsUpsert.IUpsertFactory and returns a GenericUpsert object.
  6. setInsertFirst(false) makes GenericUpsert attempt an update before an insert.
  7. setDebug(true) enables VariableFieldsUpsert to log the generated SQL (disabled by default).
  8. setFieldNamesInFirstRow(true) is invoked to specify that the values in the first row should be used as field names (disabled by default). If this method is not invoked, the fields are named A1, A2, and so on by default.
  9. JdbcUpsertWriter is created to upsert records to the database using the VariableFieldsUpsert strategy, which delegates to GenericUpsert. The key field name PRIMARY_KEY indicates that the Account column is used as the key to determine whether a record already exists in the table.
  10. Data are transferred from CSVReader to JdbcUpsertWriter via the Job.run() method to populate or update your database table with records obtained from the CSV file.
  11. JdbcReader is created to obtain records from the specified database.
  12. CSVWriter is created to write records to a CSV stream.
  13. Data are transferred from JdbcReader to CSVWriter via Job.run() method. See how to compile and run data pipeline jobs.

JdbcUpsertWriter

Writes records to a database table using the upsert idiom (attempt to insert, but update if a duplicate key exists). The default strategy is GenericUpsert; several others are provided, including MergeUpsert, VariableFieldsUpsert, and MySqlUpsert.

VariableFieldsUpsert

An upsert strategy that wraps another strategy to support a variable set of fields. This allows you to upsert some records with just ID + FIRST_NAME, others with ID + LAST_NAME + ADDRESS, or any other set of fields. It implements the IUpsert interface. In this example, its constructor takes a VariableFieldsUpsert.IUpsertFactory whose createUpsert() method returns the upsert strategy to wrap.
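One way to picture why a wrapper is useful: a prepared statement is tied to a fixed column list, so each distinct set of fields needs its own statement. The sketch below is a hypothetical helper, not Data Pipeline code and not a description of how the library is implemented. The class VariableFieldsSqlSketch, its methods, and the Person table are made up for illustration. It builds and caches one UPDATE statement per set of field names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Illustrative only: a hypothetical helper, not Data Pipeline's implementation.
class VariableFieldsSqlSketch {

    // One cached UPDATE statement per distinct, ordered list of field names.
    private final Map<List<String>, String> updateSqlByFields = new LinkedHashMap<>();
    private final String table;
    private final String keyField;

    VariableFieldsSqlSketch(String table, String keyField) {
        this.table = table;
        this.keyField = keyField;
    }

    // Returns the cached statement for this field list, building it on first use.
    String updateSqlFor(List<String> fieldNames) {
        return updateSqlByFields.computeIfAbsent(List.copyOf(fieldNames), this::buildUpdateSql);
    }

    // Assumption: the key is left out of the SET clause here; a real strategy may include it.
    private String buildUpdateSql(List<String> fieldNames) {
        StringJoiner assignments = new StringJoiner(", ");
        for (String field : fieldNames) {
            if (!field.equals(keyField)) {
                assignments.add(field + "=?");
            }
        }
        return "UPDATE " + table + " SET " + assignments + " WHERE " + keyField + "=?";
    }

    int cachedStatementCount() {
        return updateSqlByFields.size();
    }

    public static void main(String[] args) {
        VariableFieldsSqlSketch sketch = new VariableFieldsSqlSketch("Person", "ID");
        // prints: UPDATE Person SET FIRST_NAME=? WHERE ID=?
        System.out.println(sketch.updateSqlFor(List.of("ID", "FIRST_NAME")));
        // prints: UPDATE Person SET LAST_NAME=?, ADDRESS=? WHERE ID=?
        System.out.println(sketch.updateSqlFor(List.of("ID", "LAST_NAME", "ADDRESS")));
        // Same field set as the first call, so the cached statement is reused.
        sketch.updateSqlFor(List.of("ID", "FIRST_NAME"));
        System.out.println(sketch.cachedStatementCount()); // prints: 2
    }
}
```

Keying the cache on the field list means records that share a shape reuse one statement, while a record with a new shape only costs one extra statement.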

GenericUpsert

JdbcUpsertWriter's default upsert strategy, which attempts to either:

  • insert and then update if SQLIntegrityConstraintViolationException is thrown, or
  • update and then insert if no records were affected.

You can use setDebug(true) to make GenericUpsert log the generated SQL (disabled by default). For this example, the generated SQL will look something like this:

  • Insert SQL: INSERT INTO CreditBalance (Account, LastName, FirstName, ...) VALUES (?, ?, ?, ...)
  • Update SQL: UPDATE CreditBalance SET Account=?, LastName=?, FirstName=?, ... WHERE Account=?
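The two attempt orders described above can be sketched without a database. The class below is a simulation under stated assumptions: it uses an in-memory map in place of a table, a boolean return in place of a constraint-violation exception, and hypothetical names (UpsertOrderSketch, tryInsert, tryUpdate) that do not come from Data Pipeline. It records which statements each order would attempt.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: simulates the two attempt orders against an in-memory map.
class UpsertOrderSketch {

    private final Map<Integer, String> rows = new HashMap<>();
    final List<String> attempts = new ArrayList<>();

    // Mimics an INSERT that fails when the key already exists.
    private boolean tryInsert(int key, String value) {
        attempts.add("INSERT");
        if (rows.containsKey(key)) {
            return false; // a real database would raise a constraint violation here
        }
        rows.put(key, value);
        return true;
    }

    // Mimics an UPDATE and returns the number of rows affected.
    private int tryUpdate(int key, String value) {
        attempts.add("UPDATE");
        if (!rows.containsKey(key)) {
            return 0;
        }
        rows.put(key, value);
        return 1;
    }

    void upsert(int key, String value, boolean insertFirst) {
        if (insertFirst) {
            if (!tryInsert(key, value)) {
                tryUpdate(key, value);
            }
        } else {
            if (tryUpdate(key, value) == 0) {
                tryInsert(key, value);
            }
        }
    }

    String valueOf(int key) {
        return rows.get(key);
    }

    public static void main(String[] args) {
        UpsertOrderSketch table = new UpsertOrderSketch();
        table.upsert(101, "Reeves", false); // new key: UPDATE affects nothing, then INSERT
        table.upsert(101, "Upton", false);  // existing key: UPDATE succeeds
        System.out.println(table.attempts); // prints: [UPDATE, INSERT, UPDATE]
    }
}
```

In this simulation, update-first needs a single statement when the row already exists, while insert-first needs a single statement when the row is new, so the better order depends on which case dominates the input.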

JdbcReader

Obtains records from a database query. It extends the DataReader class, and its constructor takes a Connection object and a query string as parameters. The setBatchSize() method in this class can be used to set the number of records to chunk together in batch updates.

CSVReader

Obtains records from a Comma Separated Value (CSV) or delimited stream. It extends the TextReader class and can be created using a File or Reader object. Passing true to the setFieldNamesInFirstRow() method enables the CSVReader to use the names specified in the first row of the input data as field names.
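The effect of the first-row setting can be illustrated with a few lines of plain Java. This is a simplified sketch, not CSVReader itself: FieldNamesSketch and its methods are hypothetical, the sample rows are made up, and the parsing is a naive comma split that ignores quoting. The A1, A2 fallback names follow the default naming described in this example.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a naive stand-in for header handling, not CSVReader.
class FieldNamesSketch {

    // Uses the first row as field names, or falls back to A1, A2, ... when disabled.
    static List<String> fieldNames(List<String> lines, boolean fieldNamesInFirstRow) {
        String[] first = lines.get(0).split(",", -1);
        List<String> names = new ArrayList<>();
        for (int i = 0; i < first.length; i++) {
            names.add(fieldNamesInFirstRow ? first[i].trim() : "A" + (i + 1));
        }
        return names;
    }

    // When the first row holds names, it is not treated as data.
    static List<String> dataRows(List<String> lines, boolean fieldNamesInFirstRow) {
        return lines.subList(fieldNamesInFirstRow ? 1 : 0, lines.size());
    }

    public static void main(String[] args) {
        // Made-up sample input for illustration.
        List<String> lines = List.of("Account,LastName,Balance", "101,Reeves,250.0");
        System.out.println(fieldNames(lines, true));  // prints: [Account, LastName, Balance]
        System.out.println(fieldNames(lines, false)); // prints: [A1, A2, A3]
        System.out.println(dataRows(lines, true));    // prints: [101,Reeves,250.0]
    }
}
```

With the setting disabled, the header line would be read as an ordinary record, which is why the example enables it before upserting by the Account key.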

CSVWriter

Writes records to a Comma Separated Value (CSV) stream. It extends LinedTextWriter, and its constructor takes a File or a Writer object as a parameter.

Console Output

Obtained records will be printed on the console.
