Configure the Insert Strategy used in JdbcMultiWriter
Updated: Apr 5, 2023
This example shows you how to configure the insert strategy in DataPipeline's JdbcMultiWriter. Choosing the insert strategy gives you control over how insert operations are performed at the query level.
JdbcMultiWriter writes to one or more database connections concurrently using multiple threads. The configured strategy is passed down to its internal JdbcWriter instances, each of which runs on a separate thread.
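To make the threading model concrete, here is a minimal sketch of the general idea using only java.util.concurrent. It is not DataPipeline's implementation: the class MultiWriterSketch, the method writeAll, the shared queue, and the in-memory lists standing in for database connections are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not part of the DataPipeline API.
final class MultiWriterSketch {

    // Unique sentinel object telling a worker that no more records will arrive.
    private static final String END = new String("<end>");

    /**
     * Fans records out to writerCount worker threads through a bounded queue.
     * Returns one list per worker holding the records that worker "wrote".
     */
    static List<List<String>> writeAll(List<String> records, int writerCount, int queueCapacity) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(queueCapacity);
        List<List<String>> written = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(writerCount);
        try {
            for (int i = 0; i < writerCount; i++) {
                // Each list stands in for one connection and is touched by one worker only.
                List<String> sink = new ArrayList<>();
                written.add(sink);
                pool.execute(() -> {
                    try {
                        while (true) {
                            String record = queue.take();
                            if (record == END) {
                                return; // identity check against the sentinel
                            }
                            sink.add(record);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The producer blocks when the queue is full, which bounds memory use.
            for (String record : records) {
                queue.put(record);
            }
            // One sentinel per worker so that every worker terminates.
            for (int i = 0; i < writerCount; i++) {
                queue.put(END);
            }
            pool.shutdown();
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while writing", e);
        } finally {
            pool.shutdownNow();
        }
        return written;
    }
}
```

Calling MultiWriterSketch.writeAll(records, 5, 10) distributes the records across five workers. Every record is written exactly once, but which worker handles which record depends on thread scheduling.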
For this example, you will be configuring the MySqlPreparedStatementInsert strategy. However, if you would like to use a different insert strategy that is supported by DataPipeline, you can easily modify the configuration to do so.
Input Excel
event_type, id, agent_id, phone_number, start_time, end_time, disposition
STARTED, 1, 40, (807) 4254-5586, 03/18/2016 21:11:27
ENDED, 1, 40, (807) 4254-5586, 03/18/2016 21:11:29, 03/18/2016 21:11:29, DROPPED_WHILE_TALKING
...
ENDED, 984, 16, (705) 4086-4986, 03/18/2016 22:15:42, 03/18/2016 22:18:06, COMPLAINT
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;

import com.mysql.cj.jdbc.MysqlDataSource;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.excel.ExcelDocument;
import com.northconcepts.datapipeline.excel.ExcelReader;
import com.northconcepts.datapipeline.internal.jdbc.JdbcFacade;
import com.northconcepts.datapipeline.jdbc.JdbcConnectionFactory;
import com.northconcepts.datapipeline.jdbc.JdbcMultiWriter;
import com.northconcepts.datapipeline.jdbc.insert.MySqlPreparedStatementInsert;
import com.northconcepts.datapipeline.job.Job;

import javax.sql.DataSource;
import java.io.File;

public class ConfigureInsertStrategyInJdbcMultiWriter {

    private static final String SERVER_NAME = "localhost";
    private static final String DATABASE_NAME = "datapipeline";
    private static final String DATABASE_USERNAME = "etl";
    private static final String DATABASE_PASSWORD = "etl";
    private static final String DATABASE_TABLE = "inbound_calls";

    public static void main(String[] args) {
        MysqlDataSource dataSource = new MysqlDataSource();
        dataSource.setServerName(SERVER_NAME);
        dataSource.setDatabaseName(DATABASE_NAME);
        dataSource.setUser(DATABASE_USERNAME);
        dataSource.setPassword(DATABASE_PASSWORD);

        createTable(dataSource);

        ExcelDocument doc = new ExcelDocument().open(new File("example/data/input/call-center-inbound-call-2.xlsx"));
        DataReader reader = new ExcelReader(doc).setFieldNamesInFirstRow(true);

        // Configure MySqlPreparedStatementInsert strategy
        DataWriter writer = new JdbcMultiWriter(dataSource, 5, 10, DATABASE_TABLE, new MySqlPreparedStatementInsert().setDebug(true))
                .setBatchSize(10); // set batch size

        Job.run(reader, writer);
    }

    public static void createTable(DataSource dataSource) {
        JdbcFacade jdbcFacade = new JdbcFacade(JdbcConnectionFactory.wrap(dataSource));

        String dropTableQuery = "DROP TABLE IF EXISTS inbound_calls;";
        String createTableQuery = "CREATE TABLE inbound_calls ("
                + "id INTEGER, "
                + "event_type VARCHAR(45), "
                + "agent_id INTEGER, "
                + "phone_number VARCHAR(45), "
                + "start_time VARCHAR(45), "
                + "end_time VARCHAR(45), "
                + "disposition VARCHAR(45));";

        jdbcFacade.execute(dropTableQuery);
        jdbcFacade.execute(createTableQuery);
        jdbcFacade.close();
    }
}
Code Walkthrough
- Create MysqlDataSource with the specified connection details.
- Create a table named inbound_calls in the connected MySQL database.
- Read records from an input Excel file, call-center-inbound-call-2.xlsx.
- setFieldNamesInFirstRow(true) is invoked to specify that the first row of the input Excel file should be used as field names (disabled by default). If this method is not invoked, the fields would be named A1, A2 by default.
- Create JdbcMultiWriter to asynchronously insert records into the inbound_calls table using 5 database connections.
- Configure the MySqlPreparedStatementInsert strategy, which will be used by the internal JdbcWriter instances.
- Behind the scenes, JdbcMultiWriter creates 5 JdbcWriter instances, each writing asynchronously on a separate thread.
- If no insert strategy is specified, the PreparedStatementInsert strategy is used by default.
- setBatchSize(10) sets the number of records that will be chunked together in a batch to 10.
- setDebug(true) enables MySqlPreparedStatementInsert to log the generated SQL (disabled by default).
- Transfer records from ExcelReader to JdbcMultiWriter via the Job.run() method to populate your database table with records obtained from the Excel file. See how to compile and run data pipeline jobs.
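The batching and generated-SQL steps above can be pictured with a small, library-independent sketch. It assumes that a prepared-statement insert strategy produces a parameterized single-row INSERT and that records are grouped into consecutive chunks of the batch size. The exact SQL that MySqlPreparedStatementInsert emits is not shown in this example, so treat the output format here as an assumption; enabling setDebug(true) is the way to see the real statement. The class InsertBatchSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; not part of the DataPipeline API.
final class InsertBatchSketch {

    /** Builds a parameterized INSERT such as: INSERT INTO t (a, b) VALUES (?, ?) */
    static String buildInsertSql(String table, List<String> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        String columnList = String.join(", ", columns);
        // One "?" placeholder per column; values are bound later, not concatenated.
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")";
    }

    /** Splits records into consecutive chunks of at most batchSize elements. */
    static <T> List<List<T>> toBatches(List<T> records, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < records.size(); start += batchSize) {
            int end = Math.min(start + batchSize, records.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(records.subList(start, end)));
        }
        return batches;
    }
}
```

With a batch size of 10, 25 records split into batches of 10, 10, and 5. In plain JDBC, each such batch would typically be bound to the statement with PreparedStatement.addBatch() and sent with executeBatch(), which reduces round trips to the database compared with executing one statement per record.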
