Write to a Database Using Multiple Connections
Updated: Sep 12, 2023
This example speeds up database write operations by using multiple connections. It is particularly useful for scenarios where high-speed data ingestion and parallel processing are essential. Real-life use cases include bulk data insertion, log storage, real-time data warehousing, and high-throughput data streaming applications.
Input Excel File
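```
+------------+----+----------+-----------------+------------------+------------------+-----------------------+
| event_type | id | agent_id | phone_number    | start_time       | end_time         | disposition           |
+------------+----+----------+-----------------+------------------+------------------+-----------------------+
| STARTED    | 1  | 40       | (807) 4254-5586 | 18/03/2016 21:11 |                  |                       |
| ENDED      | 1  | 40       | (807) 4254-5586 | 18/03/2016 21:11 | 18/03/2016 21:11 | DROPPED_WHILE_TALKING |
| STARTED    | 2  | 15       | (437) 2869-3170 | 18/03/2016 21:11 |                  |                       |
| ENDED      | 2  | 15       | (437) 2869-3170 | 18/03/2016 21:11 | 18/03/2016 21:11 | DROPPED_WHILE_TALKING |
| STARTED    | 3  | 34       | (343) 3322-7289 | 18/03/2016 21:11 |                  |                       |
+------------+----+----------+-----------------+------------------+------------------+-----------------------+
```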
Java Code Listing
```java
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.mysql.cj.jdbc.MysqlDataSource;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.excel.ExcelDocument;
import com.northconcepts.datapipeline.excel.ExcelReader;
import com.northconcepts.datapipeline.jdbc.JdbcMultiWriter;
import com.northconcepts.datapipeline.job.Job;

public class WriteToDatabaseUsingMultipleConnections {

    public static void main(String[] args) {
        // MySQL connection properties
        MysqlDataSource dataSource = new MysqlDataSource();
        dataSource.setServerName("localhost");
        dataSource.setDatabaseName("datapipeline");
        dataSource.setUser("etl");
        dataSource.setPassword("etl");

        // Read the Excel file, treating its first row as field names
        ExcelDocument doc = new ExcelDocument();
        DataReader reader = new ExcelReader(doc.open(
                new File("example/data/input/call-center-inbound-call-2.xlsx")))
                .setFieldNamesInFirstRow(true);

        // Insert into the existing inbound_calls table using 5 database connections
        DataWriter writer = new JdbcMultiWriter(dataSource, 5, 10, "inbound_calls");

        // Transfer all records from the reader to the writer
        Job.run(reader, writer);
    }

    /* JdbcMultiWriter does not create the table or its columns; they must be created beforehand.

    When setFieldNamesInFirstRow(true), the column names must equal the field names.

    mysql> describe inbound_calls;
    +--------------+-------------+------+-----+---------+-------+
    | Field        | Type        | Null | Key | Default | Extra |
    +--------------+-------------+------+-----+---------+-------+
    | event_type   | varchar(45) | YES  |     | NULL    |       |
    | id           | varchar(45) | YES  |     | NULL    |       |
    | agent_id     | varchar(45) | YES  |     | NULL    |       |
    | phone_number | varchar(45) | YES  |     | NULL    |       |
    | start_time   | varchar(45) | YES  |     | NULL    |       |
    | end_time     | varchar(45) | YES  |     | NULL    |       |
    | disposition  | varchar(45) | YES  |     | NULL    |       |
    +--------------+-------------+------+-----+---------+-------+

    When setFieldNamesInFirstRow(false), the column names are A, B, C, and so on (Excel style).

    mysql> describe inbound_calls;
    +-------+-------------+------+-----+---------+-------+
    | Field | Type        | Null | Key | Default | Extra |
    +-------+-------------+------+-----+---------+-------+
    | A     | varchar(45) | YES  |     | NULL    |       |
    | B     | varchar(45) | YES  |     | NULL    |       |
    | C     | varchar(45) | YES  |     | NULL    |       |
    | D     | varchar(45) | YES  |     | NULL    |       |
    | E     | varchar(45) | YES  |     | NULL    |       |
    | F     | varchar(45) | YES  |     | NULL    |       |
    | G     | varchar(45) | YES  |     | NULL    |       |
    +-------+-------------+------+-----+---------+-------+
    7 rows in set (0.00 sec)
    */
}
```
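Because JdbcMultiWriter expects the target table to exist already, it can help to generate the DDL from the expected column names. The following is a minimal sketch, not part of DataPipeline: the class `InboundCallsSchema` and its methods `excelColumnName`, `excelColumnNames`, and `createTableSql` are hypothetical names used for illustration. It assumes every column is a nullable VARCHAR(45), matching the describe output in the listing, and covers both naming modes (header field names, or Excel-style letters when the first row is not used as a header).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of DataPipeline) that builds the CREATE TABLE
// statement for a table JdbcMultiWriter can write to.
public class InboundCallsSchema {

    // Converts a zero-based column index to an Excel-style name: 0 -> A, 25 -> Z, 26 -> AA.
    static String excelColumnName(int index) {
        StringBuilder name = new StringBuilder();
        int n = index + 1; // bijective base-26 works on 1-based positions
        while (n > 0) {
            int remainder = (n - 1) % 26;
            name.insert(0, (char) ('A' + remainder));
            n = (n - 1) / 26;
        }
        return name.toString();
    }

    // Returns A, B, C, ... for the given number of columns.
    static List<String> excelColumnNames(int count) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            names.add(excelColumnName(i));
        }
        return names;
    }

    // Builds DDL in which every column is a nullable VARCHAR(45) (an assumption
    // taken from the describe output; choose real types for production tables).
    static String createTableSql(String table, List<String> columns) {
        StringBuilder sql = new StringBuilder("CREATE TABLE ").append(table).append(" (");
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                sql.append(", ");
            }
            sql.append(columns.get(i)).append(" VARCHAR(45) NULL");
        }
        return sql.append(")").toString();
    }

    public static void main(String[] args) {
        List<String> header = List.of("event_type", "id", "agent_id", "phone_number",
                "start_time", "end_time", "disposition");
        // DDL for setFieldNamesInFirstRow(true)
        System.out.println(createTableSql("inbound_calls", header));
        // DDL for setFieldNamesInFirstRow(false): columns A through G
        System.out.println(createTableSql("inbound_calls", excelColumnNames(header.size())));
    }
}
```

The generated statement can be run once in the mysql client before starting the job.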
Code Walkthrough
- First, the MySQL database connection properties are defined on a MysqlDataSource instance.
- Next, an ExcelDocument and an ExcelReader are created to read records from the input file call-center-inbound-call-2.xlsx.
- A JdbcMultiWriter instance is used to insert the records into the inbound_calls table using 5 database connections, each of which writes asynchronously on a separate thread. Note that JdbcMultiWriter does not create the table or its columns. They must be created in advance; otherwise, an exception is thrown.
- Finally, Job.run() is used to transfer the data from reader to writer.

See how to compile and run data pipeline jobs.
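To illustrate the general idea of writing over several connections, each on its own thread, here is a minimal plain-Java sketch assuming a bounded producer/consumer design: the reader fills a queue, and each worker thread owns one connection and writes records in batches. This is not JdbcMultiWriter's actual implementation. The `BatchConnection` interface is a hypothetical stand-in for a JDBC connection (a real version might bind records to a `PreparedStatement` and call `addBatch()` and `executeBatch()`), and all class and method names are invented for illustration. Error handling is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; this is NOT how DataPipeline's JdbcMultiWriter is implemented.
public class MultiConnectionWriterSketch {

    // Hypothetical stand-in for one database connection.
    interface BatchConnection {
        void writeBatch(List<String[]> batch);
    }

    // Unique marker telling a worker that the reader has no more records.
    private static final String[] END_OF_INPUT = new String[0];

    static void write(List<String[]> records, List<BatchConnection> connections, int batchSize)
            throws InterruptedException {
        // Bounded queue: the reader blocks when the writers fall behind.
        BlockingQueue<String[]> queue = new ArrayBlockingQueue<>(batchSize * connections.size());
        List<Thread> workers = new ArrayList<>();

        // One worker thread per connection, each writing independently.
        for (BatchConnection connection : connections) {
            Thread worker = new Thread(() -> {
                List<String[]> batch = new ArrayList<>();
                try {
                    while (true) {
                        String[] record = queue.take();
                        if (record == END_OF_INPUT) {
                            break;
                        }
                        batch.add(record);
                        if (batch.size() == batchSize) {
                            connection.writeBatch(new ArrayList<>(batch));
                            batch.clear();
                        }
                    }
                    if (!batch.isEmpty()) {
                        connection.writeBatch(batch); // flush the final partial batch
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            workers.add(worker);
        }

        // The "reader" side: each record goes to whichever worker takes it first.
        for (String[] record : records) {
            queue.put(record);
        }
        // One end marker per worker so that every thread stops.
        for (int i = 0; i < workers.size(); i++) {
            queue.put(END_OF_INPUT);
        }
        for (Thread worker : workers) {
            worker.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String[]> records = new ArrayList<>();
        for (int i = 1; i <= 23; i++) {
            records.add(new String[] {"STARTED", String.valueOf(i)});
        }

        AtomicInteger total = new AtomicInteger();
        List<BatchConnection> connections = new ArrayList<>();
        for (int c = 0; c < 5; c++) {
            int id = c;
            connections.add(batch -> {
                total.addAndGet(batch.size());
                System.out.println("connection " + id + " wrote " + batch.size() + " records");
            });
        }

        write(records, connections, 4);
        System.out.println("total written: " + total.get());
    }
}
```

With 23 records, 5 connections, and a batch size of 4, every record is written exactly once, but which connection writes which records depends on thread scheduling. The bounded queue keeps a fast reader from buffering the whole input in memory while the writers catch up.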
Console Output
```
18:32:38,693 DEBUG [main] datapipeline:196 - creating connection 0...
18:32:38,929 DEBUG [main] datapipeline:196 - creating connection 1...
18:32:38,945 DEBUG [main] datapipeline:196 - creating connection 2...
18:32:38,962 DEBUG [main] datapipeline:196 - creating connection 3...
18:32:38,975 DEBUG [main] datapipeline:196 - creating connection 4...
```
