Save and Restore Pipeline from JSON

This example shows how to save and restore data processing pipelines in JSON format. This capability enables you to preserve and share your pipeline configurations, including custom actions and transformations, for future use or collaboration.

This is useful for keeping pipeline configurations under version control, sharing data processing workflows across teams, and reproducing the same transformations in different environments. Because a pipeline's input, output, and actions are all captured in the JSON, the configuration can be stored and rebuilt later, which makes pipelines easier to move and maintain.

 

Input CSV File

Sell,List,Living,Rooms,Beds,Baths,Age,Acres,Taxes,Rating,Garage
142,160,28,10,5,,60,0.28,3167,A,
1,193,28,11,,3,60,0.28,3168,B,
10,110,28,,5,3,60,0.28,3169,B,
3,132,,12,5,3,60,0.28,3170,C,
5,,180,13,5,3,60,0.28,3171,C,

 

Java Code Listing

package com.northconcepts.datapipeline.foundations.examples.pipeline;

import com.northconcepts.datapipeline.foundations.file.LocalFileSink;
import com.northconcepts.datapipeline.foundations.file.LocalFileSource;
import com.northconcepts.datapipeline.foundations.pipeline.Pipeline;
import com.northconcepts.datapipeline.foundations.pipeline.action.convert.ConvertStringToNumberAction;
import com.northconcepts.datapipeline.foundations.pipeline.action.transform.AddFieldsAction;
import com.northconcepts.datapipeline.foundations.pipeline.action.transform.RenameFieldsAction;
import com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput;
import com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput;

public class SaveAndRestorePipelineFromJson {

    public static void main(String[] args) throws Throwable {

        // Read the CSV input, taking field names from the first row
        CsvPipelineInput pipelineInput = new CsvPipelineInput()
                .setFileSource(new LocalFileSource().setPath("data/input/Listing.csv"))
                .setFieldNamesInFirstRow(true);

        // Write the results to an Excel file with a header row
        ExcelPipelineOutput pipelineOutput = new ExcelPipelineOutput()
                .setFileSink(new LocalFileSink().setPath("data/output/output.xlsx"))
                .setFieldNamesInFirstRow(true);

        Pipeline pipeline = new Pipeline();
        pipeline.setInput(pipelineInput);
        pipeline.setOutput(pipelineOutput);

        // Rename Taxes, convert Sell and List to doubles, then add new_column = List - Sell
        pipeline.addAction(new RenameFieldsAction().add("Taxes", "Taxes_Renamed"));
        pipeline.addAction(new ConvertStringToNumberAction()
                .add("Sell", "List")
                .setType(ConvertStringToNumberAction.FieldType.DOUBLE)
                .setPattern("0.00"));
        pipeline.addAction(new AddFieldsAction().add("new_column", AddFieldsAction.FieldType.EXPRESSION, "List - Sell"));

        // Serialize the pipeline configuration to JSON
        String json = pipeline.toJson();

        System.out.println(json);

        // Rebuild an equivalent pipeline from the JSON and run it
        Pipeline pipeline2 = new Pipeline();
        pipeline2.fromJson(json);

        pipeline2.run();
    }
}

 

Code Walkthrough

  1. A CsvPipelineInput instance is created to read data from the input file Listing.csv.
  2. The setFieldNamesInFirstRow(true) method is invoked to specify that the values in the first row should be used as field names.
  3. An ExcelPipelineOutput instance is created to write the results to the Excel file data/output/output.xlsx.
  4. A Pipeline instance is created, and its input and output are set.
  5. Three actions are added to the pipeline: RenameFieldsAction renames the Taxes field to Taxes_Renamed, ConvertStringToNumberAction converts Sell and List to doubles using the pattern 0.00, and AddFieldsAction adds new_column, computed from the expression List - Sell.
  6. The pipeline is converted to a JSON string with toJson() and printed to the console.
  7. A second Pipeline is rebuilt from that JSON string with fromJson() and run, which writes the output file.
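
The listing keeps the JSON in memory. To preserve it between runs, the string can be written to a file and read back later. The following is a minimal sketch using only the JDK; the class name PipelineJsonFile, the helper methods save and load, and the sample JSON text are hypothetical and not part of DataPipeline.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper (not a DataPipeline class): stores and reloads JSON text.
public class PipelineJsonFile {

    // Writes JSON text, such as the result of pipeline.toJson(), to a UTF-8 file.
    static void save(String json, Path file) {
        try {
            Path parent = file.toAbsolutePath().getParent();
            if (parent != null) {
                Files.createDirectories(parent); // make sure the target folder exists
            }
            Files.write(file, json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the JSON text back so it can be passed to fromJson(...).
    static String load(Path file) {
        try {
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Placeholder text standing in for pipeline.toJson() (assumption for illustration).
        String json = "{\"name\": null, \"multithreaded\": true}";

        Path file = Files.createTempFile("pipeline", ".json");
        save(json, file);
        String restored = load(file);

        // The restored text should match what was saved.
        System.out.println(restored.equals(json));
        Files.delete(file);
    }
}
```

With DataPipeline on the classpath, the placeholder string would be replaced by pipeline.toJson(), and the loaded text would be passed to new Pipeline().fromJson(...) as in the listing above.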

 

Output File

Sell	List	Living	Rooms	Beds	Baths	Age	Acres	Taxes_Renamed	Rating	Garage	new_column
142.00	160.00	28	10	5		60	0.28	3167	A		18.00
1.00	193.00	28	11		3	60	0.28	3168	B		192.00
10.00	110.00	28		5	3	60	0.28	3169	B		100.00
3.00	132.00		12	5	3	60	0.28	3170	C		129.00
5.00		180	13	5	3	60	0.28	3171	C		
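
The new_column values follow from the expression List - Sell applied to each row. The sketch below reproduces that arithmetic in plain Java; it does not use the DataPipeline expression engine. Treating a blank cell as null and leaving the result blank is an assumption based on the last row of the output above, and the class and method names are hypothetical.

```java
import java.util.Locale;

// Hypothetical illustration of the "List - Sell" arithmetic, not DataPipeline code.
public class NewColumnSketch {

    // Returns list - sell, or null when either value is missing (assumed behavior).
    static Double difference(Double list, Double sell) {
        if (list == null || sell == null) {
            return null;
        }
        return list - sell;
    }

    public static void main(String[] args) {
        // {List, Sell} pairs taken from the five input rows; null marks a blank cell.
        Double[][] rows = {
            {160.0, 142.0},
            {193.0, 1.0},
            {110.0, 10.0},
            {132.0, 3.0},
            {null, 5.0}
        };
        for (Double[] row : rows) {
            Double value = difference(row[0], row[1]);
            // Two decimal places, matching the formatting shown in the output table.
            System.out.println(value == null ? "" : String.format(Locale.ROOT, "%.2f", value));
        }
    }
}
```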

 

Console Output

{
  "name": null,
  "description": null,
  "input": {
    "__class__": "com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput",
    "saveLineage": false,
    "fieldSeparator": ",",
    "startingQuote": "\"",
    "endingQuote": "\"",
    "lineSeparators": "\\n,\\r\\n,\\r",
    "allowMultiLineText": false,
    "allowQuoteInField": false,
    "trimFields": true,
    "skipEmptyRows": false,
    "charset": "UTF-8",
    "fieldNamesInFirstRow": true,
    "fileSource": {
      "__class__": "com.northconcepts.datapipeline.foundations.file.LocalFileSource",
      "name": null,
      "path": "data/input/Listing.csv"
    }
  },
  "sourceEntity": null,
  "targetEntity": null,
  "output": {
    "__class__": "com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput",
    "providerType": "POI_XSSF",
    "sheetName": null,
    "sheetIndex": -1,
    "firstRowIndex": 0,
    "firstColumnIndex": 0,
    "autofitColumns": false,
    "autoFilterColumns": false,
    "fieldNamesInFirstRow": true,
    "fileSink": {
      "__class__": "com.northconcepts.datapipeline.foundations.file.LocalFileSink",
      "name": null,
      "path": "data/output/output.xlsx",
      "append": false
    }
  },
  "multithreaded": true,
  "actions": [
    {
      "__class__": "com.northconcepts.datapipeline.foundations.pipeline.action.transform.RenameFieldsAction",
      "description": null,
      "mapping": [
        {
          "source": "Taxes",
          "target": "Taxes_Renamed"
        }
      ]
    },
    {
      "__class__": "com.northconcepts.datapipeline.foundations.pipeline.action.convert.ConvertStringToNumberAction",
      "description": null,
      "fields": [
        "Sell",
        "List"
      ],
      "type": "DOUBLE",
      "pattern": "0.00"
    },
    {
      "__class__": "com.northconcepts.datapipeline.foundations.pipeline.action.transform.AddFieldsAction",
      "description": null,
      "fields": [
        {
          "fieldName": "new_column",
          "type": "EXPRESSION",
          "value": "List - Sell"
        }
      ]
    }
  ]
}