Configure the Insert Strategy used in JdbcMultiWriter
Updated: Apr 5, 2023
This example shows you how to configure the insert strategy in DataPipeline's JdbcMultiWriter. The ability to configure the insert strategy will give you additional flexibility to control how the insert operations are performed at the query level.
JdbcMultiWriter writes to one or more database connections concurrently using multiple threads. The configured strategy is passed down to its internal JdbcWriter instances, which run on separate threads.
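As a rough mental model of the multi-threaded writing described above, the following plain-Java sketch fans records out to several worker threads, each standing in for one single-connection writer. It is not DataPipeline's implementation: the class name MultiWriterSketch, the round-robin dispatch, the bounded per-worker queue, and the sentinel record are all assumptions made for illustration, and no database is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not DataPipeline's actual JdbcMultiWriter.
class MultiWriterSketch {

    // Sentinel record that tells a worker to stop.
    private static final String POISON = "__END__";

    private final List<BlockingQueue<String>> queues = new ArrayList<>();
    private final List<Thread> workers = new ArrayList<>();
    private final AtomicInteger written = new AtomicInteger();
    private int next = 0;

    MultiWriterSketch(int workerCount, int maxQueuedPerWorker) {
        for (int i = 0; i < workerCount; i++) {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(maxQueuedPerWorker);
            Thread worker = new Thread(() -> drain(queue), "writer-" + i);
            queues.add(queue);
            workers.add(worker);
            worker.start();
        }
    }

    // Each worker consumes its own queue until it sees the sentinel.
    private void drain(BlockingQueue<String> queue) {
        try {
            for (String record = queue.take(); !POISON.equals(record); record = queue.take()) {
                written.incrementAndGet(); // a real writer would execute an INSERT here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Round-robin dispatch; blocks when the target worker's queue is full.
    void write(String record) {
        put(queues.get(next), record);
        next = (next + 1) % queues.size();
    }

    // Signals every worker to finish, waits for them, and returns the total written.
    int close() {
        for (BlockingQueue<String> queue : queues) {
            put(queue, POISON);
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return written.get();
    }

    private static void put(BlockingQueue<String> queue, String record) {
        try {
            queue.put(record);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The bounded queue provides back-pressure: the producing thread blocks when a worker falls behind, so memory use stays limited no matter how large the input is.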
For this example, you will configure the MySqlPreparedStatementInsert strategy. To use a different insert strategy supported by DataPipeline, pass that strategy to the JdbcMultiWriter constructor instead.
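To make the idea of a swappable insert strategy concrete, here is a minimal plain-Java sketch of the strategy pattern applied to INSERT statement generation. Every name in it (InsertSqlStrategy, SingleRowInsertSketch, MultiRowInsertSketch) is hypothetical, and the SQL shown is generic; it is not taken from DataPipeline and does not claim to match what MySqlPreparedStatementInsert or any other DataPipeline strategy generates.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical strategy interface; DataPipeline's own strategy types may differ.
interface InsertSqlStrategy {

    String buildSql(String table, List<String> columns, int rows);

    // Builds one parenthesized group of JDBC placeholders, e.g. "(?, ?, ?)".
    static String placeholders(int columnCount) {
        return "(" + String.join(", ", Collections.nCopies(columnCount, "?")) + ")";
    }
}

// One row per statement; the rows argument is ignored.
class SingleRowInsertSketch implements InsertSqlStrategy {
    @Override
    public String buildSql(String table, List<String> columns, int rows) {
        return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES "
                + InsertSqlStrategy.placeholders(columns.size());
    }
}

// Several rows per statement using a multi-row VALUES list.
class MultiRowInsertSketch implements InsertSqlStrategy {
    @Override
    public String buildSql(String table, List<String> columns, int rows) {
        String group = InsertSqlStrategy.placeholders(columns.size());
        return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES "
                + String.join(", ", Collections.nCopies(rows, group));
    }
}
```

Because the writer only depends on the interface, swapping one strategy for another changes the generated SQL without touching the code that reads or dispatches records.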
Input Excel
event_type, id, agent_id, phone_number, start_time, end_time, disposition
STARTED, 1, 40, (807) 4254-5586, 03/18/2016 21:11:27
ENDED, 1, 40, (807) 4254-5586, 03/18/2016 21:11:29, 03/18/2016 21:11:29, DROPPED_WHILE_TALKING
...
ENDED, 984, 16, (705) 4086-4986, 03/18/2016 22:15:42, 03/18/2016 22:18:06, COMPLAINT
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;

import com.mysql.cj.jdbc.MysqlDataSource;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.excel.ExcelDocument;
import com.northconcepts.datapipeline.excel.ExcelReader;
import com.northconcepts.datapipeline.internal.jdbc.JdbcFacade;
import com.northconcepts.datapipeline.jdbc.JdbcConnectionFactory;
import com.northconcepts.datapipeline.jdbc.JdbcMultiWriter;
import com.northconcepts.datapipeline.jdbc.insert.MySqlPreparedStatementInsert;
import com.northconcepts.datapipeline.job.Job;

import javax.sql.DataSource;
import java.io.File;

public class ConfigureInsertStrategyInJdbcMultiWriter {

    private static final String SERVER_NAME = "localhost";
    private static final String DATABASE_NAME = "datapipeline";
    private static final String DATABASE_USERNAME = "etl";
    private static final String DATABASE_PASSWORD = "etl";
    private static final String DATABASE_TABLE = "inbound_calls";

    public static void main(String[] args) {
        MysqlDataSource dataSource = new MysqlDataSource();
        dataSource.setServerName(SERVER_NAME);
        dataSource.setDatabaseName(DATABASE_NAME);
        dataSource.setUser(DATABASE_USERNAME);
        dataSource.setPassword(DATABASE_PASSWORD);

        createTable(dataSource);

        ExcelDocument doc = new ExcelDocument().open(new File("example/data/input/call-center-inbound-call-2.xlsx"));
        DataReader reader = new ExcelReader(doc).setFieldNamesInFirstRow(true);

        // Configure MySqlPreparedStatementInsert strategy
        DataWriter writer = new JdbcMultiWriter(dataSource, 5, 10, DATABASE_TABLE, new MySqlPreparedStatementInsert().setDebug(true))
                .setBatchSize(10); // set batch size

        Job.run(reader, writer);
    }

    public static void createTable(DataSource dataSource) {
        JdbcFacade jdbcFacade = new JdbcFacade(JdbcConnectionFactory.wrap(dataSource));

        String dropTableQuery = "DROP TABLE IF EXISTS inbound_calls;";
        String createTableQuery = "CREATE TABLE inbound_calls (" +
                "id INTEGER, " +
                "event_type VARCHAR(45), " +
                "agent_id INTEGER, " +
                "phone_number VARCHAR(45), " +
                "start_time VARCHAR(45), " +
                "end_time VARCHAR(45), " +
                "disposition VARCHAR(45));";

        jdbcFacade.execute(dropTableQuery);
        jdbcFacade.execute(createTableQuery);
        jdbcFacade.close();
    }
}
Code Walkthrough
- Create MysqlDataSource with the specified connection details.
- Create a table named inbound_calls in the connected MySQL database.
- Read records from the input Excel file call-center-inbound-call-2.xlsx. setFieldNamesInFirstRow(true) is invoked to specify that the first row of the input Excel file should be used as field names (disabled by default). If this method is not invoked, the fields are named A1, A2, and so on by default.
- Create JdbcMultiWriter to asynchronously insert records into the inbound_calls table using 5 database connections.
- Configure the MySqlPreparedStatementInsert strategy, which will be used by the internal JdbcWriter instances.
- Behind the scenes, JdbcMultiWriter creates 5 JdbcWriter instances, each writing asynchronously on a separate thread.
- If no insert strategy is specified, the PreparedStatementInsert strategy is used by default.
- setBatchSize(10) sets the number of records grouped together in a batch to 10.
- setDebug(true) enables MySqlPreparedStatementInsert to log the generated SQL (disabled by default).
- Transfer records from ExcelReader to JdbcMultiWriter via the Job.run() method to populate your database table with records obtained from the Excel file. See how to compile and run data pipeline jobs.
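The batching step in the walkthrough can be pictured with a small plain-Java sketch that groups records into chunks of a fixed size. The class BatchSketch and its toBatches method are hypothetical helpers written for this illustration; how DataPipeline batches records internally is not shown in this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; illustrates chunking only, not DataPipeline's internals.
class BatchSketch {

    // Splits records into consecutive batches of at most batchSize items.
    static <T> List<List<T>> toBatches(List<T> records, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < records.size(); start += batchSize) {
            int end = Math.min(start + batchSize, records.size());
            batches.add(new ArrayList<>(records.subList(start, end)));
        }
        return batches;
    }
}
```

With a batch size of 10, a list of 25 records yields three batches of 10, 10, and 5 records. Larger batches generally mean fewer round trips to the database, at the cost of holding more records in memory per write.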