Upsert Records to MySql or MariaDB

Updated: Apr 5, 2023

This example will show you how you can use DataPipline to upsert records to MySql or MariaDB.

To accomplish this both JdbcUpsertWriter and MySqlUpsert will be used.

This example can be easily modified to Upsert Records to Oracle or Upsert Records to PostgreSql.

Input CSV file

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1998-1-17,A
312,Butler,Gerard,90.00,1000.00,2003-8-6,B
101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B
312,Pinkett-Smith,Jada,49654.87,100000.00,2006-12-5,A
317,Murray,Bill,789.65,5000.00,2007-2-5,C
317,Murray,Bill,1,5000.00,2007-2-5,A

Java code listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;
import java.io.OutputStreamWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.jdbc.JdbcUpsertWriter;
import com.northconcepts.datapipeline.jdbc.upsert.MySqlUpsert;
import com.northconcepts.datapipeline.job.Job;

public class UpsertRecordsToMySqlOrMariaDB {

	public static void main(String[] args) throws Throwable {
		final String DATABASE_DRIVER = "com.mysql.jdbc.Driver";
		final String DATABASE_NAME = "test"; // create this database if not exists.
		final String DATABASE_URL = "jdbc:mysql://localhost:3306/" + DATABASE_NAME;
		final String DATABASE_USERNAME = "root";
		final String DATABASE_PASSWORD = "root";
		final String DATABASE_TABLE = "CreditBalance";
		final String PRIMARY_KEY = "Account";

		Class.forName(DATABASE_DRIVER);
		Connection connection = DriverManager.getConnection(DATABASE_URL, DATABASE_USERNAME, DATABASE_PASSWORD);

		createTable(connection);

		DataReader reader = new CSVReader(new File("example/data/input/credit-balance-02.csv")).setFieldNamesInFirstRow(true);
		DataWriter writer = new JdbcUpsertWriter(connection, DATABASE_TABLE, new MySqlUpsert(), PRIMARY_KEY);

		Job.run(reader, writer);

		reader = new JdbcReader(connection, "Select * from CreditBalance;");
		writer = new CSVWriter(new OutputStreamWriter(System.out));

		Job.run(reader, writer);

	}

	public static void createTable(Connection connection) throws Throwable {
		PreparedStatement preparedStatement;
		String dropTableQuery = "DROP TABLE IF EXISTS CreditBalance";
		String createTableQuery = "CREATE TABLE CreditBalance ("
			+ "Account INTEGER, "
			+ "LastName VARCHAR(256), "
			+ "FirstName VARCHAR(256), "
			+ "Balance double precision, "
			+ "CreditLimit double precision, "
			+ "AccountCreated DATE, "
			+ "Rating CHAR, "
			+ "PRIMARY KEY (Account)"
			+ ")";

		preparedStatement = connection.prepareStatement(dropTableQuery);
		preparedStatement.execute();

		preparedStatement = connection.prepareStatement(createTableQuery);
		preparedStatement.execute();
	}
	
}

Code Walkthrough

  1. First you need to feed your database details.
  2. DriverManager is used to create a connection to the database.
  3. createTable(connection) creates a table CreditBalance in the database test. Before creating the table it first drops it if it already exists.
  4. A CSVReader is created using the file path of the input file credit-balance-02.csv.
  5. The CSVReader.setFieldNamesInFirstRow(true) method is invoked to specify that the names specified in the first row should be used as field names.
  6. JdbcUpsertWriter will write the records to the database table test. For this example JdbcUpsertWriter takes four arguments i.e. Connection(connection), Table Name(CreditBalance), strategy(MySqlUpsert()) and keyFieldNames(Account).
  7. Data is transferred from the reader to writer via Job.run() method.
  8. JdbcReader is used to obtain the records from the database using the query Select * from CreditBalance.
  9. CSVWriter is used to write the CSV record to the output console
  10. Job.run(reader, writer) is used to transfer the data from the reader to the writer.

JdbcUpsertWriter

Writes records to a database table using the upsert idiom (attempt to insert, but update if duplicate key exists). The default strategy used is GenericUpsert, however several other are provided: MergeUpsert, VariableFieldsUpsert, MySqlUpsert.

CSVReader

CSVReader is an input reader which can be used to read CSV files. It is a sub-class of TextReader and inherits the open and close among other methods. The CSVReader.setFieldNamesInFirstRow(true) method causes the CSVReader to use the names specified in the first row of the input data as field names. If this method is not invoked, the fields would be named as A1, A2, etc. similar to MS Excel. If those fields names need to be changed, a rename transformation can be added on top of CSVReader or any other type (Refer Rename a field for example).

Output

The output will be written to both the console and the test database.

Console Output

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1998-1-17,A
312,Butler,Gerard,90.00,1000.00,2003-8-6,B
101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B
312,Pinkett-Smith,Jada,49654.87,100000.00,2006-12-5,A
317,Murray,Bill,789.65,5000.00,2007-2-5,C
317,Murray,Bill,1,5000.00,2007-2-5,A
Mobile Analytics