Save and Restore Pipeline from JSON
Updated: Aug 12, 2023
This example shows how to save and restore data processing pipelines in JSON format. This capability enables you to preserve and share your pipeline configurations, including custom actions and transformations, for future use or collaboration.
Typical uses include version control of pipeline configurations, sharing data processing workflows across teams, and reproducing data transformations in different environments. Because a pipeline can be serialized to JSON and later deserialized into an equivalent Pipeline, its configuration is portable and easier to maintain.
Input CSV file
Sell,List,Living,Rooms,Beds,Baths,Age,Acres,Taxes,Rating,Garage
142,160,28,10,5,,60,0.28,3167,A,
1,193,28,11,,3,60,0.28,3168,B,
10,110,28,,5,3,60,0.28,3169,B,
3,132,,12,5,3,60,0.28,3170,C,
5,,180,13,5,3,60,0.28,3171,C,
Java Code Listing
package com.northconcepts.datapipeline.foundations.examples.pipeline;

import com.northconcepts.datapipeline.foundations.file.LocalFileSink;
import com.northconcepts.datapipeline.foundations.file.LocalFileSource;
import com.northconcepts.datapipeline.foundations.pipeline.Pipeline;
import com.northconcepts.datapipeline.foundations.pipeline.action.convert.ConvertStringToNumberAction;
import com.northconcepts.datapipeline.foundations.pipeline.action.transform.AddFieldsAction;
import com.northconcepts.datapipeline.foundations.pipeline.action.transform.RenameFieldsAction;
import com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput;
import com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput;

public class SaveAndRestorePipelineFromJson {

    public static void main(String[] args) throws Throwable {
        // Input: CSV file whose first row holds the field names
        CsvPipelineInput pipelineInput = new CsvPipelineInput()
                .setFileSource(new LocalFileSource().setPath("data/input/Listing.csv"))
                .setFieldNamesInFirstRow(true);

        // Output: Excel file with field names written in the first row
        ExcelPipelineOutput pipelineOutput = new ExcelPipelineOutput()
                .setFileSink(new LocalFileSink().setPath("data/output/output.xlsx"))
                .setFieldNamesInFirstRow(true);

        Pipeline pipeline = new Pipeline();
        pipeline.setInput(pipelineInput);
        pipeline.setOutput(pipelineOutput);

        // Actions: rename a field, convert two fields to numbers, add a calculated field
        pipeline.addAction(new RenameFieldsAction().add("Taxes", "Taxes_Renamed"));
        pipeline.addAction(new ConvertStringToNumberAction()
                .add("Sell", "List")
                .setType(ConvertStringToNumberAction.FieldType.DOUBLE)
                .setPattern("0.00"));
        pipeline.addAction(new AddFieldsAction().add("new_column", AddFieldsAction.FieldType.EXPRESSION, "List - Sell"));

        // Save: serialize the pipeline to JSON and print it
        String json = pipeline.toJson();
        System.out.println(json);

        // Restore: rebuild a second pipeline from the JSON and run it
        Pipeline pipeline2 = new Pipeline();
        pipeline2.fromJson(json);
        pipeline2.run();
    }
}
Code Walkthrough
- A CsvPipelineInput instance is created to read data from the input file Listing.csv.
- The setFieldNamesInFirstRow(true) method is invoked to specify that the values in the first row should be used as field names.
- An ExcelPipelineOutput instance is created to specify the output Excel file output.xlsx and its path.
- A Pipeline instance is created, and the input and output are set on it.
- Three actions are added to the pipeline: RenameFieldsAction renames the Taxes field, ConvertStringToNumberAction converts the Sell and List fields to numbers, and AddFieldsAction creates a new column calculated from the expression List - Sell.
- The pipeline is then converted to a JSON string with toJson() and printed to the console.
- A second Pipeline is rebuilt from that JSON string with fromJson() and run.
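The listing only prints the JSON string. For version control or sharing, the same string would normally be written to a file and read back later. The following is a minimal sketch of that step using only the Java standard library; the class name PipelineJsonFile, its save and load helpers, and the temporary file are hypothetical and not part of DataPipeline, and the sample string merely stands in for the value returned by pipeline.toJson().

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper (not part of DataPipeline): stores pipeline JSON in a file.
class PipelineJsonFile {

    // Writes the JSON text to the file as UTF-8, creating parent directories if needed.
    static void save(String json, Path file) throws IOException {
        Path parent = file.toAbsolutePath().getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        Files.write(file, json.getBytes(StandardCharsets.UTF_8));
    }

    // Reads the whole file back into a String.
    static String load(Path file) throws IOException {
        return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the string returned by pipeline.toJson() in the listing.
        String json = "{ \"name\": null, \"multithreaded\": true }";

        // A temporary file is used here; a real project would pick its own path.
        Path file = Files.createTempFile("pipeline", ".json");
        save(json, file);

        // In the listing, this restored string is what would be passed to fromJson().
        String restored = load(file);
        System.out.println(restored.equals(json)); // prints "true"
    }
}
```

Keeping the file I/O separate from the pipeline calls means the saved JSON can be committed, diffed, and reviewed like any other text file.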
Output File
Sell    List    Living  Rooms  Beds  Baths  Age  Acres  Taxes_Renamed  Rating  Garage  new_column
142.00  160.00  28      10     5            60   0.28   3167           A               18.00
1.00    193.00  28      11           3      60   0.28   3168           B               192.00
10.00   110.00  28             5     3      60   0.28   3169           B               100.00
3.00    132.00          12     5     3      60   0.28   3170           C               129.00
5.00            180     13     5     3      60   0.28   3171           C
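The new_column values can be checked by hand against the input: each is List minus Sell. The sketch below reproduces that arithmetic in plain Java. It does not use DataPipeline's expression evaluator; the class name NewColumnCheck and the difference helper are hypothetical, the two-decimal formatting is only there to match how the values are displayed, and returning a blank result for a blank operand is an assumption chosen to match the last row.

```java
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;

// Hypothetical check (plain Java, not the library's evaluator) of "List - Sell".
class NewColumnCheck {

    // Two-decimal format with a locale-independent decimal point.
    private static final DecimalFormat FORMAT =
            new DecimalFormat("0.00", DecimalFormatSymbols.getInstance(Locale.ROOT));

    // Returns list - sell formatted as 0.00, or "" when either operand is blank (assumption).
    static String difference(String sell, String list) {
        if (sell.isEmpty() || list.isEmpty()) {
            return "";
        }
        double result = Double.parseDouble(list) - Double.parseDouble(sell);
        return FORMAT.format(result);
    }

    public static void main(String[] args) {
        // Sell and List values copied from the five input rows.
        String[][] rows = {
            {"142", "160"}, {"1", "193"}, {"10", "110"}, {"3", "132"}, {"5", ""}
        };
        for (String[] row : rows) {
            System.out.println("[" + difference(row[0], row[1]) + "]");
        }
        // prints [18.00], [192.00], [100.00], [129.00], [] on separate lines
    }
}
```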
Console Output
{
  "name": null,
  "description": null,
  "input": {
    "__class__": "com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput",
    "saveLineage": false,
    "fieldSeparator": ",",
    "startingQuote": "\"",
    "endingQuote": "\"",
    "lineSeparators": "\\n,\\r\\n,\\r",
    "allowMultiLineText": false,
    "allowQuoteInField": false,
    "trimFields": true,
    "skipEmptyRows": false,
    "charset": "UTF-8",
    "fieldNamesInFirstRow": true,
    "fileSource": {
      "__class__": "com.northconcepts.datapipeline.foundations.file.LocalFileSource",
      "name": null,
      "path": "data/input/Listing.csv"
    }
  },
  "sourceEntity": null,
  "targetEntity": null,
  "output": {
    "__class__": "com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput",
    "providerType": "POI_XSSF",
    "sheetName": null,
    "sheetIndex": -1,
    "firstRowIndex": 0,
    "firstColumnIndex": 0,
    "autofitColumns": false,
    "autoFilterColumns": false,
    "fieldNamesInFirstRow": true,
    "fileSink": {
      "__class__": "com.northconcepts.datapipeline.foundations.file.LocalFileSink",
      "name": null,
      "path": "data/output/output.xlsx",
      "append": false
    }
  },
  "multithreaded": true,
  "actions": [
    {
      "__class__": "com.northconcepts.datapipeline.foundations.pipeline.action.transform.RenameFieldsAction",
      "description": null,
      "mapping": [
        {
          "source": "Taxes",
          "target": "Taxes_Renamed"
        }
      ]
    },
    {
      "__class__": "com.northconcepts.datapipeline.foundations.pipeline.action.convert.ConvertStringToNumberAction",
      "description": null,
      "fields": [
        "Sell",
        "List"
      ],
      "type": "DOUBLE",
      "pattern": "0.00"
    },
    {
      "__class__": "com.northconcepts.datapipeline.foundations.pipeline.action.transform.AddFieldsAction",
      "description": null,
      "fields": [
        {
          "fieldName": "new_column",
          "type": "EXPRESSION",
          "value": "List - Sell"
        }
      ]
    }
  ]
}
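Each input, output, file source, file sink, and action in the JSON above carries a "__class__" entry naming its Java class. When a saved pipeline is shared, it can be handy to list those entries before restoring it, to see which components it expects. The following is a rough sketch using a regular-expression scan from the Java standard library; it is not a JSON parser and not a DataPipeline API, and the class name PipelineJsonInspector and its helpers are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical inspector: lists the "__class__" values found in pipeline JSON text.
class PipelineJsonInspector {

    // Matches "__class__": "some.qualified.Name" and captures the class name.
    private static final Pattern CLASS_ENTRY =
            Pattern.compile("\"__class__\"\\s*:\\s*\"([^\"]+)\"");

    // Returns the class names in the order they appear in the text.
    static List<String> classNames(String json) {
        List<String> names = new ArrayList<>();
        Matcher matcher = CLASS_ENTRY.matcher(json);
        while (matcher.find()) {
            names.add(matcher.group(1));
        }
        return names;
    }

    // Strips the package prefix, e.g. "a.b.C" becomes "C".
    static String simpleName(String qualifiedName) {
        return qualifiedName.substring(qualifiedName.lastIndexOf('.') + 1);
    }

    public static void main(String[] args) {
        // Shortened fragment of the console output above.
        String json = "{ \"input\": { \"__class__\": "
                + "\"com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput\" }, "
                + "\"actions\": [ { \"__class__\": "
                + "\"com.northconcepts.datapipeline.foundations.pipeline.action.transform.RenameFieldsAction\" } ] }";

        for (String name : classNames(json)) {
            System.out.println(simpleName(name));
        }
        // prints "CsvPipelineInput" then "RenameFieldsAction"
    }
}
```

A regular expression is enough for a quick look at well-formed output like this; anything more involved would be better served by a real JSON parser.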