Filter JSON Records in a Pipeline

This example shows how to filter JSON records within a data pipeline. The pipeline is defined with an input source, an output destination, and the filter expression to apply to the JSON records, giving you a flexible, configurable way to process and filter JSON data based on specific criteria or conditions.

You can use DataPipeline to perform data cleansing tasks on JSON records within a pipeline. This can involve filtering out irrelevant or incorrect data, removing duplicates, standardizing data formats, or applying custom validation rules. By defining these filter operations in the pipeline configuration, you can process and clean large volumes of JSON data while ensuring data quality and consistency. A minimal sketch of such a cleansing filter appears below.
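
The snippet below is a minimal sketch of a cleansing-style filter that drops into the main method of the full example further down. It uses the same FilterMatchExpression action; endsWith, the output_email field, and pipeline.addAction come from that example, while the ! (negation) operator in the expression is an assumption about the expression language, not something shown in this article.

        // Sketch only: drop records whose email ends with 'ru' as a cleansing step.
        // endsWith and output_email come from the example below; the ! (negation)
        // operator is assumed to be supported by the expression language.
        FilterMatchExpression cleansingFilter = new FilterMatchExpression();
        cleansingFilter.setExpression("!endsWith(output_email, 'ru')");
        pipeline.addAction(cleansingFilter);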


Input JSON File

[{"id":1,"first_name":"Sarette","last_name":"Sharkey","email":"ssharkey0@vkontakte.ru","ip_address":"236.5.24.150"},
{"id":2,"first_name":"Wylie","last_name":"Greeno","email":"wgreeno1@cornell.edu","ip_address":"39.67.151.237"},
{"id":3,"first_name":"Lukas","last_name":"Rowcastle","email":"lrowcastle2@redcross.org","ip_address":"132.62.80.207"},
{"id":4,"first_name":"Tirrell","last_name":"Huxter","email":"thuxter3@marriott.com","ip_address":"154.241.123.197"},
{"id":5,"first_name":"Galvin","last_name":"McEntegart","email":"gmcentegart4@mail.ru","ip_address":"69.245.242.195"},
...


Java Code

package com.northconcepts.datapipeline.foundations.examples.pipeline;

import com.northconcepts.datapipeline.foundations.file.FileSource;
import com.northconcepts.datapipeline.foundations.file.LocalFileSink;
import com.northconcepts.datapipeline.foundations.file.LocalFileSource;
import com.northconcepts.datapipeline.foundations.pipeline.Pipeline;
import com.northconcepts.datapipeline.foundations.pipeline.action.filter.FilterMatchExpression;
import com.northconcepts.datapipeline.foundations.pipeline.input.JsonPipelineInput;
import com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput;
import com.northconcepts.datapipeline.xml.XmlReader;

public class FilterJsonRecordsInAPipeline {

    public static void main(String... args) {
        Pipeline pipeline = new Pipeline();

        // Declare the local JSON file that the pipeline will read from
        FileSource source = new LocalFileSource()
            .setName("input-json")
            .setPath("example/data/input/pipeline/MOCK_DATA.json")
            .detectFileType();

        // Map each JSON object in the array to a record with the listed fields
        JsonPipelineInput input = new JsonPipelineInput()
            .setFileSource(source)
            .addRecordBreak("//array/object")
            .addField("output_id", "//array/object/id")
            .addField("output_first_name", "//array/object/first_name")
            .addField("output_last_name", "//array/object/last_name")
            .addField("output_email", "//array/object/email")
            .addField("output_ip_address", "//array/object/ip_address")
            .setDuplicateFieldPolicy(XmlReader.DuplicateFieldPolicy.USE_LAST_VALUE);

        pipeline.setInput(input);

        // Keep only the records whose email ends with "org"
        FilterMatchExpression filter = new FilterMatchExpression();
        filter.setExpression("endsWith(output_email, 'org')");
        pipeline.addAction(filter);

        // Write the filtered records to an Excel file with field names in the first row
        LocalFileSink sink = new LocalFileSink().setPath("data/output/test.xlsx");
        pipeline.setOutput(new ExcelPipelineOutput().setFileSink(sink).setFieldNamesInFirstRow(true));

        pipeline.run();
    }
}


Code Walkthrough

  1. A Pipeline object is created.
  2. A LocalFileSource instance defines the input file MOCK_DATA.json.
  3. A JsonPipelineInput object is created. It reads data from the file source declared in the previous step and defines the record break (the separator between records) and the field definitions (name and location path).
  4. The JsonPipelineInput instance is set as the input of the Pipeline.
  5. Record filtering is implemented with the FilterMatchExpression object. In this example, only records whose output_email field ends with "org" are passed on to the next step (a variant that accepts more than one domain is sketched after this walkthrough).
  6. A LocalFileSink is created for the output file test.xlsx.
  7. An ExcelPipelineOutput is set as the pipeline output. It internally uses the LocalFileSink created in the previous step.
  8. The pipeline is run.
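
As noted in step 5, only the filter expression needs to change to select a different subset of records; the rest of the pipeline stays the same. The fragment below is a sketch under that assumption and drops into the main method of the example above: FilterMatchExpression, setExpression, addAction, and endsWith are taken from that example, while the || (logical OR) operator is an assumption about the expression language.

        // Sketch only: keep records whose email ends with 'org' or 'edu'.
        // The || (logical OR) operator is assumed to be supported by the expression language.
        FilterMatchExpression filter = new FilterMatchExpression();
        filter.setExpression("endsWith(output_email, 'org') || endsWith(output_email, 'edu')");
        pipeline.addAction(filter);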


Output File

output_id  output_first_name  output_last_name  output_email              output_ip_address
3          Lukas              Rowcastle         lrowcastle2@redcross.org  132.62.80.207
21         Herve              Jaycox            hjaycoxk@edublogs.org     205.88.212.178
53         Baryram            Hindhaugh         bhindhaugh1g@mozilla.org  80.98.255.137
57         Talia              Tulloch           ttulloch1k@wikipedia.org  156.89.201.140
65         Laurent            Robjant           lrobjant1s@drupal.org     64.62.81.61
...