JSON-based Apache Spark framework

We at Arrah Technology are the creators of osDQ (https://sourceforge.net/projects/dataquality/), the world's first open source data quality product.

We are now working on a JSON-based Apache Spark framework where:

  • Data processing steps can be written in JSON
  • You do not need a Spark developer to write the code; the framework takes care of that
  • Business analysts can write the JSON
  • The framework has many utilities for smooth translation of complex SQL queries into JSON
  • We will later add a UI so that building the JSON is smooth and seamless

Here is a sample JSON:

{
  "sparkconfig": 
    {
      "appname": "TransformRunner",
      "master":"local[3]",
      "verbose":"true",
      "noexecutor":"3",
      "nocore":"3",
      "executormemory":"3G",
      "confparam":"spark.speculation,false,spark.hadoop.mapreduce.map.speculative,false,spark.hadoop.mapreduce.reduce.speculative,false"
    },
  "datasources": [
    {
      "name": "SampleData",
      "location":"/Users/vsingh007c/Downloads/drive-download-20180718T060717Z-001/cdc/modified.csv",
      "locationType": "static",
      "format": "csv",
      "selectedColumns": []
    },
    {
      "name": "SampleDataJDBC",
      "format": "jdbc",
      "jdbcparam":"url,jdbc:mysql://localhost:3306/mysql,driver,com.mysql.jdbc.Driver,user,root,password,root,dbtable,(select * from help_category) AS T,partitionColumn,parent_category_id,lowerBound,0,upperBound,10000,numPartitions,10",
      "selectedColumns": []
    }
  ],
  "transformations": [
    {
      "name": "SampleData",
      "type": "sampling",
      "source": "SampleData",
      "priority": 1,
      "cache": false,
      "conditions": [
        {
          "condition": "random",
          "value": ["0.20"]
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "transformation",
      "source": "SampleData",
      "priority": 2,
      "cache": false,
      "conditions": [
        {
          "condition":"UDF",
          "funcname":"concat",
          "value": [ "col1","col2"],
          "column": "testcolumnUDF",
          "datatype": "string"
        },
        {
          "condition":"UDF",
          "funcname":"regex_replace",
          "value": ["col6"],
          "column": "regexcolumnUDF",
          "aggrcondition": "N.*,vivek",
          "datatype": "string"
        },
        {
          "condition":"normalise",
          "funcname":"zeroscore",
          "value": ["col10"],
          "column": "normcol10",
          "datatype": "Long"
        },
        {
          "condition":"normalise",
          "funcname":"zscore",
          "value": ["col10"],
          "column": "zcol10",
          "datatype": "Long"
        }
      ]
    },
    {
      "name": "SampleData-Filter",
      "type": "filter",
      "source": "SampleData",
      "priority": 3,
      "cache": false,
      "conditions": [
        {
          "condition": "Expression",
          "value": ["constant","col5 is null"],
          "datatype": "String"
        },
        {
          "condition": "RLIKE",
          "column":"col6",
          "value": ["constant","^N"],
          "datatype": "String"
        },
        {
          "condition": "DROPDUPLICATES",
          "value": ["constant","col6","col10"],
          "datatype": "String"
        },
        {
          "condition": "selectexpr",
          "value": ["constant","col6","col10 as none", "'5'", "col10*col10","*"],
          "datatype": "String"
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "enrichment",
      "source": "SampleData",
      "priority": 4,
      "cache": false,
      "conditions": [
        {
          "condition": "addcolumns",
          "aggrcondition": "testcolumn1:trim(col10):substrcol1:substr(now(),1,4):newcolumn:concat(testcolumn1,col4)"
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "join",
      "source": "SampleData,SampleData",
      "priority": 5,
      "cache": false,
      "conditions": [
        {
          "joinType": "Unionall"
        },
        {
          "joinType": "Intersect"
        }
      ]
    },
    {
      "name": "StratifiedData",
      "type": "sampling",
      "source": "SampleData",
      "priority": 6,
      "cache": false,
      "conditions": [
        {
          "condition": "stratified",
          "value": ["0.002","I","0.8"],
          "column": "col2"
        }
      ]
    },
    
    {
      "name": "Groupdata",
      "type": "aggregate",
      "source": "StratifiedData",
      "priority": 7,
      "cache": false,
      "conditions": [
        {
          "condition": "groupby",
          "value": [ "col2"],
          "aggrcondition": "col2,count"
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "enrichment",
      "source": "SampleData",
      "priority": 8,
      "cache": false,
      "conditions": [
        {
          "condition": "renamecolumns",
          "aggrcondition": "testcolumn1,rename1_vivek,substrcol1,rename2,newcolumn,rename3"
        },
        {
          "condition": "reordercolumns",
          "aggrcondition": "rename1_vivek,rename2,rename3"
        },
        {
          "condition": "replacecolumns",
          "aggrcondition": "rename3:5"
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "join",
      "source": "SampleData,SampleData",
      "priority": 9,
      "cache": false,
      "conditions": [
        {
          "joinType": "left_outer",
          "leftcolumn": "rename1_vivek,rename2,rename3",
          "condition": "not equal",
          "rightcolumn": "rename1_vivek,rename2",
          "dropcolumn": "rename2"
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "enrichment",
      "source": "SampleData",
      "priority": 10,
      "cache": false,
      "conditions": [
        {
          "condition": "renameallcolumns",
          "aggrcondition": "sample_"
        },
        {
          "condition": "addcolumns",
          "aggrcondition": "newcol:null"
        },
        {
          "condition": "addcolumns",
          "aggrcondition": "newcol2:case when(newcol == null) then 'not null' else 'null value' END"
        },
        {
          "condition": "addcolumns",
          "aggrcondition": "newcol1:if(newcol2='not null',0,5*5)"
        }
      ]
    }
  ],
  "outputs": [
    {
      "name": "BASE_TABLE",
      "location": "/Users/vsingh007c/Downloads/",
      "sources": ["UDFData-Filter-newCol"],
      "format": "parquet",
      "savemode": "append",
      "selectedColumns": []
    }
  ]
}
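
To make the JSON above concrete, here is a minimal Scala sketch of the hand-written Spark code that the priority-3 "SampleData-Filter" step would otherwise require. This is our own illustration, not the actual TransformRunner internals: the object name FilterStepSketch and the CSV read options (header, inferSchema) are assumptions.

// Hypothetical sketch only: hand-written Spark equivalent of the "SampleData-Filter"
// step in the sample JSON. The framework is meant to generate this work from the JSON.
import org.apache.spark.sql.SparkSession

object FilterStepSketch {
  def main(args: Array[String]): Unit = {
    // "sparkconfig" section: application name and master
    val spark = SparkSession.builder()
      .appName("TransformRunner")
      .master("local[3]")
      .getOrCreate()

    // "datasources" section: the static CSV source named SampleData
    // (header/inferSchema options are assumed for the sketch)
    val sampleData = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/Users/vsingh007c/Downloads/drive-download-20180718T060717Z-001/cdc/modified.csv")

    // "transformations" section, priority 3, type "filter" on SampleData
    val filtered = sampleData
      .filter("col5 is null")                                           // condition: Expression
      .filter("col6 RLIKE '^N'")                                        // condition: RLIKE on col6
      .dropDuplicates("col6", "col10")                                  // condition: DROPDUPLICATES
      .selectExpr("col6", "col10 as none", "'5'", "col10*col10", "*")   // condition: selectexpr

    // "outputs" section: append the result as parquet
    filtered.write.mode("append").parquet("/Users/vsingh007c/Downloads/")

    spark.stop()
  }
}

The point of the framework is that none of this boilerplate has to be written by hand: sources, transformation priorities and outputs are all resolved from the JSON.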

Soon we will publish a README and open source the framework code. If you have any requirement or use case where you want to migrate data processing, or migrate your old SQL to Spark, feel free to contact us at support@arrahtech.com.


Disruptive ETL – Next Frontier of ETL

Both proprietary and open source ETL tools have been around for decades and have been doing fine. Some data integration projects were successful while others failed. More than the tools, I would blame data architects and company culture for the failures, as some of them have been very secretive about data while others did not want to share. ETL tools were doing what they were supposed to do: Extraction from data sources, Transformation in a staging area, and Load into a warehouse schema.

Now the million dollar question is: what next? Technology is changing, storage is cheap, massively parallel processing is possible, and reporting is becoming schema-less and search-based, so what will be the future of ETL? To answer this question, we first need to analyse today's ETL features, as most so-called innovations are either extensions of existing features or fulfil new requirements. Contemporary ETL tools are focused on:

1.) Extraction from many data sources in batch mode – i.e. full load, incremental load, redistributed load, etc.
2.) Very heavy Transformation – i.e. year to date (YTD), slowly changing dimensions, mapping, aggregates, etc.
3.) Load into the warehouse – i.e. star schemas, fact tables, dimension tables, aggregate tables, etc.

So what is going to change in the next couple of years? Let us proceed in reverse order, starting with the downstream consumers.

Load into the warehouse: With the advent of the data lake concept as opposed to the data warehouse, Hadoop and NoSQL storage as opposed to the RDBMS, and schema-less reporting as opposed to cubes and dimensional modelling, this is certainly going to change. Data architects will not want to silo their data into pre-built schemas and will want to give more flexibility to end users.
Data scientists do not like aggregation because granularity and a lot of information is lost. They hate taking feeds from data marts or data warehouses.

In the coming days, ETL will focus on loading data into data-lake-like systems that are metadata and tag driven, and will focus less on pre-built schemas and aggregation loads.

Very heavy Transformation: This is the bread and butter of contemporary ETL tools. They do all kinds of transformations, but going forward probably not all of them will be needed. A lot more transformation and formatting will be done by the reporting layer, and the reporting layer will also be built to process massive data, hence the aggregation transformations will become redundant.

In the coming days, ETL tools will focus more on data quality and data munging.

Extraction from many data sources in batch mode: I do not see many changes there, as data sources keep being added and we need to extract data from them. ETL tools will add new adapters to take real-time feeds and data streams. There are tools that have already built such adapters and are working on this.

I am sure ETL tools will reinvent themselves and adapt to these changes.

ETL will become EQMP: Extraction, Quality, Munging and Publishing.

Vivek Singh is a data architect, evangelist and the main contributor to osDQ (Open Source Data Quality and Profiling).