JSON-based Apache Spark framework

We at Arrah Technology are the creators of osDQ – https://sourceforge.net/projects/dataquality/ – the world's first open source data quality product.

We are now working on a JSON-based Apache Spark framework where:

  • Data processing steps can be written in JSON
  • You do not need a Spark developer to write the code — the framework will take care of it
  • Business analysts can write JSON
  • The framework has many utilities for the smooth transfer of complex SQL queries to JSON
  • We will later add a UI so that JSON building can be smooth and seamless

Here is a sample JSON:

{
  "sparkconfig": 
    {
      "appname": "TransformRunner",
      "master":"local[3]",
      "verbose":"true",
      "noexecutor":"3",
      "nocore":"3",
      "executormemory":"3G",
      "confparam":"spark.speculation,false,spark.hadoop.mapreduce.map.speculative,false,spark.hadoop.mapreduce.reduce.speculative,false"
    },
  "datasources": [
    {
      "name": "SampleData",
      "location":"/Users/vsingh007c/Downloads/drive-download-20180718T060717Z-001/cdc/modified.csv",
      "locationType": "static",
      "format": "csv",
      "selectedColumns": []
    },
    {
      "name": "SampleDataJDBC",
      "format": "jdbc",
      "jdbcparam":"url,jdbc:mysql://localhost:3306/mysql,driver,com.mysql.jdbc.Driver,user,root,password,root,dbtable,(select * from help_category) AS T,partitionColumn,parent_category_id,lowerBound,0,upperBound,10000,numPartitions,10",
      "selectedColumns": []
    }
  ],
  "transformations": [
    {
      "name": "SampleData",
      "type": "sampling",
      "source": "SampleData",
      "priority": 1,
      "cache": false,
      "conditions": [
        {
          "condition": "random",
          "value": ["0.20"]
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "transformation",
      "source": "SampleData",
      "priority": 2,
      "cache": false,
      "conditions": [
        {
          "condition":"UDF",
          "funcname":"concat",
          "value": [ "col1","col2"],
          "column": "testcolumnUDF",
          "datatype": "string"
        },
        {
          "condition":"UDF",
          "funcname":"regex_replace",
          "value": ["col6"],
          "column": "regexcolumnUDF",
          "aggrcondition": "N.*,vivek",
          "datatype": "string"
        },
        {
          "condition":"normalise",
          "funcname":"zeroscore",
          "value": ["col10"],
          "column": "normcol10",
          "datatype": "Long"
        },
        {
          "condition":"normalise",
          "funcname":"zscore",
          "value": ["col10"],
          "column": "zcol10",
          "datatype": "Long"
        }
      ]
    },
    {
      "name": "SampleData-Filter",
      "type": "filter",
      "source": "SampleData",
      "priority": 3,
      "cache": false,
      "conditions": [
        {
          "condition": "Expression",
          "value": ["constant","col5 is null"],
          "datatype": "String"
        },
        {
          "condition": "RLIKE",
          "column":"col6",
          "value": ["constant","^N"],
          "datatype": "String"
        },
        {
          "condition": "DROPDUPLICATES",
          "value": ["constant","col6","col10"],
          "datatype": "String"
        },
        {
          "condition": "selectexpr",
          "value": ["constant","col6","col10 as none", "'5'", "col10*col10","*"],
          "datatype": "String"
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "enrichment",
      "source": "SampleData",
      "priority": 4,
      "cache": false,
      "conditions": [
        {
          "condition": "addcolumns",
          "aggrcondition": "testcolumn1:trim(col10):substrcol1:substr(now(),1,4):newcolumn:concat(testcolumn1,col4)"
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "join",
      "source": "SampleData,SampleData",
      "priority": 5,
      "cache": false,
      "conditions": [
        {
          "joinType": "Unionall"
        },
        {
          "joinType": "Intersect"
        }
      ]
    },
    {
      "name": "StratifiedData",
      "type": "sampling",
      "source": "SampleData",
      "priority": 6,
      "cache": false,
      "conditions": [
        {
          "condition": "stratified",
          "value": ["0.002","I","0.8"],
          "column": "col2"
        }
      ]
    },
    
    {
      "name": "Groupdata",
      "type": "aggregate",
      "source": "StratifiedData",
      "priority": 7,
      "cache": false,
      "conditions": [
        {
          "condition": "groupby",
          "value": [ "col2"],
          "aggrcondition": "col2,count"
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "enrichment",
      "source": "SampleData",
      "priority": 8,
      "cache": false,
      "conditions": [
        {
          "condition": "renamecolumns",
          "aggrcondition": "testcolumn1,rename1_vivek,substrcol1,rename2,newcolumn,rename3"
        },
        {
          "condition": "reordercolumns",
          "aggrcondition": "rename1_vivek,rename2,rename3"
        },
        {
          "condition": "replacecolumns",
          "aggrcondition": "rename3:5"
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "join",
      "source": "SampleData,SampleData",
      "priority": 9,
      "cache": false,
      "conditions": [
        {
          "joinType": "left_outer",
          "leftcolumn": "rename1_vivek,rename2,rename3",
          "condition": "not equal",
          "rightcolumn": "rename1_vivek,rename2",
          "dropcolumn": "rename2"
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "enrichment",
      "source": "SampleData",
      "priority": 10,
      "cache": false,
      "conditions": [
        {
          "condition": "renameallcolumns",
          "aggrcondition": "sample_"
        },
        {
          "condition": "addcolumns",
          "aggrcondition": "newcol:null"
        },
        {
          "condition": "addcolumns",
          "aggrcondition": "newcol2:case when(newcol == null) then 'not null' else 'null value' END"
        },
        {
          "condition": "addcolumns",
          "aggrcondition": "newcol1:if(newcol2='not null',0,5*5)"
        }
      ]
    }
  ],
  "outputs": [
    {
      "name": "BASE_TABLE",
      "location": "/Users/vsingh007c/Downloads/",
      "sources": ["UDFData-Filter-newCol"],
      "format": "parquet",
      "savemode": "append",
      "selectedColumns": []
    }
  ]
}

Soon we will publish a README and open source the framework code. If you have any requirement or use case where you want to migrate data processing, or migrate your old SQL to Spark, feel free to contact us at support@arrahtech.com

Advertisements