Write to a Database Using a Custom JDBC Insert Strategy
This example shows you how to use DataPipeline to insert records into a database using a custom JDBC insert strategy. You will also see how to apply transformations to records as they pass through the pipeline.
The demo code will read records from a CSV file using CSVReader, transform them using TransformingReader, and write them into an HSQLDB database using custom SQL.
Transforming records is useful when, for example, you want to insert records into a database as you read them from a file: you can convert the field types to match the column types of your database table.
Java Code listing
```java
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;
import java.io.OutputStreamWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.jdbc.JdbcWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.transform.BasicFieldTransformer;
import com.northconcepts.datapipeline.transform.TransformingReader;

public class WriteToDatabaseUsingCustomJdbcInsertStrategy {

    public static void main(String[] args) throws Throwable {
        // Connection settings for an in-memory HSQLDB database
        final String DATABASE_DRIVER = "org.hsqldb.jdbcDriver";
        final String DATABASE_URL = "jdbc:hsqldb:mem:aname";
        final String DATABASE_USERNAME = "sa";
        final String DATABASE_PASSWORD = "";
        final String DATABASE_TABLE = "CreditBalance";

        // Register the JDBC driver and open a connection
        Class.forName(DATABASE_DRIVER);
        Connection connection = DriverManager.getConnection(DATABASE_URL, DATABASE_USERNAME, DATABASE_PASSWORD);

        // (Re)create the target table
        createTable(connection);

        // Read the CSV file, taking field names from its first row
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-insert-records.csv"))
                .setFieldNamesInFirstRow(true);

        // Convert the string fields read from the CSV file to the table's column types
        reader = new TransformingReader(reader)
                .add(new BasicFieldTransformer("Account").stringToInt())
                .add(new BasicFieldTransformer("Balance").stringToDouble())
                .add(new BasicFieldTransformer("CreditLimit").stringToDouble())
                .add(new BasicFieldTransformer("Rating").stringToChar());

        // Insert the records using the custom strategy and log the generated SQL.
        // CustomJdbcInsertStrategy is not part of this listing; it is referenced
        // without an import, so it is presumably a separate class in this package.
        DataWriter writer = new JdbcWriter(connection, DATABASE_TABLE, new CustomJdbcInsertStrategy())
                .setDebug(true);

        Job.run(reader, writer);

        // Read the table back and print its records to the console as CSV
        reader = new JdbcReader(connection, "Select * from CreditBalance;");
        writer = new CSVWriter(new OutputStreamWriter(System.out));

        Job.run(reader, writer);
    }

    // Drops the CreditBalance table if it exists, then creates it
    public static void createTable(Connection connection) throws Throwable {
        PreparedStatement preparedStatement;
        String dropTableQuery = "DROP TABLE CreditBalance IF EXISTS;";
        String createTableQuery = "CREATE TABLE CreditBalance ("
                + "Account INTEGER, "
                + "LastName VARCHAR(256), "
                + "FirstName VARCHAR(256), "
                + "Balance DOUBLE, "
                + "CreditLimit DOUBLE, "
                + "Rating CHAR, "
                + "PRIMARY KEY (Account));";

        preparedStatement = connection.prepareStatement(dropTableQuery);
        preparedStatement.execute();

        preparedStatement = connection.prepareStatement(createTableQuery);
        preparedStatement.execute();
    }
}
```
Code walkthrough
- First, the database connection parameters are specified. Class.forName() registers the database driver; in this example, org.hsqldb.jdbcDriver is registered.
- The createTable() method creates the CreditBalance table in your database, dropping it first if it already exists.
- A CSVReader is created for the input file credit-balance-insert-records.csv. Calling CSVReader.setFieldNamesInFirstRow(true) makes the CSVReader use the names in the first row of the input data as field names (this is disabled by default). If this method is not invoked, the fields are given default names such as A1, A2.
- A TransformingReader is created to transform records as they pass through the pipeline. Adding a BasicFieldTransformer to the TransformingReader makes it apply a transformation to fields in each record. For example, BasicFieldTransformer("Account").stringToInt() converts the Account field from a string to an integer.
- A JdbcWriter is created to write records to the specified database table using the specified insert strategy. setDebug(true) makes the JdbcWriter log the generated SQL (the default is false).
- Data is transferred from the CSVReader to the JdbcWriter via the Job.run() method.
- A JdbcReader is created to obtain records from the database using the specified query string.
- A CSVWriter is created to write the records obtained from the database to the console.
- Data is transferred from the JdbcReader to the CSVWriter via the Job.run() method.

See how to compile and run data pipeline jobs.
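Both Job.run() calls in the walkthrough follow the same pattern. Each pulls records from a reader until it is exhausted, hands each record to a writer, and then closes both ends. The sketch below models that loop in plain Java so it runs without DataPipeline on the classpath. SimpleReader, SimpleWriter and JobRunSketch are hypothetical names. The sketch is not a description of how Job.run() is actually implemented.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Sketch of a "copy every record from a reader to a writer" job.
// SimpleReader, SimpleWriter and JobRunSketch are hypothetical names for
// illustration; they are not DataPipeline's DataReader, DataWriter or Job.
public class JobRunSketch {

    public interface SimpleReader {
        Map<String, Object> read(); // returns null when no records remain
        void close();
    }

    public interface SimpleWriter {
        void write(Map<String, Object> record);
        void close();
    }

    // Pulls records until the reader is exhausted, writes each one, and always
    // closes both ends; returns the number of records copied.
    public static int run(SimpleReader reader, SimpleWriter writer) {
        int count = 0;
        try {
            Map<String, Object> record;
            while ((record = reader.read()) != null) {
                writer.write(record);
                count++;
            }
        } finally {
            try {
                reader.close();
            } finally {
                writer.close();
            }
        }
        return count;
    }

    // In-memory reader standing in for a CSV or JDBC source.
    public static SimpleReader fromList(List<Map<String, Object>> records) {
        Iterator<Map<String, Object>> it = records.iterator();
        return new SimpleReader() {
            public Map<String, Object> read() { return it.hasNext() ? it.next() : null; }
            public void close() { }
        };
    }

    // In-memory writer standing in for a database table or console sink.
    public static class CollectingWriter implements SimpleWriter {
        public final List<Map<String, Object>> written = new ArrayList<>();
        public boolean closed;
        public void write(Map<String, Object> record) { written.add(record); }
        public void close() { closed = true; }
    }

    public static void main(String[] args) {
        CollectingWriter writer = new CollectingWriter();
        int copied = run(fromList(List.of(Map.of("Account", 101), Map.of("Account", 102))), writer);
        System.out.println(copied + " records copied; writer closed: " + writer.closed);
    }
}
```

Keeping the loop separate from the reader and writer is what lets the same job copy CSV to a database in the first call and a database to the console in the second.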
JdbcReader
Obtains records from a database query. It extends the DataReader class, and its constructor takes a Connection object and a query string as parameters.
JdbcWriter
Writes records to a database table. It extends the DataWriter class, and its constructor takes a Connection object, a table name and, optionally, an IInsert object. The optional third parameter specifies the strategy used to write records to the database.
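The third JdbcWriter argument is a strategy object that decides how each record is written. This document does not show the IInsert interface or the CustomJdbcInsertStrategy class. The following is therefore only a plain-Java sketch of the strategy idea, not an implementation of IInsert. InsertStrategy, ALL_COLUMNS and NON_NULL_COLUMNS are hypothetical names. The "skip null fields" rule is just one example of what a custom strategy might choose to do.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Sketch of a pluggable insert strategy: the writer asks the strategy which
// columns to insert, then builds a parameterized INSERT from them.
// All names here are hypothetical; this is not DataPipeline's IInsert API.
public class InsertStrategySketch {

    public interface InsertStrategy {
        // Chooses which fields of the record become columns in the INSERT.
        List<String> columnsFor(Map<String, Object> record);
    }

    // Default behaviour: insert every field of the record.
    public static final InsertStrategy ALL_COLUMNS = record -> new ArrayList<>(record.keySet());

    // A custom strategy: leave out null fields, so the omitted columns get
    // their default value (NULL unless the table declares otherwise).
    public static final InsertStrategy NON_NULL_COLUMNS = record -> {
        List<String> columns = new ArrayList<>();
        for (Map.Entry<String, Object> e : record.entrySet()) {
            if (e.getValue() != null) {
                columns.add(e.getKey());
            }
        }
        return columns;
    };

    // Builds "INSERT INTO table (c1, c2) VALUES (?, ?)" for the chosen columns.
    public static String buildInsert(String table, Map<String, Object> record, InsertStrategy strategy) {
        StringJoiner names = new StringJoiner(", ", "(", ")");
        StringJoiner marks = new StringJoiner(", ", "(", ")");
        for (String column : strategy.columnsFor(record)) {
            names.add(column);
            marks.add("?");
        }
        return "INSERT INTO " + table + " " + names + " VALUES " + marks;
    }

    public static void main(String[] args) {
        // Sample record for illustration only
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("Account", 101);
        record.put("LastName", "Doe");
        record.put("Rating", null);
        System.out.println(buildInsert("CreditBalance", record, ALL_COLUMNS));
        // INSERT INTO CreditBalance (Account, LastName, Rating) VALUES (?, ?, ?)
        System.out.println(buildInsert("CreditBalance", record, NON_NULL_COLUMNS));
        // INSERT INTO CreditBalance (Account, LastName) VALUES (?, ?)
    }
}
```

A real writer would then bind the record's values to a PreparedStatement in the same column order. Placeholders keep the values out of the SQL text.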
TransformingReader
A proxy that applies transformations to records passing through. It extends ProxyReader and can be created with a DataReader object. You can add Transformer objects using the add() method to apply transformations to the fields of a record.
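TransformingReader wraps another reader and applies each added transformer to every record on its way through. The following sketch illustrates that proxy (decorator) pattern with a plain Iterator over Map-based records. TransformingIterator is a hypothetical class, and records are modelled as maps rather than DataPipeline Record objects.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.UnaryOperator;

// Sketch of the proxy idea behind TransformingReader: wrap another source and
// apply a list of transformations to each record as it passes through.
// TransformingIterator is a hypothetical name, not a DataPipeline class.
public class TransformingIteratorSketch {

    public static class TransformingIterator implements Iterator<Map<String, Object>> {
        private final Iterator<Map<String, Object>> source;
        private final List<UnaryOperator<Map<String, Object>>> transformers = new ArrayList<>();

        public TransformingIterator(Iterator<Map<String, Object>> source) {
            this.source = source;
        }

        // Fluent add(), so calls can be chained; transformations run in the order added.
        public TransformingIterator add(UnaryOperator<Map<String, Object>> transformer) {
            transformers.add(transformer);
            return this;
        }

        @Override
        public boolean hasNext() {
            return source.hasNext();
        }

        @Override
        public Map<String, Object> next() {
            Map<String, Object> record = source.next();
            for (UnaryOperator<Map<String, Object>> t : transformers) {
                record = t.apply(record);
            }
            return record;
        }
    }

    public static void main(String[] args) {
        // Sample record for illustration only
        Map<String, Object> raw = new LinkedHashMap<>();
        raw.put("LastName", "  Doe ");
        raw.put("Rating", "a");

        TransformingIterator reader = new TransformingIterator(List.of(raw).iterator())
                .add(r -> { r.put("LastName", ((String) r.get("LastName")).trim()); return r; })
                .add(r -> { r.put("Rating", ((String) r.get("Rating")).toUpperCase(Locale.ROOT)); return r; });

        while (reader.hasNext()) {
            System.out.println(reader.next()); // {LastName=Doe, Rating=A}
        }
    }
}
```

Because the wrapper exposes the same interface as the source it wraps, downstream code does not need to know whether records are transformed.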
BasicFieldTransformer
Applies transformations to the fields of records passing through. It extends the FieldTransformer class, and its constructor takes one or more names of the target fields to transform. It includes several conversion methods; for example, the stringToInt() method converts a field from a string type to an integer type.
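The four BasicFieldTransformer calls in the example convert CSV strings to the INTEGER, DOUBLE and CHAR column types of CreditBalance. The sketch below shows equivalent conversions in plain Java. FieldConversionSketch and its methods are hypothetical. Two rules are assumptions of this sketch, not documented behaviour of stringToInt() or stringToChar(): values are trimmed before conversion, and a char conversion keeps the first character and rejects empty strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of per-field type conversion in the spirit of BasicFieldTransformer's
// stringToInt(), stringToDouble() and stringToChar(). The class and method
// names here are hypothetical, not the DataPipeline API.
public class FieldConversionSketch {

    // If the named field holds a String, trims it and replaces it with the
    // converted value; other fields are left untouched.
    public static void convert(Map<String, Object> record, String field, Function<String, Object> converter) {
        Object value = record.get(field);
        if (value instanceof String) {
            record.put(field, converter.apply(((String) value).trim()));
        }
    }

    // Assumption: a char conversion keeps the first character and rejects empty input.
    public static Character firstChar(String s) {
        if (s.isEmpty()) {
            throw new IllegalArgumentException("cannot convert an empty string to a char");
        }
        return s.charAt(0);
    }

    // Mirrors the four transformers in the example, matching the CreditBalance column types.
    public static Map<String, Object> toColumnTypes(Map<String, Object> record) {
        convert(record, "Account", Integer::valueOf);                // INTEGER
        convert(record, "Balance", Double::valueOf);                 // DOUBLE
        convert(record, "CreditLimit", Double::valueOf);             // DOUBLE
        convert(record, "Rating", FieldConversionSketch::firstChar); // CHAR
        return record;
    }

    public static void main(String[] args) {
        // Sample values for illustration, as a CSV reader would deliver them (all strings)
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("Account", "101");
        record.put("LastName", "Doe");
        record.put("Balance", "250.5");
        record.put("CreditLimit", "1000");
        record.put("Rating", "A");
        System.out.println(toColumnTypes(record));
        // {Account=101, LastName=Doe, Balance=250.5, CreditLimit=1000.0, Rating=A}
    }
}
```

Converting before the insert means a malformed value fails on the Java side with a parse exception, rather than inside the database driver.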
CSVReader
Obtains records from a Comma Separated Value (CSV) or delimited stream. It extends the TextReader class and can be created using a File or Reader object. Passing true to its setFieldNamesInFirstRow() method makes the CSVReader use the names in the first row of the input data as field names.
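To illustrate what setFieldNamesInFirstRow(true) changes, the following sketch parses CSV lines into records with and without a header row. It is an illustration under stated assumptions, not CSVReader's implementation. It splits naively on commas, so quoted fields are not handled. Its fallback names (field1, field2, ...) are placeholders chosen for the sketch, not the library's default names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of what a "field names in first row" switch does when turning CSV
// lines into records. HeaderRowSketch is hypothetical; the naive split below
// ignores quoting, and the fallback names ("field1", "field2", ...) are this
// sketch's own choice rather than CSVReader's defaults.
public class HeaderRowSketch {

    public static List<Map<String, String>> parse(List<String> lines, boolean fieldNamesInFirstRow) {
        List<Map<String, String>> records = new ArrayList<>();
        String[] names = null;
        for (String line : lines) {
            String[] values = line.split(",", -1); // -1 keeps trailing empty values
            if (fieldNamesInFirstRow && names == null) {
                names = values; // header row: remember the names, emit no record
                continue;
            }
            Map<String, String> record = new LinkedHashMap<>();
            for (int i = 0; i < values.length; i++) {
                String name = (names != null && i < names.length) ? names[i] : "field" + (i + 1);
                record.put(name, values[i]);
            }
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("Account,LastName", "101,Doe");
        System.out.println(parse(lines, true));  // [{Account=101, LastName=Doe}]
        System.out.println(parse(lines, false)); // [{field1=Account, field2=LastName}, {field1=101, field2=Doe}]
    }
}
```

Without the switch, the header line is treated as an ordinary data record. That is why the transformers in the example, which look up fields by name, depend on it being enabled.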
CSVWriter
Writes records to a Comma Separated Value (CSV) stream. It extends LinedTextWriter, and its constructor takes a File or Writer object as a parameter.
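When records are printed as CSV, a value that contains a comma or a quote has to be escaped so that it stays one field. The sketch below shows common RFC 4180-style quoting in plain Java. CsvWriteSketch is hypothetical, and CSVWriter's exact quoting rules and line endings may differ (this sketch ends rows with "\n").

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.List;

// Sketch of writing records as CSV rows with RFC 4180-style quoting.
// CsvWriteSketch is hypothetical; CSVWriter's own quoting rules and line
// endings may differ (this sketch uses "\n").
public class CsvWriteSketch {

    // Quotes a value only when it contains a comma, quote or line break,
    // doubling any embedded quotes; null becomes an empty field.
    public static String escape(String value) {
        if (value == null) {
            return "";
        }
        boolean needsQuotes = value.contains(",") || value.contains("\"")
                || value.contains("\n") || value.contains("\r");
        return needsQuotes ? "\"" + value.replace("\"", "\"\"") + "\"" : value;
    }

    // Writes one row: escaped values separated by commas, terminated by "\n".
    public static void writeRow(Writer out, List<String> values) throws IOException {
        StringBuilder row = new StringBuilder();
        for (int i = 0; i < values.size(); i++) {
            if (i > 0) {
                row.append(',');
            }
            row.append(escape(values.get(i)));
        }
        out.write(row.append('\n').toString());
    }

    public static void main(String[] args) throws IOException {
        StringWriter out = new StringWriter();
        writeRow(out, List.of("Account", "LastName", "Rating"));
        writeRow(out, List.of("101", "Doe, Jr.", "A"));
        System.out.print(out);
        // Account,LastName,Rating
        // 101,"Doe, Jr.",A
    }
}
```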
Database Output
The output of this program is the set of records read back from the CreditBalance database table, written to the console in CSV format.