Conditionally Write to Multiple Files

This example reads records from an input CSV file and routes each one to a separate output file depending on whether it satisfies a condition. You define conditions on specific columns; records that meet a condition are written to one file, and records that do not are written to another.

You can use this approach to categorize products from a CSV file by attributes such as size, color, or price range, writing a separate file for each category to support inventory management or catalog organization.
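Before looking at the DataPipeline version, the underlying idea (one condition, two complementary groups) can be sketched in plain Java. This is only an illustration using the standard library, not DataPipeline code; the Product type, its fields, and the sample values are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionSketch {

    // Hypothetical product type, used only for this illustration.
    static final class Product {
        final String name;
        final double price;

        Product(String name, double price) {
            this.name = name;
            this.price = price;
        }
    }

    // Groups product names by whether the price is below the limit:
    // key true holds the matches, key false holds everything else.
    static Map<Boolean, List<String>> splitByPrice(List<Product> products, double limit) {
        return products.stream().collect(
                Collectors.partitioningBy(
                        p -> p.price < limit,
                        Collectors.mapping(p -> p.name, Collectors.toList())));
    }

    public static void main(String[] args) {
        // Made-up sample data.
        List<Product> products = List.of(
                new Product("Mug", 8.5),
                new Product("Lamp", 42.0),
                new Product("Pen", 1.2));

        Map<Boolean, List<String>> groups = splitByPrice(products, 10.0);
        System.out.println("Below limit: " + groups.get(true));
        System.out.println("At or above limit: " + groups.get(false));
    }
}
```

Every record lands in exactly one of the two groups, which mirrors the pair of complementary filters (== and !=) used in the listing below.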

Input CSV file

"Country (en)";"Country (de)";"Country (local)";"Country code";"Continent";"Capital";"Population";"Area";"Coastline";"Government form";"Currency";"Currency code";"Dialing prefix";"Birthrate";"Deathrate";"Life expectancy";"Url"
"Afghanistan";"Afghanistan";"Afganistan/Afqanestan";"AF";"Asia";;33332025;652230;0;"Presidential islamic republic";"Afghani";"AFN";93;38.3;13.7;51.3;"https://www.laenderdaten.info/Asien/Afghanistan/index.php"
"Egypt";"Ägypten";"Misr";"EG";"Africa";;94666993;1001450;2450;"Presidential republic";"Pfund";"EGP";20;30.3;4.7;72.7;"https://www.laenderdaten.info/Afrika/Aegypten/index.php"
"Åland Islands";"Ålandinseln";"Åland";"AX";"Europe";;29013;1580;0;"Autonomous region of Finland";"Euro";"EUR";358;0;0;0;"https://www.laenderdaten.info/Europa/Aland/index.php"
"Albania";"Albanien";"Shqipëria";"AL";"Europe";;3038594;28748;362;"parliamentary republic";"Lek";"ALL";355;13.1;6.7;78.3;"https://www.laenderdaten.info/Europa/Albanien/index.php"
"Algeria";"Algerien";"Al-Jaza’ir/Algérie";"DZ";"Africa";;40263711;2381741;998;"Presidential republic";"Dinar";"DZD";213;23;4.3;76.8;"https://www.laenderdaten.info/Afrika/Algerien/index.php"
...

Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.filter.FilterExpression;
import com.northconcepts.datapipeline.filter.FilteringReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.multiplex.SplitWriter;

public class ConditionallyWriteToMultipleFiles {

    private static final String INPUT = "example/data/input";
    private static final String OUTPUT = "example/data/output";

    public static void main(String[] args) {
        
        DataReader reader = new CSVReader(new File(INPUT, "countries.csv"))
                .setFieldSeparator(';')
                .setFieldNamesInFirstRow(true);

        SplitWriter splitWriter = new SplitWriter();
        
        // Async job writing to USD file
        DataReader reader1 = splitWriter.createReader();
        reader1 = new FilteringReader(reader1).add(new FilterExpression("${Currency code} == 'USD'"));
        DataWriter writer1 = new CSVWriter(new File(OUTPUT, "countries-usd.csv"))
                .setFieldSeparator(',')
                .setFieldNamesInFirstRow(true);
        Job.runAsync(reader1, writer1);
        

        // Async job writing to non-USD file
        DataReader reader2 = splitWriter.createReader();
        reader2 = new FilteringReader(reader2).add(new FilterExpression("${Currency code} != 'USD'"));
        DataWriter writer2 = new CSVWriter(new File(OUTPUT, "countries-not-usd.csv"))
                .setFieldSeparator(',')
                .setFieldNamesInFirstRow(true);
        Job.runAsync(reader2, writer2);
        
        
        // Sync job writing to async jobs
        Job.run(reader, splitWriter);
    }

}

Code walkthrough

  1. A CSVReader is created for the input file countries.csv.
  2. CSVReader.setFieldSeparator(';') specifies that fields in the input file are separated by semicolons.
  3. CSVReader.setFieldNamesInFirstRow(true) specifies that the values in the first row are used as field names.
  4. A SplitWriter is created to fan a single source DataReader out to multiple downstream readers. Each downstream reader can then filter the data by its own condition.
  5. The first downstream reader is obtained from splitWriter.createReader() and wrapped in a FilteringReader that keeps only the countries whose currency code is USD. The condition is passed to FilteringReader.add() as a FilterExpression.
  6. FilterExpression takes a logical expression as a String argument. Here, "${Currency code} == 'USD'" matches the countries whose currency code is "USD".
  7. A CSVWriter writes the filtered records to the output file countries-usd.csv. Its setFieldSeparator and setFieldNamesInFirstRow methods work like the CSVReader methods in steps 2 and 3, except that the output separator is a comma.
  8. Job.runAsync() transfers data from reader1 to writer1. This method executes the job in a new thread.
  9. A second downstream reader is obtained from splitWriter to select the countries whose currency code is not USD.
  10. As in steps 5 and 6, the filter is built by passing "${Currency code} != 'USD'" to a FilterExpression, which is then passed to FilteringReader.add().
  11. A second CSVWriter writes these records to the output file countries-not-usd.csv.
  12. Job.runAsync() transfers data from reader2 to writer2, again in a new thread.
  13. Finally, Job.run() transfers data from the original input file reader to the splitWriter. This job runs synchronously and feeds the two asynchronous jobs started earlier.
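The ordering in the walkthrough (start the asynchronous consumers first, then run the synchronous producer) can be pictured with a plain-Java analogy. The sketch below uses only the standard library and is not a description of how SplitWriter is implemented; the class name, the queue-per-consumer design, and the END marker are assumptions made for illustration. Rows are reduced to "name,currency code" strings taken from the sample data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

public class FanOutSketch {

    // Marker telling a consumer that no more rows will arrive (assumed never to occur as real data).
    private static final String END = "\u0000END";

    // Starts a consumer thread that copies rows passing the filter into its own sink.
    static Thread startConsumer(BlockingQueue<String> queue, Predicate<String> filter, List<String> sink) {
        Thread thread = new Thread(() -> {
            try {
                for (String row = queue.take(); !row.equals(END); row = queue.take()) {
                    if (filter.test(row)) {
                        sink.add(row);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        thread.start();
        return thread;
    }

    // Sends every row to two consumers: one keeps matches, the other keeps the rest.
    static List<List<String>> split(List<String> rows, Predicate<String> condition) {
        BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
        BlockingQueue<String> queue2 = new LinkedBlockingQueue<>();
        List<String> matching = new ArrayList<>();
        List<String> rest = new ArrayList<>();
        try {
            // Consumers start first, like the two Job.runAsync() calls.
            Thread consumer1 = startConsumer(queue1, condition, matching);
            Thread consumer2 = startConsumer(queue2, condition.negate(), rest);

            // The producer then runs on the calling thread, like the final Job.run().
            for (String row : rows) {
                queue1.put(row);
                queue2.put(row);
            }
            queue1.put(END);
            queue2.put(END);

            // join() makes each consumer's results visible to this thread.
            consumer1.join();
            consumer2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while splitting rows", e);
        }
        return List.of(matching, rest);
    }

    public static void main(String[] args) {
        List<String> rows = List.of("Ecuador,USD", "Albania,ALL", "Guam,USD", "Egypt,EGP");
        List<List<String>> result = split(rows, row -> row.endsWith(",USD"));
        System.out.println("USD rows: " + result.get(0));
        System.out.println("Other rows: " + result.get(1));
    }
}
```

Each consumer sees every row and applies its own filter, so adding a third output would only require another queue, filter, and sink.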

Output CSV Files

countries-usd.csv

Country (en),Country (de),Country (local),Country code,Continent,Capital,Population,Area,Coastline,Government form,Currency,Currency code,Dialing prefix,Birthrate,Deathrate,Life expectancy,Url
American Samoa,Amerikanisch Samoa,Amerika Samoa,AS,Oceania,,54194,199,116,Presidential democracy (self-governing territory of the US),Dollar,USD,1-684,22.9,4.8,75.4,https://www.laenderdaten.info/Ozeanien/Amerikanisch-Samoa/index.php
British Indian Ocean Territory,Britisches Territorium im Indischen Ozean,British Indian Ocean Territory,IO,Africa,,0,54400,698,British overseas territory,Dollar,USD,246,0,0,0,https://www.laenderdaten.info/Afrika/Britisches-Territorium-im-Indischen-Ozean/index.php
Ecuador,Ecuador,Ecuador,EC,South America,,16080778,283561,2237,Presidential republic,Dollar,USD,593,18.2,5.1,76.8,https://www.laenderdaten.info/Amerika/Ecuador/index.php
El Salvador,El Salvador,El Salvador,SV,Central America,,6156670,21041,307,Presidential republic,Dollar,USD,503,16.3,5.7,74.7,https://www.laenderdaten.info/Amerika/El-Salvador/index.php
Guam,Guam,Guam,GU,Oceania,,162742,544,126,Presidential democracy (self-governing unincorporated territory of the US),Dollar,USD,671,16.7,5.2,79.1,https://www.laenderdaten.info/Ozeanien/Guam/index.php
...

countries-not-usd.csv

Country (en),Country (de),Country (local),Country code,Continent,Capital,Population,Area,Coastline,Government form,Currency,Currency code,Dialing prefix,Birthrate,Deathrate,Life expectancy,Url
Afghanistan,Afghanistan,Afganistan/Afqanestan,AF,Asia,,33332025,652230,0,Presidential islamic republic,Afghani,AFN,93,38.3,13.7,51.3,https://www.laenderdaten.info/Asien/Afghanistan/index.php
Egypt,Ägypten,Misr,EG,Africa,,94666993,1001450,2450,Presidential republic,Pfund,EGP,20,30.3,4.7,72.7,https://www.laenderdaten.info/Afrika/Aegypten/index.php
Åland Islands,Ålandinseln,Åland,AX,Europe,,29013,1580,0,Autonomous region of Finland,Euro,EUR,358,0,0,0,https://www.laenderdaten.info/Europa/Aland/index.php
Albania,Albanien,Shqipëria,AL,Europe,,3038594,28748,362,parliamentary republic,Lek,ALL,355,13.1,6.7,78.3,https://www.laenderdaten.info/Europa/Albanien/index.php
Algeria,Algerien,Al-Jaza’ir/Algérie,DZ,Africa,,40263711,2381741,998,Presidential republic,Dinar,DZD,213,23,4.3,76.8,https://www.laenderdaten.info/Afrika/Algerien/index.php
...
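
Comparing the input sample with the two output files shows that the semicolon-separated, quoted fields come out comma-separated and unquoted. The plain-Java sketch below shows one way such a conversion can work for a single line. It is an illustration only, not the CSVReader or CSVWriter implementation; in particular, the rule of quoting a field only when it contains the separator, a quote, or a line break is an assumption based on common CSV practice.

```java
import java.util.ArrayList;
import java.util.List;

public class SeparatorSketch {

    // Splits one line on the separator, honoring double-quoted fields and "" escapes.
    static List<String> parseLine(String line, char separator) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    current.append('"'); // doubled quote inside a quoted field
                    i++;
                } else {
                    inQuotes = !inQuotes;
                }
            } else if (c == separator && !inQuotes) {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }

    // Joins fields with the separator, quoting only the fields that need it (assumed rule).
    static String formatLine(List<String> fields, char separator) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                out.append(separator);
            }
            String field = fields.get(i);
            boolean needsQuotes = field.indexOf(separator) >= 0
                    || field.indexOf('"') >= 0
                    || field.indexOf('\n') >= 0;
            out.append(needsQuotes ? "\"" + field.replace("\"", "\"\"") + "\"" : field);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Shortened row based on the sample input.
        String input = "\"Albania\";\"Albanien\";\"AL\";;3038594";
        System.out.println(formatLine(parseLine(input, ';'), ','));
    }
}
```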