Write to a Database Using Multiple Connections
Updated: Sep 12, 2023
This example speeds up database write operations by using multiple connections. It is particularly useful for scenarios where high-speed data ingestion and parallel processing are essential. Real-life use cases include bulk data insertion, log storage, real-time data warehousing, and high-throughput data streaming applications.
Input Excel File
event_type  id  agent_id  phone_number     start_time        end_time          disposition
STARTED     1   40        (807) 4254-5586  18/03/2016 21:11
ENDED       1   40        (807) 4254-5586  18/03/2016 21:11  18/03/2016 21:11  DROPPED_WHILE_TALKING
STARTED     2   15        (437) 2869-3170  18/03/2016 21:11
ENDED       2   15        (437) 2869-3170  18/03/2016 21:11  18/03/2016 21:11  DROPPED_WHILE_TALKING
STARTED     3   34        (343) 3322-7289  18/03/2016 21:11
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.mysql.cj.jdbc.MysqlDataSource;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.excel.ExcelDocument;
import com.northconcepts.datapipeline.excel.ExcelReader;
import com.northconcepts.datapipeline.jdbc.JdbcMultiWriter;
import com.northconcepts.datapipeline.job.Job;

public class WriteToDatabaseUsingMultipleConnections {

    public static void main(String[] args) {
        MysqlDataSource dataSource = new MysqlDataSource();
        dataSource.setServerName("localhost");
        dataSource.setDatabaseName("datapipeline");
        dataSource.setUser("etl");
        dataSource.setPassword("etl");

        ExcelDocument doc = new ExcelDocument();
        DataReader reader = new ExcelReader(doc.open(
                new File("example/data/input/call-center-inbound-call-2.xlsx")))
                .setFieldNamesInFirstRow(true);

        DataWriter writer = new JdbcMultiWriter(dataSource, 5, 10, "inbound_calls");

        Job.run(reader, writer);
    }

    /*
    JdbcMultiWriter does not create the table and columns, it must be created beforehand.

    When setFieldNamesInFirstRow(true), the column names must equal the field names.

    mysql> describe inbound_calls;
    +--------------+-------------+------+-----+---------+-------+
    | Field        | Type        | Null | Key | Default | Extra |
    +--------------+-------------+------+-----+---------+-------+
    | event_type   | varchar(45) | YES  |     | NULL    |       |
    | id           | varchar(45) | YES  |     | NULL    |       |
    | agent_id     | varchar(45) | YES  |     | NULL    |       |
    | phone_number | varchar(45) | YES  |     | NULL    |       |
    | start_time   | varchar(45) | YES  |     | NULL    |       |
    | end_time     | varchar(45) | YES  |     | NULL    |       |
    | disposition  | varchar(45) | YES  |     | NULL    |       |
    +--------------+-------------+------+-----+---------+-------+

    When setFieldNamesInFirstRow(false), the column names start at A, B... so on (Excel style)

    mysql> describe inbound_calls;
    +-------+-------------+------+-----+---------+-------+
    | Field | Type        | Null | Key | Default | Extra |
    +-------+-------------+------+-----+---------+-------+
    | A     | varchar(45) | YES  |     | NULL    |       |
    | B     | varchar(45) | YES  |     | NULL    |       |
    | C     | varchar(45) | YES  |     | NULL    |       |
    | D     | varchar(45) | YES  |     | NULL    |       |
    | E     | varchar(45) | YES  |     | NULL    |       |
    | F     | varchar(45) | YES  |     | NULL    |       |
    | G     | varchar(45) | YES  |     | NULL    |       |
    +-------+-------------+------+-----+---------+-------+
    7 rows in set (0.00 sec)
    */
}
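The comment in the listing notes that the target table must exist before the job runs, with columns named either after the fields or A, B, and so on. As a minimal sketch of one way to prepare that table, the helper below builds a matching CREATE TABLE statement. The class InboundCallsDdl and its methods are hypothetical and not part of DataPipeline; the varchar(45) type simply mirrors the describe output above, and continuing past Z with AA, AB follows Excel's own convention, which is an assumption here since the source only shows A through G.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of DataPipeline: builds DDL for the target table.
class InboundCallsDdl {

    // Converts a zero-based column index to an Excel-style name: 0 -> A, 25 -> Z, 26 -> AA.
    // Names beyond Z are an assumption based on Excel's convention.
    static String excelColumnName(int index) {
        StringBuilder name = new StringBuilder();
        for (int i = index; i >= 0; i = i / 26 - 1) {
            name.insert(0, (char) ('A' + i % 26));
        }
        return name.toString();
    }

    // Returns the first `count` Excel-style column names in order.
    static List<String> excelColumnNames(int count) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            names.add(excelColumnName(i));
        }
        return names;
    }

    // Builds a CREATE TABLE statement with one nullable varchar(45) column per name,
    // mirroring the describe output in the listing.
    static String createTable(String table, List<String> columns) {
        StringBuilder sql = new StringBuilder("CREATE TABLE ").append(table).append(" (");
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                sql.append(", ");
            }
            sql.append(columns.get(i)).append(" varchar(45) NULL");
        }
        return sql.append(")").toString();
    }

    public static void main(String[] args) {
        // Field names taken from the first row of the input spreadsheet.
        List<String> fields = Arrays.asList("event_type", "id", "agent_id",
                "phone_number", "start_time", "end_time", "disposition");
        System.out.println(createTable("inbound_calls", fields));
        // Variant for setFieldNamesInFirstRow(false): seven columns named A..G.
        System.out.println(createTable("inbound_calls", excelColumnNames(7)));
    }
}
```

The generated statement could be run once against the database (for example with a plain JDBC Statement) before starting the job. Names are concatenated without quoting, so this sketch is only suitable for trusted, simple identifiers.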
Code Walkthrough
- First, the MySQL connection properties are defined using a MysqlDataSource instance.
- Next, ExcelDocument and ExcelReader are created to read records from the input file call-center-inbound-call-2.xlsx.
- A JdbcMultiWriter instance inserts the records into the inbound_calls table using 5 database connections, each writing asynchronously on its own thread. Note that JdbcMultiWriter does not create the table or its columns; they must exist in advance, otherwise an exception is thrown.
- Finally, Job.run() transfers the data from reader to writer. See how to compile and run data pipeline jobs.
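To make the "one thread per connection" idea in the walkthrough concrete, here is a minimal sketch of the general pattern using only the JDK. It is an illustration, not JdbcMultiWriter's actual implementation: the class MultiQueueWriterSketch is hypothetical, each worker merely counts records where a real one would run an INSERT on its own connection, and the second constructor argument is assumed to play the role of a per-connection queue limit (whether the 10 in the listing means exactly this should be checked against the JdbcMultiWriter Javadoc).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only, NOT JdbcMultiWriter's implementation. Each worker stands in
// for one database connection draining its own bounded queue on its own thread.
class MultiQueueWriterSketch {

    // Unique sentinel instance, compared by identity, that tells a worker to stop.
    private static final String POISON = new String("EOF");

    private final List<BlockingQueue<String>> queues = new ArrayList<>();
    private final List<Thread> workers = new ArrayList<>();
    private final AtomicInteger written = new AtomicInteger();
    private int next; // index of the queue that receives the next record

    MultiQueueWriterSketch(int connections, int maxQueuedPerConnection) {
        for (int i = 0; i < connections; i++) {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(maxQueuedPerConnection);
            Thread worker = new Thread(() -> drain(queue), "writer-" + i);
            queues.add(queue);
            workers.add(worker);
            worker.start();
        }
    }

    // Worker loop: take records until the sentinel arrives.
    private void drain(BlockingQueue<String> queue) {
        try {
            for (String record = queue.take(); record != POISON; record = queue.take()) {
                written.incrementAndGet(); // a real worker would run an INSERT here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Hands records to the queues round-robin; blocks when the chosen queue is
    // full, which throttles the reader to the speed of the writers.
    void write(String record) {
        put(queues.get(next), record);
        next = (next + 1) % queues.size();
    }

    // Signals every worker to stop, waits for them, and returns the total written.
    int close() {
        for (BlockingQueue<String> queue : queues) {
            put(queue, POISON);
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while closing", e);
        }
        return written.get();
    }

    private static void put(BlockingQueue<String> queue, String record) {
        try {
            queue.put(record);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while queueing", e);
        }
    }

    public static void main(String[] args) {
        MultiQueueWriterSketch writer = new MultiQueueWriterSketch(5, 10);
        for (int i = 0; i < 1000; i++) {
            writer.write("record-" + i);
        }
        System.out.println(writer.close() + " records written");
    }
}
```

Bounded queues are the key design choice in this sketch: they keep memory use flat when the reader is faster than the database, at the cost of occasionally blocking the reading thread.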
Console Output
18:32:38,693 DEBUG [main] datapipeline:196 - creating connection 0...
18:32:38,929 DEBUG [main] datapipeline:196 - creating connection 1...
18:32:38,945 DEBUG [main] datapipeline:196 - creating connection 2...
18:32:38,962 DEBUG [main] datapipeline:196 - creating connection 3...
18:32:38,975 DEBUG [main] datapipeline:196 - creating connection 4...