Configure the Insert Strategy used in JdbcMultiWriter

This example shows you how to configure the insert strategy in DataPipeline's  JdbcMultiWriter. The ability to configure the insert strategy will give you additional flexibility to control how the insert operations are performed at the query level.

JdbcMultiWriter is used for multi-threaded writing to one or more database connections concurrently. The configured strategy will be passed down to its internal JdbcWriter instances which will run on separate threads.

For this example, you will be configuring the MySqlPreparedStatementInsert strategy. However, if you would like to use a different insert strategy that is supported by DataPipeline, you can easily modify the configuration to do so.

Input Excel

event_type, id, agent_id, phone_number, start_time, end_time, disposition
STARTED, 1, 40, (807) 4254-5586, 03/18/2016 21:11:27
ENDED, 1, 40, (807) 4254-5586, 03/18/2016 21:11:29, 03/18/2016 21:11:29, DROPPED_WHILE_TALKING
...
ENDED, 984, 16, (705) 4086-4986, 03/18/2016 22:15:42, 03/18/2016 22:18:06, COMPLAINT

Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import com.mysql.cj.jdbc.MysqlDataSource;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.excel.ExcelDocument;
import com.northconcepts.datapipeline.excel.ExcelReader;
import com.northconcepts.datapipeline.internal.jdbc.JdbcFacade;
import com.northconcepts.datapipeline.jdbc.JdbcConnectionFactory;
import com.northconcepts.datapipeline.jdbc.JdbcMultiWriter;
import com.northconcepts.datapipeline.jdbc.insert.MySqlPreparedStatementInsert;
import com.northconcepts.datapipeline.job.Job;

import javax.sql.DataSource;
import java.io.File;

public class ConfigureInsertStrategyInJdbcMultiWriter {

    private static final String SERVER_NAME = "localhost";
    private static final String DATABASE_NAME = "datapipeline";
    private static final String DATABASE_USERNAME = "etl";
    private static final String DATABASE_PASSWORD = "etl";
    private static final String DATABASE_TABLE = "inbound_calls";

    public static void main(String[] args) {
        MysqlDataSource dataSource = new MysqlDataSource();

        dataSource.setServerName(SERVER_NAME);
        dataSource.setDatabaseName(DATABASE_NAME);
        dataSource.setUser(DATABASE_USERNAME);
        dataSource.setPassword(DATABASE_PASSWORD);

        createTable(dataSource);

        ExcelDocument doc = new ExcelDocument().open(new File("example/data/input/call-center-inbound-call-2.xlsx"));
        DataReader reader = new ExcelReader(doc).setFieldNamesInFirstRow(true);

        // Configure MySqlPreparedStatementInsert strategy
        DataWriter writer = new JdbcMultiWriter(dataSource, 5, 10, DATABASE_TABLE, new MySqlPreparedStatementInsert().setDebug(true))
                .setBatchSize(10); // set batch size

        Job.run(reader, writer);
    }

    public static void createTable(DataSource dataSource) {
        JdbcFacade jdbcFacade = new JdbcFacade(JdbcConnectionFactory.wrap(dataSource));
        String dropTableQuery = "DROP TABLE IF EXISTS inbound_calls;";
        String createTableQuery = "CREATE TABLE inbound_calls ("
                + "id INTEGER, "
                + "event_type VARCHAR(45), "
                + "agent_id INTEGER, "
                + "phone_number VARCHAR(45), "
                + "start_time VARCHAR(45), "
                + "end_time VARCHAR(45), "
                + "disposition VARCHAR(45));";

        jdbcFacade.execute(dropTableQuery);
        jdbcFacade.execute(createTableQuery);

        jdbcFacade.close();
    }
}


Code Walkthrough

  1. Create MysqlDataSource with the specified connection details.
  2. Create a table named inbound_calls in the connected MySql database.
  3. Read records from an input Excel file call-center-inbound-call-2.xlsx.
  4. setFieldNamesInFirstRow(true) is invoked to specify that the first row of the input Excel file should be used as field names (disabled by default). If this method is not invoked, the fields would be named as A1, A2 by default.
  5. Create JdbcMultiWriter to asynchronously insert records into inbound_calls table using 5 database connections.
  6. Configure MySqlPreparedStatementInsert strategy which will be used by internal JdbcWriter instances.
  7. Behind the scenes, JdbcMultiWriter creates 5 JdbcWriter instances; each instance writing asynchronously using a separate thread.
  8. If no insert strategy is specified, PreparedStatementInsert strategy will be used by default.
  9. setBatchSize(10) sets the number of records that will be chunked together in a batch to 10.
  10. setDebug(true) enables the MySqlPreparedStatementInsert to log the generated SQL (disabled by default).
  11. Transfer records from ExcelReader to JdbcMultiWriter via Job.run() method to populate your database table with records obtained from the Excel file. See how to compile and run data pipeline jobs.

 

Mobile Analytics