Write to a Database Using Multiple Connections

This example shows how to speed up database writes by spreading them across multiple connections. It is particularly useful where high-speed data ingestion and parallel processing are essential. Real-life use cases include bulk data insertion, log storage, real-time data warehousing, and high-throughput data streaming applications.


Input Excel File

event_type  id  agent_id  phone_number     start_time        end_time          disposition
STARTED     1   40        (807) 4254-5586  18/03/2016 21:11
ENDED       1   40        (807) 4254-5586  18/03/2016 21:11  18/03/2016 21:11  DROPPED_WHILE_TALKING
STARTED     2   15        (437) 2869-3170  18/03/2016 21:11
ENDED       2   15        (437) 2869-3170  18/03/2016 21:11  18/03/2016 21:11  DROPPED_WHILE_TALKING
STARTED     3   34        (343) 3322-7289  18/03/2016 21:11


Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.mysql.cj.jdbc.MysqlDataSource;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.excel.ExcelDocument;
import com.northconcepts.datapipeline.excel.ExcelReader;
import com.northconcepts.datapipeline.jdbc.JdbcMultiWriter;
import com.northconcepts.datapipeline.job.Job;

public class WriteToDatabaseUsingMultipleConnections {

    public static void main(String[] args) {
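        // The connection properties (URL, user, password) for your MySQL server
        // must be set on this data source before the job runs.
        MysqlDataSource dataSource = new MysqlDataSource();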
        ExcelDocument doc = new ExcelDocument();
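        // Read the spreadsheet, treating its first row as the field names.
        DataReader reader = new ExcelReader(doc.open(
                new File("example/data/input/call-center-inbound-call-2.xlsx")))
                .setFieldNamesInFirstRow(true);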

        DataWriter writer = new JdbcMultiWriter(dataSource, 5, 10, "inbound_calls");

        Job.run(reader, writer);
    }

    /*
    JdbcMultiWriter does not create the table or its columns; they must be created beforehand.
    With setFieldNamesInFirstRow(true), the column names must match the field names.

    mysql> describe inbound_calls;
    | Field        | Type        | Null | Key | Default | Extra |
    | event_type   | varchar(45) | YES  |     | NULL    |       |
    | id           | varchar(45) | YES  |     | NULL    |       |
    | agent_id     | varchar(45) | YES  |     | NULL    |       |
    | phone_number | varchar(45) | YES  |     | NULL    |       |
    | start_time   | varchar(45) | YES  |     | NULL    |       |
    | end_time     | varchar(45) | YES  |     | NULL    |       |
    | disposition  | varchar(45) | YES  |     | NULL    |       |

    With setFieldNamesInFirstRow(false), the columns must be named A, B, C, and so on (Excel style).

    mysql> describe inbound_calls;
    | Field | Type        | Null | Key | Default | Extra |
    | A     | varchar(45) | YES  |     | NULL    |       |
    | B     | varchar(45) | YES  |     | NULL    |       |
    | C     | varchar(45) | YES  |     | NULL    |       |
    | D     | varchar(45) | YES  |     | NULL    |       |
    | E     | varchar(45) | YES  |     | NULL    |       |
    | F     | varchar(45) | YES  |     | NULL    |       |
    | G     | varchar(45) | YES  |     | NULL    |       |
    7 rows in set (0.00 sec)
    */
}



Code Walkthrough

  1. First, the MySQL database connection properties are defined using a MysqlDataSource instance.
  2. Next, an ExcelDocument and an ExcelReader are created to read records from the input file call-center-inbound-call-2.xlsx.
  3. A JdbcMultiWriter instance inserts the records into the inbound_calls table using 5 database connections, each writing asynchronously on its own thread. Note that JdbcMultiWriter does not create the table or its columns. They must exist in advance; otherwise an exception is thrown.
  4. Finally, Job.run() transfers the data from the reader to the writer. See how to compile and run data pipeline jobs.
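
Because the table has to exist before the job runs, it can help to generate the CREATE TABLE statement from the expected field names. The following is a minimal sketch, not part of DataPipeline: the InboundCallsDdl class and its two methods are hypothetical helpers, and it assumes every column is varchar(45) as in the describe output shown in the listing. It covers both the named-field layout and the Excel-style (A, B, ...) layout.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of DataPipeline): builds DDL for the target table.
public class InboundCallsDdl {

    // Excel-style column name for a zero-based index: 0 -> A, 25 -> Z, 26 -> AA.
    public static String excelColumnName(int index) {
        StringBuilder name = new StringBuilder();
        for (int i = index; i >= 0; i = i / 26 - 1) {
            name.insert(0, (char) ('A' + i % 26));
        }
        return name.toString();
    }

    // CREATE TABLE statement with one varchar(45) column per name (assumed type).
    public static String createTable(String table, List<String> columns) {
        StringBuilder sql = new StringBuilder("CREATE TABLE " + table + " (");
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                sql.append(", ");
            }
            sql.append(columns.get(i)).append(" varchar(45)");
        }
        return sql.append(")").toString();
    }

    public static void main(String[] args) {
        // Field names taken from the header row of the input spreadsheet.
        List<String> named = List.of("event_type", "id", "agent_id", "phone_number",
                "start_time", "end_time", "disposition");

        // Same number of columns, named Excel style instead.
        List<String> excelStyle = new ArrayList<>();
        for (int i = 0; i < named.size(); i++) {
            excelStyle.add(excelColumnName(i));
        }

        System.out.println(createTable("inbound_calls", named));
        System.out.println(createTable("inbound_calls", excelStyle));
    }
}
```

The generated statement can be run once against the target database (for example from the mysql client) before starting the job. Adjust the column types if varchar(45) is too narrow for your data.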


Console Output

18:32:38,693 DEBUG [main] datapipeline:196 - creating connection 0...
18:32:38,929 DEBUG [main] datapipeline:196 - creating connection 1...
18:32:38,945 DEBUG [main] datapipeline:196 - creating connection 2...
18:32:38,962 DEBUG [main] datapipeline:196 - creating connection 3...
18:32:38,975 DEBUG [main] datapipeline:196 - creating connection 4...
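
The log shows one line per connection, matching the 5 connections requested in the listing. As a rough mental model only, and not JdbcMultiWriter's actual implementation, the sketch below hands records round-robin to worker threads that each drain their own bounded queue. The RoundRobinWriterSketch class, the round-robin distribution, the queue capacity of 10, and the in-memory lists standing in for database connections are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative only: in-memory lists stand in for database connections.
public class RoundRobinWriterSketch {

    private final List<BlockingQueue<Optional<String>>> queues = new ArrayList<>();
    private final List<List<String>> written = new ArrayList<>();
    private final List<Thread> workers = new ArrayList<>();
    private int next; // index of the worker that receives the next record

    public RoundRobinWriterSketch(int connections, int queueCapacity) {
        for (int i = 0; i < connections; i++) {
            BlockingQueue<Optional<String>> queue = new ArrayBlockingQueue<>(queueCapacity);
            List<String> sink = Collections.synchronizedList(new ArrayList<>());
            Thread worker = new Thread(() -> drain(queue, sink), "writer-" + i);
            queues.add(queue);
            written.add(sink);
            workers.add(worker);
            worker.start();
        }
    }

    // Worker loop: take records until an empty Optional signals the end.
    private static void drain(BlockingQueue<Optional<String>> queue, List<String> sink) {
        try {
            for (Optional<String> item = queue.take(); item.isPresent(); item = queue.take()) {
                sink.add(item.get()); // a real worker would execute an INSERT here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Blocks while the chosen worker's queue is full, slowing the caller down.
    public void write(String record) throws InterruptedException {
        queues.get(next).put(Optional.of(record));
        next = (next + 1) % queues.size();
    }

    // Signals every worker to stop, waits for them, and returns what each one wrote.
    public List<List<String>> close() throws InterruptedException {
        for (BlockingQueue<Optional<String>> queue : queues) {
            queue.put(Optional.empty());
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return written;
    }

    public static void main(String[] args) throws InterruptedException {
        RoundRobinWriterSketch writer = new RoundRobinWriterSketch(5, 10);
        for (int id = 1; id <= 12; id++) {
            writer.write("record-" + id);
        }
        List<List<String>> perWorker = writer.close();
        for (int i = 0; i < perWorker.size(); i++) {
            System.out.println("worker " + i + " wrote " + perWorker.get(i));
        }
    }
}
```

The bounded queues are the design point worth noting: when a worker falls behind, write() blocks instead of letting records pile up in memory without limit.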