Write to a Database Using Custom Jdbc Insert Strategy

This example shows you how to use DataPipeline to insert records into a database using a custom JDBC insert strategy. You will also see how to apply transformations to records as they pass through the pipeline.

The demo code will read records from a CSV file using CSVReader, transform them using TransformingReader, and write them into an HSQLDB database using custom SQL.

Transforming records is useful when you want to insert records into a database as you read them from a file. You can transform the field types to match the column types in your database table.
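To make the idea of matching field types to column types concrete, here is a minimal plain-Java sketch. It does not use the DataPipeline API: the class FieldTypeSketch, its toTypedRow() method, and the sample values are hypothetical and exist only for illustration. The column types in the comments follow the CREATE TABLE statement in the listing below.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not part of DataPipeline.
final class FieldTypeSketch {

    // Converts one CSV row (every value a String) into values typed
    // to match the CreditBalance table columns.
    static Map<String, Object> toTypedRow(Map<String, String> raw) {
        Map<String, Object> typed = new LinkedHashMap<>();
        typed.put("Account", Integer.valueOf(raw.get("Account").trim()));        // INTEGER
        typed.put("LastName", raw.get("LastName"));                              // VARCHAR(256)
        typed.put("FirstName", raw.get("FirstName"));                            // VARCHAR(256)
        typed.put("Balance", Double.valueOf(raw.get("Balance").trim()));         // DOUBLE
        typed.put("CreditLimit", Double.valueOf(raw.get("CreditLimit").trim())); // DOUBLE
        // Takes the first character; an empty Rating would throw here.
        typed.put("Rating", raw.get("Rating").trim().charAt(0));                 // CHAR
        return typed;
    }

    public static void main(String[] args) {
        // Made-up sample row, not taken from the example input file.
        Map<String, String> raw = new LinkedHashMap<>();
        raw.put("Account", "101");
        raw.put("LastName", "Doe");
        raw.put("FirstName", "Jane");
        raw.put("Balance", "250.5");
        raw.put("CreditLimit", "1000");
        raw.put("Rating", "A");

        System.out.println(toTypedRow(raw));
    }
}
```

In the actual example, TransformingReader and BasicFieldTransformer perform these conversions for you as records stream through the pipeline.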

Java Code listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;
import java.io.OutputStreamWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.jdbc.JdbcWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.transform.BasicFieldTransformer;
import com.northconcepts.datapipeline.transform.TransformingReader;

public class WriteToDatabaseUsingCustomJdbcInsertStrategy {

    public static void main(String[] args) throws Throwable {

        final String DATABASE_DRIVER = "org.hsqldb.jdbcDriver";
        final String DATABASE_URL = "jdbc:hsqldb:mem:aname";
        final String DATABASE_USERNAME = "sa";
        final String DATABASE_PASSWORD = "";
        final String DATABASE_TABLE = "CreditBalance";

        Class.forName(DATABASE_DRIVER);
        Connection connection = DriverManager.getConnection(DATABASE_URL, DATABASE_USERNAME, DATABASE_PASSWORD);

        createTable(connection);

        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-insert-records.csv"))
                .setFieldNamesInFirstRow(true);
        reader = new TransformingReader(reader)
                .add(new BasicFieldTransformer("Account").stringToInt())
                .add(new BasicFieldTransformer("Balance").stringToDouble())
                .add(new BasicFieldTransformer("CreditLimit").stringToDouble())
                .add(new BasicFieldTransformer("Rating").stringToChar());

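        // CustomJdbcInsertStrategy supplies the custom insert behavior; its source is not part of this listing.
        DataWriter writer = new JdbcWriter(connection, DATABASE_TABLE, new CustomJdbcInsertStrategy())
                .setDebug(true);  // log the generated SQL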

        Job.run(reader, writer);

        reader = new JdbcReader(connection, "Select * from CreditBalance;");
        writer = new CSVWriter(new OutputStreamWriter(System.out));

        Job.run(reader, writer);
    }

    public static void createTable(Connection connection) throws Throwable{
        String dropTableQuery = "DROP TABLE CreditBalance IF EXISTS;";
        String createTableQuery = "CREATE TABLE CreditBalance ("
                + "Account INTEGER, "
                + "LastName VARCHAR(256), "
                + "FirstName VARCHAR(256), "
                + "Balance DOUBLE, "
                + "CreditLimit DOUBLE, "
                + "Rating CHAR, "
                + "PRIMARY KEY (Account));";

        // Drop any existing table first; try-with-resources closes each statement.
        try (PreparedStatement dropStatement = connection.prepareStatement(dropTableQuery)) {
            dropStatement.execute();
        }

        // Recreate the table with column types matching the transformed fields.
        try (PreparedStatement createStatement = connection.prepareStatement(createTableQuery)) {
            createStatement.execute();
        }
    }

}

Code walkthrough

  1. First, database connection parameters are specified.
  2. Class.forName() is used to register a database driver. In this example, org.hsqldb.jdbcDriver is registered.
  3. The createTable() method creates the table in your database (dropping it first if it already exists).
  4. A CSVReader is created for the input file credit-balance-insert-records.csv. Calling CSVReader.setFieldNamesInFirstRow(true) causes the CSVReader to use the names in the first row of the input data as field names (disabled by default). If this method is not invoked, the fields are named A1, A2, and so on.
  5. A TransformingReader is created to transform records as they pass through the pipeline.
  6. Adding a BasicFieldTransformer to the TransformingReader causes the TransformingReader to apply that transformation to fields in each record.
  7. BasicFieldTransformer("Account").stringToInt() transforms the field named in the parameter, i.e. Account, from a string into an integer. In the same way, Balance and CreditLimit are converted with stringToDouble() and Rating with stringToChar().
  8. A JdbcWriter is created to write records to the specified database table using the specified strategy. setDebug(true) makes the JdbcWriter log the generated SQL (default is false).
  9. Data are transferred from CSVReader to JdbcWriter via Job.run() method.
  10. JdbcReader is created to obtain records from the database using the specified query string.
  11. CSVWriter is created to write records which are obtained from the database to the console.
  12. Data are transferred from JdbcReader to CSVWriter via Job.run() method. See how to compile and run data pipeline jobs.
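The read, transform, and write flow in the steps above can be pictured with a small plain-Java sketch. It is not how DataPipeline is implemented: the Source and Sink interfaces, the run() and transforming() methods, and the convention that read() returns null at the end of the data are all assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration only; not part of DataPipeline.
final class PipelineSketch {

    // A source hands out one record per call and null when exhausted (assumed convention).
    interface Source<T> { T read(); }

    // A sink accepts one record per call.
    interface Sink<T> { void write(T record); }

    // Pulls every record from the source and pushes it to the sink; returns the record count.
    static <T> int run(Source<T> source, Sink<T> sink) {
        int count = 0;
        for (T record = source.read(); record != null; record = source.read()) {
            sink.write(record);
            count++;
        }
        return count;
    }

    // Wraps a source so each record is transformed on its way through.
    static <T> Source<T> transforming(Source<T> inner, Function<T, T> transformer) {
        return () -> {
            T record = inner.read();
            return record == null ? null : transformer.apply(record);
        };
    }

    public static void main(String[] args) {
        Iterator<String> input = Arrays.asList("a", "b", "c").iterator();
        Source<String> source = () -> input.hasNext() ? input.next() : null;
        List<String> output = new ArrayList<>();

        int transferred = run(transforming(source, String::toUpperCase), output::add);
        System.out.println(transferred + " records: " + output);
    }
}
```

The wrapping source plays the role TransformingReader plays in the example: the writer side never needs to know that a transformation happened upstream.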

JdbcReader

Obtains records from a database query. It extends the DataReader class, and its constructor takes a Connection object and a query string as parameters.

JdbcWriter

Writes records to a database table. It extends the DataWriter class, and its constructor takes a Connection object, a table name, and optionally an IInsert object. The optional third parameter specifies the strategy used to write records to the database.
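Since the CustomJdbcInsertStrategy class is not shown in this example, the following plain-Java sketch only illustrates the kind of parameterized INSERT statement an insert strategy might produce. It does not implement IInsert, whose methods are not described here; the class InsertSqlSketch and its buildInsertSql() method are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; not DataPipeline's IInsert interface.
final class InsertSqlSketch {

    // Builds "INSERT INTO table (c1, c2, ...) VALUES (?, ?, ...)" with one placeholder per column.
    static String buildInsertSql(String table, List<String> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        String columnList = String.join(", ", columns);
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList(
                "Account", "LastName", "FirstName", "Balance", "CreditLimit", "Rating");
        System.out.println(buildInsertSql("CreditBalance", columns));
    }
}
```

A statement of this shape can be passed to Connection.prepareStatement(), with each record's field values bound to the placeholders in column order. Custom SQL is useful when the default insert is not what you need, for example when the database offers its own insert-or-update syntax.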

TransformingReader

A proxy that applies transformations to records passing through. It extends ProxyReader and can be created with a DataReader object. You can add a Transformer using the add() method to apply transformations to the fields of a record.

BasicFieldTransformer

A transformer that applies transformations to fields of a record passing through. It extends the FieldTransformer class, and its constructor takes one or more names of the target fields to be transformed. It provides several methods that transform fields; for example, stringToInt() converts a field from a string type to an integer type.

CSVReader

Obtains records from a Comma Separated Value (CSV) or delimited stream. It extends the TextReader class and can be created using a File or Reader object. Passing true to the setFieldNamesInFirstRow() method makes the CSVReader use the names in the first row of the input data as field names.
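The effect of setFieldNamesInFirstRow() can be illustrated with a small plain-Java sketch. It is not CSVReader's implementation: the class HeaderRowSketch and its read() method are hypothetical, the parsing is a naive comma split that ignores quoted fields, and the A1, A2 default names follow the description in the walkthrough above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not DataPipeline's CSVReader.
final class HeaderRowSketch {

    // Parses CSV text into records. When fieldNamesInFirstRow is true, the first
    // non-empty line supplies the field names; otherwise fields are named A1, A2, ...
    static List<Map<String, String>> read(String csv, boolean fieldNamesInFirstRow) {
        List<Map<String, String>> records = new ArrayList<>();
        String[] names = null;
        for (String line : csv.split("\\R")) {
            if (line.isEmpty()) {
                continue; // skip blank lines
            }
            String[] values = line.split(",", -1); // naive split: no quoting support
            if (fieldNamesInFirstRow && names == null) {
                names = values; // header row is consumed, not emitted as a record
                continue;
            }
            Map<String, String> record = new LinkedHashMap<>();
            for (int i = 0; i < values.length; i++) {
                String name = (names != null && i < names.length) ? names[i] : "A" + (i + 1);
                record.put(name, values[i]);
            }
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        String csv = "Account,Rating\n101,A\n102,B\n"; // made-up sample data
        System.out.println(read(csv, true));
        System.out.println(read(csv, false));
    }
}
```

With the flag off, the header line is treated as an ordinary data row, which is why the example enables it before transforming fields by name.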

CSVWriter

Writes records to a Comma Separated Value (CSV) stream. It extends LinedTextWriter, and its constructor takes a File or Writer object as a parameter.

Database Output

The output of this program is the set of records read back from the CreditBalance database table, written to the console in CSV format.
